In [None]:
import os
from getpass import getpass

from pyspark.sql import SparkSession
from pyspark.sql.functions import date_format
# --- Connection settings -------------------------------------------------------
# Credentials are read from the environment (or prompted for) instead of being
# hard-coded in the notebook; host, port, user and jar path fall back to the values
# this notebook was written against.
# In SQL Server JDBC URLs `host\instance` names a named instance; a raw string keeps
# the backslash without relying on Python's invalid "\T" escape.
MSSQL_HOST = os.environ.get("MSSQL_HOST", r"Admin\TRAM1")
MSSQL_PORT = os.environ.get("MSSQL_PORT", "1433")
MSSQL_USER = os.environ.get("MSSQL_USER", "sa")
MSSQL_PASSWORD = os.environ.get("MSSQL_PASSWORD") or getpass("SQL Server password: ")
SPARK_JDBC_JAR = os.environ.get(
    "SPARK_JDBC_JAR",
    "C:/SPARK/spark-3.5.1-bin-hadoop3/jars/apache-spark-sql-connector.jar",
)

# user/password are supplied through `properties`, so the URLs carry no secrets
# and are safe to print or log.
URL_IDB = f"jdbc:sqlserver://{MSSQL_HOST}:{MSSQL_PORT};databaseName=Integration;"
URL_DW = f"jdbc:sqlserver://{MSSQL_HOST}:{MSSQL_PORT};databaseName=datawarehouse;"

# NOTE(review): com.microsoft.sqlserver.jdbc.SQLServerDriver ships in Microsoft's
# mssql-jdbc jar — confirm SPARK_JDBC_JAR (or something else on the classpath) provides it.
spark = (
    SparkSession.builder
    .appName("Read from MSSQL")
    .config("spark.driver.extraClassPath", SPARK_JDBC_JAR)
    .getOrCreate()
)

# JDBC connection properties shared by every read and write in this notebook.
properties = {
    "user": MSSQL_USER,
    "password": MSSQL_PASSWORD,
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
}
In [None]:
# Load-window boundaries (ISO dates) passed to the loader functions:
#   m1 -> m2 : first dimension load window
#   m2 -> m3 : second dimension load window
#   m1 -> m3 : full window used for customers and fact tables
m1, m2, m3 = '2008-01-01', '2023-01-01', '2023-12-02'

In [None]:
# (Integration-DB source table, warehouse dimension table) pairs whose rows are
# copied as-is, apart from key-column renames, by nochange_table_idb_to_dw.
no_change_table_ls = list({
    "RepresentativeOffices": "DimRepresentativeOffice",
    "Stores": "DimStore",
    "Items": "DimItem",
}.items())

In [None]:
def nochange_table_idb_to_dw(idb_table,wh_table,start, end):
    condition = f"time<='{end}' and time >'{start}'"
    print(idb_table,condition)
    df = spark.read.jdbc(url=URL_IDB, table=idb_table, properties=properties).filter(condition)
    if idb_table=="RepresentativeOffices":
        df = df.withColumnRenamed("CityId", "CityKey")
    if idb_table=="Stores":
        df = df.withColumnRenamed("StoreId", "StoreKey")
        df = df.withColumnRenamed("CityId", "CityKey")
    if idb_table=="Items":
        df = df.withColumnRenamed("ItemId", "ItemKey")
    df.write.jdbc(url=URL_DW,
              table=wh_table,
              mode = "append",
              properties=properties)
    return 0

In [None]:
def idb_customer_to_dw_customer(start,end):
    """Append customers whose FirstOrderDate lies in [start, end) to DimCustomer.

    Travel and Postal are 1/0 flags. Each is 1 when the customer id also appears
    in TravelCustomers or PostalCustomers, respectively. The id columns are
    renamed to the warehouse key columns (CustomerKey, CityKey).

    Side effect: registers the temp views Customers, TravelCustomers and
    PostalCustomers in the Spark session.

    Returns:
        0
    """
    # Expose each Integration-DB table to Spark SQL under its own name.
    for view_name in ("Customers", "TravelCustomers", "PostalCustomers"):
        source = spark.read.jdbc(url=URL_IDB, table=view_name, properties=properties)
        source.createOrReplaceTempView(view_name)
    query = f"""
    SELECT cus.Customerid, cus.CustomerName, cus.CityId,
    CASE
    WHEN EXISTS (SELECT customerid FROM TravelCustomers WHERE customerid = cus.customerid) THEN 1
    ELSE 0
    END AS Travel,
    CASE
    WHEN EXISTS (SELECT customerid FROM PostalCustomers WHERE customerid = cus.customerid) THEN 1
    ELSE 0
    END AS Postal,
    cus.FirstOrderDate
    FROM Customers AS cus
    WHERE cus.FirstOrderDate>='{start}' and cus.FirstOrderDate<'{end}';
    """
    dim_customers = spark.sql(query).toDF(
        "CustomerKey", "CustomerName", "CityKey", "Travel", "Postal", "FirstOrderDate"
    )
    dim_customers.write.jdbc(
        url=URL_DW, table="DimCustomer", mode="append", properties=properties
    )
    return 0

In [None]:
def idb_to_order_item_facts(start,end):
    dfc_cus = spark.read.jdbc(url=URL_IDB, table="Customers", properties=properties)
    df.createOrReplaceTempView("Customers")
    df_it = spark.read.jdbc(url=URL_IDB, table="Items", properties=properties)
    df_it.createOrReplaceTempView("Items")
    df_od = spark.read.jdbc(url=URL_IDB, table="Orders", properties=properties)
    df_od.createOrReplaceTempView("Orders")
    df_ot = spark.read.jdbc(url=URL_IDB, table="OrderedItems", properties=properties)
    df_ot.createOrReplaceTempView("OrderItems")
    query = f""" SELECT ot.Time, ot.ItemId AS ItemKey, cus.CustomerId AS CustomerKey,
        ot.OrderedQuantity, ot.OrderCost, ot.OrderCost - (ot.OrderedQuantity * it.Price) AS Profit
    FROM OrderedItems ot, Items it, Customers cus, Orders od
    WHERE ot.time>='{m1}' and ot.time<'{m3}' and ot.ItemId = it.ItemId AND ot.OrderId = od.OrderId AND od.CustomerId = cus.CustomerId;
    """
    result = spark.sql(query)
    result = result.withColumn("TimeKey", date_format(result.Time, "yyyyMMdd"))
    result = result.select("TimeKey","ItemKey","CustomerKey","OrderedQuantity","OrderCost","Profit")
    dwdf = result.toDF("TimeKey","ItemKey","CustomerKey","OrderedQuantity","OrderedCost","Profit")
    dwdf.write.jdbc(url=URL_DW,
              table="FactOrderItems",
              mode = "append",
              properties=properties)
    return 0

In [None]:
def idb_to_store_item_facts(start,end):
    """Append stored-item snapshots with Time in [start, end) to FactStoreItems.

    TimeKey is Time formatted as a yyyyMMdd string. ItemId, StoreId and
    StoredQuantity are written as ItemKey, StoreKey and Quantity.

    Side effect: registers the temp view StoredItems.

    Returns:
        0
    """
    stored_items = spark.read.jdbc(url=URL_IDB, table="StoredItems", properties=properties)
    stored_items.createOrReplaceTempView("StoredItems")
    query = f"""
    SELECT Time,ItemId,StoreId,StoredQuantity
    FROM StoredItems
    where Time>='{start}' and Time<'{end}';
    """
    rows = spark.sql(query)
    fact_rows = (
        rows.withColumn("TimeKey", date_format(rows.Time, "yyyyMMdd"))
            .select("TimeKey", "ItemId", "StoreId", "StoredQuantity")
            .toDF("TimeKey", "ItemKey", "StoreKey", "Quantity")
    )
    fact_rows.write.jdbc(
        url=URL_DW, table="FactStoreItems", mode="append", properties=properties
    )
    return 0

In [None]:
# Dimension tables: first load window (m1 -> m2).
for idb_table, wh_table in no_change_table_ls:
    nochange_table_idb_to_dw(idb_table, wh_table, m1, m2)

In [None]:
# Dimension tables: second load window (m2 -> m3).
for idb_table, wh_table in no_change_table_ls:
    nochange_table_idb_to_dw(idb_table, wh_table, m2, m3)

In [None]:
idb_customer_to_dw_customer(start=m1, end=m3)

In [None]:
idb_to_order_item_facts(start=m1, end=m3)

In [None]:
idb_to_store_item_facts(start=m1, end=m3)