In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, to_date, row_number, monotonically_increasing_id, expr, min, max 
from pyspark.sql.window import Window
from pyspark.sql.types import DateType

import pandas as pd
from datetime import datetime, timedelta

# Инициализация SparkSession
spark = SparkSession.builder \
    .appName("PostgreSQL Snowflake ETL") \
    .config("spark.jars", "/home/jovyan/jars/postgresql-42.6.0.jar") \
    .getOrCreate()

# Параметры подключения к PostgreSQL
pg_url = "jdbc:postgresql://db:5432/mydatabase"
pg_properties = {
    "user": "user_bd",
    "password": "password_bd",
    "driver": "org.postgresql.Driver"
}


In [2]:
# 1. Чтение данных из public.mock_data
df_mock_data = spark.read \
    .jdbc(url=pg_url, table="public.mock_data", properties=pg_properties)
print(f"Количество строк в public.mock_data: {df_mock_data.count()}") 
df_mock_data.printSchema()
df_mock_data.show(5)

df_mock_data = df_mock_data.withColumn("sale_date", to_date(col("sale_date"), "M/d/yyyy")) \
                           .withColumn("product_release_date", to_date(col("product_release_date"), "M/d/yyyy")) \
                           .withColumn("product_expiry_date", to_date(col("product_expiry_date"), "M/d/yyyy"))


# 2. Трансформация и загрузка в DimCustomers
print("Загрузка в DimCustomers...")
dim_customers = df_mock_data.select(
    col("customer_first_name"),
    col("customer_last_name"),
    col("customer_age"),
    col("customer_email"),
    col("customer_country"),
    col("customer_postal_code"),
    col("customer_pet_type"),
    col("customer_pet_name"),
    col("customer_pet_breed")
).dropDuplicates(["customer_email"]) 

# Загрузка в DimCustomers
dim_customers.write \
    .jdbc(url=pg_url, table="DimCustomers", mode="append", properties=pg_properties)

print("DimCustomers загружены.")

Количество строк в public.mock_data: 10000
root
 |-- id: integer (nullable = true)
 |-- customer_first_name: string (nullable = true)
 |-- customer_last_name: string (nullable = true)
 |-- customer_age: integer (nullable = true)
 |-- customer_email: string (nullable = true)
 |-- customer_country: string (nullable = true)
 |-- customer_postal_code: string (nullable = true)
 |-- customer_pet_type: string (nullable = true)
 |-- customer_pet_name: string (nullable = true)
 |-- customer_pet_breed: string (nullable = true)
 |-- seller_first_name: string (nullable = true)
 |-- seller_last_name: string (nullable = true)
 |-- seller_email: string (nullable = true)
 |-- seller_country: string (nullable = true)
 |-- seller_postal_code: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- product_price: float (nullable = true)
 |-- product_quantity: integer (nullable = true)
 |-- sale_date: string (nullable = true)
 |-- sale_cust

In [3]:
# 3. Трансформация и загрузка в DimSellers
print("Загрузка в DimSellers...")
dim_sellers = df_mock_data.select(
    col("seller_first_name"),
    col("seller_last_name"),
    col("seller_email"),
    col("seller_country"),
    col("seller_postal_code")
).dropDuplicates(["seller_email"]) 

# Загрузка в DimSellers
dim_sellers.write \
    .jdbc(url=pg_url, table="DimSellers", mode="append", properties=pg_properties)

print("DimSellers загружены.")

Загрузка в DimSellers...
DimSellers загружены.


In [4]:
print("Загрузка в DimProducts...")
dim_products = df_mock_data.select(
    col("product_name"),
    col("product_category"),
    col("product_price"),
    col("product_weight"),
    col("product_color"),
    col("product_size"),
    col("product_brand"),
    col("product_material"),
    col("product_description"),
    col("product_rating"),
    col("product_reviews"),
    to_date(col("product_release_date"), "yyyy-MM-dd").alias("product_release_date"), 
    to_date(col("product_expiry_date"), "yyyy-MM-dd").alias("product_expiry_date"),   
    col("pet_category")
).dropDuplicates(["product_name"]) 

# Загрузка в DimProducts
dim_products.write \
    .jdbc(url=pg_url, table="DimProducts", mode="append", properties=pg_properties)


Загрузка в DimProducts...


In [5]:
# 5. Трансформация и загрузка в DimStores
print("Загрузка в DimStores...")
dim_stores = df_mock_data.select(
    col("store_name"),
    col("store_location"),
    col("store_city"),
    col("store_state"),
    col("store_country"),
    col("store_phone"),
    col("store_email")
).dropDuplicates(["store_name"]) 

# Загрузка в DimStores
dim_stores.write \
    .jdbc(url=pg_url, table="DimStores", mode="append", properties=pg_properties)

print("DimStores загружены.")

Загрузка в DimStores...
DimStores загружены.


In [6]:
# 6. Трансформация и загрузка в DimSuppliers
print("Загрузка в DimSuppliers...")
dim_suppliers = df_mock_data.select(
    col("supplier_name"),
    col("supplier_contact"),
    col("supplier_email"),
    col("supplier_phone"),
    col("supplier_address"),
    col("supplier_city"),
    col("supplier_country")
).dropDuplicates(["supplier_name"])

# Загрузка в DimSuppliers
dim_suppliers.write \
    .jdbc(url=pg_url, table="DimSuppliers", mode="append", properties=pg_properties)

print("DimSuppliers загружены.")

Загрузка в DimSuppliers...
DimSuppliers загружены.


In [7]:
# 7. Трансформация и загрузка в DimDate
print("Загрузка в DimDate...")
min_date_obj = df_mock_data.agg(min("sale_date")).collect()[0][0]
max_date_obj = df_mock_data.agg(max("sale_date")).collect()[0][0]

min_date = min_date_obj
max_date = max_date_obj

print(f"Минимальная дата (объект): {min_date}")
print(f"Максимальная дата (объект): {max_date}")

date_range = []
current_date = min_date
while current_date <= max_date:
    date_range.append({
        "full_date": current_date,
        "year": current_date.year,
        "month": current_date.month,
        "day": current_date.day,
        "day_of_week": current_date.isoweekday(),
        "day_name": current_date.strftime("%A"),
        "month_name": current_date.strftime("%B"),
        "quarter": (current_date.month - 1) // 3 + 1,
        "week_of_year": current_date.isocalendar()[1],
        "is_weekend": current_date.weekday() >= 5
    })
    current_date += timedelta(days=1)

df_date_dim = spark.createDataFrame(date_range) \
    .withColumn("date_sk", expr("CAST(regexp_replace(CAST(full_date AS STRING), '-', '') AS INT)"))

# Загружаем только новые даты
existing_dates_df = spark.read.jdbc(url=pg_url, table="DimDate", properties=pg_properties)
df_date_to_insert = df_date_dim.join(existing_dates_df, "date_sk", "left_anti")

if df_date_to_insert.count() > 0:
    df_date_to_insert.write \
        .jdbc(url=pg_url, table="DimDate", mode="append", properties=pg_properties)
    print(f"{df_date_to_insert.count()} новых дат загружено в DimDate.")
else:
    print("DimDate уже актуален.")

Загрузка в DimDate...
Минимальная дата (объект): 2021-01-01
Максимальная дата (объект): 2021-12-30
0 новых дат загружено в DimDate.


In [8]:
# 8. Трансформация и загрузка в FactSales
print("Загрузка в FactSales...")

# Чтение Dim-таблиц для получения SK
dim_customers_lookup = spark.read.jdbc(url=pg_url, table="DimCustomers", properties=pg_properties) \
    .select(col("customer_email"), col("customer_sk"))

dim_sellers_lookup = spark.read.jdbc(url=pg_url, table="DimSellers", properties=pg_properties) \
    .select(col("seller_email"), col("seller_sk"))

dim_products_lookup = spark.read.jdbc(url=pg_url, table="DimProducts", properties=pg_properties) \
    .select(col("product_name"), col("product_sk"))

dim_stores_lookup = spark.read.jdbc(url=pg_url, table="DimStores", properties=pg_properties) \
    .select(col("store_name"), col("store_sk"))

dim_suppliers_lookup = spark.read.jdbc(url=pg_url, table="DimSuppliers", properties=pg_properties) \
    .select(col("supplier_name"), col("supplier_sk"))

dim_date_lookup = spark.read.jdbc(url=pg_url, table="DimDate", properties=pg_properties) \
    .withColumn("sale_date", to_date(col("full_date"), "YYYY-MM-DD")) \
    .select(col("sale_date"), col("date_sk"))


# Объединение с mock_data
fact_sales = df_mock_data.alias("md") \
    .join(dim_customers_lookup.alias("dc"), col("md.customer_email") == col("dc.customer_email"), "inner") \
    .join(dim_sellers_lookup.alias("ds"), col("md.seller_email") == col("ds.seller_email"), "inner") \
    .join(dim_products_lookup.alias("dp"), col("md.product_name") == col("dp.product_name"), "inner") \
    .join(dim_stores_lookup.alias("dst"), col("md.store_name") == col("dst.store_name"), "inner") \
    .join(dim_suppliers_lookup.alias("dsu"), col("md.supplier_name") == col("dsu.supplier_name"), "left_outer") \
    .join(dim_date_lookup.alias("dd"), to_date(col("md.sale_date"), "YYYY-MM-DD") == col("dd.sale_date"), "inner") \
    .select(
        col("dc.customer_sk"),
        col("ds.seller_sk"),
        col("dp.product_sk"),
        col("dst.store_sk"),
        col("dsu.supplier_sk"),
        col("dd.date_sk"),
        col("md.sale_quantity"),
        col("md.sale_total_price")
    )

# Загрузка в FactSales
fact_sales.write \
    .jdbc(url=pg_url, table="FactSales", mode="append", properties=pg_properties)

print("FactSales загружены.")

# Остановка SparkSession
spark.stop()

Загрузка в FactSales...
FactSales загружены.
