In [33]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, avg, sum as spark_sum, expr, datediff, when, first, max as spark_max
from pyspark.sql.window import Window
from pyspark.sql.types import DateType
import pyspark.sql.functions as F

# Connection settings are taken from the environment so that credentials and
# machine-specific paths do not have to live in the notebook. Each fallback is
# the value this notebook originally hard-coded (local development setup);
# override them via environment variables anywhere else.
POSTGRES_JDBC_JAR = os.environ.get("POSTGRES_JDBC_JAR", "/home/jovyan/postgresql-42.7.4.jar")

# The PostgreSQL JDBC driver jar is put on the driver classpath so that
# "org.postgresql.Driver" (see `properties` below) can be loaded.
spark = SparkSession.builder \
    .appName("DWH Population") \
    .config("spark.driver.extraClassPath", POSTGRES_JDBC_JAR) \
    .getOrCreate()

# JDBC URL and connection properties passed to every spark.read.jdbc / write.jdbc call.
jdbc_url = os.environ.get("DWH_JDBC_URL", "jdbc:postgresql://localhost:5432/postgres")
properties = {
    "user": os.environ.get("DWH_JDBC_USER", "postgres"),
    "password": os.environ.get("DWH_JDBC_PASSWORD", "mysecretpassword"),
    "driver": "org.postgresql.Driver"
}

In [34]:
def _read_dwh_table(table_name):
    """Return the given PostgreSQL table as a Spark DataFrame read over JDBC."""
    return spark.read.jdbc(url=jdbc_url, table=table_name, properties=properties)


# Three dimension tables and the orders fact table of the DWH schema.
customers_df = _read_dwh_table("dwh.d_customers")
products_df = _read_dwh_table("dwh.d_products")
craftsmans_df = _read_dwh_table("dwh.d_craftsmans")
orders_df = _read_dwh_table("dwh.f_orders")

In [35]:
# Revenue split applied to every product price.
CRAFTSMAN_SHARE = 0.9
PLATFORM_SHARE = 0.1

# --- Incremental watermark ---------------------------------------------------
last_load_date_df = spark.read.jdbc(url=jdbc_url, table="dwh.load_dates_craftsman_report_datamart", properties=properties)

# max() over an empty table (or only NULL load_dttm values) yields None,
# which is treated as "no previous load": process every order.
last_load_date = last_load_date_df.agg(spark_max("load_dttm")).collect()[0][0]

# Only the orders are restricted to rows newer than the watermark. The
# dimension tables are joined in full: the joins below are inner joins, so
# filtering the dimensions by load_dttm as well would silently drop every new
# order that refers to a product, craftsman or customer loaded earlier.
if last_load_date is None:
    filtered_orders_df = orders_df
else:
    filtered_orders_df = orders_df.filter(col("load_dttm") > last_load_date)

# --- Enrich orders with dimension attributes and derived columns -------------
joined_df = filtered_orders_df.join(products_df, "product_id") \
    .join(craftsmans_df, "craftsman_id") \
    .join(customers_df, "customer_id") \
    .withColumn("report_period", F.date_format(col("order_created_date"), "yyyy-MM")) \
    .withColumn("order_completion_days",
                datediff(col("order_completion_date"), col("order_created_date"))) \
    .withColumn("customer_age",
                F.datediff(F.current_date(), col("customer_birthday")) / 365.25)

# --- Per-craftsman, per-month aggregates --------------------------------------
# count(when(cond, True)) counts only the rows matching cond, because when()
# without otherwise() yields NULL for the rest and count() skips NULLs.
agg_df = joined_df.groupBy("craftsman_id", "craftsman_name", "craftsman_address", "craftsman_birthday", "craftsman_email", "report_period") \
    .agg(
        spark_sum(col("product_price") * CRAFTSMAN_SHARE).alias("craftsman_money"),
        spark_sum(col("product_price") * PLATFORM_SHARE).alias("platform_money"),
        count("order_id").alias("count_order"),
        avg(col("product_price")).alias("avg_price_order"),
        avg(col("customer_age")).alias("avg_age_customer"),
        expr("percentile_approx(order_completion_days, 0.5)").alias("median_time_order_completed"),
        count(when(col("order_status") == "created", True)).alias("count_order_created"),
        count(when(col("order_status") == "in progress", True)).alias("count_order_in_progress"),
        count(when(col("order_status") == "delivery", True)).alias("count_order_delivery"),
        count(when(col("order_status") == "done", True)).alias("count_order_done"),
        count(when(col("order_status").isin("created", "in progress", "delivery"), True)).alias("count_order_not_done")
    )

# --- Most frequent product category per craftsman and month -------------------
product_category_count_df = joined_df.groupBy("craftsman_id", "report_period", "product_type") \
    .agg(count("product_id").alias("product_count"))

# product_type is a secondary sort key so that ties on product_count always
# resolve to the same category instead of an arbitrary one.
window_spec_category = Window.partitionBy("craftsman_id", "report_period") \
    .orderBy(col("product_count").desc(), col("product_type").asc())
top_product_category_df = product_category_count_df.withColumn("rank", F.row_number().over(window_spec_category)) \
    .filter(col("rank") == 1) \
    .select("craftsman_id", "report_period", "product_type") \
    .withColumnRenamed("product_type", "top_product_category")

# Left join keeps every aggregated row even if no top category was found for it.
final_df = agg_df.join(top_product_category_df, ["craftsman_id", "report_period"], "left")

In [36]:
# Append the freshly computed report rows to the datamart table.
final_df.write.jdbc(url=jdbc_url, table="dwh.craftsman_report_datamart", mode="append", properties=properties)

# --- Record this run in the load-dates table -----------------------------------
load_dates_table = "dwh.load_dates_craftsman_report_datamart"
load_dates_df = spark.read.jdbc(url=jdbc_url, table=load_dates_table, properties=properties)

# Next id is max(id) + 1, or 1 when the table is still empty (max() yields None).
max_id = load_dates_df.agg(spark_max("id")).collect()[0][0]
next_id = 1 if max_id is None else max_id + 1

# Build the single new row as a DataFrame and append it through the JDBC
# writer, the same mechanism used for the datamart above, instead of issuing
# a string-built INSERT against a temp view. Values are cast to the column
# types Spark reported for the target table.
load_dates_schema = load_dates_df.schema
new_load_date_df = spark.range(1).select(
    F.lit(next_id).cast(load_dates_schema["id"].dataType).alias("id"),
    F.current_date().cast(load_dates_schema["load_dttm"].dataType).alias("load_dttm"),
)
new_load_date_df.write.jdbc(url=jdbc_url, table=load_dates_table, mode="append", properties=properties)

DataFrame[]

In [32]:
# Bare expression: the cell output is the DataFrame's repr (column names and types).
final_df

DataFrame[craftsman_id: bigint, report_period: string, craftsman_name: string, craftsman_address: string, craftsman_birthday: date, craftsman_email: string, craftsman_money: double, platform_money: double, count_order: bigint, avg_price_order: double, avg_age_customer: double, median_time_order_completed: int, count_order_created: bigint, count_order_in_progress: bigint, count_order_delivery: bigint, count_order_done: bigint, count_order_not_done: bigint, top_product_category: string]