# Анализ больших данных - лабораторная работа №2 - ETL реализованный с помощью Spark

## Работа с Spark для преобразования данных в снежинку

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col

In [2]:
spark = SparkSession.builder \
    .appName("BigDataLab2") \
    .config("spark.jars", 
            "/opt/spark/jars/postgresql-42.7.6.jar,"
            "/opt/spark/jars/clickhouse-jdbc-0.8.5-all.jar") \
    .getOrCreate()

In [3]:
pg_url = "jdbc:postgresql://postgres:5432/database"
pg_properties = {
    "user": "postgres",
    "password": "password",
    "driver": "org.postgresql.Driver"
}

Функции для чтения из БД и записи в БД таблиц

In [4]:
def read_table(table_name):
    return spark.read.jdbc(url=pg_url, table=table_name, properties=pg_properties)

In [5]:
def write_table(df, table_name):
    df.write \
        .format("jdbc") \
        .option("url", pg_url) \
        .option("dbtable", table_name) \
        .option("user", pg_properties["user"]) \
        .option("password", pg_properties["password"]) \
        .option("driver", "org.postgresql.Driver") \
        .mode("append") \
        .save()

In [6]:
mock_data = read_table("mock_data")

In [7]:
mock_data.show(5)
mock_data.count()

+-------------------+------------------+------------+--------------------+----------------+--------------------+-----------------+-----------------+------------------+-----------------+----------------+--------------------+--------------+------------------+------------+----------------+--------------------+----------------+----------+----------------+--------------+---------------+-------------+--------------------+----------+--------------+----------+-----------+-------------+------------+--------------------+------------+--------------------+-------------+------------+-------------+----------------+--------------------+--------------------+---------------+--------------------+-------------------+-------------+----------------+--------------------+--------------+----------------+------------------+----------------+---+
|customer_first_name|customer_last_name|customer_age|      customer_email|customer_country|customer_postal_code|customer_pet_type|customer_pet_name|customer_pet_breed|s

10000

In [8]:
ddl_queries = [
    "CREATE TABLE dim_country (country_id SERIAL PRIMARY KEY, country_name VARCHAR(100));",
    "CREATE TABLE dim_city (city_id SERIAL PRIMARY KEY, city_name VARCHAR(100));",
    "CREATE TABLE dim_pet_type (pet_type_id SERIAL PRIMARY KEY, pet_type_name VARCHAR(100));",
    "CREATE TABLE dim_pet_breed (pet_breed_id SERIAL PRIMARY KEY, pet_breed_name VARCHAR(100));",
    "CREATE TABLE dim_pet_category (pet_category_id SERIAL PRIMARY KEY, pet_category_name VARCHAR(100));",
    "CREATE TABLE dim_pet (pet_id SERIAL PRIMARY KEY, name VARCHAR(50), pet_type_id INT REFERENCES dim_pet_type(pet_type_id), pet_breed_id INT REFERENCES dim_pet_breed(pet_breed_id), pet_category_id INT REFERENCES dim_pet_category(pet_category_id));",
    "CREATE TABLE dim_customer (customer_id SERIAL PRIMARY KEY, first_name VARCHAR(100), last_name VARCHAR(100), age INT, email VARCHAR(255) UNIQUE, country_id INT REFERENCES dim_country(country_id), postal_code VARCHAR(20), pet_id INT REFERENCES dim_pet(pet_id));",
    "CREATE TABLE dim_seller (seller_id SERIAL PRIMARY KEY, first_name VARCHAR(100), last_name VARCHAR(100), email VARCHAR(255) UNIQUE, country_id INT REFERENCES dim_country(country_id), postal_code VARCHAR(20));",
    "CREATE TABLE dim_supplier (supplier_id SERIAL PRIMARY KEY, name VARCHAR(200), contact VARCHAR(200), email VARCHAR(255) UNIQUE, phone VARCHAR(50), address TEXT, city_id INT REFERENCES dim_city(city_id), country_id INT REFERENCES dim_country(country_id));",
    "CREATE TABLE dim_store (store_id SERIAL PRIMARY KEY, name VARCHAR(200), location VARCHAR(200), city_id INT REFERENCES dim_city(city_id), state VARCHAR(100), country_id INT REFERENCES dim_country(country_id), phone VARCHAR(50), email VARCHAR(255));",
    "CREATE TABLE dim_product_name (product_name_id SERIAL PRIMARY KEY, product_name VARCHAR(50) UNIQUE);",
    "CREATE TABLE dim_product_category (category_id SERIAL PRIMARY KEY, category_name VARCHAR(100) UNIQUE);",
    "CREATE TABLE dim_brand (brand_id SERIAL PRIMARY KEY, brand_name VARCHAR(100) UNIQUE);",
    "CREATE TABLE dim_material (material_id SERIAL PRIMARY KEY, material_name VARCHAR(100) UNIQUE);",
    "CREATE TABLE dim_color (color_id SERIAL PRIMARY KEY, color_name VARCHAR(50) UNIQUE);",
    "CREATE TABLE dim_size (size_id SERIAL PRIMARY KEY, size_name VARCHAR(50) UNIQUE);",
    "CREATE TABLE dim_product (product_id SERIAL PRIMARY KEY, product_name_id INT REFERENCES dim_product_name(product_name_id), category_id INT REFERENCES dim_product_category(category_id), price NUMERIC(12,2), quantity INT, weight NUMERIC(12,2), color_id INT REFERENCES dim_color(color_id), size_id INT REFERENCES dim_size(size_id), brand_id INT REFERENCES dim_brand(brand_id), material_id INT REFERENCES dim_material(material_id), description TEXT, rating NUMERIC(3,2), reviews INT, release_date DATE, expiry_date DATE, supplier_id INT REFERENCES dim_supplier(supplier_id));",
    "CREATE TABLE fact_sales (sale_id SERIAL PRIMARY KEY, sale_date DATE, customer_id INT REFERENCES dim_customer(customer_id), seller_id INT REFERENCES dim_seller(seller_id), store_id INT REFERENCES dim_store(store_id), product_id INT REFERENCES dim_product(product_id), sale_quantity INT, sale_total_price NUMERIC(14,2));"
]

for query in ddl_queries:
    spark.sparkContext._jvm.java.sql.DriverManager.getConnection(
        pg_url, pg_properties["user"], pg_properties["password"]
    ).createStatement().execute(query)

Таблица dim_country

In [9]:
dim_country = mock_data.select(col("customer_country").alias("country_name")) \
    .union(
        mock_data.select(col("seller_country").alias("country_name"))
    ) \
    .union(
        mock_data.select(col("store_country").alias("country_name"))
    ) \
    .union(
        mock_data.select(col("supplier_country").alias("country_name"))
    ) \
    .distinct()



In [10]:
dim_country.show()
dim_country.count()

+--------------------+
|        country_name|
+--------------------+
|                Chad|
|              Russia|
|            Paraguay|
|               Yemen|
| U.S. Virgin Islands|
|             Senegal|
|              Sweden|
|Svalbard and Jan ...|
|              Guyana|
|         Philippines|
|             Eritrea|
|            Djibouti|
|            Malaysia|
|              Turkey|
|              Malawi|
|                Iraq|
|             Germany|
|Northern Mariana ...|
|             Comoros|
|         Afghanistan|
+--------------------+
only showing top 20 rows



230

In [11]:
write_table(dim_country, "dim_country")

Таблица dim_city

In [12]:
dim_city = mock_data.select(col("store_city").alias("city_name")) \
    .union(
        mock_data.select(col("supplier_city").alias("city_name"))
    ) \
    .distinct()

In [13]:
dim_city.show()
dim_city.count()

+-----------+
|  city_name|
+-----------+
|     Takefu|
|Trollhättan|
|   Żyrardów|
|   Jaboatão|
|    Nanshan|
|Jiujianfang|
|      Tocok|
|  Sułkowice|
|   Borūjerd|
|      Pakel|
|      Tyler|
|      Trzin|
|    Palermo|
| Curpahuasi|
|     Raheny|
|      Apodi|
|       Īlām|
|   Tokarnia|
|    Odawara|
|      Bicaj|
+-----------+
only showing top 20 rows



14181

In [14]:
write_table(dim_city, "dim_city")

Таблица dim_pet_type

In [15]:
dim_pet_type = mock_data \
    .select(
        col("customer_pet_type").alias("pet_type_name")
    ) \
    .distinct()

In [16]:
dim_pet_type.show()
dim_pet_type.count()

+-------------+
|pet_type_name|
+-------------+
|          dog|
|          cat|
|         bird|
+-------------+



3

In [17]:
write_table(dim_pet_type, "dim_pet_type")

Таблица dim_pet_breed

In [18]:
dim_pet_breed = mock_data \
    .select(
        col("customer_pet_breed").alias("pet_breed_name")
    ) \
    .distinct()

In [19]:
dim_pet_breed.show()
dim_pet_breed.count()

+------------------+
|    pet_breed_name|
+------------------+
|Labrador Retriever|
|          Parakeet|
|           Siamese|
+------------------+



3

In [20]:
write_table(dim_pet_breed, "dim_pet_breed")

Таблица dim_pet_category

In [21]:
dim_pet_category = mock_data.alias("md") \
    .select(
        col("md.pet_category").alias("pet_category_name")
    ) \
    .distinct()

In [22]:
dim_pet_category.show()
dim_pet_category.count()

+-----------------+
|pet_category_name|
+-----------------+
|         Reptiles|
|             Fish|
|            Birds|
|             Dogs|
|             Cats|
+-----------------+



5

In [23]:
write_table(dim_pet_category, "dim_pet_category")

Таблица dim_pet

In [24]:
dim_pet_type_df = read_table("dim_pet_type")
dim_pet_breed_df = read_table("dim_pet_breed")
dim_pet_category_df = read_table("dim_pet_category")

dim_pet = mock_data.alias("md") \
    .join(dim_pet_type_df.alias("pt"), col("md.customer_pet_type") == col("pt.pet_type_name")) \
    .join(dim_pet_breed_df.alias("pb"), col("md.customer_pet_breed") == col("pb.pet_breed_name")) \
    .join(dim_pet_category_df.alias("pc"), col("md.pet_category") == col("pc.pet_category_name")) \
    .select(
        col("md.customer_pet_name").alias("name"),
        col("pt.pet_type_id"),
        col("pb.pet_breed_id"),
        col("pc.pet_category_id")   
    ) \
    .distinct()

In [25]:
dim_pet.show()
dim_pet.count()

+--------+-----------+------------+---------------+
|    name|pet_type_id|pet_breed_id|pet_category_id|
+--------+-----------+------------+---------------+
|    Bili|          3|           2|              3|
|   Cordi|          3|           2|              3|
|    Addy|          3|           2|              3|
|Isabelle|          1|           2|              3|
| Hedvige|          1|           2|              3|
|   Gayle|          3|           3|              3|
|   Worth|          1|           3|              3|
|   Maddy|          2|           1|              5|
| Inesita|          2|           1|              5|
|    Perl|          3|           2|              5|
|   Moria|          3|           2|              5|
|   Shawn|          3|           2|              5|
|  Lamont|          3|           2|              5|
|     Ira|          1|           2|              5|
|Catriona|          1|           2|              5|
| Ethelyn|          2|           3|              5|
| Florian|  

9850

In [26]:
write_table(dim_pet, "dim_pet")

Таблица dim_customer

In [27]:
dim_country_df = read_table("dim_country")
dim_pet_df = read_table("dim_pet")
dim_pet_type_df = read_table("dim_pet_type")
dim_pet_breed_df = read_table("dim_pet_breed")
dim_pet_category_df = read_table("dim_pet_category")

dim_customer = mock_data.alias("md") \
    .join(dim_country_df.alias("c"), col("md.customer_country") == col("c.country_name")) \
    .join(dim_pet_category_df.alias("dpc"), col("md.pet_category") == col("dpc.pet_category_name")) \
    .join(dim_pet_breed_df.alias("dpb"), col("md.customer_pet_breed") == col("dpb.pet_breed_name")) \
    .join(dim_pet_type_df.alias("dpt"), col("md.customer_pet_type") == col("dpt.pet_type_name")) \
    .join(
        dim_pet_df.alias("pt"),
        (col("md.customer_pet_name") == col("pt.name")) &
        (col("pt.pet_category_id") == col("dpc.pet_category_id")) &
        (col("pt.pet_breed_id") == col("dpb.pet_breed_id")) &
        (col("pt.pet_type_id") == col("dpt.pet_type_id"))
    ) \
    .select(
        col("md.customer_first_name").alias("first_name"),
        col("md.customer_last_name").alias("last_name"),
        col("md.customer_age").alias("age"),
        col("md.customer_email").alias("email"),
        col("c.country_id"),
        col("md.customer_postal_code").alias("postal_code"),
        col("pt.pet_id")
    ) \
    .distinct()


In [28]:
dim_customer.show()
dim_customer.count()

+----------+------------+---+--------------------+----------+-------------+------+
|first_name|   last_name|age|               email|country_id|  postal_code|pet_id|
+----------+------------+---+--------------------+----------+-------------+------+
|   Karlene|   Suermeier| 40|dlafaye6t@moonfru...|        28|37032 CEDEX 1|  1492|
| Hyacintha|     Marmyon| 21|ebewly1b@e-recht2...|        97|        34110|  5526|
|   Ulberto|    Seiffert| 27|syeldingh@zimbio.com|         7|       334 80|  2917|
|     Daron|      Dubois| 38|pmoggle3y@weebly.com|       160|    83750-000|  2928|
|     Hedda|     Enrrico| 45|rdodswell7s@googl...|        10|         1219|  3037|
|   Anderea|   Izakovitz| 59|     dcappsmv@si.edu|       171|       L-6562|  3479|
|   Charles|     Selburn| 25|ogarlettix@jiathi...|         2|       666679|  6670|
|     Molli|    McGeorge| 24|mmaccreacb@alibab...|       176|       87-410|  8550|
|   Marlena|     Huthart| 34|btolletb@blogline...|       169|     839-1301|   359|
|   

10000

In [29]:
write_table(dim_customer, "dim_customer")

Таблица dim_seller

In [30]:
dim_city_df = read_table("dim_city")

dim_seller = mock_data.alias("md") \
    .join(dim_country_df.alias("c"), col("md.seller_country") == col("c.country_name")) \
    .select(
        col("md.seller_first_name").alias("first_name"),
        col("md.seller_last_name").alias("last_name"),
        col("md.seller_email").alias("email"),
        col("c.country_id"),
        col("md.seller_postal_code").alias("postal_code"),
    ) \
    .distinct()

In [31]:
dim_seller.show()
dim_seller.count()

+----------+------------+--------------------+----------+-------------+
|first_name|   last_name|               email|country_id|  postal_code|
+----------+------------+--------------------+----------+-------------+
|   Mitchel|  Chadderton|mchaddertoncq@tin...|         2|       141309|
|     Sukey|       Pashe|    spasher8@163.com|         2|       632147|
| Inglebert|       Hearn|    ihearn5b@sun.com|         2|       427439|
|    Joshua|      Castan|jcastanei@photobu...|        10|         2614|
|      Nero|       Slyde|   nslydeoz@ucsd.edu|        10|         6116|
|    Daveen|  MacIlraith|dmacilraith27@123...|        10|         8407|
|     Berke|    Prestner| bprestnerp2@ucoz.ru|        10|         3813|
|   Filippa|     Gatchel|fgatchelpl@e-rech...|        10|         1470|
|   Feodora|        Meek|fmeekk9@engadget.com|        13|        01606|
|      Nari|     Fitchew|nfitchewfz@refere...|        28|79049 CEDEX 9|
|     Brody|Margaritelli|bmargaritelliof@r...|        28|61891 C

10000

In [32]:
write_table(dim_seller, "dim_seller")

Таблица dim_supplier

In [33]:
dim_supplier = mock_data.alias("md") \
    .join(dim_country_df.alias("c"), col("md.supplier_country") == col("c.country_name")) \
    .join(dim_city_df.alias("ct"), col("md.supplier_city") == col("ct.city_name")) \
    .select(
        col("md.supplier_name").alias("name"),
        col("md.supplier_contact").alias("contact"),
        col("md.supplier_email").alias("email"),
        col("md.supplier_phone").alias("phone"),
        col("md.supplier_address").alias("address"),
        col("ct.city_id"),
        col("c.country_id")
    ) \
    .distinct()

In [34]:
dim_supplier.show()
dim_supplier.count()

+----------+-------------------+--------------------+------------+------------+-------+----------+
|      name|            contact|               email|       phone|     address|city_id|country_id|
+----------+-------------------+--------------------+------------+------------+-------+----------+
|     Yamia|       Levy Duferie|lduferie7a@typepa...|690-727-9072|   Room 1745|   9584|         2|
| Browsebug|   Humphrey Kelinge|hkelinge2b@scienc...|587-999-7122|    Apt 1654|   4457|         7|
|   Zoombox|    Jordanna Klambt|jklambtb8@feedbur...|987-203-7278|   Room 1901|  12473|         7|
|   Dabtype|     Erma Charrette|  echarretteay@is.gd|590-264-6444|   Room 1682|  13176|        10|
|  Realcube|      Cybill Busain|cbusainex@paypal.com|742-902-5584|    Apt 1496|  14168|        10|
| Rhynoodle|     Giovanna Buzza|   gbuzzahc@hibu.com|743-170-5278|   9th Floor|  12665|        17|
|      Jayo|       Rossy Leedes|rleedesaj@unicef.org|226-483-8338|    Suite 43|  11983|        20|
|    Voont

10000

In [35]:
write_table(dim_supplier, "dim_supplier")

Табилца dim_store

In [36]:
dim_store = mock_data.alias("md") \
    .join(dim_country_df.alias("c"), col("md.store_country") == col("c.country_name")) \
    .join(dim_city_df.alias("ct"), col("md.store_city") == col("ct.city_name")) \
    .select(
        col("md.store_name").alias("name"),
        col("md.store_location").alias("location"),
        col("ct.city_id"),
        col("md.store_state").alias("state"),
        col("c.country_id"),
        col("md.store_phone").alias("phone"),
        col("md.store_email").alias("email")
    ) \
    .distinct()

In [37]:
dim_store.show()
dim_store.count()

+----------+------------+-------+-----+----------+------------+--------------------+
|      name|    location|city_id|state|country_id|       phone|               email|
+----------+------------+-------+-----+----------+------------+--------------------+
| Linklinks|    Suite 76|    680|   CA|        29|760-109-4086|hblackbourn7g@clo...|
|  Innotype|   Room 1471|   6980|   A5|        97|767-675-9070|cfreelandh8@about...|
|   Skippad|   Room 1421|    453|  VER|       123|923-120-1631|alamprecht89@geoc...|
|    Talane|    Room 770|   6170|   BD|       169|119-915-7580|kborgesio54@csmon...|
|    Leenti|   2nd Floor|   4382|   11|       169|369-916-3645|   cchandlaral@is.gd|
| Skipstorm|PO Box 35455|   6189|   11|       177|722-142-5422|    eloosra@bing.com|
| Photolist|  16th Floor|   3030|   ON|       199|719-912-7433|   mhawesky@digg.com|
|   Youfeed|     Apt 633|   7836|   NC|         2|704-280-5105|wzappelo4@merriam...|
|Brainverse|    Suite 83|   1939|   BE|        28|196-979-6650|mr

10000

In [38]:
write_table(dim_store, "dim_store")

Таблица dim_product_name

In [39]:
dim_product_name = mock_data \
    .select(
        col("product_name")
    ) \
    .distinct()

In [40]:
dim_product_name.show()
dim_product_name.count()

+------------+
|product_name|
+------------+
|   Bird Cage|
|    Dog Food|
|     Cat Toy|
+------------+



3

In [41]:
write_table(dim_product_name, "dim_product_name")

Таблица dim_product_category

In [42]:
dim_product_category = mock_data \
    .select(
        col("product_category").alias("category_name")
    ) \
    .distinct()

In [43]:
dim_product_category.show()
dim_product_category.count()

+-------------+
|category_name|
+-------------+
|         Cage|
|         Food|
|          Toy|
+-------------+



3

In [44]:
write_table(dim_product_category, "dim_product_category")

Табилца dim_brand

In [45]:
dim_brand = mock_data \
    .select(
        col("product_brand").alias("brand_name")
    ) \
    .distinct()

In [46]:
dim_brand.show()
dim_brand.count()

+------------+
|  brand_name|
+------------+
|     Jetwire|
|    Jaxworks|
|   Reallinks|
| Brainlounge|
|    Snaptags|
|    Feedfish|
|       Kamba|
|    Skipfire|
|      Quimba|
|       Quaxo|
|    Realfire|
|      Oyondu|
|      BlogXS|
|Thoughtworks|
|  Browsezoom|
|     Voonder|
|   Photofeed|
|   Babbleset|
|       Yabox|
|       Einti|
+------------+
only showing top 20 rows



383

In [47]:
write_table(dim_brand, "dim_brand")

Таблица dim_material

In [48]:
dim_material = mock_data \
    .select(
        col("product_material").alias("material_name")
    ) \
    .distinct()

In [49]:
dim_material.show()
dim_material.count()

+-------------+
|material_name|
+-------------+
|        Steel|
|        Vinyl|
|      Granite|
|        Glass|
|      Plastic|
|     Aluminum|
|         Wood|
|       Rubber|
|        Stone|
|   Plexiglass|
|        Brass|
+-------------+



11

In [50]:
write_table(dim_material, "dim_material")

Таблица dim_material

In [51]:
dim_color = mock_data \
    .select(
        col("product_color").alias("color_name")
    ) \
    .distinct()

In [52]:
dim_color.show()
dim_color.count()

+----------+
|color_name|
+----------+
|      Teal|
|     Khaki|
|   Crimson|
|    Orange|
|    Indigo|
|      Puce|
|    Fuscia|
| Turquoise|
|     Green|
|    Purple|
|Aquamarine|
|      Blue|
|    Violet|
|    Yellow|
|       Red|
|      Pink|
| Goldenrod|
|      Mauv|
|    Maroon|
+----------+



19

In [53]:
write_table(dim_color, "dim_color")

Таблица dim_size

In [54]:
dim_size = mock_data \
    .select(
        col("product_size").alias("size_name")
    ) \
    .distinct()

In [55]:
dim_size.show()
dim_size.count()

+---------+
|size_name|
+---------+
|   Medium|
|    Small|
|    Large|
+---------+



3

In [56]:
write_table(dim_size, "dim_size")

Таблица dim_product

In [57]:
dim_product_name_df = read_table("dim_product_name")
dim_product_category_df = read_table("dim_product_category")
dim_brand_df = read_table("dim_brand")
dim_material_df = read_table("dim_material")
dim_color_df = read_table("dim_color")
dim_size_df = read_table("dim_size")
dim_supplier_df = read_table("dim_supplier")

In [58]:
dim_product = mock_data.alias("md") \
    .join(dim_product_name_df.alias("dpn"), col("md.product_name") == col("dpn.product_name")) \
    .join(dim_product_category_df.alias("dpc"), col("md.product_category") == col("dpc.category_name")) \
    .join(dim_brand_df.alias("db"), col("md.product_brand") == col("db.brand_name")) \
    .join(dim_material_df.alias("dm"), col("md.product_material") == col("dm.material_name")) \
    .join(dim_color_df.alias("dc"), col("md.product_color") == col("dc.color_name")) \
    .join(dim_size_df.alias("ds"), col("md.product_size") == col("ds.size_name")) \
    .join(dim_supplier_df.alias("ppp"), col("md.supplier_email") == col("ppp.email")) \
    .select(
        col("dpn.product_name_id"),
        col("dpc.category_id"),
        col("md.product_price").alias("price"),
        col("md.product_quantity").alias("quantity"),
        col("md.product_weight").alias("weight"),
        col("dc.color_id"),
        col("ds.size_id"),
        col("db.brand_id"),
        col("dm.material_id"),
        col("md.product_description").alias("description"),
        col("md.product_rating").alias("rating"),
        col("md.product_reviews").alias("reviews"),
        col("md.product_release_date").alias("release_date"),
        col("md.product_expiry_date").alias("expiry_date"),
        col("ppp.supplier_id")
    ) \
    .distinct()

In [59]:
dim_product.show()
dim_product.count()

+---------------+-----------+--------------------+--------+--------------------+--------+-------+--------+-----------+--------------------+--------------------+-------+------------+-----------+-----------+
|product_name_id|category_id|               price|quantity|              weight|color_id|size_id|brand_id|material_id|         description|              rating|reviews|release_date|expiry_date|supplier_id|
+---------------+-----------+--------------------+--------+--------------------+--------+-------+--------+-----------+--------------------+--------------------+-------+------------+-----------+-----------+
|              3|          3|15.68000000000000...|      47|4.200000000000000000|      11|      3|     240|          5|Duis bibendum, fe...|3.300000000000000000|    991|  2020-07-06| 2023-02-13|       4133|
|              3|          1|25.33000000000000...|      46|45.90000000000000...|      12|      3|     277|          2|Sed sagittis. Nam...|4.700000000000000000|    192|  2021-0

10000

In [60]:
write_table(dim_product, "dim_product")

Таблица fact_sales

In [61]:
dim_customer_df = read_table("dim_customer")
dim_seller_df = read_table("dim_seller")
dim_store_df = read_table("dim_store")
dim_product_df = read_table("dim_product")

In [62]:
fact_sales = mock_data.alias("md") \
    .join(dim_customer_df.alias("c"), col("md.customer_email") == col("c.email")) \
    .join(dim_seller_df.alias("s"), col("md.seller_email") == col("s.email")) \
    .join(dim_store_df.alias("st"), 
          (col("md.store_name") == col("st.name")) &
          (col("md.store_location") == col("st.location")) &
          (col("md.store_phone") == col("st.phone"))
    ) \
    .join(dim_product_name_df.alias("dpn"), col("md.product_name") == col("dpn.product_name")) \
    .join(dim_product_df.alias("pr"),
          (col("dpn.product_name_id") == col("pr.product_name_id")) &
          (col("md.product_price") == col("pr.price")) &
          (col("md.product_quantity") == col("pr.quantity")) &
          (col("md.product_weight") == col("pr.weight"))
    ) \
    .select(
        col("md.sale_date"),
        col("md.sale_quantity"),
        col("md.sale_total_price"),
        col("c.customer_id"),
        col("s.seller_id"),
        col("st.store_id"),
        col("pr.product_id")
    )

In [63]:
fact_sales.show()
fact_sales.count()

+----------+-------------+--------------------+-----------+---------+--------+----------+
| sale_date|sale_quantity|    sale_total_price|customer_id|seller_id|store_id|product_id|
+----------+-------------+--------------------+-----------+---------+--------+----------+
|2021-01-14|            7|19.18000000000000...|       3101|     2707|    3006|      8413|
|2021-08-11|            4|134.3400000000000...|       3563|      435|     155|      7032|
|2021-01-22|            5|322.1600000000000...|       9084|     1279|    7210|      1808|
|2021-07-06|            1|271.1000000000000...|       1161|     1651|     811|      6702|
|2021-12-03|            6|89.39000000000000...|       4742|     6091|    9672|      5174|
|2021-06-28|            6|380.5500000000000...|       9484|     5410|    3761|      7564|
|2021-03-31|            6|80.96000000000000...|       9057|     5432|    6208|      6659|
|2021-02-25|            6|223.0900000000000...|       4848|     6878|    5077|      8975|
|2021-06-1

10000

In [64]:
write_table(fact_sales, "fact_sales")

## Работа с ClickHouse 

In [65]:
ch_url = "jdbc:clickhouse://clickhouse:8123/reports"
ch_properties = {
    "user": "clickhouse",
    "password": "password",
    "driver": "com.clickhouse.jdbc.ClickHouseDriver"
}

In [66]:
from pyspark.sql.window import Window

In [67]:
tables = [
    "dim_country", "dim_city", "dim_pet_type", "dim_pet_breed", "dim_pet_category",
    "dim_pet", "dim_customer", "dim_seller", "dim_supplier", "dim_store",
    "dim_product_name", "dim_product_category", "dim_brand", "dim_material",
    "dim_color", "dim_size", "dim_product", "fact_sales"
]

dataframes = {}
for table in tables:
    dataframes[table] = read_table(table)

fact_sales = dataframes["fact_sales"]
dim_country = dataframes["dim_country"]
dim_city = dataframes["dim_city"]
dim_customer = dataframes["dim_customer"]
dim_product = dataframes["dim_product"]
dim_product_name = dataframes["dim_product_name"]
dim_product_category = dataframes["dim_product_category"]
dim_store = dataframes["dim_store"]
dim_supplier = dataframes["dim_supplier"]

### 1. Витрин продаж по продуктам

Основная витрина

In [68]:
product_sales = fact_sales.groupBy("product_id").agg(
    F.sum("sale_quantity").alias("total_sales"),
    F.sum("sale_total_price").alias("total_revenue")
)

product_mart = (
    product_sales
    .join(dim_product, "product_id")
    .join(dim_product_name, "product_name_id")
    .join(dim_product_category, "category_id")
    .select(
        "product_id",
        "product_name",
        "category_name",
        "total_sales",
        "total_revenue",
        "rating",
        "reviews"
    )
)

product_mart.show()

product_mart.write \
    .format("jdbc") \
    .option("url", ch_url) \
    .option("user", ch_properties["user"]) \
    .option("password", ch_properties["password"]) \
    .option("driver", "com.clickhouse.jdbc.ClickHouseDriver") \
    .option("dbtable", "product_sales_mart") \
    .option("createTableOptions", "ENGINE = MergeTree ORDER BY (category_name, product_id)") \
    .mode("overwrite") \
    .save()

+----------+------------+-------------+-----------+-------------+------+-------+
|product_id|product_name|category_name|total_sales|total_revenue|rating|reviews|
+----------+------------+-------------+-----------+-------------+------+-------+
|      6397|   Bird Cage|         Cage|          2|       160.81|  1.30|     36|
|      1088|   Bird Cage|         Cage|          8|       189.19|  4.50|    284|
|      4900|   Bird Cage|         Cage|          4|       262.44|  3.70|    457|
|      1342|   Bird Cage|         Cage|          3|       289.86|  1.30|    165|
|      6620|   Bird Cage|         Cage|          2|       497.83|  3.30|    195|
|      4519|   Bird Cage|         Cage|          6|       127.03|  2.90|    536|
|      3794|   Bird Cage|         Cage|          2|       243.54|  4.50|    673|
|      1645|   Bird Cage|         Cage|         10|        20.78|  4.70|    876|
|      5803|   Bird Cage|         Cage|          8|       116.87|  1.60|    171|
|      2142|   Bird Cage|   

Топ-10 самых продаваемых продуктов

In [69]:
top_10_products = (
    product_mart
    .withColumn("sales_rank", F.row_number().over(Window.orderBy(F.desc("total_sales"))))
    .filter(F.col("sales_rank") <= 10)
    .select("sales_rank", "product_id", "product_name", "total_sales", "total_revenue")
)

top_10_products.show()

top_10_products.write \
    .format("jdbc") \
    .option("url", ch_url) \
    .option("user", ch_properties["user"]) \
    .option("password", ch_properties["password"]) \
    .option("driver", "com.clickhouse.jdbc.ClickHouseDriver") \
    .option("dbtable", "top_10_products") \
    .option("createTableOptions", "ENGINE = MergeTree ORDER BY sales_rank") \
    .mode("overwrite") \
    .save()

+----------+----------+------------+-----------+-------------+
|sales_rank|product_id|product_name|total_sales|total_revenue|
+----------+----------+------------+-----------+-------------+
|         1|      9249|   Bird Cage|         10|       256.23|
|         2|      9236|   Bird Cage|         10|       468.60|
|         3|      4876|   Bird Cage|         10|       340.15|
|         4|      7092|   Bird Cage|         10|       117.82|
|         5|      4538|   Bird Cage|         10|        20.01|
|         6|      9721|   Bird Cage|         10|       415.87|
|         7|      4036|   Bird Cage|         10|       150.98|
|         8|      1645|   Bird Cage|         10|        20.78|
|         9|      9126|   Bird Cage|         10|       341.01|
|        10|      6068|   Bird Cage|         10|       209.79|
+----------+----------+------------+-----------+-------------+



Общая выручка по категориям продуктов

In [70]:
category_revenue = (
    product_mart
    .groupBy("category_name")
    .agg(
        F.sum("total_revenue").alias("category_revenue"),
        F.sum("total_sales").alias("total_products_sold")
    )
    .withColumn("revenue_share", F.col("category_revenue") / F.sum("category_revenue").over(Window.partitionBy()))
)

category_revenue.show()

category_revenue.write \
    .format("jdbc") \
    .option("url", ch_url) \
    .option("user", ch_properties["user"]) \
    .option("password", ch_properties["password"]) \
    .option("driver", "com.clickhouse.jdbc.ClickHouseDriver") \
    .option("dbtable", "product_category_revenue") \
    .option("createTableOptions", "ENGINE = MergeTree ORDER BY category_revenue") \
    .option("driver", "com.clickhouse.jdbc.ClickHouseDriver") \
    .mode("overwrite") \
    .save()

+-------------+----------------+-------------------+-------------+
|category_name|category_revenue|total_products_sold|revenue_share|
+-------------+----------------+-------------------+-------------+
|         Cage|       831117.94|              18057|     0.328524|
|         Food|       830632.55|              17931|     0.328332|
|          Toy|       868101.63|              18635|     0.343143|
+-------------+----------------+-------------------+-------------+



Средний рейтинг и количество отзывов для каждого продукта.

In [71]:
product_ratings = (
    product_mart
    .groupBy("product_name")
    .agg(
        F.avg("rating").alias("avg_rating"),
        F.sum("reviews").alias("total_reviews")
    )
    .orderBy(F.desc("avg_rating"))
)

product_ratings.show()

product_ratings.write \
    .format("jdbc") \
    .option("url", ch_url) \
    .option("user", ch_properties["user"]) \
    .option("password", ch_properties["password"]) \
    .option("driver", "com.clickhouse.jdbc.ClickHouseDriver") \
    .option("dbtable", "product_ratings") \
    .option("createTableOptions", "ENGINE = MergeTree ORDER BY avg_rating") \
    .option("driver", "com.clickhouse.jdbc.ClickHouseDriver") \
    .mode("overwrite") \
    .save()

+------------+----------+-------------+
|product_name|avg_rating|total_reviews|
+------------+----------+-------------+
|    Dog Food|  3.018299|      1653413|
|     Cat Toy|  3.006860|      1676222|
|   Bird Cage|  3.000149|      1682260|
+------------+----------+-------------+



### 2. Витрина продаж по клиентам

Основная витрина

In [72]:
customer_sales = fact_sales.groupBy("customer_id").agg(
    F.sum("sale_total_price").alias("total_spent"),
    F.count("sale_id").alias("total_orders"),
    F.avg("sale_total_price").alias("avg_order_value")
)

customer_mart = (
    customer_sales
    .join(dim_customer, "customer_id")
    .join(dim_country, dim_customer.country_id == dim_country.country_id)
    .select(
        "customer_id",
        F.concat(dim_customer["first_name"], F.lit(" "), dim_customer["last_name"]).alias("customer_name"),
        "age",
        "email",
        dim_country["country_name"].alias("country"),
        "total_spent",
        "total_orders",
        "avg_order_value"
    )
)

customer_mart.show()

customer_mart.write \
    .format("jdbc") \
    .option("url", ch_url) \
    .option("user", ch_properties["user"]) \
    .option("password", ch_properties["password"]) \
    .option("driver", "com.clickhouse.jdbc.ClickHouseDriver") \
    .option("dbtable", "customer_sales_mart") \
    .option("createTableOptions", "ENGINE = MergeTree ORDER BY country") \
    .option("driver", "com.clickhouse.jdbc.ClickHouseDriver") \
    .mode("overwrite") \
    .save()

+-----------+-------------------+---+--------------------+-----------+-----------+------------+---------------+
|customer_id|      customer_name|age|               email|    country|total_spent|total_orders|avg_order_value|
+-----------+-------------------+---+--------------------+-----------+-----------+------------+---------------+
|       6336|         Shep Dyson| 30|lcarabinedg@tinyp...|  Indonesia|     287.43|           1|     287.430000|
|       3794|     Carlyle Crosen| 30|dfellow3j@cbsnews...|     Mexico|     218.06|           1|     218.060000|
|       5803|  Pearline Northage| 49|veakleycp@cyberch...|    Morocco|     184.43|           1|     184.430000|
|       6397|        Kaile Rhyme| 22|  vbingallis@mlb.com|      China|     146.45|           1|     146.450000|
|       2866|    Mikkel Tabourel| 75|anutley5s@newsvin...|     Poland|     143.76|           1|     143.760000|
|       7554| Jermaine Loiterton| 60|cmaypeseu@cafepre...|Afghanistan|     287.31|           1|     287.

Топ-10 клиентов с наибольшей общей суммой покупок

In [73]:
top_10_customers = (
    customer_mart
    .withColumn("spent_rank", F.row_number().over(Window.orderBy(F.desc("total_spent"))))
    .filter(F.col("spent_rank") <= 10)
    .select("spent_rank", "customer_id", "customer_name", "country", "total_spent")
)

top_10_customers.show()

top_10_customers.write \
    .format("jdbc") \
    .option("url", ch_url) \
    .option("user", ch_properties["user"]) \
    .option("password", ch_properties["password"]) \
    .option("driver", "com.clickhouse.jdbc.ClickHouseDriver") \
    .option("dbtable", "top_10_customers") \
    .option("createTableOptions", "ENGINE = MergeTree ORDER BY spent_rank") \
    .option("driver", "com.clickhouse.jdbc.ClickHouseDriver") \
    .mode("overwrite") \
    .save()

+----------+-----------+-----------------+---------+-----------+
|spent_rank|customer_id|    customer_name|  country|total_spent|
+----------+-----------+-----------------+---------+-----------+
|         1|       8091|    Gus Hartshorn|  Albania|     499.85|
|         2|       3017|     Hayes McKain| Portugal|     499.80|
|         3|       8328|        Ava Lomas|    China|     499.76|
|         4|       6816|      Dawna Impey|Indonesia|     499.76|
|         5|       3010|Lavinia Horsburgh|   Poland|     499.73|
|         6|       9610|  Dame Auchinleck|Indonesia|     499.71|
|         7|       1596|  Isahella Colley|   Russia|     499.69|
|         8|       9493|  Sisely Bonevant|    China|     499.62|
|         9|        251|     Nicky Lattie|   Mexico|     499.62|
|        10|       6798|       Eran Cotes|    China|     499.59|
+----------+-----------+-----------------+---------+-----------+



Распределение клиентов по странам

In [74]:
country_distribution = (
    customer_mart
    .groupBy("country")
    .agg(
        F.count("customer_id").alias("customer_count"),
        F.sum("total_spent").alias("total_spent")
    )
    .orderBy(F.desc("customer_count"))
)

country_distribution.show()

country_distribution.write \
    .format("jdbc") \
    .option("url", ch_url) \
    .option("user", ch_properties["user"]) \
    .option("password", ch_properties["password"]) \
    .option("driver", "com.clickhouse.jdbc.ClickHouseDriver") \
    .option("dbtable", "customer_country_distribution") \
    .option("createTableOptions", "ENGINE = MergeTree ORDER BY customer_count") \
    .option("driver", "com.clickhouse.jdbc.ClickHouseDriver") \
    .mode("overwrite") \
    .save()

+--------------+--------------+-----------+
|       country|customer_count|total_spent|
+--------------+--------------+-----------+
|         China|          1738|  445815.35|
|     Indonesia|          1174|  296495.01|
|        Russia|           628|  154008.86|
|   Philippines|           555|  140433.74|
|        Brazil|           385|  100264.93|
|      Portugal|           336|   80297.91|
|        Poland|           332|   81038.37|
|        France|           322|   81675.90|
|        Sweden|           264|   65689.69|
| United States|           211|   51954.32|
|         Japan|           201|   51784.90|
|       Ukraine|           155|   39450.65|
|      Colombia|           152|   34772.97|
|Czech Republic|           140|   38333.13|
|        Canada|           137|   35511.14|
|      Thailand|           126|   31092.02|
|          Peru|           123|   28575.53|
|        Greece|           116|   30673.05|
|     Argentina|           113|   26739.31|
|       Nigeria|           103| 

Средний чек для каждого клиента

In [75]:
customer_avg_check = (
    customer_mart
    .select(
        "customer_id",
        "customer_name",
        "avg_order_value",
        "total_orders",
        "total_spent"
    )
    .withColumn("avg_check_category",
                F.when(F.col("avg_order_value") >= 500, "VIP")
                 .when(F.col("avg_order_value") >= 200, "High")
                 .when(F.col("avg_order_value") >= 100, "Medium")
                 .otherwise("Standard"))
)

customer_avg_check.show()

customer_avg_check.write \
    .format("jdbc") \
    .option("url", ch_url) \
    .option("user", ch_properties["user"]) \
    .option("password", ch_properties["password"]) \
    .option("driver", "com.clickhouse.jdbc.ClickHouseDriver") \
    .option("dbtable", "customer_avg_check") \
    .option("createTableOptions", "ENGINE = MergeTree ORDER BY (avg_check_category, avg_order_value)") \
    .option("driver", "com.clickhouse.jdbc.ClickHouseDriver") \
    .mode("overwrite") \
    .save()

+-----------+-------------------+---------------+------------+-----------+------------------+
|customer_id|      customer_name|avg_order_value|total_orders|total_spent|avg_check_category|
+-----------+-------------------+---------------+------------+-----------+------------------+
|       6336|         Shep Dyson|     287.430000|           1|     287.43|              High|
|       3794|     Carlyle Crosen|     218.060000|           1|     218.06|              High|
|       5803|  Pearline Northage|     184.430000|           1|     184.43|            Medium|
|       6397|        Kaile Rhyme|     146.450000|           1|     146.45|            Medium|
|       2866|    Mikkel Tabourel|     143.760000|           1|     143.76|            Medium|
|       7554| Jermaine Loiterton|     287.310000|           1|     287.31|              High|
|       1088|Clay Van Schafflaer|      95.140000|           1|      95.14|          Standard|
|       7754|          Urson Ivy|     457.360000|           

### 3. Витрина продаж по времени

Основная витрина

In [76]:
time_sales = fact_sales.withColumn("year", F.year("sale_date")) \
    .withColumn("month", F.month("sale_date")) \
    .withColumn("quarter", F.quarter("sale_date"))

time_mart = time_sales.groupBy("year", "quarter", "month").agg(
    F.sum("sale_total_price").alias("total_revenue"),
    F.countDistinct("sale_id").alias("total_orders"),
    F.sum("sale_quantity").alias("total_items_sold")
).orderBy("year", "month")

time_mart.show()

time_mart.write \
    .format("jdbc") \
    .option("url", ch_url) \
    .option("user", ch_properties["user"]) \
    .option("password", ch_properties["password"]) \
    .option("driver", "com.clickhouse.jdbc.ClickHouseDriver") \
    .option("dbtable", "time_sales_mart") \
    .option("createTableOptions", "ENGINE = MergeTree ORDER BY (year, month)") \
    .option("driver", "com.clickhouse.jdbc.ClickHouseDriver") \
    .mode("overwrite") \
    .save()

+----+-------+-----+-------------+------------+----------------+
|year|quarter|month|total_revenue|total_orders|total_items_sold|
+----+-------+-----+-------------+------------+----------------+
|2021|      1|    1|    224158.54|         874|            4856|
|2021|      1|    2|    192348.31|         739|            4070|
|2021|      1|    3|    207282.20|         843|            4561|
|2021|      2|    4|    206592.82|         837|            4564|
|2021|      2|    5|    211764.86|         828|            4451|
|2021|      2|    6|    215042.80|         822|            4438|
|2021|      3|    7|    220496.51|         858|            4750|
|2021|      3|    8|    221275.78|         897|            4818|
|2021|      3|    9|    210623.43|         839|            4507|
|2021|      4|   10|    228743.32|         892|            4976|
|2021|      4|   11|    200154.69|         801|            4297|
|2021|      4|   12|    191368.86|         770|            4335|
+----+-------+-----+-----

Месячные и годовые тренды продаж

In [77]:
monthly_trends = time_sales.groupBy("year", "month").agg(
    F.sum("sale_total_price").alias("monthly_revenue"),
    F.countDistinct("sale_id").alias("monthly_orders"),
    F.sum("sale_quantity").alias("monthly_items_sold")
).orderBy("year", "month")

monthly_trends.show()

monthly_trends.write \
    .format("jdbc") \
    .option("url", ch_url) \
    .option("user", ch_properties["user"]) \
    .option("password", ch_properties["password"]) \
    .option("driver", "com.clickhouse.jdbc.ClickHouseDriver") \
    .option("dbtable", "monthly_sales_trends") \
    .option("createTableOptions", "ENGINE = MergeTree ORDER BY (year, month)") \
    .option("driver", "com.clickhouse.jdbc.ClickHouseDriver") \
    .mode("overwrite") \
    .save()

+----+-----+---------------+--------------+------------------+
|year|month|monthly_revenue|monthly_orders|monthly_items_sold|
+----+-----+---------------+--------------+------------------+
|2021|    1|      224158.54|           874|              4856|
|2021|    2|      192348.31|           739|              4070|
|2021|    3|      207282.20|           843|              4561|
|2021|    4|      206592.82|           837|              4564|
|2021|    5|      211764.86|           828|              4451|
|2021|    6|      215042.80|           822|              4438|
|2021|    7|      220496.51|           858|              4750|
|2021|    8|      221275.78|           897|              4818|
|2021|    9|      210623.43|           839|              4507|
|2021|   10|      228743.32|           892|              4976|
|2021|   11|      200154.69|           801|              4297|
|2021|   12|      191368.86|           770|              4335|
+----+-----+---------------+--------------+------------

In [78]:
yearly_trends = time_sales.groupBy("year").agg(
    F.sum("sale_total_price").alias("yearly_revenue"),
    F.countDistinct("sale_id").alias("yearly_orders"),
    F.sum("sale_quantity").alias("yearly_items_sold")
).orderBy("year")

yearly_trends.show()

yearly_trends.write \
    .format("jdbc") \
    .option("url", ch_url) \
    .option("user", ch_properties["user"]) \
    .option("password", ch_properties["password"]) \
    .option("driver", "com.clickhouse.jdbc.ClickHouseDriver") \
    .option("dbtable", "yearly_sales_trends") \
    .option("createTableOptions", "ENGINE = MergeTree ORDER BY year") \
    .option("driver", "com.clickhouse.jdbc.ClickHouseDriver") \
    .mode("overwrite") \
    .save()

+----+--------------+-------------+-----------------+
|year|yearly_revenue|yearly_orders|yearly_items_sold|
+----+--------------+-------------+-----------------+
|2021|    2529852.12|        10000|            54623|
+----+--------------+-------------+-----------------+



Сравнение выручки за разные периоды

In [79]:
monthly_base = (
    time_sales.groupBy("year", "month")
    .agg(F.sum("sale_total_price").alias("revenue"))
    .orderBy("year", "month")
)

comparison_df = monthly_base.withColumn(
        "prev_month_revenue", 
        F.lag("revenue").over(Window.orderBy("year", "month"))
    ).withColumn(
        "prev_year_revenue", 
        F.lag("revenue", 12).over(Window.orderBy("year", "month"))
    ).withColumn(
        "monthly_growth", 
        (F.col("revenue") - F.col("prev_month_revenue")) / F.col("prev_month_revenue")
    ).withColumn(
        "yearly_growth", 
        (F.col("revenue") - F.col("prev_year_revenue")) / F.col("prev_year_revenue")
    ).withColumn(
        "quarter", 
        F.quarter(F.concat_ws("-", F.col("year"), F.lpad(F.col("month"), 2, "0"), F.lit("01")))
    ).select(
        "year",
        "quarter",
        "month",
        "revenue",
        "prev_month_revenue",
        "prev_year_revenue",
        F.round(F.col("monthly_growth") * 100, 2).alias("monthly_growth_pct"),
        F.round(F.col("yearly_growth") * 100, 2).alias("yearly_growth_pct")
    )

quarterly_comparison = (
    comparison_df.groupBy("year", "quarter") 
    .agg(
        F.sum("revenue").alias("quarterly_revenue")
    )
)

yearly_comparison = (
    comparison_df.groupBy("year")
    .agg(
        F.sum("revenue").alias("yearly_revenue")
    )
)

revenue_comparison = (
    comparison_df
    .join(quarterly_comparison, ["year", "quarter"], "left")
    .join(yearly_comparison, ["year"], "left")
    .select(
        "year",
        "quarter",
        "month",
        "revenue",
        "prev_month_revenue",
        "monthly_growth_pct",
        "quarterly_revenue",
        "yearly_revenue"
    )
)

revenue_comparison.show()

revenue_comparison.write \
    .format("jdbc") \
    .option("url", ch_url) \
    .option("user", ch_properties["user"]) \
    .option("password", ch_properties["password"]) \
    .option("driver", "com.clickhouse.jdbc.ClickHouseDriver") \
    .option("dbtable", "revenue_comparison") \
    .option("createTableOptions", "ENGINE = MergeTree ORDER BY (year, month)") \
    .option("driver", "com.clickhouse.jdbc.ClickHouseDriver") \
    .mode("overwrite") \
    .save()

+----+-------+-----+---------+------------------+------------------+-----------------+--------------+
|year|quarter|month|  revenue|prev_month_revenue|monthly_growth_pct|quarterly_revenue|yearly_revenue|
+----+-------+-----+---------+------------------+------------------+-----------------+--------------+
|2021|      1|    1|224158.54|              NULL|              NULL|        623789.05|    2529852.12|
|2021|      1|    2|192348.31|         224158.54|            -14.19|        623789.05|    2529852.12|
|2021|      1|    3|207282.20|         192348.31|              7.76|        623789.05|    2529852.12|
|2021|      2|    4|206592.82|         207282.20|             -0.33|        633400.48|    2529852.12|
|2021|      2|    5|211764.86|         206592.82|              2.50|        633400.48|    2529852.12|
|2021|      2|    6|215042.80|         211764.86|              1.55|        633400.48|    2529852.12|
|2021|      3|    7|220496.51|         215042.80|              2.54|        652395

Средний размер заказа по месяцам

In [80]:
monthly_avg_order_size = (
    time_sales.groupBy("year", "month")
    .agg(
        (F.sum("sale_quantity") / F.countDistinct("sale_id")).alias("avg_items_per_order")
    )
    .select("year", "month", "avg_items_per_order")
    .orderBy("year", "month")
)

monthly_avg_order_size.show()

monthly_avg_order_size.write \
    .format("jdbc") \
    .option("url", ch_url) \
    .option("user", ch_properties["user"]) \
    .option("password", ch_properties["password"]) \
    .option("driver", "com.clickhouse.jdbc.ClickHouseDriver") \
    .option("dbtable", "monthly_avg_order_size") \
    .option("createTableOptions", "ENGINE = MergeTree ORDER BY (year, month)") \
    .option("driver", "com.clickhouse.jdbc.ClickHouseDriver") \
    .mode("overwrite") \
    .save()

+----+-----+-------------------+
|year|month|avg_items_per_order|
+----+-----+-------------------+
|2021|    1|  5.556064073226545|
|2021|    2|   5.50744248985115|
|2021|    3| 5.4104389086595495|
|2021|    4| 5.4528076463560335|
|2021|    5|  5.375603864734299|
|2021|    6|  5.399026763990268|
|2021|    7|  5.536130536130536|
|2021|    8|   5.37123745819398|
|2021|    9|  5.371871275327771|
|2021|   10|   5.57847533632287|
|2021|   11|  5.364544319600499|
|2021|   12|   5.62987012987013|
+----+-----+-------------------+



### 4. Витрина продаж по магазинам

Основная витрина

In [81]:
store_sales = fact_sales.groupBy("store_id").agg(
    F.sum("sale_total_price").alias("total_revenue"),
    F.count("sale_id").alias("total_sales"),
    F.avg("sale_total_price").alias("avg_order_value")
)

store_mart = (
    store_sales
    .join(dim_store, "store_id")
    .join(dim_city, "city_id")
    .join(dim_country, "country_id")
    .select(
        "store_id",
        dim_store["name"].alias("store_name"),
        dim_city["city_name"].alias("city"),
        dim_country["country_name"].alias("country"),
        "total_revenue",
        "total_sales",
        "avg_order_value"
    )
)

store_mart.show()

store_mart.write \
    .format("jdbc") \
    .option("url", ch_url) \
    .option("user", ch_properties["user"]) \
    .option("password", ch_properties["password"]) \
    .option("driver", "com.clickhouse.jdbc.ClickHouseDriver") \
    .option("dbtable", "store_sales_mart") \
    .option("createTableOptions", "ENGINE = MergeTree ORDER BY (country, city)") \
    .option("driver", "com.clickhouse.jdbc.ClickHouseDriver") \
    .mode("overwrite") \
    .save()

+--------+------------+-----------+--------------+-------------+-----------+---------------+
|store_id|  store_name|       city|       country|total_revenue|total_sales|avg_order_value|
+--------+------------+-----------+--------------+-------------+-----------+---------------+
|    8592|     Wikibox|      Klina|         China|       155.65|          1|     155.650000|
|    2142|  Linkbridge|   Napnapan|      Portugal|        48.27|          1|      48.270000|
|    3918|       Yadel|Yenangyaung|        Poland|       383.27|          1|     383.270000|
|     496|        Vipe|  København|         China|       351.15|          1|     351.150000|
|     471|     Wikibox|   Vermelha|    Bangladesh|       178.40|          1|     178.400000|
|    6397|    Fivespan|     Sakura|       Finland|       208.87|          1|     208.870000|
|    5156|   Wordpedia|     Wuyang|          Iran|        51.92|          1|      51.920000|
|    5518|Thoughtworks|  Barrancas|         China|       327.86|      

Топ-5 магазинов с наибольшей выручкой

In [82]:
top_5_stores = (
    store_mart
    .withColumn("revenue_rank", F.row_number().over(Window.orderBy(F.desc("total_revenue"))))
    .filter(F.col("revenue_rank") <= 5)
    .select("revenue_rank", "store_id", "store_name", "city", "country", "total_revenue")
)

top_5_stores.show()

top_5_stores.write \
    .format("jdbc") \
    .option("url", ch_url) \
    .option("user", ch_properties["user"]) \
    .option("password", ch_properties["password"]) \
    .option("driver", "com.clickhouse.jdbc.ClickHouseDriver") \
    .option("dbtable", "top_5_stores") \
    .option("createTableOptions", "ENGINE = MergeTree ORDER BY revenue_rank") \
    .option("driver", "com.clickhouse.jdbc.ClickHouseDriver") \
    .mode("overwrite") \
    .save()

+------------+--------+-----------+---------+------------+-------------+
|revenue_rank|store_id| store_name|     city|     country|total_revenue|
+------------+--------+-----------+---------+------------+-------------+
|           1|    7051|       DabZ|   Grekan|South Africa|       499.85|
|           2|     241|Thoughtblab|    Fonte|      Poland|       499.80|
|           3|    8577|     Camido|Longzhong|      Sweden|       499.76|
|           4|    5926|   Edgeblab|    Pesek|   Indonesia|       499.76|
|           5|    3270|    Centizu|   Tylicz|      Poland|       499.73|
+------------+--------+-----------+---------+------------+-------------+



Распределение продаж по городам и странам

In [83]:
distribution = (
    store_mart
    .groupBy("country", "city")
    .agg(
        F.sum("total_revenue").alias("city_revenue"),
        F.sum("total_sales").alias("city_sales")
    )
    .withColumn(
        "revenue_share", 
        (F.col("city_revenue") / F.sum("city_revenue").over(Window.partitionBy()))
    )
    .withColumn(
        "city_rank", 
        F.dense_rank().over(Window.orderBy(F.desc("city_revenue")))
    )
    .select(
        "country",
        "city",
        "city_revenue",
        "city_sales",
        "revenue_share",
        "city_rank"
    )
    .orderBy("country", F.desc("city_revenue"))
)

distribution.show()

distribution.write \
    .format("jdbc") \
    .option("url", ch_url) \
    .option("user", ch_properties["user"]) \
    .option("password", ch_properties["password"]) \
    .option("driver", "com.clickhouse.jdbc.ClickHouseDriver") \
    .option("dbtable", "distribution") \
    .option("createTableOptions", "ENGINE = MergeTree ORDER BY (country, city_rank)") \
    .option("driver", "com.clickhouse.jdbc.ClickHouseDriver") \
    .mode("overwrite") \
    .save()

+-----------+--------------------+------------+----------+-------------+---------+
|    country|                city|city_revenue|city_sales|revenue_share|city_rank|
+-----------+--------------------+------------+----------+-------------+---------+
|Afghanistan|             Talanga|      496.68|         1|     0.000196|      134|
|Afghanistan|            Borūjerd|      492.22|         1|     0.000195|      214|
|Afghanistan|              Daugai|      485.66|         1|     0.000192|      325|
|Afghanistan|           Asheville|      466.53|         1|     0.000184|      665|
|Afghanistan|              Souflí|      461.80|         1|     0.000183|      755|
|Afghanistan|      Saint-Constant|      459.56|         1|     0.000182|      792|
|Afghanistan|       Calzada Larga|      457.36|         1|     0.000181|      825|
|Afghanistan|            Longxing|      450.38|         1|     0.000178|      941|
|Afghanistan|Presidente Venceslau|      429.44|         1|     0.000170|     1341|
|Afg

Средний чек для каждого магазина

### 5. Витрина продаж по поставщикам

Основная витрина

In [84]:
supplier_products = dim_product.select("product_id", "supplier_id")

supplier_sales = (
    fact_sales
    .join(supplier_products, "product_id")
    .groupBy("supplier_id")
    .agg(
        F.sum("sale_total_price").alias("total_revenue"),
        F.sum("sale_quantity").alias("total_quantity"),
        F.count("sale_id").alias("total_sales")
    )
)

supplier_mart = (
    supplier_sales
    .join(dim_supplier, "supplier_id")
    .join(dim_country, "country_id")
    .select(
        "supplier_id",
        dim_supplier["name"].alias("supplier_name"),
        dim_country["country_name"].alias("country"),
        "total_revenue",
        "total_quantity",
        "total_sales",
        (F.col("total_revenue") / F.col("total_quantity")).alias("avg_product_price")
    )
)

supplier_mart.show()

supplier_mart.write \
    .format("jdbc") \
    .option("url", ch_url) \
    .option("user", ch_properties["user"]) \
    .option("password", ch_properties["password"]) \
    .option("driver", "com.clickhouse.jdbc.ClickHouseDriver") \
    .option("dbtable", "supplier_sales_mart") \
    .option("createTableOptions", "ENGINE = MergeTree ORDER BY country") \
    .option("driver", "com.clickhouse.jdbc.ClickHouseDriver") \
    .mode("overwrite") \
    .save()

+-----------+-------------+--------------+-------------+--------------+-----------+--------------------+
|supplier_id|supplier_name|       country|total_revenue|total_quantity|total_sales|   avg_product_price|
+-----------+-------------+--------------+-------------+--------------+-----------+--------------------+
|       1342|       Meezzy|       Ireland|       365.15|             9|          1| 40.5722222222222222|
|       3749|      Dabfeed|         China|       254.16|             4|          1| 63.5400000000000000|
|       4900|         Katz|       Ukraine|       316.15|             2|          1|158.0750000000000000|
|       2866|        Npath|         China|       186.31|             4|          1| 46.5775000000000000|
|       1238|        Eidel|         Syria|       422.94|             7|          1| 60.4200000000000000|
|       5803|     Dabshots|       Finland|       432.57|            10|          1| 43.2570000000000000|
|       3794|       Divape|        France|        33.74

Топ-5 поставщиков с наибольшей выручкой

In [85]:
top_5_suppliers = (
    supplier_mart
    .withColumn("revenue_rank", F.row_number().over(Window.orderBy(F.desc("total_revenue"))))
    .filter(F.col("revenue_rank") <= 5)
    .select("revenue_rank", "supplier_id", "supplier_name", "country", "total_revenue")
)

top_5_suppliers.show()

top_5_suppliers.write \
    .format("jdbc") \
    .option("url", ch_url) \
    .option("user", ch_properties["user"]) \
    .option("password", ch_properties["password"]) \
    .option("driver", "com.clickhouse.jdbc.ClickHouseDriver") \
    .option("dbtable", "top_5_suppliers") \
    .option("createTableOptions", "ENGINE = MergeTree ORDER BY revenue_rank") \
    .option("driver", "com.clickhouse.jdbc.ClickHouseDriver") \
    .mode("overwrite") \
    .save()

+------------+-----------+-------------+---------+-------------+
|revenue_rank|supplier_id|supplier_name|  country|total_revenue|
+------------+-----------+-------------+---------+-------------+
|           1|       9904|   Brainverse|  Ireland|       499.85|
|           2|       8835|        Jamia|   Russia|       499.80|
|           3|       2269|        Eabox| Portugal|       499.76|
|           4|       5955|      Demimbu|    China|       499.76|
|           5|       5501|   Browsezoom|Argentina|       499.73|
+------------+-----------+-------------+---------+-------------+



Средняя цена товаров от каждого поставщика

In [86]:
supplier_avg_price = (
    supplier_mart
    .select(
        "supplier_id",
        "supplier_name",
        "country",
        "avg_product_price",
        "total_quantity",
        "total_revenue"
    )
    .withColumn(
        "price_rank", 
        F.dense_rank().over(Window.orderBy(F.desc("avg_product_price")))
    )
)

supplier_avg_price.show()

supplier_avg_price.write \
    .format("jdbc") \
    .option("url", ch_url) \
    .option("user", ch_properties["user"]) \
    .option("password", ch_properties["password"]) \
    .option("driver", "com.clickhouse.jdbc.ClickHouseDriver") \
    .option("dbtable", "supplier_avg_product_price") \
    .option("createTableOptions", "ENGINE = MergeTree ORDER BY price_rank") \
    .option("driver", "com.clickhouse.jdbc.ClickHouseDriver") \
    .mode("overwrite") \
    .save()

+-----------+-------------+--------------------+--------------------+--------------+-------------+----------+
|supplier_id|supplier_name|             country|   avg_product_price|total_quantity|total_revenue|price_rank|
+-----------+-------------+--------------------+--------------------+--------------+-------------+----------+
|       8124|        Skajo|              Canada|499.2900000000000000|             1|       499.29|         1|
|       7066|     Wordtune|               China|498.7700000000000000|             1|       498.77|         2|
|       6139|       Agimba|              Russia|498.6500000000000000|             1|       498.65|         3|
|       1230|      Dabfeed|             Nigeria|498.6100000000000000|             1|       498.61|         4|
|       9699|     Feedfish|               China|498.5000000000000000|             1|       498.50|         5|
|       2797|   Zoomlounge|         Philippines|497.8000000000000000|             1|       497.80|         6|
|       36

Распределение продаж по странам поставщиков

In [87]:
supplier_country_distribution = (
    supplier_mart
    .groupBy("country")
    .agg(
        F.sum("total_revenue").alias("country_revenue"),
        F.sum("total_quantity").alias("country_quantity")
    )
    .orderBy(F.desc("country_revenue"))
)

supplier_country_distribution.show()

supplier_country_distribution.write \
    .format("jdbc") \
    .option("url", ch_url) \
    .option("user", ch_properties["user"]) \
    .option("password", ch_properties["password"]) \
    .option("driver", "com.clickhouse.jdbc.ClickHouseDriver") \
    .option("dbtable", "supplier_country_distribution") \
    .option("createTableOptions", "ENGINE = MergeTree ORDER BY country_revenue") \
    .option("driver", "com.clickhouse.jdbc.ClickHouseDriver") \
    .mode("overwrite") \
    .save()

+--------------+---------------+----------------+
|       country|country_revenue|country_quantity|
+--------------+---------------+----------------+
|         China|      492823.31|           10460|
|     Indonesia|      265717.99|            5885|
|        Russia|      149206.75|            3218|
|   Philippines|      136135.10|            2955|
|        Brazil|       97546.82|            2080|
|        Poland|       87370.64|            1797|
|      Portugal|       83210.60|            1955|
|        France|       80432.46|            1635|
| United States|       52560.14|            1154|
|        Sweden|       52074.94|            1155|
|Czech Republic|       45258.81|             956|
|       Ukraine|       42858.17|             883|
|      Thailand|       42409.05|             868|
|         Japan|       42075.85|             909|
|      Colombia|       39525.61|             885|
|          Peru|       37351.86|             815|
|     Argentina|       35606.32|             691|


### 6. Витрина качества продукции

Продукты с наивысшим и наименьшим рейтингом

In [88]:
rated_products = (
    product_mart
    .filter(F.col("rating").isNotNull())
    .orderBy(F.desc("rating"))
    .limit(10)
    .select("product_id", "product_name", "rating", "reviews")
)

rated_products.show()

rated_products.write \
    .format("jdbc") \
    .option("url", ch_url) \
    .option("user", ch_properties["user"]) \
    .option("password", ch_properties["password"]) \
    .option("driver", "com.clickhouse.jdbc.ClickHouseDriver") \
    .option("dbtable", "rated_products") \
    .option("createTableOptions", "ENGINE = MergeTree ORDER BY rating") \
    .option("driver", "com.clickhouse.jdbc.ClickHouseDriver") \
    .mode("overwrite") \
    .save()

+----------+------------+------+-------+
|product_id|product_name|rating|reviews|
+----------+------------+------+-------+
|      8506|   Bird Cage|  5.00|    899|
|      6801|    Dog Food|  5.00|    315|
|      3191|   Bird Cage|  5.00|      6|
|       183|   Bird Cage|  5.00|    220|
|      9986|   Bird Cage|  5.00|    227|
|      7249|   Bird Cage|  5.00|    524|
|      9525|   Bird Cage|  5.00|      8|
|      2067|   Bird Cage|  5.00|    488|
|      2558|   Bird Cage|  5.00|    276|
|      3087|    Dog Food|  5.00|    821|
+----------+------------+------+-------+



Корреляция между рейтингом и объемом продаж

In [89]:
rating_correlation = (
    product_mart
    .filter(F.col("rating").isNotNull())
    .select(
        F.corr("rating", "total_sales").alias("rating_sales_correlation"),
        F.corr("rating", "total_revenue").alias("rating_revenue_correlation")
    )
)

rating_correlation.show()

rating_correlation.write \
    .format("jdbc") \
    .option("url", ch_url) \
    .option("user", ch_properties["user"]) \
    .option("password", ch_properties["password"]) \
    .option("driver", "com.clickhouse.jdbc.ClickHouseDriver") \
    .option("dbtable", "rating_sales_correlation") \
    .option("createTableOptions", "ENGINE = MergeTree ORDER BY rating_sales_correlation") \
    .option("driver", "com.clickhouse.jdbc.ClickHouseDriver") \
    .mode("overwrite") \
    .save()

+------------------------+--------------------------+
|rating_sales_correlation|rating_revenue_correlation|
+------------------------+--------------------------+
|    0.001004977801145...|      0.004314699051728...|
+------------------------+--------------------------+



In [90]:
rating_analysis = (
    product_mart
    .filter(F.col("rating").isNotNull())
    .withColumn("rating_bucket", 
                F.when(F.col("rating") < 1.5, "1.0-1.49")
                 .when((F.col("rating") >= 1.5) & (F.col("rating") < 2.5), "1.5-2.49")
                 .when((F.col("rating") >= 2.5) & (F.col("rating") < 3.5), "2.5-3.49")
                 .when((F.col("rating") >= 3.5) & (F.col("rating") < 4.5), "3.5-4.49")
                 .otherwise("4.5-5.0"))
    .groupBy("rating_bucket")
    .agg(
        F.count("product_id").alias("products_count"),
        F.sum("total_sales").alias("total_units_sold"),
        F.sum("total_revenue").alias("total_revenue"),
        F.avg("total_sales").alias("avg_units_per_product"),
        F.avg("rating").alias("avg_rating_in_bucket"),
        F.avg("reviews").alias("avg_reviews_per_product")
    )
    .withColumn("revenue_per_unit", F.col("total_revenue") / F.col("total_units_sold"))
    .orderBy("rating_bucket")
)

rating_analysis.show()

rating_analysis.write \
    .format("jdbc") \
    .option("url", ch_url) \
    .option("user", ch_properties["user"]) \
    .option("password", ch_properties["password"]) \
    .option("driver", "com.clickhouse.jdbc.ClickHouseDriver") \
    .option("dbtable", "rating_sales_analysis") \
    .option("createTableOptions", "ENGINE = MergeTree ORDER BY rating_bucket") \
    .option("driver", "com.clickhouse.jdbc.ClickHouseDriver") \
    .mode("overwrite") \
    .save()

+-------------+--------------+----------------+-------------+---------------------+--------------------+-----------------------+----------------+
|rating_bucket|products_count|total_units_sold|total_revenue|avg_units_per_product|avg_rating_in_bucket|avg_reviews_per_product|revenue_per_unit|
+-------------+--------------+----------------+-------------+---------------------+--------------------+-----------------------+----------------+
|     1.0-1.49|          1101|            5967|    274668.80|    5.419618528610354|            1.223615|      505.2388737511353|       46.031306|
|     1.5-2.49|          2535|           13985|    645796.85|    5.516765285996056|            1.952465|     506.89664694280077|       46.177823|
|     2.5-3.49|          2476|           13388|    625932.93|    5.407108239095315|            2.950485|     491.60581583198706|       46.753281|
|     3.5-4.49|          2421|           13194|    616523.82|    5.449814126394052|            3.945064|      503.1986782321

Продукты с наибольшим количеством отзывов

In [91]:
most_reviewed_products = (
    product_mart
    .orderBy(F.desc("reviews"))
    .limit(10)
    .select("product_id", "product_name", "reviews")
)

most_reviewed_products.show()

most_reviewed_products.write \
    .format("jdbc") \
    .option("url", ch_url) \
    .option("user", ch_properties["user"]) \
    .option("password", ch_properties["password"]) \
    .option("driver", "com.clickhouse.jdbc.ClickHouseDriver") \
    .option("dbtable", "most_reviewed_products") \
    .option("createTableOptions", "ENGINE = MergeTree ORDER BY reviews") \
    .option("driver", "com.clickhouse.jdbc.ClickHouseDriver") \
    .mode("overwrite") \
    .save()

+----------+------------+-------+
|product_id|product_name|reviews|
+----------+------------+-------+
|      3098|     Cat Toy|   1000|
|      2932|   Bird Cage|   1000|
|      2358|   Bird Cage|   1000|
|      4918|    Dog Food|   1000|
|      5434|    Dog Food|   1000|
|      5773|     Cat Toy|   1000|
|      6873|   Bird Cage|   1000|
|      5294|   Bird Cage|   1000|
|       368|     Cat Toy|   1000|
|      1062|     Cat Toy|   1000|
+----------+------------+-------+

