In [36]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Пути и настройки
postgresql_driver_path = "/opt/drivers/postgresql-42.7.4.jar"
source_url = "jdbc:postgresql://my_postgres:5432/postgres"
dwh_url = "jdbc:postgresql://my_postgres:5432/postgres"  # URL к базе данных DWH
dwh_properties = {
    "user": "postgres",
    "password": "mysecretpassword",
    "driver": "org.postgresql.Driver",
}

# Инициализация SparkSession
spark = SparkSession.builder \
    .appName("MyApp") \
    .config("spark.jars", postgresql_driver_path) \
    .getOrCreate()

# Функция для чтения последнего значения
def get_last_load_time(table_name):
 
  try:
        with open("last_load_times.txt", "r") as f:
            for line in f:
              name, time = line.strip().split("=")
              if name == table_name:
                return time
  except FileNotFoundError:
    return "1970-01-01 00:00:00"
  return "1970-01-01 00:00:00"

# Функция для обновления последнего значения
def update_last_load_time(table_name, new_time):
    
    lines = []
    try:
        with open("last_load_times.txt", "r") as f:
            for line in f:
               name, time = line.strip().split("=")
               if name != table_name:
                  lines.append(line)
    except FileNotFoundError:
        pass
    with open("last_load_times.txt", "w") as f:
        lines.append(f"{table_name}={new_time}\n")
        for line in lines:
           f.write(line)
# Инкрементальная загрузка для dwh.load_dates_craftmans_report_datamart
# (Источником данных является source1.craft_market_wide)
last_load_time_source1 = get_last_load_time("source1.craft_market_wide")

# 1. Чтение данных из исходной таблицы (с фильтром)
#  Используем order_created_date для фильтрации 
df_source1 = spark.read.jdbc(source_url, "source1.craft_market_wide", properties=dwh_properties) \
            .where(F.col("order_created_date") > last_load_time_source1)

# 2. Преобразование данных
df_dwh = df_source1.select(
    F.col(df_source1.columns[0]).alias("id"),
    F.current_timestamp().alias("load_dttm")
)

# 3. Запись в dwh.load_dates_craftmans_report_datamart
df_dwh.write.jdbc(
    dwh_url, "dwh.load_dates_craftmans_report_datamart", mode="append", properties=dwh_properties
)

# 4. Обновление последнего времени загрузки
#  Используем order_created_date 
new_load_time_source1 = str(df_source1.agg(F.max("order_created_date")).collect()[0][0])
update_last_load_time("source1.craft_market_wide", new_load_time_source1)

# Чтение из source2 таблица craft_market_masters_products
last_load_time_source2_1 = get_last_load_time("source2.craft_market_masters_products")

df_source2_1 = spark.read.jdbc(source_url, "source2.craft_market_masters_products", properties=dwh_properties) \
    .where(F.col("product_id") > last_load_time_source2_1)

# Выбор и переименование нужных столбцов для dwh.d_craftsmans  
df_source2_1_transformed = df_source2_1.select(
    df_source2_1.columns[0], 
    df_source2_1.columns[1], 
    df_source2_1.columns[2],
    df_source2_1.columns[3],
   df_source2_1.columns[4],
    F.current_timestamp().alias("load_dttm")

)

# Запись в таблицу dwh.d_craftsmans 
df_source2_1_transformed.write.jdbc(
    dwh_url, "dwh.d_craftsmans", mode="append", properties=dwh_properties
)

new_load_time_source2_1 = str(df_source2_1.agg(F.max("product_id")).collect()[0][0])
update_last_load_time("source2.craft_market_masters_products", new_load_time_source2_1)

# Чтение из source2 таблица craft_market_orders_customers
last_load_time_source2_2 = get_last_load_time("source2.craft_market_orders_customers")
# Используем order_created_date для фильтрации
df_source2_2 = spark.read.jdbc(source_url, "source2.craft_market_orders_customers", properties=dwh_properties) \
  .where(F.col("order_created_date") > last_load_time_source2_2)

#  Выбор нужных столбцов для dwh.d_customers 
df_source2_2_transformed = df_source2_2.select(
     F.col(df_source2_2.columns[7]).alias("customer_name"),  
     F.col(df_source2_2.columns[10]).alias("customer_email"), 
     F.current_timestamp().alias("load_dttm")
)

# Запись в таблицу dwh.d_customers 
df_source2_2_transformed.write.jdbc(
    dwh_url, "dwh.d_customers", mode="append", properties=dwh_properties
)
# Используем order_created_date для обновления
new_load_time_source2_2 = str(df_source2_2.agg(F.max("order_created_date")).collect()[0][0])
update_last_load_time("source2.craft_market_orders_customers", new_load_time_source2_2)

# Инкрементальная загрузка для dwh.craftsman_report_datamart
last_load_time_source1_craftsman_report = get_last_load_time("source1.craft_market_wide")

df_source1 = spark.read.jdbc(source_url, "source1.craft_market_wide", properties=dwh_properties) \
       .where(F.col("order_created_date") > last_load_time_source1_craftsman_report)

df_source1_transformed = df_source1.select(
    df_source1.columns[1],
    df_source1.columns[2],
    df_source1.columns[3],
    df_source1.columns[4],
    df_source1.columns[5],
    F.lit(0).alias("craftsman_money"),
    F.lit(0).alias("platform_money"),
    F.lit(0).alias("count_order"),
    F.lit(0).alias("avg_price_order"),
    F.lit(0).alias("avg_age_customer"),
    F.lit(0).alias("median_time_order_completed"),
    F.lit(0).alias("top_product_category"),
    F.lit(0).alias("count_order_created"),
    F.lit(0).alias("count_order_in_progress"),
    F.lit(0).alias("count_order_delivery"),
    F.lit(0).alias("count_order_done"),
    F.lit(0).alias("count_order_not_done"),
    F.current_timestamp().alias("report_period")
)

#  Запись в таблицу dwh.craftsman_report_datamart
df_source1_transformed.write.jdbc(
    dwh_url, "dwh.craftsman_report_datamart", mode="append", properties=dwh_properties
)

# Обновляем последнее время загрузки
# Используем order_created_date 
new_load_time_source1_craftsman_report = str(df_source1.agg(F.max("order_created_date")).collect()[0][0])
update_last_load_time("source1.craft_market_wide",new_load_time_source1_craftsman_report)


# Инкрементальная загрузка для source3
last_load_time_source3_3 = get_last_load_time("source3.craft_market_orders")
# Используем order_created_date для фильтрации
df_source3_3 = spark.read.jdbc(source_url, "source3.craft_market_orders", properties=dwh_properties) \
    .where(F.col("order_created_date") > last_load_time_source3_3)

df_source3_1_transformed = df_source3_3.select(
    df_source3_3.columns[2], 
    df_source3_3.columns[3], 
    df_source3_3.columns[4],
    df_source3_3.columns[5],
    df_source3_3.columns[6],
    F.current_timestamp().alias("load_dttm")
)

df_source3_2_transformed = df_source3_3.select(
    df_source3_3.columns[7],
    df_source3_3.columns[8],
    df_source3_3.columns[9],
    df_source3_3.columns[10],
    F.current_timestamp().alias("load_dttm")
)

# Запись в таблицы фактов
df_source3_2_transformed.write.jdbc(
    dwh_url, "dwh.d_products", mode="append", properties=dwh_properties
)


df_craftsmans = spark.read.jdbc(dwh_url, "dwh.d_craftsmans", properties=dwh_properties)

# Отфильтруем df_source3_1_transformed так, чтобы craftsman_id присутствовали в df_craftsmans
df_source3_1_transformed = df_source3_1_transformed.join(
    df_craftsmans.select("craftsman_id"), 
    on="craftsman_id",
    how="inner"
)
df_source3_1_transformed.write.jdbc(
    dwh_url, "dwh.f_orders", mode="append", properties=dwh_properties
)

#  Используем order_created_date для обновления
new_load_time_source3_3 = str(df_source3_3.agg(F.max("order_created_date")).collect()[0][0])
update_last_load_time("source3.craft_market_orders", new_load_time_source3_3)


spark.stop()