In [3]:
!wget https://jdbc.postgresql.org/download/postgresql-42.7.4.jar -P /home/jovyan/work

--2024-12-25 17:10:44--  https://jdbc.postgresql.org/download/postgresql-42.7.4.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1086687 (1.0M) [application/java-archive]
Saving to: ‘/home/jovyan/work/postgresql-42.7.4.jar.1’


2024-12-25 17:10:47 (732 KB/s) - ‘/home/jovyan/work/postgresql-42.7.4.jar.1’ saved [1086687/1086687]



In [1]:
from pyspark.sql import SparkSession

def buildSpark():
    spark = SparkSession.builder \
        .master("local") \
        .appName("SparkConnectionTest") \
        .config("spark.jars", "postgresql-42.7.4.jar") \
        .getOrCreate()
    
    #Проверка соединения со Spark
    print("=== Spark Session Info ===")
    print(spark.sparkContext.getConf().getAll())
    return spark

jdbc_url = "jdbc:postgresql://postgres_db:5432/mydatabase"
db_properties = {
    "user": "myuser",
    "password": "mypassword",
    "driver": "org.postgresql.Driver"
}

def runSQL(spark, command):
    try:
        df = spark.read.jdbc(
            url=jdbc_url,
            table=command,  # Тестовый запрос
            properties=db_properties
        )
        print("PostgreSQL Connection Successful!")
        return df
    except Exception as e:
        print("PostgreSQL Connection Failed! If you just downloaded postgree driver, restart docker compose services please.")

# Настройка соединения со Spark Master
spark = buildSpark()
# Проверка соединения с PostgreSQL
runSQL(spark, "(SELECT 1) as test")

spark.stop()

=== Spark Session Info ===
[('spark.master', 'local'), ('spark.app.submitTime', '1735146539698'), ('spark.executor.id', 'driver'), ('spark.driver.host', 'ee3eb0a478a1'), ('spark.app.startTime', '1735146539776'), ('spark.driver.extraJavaOptions', '-Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=AL

## Load data into DWH

### Loading

In [128]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp, col, monotonically_increasing_id

def buildSpark():
    spark = SparkSession.builder \
        .master("local") \
        .appName("CraftMarketDWH") \
        .config("spark.jars", "postgresql-42.7.4.jar") \
        .getOrCreate()
    return spark

# Database connection parameters
jdbc_url = "jdbc:postgresql://postgres_db:5432/mydatabase"
db_properties = {
    "user": "myuser",
    "password": "mypassword",
    "driver": "org.postgresql.Driver"
}

def read_table(schema, table_name):
    """Helper function to read tables from PostgreSQL"""
    return spark.read.jdbc(
        url=jdbc_url,
        table=f"{schema}.{table_name}",
        properties=db_properties
    )

def execute_sql(sql):
    """Execute SQL command directly"""
    import psycopg2
    conn = psycopg2.connect(
        dbname="mydatabase",
        user="myuser",
        password="mypassword",
        host="postgres_db"
    )
    cur = conn.cursor()
    cur.execute(sql)
    conn.commit()
    cur.close()
    conn.close()

def clean_dwh():
    """Clean up DWH tables in correct order"""
    try:
        # Delete in proper order due to FK constraints
        execute_sql("DELETE FROM dwh.f_orders;")
        execute_sql("DELETE FROM dwh.d_craftsmans;")
        execute_sql("DELETE FROM dwh.d_products;")
        execute_sql("DELETE FROM dwh.d_customers;")
        print("DWH tables cleaned successfully")
    except Exception as e:
        print(f"Error cleaning DWH tables: {str(e)}")
        raise

def write_to_dwh(df, table_name):
    """Helper function to write dataframes to DWH"""
    df.write \
        .mode("append") \
        .jdbc(url=jdbc_url, table=table_name, properties=db_properties)

# Initialize Spark session
spark = buildSpark()

def load_craftsmans_incremental():
    """
    Incrementally load dwh.d_craftsmans, detecting new records by 'craftsman_email'.
    If a record with the same email is already present in DWH, we skip it.
    """
    from pyspark.sql.functions import current_timestamp

    print("Loading craftsmans dimension (incremental)...")
    
    # --- 1) Read existing DWH dimension (to skip duplicates) ---
    try:
        existing_dwh_crafts = read_table("dwh", "d_craftsmans").select("craftsman_email").distinct()
    except:
        # If table doesn't exist or is empty, create an empty DF with the same schema
        existing_dwh_crafts = spark.createDataFrame([], "craftsman_email STRING")

    # --- 2) Read from the three source tables ---
    craft_wide = read_table("source1", "craft_market_wide").select(
        "craftsman_id",
        "craftsman_name",
        "craftsman_address",
        "craftsman_birthday",
        "craftsman_email"
    )

    craft_masters = read_table("source2", "craft_market_masters_products").select(
        "craftsman_id",
        "craftsman_name",
        "craftsman_address",
        "craftsman_birthday",
        "craftsman_email"
    )

    craft_source3 = read_table("source3", "craft_market_craftsmans").select(
        "craftsman_id",
        "craftsman_name",
        "craftsman_address",
        "craftsman_birthday",
        "craftsman_email"
    )

    # Union all new source data
    all_craftsmans_src = craft_wide.union(craft_masters).union(craft_source3) \
        .dropDuplicates(["craftsman_email"])  # business key dedup

    # --- 3) Identify *new* craftsmans: left_anti join by email ---
    new_craftsmans = all_craftsmans_src.join(existing_dwh_crafts, 
                                             on=["craftsman_email"], 
                                             how="left_anti") \
                                       .withColumn("load_dttm", current_timestamp())

    # If there's nothing new, we skip writing:
    count_new = new_craftsmans.count()
    if count_new == 0:
        print("No new craftsmans found. Skipping write to DWH.")
        return None
    
    # --- 4) Write only new records to DWH ---
    write_to_dwh(new_craftsmans.drop("craftsman_id"), "dwh.d_craftsmans")  # dropping source PK if you wish
    print(f"Inserted {count_new} new craftsmans into DWH.")
    return new_craftsmans


def load_customers_incremental():
    """
    Incrementally load dwh.d_customers, detecting new records by 'customer_email'.
    """
    from pyspark.sql.functions import current_timestamp

    print("Loading customers dimension (incremental)...")

    # --- 1) Read existing DWH dimension ---
    try:
        existing_dwh_customers = read_table("dwh", "d_customers").select("customer_email").distinct()
    except:
        existing_dwh_customers = spark.createDataFrame([], "customer_email STRING")

    # --- 2) Read from the three source tables ---
    customers_wide = read_table("source1", "craft_market_wide").select(
        "customer_name",
        "customer_address",
        "customer_birthday",
        "customer_email"
    )

    customers_source2 = read_table("source2", "craft_market_orders_customers").select(
        "customer_name",
        "customer_address",
        "customer_birthday",
        "customer_email"
    )

    customers_source3 = read_table("source3", "craft_market_customers").select(
        "customer_name",
        "customer_address",
        "customer_birthday",
        "customer_email"
    )

    all_customers_src = customers_wide.union(customers_source2).union(customers_source3) \
        .dropDuplicates(["customer_email"])

    # --- 3) left_anti join to find truly new customers ---
    new_customers = all_customers_src.join(existing_dwh_customers, 
                                           on=["customer_email"], 
                                           how="left_anti") \
                                     .withColumn("load_dttm", current_timestamp())

    count_new = new_customers.count()
    if count_new == 0:
        print("No new customers found. Skipping write to DWH.")
        return None

    # --- 4) Write only new records to DWH ---
    write_to_dwh(new_customers, "dwh.d_customers")
    print(f"Inserted {count_new} new customers into DWH.")
    return new_customers


def load_products_incremental():
    """
    Incrementally load dwh.d_products, detecting new records by (product_name, product_description).
    """
    from pyspark.sql.functions import current_timestamp

    print("Loading products dimension (incremental)...")

    # --- 1) Read existing dwh dimension ---
    try:
        existing_dwh_products = read_table("dwh", "d_products") \
            .select("product_name", "product_description") \
            .distinct()
    except:
        existing_dwh_products = spark.createDataFrame([], "product_name STRING, product_description STRING")

    # --- 2) Read source tables ---
    products_wide = read_table("source1", "craft_market_wide").select(
        "product_name",
        "product_description",
        "product_type",
        "product_price"
    )

    products_masters = read_table("source2", "craft_market_masters_products").select(
        "product_name",
        "product_description",
        "product_type",
        "product_price"
    )

    products_orders = read_table("source3", "craft_market_orders").select(
        "product_name",
        "product_description",
        "product_type",
        "product_price"
    )

    all_products_src = products_wide.union(products_masters).union(products_orders) \
        .dropDuplicates(["product_name", "product_description"])

    # --- 3) left_anti to find truly new products ---
    new_products = all_products_src.join(existing_dwh_products,
                                         on=["product_name", "product_description"],
                                         how="left_anti") \
                                   .withColumn("load_dttm", current_timestamp())

    count_new = new_products.count()
    if count_new == 0:
        print("No new products found. Skipping write to DWH.")
        return None

    # --- 4) Write new records to DWH ---
    write_to_dwh(new_products, "dwh.d_products")
    print(f"Inserted {count_new} new products into DWH.")
    return new_products


def load_orders_incremental():
    """
    Incrementally load f_orders by detecting new records based on 
    (product_id, craftsman_id, customer_id, order_created_date, order_completion_date, order_status).
    Because there's no natural 'order_id', we treat that combo as the unique key. 
    If a row with that exact combo is already in f_orders, we skip it.
    """
    from pyspark.sql.functions import current_timestamp

    print("Loading orders fact table (incremental)...")

    # 1) Read dimension tables (already loaded)
    dwh_craftsmans = read_table("dwh", "d_craftsmans")
    dwh_customers = read_table("dwh", "d_customers")
    dwh_products = read_table("dwh", "d_products")

    # 2) Read existing f_orders to skip duplicates
    try:
        existing_orders = read_table("dwh", "f_orders") \
            .select("product_id", "craftsman_id", "customer_id",
                    "order_created_date", "order_completion_date", "order_status") \
            .distinct()
    except:
        existing_orders = spark.createDataFrame([], 
            "product_id LONG, craftsman_id LONG, customer_id LONG, order_created_date TIMESTAMP, order_completion_date TIMESTAMP, order_status STRING"
        )

    # --- Prepare Source1 Orders ---
    orders_source1 = read_table("source1", "craft_market_wide").select(
        "craftsman_email",
        "customer_email",
        "product_name",
        "order_created_date",
        "order_completion_date",
        "order_status"
    ).distinct()

    processed_orders1 = orders_source1 \
        .join(dwh_craftsmans, ["craftsman_email"]) \
        .join(dwh_customers, ["customer_email"]) \
        .join(dwh_products, ["product_name"]) \
        .select(
            col("product_id"),
            col("craftsman_id"),
            col("customer_id"),
            col("order_created_date"),
            col("order_completion_date"),
            col("order_status")
        )

    # --- Prepare Source2 Orders ---
    orders_source2 = read_table("source2", "craft_market_orders_customers")
    masters_source2 = read_table("source2", "craft_market_masters_products")

    orders_source2_with_masters = orders_source2.join(
        masters_source2, ["craftsman_id", "product_id"]
    )

    processed_orders2 = orders_source2_with_masters \
        .join(dwh_craftsmans, orders_source2_with_masters.craftsman_email == dwh_craftsmans.craftsman_email) \
        .join(dwh_customers, orders_source2_with_masters.customer_email == dwh_customers.customer_email) \
        .join(dwh_products, orders_source2_with_masters.product_name == dwh_products.product_name) \
        .select(
            dwh_products.product_id,
            dwh_craftsmans.craftsman_id,
            dwh_customers.customer_id,
            col("order_created_date"),
            col("order_completion_date"),
            col("order_status")
        )

    # --- Prepare Source3 Orders ---
    orders_source3 = read_table("source3", "craft_market_orders")
    craftsmans_source3 = read_table("source3", "craft_market_craftsmans")
    customers_source3 = read_table("source3", "craft_market_customers")

    orders_source3_enriched = orders_source3 \
        .join(craftsmans_source3, ["craftsman_id"]) \
        .join(customers_source3, ["customer_id"])

    processed_orders3 = orders_source3_enriched \
        .join(dwh_craftsmans, orders_source3_enriched.craftsman_email == dwh_craftsmans.craftsman_email) \
        .join(dwh_customers, orders_source3_enriched.customer_email == dwh_customers.customer_email) \
        .join(dwh_products, orders_source3_enriched.product_name == dwh_products.product_name) \
        .select(
            dwh_products.product_id,
            dwh_craftsmans.craftsman_id,
            dwh_customers.customer_id,
            col("order_created_date"),
            col("order_completion_date"),
            col("order_status")
        )

    # --- Combine all processed orders from the 3 sources ---
    all_orders = processed_orders1.union(processed_orders2).union(processed_orders3) \
        .distinct() \
        .withColumn("load_dttm", current_timestamp())

    # --- Identify truly NEW orders by left_anti on the unique columns ---
    new_orders = all_orders.join(
        existing_orders,
        on=[
            "product_id", 
            "craftsman_id", 
            "customer_id", 
            "order_created_date", 
            # "order_completion_date",
            "order_status"
        ],
        how="left_anti"
    )

    new_orders.show(10, truncate=False)

    count_new = new_orders.count()
    print(f"Found {count_new} new records for f_orders (incremental).")

    if count_new == 0:
        print("No new orders to insert. Skipping write.")
        return None

    # --- Write only new orders to f_orders ---
    write_to_dwh(new_orders, "dwh.f_orders")
    print("Orders fact table loaded successfully (incremental).")

    return new_orders

## Usage function
def loadDataIntoDWH():
    spark = SparkSession.builder \
        .master("local") \
        .appName("CraftMarketDatamart") \
        .config("spark.jars", "postgresql-42.7.4.jar") \
        .getOrCreate()

    jdbc_url = "jdbc:postgresql://postgres_db:5432/mydatabase"
    db_properties = {
        "user": "myuser",
        "password": "mypassword",
        "driver": "org.postgresql.Driver"
    }

    craftsmans_df = load_craftsmans_incremental()
    products_df   = load_products_incremental()
    customers_df  = load_customers_incremental()
    orders_df = load_orders_incremental()

clean_dwh() # это первый раз, почистим чтобы не было ничего

craftsmans_df = load_craftsmans_incremental()
products_df   = load_products_incremental()
customers_df  = load_customers_incremental()
orders_df = load_orders_incremental()


Loading craftsmans dimension (incremental)...
No new craftsmans found. Skipping write to DWH.
Loading products dimension (incremental)...
No new products found. Skipping write to DWH.
Loading customers dimension (incremental)...
No new customers found. Skipping write to DWH.
Loading orders fact table (incremental)...
+----------+------------+-----------+------------------+------------+---------------------+---------+
|product_id|craftsman_id|customer_id|order_created_date|order_status|order_completion_date|load_dttm|
+----------+------------+-----------+------------------+------------+---------------------+---------+
+----------+------------+-----------+------------------+------------+---------------------+---------+

Found 0 new records for f_orders (incremental).
No new orders to insert. Skipping write.


### Validation

In [130]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import countDistinct, col

def verify_data_consistency():
    # Initialize Spark session
    spark = SparkSession.builder \
        .master("local") \
        .appName("DataVerification") \
        .config("spark.jars", "postgresql-42.7.4.jar") \
        .getOrCreate()

    # Database connection parameters
    jdbc_url = "jdbc:postgresql://postgres_db:5432/mydatabase"
    db_properties = {
        "user": "myuser",
        "password": "mypassword",
        "driver": "org.postgresql.Driver"
    }

    def read_table(schema, table_name):
        return spark.read.jdbc(
            url=jdbc_url,
            table=f"{schema}.{table_name}",
            properties=db_properties
        )

    print("Starting data verification...")
    print("-" * 50)

    # Verify Craftsmen
    print("\nVerifying Craftsmen counts:")
    
    # Source counts
    craft_wide = read_table("source1", "craft_market_wide").select("craftsman_email").distinct().count()
    craft_masters = read_table("source2", "craft_market_masters_products").select("craftsman_email").distinct().count()
    craft_source3 = read_table("source3", "craft_market_craftsmans").select("craftsman_email").distinct().count()
    
    # DWH count
    dwh_craftsmen = read_table("dwh", "d_craftsmans").count()
    
    print(f"Source1 unique craftsmen: {craft_wide}")
    print(f"Source2 unique craftsmen: {craft_masters}")
    print(f"Source3 unique craftsmen: {craft_source3}")
    print(f"DWH craftsmen: {dwh_craftsmen}")

    # Verify Products
    print("\nVerifying Products counts:")
    
    # Source counts
    products_wide = read_table("source1", "craft_market_wide").select("product_name", "product_type").distinct().count()
    products_masters = read_table("source2", "craft_market_masters_products").select("product_name", "product_type").distinct().count()
    products_orders = read_table("source3", "craft_market_orders").select("product_name", "product_type").distinct().count()
    
    # DWH count
    dwh_products = read_table("dwh", "d_products").count()
    
    print(f"Source1 unique products: {products_wide}")
    print(f"Source2 unique products: {products_masters}")
    print(f"Source3 unique products: {products_orders}")
    print(f"DWH products: {dwh_products}")

    # Verify Customers
    print("\nVerifying Customers counts:")
    
    # Source counts
    customers_wide = read_table("source1", "craft_market_wide").select("customer_email").distinct().count()
    customers_source2 = read_table("source2", "craft_market_orders_customers").select("customer_email").distinct().count()
    customers_source3 = read_table("source3", "craft_market_customers").select("customer_email").distinct().count()
    
    # DWH count
    dwh_customers = read_table("dwh", "d_customers").count()
    
    print(f"Source1 unique customers: {customers_wide}")
    print(f"Source2 unique customers: {customers_source2}")
    print(f"Source3 unique customers: {customers_source3}")
    print(f"DWH customers: {dwh_customers}")

    # Verify Orders
    print("\nVerifying Orders counts:")
    
    # Source counts
    orders_wide = read_table("source1", "craft_market_wide").select(
        "order_created_date", "order_completion_date", "order_status",
        "craftsman_email", "customer_email", "product_name"
    ).distinct().count()
    
    orders_source2 = read_table("source2", "craft_market_orders_customers").select(
        "order_created_date", "order_completion_date", "order_status",
        "craftsman_id", "customer_id", "product_id"
    ).distinct().count()
    
    orders_source3 = read_table("source3", "craft_market_orders").select(
        "order_created_date", "order_completion_date", "order_status",
        "craftsman_id", "customer_id", "product_id"
    ).distinct().count()
    
    # DWH count
    dwh_orders = read_table("dwh", "f_orders").count()
    
    print(f"Source1 unique orders: {orders_wide}")
    print(f"Source2 unique orders: {orders_source2}")
    print(f"Source3 unique orders: {orders_source3}")
    print(f"DWH orders: {dwh_orders}")

    print("\nVerification complete!")

if __name__ == "__main__":
    verify_data_consistency()

Starting data verification...
--------------------------------------------------

Verifying Craftsmen counts:
Source1 unique craftsmen: 997
Source2 unique craftsmen: 999
Source3 unique craftsmen: 1002
DWH craftsmen: 2998

Verifying Products counts:
Source1 unique products: 908
Source2 unique products: 903
Source3 unique products: 947
DWH products: 2480

Verifying Customers counts:
Source1 unique customers: 999
Source2 unique customers: 999
Source3 unique customers: 1000
DWH customers: 2998

Verifying Orders counts:
Source1 unique orders: 999
Source2 unique orders: 999
Source3 unique orders: 1013
DWH orders: 3549

Verification complete!


## Putting data into DataMART

In [131]:
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import (
    col, month, year, concat, lit, count, avg, sum, datediff, 
    percentile_approx, collect_list, struct, first, when, current_date
)
from datetime import datetime
import logging
import sys

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout)
    ]
)
logger = logging.getLogger(__name__)

def log_info(message):
    """Helper function to both log and print messages"""
    print(f"INFO: {message}")
    logger.info(message)

class DatamartLoader:
    def __init__(self, spark, jdbc_url, db_properties):
        self.spark = spark
        self.jdbc_url = jdbc_url
        self.db_properties = db_properties

    def read_table(self, schema, table_name):
        """Helper function to read tables from PostgreSQL"""
        return self.spark.read.jdbc(
            url=self.jdbc_url,
            table=f"{schema}.{table_name}",
            properties=self.db_properties
        )

    def write_to_table(self, df, table_name, mode="append"):
        """Helper function to write dataframes to PostgreSQL"""
        df.write.jdbc(
            url=self.jdbc_url,
            table=table_name,
            mode=mode,
            properties=self.db_properties
        )

    def get_last_load_date(self):
        """
        Get the last load date (TIMESTAMP) from the tracking table.
        Returns None if no record exists.
        """
        try:
            load_dates_df = self.read_table("dwh", "load_dates_craftsman_report_datamart")
            count_records = load_dates_df.count()
            log_info(f"Found {count_records} records in load dates tracking table")
            
            if count_records == 0:
                log_info("No previous load date found (empty table).")
                return None
            
            last_date = load_dates_df.orderBy(col("load_dttm").desc()).first()["load_dttm"]
            log_info(f"Last load date from tracking table: {last_date}")
            return last_date
            
        except Exception as e:
            log_info(f"Error reading load dates: {str(e)}")
            return None

    def update_load_date(self):
        """Update the load tracking table with the current max load_dttm from f_orders."""
        log_info("Updating load date tracking table...")
        orders = self.read_table("dwh", "f_orders")
        
        max_load_date = orders.agg({"load_dttm": "max"}).collect()[0][0]
        
        if max_load_date:
            log_info(f"Recording max order load date: {max_load_date}")
            current_date_df = self.spark.createDataFrame(
                [(max_load_date,)], ["load_dttm"]
            )
            self.write_to_table(current_date_df, "dwh.load_dates_craftsman_report_datamart")
            log_info("Load date tracking table updated successfully")
        else:
            log_info("No orders found, skipping load date update.")

    def calculate_datamart_metrics(self, last_load_date=None):
        """Calculate all metrics for the datamart, using incremental filtering on load_dttm."""
        log_info("Starting datamart metrics calculation...")
        
        # Read dimension and fact tables
        log_info("Reading dimension tables: d_craftsmans, d_customers, d_products...")
        craftsmen = self.read_table("dwh", "d_craftsmans")
        customers = self.read_table("dwh", "d_customers")
        products = self.read_table("dwh", "d_products")
        
        log_info("Reading orders table: f_orders...")
        orders = self.read_table("dwh", "f_orders")

        total_orders = orders.count()
        log_info(f"Total orders in f_orders (unfiltered): {total_orders}")
        
        min_date = orders.agg({"load_dttm": "min"}).collect()[0][0]
        max_date = orders.agg({"load_dttm": "max"}).collect()[0][0]
        log_info(f"Orders load_dttm range: {min_date} to {max_date}")
        
        # If we have a last_load_date, we do incremental filtering
        if last_load_date:
            log_info(f"Filtering orders with load_dttm > {last_load_date}")
            orders = orders.filter(col("load_dttm") > last_load_date)
            
            filtered_count = orders.count()
            log_info(f"After filtering, {filtered_count} orders remain.")
        else:
            log_info("No last load date found, so we process ALL orders.")
            filtered_count = orders.count()
        
        if filtered_count == 0:
            log_info("No new orders to process after filtering.")
            return None
        
        log_info(f"Proceeding with processing of {filtered_count} orders...")

        # Add month-year period to orders
        orders = orders.withColumn(
            "report_period",
            concat(year("order_created_date"), lit("-"), month("order_created_date"))
        )

        # Join all necessary dimension data
        enriched_orders = orders \
            .join(craftsmen, "craftsman_id") \
            .join(customers, "customer_id") \
            .join(products, "product_id")

        # Aggregate to form the result set
        result = enriched_orders.groupBy("craftsman_id", "report_period").agg(
            first("craftsman_name").alias("craftsman_name"),
            first("craftsman_address").alias("craftsman_address"),
            first("craftsman_birthday").alias("craftsman_birthday"),
            first("craftsman_email").alias("craftsman_email"),
            
            (sum("product_price") * 0.9).alias("craftsman_money"),
            (sum("product_price") * 0.1).cast("bigint").alias("platform_money"),
            
            count("order_created_date").alias("count_order"),
            avg("product_price").alias("avg_price_order"),
            
            avg(datediff(current_date(), col("customer_birthday")) / 365.25)
                .cast("decimal(3,1)")
                .alias("avg_age_customer"),
            
            percentile_approx(
                when(
                    col("order_completion_date").isNotNull(),
                    datediff("order_completion_date", "order_created_date")
                ),
                0.5
            ).alias("median_time_order_completed"),
            
            first("product_type").alias("top_product_category"),
            
            sum(when(col("order_status") == "created", 1).otherwise(0)).alias("count_order_created"),
            sum(when(col("order_status") == "in progress", 1).otherwise(0)).alias("count_order_in_progress"),
            sum(when(col("order_status") == "delivery", 1).otherwise(0)).alias("count_order_delivery"),
            sum(when(col("order_status") == "done", 1).otherwise(0)).alias("count_order_done"),
            sum(when(col("order_status") != "done", 1).otherwise(0)).alias("count_order_not_done")
        )

        combos_count = result.count()
        log_info(f"Calculated metrics for {combos_count} craftsman-period combinations.")
        return result

    def update_datamart(self):
        """
        Main method to handle incremental datamart updates:
          1. Get the last load timestamp
          2. Filter new orders
          3. Aggregate metrics
          4. Compare combos to existing datamart
          5. Insert only truly new combos
          6. Update the load date tracking table
        """
        try:
            # 1. Get last load date
            last_load_date = self.get_last_load_date()
            logger.info(f"Last load date: {last_load_date}")

            # 2 & 3. Calculate new metrics (aggregations) from new/updated orders
            new_metrics = self.calculate_datamart_metrics(last_load_date)
            if new_metrics is None:
                logger.info("No new data to process, returning.")
                return

            # 4. Compare combos to existing records in the datamart
            existing_periods_df = self.read_table("dwh", "craftsman_report_datamart")
            existing_periods_df = existing_periods_df.select("craftsman_id", "report_period").distinct()

            # Left-anti join: find combos that don't exist yet
            records_to_update = new_metrics.join(
                existing_periods_df,
                on=["craftsman_id", "report_period"],
                how="left_anti"
            )

            # 5. Insert only those that are truly new combos
            num_new_records = records_to_update.count()
            logger.info(f"Found {num_new_records} new craftsman-period combos to insert into datamart.")

            if num_new_records > 0:
                self.write_to_table(records_to_update, "dwh.craftsman_report_datamart")
                logger.info(f"Inserted {num_new_records} new records into datamart.")

                # 6. Update the load tracking table
                self.update_load_date()
                logger.info("Successfully updated datamart and load date tracking.")

        except Exception as e:
            logger.error(f"Error updating datamart: {str(e)}")
            raise

# ------------------------------
# Usage function
# ------------------------------
def run_datamart_update():
    spark = SparkSession.builder \
        .master("local") \
        .appName("CraftMarketDatamart") \
        .config("spark.jars", "postgresql-42.7.4.jar") \
        .getOrCreate()

    jdbc_url = "jdbc:postgresql://postgres_db:5432/mydatabase"
    db_properties = {
        "user": "myuser",
        "password": "mypassword",
        "driver": "org.postgresql.Driver"
    }

    loader = DatamartLoader(spark, jdbc_url, db_properties)
    loader.update_datamart()



### Validation

In [132]:
def insert_test_data(productName):
    """Insert test data to verify incremental loading"""
    try:
        log_info("Inserting test data into source tables...")
        
        import psycopg2
        
        conn = psycopg2.connect(
            dbname="mydatabase",
            user="myuser",
            password="mypassword",
            host="postgres_db"
        )
        cur = conn.cursor()
        
        # First, let's check current max IDs
        cur.execute("SELECT MAX(craftsman_id) FROM source3.craft_market_craftsmans")
        max_craftsman_id = cur.fetchone()[0] or 0
        
        cur.execute("SELECT MAX(customer_id) FROM source3.craft_market_customers")
        max_customer_id = cur.fetchone()[0] or 0
        
        cur.execute("SELECT MAX(order_id) FROM source3.craft_market_orders")
        max_order_id = cur.fetchone()[0] or 0
        
        log_info(f"Current max IDs - Craftsman: {max_craftsman_id}, Customer: {max_customer_id}, Order: {max_order_id}")
        
        # Override identity column for test data
        cur.execute("ALTER TABLE source3.craft_market_craftsmans ALTER COLUMN craftsman_id RESTART WITH %s", 
                   (max_craftsman_id + 1,))
        cur.execute("ALTER TABLE source3.craft_market_customers ALTER COLUMN customer_id RESTART WITH %s", 
                   (max_customer_id + 1,))
        cur.execute("ALTER TABLE source3.craft_market_orders ALTER COLUMN order_id RESTART WITH %s", 
                   (max_order_id + 1,))
        
        cur.execute("""
        INSERT INTO source3.craft_market_craftsmans 
        (craftsman_name, craftsman_address, craftsman_birthday, craftsman_email)
        VALUES ('Test Craftsman New2', '123 Test St', '1990-01-01', 'test_new3@test.com')
        RETURNING craftsman_id;
        """)
        new_craftsman_id = cur.fetchone()[0]
        log_info(f"Inserted new craftsman with ID: {new_craftsman_id}")
        
        # Add new customer
        cur.execute("""
        INSERT INTO source3.craft_market_customers 
        (customer_name, customer_address, customer_birthday, customer_email)
        VALUES ('Test Customer New', '456 Test Ave', '1995-01-01', 'testcustomer_new@test.com')
        RETURNING customer_id;
        """)
        new_customer_id = cur.fetchone()[0]
        log_info(f"Inserted new customer with ID: {new_customer_id}")
        
        # Add new order
        cur.execute("""
        INSERT INTO source3.craft_market_orders 
        (product_name, product_description, product_type, product_price,
         craftsman_id, customer_id, order_created_date, order_status)
        VALUES 
        (%s, %s, %s, %s, %s, %s, CURRENT_DATE, %s)
        RETURNING order_id;
        """, (
            productName,
            'A new test product',
            'Test Category',
            150,
            new_craftsman_id,
            new_customer_id,
            'created'
        ))
        new_order_id = cur.fetchone()[0]
        log_info(f"Inserted new order with ID: {new_order_id}")
        
        conn.commit()
        log_info("Test data inserted successfully")
        
        # Verify what we inserted
        cur.execute("""
        SELECT o.order_id, o.product_name, c.craftsman_name, cust.customer_name
        FROM source3.craft_market_orders o
        JOIN source3.craft_market_craftsmans c ON o.craftsman_id = c.craftsman_id
        JOIN source3.craft_market_customers cust ON o.customer_id = cust.customer_id
        WHERE o.order_id = %s
        """, (new_order_id,))
        
        result = cur.fetchone()
        log_info(f"Verified new order: {result}")
        
        cur.close()
        conn.close()
        
    except Exception as e:
        log_info(f"Error inserting test data: {str(e)}")
        raise


Сейчас у нас в соурсах оригинальный набор данных, он же лежит в DWH. Загрузим его в витрину

In [133]:
run_datamart_update()

INFO: Found 8 records in load dates tracking table
INFO: Last load date from tracking table: 2024-12-25 19:31:02.776379
INFO: Starting datamart metrics calculation...
INFO: Reading dimension tables: d_craftsmans, d_customers, d_products...
INFO: Reading orders table: f_orders...
INFO: Total orders in f_orders (unfiltered): 3549
INFO: Orders load_dttm range: 2024-12-25 19:57:05.701979 to 2024-12-25 19:57:05.701979
INFO: Filtering orders with load_dttm > 2024-12-25 19:31:02.776379
INFO: After filtering, 3549 orders remain.
INFO: Proceeding with processing of 3549 orders...
INFO: Calculated metrics for 2999 craftsman-period combinations.
INFO: Updating load date tracking table...
INFO: Recording max order load date: 2024-12-25 19:57:05.701979
INFO: Load date tracking table updated successfully


теперь в оригинальный источник добавим новый заказ

In [134]:
 #тут надо убедиться чтобы название было оригинальным, иначе его могут срезать как дубликат
insert_test_data("TEST ORDER 5")

INFO: Inserting test data into source tables...
INFO: Current max IDs - Craftsman: 1013, Customer: 1013, Order: 1013
INFO: Inserted new craftsman with ID: 1014
INFO: Inserted new customer with ID: 1014
INFO: Inserted new order with ID: 1014
INFO: Test data inserted successfully
INFO: Verified new order: (1014, 'TEST ORDER 4', 'Test Craftsman New2', 'Test Customer New')


In [None]:
теперь прогоним апдейт оттуда в DWH. Он должен произойти инкрементально

In [135]:
loadDataIntoDWH()

Loading craftsmans dimension (incremental)...
No new craftsmans found. Skipping write to DWH.
Loading products dimension (incremental)...
Inserted 1 new products into DWH.
Loading customers dimension (incremental)...
No new customers found. Skipping write to DWH.
Loading orders fact table (incremental)...
+----------+------------+-----------+------------------+------------+---------------------+--------------------------+
|product_id|craftsman_id|customer_id|order_created_date|order_status|order_completion_date|load_dttm                 |
+----------+------------+-----------+------------------+------------+---------------------+--------------------------+
|91843     |110596      |110638     |2024-12-25        |created     |NULL                 |2024-12-25 19:58:25.454518|
+----------+------------+-----------+------------------+------------+---------------------+--------------------------+

Found 1 new records for f_orders (incremental).
Orders fact table loaded successfully (incrementa

In [136]:
run_datamart_update()

INFO: Found 9 records in load dates tracking table
INFO: Last load date from tracking table: 2024-12-25 19:57:05.701979
INFO: Starting datamart metrics calculation...
INFO: Reading dimension tables: d_craftsmans, d_customers, d_products...
INFO: Reading orders table: f_orders...
INFO: Total orders in f_orders (unfiltered): 3550
INFO: Orders load_dttm range: 2024-12-25 19:57:05.701979 to 2024-12-25 19:58:26.243153
INFO: Filtering orders with load_dttm > 2024-12-25 19:57:05.701979
INFO: After filtering, 1 orders remain.
INFO: Proceeding with processing of 1 orders...
INFO: Calculated metrics for 1 craftsman-period combinations.


Готово :)