In [None]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=7758dfa829b1bfb078c09e0119fbafee011bdd035f1222ca7e7cc6b817be59af
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [None]:
from pyspark.sql import SparkSession

def create_spark_session(app_name="App"):
    """Build (or reuse) a SparkSession, requesting ``app_name`` as the application name.

    The session is obtained with ``getOrCreate``, so an already-active session
    is returned instead of a new one being started.
    """
    session_builder = SparkSession.builder.appName(app_name)
    return session_builder.getOrCreate()

def load_data(spark, data, columns):
    """Create a DataFrame from in-memory rows using ``spark.createDataFrame``.

    Args:
        spark: the session whose ``createDataFrame`` builds the frame.
        data: the rows (e.g. a list of tuples).
        columns: column names, in the same order as the fields of each row.

    Returns:
        Whatever ``spark.createDataFrame(data, columns)`` returns.
    """
    frame = spark.createDataFrame(data, columns)
    return frame


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, coalesce

# Reuse the helpers from the previous cell so that session creation and
# DataFrame construction are defined in one place.
spark = create_spark_session("ProductCategoryExample")

# Sample data: products, categories, and the many-to-many link table between
# them. Product 4 has no row in the link table.
products_data = [
    (1, "Product A"),
    (2, "Product B"),
    (3, "Product C"),
    (4, "Product D"),
]

categories_data = [
    (1, "Category X"),
    (2, "Category Y"),
    (3, "Category Z"),
]

# (product_id, category_id) links; product 1 belongs to two categories.
product_category_data = [
    (1, 1),
    (1, 2),
    (2, 2),
    (3, 3),
]

products_df = load_data(spark, products_data, ["product_id", "product_name"])
categories_df = load_data(spark, categories_data, ["category_id", "category_name"])
product_category_df = load_data(spark, product_category_data, ["product_id", "category_id"])

products_df.show()
categories_df.show()
product_category_df.show()


+----------+------------+
|product_id|product_name|
+----------+------------+
|         1|   Product A|
|         2|   Product B|
|         3|   Product C|
|         4|   Product D|
+----------+------------+

+-----------+-------------+
|category_id|category_name|
+-----------+-------------+
|          1|   Category X|
|          2|   Category Y|
|          3|   Category Z|
+-----------+-------------+

+----------+-----------+
|product_id|category_id|
+----------+-----------+
|         1|          1|
|         1|          2|
|         2|          2|
|         3|          3|
+----------+-----------+



In [None]:
def get_product_category_pairs_and_orphans(products_df, categories_df, product_category_df):
    """Return product/category name pairs and the products that have no category.

    Args:
        products_df: DataFrame with ``product_id`` and ``product_name`` columns.
        categories_df: DataFrame with ``category_id`` and ``category_name`` columns.
        product_category_df: link table with ``product_id`` and ``category_id``
            columns; a product may appear in several rows or in none.

    Returns:
        A tuple ``(product_category_pairs, products_without_categories)``:
        ``product_category_pairs`` has columns ``product_name`` and
        ``category_name`` (one row per link whose product and category both
        exist); ``products_without_categories`` has a single ``product_name``
        column listing products with no resolvable category.
    """
    # Resolve links against the category table first. Only links that point at
    # an existing category count: a product whose links all reference missing
    # categories has no category name to pair with, so it must be reported as
    # an orphan instead of silently disappearing from both results.
    resolved_links = product_category_df.join(categories_df, "category_id", "inner")

    product_category_pairs = resolved_links \
        .join(products_df, "product_id", "inner") \
        .select("product_name", "category_name")

    # left_anti keeps only the product rows with no match in resolved_links;
    # repeated product_ids on the right side do not multiply rows, so no
    # distinct() is needed.
    products_without_categories = products_df \
        .join(resolved_links, "product_id", "left_anti") \
        .select("product_name")

    return product_category_pairs, products_without_categories

# Compute both results once, then print each one under its (Russian) heading:
# "Pairs 'Product name – Category name':" and "Products without categories:".
product_category_pairs, products_without_categories = get_product_category_pairs_and_orphans(
    products_df,
    categories_df,
    product_category_df,
)

for heading, result_df in (
    ("Пары 'Имя продукта – Имя категории':", product_category_pairs),
    ("Продукты без категорий:", products_without_categories),
):
    print(heading)
    result_df.show()


Пары 'Имя продукта – Имя категории':
+------------+-------------+
|product_name|category_name|
+------------+-------------+
|   Product A|   Category X|
|   Product C|   Category Z|
|   Product B|   Category Y|
|   Product A|   Category Y|
+------------+-------------+

Продукты без категорий:
+------------+
|product_name|
+------------+
|   Product D|
+------------+

