In [1]:
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("ETL")
    .config("spark.jars", "postgresql-42.6.0.jar,clickhouse-jdbc-0.4.6.jar")
    .getOrCreate()
)

pg_jdbc_url = "jdbc:postgresql://postgres:5432/spark_db"
pg_properties = {
    "user": "spark_user",
    "password": "spark_password",
    "driver": "org.postgresql.Driver",
}

ch_jdbc_url = "jdbc:clickhouse://clickhouse:8123/default"
ch_properties = {
    "driver": "com.clickhouse.jdbc.ClickHouseDriver",
    "user": "custom_user",
    "password": "custom_password",
}

In [2]:
from pyspark.sql.functions import col, lit

df_mock_data = spark.read.jdbc(pg_jdbc_url, "public.mock_data", properties=pg_properties)
df_mock_data.toPandas().head()

Unnamed: 0,id,customer_first_name,customer_last_name,customer_age,customer_email,customer_country,customer_postal_code,customer_pet_type,customer_pet_name,customer_pet_breed,...,product_reviews,product_release_date,product_expiry_date,supplier_name,supplier_contact,supplier_email,supplier_phone,supplier_address,supplier_city,supplier_country
0,1,Conni,Leydon,63,lswait0@amazon.com,France,77404 CEDEX,cat,Jan,Labrador Retriever,...,360,9/29/2022,2/28/2026,Thoughtbeat,Lenee Swait,lswait0@oracle.com,852-750-6042,Room 1411,Batangafo,Russia
1,2,Alec,Chamberlayne,26,cwalsh1@state.gov,Philippines,9401,dog,Shelia,Labrador Retriever,...,754,5/31/2016,9/11/2029,Eayo,Colet Walsh,cwalsh1@addthis.com,651-123-9474,Suite 4,Labuan,Sweden
2,3,Vaughan,Shapiro,47,jelnough2@xing.com,Madagascar,,cat,Gunner,Siamese,...,313,12/14/2012,1/5/2024,Gabspot,Jane Elnough,jelnough2@ezinearticles.com,325-676-3913,Room 1133,Veinticinco de Mayo,Portugal
3,4,Bank,Audas,81,lscay3@howstuffworks.com,Portugal,2860-010,bird,Nahum,Labrador Retriever,...,938,11/6/2010,6/3/2025,Jabberstorm,Lorilyn Scay,lscay3@opensource.org,112-474-3398,Room 1500,Saraktash,China
4,5,Anthe,Pletts,72,ctoll4@miibeian.gov.cn,Russia,628389,cat,Brod,Labrador Retriever,...,943,11/2/2018,7/10/2030,Trudeo,Court Toll,ctoll4@usgs.gov,647-688-5659,Apt 1574,Lakatnik,China


# Трансформация данных из исходной модели в снежинку

In [3]:
def write_distinct_to_lookup(df, source_column, target_table, lookup_column_name="name"):
    new_values_df = df.select(col(source_column).alias(lookup_column_name)).distinct().filter(col(lookup_column_name).isNotNull())
    try:
        existing_values_df = spark.read.jdbc(pg_jdbc_url, target_table, properties=pg_properties).select(lookup_column_name)

        values_to_insert_df = new_values_df.join(
            existing_values_df,
            new_values_df[lookup_column_name] == existing_values_df[lookup_column_name],
            "left_anti"
        )
    except Exception as e:
        values_to_insert_df = new_values_df
    
    if values_to_insert_df.count() > 0:
        values_to_insert_df.write.jdbc(url=pg_jdbc_url, table=target_table, mode="append", properties=pg_properties)


# countries
write_distinct_to_lookup(df_mock_data, "customer_country", "countries")
write_distinct_to_lookup(df_mock_data, "seller_country", "countries")
write_distinct_to_lookup(df_mock_data, "store_country", "countries")
write_distinct_to_lookup(df_mock_data, "supplier_country", "countries")

# cities
write_distinct_to_lookup(df_mock_data, "store_city", "cities")
write_distinct_to_lookup(df_mock_data, "supplier_city", "cities")

# pet_types
write_distinct_to_lookup(df_mock_data, "customer_pet_type", "pet_types")

# pet_breeds
write_distinct_to_lookup(df_mock_data, "customer_pet_breed", "pet_breeds")

# pet_categories
write_distinct_to_lookup(df_mock_data, "pet_category", "pet_categories")

# product_names
write_distinct_to_lookup(df_mock_data, "product_name", "product_names")

# product_categories
write_distinct_to_lookup(df_mock_data, "product_category", "product_categories")

# product_brands
write_distinct_to_lookup(df_mock_data, "product_brand", "product_brands")

# product_colors
write_distinct_to_lookup(df_mock_data, "product_color", "product_colors")

# product_sizes
write_distinct_to_lookup(df_mock_data, "product_size", "product_sizes")

# product_materials
write_distinct_to_lookup(df_mock_data, "product_material", "product_materials")

In [4]:
df_countries = spark.read.jdbc(pg_jdbc_url, "countries", properties=pg_properties).select(col("id").alias("country_id"), col("name").alias("country_name"))
df_cities = spark.read.jdbc(pg_jdbc_url, "cities", properties=pg_properties).select(col("id").alias("city_id"), col("name").alias("city_name"))
df_pet_types = spark.read.jdbc(pg_jdbc_url, "pet_types", properties=pg_properties).select(col("id").alias("pet_type_id"), col("name").alias("pet_type_name"))
df_pet_breeds = spark.read.jdbc(pg_jdbc_url, "pet_breeds", properties=pg_properties).select(col("id").alias("pet_breed_id"), col("name").alias("pet_breed_name"))
df_pet_categories = spark.read.jdbc(pg_jdbc_url, "pet_categories", properties=pg_properties).select(col("id").alias("pet_category_id"), col("name").alias("pet_category_name"))
df_product_names = spark.read.jdbc(pg_jdbc_url, "product_names", properties=pg_properties).select(col("id").alias("product_name_id"), col("name").alias("product_name_val"))
df_product_categories = spark.read.jdbc(pg_jdbc_url, "product_categories", properties=pg_properties).select(col("id").alias("product_category_id"), col("name").alias("product_category_val"))
df_product_brands = spark.read.jdbc(pg_jdbc_url, "product_brands", properties=pg_properties).select(col("id").alias("product_brand_id"), col("name").alias("product_brand_val"))
df_product_colors = spark.read.jdbc(pg_jdbc_url, "product_colors", properties=pg_properties).select(col("id").alias("product_color_id"), col("name").alias("product_color_val"))
df_product_sizes = spark.read.jdbc(pg_jdbc_url, "product_sizes", properties=pg_properties).select(col("id").alias("product_size_id"), col("name").alias("product_size_val"))
df_product_materials = spark.read.jdbc(pg_jdbc_url, "product_materials", properties=pg_properties).select(col("id").alias("product_material_id"), col("name").alias("product_material_val"))

In [5]:
# customers
df_customers = (
    df_mock_data.alias("md")
    .join(
        df_countries.alias("c"),
        col("md.customer_country") == col("c.country_name"),
        "left",
    )
    .join(
        df_pet_types.alias("pt"),
        col("md.customer_pet_type") == col("pt.pet_type_name"),
        "left",
    )
    .join(
        df_pet_breeds.alias("pb"),
        col("md.customer_pet_breed") == col("pb.pet_breed_name"),
        "left",
    )
    .join(
        df_pet_categories.alias("pc"),
        col("md.pet_category") == col("pc.pet_category_name"),
        "left",
    )
    .select(
        col("md.customer_first_name").alias("first_name"),
        col("md.customer_last_name").alias("last_name"),
        col("md.customer_age").alias("age"),
        col("md.customer_email").alias("email"),
        col("md.customer_postal_code").alias("postal_code"),
        col("md.customer_pet_name").alias("pet_name"),
        col("pt.pet_type_id").alias("pet_type_id"),
        col("pb.pet_breed_id").alias("pet_breed_id"),
        col("pc.pet_category_id").alias("pet_category_id"),
        col("c.country_id").alias("country_id"),
    )
    .distinct()
)

df_customers.write.jdbc(
    url=pg_jdbc_url, table="customers", mode="append", properties=pg_properties
)

In [6]:
# sellers
df_sellers = (
    df_mock_data.alias("md")
    .join(
        df_countries.alias("c"),
        col("md.seller_country") == col("c.country_name"),
        "left",
    )
    .select(
        col("md.seller_first_name").alias("first_name"),
        col("md.seller_last_name").alias("last_name"),
        col("md.seller_email").alias("email"),
        col("md.seller_postal_code").alias("postal_code"),
        col("c.country_id").alias("country_id"),
    )
    .distinct()
)

df_sellers.write.jdbc(
    url=pg_jdbc_url, table="sellers", mode="append", properties=pg_properties
)

In [7]:
# stores
df_stores = (
    df_mock_data.alias("md")
    .join(
        df_countries.alias("c"),
        col("md.store_country") == col("c.country_name"),
        "left",
    )
    .join(df_cities.alias("ci"), col("md.store_city") == col("ci.city_name"), "left")
    .select(
        col("md.store_name").alias("name"),
        col("md.store_location").alias("location"),
        col("md.store_state").alias("state"),
        col("md.store_phone").alias("phone"),
        col("md.store_email").alias("email"),
        col("c.country_id").alias("country_id"),
        col("ci.city_id").alias("city_id"),
    )
    .distinct()
)

df_stores.write.jdbc(
    url=pg_jdbc_url, table="stores", mode="append", properties=pg_properties
)

In [8]:
# suppliers
df_suppliers = (
    df_mock_data.alias("md")
    .join(
        df_countries.alias("c"),
        col("md.supplier_country") == col("c.country_name"),
        "left",
    )
    .join(df_cities.alias("ci"), col("md.supplier_city") == col("ci.city_name"), "left")
    .select(
        col("md.supplier_name").alias("name"),
        col("md.supplier_contact").alias("contact"),
        col("md.supplier_email").alias("email"),
        col("md.supplier_phone").alias("phone"),
        col("md.supplier_address").alias("address"),
        col("c.country_id").alias("country_id"),
        col("ci.city_id").alias("city_id"),
    )
    .distinct()
)

df_suppliers.write.jdbc(
    url=pg_jdbc_url, table="suppliers", mode="append", properties=pg_properties
)

In [10]:
# products
df_suppliers = spark.read.jdbc(pg_jdbc_url, "suppliers", properties=pg_properties)

df_products = (
    df_mock_data.alias("md")
    .join(
        df_product_names.alias("pn"),
        col("md.product_name") == col("pn.product_name_val"),
        "left",
    )
    .join(
        df_product_categories.alias("pcg"),
        col("md.product_category") == col("pcg.product_category_val"),
        "left",
    )
    .join(
        df_product_sizes.alias("ps"),
        col("md.product_size") == col("ps.product_size_val"),
        "left",
    )
    .join(
        df_product_colors.alias("pcol"),
        col("md.product_color") == col("pcol.product_color_val"),
        "left",
    )
    .join(
        df_product_brands.alias("pb"),
        col("md.product_brand") == col("pb.product_brand_val"),
        "left",
    )
    .join(
        df_product_materials.alias("pm"),
        col("md.product_material") == col("pm.product_material_val"),
        "left",
    )
    .join(
        df_suppliers.alias("s"),
        col("md.supplier_name") == col("s.name"),
        "left",
    )
    .select(
        col("md.product_price").alias("price"),
        col("md.product_quantity").alias("quantity"),
        col("md.product_weight").alias("weight"),
        col("md.product_description").alias("description"),
        col("md.product_rating").alias("rating"),
        col("md.product_reviews").alias("reviews"),
        col("md.product_release_date").alias("release_date"),
        col("md.product_expiry_date").alias("expiry_date"),
        col("pn.product_name_id").alias("product_name_id"),
        col("pcg.product_category_id").alias("product_category_id"),
        col("ps.product_size_id").alias("product_size_id"),
        col("pcol.product_color_id").alias("product_color_id"),
        col("pb.product_brand_id").alias("product_brand_id"),
        col("pm.product_material_id").alias("product_material_id"),
        col("s.id").alias("supplier_id"),
    )
    .distinct()
)

df_products.write.jdbc(
    url=pg_jdbc_url, table="products", mode="append", properties=pg_properties
)

In [11]:
df_customers_lookup = spark.read.jdbc(
    pg_jdbc_url, "customers", properties=pg_properties
).select(col("id").alias("customer_id_pk"), col("email").alias("customer_email_pk"))
df_sellers_lookup = spark.read.jdbc(
    pg_jdbc_url, "sellers", properties=pg_properties
).select(col("id").alias("seller_id_pk"), col("email").alias("seller_email_pk"))
df_products_lookup = (
    spark.read.jdbc(pg_jdbc_url, "products", properties=pg_properties)
    .alias("p")
    .join(
        df_product_names.alias("pn_lk"),
        col("p.product_name_id") == col("pn_lk.product_name_id"),
        "inner",
    )
    .select(
        col("p.id").alias("product_id_pk"),
        col("p.price").alias("product_price_pk"),
        col("p.quantity").alias("product_quantity_pk"),
        col("pn_lk.product_name_val").alias("product_name_pk"),
    )
)


df_sales = (
    df_mock_data.alias("md")
    .join(
        df_customers_lookup.alias("cust"),
        col("md.customer_email") == col("cust.customer_email_pk"),
        "left",
    )
    .join(
        df_sellers_lookup.alias("sell"),
        col("md.seller_email") == col("sell.seller_email_pk"),
        "left",
    )
    .join(
        df_products_lookup.alias("prod"),
        (col("md.product_price") == col("prod.product_price_pk"))
        & (col("md.product_quantity") == col("prod.product_quantity_pk"))
        & (col("md.product_name") == col("prod.product_name_pk")),
        "left",
    )
    .select(
        col("md.sale_date").alias("sale_date"),
        col("cust.customer_id_pk").alias("sale_customer_id"),
        col("sell.seller_id_pk").alias("sale_seller_id"),
        col("prod.product_id_pk").alias("sale_product_id"),
        col("md.sale_quantity").alias("sale_quantity"),
        col("md.sale_total_price").alias("sale_total_price"),
    )
)

df_sales.write.jdbc(
    url=pg_jdbc_url, table="sales", mode="append", properties=pg_properties
)

# Отчеты в clickhouse

### Витрина продаж по продуктам Цель: Анализ выручки, количества продаж и популярности продуктов. 
* Топ-10 самых продаваемых продуктов.* Общая выручка по категориям продукто.
* 
Средний рейтинг и количество отзывов для каждого продукта.

In [12]:
from pyspark.sql.functions import (
    col,
    sum,
    avg,
    count,
    rank,
    dense_rank,
    month,
    year,
    from_unixtime,
    to_date,
    date_format,
)
from pyspark.sql.window import Window

df_sales = spark.read.jdbc(pg_jdbc_url, "sales", properties=pg_properties)
df_customers = spark.read.jdbc(pg_jdbc_url, "customers", properties=pg_properties)
df_products = spark.read.jdbc(pg_jdbc_url, "products", properties=pg_properties)
df_sellers = spark.read.jdbc(pg_jdbc_url, "sellers", properties=pg_properties)
df_stores = spark.read.jdbc(pg_jdbc_url, "stores", properties=pg_properties)
df_suppliers = spark.read.jdbc(pg_jdbc_url, "suppliers", properties=pg_properties)
df_stores_renamed = df_stores.withColumnRenamed("id", "store_id").withColumnRenamed("name", "store_name")
df_sellers_renamed = df_sellers.withColumnRenamed("id", "seller_id")

df_product_names = spark.read.jdbc(
    pg_jdbc_url, "product_names", properties=pg_properties
).select(col("id").alias("product_name_id"), col("name").alias("product_name"))
df_product_categories = spark.read.jdbc(
    pg_jdbc_url, "product_categories", properties=pg_properties
).select(col("id").alias("product_category_id"), col("name").alias("product_category"))
df_countries = spark.read.jdbc(
    pg_jdbc_url, "countries", properties=pg_properties
).select(col("id").alias("country_id"), col("name").alias("country_name"))
df_cities = spark.read.jdbc(pg_jdbc_url, "cities", properties=pg_properties).select(
    col("id").alias("city_id"), col("name").alias("city_name")
)

df_product_sales = (
    df_sales.join(df_products, df_sales.sale_product_id == df_products.id, "inner")
    .join(
        df_product_names,
        df_products.product_name_id == df_product_names.product_name_id,
        "left",
    )
    .join(
        df_product_categories,
        df_products.product_category_id == df_product_categories.product_category_id,
        "left",
    )
)

product_summary = df_product_sales.groupBy("product_name", "product_category").agg(
    sum("sale_total_price").alias("total_revenue"),
    sum("sale_quantity").alias("total_sales_quantity"),
    avg("rating").alias("avg_product_rating"),
    sum("reviews").alias("total_product_reviews"),
)

window_spec_rank_product = Window.orderBy(col("total_revenue").desc())
top_10_products = (
    product_summary.withColumn("rank", rank().over(window_spec_rank_product))
    .filter(col("rank") <= 10)
    .drop("rank")
)

revenue_by_category = product_summary.groupBy("product_category").agg(
    sum("total_revenue").alias("category_total_revenue")
)

sales_by_product_mart = product_summary.select(
    col("product_name"),
    col("product_category"),
    col("total_revenue"),
    col("total_sales_quantity"),
    col("avg_product_rating"),
    col("total_product_reviews"),
)

sales_by_product_mart.write.jdbc(
    url=ch_jdbc_url,
    table="sales_by_product_mart",
    mode="append",
    properties=ch_properties,
)

sales_by_product_mart.toPandas().head(10)

Unnamed: 0,product_name,product_category,total_revenue,total_sales_quantity,avg_product_rating,total_product_reviews
0,Dog Food,Toy,8388194.0,180267,2.99715,16669825
1,Dog Food,Food,7857492.0,171988,3.020548,15409577
2,Dog Food,Cage,7657714.0,162261,3.058483,14762557
3,Bird Cage,Toy,8339287.0,173390,2.995688,15791066
4,Bird Cage,Cage,7950889.0,177294,3.003047,16585782
5,Bird Cage,Food,7715254.0,161557,3.0241,14870129
6,Cat Toy,Cage,7957720.0,173243,2.990916,15919350
7,Cat Toy,Food,7729019.0,170441,2.952139,16021320
8,Cat Toy,Toy,7853075.0,170663,3.080961,15648815


### Витрина продаж по клиентам Цель: Анализ покупательского поведения и сегментация клиентов.
* Топ-10 клиентов с наибольшей общей суммой покупок.* Распределение клиентов по странам.* 
Средний чек для каждого клиента.

In [13]:
df_customer_sales = (
    df_sales.alias("s")
    .join(df_customers.alias("c"), col("s.sale_customer_id") == col("c.id"), "inner")
    .join(df_countries.alias("co"), col("c.country_id") == col("co.country_id"), "left")
    .select(
        col("s.sale_total_price"),
        col("s.id").alias("sale_id"),
        col("c.first_name"),
        col("c.last_name"),
        col("c.email"),
        col("co.country_name"),
    )
)

customer_summary = df_customer_sales.groupBy(
    "first_name", "last_name", "email", "country_name"
).agg(
    sum("sale_total_price").alias("total_spent"),
    count("sale_id").alias("total_orders"),
    avg("sale_total_price").alias("average_order_value"),
)

sales_by_customer_mart = customer_summary.select(
    col("first_name").alias("customer_first_name"),
    col("last_name").alias("customer_last_name"),
    col("email").alias("customer_email"),
    col("country_name").alias("customer_country"),
    col("total_spent"),
    col("total_orders"),
    col("average_order_value"),
)

sales_by_customer_mart.write.jdbc(
    url=ch_jdbc_url,
    table="sales_by_customer_mart",
    mode="append",
    properties=ch_properties,
)

sales_by_customer_mart.toPandas().head(10)

Unnamed: 0,customer_first_name,customer_last_name,customer_email,customer_country,total_spent,total_orders,average_order_value
0,Leon,Mundall,mrobertson1f@drupal.org,Greece,16202.549744,35,462.929993
1,Evita,Clemmens,babelwhitee7@stumbleupon.com,Indonesia,883.440033,24,36.810001
2,Colby,Yitzhakof,eblasio6k@patch.com,Mexico,4877.879974,21,232.279999
3,Cole,Tonepohl,ctrundellj5@wired.com,Jamaica,8891.759766,24,370.48999
4,Linnie,Mackinder,gslixby9j@discuz.net,Colombia,4909.749985,25,196.389999
5,Lothaire,Dudin,cbrumbyem5@mac.com,China,5940.070053,29,204.830002
6,Deedee,Casazza,phelwigrg@google.co.uk,Zambia,8387.549835,27,310.649994
7,Franklyn,Gerold,fetuckmt@amazonaws.com,China,11976.719727,28,427.73999
8,Paolo,Debell,aivanichev3p@usa.gov,Indonesia,5048.190033,27,186.970001
9,Dilly,Corr,eoutlaw6o@deviantart.com,Palestinian Territory,6399.960205,28,228.570007


### Витрина продаж по времени Цель: Анализ сезонности и трендов продаж.
* Топ-10 самых продаваемых продуктов.
* Общая выручка по категориям продуктов.
* Средний рейтинг и количество отзывов для каждого продукта.

In [14]:
df_sales_time = df_sales.withColumn(
    "sale_date_parsed", to_date(col("sale_date"), "M/d/yyyy")
)

sales_by_time_mart = (
    df_sales_time.filter(col("sale_date_parsed").isNotNull())
    .groupBy(
        year("sale_date_parsed").alias("sale_year"),
        month("sale_date_parsed").alias("sale_month"),
    )
    .agg(
        sum("sale_total_price").alias("monthly_total_revenue"),
        count("id").alias("monthly_total_orders"),
        avg("sale_total_price").alias("monthly_average_order_value"),
    )
    .orderBy("sale_year", "sale_month")
)

sales_by_time_mart.write.jdbc(
    url=ch_jdbc_url,
    table="sales_by_time_mart",
    mode="append",
    properties=ch_properties,
)

sales_by_time_mart.toPandas().head()

Unnamed: 0,sale_year,sale_month,monthly_total_revenue,monthly_total_orders,monthly_average_order_value
0,2021,1,6282812.0,24427,257.20766
1,2021,2,5360731.0,20586,260.406656
2,2021,3,5937631.0,23997,247.432233
3,2021,4,5781788.0,23702,243.936726
4,2021,5,6075551.0,23861,254.622651


### Витрина продаж по магазинам Цель: Анализ эффективности магазинов.
* Топ-5 магазинов с наибольшей выручкой.* Распределение продаж по городам и странам.* 
Средний чек для каждого магазина.

In [15]:
df_store_sales = (
    df_sales.join(df_sellers_renamed, df_sales.sale_seller_id == df_sellers_renamed.seller_id, "inner")
    .join(df_stores_renamed, df_sellers_renamed.seller_id == df_stores_renamed.store_id, "inner")
    .join(df_countries, df_stores_renamed.country_id == df_countries.country_id, "left")
    .join(df_cities, df_stores_renamed.city_id == df_cities.city_id, "left")
)

store_summary = df_store_sales.groupBy("store_name", "city_name", "country_name").agg(
    sum("sale_total_price").alias("total_store_revenue"),
    count(df_sales.id).alias("total_store_orders"),
    avg("sale_total_price").alias("average_store_order_value"),
)

sales_by_store_mart = store_summary.select(
    col("store_name"),
    col("city_name").alias("store_city"),
    col("country_name").alias("store_country"),
    col("total_store_revenue"),
    col("total_store_orders"),
    col("average_store_order_value"),
)

sales_by_store_mart.write.jdbc(
    url=ch_jdbc_url,
    table="sales_by_store_mart",
    mode="append",
    properties=ch_properties,
)

sales_by_store_mart.toPandas().head()

Unnamed: 0,store_name,store_city,store_country,total_store_revenue,total_store_orders,average_store_order_value
0,Demimbu,Priboj,Sweden,7985.410187,17,469.730011
1,Tagopia,Mikuni,South Africa,13674.459503,37,369.579987
2,Chatterpoint,Ribeira das Taínhas,Afghanistan,697.479996,28,24.91
3,Tavu,Bangeran,Haiti,6893.100128,30,229.770004
4,Feedspan,Puerto de Nutrias,Brazil,7186.520081,22,326.660004


### Витрина продаж по поставщикам Цель: Анализ эффективности поставщиков.
* Топ-5 поставщиков с наибольшей выручкой.* Средняя цена товаров от каждого поставщика.
* 
Распределение продаж по странам поставщиков.

In [16]:
df_supplier_sales = (
    df_sales.join(df_products, df_sales.sale_product_id == df_products.id, "inner")
    .join(
        df_product_names,
        df_products.product_name_id == df_product_names.product_name_id,
        "left",
    )
    .join(df_suppliers, df_products.supplier_id == df_suppliers.id, "left")
    .join(df_countries, df_suppliers.country_id == df_countries.country_id, "left")
)

supplier_summary = df_supplier_sales.groupBy(df_suppliers.name.alias("supplier_name"), col("country_name")).agg(
    sum(col("sale_total_price")).alias("total_revenue_from_supplier"),
    avg(df_products.price).alias("average_product_price_from_supplier"),
    count(df_sales.id).alias("total_sales_count"),
)

sales_by_supplier_mart = supplier_summary.select(
    col("supplier_name"),
    col("country_name").alias("supplier_country"),
    col("total_revenue_from_supplier"),
    col("average_product_price_from_supplier"),
    col("total_sales_count"),
)

sales_by_supplier_mart.write.jdbc(
    url=ch_jdbc_url,
    table="sales_by_supplier_mart",
    mode="append",
    properties=ch_properties,
)

sales_by_supplier_mart.toPandas().head()

Unnamed: 0,supplier_name,supplier_country,total_revenue_from_supplier,average_product_price_from_supplier,total_sales_count
0,Teklist,Russia,12849.059921,47.96913,46
1,Kwideo,France,6586.080021,44.066296,27
2,Muxo,Thailand,20949.869923,47.954074,81
3,Eazzy,China,38897.899971,53.55,160
4,Lajo,Germany,8582.12999,52.257272,33


### Витрина качества продукции Цель: Анализ отзывов и рейтингов товаров.
* Продукты с наивысшим и наименьшим рейтингом.* Корреляция между рейтингом и объемом продаж.
* 
Продукты с наибольшим количеством отзывов.

In [17]:
df_product_quality = df_products.join(
    df_product_names,
    df_products.product_name_id == df_product_names.product_name_id,
    "left",
)

df_product_quality_with_sales = (
    df_product_quality.join(
        df_sales, df_product_quality.id == df_sales.sale_product_id, "left_outer"
    )
    .groupBy("product_name", "rating", "reviews")
    .agg(
        sum("sale_quantity").alias("total_quantity_sold"),
        sum("sale_total_price").alias("total_revenue_from_product"),
    )
)

product_quality_mart = df_product_quality_with_sales.select(
    col("product_name"),
    col("rating").alias("product_rating"),
    col("reviews").alias("total_reviews_count"),
    col("total_quantity_sold"),
    col("total_revenue_from_product"),
)

product_quality_mart.write.jdbc(
    url=ch_jdbc_url,
    table="product_quality_mart",
    mode="append",
    properties=ch_properties,
)
product_quality_mart.toPandas().head()

Unnamed: 0,product_name,product_rating,total_reviews_count,total_quantity_sold,total_revenue_from_product
0,Bird Cage,4.8,610,52,12305.019714
1,Bird Cage,1.8,499,33,11075.459839
2,Bird Cage,2.0,370,147,10109.820282
3,Bird Cage,3.5,882,216,875.879963
4,Bird Cage,2.8,974,232,7355.270142


In [18]:
spark.stop()