# ETL реализованный с помощью Spark
### Выполнил *Юрков Евгений М8О-312Б-22*

In [76]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("ETL_Pipeline") \
    .config("spark.jars.packages",
            "org.postgresql:postgresql:42.2.27,"
            "com.clickhouse:clickhouse-jdbc:0.4.6,"
            "org.mongodb.spark:mongo-spark-connector_2.12:10.1.1") \
    .getOrCreate()

In [None]:
jdbc_url = "jdbc:postgresql://postgres:5432/abd_2"
properties = {
    "user": "postgres",
    "password": "Rbkkth3920",
    "driver": "org.postgresql.Driver"
}

In [None]:
df = spark.read.jdbc(url=jdbc_url, table="abd_2.mock_data", properties=properties)
df.show(10)

+---+-------------------+------------------+------------+--------------------+------------------+--------------------+-----------------+-----------------+------------------+-----------------+----------------+--------------------+--------------+------------------+------------+----------------+-------------+----------------+----------+----------------+--------------+---------------+-------------+----------------+----------+--------------+------------+-----------+--------------------+------------+--------------------+------------+--------------+-------------+------------+-------------+----------------+--------------------+--------------+---------------+--------------------+-------------------+-------------+----------------+--------------------+--------------+----------------+------------------+----------------+
| id|customer_first_name|customer_last_name|customer_age|      customer_email|  customer_country|customer_postal_code|customer_pet_type|customer_pet_name|customer_pet_breed|seller_

## Преобразуем данные из `mock_data` в снежинку

Создадим таблицу для покупателей:

```sql
CREATE TABLE dim_customer (
    customer_id SERIAL PRIMARY KEY,
    first_name VARCHAR(50),
    last_name VARCHAR(50),
    age INT,
    email VARCHAR(50),
    country VARCHAR(50),
    postal_code VARCHAR(50)
);
```


In [79]:
dim_customer = df.select(
    df.sale_customer_id.alias("customer_id"),
    df.customer_first_name.alias("first_name"),
    df.customer_last_name.alias("last_name"),
    df.customer_age.alias("age"),
    df.customer_email.alias("email"),
    df.customer_country.alias("country"),
    df.customer_postal_code.alias("postal_code")).distinct()
dim_customer.show(10)

+-----------+----------+---------+---+--------------------+-----------+-----------+
|customer_id|first_name|last_name|age|               email|    country|postal_code|
+-----------+----------+---------+---+--------------------+-----------+-----------+
|        108|Holly-anne|  Elleton| 79|    membra2z@ucoz.ru|   Colombia|     631008|
|        282|  Katerine|  Screech| 75|bgiacovelli7t@abo...|    Croatia|      31207|
|        624|   Adeline|  Gurnett| 41|      hstearthb@g.co|      China|           |
|        916|   Skippie| McKernan| 28|egydepf@trellian.com|  Indonesia|           |
|        981|   Vincent|  Edgerly| 44| ltiesr8@oaic.gov.au|      China|           |
|       1056|   Maureen|  Panther| 62| cwitherby1j@mtv.com|      China|           |
|       1065|     Hatty|  Largent| 71|cmacneill1s@sfgat...|     Greece|           |
|       1109|   Patrice|     Cadd| 52|bbedborough30@int...|      China|           |
|       1156|    Basile|O'Howbane| 30|cpetrushka4b@tops...|Philippines|     

Таблица питомцев:
```sql
CREATE TABLE dim_customer_pet (
    pet_id SERIAL PRIMARY KEY,
    customer_id INT REFERENCES dim_customer(customer_id),
    name VARCHAR(50),
    type VARCHAR(50),
    breed VARCHAR(50),
    category VARCHAR(50)
)
```

In [80]:
from pyspark.sql import functions

dim_pet = df.select(
    df.customer_pet_type.alias("type"),
    df.customer_pet_name.alias("name"),
    df.customer_pet_breed.alias("breed"),
    df.pet_category.alias("category"),
    df.sale_customer_id.alias("customer_id"),
).withColumn("pet_id", functions.monotonically_increasing_id())
dim_pet.show(5)

+----+---------+------------------+--------+-----------+------+
|type|     name|             breed|category|customer_id|pet_id|
+----+---------+------------------+--------+-----------+------+
| cat|Priscella|Labrador Retriever|    Cats|          1|     0|
|bird|  Dalenna|Labrador Retriever|    Cats|          2|     1|
|bird| Aldridge|          Parakeet|    Fish|          3|     2|
| cat|  Beverie|Labrador Retriever|    Fish|          4|     3|
|bird|  Sydelle|           Siamese|Reptiles|          5|     4|
+----+---------+------------------+--------+-----------+------+
only showing top 5 rows



Таблица продавцов:
```sql
CREATE TABLE dim_seller (
    seller_id SERIAL PRIMARY KEY,
    first_name VARCHAR(50),
    last_name VARCHAR(50),
    email VARCHAR(50),
    country VARCHAR(50),
    postal_code VARCHAR(50)
);
```

In [81]:
dim_seller = df.select(
    df.sale_seller_id.alias("seller_id"),
    df.seller_first_name.alias("first_name"),
    df.seller_last_name.alias("last_name"),
    df.seller_email.alias("email"),
    df.seller_country.alias("country"),
    df.seller_postal_code.alias("postal_code"),
).distinct()
dim_seller.show(5)

+---------+----------+----------+--------------------+---------+-----------+
|seller_id|first_name| last_name|               email|  country|postal_code|
+---------+----------+----------+--------------------+---------+-----------+
|       77|       Doe|    Gilder|dgilder24@utexas.edu|   Russia|     656903|
|      167|    Zorine|MacCrackan|zmaccrackan4m@new...|Guatemala|      12009|
|      640|   Kandace| Monkhouse|kmonkhousehr@macr...|   Sweden|     111 20|
|     1081|    Reagen|  Rosedale|rrosedale28@ebay....|  Nigeria|           |
|     1345|    Cherie|     Brave|cbrave9k@sakura.n...|   Uganda|           |
+---------+----------+----------+--------------------+---------+-----------+
only showing top 5 rows



Таблица продуктов:
```sql
CREATE TABLE dim_product (
    product_id SERIAL PRIMARY KEY,
    name VARCHAR(50),
    category VARCHAR(50),
    price FLOAT,
    quantity INT,
    weight FLOAT,
    color VARCHAR(50),
    size VARCHAR(50),
    brand VARCHAR(50),
    material VARCHAR(50),
    description VARCHAR(1024),
    rating FLOAT,
    reviews INT,
    release_date DATE,
    expiry_date DATE
);
```

In [82]:
dim_product = df.select(
    df.sale_product_id.alias("product_id"),
    df.product_name.alias("name"),
    df.product_category.alias("category"),
    df.product_price.alias("price"),
    df.product_quantity.alias("quantity"),
    df.product_weight.alias("weight"),
    df.product_color.alias("color"),
    df.product_size.alias("size"),
    df.product_brand.alias("brand"),
    df.product_material.alias("material"),
    df.product_description.alias("description"),
    df.product_rating.alias("rating"),
    df.product_reviews.alias("reviews"),
    functions.to_date(df.product_release_date, "d/M/yyyy").alias("release_date"),
    functions.to_date(df.product_expiry_date, "d/M/yyyy").alias("expiry_date")).distinct()
dim_product.show(5)

+----------+---------+--------+-----+--------+------+------+------+-------------+--------+--------------------+------+-------+------------+-----------+
|product_id|     name|category|price|quantity|weight| color|  size|        brand|material|         description|rating|reviews|release_date|expiry_date|
+----------+---------+--------+-----+--------+------+------+------+-------------+--------+--------------------+------+-------+------------+-----------+
|       547|Bird Cage|    Cage|33.12|      73|  26.0|Indigo|Medium|        Skajo|Aluminum|Suspendisse poten...|   3.8|    872|  2018-06-10| 2025-07-02|
|      3137|Bird Cage|     Toy|68.36|       2|  38.2|Orange|Medium|       Dabjam|   Glass|Fusce consequat. ...|   1.7|    425|  2016-06-03| 2024-03-07|
|      3365| Dog Food|    Food|10.99|      26|  36.6|Fuscia| Large|Chatterbridge|   Vinyl|Morbi non lectus....|   4.2|     54|  2012-01-03| 2025-07-02|
|      3593| Dog Food|    Cage|63.94|      59|  16.8|  Teal|Medium|   Blognation|   Glas

Таблица поставщиков:
```sql
CREATE TABLE dim_supplier (
    supplier_id SERIAL PRIMARY KEY,
    name VARCHAR(50),
    contact VARCHAR(50),
    email VARCHAR(50),
    phone VARCHAR(50),
    address VARCHAR(50),
    city VARCHAR(50),
    country VARCHAR(50)
);
```

In [83]:
dim_supplier = df.select(
    df.supplier_name.alias("name"),
    df.supplier_contact.alias("contact"),
    df.supplier_email.alias("email"),
    df.supplier_phone.alias("phone"),
    df.supplier_address.alias("address"),
    df.supplier_city.alias("city"),
    df.supplier_country.alias("country")
).distinct().withColumn("supplier_id", functions.monotonically_increasing_id())
dim_supplier.show(5)

+--------+----------------+--------------------+------------+---------+-----------+--------+-----------+
|    name|         contact|               email|       phone|  address|       city| country|supplier_id|
+--------+----------------+--------------------+------------+---------+-----------+--------+-----------+
|   Oyoyo|Cirilo Hellewell|chellewellh@googl...|484-944-2476|7th Floor|      Manzë|Portugal|          0|
|Snaptags| Evelin Alvarado|ealvaradof5@dedec...|120-766-6526|  Apt 848|      Ai Tu|Colombia|          1|
|  Wikivu|Fayette Glascott|fglascottft@infos...|205-362-1952| Apt 1047|Mesyagutovo| Ukraine|          2|
| Edgeify|Whitney Pomphrey|wpomphreyg7@engad...|485-355-8348| Suite 41| Brumadinho| Ukraine|          3|
| Buzzdog|Rhianna Kneebone|rkneeboneki@merri...|451-222-9196| Suite 88|     Spassk| Armenia|          4|
+--------+----------------+--------------------+------------+---------+-----------+--------+-----------+
only showing top 5 rows



Таблица магазинов:
```sql
CREATE TABLE dim_store (
    store_id SERIAL PRIMARY KEY,
    name VARCHAR(50),
    location VARCHAR(50),
    city VARCHAR(50),
    state VARCHAR(50),
    country VARCHAR(50),
    phone VARCHAR(50),
    email VARCHAR(50)
);
```


In [84]:
dim_store = df.select(
    df.store_name.alias("name"),
    df.store_location.alias("location"),
    df.store_city.alias("city"),
    df.store_state.alias("state"),
    df.store_country.alias("country"),
    df.store_phone.alias("phone"),
    df.store_email.alias("email")
).distinct().withColumn("store_id", functions.monotonically_increasing_id())
dim_store.show(5)

+--------+------------+--------------+-----+---------+------------+--------------------+--------+
|    name|    location|          city|state|  country|       phone|               email|store_id|
+--------+------------+--------------+-----+---------+------------+--------------------+--------+
|    Mymm|  19th Floor|Ustrzyki Dolne|     | Colombia|117-194-3628|     dwitt57@ovh.net|       0|
|  Dabjam|   Room 1734|       Cayenne|   GF|   Greece|954-419-0001|  csalmonici@fc2.com|       1|
|    Ntag|    Suite 39|    Bailingnao|     |Indonesia|875-746-2634|    ebrabhaml8@51.la|       2|
|  Wikizz|    Suite 81|        Stýpsi|     |  Ecuador|779-407-6383|etrimbeyml@usnews...|       3|
|Feedfish|PO Box 75044|      Aleshtar|     |   Angola|620-253-0558|tbristerqk@miibei...|       4|
+--------+------------+--------------+-----+---------+------------+--------------------+--------+
only showing top 5 rows



Таблица фактов:
```sql
CREATE TABLE fact_sales (
    sale_id INT,
    customer_id INT REFERENCES dim_customer(customer_id),
    seller_id INT REFERENCES dim_seller(seller_id),
    product_id INT REFERENCES dim_product(product_id),
    supplier_id INT REFERENCES dim_supplier(supplier_id),
    store_id INT REFERENCES dim_store(store_id),
    date DATE,
    quantity INT,
    total_price FLOAT
);
```

In [85]:
joined_df = (
    df
    .join(dim_customer, df.sale_customer_id == dim_customer.customer_id)
    .join(dim_seller, df.sale_seller_id == dim_seller.seller_id)
    .join(dim_product, df.sale_product_id == dim_product.product_id)
    .join(dim_store,
          (df.store_name == dim_store.name) &
          (df.store_location == dim_store.location) &
          (df.store_city == dim_store.city) &
          (df.store_state == dim_store.state) &
          (df.store_country == dim_store.country) &
          (df.store_phone == dim_store.phone) &
          (df.store_email == dim_store.email)
    )
    .join(dim_supplier,
          (df.supplier_name == dim_supplier.name) &
          (df.supplier_contact == dim_supplier.contact) &
          (df.supplier_email == dim_supplier.email) &
          (df.supplier_phone == dim_supplier.phone) &
          (df.supplier_address == dim_supplier.address) &
          (df.supplier_city == dim_supplier.city) &
          (df.supplier_country == dim_supplier.country)
    )
)

fact_sales = joined_df.select(
    df.id.alias("sale_id"),
    dim_customer.customer_id,
    dim_seller.seller_id,
    dim_product.product_id,
    dim_supplier.supplier_id,
    dim_store.store_id,
    functions.to_date(df.sale_date, 'M/d/yyyy').alias("date"),
    df.sale_quantity.alias("quantity"),
    df.sale_total_price.alias("total_price")
)
fact_sales.show(5)

+-------+-----------+---------+----------+-----------+--------+----------+--------+-----------+
|sale_id|customer_id|seller_id|product_id|supplier_id|store_id|      date|quantity|total_price|
+-------+-----------+---------+----------+-----------+--------+----------+--------+-----------+
|    547|        547|      547|       547|       3434|    7694|2021-06-10|      10|     169.69|
|   3137|       3137|     3137|      3137|       3704|    2154|2021-01-23|       6|      169.6|
|   3365|       3365|     3365|      3365|        201|    5372|2021-06-25|       6|     390.72|
|   3593|       3593|     3593|      3593|       5708|    9532|2021-02-11|       9|      286.1|
|   3891|       3891|     3891|      3891|       7163|    8597|2021-03-08|       5|      249.8|
+-------+-----------+---------+----------+-----------+--------+----------+--------+-----------+
only showing top 5 rows



In [86]:
dim_customer.write.jdbc(
    url=jdbc_url,
    table="dim_customer",
    mode="append",
    properties=properties)
dim_pet.write.jdbc(
    url=jdbc_url,
    table="dim_pet",
    mode="append",
    properties=properties)
dim_seller.write.jdbc(
    url=jdbc_url,
    table="dim_seller",
    mode="append",
    properties=properties)
dim_product.write.jdbc(
    url=jdbc_url,
    table="dim_product",
    mode="append",
    properties=properties)
dim_supplier.write.jdbc(
    url=jdbc_url,
    table="dim_supplier",
    mode="append",
    properties=properties)
dim_store.write.jdbc(
    url=jdbc_url,
    table="dim_store",
    mode="append",
    properties=properties)
fact_sales.write.jdbc(
    url=jdbc_url,
    table="fact_sales",
    mode="append",
    properties=properties)

## Создадим отчёты в ClickHouse

In [87]:
clickhouse_url = "jdbc:clickhouse://clickhouse:8123/lab2"

clickhouse_props = {
    "driver": "com.clickhouse.jdbc.ClickHouseDriver",
    "user": "user",
    "password": "",
}

In [88]:
!curl http://clickhouse:8123

Ok.


### 1. Витрина продаж по продуктам
*Цель: Анализ выручки, количества продаж и популярности продуктов.*

 - Топ-10 самых продаваемых продуктов.


In [89]:
from pyspark.sql.window import Window

top_10_products = fact_sales.groupBy("product_id") \
    .agg(
        functions.sum("quantity").alias("total_quantity"),
        functions.sum("total_price").alias("total_revenue")) \
    .join(dim_product.select("product_id", "name", "category", "color", "size"), "product_id") \
    .orderBy(functions.desc("total_quantity"),
             functions.desc("total_revenue")) \
    .limit(10)
    
top_10_products = top_10_products.withColumn("rank",
    functions.row_number().over(
        Window.orderBy(functions.desc("total_quantity"),
                       functions.desc("total_revenue"))))

top_10_products.show(5)

+----------+--------------+-----------------+---------+--------+------+------+----+
|product_id|total_quantity|    total_revenue|     name|category| color|  size|rank|
+----------+--------------+-----------------+---------+--------+------+------+----+
|      8616|            10|499.7300109863281|  Cat Toy|     Toy|Fuscia|Medium|   1|
|      3550|            10|499.5899963378906| Dog Food|     Toy|Yellow| Large|   2|
|      1484|            10|498.9599914550781|Bird Cage|     Toy|Indigo| Small|   3|
|      7596|            10|498.3500061035156| Dog Food|    Food|  Puce| Small|   4|
|      9956|            10|           497.25| Dog Food|    Food|  Teal| Large|   5|
+----------+--------------+-----------------+---------+--------+------+------+----+
only showing top 5 rows



In [90]:
top_10_products.write \
    .option("createTableOptions", "ENGINE = MergeTree() ORDER BY (rank)") \
    .mode("overwrite") \
    .jdbc(url=clickhouse_url, table="top_10_products", properties=clickhouse_props)

 - Общая выручка по категориям продуктов.


In [91]:
total_revenue_by_category = fact_sales.join(
    dim_product.select("product_id", "category"), "product_id") \
    .groupBy("category") \
    .agg(functions.sum("total_price").alias("total_revenue"))
total_revenue_by_category.show(5)

+--------+-----------------+
|category|    total_revenue|
+--------+-----------------+
|    Cage|831117.9398345947|
|    Food|830632.5497875214|
|     Toy|868101.6302814484|
+--------+-----------------+



In [92]:
total_revenue_by_category.write \
    .option("createTableOptions", "ENGINE = MergeTree() ORDER BY (total_revenue)") \
    .mode("overwrite") \
    .jdbc(url=clickhouse_url, table="total_revenue_by_category", properties=clickhouse_props)


 - Средний рейтинг и количество отзывов для каждого продукта.


In [93]:
avg_product_rating = dim_product.groupBy('name').agg(functions.avg('rating'),
                                functions.sum('reviews'))
avg_product_rating.show(5)

+---------+------------------+------------+
|     name|       avg(rating)|sum(reviews)|
+---------+------------------+------------+
|Bird Cage|3.0001491633975137|     1682260|
| Dog Food| 3.018298893783149|     1653413|
|  Cat Toy| 3.006860081398008|     1676222|
+---------+------------------+------------+



In [94]:
avg_product_rating.write \
    .option("createTableOptions", "ENGINE = MergeTree() ORDER BY (name)") \
    .mode("overwrite") \
    .jdbc(url=clickhouse_url, table="avg_product_rating", properties=clickhouse_props)

### 2. Витрина продаж по клиентам
*Цель: Анализ покупательского поведения и сегментация клиентов.*

- Топ-10 клиентов с наибольшей общей суммой покупок


In [95]:


top_10_customers = fact_sales.join(dim_customer, "customer_id") \
    .groupBy("customer_id", "first_name", "last_name") \
    .agg(functions.sum("total_price").alias("total_spent")) \
    .orderBy(functions.desc("total_spent")) \
    .limit(10)
top_10_customers.show(5)
top_10_customers.write \
    .option("createTableOptions", "ENGINE = MergeTree() ORDER BY (total_spent)") \
    .mode("overwrite") \
    .jdbc(url=clickhouse_url, table="top_10_customers", properties=clickhouse_props)

+-----------+----------+---------+------------------+
|customer_id|first_name|last_name|       total_spent|
+-----------+----------+---------+------------------+
|       4188|       Gus|Hartshorn| 499.8500061035156|
|       6422|     Hayes|   McKain|499.79998779296875|
|       5923|     Dawna|    Impey|  499.760009765625|
|       6361|       Ava|    Lomas|  499.760009765625|
|       8616|   Lavinia|Horsburgh| 499.7300109863281|
+-----------+----------+---------+------------------+
only showing top 5 rows




- Распределение клиентов по странам


In [96]:
clients_by_country = dim_customer.groupBy("country") \
    .agg(functions.count("customer_id").alias("num_customers"))
clients_by_country.show(5)
clients_by_country.write \
    .option("createTableOptions", "ENGINE = MergeTree() ORDER BY (country)") \
    .mode("overwrite") \
    .jdbc(url=clickhouse_url, table="clients_by_country", properties=clickhouse_props)


+-------------------+-------------+
|            country|num_customers|
+-------------------+-------------+
|               Chad|            5|
|             Russia|          628|
|           Paraguay|           18|
|              Yemen|           39|
|U.S. Virgin Islands|            1|
+-------------------+-------------+
only showing top 5 rows




- Средний чек для каждого клиента


In [97]:
avg_receipt_per_customer = fact_sales.join(dim_customer, "customer_id") \
    .groupBy("customer_id", "first_name", "last_name") \
    .agg(functions.avg("total_price").alias("avg_receipt"))
avg_receipt_per_customer.show(5)
avg_receipt_per_customer.write \
    .option("createTableOptions", "ENGINE = MergeTree() ORDER BY (customer_id)") \
    .mode("overwrite") \
    .jdbc(url=clickhouse_url, table="avg_receipt_per_customer", properties=clickhouse_props)


+-----------+----------+-----------+------------------+
|customer_id|first_name|  last_name|       avg_receipt|
+-----------+----------+-----------+------------------+
|       2563|   Darline|   Ozintsev| 336.2099914550781|
|       3704|    Jenica|   Patching| 217.3000030517578|
|       9946|    Godfry|   Chatwood|151.55999755859375|
|       7206|   Phyllys|Schoolcroft| 298.8299865722656|
|       5427|      Noby|    Clayden|  338.010009765625|
+-----------+----------+-----------+------------------+
only showing top 5 rows




### 3. Витрина продаж по времени
*Цель: Анализ сезонности и трендов продаж.*


- Месячные и годовые тренды продаж


In [98]:
time_trends = fact_sales.withColumn("year", functions.year("date")) \
    .withColumn("month", functions.month("date")) \
    .groupBy("year", "month") \
    .agg(functions.sum("total_price").alias("monthly_revenue"),
         functions.count("sale_id").alias("orders_count"))

time_trends = time_trends.withColumn(
    "yearly_revenue",
    functions.sum("monthly_revenue").over(Window.partitionBy("year"))
)
    
time_trends.show(5)
time_trends.write \
    .option("createTableOptions", "ENGINE = MergeTree() ORDER BY (year, month)") \
    .mode("overwrite") \
    .jdbc(url=clickhouse_url, table="time_trends", properties=clickhouse_props)

+----+-----+------------------+------------+------------------+
|year|month|   monthly_revenue|orders_count|    yearly_revenue|
+----+-----+------------------+------------+------------------+
|2021|    8| 221275.7799243927|         897|2529852.1199035645|
|2021|    6| 215042.7997379303|         822|2529852.1199035645|
|2021|    5|211764.86022472382|         828|2529852.1199035645|
|2021|   10| 228743.3197774887|         892|2529852.1199035645|
|2021|   11|200154.69006824493|         801|2529852.1199035645|
+----+-----+------------------+------------+------------------+
only showing top 5 rows




- Сравнение выручки за разные периоды (например, по годам)


In [99]:
yearly_revenue = fact_sales.withColumn("year", functions.year("date")) \
    .groupBy("year") \
    .agg(functions.sum("total_price").alias("total_revenue"))
yearly_revenue.show(5)
yearly_revenue.write \
    .option("createTableOptions", "ENGINE = MergeTree() ORDER BY (year)") \
    .mode("overwrite") \
    .jdbc(url=clickhouse_url, table="yearly_revenue", properties=clickhouse_props)


+----+------------------+
|year|     total_revenue|
+----+------------------+
|2021|2529852.1199035645|
+----+------------------+




- Средний размер заказа по месяцам


In [100]:
avg_order_monthly = fact_sales.withColumn("month", functions.month("date")) \
    .groupBy("month") \
    .agg(functions.avg("quantity").alias("avg_quantity"))
avg_order_monthly.show(5)
avg_order_monthly.write \
    .option("createTableOptions", "ENGINE = MergeTree() ORDER BY (month)") \
    .mode("overwrite") \
    .jdbc(url=clickhouse_url, table="avg_order_monthly", properties=clickhouse_props)


+-----+------------------+
|month|      avg_quantity|
+-----+------------------+
|   12|  5.62987012987013|
|    1| 5.556064073226545|
|    6| 5.399026763990268|
|    3|5.4104389086595495|
|    5| 5.375603864734299|
+-----+------------------+
only showing top 5 rows



### 4. Витрина продаж по магазинам
*Цель: Анализ эффективности магазинов.*

- Топ-5 магазинов с наибольшей выручкой


In [101]:
top_5_stores = fact_sales.join(dim_store, "store_id") \
    .groupBy("store_id", "name") \
    .agg(functions.sum("total_price").alias("total_revenue")) \
    .orderBy(functions.desc("total_revenue")) \
    .limit(5)
top_5_stores.show(5)
top_5_stores.write \
    .option("createTableOptions", "ENGINE = MergeTree() ORDER BY (total_revenue)") \
    .mode("overwrite") \
    .jdbc(url=clickhouse_url, table="top_5_stores", properties=clickhouse_props)

+--------+-----------+------------------+
|store_id|       name|     total_revenue|
+--------+-----------+------------------+
|    3253|       DabZ| 499.8500061035156|
|    5057|Thoughtblab|499.79998779296875|
|    1671|     Camido|  499.760009765625|
|    9704|   Edgeblab|  499.760009765625|
|    4885|    Centizu| 499.7300109863281|
+--------+-----------+------------------+




- Распределение продаж по городам и странам


In [102]:
sales_by_location = fact_sales.join(dim_store, "store_id") \
    .groupBy("city", "country") \
    .agg(functions.sum("total_price").alias("city_revenue"))
sales_by_location = sales_by_location.withColumn(
    "country_revenue",
    functions.sum("city_revenue").over(Window.partitionBy("country"))
)
sales_by_location.show(5)
sales_by_location.write \
    .option("createTableOptions", "ENGINE = MergeTree() ORDER BY (country, city)") \
    .mode("overwrite") \
    .jdbc(url=clickhouse_url, table="sales_by_location", properties=clickhouse_props)


+--------+-----------+------------------+------------------+
|    city|    country|      city_revenue|   country_revenue|
+--------+-----------+------------------+------------------+
| Gagarin|Afghanistan|301.95001220703125|15966.489999771118|
|  Ma‘dān|Afghanistan|  93.3499984741211|15966.489999771118|
|Longxing|Afghanistan| 450.3800048828125|15966.489999771118|
|  Daugai|Afghanistan| 485.6600036621094|15966.489999771118|
|Zhangtan|Afghanistan| 420.9100036621094|15966.489999771118|
+--------+-----------+------------------+------------------+
only showing top 5 rows




- Средний чек для каждого магазина


In [103]:
avg_receipt_per_store = fact_sales.join(dim_store, 'store_id') \
    .groupBy("store_id", "name") \
    .agg(functions.avg("total_price").alias("avg_receipt"))
avg_receipt_per_store.show(5)
avg_receipt_per_store.write \
    .option("createTableOptions", "ENGINE = MergeTree() ORDER BY (store_id)") \
    .mode("overwrite") \
    .jdbc(url=clickhouse_url, table="avg_receipt_per_store", properties=clickhouse_props)


+--------+----------+------------------+
|store_id|      name|       avg_receipt|
+--------+----------+------------------+
|    8602|Browsetype|  326.260009765625|
|    1828|Topicshots| 428.2200012207031|
|    9262|  Livefish|486.57000732421875|
|    2766|      Yodo|207.82000732421875|
|    2443|   Flipbug| 333.8800048828125|
+--------+----------+------------------+
only showing top 5 rows




### 5. Витрина продаж по поставщикам
*Цель: Анализ эффективности поставщиков.*

- Топ-5 поставщиков с наибольшей выручкой


In [104]:
top_5_suppliers = fact_sales.join(dim_supplier, "supplier_id") \
    .groupBy("supplier_id", "name") \
    .agg(functions.sum("total_price").alias("total_revenue")) \
    .orderBy(functions.desc("total_revenue")) \
    .limit(5)
top_5_suppliers.show(5)
top_5_suppliers.write \
    .option("createTableOptions", "ENGINE = MergeTree() ORDER BY (total_revenue)") \
    .mode("overwrite") \
    .jdbc(url=clickhouse_url, table="top_5_suppliers", properties=clickhouse_props)


+-----------+----------+------------------+
|supplier_id|      name|     total_revenue|
+-----------+----------+------------------+
|       2496|Brainverse| 499.8500061035156|
|       6262|     Jamia|499.79998779296875|
|       7292|     Eabox|  499.760009765625|
|       4225|   Demimbu|  499.760009765625|
|       3923|Browsezoom| 499.7300109863281|
+-----------+----------+------------------+




- Средняя цена товаров от каждого поставщика


In [105]:
fact = fact_sales.alias("f")
product = dim_product.alias("p")
supplier = dim_supplier.alias("s")

avg_price_per_supplier = fact \
    .join(product, functions.col("p.product_id") == functions.col("f.product_id")) \
    .join(supplier, functions.col("s.supplier_id") == functions.col("f.supplier_id")) \
    .groupBy(functions.col("s.supplier_id"), functions.col("s.name")) \
    .agg(functions.avg(functions.col("p.price")).alias("avg_price"))

avg_price_per_supplier.show(5)

avg_price_per_supplier.write \
    .option("createTableOptions", "ENGINE = MergeTree() ORDER BY (supplier_id)") \
    .mode("overwrite") \
    .jdbc(url=clickhouse_url, table="avg_price_per_supplier", properties=clickhouse_props)

+-----------+--------+------------------+
|supplier_id|    name|         avg_price|
+-----------+--------+------------------+
|        938|  Skaboo| 40.61000061035156|
|       7532|   Oyope|29.600000381469727|
|       8877|Topdrive|21.149999618530273|
|       5250|  Tekfly|32.529998779296875|
|       8918| Yakitri|             73.25|
+-----------+--------+------------------+
only showing top 5 rows




- Распределение продаж по странам поставщиков


In [106]:
sales_by_supplier_country = fact_sales \
    .join(dim_supplier, "supplier_id") \
    .groupBy("country") \
    .agg(functions.sum("total_price").alias("total_revenue"))
sales_by_supplier_country.show(5)
sales_by_supplier_country.write \
    .option("createTableOptions", "ENGINE = MergeTree() ORDER BY (country)") \
    .mode("overwrite") \
    .jdbc(url=clickhouse_url, table="sales_by_supplier_country", properties=clickhouse_props)


+--------+------------------+
| country|     total_revenue|
+--------+------------------+
|    Chad|2051.0300102233887|
|Paraguay| 2957.859983444214|
|  Russia|149206.74995231628|
|   Yemen|11414.629959106445|
| Senegal| 3162.409980773926|
+--------+------------------+
only showing top 5 rows




### 6. Витрина качества продукции
*Цель: Анализ отзывов и рейтингов товаров.*

- Продукты с наивысшим и наименьшим рейтингом


In [107]:
extreme_ratings = dim_product.select("product_id", "name", "rating").orderBy("rating")
extreme_ratings.show(5)
extreme_ratings.write \
    .option("createTableOptions", "ENGINE = MergeTree() ORDER BY (rating)") \
    .mode("overwrite") \
    .jdbc(url=clickhouse_url, table="extreme_ratings", properties=clickhouse_props)


+----------+---------+------+
|product_id|     name|rating|
+----------+---------+------+
|      6146| Dog Food|   1.0|
|      3902|Bird Cage|   1.0|
|      6095|  Cat Toy|   1.0|
|      9227| Dog Food|   1.0|
|      9868| Dog Food|   1.0|
+----------+---------+------+
only showing top 5 rows




- Корреляция между рейтингом и объемом продаж


In [108]:
rating_sales_corr = fact_sales.join(dim_product, "product_id") \
    .groupBy("product_id", "rating") \
    .agg(functions.sum(fact_sales.quantity).alias("total_quantity"))
rating_sales_corr.show(5)
rating_sales_corr.write \
    .option("createTableOptions", "ENGINE = MergeTree() ORDER BY (rating)") \
    .mode("overwrite") \
    .jdbc(url=clickhouse_url, table="rating_sales_corr", properties=clickhouse_props)

+----------+------+--------------+
|product_id|rating|total_quantity|
+----------+------+--------------+
|       148|   4.8|             8|
|       463|   4.7|             5|
|       471|   1.7|            10|
|       496|   2.5|             2|
|       833|   1.6|             8|
+----------+------+--------------+
only showing top 5 rows




- Продукты с наибольшим количеством отзывов


In [109]:
top_reviewed_products = dim_product.select("product_id", "name", "reviews") \
    .orderBy(functions.desc("reviews"))
top_reviewed_products.show(5)
top_reviewed_products.write \
    .option("createTableOptions", "ENGINE = MergeTree() ORDER BY (reviews)") \
    .mode("overwrite") \
    .jdbc(url=clickhouse_url, table="top_reviewed_products", properties=clickhouse_props)


+----------+---------+-------+
|product_id|     name|reviews|
+----------+---------+-------+
|      4030|  Cat Toy|   1000|
|       874| Dog Food|   1000|
|      2387| Dog Food|   1000|
|      9986|Bird Cage|   1000|
|      7380|  Cat Toy|   1000|
+----------+---------+-------+
only showing top 5 rows

