# Enterprise Fleet Analytics Pipeline
A pipeline that focuses on the business outcome (analytics) and the domain (fleet/logistics).

![logistics](logistics_project.png)

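Download the data from the Google Drive folder below and upload it into the catalog volume used throughout this notebook (`/Volumes/logistics_project/default/logistics/`):
https://drive.google.com/drive/folders/1J3AVJIPLP7CzT15yJIpSiWXshu1iLXKn?usp=drive_link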

## **1. Data Munging**

#### 1. Visually/Manually Open the Files and Capture a Couple of Data Patterns (Manual Exploratory Data Analysis)

In [0]:
rawdf1 = spark.read.csv("/Volumes/logistics_project/default/logistics/logistics_source1") 
rawdf2 = spark.read.csv("/Volumes/logistics_project/default/logistics/logistics_source2")
rawdf1.show(2)
rawdf2.show(2)

#### 2. Programmatically Find a Couple of Data Patterns Using the EDA Steps Below (File: logistics_source1)
1. Apply inferSchema and toDF to create a DF and analyse the actual data.
2. Analyse the schema, data types, columns, etc.
3. Analyse the duplicate record count and the summary of the DataFrame.

In [0]:
#1.Apply inferSchema and toDF to create a DF and analyse the actual data.
rawdf1 = spark.read.csv("/Volumes/logistics_project/default/logistics/logistics_source1", inferSchema=True, header=True)
rawdf2 = spark.read.csv("/Volumes/logistics_project/default/logistics/logistics_source2", inferSchema=True, header=True)

df1 = rawdf1.toDF("shipment_id", "first_name", "last_name", "age", "role")
df1.show(2)
df2 = rawdf2.toDF("shipment_id", "first_name", "last_name", "age", "role", "hub_location", "vehicle_type")
df2.show(2)

#2.Analyse the schema, datatypes, columns etc.,
df1.printSchema()

#3.Analyse the duplicate records count and summary of the dataframe.
total_count = df1.count()
distinct_count = df1.distinct().count()

print("Total records    :", total_count)
print("Distinct records :", distinct_count)
print("Duplicate records:", total_count - distinct_count)
df1.summary().show()

### a. Passive Data Munging (Files: logistics_source1 and logistics_source2)
Without modifying the data, identify:<br>
Shipment IDs that appear in both master_v1 and master_v2 (logistics_source1 and logistics_source2)<br>
Records where:<br>
1. shipment_id is non-numeric
2. age is not an integer

Count rows having:

3. fewer columns than expected
4. more columns than expected

In [0]:
#Shipment IDs appearing in both master_v1 and master_v2
from pyspark.sql.functions import col
common_shipments = (df1.select(col("shipment_id").cast("string")).intersect(df2.select(col("shipment_id").cast("string"))))
#display(common_shipments)

#1.shipment_id is non-numeric (negate the "all digits" pattern so only the offending rows remain)
non_numeric_shipment_df1 = df1.filter(~col("shipment_id").cast("string").rlike("^[0-9]+$"))
non_numeric_shipment_df2 = df2.filter(~col("shipment_id").cast("string").rlike("^[0-9]+$"))
#display(non_numeric_shipment_df1)
#display(non_numeric_shipment_df2)

#2.age is not an integer (same negated pattern applied to age)
non_integer_age_df1 = df1.filter(~col("age").cast("string").rlike("^[0-9]+$"))
non_integer_age_df2 = df2.filter(~col("age").cast("string").rlike("^[0-9]+$"))
display(non_integer_age_df1)
display(non_integer_age_df2)

#3.fewer columns than expected 
#4. more columns than expected

print(df1.columns)
print(df2.columns)

Expected_cols_df1 =5
Expected_cols_df2 =7

from pyspark.sql.functions import size, split,lit
# Read raw text lines to analyze column structure (a plain split on "," over-counts fields whose quoted values contain commas)
raw1_text = spark.read.text("/Volumes/logistics_project/default/logistics/logistics_source1")
raw2_text = spark.read.text("/Volumes/logistics_project/default/logistics/logistics_source2")

raw1_cols = raw1_text.withColumn("col_count", size(split(col("value"), ",")))
raw2_cols = raw2_text.withColumn("col_count", size(split(col("value"), ",")))

# Counts
fewer_cols_df1 = raw1_cols.filter(col("col_count") < Expected_cols_df1).count()
more_cols_df1  = raw1_cols.filter(col("col_count") > Expected_cols_df1).count()

fewer_cols_df2 = raw2_cols.filter(col("col_count") < Expected_cols_df2).count()
more_cols_df2  = raw2_cols.filter(col("col_count") > Expected_cols_df2).count()

print("DF1 → fewer columns:", fewer_cols_df1, "| more columns:", more_cols_df1)
print("DF2 → fewer columns:", fewer_cols_df2, "| more columns:", more_cols_df2)
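The passive checks above can also be prototyped on a few raw lines without a cluster. The pure-Python sketch below runs on a small hypothetical sample in the logistics_source1 layout (`sample_csv` and `profile_rows` are illustrative names, not part of the project). It uses the `csv` module, which keeps a quoted value such as `"Meena, R"` as one field, whereas the `split(value, ",")` count in the Spark cell would see two.

```python
import csv
import io
import re

# Same "all digits" pattern used with rlike() in the Spark cell
DIGITS = re.compile(r"^[0-9]+$")

def profile_rows(text, expected_cols):
    """Passively profile CSV text: report issues without changing any row."""
    rows = csv.reader(io.StringIO(text))
    header = next(rows)
    id_idx, age_idx = header.index("shipment_id"), header.index("age")
    report = {"non_numeric_id": [], "non_integer_age": [], "fewer_cols": 0, "more_cols": 0}
    for row in rows:
        if len(row) < expected_cols:
            report["fewer_cols"] += 1
        elif len(row) > expected_cols:
            report["more_cols"] += 1
        # Only inspect a field when the row is long enough to contain it
        if len(row) > id_idx and not DIGITS.match(row[id_idx]):
            report["non_numeric_id"].append(row[id_idx])
        if len(row) > age_idx and not DIGITS.match(row[age_idx]):
            report["non_integer_age"].append(row[age_idx])
    return report

# Hypothetical sample with 5 expected columns
sample_csv = """shipment_id,first_name,last_name,age,role
500001,Rajesh,Kumar,34,Driver
50000A,Anita,Rao,ten,Supervisor
500003,Vikram,Singh
500004,"Meena, R",Iyer,41,Driver
500005,Arun,Das,29,Driver,Chennai
"""

print(profile_rows(sample_csv, 5))
```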

In [0]:
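from pyspark.sql import SparkSession  #Create a Spark Session Object
spark = SparkSession.builder.appName("logistics_project").getOrCreate()  #on Databricks this returns the existing session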

### **b. Active Data Munging** (Files: logistics_source1 and logistics_source2)

##### 1. Combining Data + Schema Merging (Structuring)
1. Read both files without enforcing a schema.
2. Align them into a single canonical schema: shipment_id, first_name, last_name, age, role, hub_location, vehicle_type, data_source.
3. Add a data_source column with the values system1 and system2 in the respective DataFrames.

In [0]:
#1.Read both files without enforcing schema
rawdf1= spark.read.csv("/Volumes/logistics_project/default/logistics/logistics_source1",header=True,inferSchema=False)
rawdf2= spark.read.csv("/Volumes/logistics_project/default/logistics/logistics_source2",header=True,inferSchema=False)
rawdf1.show()
rawdf2.show()

#2.Align them into a single canonical schema: shipment_id, first_name, last_name, age, role, hub_location, vehicle_type, data_source

struct1 = "shipment_id string, first_name string, last_name string, age string, role string"
struct2 = "shipment_id string, first_name string, last_name string, age string, role string, hub_location string, vehicle_type string"
rawdf1= spark.read.schema(struct1).csv("/Volumes/logistics_project/default/logistics/logistics_source1",header=True,inferSchema=False)
rawdf2= spark.read.schema(struct2).csv("/Volumes/logistics_project/default/logistics/logistics_source2",header=True,inferSchema=False)
rawdf1.show()
rawdf2.show()

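#3.Add data_source column with values as: system1, system2 in the respective dataframes
#withColumn() returns a new DataFrame, so the result must be assigned back before the union
rawdf1 = rawdf1.withColumn("data_source",lit("system1"))
rawdf2 = rawdf2.withColumn("data_source",lit("system2"))
rawdf1.show()
rawdf2.show()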

rawdf_merged=rawdf1.unionByName(rawdf2,allowMissingColumns=True)
display(rawdf_merged)
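What `unionByName(..., allowMissingColumns=True)` does with the two layouts can be pictured with plain Python dicts: every record is projected onto the canonical column list, columns a source lacks become `None` (NULL in Spark), and each record carries its `data_source`. This is a sketch on hypothetical records; `CANONICAL` and `align` are illustrative names.

```python
CANONICAL = ["shipment_id", "first_name", "last_name", "age", "role",
             "hub_location", "vehicle_type", "data_source"]

def align(records, source):
    """Project each record onto CANONICAL, using None for columns the source lacks."""
    aligned = []
    for rec in records:
        row = {name: rec.get(name) for name in CANONICAL}
        row["data_source"] = source  # tag the originating system
        aligned.append(row)
    return aligned

system1 = [{"shipment_id": "500001", "first_name": "Rajesh", "last_name": "Kumar",
            "age": "34", "role": "Driver"}]
system2 = [{"shipment_id": "500002", "first_name": "Anita", "last_name": "Rao",
            "age": "29", "role": "Driver", "hub_location": "chennai", "vehicle_type": "Truck"}]

# Rough equivalent of rawdf1.unionByName(rawdf2, allowMissingColumns=True) after tagging
merged = align(system1, "system1") + align(system2, "system2")
for row in merged:
    print(row)
```

Only system2 rows can carry hub_location and vehicle_type, which is why the merged DataFrame shows NULLs in those columns for system1 rows.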



##### 2. Cleansing and Scrubbing
Cleansing (removal of unwanted records)<br>
1. Mandatory Column Check - Drop any record where any of the following columns is NULL: shipment_id, role<br>
2. Name Completeness Rule - Drop records where both of the following columns are NULL: first_name, last_name<br>
3. Join Readiness Rule - Drop records where the join key is NULL: shipment_id<br>

Scrubbing (convert raw to tidy)<br>
4. Age Defaulting Rule - Fill NULL values in the age column with: -1<br>
5. Vehicle Type Default Rule - Fill NULL values in the vehicle_type column with: UNKNOWN<br>
6. Invalid Age Replacement - Replace the following values in age: "ten" to -1, "" to -1<br>
7. Vehicle Type Normalization - Replace inconsistent vehicle types: truck to LMV, bike to TwoWheeler

In [0]:
#Cleansing (removal of unwanted datasets)

#1.Mandatory Column Check - Drop any record where any of the following columns is NULL:shipment_id, role
cleanseddf1=rawdf1.na.drop(how="any",subset=["shipment_id","role"])
cleanseddf2=rawdf2.na.drop(how="any",subset=["shipment_id","role"])
display(cleanseddf1)
display(cleanseddf2)

#2.Name Completeness Rule - Drop records where both of the following columns are NULL: first_name, last_name
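#how="all" drops a row only when BOTH name columns are NULL (how="any" would also drop rows missing just one name)
cleanseddf3=cleanseddf1.na.drop(how="all",subset=["first_name","last_name"])
cleanseddf4=cleanseddf2.na.drop(how="all",subset=["first_name","last_name"])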
display(cleanseddf3)
display(cleanseddf4)

#3.Join Readiness Rule - Drop records where the join key is null: shipment_id
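#Merge the cleansed frames (not the raw ones) so rules 1 and 2 carry forward into the rest of the pipeline
joineddf=cleanseddf3.unionByName(cleanseddf4,allowMissingColumns=True).na.drop(how="any",subset=["shipment_id"])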
display(joineddf)

#Scrubbing (convert raw to tidy)

#4. Age Defaulting Rule - Fill NULL values in the age column with: -1
scrubbeddf1 = joineddf.na.fill('-1',subset=["age"])
display(scrubbeddf1)


#5.Vehicle Type Default Rule - Fill NULL values in the vehicle_type column with: UNKNOWN
scrubbeddf2 = scrubbeddf1.na.fill('UNKNOWN',subset=["vehicle_type"])
display(scrubbeddf2) 


#6.Invalid Age Replacement - Replace the following values in age: "ten" to -1, "" to -1
scrubbeddf3 = scrubbeddf2.replace(["ten", ""], ["-1", "-1"], "age")
display(scrubbeddf3)



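#7.Vehicle Type Normalization - Replace inconsistent vehicle types: truck to LMV, bike to TwoWheeler
#With a dict, the column must be passed as subset=; a second positional argument is treated as `value` and ignored
fill_replace_value = {'Truck':'LMV','truck':'LMV','Bike':'TwoWheeler','bike':'TwoWheeler'}
Normalized_df = scrubbeddf3.replace(fill_replace_value,subset=["vehicle_type"])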
display(Normalized_df)
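The seven rules can be exercised on a handful of rows before running them at scale. This pure-Python sketch on hypothetical dict records (`cleanse_and_scrub` is an illustrative name) mirrors `na.drop(how="any")` for rules 1 and 3, `how="all"` for rule 2, and the fill/replace calls for rules 4 to 7. It assumes vehicle types may arrive in any letter case, so it lowercases them before the lookup.

```python
VEHICLE_MAP = {"truck": "LMV", "bike": "TwoWheeler"}

def cleanse_and_scrub(rows):
    """Apply cleansing rules 1-3 and scrubbing rules 4-7 to a list of dict records."""
    out = []
    for r in rows:
        # Rules 1 and 3: shipment_id (the join key) and role are mandatory
        if r.get("shipment_id") is None or r.get("role") is None:
            continue
        # Rule 2: drop only when BOTH name parts are missing (how="all")
        if r.get("first_name") is None and r.get("last_name") is None:
            continue
        r = dict(r)  # copy so the input records stay unchanged
        # Rules 4 and 6: NULL, "ten" and "" ages all become "-1"
        if r.get("age") in (None, "ten", ""):
            r["age"] = "-1"
        # Rule 5: default a missing vehicle type
        if r.get("vehicle_type") is None:
            r["vehicle_type"] = "UNKNOWN"
        # Rule 7: normalize truck/bike in any letter case
        r["vehicle_type"] = VEHICLE_MAP.get(r["vehicle_type"].lower(), r["vehicle_type"])
        out.append(r)
    return out

rows = [
    {"shipment_id": "1", "first_name": "Rajesh", "last_name": None, "age": "ten", "role": "Driver", "vehicle_type": "Truck"},
    {"shipment_id": "2", "first_name": None, "last_name": None, "age": "40", "role": "Driver", "vehicle_type": "bike"},
    {"shipment_id": None, "first_name": "Anita", "last_name": "Rao", "age": "30", "role": "Driver", "vehicle_type": None},
    {"shipment_id": "4", "first_name": "Vikram", "last_name": "Singh", "age": None, "role": "Supervisor", "vehicle_type": None},
]
clean = cleanse_and_scrub(rows)
for r in clean:
    print(r)
```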



#### 3. Standardization, De-Duplication, and Replacement/Deletion of Data to Make It Usable

Creating the Shipment Details DataFrame<br>
1. Create a DF by reading data from logistics_shipment_detail_3000.json.
2. Because this is clean JSON data, it doesn't require any cleansing or scrubbing.

In [0]:
#1.Create a DF by Reading Data from logistics_shipment_detail_3000.json

shipment_df = spark.read.option("multiline", "true").format("json").load("/Volumes/logistics_project/default/logistics/logistics_shipment_detail_3000.json")
display(shipment_df)

Standardizations:<br>

1. Add columns<br>
Source File: DF of logistics_shipment_detail_3000.json<br>
Add `domain` as 'Logistics', `ingestion_timestamp` as the current timestamp, and `is_expedited` as 'False'.
2. Column Uniformity:<br>
role - Convert values to lowercase (Source File: DF of merged(logistics_source1 & logistics_source2))<br>
vehicle_type - Convert values to UPPERCASE (Source File: DF of logistics_shipment_detail_3000.json)<br>
hub_location - Convert values to initcap case (Source File: DF of merged(logistics_source1 & logistics_source2))
3. Format Standardization:<br>
Source File: DF of logistics_shipment_detail_3000.json<br>
Convert shipment_date to yyyy-MM-dd<br>
Ensure shipment_cost has 2-decimal precision
4. Data Type Standardization:<br>
Standardize column data types to fix schema drift and enable mathematical operations.<br>
age: Cast String to Integer (Source File: DF of merged(logistics_source1 & logistics_source2))<br>
shipment_weight_kg: Cast to Double (Source File: DF of logistics_shipment_detail_3000.json)<br>
is_expedited: Cast to Boolean (Source File: DF of logistics_shipment_detail_3000.json)
5. Naming Standardization:<br>
Source File: DF of merged(logistics_source1 & logistics_source2)<br>
Rename: first_name to staff_first_name<br>
Rename: last_name to staff_last_name<br>
Rename: hub_location to origin_hub_city
6. Reorder columns logically into a standard format:<br>
Source File: DF of data from all 3 files<br>
shipment_id (Identifier), staff_first_name (Dimension), staff_last_name (Dimension), role (Dimension), origin_hub_city (Location), shipment_cost (Metric), ingestion_timestamp (Audit)

In [0]:
# Define shipment_df before using it
from pyspark.sql.functions import initcap,lower,upper,to_date,col,current_timestamp,lit
shipment_df = spark.read.option("multiline", "true").format("json").load("/Volumes/logistics_project/default/logistics/logistics_shipment_detail_3000.json")

#1.Add columns (Source File: DF of logistics_shipment_detail_3000.json): domain as 'Logistics', current timestamp as 'ingestion_timestamp' and 'False' as 'is_expedited'
std_df = shipment_df.withColumn("domain",lit("Logistics")).withColumn("ingestion_timestamp",current_timestamp()).withColumn("is_expedited",lit("False"))
#display(std_df)

#2.Column Uniformity:
#role - Convert to lowercase (Source File: DF of merged(logistics_source1 & logistics_source2))
#vehicle_type - Convert values to UPPERCASE (Source File: DF of logistics_shipment_detail_3000.json)
#hub_location - Convert values to initcap case (Source File: DF of merged(logistics_source1 & logistics_source2))

#Build on Normalized_df so the cleansing and scrubbing results above are kept
std_csv_df = Normalized_df.withColumn("role",lower("role")).withColumn("hub_location",initcap("hub_location"))
std_json_df = std_df.withColumn("vehicle_type",upper("vehicle_type"))
#display(std_csv_df)
#display(std_json_df)

#3.Format Standardization:
#Source File: DF of logistics_shipment_detail_3000.json
#Convert shipment_date to yyyy-MM-dd
#Ensure shipment_cost has 2 decimal precision

from pyspark.sql.functions import to_date, col, round

#to_date() parses shipment_date with the dd-MM-yy pattern into a DateType column, which displays as yyyy-MM-dd
std_format_json_df = std_json_df.withColumn("shipment_date",to_date(col("shipment_date"), "dd-MM-yy")).withColumn("shipment_cost",round(col("shipment_cost"), 2))
#display(std_format_json_df)

#4.Data Type Standardization
#Standardizing column data types to fix schema drift and enable mathematical operations.
#Source File: DF of merged(logistics_source1 & logistics_source2)
#age: Cast String to Integer
#Source File: DF of logistics_shipment_detail_3000.json
#shipment_weight_kg: Cast to Double
#Source File: DF of logistics_shipment_detail_3000.json
#is_expedited: Cast to Boolean
std_csv_df.printSchema()
from pyspark.sql.functions import expr
#try_cast returns NULL instead of raising an error for values that are not valid integers
std_csv_df = std_csv_df.withColumn("age",expr("try_cast(age as int)"))
std_csv_df.printSchema()

std_format_json_df = std_format_json_df.withColumn("shipment_weight_kg",col("shipment_weight_kg").cast("double")).withColumn("is_expedited",col("is_expedited").cast("boolean"))
std_format_json_df.printSchema()

#5.Naming Standardization
#Source File: DF of merged(logistics_source1 & logistics_source2)
#Rename: first_name to staff_first_name
#Rename: last_name to staff_last_name
#Rename: hub_location to origin_hub_city

#Continue from std_csv_df (not Normalized_df) so the lowercase/initcap and age-cast steps are not discarded
std_csv_df = std_csv_df.withColumnRenamed("first_name","staff_first_name").withColumnRenamed("last_name","staff_last_name").withColumnRenamed("hub_location","origin_hub_city")
#display(std_csv_df)

#6.Reordering columns logically in a better standard format:
#Source File: DF of Data from all 3 files
#shipment_id (Identifier), staff_first_name (Dimension), staff_last_name (Dimension), role (Dimension), origin_hub_city (Location), shipment_cost (Metric), ingestion_timestamp (Audit)
#age and vehicle_type stay at the end because later steps still use them

std_csv_df =std_csv_df.select("shipment_id","staff_first_name","staff_last_name","role","origin_hub_city","age","vehicle_type")
#std_format_json_df =std_format_json_df.select("shipment_cost","ingestion_timestamp")
std_csv_df.show(2)
std_format_json_df.show(2)

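The JSON-side format and type rules can be sanity-checked in plain Python. This sketch assumes, like the `to_date(..., "dd-MM-yy")` call above, that raw dates look like `23-04-24`, and the sample record is hypothetical. It rounds with `Decimal` and `ROUND_HALF_UP` because Spark's `round()` rounds half up, while Python's built-in `round()` rounds half to even on binary floats. `py_initcap` and `standardize_shipment` are illustrative names.

```python
from datetime import datetime
from decimal import Decimal, ROUND_HALF_UP

def py_initcap(text):
    """Mimic Spark initcap(): first letter of each space-delimited word upper, the rest lower."""
    return " ".join(word.capitalize() for word in text.split(" "))

def standardize_shipment(raw):
    """Apply the JSON-side format and type rules to one record."""
    return {
        # dd-MM-yy -> ISO yyyy-MM-dd
        "shipment_date": datetime.strptime(raw["shipment_date"], "%d-%m-%y").date().isoformat(),
        # 2-decimal precision, rounding half up like Spark's round()
        "shipment_cost": Decimal(str(raw["shipment_cost"])).quantize(Decimal("0.01"), rounding=ROUND_HALF_UP),
        "shipment_weight_kg": float(raw["shipment_weight_kg"]),
        "vehicle_type": raw["vehicle_type"].upper(),
        # the 'True'/'False' strings become real booleans
        "is_expedited": raw["is_expedited"].strip().lower() == "true",
    }

record = {"shipment_date": "23-04-24", "shipment_cost": 30695.805,
          "shipment_weight_kg": "120", "vehicle_type": "Truck", "is_expedited": "False"}
print(standardize_shipment(record))
print(py_initcap("new delhi"))
```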









Deduplication:
1. Apply Record Level De-Duplication
2. Apply Column Level De-Duplication (Primary Key Enforcement)

In [0]:
#1.Apply Record Level De-Duplication

from pyspark.sql.functions import col, countDistinct, count, avg, sum, max, min, lit
std_csv_df.printSchema()
#coalesce(1) is not needed to de-duplicate and would squeeze all rows into a single partition
std_csv_df = std_csv_df.dropDuplicates()

std_format_json_df = std_format_json_df.dropDuplicates()
std_csv_df.show(2)
std_format_json_df.show(2)

#2.Apply Column Level De-Duplication (Primary Key Enforcement)
#dropDuplicates on a subset keeps one row per shipment_id; which row survives is not guaranteed

std_csv_df = std_csv_df.dropDuplicates(['shipment_id'])
std_format_json_df = std_format_json_df.dropDuplicates(['shipment_id']) 
std_csv_df.show(2)
std_format_json_df.show(2)
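The difference between the two de-duplication levels shows up clearly on a few hypothetical tuples. This pure-Python sketch keeps the first occurrence of each record or key; Spark's `dropDuplicates(['shipment_id'])` also keeps exactly one row per key but does not guarantee which one, so an explicit ordering rule (for example a window function) is needed if the surviving row matters. `dedup_records` and `dedup_on_key` are illustrative names.

```python
rows = [
    ("500001", "Rajesh", "Driver"),
    ("500001", "Rajesh", "Driver"),      # exact duplicate
    ("500001", "Rajesh", "Supervisor"),  # same key, different attributes
    ("500002", "Anita", "Driver"),
]

def dedup_records(rows):
    """Record level: drop rows identical in every column (like dropDuplicates())."""
    seen, out = set(), []
    for row in rows:
        if row not in seen:
            seen.add(row)
            out.append(row)
    return out

def dedup_on_key(rows, key_index=0):
    """Column level: keep one row per primary key (like dropDuplicates(['shipment_id']))."""
    seen, out = set(), []
    for row in rows:
        if row[key_index] not in seen:
            seen.add(row[key_index])
            out.append(row)
    return out

records = dedup_records(rows)
keys = dedup_on_key(records)
print(len(rows), len(records), len(keys))  # 4 3 2
```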


## 2. Data Enrichment - Detailing of Data
Makes your data rich and detailed <br>

###### Adding of Columns (Data Enrichment)
*Creating new derived attributes to enhance traceability and analytical capability.*

**1. Add Audit Timestamp (`load_dt`)**
Source File: DF of logistics_source1 and logistics_source2<br>
* **Scenario:** We need to track exactly when this record was ingested into our Data Lakehouse for auditing purposes.
* **Action:** Add a column `load_dt` using the function `current_timestamp()`.

**2. Create Full Name (`full_name`)**
Source File: DF of logistics_source1 and logistics_source2<br>
* **Scenario:** The reporting dashboard requires a single field for the driver's name instead of separate columns.
* **Action:** Create `full_name` by concatenating `first_name` and `last_name` with a space separator.
* **Result:** "Rajesh" + " " + "Kumar" -> **"Rajesh Kumar"**

**3. Define Route Segment (`route_segment`)**
Source File: DF of logistics_shipment_detail_3000.json<br>
* **Scenario:** The logistics team wants to analyze performance based on specific transport lanes (Source to Destination).
* **Action:** Combine `source_city` and `destination_city` with a hyphen.
* **Result:** "Chennai" + "-" + "Pune" -> **"Chennai-Pune"**

**4. Generate Vehicle Identifier (`vehicle_identifier`)**
Source File: DF of logistics_shipment_detail_3000.json<br>
* **Scenario:** We need a unique tracking code that immediately tells us the vehicle type and the shipment ID.
* **Action:** Combine `vehicle_type` and `shipment_id` to create a composite key.
* **Result:** "Truck" + "_" + "500001" -> **"Truck_500001"**

In [0]:
#1. Add Audit Timestamp (load_dt) Source File: DF of logistics_source1 and logistics_source2

#Scenario: We need to track exactly when this record was ingested into our Data Lakehouse for auditing purposes.
#Action: Add a column load_dt using the function current_timestamp().

from pyspark.sql.functions import current_timestamp, concat, concat_ws

std_csv_df = std_csv_df.withColumn("load_dt", current_timestamp())
std_csv_df.show(2)

#2.Create Full Name (full_name) Source File: DF of logistics_source1 and logistics_source2
#Scenario: The reporting dashboard requires a single field for the driver's name instead of separate columns.
#Action: Create full_name by concatenating first_name and last_name with a space separator.
#Result: "Rajesh" + " " + "Kumar" -> "Rajesh Kumar"
#concat_ws skips NULLs, so a row with only one name part still gets a full_name (concat would return NULL)

std_csv_df = std_csv_df.withColumn("full_name", concat_ws(" ", col("staff_first_name"), col("staff_last_name")))
std_csv_df.show(10)
std_csv_df.printSchema()

#3.Define Route Segment (route_segment) Source File: DF of logistics_shipment_detail_3000.json

#Scenario: The logistics team wants to analyze performance based on specific transport lanes (Source to Destination).
#Action: Combine source_city and destination_city with a hyphen.
#Result: "Chennai" + "-" + "Pune" -> "Chennai-Pune"

std_format_json_df = std_format_json_df.withColumn("route_segment",concat(col("source_city"),lit("-"),col("destination_city")))
std_format_json_df.show(10)

#4. Generate Vehicle Identifier (vehicle_identifier) Source File: DF of logistics_shipment_detail_3000.json
#Action: Combine vehicle_type and shipment_id to create a composite key.
#Result: "Truck" + "_" + "500001" -> "Truck_500001"

std_format_json_df = std_format_json_df.withColumn("vehicle_identifier",concat(col("vehicle_type"),lit("_"),col("shipment_id")))
std_format_json_df.show(10)
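The three string rules, and the NULL behaviour that separates `concat` from `concat_ws`, can be checked with the pure-Python sketch below. `spark_concat` and `spark_concat_ws` are illustrative helpers that imitate the Spark functions' NULL semantics; they are not Spark APIs. Note that `vehicle_type` was uppercased during standardization, so in the real pipeline the identifier prefix would be `TRUCK` rather than `Truck`.

```python
def spark_concat(*parts):
    """Like Spark concat(): the result is NULL (None) if any input is NULL."""
    return None if None in parts else "".join(str(p) for p in parts)

def spark_concat_ws(sep, *parts):
    """Like Spark concat_ws(): NULL inputs are skipped."""
    return sep.join(str(p) for p in parts if p is not None)

full_name = spark_concat_ws(" ", "Rajesh", "Kumar")
route_segment = spark_concat("Chennai", "-", "Pune")
# The numeric shipment_id is converted to text before joining
vehicle_identifier = spark_concat("Truck", "_", 500001)
print(full_name, route_segment, vehicle_identifier)
print(spark_concat("Rajesh", " ", None), spark_concat_ws(" ", "Rajesh", None))
```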



###### Deriving of Columns (Time Intelligence)
*Extracting temporal features from dates to enable period-based analysis and reporting.*<br>
Source File: logistics_shipment_detail_3000.json<br>
**1. Derive Shipment Year (`shipment_year`)**
* **Scenario:** Management needs an annual performance report to compare growth year-over-year.
* **Action:** Extract the year component from `shipment_date`.
* **Result:** "2024-04-23" -> **2024**

**2. Derive Shipment Month (`shipment_month`)**
* **Scenario:** Analysts want to identify seasonal peaks (e.g., increased volume in December).
* **Action:** Extract the month component from `shipment_date`.
* **Result:** "2024-04-23" -> **4** (April)

**3. Flag Weekend Operations (`is_weekend`)**
* **Scenario:** The Operations team needs to track shipments handled during weekends to calculate overtime pay or analyze non-business day capacity.
* **Action:** Flag as **'True'** if the `shipment_date` falls on a Saturday or Sunday.

**4. Flag shipment status (`is_expedited`)**
* **Scenario:** The Operations team needs to track whether shipments are IN_TRANSIT or DELIVERED.
* **Action:** Flag as **'True'** if the `shipment_status` is IN_TRANSIT or DELIVERED.

In [0]:
#1.Derive Shipment Year (shipment_year)
#Scenario: Management needs an annual performance report to compare growth year-over-year.
#Action: Extract the year component from shipment_date.
#Result: "2024-04-23" -> 2024
from pyspark.sql.functions import *
#year() reads the component from the DateType column directly instead of slicing its string form
der_df = std_format_json_df.withColumn("shipment_year", year(col("shipment_date")))
#display(der_df)

#2.Derive Shipment Month (shipment_month)
#Scenario: Analysts want to identify seasonal peaks (e.g., increased volume in December).
#Action: Extract the month component from shipment_date.
#Result: "2024-04-23" -> 4 (April)

#der_df = der_df.select("*",substring(col("shipment_date"),6,2).cast("int").alias("shipment_month"))
der_df = der_df.withColumn("shipment_month", month("shipment_date"))
#display(der_df)

#3.Flag Weekend Operations (is_weekend)
#Scenario: The Operations team needs to track shipments handled during weekends to calculate overtime pay or analyze non-business day capacity.
#Action: Flag as 'True' if the shipment_date falls on a Saturday or Sunday.

der_df = der_df.withColumn("is_weekend", when(dayofweek(col("shipment_date")).isin(1,7), True).otherwise(False))
#display(der_df)

#4.Flag shipment status (is_expedited)
#Scenario: The Operations team needs to track whether shipments are IN_TRANSIT or DELIVERED.
#Action: Flag as 'True' if the shipment_status is IN_TRANSIT or DELIVERED.

der_df = der_df.withColumn("is_expedited",when(col("shipment_status").isin("IN_TRANSIT","DELIVERED"),True).otherwise(False))
display(der_df)
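Spark's `dayofweek()` numbers days from 1 (Sunday) to 7 (Saturday), which is why the weekend flag tests `isin(1, 7)`. The pure-Python sketch below reproduces the four derived columns for single hypothetical dates and maps Python's `isoweekday()` (Monday = 1, Sunday = 7) onto Spark's numbering. `spark_dayofweek` and `derive_time_columns` are illustrative names.

```python
from datetime import date

def spark_dayofweek(d):
    """Spark dayofweek() numbering: 1 = Sunday, 2 = Monday, ..., 7 = Saturday."""
    return d.isoweekday() % 7 + 1

def derive_time_columns(shipment_date, shipment_status):
    return {
        "shipment_year": shipment_date.year,
        "shipment_month": shipment_date.month,
        "is_weekend": spark_dayofweek(shipment_date) in (1, 7),
        "is_expedited": shipment_status in ("IN_TRANSIT", "DELIVERED"),
    }

print(derive_time_columns(date(2024, 4, 23), "DELAYED"))     # a Tuesday
print(derive_time_columns(date(2024, 4, 27), "IN_TRANSIT"))  # a Saturday
```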


###### Enrichment/Business Logics (Calculated Fields)
*Deriving new metrics and financial indicators using mathematical and date-based operations.*<br>
Source File: logistics_shipment_detail_3000.json<br>

**1. Calculate Unit Cost (`cost_per_kg`)**
* **Scenario:** The Finance team wants to analyze the efficiency of shipments by determining the cost incurred per unit of weight.
* **Action:** Divide `shipment_cost` by `shipment_weight_kg`.
* **Logic:** `shipment_cost / shipment_weight_kg`

**2. Track Shipment Age (`days_since_shipment`)**
* **Scenario:** The Operations team needs to monitor how long it has been since a shipment was dispatched to identify potential delays.
* **Action:** Calculate the difference in days between the `current_date` and the `shipment_date`.
* **Logic:** `datediff(current_date(), shipment_date)`

**3. Compute Tax Liability (`tax_amount`)**
* **Scenario:** For invoicing and compliance, we must calculate the Goods and Services Tax (GST) applicable to each shipment.
* **Action:** Calculate 18% GST on the total `shipment_cost`.
* **Logic:** `shipment_cost * 0.18`

In [0]:
#1. Calculate Unit Cost (cost_per_kg)

#enr_df=der_df.withColumn("cost_per_kg",col("shipment_cost")/col("shipment_weight_kg"))
#Guard: rows with a zero, negative or NULL weight get a NULL cost_per_kg
enr_df = der_df.withColumn("cost_per_kg", when(col("shipment_weight_kg") > 0, round(col("shipment_cost") / col("shipment_weight_kg"), 2)).otherwise(None))
#display(enr_df)

#2.Track Shipment Age (days_since_shipment)
#enr_df = enr_df.withColumn("days_since_shipment",datediff(col("ingestion_timestamp"),col("shipment_date")))
enr_df = enr_df.withColumn("days_since_shipment",datediff(current_date(),col("shipment_date")))
#display(enr_df)


#3.Compute Tax Liability (tax_amount)
enr_df = enr_df.withColumn("tax_amount",round(col("shipment_cost")*0.18,2))
#display(enr_df)
display(enr_df)
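The three calculated fields reduce to simple arithmetic, which the pure-Python sketch below applies to one hypothetical shipment. It pins "today" to a fixed `as_of` date so the result is reproducible (the Spark column uses `current_date()`), returns `None` for a non-positive weight like the guarded `when()` above, and rounds half up to 2 decimals as Spark's `round()` does. `calculated_fields` is an illustrative name.

```python
from datetime import date
from decimal import Decimal, ROUND_HALF_UP

GST_RATE = Decimal("0.18")
CENT = Decimal("0.01")

def calculated_fields(cost, weight_kg, shipment_date, as_of):
    cost = Decimal(str(cost))
    weight = Decimal(str(weight_kg))
    return {
        # None (NULL) when the weight cannot be divided by
        "cost_per_kg": (cost / weight).quantize(CENT, rounding=ROUND_HALF_UP) if weight > 0 else None,
        # datediff(end, start) counts end minus start in days
        "days_since_shipment": (as_of - shipment_date).days,
        # 18% GST on the shipment cost
        "tax_amount": (cost * GST_RATE).quantize(CENT, rounding=ROUND_HALF_UP),
    }

print(calculated_fields(30695.80, 120, date(2024, 4, 23), as_of=date(2024, 5, 23)))
```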


In [0]:
rawdf2= spark.read.csv("/Volumes/logistics_project/default/logistics/logistics_source2",header=True,inferSchema=False)
rawdf2 = rawdf1.unionByName(rawdf2, allowMissingColumns=True).where("shipment_id != 'ten'")
#display(rawdf2)
#display(mask_df)
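#NOTE: mask_df is created in the UDF2 (PII masking) cell of section 3, so run that cell before this one
rawdf3 = mask_df.withColumnRenamed("vehicle_type", "lvehicle_type").where("shipment_id != 'ten'")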
#display(rawdf3)
flatdf = rawdf3.join(std_json_df,how = "left", on = "shipment_id")
#display(flatdf)
grpdf = flatdf.groupBy("origin_hub_city","lvehicle_type").agg(sum("shipment_cost").alias("Overallshipment_cost")).orderBy("origin_hub_city","lvehicle_type")
display(grpdf)

#Scenario: The CFO wants a subtotal report at multiple levels:
#Total Cost by Hub.
#alias() must be applied to the aggregate column; on a DataFrame it only sets a DataFrame alias
totdf = flatdf.rollup("origin_hub_city").agg(sum("shipment_cost").alias("rollup_cost")).orderBy("origin_hub_city",ascending = [False])
#display(totdf)
#Total Cost by Hub AND Vehicle Type.
#totdf = flatdf.rollup("origin_hub_city","lvehicle_type").agg(sum("shipment_cost").alias("rollup_cost")).orderBy("origin_hub_city","rollup_cost", ascending = [False,False]).alias("rollup_cost")
#display(totdf)
#Grand Total.
#totdf = flatdf.agg(sum("shipment_cost"))
#display(totdf)
#Action: Use cube("origin_hub_city", "lvehicle_type") or rollup() to generate all these subtotals in a single query.
#cbdf = flatdf.cube("origin_hub_city","lvehicle_type").agg(sum("shipment_cost").alias("rollup_cost")).orderBy("origin_hub_city","rollup_cost",ascending = [False,False]).alias("rollup_cost")
#display(cbdf)
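`rollup("origin_hub_city", "lvehicle_type")` returns detail rows for each hub and vehicle type, one subtotal row per hub with the vehicle column NULL, and one grand-total row with both columns NULL; `cube()` additionally adds per-vehicle subtotals across all hubs. The pure-Python sketch below reproduces the rollup levels on hypothetical `(hub, vehicle, cost)` tuples, using `None` where Spark emits NULL. `rollup_sum` is an illustrative name.

```python
from collections import defaultdict

def rollup_sum(rows):
    """Sum cost at (hub, vehicle), (hub, None) and (None, None) levels, like rollup()."""
    totals = defaultdict(float)
    for hub, vehicle, cost in rows:
        totals[(hub, vehicle)] += cost  # detail level
        totals[(hub, None)] += cost     # subtotal per hub
        totals[(None, None)] += cost    # grand total
    return dict(totals)

shipments = [("Chennai", "LMV", 100.0), ("Chennai", "TwoWheeler", 50.0), ("Pune", "LMV", 70.0)]
ordered = sorted(rollup_sum(shipments).items(), key=lambda kv: (kv[0][0] or "", kv[0][1] or ""))
for key, total in ordered:
    print(key, total)
```

This sketch cannot tell a subtotal `None` apart from a genuinely missing vehicle type; in Spark, the `grouping()` function makes that distinction.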


###### Remove/Eliminate (drop, select, selectExpr)
*Excluding unnecessary or redundant columns to optimize storage and privacy.*<br>
Source File: DF of logistics_source1 and logistics_source2<br>

**1. Remove Redundant Name Columns**
* **Scenario:** Since we have already created the `full_name` column in the Enrichment step, the individual name columns are now redundant and clutter the dataset.
* **Action:** Drop the individual name columns (`staff_first_name` and `staff_last_name` after the naming standardization).
* **Logic:** `df.drop("staff_first_name", "staff_last_name")`

In [0]:

#Remove Redundant Name Columns
std_csv_df.printSchema()
csv_df =std_csv_df.drop("staff_first_name","staff_last_name")
csv_df = csv_df.select("shipment_id","full_name","role","age","vehicle_type","origin_hub_city","load_dt")
display(csv_df)

##### Splitting & Merging/Melting of Columns
*Reshaping columns to extract hidden values or combine fields for better analysis.*<br>
Source File: DF of logistics_shipment_detail_3000.json<br>
**1. Splitting (Extraction)**
*Breaking one column into multiple to isolate key information.*
* **Split Order Code:**
  * **Action:** Split `order_id` ("ORD100000") into two new columns:
    * `order_prefix` ("ORD")
    * `order_sequence` ("100000")
* **Split Date:**
  * **Action:** Split `shipment_date` into three separate columns for partitioning:
    * `ship_year` (2024)
    * `ship_month` (4)
    * `ship_day` (23)

**2. Merging (Concatenation)**
*Combining multiple columns into a single unique identifier or description.*
* **Create Route ID:**
  * **Action:** Merge `source_city` ("Chennai") and `destination_city` ("Pune") to create a descriptive route key:
    * `route_lane` ("Chennai->Pune")

In [0]:
#1.Split Order Code:
#Action: Split order_id ("ORD100000") into two new columns:
#order_prefix ("ORD")
#order_sequence ("100000")

split_df = enr_df \
    .withColumn("order_prefix", regexp_extract(col("order_id"), "([A-Za-z]+)", 1)) \
    .withColumn("order_sequence", regexp_extract(col("order_id"), "(\\d+)", 1))
#display(split_df)

#Split shipment_date into three separate columns for partitioning:
#year()/month()/dayofmonth() return integers (2024, 4, 23); substring() would return strings such as "04"
split_df = split_df.withColumn("ship_year", year(col("shipment_date"))).withColumn("ship_month", month(col("shipment_date"))).withColumn("ship_day", dayofmonth(col("shipment_date")))
#display(split_df)


#2.Merging
# Action: Merge source_city ("Chennai") and destination_city ("Pune") to create a descriptive route key

merge_df = split_df.withColumn("route_lane",concat(col("source_city"),lit("->"),col("destination_city")))
display(merge_df)
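The split and merge rules can be checked directly with Python's `re` module. This sketch assumes order IDs follow the letters-then-digits shape of the example `ORD100000` and reuses the same `([A-Za-z]+)` and `(\d+)` patterns passed to `regexp_extract`; like `regexp_extract`, it returns an empty string when a pattern does not match. `split_order_id`, `split_date` and `route_lane` are illustrative names.

```python
import re
from datetime import date

def split_order_id(order_id):
    """Return (prefix, sequence) like the two regexp_extract calls; '' when no match."""
    prefix = re.search(r"([A-Za-z]+)", order_id)
    sequence = re.search(r"(\d+)", order_id)
    return (prefix.group(1) if prefix else "", sequence.group(1) if sequence else "")

def split_date(d):
    """Integer year, month and day, as year()/month()/dayofmonth() return."""
    return d.year, d.month, d.day

def route_lane(source_city, destination_city):
    return f"{source_city}->{destination_city}"

print(split_order_id("ORD100000"), split_date(date(2024, 4, 23)), route_lane("Chennai", "Pune"))
```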


## 3. Data Customization & Processing - Application of Tailored Business Specific Rules

### **UDF1: Complex Incentive Calculation**
**Scenario:** The Logistics Head wants to calculate a "Performance Bonus" for drivers based on tenure and role complexity.

**Action:** Create a Python function `calculate_bonus(role, age)` and register it as a Spark UDF.

**Logic:**
* **IF** `Role` == 'Driver' **AND** `Age` > 50:
  * `Bonus` = 15% of Salary (Reward for Seniority)
* **IF** `Role` == 'Driver' **AND** `Age` < 30:
  * `Bonus` = 5% of Salary (Encouragement for Juniors)
* **ELSE**:
  * `Bonus` = 0

**Result:** A new derived column `projected_bonus` is generated for every row in the dataset.

---

### **UDF2: PII Masking (Privacy Compliance)**
**Scenario:** For the analytics dashboard, we must hide the full identity of the staff to comply with privacy laws (GDPR/DPDP), while keeping names recognizable for internal managers.

**Business Rule:** Show the first 2 letters, mask the middle characters with `****`, and show the last letter.

**Action:** Create a UDF `mask_identity(name)`.

**Example:**
* **Input:** `"Rajesh"`
* **Output:** `"Ra****h"`
<br>
**Note: Convert the above UDF logic to a transformation based on built-in functions to improve performance.**

In [0]:
#1.UDF1: Complex Incentive Calculation

from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

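# UDF Logic: returns the bonus percentage (15, 5 or 0); the source data has no salary column to apply it to
# role is compared case-insensitively because role values are lowercased during standardization
def calculate_bonus(role, age):
    is_driver = role is not None and role.lower() == 'driver'
    if is_driver and age is not None and age > 50:
        return 15
    elif is_driver and age is not None and age < 30:
        return 5
    else:
        return 0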

# Register UDF with return type
calculate_bonus_udf = udf(calculate_bonus, IntegerType())

# Apply UDF (DO NOT cast age to string)
bonus_df = csv_df.withColumn("projected_bonus",calculate_bonus_udf(col("role"), col("age").cast("int")))
display(bonus_df)





In [0]:
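#UDF2: PII Masking (Privacy Compliance)
#Create a UDF mask_identity(name)
#Input: "Rajesh"
#Output: "Ra****h"
#Note: the built-in version at the end of this cell avoids the Python UDF round trip and typically performs better

from pyspark.sql.types import StringType
from pyspark.sql.functions import when, length, substring, concat, lit

def mask_identity(name):
    if name is None or len(name) <=3:
        return name
    else:
        return name[:2]+ "****" + name[-1]

masked_udf = udf(mask_identity, StringType())
#mask_df = rawdf_merged.withColumn("mask_name",masked_udf(col("first_name")))
mask_df = bonus_df.withColumn("mask_last_name",masked_udf(col("full_name")))

#Built-in equivalent: substring() is 1-based and a negative start counts from the end, so (name, -1, 1) is the last character
mask_df = mask_df.withColumn("mask_name_builtin", when(length(col("full_name")) <= 3, col("full_name")).otherwise(concat(substring(col("full_name"), 1, 2), lit("****"), substring(col("full_name"), -1, 1))))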
display(mask_df)
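Spark's `substring()` is 1-based, treats position 0 like 1, and counts a negative position from the end, so the built-in expression corresponds to Python slicing as `substring(s, 1, 2)` = `s[0:2]` and `substring(s, -1, 1)` = `s[-1:]`. The pure-Python sketch below emulates that indexing and checks that the slice-based UDF rule and the substring-based rule agree on a few hypothetical names, including short names and NULL. `spark_substring`, `mask_with_slices` and `mask_with_substring` are illustrative names.

```python
def spark_substring(s, pos, length):
    """Emulate Spark substring(): 1-based pos, 0 treated as 1, negative pos counts from the end."""
    if s is None:
        return None
    start = pos - 1 if pos > 0 else (len(s) + pos if pos < 0 else 0)
    end = start + length
    start = start if start > 0 else 0
    return s[start:end] if start < end else ""

def mask_with_slices(name):
    """Same rule as the mask_identity UDF."""
    if name is None or len(name) <= 3:
        return name
    return name[:2] + "****" + name[-1]

def mask_with_substring(name):
    """Re-statement of the when/length/concat/substring expression."""
    if name is None:
        return None  # the when() condition is NULL, so otherwise() runs and concat() of NULL parts is NULL
    if len(name) <= 3:
        return name
    return spark_substring(name, 1, 2) + "****" + spark_substring(name, -1, 1)

for sample_name in ["Rajesh", "Rajesh Kumar", "Ana", None]:
    print(sample_name, mask_with_slices(sample_name), mask_with_substring(sample_name))
```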






## 4. Data Core Curation & Processing (Pre-Wrangling)
*Applying business logic to focus, filter, and summarize data before final analysis.*

**1. Select (Projection)**<br>
Source Files: DF of logistics_source1 and logistics_source2<br>
* **Scenario:** The Driver App team only needs location data, not sensitive HR info.
* **Action:** Select only `first_name`, `role`, and `hub_location`.

**2. Filter (Selection)**<br>
Source File: DF of json<br>
* **Scenario:** We need a report on active operational problems.
* **Action:** Filter rows where `shipment_status` is **'DELAYED'** or **'RETURNED'**.
* **Scenario:** Insurance audit for senior staff.
* **Action:** Filter rows where `age > 50` (age comes from the DF of logistics_source1 and logistics_source2).

**3. Derive Flags & Columns (Business Logic)**<br>
Source File: DF of json<br>
* **Scenario:** Identify high-value shipments for security tracking.
* **Action:** Create flag `is_high_value` = **True** if `shipment_cost > 50,000`.
* **Scenario:** Flag weekend operations for overtime calculation.
* **Action:** Create flag `is_weekend` = **True** if day is Saturday or Sunday.

**4. Format (Standardization)**<br>
Source File: DF of json<br>
* **Scenario:** Finance requires readable currency formats.
* **Action:** Format `shipment_cost` to string like **"₹30,695.80"**.
* **Scenario:** Standardize city names for reporting.
* **Action:** Format `source_city` to Uppercase (e.g., "chennai" → **"CHENNAI"**).

**5. Group & Aggregate (Summarization)**<br>
Source Files: DF of logistics_source1 and logistics_source2<br>
* **Scenario:** Regional staffing analysis.
* **Action:** Group by `hub_location` and **Count** the number of staff.
* **Scenario:** Fleet capacity analysis (uses the DF of json, which holds `shipment_weight_kg`).
* **Action:** Group by `vehicle_type` and **Sum** the `shipment_weight_kg`.

**6. Sorting (Ordering)**<br>
Source File: DF of json<br>
* **Scenario:** Prioritize the most expensive shipments.
* **Action:** Sort by `shipment_cost` in **Descending** order.
* **Scenario:** Organize daily dispatch schedule.
* **Action:** Sort by `shipment_date` (Ascending).

**7. Limit (Top-N Analysis)**<br>
Source File: DF of json<br>
* **Scenario:** Dashboard snapshot of critical delays.
* **Action:** Filter for 'DELAYED', Sort by Cost, and **Limit to top 10** rows.

In [0]:
#1.Select (Projection) - Select only first_name, role, and hub_location.
select_df = rawdf_merged.select("first_name","role","hub_location")
#display(select_df)

#2.Filter (Selection)
#Filter rows where shipment_status is 'DELAYED' or 'RETURNED'.
#Assign to a new DF so the later steps still see every shipment, not just the problem ones
problem_df = merge_df.filter(col("shipment_status").isin("DELAYED","RETURNED"))
#display(problem_df)

# Filter rows where age > 50

filter_df =csv_df.filter(col("age") > 50)
#display(filter_df)

#3.3. Derive Flags & Columns (Business Logic)
#Source File: DF of json
#Scenario: Identify high-value shipments for security tracking.
#Action: Create flag is_high_value = True if shipment_cost > 50,000.
#Scenario: Flag weekend operations for overtime calculation.
#Action: Create flag is_weekend = True if day is Saturday or Sunday.

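flag_df = (
    merge_df
    # High value: shipment_cost > 50,000, as stated in the scenario
    .withColumn("is_high_value", when(col("shipment_cost") > 50000, lit(True)).otherwise(lit(False)))
    # Spark's dayofweek(): 1 = Sunday ... 7 = Saturday
    .withColumn("is_weekend", when(dayofweek(col("shipment_date")).isin([1, 7]), lit(True)).otherwise(lit(False)))
)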
#display(flag_df)


#4.Format (Standardization)
# Format shipment_cost to string like "₹30,695.80"

format_df=merge_df.withColumn("shipment_cost",concat(lit("₹"),format_number(col("shipment_cost"),2)))
#display(format_df)

#Format source_city to Uppercase (e.g., "chennai" → "CHENNAI").
format_df = format_df.withColumn("source_city",upper(col("source_city")))
#display(format_df)

#5 Group & Aggregate (Summarization)

#Group by hub_location and Count the number of staff.
group_df =mask_df.groupBy("origin_hub_city").agg(count("shipment_id").alias("staff_count"))
#display(group_df)

#Group by vehicle_type and Sum the shipment_weight_kg.

grp_json_df = format_df.groupBy("vehicle_type").agg(round(sum("shipment_weight_kg"), 2).alias("total_weight_kg"))
#display(grp_json_df)


#6. Sorting (Ordering)

# Sort by shipment_cost in Descending order.
sort_df = flag_df.orderBy(col("shipment_cost").desc())
#display(sort_df)
#Scenario: Organize daily dispatch schedule.
#Action: Sort by shipment_date (Ascending).

dispatch_df = flag_df.orderBy(col("shipment_date").asc())
#display(dispatch_df)

#7.Limit (Top-N Analysis)
#Scenario: Dashboard snapshot of critical delays.
#Action: Filter for 'DELAYED', Sort by Cost, and Limit to top 10 rows.

limit_df = flag_df.filter(col("shipment_status") == "DELAYED").orderBy(col("shipment_cost").desc()).limit(10)
display(limit_df)


## 5. Data Wrangling - Transformation & Analytics
*Combining, modeling, and analyzing data to answer complex business questions.*

### **1. Joins**
Source Files:<br>
Left Side (staff_df):<br> DF of logistics_source1 & logistics_source2<br>
Right Side (shipments_df):<br> DF of logistics_shipment_detail_3000.json<br>
#### **1.1 Frequently Used Simple Joins (Inner, Left)**
* **Inner Join (Performance Analysis):**
  * **Scenario:** We only want to analyze *completed work*. Connect Staff to the Shipments they handled.
  * **Action:** Join `staff_df` and `shipments_df` on `shipment_id`.
  * **Result:** Returns only rows where a staff member is assigned to a valid shipment.
* **Left Join (Idle Resource check):**
  * **Scenario:** Find out which staff members are currently *idle* (not assigned to any shipment).
  * **Action:** Join `staff_df` (Left) with `shipments_df` (Right) on `shipment_id`. Filter where `shipments_df.shipment_id` is NULL.
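Below is a plain-Python model of the two joins above that runs without Spark. The rows are hypothetical. It shows that the inner join keeps only matched pairs, and that "left join, then right key IS NULL" isolates staff with no shipment. That is the same result a `left_anti` join returns.

```python
# Hypothetical sample rows; in the notebook these would come from staff_df and shipments_df
staff = [
    {"shipment_id": 1, "first_name": "Asha"},
    {"shipment_id": 2, "first_name": "Ravi"},
    {"shipment_id": 9, "first_name": "Meena"},  # no matching shipment -> idle
]
shipments = [
    {"shipment_id": 1, "shipment_status": "DELAYED"},
    {"shipment_id": 2, "shipment_status": "RETURNED"},
    {"shipment_id": 5, "shipment_status": "DELAYED"},  # no matching staff -> dropped by both joins
]

def inner_join(left, right, key):
    """how="inner": keep only combinations whose key matches on both sides."""
    return [{**lrow, **rrow} for lrow in left for rrow in right if lrow[key] == rrow[key]]

def idle_after_left_join(left, right, key):
    """how="left", then filter right key IS NULL: left rows with no match at all."""
    right_keys = {rrow[key] for rrow in right}
    return [lrow for lrow in left if lrow[key] not in right_keys]

print([r["first_name"] for r in inner_join(staff, shipments, "shipment_id")])            # ['Asha', 'Ravi']
print([r["first_name"] for r in idle_after_left_join(staff, shipments, "shipment_id")])  # ['Meena']
```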

#### **1.2 Infrequent Simple Joins (Self, Right, Full, Cartesian)**
* **Self Join (Peer Finding):**
  * **Scenario:** Find all pairs of employees working in the same `hub_location`.
  * **Action:** Join `staff_df` to itself on `hub_location`, filtering where `staff_id_A != staff_id_B`.
* **Right Join (Orphan Data Check):**
  * **Scenario:** Identify shipments in the system that have *no valid driver* assigned (Data Integrity Issue).
  * **Action:** Join `staff_df` (Left) with `shipments_df` (Right). Focus on NULLs on the left side.
* **Full Outer Join (Reconciliation):**
  * **Scenario:** A complete audit to find *both* idle drivers AND unassigned shipments in one view.
  * **Action:** Perform a Full Outer Join on `shipment_id`.
* **Cartesian/Cross Join (Capacity Planning):**
  * **Scenario:** Generate a schedule of *every possible* driver assignment to *every* pending shipment to run an optimization algorithm.
  * **Action:** Cross Join `drivers_df` and `pending_shipments_df`.
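Two size effects are worth checking before running these joins. A self join filtered with `id_A != id_B` returns every pair twice, as (A, B) and (B, A); filtering with `id_A < id_B` keeps each pair once. A cross join returns |left| × |right| rows, so filter both sides first (drivers only, pending shipments only). In Spark the cartesian product is written `crossJoin()`. Below is a plain-Python model with hypothetical rows:

```python
from itertools import product

# Hypothetical staff rows: (shipment_id, hub_location)
staff = [(1, "Pune"), (2, "Pune"), (3, "Pune"), (4, "Chennai")]

# Self join on hub_location with id_A != id_B: each pair appears twice
ordered_pairs = [(a[0], b[0]) for a, b in product(staff, staff)
                 if a[1] == b[1] and a[0] != b[0]]

# Using id_A < id_B instead keeps each unordered pair exactly once
unique_pairs = [(a[0], b[0]) for a, b in product(staff, staff)
                if a[1] == b[1] and a[0] < b[0]]

# Cross join: every staff row paired with every (hypothetical) pending shipment
pending = ["S-101", "S-102"]
cross = list(product(staff, pending))

print(len(ordered_pairs), unique_pairs, len(cross))  # 6 [(1, 2), (1, 3), (2, 3)] 8
```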

#### **1.3 Advanced Joins (Semi and Anti)**
* **Left Semi Join (Existence Check):**
  * **Scenario:** "Show me the details of Drivers who have *at least one* shipment." (Standard filtering).
  * **Action:** `staff_df.join(shipments_df, "shipment_id", "left_semi")`.
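  * **Benefit:** Returns only the left table's columns, and each left row at most once even if it matches several right rows, so unlike an inner join it never duplicates staff rows or carries shipment columns along.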
* **Left Anti Join (Negation Check):**
  * **Scenario:** "Show me the details of Drivers who have *never* touched a shipment."
  * **Action:** `staff_df.join(shipments_df, "shipment_id", "left_anti")`.
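Below is a plain-Python model (no Spark needed) of why a semi join differs from an inner join when the right side has several matches for one key. The rows are hypothetical. The anti join is the exact complement of the semi join.

```python
# Hypothetical data: shipment 1 appears twice on the right (e.g. a duplicated shipment record)
staff = [
    {"shipment_id": 1, "first_name": "Asha"},
    {"shipment_id": 2, "first_name": "Ravi"},
    {"shipment_id": 3, "first_name": "Meena"},
]
shipments = [{"shipment_id": 1}, {"shipment_id": 1}, {"shipment_id": 2}]

def inner_join(left, right, key):
    """One output row per matching (left, right) pair."""
    return [{**lrow, **rrow} for lrow in left for rrow in right if lrow[key] == rrow[key]]

def left_semi_join(left, right, key):
    """Each left row at most once, left columns only, if ANY right row matches."""
    right_keys = {rrow[key] for rrow in right}
    return [lrow for lrow in left if lrow[key] in right_keys]

def left_anti_join(left, right, key):
    """Complement of the semi join: left rows with NO matching right row."""
    right_keys = {rrow[key] for rrow in right}
    return [lrow for lrow in left if lrow[key] not in right_keys]

print([r["first_name"] for r in inner_join(staff, shipments, "shipment_id")])      # ['Asha', 'Asha', 'Ravi']
print([r["first_name"] for r in left_semi_join(staff, shipments, "shipment_id")])  # ['Asha', 'Ravi']
print([r["first_name"] for r in left_anti_join(staff, shipments, "shipment_id")])  # ['Meena']
```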

### **2. Lookup**<br>
Source File: DF of logistics_source1 and logistics_source2 (merged into Staff DF) and Master_City_List.csv<br>
* **Scenario:** Validation. Check if the `hub_location` in the staff file exists in the dataframe of corporate `Master_City_List.csv`.
* **Action:** Compare each staff `hub_location` value against the values in the Master_City_List dataframe; rows with no match fail validation.

### **3. Lookup & Enrichment**<br>
Source File: DF of logistics_source1 and logistics_source2 (merged into Staff DF) and Master_City_List.csv dataframe<br>
* **Scenario:** Geo-Tagging.
* **Action:** Look up `hub_location` (e.g. "Pune") in the Master_City_List.csv latitude/longitude dataframe and enrich the merged logistics_source dataframe with `lat` and `long` columns for map plotting.
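For enrichment, a left join keeps every staff row and leaves `lat`/`long` NULL where the hub is missing from the master list. An inner join, or filtering down to one city, would silently drop those rows. The NULL rows also double as the validation failures from the Lookup step. Below is a plain-Python model: the coordinates are taken from the `geo_data` list used in this notebook, and the staff rows (including the deliberately unknown hub) are hypothetical.

```python
# Master lat/long lookup (two entries from the notebook's geo_data list)
geo = {"Pune": (18.5204, 73.8567), "Chennai": (13.0827, 80.2707)}

# Hypothetical staff rows; "Atlantis" is deliberately missing from the master list
staff = [
    {"shipment_id": 1, "hub_location": "Pune"},
    {"shipment_id": 2, "hub_location": "Chennai"},
    {"shipment_id": 3, "hub_location": "Atlantis"},
]

def enrich(rows, lookup):
    """Left-join style enrichment: keep every row; unmatched hubs get None (NULL in Spark)."""
    out = []
    for row in rows:
        lat, long_ = lookup.get(row["hub_location"], (None, None))
        out.append({**row, "lat": lat, "long": long_})
    return out

enriched = enrich(staff, geo)
print(enriched[0]["lat"], enriched[2]["lat"])  # 18.5204 None
```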

### **4. Schema Modeling (Denormalization)**<br>
Source Files: DF of All 3 Files (logistics_source1, logistics_source2, logistics_shipment_detail_3000.json)<br>
* **Scenario:** Creating a "Gold Layer" Table for PowerBI/Tableau.
* **Action:** Flatten the Star Schema. Join `Staff`, `Shipments`, and `Vehicle_Master` into one wide table (`wide_shipment_history`) so analysts don't have to perform joins during reporting.

### **5. Windowing (Ranking & Trends)**<br>
Source Files:<br>
DF of logistics_source2: Provides hub_location (Partition Key).<br>
logistics_shipment_detail_3000.json: Provides shipment_cost (Ordering Key)<br>
* **Scenario:** "Who are the Top 3 Drivers by Cost in *each* Hub?"
* **Action:**
  1. Partition by `hub_location`.
  2. Order by `total_shipment_cost` Descending.
  3. Apply `dense_rank()` and `row_number()`.
  4. Filter where `rank <= 3` (or `row_number <= 3`).
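The two ranking functions differ only when costs tie. `row_number() <= 3` always returns exactly 3 rows per hub and breaks ties arbitrarily (in Spark the tie order is not guaranteed). `dense_rank() <= 3` returns every driver within the top three distinct cost values, which can be more than 3 rows. Below is a plain-Python model of `OVER (PARTITION BY hub ORDER BY cost DESC)` using hypothetical rows:

```python
from itertools import groupby

# Hypothetical (hub_location, driver, total_shipment_cost) rows with a tie in Pune
rows = [
    ("Pune", "Asha", 900), ("Pune", "Ravi", 900), ("Pune", "Meena", 700),
    ("Pune", "Kiran", 500), ("Pune", "Dev", 400),
    ("Chennai", "Lata", 800), ("Chennai", "Arun", 300),
]

def rank_within_hub(rows):
    """Append row_number and dense_rank, partitioned by hub, ordered by cost descending."""
    out = []
    ordered = sorted(rows, key=lambda r: (r[0], -r[2]))  # stable sort keeps input order on ties
    for hub, group in groupby(ordered, key=lambda r: r[0]):
        prev_cost, dense = None, 0
        for rn, (_, driver, cost) in enumerate(group, start=1):
            if cost != prev_cost:
                dense += 1  # dense_rank advances only when the value changes, leaving no gaps
                prev_cost = cost
            out.append((hub, driver, cost, rn, dense))
    return out

ranked = rank_within_hub(rows)
top3_by_row_number = [r[1] for r in ranked if r[0] == "Pune" and r[3] <= 3]
top3_by_dense_rank = [r[1] for r in ranked if r[0] == "Pune" and r[4] <= 3]
print(top3_by_row_number)  # ['Asha', 'Ravi', 'Meena']
print(top3_by_dense_rank)  # ['Asha', 'Ravi', 'Meena', 'Kiran']
```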

### **6. Analytical Functions (Lead/Lag)**<br>
Source File: <br>
DF of logistics_shipment_detail_3000.json<br>
* **Scenario:** Idle Time Analysis.
* **Action:** For each driver, calculate the days elapsed since their *previous* shipment.
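Idle time needs three things: the window partitioned per driver, ordered ascending on a real date (a `dd-MM-yy` string sorts by day first, not chronologically), and `lag(…, 1)` to fetch the previous date, which is NULL for a driver's first shipment. Below is a plain-Python model using hypothetical rows; identifying a driver by name is an assumption for illustration.

```python
from datetime import date
from itertools import groupby

# Hypothetical (driver, shipment_date) rows, deliberately out of order
shipments = [
    ("Asha", date(2024, 3, 10)), ("Asha", date(2024, 3, 1)), ("Asha", date(2024, 3, 4)),
    ("Ravi", date(2024, 3, 2)),
]

def days_since_previous(rows):
    """Model of datediff(date, lag(date, 1) OVER (PARTITION BY driver ORDER BY date ASC))."""
    out = []
    for driver, group in groupby(sorted(rows), key=lambda r: r[0]):
        prev = None  # lag() yields NULL for the first row of each partition
        for _, shipped in group:
            out.append((driver, shipped, (shipped - prev).days if prev is not None else None))
            prev = shipped
    return out

for row in days_since_previous(shipments):
    print(row)
```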

### **7. Set Operations**<br>
Source Files: DF of logistics_source1 and logistics_source2<br>
* **Union:** Combining `Source1` (Legacy) and `Source2` (Modern) into one dataset (Already done in Active Munging).
* **Intersect:** Identifying Staff IDs that appear in *both* Source 1 and Source 2 (Duplicate/Migration Check).
* **Except (Difference):** Identifying Staff IDs present in Source 2 but *missing* from Source 1 (New Hires).
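In Spark, `union`/`unionByName` behave like SQL `UNION ALL` and keep duplicates. `intersect` and `subtract` (SQL `EXCEPT DISTINCT`) return distinct rows. All of them compare whole rows, so select just the ID column first when the two sources have different schemas. Direction matters for new hires: it is Source 2 minus Source 1. Below is a plain-Python model with hypothetical IDs:

```python
# Hypothetical staff IDs from each source (103 is duplicated in Source 1)
source1_ids = [101, 102, 103, 103]
source2_ids = [102, 103, 104, 105]

union_all = source1_ids + source2_ids                     # union(): duplicates kept
in_both = sorted(set(source1_ids) & set(source2_ids))     # intersect(): distinct common IDs
new_hires = sorted(set(source2_ids) - set(source1_ids))   # source2.subtract(source1)

print(len(union_all), in_both, new_hires)  # 8 [102, 103] [104, 105]
```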

### **8. Grouping & Aggregations (Advanced)**<br>
Source Files:<br>
DF of logistics_source2: Provides hub_location and vehicle_type (Grouping Dimensions).<br>
DF of logistics_shipment_detail_3000.json: Provides shipment_cost (Aggregation Metric).<br>
* **Scenario:** The CFO wants a subtotal report at multiple levels:
  1. Total Cost by Hub.
  2. Total Cost by Hub AND Vehicle Type.
  3. Grand Total.
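* **Action:** Use `rollup("hub_location", "vehicle_type")`, which produces exactly these three levels, or `cube("hub_location", "vehicle_type")`, which also adds a total per vehicle type, to generate the subtotals in a single query.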
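`rollup(a, b)` aggregates over the grouping sets (a, b), (a) and (); `cube(a, b)` aggregates over every subset, so it adds (b) as well. In the Spark output a rolled-up column shows NULL, and `grouping()` / `grouping_id()` can tell a subtotal NULL apart from a genuinely NULL hub. Below is a plain-Python model with hypothetical rows:

```python
from collections import defaultdict
from itertools import combinations

# Hypothetical (hub_location, vehicle_type, shipment_cost) rows
rows = [("Pune", "Truck", 100), ("Pune", "Van", 40), ("Chennai", "Truck", 70)]
dims = ("hub_location", "vehicle_type")

def grouping_sets(kind):
    """rollup(a, b) -> (a, b), (a), (); cube(a, b) -> every subset of (a, b)."""
    if kind == "rollup":
        return [dims[:i] for i in range(len(dims), -1, -1)]
    return [c for i in range(len(dims), -1, -1) for c in combinations(dims, i)]

def aggregate(kind):
    totals = defaultdict(int)
    for hub, vehicle, cost in rows:
        values = {"hub_location": hub, "vehicle_type": vehicle}
        for gset in grouping_sets(kind):
            # Columns outside the grouping set become None (NULL) in the output key
            key = tuple(values[d] if d in gset else None for d in dims)
            totals[key] += cost
    return dict(totals)

rollup_result = aggregate("rollup")
cube_result = aggregate("cube")
print(len(rollup_result), len(cube_result))                        # 6 8
print(rollup_result[("Pune", None)], rollup_result[(None, None)])  # 140 210
```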

In [0]:
rawdf_merged = rawdf_merged.filter(col("shipment_id").rlike("^[0-9]+$"))
#display(rawdf_merged)
#1. Joins
staff_df = rawdf_merged
shipments_df = std_json_df

#1.1 Frequently Used Simple Joins (Inner, Left)
#Inner Join (Performance Analysis):
#Scenario: We only want to analyze completed work. Connect Staff to the Shipments they handled.
#Action: Join staff_df and shipments_df on shipment_id.
#Result: Returns only rows where a staff member is assigned to a valid shipment.

inner_join_df = staff_df.join(shipments_df, how="inner", on="shipment_id")
#display(inner_join_df)

#Left Join (Idle Resource check):
#Scenario: Find out which staff members are currently idle (not assigned to any shipment).
#Action: Join staff_df (Left) with shipments_df (Right) on shipment_id. Filter where shipments_df.shipment_id is NULL.

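# An explicit join condition keeps both shipment_id columns, so the right-side key can be tested for NULL
left_join_df = (
    staff_df.join(shipments_df, staff_df["shipment_id"] == shipments_df["shipment_id"], "left")
    .filter(shipments_df["shipment_id"].isNull())
)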
#display(left_join_df)


#1.2 Infrequent Simple Joins (Self, Right, Full, Cartesian)
#Self Join (Peer Finding):
#Scenario: Find all pairs of employees working in the same hub_location.
#Action: Join staff_df to itself on hub_location, filtering where staff_id_A != staff_id_B.

a = staff_df.withColumnRenamed("shipment_id", "shipment_id_A")
b = staff_df.withColumnRenamed("shipment_id", "shipment_id_B")
c = a.join(b, how = "inner", on = "hub_location").filter(a["shipment_id_A"] != b["shipment_id_B"])
#display(c)

#Right Join (Orphan Data Check):
#Scenario: Identify shipments in the system that have no valid driver assigned (Data Integrity Issue).
#Action: Join staff_df (Left) with shipments_df (Right). Focus on NULLs on the left side.

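# shipment_ky keeps a copy of the staff-side key, because joining on="shipment_id" merges the two key columns
staff_keyed_df = staff_df.withColumn("shipment_ky", col("shipment_id"))
rgtdf = staff_keyed_df.join(shipments_df, how="right", on="shipment_id").filter(col("shipment_ky").isNull())
#display(rgtdf)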

#Full Outer Join (Reconciliation):
#Scenario: A complete audit to find both idle drivers AND unassigned shipments in one view.
#Action: Perform a Full Outer Join on shipment_id.

full_df = staff_df.join(shipments_df, how="full", on="shipment_id")
#display(full_df)

#Cartesian/Cross Join (Capacity Planning):
#Scenario: Generate a schedule of every possible driver assignment to every pending shipment to run an optimization algorithm.
#Action: Cross Join drivers_df and pending_shipments_df.

cart_df = staff_df.crossJoin(shipments_df)
#display(cart_df)



In [0]:
#1.3 Advanced Joins (Semi and Anti)
#Left Semi Join (Existence Check):
#Scenario: "Show me the details of Drivers who have at least one shipment." (Standard filtering).
#Action: staff_df.join(shipments_df, "shipment_id", "left_semi").
#Benefit: Returns left-side columns only, and each staff row at most once even when it matches several shipments.
semi_df = staff_df.join(shipments_df, how="left_semi", on="shipment_id")
#display(semi_df)


#Left Anti Join (Negation Check):
#Scenario: "Show me the details of Drivers who have never touched a shipment."
#Action: staff_df.join(shipments_df, "shipment_id", "left_anti").

anti_df = staff_df.join(shipments_df, how="left_anti", on="shipment_id")
display(anti_df)

In [0]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

geo_data = [
    ("AbuDhabi", 24.4539, 54.3773),
    ("Ahmedabad", 23.0225, 72.5714),
    ("Amritsar", 31.6340, 74.8723),
    ("Bangalore", 12.9716, 77.5946),
    ("Birmingham", 52.4862, -1.8904),
    ("Boston", 42.3601, -71.0589),
    ("California", 36.7783, -119.4179),
    ("Chennai", 13.0827, 80.2707),
    ("Chicago", 41.8781, -87.6298),
    ("Coimbatore", 11.0168, 76.9558),
    ("Delhi", 28.7041, 77.1025),
    ("Dubai", 25.2048, 55.2708),
    ("HongKong", 22.3193, 114.1694),
    ("Hyderabad", 17.3850, 78.4867),
    ("Indore", 22.7196, 75.8577),
    ("Jaipur", 26.9124, 75.7873),
    ("Kochi", 9.9312, 76.2673),
    ("London", 51.5074, -0.1278),
    ("Lucknow", 26.8467, 80.9462),
    ("MexicoCity", 19.4326, -99.1332),
    ("Mumbai", 19.0760, 72.8777),
    ("NewYork", 40.7128, -74.0060),
    ("Pune", 18.5204, 73.8567),
    ("Scranton", 41.4089, -75.6624),
    ("Singapore", 1.3521, 103.8198),
    ("Texas", 31.9686, -99.9018),
    ("Tokyo", 35.6762, 139.6503)
]

geodf = spark.createDataFrame(geo_data, ["hub_location", "latitude", "longitude"])
#display(geodf)

#2. Lookup
#Source File: DF of logistics_source1 and logistics_source2 (merged into Staff DF)
#Scenario: Validation. Check if the hub_location in the staff file exists in the corporate Master_City_List.
#Action: Compare values against a reference list.
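# geodf (built above) stands in for the corporate Master_City_List.csv
Master_City_List = geodf.select("hub_location").distinct()
#display(Master_City_List)
# Staff rows whose hub_location exists in the master list
lookupdf = rawdf_merged.join(Master_City_List, on="hub_location", how="left_semi")
#display(lookupdf)
# Staff rows whose hub_location is missing from the master list (validation failures)
invalid_hub_df = rawdf_merged.join(Master_City_List, on="hub_location", how="left_anti")
#display(invalid_hub_df)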

#3. Lookup & Enrichment
#Source File: DF of logistics_source1 and logistics_source2 (merged into Staff DF)
#Scenario: Geo-Tagging.
#Action: Look up hub_location (e.g. "Pune") in a master latitude/longitude table and enrich the dataset with lat and long columns for map plotting.
# Left join keeps every staff row; hubs missing from geodf get NULL latitude/longitude
lookupdf = rawdf_merged.join(geodf, on="hub_location", how="left")
#display(lookupdf.filter("hub_location = 'Pune'"))

#4. Schema Modeling (Denormalization)
#Source Files: DF of All 3 Files (logistics_source1, logistics_source2, logistics_shipment_detail_3000.json)
#Scenario: Creating a "Gold Layer" Table for PowerBI/Tableau.
#Action: Flatten the Star Schema. Join Staff, Shipments, and Vehicle_Master into one wide table (wide_shipment_history) so analysts don't have to perform joins during reporting.

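# The full outer join keeps idle staff and unassigned shipments in the wide table. Filtering on role here
# would drop every shipment-only row (role is NULL there), so role filters are applied downstream instead.
fltdf = rawdf_merged.join(std_json_df, on="shipment_id", how="full")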
display(fltdf)


In [0]:
from pyspark.sql.functions import dense_rank,desc,row_number,lead,datediff,to_date,lag
from pyspark.sql.window import Window
#5. Windowing (Ranking & Trends)
#Source Files:
#DF of logistics_source2: Provides hub_location (Partition Key).
#logistics_shipment_detail_3000.json: Provides shipment_cost (Ordering Key)
#Scenario: "Who are the Top 3 Drivers by Cost in each Hub?"
#Action:Partition by hub_location.
# Order by total_shipment_cost Descending.
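#Apply dense_rank() and row_number()
#Filter where rank or row_number <= 3.

hub_window = Window.partitionBy("hub_location").orderBy(desc("shipment_cost"))
winddf = (
    fltdf.filter("role = 'Driver' and hub_location is not null")
    .withColumn("rnk", dense_rank().over(hub_window))  # ties share a rank, so rnk <= 3 may exceed 3 rows
    .withColumn("rn", row_number().over(hub_window))   # unique per row, so rn <= 3 gives exactly 3
    .filter("rn <= 3")
)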

#display(winddf)


#6. Analytical Functions (Lead/Lag)
#Source File:
#DF of logistics_shipment_detail_3000.json
#Scenario: Idle Time Analysis.
#Action: For each driver, calculate the days elapsed since their previous shipment.
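# Assumption: a driver is identified by first_name + last_name (no dedicated driver ID is used in this notebook)
driver_window = Window.partitionBy("first_name", "last_name").orderBy("shipment_date")
leadlagdf = (
    fltdf.filter("role = 'Driver'")
    # Parse the dd-MM-yy strings first so the window orders chronologically, not as text
    .withColumn("shipment_date", to_date("shipment_date", "dd-MM-yy"))
    # lag() returns the driver's previous shipment date (NULL for their first shipment)
    .withColumn("prev_shipment_date", lag("shipment_date", 1).over(driver_window))
    .withColumn("days_elapsed", datediff("shipment_date", "prev_shipment_date"))
)
display(leadlagdf)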

In [0]:
#7. Set Operations
#Source Files: DF of logistics_source1 and logistics_source2
#Union: Combining Source1 (Legacy) and Source2 (Modern) into one dataset (Already done in Active Munging).
#Intersect: Identifying Staff IDs that appear in both Source 1 and Source 2 (Duplicate/Migration Check).
#Except (Difference): Identifying Staff IDs present in Source 2 but missing from Source 1 (New Hires).
union_df = rawdf1.unionByName(rawdf2, allowMissingColumns=True)
display(union_df)
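# Set operations compare whole rows and the two sources have different column counts, so compare IDs only
intersect_df = rawdf1.select("shipment_id").intersect(rawdf2.select("shipment_id"))
#display(intersect_df)
# New hires: IDs present in Source 2 but missing from Source 1 (subtract = EXCEPT DISTINCT)
exceptdf = rawdf2.select("shipment_id").subtract(rawdf1.select("shipment_id"))
#display(exceptdf)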


In [0]:
#8. Grouping & Aggregations (Advanced)
#Source Files:
#DF of logistics_source2: Provides hub_location and vehicle_type (Grouping Dimensions).
#DF of logistics_shipment_detail_3000.json: Provides shipment_cost (Aggregation Metric).
# New names so rawdf2, which earlier cells depend on, is not overwritten
src2_str_df = spark.read.csv("/Volumes/logistics_project/default/logistics/logistics_source2", header=True, inferSchema=False)
staff_union_df = rawdf1.unionByName(src2_str_df, allowMissingColumns=True).where("shipment_id != 'ten'")
#display(staff_union_df)
#display(mask_df)
rawdf3 = mask_df.withColumnRenamed("vehicle_type", "lvehicle_type").where("shipment_id != 'ten'")
#display(rawdf3)
flatdf = rawdf3.join(std_json_df,how = "left", on = "shipment_id")
flatdf = flatdf.withColumnRenamed("origin_hub_city", "hub_location")
#display(flatdf)
from pyspark.sql.functions import sum

grpdf = (
    flatdf
    .groupBy("hub_location", "lvehicle_type")
    .agg(sum("shipment_cost").alias("overall_shipment_cost"))
    .orderBy("hub_location", "lvehicle_type")
)

#display(grpdf)




#Scenario: The CFO wants a subtotal report at multiple levels:
#Total Cost by Hub.
totdf = flatdf.rollup("hub_location").agg(sum("shipment_cost").alias("rollup_cost")).orderBy("hub_location", ascending=False)
display(totdf)
#Total Cost by Hub AND Vehicle Type.
totdf = flatdf.rollup("hub_location", "lvehicle_type").agg(sum("shipment_cost").alias("rollup_cost")).orderBy("hub_location", "rollup_cost", ascending=[False, False])
display(totdf)
#Grand Total.
totdf = flatdf.agg(sum("shipment_cost").alias("grand_total_cost"))
display(totdf)
#Action: Use cube("hub_location", "vehicle_type") or rollup() to generate all these subtotals in a single query.
cube_df = flatdf.cube("hub_location", "lvehicle_type").agg(sum("shipment_cost").alias("rollup_cost")).orderBy("hub_location", "rollup_cost", ascending=[False, False])
display(cube_df)


##6. Data Persistence (LOAD) -> Data Publishing & Consumption<br>

Store the outputs of the inner join, lookup and enrichment, schema modeling, windowing, analytical function, set operation, and grouping and aggregation steps in Delta tables.

In [0]:

flatdf.write.json("/Volumes/logistics_project/default/logistics/logistics_US_Json/",mode = "overwrite")
mask_df.write.csv("/Volumes/logistics_project/default/logistics/logistics_US_CSV/", mode="overwrite", header=True)
cube_df.write.parquet("/Volumes/logistics_project/default/logistics/logistics_US_Parquet/",mode = "overwrite")
grpdf.write.format("delta").save("/Volumes/logistics_project/default/logistics/logistics_US_delta/",mode = "overwrite")
mask_df.write.mode("overwrite").saveAsTable("logistics_US")

##7. Take a copy of the above notebook and write the equivalent SQL wherever applicable.
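Below is a starting point for two of the steps. It uses Python's built-in `sqlite3` as a stand-in engine so the SQL can be tried without a cluster. The table and column names mirror the notebook, but the rows are hypothetical. In the notebook, the DataFrames would first be registered with `createOrReplaceTempView` and the same text passed to `spark.sql()`. Spark SQL also accepts `LEFT ANTI JOIN` directly; `NOT EXISTS` is used here because SQLite lacks that syntax.

```python
import sqlite3

# In-memory stand-in for the staff and shipments tables (hypothetical rows)
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE staff (shipment_id INTEGER, first_name TEXT, role TEXT, hub_location TEXT);
CREATE TABLE shipments (shipment_id INTEGER, shipment_cost REAL);
INSERT INTO staff VALUES (1,'Asha','Driver','Pune'), (2,'Ravi','Driver','Pune'),
                         (3,'Meena','Driver','Pune'), (4,'Kiran','Driver','Pune'),
                         (5,'Lata','Driver','Chennai');
INSERT INTO shipments VALUES (1, 900), (2, 800), (3, 700), (4, 600);
""")

# Windowing step as SQL: top 3 drivers by cost in each hub
top3_sql = """
SELECT hub_location, first_name, shipment_cost FROM (
    SELECT s.hub_location, s.first_name, sh.shipment_cost,
           ROW_NUMBER() OVER (PARTITION BY s.hub_location ORDER BY sh.shipment_cost DESC) AS rn
    FROM staff s JOIN shipments sh ON s.shipment_id = sh.shipment_id
    WHERE s.role = 'Driver'
) t
WHERE rn <= 3
ORDER BY hub_location, shipment_cost DESC
"""

# Left anti join step as SQL: staff with no shipment at all
anti_sql = """
SELECT s.first_name FROM staff s
WHERE NOT EXISTS (SELECT 1 FROM shipments sh WHERE sh.shipment_id = s.shipment_id)
"""

top3 = conn.execute(top3_sql).fetchall()
idle = conn.execute(anti_sql).fetchall()
print(top3)  # [('Pune', 'Asha', 900.0), ('Pune', 'Ravi', 800.0), ('Pune', 'Meena', 700.0)]
print(idle)  # [('Lata',)]
```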