In [0]:
# Bronze-layer Parquet inputs.
customer_path = "/mnt/businesscase/bronze/CustomerParquet"
sales_order_header_path = "/mnt/businesscase/bronze/SalesOrderHeaderParquet"
sales_order_detail_path = "/mnt/businesscase/bronze/SalesOrderDetailParquet"

customer_df = spark.read.parquet(customer_path)
sales_order_header_df = spark.read.parquet(sales_order_header_path)
sales_order_detail_df = spark.read.parquet(sales_order_detail_path)

# Copy the header-level 'CustomerID' and 'ShipToAddressID' onto every order line.
# Both columns come from the same header row, so one join against the header is enough.
# The join is inner: detail rows whose SalesOrderID has no header row are dropped.
# Resulting column order: SalesOrderID, <detail columns>, CustomerID, ShipToAddressID.
# NOTE(review): assumes SalesOrderID is unique in the header table (otherwise order
# lines would be duplicated) — confirm.
order_header_attributes_df = sales_order_header_df.select("SalesOrderID", "CustomerID", "ShipToAddressID")
sales_order_detail_df = sales_order_detail_df.join(order_header_attributes_df, "SalesOrderID", "inner")

In [0]:
from pyspark.sql.functions import regexp_replace, col
from pyspark.sql.types import DoubleType

def comma_decimal_to_double(column_name: str):
    """Convert a string column that uses ',' as decimal separator to DoubleType.

    Every ',' is replaced with '.' before the cast, e.g. '0,40' -> 0.4.
    NOTE(review): assumes values carry no thousands separators (e.g. '1.234,56'
    would not parse) — confirm against the source data.

    Args:
        column_name: Name of the column to convert.

    Returns:
        A pyspark Column expression of DoubleType.
    """
    return regexp_replace(col(column_name), ",", ".").cast(DoubleType())


product_path = "/mnt/businesscase/bronze/ProductParquet"

# Bronze product table; StandardCost and ListPrice are presumably comma-decimal strings.
product_df = spark.read.parquet(product_path)
product_df = (
    product_df
    .withColumn("StandardCost", comma_decimal_to_double("StandardCost"))
    .withColumn("ListPrice", comma_decimal_to_double("ListPrice"))
)

# UnitPriceDiscount is assumed to hold comma-decimal strings such as '0,40'.
sales_order_detail_df = sales_order_detail_df.withColumn(
    "UnitPriceDiscount", comma_decimal_to_double("UnitPriceDiscount")
)

# Persist the enriched order lines to the silver layer as a single partition (one part file).
# A separate variable keeps the bronze input path name meaning "bronze".
sales_order_detail_silver_path = "/mnt/businesscase/silver/SalesOrderDetailParquet"
sales_order_detail_df.coalesce(1).write.parquet(sales_order_detail_silver_path, mode="overwrite")



In [0]:

address_path = "/mnt/businesscase/bronze/AddressParquet"
customer_address_path = "/mnt/businesscase/bronze/CustomerAddressParquet"

address_df = spark.read.parquet(address_path)
customer_address_df = spark.read.parquet(customer_address_path)

# AddressIDs that occur on more than one CustomerAddress row. Any such ID makes the
# address -> customer join below emit several rows for the same address.
duplicate_address_ids = customer_address_df.groupBy("AddressID").count().filter(col("count") > 1)

# Spark DataFrames are lazy: without an action the check above never executes.
# Count the offenders and report them instead of silently discarding the result.
duplicate_address_count = duplicate_address_ids.count()
if duplicate_address_count > 0:
    print(
        f"WARNING: {duplicate_address_count} AddressID value(s) occur more than once "
        "in CustomerAddress; the address/customer join will duplicate those addresses."
    )



In [0]:
# Attach the owning CustomerID to each address through the CustomerAddress bridge table.
# Left join: addresses without a CustomerAddress row are kept with a null CustomerID.
# Only the key and CustomerID are taken from the bridge table, so its other columns
# cannot clash with address_df's columns.
address_with_customer_id = address_df.join(
    customer_address_df.select("AddressID", "CustomerID"), "AddressID", "left"
)

# Keep the key, the owning customer and the geographic attributes.
# The result gets its own name (address_df stays the bronze frame), so re-running this
# cell never joins CustomerID onto a frame that already carries it.
address_silver_df = address_with_customer_id.select(
    "AddressID", "CustomerID", "City", "StateProvince", "CountryRegion"
)

address_silver_path = "/mnt/businesscase/silver/AddressParquet"
address_silver_df.coalesce(1).write.parquet(address_silver_path, mode="overwrite")

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql.functions import from_json

productcategory_path = "/mnt/businesscase/bronze/ProductCategoryParquet"
productsubcategory_path = "/mnt/businesscase/bronze/ProductSubCategoryParquet"

productcategory_df = spark.read.parquet(productcategory_path)
productsubcategory_df = spark.read.parquet(productsubcategory_path)

# Present the category table as "parent" categories: its key becomes the
# subcategories' ParentProductCategoryID and its Name becomes ParentCategoryName.
parent_category_df = (
    productcategory_df
    .withColumnRenamed("ProductCategoryID", "ParentProductCategoryID")
    .withColumnRenamed("Name", "ParentCategoryName")
)

# One row per subcategory, enriched with its parent category (nulls when no parent matches).
category_hierarchy_df = productsubcategory_df.join(parent_category_df, "ParentProductCategoryID", "left")

# Attach the category hierarchy to each product via ProductCategoryID.
# The result gets its own name so re-running this cell does not join the hierarchy twice.
# NOTE(review): any non-key column present in both frames (e.g. a 'Name' column, if the
# subcategory table has one) will appear twice and make the Parquet write fail — confirm schemas.
product_silver_df = product_df.join(category_hierarchy_df, "ProductCategoryID", "left")

product_path = "/mnt/businesscase/silver/ProductParquet"
product_silver_df.coalesce(1).write.parquet(product_path, mode="overwrite")


In [0]:
# Persist the customer table (read from bronze, not transformed here) to the silver layer
# as a single partition.
customer_silver_path = "/mnt/businesscase/silver/CustomerParquet"
customer_df.coalesce(1).write.parquet(customer_silver_path, mode="overwrite")