#Enterprise Fleet Analytics Pipeline
This pipeline focuses on the business outcome (analytics) and the domain (fleet/logistics).

![logistics](https://raw.githubusercontent.com/mohamedirfankader/databricks-code-repo/main/4_logistics_usecase/logistics_project.png)

Download the data from the Google Drive folder below and upload it into the catalog:
https://drive.google.com/drive/folders/1J3AVJIPLP7CzT15yJIpSiWXshu1iLXKn?usp=drive_link

In [0]:
%sql
--create schema if not exists logistics_catalog.landing_zone;
--create volume if not exists logistics_catalog.landing_zone.landing_vol;

In [0]:
from pyspark.sql.session import SparkSession
# Databricks already provides `spark`; getOrCreate() returns that same active session
spark1 = SparkSession.builder.getOrCreate()

In [0]:
base_path = "/Volumes/logistics_catalog/landing_zone/landing_vol/"
# Create the landing folders (like mkdir -p, mkdirs also succeeds if the folder already exists)
dbutils.fs.mkdirs(base_path + "Source_data/")
dbutils.fs.mkdirs(base_path + "shipment_data/")

##**1. Data Munging**

####1. Visually/manually open the files and capture a couple of data patterns (Manual Exploratory Data Analysis)

#####shipment data:
######1. date field is in yy-MM-dd format
######2. status field contains a special character (underscore), e.g. "IN_TRANSIT"
######3. has 3,000 records

#####source1 & 2:
######1. comma-separated values with fewer than 100 records
######2. duplicate rows and duplicate id keys are present (e.g. 6000002)
######3. null records are present
######4. several records are missing column values such as first_name, last_name and vehicle_type; some rows have every column empty except id
######5. some integer values are written as words, e.g. 10 as "ten" (age and id columns)


####2. Programmatically find a couple of data patterns by applying the EDA steps below (File: logistics_source1)
1. Apply inferSchema and toDF to create a DF and analyse the actual data.
2. Analyse the schema, data types, columns, etc.
3. Analyse the duplicate record count and the summary of the DataFrame.

In [0]:
#1
rawsrc1df = spark.read.csv("/Volumes/logistics_catalog/landing_zone/landing_vol/Source_data/logistics_source1",header=True,inferSchema=True).toDF("shipment_id","first_name","last_name","age","role")
display(rawsrc1df)
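# Note: with header=True the first line is consumed as column names and toDF() then renames them;
# with header=False (the default) the first line stays as a data row and toDF() renames _c0.._c4.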

#2
rawsrc1df.printSchema()
print("schema")
print(rawsrc1df.schema)
print(rawsrc1df.dtypes)
print("columns")
print(rawsrc1df.columns)
print("total record count", rawsrc1df.count())

#3 
print("deduplicate using distinct func",rawsrc1df.distinct().count())
print("deduplicate using dropduplicate func",rawsrc1df.dropDuplicates().count())
print("deduplicate using dropduplicate with id col filter",rawsrc1df.dropDuplicates(["shipment_id"]).count())

print("summary")
display(rawsrc1df.summary())
print("Describe")
display(rawsrc1df.describe())

###a. Passive Data Munging (Files: logistics_source1 and logistics_source2)
Without modifying the data, identify:<br>
Shipment IDs that appear in both master_v1 and master_v2<br>
Records where:<br>
1. shipment_id is non-numeric
2. age is not an integer<br>

Count rows having:

3. fewer columns than expected
4. more columns than expected
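The passive checks above can be prototyped on a few raw lines with Python's standard `csv` module before they are run on Spark. The following is a minimal sketch. The helper `profile_rows` and the sample rows are hypothetical. It assumes the first line is the header and that it contains `shipment_id` and `age` columns, as in logistics_source1 and logistics_source2.

```python
import csv
import io
import re

def profile_rows(raw_text, expected_cols):
    """Profile raw CSV text without modifying it (hypothetical helper)."""
    reader = csv.reader(io.StringIO(raw_text))
    header = next(reader)  # the first line holds the column names
    id_idx, age_idx = header.index("shipment_id"), header.index("age")
    report = {"non_numeric_id": [], "non_integer_age": [],
              "fewer_cols": 0, "more_cols": 0, "ids": set()}
    for row in reader:
        # 3./4. compare the token count of each line with the expected width
        if len(row) < expected_cols:
            report["fewer_cols"] += 1
        elif len(row) > expected_cols:
            report["more_cols"] += 1
        sid = row[id_idx] if len(row) > id_idx else ""
        age = row[age_idx] if len(row) > age_idx else ""
        # 1. shipment_id must contain digits only
        if sid and not re.fullmatch(r"[0-9]+", sid):
            report["non_numeric_id"].append(sid)
        # 2. age must be an integer (an optional minus sign is allowed)
        if age and not re.fullmatch(r"-?[0-9]+", age):
            report["non_integer_age"].append(age)
        if sid:
            report["ids"].add(sid)
    return report

# Hypothetical sample lines shaped like the two source files
src1 = ("shipment_id,first_name,last_name,age,role\n"
        "5000001,Rajesh,Kumar,34,Driver\n"
        "ten,Arun,,ten,Driver\n"
        "5000003,Priya\n"
        "5000004,Anil,Rao,45,Driver,Extra\n")
src2 = ("shipment_id,first_name,last_name,age,role,hub_location,vehicle_type\n"
        "5000001,Rajesh,Kumar,34,Driver,Chennai,Truck\n"
        "5000009,Meena,Iyer,29,Supervisor,Pune,Bike\n")

r1 = profile_rows(src1, 5)
r2 = profile_rows(src2, 7)
common = r1["ids"] & r2["ids"]  # shipment IDs present in both sources
```

Counting tokens on the raw lines matters. In PERMISSIVE mode, Spark's CSV reader pads short rows with nulls and silently drops extra tokens, so a DataFrame read with `spark.read.csv` can no longer reveal rows that are too wide.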

In [0]:
rawsrc2df = spark.read.csv("/Volumes/logistics_catalog/landing_zone/landing_vol/Source_data/logistics_source2",header=True)
from pyspark.sql.functions import col, size, split

#Shipment IDs that appear in both master_v1 and master_v2: compare the id columns themselves
#(cast to string, since inferSchema may have typed source1's shipment_id differently)
commonidsdf = rawsrc1df.select(col("shipment_id").cast("string")).intersect(rawsrc2df.select("shipment_id"))
display(commonidsdf)

#Records where: (each source is checked on its own, so rows present in only one file are not missed)
#1.shipment_id is non-numeric
display(rawsrc1df.select("shipment_id").where("shipment_id rlike '[^0-9]'"))
display(rawsrc2df.select("shipment_id").where("shipment_id rlike '[^0-9]'"))
#2.age is not an integer
display(rawsrc1df.select("shipment_id","age").where("age rlike '[^0-9]'"))
display(rawsrc2df.select("shipment_id","age").where("age rlike '[^0-9]'"))

#Count rows having fewer / more columns than expected.
#In PERMISSIVE mode the CSV reader pads short rows with nulls and drops extra tokens, so count the tokens on the raw lines instead.
#Note: this naive split does not handle quoted values that contain commas.
srcpath = "/Volumes/logistics_catalog/landing_zone/landing_vol/Source_data/"
for fname, expected in [("logistics_source1", 5), ("logistics_source2", 7)]:
    tokensdf = spark.read.text(srcpath + fname).withColumn("ncols", size(split(col("value"), ",")))
    #3.fewer columns than expected
    print(fname, "rows with fewer columns:", tokensdf.filter(col("ncols") < expected).count())
    #4.more columns than expected
    print(fname, "rows with more columns:", tokensdf.filter(col("ncols") > expected).count())
#Rows with the right number of columns but empty values are handled by the null checks in the cleansing step.


In [0]:
#Create a Spark Session Object
from pyspark.sql.session import SparkSession
spark1 = SparkSession.builder.appName("logistics usecases").getOrCreate()

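# Wildcard import: later cells rely on Spark's round, sum, etc., which shadow the Python built-ins of the same name
from pyspark.sql.functions import *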
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, ShortType, TimestampType, DateType

###**b. Active Data Munging** (Files: logistics_source1 and logistics_source2)

#####1.Combining Data + Schema Merging (Structuring)
1. Read both files without enforcing schema
2. Align them into a single canonical schema: shipment_id, first_name, last_name, age, role, hub_location, vehicle_type, data_source
3. Add a data_source column with the values system1 and system2 in the respective dataframes
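The schema-merging idea can be illustrated with plain Python records. `unionByName(allowMissingColumns=True)` matches columns by name and fills columns missing from one side with null. The sketch below reproduces that behavior and then orders the result by the canonical column list. The helper `align` and the sample records are hypothetical. Unlike `unionByName`, this sketch also discards any column outside the canonical list.

```python
CANONICAL = ["shipment_id", "first_name", "last_name", "age", "role",
             "hub_location", "vehicle_type", "data_source"]

def align(records, data_source):
    """Tag each record with its source and pad it to the canonical columns (hypothetical helper)."""
    aligned = []
    for rec in records:
        row = {c: rec.get(c) for c in CANONICAL}  # columns missing in this source become None
        row["data_source"] = data_source
        aligned.append(row)
    return aligned

system1 = [{"shipment_id": "5000001", "first_name": "Rajesh", "last_name": "Kumar",
            "age": "34", "role": "Driver"}]
system2 = [{"shipment_id": "5000009", "first_name": "Meena", "last_name": "Iyer",
            "age": "29", "role": "Supervisor", "hub_location": "Pune", "vehicle_type": "Bike"}]
merged = align(system1, "system1") + align(system2, "system2")
```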

In [0]:
#1.Read both files without enforcing schema
src1df = spark.read.csv("/Volumes/logistics_catalog/landing_zone/landing_vol/Source_data/",header=True,mode="PERMISSIVE",pathGlobFilter="logistics_source1",recursiveFileLookup=True)
#display(src1df)
#display(src1df.schema)
src2df = spark.read.csv("/Volumes/logistics_catalog/landing_zone/landing_vol/Source_data/",header=True,mode="PERMISSIVE",pathGlobFilter="logistics_source2",recursiveFileLookup=True)
#display(src2df)
#display(src2df.schema)
#3.Add data_source column with values as: system1, system2 in the respective dataframes
src1df = src1df.withColumn("data_source",lit("system1"))
src2df = src2df.withColumn("data_source",lit("system2"))
#2.Align them into a single canonical schema: shipment_id, first_name, last_name, age, role, hub_location, vehicle_type, data_source
#unionByName appends the source2-only columns after data_source, so select them back into canonical order
mergesrc12df = src1df.unionByName(src2df,allowMissingColumns=True)\
    .select("shipment_id","first_name","last_name","age","role","hub_location","vehicle_type","data_source")
#display(mergesrc12df)
#display(mergesrc12df.schema)
#source1 schema: StructType([StructField('shipment_id', StringType(), True), StructField('first_name', StringType(), True), StructField('last_name', StringType(), True), StructField('age', StringType(), True), StructField('role', StringType(), True)])
#source2 schema: StructType([StructField('shipment_id', StringType(), True), StructField('first_name', StringType(), True), StructField('last_name', StringType(), True), StructField('age', StringType(), True), StructField('role', StringType(), True), StructField('hub_location', StringType(), True), StructField('vehicle_type', StringType(), True)])

#####2. Cleansing, Scrubbing: 
Cleansing (removal of unwanted datasets)<br>
1. Mandatory Column Check - Drop any record where any of the following columns is NULL: shipment_id, role<br>
2. Name Completeness Rule - Drop records where both of the following columns are NULL: first_name, last_name<br>
3. Join Readiness Rule - Drop records where the join key is null: shipment_id<br>

Scrubbing (convert raw to tidy)<br>
4. Age Defaulting Rule - Fill NULL values in the age column with: -1<br>
5. Vehicle Type Default Rule - Fill NULL values in the vehicle_type column with: UNKNOWN<br>
6. Invalid Age Replacement - Replace the following values in age: "ten" to -1, "" to -1<br>
7. Vehicle Type Normalization - Replace inconsistent vehicle types: truck to LMV, bike to TwoWheeler
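The seven rules can be read as a per-record function. The following is a minimal plain-Python sketch over dict records; the helper name `cleanse_and_scrub` is hypothetical. It makes two points explicit. First, rule 2 drops a record only when *both* name columns are null, which corresponds to `na.drop(how="all", ...)` in Spark. Second, rule 3 is already implied by rule 1. The sketch also normalizes vehicle types case-insensitively. That is an assumption, because the rule lists lowercase "truck"/"bike" while the raw data may be capitalized.

```python
AGE_INVALID = {"ten", ""}                             # rule 6 values
VEHICLE_MAP = {"truck": "LMV", "bike": "TwoWheeler"}  # rule 7 mapping

def cleanse_and_scrub(records):
    """Apply the cleansing and scrubbing rules to dict records (hypothetical helper)."""
    out = []
    for rec in records:
        # Rules 1 and 3: shipment_id and role are mandatory
        if rec.get("shipment_id") is None or rec.get("role") is None:
            continue
        # Rule 2: drop only when BOTH name columns are null
        if rec.get("first_name") is None and rec.get("last_name") is None:
            continue
        row = dict(rec)
        # Rules 4 and 6: null or invalid age becomes "-1"
        if row.get("age") is None or row["age"] in AGE_INVALID:
            row["age"] = "-1"
        # Rule 5: null vehicle_type becomes UNKNOWN; rule 7: normalize known types
        vt = row.get("vehicle_type")
        row["vehicle_type"] = "UNKNOWN" if vt is None else VEHICLE_MAP.get(vt.lower(), vt)
        out.append(row)
    return out

rows = [
    {"shipment_id": "1", "first_name": "A", "last_name": None, "age": "ten", "role": "driver", "vehicle_type": "Truck"},
    {"shipment_id": None, "first_name": "B", "last_name": "C", "age": "30", "role": "driver", "vehicle_type": None},
    {"shipment_id": "3", "first_name": None, "last_name": None, "age": "30", "role": "driver", "vehicle_type": None},
    {"shipment_id": "4", "first_name": "D", "last_name": "E", "age": None, "role": "driver", "vehicle_type": "Bike"},
]
result = cleanse_and_scrub(rows)
```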

In [0]:
#Cleansing (removal of unwanted datasets)
#1.Mandatory Column Check - Drop any record where any of the following columns is NULL:shipment_id, role
cleandedupdf = mergesrc12df.na.drop(how="any",subset=["shipment_id","role"])
#display(cleandedupdf.filter("shipment_id is null or role is null")) 
#display(cleandedupdf)
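#2.Name Completeness Rule - Drop records where both of the following columns are NULL: first_name, last_name
#how="all" drops a row only when every listed column is null; how="any" would also drop rows missing just one name
cleandedup2df = cleandedupdf.na.drop(how="all",subset=["first_name","last_name"])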
#display(cleandedup2df.filter("first_name is null or last_name is null"))
#display(cleandedup2df)
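#3.Join Readiness Rule - Drop records where the join key is null: shipment_id
#(already guaranteed by rule 1, which drops null shipment_id; kept explicit for readability)
#display(cleandedup2df.filter("shipment_id is null"))
cleansrc12df = cleandedup2df.na.drop(subset=["shipment_id"])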
display(cleansrc12df)

#Scrubbing (convert raw to tidy)
#4. Age Defaulting Rule - Fill NULL values in the age column with: -1
scrubsrc12df = cleansrc12df.na.fill('-1', subset=["age"])
#display(scrubsrc12df.filter("age == -1"))
#display(scrubsrc12df)
#5. Vehicle Type Default Rule - Fill NULL values in the vehicle_type column with: UNKNOWN
scrubsrc12df = scrubsrc12df.na.fill('UNKNOWN', subset=["vehicle_type"])
#display(scrubsrc12df.filter("vehicle_type == 'UNKNOWN'"))
#display(scrubsrc12df)
#6. Invalid Age Replacement - Replace the following values in age: "ten" to -1, "" to -1
scrubsrc12df = scrubsrc12df.na.replace({'ten':'-1','':'-1'}, subset=["age"])
#display(scrubsrc12df.filter("age == -1"))
#display(scrubsrc12df)
#7. Vehicle Type Normalization - Replace inconsistent vehicle types: truck to LMV bike to TwoWheeler
#display(scrubsrc12df.where("vehicle_type in ('Truck','Bike')"))
scrubsrc12df = scrubsrc12df.na.replace({'Truck':'LMV','Bike':'TwoWheeler'}, subset=["vehicle_type"])
#display(scrubsrc12df.where("vehicle_type in ('LMV','TwoWheeler')"))
display(scrubsrc12df)

####3. Standardization, De-Duplication and Replacement / Deletion of Data to make it in a usable format

Create the shipment details DataFrame<br>
1. Create a DF by reading data from logistics_shipment_detail_3000.json
2. As this is clean JSON data, it does not require any cleansing or scrubbing.

In [0]:
shipmentsrcdf = spark.read.format("json").load("/Volumes/logistics_catalog/landing_zone/landing_vol/shipment_data/logistics_shipment_detail_3000.json",primitivesAsString=True,columnNameOfCorruptRecord="corruptrows",multiline=True)
print("json file read")
shipmentsrcdf.printSchema()
display(shipmentsrcdf)


Standardizations:<br>

1. Add columns<br>
Source File: DF of logistics_shipment_detail_3000.json<br>
Add 'Logistics' as 'domain', the current timestamp as 'ingestion_timestamp' and 'False' as 'is_expedited'
2. Column Uniformity: 
role - Convert to lowercase<br>
Source File: DF of merged(logistics_source1 & logistics_source2)<br>
vehicle_type - Convert values to UPPERCASE<br>
Source Files: DF of logistics_shipment_detail_3000.json
hub_location - Convert values to initcap case<br>
Source Files: DF of merged(logistics_source1 & logistics_source2)<br>
3. Format Standardization:<br>
Source Files: DF of logistics_shipment_detail_3000.json<br>
Convert shipment_date to yyyy-MM-dd<br>
Ensure shipment_cost has 2 decimal precision<br>
4. Data Type Standardization<br>
Standardizing column data types to fix schema drift and enable mathematical operations.<br>
Source File: DF of merged(logistics_source1 & logistics_source2) <br>
age: Cast String to Integer<br>
Source File: DF of logistics_shipment_detail_3000.json<br>
shipment_weight_kg: Cast to Double<br>
Source File: DF of logistics_shipment_detail_3000.json<br>
is_expedited: Cast to Boolean<br>
5. Naming Standardization <br>
Source File: DF of merged(logistics_source1 & logistics_source2)<br>
Rename: first_name to staff_first_name<br>
Rename: last_name to staff_last_name<br>
Rename: hub_location to origin_hub_city<br>
6. Reordering columns logically in a better standard format:<br>
Source File: DF of Data from all 3 files<br>
shipment_id (Identifier), staff_first_name (Dimension), staff_last_name (Dimension), role (Dimension), origin_hub_city (Location), shipment_cost (Metric), ingestion_timestamp (Audit)
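The format and data-type standardizations for a single shipment record can be sketched in plain Python. The helpers `standardize_shipment` and `cast_int` are hypothetical. `cast_int` only roughly mirrors Spark's non-ANSI `cast("int")`, which returns null for unparsable strings such as "ten". Python's `%y` and Spark's `yy` resolve two-digit years differently. Python maps 69 to 99 onto 1969 to 1999, while Spark's datetime-pattern documentation describes parsing `yy` onto the range 2000 to 2099. The two agree only for values below 69.

```python
from datetime import datetime

def standardize_shipment(rec):
    """Apply the date, cost, weight and boolean standardizations (hypothetical helper)."""
    out = dict(rec)
    # yy-MM-dd (e.g. "24-04-23") -> yyyy-MM-dd
    out["shipment_date"] = datetime.strptime(rec["shipment_date"], "%y-%m-%d").strftime("%Y-%m-%d")
    # 2-decimal precision on the cost (values arrive as strings when primitivesAsString=True)
    out["shipment_cost"] = round(float(rec["shipment_cost"]), 2)
    out["shipment_weight_kg"] = float(rec["shipment_weight_kg"])
    # "False"/"True" strings -> booleans
    out["is_expedited"] = str(rec["is_expedited"]).strip().lower() == "true"
    return out

def cast_int(value):
    """Return an int, or None when the value cannot be parsed (like a failed cast)."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return None

shipment = standardize_shipment({"shipment_date": "24-04-23", "shipment_cost": "99.999",
                                 "shipment_weight_kg": "12", "is_expedited": "False"})
```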

In [0]:
#1.Add a column
#: domain as 'Logistics', current timestamp as 'ingestion_timestamp' and 'False' as 'is_expedited'
addcolshipdf = shipmentsrcdf.withColumns({"domain":lit("Logistics"),"ingestion_timestamp":current_timestamp(),"is_expedited":lit("False")})
#display(addcolshipdf)

#2.Column Uniformity: role - Convert to lowercase
#Source File: DF of merged(logistics_source1 & logistics_source2)
#vehicle_type - Convert values to UPPERCASE
#Source Files: DF of logistics_shipment_detail_3000.json
#hub_location - Convert values to initcap case
colunisrc12df = scrubsrc12df.withColumns({"role":lower("role"),"vehicle_type":upper("vehicle_type"),"hub_location":initcap("hub_location")})
#display(colunisrc12df)

#3.Format Standardization:
#Source Files: DF of logistics_shipment_detail_3000.json Convert shipment_date to yyyy-MM-dd
#Ensure shipment_cost has 2 decimal precision (cast first: primitivesAsString=True read it as a string)
shipsrcstddf = addcolshipdf.withColumn("shipment_date", to_date(col("shipment_date"),'yy-MM-dd')).withColumn("shipment_cost", round(col("shipment_cost").cast("double"), 2))
#display(shipsrcstddf)

#4.Data Type Standardization
#Standardizing column data types to fix schema drift and enable mathematical operations.
#Source File:merged(logistics_source1 & logistics_source2)age: Cast String to Integer
#Source File:logistics_shipment_detail_3000.json,shipment_weight_kg: Cast to Double
#Source File:logistics_shipment_detail_3000.json,is_expedited: Cast to Boolean
dtstdsrc12df = colunisrc12df.withColumn("age",col("age").cast("int"))
display(dtstdsrc12df)
dtstdsrc12df.printSchema()
dtstdshipdf = shipsrcstddf.withColumn("shipment_weight_kg",col("shipment_weight_kg").cast("double")).withColumn("is_expedited",col("is_expedited").cast("boolean"))
display(dtstdshipdf)
dtstdshipdf.printSchema()

#5.Naming Standardization
#Source File: DF of merged(logistics_source1 & logistics_source2)
#Rename: first_name to staff_first_name
#Rename: last_name to staff_last_name
#Rename: hub_location to origin_hub_city
namestdsrc12df = dtstdsrc12df.withColumnsRenamed({"first_name":"staff_first_name","last_name":"staff_last_name","hub_location":"origin_hub_city"})
display(namestdsrc12df)

#6.Reordering columns logically in a better standard format:
#Source File: DF of Data from all 3 files
#shipment_id (Identifier), staff_first_name (Dimension),staff_last_name (Dimension), role (Dimension), origin_hub_city (Location), shipment_cost (Metric), ingestion_timestamp (Audit)
allsrcmergedf = namestdsrc12df.join(dtstdshipdf,how="inner",on="shipment_id").select("shipment_id","staff_first_name","staff_last_name","role","origin_hub_city","shipment_cost","ingestion_timestamp")
display(allsrcmergedf)
print("total records after source merge")
print(allsrcmergedf.count())


Deduplication:
1. Apply Record Level De-Duplication
2. Apply Column Level De-Duplication (Primary Key Enforcement)
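The difference between the two levels of de-duplication shows up on a handful of rows. The sketch below is in plain Python and both helper names are hypothetical. Record-level de-duplication removes rows that are identical in every column, which is what `distinct()` does. Key-level de-duplication keeps one row per `shipment_id`, which is what `dropDuplicates(["shipment_id"])` does. The sketch keeps the first occurrence. Spark's `dropDuplicates` keeps an arbitrary row per key, so a deterministic choice in Spark needs an explicit ordering, for example a window with `row_number()`.

```python
def dedup_records(rows):
    """Record-level de-duplication: drop rows identical in every column."""
    seen, out = set(), []
    for row in rows:
        key = tuple(sorted(row.items()))
        if key not in seen:
            seen.add(key)
            out.append(row)
    return out

def dedup_on_key(rows, key):
    """Key-level de-duplication: keep the first row seen for each key value."""
    seen, out = set(), []
    for row in rows:
        if row[key] not in seen:
            seen.add(row[key])
            out.append(row)
    return out

rows = [
    {"shipment_id": "1", "role": "driver"},
    {"shipment_id": "1", "role": "driver"},      # exact duplicate
    {"shipment_id": "1", "role": "supervisor"},  # same key, different data
    {"shipment_id": "2", "role": "driver"},
]
```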

In [0]:
#Apply Record Level De-Duplication
dedupallsrcdf = allsrcmergedf.distinct()
#display(dedupallsrcdf)
print(dedupallsrcdf.count())

#Apply Column Level De-Duplication (Primary Key Enforcement)
dedupallsrcdf1 = dedupallsrcdf.dropDuplicates(["shipment_id"])
#display(dedupallsrcdf1)
print(dedupallsrcdf1.count())

mungeddf = dedupallsrcdf1
display(mungeddf)


##2. Data Enrichment - Detailing of data
Makes your data rich and detailed <br>

###### Adding of Columns (Data Enrichment)
*Creating new derived attributes to enhance traceability and analytical capability.*

**1. Add Audit Timestamp (`load_dt`)**
Source File: DF of logistics_source1 and logistics_source2<br>
* **Scenario:** We need to track exactly when this record was ingested into our Data Lakehouse for auditing purposes.
* **Action:** Add a column `load_dt` using the function `current_timestamp()`.

**2. Create Full Name (`full_name`)**
Source File: DF of logistics_source1 and logistics_source2<br>
* **Scenario:** The reporting dashboard requires a single field for the driver's name instead of separate columns.
* **Action:** Create `full_name` by concatenating `first_name` and `last_name` with a space separator.
* **Result:** "Rajesh" + " " + "Kumar" -> **"Rajesh Kumar"**

**3. Define Route Segment (`route_segment`)**
Source File: DF of logistics_shipment_detail_3000.json<br>
* **Scenario:** The logistics team wants to analyze performance based on specific transport lanes (Source to Destination).
* **Action:** Combine `source_city` and `destination_city` with a hyphen.
* **Result:** "Chennai" + "-" + "Pune" -> **"Chennai-Pune"**

**4. Generate Vehicle Identifier (`vehicle_identifier`)**
Source File: DF of logistics_shipment_detail_3000.json<br>
* **Scenario:** We need a unique tracking code that immediately tells us the vehicle type and the shipment ID.
* **Action:** Combine `vehicle_type` and `shipment_id` to create a composite key.
* **Result:** "Truck" + "_" + "500001" -> **"Truck_500001"**
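The three string-building enrichments rely on two Spark functions with different null handling. `concat` returns null if any input is null. `concat_ws` skips null inputs. The plain-Python helpers below are hypothetical stand-ins that mimic those semantics. They show why `concat_ws` is the safer choice for `full_name`, since a missing last name should not blank out the whole name.

```python
def concat_spark(*parts):
    """Mimic Spark concat(): the result is None if any input is None."""
    return None if any(p is None for p in parts) else "".join(parts)

def concat_ws_spark(sep, *parts):
    """Mimic Spark concat_ws(): None inputs are skipped."""
    return sep.join(p for p in parts if p is not None)

full_name = concat_ws_spark(" ", "Rajesh", "Kumar")          # full_name
route_segment = concat_spark("Chennai", "-", "Pune")         # route_segment
vehicle_identifier = concat_spark("Truck", "_", "500001")    # vehicle_identifier
```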

In [0]:
#1. Add Audit Timestamp 
#Action: Add a column load_dt using the function current_timestamp()
#Remove duplicate staff records before enriching
namestd1src12df = namestdsrc12df.distinct()
namestd2src12df = namestd1src12df.dropDuplicates(["shipment_id"])

denrichsrc12df = namestd2src12df.withColumn("load_dt",current_timestamp())
#display(denrichsrc12df)
#2. Create Full Name (full_name) 
#Action: Create full_name by concatenating first_name and last_name with a space separator.
denrichsrc12df1 = denrichsrc12df.select("shipment_id",
concat_ws(" ",col("staff_first_name"),col("staff_last_name")).alias("full_name"),"staff_first_name","staff_last_name","age","role","data_source","origin_hub_city","vehicle_type","load_dt")
#display(denrichsrc12df1)

#3.Define Route Segment (route_segment)
#Action: Combine source_city and destination_city with a hyphen.
display(dtstdshipdf.limit(5))
denrichshipdf = dtstdshipdf.select("shipment_id","order_id",concat("source_city",lit('-'),"destination_city").alias("route_segment"),"shipment_status","cargo_type","vehicle_type","payment_mode","shipment_weight_kg","shipment_cost","shipment_date")
#display(denrichshipdf)

#4.Generate Vehicle Identifier (vehicle_identifier) 
#Action: Combine vehicle_type and shipment_id to create a composite key
denrichshipdf1 = denrichshipdf.select("shipment_id",concat("vehicle_type",lit('_'),"shipment_id").alias("vehicle_identifier"),"order_id","route_segment","shipment_status","cargo_type","payment_mode","shipment_weight_kg","shipment_cost","shipment_date")
display(denrichshipdf1)


###### Deriving of Columns (Time Intelligence)
*Extracting temporal features from dates to enable period-based analysis and reporting.*<br>
Source File: logistics_shipment_detail_3000.json<br>
**1. Derive Shipment Year (`shipment_year`)**
* **Scenario:** Management needs an annual performance report to compare growth year-over-year.
* **Action:** Extract the year component from `shipment_date`.
* **Result:** "2024-04-23" -> **2024**

**2. Derive Shipment Month (`shipment_month`)**
* **Scenario:** Analysts want to identify seasonal peaks (e.g., increased volume in December).
* **Action:** Extract the month component from `shipment_date`.
* **Result:** "2024-04-23" -> **4** (April)

**3. Flag Weekend Operations (`is_weekend`)**
* **Scenario:** The Operations team needs to track shipments handled during weekends to calculate overtime pay or analyze non-business day capacity.
* **Action:** Flag as **'True'** if the `shipment_date` falls on a Saturday or Sunday.

**4. Flag Shipment Status (`is_expedited`)**
* **Scenario:** The Operations team needs to track whether shipments are IN_TRANSIT or DELIVERED.
* **Action:** Flag as **'True'** if `shipment_status` is IN_TRANSIT or DELIVERED.
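The four time-intelligence columns can be checked against a plain-Python sketch. The helper `time_features` is hypothetical. Spark's `dayofweek()` numbers days from 1 (Sunday) to 7 (Saturday), which is why the notebook tests `isin([1,7])`. Python's `isoweekday()` runs from 1 (Monday) to 7 (Sunday), so the sketch converts between the two numbering schemes. In Spark, a `when(...)` without `.otherwise(...)` yields null rather than False for rows that do not match.

```python
from datetime import date

def time_features(d, status):
    """Derive year, month, weekend and expedited flags (hypothetical helper)."""
    # Convert Python's isoweekday (Mon=1..Sun=7) to Spark's dayofweek (Sun=1..Sat=7)
    spark_dow = d.isoweekday() % 7 + 1
    return {
        "shipment_year": d.year,
        "shipment_month": d.month,
        "is_weekend": spark_dow in (1, 7),
        "is_expedited": status in ("IN_TRANSIT", "DELIVERED"),
    }

weekday = time_features(date(2024, 4, 23), "DELAYED")     # a Tuesday
saturday = time_features(date(2024, 4, 27), "IN_TRANSIT")
sunday = time_features(date(2024, 4, 28), "DELIVERED")
```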

In [0]:
#1.Derive Shipment Year (shipment_year), 2. Shipment Month (shipment_month)
display(denrichshipdf1)
coldershipdf = denrichshipdf1.withColumns({"shipment_year":year(col("shipment_date")),"shipment_month":month(col("shipment_date"))})

#3.Flag Weekend Operations (is_weekend)
coldershipdf1 = coldershipdf.withColumn("is_weekend",when(dayofweek(col("shipment_date")).isin([1,7]),True).otherwise(False))

#4.Flag shipment status (is_expedited)
coldershipdf2 = coldershipdf1.withColumn("is_expedited",when(col("shipment_status").isin(["IN_TRANSIT","DELIVERED"]),True).otherwise(False))
display(coldershipdf2)


###### Enrichment/Business Logics (Calculated Fields)
*Deriving new metrics and financial indicators using mathematical and date-based operations.*<br>
Source File: logistics_shipment_detail_3000.json<br>

**1. Calculate Unit Cost (`cost_per_kg`)**
* **Scenario:** The Finance team wants to analyze the efficiency of shipments by determining the cost incurred per unit of weight.
* **Action:** Divide `shipment_cost` by `shipment_weight_kg`.
* **Logic:** `shipment_cost / shipment_weight_kg`

**2. Track Shipment Age (`days_since_shipment`)**
* **Scenario:** The Operations team needs to monitor how long it has been since a shipment was dispatched to identify potential delays.
* **Action:** Calculate the difference in days between the `current_date` and the `shipment_date`.
* **Logic:** `datediff(current_date(), shipment_date)`

**3. Compute Tax Liability (`tax_amount`)**
* **Scenario:** For invoicing and compliance, we must calculate the Goods and Services Tax (GST) applicable to each shipment.
* **Action:** Calculate 18% GST on the total `shipment_cost`.
* **Logic:** `shipment_cost * 0.18`
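The three calculated fields reduce to simple arithmetic. The following is a plain-Python sketch; the helper `business_metrics` is hypothetical. It takes `today` as a parameter so the result is deterministic, whereas the notebook uses `current_date()`. It also guards the division against a zero or missing weight. In Spark, dividing by zero returns null when ANSI mode is off and raises an error when ANSI mode is on, so the notebook may need a similar guard depending on that setting.

```python
from datetime import date

def business_metrics(cost, weight_kg, ship_date, today):
    """Compute cost_per_kg, days_since_shipment and tax_amount (hypothetical helper)."""
    return {
        # Unit cost; None instead of a division error when weight is 0 or missing
        "cost_per_kg": round(cost / weight_kg, 2) if weight_kg else None,
        # Equivalent of datediff(current_date(), shipment_date)
        "days_since_shipment": (today - ship_date).days,
        # 18% GST on the shipment cost
        "tax_amount": round(cost * 0.18, 2),
    }

metrics = business_metrics(1000.0, 8.0, date(2024, 4, 23), today=date(2024, 5, 3))
```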

In [0]:
#from pyspark.sql.functions import col, lit, datediff, current_date
#1.Calculate Unit Cost (cost_per_kg): Action: Divide shipment_cost by shipment_weight_kg.
#2.Track Shipment Age (days_since_shipment): Calculate the difference in days between the current_date and the shipment_date.
#3. Compute Tax Liability (tax_amount): Calculate 18% GST on the total shipment_cost.
blshipdf = coldershipdf2\
    .withColumn("cost_per_kg", round(col("shipment_cost")/col("shipment_weight_kg"),2))\
    .withColumn("days_since_shipment", datediff(current_date(), col("shipment_date")))\
    .withColumn("tax_amount", round(col("shipment_cost")*lit(0.18),2))
display(blshipdf)


###### Remove/Eliminate (drop, select, selectExpr)
*Excluding unnecessary or redundant columns to optimize storage and privacy.*<br>
Source File: DF of logistics_source1 and logistics_source2<br>

**1. Remove Redundant Name Columns**
* **Scenario:** Since we have already created the `full_name` column in the Enrichment step, the individual name columns are now redundant and clutter the dataset.
* **Action:** Drop the `first_name` and `last_name` columns.
* **Logic:** `df.drop("first_name", "last_name")`

In [0]:
#display(denrichsrc12df1)
cleansedsrc12df = denrichsrc12df1.drop("staff_first_name","staff_last_name")
display(cleansedsrc12df)

##### Splitting & Merging/Melting of Columns
*Reshaping columns to extract hidden values or combine fields for better analysis.*<br>
Source File: DF of logistics_shipment_detail_3000.json<br>
**1. Splitting (Extraction)**
*Breaking one column into multiple to isolate key information.*
* **Split Order Code:**
  * **Action:** Split `order_id` ("ORD100000") into two new columns:
    * `order_prefix` ("ORD")
    * `order_sequence` ("100000")
* **Split Date:**
  * **Action:** Split `shipment_date` into three separate columns for partitioning:
    * `ship_year` (2024)
    * `ship_month` (4)
    * `ship_day` (23)

**2. Merging (Concatenation)**
*Combining multiple columns into a single unique identifier or description.*
* **Create Route ID:**
  * **Action:** Merge `source_city` ("Chennai") and `destination_city` ("Pune") to create a descriptive route key:
    * `route_lane` ("Chennai->Pune")
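The splitting and merging rules can be sketched in plain Python; the helper names below are hypothetical. A regular expression that separates the leading letters from the trailing digits avoids hard-coding a 3-character prefix. Spark's `regexp_extract(col("order_id"), r"^([A-Za-z]+)(\d+)$", 1)` with group indices 1 and 2 would be the analogous built-in approach. Note that `route_lane` uses `->` as its separator, unlike the hyphen used in `route_segment`.

```python
import re
from datetime import date

def split_order_id(order_id):
    """Split a code like 'ORD100000' into its letter prefix and numeric sequence."""
    m = re.fullmatch(r"([A-Za-z]+)(\d+)", order_id)
    if m is None:
        return None, None  # the value does not follow the prefix+digits pattern
    return m.group(1), m.group(2)

def split_date(d):
    """Return (ship_year, ship_month, ship_day)."""
    return d.year, d.month, d.day

def route_lane(source_city, destination_city):
    """Merge source and destination into a route key such as 'Chennai->Pune'."""
    return f"{source_city}->{destination_city}"
```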

In [0]:
#1.Splitting (Extraction): 
# Split order_id ("ORD100000") into:order_prefix ("ORD") & #order_sequence ("100000")
#Split shipment_date into:ship_year (2024),ship_month (4) & ship_day (23)
#display(blshipdf) #shipment year & month are already classified, only day is derived here and renaming year and month
splitshipdf = blshipdf.withColumn("order_prefix",substring(col("order_id"),1,3))\
    .withColumn("order_sequence",substring(col("order_id"),4,9))\
    .withColumn("ship_day",dayofmonth(col("shipment_date")))\
    .withColumnsRenamed({"shipment_year":"ship_year","shipment_month":"ship_month"})
display(splitshipdf)

#2.Merging (Concatenation): Create Route ID: route_lane ("Chennai->Pune")
#source_city/destination_city were not carried into this DF, so derive route_lane from route_segment ("Chennai-Pune");
#this assumes the city names themselves contain no hyphen
mergedshipdf = splitshipdf.withColumn("route_lane", regexp_replace(col("route_segment"), "-", "->")).drop("route_segment")
display(mergedshipdf)


##3. Data Customization & Processing - Application of Tailored Business Specific Rules

### **UDF1: Complex Incentive Calculation**
**Scenario:** The Logistics Head wants to calculate a "Performance Bonus" for drivers based on tenure and role complexity.

**Action:** Create a Python function `calculate_bonus(role, age)` and register it as a Spark UDF.

**Logic:**
* **IF** `Role` == 'Driver' **AND** `Age` > 50:
  * `Bonus` = 15% of Salary (Reward for Seniority)
* **IF** `Role` == 'Driver' **AND** `Age` < 30:
  * `Bonus` = 5% of Salary (Encouragement for Juniors)
* **ELSE**:
  * `Bonus` = 0

**Result:** A new derived column `projected_bonus` is generated for every row in the dataset.
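The incentive rule can be written as a null-safe plain-Python function before it is wrapped as a UDF. In this sketch `bonus_rate` is a hypothetical name. It returns the bonus *rate*, because no salary column exists in these sources. Multiply the rate by a salary figure if one becomes available. One assumption differs from the literal rule. After the scrubbing step, an unknown age is stored as -1, which would otherwise count as "under 30". This sketch treats negative ages as unknown and returns 0.

```python
def bonus_rate(role, age):
    """Return the performance-bonus rate for a staff member (hypothetical helper)."""
    if role is None or age is None or age < 0:  # -1 is the "unknown age" placeholder
        return 0.0
    if role.lower() == "driver" and age > 50:
        return 0.15  # reward for seniority
    if role.lower() == "driver" and age < 30:
        return 0.05  # encouragement for juniors
    return 0.0
```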

---

### **UDF2: PII Masking (Privacy Compliance)**
**Scenario:** For the analytics dashboard, we must hide the full identity of the staff to comply with privacy laws (GDPR/DPDP), while keeping names recognizable for internal managers.

**Business Rule:** Show the first 2 letters, mask the middle characters with `****`, and show the last letter.

**Action:** Create a UDF `mask_identity(name)`.

**Example:**
* **Input:** `"Rajesh"`
* **Output:** `"Ra****h"`
<br>
**Note: Convert the above UDF logic to a built-in function-based transformation to improve performance.**
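In the example, "Rajesh" (6 letters) becomes "Ra****h". The mask is therefore a fixed `****` rather than one asterisk per hidden character. The following is a plain-Python sketch of that rule; `mask_name` is a hypothetical name. Returning None for null or very short names is an assumption. It matches a Spark `when(length(...) > 2, ...)` expression with no `otherwise`, which yields null for those rows.

```python
def mask_name(name):
    """First two characters, a fixed '****' mask, then the last character."""
    if name is None or len(name) <= 2:
        return None  # too short to mask meaningfully (assumption)
    return name[:2] + "****" + name[-1]
```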

In [0]:
from pyspark.sql.types import DoubleType

#Create a Python function calculate_bonus(role, age) and register it as a Spark UDF.
#It returns the bonus rate; multiply by salary when a salary column is available.
def calculate_bonus(role, age):
    if role is None or age is None:  # null-safe: age is null where the int cast failed
        return 0.0
    if role == "driver" and age > 50:
        return 0.15
    elif role == "driver" and age < 30:
        return 0.05
    else:
        return 0.0

#Create a UDF mask_identity(name): first 2 letters + "****" + last letter, e.g. "Rajesh" -> "Ra****h"
def mask_identity(name):
    if name is None or len(name) <= 2:
        return None
    return name[0:2] + "****" + name[-1]

#A new derived column projected_bonus is generated for every row in the dataset
bonuspct_udf = udf(calculate_bonus, DoubleType())  # declare the return type; the UDF default is StringType
namemask_udf = udf(mask_identity)

dcussrc12df = cleansedsrc12df.withColumn("projected_bonus",bonuspct_udf(col("role"),col("age")))\
    .withColumn("masked_name",namemask_udf(col("full_name")))
display(dcussrc12df)
#display(cleansedsrc12df.filter(col("role") =="driver"))

#Same logic with built-in functions: no Python serialization overhead, and Catalyst can optimize it
dcussrc12df1 = cleansedsrc12df.withColumn("projected_bonus",
    when((col("role") == "driver") & (col("age") > 50), 0.15)
    .when((col("role") == "driver") & (col("age") < 30), 0.05)
    .otherwise(0.0))\
    .withColumn("masked_name", when(length(col("full_name")) > 2,
        concat(substring(col("full_name"), 1, 2), lit("****"), substring(col("full_name"), -1, 1))))
display(dcussrc12df1)



## 4. Data Core Curation & Processing (Pre-Wrangling)
*Applying business logic to focus, filter, and summarize data before final analysis.*

**1. Select (Projection)**<br>
Source Files: DF of logistics_source1 and logistics_source2<br>
* **Scenario:** The Driver App team only needs location data, not sensitive HR info.
* **Action:** Select only `first_name`, `role`, and `hub_location`.

**2. Filter (Selection)**<br>
Source File: DF of json<br>
* **Scenario:** We need a report on active operational problems.
* **Action:** Filter rows where `shipment_status` is **'DELAYED'** or **'RETURNED'**.
* **Scenario:** Insurance audit for senior staff.
* **Action:** Filter rows where `age > 50`.

**3. Derive Flags & Columns (Business Logic)**<br>
Source File: DF of json<br>
* **Scenario:** Identify high-value shipments for security tracking.
* **Action:** Create flag `is_high_value` = **True** if `shipment_cost > 50,000`.
* **Scenario:** Flag weekend operations for overtime calculation.
* **Action:** Create flag `is_weekend` = **True** if day is Saturday or Sunday.

**4. Format (Standardization)**<br>
Source File: DF of json<br>
* **Scenario:** Finance requires readable currency formats.
* **Action:** Format `shipment_cost` to string like **"₹30,695.80"**.
* **Scenario:** Standardize city names for reporting.
* **Action:** Format `source_city` to Uppercase (e.g., "chennai" → **"CHENNAI"**).

**5. Group & Aggregate (Summarization)**<br>
Source Files: DF of logistics_source1 and logistics_source2<br>
* **Scenario:** Regional staffing analysis.
* **Action:** Group by `hub_location` and **Count** the number of staff.
* **Scenario:** Fleet capacity analysis.
* **Action:** Group by `vehicle_type` and **Sum** the `shipment_weight_kg`.

**6. Sorting (Ordering)**<br>
Source File: DF of json<br>
* **Scenario:** Prioritize the most expensive shipments.
* **Action:** Sort by `shipment_cost` in **Descending** order.
* **Scenario:** Organize daily dispatch schedule.
* **Action:** Sort by `shipment_date` (Ascending).

**7. Limit (Top-N Analysis)**<br>
Source File: DF of json<br>
* **Scenario:** Dashboard snapshot of critical delays.
* **Action:** Filter for 'DELAYED', Sort by Cost, and **Limit to top 10** rows.

In [0]:
#Business logics & transformations

#If the business logic needs a CASE WHEN style condition, selectExpr() with SQL expressions can be more readable than chained when() calls.

#display(mergedshipdf)
#display(cleansedsrc12df)
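#1.Select (Projection): src12 - Select only first_name, role, and hub_location
#(first_name was merged into full_name and hub_location was renamed to origin_hub_city in earlier steps)
dccursrc12df = cleansedsrc12df.select("full_name","role","origin_hub_city")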
display(dccursrc12df)

#2. Filter (Selection): shipsrc - 
#Action: Filter rows where shipment_status is 'DELAYED' or 'RETURNED'.
#Scenario: Insurance audit for senior staff: src12 -
#Action: Filter rows where age > 50.
dccurshipdf = mergedshipdf.filter(col("shipment_status").isin("DELAYED","RETURNED"))
dccursrc12df1 = cleansedsrc12df.select("*").filter(col("age") > 50)
#display(dccurshipdf)
#display(dccursrc12df1)

#3.Derive Flags & Columns (Business Logic): shipsrc -
#Scenario: Identify high-value shipments for security tracking.
#Action: Create flag is_high_value = True if shipment_cost > 50,000.
#Scenario: Flag weekend operations for overtime calculation.
#Action: is_weekend was already derived in the Time Intelligence step, so it is not recomputed here.

#display(mergedshipdf.filter(col("shipment_cost") > 50000)) # no records found
dccurshipdf1 = mergedshipdf.withColumn("is_high_value", when(col("shipment_cost") > 50000, True).otherwise(False))
#display(dccurshipdf1)

#4.Format (Standardization): shipsrc -
#Scenario: Finance requires readable currency formats.
#Action: Format shipment_cost to string like "₹30,695.80".
#Scenario: Standardize city names for reporting.- since we dropped source & dest. city columns, we can upper route_lane.
#Action: Format source_city to Uppercase (e.g., "chennai" → "CHENNAI")
dccurshipdf2 = dccurshipdf1.selectExpr(
    "shipment_id",
    "vehicle_identifier",
    "order_id",
    "route_lane",
    "shipment_status",
    "cargo_type",
    "payment_mode",
    "shipment_weight_kg",
    "concat('₹', format_number(shipment_cost, 2)) as shipment_cost",
    "shipment_date",
    "ship_year",
    "ship_month",
    "ship_day",
    "is_weekend",
    "is_expedited",
    "cost_per_kg",
    "days_since_shipment",
    "tax_amount",
    "order_prefix",
    "order_sequence"
).withColumn("route_lane", upper(col("route_lane")))
display(dccurshipdf2)
                                        
#5.Group & Aggregate (Summarization): src12 -
#Scenario: Regional staffing analysis.
#Action: Group by hub_location and Count the number of staff.
#Scenario: Fleet capacity analysis.: shipsrc - 
#Action: Group by vehicle_type and Sum the shipment_weight_kg
display(cleansedsrc12df)
dccursrc12df2 = cleansedsrc12df.groupBy("origin_hub_city").agg(count("full_name").alias("staff_count"))
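#vehicle_type is not kept on the shipment DF, so take it from the vehicle_identifier prefix
dccurshipdf3 = dccurshipdf2.groupBy(split(col("vehicle_identifier"),"_")[0].alias("vehicle_type")).agg(sum("shipment_weight_kg").alias("fleet_capacity"))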
#display(dccursrc12df2)
#display(dccurshipdf3)

#6.Sorting (Ordering): shipsrc -
#Scenario: Prioritize the most expensive shipments.
#Action: Sort by shipment_cost in Descending order.
#Scenario: Organize daily dispatch schedule.
#Action: Sort by shipment_date (Ascending).
dccurshipdf4 = dccurshipdf2.orderBy(col("shipment_cost").desc())
dccurshipdf5 = dccurshipdf2.orderBy(col("shipment_date").asc())
#display(dccurshipdf4.limit(10))
#display(dccurshipdf5.limit(10))

#7.Limit (Top-N Analysis): shipsrc - Filter for 'DELAYED', sort by cost, and limit to the top 10 rows
dccurshipdf6 = dccurshipdf2.filter(col("shipment_status") == "DELAYED")\
.orderBy(col("shipment_cost").desc())\
.limit(10)
#display(dccurshipdf6)
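The sort and Top-N steps above only work if `shipment_cost` is compared as a number. The plain-Python sketch below (not PySpark; the sample rows are made up) mimics `format_number(..., 2)` with an f-string and shows why the ₹-formatted string belongs in a separate display column: sorting formatted strings compares characters, so "₹9,000.00" lands above "₹30,695.80".

```python
# Plain-Python sketch, not PySpark. The rows below are hypothetical sample data.
shipments = [
    {"shipment_id": 1, "shipment_status": "DELAYED", "shipment_cost": 9000.0},
    {"shipment_id": 2, "shipment_status": "DELAYED", "shipment_cost": 30695.8},
    {"shipment_id": 3, "shipment_status": "IN_TRANSIT", "shipment_cost": 120.5},
    {"shipment_id": 4, "shipment_status": "DELAYED", "shipment_cost": 1500.0},
]


def format_inr(amount: float) -> str:
    """Thousands separators and 2 decimals, prefixed with the rupee sign."""
    return f"₹{amount:,.2f}"


def top_n_delayed(rows, n=10):
    """Keep DELAYED rows, sort by the numeric cost (highest first), take n."""
    delayed = [r for r in rows if r["shipment_status"] == "DELAYED"]
    return sorted(delayed, key=lambda r: r["shipment_cost"], reverse=True)[:n]


top = top_n_delayed(shipments, n=2)
print([(r["shipment_id"], format_inr(r["shipment_cost"])) for r in top])
# [(2, '₹30,695.80'), (1, '₹9,000.00')]

# Sorting the formatted strings compares characters, not amounts:
string_sorted = sorted((format_inr(r["shipment_cost"]) for r in shipments), reverse=True)
print(string_sorted[0])  # ₹9,000.00, which is not the most expensive shipment
```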


## 5. Data Wrangling - Transformation & Analytics
*Combining, modeling, and analyzing data to answer complex business questions.*



### **1. Joins**
Source Files:<br>
Left Side (staff_df):<br> DF of logistics_source1 & logistics_source2<br>
Right Side (shipments_df):<br> DF of logistics_shipment_detail_3000.json<br>
#### **1.1 Frequently Used Simple Joins (Inner, Left)**
* **Inner Join (Performance Analysis):**
  * **Scenario:** We only want to analyze *completed work*. Connect Staff to the Shipments they handled.
  * **Action:** Join `staff_df` and `shipments_df` on `shipment_id`.
  * **Result:** Returns only rows where a staff member is assigned to a valid shipment.
* **Left Join (Idle Resource check):**
  * **Scenario:** Find out which staff members are currently *idle* (not assigned to any shipment).
  * **Action:** Join `staff_df` (Left) with `shipments_df` (Right) on `shipment_id`. Filter where `shipments_df.shipment_id` is NULL.

#### **1.2 Infrequent Simple Joins (Self, Right, Full, Cartesian)**
* **Self Join (Peer Finding):**
  * **Scenario:** Find all pairs of employees working in the same `hub_location`.
  * **Action:** Join `staff_df` to itself on `hub_location`, filtering where `staff_id_A != staff_id_B`.
* **Right Join (Orphan Data Check):**
  * **Scenario:** Identify shipments in the system that have *no valid driver* assigned (Data Integrity Issue).
  * **Action:** Join `staff_df` (Left) with `shipments_df` (Right). Focus on NULLs on the left side.
* **Full Outer Join (Reconciliation):**
  * **Scenario:** A complete audit to find *both* idle drivers AND unassigned shipments in one view.
  * **Action:** Perform a Full Outer Join on `shipment_id`.
* **Cartesian/Cross Join (Capacity Planning):**
  * **Scenario:** Generate a schedule of *every possible* driver assignment to *every* pending shipment to run an optimization algorithm.
  * **Action:** Cross Join `drivers_df` and `pending_shipments_df`.

#### **1.3 Advanced Joins (Semi and Anti)**
* **Left Semi Join (Existence Check):**
  * **Scenario:** "Show me the details of Drivers who have *at least one* shipment." (Standard filtering).
  * **Action:** `staff_df.join(shipments_df, "shipment_id", "left_semi")`.
  * **Benefit:** Returns only the left table's columns, and each left row appears at most once even when it has several matches, so it filters without duplicating rows.
* **Left Anti Join (Negation Check):**
  * **Scenario:** "Show me the details of Drivers who have *never* touched a shipment."
  * **Action:** `staff_df.join(shipments_df, "shipment_id", "left_anti")`.
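To make the row counts of each join type concrete, here is a plain-Python sketch (not Spark) over a few made-up rows. It assumes, as the notebook does, that `shipment_id` is the key shared by the staff and shipment data. Note how the inner join repeats a staff row that has two shipments while the semi join returns it once.

```python
# Plain-Python sketch of the join semantics above, not PySpark.
# The staff and shipment rows are hypothetical sample data.
staff = [
    {"shipment_id": 1, "name": "Asha", "hub": "Pune"},
    {"shipment_id": 2, "name": "Ravi", "hub": "Chennai"},
    {"shipment_id": 3, "name": "Meena", "hub": "Pune"},
]
shipments = [
    {"shipment_id": 1, "cost": 500.0},
    {"shipment_id": 1, "cost": 700.0},  # two shipments share key 1
    {"shipment_id": 4, "cost": 300.0},  # no staff member has key 4
]

staff_keys = {p["shipment_id"] for p in staff}
ship_keys = {s["shipment_id"] for s in shipments}

# Inner: one row per matching (staff, shipment) pair, so key 1 appears twice.
inner = [{**p, **s} for p in staff for s in shipments if p["shipment_id"] == s["shipment_id"]]

# Left join + "right key is NULL" filter: idle staff.
idle = [p for p in staff if p["shipment_id"] not in ship_keys]

# Right join + "left key is NULL" filter: orphan shipments.
orphans = [s for s in shipments if s["shipment_id"] not in staff_keys]

# Full outer: every key from either side.
full_keys = staff_keys | ship_keys

# Self join on hub, excluding a person paired with themselves.
peers = [(a["shipment_id"], b["shipment_id"])
         for a in staff for b in staff
         if a["hub"] == b["hub"] and a["shipment_id"] != b["shipment_id"]]

# Cross join: every staff row with every shipment row.
cross = [(p["shipment_id"], s["shipment_id"]) for p in staff for s in shipments]

# Left semi: left columns only, each left row at most once despite two matches.
semi = [p for p in staff if p["shipment_id"] in ship_keys]

# Left anti: the same rows as the idle check, without the join + null filter.
anti = [p for p in staff if p["shipment_id"] not in ship_keys]
```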

In [0]:
#Joins
lsrcdf = cleansedsrc12df
rshipdf = dccurshipdf2
display(lsrcdf)
display(rshipdf)

#1.1.Frequently Used Simple Joins (Inner, Left)
#Inner Join (Performance Analysis):Join staff_df and shipments_df on shipment_id.
innerjoindf = lsrcdf.join(rshipdf,how="inner",on="shipment_id")
print("inner join")
#display(innerjoindf)

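#Left Join (Idle Resource check):Join staff_df (Left) with shipments_df (Right) on shipment_id. Filter where shipments_df.shipment_id is NULL.
#Join on an explicit condition so the right-side key stays addressable; a USING join on "shipment_id" keeps only the left key, so it cannot reveal unmatched rows.
leftjoindf = lsrcdf.alias("l").join(rshipdf.alias("r"),how="left",on=col("l.shipment_id") == col("r.shipment_id")).filter(col("r.shipment_id").isNull())
print("left join")
#display(leftjoindf)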

#1.2.Infrequent Simple Joins (Self, Right, Full, Cartesian)
#Self Join (Peer Finding):Join staff_df to itself on hub_location, filtering where staff_id_A != staff_id_B.
selfjoindf = lsrcdf.alias("l").join(lsrcdf.alias("r"),how="inner",on=col("l.origin_hub_city") == col("r.origin_hub_city")).filter(col("l.shipment_id") != col("r.shipment_id"))
print("self join")
display(selfjoindf.select(col("l.shipment_id").alias("staff_id_a"), col("r.shipment_id").alias("staff_id_b"), col("l.origin_hub_city")).limit(10))

#Right Join (Orphan Data Check):Join staff_df (Left) with shipments_df (Right). Focus on NULLs on the left side.
rightjoindf = lsrcdf.alias("l").join(rshipdf.alias("r"),how="right",on=col("l.shipment_id") == col("r.shipment_id")).filter(col("l.shipment_id").isNull())
print("right join")
#display(rightjoindf)

#Full Outer Join (Reconciliation):Perform a Full Outer Join on shipment_id.
fulljoindf = lsrcdf.join(rshipdf,how="full",on="shipment_id")
print("full join")
#display(fulljoindf)

#Cartesian/Cross Join (Capacity Planning):Cross Join drivers_df and pending_shipments_df.
#drivers vs. the delayed (pending) shipments from the Top-N step
l1srcdf = lsrcdf.filter(col("role") == "driver")
r1shipdf = dccurshipdf6
cartjoindf = l1srcdf.crossJoin(r1shipdf)
print("cartesian join")
#display(cartjoindf)

#1.3.Advanced Joins (Semi and Anti)
#Left Semi Join (Existence Check):staff_df.join(shipments_df, "shipment_id", "left_semi")
lsemijoindf = lsrcdf.join(rshipdf,how="left_semi",on="shipment_id")
print("semi join")
#display(lsemijoindf)

#Left Anti Join (Negation Check):staff_df.join(shipments_df, "shipment_id", "left_anti")
lantijoindf = lsrcdf.join(rshipdf,how="left_anti",on="shipment_id")
print("anti join")
#display(lantijoindf)


### **2. Lookup**<br>
Source File: DF of logistics_source1 and logistics_source2 (merged into Staff DF) and Master_City_List.csv dataframe<br>
* **Scenario:** Validation. Check whether each `hub_location` in the staff file exists in the corporate `Master_City_List` dataframe.
* **Action:** Compare the values against the `Master_City_List`.
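The validation is a set-membership test: hubs found in the master list pass, the rest are flagged. A plain-Python sketch with made-up values follows. The `normalize` step is an assumption (the notebook's semi join compares `origin_hub_city` to `city_name` exactly); it is included because an exact comparison treats "chennai" and "Chennai" as different cities.

```python
# Plain-Python sketch of the hub validation; all values are hypothetical.
master_cities = ["Pune", "Chennai", "Mumbai"]  # stands in for Master_City_List.city_name
staff_hubs = [(101, "Pune"), (102, "Hyderbad"), (103, " chennai ")]  # (staff id, hub_location)


def normalize(city: str) -> str:
    """Assumed cleanup: trim whitespace and ignore case before comparing."""
    return city.strip().lower()


master_keys = {normalize(c) for c in master_cities}

# Matches behave like a left_semi join, misses like a left_anti join.
valid = [(sid, hub) for sid, hub in staff_hubs if normalize(hub) in master_keys]
invalid = [(sid, hub) for sid, hub in staff_hubs if normalize(hub) not in master_keys]

print(invalid)  # [(102, 'Hyderbad')]
```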

In [0]:
mcldf = spark.read.csv("/Volumes/logistics_catalog/landing_zone/landing_vol/Master_City_List.csv",header=True,mode='permissive')
#display(mcldf)

mcldf1 = lsrcdf.join(mcldf,how="semi",on=(col("origin_hub_city")==col("city_name")))
#display(mcldf1)

### **3. Lookup & Enrichment**<br>
Source File: DF of logistics_source1 and logistics_source2 (merged into Staff DF) and Master_City_List.csv dataframe<br>
* **Scenario:** Geo-Tagging.
* **Action:** Look up `hub_location` (e.g. "Pune") in the Master_City_List.csv latitude/longitude dataframe and enrich our merged logistics_source dataframe with `latitude` and `longitude` columns for map plotting.
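Enrichment is a keyed lookup that keeps every staff row and fills the coordinates when the city is found, the same shape as the left join in the next cell. A plain-Python sketch; the rows and coordinates are illustrative assumptions, not values from Master_City_List.csv.

```python
# Plain-Python sketch of lookup-and-enrich; city coordinates are approximate, illustrative values.
city_coords = {"Pune": (18.52, 73.86), "Chennai": (13.08, 80.27)}
staff = [
    {"staff_id": 101, "city_name": "Pune"},
    {"staff_id": 102, "city_name": "Hyderbad"},  # misspelled, so no match
]


def enrich(rows, coords):
    """Add latitude/longitude to each row; unmatched cities get None, like a left join."""
    out = []
    for row in rows:
        lat, lon = coords.get(row["city_name"], (None, None))
        out.append({**row, "latitude": lat, "longitude": lon})
    return out


enriched = enrich(staff, city_coords)
print(enriched[1])  # {'staff_id': 102, 'city_name': 'Hyderbad', 'latitude': None, 'longitude': None}
```

Because a city master list is usually small, wrapping it with `broadcast()` from `pyspark.sql.functions` in the Spark join is one option worth considering.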

In [0]:
#display(lsrcdf)
lsrcdf1 = lsrcdf.withColumnRenamed("origin_hub_city","city_name")
luenrichdf = lsrcdf1.alias("l").join(mcldf.alias("r"), how='left', on=col("l.city_name") == col("r.city_name"))\
    .select("l.*", "r.latitude", "r.longitude")
display(luenrichdf)


### **4. Schema Modeling (Denormalization)**<br>
Source Files: DF of All 3 Files (logistics_source1, logistics_source2, logistics_shipment_detail_3000.json)<br>
* **Scenario:** Creating a "Gold Layer" Table for PowerBI/Tableau.
* **Action:** Flatten the Star Schema. Join `Staff`, `Shipments`, and `Vehicle_Master` into one wide table (`wide_shipment_history`) so analysts don't have to perform joins during reporting.
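Denormalization repeats each dimension's attributes on every fact row so reports need no joins. The plain-Python sketch below uses hypothetical rows and, like the next cell, joins the staff and city data rather than a `Vehicle_Master`; it mirrors that cell's two inner joins.

```python
# Plain-Python sketch of flattening dimensions into one wide fact table.
# All rows are hypothetical; the notebook joins the staff DF, the shipment DF and Master_City_List.
staff_dim = {
    101: {"full_name": "Asha K", "origin_hub_city": "Pune"},
    102: {"full_name": "Ravi S", "origin_hub_city": "Chennai"},
}
city_dim = {"Pune": {"latitude": 18.52, "longitude": 73.86}}
shipments = [
    {"shipment_id": 101, "shipment_cost": 500.0},
    {"shipment_id": 102, "shipment_cost": 250.0},
    {"shipment_id": 999, "shipment_cost": 80.0},  # no staff row
]


def flatten(facts, staff, cities):
    """Inner-join each shipment to its staff row, then to its hub city."""
    wide = []
    for fact in facts:
        person = staff.get(fact["shipment_id"])
        if person is None:
            continue  # inner join drops shipments without staff
        city = cities.get(person["origin_hub_city"])
        if city is None:
            continue  # inner join drops hubs missing from the master list
        wide.append({**fact, **person, **city})
    return wide


wide_shipment_history = flatten(shipments, staff_dim, city_dim)
print(len(wide_shipment_history))  # 1
```

The two `continue` branches are where rows silently disappear; if analysts need every shipment in the wide table, left joins (keeping None for missing attributes) are worth considering.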

In [0]:
wide_shipment_historydf = lsrcdf.join(rshipdf,how='inner',on="shipment_id")\
.join(mcldf,how='inner',on=(col("city_name")==col("origin_hub_city")))
display(wide_shipment_historydf)

### **5. Windowing (Ranking & Trends)**<br>
Source Files:<br>
DF of logistics_source2: Provides hub_location (Partition Key).<br>
logistics_shipment_detail_3000.json: Provides shipment_cost (Ordering Key)<br>
* **Scenario:** "Who are the Top 3 Drivers by Cost in *each* Hub?"
* **Action:**
  1. Partition by `hub_location`.
  2. Order by `total_shipment_cost` Descending.
  3. Apply `dense_rank()` and `row_number()`.
  4. Filter where the rank or row number is `<= 3`.
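The practical difference between the two ranking functions shows up on ties. A plain-Python sketch with made-up rows where two Pune drivers tie at 900: `row_number` always stops at 3 rows, while `dense_rank` gives tied costs the same rank and can return more. Spark breaks `row_number` ties in no guaranteed order; the sketch keeps input order only because Python's sort is stable.

```python
from itertools import groupby
from operator import itemgetter

# Hypothetical (hub_location, driver, shipment_cost) rows; two Pune drivers tie at 900.
rows = [
    ("Pune", "A", 900.0), ("Pune", "B", 900.0), ("Pune", "C", 700.0),
    ("Pune", "D", 500.0), ("Chennai", "E", 400.0), ("Chennai", "F", 300.0),
]


def rank_within_hub(rows):
    """Return (hub, driver, cost, row_number, dense_rank), ordered by cost desc per hub."""
    ranked = []
    for hub, group in groupby(sorted(rows, key=itemgetter(0)), key=itemgetter(0)):
        ordered = sorted(group, key=itemgetter(2), reverse=True)
        dense, prev_cost = 0, None
        for row_number, (_, driver, cost) in enumerate(ordered, start=1):
            if cost != prev_cost:  # dense_rank only advances on a new cost value
                dense += 1
                prev_cost = cost
            ranked.append((hub, driver, cost, row_number, dense))
    return ranked


ranked = rank_within_hub(rows)
top3_row_number = [r[1] for r in ranked if r[0] == "Pune" and r[3] <= 3]
top3_dense_rank = [r[1] for r in ranked if r[0] == "Pune" and r[4] <= 3]
print(top3_row_number)  # ['A', 'B', 'C']
print(top3_dense_rank)  # ['A', 'B', 'C', 'D']
```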

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, dense_rank, desc

#display(src2df)
windjoindf = src2df.join(shipmentsrcdf,how='inner',on="shipment_id")
#display(windjoindf)

#Rank shipments by cost (highest first) within each hub
hub_window = Window.partitionBy("hub_location").orderBy(desc("shipment_cost"))

#using row_number(): exactly 3 rows per hub, ties broken arbitrarily
windowdf = windjoindf.withColumn("seqnum",row_number().over(hub_window))
#display(windowdf)
display(windowdf.where("seqnum <= 3"))

#using dense_rank(): tied costs share a rank, so a hub can return more than 3 rows
windowdf = windjoindf.withColumn("dnsrank",dense_rank().over(hub_window)).where("dnsrank <= 3")
display(windowdf)


### **6. Analytical Functions (Lead/Lag)**<br>
Source File: <br>
DF of logistics_shipment_detail_3000.json<br>
* **Scenario:** Idle Time Analysis.
* **Action:** For each driver, calculate the days elapsed since their *previous* shipment.
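This section has no code cell yet. In Spark the usual shape is `lag("shipment_date").over(Window.partitionBy(<driver column>).orderBy("shipment_date"))` followed by `datediff`, where the driver column is an assumption (whichever column identifies the driver in the shipment data). The plain-Python sketch below, on hypothetical rows, shows what that computes: the first shipment per driver has no previous one, so its gap is None.

```python
from datetime import date
from itertools import groupby
from operator import itemgetter

# Hypothetical (driver_id, shipment_date) rows.
shipments = [
    ("D1", date(2024, 1, 1)), ("D1", date(2024, 1, 5)), ("D1", date(2024, 1, 6)),
    ("D2", date(2024, 1, 3)), ("D2", date(2024, 1, 10)),
]


def days_since_previous(rows):
    """Per driver, in date order: days since that driver's previous shipment."""
    out = []
    for driver, group in groupby(sorted(rows), key=itemgetter(0)):
        prev = None  # like lag(): no previous row at the start of each partition
        for _, shipped_on in group:
            gap = None if prev is None else (shipped_on - prev).days
            out.append((driver, shipped_on, gap))
            prev = shipped_on
    return out


gaps = [gap for _, _, gap in days_since_previous(shipments)]
print(gaps)  # [None, 4, 1, None, 7]
```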

### **7. Set Operations**<br>
Source Files: DF of logistics_source1 and logistics_source2<br>
* **Union:** Combining `Source1` (Legacy) and `Source2` (Modern) into one dataset (Already done in Active Munging).
* **Intersect:** Identifying Staff IDs that appear in *both* Source 1 and Source 2 (Duplicate/Migration Check).
* **Except (Difference):** Identifying Staff IDs present in Source 2 but *missing* from Source 1 (New Hires).
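The three set operations map directly onto Python sets, which makes one point easy to see: EXCEPT is directional, so "new hires" means Source 2 minus Source 1, not the reverse. The IDs are hypothetical.

```python
# Hypothetical staff IDs from each source.
source1_ids = {101, 102, 103}
source2_ids = {103, 104, 105}

union_ids = source1_ids | source2_ids  # UNION (distinct): union().distinct()
both_ids = source1_ids & source2_ids   # INTERSECT: intersect()
new_hires = source2_ids - source1_ids  # EXCEPT: src2.subtract(src1)
dropped = source1_ids - source2_ids    # the reverse direction answers a different question

print(sorted(new_hires))  # [104, 105]
```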


In [0]:
# Find the columns common to src1df and src2df (in src1df's order)
common_cols = [c for c in src1df.columns if c in src2df.columns]

#Union: all distinct rows from both sources
setuniondf = src1df.select(common_cols).union(src2df.select(common_cols)).distinct()
setuniondf.show(20)

#Intersect: rows present in both sources
setintdf = src1df.select(common_cols).intersect(src2df.select(common_cols))
setintdf.show(20)

#Difference: rows in Source 2 but missing from Source 1 (new hires)
setdiffdf = src2df.select(common_cols).subtract(src1df.select(common_cols))
setdiffdf.show(20)


### **8. Grouping & Aggregations (Advanced)**<br>
Source Files:<br>
DF of logistics_source2: Provides hub_location and vehicle_type (Grouping Dimensions).<br>
DF of logistics_shipment_detail_3000.json: Provides shipment_cost (Aggregation Metric).<br>
* **Scenario:** The CFO wants a subtotal report at multiple levels:
  1. Total Cost by Hub.
  2. Total Cost by Hub AND Vehicle Type.
  3. Grand Total.
* **Action:** Use `rollup("hub_location", "vehicle_type")`, which produces exactly these three levels in a single query; `cube("hub_location", "vehicle_type")` also adds per-vehicle-type totals across all hubs.
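A plain-Python sketch of which subtotal rows each operator yields, on hypothetical rows. `None` plays the role of Spark's NULL in subtotal rows; Spark's grouping indicators are not modeled here.

```python
from collections import defaultdict

# Hypothetical (hub_location, vehicle_type, shipment_cost) rows.
rows = [("Pune", "Truck", 100.0), ("Pune", "Van", 50.0), ("Chennai", "Truck", 30.0)]


def rollup_totals(rows):
    """rollup(hub, vehicle): (hub, vehicle), (hub, None) and (None, None) levels."""
    totals = defaultdict(float)
    for hub, vehicle, cost in rows:
        totals[(hub, vehicle)] += cost
        totals[(hub, None)] += cost
        totals[(None, None)] += cost
    return dict(totals)


def cube_totals(rows):
    """cube(hub, vehicle): the rollup levels plus (None, vehicle) subtotals."""
    totals = defaultdict(float, rollup_totals(rows))
    for _, vehicle, cost in rows:
        totals[(None, vehicle)] += cost
    return dict(totals)


rollup = rollup_totals(rows)
cube = cube_totals(rows)
print(rollup[("Pune", None)], rollup[(None, None)])  # 150.0 180.0
print(sorted(k for k in cube if k not in rollup))    # [(None, 'Truck'), (None, 'Van')]
```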

In [0]:
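#Join staff (l) and shipments (r); aggregate the joined DF, not the ranked windowdf
windjoindf = src2df.alias("l").join(shipmentsrcdf.alias("r"),how='inner',on="shipment_id")

#Rollup: totals by hub + vehicle type, by hub, and a grand total
rollupdf = windjoindf.rollup("l.hub_location","l.vehicle_type").agg(sum("r.shipment_cost").alias("tot_cost")).orderBy("hub_location","vehicle_type")
display(rollupdf)

#Cube: the rollup levels plus totals by vehicle type across all hubs
cubedf = windjoindf.cube("l.hub_location","l.vehicle_type").agg(sum("r.shipment_cost").alias("tot_cost")).orderBy("hub_location","vehicle_type")
display(cubedf)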

##6. Data Persistence (LOAD) -> Data Publishing & Consumption<br>

Store the results of the inner join, lookup and enrichment, schema modeling, windowing, analytical functions, set operations, and grouping and aggregation steps in Delta tables.

##7. Take a copy of the above notebook and write the equivalent SQL wherever applicable.

####SQL is best to use for
1. when conditions (CASE WHEN)
2. group by & aggregation, order by & limit
3. re-ordering & re-formatting of columns
4. windowing operations
5. UNION, which matches columns by position (unionByName is a DataFrame method)
6. set difference with EXCEPT / MINUS (subtract is a DataFrame method)

####DSL is best to use for
1. ingress & egress of files or tables
2. operations where built-in functions are critical (like na.drop/fill/replace etc.)
3. pivoting, which is easier in the DSL
4. unionByName, which is useful for data munging
5. subtract for set difference (there is no minus method)