In [3]:
# Preinstall: https://sparkbyexamples.com/pyspark/install-pyspark-in-anaconda-jupyter-notebook/  - запуск pyspark

import findspark
import pyspark
from pyspark.sql import SparkSession


findspark.init()
spark = SparkSession.builder.getOrCreate()

## Тестовые данные ##
# Продукты
df_products = spark.createDataFrame([
    (1, 'apple'),
    (2, 'xiaomi'),
    (3, 'samsung'),
    (4, 'huawei'),
    (5, 'sony'),
    (6, 'sharp')],
    ['id_prod', 'products'])
df_products.show()

+-------+--------+
|id_prod|products|
+-------+--------+
|      1|   apple|
|      2|  xiaomi|
|      3| samsung|
|      4|  huawei|
|      5|    sony|
|      6|   sharp|
+-------+--------+



In [4]:
# Категории
df_categories = spark.createDataFrame([
    (1, 'phones'),
    (2, 'pads'),
    (3, 'laptops'),
    (4, 'monitors'),
    (5, 'microwaves')],
    ['id_cat', 'categories'])
df_categories.show()

+------+----------+
|id_cat|categories|
+------+----------+
|     1|    phones|
|     2|      pads|
|     3|   laptops|
|     4|  monitors|
|     5|microwaves|
+------+----------+



In [6]:
# Связь m2m Продукты-Категории
df_prod2cat = spark.createDataFrame([(1, 1), (1, 2), (1, 3), (2, 1), (2, 2), 
                                     (3, 1), (3, 2), (3, 4), (4, 1), (4, 4)],
                                    ['id_prod', 'id_cat'])
df_prod2cat.show()

+-------+------+
|id_prod|id_cat|
+-------+------+
|      1|     1|
|      1|     2|
|      1|     3|
|      2|     1|
|      2|     2|
|      3|     1|
|      3|     2|
|      3|     4|
|      4|     1|
|      4|     4|
+-------+------+



In [10]:
def prod_name_and_null_cat(df_p, df_c, df_m2m, product_name):
    
    # датафрейм m2m, где вместо id-шников названия продуктов и категорий, включая позиции null с обeих сторон
    df_pc = df_m2m.join(df_p, df_p.id_prod == df_m2m.id_prod, 'outer')\
    .join(df_c, df_c.id_cat == df_m2m.id_cat, 'outer')\
    .select('products', 'categories').sort('products')
    
    # датафрейм - все пары 'заданное Имя продукта' - 'Категория'
    df_pr_name = df_pc.filter(df_pc.products == product_name)
    
    # датафрейм - все продукты, категория которых null
    df_cat_null = df_pc.filter(df_pc.categories.isNull())
    
    return df_pr_name.union(df_cat_null)  # объединение двух результирующих датафреймов


# Проверка ф-ции на тестовых датафреймах
prod_name_and_null_cat(df_products, df_categories, df_prod2cat, 'samsung').show()

+--------+----------+
|products|categories|
+--------+----------+
| samsung|  monitors|
| samsung|      pads|
| samsung|    phones|
|   sharp|      null|
|    sony|      null|
+--------+----------+

