# Enterprise Fleet Analytics Pipeline

This project focuses on the business outcome (analytics) and the domain (fleet/logistics).

![logistics](logistics_project.png)

Download the data from the Google Drive folder below and upload it into the catalog:
https://drive.google.com/drive/folders/1J3AVJIPLP7CzT15yJIpSiWXshu1iLXKn?usp=drive_link

## **1. Data Munging**

#### 1. Visually/manually open the file and capture a few data patterns (Manual Exploratory Data Analysis)

Logistics source1 file:
- Format - CSV
- Delimiter - comma
- Line separator - new line
- Header in file - yes
- Missing columns
- Extra columns
- Data format issues - id and age (expected int but received string)
- Empty rows

Logistics source2 file:

- Format - CSV
- Delimiter - comma
- Line separator - new line
- Header in file - yes
- Missing columns
- Data format issues - id and age (expected int but received string)
- Empty rows


#### 2. Programmatically find a few data patterns by applying the EDA steps below (File: logistics_source1)
1. Apply inferSchema and toDF to create a DF and analyse the actual data.
2. Analyse the schema, datatypes, columns, etc.
3. Analyse the duplicate record count and the summary of the dataframe.

In [0]:
pass_mung_df=spark.read.csv("/Volumes/usecase_data/logistics_proj_data/projdata/logistics_source1",inferSchema=True,header=True).toDF("shipment_id","first_name","last_name","age","role")
pass_mung_df.printSchema()
#printSchema shows that shipment_id and age are not in the expected formats
print("Total row count is:",pass_mung_df.count())
print("The number of distinct rows:",pass_mung_df.distinct().count())

#distinct() identifies row-level duplicates; dropDuplicates(["shipment_id"]) identifies column-level duplicates
print("The number of distinct values in the column 'shipment_id':",pass_mung_df.dropDuplicates(["shipment_id"]).count())

#summary() and describe() give a statistical overview of the data:
#count (which reveals nulls in a few columns), mean, stddev, min, max and, for summary(), the percentile distribution
pass_mung_df.summary().show()
pass_mung_df.describe().show()

#Use the columns, schema and dtypes properties to inspect the column names and datatypes
print(pass_mung_df.columns)
print(pass_mung_df.schema)
print(pass_mung_df.dtypes)



### a. Passive Data Munging (Files: logistics_source1 and logistics_source2)
Without modifying the data, identify:<br>
Shipment IDs that appear in both logistics_source1 and logistics_source2<br>
Records where:<br>
1. shipment_id is non-numeric
2. age is not an integer<br>

Count rows having:
3. fewer columns than expected
4. more columns than expected
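
The two column-count checks above can be sketched without Spark. The following is a minimal plain-Python illustration using the standard `csv` module; the sample rows and the helper name `count_column_mismatches` are hypothetical and are not taken from the real files.

```python
import csv
import io

EXPECTED_COLUMNS = 5  # shipment_id, first_name, last_name, age, role

# Hypothetical sample rows, not taken from the real file.
sample = """shipment_id,first_name,last_name,age,role
1001,Rajesh,Kumar,34,Driver
1002,Anita,Sharma
1003,Vijay,Singh,41,Driver,EXTRA
ten,Meena,Iyer,29,Loader
"""

def count_column_mismatches(text, expected):
    """Return (rows_with_fewer_columns, rows_with_more_columns)."""
    reader = csv.reader(io.StringIO(text))
    next(reader)  # skip the header line
    fewer = more = 0
    for row in reader:
        if not row:  # ignore completely empty lines
            continue
        if len(row) < expected:
            fewer += 1
        elif len(row) > expected:
            more += 1
    return fewer, more

fewer_count, more_count = count_column_mismatches(sample, EXPECTED_COLUMNS)
print("fewer columns:", fewer_count, "more columns:", more_count)
```

In Spark, the same rows end up in the corrupt-record column when the file is read in PERMISSIVE mode with an explicit schema.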

In [0]:
#Create a Spark Session Object
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName("MY Spark Session").getOrCreate()
print(spark)

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

#logistics_source1
schema1=StructType([StructField("shipment_id", StringType(), True),StructField("first_name", StringType(), True),StructField("last_name", StringType(), True),StructField("age", StringType(), True),StructField("role", StringType(), True),StructField("corrupt_record",StringType(),True)])

pass_mung_df=spark.read.schema(schema1).csv("/Volumes/usecase_data/logistics_proj_data/projdata/logistics_source1",columnNameOfCorruptRecord="corrupt_record",mode="PERMISSIVE",header=True)

#Identify the rows with fewer or more columns than expected
display(pass_mung_df.where("corrupt_record is not null"))
#identifying non int shipment_id from logistics_source1
display(pass_mung_df.withColumn("non_int_id", col("shipment_id").try_cast("int")).select("shipment_id", "non_int_id").where("non_int_id is null and shipment_id is not null"))
#identifying non int age from logistics_source1
display(pass_mung_df.withColumn("non_int_age", col("age").try_cast("int")).select("shipment_id","age", "non_int_age").where("non_int_age is null and age is not null"))



In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

#logistics_source2
schema2=StructType([StructField("shipment_id", StringType(), True),StructField("first_name", StringType(), True),StructField("last_name", StringType(), True),StructField("age", StringType(), True),StructField("role", StringType(), True),StructField("hub_location", StringType(), True),StructField("vehicle_type", StringType(), True),StructField("corrupt_record", StringType(), True)])

pass_mung_df2=spark.read.schema(schema2).csv("/Volumes/usecase_data/logistics_proj_data/projdata/logistics_source2",columnNameOfCorruptRecord="corrupt_record",mode="PERMISSIVE",header=True)

#Identify the rows with fewer or more columns than expected
display(pass_mung_df2.where("corrupt_record is not null"))
#identifying non int shipment_id from logistics_source2
display(pass_mung_df2.withColumn("non_int_id", col("shipment_id").try_cast("int")).select("shipment_id", "non_int_id").where("non_int_id is null and shipment_id is not null"))
#identifying non int age from logistics_source2
display(pass_mung_df2.withColumn("non_int_age", col("age").try_cast("int")).select("shipment_id","age", "non_int_age").where("non_int_age is null and age is not null"))
display(pass_mung_df2)



In [0]:
display(pass_mung_df.join(pass_mung_df2,how="inner",on='shipment_id').select("shipment_id"))

### **b. Active Data Munging** (Files: logistics_source1 and logistics_source2)

##### 1. Combining Data + Schema Merging (Structuring)
1. Read both files without enforcing schema
2. Align them into a single canonical schema: shipment_id, first_name, last_name, age, role, hub_location, vehicle_type, data_source
3. Add data_source column with values as: system1, system2 in the respective dataframes

In [0]:
active_mung_df1=spark.read.csv("/Volumes/usecase_data/logistics_proj_data/projdata/logistics_source1",inferSchema=True,header=True).withColumn("data_source",lit("system1"))
active_mung_df2=spark.read.csv("/Volumes/usecase_data/logistics_proj_data/projdata/logistics_source2",inferSchema=True,header=True).withColumn("data_source",lit("system2")).withColumn("shipment_id", col("shipment_id").cast("string"))
#Use unionByName with allowMissingColumns=True to merge the schemas of two dataframes with different columns
active_mung_df=active_mung_df1.unionByName(active_mung_df2,allowMissingColumns=True)

display(active_mung_df)

##### 2. Cleansing, Scrubbing
Cleansing (removal of unwanted data)<br>
1. Mandatory Column Check - Drop any record where either of the following columns is NULL: shipment_id, role<br>
2. Name Completeness Rule - Drop records where both of the following columns are NULL: first_name, last_name<br>
3. Join Readiness Rule - Drop records where the join key is NULL: shipment_id<br>

Scrubbing (convert raw to tidy)<br>
4. Age Defaulting Rule - Fill NULL values in the age column with: -1<br>
5. Vehicle Type Default Rule - Fill NULL values in the vehicle_type column with: UNKNOWN<br>
6. Invalid Age Replacement - Replace the following values in age: "ten" to -1, "" to -1<br>
7. Vehicle Type Normalization - Replace inconsistent vehicle types: truck to LMV, bike to TwoWheeler
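
To make the seven rules concrete, here is a minimal plain-Python sketch that applies them to dictionaries instead of a DataFrame. The helper names `cleanse` and `scrub` and the sample rows are hypothetical. Matching vehicle types case-insensitively is an assumption; the rules above do not say how the raw values are cased.

```python
AGE_FIXES = {"ten": "-1", "": "-1"}                      # rule 6
VEHICLE_FIXES = {"truck": "LMV", "bike": "TwoWheeler"}   # rule 7

def cleanse(records):
    """Keep only the rows that pass rules 1-3."""
    kept = []
    for r in records:
        if r.get("shipment_id") is None or r.get("role") is None:
            continue  # rules 1 and 3: mandatory columns / join key
        if r.get("first_name") is None and r.get("last_name") is None:
            continue  # rule 2: at least one name part must be present
        kept.append(r)
    return kept

def scrub(record):
    """Apply rules 4-7 to a single row and return a new dict."""
    out = dict(record)
    age = out.get("age")
    age = "-1" if age is None else age                   # rule 4
    out["age"] = AGE_FIXES.get(age, age)                 # rule 6
    vehicle = out.get("vehicle_type")
    vehicle = "UNKNOWN" if vehicle is None else vehicle  # rule 5
    # Assumption: compare in lowercase so "Truck" and "truck" both match.
    out["vehicle_type"] = VEHICLE_FIXES.get(vehicle.lower(), vehicle)
    return out

# Hypothetical sample rows.
rows = [
    {"shipment_id": "1", "first_name": "Rajesh", "last_name": None,
     "age": "ten", "role": "Driver", "vehicle_type": "Truck"},
    {"shipment_id": None, "first_name": "Anita", "last_name": "Sharma",
     "age": "30", "role": "Loader", "vehicle_type": None},
    {"shipment_id": "3", "first_name": None, "last_name": None,
     "age": None, "role": "Driver", "vehicle_type": "Bike"},
]

tidy_rows = [scrub(r) for r in cleanse(rows)]
print(tidy_rows)
```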

In [0]:
#Cleansing - drop rows with nulls in the required columns using na.drop
null_dropped_df=active_mung_df.na.drop(how="any",subset=['shipment_id','role']).na.drop(how="all",subset=['first_name','last_name']).dropna(how="any",subset=['shipment_id'])
display(null_dropped_df)


In [0]:
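#age is still a string column here (it is cast to int later), so the default must be the string "-1";
#a numeric fill value is ignored for string columns
#na.replace matches values exactly (case-sensitive)
scrubbed_df=null_dropped_df.na.fill("-1",['age']).na.fill('UNKNOWN',['vehicle_type']).na.replace({"ten":"-1","": "-1"},subset=['age']).na.replace({"Truck":"LMV","Bike":"TwoWheeler"},subset=['vehicle_type'])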

display(scrubbed_df)

#### 3. Standardization, De-Duplication and Replacement / Deletion of Data to make it usable

Creating the shipment details DataFrame<br>
1. Create a DF by reading data from logistics_shipment_detail_3000.json
2. As this is clean JSON data, it doesn't require any cleansing or scrubbing.

In [0]:
shipment_df=spark.read.json("/Volumes/usecase_data/logistics_proj_data/projdata/logistics_shipment_detail_3000.json",multiLine=True)
shipment_df.show(5)
shipment_df.printSchema()

Standardizations:<br>

1. Add columns<br>
Source File: DF of logistics_shipment_detail_3000.json<br>
Add domain as 'Logistics', the current timestamp as 'ingestion_timestamp' and 'False' as 'is_expedited'<br>
2. Column Uniformity<br>
Source File: DF of merged(logistics_source1 & logistics_source2)<br>
role - Convert values to lowercase<br>
hub_location - Convert values to initcap case<br>
Source File: DF of logistics_shipment_detail_3000.json<br>
vehicle_type - Convert values to UPPERCASE<br>
3. Format Standardization<br>
Source File: DF of logistics_shipment_detail_3000.json<br>
Convert shipment_date to yyyy-MM-dd<br>
Ensure shipment_cost has 2 decimal precision<br>
4. Data Type Standardization<br>
Standardizing column data types to fix schema drift and enable mathematical operations.<br>
Source File: DF of merged(logistics_source1 & logistics_source2)<br>
age: Cast String to Integer<br>
Source File: DF of logistics_shipment_detail_3000.json<br>
shipment_weight_kg: Cast to Double<br>
is_expedited: Cast to Boolean<br>
5. Naming Standardization<br>
Source File: DF of merged(logistics_source1 & logistics_source2)<br>
Rename: first_name to staff_first_name<br>
Rename: last_name to staff_last_name<br>
Rename: hub_location to origin_hub_city<br>
6. Reordering columns logically in a better standard format<br>
Source File: DF of data from all 3 files<br>
shipment_id (Identifier), staff_first_name (Dimension), staff_last_name (Dimension), role (Dimension), origin_hub_city (Location), shipment_cost (Metric), ingestion_timestamp (Audit)

In [0]:
#Add columns (current_timestamp() already returns a Column, so it needs no lit() wrapper)
shipment_df=shipment_df.withColumn("domain",lit("Logistics")).withColumn("is_expedited",lit(False)).withColumn("ingestion_timestamp",current_timestamp())


#Column Uniformity: 
shipment_df=shipment_df.withColumn("vehicle_type",upper(col("vehicle_type")))
uniform_df=scrubbed_df.withColumn("role",lower(col("role"))).withColumn("hub_location",initcap(col("hub_location")))
                                                                         
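#Format Standardization:
#the pattern passed to to_date describes the incoming string ("yy-MM-dd" here); the resulting date column displays as yyyy-MM-dd
shipment_df=shipment_df.withColumn("shipment_date",to_date("shipment_date","yy-MM-dd")).withColumn("shipment_cost",round(col("shipment_cost"),2))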

#Data Type Standardisation
uniform_df=uniform_df.withColumn("age",col("age").cast("int")).withColumn("shipment_id",col("shipment_id").try_cast("long")).na.drop(how="any",subset=["shipment_id"])
shipment_df=shipment_df.withColumn("shipment_weight_kg",col("shipment_weight_kg").cast("double")).withColumn("is_expedited",col("is_expedited").cast("boolean"))

#Naming Standardisation
uniform_df=uniform_df.withColumnRenamed("first_name","staff_first_name").withColumnRenamed("last_name","staff_last_name").withColumnRenamed("hub_location","origin_hub_city")

#Column reordering
logistics_std_df=uniform_df.join(shipment_df,how="inner",on='shipment_id').select("shipment_id","staff_first_name","staff_last_name","role","origin_hub_city","shipment_cost","ingestion_timestamp")
display(logistics_std_df)



Deduplication:
1. Apply Record Level De-Duplication
2. Apply Column Level De-Duplication (Primary Key Enforcement)

In [0]:
uniform_df=uniform_df.distinct()
uniform_df=uniform_df.dropDuplicates(["shipment_id"])
display(uniform_df)

## 2. Data Enrichment - Detailing of Data
Makes your data rich and detailed <br>

###### Adding of Columns (Data Enrichment)
*Creating new derived attributes to enhance traceability and analytical capability.*

**1. Add Audit Timestamp (`load_dt`)**
Source File: DF of logistics_source1 and logistics_source2<br>
* **Scenario:** We need to track exactly when this record was ingested into our Data Lakehouse for auditing purposes.
* **Action:** Add a column `load_dt` using the function `current_timestamp()`.

**2. Create Full Name (`full_name`)**
Source File: DF of logistics_source1 and logistics_source2<br>
* **Scenario:** The reporting dashboard requires a single field for the driver's name instead of separate columns.
* **Action:** Create `full_name` by concatenating `first_name` and `last_name` with a space separator.
* **Result:** "Rajesh" + " " + "Kumar" -> **"Rajesh Kumar"**

**3. Define Route Segment (`route_segment`)**
Source File: DF of logistics_shipment_detail_3000.json<br>
* **Scenario:** The logistics team wants to analyze performance based on specific transport lanes (Source to Destination).
* **Action:** Combine `source_city` and `destination_city` with a hyphen.
* **Result:** "Chennai" + "-" + "Pune" -> **"Chennai-Pune"**

**4. Generate Vehicle Identifier (`vehicle_identifier`)**
Source File: DF of logistics_shipment_detail_3000.json<br>
* **Scenario:** We need a unique tracking code that immediately tells us the vehicle type and the shipment ID.
* **Action:** Combine `vehicle_type` and `shipment_id` to create a composite key.
* **Result:** "Truck" + "_" + "500001" -> **"Truck_500001"**
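
When building `full_name`, the choice between Spark's `concat` and `concat_ws` matters if one name part is NULL: `concat` returns NULL when any input is NULL, while `concat_ws` skips NULL inputs. The plain-Python functions below only mimic that behaviour for illustration; they are not the Spark implementations.

```python
def concat(*parts):
    """Mimics Spark's concat: the result is None if any input is None."""
    if any(p is None for p in parts):
        return None
    return "".join(parts)

def concat_ws(sep, *parts):
    """Mimics Spark's concat_ws: None inputs are skipped."""
    return sep.join(p for p in parts if p is not None)

print(concat("Rajesh", " ", "Kumar"))      # both parts present
print(concat("Rajesh", " ", None))         # missing last name
print(concat_ws(" ", "Rajesh", None))      # missing last name is skipped
```

For a staff record with a missing last name, the `concat` style loses the whole name, while the `concat_ws` style keeps the part that is present.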

In [0]:
from pyspark.sql.functions import *
#adding load date
uniform_df=uniform_df.withColumn("load_dt",current_timestamp())
#deriving full name
#Using concat_ws instead of concat because concat_ws skips null values, whereas concat returns null if any input is null
uniform_df=uniform_df.withColumn("full_name",concat_ws(" ",col("staff_first_name"),col("staff_last_name")))
#Deriving route segment
shipment_df=shipment_df.withColumn("route_segment",concat(col("source_city"),lit("-"),col("destination_city")))
#Deriving vehicle Identifier
shipment_df=shipment_df.withColumn("vehicle_identifier",concat(col("vehicle_type"),lit("_"),col("shipment_id")))
display(uniform_df)
shipment_df.show(5)
                                 

###### Deriving of Columns (Time Intelligence)
*Extracting temporal features from dates to enable period-based analysis and reporting.*<br>
Source File: logistics_shipment_detail_3000.json<br>
**1. Derive Shipment Year (`shipment_year`)**
* **Scenario:** Management needs an annual performance report to compare growth year-over-year.
* **Action:** Extract the year component from `shipment_date`.
* **Result:** "2024-04-23" -> **2024**

**2. Derive Shipment Month (`shipment_month`)**
* **Scenario:** Analysts want to identify seasonal peaks (e.g., increased volume in December).
* **Action:** Extract the month component from `shipment_date`.
* **Result:** "2024-04-23" -> **4** (April)

**3. Flag Weekend Operations (`is_weekend`)**
* **Scenario:** The Operations team needs to track shipments handled during weekends to calculate overtime pay or analyze non-business day capacity.
* **Action:** Flag as **'True'** if the `shipment_date` falls on a Saturday or Sunday.

**4. Flag shipment status (`is_expedited`)**
* **Scenario:** The Operations team needs to track shipments that are IN_TRANSIT or DELIVERED.
* **Action:** Flag as **'True'** if the `shipment_status` is IN_TRANSIT or DELIVERED.
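
The weekend flag depends on how days are numbered. Spark's `dayofweek` returns 1 for Sunday through 7 for Saturday, which differs from Python's `isoweekday` (1 for Monday through 7 for Sunday). The sketch below reproduces the Spark numbering in plain Python; the helper names are hypothetical.

```python
from datetime import date

def spark_dayofweek(d):
    """Same numbering as Spark's dayofweek: 1 = Sunday ... 7 = Saturday."""
    return d.isoweekday() % 7 + 1

def is_weekend(d):
    """True when the date falls on a Saturday or Sunday."""
    return spark_dayofweek(d) in (1, 7)

for d in (date(2024, 4, 23), date(2024, 4, 27), date(2024, 4, 28)):
    print(d.isoformat(), d.year, d.month, spark_dayofweek(d), is_weekend(d))
```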

In [0]:
#adding shipment_year
shipment_df=shipment_df.withColumn("shipment_year",year(col("shipment_date")))
#adding shipment_month
shipment_df=shipment_df.withColumn("shipment_month",month(col("shipment_date")))
#adding is_weekend (dayofweek returns 1 for Sunday and 7 for Saturday)
shipment_df=shipment_df.withColumn("is_weekend",when((dayofweek(col("shipment_date")) == 1) | (dayofweek(col("shipment_date")) == 7 ),True).otherwise(False))
#adding is_expedited based on shipment_status
shipment_df=shipment_df.withColumn("is_expedited",when((col("shipment_status") == 'IN_TRANSIT') | (col("shipment_status") == 'DELIVERED'), True).otherwise(False))
display(shipment_df.select("shipment_date","is_weekend"))

###### Enrichment/Business Logics (Calculated Fields)
*Deriving new metrics and financial indicators using mathematical and date-based operations.*<br>
Source File: logistics_shipment_detail_3000.json<br>

**1. Calculate Unit Cost (`cost_per_kg`)**
* **Scenario:** The Finance team wants to analyze the efficiency of shipments by determining the cost incurred per unit of weight.
* **Action:** Divide `shipment_cost` by `shipment_weight_kg`.
* **Logic:** `shipment_cost / shipment_weight_kg`

**2. Track Shipment Age (`days_since_shipment`)**
* **Scenario:** The Operations team needs to monitor how long it has been since a shipment was dispatched to identify potential delays.
* **Action:** Calculate the difference in days between the `current_date` and the `shipment_date`.
* **Logic:** `datediff(current_date(), shipment_date)`

**3. Compute Tax Liability (`tax_amount`)**
* **Scenario:** For invoicing and compliance, we must calculate the Goods and Services Tax (GST) applicable to each shipment.
* **Action:** Calculate 18% GST on the total `shipment_cost`.
* **Logic:** `shipment_cost * 0.18`
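
The three formulas above can be checked by hand with a small plain-Python sketch. Returning `None` for a zero or missing weight and rounding the tax to 2 decimals are assumptions made here for illustration; the rules above do not specify either.

```python
from datetime import date

GST_RATE = 0.18  # 18% GST, as stated in the rule above

def cost_per_kg(shipment_cost, shipment_weight_kg):
    """Unit cost; None when it cannot be computed (assumed behaviour)."""
    if shipment_cost is None or not shipment_weight_kg:
        return None
    return shipment_cost / shipment_weight_kg

def days_since_shipment(shipment_date, today):
    """Whole days between the shipment date and the given 'today'."""
    return (today - shipment_date).days

def tax_amount(shipment_cost):
    """GST on the shipment cost, rounded to 2 decimals (assumed)."""
    return round(shipment_cost * GST_RATE, 2)

print(cost_per_kg(100.0, 4.0))
print(days_since_shipment(date(2024, 4, 23), date(2024, 5, 1)))
print(tax_amount(1000.0))
```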

In [0]:
#Calculate unit cost
shipment_df=shipment_df.withColumn("cost_per_kg", try_divide(col("shipment_cost"), col("shipment_weight_kg")))
#Track shipment age
shipment_df=shipment_df.withColumn("days_since_shipment",datediff(current_date(),col("shipment_date")))
#compute tax amount
shipment_df=shipment_df.withColumn("tax_amount",col("shipment_cost")*0.18)
display(shipment_df.select("cost_per_kg","shipment_cost","shipment_weight_kg","days_since_shipment","shipment_date","tax_amount"))


###### Remove/Eliminate (drop, select, selectExpr)
*Excluding unnecessary or redundant columns to optimize storage and privacy.*<br>
Source File: DF of logistics_source1 and logistics_source2<br>

**1. Remove Redundant Name Columns**
* **Scenario:** Since we have already created the `full_name` column in the Enrichment step, the individual name columns are now redundant and clutter the dataset.
* **Action:** Drop the `staff_first_name` and `staff_last_name` columns (renamed from `first_name` and `last_name` during Naming Standardization).
* **Logic:** `df.drop("staff_first_name", "staff_last_name")`

In [0]:
uniform_df=uniform_df.drop("staff_first_name","staff_last_name")

uniform_df=uniform_df.select('shipment_id','full_name', 'age', 'role', 'origin_hub_city', 'vehicle_type', 'load_dt','data_source' )



##### Splitting & Merging/Melting of Columns
*Reshaping columns to extract hidden values or combine fields for better analysis.*<br>
Source File: DF of logistics_shipment_detail_3000.json<br>
**1. Splitting (Extraction)**
*Breaking one column into multiple to isolate key information.*
* **Split Order Code:**
  * **Action:** Split `order_id` ("ORD100000") into two new columns:
    * `order_prefix` ("ORD")
    * `order_sequence` ("100000")
* **Split Date:**
  * **Action:** Split `shipment_date` into three separate columns for partitioning:
    * `ship_year` (2024)
    * `ship_month` (4)
    * `ship_day` (23)

**2. Merging (Concatenation)**
*Combining multiple columns into a single unique identifier or description.*
* **Create Route ID:**
  * **Action:** Merge `source_city` ("Chennai") and `destination_city` ("Pune") to create a descriptive route key:
    * `route_lane` ("Chennai->Pune")
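
The splitting and merging rules above can be sketched in plain Python. Using a regular expression for `order_id` is an assumption that the prefix is always letters followed by digits; the helper names are hypothetical.

```python
import re
from datetime import date

# Assumption: order_id is a run of letters followed by a run of digits.
ORDER_PATTERN = re.compile(r"^([A-Za-z]+)(\d+)$")

def split_order_id(order_id):
    """Return (order_prefix, order_sequence), or (None, None) if malformed."""
    match = ORDER_PATTERN.match(order_id)
    if match is None:
        return None, None
    return match.group(1), match.group(2)

def split_date(shipment_date):
    """Return (ship_year, ship_month, ship_day) as integers."""
    return shipment_date.year, shipment_date.month, shipment_date.day

def route_lane(source_city, destination_city):
    """Merge two city names into a descriptive route key."""
    return f"{source_city}->{destination_city}"

print(split_order_id("ORD100000"))
print(split_date(date(2024, 4, 23)))
print(route_lane("Chennai", "Pune"))
```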

In [0]:

#split order code: the first 3 characters are the prefix, the rest is the sequence
shipment_df = shipment_df.withColumn("order_prefix", substring(col("order_id"), 1, 3)).withColumn("order_sequence", substring(col("order_id"), 4, length(col("order_id"))))
#split date: cast to string before splitting, then cast each part to int so the month is 4 rather than "04"
shipment_df=shipment_df.withColumn("split_date",split(col("shipment_date").cast("string"),"-"))
shipment_df=shipment_df.withColumn("ship_year",col("split_date")[0].cast("int")).withColumn("ship_month",col("split_date")[1].cast("int")).withColumn("ship_day",col("split_date")[2].cast("int")).drop("split_date")
#merge into route lane
shipment_df=shipment_df.withColumn("route_lane",concat(col("source_city"),lit("->"),col("destination_city")))

## 3. Data Customization & Processing - Application of Tailored Business Specific Rules

### **UDF1: Complex Incentive Calculation**
**Scenario:** The Logistics Head wants to calculate a "Performance Bonus" for drivers based on tenure and role complexity.

**Action:** Create a Python function `calculate_bonus(role, age)` and register it as a Spark UDF.

**Logic:**
* **IF** `Role` == 'Driver' **AND** `Age` > 50:
  * `Bonus` = 15% of Salary (Reward for Seniority)
* **IF** `Role` == 'Driver' **AND** `Age` < 30:
  * `Bonus` = 5% of Salary (Encouragement for Juniors)
* **ELSE**:
  * `Bonus` = 0

**Result:** A new derived column `projected_bonus` is generated for every row in the dataset.

---

### **UDF2: PII Masking (Privacy Compliance)**
**Scenario:** For the analytics dashboard, we must hide the full identity of the staff to comply with privacy laws (GDPR/DPDP), while keeping names recognizable for internal managers.

**Business Rule:** Show the first 2 letters, mask the middle characters with `****`, and show the last letter.

**Action:** Create a UDF `mask_identity(name)`.

**Example:**
* **Input:** `"Rajesh"`
* **Output:** `"Ra****h"`
<br>
**Note: Convert the above UDF logic to a built-in function based transformation to improve performance.**

In [0]:

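def calculate_bonus(role:str,age:int):
    #Guard against nulls: role or age may be None after cleansing and casting
    if role is None or age is None:
        return 0.0
    if role.upper()=="DRIVER" and age>50:
        return 0.15
    elif role.upper()=="DRIVER" and age<30:
        return 0.05
    else:
        return 0.0
#Declare the return type; without it the UDF returns strings
bonus_udf = udf(calculate_bonus, DoubleType())
uniform_df1=uniform_df.withColumn("projected_bonus",bonus_udf(col("role"),col("age")))
display(uniform_df1)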


In [0]:

uniform_df_bonus=uniform_df.withColumn("projected_bonus",when((col("role")=="driver") & (col("age")>50),0.15).when((col("role")=="driver") & (col("age")<30),0.05).otherwise(0))
display(uniform_df_bonus)

In [0]:

def mask_identities(name):
    if name is None:
        return None
    if len(name) <= 2:
        return name
    #Business rule: first 2 letters + fixed "****" mask + last letter, e.g. "Rajesh" -> "Ra****h"
    return name[:2] + "****" + name[-1]
mask_identities_udf=udf(mask_identities)

uniform_df1=uniform_df.withColumn("full_name",mask_identities_udf(col("full_name")))
display(uniform_df1)

In [0]:

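#Built-in version of the masking rule: names of 2 characters or fewer are left unchanged
uniform_df=uniform_df.withColumn("full_name",when(length(col("full_name"))<=2,col("full_name")).otherwise(concat(
    substring(col("full_name"),1,2),
    lit("****"),
    substring(col("full_name"),-1,1))))
display(uniform_df)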

## 4. Data Core Curation & Processing (Pre-Wrangling)
*Applying business logic to focus, filter, and summarize data before final analysis.*

**1. Select (Projection)**<br>
Source Files: DF of logistics_source1 and logistics_source2<br>
* **Scenario:** The Driver App team only needs location data, not sensitive HR info.
* **Action:** Select only `full_name`, `role`, and `origin_hub_city` (the name and hub columns as they exist after enrichment and renaming).

**2. Filter (Selection)**<br>
Source File: DF of json<br>
* **Scenario:** We need a report on active operational problems.
* **Action:** Filter rows where `shipment_status` is **'DELAYED'** or **'RETURNED'**.
* **Scenario:** Insurance audit for senior staff.
* **Action:** Filter rows where `age > 50`.

**3. Derive Flags & Columns (Business Logic)**<br>
Source File: DF of json<br>
* **Scenario:** Identify high-value shipments for security tracking.
* **Action:** Create flag `is_high_value` = **True** if `shipment_cost > 50,000`.
* **Scenario:** Flag weekend operations for overtime calculation.
* **Action:** Create flag `is_weekend` = **True** if day is Saturday or Sunday.

**4. Format (Standardization)**<br>
Source File: DF of json<br>
* **Scenario:** Finance requires readable currency formats.
* **Action:** Format `shipment_cost` to string like **"₹30,695.80"**.
* **Scenario:** Standardize city names for reporting.
* **Action:** Format `source_city` to Uppercase (e.g., "chennai" → **"CHENNAI"**).

**5. Group & Aggregate (Summarization)**<br>
Source Files: DF of logistics_source1 and logistics_source2<br>
* **Scenario:** Regional staffing analysis.
* **Action:** Group by `hub_location` and **Count** the number of staff.
* **Scenario:** Fleet capacity analysis.
* **Action:** Group by `vehicle_type` and **Sum** the `shipment_weight_kg`.

**6. Sorting (Ordering)**<br>
Source File: DF of json<br>
* **Scenario:** Prioritize the most expensive shipments.
* **Action:** Sort by `shipment_cost` in **Descending** order.
* **Scenario:** Organize daily dispatch schedule.
* **Action:** Sort by `shipment_date` (Ascending) then `priority_flag` (Descending).

**7. Limit (Top-N Analysis)**<br>
Source File: DF of json<br>
* **Scenario:** Dashboard snapshot of critical delays.
* **Action:** Filter for 'DELAYED', Sort by Cost, and **Limit to top 10** rows.
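
Two of the steps above are easy to verify by hand: the currency format in step 4 and the filter, sort and limit chain in step 7. The plain-Python sketch below uses hypothetical helper names and sample rows. It uses Western thousands grouping, which matches the "₹30,695.80" example above.

```python
def format_inr(amount):
    """Format a cost with thousands separators and 2 decimals."""
    return f"₹{amount:,.2f}"

def top_delayed(shipments, n=10):
    """Filter DELAYED shipments, sort by cost descending, keep the top n."""
    delayed = [s for s in shipments if s["shipment_status"] == "DELAYED"]
    return sorted(delayed, key=lambda s: s["shipment_cost"], reverse=True)[:n]

# Hypothetical sample rows.
shipments = [
    {"shipment_id": 1, "shipment_status": "DELAYED", "shipment_cost": 30695.8},
    {"shipment_id": 2, "shipment_status": "DELIVERED", "shipment_cost": 99000.0},
    {"shipment_id": 3, "shipment_status": "DELAYED", "shipment_cost": 52000.0},
]

for s in top_delayed(shipments, n=2):
    print(s["shipment_id"], format_inr(s["shipment_cost"]))
```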

In [0]:
#selecting required columns 
driver_team_df=uniform_df.select("full_name","role","origin_hub_city")
#Filter rows based on conditions
operational_prblm_df=shipment_df.where(col("shipment_status").isin(["DELAYED","RETURNED"]))
audit_df=uniform_df.where(col("age")>50)

#Derive flag
shipment_df=shipment_df.withColumn("is_high_value",when(col("shipment_cost")>50000,lit(True)).otherwise(lit(False)))
#display(shipment_df.select("is_high_value","shipment_cost"))

#Format Standardisation
#format_number adds thousands separators and fixes 2 decimals, e.g. 30695.8 -> "30,695.80"
shipment_df=shipment_df.withColumn("finance_shipment_cost",concat(lit("₹"),format_number(col("shipment_cost"),2)))


shipment_df=shipment_df.withColumn("source_city",upper(col("source_city")))



In [0]:

#Grouping data
display(uniform_df.groupBy("origin_hub_city").agg(count("*").alias("Hub_wise_staff_count")))
display(shipment_df.groupBy("vehicle_type").agg(sum("shipment_weight_kg").alias("Total_weight_kg")))

In [0]:
#sorting
display(shipment_df.orderBy("shipment_cost",ascending=False))
display(shipment_df.orderBy("shipment_date","is_high_value",ascending=[True,False]))

In [0]:

#Top n analysis
display(shipment_df.where(col("shipment_status")=="DELAYED").orderBy(col("shipment_cost").desc()).limit(10))

## 5. Data Wrangling - Transformation & Analytics
*Combining, modeling, and analyzing data to answer complex business questions.*

### **1. Joins**
Source Files:<br>
Left Side (staff_df):<br> DF of logistics_source1 & logistics_source2<br>
Right Side (shipments_df):<br> DF of logistics_shipment_detail_3000.json<br>
#### **1.1 Frequently Used Simple Joins (Inner, Left)**
* **Inner Join (Performance Analysis):**
  * **Scenario:** We only want to analyze *completed work*. Connect Staff to the Shipments they handled.
  * **Action:** Join `staff_df` and `shipments_df` on `shipment_id`.
  * **Result:** Returns only rows where a staff member is assigned to a valid shipment.
* **Left Join (Idle Resource check):**
  * **Scenario:** Find out which staff members are currently *idle* (not assigned to any shipment).
  * **Action:** Join `staff_df` (Left) with `shipments_df` (Right) on `shipment_id`. Filter where `shipments_df.shipment_id` is NULL.

#### **1.2 Infrequent Simple Joins (Self, Right, Full, Cartesian)**
* **Self Join (Peer Finding):**
  * **Scenario:** Find all pairs of employees working in the same `hub_location`.
  * **Action:** Join `staff_df` to itself on `hub_location`, filtering where `staff_id_A != staff_id_B`.
* **Right Join (Orphan Data Check):**
  * **Scenario:** Identify shipments in the system that have *no valid driver* assigned (Data Integrity Issue).
  * **Action:** Join `staff_df` (Left) with `shipments_df` (Right). Focus on NULLs on the left side.
* **Full Outer Join (Reconciliation):**
  * **Scenario:** A complete audit to find *both* idle drivers AND unassigned shipments in one view.
  * **Action:** Perform a Full Outer Join on `shipment_id`.
* **Cartesian/Cross Join (Capacity Planning):**
  * **Scenario:** Generate a schedule of *every possible* driver assignment to *every* pending shipment to run an optimization algorithm.
  * **Action:** Cross Join `drivers_df` and `pending_shipments_df`.

#### **1.3 Advanced Joins (Semi and Anti)**
* **Left Semi Join (Existence Check):**
  * **Scenario:** "Show me the details of Drivers who have *at least one* shipment." (Standard filtering).
  * **Action:** `staff_df.join(shipments_df, "shipment_id", "left_semi")`.
  * **Benefit:** Performance optimization; it stops scanning the right table once a match is found.
* **Left Anti Join (Negation Check):**
  * **Scenario:** "Show me the details of Drivers who have *never* touched a shipment."
  * **Action:** `staff_df.join(shipments_df, "shipment_id", "left_anti")`.
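
Semi and anti joins return only left-side rows and never add columns from the right side. A minimal plain-Python sketch of that behaviour, with hypothetical sample rows:

```python
# Hypothetical sample rows.
staff = [
    {"shipment_id": 1, "full_name": "Rajesh Kumar"},
    {"shipment_id": 2, "full_name": "Anita Sharma"},
    {"shipment_id": 9, "full_name": "Vijay Singh"},
]
shipments = [
    {"shipment_id": 1, "shipment_cost": 100.0},
    {"shipment_id": 1, "shipment_cost": 250.0},
    {"shipment_id": 2, "shipment_cost": 75.0},
]

# Only the existence of a matching key matters, so a set is enough.
shipment_ids = {s["shipment_id"] for s in shipments}

# left_semi: staff rows that have at least one matching shipment.
left_semi = [r for r in staff if r["shipment_id"] in shipment_ids]

# left_anti: staff rows that have no matching shipment at all.
left_anti = [r for r in staff if r["shipment_id"] not in shipment_ids]

print([r["full_name"] for r in left_semi])
print([r["full_name"] for r in left_anti])
```

Note that the staff member with two matching shipments appears only once in the semi-join result, unlike an inner join, which would return one row per match.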

### **2. Lookup**<br>
Source File: DF of logistics_source1 and logistics_source2 (merged into Staff DF)<br>
* **Scenario:** Validation. Check if the `hub_location` in the staff file exists in the corporate `Master_City_List`.
* **Action:** Compare values against a reference list.

### **3. Lookup & Enrichment**<br>
Source File: DF of logistics_source1 and logistics_source2 (merged into Staff DF)<br>
* **Scenario:** Geo-Tagging.
* **Action:** Lookup `hub_location` ("Pune") in a Master Latitude/Longitude table and enrich the dataset by adding `lat` and `long` columns for map plotting.

### **4. Schema Modeling (Denormalization)**<br>
Source Files: DF of All 3 Files (logistics_source1, logistics_source2, logistics_shipment_detail_3000.json)<br>
* **Scenario:** Creating a "Gold Layer" Table for PowerBI/Tableau.
* **Action:** Flatten the Star Schema. Join `Staff`, `Shipments`, and `Vehicle_Master` into one wide table (`wide_shipment_history`) so analysts don't have to perform joins during reporting.

### **5. Windowing (Ranking & Trends)**<br>
Source Files:<br>
DF of logistics_source2: Provides hub_location (Partition Key).<br>
logistics_shipment_detail_3000.json: Provides shipment_cost (Ordering Key)<br>
* **Scenario:** "Who are the Top 3 Drivers by Cost in *each* Hub?"
* **Action:**
  1. Partition by `hub_location`.
  2. Order by `total_shipment_cost` Descending.
  3. Apply `dense_rank()` and `row_number()`.
  4. Filter where `dense_rank <= 3` or `row_number <= 3`.
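
`row_number()` and `dense_rank()` give different Top 3 results when costs tie: `row_number()` always returns exactly three rows per partition, while `dense_rank()` keeps every row that shares one of the three highest values. The plain-Python sketch below illustrates the difference; the field names `hub`, `driver` and `cost` and the sample rows are hypothetical.

```python
from itertools import groupby
from operator import itemgetter

def rank_within_hub(rows):
    """Add row_number and dense_rank per hub, ordered by cost descending."""
    ordered = sorted(rows, key=lambda r: (r["hub"], -r["cost"]))
    ranked = []
    for _, group in groupby(ordered, key=itemgetter("hub")):
        dense = 0
        previous_cost = None
        for position, row in enumerate(group, start=1):
            if row["cost"] != previous_cost:
                dense += 1  # a new distinct cost gets the next rank
                previous_cost = row["cost"]
            ranked.append({**row, "row_number": position, "dense_rank": dense})
    return ranked

# Hypothetical sample rows with a tie at the top.
rows = [
    {"hub": "Pune", "driver": "A", "cost": 900},
    {"hub": "Pune", "driver": "B", "cost": 900},
    {"hub": "Pune", "driver": "C", "cost": 700},
    {"hub": "Pune", "driver": "D", "cost": 500},
    {"hub": "Pune", "driver": "E", "cost": 300},
]

ranked = rank_within_hub(rows)
top3_by_row_number = [r["driver"] for r in ranked if r["row_number"] <= 3]
top3_by_dense_rank = [r["driver"] for r in ranked if r["dense_rank"] <= 3]
print(top3_by_row_number)
print(top3_by_dense_rank)
```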

### **6. Analytical Functions (Lead/Lag)**<br>
Source File: <br>
DF of logistics_shipment_detail_3000.json<br>
* **Scenario:** Idle Time Analysis.
* **Action:** For each driver, calculate the days elapsed since their *previous* shipment.
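
The idle-time calculation is a `lag` over each driver's shipments ordered by date. A minimal plain-Python sketch follows; `driver_id` is a hypothetical key used only for illustration, since the action above does not name the column that identifies a driver.

```python
from collections import defaultdict
from datetime import date

def days_since_previous(shipments):
    """Return (driver_id, shipment_date, gap_in_days) per shipment.

    The gap is None for a driver's first shipment, like lag() with no default.
    """
    dates_by_driver = defaultdict(list)
    for s in shipments:
        dates_by_driver[s["driver_id"]].append(s["shipment_date"])

    result = []
    for driver_id, dates in dates_by_driver.items():
        previous = None
        for current in sorted(dates):
            gap = None if previous is None else (current - previous).days
            result.append((driver_id, current, gap))
            previous = current
    return result

# Hypothetical sample rows, deliberately out of order.
shipments = [
    {"driver_id": "D1", "shipment_date": date(2024, 4, 23)},
    {"driver_id": "D2", "shipment_date": date(2024, 4, 21)},
    {"driver_id": "D1", "shipment_date": date(2024, 4, 20)},
]

for row in days_since_previous(shipments):
    print(row)
```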

### **7. Set Operations**<br>
Source Files: DF of logistics_source1 and logistics_source2<br>
* **Union:** Combining `Source1` (Legacy) and `Source2` (Modern) into one dataset (Already done in Active Munging).
* **Intersect:** Identifying Staff IDs that appear in *both* Source 1 and Source 2 (Duplicate/Migration Check).
* **Except (Difference):** Identifying Staff IDs present in Source 2 but *missing* from Source 1 (New Hires).
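
The three set operations can be illustrated with Python sets of IDs; the sample IDs are hypothetical. One difference to keep in mind: a Python set union removes duplicates, whereas Spark's `union` keeps them, so a `distinct()` is needed afterwards to get the same result.

```python
# Hypothetical ID sets for the two sources.
source1_ids = {"1001", "1002", "1003"}
source2_ids = {"1002", "1003", "1004"}

# Intersect: IDs present in both sources (duplicate/migration check).
in_both = source1_ids & source2_ids

# Except: IDs present in Source 2 but missing from Source 1 (new hires).
new_in_source2 = source2_ids - source1_ids

# Union: every ID from either source, without duplicates.
all_ids = source1_ids | source2_ids

print(sorted(in_both))
print(sorted(new_in_source2))
print(sorted(all_ids))
```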

### **8. Grouping & Aggregations (Advanced)**<br>
Source Files:<br>
DF of logistics_source2: Provides hub_location and vehicle_type (Grouping Dimensions).<br>
DF of logistics_shipment_detail_3000.json: Provides shipment_cost (Aggregation Metric).<br>
* **Scenario:** The CFO wants a subtotal report at multiple levels:
  1. Total Cost by Hub.
  2. Total Cost by Hub AND Vehicle Type.
  3. Grand Total.
* **Action:** Use `cube("hub_location", "vehicle_type")` or `rollup()` to generate all these subtotals in a single query.
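
The three subtotal levels the CFO asks for correspond to what `rollup("hub_location", "vehicle_type")` produces: one row per hub and vehicle type, one subtotal per hub, and one grand total. `cube` would additionally produce a subtotal per vehicle type across all hubs. The plain-Python sketch below mimics the rollup levels, using `None` where Spark would show NULL; the sample rows are hypothetical.

```python
from collections import defaultdict

def rollup_cost(rows):
    """Sum cost at (hub, vehicle), (hub, None) and (None, None) levels."""
    totals = defaultdict(float)
    for hub, vehicle, cost in rows:
        totals[(hub, vehicle)] += cost  # hub + vehicle type level
        totals[(hub, None)] += cost     # subtotal per hub
        totals[(None, None)] += cost    # grand total
    return dict(totals)

# Hypothetical sample rows: (hub_location, vehicle_type, shipment_cost).
rows = [
    ("Pune", "Truck", 100.0),
    ("Pune", "Bike", 50.0),
    ("Chennai", "Truck", 70.0),
]

totals = rollup_cost(rows)
for key in sorted(totals, key=str):
    print(key, totals[key])
```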

In [0]:
#Basic Joins(Inner and left)

#Inner join
#vehicle_type exists in both dataframes and a table cannot hold duplicate column names,
#so the shipment-side column is renamed before joining
shipment_df=shipment_df.withColumnRenamed("vehicle_type","shipment_vehicle_type")
inner_joined_df=uniform_df.join(shipment_df,how="inner",on="shipment_id")

#or, to keep only completed work:
#display(uniform_df.join(shipment_df,how="inner",on="shipment_id").where(col("shipment_status")=='DELIVERED'))

#Left join
u = uniform_df.alias("u")
s = shipment_df.alias("s")
display(u.join(s,how="left",on=col("u.shipment_id") == col("s.shipment_id")).where(col("s.shipment_id").isNull()).select("u.*"))


In [0]:
#self_join

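#alias the two shipment_id columns so the result has no duplicate column names
display(uniform_df.alias("a").join(uniform_df.alias("b"),on=col("a.origin_hub_city")==col("b.origin_hub_city")).where(col("a.shipment_id") != col("b.shipment_id")).select(col("a.shipment_id").alias("shipment_id_a"),col("b.shipment_id").alias("shipment_id_b"),col("a.origin_hub_city").alias("origin_hub_city")))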

In [0]:


#right_join: shipments with no matching staff record
display(uniform_df.alias("u").join(shipment_df.alias("s"),how="right",on=col("u.shipment_id")==col("s.shipment_id")).where(col("u.shipment_id").isNull()).select("s.*"))
#or: shipments that are assigned to someone other than a driver
display(uniform_df.alias("u").join(shipment_df.alias("s"),how="right",on=col("u.shipment_id")==col("s.shipment_id")).where((col("u.shipment_id").isNotNull()) & (col("u.role")!="driver")).select("u.*"))

#full_outer_join
display(uniform_df.alias("u").join(shipment_df.alias("s"),col("u.shipment_id") == col("s.shipment_id"),how="full").where((col("u.shipment_id").isNull()) |(col("s.shipment_id").isNull())))

#cross_join
pending_shipment_df=shipment_df.where((col("shipment_status")=="IN_TRANSIT") | (col("shipment_status")=="DELAYED"))
driver_df=uniform_df.where(col("role")=="driver")
#display(driver_df.join(pending_shipment_df,how="cross"))

In [0]:
#left semi join
display(uniform_df.join(shipment_df,how="left_semi",on="shipment_id"))
#left anti join
display(uniform_df.join(shipment_df,how="left_anti",on="shipment_id"))

In [0]:

master_city_list=spark.read.csv("/Volumes/usecase_data/logistics_proj_data/projdata/Master_City_List.csv",header=True,inferSchema=True)
#Lookup
display(uniform_df.alias("u").join(master_city_list.alias("m"),how="left_semi",on=col("u.origin_hub_city")==col("m.city_name")).select("u.origin_hub_city").distinct())

#Lookup and enrichment
geo_tagging_df=uniform_df.alias("u").join(master_city_list.alias("m"),how="left",on=col("u.origin_hub_city")==col("m.city_name"))

In [0]:
from pyspark.sql.window import Window

#Schema modeling (Denormalisation)
joined_df=uniform_df.join(shipment_df,how="inner",on="shipment_id")
#windowing functions
window_spec = Window.partitionBy("origin_hub_city").orderBy(col("shipment_cost").desc())
display(joined_df.withColumn("dense_rk",dense_rank().over(window_spec)).where(col("dense_rk") <= 3))
top3_driver_df=joined_df.withColumn("row_num",row_number().over(window_spec)).where(col("row_num") <= 3)
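
`dense_rank` and `row_number`, both used in the cell above, differ only in how they treat ties, and that changes how many rows a `<= 3` filter keeps. The sketch below is plain Python with no Spark. It reproduces the two numbering rules on one already-sorted partition. The function bodies are simplified stand-ins for the Spark window functions, and the cost values are invented for illustration.

```python
def row_number(values):
    """Number rows 1, 2, 3, ... in the given order; tied values get different numbers."""
    return list(range(1, len(values) + 1))

def dense_rank(values):
    """Equal neighbouring values share a rank; the next distinct value gets rank + 1."""
    ranks, rank, prev = [], 0, object()  # object() is a sentinel that equals nothing
    for v in values:
        if v != prev:
            rank += 1
            prev = v
        ranks.append(rank)
    return ranks

# Shipment costs for one hub, already sorted in descending order (hypothetical numbers)
costs = [900, 900, 750, 600, 600, 400]

top3_by_row_number = [c for c, r in zip(costs, row_number(costs)) if r <= 3]
top3_by_dense_rank = [c for c, r in zip(costs, dense_rank(costs)) if r <= 3]

print(top3_by_row_number)  # [900, 900, 750]
print(top3_by_dense_rank)  # [900, 900, 750, 600, 600]
```

With ties present, `row_number` returns exactly three rows per partition, with an arbitrary pick among tied rows. `dense_rank` returns every row that belongs to the three highest distinct costs.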



In [0]:
display(shipment_df)
window_spec1=Window.partitionBy(col("shipment_id")).orderBy(col("shipment_date"))
idle_driver_analysis=shipment_df.withColumn("idle_time",datediff(col("shipment_date"),lag(col("shipment_date"),1).over(window_spec1)))

In [0]:
#union, intersect and exceptAll can only be performed on inputs with the same number of columns,
#so the two columns missing from active_mung_df1 are added with null values
active_mung_df3=active_mung_df1.withColumn("hub_location",lit(None)).withColumn("vehicle_type",lit(None))

#union - matches columns by position, so column order and types must be the same; keeps duplicates
#(unionAll is an alias of union)
#The error below reports 6 vs 8 columns, so it was presumably raised before the two columns were added:
#"UNION can only be performed on inputs with the same number of columns, but the first input has 6 columns and the second input has 8 columns. SQLSTATE: 42826"
All_staff_data=active_mung_df3.union(active_mung_df2)

#unionByName - matches columns by name; allowMissingColumns=True fills columns missing on either side with nulls
active_mung_df3.unionByName(active_mung_df2,allowMissingColumns=True)

#intersect -> no duplicates -> rows present in both df1 and df2
#Error recorded for the same 6 vs 8 column mismatch:
#"[NUM_COLUMNS_MISMATCH] INTERSECT can only be performed on inputs with the same number of columns, but the first input has 6 columns and the second input has 8 columns. SQLSTATE: 42826"
duplicate_staff_id=active_mung_df3.intersect(active_mung_df2)

#exceptAll (df1-df2) -> with duplicates
New_hires_list=active_mung_df3.exceptAll(active_mung_df2)

#subtract (df1-df2) -> without duplicates
#PySpark has no except() method because except is a Python keyword; subtract() is the equivalent
#active_mung_df3.subtract(active_mung_df2).display()
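
The duplicate handling of the set operations above is easy to mix up. The sketch below is plain Python with no Spark. It models each DataFrame as a list of row tuples and mimics the semantics of `intersect`, `exceptAll` and `subtract`. The helper names and the sample staff rows are hypothetical and only illustrate the behaviour.

```python
from collections import Counter

def intersect(a, b):
    """Distinct rows present in both inputs (like DataFrame.intersect)."""
    return sorted(set(a) & set(b))

def except_all(a, b):
    """Rows of a minus rows of b, counting duplicates (like DataFrame.exceptAll)."""
    return sorted((Counter(a) - Counter(b)).elements())

def subtract(a, b):
    """Distinct rows of a that do not appear in b (like DataFrame.subtract)."""
    return sorted(set(a) - set(b))

# Hypothetical staff rows: (id, name). Row (1, "Ana") is duplicated in df1.
df1 = [(1, "Ana"), (1, "Ana"), (2, "Raj"), (3, "Li")]
df2 = [(1, "Ana"), (3, "Li"), (4, "Sam")]

print(intersect(df1, df2))   # [(1, 'Ana'), (3, 'Li')]
print(except_all(df1, df2))  # [(1, 'Ana'), (2, 'Raj')]
print(subtract(df1, df2))    # [(2, 'Raj')]
```

`except_all` still returns one copy of `(1, "Ana")` because df1 contains it twice and df2 removes only one occurrence. `subtract` drops it completely.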


In [0]:
multilevel_subtotal_report=inner_joined_df.cube("origin_hub_city","vehicle_type").agg(sum("shipment_cost").alias("total_cost"))
display(multilevel_subtotal_report)

##6. Data Persistence (LOAD)-> Data Publishing & Consumption<br>

Store the outputs of the inner join, lookup and enrichment, schema modeling, windowing and analytical functions, set operations, and grouping and aggregation steps as Delta tables.

In [0]:
inner_joined_df.write.saveAsTable("usecase_data.logistics_proj_data.denormalized_table",mode="overwrite")
geo_tagging_df.write.saveAsTable("usecase_data.logistics_proj_data.geo_tag_table",mode="overwrite")
inner_joined_df.write.saveAsTable("usecase_data.logistics_proj_data.wide_shipment_history",mode="overwrite")
top3_driver_df.write.saveAsTable("usecase_data.logistics_proj_data.top_performing_driver",mode="overwrite")
idle_driver_analysis.write.saveAsTable("usecase_data.logistics_proj_data.idle_driver_analysis",mode="overwrite")
All_staff_data.write.saveAsTable("usecase_data.logistics_proj_data.all_staff_data",mode="overwrite")
New_hires_list.write.saveAsTable("usecase_data.logistics_proj_data.new_hires_list",mode="overwrite") 
duplicate_staff_id.write.saveAsTable("usecase_data.logistics_proj_data.duplicate_staff_list",mode="overwrite")
multilevel_subtotal_report.write.saveAsTable("usecase_data.logistics_proj_data.multilevel_subtotal_report",mode="overwrite")




##7. Take a copy of the above notebook and write the equivalent SQL wherever applicable.