# Extract


In [1]:
import os
import findspark

findspark.init()
from pyspark.sql import SparkSession

os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"
import pyspark.pandas as ps
import pandas as pd
import pyodbc
from pyspark.sql import functions as F
from pyspark.sql.functions import *
from pyspark.sql import Window
from pyspark.sql.types import *
import xml.etree.ElementTree as ET


def create_database_connection():
    the_spark = (
        SparkSession.builder.appName("SparkSQL")
        .config("spark.hadoop.home.dir", "C:/Hadoop/")
        .getOrCreate()
    )
    the_spark.sparkContext._jvm.java.lang.System.setProperty(
        "java.library.path",
        "./sqljdbc_12.4/enu/auth/x64/mssql-jdbc_auth-12.4.1.x64.dll",
    )
    return the_spark


def load_jdbc(spark, db):
    sql_query = "SELECT table_schema + '.' + table_name AS full_table_name FROM information_schema.tables WHERE table_type = 'BASE TABLE' AND table_schema + '.' + table_name NOT IN ('dbo.DatabaseLog', 'dbo.ErrorLog', 'dbo.AWBuildVersion', 'dbo.sysdiagrams')"
    df = (
        spark.read.format("jdbc")
        .option(
            "url",
            f"jdbc:sqlserver://localhost:1433;databaseName={db};integratedSecurity=true;encrypt=true;trustServerCertificate=true;authenticationScheme=nativeAuthentication",
        )
        .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
        .option("user", "INSPIRON7620\dpl2k")
        .option("password", "")
        .option("query", sql_query)
        .load()
    )
    return df


def extract(spark, db):
    try:
        table_dataframes = {}
        dfs = load_jdbc(spark, db)
        data_collect = dfs.collect()
        for row in data_collect:
            tbl_name = row["full_table_name"]
            variable_name = tbl_name.split(".")[1] + "Df"
            table_df = (
                my_spark.read.format("jdbc")
                .option(
                    "url",
                    f"jdbc:sqlserver://localhost:1433;databaseName={db};integratedSecurity=true;encrypt=true;trustServerCertificate=true;authenticationScheme=nativeAuthentication",
                )
                .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
                .option("user", "INSPIRON7620\dpl2k")
                .option("password", "")
                .option("dbtable", tbl_name)
                .load()
            )
            table_dataframes[variable_name] = table_df
        print("Data extract completed successfully.")
        return table_dataframes
    except Exception as e:
        print("Data extract error: " + str(e))
        return {}


my_spark = create_database_connection()
my_dfs = extract(my_spark, "CompanyX")
category_df = my_dfs["ProductCategoryDf"]
subcategory_df = my_dfs["ProductSubcategoryDf"]
territory_df = my_dfs["SalesTerritoryDf"]
order_detail_df = my_dfs["SalesOrderDetailDf"]
order_header_df = my_dfs["SalesOrderHeaderDf"]
product_df = my_dfs["ProductDf"]
currency_df = my_dfs["CurrencyDf"]
promotion_df = my_dfs["SpecialOfferDf"]
country_region_df = my_dfs["CountryRegionDf"]
state_province_df = my_dfs["StateProvinceDf"]
address_df = my_dfs["AddressDf"]
employee_df = my_dfs["EmployeeDf"]
person_df = my_dfs["PersonDf"]
business_entity_df = my_dfs["BusinessEntityDf"]
business_entity_address_df = my_dfs["BusinessEntityAddressDf"]
store_df = my_dfs["StoreDf"]
customer_df = my_dfs["CustomerDf"]
product_list_price_history_df = my_dfs["ProductListPriceHistoryDf"]
product_cost_history_df = my_dfs["ProductCostHistoryDf"]

# store_df.printSchema()
# category_df.printSchema()
# subcategory_df.printSchema()
# territory_df.printSchema()
# order_detail_df.printSchema()
# order_header_df.printSchema()
# product_df.printSchema()

Data extract completed successfully.


# Transform

### DimProductCategory


In [2]:
DimProductCategory = category_df.drop("rowguid", "ModifiedDate")
DimProductCategory = DimProductCategory.withColumnsRenamed(
    {"ProductCategoryID": "ProductCategoryKey", "Name": "ProductCategoryName"}
)
DimProductCategory = DimProductCategory.withColumn(
    "ProductCategoryAlternateKey", DimProductCategory.ProductCategoryKey
)
DimProductCategory = DimProductCategory.select(
    "ProductCategoryKey", "ProductCategoryAlternateKey", "ProductCategoryName"
)
DimProductCategory.printSchema()

root
 |-- ProductCategoryKey: integer (nullable = true)
 |-- ProductCategoryAlternateKey: integer (nullable = true)
 |-- ProductCategoryName: string (nullable = true)



### DimProductSubcategory


In [3]:
DimProductSubcategory = subcategory_df.drop("rowguid", "ModifiedDate")
DimProductSubcategory = DimProductSubcategory.withColumnsRenamed(
    {
        "ProductSubcategoryID": "ProductSubcategoryKey",
        "ProductCategoryID": "ProductCategoryKey",
        "Name": "ProductSubcategoryName",
    }
)
DimProductSubcategory = DimProductSubcategory.withColumn(
    "ProductSubcategoryAlternateKey", DimProductSubcategory.ProductSubcategoryKey
)
DimProductSubcategory = DimProductSubcategory.select(
    "ProductSubcategoryKey",
    "ProductSubcategoryAlternateKey",
    "ProductCategoryKey",
    "ProductSubcategoryName",
)
DimProductSubcategory.printSchema()

root
 |-- ProductSubcategoryKey: integer (nullable = true)
 |-- ProductSubcategoryAlternateKey: integer (nullable = true)
 |-- ProductCategoryKey: integer (nullable = true)
 |-- ProductSubcategoryName: string (nullable = true)



### DimSalesTerritory


In [4]:
territory_df = territory_df.drop(
    "SalesYTD", "SalesLastYear", "CostYTD", "CostLastYear", "rowguid", "ModifiedDate"
)

country_region_df = country_region_df.drop("ModifiedDate")

DimSalesTerritory = territory_df.join(
    country_region_df,
    territory_df.CountryRegionCode == country_region_df.CountryRegionCode,
    how="inner",
)

DimSalesTerritory = DimSalesTerritory.select(
    territory_df.TerritoryID,
    territory_df.Name,
    territory_df.CountryRegionCode,
    territory_df.Group,
    country_region_df.Name.alias("CountryRegionName"),
)

DimSalesTerritory = DimSalesTerritory.withColumnsRenamed(
    {
        "TerritoryID": "SalesTerritoryKey",
        "Name": "SalesTerritoryRegion",
        "Group": "SalesTerritoryGroup",
        "CountryRegionName": "SalesTerritoryCountry",
    }
)

DimSalesTerritory = DimSalesTerritory.withColumn(
    "SalesTerritoryAlternateKey", DimSalesTerritory.SalesTerritoryKey
)

DimSalesTerritory = DimSalesTerritory.drop("CountryRegionCode")

DimSalesTerritory = DimSalesTerritory.select(
    "SalesTerritoryKey",
    "SalesTerritoryAlternateKey",
    "SalesTerritoryRegion",
    "SalesTerritoryCountry",
    "SalesTerritoryGroup",
).orderBy("SalesTerritoryKey")

DimSalesTerritory.printSchema()
DimSalesTerritory.show()

root
 |-- SalesTerritoryKey: integer (nullable = true)
 |-- SalesTerritoryAlternateKey: integer (nullable = true)
 |-- SalesTerritoryRegion: string (nullable = true)
 |-- SalesTerritoryCountry: string (nullable = true)
 |-- SalesTerritoryGroup: string (nullable = true)

+-----------------+--------------------------+--------------------+---------------------+-------------------+
|SalesTerritoryKey|SalesTerritoryAlternateKey|SalesTerritoryRegion|SalesTerritoryCountry|SalesTerritoryGroup|
+-----------------+--------------------------+--------------------+---------------------+-------------------+
|                1|                         1|           Northwest|        United States|      North America|
|                2|                         2|           Northeast|        United States|      North America|
|                3|                         3|             Central|        United States|      North America|
|                4|                         4|           Southwest|  

### DimDiscount


In [5]:
DimDiscount = promotion_df.drop("rowguid", "ModifiedDate")
DimDiscount = DimDiscount.withColumnsRenamed(
    {
        "SpecialOfferID": "DiscountKey",
        "Description": "DiscountName",
        "Type": "DiscountType",
        "Category": "DiscountCategory",
    }
)
DimDiscount = DimDiscount.withColumn("DiscountAlternateKey", DimDiscount.DiscountKey)
DimDiscount = DimDiscount.select(
    "DiscountKey",
    "DiscountAlternateKey",
    "DiscountName",
    "DiscountPct",
    "DiscountType",
    "DiscountCategory",
    "StartDate",
    "EndDate",
    "MinQty",
    "MaxQty",
)
DimDiscount = DimDiscount.withColumn(
    "MaxQty", F.when(DimDiscount["MaxQty"] == -24, 24).otherwise(DimDiscount["MaxQty"])
)
DimDiscount.printSchema()
DimDiscount.show()

root
 |-- DiscountKey: integer (nullable = true)
 |-- DiscountAlternateKey: integer (nullable = true)
 |-- DiscountName: string (nullable = true)
 |-- DiscountPct: decimal(10,4) (nullable = true)
 |-- DiscountType: string (nullable = true)
 |-- DiscountCategory: string (nullable = true)
 |-- StartDate: timestamp (nullable = true)
 |-- EndDate: timestamp (nullable = true)
 |-- MinQty: integer (nullable = true)
 |-- MaxQty: integer (nullable = true)

+-----------+--------------------+--------------------+-----------+--------------------+----------------+-------------------+-------------------+------+------+
|DiscountKey|DiscountAlternateKey|        DiscountName|DiscountPct|        DiscountType|DiscountCategory|          StartDate|            EndDate|MinQty|MaxQty|
+-----------+--------------------+--------------------+-----------+--------------------+----------------+-------------------+-------------------+------+------+
|          1|                   1|         No Discount|     0.0000|

### DimAddress


In [6]:
# Define window specifications
windowSpec1 = Window.partitionBy(
    address_df["PostalCode"], address_df["City"], address_df["StateProvinceID"]
).orderBy(state_province_df["Name"], country_region_df["CountryRegionCode"])
windowSpec2 = Window.orderBy(
    address_df["PostalCode"], address_df["City"], address_df["StateProvinceID"]
)

# Perform the joins
join_df = address_df.join(
    state_province_df,
    address_df.StateProvinceID == state_province_df.StateProvinceID,
    "inner",
).join(
    country_region_df,
    state_province_df.CountryRegionCode == country_region_df.CountryRegionCode,
    "inner",
)

# Apply the row number function for RankedAddresses
ranked_addresses_df = join_df.withColumn("rn", row_number().over(windowSpec1))

# Filter and create a new AddressID
DimAddress = (
    ranked_addresses_df.filter(col("rn") == 1)
    .withColumn("NewAddressID", row_number().over(windowSpec2))
    .select(
        col("NewAddressID").alias("AddressKey"),
        address_df["City"],
        address_df["StateProvinceID"].alias("StateProvinceCode"),
        state_province_df["CountryRegionCode"],
        country_region_df["Name"].alias("CountryRegionName"),
        address_df["PostalCode"],
        state_province_df["TerritoryID"].alias("SalesTerritoryKey"),
    )
)

# Show the result
DimAddress.printSchema()
DimAddress.show()

root
 |-- AddressKey: integer (nullable = false)
 |-- City: string (nullable = true)
 |-- StateProvinceCode: integer (nullable = true)
 |-- CountryRegionCode: string (nullable = true)
 |-- CountryRegionName: string (nullable = true)
 |-- PostalCode: string (nullable = true)
 |-- SalesTerritoryKey: integer (nullable = true)

+----------+-------------+-----------------+-----------------+-----------------+----------+-----------------+
|AddressKey|         City|StateProvinceCode|CountryRegionCode|CountryRegionName|PostalCode|SalesTerritoryKey|
+----------+-------------+-----------------+-----------------+-----------------+----------+-----------------+
|         1|      Dresden|               19|               DE|          Germany|     01071|                8|
|         2|       Saugus|               30|               US|    United States|     01906|                2|
|         3|      Norwood|               30|               US|    United States|     02062|                2|
|         4|  

### DimEmployee


In [7]:
# Perform the joins
join_df = (
    employee_df.join(
        business_entity_address_df,
        employee_df.BusinessEntityID == business_entity_address_df.BusinessEntityID,
        "inner",
    )
    .join(
        address_df,
        address_df.AddressID == business_entity_address_df.AddressID,
        "inner",
    )
    .join(
        state_province_df,
        state_province_df.StateProvinceID == address_df.StateProvinceID,
        "inner",
    )
    .join(
        person_df, person_df.BusinessEntityID == employee_df.BusinessEntityID, "inner"
    )
)

# Selecting and transforming the columns as per the SQL query
DimEmployee = join_df.select(
    employee_df["BusinessEntityID"],
    state_province_df["TerritoryID"],
    employee_df["NationalIDNumber"],
    person_df["FirstName"],
    person_df["MiddleName"],
    person_df["LastName"],
    employee_df["Gender"],
    employee_df["JobTitle"],
    employee_df["BirthDate"],
    employee_df["HireDate"],
    employee_df["CurrentFlag"].alias("ActiveFlag"),
    when(person_df["PersonType"] == "SP", True)
    .otherwise(False)
    .alias("SalesPersonFlag"),
).orderBy(employee_df["BusinessEntityID"])

DimEmployee = DimEmployee.withColumnsRenamed(
    {
        "BusinessEntityID": "EmployeeKey",
        "TerritoryID": "SalesTerritoryKey",
        "NationalIDNumber": "EmployeeNumber",
        "FirstName": "FirstName",
        "MiddleName": "MiddleName",
        "LastName": "LastName",
        "Gender": "Gender",
        "JobTitle": "Title",
        "BirthDate": "BirthDate",
        "HireDate": "HireDate",
        "CurrentFlag": "ActiveFlag",
        "SalesPersonFlag": "SalesPersonFlag",
    }
)

# Show the result
DimEmployee.show()
DimEmployee.printSchema()

+-----------+-----------------+--------------+---------+----------+----------+------+--------------------+----------+----------+----------+---------------+
|EmployeeKey|SalesTerritoryKey|EmployeeNumber|FirstName|MiddleName|  LastName|Gender|               Title| BirthDate|  HireDate|ActiveFlag|SalesPersonFlag|
+-----------+-----------------+--------------+---------+----------+----------+------+--------------------+----------+----------+----------+---------------+
|          1|                1|     295847284|      Ken|         J|   Sánchez|     M|Chief Executive O...|1969-01-29|2009-01-14|      true|          false|
|          2|                1|     245797967|    Terri|       Lee|     Duffy|     F|Vice President of...|1971-08-01|2008-01-31|      true|          false|
|          3|                1|     509647174|  Roberto|      null|Tamburello|     M| Engineering Manager|1974-11-12|2007-11-11|      true|          false|
|          4|                3|     112457891|      Rob|      nu

### DimProduct


In [8]:
DimProduct = (
    product_df.alias("p")
    .select(
        F.col("p.ProductID").alias("ProductKey"),
        F.col("p.ProductNumber").alias("ProductAlternateKey"),
        F.col("p.ProductSubcategoryID").alias("ProductSubcategoryKey"),
        F.col("p.Name").alias("ProductName"),
        F.col("p.StandardCost"),
        F.col("p.ListPrice"),
        F.col("p.Class"),
        F.col("p.Style"),
        F.col("p.ProductLine"),
        F.col("p.SellStartDate").alias("StartDate"),
        F.col("p.SellEndDate").alias("EndDate"),
    )
    .orderBy("ProductKey", "StartDate")
)

DimProduct.printSchema()
DimProduct.show()

root
 |-- ProductKey: integer (nullable = true)
 |-- ProductAlternateKey: string (nullable = true)
 |-- ProductSubcategoryKey: integer (nullable = true)
 |-- ProductName: string (nullable = true)
 |-- StandardCost: decimal(19,4) (nullable = true)
 |-- ListPrice: decimal(19,4) (nullable = true)
 |-- Class: string (nullable = true)
 |-- Style: string (nullable = true)
 |-- ProductLine: string (nullable = true)
 |-- StartDate: timestamp (nullable = true)
 |-- EndDate: timestamp (nullable = true)

+----------+-------------------+---------------------+--------------------+------------+---------+-----+-----+-----------+-------------------+-------+
|ProductKey|ProductAlternateKey|ProductSubcategoryKey|         ProductName|StandardCost|ListPrice|Class|Style|ProductLine|          StartDate|EndDate|
+----------+-------------------+---------------------+--------------------+------------+---------+-----+-----+-----------+-------------------+-------+
|         1|            AR-5381|                

### DimReseller


In [10]:
def extract_xml_values(xml_string, element_name):
    try:
        root = ET.fromstring(xml_string)
        namespace = (
            "http://schemas.microsoft.com/sqlserver/2004/07/adventure-works/StoreSurvey"
        )
        value = root.find(".//{%s}%s" % (namespace, element_name)).text
        return value
    except:
        return None


# Define the UDFs
extract_business_type_udf = F.udf(
    lambda xml: extract_xml_values(xml, "BusinessType"), returnType=StringType()
)
extract_specialty_udf = F.udf(
    lambda xml: extract_xml_values(xml, "Specialty"), returnType=StringType()
)
extract_year_opened_udf = F.udf(
    lambda xml: int(extract_xml_values(xml, "YearOpened")), returnType=IntegerType()
)
extract_number_employees_udf = F.udf(
    lambda xml: int(extract_xml_values(xml, "NumberEmployees")),
    returnType=IntegerType(),
)
store_survey_df = store_df.select(F.col("BusinessEntityID"), F.col("Demographics"))
# Apply the UDFs to the DataFrame
ha_df = (
    store_survey_df.withColumn(
        "BusinessType", extract_business_type_udf("Demographics")
    )
    .withColumn("Specialty", extract_specialty_udf("Demographics"))
    .withColumn("YearOpened", extract_year_opened_udf("Demographics"))
    .withColumn("NumberEmployees", extract_number_employees_udf("Demographics"))
)

# Show the result
ha_df = ha_df.drop("Demographics")

DimReseller = (
    customer_df.alias("customer")
    .join(
        store_df.alias("store"),
        F.col("customer.StoreID") == F.col("store.BusinessEntityID"),
        "inner",
    )
    .join(
        order_header_df.alias("order_header"),
        F.col("order_header.CustomerID") == F.col("customer.CustomerID"),
        "inner",
    )
    .join(
        business_entity_address_df.alias("business_entity_address"),
        F.col("store.BusinessEntityID")
        == F.col("business_entity_address.BusinessEntityID"),
        "inner",
    )
    .join(
        address_df.alias("address"),
        F.col("business_entity_address.AddressID") == F.col("address.AddressID"),
        "inner",
    )
    .join(
        DimAddress.alias("dim_address"),
        (F.col("dim_address.PostalCode") == F.col("address.PostalCode"))
        & (F.col("dim_address.City") == F.col("address.City"))
        & (F.col("dim_address.StateProvinceCode") == F.col("address.StateProvinceID")),
        "inner",
    )
    .join(
        ha_df.alias("ha_df"),
        F.col("store.BusinessEntityID") == F.col("ha_df.BusinessEntityID"),
        "left_outer",
    )
    .filter(
        (F.col("customer.PersonID").isNotNull())
        & (F.col("customer.StoreID").isNotNull())
        & (F.col("order_header.Status") == 5)
    )
    .groupBy(
        F.col("dim_address.AddressKey"),
        F.col("customer.AccountNumber"),
        F.col("store.Name"),
        F.col("ha_df.NumberEmployees"),
        F.col("ha_df.BusinessType"),
        F.col("ha_df.Specialty"),
        F.col("ha_df.YearOpened"),
    )
    .agg(
        F.min(F.year(F.col("order_header.OrderDate"))).alias("FirstOrderYear"),
        F.max(F.year(F.col("order_header.OrderDate"))).alias("LastOrderYear"),
        F.month(F.max(F.col("order_header.OrderDate"))).alias("LastOrderMonth"),
    )
    .withColumn(
        "ResellerKey",
        F.row_number().over(Window.orderBy(F.col("customer.AccountNumber"))),
    )
)

DimReseller = DimReseller.select(
    "ResellerKey",
    "AddressKey",
    F.col("customer.AccountNumber").alias("ResellerAlternateKey"),
    F.col("store.Name").alias("ResellerName"),
    "NumberEmployees",
    "LastOrderMonth",
    "FirstOrderYear",
    "LastOrderYear",
    "BusinessType",
    F.col("ha_df.Specialty").alias("ProductLine"),
    "YearOpened",
)

DimReseller.printSchema()
DimReseller.show()
DimReseller.count()

root
 |-- ResellerKey: integer (nullable = false)
 |-- AddressKey: integer (nullable = false)
 |-- ResellerAlternateKey: string (nullable = true)
 |-- ResellerName: string (nullable = true)
 |-- NumberEmployees: integer (nullable = true)
 |-- LastOrderMonth: integer (nullable = true)
 |-- FirstOrderYear: integer (nullable = true)
 |-- LastOrderYear: integer (nullable = true)
 |-- BusinessType: string (nullable = true)
 |-- ProductLine: string (nullable = true)
 |-- YearOpened: integer (nullable = true)

+-----------+----------+--------------------+--------------------+---------------+--------------+--------------+-------------+------------+-----------+----------+
|ResellerKey|AddressKey|ResellerAlternateKey|        ResellerName|NumberEmployees|LastOrderMonth|FirstOrderYear|LastOrderYear|BusinessType|ProductLine|YearOpened|
+-----------+----------+--------------------+--------------------+---------------+--------------+--------------+-------------+------------+-----------+----------+
| 

635

### DimDate


In [11]:
# Define schema for the DataFrame
schema = StructType(
    [
        StructField("DateKey", IntegerType(), False),
        StructField("DateValue", DateType(), False),
        StructField("DayOfWeek", StringType(), False),
        StructField("DayOfMonth", ByteType(), False),
        StructField("WeekOfYear", ByteType(), False),
        StructField("MonthName", StringType(), False),
        StructField("MonthOfYear", ByteType(), False),
        StructField("Quarter", ByteType(), False),
        StructField("Year", ShortType(), False),
    ]
)

# Create an empty DataFrame with the defined schema
DimDate = my_spark.createDataFrame([], schema)

# Generate dates between start date and end date
start_date = "2008-01-01"
end_date = "2014-12-31"
dates = my_spark.sql(
    "SELECT sequence(to_date('{}'), to_date('{}'), interval 1 day) as date".format(
        start_date, end_date
    )
).withColumn("DateValue", explode(col("date")))

# Add other columns to the DataFrame
DimDate = (
    dates.withColumn("DateKey", expr("date_format(DateValue, 'yyyyMMdd')").cast("int"))
    .withColumn("DayOfWeek", date_format(col("DateValue"), "EEEE"))
    .withColumn("DayOfMonth", dayofmonth(col("DateValue")))
    .withColumn("WeekOfYear", weekofyear(col("DateValue")))
    .withColumn("MonthName", date_format(col("DateValue"), "MMMM"))
    .withColumn("MonthOfYear", month(col("DateValue")))
    .withColumn("Quarter", quarter(col("DateValue")))
    .withColumn("Year", year(col("DateValue")))
)
DimDate = DimDate.drop("date")
DimDate = DimDate.select(
    "DateKey",
    "DateValue",
    "DayOfWeek",
    "DayOfMonth",
    "WeekOfYear",
    "MonthName",
    "MonthOfYear",
    "Quarter",
    "Year",
)
# Show the DataFrame
DimDate.printSchema()
DimDate.show()

root
 |-- DateKey: integer (nullable = true)
 |-- DateValue: date (nullable = false)
 |-- DayOfWeek: string (nullable = false)
 |-- DayOfMonth: integer (nullable = false)
 |-- WeekOfYear: integer (nullable = false)
 |-- MonthName: string (nullable = false)
 |-- MonthOfYear: integer (nullable = false)
 |-- Quarter: integer (nullable = false)
 |-- Year: integer (nullable = false)

+--------+----------+---------+----------+----------+---------+-----------+-------+----+
| DateKey| DateValue|DayOfWeek|DayOfMonth|WeekOfYear|MonthName|MonthOfYear|Quarter|Year|
+--------+----------+---------+----------+----------+---------+-----------+-------+----+
|20080101|2008-01-01|  Tuesday|         1|         1|  January|          1|      1|2008|
|20080102|2008-01-02|Wednesday|         2|         1|  January|          1|      1|2008|
|20080103|2008-01-03| Thursday|         3|         1|  January|          1|      1|2008|
|20080104|2008-01-04|   Friday|         4|         1|  January|          1|      1|2

### FactResellerSales


In [12]:
order_detail_df = order_detail_df.withColumn(
    "OrderQty",
    when(col("OrderQty") < 0, col("OrderQty") * (-1)).otherwise(col("OrderQty")),
).withColumn(
    "UnitPrice",
    when(col("UnitPrice") < 0, col("UnitPrice") * (-1)).otherwise(col("UnitPrice")),
)

FactResellerSales = (
    order_header_df.alias("soh")
    .join(
        order_detail_df.alias("sod"),
        F.col("sod.SalesOrderID") == F.col("soh.SalesOrderID"),
    )
    .join(product_df.alias("p"), F.col("sod.ProductID") == F.col("p.ProductID"))
    .join(
        DimProduct.alias("dp"),
        F.col("dp.ProductAlternateKey") == F.col("p.ProductNumber"),
    )
    .join(customer_df.alias("c"), F.col("c.CustomerID") == F.col("soh.CustomerID"))
    .join(
        DimDiscount.alias("dd"), F.col("dd.DiscountKey") == F.col("sod.SpecialOfferID")
    )
    .join(
        DimSalesTerritory.alias("dst"),
        F.col("dst.SalesTerritoryKey") == F.col("soh.TerritoryID"),
    )
    .join(
        DimReseller.alias("dre"),
        F.col("dre.ResellerAlternateKey") == F.col("c.AccountNumber"),
    )
    .join(
        employee_df.alias("e"),
        F.col("soh.SalesPersonID") == F.col("e.BusinessEntityID"),
    )
    .join(
        DimEmployee.alias("de"),
        F.col("de.EmployeeNumber") == F.col("e.NationalIDNumber"),
    )
    .join(
        DimDate.alias("order_dt"),
        F.expr("order_dt.DateValue = CAST(soh.OrderDate AS DATE)"),
    )
    .join(
        DimDate.alias("due_dt"),
        F.expr("due_dt.DateValue = CAST(soh.DueDate AS DATE)"),
    )
    .join(
        DimDate.alias("ship_dt"),
        F.expr("ship_dt.DateValue = CAST(soh.ShipDate AS DATE)"),
    )
    .select(
        F.col("dp.ProductKey").alias("ProductKey"),
        F.col("order_dt.DateKey").alias("OrderDateKey"),
        F.col("due_dt.DateKey").alias("DueDateKey"),
        F.col("ship_dt.DateKey").alias("ShipDateKey"),
        F.col("dre.ResellerKey").alias("ResellerKey"),
        F.col("de.EmployeeKey").alias("EmployeeKey"),
        F.col("dd.DiscountKey").alias("DiscountKey"),
        F.col("dst.SalesTerritoryKey").alias("SalesTerritoryKey"),
        F.col("soh.SalesOrderNumber").alias("SalesOrderNumber"),
        (
            F.col("sod.SalesOrderDetailID")
            - F.min("sod.SalesOrderDetailID").over(
                Window.partitionBy("sod.SalesOrderID")
            )
            + 1
        ).alias("SalesOrderLineNumber"),
        F.col("sod.OrderQty").alias("OrderQuantity"),
        F.col("sod.UnitPrice").alias("UnitPrice"),
        F.round(F.col("sod.OrderQty") * F.col("sod.UnitPrice"), 4).alias(
            "ExtendedAmount"
        ),
        F.col("sod.UnitPriceDiscount").alias("UnitPriceDiscountPct"),
        F.round(
            F.col("sod.OrderQty")
            * F.col("sod.UnitPrice")
            * F.col("sod.UnitPriceDiscount"),
            4,
        ).alias("DiscountAmount"),
        F.round(
            F.col("sod.OrderQty")
            * F.col("sod.UnitPrice")
            * (1 - F.col("sod.UnitPriceDiscount")),
            4,
        ).alias("SalesAmount"),
        F.col("soh.OrderDate").alias("OrderDate"),
        F.col("soh.DueDate").alias("DueDate"),
        F.col("soh.ShipDate").alias("ShipDate"),
    )
    .filter(
        (F.col("soh.Status") == 5)
        & (F.col("sod.OrderQty") != 0)
        & (F.col("sod.UnitPrice") != 0)
    )
    .orderBy("OrderDateKey", "SalesOrderNumber")
)

# Show the result
FactResellerSales.printSchema()
FactResellerSales.show(5)

root
 |-- ProductKey: integer (nullable = true)
 |-- OrderDateKey: integer (nullable = true)
 |-- DueDateKey: integer (nullable = true)
 |-- ShipDateKey: integer (nullable = true)
 |-- ResellerKey: integer (nullable = false)
 |-- EmployeeKey: integer (nullable = true)
 |-- DiscountKey: integer (nullable = true)
 |-- SalesTerritoryKey: integer (nullable = true)
 |-- SalesOrderNumber: string (nullable = true)
 |-- SalesOrderLineNumber: integer (nullable = true)
 |-- OrderQuantity: integer (nullable = true)
 |-- UnitPrice: decimal(21,4) (nullable = true)
 |-- ExtendedAmount: decimal(33,4) (nullable = true)
 |-- UnitPriceDiscountPct: decimal(19,4) (nullable = true)
 |-- DiscountAmount: decimal(37,4) (nullable = true)
 |-- SalesAmount: decimal(37,4) (nullable = true)
 |-- OrderDate: timestamp (nullable = true)
 |-- DueDate: timestamp (nullable = true)
 |-- ShipDate: timestamp (nullable = true)

+----------+------------+----------+-----------+-----------+-----------+-----------+-------------

# FactClassifiedReseller


In [13]:
windowSpec = Window.partitionBy("ResellerKey").orderBy("Year", "Month")

temp_df = FactResellerSales.alias("frs").select(
    col("frs.ResellerKey"),
    col("frs.OrderDateKey"),
    col("frs.OrderDate"),
    col("frs.SalesAmount"),
)

all_combinations_df = (
    DimReseller.alias("dre")
    .crossJoin(DimDate.alias("dd"))
    .select(
        col("dre.ResellerKey"),
        col("dd.DateKey").alias("OrderDateKey"),
        col("dre.NumberEmployees"),
        col("dd.MonthOfYear"),
        col("dd.Year"),
    )
)

final_result_df = (
    all_combinations_df.join(
        temp_df,
        [
            all_combinations_df.ResellerKey == temp_df.ResellerKey,
            all_combinations_df.OrderDateKey == temp_df.OrderDateKey,
        ],
        "left_outer",
    )
    .select(
        all_combinations_df.ResellerKey,
        all_combinations_df.NumberEmployees,
        all_combinations_df.Year,
        all_combinations_df.MonthOfYear.alias("Month"),
        F.coalesce(temp_df.SalesAmount, F.lit(0)).alias("SalesAmount"),
    )
    .filter(all_combinations_df.Year > 2010)
)

result_df = (
    final_result_df.alias("frd")
    .groupBy(
        col("frd.ResellerKey"),
        col("frd.NumberEmployees"),
        col("frd.Year"),
        col("frd.Month"),
    )
    .agg(sum(col("frd.SalesAmount")).alias("TotalSalesAmount"))
    .orderBy("ResellerKey", "Year", "Month")
)

# Calculate the sum of sales for 1 month ago
result_df = result_df.withColumn(
    "SalesLastMonth", lag("TotalSalesAmount").over(windowSpec)
)

# Calculate the sum of sales for 2 months ago
result_df = result_df.withColumn(
    "SalesOf2MonthsAgo", lag("TotalSalesAmount", 2).over(windowSpec)
)
result_df = result_df.na.fill(0)
result_df = result_df.filter(result_df.TotalSalesAmount != 0)
result_df = result_df.withColumn(
    "TotalSalesAmount", col("TotalSalesAmount").cast("double")
)
result_df = result_df.withColumn("SalesLastMonth", col("SalesLastMonth").cast("double"))
result_df = result_df.withColumn(
    "SalesOf2MonthsAgo", col("SalesOf2MonthsAgo").cast("double")
)

result_df = result_df.withColumn(
    "Label",
    when(
        col("NumberEmployees") >= 20,
        when(
            (
                (col("TotalSalesAmount") >= 5231)
                | (col("SalesLastMonth") > 4000)
                | (col("SalesOf2MonthsAgo") > 2020)
            ),
            when(((col("Month") >= 8) | (col("Month") < 2)), lit("VIP")).otherwise(
                when(
                    ((col("Month") >= 2) & (col("Month") < 8)), lit("NORMAL")
                ).otherwise(lit("NORMAL"))
            ),
        ).otherwise(lit("NORMAL")),
    ).otherwise(
        when(
            (
                (col("TotalSalesAmount") >= 2422.5)
                | (col("SalesLastMonth") > 2500)
                | (col("SalesOf2MonthsAgo") > 0)
            ),
            when(((col("Month") >= 8) | (col("Month") < 2)), lit("VIP")).otherwise(
                when(((col("Month") >= 2) & (col("Month") < 8)), lit("VIP")).otherwise(lit("NORMAL"))
            ),
        ).otherwise(lit("NORMAL"))
    ),
)

result_df.printSchema()
result_df.show()
# output_csv_path = "./result.csv"
# result_df.write.csv(output_csv_path, header=True, mode="overwrite")

root
 |-- ResellerKey: integer (nullable = false)
 |-- NumberEmployees: integer (nullable = true)
 |-- Year: integer (nullable = false)
 |-- Month: integer (nullable = false)
 |-- TotalSalesAmount: double (nullable = true)
 |-- SalesLastMonth: double (nullable = true)
 |-- SalesOf2MonthsAgo: double (nullable = true)
 |-- Label: string (nullable = false)

+-----------+---------------+----+-----+----------------+--------------+-----------------+------+
|ResellerKey|NumberEmployees|Year|Month|TotalSalesAmount|SalesLastMonth|SalesOf2MonthsAgo| Label|
+-----------+---------------+----+-----+----------------+--------------+-----------------+------+
|          1|             13|2011|    8|        4049.988|           0.0|              0.0|   VIP|
|          1|             13|2012|    1|        4079.988|           0.0|              0.0|   VIP|
|          1|             13|2012|    4|       1104.9968|           0.0|              0.0|NORMAL|
|          1|             13|2012|    7|      27429.529

# Load


In [25]:
def get_dw_tbl_names(spark, db):
    try:
        table_names = []
        dfs = load_jdbc(spark, db)
        data_collect = dfs.collect()
        for row in data_collect:
            table_names.append(row["full_table_name"])
        return table_names
    except Exception as e:
        print("Data extract error: " + str(e))
        return []

def load_to_dw(spark, db):
    try:
        tbl_names = get_dw_tbl_names(spark, db)
        # tbl_names = tbl_names[0 : len(tbl_names) - 1]

        dataframes_list = [
            DimProductCategory,
            DimProductSubcategory,
            DimProduct,
            DimDate,
            DimDiscount,
            DimSalesTerritory,
            DimAddress,
            DimReseller,
            DimEmployee,
            FactResellerSales,
            result_df,
        ]

        dataframe_table_mapping = dict(zip(dataframes_list, tbl_names))

        for dataframe, table_name in dataframe_table_mapping.items():
            # Save the DataFrame to the corresponding table
            dataframe.write.mode("append").format("jdbc").option(
                "url",
                f"jdbc:sqlserver://localhost:1433;databaseName={db};integratedSecurity=true;encrypt=true;trustServerCertificate=true;authenticationScheme=nativeAuthentication",
            ).option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver").option(
                "user", "INSPIRON7620\dpl2k"
            ).option(
                "password", ""
            ).option(
                "dbtable", table_name
            ).save()

            print(f"Data loaded successfully to {table_name}")
    except Exception as e:
        print("Data load error: " + str(e))


load_to_dw(my_spark, "CompanyXDW")

Data loaded successfully to dbo.DimProductCategory
Data loaded successfully to dbo.DimProductSubcategory
Data loaded successfully to dbo.DimProduct
Data loaded successfully to dbo.DimDate
Data loaded successfully to dbo.DimDiscount
Data loaded successfully to dbo.DimSalesTerritory
Data loaded successfully to dbo.DimAddress
Data loaded successfully to dbo.DimReseller
Data loaded successfully to dbo.DimEmployee
Data loaded successfully to dbo.FactResellerSales
Data loaded successfully to dbo.FactClassifiedReseller
