## Создаем датафреймы

In [42]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

spark = SparkSession.builder.getOrCreate()

DATA_PATH = "work/data_frames/"

# датафреймы
products_df = spark.read.option("header", "true").csv(DATA_PATH + "products.csv")
categories_df = spark.read.option("header", "true").csv(DATA_PATH + "categories.csv")
product_category_df = spark.read.option("header", "true").csv(DATA_PATH + "product_category.csv")

products_df = products_df.withColumn("product_id", col("product_id").cast("int"))
categories_df = categories_df.withColumn("category_id", col("category_id").cast("int"))
product_category_df = product_category_df.withColumn("product_id", col("product_id").cast("int")) \
    .withColumn("category_id", col("category_id").cast("int"))


## Соединяем датафреймы products_df, categories_data, product_category_df

In [43]:

product_with_category = products_df.join(
    product_category_df,
    on="product_id",
    how="left"
)


full_info = product_with_category.join(
    categories_df,
    on="category_id",
    how="left"
)

# Оставляем только имя продукта и имя категории
result = full_info.select("product_name", "category_name").show()


+------------+--------------------+
|product_name|       category_name|
+------------+--------------------+
|      Яблоко|              Фрукты|
|        Хлеб|Безглютеновые про...|
|      Молоко|   Молочные продукты|
|Куриное филе|     Мясные продукты|
|     Шоколад|            Сладости|
|      Огурец|               Овощи|
|         Сыр|Безглютеновые про...|
|         Сыр|   Молочные продукты|
| Добрый Кола|                NULL|
+------------+--------------------+

