In [37]:
# Импортируем библиотеки
import pyspark # noqa
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode_outer

In [14]:
# Создаем точку входа в Spark API
spark = SparkSession.builder \
    .appName("DataFrame_Test") \
    .getOrCreate()

In [15]:
# Создаем Датафрейм с продуктами:
# id - идентификатор продукта, 
# product_name - название продукта
data_for_products = spark.createDataFrame([
    (1, 'Apple'),
    (2, 'Banana'),
    (3, 'Orange'),
    (4, 'Carrot'),
    (5, 'Lettuce'),
    (6, 'Tomato'),
    (7, 'Potato'),
    (8, 'Broccoli'),
    (9, 'Grapes'),
    (10, 'Strawberry')],
    ['id', 'product_name', ]
)

In [16]:
data_for_products.show()

+---+------------+
| id|product_name|
+---+------------+
|  1|       Apple|
|  2|      Banana|
|  3|      Orange|
|  4|      Carrot|
|  5|     Lettuce|
|  6|      Tomato|
|  7|      Potato|
|  8|    Broccoli|
|  9|      Grapes|
| 10|  Strawberry|
+---+------------+


In [17]:
# Создаем Датафрейм с категориями:
# id - идентификатор категории, 
# category_name - название категории
data_for_categories = spark.createDataFrame([
    (1, 'Fruits'),
    (2, 'Vegetables'),
    (3, 'Berries'),
    (4, 'Root Vegetables'),
    (5, 'Leafy Greens'),
    (6, 'Citrus Fruits'),
    (7, 'Tubers'),
    (8, 'Tree Fruits'),
    (9, 'Climbing Plants'),
    (10, 'Allium Vegetables'), ],
    ['id', 'category_name', ]
)

In [18]:
data_for_categories.show()

+---+-----------------+
| id|    category_name|
+---+-----------------+
|  1|           Fruits|
|  2|       Vegetables|
|  3|          Berries|
|  4|  Root Vegetables|
|  5|     Leafy Greens|
|  6|    Citrus Fruits|
|  7|           Tubers|
|  8|      Tree Fruits|
|  9|  Climbing Plants|
| 10|Allium Vegetables|
+---+-----------------+


In [35]:
# Создаем Датафрейм со Связями первых двух Датафреймов:
# id_product - идентификатор продукта, 
# id_categories - идентификатор категории

# Для удобства сделаем данные со связями отдельно
data = [
    (1, [1]), # -- Apple -> Fruits
    (2, [1, 8]), # -- Banana -> Fruits & Tree Fruits
    (3, [1, 6, 8]), # -- Orange -> Citrus Fruits
    (4, [2, 4]), # -- Carrot -> Vegetables
    (5, []), # -- Lettuce
    (6, [8]), # -- Tomato -> Tree Fruits
    (7, []), # -- Potato
    (8, [2]), # -- Broccoli -> Vegetables
    (9, [3, 9]), # -- Grapes -> Berries
    (10, [3]) # -- Strawberry -> Berries
]

# Делаем DataFrame
data_for_connect = spark.createDataFrame(
    data,
    ["id_product", "id_category"]
)

In [36]:
data_for_connect.show()


+----------+-----------+
|id_product|id_category|
+----------+-----------+
|         1|        [1]|
|         2|     [1, 8]|
|         3|  [1, 6, 8]|
|         4|     [2, 4]|
|         5|         []|
|         6|        [8]|
|         7|         []|
|         8|        [2]|
|         9|     [3, 9]|
|        10|        [3]|
+----------+-----------+


In [57]:
# Получаем все пары "Имя продукта - Имя категории"
pairs_of_data = data_for_connect.select(
    col('id_product'),
    explode_outer(col('id_category')) \
        .alias('id_category')
)

In [52]:
pairs_of_data.show()

+----------+-----------+
|id_product|id_category|
+----------+-----------+
|         1|          1|
|         2|          1|
|         2|          8|
|         3|          1|
|         3|          6|
|         3|          8|
|         4|          2|
|         4|          4|
|         5|       NULL|
|         6|          8|
|         7|       NULL|
|         8|          2|
|         9|          3|
|         9|          9|
|        10|          3|
+----------+-----------+


In [55]:
# У нас все равно высвечиваются пустые продукты без категории.
# Сделаем фильтр на isNotNull()
pairs_of_data_filter = pairs_of_data.filter(
    pairs_of_data.id_category.isNotNull()
)

In [56]:
pairs_of_data_filter.show()

+----------+-----------+
|id_product|id_category|
+----------+-----------+
|         1|          1|
|         2|          1|
|         2|          8|
|         3|          1|
|         3|          6|
|         3|          8|
|         4|          2|
|         4|          4|
|         6|          8|
|         8|          2|
|         9|          3|
|         9|          9|
|        10|          3|
+----------+-----------+


In [61]:
# Получилось вывести все пары "Имя продукта - Имя категории" без NULL.
# Теперь получим имена всех продуктов, у которых нет категорий
empty_categories = pairs_of_data.filter(
    col('id_category').isNull()
).select('id_product')

In [62]:
empty_categories.show()

+----------+
|id_product|
+----------+
|         5|
|         7|
+----------+


# P.S
Это было интересно, тяжко и увлекательно. Я Pyscpark до этого не особо трогал, так как думал, что без знания Java я его не запущу. Но я смог, через боль и слезы))