## Purpose of the notebook

1. Retrieve Parquet files from the silver layer.
2. Perform a join operation on both files.
3. Save the resulting dataset to the gold layer and a Lake Database.

In [None]:
from pyspark.sql.functions import col

### Parameters

In [None]:
# Parameters
# abfs[s]://file_system_name@account_name.dfs.core.windows.net/file_path
silver_folder_path = "file_system_name@account_name.dfs.core.windows.net/silver"

### 1. Retrieve Parquet files from the silver layer.

In [None]:
orders_df = spark.read.parquet(f"{silver_folder_path}/orders") 
returns_df = spark.read.parquet(f"{silver_folder_path}/returns") 

In [None]:
# Preview dfs
#orders_df.show(10, truncate=False)
display(orders_df.limit(10))
display(returns_df.limit(10))

In [21]:
# Count rows
orders_count = orders_df.count()
returns_count = returns_df.count()
print(f"orders_df contains: {orders_count} rows")
print(f"returns_df contains: {returns_count} rows")

StatementMeta(sparkPool, 7, 22, Finished, Available)

orders_df contains: 9994 rows
returns_df contains: 296 rows


### 2. Perform a join operation on both files.


In [None]:
# Join orders_df and returns_df
join_df = orders_df.join(returns_df, "order_id", "left")

# Replace null with "no" in returned column
join_df = join_df.na.fill('no', subset=['returned'])
display(join_df.limit(10))

#### Validation after joining

In [24]:
# Validate join_df number of rows is equal to orders_count
join_count_df = join_df.count()
print(f"join_df contains: {join_count_df} rows")
print(f"Number of rows match:", join_count_df == orders_count)

StatementMeta(sparkPool, 7, 25, Finished, Available)

join_df contains: 9994 rows
Number of rows match: True


In [25]:
# Group by returned column and count rows
join_df.groupBy('returned').count().show()

StatementMeta(sparkPool, 7, 26, Finished, Available)

+--------+-----+
|returned|count|
+--------+-----+
|     Yes|  800|
|      no| 9194|
+--------+-----+



In [26]:
# Filter returned status is "Yes"
returned_yes_df = join_df.filter(join_df['returned'] == 'Yes')

# Get distinct order_id
distinct_returned_yes_order_id = returned_yes_df.select('order_id').distinct()

# Number of distinct order_id in distinct_returned_yes_order_id
distinct_returned_yes_order_id_count = distinct_returned_yes_order_id.count()
print("Distinct number of returned order_id after joining:", distinct_returned_yes_order_id_count)

# Number of distinct order_id in returns_df
returns_order_id_count = returns_df.count()
print("Distinct number of returned order_id in returns_df:", returns_order_id_count)

# Compare distinct_returned_yes_order_id_count and returns_order_id_count
print("Distinct number of returned order_id matach:", distinct_returned_yes_order_id_count == returns_order_id_count)

StatementMeta(sparkPool, 7, 27, Finished, Available)

Distinct number of returned order_id after joining: 296
Distinct number of returned order_id in returns_df: 296
Distinct number of returned order_id matach: True


In [27]:
# Check schema before exporting result
print("Schema before casting:")
join_df.printSchema()

join_df = join_df.withColumn('profit', col('profit').cast('double'))
join_df = join_df.withColumn('sales', col('sales').cast('double'))

print("Schema after casting:")
join_df.printSchema()

StatementMeta(sparkPool, 7, 28, Finished, Available)

Schema before casting:
root
 |-- order_id: string (nullable = true)
 |-- row_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- segment: string (nullable = true)
 |-- country: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- region: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- category: string (nullable = true)
 |-- sub_category: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- sales: string (nullable = true)
 |-- quantity: string (nullable = true)
 |-- discount: string (nullable = true)
 |-- profit: string (nullable = true)
 |-- returned: string (nullable = false)

Schema after casting:
root
 |-- order_id: string (nullable = true)
 |-- row_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- segment: string (nullable = true)
 |-- country: stri

### 3. Save the resulting dataset to the gold layer and a Lake Database.

In [None]:
%%sql
CREATE DATABASE IF NOT EXISTS superstore_spark
LOCATION "abfss://file_system_name@account_name.dfs.core.windows.net/gold"

In [None]:
# Save the result to the gol layer and table
join_df.write.mode("overwrite").format("parquet").saveAsTable("superstore_spark.orders_records")