# BigDataSpark

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("BigDataSnowSpark") \
    .config("spark.jars", 
            "/opt/spark/jars/postgresql-42.7.6.jar,"
            "/opt/spark/jars/clickhouse-jdbc-0.8.5-all.jar") \
    .getOrCreate()

## Создание снежинки

In [2]:
postgres_jdbc_url = "jdbc:postgresql://postgres:5432/mydatabase"
postgres_connection_properties = {
    "user": "myuser",
    "password": "mypassword",
    "driver": "org.postgresql.Driver"
}

In [3]:
mock_data = spark.read.jdbc(url=postgres_jdbc_url, table="mock_data", properties=postgres_connection_properties)

mock_data.show()
mock_data.count()

+---+-------------------+------------------+------------+--------------------+------------------+--------------------+-----------------+-----------------+------------------+-----------------+----------------+--------------------+--------------+------------------+------------+----------------+--------------------+----------------+----------+----------------+--------------+---------------+-------------+--------------------+-----------+--------------+-------------+-----------+--------------------+------------+--------------------+------------+--------------------+-------------+------------+-------------+----------------+--------------------+--------------------+---------------+--------------------+-------------------+-------------+-----------------+--------------------+--------------+----------------+------------------+----------------+
| id|customer_first_name|customer_last_name|customer_age|      customer_email|  customer_country|customer_postal_code|customer_pet_type|customer_pet_name|

10000

In [4]:
def execute_query(query):
    spark.sparkContext._jvm.java.sql.DriverManager.getConnection(
        postgres_jdbc_url, 
        postgres_connection_properties["user"], 
        postgres_connection_properties["password"]
    ).createStatement().execute(query)

In [5]:
from pyspark.sql import functions as F

Таблица countries

In [6]:
execute_query("CREATE TABLE IF NOT EXISTS countries (country_id SERIAL PRIMARY KEY, country_name VARCHAR UNIQUE);")  

countries = mock_data.select(
    F.col("customer_country").alias("country_name")
).filter(F.col("customer_country").isNotNull()).distinct() \
.union(mock_data.select(F.col("seller_country").alias("country_name")).filter(F.col("seller_country").isNotNull()).distinct()) \
.union(mock_data.select(F.col("store_country").alias("country_name")).filter(F.col("store_country").isNotNull()).distinct()) \
.union(mock_data.select(F.col("supplier_country").alias("country_name")).filter(F.col("supplier_country").isNotNull()).distinct()) \
.distinct()

countries.write \
    .format("jdbc") \
    .option("url", postgres_jdbc_url) \
    .option("dbtable", "countries") \
    .option("user", postgres_connection_properties["user"]) \
    .option("password", postgres_connection_properties["password"]) \
    .option("driver", "org.postgresql.Driver") \
    .mode("append") \
    .save()

countries_with_id = spark.read.jdbc(url=postgres_jdbc_url, table="countries", properties=postgres_connection_properties)

countries_with_id.show(3)
countries_with_id.count()

+----------+------------+
|country_id|country_name|
+----------+------------+
|         1|        Chad|
|         2|      Russia|
|         3|    Paraguay|
+----------+------------+
only showing top 3 rows



230

Таблица cities

In [7]:
execute_query("CREATE TABLE IF NOT EXISTS cities (city_id SERIAL PRIMARY KEY, city_name VARCHAR UNIQUE);")

cities = mock_data.select(
    F.col("store_city").alias("city_name")
).filter(F.col("store_city").isNotNull()).distinct() \
.union(mock_data.select(F.col("supplier_city").alias("city_name")).filter(F.col("supplier_city").isNotNull()).distinct()) \
.distinct()

cities.write \
    .format("jdbc") \
    .option("url", postgres_jdbc_url) \
    .option("dbtable", "cities") \
    .option("user", postgres_connection_properties["user"]) \
    .option("password", postgres_connection_properties["password"]) \
    .option("driver", "org.postgresql.Driver") \
    .mode("append") \
    .save()

cities_with_id = spark.read.jdbc(url=postgres_jdbc_url, table="cities", properties=postgres_connection_properties)
cities_with_id.show(3)
cities_with_id.count()

+-------+-----------+
|city_id|  city_name|
+-------+-----------+
|      1|     Takefu|
|      2|Trollhättan|
|      3|   Żyrardów|
+-------+-----------+
only showing top 3 rows



14181

Таблица dates

In [8]:
execute_query("CREATE TABLE IF NOT EXISTS dates (date_id SERIAL PRIMARY KEY, date_value DATE UNIQUE);")

dates = mock_data.select(
    F.col("sale_date").alias("date_value")
).filter(F.col("sale_date").isNotNull()).distinct() \
.union(mock_data.select(F.col("product_release_date").alias("date_value")).distinct()) \
.union(mock_data.select(F.col("product_expiry_date").alias("date_value")).distinct()) \
.distinct()

dates.write \
    .format("jdbc") \
    .option("url", postgres_jdbc_url) \
    .option("dbtable", "dates") \
    .option("user", postgres_connection_properties["user"]) \
    .option("password", postgres_connection_properties["password"]) \
    .option("driver", "org.postgresql.Driver") \
    .mode("append") \
    .save()

dates_with_id = spark.read.jdbc(url=postgres_jdbc_url, table="dates", properties=postgres_connection_properties)
dates_with_id.show(3)
dates_with_id.count()

+-------+----------+
|date_id|date_value|
+-------+----------+
|      1|2021-11-13|
|      2|2021-12-18|
|      3|2021-08-27|
+-------+----------+
only showing top 3 rows



7033

Таблица pet_types

In [9]:
execute_query("CREATE TABLE IF NOT EXISTS pet_types (pet_type_id SERIAL PRIMARY KEY, pet_type_name VARCHAR UNIQUE);")

pet_types = mock_data.select(
    F.col("customer_pet_type").alias("pet_type_name")
).distinct()

pet_types.write \
    .format("jdbc") \
    .option("url", postgres_jdbc_url) \
    .option("dbtable", "pet_types") \
    .option("user", postgres_connection_properties["user"]) \
    .option("password", postgres_connection_properties["password"]) \
    .option("driver", "org.postgresql.Driver") \
    .mode("append") \
    .save()

pet_types_with_id = spark.read.jdbc(url=postgres_jdbc_url, table="pet_types", properties=postgres_connection_properties)
pet_types_with_id.show(3)
pet_types_with_id.count()

+-----------+-------------+
|pet_type_id|pet_type_name|
+-----------+-------------+
|          1|          dog|
|          2|          cat|
|          3|         bird|
+-----------+-------------+



3

Таблица pet_breeds

In [10]:
execute_query("CREATE TABLE IF NOT EXISTS pet_breeds (pet_breed_id SERIAL PRIMARY KEY, pet_breed_name VARCHAR UNIQUE);")

pet_breeds = mock_data.select(
    F.col("customer_pet_breed").alias("pet_breed_name")
).distinct()

pet_breeds.write \
    .format("jdbc") \
    .option("url", postgres_jdbc_url) \
    .option("dbtable", "pet_breeds") \
    .option("user", postgres_connection_properties["user"]) \
    .option("password", postgres_connection_properties["password"]) \
    .option("driver", "org.postgresql.Driver") \
    .mode("append") \
    .save()

pet_breeds_with_id = spark.read.jdbc(url=postgres_jdbc_url, table="pet_breeds", properties=postgres_connection_properties)
pet_breeds_with_id.show(3)
pet_breeds_with_id.count()

+------------+------------------+
|pet_breed_id|    pet_breed_name|
+------------+------------------+
|           1|Labrador Retriever|
|           2|          Parakeet|
|           3|           Siamese|
+------------+------------------+



3

Таблица pet_categories

In [11]:
execute_query("CREATE TABLE IF NOT EXISTS pet_categories (pet_category_id SERIAL PRIMARY KEY, pet_category_name VARCHAR UNIQUE);")

pet_categories = mock_data.select(
    F.col("pet_category").alias("pet_category_name")
).distinct()

pet_categories.write \
    .format("jdbc") \
    .option("url", postgres_jdbc_url) \
    .option("dbtable", "pet_categories") \
    .option("user", postgres_connection_properties["user"]) \
    .option("password", postgres_connection_properties["password"]) \
    .option("driver", "org.postgresql.Driver") \
    .mode("append") \
    .save()

pet_categories_with_id = spark.read.jdbc(url=postgres_jdbc_url, table="pet_categories", properties=postgres_connection_properties)
pet_categories_with_id.show(3)
pet_categories_with_id.count()

+---------------+-----------------+
|pet_category_id|pet_category_name|
+---------------+-----------------+
|              1|         Reptiles|
|              2|             Fish|
|              3|            Birds|
+---------------+-----------------+
only showing top 3 rows



5

Таблица pets

In [12]:
execute_query("CREATE TABLE IF NOT EXISTS pets (pet_id SERIAL PRIMARY KEY, pet_type_id INT, pet_name VARCHAR, pet_breed_id INT, pet_category_id INT, FOREIGN KEY (pet_type_id) REFERENCES pet_types(pet_type_id), FOREIGN KEY (pet_breed_id) REFERENCES pet_breeds(pet_breed_id), FOREIGN KEY (pet_category_id) REFERENCES pet_categories(pet_category_id));")

pets = mock_data.join(pet_types_with_id, mock_data.customer_pet_type == pet_types_with_id.pet_type_name) \
    .join(pet_breeds_with_id, mock_data.customer_pet_breed == pet_breeds_with_id.pet_breed_name) \
    .join(pet_categories_with_id, mock_data.pet_category == pet_categories_with_id.pet_category_name) \
    .select(
        pet_types_with_id["pet_type_id"],
        mock_data["customer_pet_name"].alias("pet_name"),
        pet_breeds_with_id["pet_breed_id"],
        pet_categories_with_id["pet_category_id"]
    ).distinct()

pets.write \
    .format("jdbc") \
    .option("url", postgres_jdbc_url) \
    .option("dbtable", "pets") \
    .option("user", postgres_connection_properties["user"]) \
    .option("password", postgres_connection_properties["password"]) \
    .option("driver", "org.postgresql.Driver") \
    .mode("append") \
    .save()

pets_with_id = spark.read.jdbc(url=postgres_jdbc_url, table="pets", properties=postgres_connection_properties)
pets_with_id.show(3)
pets_with_id.count()

+------+-----------+--------+------------+---------------+
|pet_id|pet_type_id|pet_name|pet_breed_id|pet_category_id|
+------+-----------+--------+------------+---------------+
|     1|          3|   Barty|           1|              3|
|     2|          2| Lombard|           1|              3|
|     3|          1|  Odilia|           1|              3|
+------+-----------+--------+------------+---------------+
only showing top 3 rows



9850

Таблица customers

In [13]:
execute_query("CREATE TABLE IF NOT EXISTS customers (customer_id SERIAL PRIMARY KEY, customer_first_name VARCHAR, customer_last_name VARCHAR, customer_age INT, customer_email VARCHAR, customer_country_id INT, customer_postal_code VARCHAR, customer_pet_id INT, FOREIGN KEY (customer_country_id) REFERENCES countries(country_id), FOREIGN KEY (customer_pet_id) REFERENCES pets(pet_id));")

customers = mock_data.join(countries_with_id, mock_data.customer_country == countries_with_id.country_name) \
    .join(pet_types_with_id, mock_data.customer_pet_type == pet_types_with_id.pet_type_name) \
    .join(pet_breeds_with_id, mock_data.customer_pet_breed == pet_breeds_with_id.pet_breed_name) \
    .join(pet_categories_with_id, mock_data.pet_category == pet_categories_with_id.pet_category_name) \
    .join(pets_with_id, [
        mock_data.customer_pet_name == pets_with_id.pet_name,
        pet_types_with_id.pet_type_id == pets_with_id.pet_type_id,
        pet_breeds_with_id.pet_breed_id == pets_with_id.pet_breed_id,
        pet_categories_with_id.pet_category_id == pets_with_id.pet_category_id]) \
    .select(
        mock_data["customer_first_name"],
        mock_data["customer_last_name"],
        mock_data["customer_age"],
        mock_data["customer_email"],
        countries_with_id["country_id"].alias("customer_country_id"),
        mock_data["customer_postal_code"],
        pets_with_id["pet_id"].alias("customer_pet_id")
    ).distinct()

customers.write \
    .format("jdbc") \
    .option("url", postgres_jdbc_url) \
    .option("dbtable", "customers") \
    .option("user", postgres_connection_properties["user"]) \
    .option("password", postgres_connection_properties["password"]) \
    .option("driver", "org.postgresql.Driver") \
    .mode("append") \
    .save()

customers_with_id = spark.read.jdbc(url=postgres_jdbc_url, table="customers", properties=postgres_connection_properties)
customers_with_id.show(3)
customers_with_id.count()

+-----------+-------------------+------------------+------------+--------------------+-------------------+--------------------+---------------+
|customer_id|customer_first_name|customer_last_name|customer_age|      customer_email|customer_country_id|customer_postal_code|customer_pet_id|
+-----------+-------------------+------------------+------------+--------------------+-------------------+--------------------+---------------+
|          1|              Genia|             Toyer|          47|dnewberry8l@globo...|                 28|         94384 CEDEX|           9752|
|          2|            Lorelle|           Whybrow|          44|pdicken28@seesaa.net|                176|              43-523|           4920|
|          3|             Irwinn|        Shelbourne|          25|jvernazzafl@linke...|                151|                 P6A|           5685|
+-----------+-------------------+------------------+------------+--------------------+-------------------+--------------------+---------

10000

Таблица sellers

In [14]:
execute_query("CREATE TABLE IF NOT EXISTS sellers (seller_id SERIAL PRIMARY KEY, seller_first_name VARCHAR, seller_last_name VARCHAR, seller_email VARCHAR, seller_country_id INT, seller_postal_code VARCHAR, FOREIGN KEY (seller_country_id) REFERENCES countries(country_id));")

sellers = mock_data.join(countries_with_id, mock_data.seller_country == countries_with_id.country_name) \
    .select(
        mock_data["seller_first_name"],
        mock_data["seller_last_name"],
        mock_data["seller_email"],
        countries_with_id["country_id"].alias("seller_country_id"),
        mock_data["seller_postal_code"]
    ).distinct()

sellers.write \
    .format("jdbc") \
    .option("url", postgres_jdbc_url) \
    .option("dbtable", "sellers") \
    .option("user", postgres_connection_properties["user"]) \
    .option("password", postgres_connection_properties["password"]) \
    .option("driver", "org.postgresql.Driver") \
    .mode("append") \
    .save()

sellers_with_id = spark.read.jdbc(url=postgres_jdbc_url, table="sellers", properties=postgres_connection_properties)
sellers_with_id.show(3)
sellers_with_id.count()

+---------+-----------------+----------------+--------------------+-----------------+------------------+
|seller_id|seller_first_name|seller_last_name|        seller_email|seller_country_id|seller_postal_code|
+---------+-----------------+----------------+--------------------+-----------------+------------------+
|        1|             Dari|     Brownbridge|dbrownbridgeet@ph...|                2|            352195|
|        2|             Barr|          Conkey| bconkeyes@goo.ne.jp|                2|            188502|
|        3|           Benita|       Davidescu|bdavidescu17@admi...|                2|            249929|
+---------+-----------------+----------------+--------------------+-----------------+------------------+
only showing top 3 rows



10000

Таблица stores

In [15]:
execute_query("CREATE TABLE IF NOT EXISTS stores (store_id SERIAL PRIMARY KEY, store_name VARCHAR, store_location VARCHAR, store_city_id INT, store_state VARCHAR, store_country_id INT, store_phone VARCHAR, store_email VARCHAR, FOREIGN KEY (store_city_id) REFERENCES cities(city_id), FOREIGN KEY (store_country_id) REFERENCES countries(country_id));")

stores = mock_data.join(cities_with_id, mock_data.store_city == cities_with_id.city_name) \
    .join(countries_with_id, mock_data.store_country == countries_with_id.country_name) \
    .select(
        mock_data["store_name"],
        mock_data["store_location"],
        cities_with_id["city_id"].alias("store_city_id"),
        mock_data["store_state"],
        countries_with_id["country_id"].alias("store_country_id"),
        mock_data["store_phone"],
        mock_data["store_email"]
    ).distinct()

stores.write \
    .format("jdbc") \
    .option("url", postgres_jdbc_url) \
    .option("dbtable", "stores") \
    .option("user", postgres_connection_properties["user"]) \
    .option("password", postgres_connection_properties["password"]) \
    .option("driver", "org.postgresql.Driver") \
    .mode("append") \
    .save()

stores_with_id = spark.read.jdbc(url=postgres_jdbc_url, table="stores", properties=postgres_connection_properties)
stores_with_id.show(3)
stores_with_id.count()

+--------+-------------+--------------+-------------+-----------+----------------+------------+--------------------+
|store_id|   store_name|store_location|store_city_id|store_state|store_country_id| store_phone|         store_email|
+--------+-------------+--------------+-------------+-----------+----------------+------------+--------------------+
|       1|Thoughtsphere|     7th Floor|         1657|        VER|             176|816-996-5401|jbeldersonlf@irs.gov|
|       2|        Eazzy|      Room 182|         2592|        ENG|             173|114-332-5669|   tcameliada@vk.com|
|       3|        Quimm|      Apt 1809|         5223|         10|             102|577-217-8194| tnunnery2g@imdb.com|
+--------+-------------+--------------+-------------+-----------+----------------+------------+--------------------+
only showing top 3 rows



10000

Таблица suppliers

In [16]:
execute_query("CREATE TABLE IF NOT EXISTS suppliers (supplier_id SERIAL PRIMARY KEY, supplier_name VARCHAR, supplier_contact VARCHAR, supplier_email VARCHAR, supplier_phone VARCHAR, supplier_address VARCHAR, supplier_city_id INT, supplier_country_id INT, FOREIGN KEY (supplier_city_id) REFERENCES cities(city_id), FOREIGN KEY (supplier_country_id) REFERENCES countries(country_id));")

suppliers = mock_data.join(cities_with_id, mock_data.supplier_city == cities_with_id.city_name) \
    .join(countries_with_id, mock_data.supplier_country == countries_with_id.country_name) \
    .select(
        mock_data["supplier_name"],
        mock_data["supplier_contact"],
        mock_data["supplier_email"],
        mock_data["supplier_phone"],
        mock_data["supplier_address"],
        cities_with_id["city_id"].alias("supplier_city_id"),
        countries_with_id["country_id"].alias("supplier_country_id")
    ).distinct()

suppliers.write \
    .format("jdbc") \
    .option("url", postgres_jdbc_url) \
    .option("dbtable", "suppliers") \
    .option("user", postgres_connection_properties["user"]) \
    .option("password", postgres_connection_properties["password"]) \
    .option("driver", "org.postgresql.Driver") \
    .mode("append") \
    .save()

suppliers_with_id = spark.read.jdbc(url=postgres_jdbc_url, table="suppliers", properties=postgres_connection_properties)
suppliers_with_id.show(3)
suppliers_with_id.count()

+-----------+-------------+----------------+--------------------+--------------+----------------+----------------+-------------------+
|supplier_id|supplier_name|supplier_contact|      supplier_email|supplier_phone|supplier_address|supplier_city_id|supplier_country_id|
+-----------+-------------+----------------+--------------------+--------------+----------------+----------------+-------------------+
|          1|   Browsezoom|    Chandra Heck| checkkj@dropbox.com|  298-320-9959|    PO Box 16684|            8243|                 10|
|          2|      Jetwire|  Alvan Fronczak|afronczak8f@naver...|  292-753-7593|    PO Box 88685|            8344|                  2|
|          3|         DabZ|   Tatum Gorling| tgorlingc7@phoca.cz|  844-754-1883|      20th Floor|            8627|                199|
+-----------+-------------+----------------+--------------------+--------------+----------------+----------------+-------------------+
only showing top 3 rows



10000

Таблица product_names

In [17]:
execute_query("CREATE TABLE IF NOT EXISTS product_names (product_name_id SERIAL PRIMARY KEY, product_name VARCHAR UNIQUE);")

product_names = mock_data.select(F.col("product_name")).distinct()

product_names.write \
    .format("jdbc") \
    .option("url", postgres_jdbc_url) \
    .option("dbtable", "product_names") \
    .option("user", postgres_connection_properties["user"]) \
    .option("password", postgres_connection_properties["password"]) \
    .option("driver", "org.postgresql.Driver") \
    .mode("append") \
    .save()

product_names_with_id = spark.read.jdbc(url=postgres_jdbc_url, table="product_names", properties=postgres_connection_properties)
product_names_with_id.show(3)
product_names_with_id.count()

+---------------+------------+
|product_name_id|product_name|
+---------------+------------+
|              1|   Bird Cage|
|              2|    Dog Food|
|              3|     Cat Toy|
+---------------+------------+



3

Таблица product_categories

In [18]:
execute_query("CREATE TABLE IF NOT EXISTS product_categories (product_category_id SERIAL PRIMARY KEY, product_category_name VARCHAR UNIQUE);")

product_categories = mock_data.select(F.col("product_category").alias("product_category_name")).distinct()

product_categories.write \
    .format("jdbc") \
    .option("url", postgres_jdbc_url) \
    .option("dbtable", "product_categories") \
    .option("user", postgres_connection_properties["user"]) \
    .option("password", postgres_connection_properties["password"]) \
    .option("driver", "org.postgresql.Driver") \
    .mode("append") \
    .save()

product_categories_with_id = spark.read.jdbc(url=postgres_jdbc_url, table="product_categories", properties=postgres_connection_properties)
product_categories_with_id.show(3)
product_categories_with_id.count()

+-------------------+---------------------+
|product_category_id|product_category_name|
+-------------------+---------------------+
|                  1|                 Cage|
|                  2|                 Food|
|                  3|                  Toy|
+-------------------+---------------------+



3

Таблица product_colors

In [19]:
execute_query("CREATE TABLE IF NOT EXISTS product_colors (product_color_id SERIAL PRIMARY KEY, product_color_name VARCHAR UNIQUE);")

product_colors = mock_data.select(F.col("product_color").alias("product_color_name")).distinct()

product_colors.write \
    .format("jdbc") \
    .option("url", postgres_jdbc_url) \
    .option("dbtable", "product_colors") \
    .option("user", postgres_connection_properties["user"]) \
    .option("password", postgres_connection_properties["password"]) \
    .option("driver", "org.postgresql.Driver") \
    .mode("append") \
    .save()

product_colors_with_id = spark.read.jdbc(url=postgres_jdbc_url, table="product_colors", properties=postgres_connection_properties)
product_colors_with_id.show(3)
product_colors_with_id.count()

+----------------+------------------+
|product_color_id|product_color_name|
+----------------+------------------+
|               1|              Teal|
|               2|             Khaki|
|               3|           Crimson|
+----------------+------------------+
only showing top 3 rows



19

Таблица product_sizes

In [20]:
execute_query("CREATE TABLE IF NOT EXISTS product_sizes (product_size_id SERIAL PRIMARY KEY, product_size_name VARCHAR UNIQUE);")

product_sizes = mock_data.select(F.col("product_size").alias("product_size_name")).distinct()

product_sizes.write \
    .format("jdbc") \
    .option("url", postgres_jdbc_url) \
    .option("dbtable", "product_sizes") \
    .option("user", postgres_connection_properties["user"]) \
    .option("password", postgres_connection_properties["password"]) \
    .option("driver", "org.postgresql.Driver") \
    .mode("append") \
    .save()

product_sizes_with_id = spark.read.jdbc(url=postgres_jdbc_url, table="product_sizes", properties=postgres_connection_properties)
product_sizes_with_id.show(3)
product_sizes_with_id.count()

+---------------+-----------------+
|product_size_id|product_size_name|
+---------------+-----------------+
|              1|           Medium|
|              2|            Small|
|              3|            Large|
+---------------+-----------------+



3

Таблица product_brands

In [21]:
execute_query("CREATE TABLE IF NOT EXISTS product_brands (product_brand_id SERIAL PRIMARY KEY, product_brand_name VARCHAR UNIQUE);")

product_brands = mock_data.select(F.col("product_brand").alias("product_brand_name")).distinct()

product_brands.write \
    .format("jdbc") \
    .option("url", postgres_jdbc_url) \
    .option("dbtable", "product_brands") \
    .option("user", postgres_connection_properties["user"]) \
    .option("password", postgres_connection_properties["password"]) \
    .option("driver", "org.postgresql.Driver") \
    .mode("append") \
    .save()

product_brands_with_id = spark.read.jdbc(url=postgres_jdbc_url, table="product_brands", properties=postgres_connection_properties)
product_brands_with_id.show(3)
product_brands_with_id.count()

+----------------+------------------+
|product_brand_id|product_brand_name|
+----------------+------------------+
|               1|           Jetwire|
|               2|          Jaxworks|
|               3|         Reallinks|
+----------------+------------------+
only showing top 3 rows



383

Таблица product_materials

In [22]:
execute_query("CREATE TABLE IF NOT EXISTS product_materials (product_material_id SERIAL PRIMARY KEY, product_material_name VARCHAR UNIQUE);")

product_materials = mock_data.select(F.col("product_material").alias("product_material_name")).distinct()

product_materials.write \
    .format("jdbc") \
    .option("url", postgres_jdbc_url) \
    .option("dbtable", "product_materials") \
    .option("user", postgres_connection_properties["user"]) \
    .option("password", postgres_connection_properties["password"]) \
    .option("driver", "org.postgresql.Driver") \
    .mode("append") \
    .save()

product_materials_with_id = spark.read.jdbc(url=postgres_jdbc_url, table="product_materials", properties=postgres_connection_properties)
product_materials_with_id.show(3)
product_materials_with_id.count()

+-------------------+---------------------+
|product_material_id|product_material_name|
+-------------------+---------------------+
|                  1|                Steel|
|                  2|                Vinyl|
|                  3|              Granite|
+-------------------+---------------------+
only showing top 3 rows



11

Таблица products

In [23]:
execute_query("CREATE TABLE IF NOT EXISTS products (product_id SERIAL PRIMARY KEY, product_name_id INT, product_category_id INT, product_price DECIMAL, product_quantity INT, product_weight DECIMAL, product_color_id INT, product_size_id INT, product_brand_id INT, product_material_id INT, product_description TEXT, product_rating DECIMAL, product_reviews INT, product_release_date_id INT, product_expiry_date_id INT, product_supplier_id INT, FOREIGN KEY (product_name_id) REFERENCES product_names(product_name_id), FOREIGN KEY (product_category_id) REFERENCES product_categories(product_category_id), FOREIGN KEY (product_color_id) REFERENCES product_colors(product_color_id), FOREIGN KEY (product_size_id) REFERENCES product_sizes(product_size_id), FOREIGN KEY (product_brand_id) REFERENCES product_brands(product_brand_id), FOREIGN KEY (product_material_id) REFERENCES product_materials(product_material_id), FOREIGN KEY (product_release_date_id) REFERENCES dates(date_id), FOREIGN KEY (product_expiry_date_id) REFERENCES dates(date_id), FOREIGN KEY (product_supplier_id) REFERENCES suppliers(supplier_id));")

products = mock_data.join(product_names_with_id, mock_data.product_name == product_names_with_id.product_name) \
    .join(product_categories_with_id, mock_data.product_category == product_categories_with_id.product_category_name) \
    .join(product_colors_with_id, mock_data.product_color == product_colors_with_id.product_color_name) \
    .join(product_sizes_with_id, mock_data.product_size == product_sizes_with_id.product_size_name) \
    .join(product_brands_with_id, mock_data.product_brand == product_brands_with_id.product_brand_name) \
    .join(product_materials_with_id, mock_data.product_material == product_materials_with_id.product_material_name) \
    .join(suppliers_with_id, [mock_data.supplier_name == suppliers_with_id.supplier_name, mock_data.supplier_contact == suppliers_with_id.supplier_contact]) \
    .join(dates_with_id.alias("release_date"), mock_data.product_release_date == F.col("release_date.date_value")) \
    .join(dates_with_id.alias("expiry_date"), mock_data.product_expiry_date == F.col("expiry_date.date_value")) \
    .select(
        product_names_with_id["product_name_id"].alias("product_name_id"),
        product_categories_with_id["product_category_id"].alias("product_category_id"),
        mock_data["product_price"],
        mock_data["product_quantity"],
        mock_data["product_weight"],
        product_colors_with_id["product_color_id"].alias("product_color_id"),
        product_sizes_with_id["product_size_id"].alias("product_size_id"),
        product_brands_with_id["product_brand_id"].alias("product_brand_id"),
        product_materials_with_id["product_material_id"].alias("product_material_id"),
        mock_data["product_description"],
        mock_data["product_rating"],
        mock_data["product_reviews"],
        F.col("release_date.date_id").alias("product_release_date_id"),
        F.col("expiry_date.date_id").alias("product_expiry_date_id"),
        suppliers_with_id["supplier_id"].alias("product_supplier_id")
    ).distinct()

products.write \
    .format("jdbc") \
    .option("url", postgres_jdbc_url) \
    .option("dbtable", "products") \
    .option("user", postgres_connection_properties["user"]) \
    .option("password", postgres_connection_properties["password"]) \
    .option("driver", "org.postgresql.Driver") \
    .mode("append") \
    .save()

products_with_id = spark.read.jdbc(url=postgres_jdbc_url, table="products", properties=postgres_connection_properties)
products_with_id.show(3)
products_with_id.count()

+----------+---------------+-------------------+--------------------+----------------+--------------------+----------------+---------------+----------------+-------------------+--------------------+--------------------+---------------+-----------------------+----------------------+-------------------+
|product_id|product_name_id|product_category_id|       product_price|product_quantity|      product_weight|product_color_id|product_size_id|product_brand_id|product_material_id| product_description|      product_rating|product_reviews|product_release_date_id|product_expiry_date_id|product_supplier_id|
+----------+---------------+-------------------+--------------------+----------------+--------------------+----------------+---------------+----------------+-------------------+--------------------+--------------------+---------------+-----------------------+----------------------+-------------------+
|         1|              1|                  3|20.09000000000000...|              12|7.000

10000

Таблица sales

In [24]:
execute_query("CREATE TABLE IF NOT EXISTS sales (sale_id SERIAL PRIMARY KEY, sale_date_id INT, sale_customer_id INT, sale_seller_id INT, sale_product_id INT, sale_store_id INT, sale_quantity INT, sale_total_price DECIMAL, FOREIGN KEY (sale_date_id) REFERENCES dates(date_id), FOREIGN KEY (sale_customer_id) REFERENCES customers(customer_id), FOREIGN KEY (sale_seller_id) REFERENCES sellers(seller_id), FOREIGN KEY (sale_product_id) REFERENCES products(product_id), FOREIGN KEY (sale_store_id) REFERENCES stores(store_id));")

md = mock_data.alias("md")
d = dates_with_id.alias("d")
c = customers_with_id.alias("c")
s = sellers_with_id.alias("s")
st = stores_with_id.alias("st")
p = products_with_id.alias("p")
pn = product_names_with_id.alias("pn")
pc = product_categories_with_id.alias("pc")
pco = product_colors_with_id.alias("pco")
ps = product_sizes_with_id.alias("ps")
pb = product_brands_with_id.alias("pb")
pm = product_materials_with_id.alias("pm")

sales = md \
    .join(d, md.sale_date == d.date_value) \
    .join(c, md.customer_email == c.customer_email) \
    .join(s, md.seller_email == s.seller_email) \
    .join(st, [md.store_name == st.store_name,
              md.store_location == st.store_location,
              md.store_phone == st.store_phone,
              md.store_email == st.store_email]) \
    .join(pn, md.product_name == pn.product_name) \
    .join(pc, md.product_category == pc.product_category_name) \
    .join(pco, md.product_color == pco.product_color_name) \
    .join(ps, md.product_size == ps.product_size_name) \
    .join(pb, md.product_brand == pb.product_brand_name) \
    .join(pm, md.product_material == pm.product_material_name) \
    .join(p, [
        p.product_name_id == pn.product_name_id,
        p.product_category_id == pc.product_category_id,
        p.product_color_id == pco.product_color_id,
        p.product_size_id == ps.product_size_id,
        p.product_brand_id == pb.product_brand_id,
        p.product_material_id == pm.product_material_id,
        md.product_price == p.product_price,
        md.product_quantity == p.product_quantity,
        md.product_weight == p.product_weight
    ]) \
    .select(
        d["date_id"].alias("sale_date_id"),
        c["customer_id"].alias("sale_customer_id"),
        s["seller_id"].alias("sale_seller_id"),
        p["product_id"].alias("sale_product_id"),
        st["store_id"].alias("sale_store_id"),
        md["sale_quantity"],
        md["sale_total_price"]
    ).distinct()

sales.write \
    .format("jdbc") \
    .option("url", postgres_jdbc_url) \
    .option("dbtable", "sales") \
    .option("user", postgres_connection_properties["user"]) \
    .option("password", postgres_connection_properties["password"]) \
    .option("driver", "org.postgresql.Driver") \
    .mode("append") \
    .save()

sales_with_id = spark.read.jdbc(url=postgres_jdbc_url, table="sales", properties=postgres_connection_properties)
sales_with_id.show(3)
sales_with_id.count()

+-------+------------+----------------+--------------+---------------+-------------+-------------+--------------------+
|sale_id|sale_date_id|sale_customer_id|sale_seller_id|sale_product_id|sale_store_id|sale_quantity|    sale_total_price|
+-------+------------+----------------+--------------+---------------+-------------+-------------+--------------------+
|      1|         283|            3070|          3636|           5423|         1130|           10|63.16000000000000...|
|      2|         277|            6035|          3052|           2538|         6559|            2|95.28000000000000...|
|      3|         260|            8460|           595|           3836|         2353|            7|216.0600000000000...|
+-------+------------+----------------+--------------+---------------+-------------+-------------+--------------------+
only showing top 3 rows



10000

## Создание витрин с отчётами

In [25]:
clickhouse_jdbc_url = "jdbc:clickhouse://clickhouse:8123/myreports"
clickhouse_connection_properties = {
    "user": "myclickhouse",
    "password": "mypassword",
    "driver": "com.clickhouse.jdbc.ClickHouseDriver"
}

### Витрина продаж по продуктам

Топ-10 самых продаваемых продуктов

In [26]:
top_10_products = sales_with_id.join(products_with_id, sales_with_id.sale_product_id == products_with_id.product_id) \
    .join(product_names_with_id, products_with_id.product_name_id == product_names_with_id.product_name_id) \
    .groupBy(product_names_with_id.product_name) \
    .agg(
        F.sum(sales_with_id.sale_quantity).alias("total_quantity_sold"),
        F.sum(sales_with_id.sale_total_price).alias("total_revenue")
    ) \
    .orderBy(F.desc("total_quantity_sold")) \
    .limit(10)

top_10_products.show()

+------------+-------------------+--------------------+
|product_name|total_quantity_sold|       total_revenue|
+------------+-------------------+--------------------+
|    Dog Food|              18298|848567.1900000000...|
|   Bird Cage|              18205|847478.0500000000...|
|     Cat Toy|              18120|833806.8800000000...|
+------------+-------------------+--------------------+



Общая выручка по категориям продуктов

In [27]:
revenue_by_category = sales_with_id.join(products_with_id, sales_with_id.sale_product_id == products_with_id.product_id) \
    .join(product_categories_with_id, products_with_id.product_category_id == product_categories_with_id.product_category_id) \
    .groupBy(product_categories_with_id.product_category_name) \
    .agg(
        F.sum(sales_with_id.sale_total_price).alias("total_revenue")
    )

revenue_by_category.show()

+---------------------+--------------------+
|product_category_name|       total_revenue|
+---------------------+--------------------+
|                 Cage|831117.9400000000...|
|                 Food|830632.5500000000...|
|                  Toy|868101.6300000000...|
+---------------------+--------------------+



Средний рейтинг и количество отзывов для каждого продукта

In [28]:
rating_reviews = products_with_id.join(product_names_with_id, products_with_id.product_name_id == product_names_with_id.product_name_id) \
    .groupBy(product_names_with_id.product_name) \
    .agg(
        F.avg(products_with_id.product_rating).alias("avg_rating"),
        F.sum(products_with_id.product_reviews).alias("total_reviews")
    )

rating_reviews.show()

+------------+--------------------+-------------+
|product_name|          avg_rating|total_reviews|
+------------+--------------------+-------------+
|   Bird Cage|3.000149164677804...|      1682260|
|    Dog Food|3.018298891883797...|      1653413|
|     Cat Toy|3.006860078573587...|      1676222|
+------------+--------------------+-------------+



In [29]:
top_10_with_rating = top_10_products.join(
    rating_reviews, on="product_name", how="left"
)

top_10_with_rating.write \
    .format("jdbc") \
    .option("url", clickhouse_jdbc_url) \
    .option("dbtable", "top_10_products_report") \
    .option("user", clickhouse_connection_properties["user"]) \
    .option("password", clickhouse_connection_properties["password"]) \
    .option("driver", clickhouse_connection_properties["driver"]) \
    .option("createTableOptions", "ENGINE = MergeTree ORDER BY (product_name)") \
    .mode("overwrite") \
    .save()

revenue_by_category.write \
    .format("jdbc") \
    .option("url", clickhouse_jdbc_url) \
    .option("dbtable", "revenue_by_category_report") \
    .option("user", clickhouse_connection_properties["user"]) \
    .option("password", clickhouse_connection_properties["password"]) \
    .option("driver", clickhouse_connection_properties["driver"]) \
    .option("createTableOptions", "ENGINE = MergeTree ORDER BY (product_category_name)") \
    .mode("overwrite") \
    .save()

rating_reviews.write \
    .format("jdbc") \
    .option("url", clickhouse_jdbc_url) \
    .option("dbtable", "product_rating_reviews_report") \
    .option("user", clickhouse_connection_properties["user"]) \
    .option("password", clickhouse_connection_properties["password"]) \
    .option("driver", clickhouse_connection_properties["driver"]) \
    .option("createTableOptions", "ENGINE = MergeTree ORDER BY (product_name)") \
    .mode("overwrite") \
    .save()

### Витрина продаж по клиентам

Топ-10 клиентов с наибольшей общей суммой покупок

In [30]:
top_10_customers = sales_with_id.join(customers_with_id, sales_with_id.sale_customer_id == customers_with_id.customer_id) \
    .groupBy(
        customers_with_id.customer_id,
        customers_with_id.customer_first_name,
        customers_with_id.customer_last_name,
        customers_with_id.customer_email
    ) \
    .agg(
        F.sum(sales_with_id.sale_total_price).alias("total_purchase")
    ) \
    .orderBy(F.desc("total_purchase")) \
    .limit(10)

top_10_customers.show()

+-----------+-------------------+------------------+--------------------+--------------------+
|customer_id|customer_first_name|customer_last_name|      customer_email|      total_purchase|
+-----------+-------------------+------------------+--------------------+--------------------+
|       9794|                Gus|         Hartshorn| bfeasby57@youku.com|499.8500000000000...|
|        946|              Hayes|            McKain|sstappardbp@busin...|499.8000000000000...|
|       5166|              Dawna|             Impey|    rivattspm@un.org|499.7600000000000...|
|       7554|                Ava|             Lomas|dsorea0@geocities...|499.7600000000000...|
|       1966|            Lavinia|         Horsburgh|previllh3@tinyurl...|499.7300000000000...|
|       9356|               Dame|        Auchinleck|jthurnhamqe@sourc...|499.7100000000000...|
|       4384|           Isahella|            Colley|bselewayi0@chron.com|499.6900000000000...|
|       6096|             Sisely|          Bonevan

Распределение клиентов по странам

In [31]:
customers_by_country = customers_with_id.join(countries_with_id, customers_with_id.customer_country_id == countries_with_id.country_id) \
    .groupBy(countries_with_id.country_name) \
    .agg(
        F.count(customers_with_id.customer_id).alias("customer_count")
    )

customers_by_country.show()

+--------------------+--------------+
|        country_name|customer_count|
+--------------------+--------------+
|                Chad|             5|
|            Paraguay|            18|
|              Russia|           628|
| U.S. Virgin Islands|             1|
|               Yemen|            39|
|             Senegal|             4|
|              Sweden|           264|
|Svalbard and Jan ...|             1|
|              Guyana|             1|
|         Philippines|           555|
|             Eritrea|             3|
|            Djibouti|             1|
|            Malaysia|            40|
|              Turkey|             1|
|              Malawi|            12|
|                Iraq|             8|
|             Germany|            30|
|Northern Mariana ...|             1|
|             Comoros|            13|
|         Afghanistan|            31|
+--------------------+--------------+
only showing top 20 rows



Средний чек для каждого клиента

In [32]:
avg_check_by_customer = sales_with_id.groupBy("sale_customer_id") \
    .agg(
        (F.sum(sales_with_id.sale_total_price) / F.sum(sales_with_id.sale_quantity)).alias("avg_check")
    ) \
    .join(customers_with_id, sales_with_id.sale_customer_id == customers_with_id.customer_id) \
    .select(
        customers_with_id.customer_id,
        customers_with_id.customer_first_name,
        customers_with_id.customer_last_name,
        customers_with_id.customer_email,
        "avg_check"
    )

avg_check_by_customer.show()

+-----------+-------------------+------------------+--------------------+--------------------+
|customer_id|customer_first_name|customer_last_name|      customer_email|           avg_check|
+-----------+-------------------+------------------+--------------------+--------------------+
|       4900|            Vincent|           Edgerly| ltiesr8@oaic.gov.au|3.820000000000000000|
|       6336|              Allen|           Avesque|gcastellag8@issuu...|25.46800000000000...|
|       7880|             Frazer|             Leuty|pprayjx@bizjourna...|23.92000000000000...|
|       3997|              Abner|           Brandle|mreinbeckgx@howst...|193.6050000000000...|
|       2659|            Ingunna|          Farlambe|rruslenj0@yolasit...|75.43000000000000...|
|       2866|          Rosemaria|            Batrop| wrivallandma@w3.org|477.5200000000000...|
|       1088|            Chester|          Harsnipe|oetheredgenh@tamu...|359.1700000000000...|
|       1342|                Mae|           Halbor

In [33]:
top_10_customers.write \
    .format("jdbc") \
    .option("url", clickhouse_jdbc_url) \
    .option("dbtable", "top_10_customers_report") \
    .option("user", clickhouse_connection_properties["user"]) \
    .option("password", clickhouse_connection_properties["password"]) \
    .option("driver", clickhouse_connection_properties["driver"]) \
    .option("createTableOptions", "ENGINE = MergeTree ORDER BY (customer_id)") \
    .mode("overwrite") \
    .save()

customers_by_country.write \
    .format("jdbc") \
    .option("url", clickhouse_jdbc_url) \
    .option("dbtable", "customers_by_country_report") \
    .option("user", clickhouse_connection_properties["user"]) \
    .option("password", clickhouse_connection_properties["password"]) \
    .option("driver", clickhouse_connection_properties["driver"]) \
    .option("createTableOptions", "ENGINE = MergeTree ORDER BY (country_name)") \
    .mode("overwrite") \
    .save()

avg_check_by_customer.write \
    .format("jdbc") \
    .option("url", clickhouse_jdbc_url) \
    .option("dbtable", "avg_check_by_customer_report") \
    .option("user", clickhouse_connection_properties["user"]) \
    .option("password", clickhouse_connection_properties["password"]) \
    .option("driver", clickhouse_connection_properties["driver"]) \
    .option("createTableOptions", "ENGINE = MergeTree ORDER BY (customer_id)") \
    .mode("overwrite") \
    .save()

### Витрина продаж по времени

Месячные и годовые тренды продаж и средний размер заказа по месяцам

In [34]:
sales_with_date = sales_with_id.join(dates_with_id, sales_with_id.sale_date_id == dates_with_id.date_id) \
    .withColumn("year", F.year("date_value")) \
    .withColumn("month", F.month("date_value"))

In [35]:
monthly_trends = sales_with_date.groupBy("year", "month") \
    .agg(
        F.sum("sale_total_price").alias("total_revenue"),
        F.sum("sale_quantity").alias("total_quantity"),
        (F.sum("sale_total_price") / F.sum("sale_quantity")).alias("avg_order_size")
    ) \
    .orderBy("year", "month")

monthly_trends.show()

+----+-----+--------------------+--------------+--------------------+
|year|month|       total_revenue|total_quantity|      avg_order_size|
+----+-----+--------------------+--------------+--------------------+
|2021|    1|224158.5400000000...|          4856|46.16114909390444...|
|2021|    2|192348.3100000000...|          4070|47.26002702702702...|
|2021|    3|207282.2000000000...|          4561|45.44665643499232...|
|2021|    4|206592.8200000000...|          4564|45.26573619631901...|
|2021|    5|211764.8600000000...|          4451|47.57691754661873...|
|2021|    6|215042.8000000000...|          4438|48.45488958990536...|
|2021|    7|220496.5100000000...|          4750|46.42031789473684...|
|2021|    8|221275.7800000000...|          4818|45.92689497716894...|
|2021|    9|210623.4300000000...|          4507|46.73251164854670...|
|2021|   10|228743.3200000000...|          4976|45.96931672025723...|
|2021|   11|200154.6900000000...|          4297|46.58010006981615...|
|2021|   12|191368.8

In [36]:
yearly_trends = sales_with_date.groupBy("year") \
    .agg(
        F.sum("sale_total_price").alias("total_revenue"),
        F.sum("sale_quantity").alias("total_quantity"),
        (F.sum("sale_total_price") / F.sum("sale_quantity")).alias("avg_order_size")
    ) \
    .orderBy("year")

yearly_trends.show()

+----+--------------------+--------------+--------------------+
|year|       total_revenue|total_quantity|      avg_order_size|
+----+--------------------+--------------+--------------------+
|2021|2529852.120000000...|         54623|46.31477802390934...|
+----+--------------------+--------------+--------------------+



Сравнение выручки за разные периоды

In [37]:
from pyspark.sql import Window

window_spec = Window.orderBy("year", "month")
monthly_trends_with_prev = monthly_trends.withColumn(
    "prev_total_revenue", F.lag("total_revenue").over(window_spec)
).withColumn(
    "revenue_mom_growth",
    ((F.col("total_revenue") - F.col("prev_total_revenue")) / F.col("prev_total_revenue")) * 100
)

revenue_comparison = monthly_trends_with_prev

revenue_comparison.show()

+----+-----+--------------------+--------------+--------------------+--------------------+------------------+
|year|month|       total_revenue|total_quantity|      avg_order_size|  prev_total_revenue|revenue_mom_growth|
+----+-----+--------------------+--------------+--------------------+--------------------+------------------+
|2021|    1|224158.5400000000...|          4856|46.16114909390444...|                NULL|              NULL|
|2021|    2|192348.3100000000...|          4070|47.26002702702702...|224158.5400000000...|        -14.191000|
|2021|    3|207282.2000000000...|          4561|45.44665643499232...|192348.3100000000...|          7.764000|
|2021|    4|206592.8200000000...|          4564|45.26573619631901...|207282.2000000000...|         -0.332600|
|2021|    5|211764.8600000000...|          4451|47.57691754661873...|206592.8200000000...|          2.503500|
|2021|    6|215042.8000000000...|          4438|48.45488958990536...|211764.8600000000...|          1.547900|
|2021|    

In [38]:
monthly_trends.write \
    .format("jdbc") \
    .option("url", clickhouse_jdbc_url) \
    .option("dbtable", "monthly_sales_trends_report") \
    .option("user", clickhouse_connection_properties["user"]) \
    .option("password", clickhouse_connection_properties["password"]) \
    .option("driver", clickhouse_connection_properties["driver"]) \
    .option("createTableOptions", "ENGINE = MergeTree ORDER BY (year, month)") \
    .mode("overwrite") \
    .save()

yearly_trends.write \
    .format("jdbc") \
    .option("url", clickhouse_jdbc_url) \
    .option("dbtable", "yearly_sales_trends_report") \
    .option("user", clickhouse_connection_properties["user"]) \
    .option("password", clickhouse_connection_properties["password"]) \
    .option("driver", clickhouse_connection_properties["driver"]) \
    .option("createTableOptions", "ENGINE = MergeTree ORDER BY (year)") \
    .mode("overwrite") \
    .save()

revenue_comparison.write \
    .format("jdbc") \
    .option("url", clickhouse_jdbc_url) \
    .option("dbtable", "sales_revenue_comparison_report") \
    .option("user", clickhouse_connection_properties["user"]) \
    .option("password", clickhouse_connection_properties["password"]) \
    .option("driver", clickhouse_connection_properties["driver"]) \
    .option("createTableOptions", "ENGINE = MergeTree ORDER BY (year, month)") \
    .mode("overwrite") \
    .save()

### Витрина продаж по магазинам

Топ-5 магазинов с наибольшей выручкой

In [39]:
top_5_stores = sales_with_id.join(stores_with_id, sales_with_id.sale_store_id == stores_with_id.store_id) \
    .groupBy(
        stores_with_id.store_id,
        stores_with_id.store_name,
        stores_with_id.store_location,
        stores_with_id.store_city_id,
        stores_with_id.store_country_id
    ) \
    .agg(
        F.sum(sales_with_id.sale_total_price).alias("total_revenue")
    ) \
    .orderBy(F.desc("total_revenue")) \
    .limit(5)

top_5_stores.show()

+--------+-----------+--------------+-------------+----------------+--------------------+
|store_id| store_name|store_location|store_city_id|store_country_id|       total_revenue|
+--------+-----------+--------------+-------------+----------------+--------------------+
|    9786|       DabZ|    13th Floor|         6067|             191|499.8500000000000...|
|     363|Thoughtblab|      Apt 1200|          424|             176|499.8000000000000...|
|    9433|     Camido|      Apt 1720|         2001|               7|499.7600000000000...|
|    3569|   Edgeblab|    17th Floor|         5951|             123|499.7600000000000...|
|    6106|    Centizu|      Suite 86|         1497|             176|499.7300000000000...|
+--------+-----------+--------------+-------------+----------------+--------------------+



Распределение продаж по городам и странам

In [40]:
sales_by_city_country = sales_with_id.join(stores_with_id, sales_with_id.sale_store_id == stores_with_id.store_id) \
    .join(cities_with_id, stores_with_id.store_city_id == cities_with_id.city_id) \
    .join(countries_with_id, stores_with_id.store_country_id == countries_with_id.country_id) \
    .groupBy(
        cities_with_id.city_name,
        countries_with_id.country_name
    ) \
    .agg(
        F.sum(sales_with_id.sale_total_price).alias("total_revenue"),
        F.count(sales_with_id.sale_id).alias("sales_count")
    )

sales_by_city_country.show()

+-------------------+-------------+--------------------+-----------+
|          city_name| country_name|       total_revenue|sales_count|
+-------------------+-------------+--------------------+-----------+
|Costa de Rios Frios|        Japan|180.6900000000000...|          1|
|            Lintaca|      Nigeria|193.6500000000000...|          1|
|           Canga’an|    Indonesia|437.8100000000000...|          1|
|            Avignon|        China|400.9100000000000...|          2|
|            Wangren|    Indonesia|36.72000000000000...|          1|
|             Gerong|        China|143.9700000000000...|          1|
|            Donghui|    Indonesia|470.0000000000000...|          1|
|       Kohtla-Järve|        China|251.8600000000000...|          1|
|             Linshi|United States|101.0400000000000...|          1|
|            Taiping|     Portugal|419.6100000000000...|          1|
|             Chanhe|    Argentina|284.5200000000000...|          1|
|        Lubukgadang|    Indonesia

Средний чек для каждого магазина

In [41]:
avg_check_by_store = sales_with_id.join(stores_with_id, sales_with_id.sale_store_id == stores_with_id.store_id) \
    .groupBy(
        stores_with_id.store_id,
        stores_with_id.store_name,
        stores_with_id.store_location
    ) \
    .agg(
        (F.sum(sales_with_id.sale_total_price) / F.sum(sales_with_id.sale_quantity)).alias("avg_check")
    )

avg_check_by_store.show()

+--------+------------+--------------+--------------------+
|store_id|  store_name|store_location|           avg_check|
+--------+------------+--------------+--------------------+
|     148|        Kare|  PO Box 99919|35.59000000000000...|
|     463|        Vitz|     5th Floor|74.66750000000000...|
|     471|      Avaveo|      Apt 1731|5.485714285714285714|
|     496|     Demivee|     7th Floor|48.97700000000000...|
|     833|       Avamm|      Apt 1338|57.52333333333333...|
|    1088|     Tagfeed|     1st Floor|23.82000000000000...|
|    1238|     Zoonder|     Room 1316|170.8400000000000...|
|    1342|      JumpXS|      Suite 88|23.96833333333333...|
|    1580|  Bubbletube|      Suite 64|41.10666666666666...|
|    1591|        Geba|  PO Box 52599|34.00250000000000...|
|    1645|Chatterpoint|      Suite 31|65.13333333333333...|
|    1829|       Yadel|  PO Box 44655|107.9600000000000...|
|    1959|    Edgewire|  PO Box 81210|242.5800000000000...|
|    2122|       Eadel|  PO Box 72093|49

In [42]:
top_5_stores.write \
    .format("jdbc") \
    .option("url", clickhouse_jdbc_url) \
    .option("dbtable", "top_5_stores_report") \
    .option("user", clickhouse_connection_properties["user"]) \
    .option("password", clickhouse_connection_properties["password"]) \
    .option("driver", clickhouse_connection_properties["driver"]) \
    .option("createTableOptions", "ENGINE = MergeTree ORDER BY (store_id)") \
    .mode("overwrite") \
    .save()

sales_by_city_country.write \
    .format("jdbc") \
    .option("url", clickhouse_jdbc_url) \
    .option("dbtable", "sales_by_city_country_report") \
    .option("user", clickhouse_connection_properties["user"]) \
    .option("password", clickhouse_connection_properties["password"]) \
    .option("driver", clickhouse_connection_properties["driver"]) \
    .option("createTableOptions", "ENGINE = MergeTree ORDER BY (country_name, city_name)") \
    .mode("overwrite") \
    .save()

avg_check_by_store.write \
    .format("jdbc") \
    .option("url", clickhouse_jdbc_url) \
    .option("dbtable", "avg_check_by_store_report") \
    .option("user", clickhouse_connection_properties["user"]) \
    .option("password", clickhouse_connection_properties["password"]) \
    .option("driver", clickhouse_connection_properties["driver"]) \
    .option("createTableOptions", "ENGINE = MergeTree ORDER BY (store_id)") \
    .mode("overwrite") \
    .save()

### Витрина продаж по поставщикам

Топ-5 поставщиков с наибольшей выручкой

In [43]:
top_5_suppliers = sales_with_id.join(products_with_id, sales_with_id.sale_product_id == products_with_id.product_id) \
    .join(suppliers_with_id, products_with_id.product_supplier_id == suppliers_with_id.supplier_id) \
    .groupBy(
        suppliers_with_id.supplier_id,
        suppliers_with_id.supplier_name,
        suppliers_with_id.supplier_contact,
        suppliers_with_id.supplier_email
    ) \
    .agg(
        F.sum(sales_with_id.sale_total_price).alias("total_revenue")
    ) \
    .orderBy(F.desc("total_revenue")) \
    .limit(5)

top_5_suppliers.show()

+-----------+-------------+----------------+--------------------+--------------------+
|supplier_id|supplier_name|supplier_contact|      supplier_email|       total_revenue|
+-----------+-------------+----------------+--------------------+--------------------+
|       1017|   Brainverse| Barbabas Feasby|    bfeasby57@ed.gov|499.8500000000000...|
|        634|        Jamia|    Sax Stappard|sstappardbp@webno...|499.8000000000000...|
|       9553|      Demimbu|   Reggis Ivatts|   rivattspm@nps.gov|499.7600000000000...|
|       1225|        Eabox|       Dell Sore|     dsorea0@soup.io|499.7600000000000...|
|       8428|   Browsezoom|  Padgett Revill|previllh3@pcworld...|499.7300000000000...|
+-----------+-------------+----------------+--------------------+--------------------+



Средняя цена товаров от каждого поставщика

In [44]:
avg_price_by_supplier = products_with_id.join(suppliers_with_id, products_with_id.product_supplier_id == suppliers_with_id.supplier_id) \
    .groupBy(
        suppliers_with_id.supplier_id,
        suppliers_with_id.supplier_name,
        suppliers_with_id.supplier_contact,
        suppliers_with_id.supplier_email
    ) \
    .agg(
        F.avg(products_with_id.product_price).alias("avg_product_price")
    )

avg_price_by_supplier.show()

+-----------+-------------+-------------------+--------------------+--------------------+
|supplier_id|supplier_name|   supplier_contact|      supplier_email|   avg_product_price|
+-----------+-------------+-------------------+--------------------+--------------------+
|        148|       Meevee|    Doralin Doudney| ddoudney7l@ning.com|93.18000000000000...|
|        463|   Bubbletube|       Dayle Emmett|demmettrp@tripadv...|98.69000000000000...|
|        471|        Quatz|       Rip Hulstrom|rhulstrombt@githu...|80.73000000000000...|
|        496|        Vidoo|   Torrence Trevett|   ttrevetth7@pen.io|12.31000000000000...|
|        833|       Voonyx|        Brade Waugh|   bwaughms@xrea.com|37.21000000000000...|
|       1088|       Divape|    Elfrida Merioth|emerioth27@storif...|77.81000000000000...|
|       1238|        Yodoo|     Tabby Slograve|tslogravemh@sprin...|28.34000000000000...|
|       1342| Jabbersphere|       Buffy Astell|   bastelldz@loc.gov|6.100000000000000...|
|       15

Распределение продаж по странам поставщиков

In [45]:
sales_by_supplier_country = sales_with_id.join(products_with_id, sales_with_id.sale_product_id == products_with_id.product_id) \
    .join(suppliers_with_id, products_with_id.product_supplier_id == suppliers_with_id.supplier_id) \
    .join(countries_with_id, suppliers_with_id.supplier_country_id == countries_with_id.country_id) \
    .groupBy(
        countries_with_id.country_name
    ) \
    .agg(
        F.sum(sales_with_id.sale_total_price).alias("total_revenue"),
        F.count(sales_with_id.sale_id).alias("sales_count")
    )

sales_by_supplier_country.show()

+--------------------+--------------------+-----------+
|        country_name|       total_revenue|sales_count|
+--------------------+--------------------+-----------+
|                Chad|2051.030000000000...|          7|
|              Russia|149206.7500000000...|        582|
|            Paraguay|2957.860000000000...|         13|
|               Yemen|11414.63000000000...|         41|
|             Senegal|3162.410000000000...|         11|
|              Sweden|52074.94000000000...|        217|
|             Tokelau|425.9900000000000...|          1|
|French Southern T...|287.3000000000000...|          1|
|            Kiribati|409.8300000000000...|          1|
|              Guyana|1325.270000000000...|          4|
|         Philippines|136135.1000000000...|        536|
|             Eritrea|352.9200000000000...|          2|
|               Tonga|340.8000000000000...|          3|
|            Djibouti|397.6700000000000...|          1|
|            Malaysia|13223.61000000000...|     

In [46]:
top_5_suppliers.write \
    .format("jdbc") \
    .option("url", clickhouse_jdbc_url) \
    .option("dbtable", "top_5_suppliers_report") \
    .option("user", clickhouse_connection_properties["user"]) \
    .option("password", clickhouse_connection_properties["password"]) \
    .option("driver", clickhouse_connection_properties["driver"]) \
    .option("createTableOptions", "ENGINE = MergeTree ORDER BY (supplier_id)") \
    .mode("overwrite") \
    .save()

avg_price_by_supplier.write \
    .format("jdbc") \
    .option("url", clickhouse_jdbc_url) \
    .option("dbtable", "avg_price_by_supplier_report") \
    .option("user", clickhouse_connection_properties["user"]) \
    .option("password", clickhouse_connection_properties["password"]) \
    .option("driver", clickhouse_connection_properties["driver"]) \
    .option("createTableOptions", "ENGINE = MergeTree ORDER BY (supplier_id)") \
    .mode("overwrite") \
    .save()

sales_by_supplier_country.write \
    .format("jdbc") \
    .option("url", clickhouse_jdbc_url) \
    .option("dbtable", "sales_by_supplier_country_report") \
    .option("user", clickhouse_connection_properties["user"]) \
    .option("password", clickhouse_connection_properties["password"]) \
    .option("driver", clickhouse_connection_properties["driver"]) \
    .option("createTableOptions", "ENGINE = MergeTree ORDER BY (country_name)") \
    .mode("overwrite") \
    .save()

### Витрина качества продукции

Продукты с наивысшим и наименьшим рейтингом

In [47]:
top_rated_products = rating_reviews.orderBy(F.desc("avg_rating")).limit(10)
top_rated_products.show()

+------------+--------------------+-------------+
|product_name|          avg_rating|total_reviews|
+------------+--------------------+-------------+
|    Dog Food|3.018298891883797...|      1653413|
|     Cat Toy|3.006860078573587...|      1676222|
|   Bird Cage|3.000149164677804...|      1682260|
+------------+--------------------+-------------+



In [48]:
lowest_rated_products = rating_reviews.filter(F.col("total_reviews") > 0).orderBy(F.asc("avg_rating")).limit(10)
lowest_rated_products.show()

+------------+--------------------+-------------+
|product_name|          avg_rating|total_reviews|
+------------+--------------------+-------------+
|   Bird Cage|3.000149164677804...|      1682260|
|     Cat Toy|3.006860078573587...|      1676222|
|    Dog Food|3.018298891883797...|      1653413|
+------------+--------------------+-------------+



Корреляция между рейтингом и объемом продаж

In [49]:
product_sales = sales_with_id.join(products_with_id, sales_with_id.sale_product_id == products_with_id.product_id) \
    .join(product_names_with_id, products_with_id.product_name_id == product_names_with_id.product_name_id) \
    .groupBy(product_names_with_id.product_name) \
    .agg(
        F.sum(sales_with_id.sale_quantity).alias("total_quantity_sold"),
        F.sum(sales_with_id.sale_total_price).alias("total_revenue")
    )

rating_sales = rating_reviews.join(product_sales, on="product_name", how="inner")

rating_sales.show()

correlation = rating_sales.stat.corr("avg_rating", "total_quantity_sold")
print("Корреляция между средним рейтингом и объемом продаж:", correlation)

+------------+--------------------+-------------+-------------------+--------------------+
|product_name|          avg_rating|total_reviews|total_quantity_sold|       total_revenue|
+------------+--------------------+-------------+-------------------+--------------------+
|   Bird Cage|3.000149164677804...|      1682260|              18205|847478.0500000000...|
|    Dog Food|3.018298891883797...|      1653413|              18298|848567.1900000000...|
|     Cat Toy|3.006860078573587...|      1676222|              18120|833806.8800000000...|
+------------+--------------------+-------------+-------------------+--------------------+

Корреляция между средним рейтингом и объемом продаж: 0.6433138852387386


Продукты с наибольшим количеством отзывов

In [50]:
most_reviewed_products = rating_reviews.orderBy(F.desc("total_reviews")).limit(10)
most_reviewed_products.show()

+------------+--------------------+-------------+
|product_name|          avg_rating|total_reviews|
+------------+--------------------+-------------+
|   Bird Cage|3.000149164677804...|      1682260|
|     Cat Toy|3.006860078573587...|      1676222|
|    Dog Food|3.018298891883797...|      1653413|
+------------+--------------------+-------------+



In [51]:
top_rated_products.write \
    .format("jdbc") \
    .option("url", clickhouse_jdbc_url) \
    .option("dbtable", "top_rated_products_report") \
    .option("user", clickhouse_connection_properties["user"]) \
    .option("password", clickhouse_connection_properties["password"]) \
    .option("driver", clickhouse_connection_properties["driver"]) \
    .option("createTableOptions", "ENGINE = MergeTree ORDER BY (avg_rating, product_name)") \
    .mode("overwrite") \
    .save()

lowest_rated_products.write \
    .format("jdbc") \
    .option("url", clickhouse_jdbc_url) \
    .option("dbtable", "lowest_rated_products_report") \
    .option("user", clickhouse_connection_properties["user"]) \
    .option("password", clickhouse_connection_properties["password"]) \
    .option("driver", clickhouse_connection_properties["driver"]) \
    .option("createTableOptions", "ENGINE = MergeTree ORDER BY (avg_rating, product_name)") \
    .mode("overwrite") \
    .save()

rating_sales.write \
    .format("jdbc") \
    .option("url", clickhouse_jdbc_url) \
    .option("dbtable", "rating_sales_correlation_report") \
    .option("user", clickhouse_connection_properties["user"]) \
    .option("password", clickhouse_connection_properties["password"]) \
    .option("driver", clickhouse_connection_properties["driver"]) \
    .option("createTableOptions", "ENGINE = MergeTree ORDER BY (product_name)") \
    .mode("overwrite") \
    .save()

most_reviewed_products.write \
    .format("jdbc") \
    .option("url", clickhouse_jdbc_url) \
    .option("dbtable", "most_reviewed_products_report") \
    .option("user", clickhouse_connection_properties["user"]) \
    .option("password", clickhouse_connection_properties["password"]) \
    .option("driver", clickhouse_connection_properties["driver"]) \
    .option("createTableOptions", "ENGINE = MergeTree ORDER BY (total_reviews, product_name)") \
    .mode("overwrite") \
    .save()