In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m3.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=34b296a0439ac4d1d39615bf8b8e6e8bdd10c83115229268736c545b39cfe401
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("MyApp").getOrCreate()


In [17]:
from pyspark.sql import functions as F



In [23]:
# Создаём примерные данные в виде [product_id, product_name]
products_data = [
    (1, "Product A"),
    (2, "Product B"),
    (3, "Product C"),
    (4, "Product D")
]

# Создаём примерные данные в виде [category_id, category_name, product_id]
# Тут третья колонна указывает принадлежность категории к id продукта
categories_data = [
    (1, "Category 1", 1),
    (2, "Category 2", 1),
    (3, "Category 1", 2)
]

In [11]:
# Создаем датафреймы
products_df = spark.createDataFrame(products_data, ["product_id", "product_name"])
products_df.show()

+----------+------------+
|product_id|product_name|
+----------+------------+
|         1|   Product A|
|         2|   Product B|
|         3|   Product C|
|         4|   Product D|
+----------+------------+



In [13]:
categories_data = [
    (1, "Category 1", 1),
    (2, "Category 2", 1),
    (3, "Category 1", 2)
]
categories_df = spark.createDataFrame(categories_data, ["category_id", "category_name", "product_id"])
categories_df.show()

+-----------+-------------+----------+
|category_id|category_name|product_id|
+-----------+-------------+----------+
|          1|   Category 1|         1|
|          2|   Category 2|         1|
|          3|   Category 1|         2|
+-----------+-------------+----------+



In [22]:
# Объединяем датафреймы по product_id, тем самым продукты, у которых нет категории получают пустое значение null
product_category_pairs = products_df.join(categories_df, "product_id", "left").select("product_id", "product_name", "category_name",)
product_category_pairs.show()


+----------+------------+-------------+
|product_id|product_name|category_name|
+----------+------------+-------------+
|         1|   Product A|   Category 2|
|         1|   Product A|   Category 1|
|         2|   Product B|   Category 1|
|         3|   Product C|         NULL|
|         4|   Product D|         NULL|
+----------+------------+-------------+



In [26]:
# Фильтруем и получаем только те продукты, которые не имеют никаких категории
products_without_categories = product_category_pairs.filter(product_category_pairs.category_name.isNull())
products_without_categories.show()

+----------+------------+-------------+
|product_id|product_name|category_name|
+----------+------------+-------------+
|         3|   Product C|         NULL|
|         4|   Product D|         NULL|
+----------+------------+-------------+

