# Общая настройка

In [3]:
import os
# JDBC driver jars that must be on the Spark classpath; this has to be set
# before the first SparkSession is created, otherwise it is ignored.
_jdbc_jars = ",".join([
    "./jars/postgresql-42.6.0.jar",
    "./jars/clickhouse-jdbc-0.4.6.jar",
])
os.environ['PYSPARK_SUBMIT_ARGS'] = "--jars " + _jdbc_jars + " pyspark-shell"

In [4]:
from dotenv import load_dotenv

# Pull the variables from a local .env file into the process environment.
load_dotenv()

# PostgreSQL connection settings; each one is None when the variable is unset.
DB_NAME = os.environ.get("POSTGRES_DB")
DB_USER = os.environ.get("POSTGRES_USER")
DB_PASSWORD = os.environ.get("POSTGRES_PASSWORD")
DB_HOST = os.environ.get("POSTGRES_HOST")
DB_PORT = os.environ.get("POSTGRES_PORT")

# Создание таблиц

In [3]:
import psycopg2

def create_tables():
    """Create the star-schema tables in PostgreSQL if they do not exist yet.

    Creates five dimension tables (d_customer, d_supplier, d_product,
    d_seller, d_store) and the fact table f_sales. Tables that are referenced
    by foreign keys are created before the tables that reference them.

    Connection settings are read from the module-level DB_NAME, DB_USER,
    DB_PASSWORD, DB_HOST and DB_PORT values.

    Errors are reported with a printed message rather than raised. The cursor
    and the connection are always closed, including when connecting or
    executing a statement fails.
    """
    # Order matters: d_product references d_supplier, f_sales references
    # d_customer, d_product and d_seller.
    ddl_statements = (
        """
            CREATE TABLE IF NOT EXISTS d_customer (
                customer_id SERIAL PRIMARY KEY,
                customer_first_name TEXT,
                customer_last_name TEXT,
                customer_email TEXT,
                customer_age INTEGER,
                customer_country TEXT,
                customer_postal_code TEXT,
                customer_pet_name TEXT,
                customer_pet_type TEXT,
                customer_pet_breed TEXT
            )
        """,
        """
            CREATE TABLE IF NOT EXISTS d_supplier (
                supplier_id SERIAL PRIMARY KEY,
                supplier_name TEXT,
                supplier_email TEXT,
                supplier_phone TEXT,
                supplier_contact TEXT,
                supplier_country TEXT,
                supplier_city TEXT,
                supplier_address TEXT
            )
        """,
        """
            CREATE TABLE IF NOT EXISTS d_product (
                product_id SERIAL PRIMARY KEY,
                product_name TEXT,
                product_category TEXT,
                pet_category TEXT,
                product_brand TEXT,
                product_price DECIMAL,
                product_quantity INTEGER,
                product_size TEXT,
                product_weight DECIMAL,
                product_color TEXT,
                product_material TEXT,
                product_description TEXT,
                product_rating DECIMAL,
                product_reviews INTEGER,
                product_release_date DATE,
                product_expiry_date DATE,
                supplier_id INTEGER REFERENCES d_supplier(supplier_id)
            )
        """,
        """
            CREATE TABLE IF NOT EXISTS d_seller (
                seller_id SERIAL PRIMARY KEY,
                seller_first_name TEXT,
                seller_last_name TEXT,
                seller_email TEXT,
                seller_country TEXT,
                seller_postal_code TEXT
            )
        """,
        """
            CREATE TABLE IF NOT EXISTS d_store (
                store_id SERIAL PRIMARY KEY,
                store_name TEXT,
                store_location TEXT,
                store_city TEXT,
                store_state TEXT,
                store_country TEXT,
                store_email TEXT,
                store_phone TEXT
            )
        """,
        """
            CREATE TABLE IF NOT EXISTS f_sales (
                sale_id SERIAL PRIMARY KEY,
                sale_date DATE,
                sale_customer_id INTEGER REFERENCES d_customer(customer_id),
                sale_product_id INTEGER REFERENCES d_product(product_id),
                sale_seller_id INTEGER REFERENCES d_seller(seller_id),
                sale_quantity INTEGER,
                sale_total_price DECIMAL
            )
        """,
    )

    conn = None
    try:
        # Connect to the database
        conn = psycopg2.connect(
            dbname=DB_NAME,
            user=DB_USER,
            password=DB_PASSWORD,
            host=DB_HOST,
            port=DB_PORT
        )
        # The cursor context manager closes the cursor even if a statement fails.
        with conn.cursor() as cursor:
            for statement in ddl_statements:
                cursor.execute(statement)
        # All statements run in one transaction: either every table is
        # created or none is.
        conn.commit()
    except Exception as e:
        print(f"Ошибка: {e}")
    finally:
        # Closing an uncommitted connection discards the pending transaction,
        # so a failed run leaves no partial schema and no open connection.
        if conn is not None:
            conn.close()

In [4]:
# Run the DDL defined above. Every statement uses CREATE TABLE IF NOT EXISTS,
# so re-running this cell does not fail on tables that already exist.
create_tables()

# Преобразование данных в схему «звезда»

### Создание соединения

In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date

# Reuse the active Spark session if one exists, otherwise start a new one.
spark = (
    SparkSession.builder
    .appName("Spark SQL with PostgreSQL")
    .getOrCreate()
)

25/05/24 16:26:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


### Настройки соединения и считывание основной таблицы

In [6]:
# JDBC endpoint and credentials shared by every read and write below.
jdbc_url = f"jdbc:postgresql://{DB_HOST}:{DB_PORT}/{DB_NAME}"
properties = {
    "user": DB_USER,
    "password": DB_PASSWORD,
    "driver": "org.postgresql.Driver"
}

# Spark datetime patterns are case-sensitive: "M" is month-of-year, while
# lowercase "m" is minute-of-hour. The source dates are month/day/year.
SOURCE_DATE_FORMAT = "M/d/yyyy"

# Read the flat source table and convert its date columns to DATE.
# Each column is parsed from itself; in particular sale_date must come from
# sale_date, not from product_release_date.
source_table = (
    spark.read.jdbc(
        url=jdbc_url,
        table="mock_data",
        properties=properties
    )
    .withColumn("sale_date", to_date("sale_date", SOURCE_DATE_FORMAT))
    .withColumn("product_release_date", to_date("product_release_date", SOURCE_DATE_FORMAT))
    .withColumn("product_expiry_date", to_date("product_expiry_date", SOURCE_DATE_FORMAT))
)

## Таблицы измерений

### Покупатели

In [7]:
# Attributes that describe a customer; d_customer.customer_id is not written
# here, so the database assigns it from the SERIAL default.
customer_columns = [
    "customer_first_name",
    "customer_last_name",
    "customer_email",
    "customer_age",
    "customer_country",
    "customer_postal_code",
    "customer_pet_name",
    "customer_pet_type",
    "customer_pet_breed",
]

# One row per distinct combination of customer attributes.
customers = source_table.select(*customer_columns).distinct()

# Append the rows to the existing dimension table.
customers.write.mode("append").jdbc(
    jdbc_url,
    "d_customer",
    properties=properties,
)

25/05/24 16:14:30 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

### Поставщики

In [8]:
# Attributes that describe a supplier; d_supplier.supplier_id is not written
# here, so the database assigns it from the SERIAL default.
supplier_columns = [
    "supplier_name",
    "supplier_email",
    "supplier_phone",
    "supplier_contact",
    "supplier_country",
    "supplier_city",
    "supplier_address",
]

# One row per distinct combination of supplier attributes.
suppliers = source_table.select(*supplier_columns).distinct()

# Append the rows to the existing dimension table.
suppliers.write.mode("append").jdbc(
    jdbc_url,
    "d_supplier",
    properties=properties,
)

### Продукты

In [5]:
# Read the supplier dimension back so that the database-generated
# supplier_id values are available on the Spark side.
suppliers_with_id = spark.read.jdbc(jdbc_url, "d_supplier", properties=properties)

In [10]:
# d_supplier holds one row per distinct combination of these attributes, so
# all of them are needed to identify a supplier row. Matching on supplier_name
# alone would pair a product with every supplier row that shares the name and
# produce duplicate products with different supplier_id values.
supplier_match_columns = [
    "supplier_name",
    "supplier_email",
    "supplier_phone",
    "supplier_contact",
    "supplier_country",
    "supplier_city",
    "supplier_address",
]

# eqNullSafe treats NULL as equal to NULL, so suppliers with missing
# attributes still find their dimension row.
supplier_match = None
for column_name in supplier_match_columns:
    same_value = source_table[column_name].eqNullSafe(suppliers_with_id[column_name])
    supplier_match = same_value if supplier_match is None else supplier_match & same_value

# Product attributes come from the source table; supplier_id comes from the
# supplier dimension read back from the database.
products = source_table.join(suppliers_with_id, on=supplier_match).select([
    "product_name",
    "product_category",
    "pet_category",
    "product_brand",
    "product_price",
    "product_quantity",
    "product_size",
    "product_weight",
    "product_color",
    "product_material",
    "product_description",
    "product_rating",
    "product_reviews",
    "product_release_date",
    "product_expiry_date",
    "supplier_id"
]).distinct()

# Append the rows to the existing dimension table; product_id is assigned by
# the database.
products.write.jdbc(
    url=jdbc_url,
    table="d_product",
    mode="append",
    properties=properties
)

                                                                                

### Продавцы

In [7]:
# Attributes that describe a seller; d_seller.seller_id is not written here,
# so the database assigns it from the SERIAL default.
seller_columns = [
    "seller_first_name",
    "seller_last_name",
    "seller_email",
    "seller_country",
    "seller_postal_code",
]

# One row per distinct combination of seller attributes.
sellers = source_table.select(*seller_columns).distinct()

# Append the rows to the existing dimension table.
sellers.write.mode("append").jdbc(
    jdbc_url,
    "d_seller",
    properties=properties,
)

25/05/24 16:26:46 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

### Магазины

In [8]:
# Attributes that describe a store; d_store.store_id is not written here,
# so the database assigns it from the SERIAL default.
store_columns = [
    "store_name",
    "store_location",
    "store_city",
    "store_state",
    "store_country",
    "store_email",
    "store_phone",
]

# One row per distinct combination of store attributes.
stores = source_table.select(*store_columns).distinct()

# Append the rows to the existing dimension table.
stores.write.mode("append").jdbc(
    jdbc_url,
    "d_store",
    properties=properties,
)

## Таблица фактов