In [0]:
from pyspark.sql import functions as F


# Create Gold area
# spark.sql("CREATE SCHEMA IF NOT EXISTS globalsuperstore.silver")

# Create Volume
# spark.sql("CREATE VOLUME IF NOT EXISTS globalsuperstore.silver.silver_superstore")


# Bronze paths (your existing)
bronze_base = "dbfs:/Volumes/globalsuperstore/bronze/bronze_superstore"
orders_bronze  = spark.read.format("delta").load("dbfs:/Volumes/globalsuperstore/bronze/bronze_superstore/orders_bronze_cm/")
returns_bronze = spark.read.format("delta").load("dbfs:/Volumes/globalsuperstore/bronze/bronze_superstore/returns_bronze_cm/")

silver_base = "dbfs:/Volumes/globalsuperstore/silver/silver_superstore"


# Load 
orders_bz  = spark.read.format("delta").load("dbfs:/Volumes/globalsuperstore/bronze/bronze_superstore/orders_bronze_cm/")
returns_bz = spark.read.format("delta").load("dbfs:/Volumes/globalsuperstore/bronze/bronze_superstore/returns_bronze_cm/")


## Defining the source schema 
# Defining the folder name for delta files 

silver_base_path = "dbfs:/Volumes/globalsuperstore/silver/silver_superstore"
orders_path  = f"{silver_base}/silver_order"
returns_path = f"{silver_base}/silver_return"
customer_path = f"{silver_base}/silver_customer"
product_path = f"{silver_base}/silver_product"
geography_path = f"{silver_base}/silver_geography"
date_path = f"{silver_base}/silver_date"

orders_bz.limit(2).display()
returns_bz.limit(2).display()

[0;31m---------------------------------------------------------------------------[0m
[0;31m_InactiveRpcError[0m                         Traceback (most recent call last)
File [0;32m/databricks/python/lib/python3.11/site-packages/pyspark/sql/connect/client/core.py:1541[0m, in [0;36mSparkConnectClient._analyze[0;34m(self, method, **kwargs)[0m
[1;32m   1540[0m [38;5;28;01mwith[39;00m attempt:
[0;32m-> 1541[0m     resp [38;5;241m=[39m [38;5;28mself[39m[38;5;241m.[39m_stub[38;5;241m.[39mAnalyzePlan(req, metadata[38;5;241m=[39m[38;5;28mself[39m[38;5;241m.[39mmetadata())
[1;32m   1542[0m     [38;5;28mself[39m[38;5;241m.[39m_verify_response_integrity(resp)

File [0;32m/databricks/python/lib/python3.11/site-packages/grpc/_interceptor.py:277[0m, in [0;36m_UnaryUnaryMultiCallable.__call__[0;34m(self, request, timeout, metadata, credentials, wait_for_ready, compression)[0m
[1;32m    268[0m [38;5;28;01mdef[39;00m [38;5;21m__call__[39m(
[1;32m    269[0

In [0]:


## customer dim
# Transformation

dim_customer = (orders_bz
  .select(
    F.col("Customer ID").alias("Customer_ID"),
    F.col("Customer Name").alias("Customer_Name"),
    "Segment"
  )
  .dropDuplicates()
  .withColumn(
    "customer_sk",
    F.abs(F.xxhash64(F.concat(F.col("Customer_ID"), F.col("Customer_Name"), F.col("Segment")))).cast("long")
  )
)

## Writing the Delta files in silver schema
# Writing order table

(dim_customer.write.format("delta")
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .option("delta.columnMapping.mode", "name")
  .option("delta.minReaderVersion", "2")
  .option("delta.minWriterVersion", "5")
  .option("maxRetries", "10")  # Increase the retry limit
  .save(customer_path))

In [0]:


## Product dim
# Transformation

dim_product = (orders_bz
                    .select(F.col("Product Name").alias("Product_Name"), F.col("Product ID").alias("Product_ID"), 'Category', 'Sub-Category')
                    .withColumn('product_sk', F.abs(F.xxhash64(F.concat(F.col("Product_Name"),  F.col("Product_ID"), F.col("Category"), F.col("Sub-Category") ) ) ) .cast("long") )
                    .dropDuplicates()
)

## Writing the Delta files in silver schema
# Writing product table

(dim_product.write.format("delta")
  .mode("overwrite")
  .option("overwriteSchema","true")
  .option("delta.columnMapping.mode","name")
  .option("delta.minReaderVersion","2")
  .option("delta.minWriterVersion","5")
  .save(product_path))



In [0]:


  ## Geography dim
# Transformation

dim_geography = (orders_bz.
                        select(
                            "City","State","Country","Region",F.col("Postal Code").alias("Zip_Code")
                            )
                        
                        .withColumn("geography_sk", F.abs(F.xxhash64("City","State","Country","Region","Zip_Code")).cast("long"))
                        .dropDuplicates()
                        

)

# Writing the Delta files in silver schema
# Writing product table

(dim_geography.write.format("delta").
 mode("overwrite").
 option("overwriteSchema","true").
 option("delta.columnMapping.mode","name").
 option("delta.minReaderVersion","2").
 option("delta.minWriterVersion","5").
 save(geography_path))


In [0]:


## Return Dim
# Transformation

dim_returns = (returns_bz
                        .select(F.col("Order ID").alias("Order_ID"),"Market","Returned")
                        .dropDuplicates()
                               )


# Writing the Delta files in silver schema
# Writing return table


(dim_returns.write.format("delta").
 mode("overwrite").
 option("overwriteSchema","true").
 option("delta.columnMapping.mode","name").
 option("delta.minReaderVersion","2").
 option("delta.minWriterVersion","5").
 save(returns_path))



In [0]:

## Order Fact
# Transformation

fact_orders = (orders_bz
               .select(
                    F.col("Row ID").alias("Row_ID"),
                    F.col("Order ID").alias("Order_ID"),
                    F.col("Order Date").alias("Order_Date"),
                    F.col("Ship Date").alias("Ship_Date"),
                    F.col("Ship Mode").alias("Ship_Mode"),
                    "Sales", "Quantity", "Discount", "Profit", 
                    F.col("Shipping Cost").alias("Shipping_Cost"),
                    F.col("Order Priority").alias("Order_Priority"),
                    "City", "State", "Country", "Region", "Product Name", "Product ID", "Category", "Sub-Category", "Customer ID", "Customer Name", "Segment","Postal Code")
                   
               .withColumn("geography_sk", F.abs(F.xxhash64("City","State","Country","Region","Postal Code")).cast("long"))
               .withColumn('product_sk', F.abs(F.xxhash64(F.concat(F.col("Product Name"), F.col("Product ID"), F.col("Category"), F.col("Sub-Category")))).cast("long"))
               .withColumn("customer_sk", F.abs(F.xxhash64(F.concat(F.col("Customer ID"), F.col("Customer Name"), F.col("Segment")))).cast("long"))
               .drop( "City", "State", "Country", "Region", "Product Name", "Product ID", "Category", "Sub-Category", "Customer ID", "Customer Name", "Segment")
                .dropDuplicates()
              )

# Writing the Delta files in silver schema
# Writing order table

(fact_orders.write.format("delta")
  .mode("overwrite")
  .option("overwriteSchema","true")
  .option("delta.columnMapping.mode","name")
  .option("delta.minReaderVersion","2")
  .option("delta.minWriterVersion","5")
  .save(orders_path) 
 )



In [0]:

 # Define the path of silver order
orders_path = "dbfs:/Volumes/globalsuperstore/silver/silver_superstore/silver_order/"

# Read the Delta file
orders_df = spark.read.format("delta").load(orders_path)

# Set variable of minimum and maximum order dates
Date_range = (orders_df
              .select(F.min("Order_Date").alias("start"),
                      F.max("Order_Date").alias("end"))
              .first())

# Extract start and end dates
start = Date_range['start']
end = Date_range['end']

# Create sequence in the column with start date and end date of 1 day interval
date_df = (spark.createDataFrame([(start, end)], ["start","end"])
           .select(F.explode(F.sequence(F.col("start"), F.col("end"), F.expr("interval 1 day"))).alias("date")))

# Define necessary columns
dow_iso = F.date_format(F.col("date"), "E").cast("int")
fy_start_year = F.when(F.month(F.col("date")) >= 4, F.year(F.col("date"))).otherwise(F.year(F.col("date")) - 1)
fiscal_month_number = F.expr("((month(date) + 8) % 12) + 1")
fiscal_quarter = F.expr("((month(date) + 2) div 3) % 4 + 1")

# Create calendar columns on the basis of date column
dim_date = (date_df
  # keys
  .withColumn("date_key", F.date_format("date","yyyyMMdd").cast("int"))

  # calendar
  .withColumn("year", F.year("date"))
  .withColumn("quarter", F.quarter("date"))
  .withColumn("month", F.month("date"))
  .withColumn("day",  F.dayofmonth("date"))
  .withColumn("weekofyear",F.weekofyear("date"))
  .withColumn("day_name",F.date_format("date","EE"))
  .withColumn("month_name", F.date_format("date","MMM"))
  .withColumn("month_name_full", F.date_format("date","MMMM"))
 
)


# Writing the Delta files in silver schema
# Writing order table

(dim_date.write.format("delta")
  .mode("overwrite")
  .option("overwriteSchema","true")
  .option("delta.columnMapping.mode","name")
  .option("delta.minReaderVersion","2")
  .option("delta.minWriterVersion","5")
  .save(date_path) 
 )


In [0]:
# %python
# # Display the first 5 rows of orders_bz
# display(orders_bz.limit(5))

# # Display the first 5 rows of returns_bz
# display(returns_bz.limit(5))

# # Display the first 5 rows of dim_customer
# display(dim_customer.limit(5))

# # Display the first 5 rows of dim_product
# display(dim_product.limit(5))

# # Display the first 5 rows of dim_geography
# display(dim_geography.limit(5))

# # Display the first 5 rows of dim_returns
# display(dim_returns.limit(5))

# # Display the first 5 rows of fact_orders
# display(fact_orders.limit(5))


# display(dim_date.limit(5))


In [0]:
# spark.read.format("delta").load("dbfs:/Volumes/globalsuperstore/silver/silver_superstore/silver_customer/").limit(5).display()
# spark.read.format("delta").load("dbfs:/Volumes/globalsuperstore/silver/silver_superstore/silver_geography/").limit(5).display()
# spark.read.format("delta").load("dbfs:/Volumes/globalsuperstore/silver/silver_superstore/silver_product/").limit(5).display()
# spark.read.format("delta").load("dbfs:/Volumes/globalsuperstore/silver/silver_superstore/silver_return/").limit(5).display()
# spark.read.format("delta").load("dbfs:/Volumes/globalsuperstore/silver/silver_superstore/silver_order/").limit(5).display()
# spark.read.format("delta").load("dbfs:/Volumes/globalsuperstore/silver/silver_superstore/silver_date/").limit(5).display()


In [0]:
# Define the paths
customer_path = "dbfs:/Volumes/globalsuperstore/silver/silver_superstore/silver_customer/"
geography_path = "dbfs:/Volumes/globalsuperstore/silver/silver_superstore/silver_geography/"
product_path = "dbfs:/Volumes/globalsuperstore/silver/silver_superstore/silver_product/"
return_path = "dbfs:/Volumes/globalsuperstore/silver/silver_superstore/silver_return/"
order_path = "dbfs:/Volumes/globalsuperstore/silver/silver_superstore/silver_order/"
date_path = "dbfs:/Volumes/globalsuperstore/silver/silver_superstore/silver_date/"

# Create schema if not exists
# spark.sql("CREATE SCHEMA IF NOT EXISTS globalsuperstore.silver_superstore_views")

# Create permanent views in the specified schema
spark.sql(f"CREATE OR REPLACE VIEW globalsuperstore.silver_superstore_views.silver_customer AS SELECT * FROM delta.`{customer_path}`")
spark.sql(f"CREATE OR REPLACE VIEW globalsuperstore.silver_superstore_views.silver_geography AS SELECT DISTINCT city, state, country, region, geography_sk FROM delta.`{geography_path}`")
spark.sql(f"CREATE OR REPLACE VIEW globalsuperstore.silver_superstore_views.silver_product AS SELECT * FROM delta.`{product_path}`")
spark.sql(f"CREATE OR REPLACE VIEW globalsuperstore.silver_superstore_views.silver_return AS SELECT * FROM delta.`{return_path}`")
spark.sql(f"CREATE OR REPLACE VIEW globalsuperstore.silver_superstore_views.silver_order AS SELECT * FROM delta.`{order_path}`")
spark.sql(f"CREATE OR REPLACE VIEW globalsuperstore.silver_superstore_views.silver_date AS SELECT * FROM delta.`{date_path}`")


DataFrame[]