# Enterprise Fleet Analytics Pipeline

Focuses on the business outcome (analytics) and the domain (fleet/logistics).

![logistics](logistics_project.png)

Download the data from the Google Drive folder below and upload it into the catalog:
https://drive.google.com/drive/folders/1J3AVJIPLP7CzT15yJIpSiWXshu1iLXKn?usp=drive_link

In [0]:
%sql
create volume if not exists workspace.default.logistics

## **1. Data Munging**

#### 1. Visually/manually open the file and capture a couple of data patterns (Manual Exploratory Data Analysis)

#### 2. Programmatically find a couple of data patterns by applying the EDA steps below (File: logistics_source1)
1. Apply inferSchema and toDF to create a DF and analyse the actual data.
2. Analyse the schema, data types, columns, etc.
3. Analyse the duplicate record count and the summary of the DataFrame.

In [0]:
#2.1
rawdf=spark.read.csv("/Volumes/workspace/default/logistics/logistic_data/logistics_source1",inferSchema=True,header=True)
display(rawdf.limit(10))
rawdf1=spark.read.csv("/Volumes/workspace/default/logistics/logistic_data/logistics_source2",inferSchema=True,header=True)
display(rawdf1.limit(10))

In [0]:
#2.2
rawdf.printSchema()
#2.3
print("total row count:",rawdf.count())
print("row count after dropping duplicates:",rawdf.dropDuplicates().count())
display(rawdf.summary())

### a. Passive Data Munging (Files: logistics_source1 and logistics_source2)
Without modifying the data, identify:<br>
Shipment IDs that appear in both logistics_source1 and logistics_source2<br>
Records where:<br>
1. shipment_id is non-numeric
2. age is not an integer<br>
3. Count rows having fewer columns than expected
4. Count rows having more columns than expected
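
The four passive checks above can be sketched in plain Python before running them in Spark. This is a minimal sketch, not the notebook's implementation: the helper name `profile_rows`, the expected column count of 5, and the sample rows are all assumptions made for illustration.

```python
import csv
import io

EXPECTED_COLUMNS = 5  # assumption: shipment_id, first_name, last_name, age, role

def profile_rows(raw_text, expected=EXPECTED_COLUMNS):
    """Count rule violations without modifying the input text."""
    counts = {"non_numeric_id": 0, "non_integer_age": 0,
              "fewer_columns": 0, "more_columns": 0}
    reader = csv.reader(io.StringIO(raw_text))
    next(reader)  # skip the header row
    for row in reader:
        if len(row) < expected:
            counts["fewer_columns"] += 1
            continue
        if len(row) > expected:
            counts["more_columns"] += 1
            continue
        shipment_id, _, _, age, _ = row
        if not shipment_id.strip().isdigit():
            counts["non_numeric_id"] += 1
        if not age.strip().isdigit():
            counts["non_integer_age"] += 1
    return counts

# Illustrative rows only; they are not taken from the real source files.
sample = """shipment_id,first_name,last_name,age,role
5001,Rajesh,Kumar,34,Driver
ABC12,Anita,Rao,29,Loader
5003,Vikram,Singh,ten,Driver
5004,Meena,Iyer
5005,Arun,Das,41,Driver,extra
"""

print(profile_rows(sample))
```

Rows with the wrong column count are counted once and skipped, because their positional fields cannot be trusted for the ID and age checks.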

In [0]:
combinedf = spark.read.csv(path=["/Volumes/workspace/default/logistics/logistic_data/logistics_source1","/Volumes/workspace/default/logistics/logistic_data/logistics_source2"],header=True,inferSchema = True)
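display(combinedf.limit(10))
#a.1
#invalid_id_df=combinedf.where("shipment_id rlike '[a-zA-Z]'")
#a.2
# show() returns None, so keep the filtered DataFrame and display it separately
non_integer_age_df=combinedf.where("age rlike '[^0-9]'")
display(non_integer_age_df.limit(10))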

In [0]:
from pyspark.sql.types import StructType,StructField,IntegerType,StringType,ShortType
schema=StructType([StructField("shipment_id",StringType(),True),
                   StructField("first_name",StringType(),True),
                   StructField("last_name",StringType(),True),
                   StructField("age",ShortType(),True),
                   StructField("role",StringType(),True),
                   StructField("corruptedrows",StringType())])

df_1 = spark.read.schema(schema).csv(path="/Volumes/workspace/default/logistics/logistic_data/logistics_source1",header=True,mode='permissive',columnNameOfCorruptRecord="corruptedrows")
#display(df_1)
#a.3
fewer_df = df_1.where("size(split(corruptedrows, ',')) < 5")
display(fewer_df)
print("rows with fewer columns than expected:",len(fewer_df.collect()))
#a.4
more_df = df_1.where("size(split(corruptedrows, ',')) > 5")
display(more_df)
print("rows with more columns than expected:",len(more_df.collect()))

### **b. Active Data Munging** (Files: logistics_source1 and logistics_source2)

##### 1. Combining Data + Schema Merging (Structuring)
1. Read both files without enforcing schema
2. Align them into a single canonical schema: shipment_id, first_name, last_name, age, role, hub_location, vehicle_type, data_source
3. Add data_source column with values as: system1, system2 in the respective dataframes
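
To make the alignment step concrete, the sketch below shows in plain Python how a column missing from one source is filled with a null and how `data_source` is tagged. The helper `align` and the sample records are hypothetical and only illustrate the idea.

```python
CANONICAL_COLUMNS = ["shipment_id", "first_name", "last_name", "age", "role",
                     "hub_location", "vehicle_type", "data_source"]

def align(record, source_name):
    """Return the record in canonical column order; missing columns become None."""
    aligned = {column: record.get(column) for column in CANONICAL_COLUMNS}
    aligned["data_source"] = source_name
    return aligned

# Illustrative records: system1 lacks hub_location and vehicle_type.
system1_rows = [{"shipment_id": "5001", "first_name": "Rajesh",
                 "last_name": "Kumar", "age": "34", "role": "Driver"}]
system2_rows = [{"shipment_id": "6001", "first_name": "Anita", "last_name": "Rao",
                 "age": "29", "role": "Loader", "hub_location": "Pune",
                 "vehicle_type": "Truck"}]

merged = ([align(r, "system1") for r in system1_rows]
          + [align(r, "system2") for r in system2_rows])
print(merged)
```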

In [0]:
from pyspark.sql.functions import lit,col

# 1.Read both files without enforcing schema 
rawdf1 = spark.read.csv("/Volumes/workspace/default/logistics/logistic_data/logistics_source1",header= True)
rawdf2 = spark.read.csv("/Volumes/workspace/default/logistics/logistic_data/logistics_source2",header= True) 
 
# 3. Add data_source column with values system1 / system2 in the respective DataFrames
rawdf1 = rawdf1.withColumn("data_source",lit("system1"))
rawdf2 = rawdf2.withColumn("data_source",lit("system2"))
# 2. Align them into a single canonical schema: shipment_id, first_name, last_name, age, role, hub_location, vehicle_type, data_source
combine_df = rawdf1.unionByName(rawdf2,allowMissingColumns = True)
merged_df = combine_df.select("shipment_id","first_name","last_name","age","role","hub_location","vehicle_type","data_source")
display(merged_df)


##### 2. Cleansing, Scrubbing
Cleansing (removal of unwanted data)<br>
1. Mandatory Column Check - Drop any record where any of the following columns is NULL: shipment_id, role<br>
2. Name Completeness Rule - Drop records where both of the following columns are NULL: first_name, last_name<br>
3. Join Readiness Rule - Drop records where the join key is null: shipment_id<br>

Scrubbing (convert raw to tidy)<br>
4. Age Defaulting Rule - Fill NULL values in the age column with: -1<br>
5. Vehicle Type Default Rule - Fill NULL values in the vehicle_type column with: UNKNOWN<br>
6. Invalid Age Replacement - Replace the following values in age: "ten" to -1, "" to -1<br>
7. Vehicle Type Normalization - Replace inconsistent vehicle types: truck to LMV, bike to TwoWheeler
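
The seven rules above can be expressed as two small plain-Python functions, which is handy for checking the expected outcome on a few rows. This is a sketch under assumptions: `cleanse` and `scrub` are hypothetical names, the sample rows are invented, and vehicle types are matched case-insensitively, which the rules do not state.

```python
AGE_REPLACEMENTS = {"ten": "-1", "": "-1"}
VEHICLE_REPLACEMENTS = {"truck": "LMV", "bike": "TwoWheeler"}

def cleanse(records):
    """Rules 1-3: drop rows failing the mandatory-column, name, and join-key checks."""
    kept = []
    for r in records:
        if r.get("shipment_id") is None or r.get("role") is None:
            continue  # rules 1 and 3
        if r.get("first_name") is None and r.get("last_name") is None:
            continue  # rule 2
        kept.append(r)
    return kept

def scrub(record):
    """Rules 4-7: default and normalize age and vehicle_type on a copy of the row."""
    out = dict(record)
    age = out.get("age")
    out["age"] = "-1" if age is None else AGE_REPLACEMENTS.get(age, age)
    vehicle = out.get("vehicle_type")
    if vehicle is None:
        out["vehicle_type"] = "UNKNOWN"
    else:
        # Assumption: match regardless of case, so "Truck" and "truck" both map to LMV.
        out["vehicle_type"] = VEHICLE_REPLACEMENTS.get(vehicle.lower(), vehicle)
    return out

# Illustrative rows only.
rows = [
    {"shipment_id": "5001", "first_name": "Rajesh", "last_name": "Kumar",
     "age": "ten", "role": "Driver", "vehicle_type": "truck"},
    {"shipment_id": None, "first_name": "Anita", "last_name": "Rao",
     "age": "29", "role": "Loader", "vehicle_type": "Bike"},
    {"shipment_id": "5003", "first_name": None, "last_name": None,
     "age": "40", "role": "Driver", "vehicle_type": "Bike"},
    {"shipment_id": "5004", "first_name": "Meena", "last_name": None,
     "age": None, "role": "Loader", "vehicle_type": None},
]

tidy = [scrub(r) for r in cleanse(rows)]
print(tidy)
```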

In [0]:
from pyspark.sql.functions import when
# 1.Mandatory Column Check - Drop any record where any of the following columns is NULL:shipment_id, role

cleansed_df1 = merged_df.na.drop(how="any",subset=["shipment_id","role"])
#display(cleansed_df1)

# 2.Name Completeness Rule - Drop records where both of the following columns are NULL: first_name, last_name 
cleansed_df2 = cleansed_df1.na.drop(how="all",subset=["first_name","last_name"])
#display(cleansed_df2)

# 3. Join Readiness Rule - Drop records where the join key is null: shipment_id and shipment_id contains only digits
cleansed_df3 = cleansed_df2.where("shipment_id IS NOT NULL AND shipment_id rlike '^[0-9]+$'") 
#display(cleansed_df3) 

# Scrubbing
# 4. Age Defaulting Rule - Fill NULL values in the age column with: -1
scrubbed_df1 = cleansed_df3.na.fill("-1",subset=["age"]) 
#display(scrubbed_df1)

# 5.Vehicle Type Default Rule - Fill NULL values in the vehicle_type column with: UNKNOWN
scrubbed_df2 = scrubbed_df1.na.fill("UNKNOWN",subset=["vehicle_type"])
#display(scrubbed_df2)

# 6.Invalid Age Replacement - Replace the following values in age: "ten" to -1, "" to -1
find_replace = {"ten": "-1","": "-1" }
scrubbed_df3 = scrubbed_df2.na.replace(find_replace,subset=['age'])
#display(scrubbed_df3)

# 7.Vehicle Type Normalization - Replace inconsistent vehicle types: truck to LMV bike to TwoWheeler
find_replace_2 = {'Truck':'LMV','Bike':'TwoWheeler'}
scrubbed_df4 = scrubbed_df3.na.replace(find_replace_2,subset=["vehicle_type"])
display(scrubbed_df4)

#### 3. Standardization, De-Duplication and Replacement / Deletion of Data to make it usable

Creating the shipment details DataFrame<br>
1. Create a DF by reading data from logistics_shipment_detail_3000.json
2. As this is clean JSON data, it doesn't require any cleansing or scrubbing.

In [0]:
jsondf1=spark.read.json("/Volumes/workspace/default/logistics/logistic_data/logistics_shipment_detail_3000.json",multiLine=True)
jsondf1.printSchema() 
display(jsondf1)

Standardizations:<br>

1. Add columns<br>
Source File: DF of logistics_shipment_detail_3000.json<br>
Add domain as 'Logistics', the current timestamp as 'ingestion_timestamp' and 'False' as 'is_expedited'
2. Column Uniformity:<br>
role - Convert to lowercase<br>
Source File: DF of merged(logistics_source1 & logistics_source2)<br>
vehicle_type - Convert values to UPPERCASE<br>
Source Files: DF of logistics_shipment_detail_3000.json<br>
hub_location - Convert values to initcap case<br>
Source Files: DF of merged(logistics_source1 & logistics_source2)<br>


In [0]:
from pyspark.sql.functions import lit,col,lower,upper,current_timestamp,initcap,to_date
#1
addcolumndf=jsondf1.withColumn("domain",lit("Logistics"))\
    .withColumn("ingestion_timestamp", current_timestamp())

expedited_df =addcolumndf.withColumn("is_expedited",lit("False"))
display(expedited_df)

#2
coluni_df2= scrubbed_df4.withColumn("role",lower(col("role")))\
    .withColumn('vehicle_type',upper(col('vehicle_type')))\
        .withColumn('hub_location',initcap(col('hub_location')))          
display(coluni_df2) 
coluni_df2.printSchema()

3. Format Standardization:<br>
Source Files: DF of logistics_shipment_detail_3000.json<br>
Convert shipment_date to yyyy-MM-dd<br>
Ensure shipment_cost has 2 decimal precision<br>
4. Data Type Standardization<br>
Standardizing column data types to fix schema drift and enable mathematical operations.<br>
Source File: DF of merged(logistics_source1 & logistics_source2) <br>
age: Cast String to Integer<br>
Source File: DF of logistics_shipment_detail_3000.json<br>
shipment_weight_kg: Cast to Double<br>
Source File: DF of logistics_shipment_detail_3000.json<br>
is_expedited: Cast to Boolean<br>

In [0]:
from pyspark.sql.functions import *
#3
convertdatedf=expedited_df.withColumn("shipment_date",
    date_format(to_date(col("shipment_date"), "yy-MM-dd"), "yyyy-MM-dd"))
#display(convertdatedf)

# Round to 2 decimal places, as the format standardization rule requires
shipment_cost_df=convertdatedf.withColumn("shipment_cost",round(col("shipment_cost").cast("double"),2))
#display(shipment_cost_df)

#4
dtype_df=scrubbed_df4.withColumn("age",col("age").cast("integer"))
#display(dtype_df)

# Cast the string literal 'False' added earlier to a real Boolean, as the data type rule asks
expedited_cast_df=shipment_cost_df.withColumn("shipment_weight_kg",col("shipment_weight_kg").cast("double"))\
              .withColumn("is_expedited",col("is_expedited").cast("boolean"))
display(expedited_cast_df)

5. Naming Standardization <br>
Source File: DF of merged(logistics_source1 & logistics_source2)<br>
Rename: first_name to staff_first_name<br>
Rename: last_name to staff_last_name<br>
Rename: hub_location to origin_hub_city<br>
6. Reordering columns logically in a better standard format:<br>
Source File: DF of Data from all 3 files<br>
shipment_id (Identifier), staff_first_name (Dimension), staff_last_name (Dimension), role (Dimension), origin_hub_city (Location), shipment_cost (Metric), ingestion_timestamp (Audit)

In [0]:
#5
namingdf = scrubbed_df4.withColumnRenamed("first_name", "staff_first_name") \
    .withColumnRenamed("last_name", "staff_last_name") \
    .withColumnRenamed("hub_location", "origin_hub_city")
#display(namingdf)

expedited_cast_df =expedited_cast_df.select("shipment_id","order_id","cargo_type","vehicle_type","shipment_status","payment_mode","destination_city","source_city","shipment_weight_kg","shipment_cost","shipment_date","domain","is_expedited","ingestion_timestamp")
#display(expedited_cast_df)

raw_data_df1 = rawdf1.select("shipment_id", "first_name","last_name","role","age","data_source")
#display(raw_data_df1)

raw_data_df2 = rawdf2.select("shipment_id", "first_name","last_name","role","age","vehicle_type","hub_location","data_source")
#display(raw_data_df2)
 


Deduplication:
1. Apply Record Level De-Duplication
2. Apply Column Level De-Duplication (Primary Key Enforcement)
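
The difference between the two de-duplication levels is easy to see on a tiny example. The plain-Python sketch below uses hypothetical helper names and invented records. It keeps the first occurrence, whereas Spark's `dropDuplicates` on a key subset does not guarantee which of the duplicate rows survives.

```python
def drop_exact_duplicates(records):
    """Record-level: remove rows that are identical in every column."""
    seen, unique = set(), []
    for r in records:
        fingerprint = tuple(sorted(r.items()))
        if fingerprint not in seen:
            seen.add(fingerprint)
            unique.append(r)
    return unique

def drop_duplicate_keys(records, key_column):
    """Column-level: keep one row per key value (here, the first one seen)."""
    seen, unique = set(), []
    for r in records:
        if r[key_column] not in seen:
            seen.add(r[key_column])
            unique.append(r)
    return unique

# Illustrative records: two exact duplicates plus one row sharing the same key.
shipments = [
    {"shipment_id": 500001, "shipment_status": "DELIVERED"},
    {"shipment_id": 500001, "shipment_status": "DELIVERED"},
    {"shipment_id": 500001, "shipment_status": "RETURNED"},
]

record_level = drop_exact_duplicates(shipments)
key_level = drop_duplicate_keys(shipments, "shipment_id")
print(len(record_level), len(key_level))
```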

In [0]:
#1
duplicate_df = jsondf1.dropDuplicates()
#display(duplicate_df.count())

#2
col_duplicate_df = jsondf1.dropDuplicates(["shipment_id"])
print(col_duplicate_df.count())

## 2. Data Enrichment - Detailing of data
Makes the data rich and detailed<br>

###### Adding of Columns (Data Enrichment)
*Creating new derived attributes to enhance traceability and analytical capability.*

**1. Add Audit Timestamp (`load_dt`)**
Source File: DF of logistics_source1 and logistics_source2<br>
* **Scenario:** We need to track exactly when this record was ingested into our Data Lakehouse for auditing purposes.
* **Action:** Add a column `load_dt` using the function `current_timestamp()`.

**2. Create Full Name (`full_name`)**
Source File: DF of logistics_source1 and logistics_source2<br>
* **Scenario:** The reporting dashboard requires a single field for the driver's name instead of separate columns.
* **Action:** Create `full_name` by concatenating `first_name` and `last_name` with a space separator.
* **Result:** "Rajesh" + " " + "Kumar" -> **"Rajesh Kumar"**

**3. Define Route Segment (`route_segment`)**
Source File: DF of logistics_shipment_detail_3000.json<br>
* **Scenario:** The logistics team wants to analyze performance based on specific transport lanes (Source to Destination).
* **Action:** Combine `source_city` and `destination_city` with a hyphen.
* **Result:** "Chennai" + "-" + "Pune" -> **"Chennai-Pune"**

**4. Generate Vehicle Identifier (`vehicle_identifier`)**
Source File: DF of logistics_shipment_detail_3000.json<br>
* **Scenario:** We need a unique tracking code that immediately tells us the vehicle type and the shipment ID.
* **Action:** Combine `vehicle_type` and `shipment_id` to create a composite key.
* **Result:** "Truck" + "_" + "500001" -> **"Truck_500001"**

In [0]:

from pyspark.sql.functions import *
from pyspark.sql.types import *

#1
audit_df = scrubbed_df4.withColumn("load_dt",current_timestamp())
#display(audit_df)

#2
concat_df = audit_df.withColumn('full_name',concat_ws(" ",col("first_name"),col("last_name")))
#display(concat_df)

#3
concat_df1 = jsondf1.withColumn('route_segment',concat_ws("-",col("source_city"),col("destination_city")))
#display(concat_df1)

#4
concat_df2=jsondf1.withColumn("vehicle_identifier",concat(col("vehicle_type"),lit("_"),col("shipment_id")))
display(concat_df2)

###### Deriving of Columns (Time Intelligence)
*Extracting temporal features from dates to enable period-based analysis and reporting.*<br>
Source File: logistics_shipment_detail_3000.json<br>
**1. Derive Shipment Year (`shipment_year`)**
* **Scenario:** Management needs an annual performance report to compare growth year-over-year.
* **Action:** Extract the year component from `shipment_date`.
* **Result:** "2024-04-23" -> **2024**

**2. Derive Shipment Month (`shipment_month`)**
* **Scenario:** Analysts want to identify seasonal peaks (e.g., increased volume in December).
* **Action:** Extract the month component from `shipment_date`.
* **Result:** "2024-04-23" -> **4** (April)

**3. Flag Weekend Operations (`is_weekend`)**
* **Scenario:** The Operations team needs to track shipments handled during weekends to calculate overtime pay or analyze non-business day capacity.
* **Action:** Flag as **'True'** if the `shipment_date` falls on a Saturday or Sunday.

**4. Flag shipment status (`is_expedited`)**
* **Scenario:** The Operations team needs to track shipments that are IN_TRANSIT or DELIVERED.
* **Action:** Flag as **'True'** if the `shipment_status` is IN_TRANSIT or DELIVERED.
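
Day-of-week numbering is a common source of off-by-one errors in the weekend flag: Spark's `dayofweek` returns 1 for Sunday through 7 for Saturday, while Python's `isoweekday()` returns 1 for Monday through 7 for Sunday. The plain-Python sketch below (hypothetical helper names, ISO `yyyy-MM-dd` dates assumed) derives the same time features and mirrors Spark's numbering for comparison.

```python
from datetime import date

def time_features(shipment_date):
    """Derive shipment_year, shipment_month and is_weekend from an ISO date string."""
    d = date.fromisoformat(shipment_date)
    return {
        "shipment_year": d.year,
        "shipment_month": d.month,
        "is_weekend": d.isoweekday() >= 6,  # 6 = Saturday, 7 = Sunday
    }

def spark_dayofweek(shipment_date):
    """Mirror Spark's dayofweek numbering: 1 = Sunday ... 7 = Saturday."""
    return date.fromisoformat(shipment_date).isoweekday() % 7 + 1

print(time_features("2024-04-23"))
print(spark_dayofweek("2024-04-27"), spark_dayofweek("2024-04-28"))
```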

In [0]:
#1
year_extract_df = expedited_cast_df.withColumn("shipment_year",year("shipment_date"))

#2
year_month_df = year_extract_df.withColumn("shipment_month",month("shipment_date"))

#3
flag_df = year_month_df.withColumn("is_weekend",when((dayofweek("shipment_date") == 1) | (dayofweek("shipment_date") == 7), True).otherwise(False))

#4
# Build on flag_df so the year, month and weekend columns derived above are kept
flag_df1 = flag_df.withColumn("is_expedited",when(col("shipment_status").isin("IN_TRANSIT","DELIVERED"), True).otherwise(col("is_expedited")))
display(flag_df1)

###### Enrichment/Business Logics (Calculated Fields)
*Deriving new metrics and financial indicators using mathematical and date-based operations.*<br>
Source File: logistics_shipment_detail_3000.json<br>

**1. Calculate Unit Cost (`cost_per_kg`)**
* **Scenario:** The Finance team wants to analyze the efficiency of shipments by determining the cost incurred per unit of weight.
* **Action:** Divide `shipment_cost` by `shipment_weight_kg`.
* **Logic:** `shipment_cost / shipment_weight_kg`

**2. Track Shipment Age (`days_since_shipment`)**
* **Scenario:** The Operations team needs to monitor how long it has been since a shipment was dispatched to identify potential delays.
* **Action:** Calculate the difference in days between the `current_date` and the `shipment_date`.
* **Logic:** `datediff(current_date(), shipment_date)`

**3. Compute Tax Liability (`tax_amount`)**
* **Scenario:** For invoicing and compliance, we must calculate the Goods and Services Tax (GST) applicable to each shipment.
* **Action:** Calculate 18% GST on the total `shipment_cost`.
* **Logic:** `shipment_cost * 0.18`
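
The three formulas above can be checked by hand with a plain-Python sketch. It uses `Decimal` so that rounding to two places is explicit. The function names and the zero-weight guard are assumptions for illustration, not part of the notebook.

```python
from datetime import date
from decimal import Decimal, ROUND_HALF_UP

GST_RATE = Decimal("0.18")
TWO_PLACES = Decimal("0.01")

def cost_per_kg(shipment_cost, shipment_weight_kg):
    """shipment_cost / shipment_weight_kg, or None when the weight is zero."""
    if shipment_weight_kg == 0:
        return None  # assumption: treat a zero weight as "not computable"
    return (shipment_cost / shipment_weight_kg).quantize(TWO_PLACES, rounding=ROUND_HALF_UP)

def tax_amount(shipment_cost):
    """18% GST on the shipment cost, rounded to two decimals."""
    return (shipment_cost * GST_RATE).quantize(TWO_PLACES, rounding=ROUND_HALF_UP)

def days_since_shipment(shipment_date, today):
    """Whole days between the shipment date and the reference date."""
    return (today - shipment_date).days

cost = Decimal("30695.80")
print(cost_per_kg(cost, Decimal("100")))
print(tax_amount(cost))
print(days_since_shipment(date(2024, 4, 23), date(2024, 5, 1)))
```

Passing `today` explicitly, rather than calling `date.today()` inside the function, keeps the result reproducible in tests.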

In [0]:
#1
cost_unit_of_weight = expedited_cast_df.withColumn("cost_per_kg",(col("shipment_cost")/col("shipment_weight_kg")).cast("decimal(10,2)"))
#display(cost_unit_of_weight)

#2
days_diff_df = cost_unit_of_weight.withColumn("days_since_shipment", datediff(current_date(), col("shipment_date")))
#display(days_diff_df)

#3
tax_calc_df = days_diff_df.withColumn("tax_amount",(col("shipment_cost")*lit(0.18)).cast("decimal(10,2)"))
display(tax_calc_df)

###### Remove/Eliminate (drop, select, selectExpr)
*Excluding unnecessary or redundant columns to optimize storage and privacy.*<br>
Source File: DF of logistics_source1 and logistics_source2<br>

**1. Remove Redundant Name Columns**
* **Scenario:** Since we have already created the `full_name` column in the Enrichment step, the individual name columns are now redundant and clutter the dataset.
* **Action:** Drop the `first_name` and `last_name` columns.
* **Logic:** `df.drop("first_name", "last_name")`

In [0]:
drop_first_last_name_df = concat_df.drop("first_name","last_name")
source2_df = drop_first_last_name_df.select("shipment_id","full_name","role","age","hub_location","vehicle_type","data_source","load_dt")
display(source2_df)

##### Splitting & Merging/Melting of Columns
*Reshaping columns to extract hidden values or combine fields for better analysis.*<br>
Source File: DF of logistics_shipment_detail_3000.json<br>
**1. Splitting (Extraction)**
*Breaking one column into multiple to isolate key information.*
* **Split Order Code:**
  * **Action:** Split `order_id` ("ORD100000") into two new columns:
    * `order_prefix` ("ORD")
    * `order_sequence` ("100000")
* **Split Date:**
  * **Action:** Split `shipment_date` into three separate columns for partitioning:
    * `ship_year` (2024)
    * `ship_month` (4)
    * `ship_day` (23)

**2. Merging (Concatenation)**
*Combining multiple columns into a single unique identifier or description.*
* **Create Route ID:**
  * **Action:** Merge `source_city` ("Chennai") and `destination_city` ("Pune") to create a descriptive route key:
    * `route_lane` ("Chennai->Pune")
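
As a quick check of the splitting and merging rules, here is a plain-Python sketch. The helper names are hypothetical. A regular expression is used for `order_id` so the split does not depend on the prefix always being three characters, which is an assumption beyond the single example given above.

```python
import re

ORDER_ID_PATTERN = re.compile(r"^([A-Za-z]+)(\d+)$")

def split_order_id(order_id):
    """Split 'ORD100000' into ('ORD', '100000'); return (None, None) if it does not match."""
    match = ORDER_ID_PATTERN.match(order_id)
    if match is None:
        return None, None
    return match.group(1), match.group(2)

def split_date(shipment_date):
    """Split an ISO 'yyyy-MM-dd' string into (ship_year, ship_month, ship_day)."""
    year, month, day = shipment_date.split("-")
    return int(year), int(month), int(day)

def route_lane(source_city, destination_city):
    """Merge the two cities into a descriptive route key."""
    return f"{source_city}->{destination_city}"

print(split_order_id("ORD100000"))
print(split_date("2024-04-23"))
print(route_lane("Chennai", "Pune"))
```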

In [0]:
#1
split_df = tax_calc_df.withColumn("order_prefix",substring(col("order_id"),1,3))\
    .withColumn("order_sequence",substring(col("order_id"),4,9)) \
    .withColumn("ship_year", year("shipment_date")) \
    .withColumn("ship_month", month("shipment_date")) \
    .withColumn("ship_day", dayofmonth("shipment_date"))
#display(split_df.limit(3))

#2
route_id_df = split_df.withColumn("route_lane",concat_ws("->","source_city","destination_city"))
#display(route_id_df.printSchema())
display(route_id_df.select("shipment_id","order_id","cargo_type","vehicle_type","shipment_status","payment_mode","destination_city","source_city","shipment_weight_kg","shipment_cost","shipment_date","domain","is_expedited","ingestion_timestamp","cost_per_kg","days_since_shipment","tax_amount","order_prefix","order_sequence","ship_year","ship_month","ship_day","route_lane"))
 

## 3. Data Customization & Processing - Application of Tailored Business Specific Rules

### **UDF1: Complex Incentive Calculation**
**Scenario:** The Logistics Head wants to calculate a "Performance Bonus" for drivers based on tenure and role complexity.

**Action:** Create a Python function `calculate_bonus(role, age)` and register it as a Spark UDF.

**Logic:**
* **IF** `Role` == 'Driver' **AND** `Age` > 50:
  * `Bonus` = 15% of Salary (Reward for Seniority)
* **IF** `Role` == 'Driver' **AND** `Age` < 30:
  * `Bonus` = 5% of Salary (Encouragement for Juniors)
* **ELSE**:
  * `Bonus` = 0

**Result:** A new derived column `projected_bonus` is generated for every row in the dataset.

---

### **UDF2: PII Masking (Privacy Compliance)**
**Scenario:** For the analytics dashboard, we must hide the full identity of the staff to comply with privacy laws (GDPR/DPDP), while keeping names recognizable for internal managers.

**Business Rule:** Show the first 2 letters, mask the middle characters with `****`, and show the last letter.

**Action:** Create a UDF `mask_identity(name)`.

**Example:**
* **Input:** `"Rajesh"`
* **Output:** `"Ra****h"`
<br>
**Note:** Convert the above UDF logic to a built-in function based transformation to improve performance.
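
Before wrapping either rule in a UDF, the logic can be tested as ordinary Python. The sketch below is not the notebook's implementation. It returns the bonus rate rather than an amount because no salary column appears in the data, and it treats a missing or negative age (such as the `-1` placeholder set during scrubbing) as "no bonus". Without that guard, `-1` would satisfy `age < 30` and earn the junior bonus. Masking names shorter than four characters entirely is also an assumption.

```python
def calculate_bonus_rate(role, age):
    """Return the bonus rate (fraction of salary) for a staff member."""
    if age is None or int(age) < 0:
        return 0.0  # assumption: unknown age earns no bonus
    age = int(age)
    if role == "Driver" and age > 50:
        return 0.15
    if role == "Driver" and age < 30:
        return 0.05
    return 0.0

def mask_identity(name):
    """Show the first 2 letters and the last letter, masking the middle with ****."""
    if name is None:
        return None
    if len(name) < 4:
        return "*" * len(name)  # assumption: too short to reveal any letters
    return name[:2] + "****" + name[-1]

print(calculate_bonus_rate("Driver", 55), calculate_bonus_rate("Driver", "-1"))
print(mask_identity("Rajesh"))
```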

In [0]:
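from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType,StringType
#udf1
def calculate_bonus(role,age):
    # age may be NULL or the -1 placeholder set during scrubbing; treat both as "no bonus"
    if age is None or int(age) < 0:
        return 0.0
    age = int(age)
    if  role == 'Driver' and age > 50:
        return 0.15
    elif role == 'Driver' and age < 30:
        return 0.05
    else:
        return 0.0

# Declare the return type; without it the UDF returns strings
bonusfn= udf(calculate_bonus,DoubleType())
perf_bonus_df = source2_df.withColumn("projected_bonus",bonusfn(col("role"),col("age")))
#display(perf_bonus_df)

#udf2
def mask_identity(name):
    # Guard against NULL or empty names, which would otherwise raise inside the UDF
    if not name:
        return name
    return name[0:2]+"****"+name[-1]

maskfn = udf(mask_identity,StringType())
mask_identity_df = perf_bonus_df.withColumn("name",maskfn(col("full_name")))
display(mask_identity_df)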

## 4. Data Core Curation & Processing (Pre-Wrangling)
*Applying business logic to focus, filter, and summarize data before final analysis.*

**1. Select (Projection)**<br>
Source Files: DF of logistics_source1 and logistics_source2<br>
* **Scenario:** The Driver App team only needs location data, not sensitive HR info.
* **Action:** Select only `first_name`, `role`, and `hub_location`.

**2. Filter (Selection)**<br>
Source File: DF of json<br>
* **Scenario:** We need a report on active operational problems.
* **Action:** Filter rows where `shipment_status` is **'DELAYED'** or **'RETURNED'**.
* **Scenario:** Insurance audit for senior staff.
* **Action:** Filter rows where `age > 50`.

**3. Derive Flags & Columns (Business Logic)**<br>
Source File: DF of json<br>
* **Scenario:** Identify high-value shipments for security tracking.
* **Action:** Create flag `is_high_value` = **True** if `shipment_cost > 50,000`.
* **Scenario:** Flag weekend operations for overtime calculation.
* **Action:** Create flag `is_weekend` = **True** if day is Saturday or Sunday.

In [0]:
#1
projection_df = concat_df.select("first_name","role","hub_location")
#display(projection_df)

#2
# Active operational problems: status is DELAYED or RETURNED
selection_df =expedited_cast_df.filter(col("shipment_status").isin("DELAYED","RETURNED"))
#display(selection_df)
# age is still a string in concat_df, so cast it before the numeric comparison
selection_df1=concat_df.filter(col("age").cast("int") > 50)
display(selection_df1)

#3
flag_df1 = expedited_cast_df.withColumn("is_high_value",col("shipment_cost")> 50000)
flag_df2 = flag_df1.withColumn("is_weekend",when(dayofweek(col('shipment_date')).isin([1,7]),True).otherwise(False))

**4. Format (Standardization)**<br>
Source File: DF of json<br>
* **Scenario:** Finance requires readable currency formats.
* **Action:** Format `shipment_cost` to string like **"₹30,695.80"**.
* **Scenario:** Standardize city names for reporting.
* **Action:** Format `source_city` to Uppercase (e.g., "chennai" → **"CHENNAI"**).

**5. Group & Aggregate (Summarization)**<br>
Source Files: DF of logistics_source1 and logistics_source2<br>
* **Scenario:** Regional staffing analysis.
* **Action:** Group by `hub_location` and **Count** the number of staff.
* **Scenario:** Fleet capacity analysis.
* **Action:** Group by `vehicle_type` and **Sum** the `shipment_weight_kg`.

**6. Sorting (Ordering)**<br>
Source File: DF of json<br>
* **Scenario:** Prioritize the most expensive shipments.
* **Action:** Sort by `shipment_cost` in **Descending** order.
* **Scenario:** Organize daily dispatch schedule.
* **Action:** Sort by `shipment_date` (Ascending).

**7. Limit (Top-N Analysis)**<br>
Source File: DF of json<br>
* **Scenario:** Dashboard snapshot of critical delays.
* **Action:** Filter for 'DELAYED', Sort by Cost, and **Limit to top 10** rows.

In [0]:
#4
# format_number adds thousands separators and fixes 2 decimals, e.g. ₹30,695.80
currency_df = flag_df2.withColumn("shipment_cost",concat(lit('₹'),format_number(col("shipment_cost"),2)))
upper_city_df = currency_df.withColumn("source_city",upper("source_city"))
#display(upper_city_df)

#5
reg_staff_df = concat_df.groupBy("hub_location").agg(count("full_name").alias("staff_count"))
#display(reg_staff_df)
fleet_cap_df = flag_df2.groupBy("vehicle_type").agg(round(sum("shipment_weight_kg"),2).alias("sum_of_shipment_weight"))
#display(fleet_cap_df)

#6
# orderBy has no 'descending' keyword; use desc() for a descending sort
sort_df=flag_df2.orderBy(col("shipment_cost").desc())
# Separate scenario: daily dispatch schedule, oldest shipment first
sort_df1=flag_df2.orderBy(col("shipment_date").asc())
#display(sort_df1)

#7
fil_df = flag_df2.filter(col("shipment_status")=='DELAYED').orderBy(col("shipment_cost"),ascending= False).limit(10)
display(fil_df)

## 5. Data Wrangling - Transformation & Analytics
*Combining, modeling, and analyzing data to answer complex business questions.*

### **1. Joins**
Source Files:<br>
Left Side (staff_df):<br> DF of logistics_source1 & logistics_source2<br>
Right Side (shipments_df):<br> DF of logistics_shipment_detail_3000.json<br>
#### **1.1 Frequently Used Simple Joins (Inner, Left)**
* **Inner Join (Performance Analysis):**
  * **Scenario:** We only want to analyze *completed work*. Connect Staff to the Shipments they handled.
  * **Action:** Join `staff_df` and `shipments_df` on `shipment_id`.
  * **Result:** Returns only rows where a staff member is assigned to a valid shipment.
* **Left Join (Idle Resource check):**
  * **Scenario:** Find out which staff members are currently *idle* (not assigned to any shipment).
  * **Action:** Join `staff_df` (Left) with `shipments_df` (Right) on `shipment_id`. Filter where `shipments_df.shipment_id` is NULL.

#### **1.2 Infrequent Simple Joins (Self, Right, Full, Cartesian)**
* **Self Join (Peer Finding):**
  * **Scenario:** Find all pairs of employees working in the same `hub_location`.
  * **Action:** Join `staff_df` to itself on `hub_location`, filtering where `staff_id_A != staff_id_B`.
* **Right Join (Orphan Data Check):**
  * **Scenario:** Identify shipments in the system that have *no valid driver* assigned (Data Integrity Issue).
  * **Action:** Join `staff_df` (Left) with `shipments_df` (Right). Focus on NULLs on the left side.
* **Full Outer Join (Reconciliation):**
  * **Scenario:** A complete audit to find *both* idle drivers AND unassigned shipments in one view.
  * **Action:** Perform a Full Outer Join on `shipment_id`.
* **Cartesian/Cross Join (Capacity Planning):**
  * **Scenario:** Generate a schedule of *every possible* driver assignment to *every* pending shipment to run an optimization algorithm.
  * **Action:** Cross Join `drivers_df` and `pending_shipments_df`.

#### **1.3 Advanced Joins (Semi and Anti)**
* **Left Semi Join (Existence Check):**
  * **Scenario:** "Show me the details of Drivers who have *at least one* shipment." (Standard filtering).
  * **Action:** `staff_df.join(shipments_df, "shipment_id", "left_semi")`.
  * **Benefit:** Returns only `staff_df` columns and at most one row per staff record, however many shipments match, so no right-side columns or duplicate rows are produced.
* **Left Anti Join (Negation Check):**
  * **Scenario:** "Show me the details of Drivers who have *never* touched a shipment."
  * **Action:** `staff_df.join(shipments_df, "shipment_id", "left_anti")`.
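
The difference between inner, left semi and left anti joins shows up as soon as one staff row matches more than one shipment. The plain-Python sketch below models each join on lists of dictionaries. The helper names and the tiny sample data are invented for illustration.

```python
def inner_join(left, right, key):
    """One output row per matching (left, right) pair, with columns from both sides."""
    return [{**l, **r} for l in left for r in right if l[key] == r[key]]

def left_semi_join(left, right, key):
    """Left rows that have at least one match; left columns only, no duplication."""
    right_keys = {r[key] for r in right}
    return [l for l in left if l[key] in right_keys]

def left_anti_join(left, right, key):
    """Left rows that have no match on the right."""
    right_keys = {r[key] for r in right}
    return [l for l in left if l[key] not in right_keys]

# Illustrative data: shipment_id 1 appears twice on the shipments side.
staff = [{"shipment_id": 1, "name": "Rajesh"},
         {"shipment_id": 2, "name": "Anita"},
         {"shipment_id": 3, "name": "Vikram"}]
shipments = [{"shipment_id": 1, "status": "DELIVERED"},
             {"shipment_id": 1, "status": "RETURNED"},
             {"shipment_id": 4, "status": "DELAYED"}]

inner = inner_join(staff, shipments, "shipment_id")
semi = left_semi_join(staff, shipments, "shipment_id")
anti = left_anti_join(staff, shipments, "shipment_id")
print(len(inner), [s["name"] for s in semi], [s["name"] for s in anti])
```

The inner join duplicates the staff row once per matching shipment, while the semi join returns it once, which is why the semi join suits existence checks.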


In [0]:
df1 = spark.read.csv("/Volumes/workspace/default/logistics/logistic_data/logistics_source1",header=True,inferSchema=True).dropDuplicates()
df1 = df1.where("shipment_id not rlike '[a-zA-Z]' AND age not rlike '[a-zA-Z]' AND shipment_id IS NOT NULL" )

df2 = spark.read.csv("/Volumes/workspace/default/logistics/logistic_data/logistics_source2",header=True,inferSchema = True).dropDuplicates()
df2 = df2.withColumnRenamed("vehicle_type","vehicle_ty")
df2 = df2.where("shipment_id not rlike '[a-zA-Z]' AND age not rlike '[a-zA-Z]' AND shipment_id IS NOT NULL" )

staff_df = df1.unionByName(df2,allowMissingColumns=True)
staff_df = staff_df.na.fill("Not Provided",subset=["hub_location"]).na.fill("NA",subset=["first_name","last_name","role"])
#display(staff_df)

from pyspark.sql.types import StructType,StructField,StringType,DateType,DoubleType,LongType

str1 = StructType([StructField('cargo_type', StringType(), True), StructField('destination_city', StringType(), True), StructField('order_id', StringType(), True), StructField('payment_mode', StringType(), True), StructField('shipment_cost', DoubleType(), True), StructField('shipment_date', DateType(), True), StructField('shipment_id', LongType(), True), StructField('shipment_status', StringType(), True), StructField('shipment_weight_kg', DoubleType(), True), StructField('source_city', StringType(), True), StructField('vehicle_type', StringType(), True)])

shipments_df = spark.read.schema(str1).json("/Volumes/workspace/default/logistics/logistic_data/logistics_shipment_detail_3000.json",multiLine=True,mode="Permissive")
shipments_df = shipments_df.where("shipment_id not rlike '[a-zA-Z]' AND shipment_id IS NOT NULL" )
#display(shipments_df)

#1.1
inner_df = staff_df.join(shipments_df,how="inner",on="shipment_id")
#display(inner_df)

left_df =staff_df.join(shipments_df,how="left",on="shipment_id")
#display(left_df)

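# Joining on the column name keeps a single shipment_id (from staff_df), which is never NULL here,
# so test a shipments-only column such as order_id to find staff without a shipment
left_df1 =staff_df.join(shipments_df,how="left",on="shipment_id").filter(col("order_id").isNull())
#display(left_df1)

# The same idle-staff check expressed as a left anti join
idle_df =staff_df.join(shipments_df,how="left_anti",on="shipment_id")
#display(idle_df)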

#1.2
staff_df1 = staff_df.filter(col("hub_location")!= "Not Provided")
df1 = staff_df1.withColumnRenamed("shipment_id","staff_id_A")
df2 = staff_df1.withColumnRenamed("shipment_id","staff_id_B")
peer_df = df1.join(df2,how="inner",on="hub_location").filter(col("staff_id_A") != col("staff_id_B"))
#display(peer_df)

orphan_df = staff_df.join(shipments_df,how="right",on="shipment_id").filter("role is null")
#display(orphan_df)

reconciliation_df = staff_df.join(shipments_df,how="full",on="shipment_id")
#display(reconciliation_df)

drivers_df = staff_df.filter(col("role") == "Driver")
pending_shipments_df = shipments_df.filter(col("shipment_status").isin("IN_TRANSIT", "DELAYED"))
cross_df = drivers_df.join(pending_shipments_df,how="cross")
#display(cross_df)

#1.3
std_fil_df = staff_df.join(shipments_df,how="left_semi",on="shipment_id").filter(col("role")== "Driver")
#display(std_fil_df)

Negation_df = staff_df.join(shipments_df,how="left_anti",on="shipment_id").filter(col("role")== "Driver")
#display(Negation_df)


### **2. Lookup**<br>
Source File: DF of logistics_source1 and logistics_source2 (merged into Staff DF) and Master_City_List.csv<br>
* **Scenario:** Validation. Check if the `hub_location` in the staff file exists in the dataframe of corporate `Master_City_List.csv`.
* **Action:** Compare values against this Master_City_List list.

### **3. Lookup & Enrichment**<br>
Source File: DF of logistics_source1 and logistics_source2 (merged into Staff DF) and Master_City_List.csv dataframe<br>
* **Scenario:** Geo-Tagging.
* **Action:** Look up `hub_location` (e.g. "Pune") in the Master_City_List.csv dataframe, which holds latitude and longitude, and enrich the merged logistics_source dataframe by adding `lat` and `long` columns for map plotting.

### **4. Schema Modeling (Denormalization)**<br>
Source Files: DF of All 3 Files (logistics_source1, logistics_source2, logistics_shipment_detail_3000.json)<br>
* **Scenario:** Creating a "Gold Layer" Table for PowerBI/Tableau.
* **Action:** Flatten the Star Schema. Join `Staff`, `Shipments`, and `Vehicle_Master` into one wide table (`wide_shipment_history`) so analysts don't have to perform joins during reporting.

### **5. Windowing (Ranking & Trends)**<br>
Source Files:<br>
DF of logistics_source2: Provides hub_location (Partition Key).<br>
logistics_shipment_detail_3000.json: Provides shipment_cost (Ordering Key)<br>
* **Scenario:** "Who are the Top 3 Drivers by Cost in *each* Hub?"
* **Action:**
  1. Partition by `hub_location`.
  2. Order by `total_shipment_cost` Descending.
  3. Apply `dense_rank()` and `row_number()`.
  4. Filter where `rank` or `row_number` <= 3.
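
`row_number()` and `dense_rank()` give different top-3 results when costs tie, so it helps to see both side by side. The plain-Python sketch below imitates a window partitioned by `hub_location` and ordered by `total_shipment_cost` descending. The function name, driver labels and costs are invented for illustration.

```python
from itertools import groupby

def rank_within_hub(rows):
    """Add row_number and dense_rank per hub, ordered by cost descending."""
    ordered = sorted(rows, key=lambda r: (r["hub_location"], -r["total_shipment_cost"]))
    ranked = []
    for _, group in groupby(ordered, key=lambda r: r["hub_location"]):
        dense, previous_cost = 0, None
        for position, row in enumerate(group, start=1):
            if row["total_shipment_cost"] != previous_cost:
                dense += 1  # a new distinct cost moves to the next dense rank
                previous_cost = row["total_shipment_cost"]
            ranked.append({**row, "row_number": position, "dense_rank": dense})
    return ranked

# Illustrative data: drivers A and B tie on cost in Pune.
drivers = [
    {"driver": "A", "hub_location": "Pune", "total_shipment_cost": 900},
    {"driver": "B", "hub_location": "Pune", "total_shipment_cost": 900},
    {"driver": "C", "hub_location": "Pune", "total_shipment_cost": 700},
    {"driver": "D", "hub_location": "Pune", "total_shipment_cost": 500},
    {"driver": "E", "hub_location": "Chennai", "total_shipment_cost": 300},
]

ranked = rank_within_hub(drivers)
top3_by_row_number = [r["driver"] for r in ranked if r["row_number"] <= 3]
top3_by_dense_rank = [r["driver"] for r in ranked if r["dense_rank"] <= 3]
print(top3_by_row_number, top3_by_dense_rank)
```

With the tie, `row_number` cuts Pune off after three rows, while `dense_rank` keeps every driver whose cost is among the three highest distinct values.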

In [0]:
#2
master_df = spark.read.csv("/Volumes/workspace/default/logistics/logistic_data/Master_City_List.csv",header=True,inferSchema=True)
master_df = master_df.withColumnRenamed("city_name","hub_location")
#display(master_df)
lookup_df = staff_df.join(master_df,how="semi",on="hub_location")
#display(lookup_df)

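#3 Lookup & enrichment
# This keeps only the example hub "Pune"; join staff_df itself to geo-tag every hub
staff_df1 = staff_df.filter(col("hub_location")== "Pune")
# Left join keeps all staff rows and adds latitude/longitude where the hub is found
lookup_enrich_df =  staff_df1.join(master_df.select("hub_location","latitude","longitude"),how="left",on='hub_location')
#display(lookup_enrich_df)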

#4
denormalized_df = staff_df.join(shipments_df,how="full",on="shipment_id")
#display(denormalized_df)

#5
from pyspark.sql.window import Window
from pyspark.sql.functions import col,row_number,dense_rank,desc,lag,datediff,to_date
source_df2 = spark.read.csv("/Volumes/workspace/default/logistics/logistic_data/logistics_source2",header=True,inferSchema = True).dropDuplicates()
# Drop rows with non-numeric or missing IDs; rename vehicle_type to avoid a name clash after the join
source_df2 = source_df2.where("shipment_id not rlike '[a-zA-Z]' AND age not rlike '[a-zA-Z]' AND shipment_id IS NOT NULL" ).withColumnRenamed("vehicle_type","vehicle_ty")

shipments_df = spark.read.schema(str1).json("/Volumes/workspace/default/logistics/logistic_data/logistics_shipment_detail_3000.json",multiLine=True,mode="Permissive")
shipments_df = shipments_df.where("shipment_id not rlike '[a-zA-Z]' AND shipment_id IS NOT NULL" )

top3_df =source_df2.join(shipments_df,how="full",on="shipment_id").select("shipment_id","order_id","hub_location","role","shipment_cost")
# Rank drivers within each hub by cost: row_number gives unique positions, dense_rank keeps ties together
hub_window = Window.partitionBy("hub_location").orderBy(desc("shipment_cost"))
top3_df = top3_df.filter(col("role")== "Driver").withColumn("rn",row_number().over(hub_window)).withColumn("drnk",dense_rank().over(hub_window)).where("rn <= 3 ")
display(top3_df)


### **6. Analytical Functions (Lead/Lag)**<br>
Source File: <br>
DF of logistics_shipment_detail_3000.json<br>
* **Scenario:** Idle Time Analysis.
* **Action:** For each driver, calculate the days elapsed since their *previous* shipment.
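
The idle-time calculation is a `lag` over a per-driver window followed by a date difference. As a minimal plain-Python sketch of that logic (not the notebook's Spark code), the snippet below groups made-up shipments by a hypothetical `driver_id` key, orders them by date, and subtracts the previous date. The key name and all dates are assumptions for illustration.

```python
from collections import defaultdict
from datetime import date

# Illustrative shipments (driver_id, shipment_date); driver_id is a hypothetical key.
shipments = [
    ("D1", date(2024, 1, 10)),
    ("D2", date(2024, 1, 3)),
    ("D1", date(2024, 1, 1)),
    ("D1", date(2024, 1, 15)),
]

def days_since_previous(records):
    """Per driver, pair each shipment date with the days since the previous one."""
    by_driver = defaultdict(list)          # plays the role of partitionBy(driver)
    for driver_id, shipped_on in records:
        by_driver[driver_id].append(shipped_on)

    result = []
    for driver_id, dates in by_driver.items():
        previous = None                    # lag() yields NULL on the first row
        for shipped_on in sorted(dates):   # plays the role of orderBy(shipment_date)
            elapsed = (shipped_on - previous).days if previous is not None else None
            result.append((driver_id, shipped_on, elapsed))
            previous = shipped_on
    return result

idle = days_since_previous(shipments)
```

The first shipment of each driver has no predecessor, so its elapsed value stays empty. Partitioning by anything coarser than the driver (for example by role) would instead compare a shipment with the previous shipment of any driver.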

### **7. Set Operations**<br>
Source Files: DF of logistics_source1 and logistics_source2<br>
* **Union:** Combining `Source1` (Legacy) and `Source2` (Modern) into one dataset (Already done in Active Munging).
* **Intersect:** Identifying Staff IDs that appear in *both* Source 1 and Source 2 (Duplicate/Migration Check).
* **Except (Difference):** Identifying Staff IDs present in Source 2 but *missing* from Source 1 (New Hires).
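
The three set operations can be checked on paper with plain Python sets before running them in Spark. The sketch below uses made-up ID values (an assumption, not data from the source files) and shows in particular that the operand order of the difference decides which question is answered.

```python
# Illustrative ID sets; the values are assumptions, not read from the source files.
source1_ids = {"1001", "1002", "1003"}
source2_ids = {"1002", "1003", "1004", "1005"}

all_ids = source1_ids | source2_ids    # Union: every ID in either source
in_both = source1_ids & source2_ids    # Intersect: duplicate/migration check
new_hires = source2_ids - source1_ids  # Except: in Source 2 but missing from Source 1
left_only = source1_ids - source2_ids  # Reversed operands: in Source 1 but not in Source 2
```

Python sets remove duplicates, so `all_ids` behaves like a distinct union. A Spark DataFrame `union` or `unionByName` keeps duplicate rows unless `distinct()` or `dropDuplicates()` is applied afterwards.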

### **8. Grouping & Aggregations (Advanced)**<br>
Source Files:<br>
DF of logistics_source2: Provides hub_location and vehicle_type (Grouping Dimensions).<br>
DF of logistics_shipment_detail_3000.json: Provides shipment_cost (Aggregation Metric).<br>
* **Scenario:** The CFO wants a subtotal report at multiple levels:
  1. Total Cost by Hub.
  2. Total Cost by Hub AND Vehicle Type.
  3. Grand Total.
* **Action:** Use `cube("hub_location", "vehicle_type")` or `rollup()` to generate all these subtotals in a single query.
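
To see which subtotal levels `cube` and `rollup` produce, the plain-Python sketch below computes the grouping sets by hand on made-up rows. `subtotals` is a hypothetical helper, the costs are illustrative assumptions, and `None` marks a column that has been aggregated away (the role a null plays in the Spark output).

```python
from collections import defaultdict
from itertools import combinations

# Illustrative joined rows (hub_location, vehicle_type, shipment_cost); values are made up.
rows = [
    ("Pune", "Truck", 100.0),
    ("Pune", "Van", 50.0),
    ("Delhi", "Truck", 70.0),
]

def subtotals(data, grouping_sets):
    """Sum the cost for each grouping set; None marks a rolled-up column."""
    totals = defaultdict(float)
    for hub, vehicle, cost in data:
        for keep in grouping_sets:
            key = (hub if 0 in keep else None, vehicle if 1 in keep else None)
            totals[key] += cost
    return dict(totals)

# cube(a, b): every subset of the grouping columns -> (a, b), (a), (b), ()
cube_sets = [set(c) for n in range(2, -1, -1) for c in combinations(range(2), n)]
# rollup(a, b): only the hierarchical prefixes -> (a, b), (a), ()
rollup_sets = [{0, 1}, {0}, set()]

cube_totals = subtotals(rows, cube_sets)
rollup_totals = subtotals(rows, rollup_sets)
```

The three levels the CFO asks for (hub, hub plus vehicle type, grand total) are exactly the rollup levels. The cube adds vehicle-type-only subtotals on top, which is harmless but produces extra rows.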

In [0]:
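#6
top3_df = source_df2.join(shipments_df,how="full",on="shipment_id")
# NOTE: the window is partitioned by role, so lag() returns the previous shipment across ALL drivers.
# For a true per-driver idle time, partition by a per-driver key if the data has one.
# Ordering happens on the raw shipment_date value, which sorts correctly only for yyyy-MM-dd strings.
lag_df = top3_df.withColumn("previous_shipment_date",lag("shipment_date",1).over(Window.partitionBy("role").orderBy("shipment_date"))).filter(col("role")=="Driver")

# Convert both columns to dates, then compute the gap in days (null for the first shipment)
lag_df1 = lag_df.withColumn("shipment_date",to_date("shipment_date","yyyy-MM-dd")).withColumn("previous_shipment_date",to_date("previous_shipment_date","yyyy-MM-dd")).withColumn("days_elapsed",datediff("shipment_date","previous_shipment_date"))
#display(lag_df1)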

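#7
read_df1 = spark.read.csv("/Volumes/workspace/default/logistics/logistic_data/logistics_source1",inferSchema = True,header=True)
read_df2 = spark.read.csv("/Volumes/workspace/default/logistics/logistic_data/logistics_source2",inferSchema = True,header=True)

# Union: all rows from both sources; columns missing on one side are filled with nulls
union_df = read_df1.unionByName(read_df2,allowMissingColumns=True)
#display(union_df)
# intersect/except need the same columns on both sides, so compare on the ID column only
# (assumes shipment_id is the ID to compare; cast to string so both sides share one type)
ids_df1 = read_df1.select(col("shipment_id").cast("string"))
ids_df2 = read_df2.select(col("shipment_id").cast("string"))
# Intersect: IDs present in both Source 1 and Source 2
intersect_df = ids_df1.intersect(ids_df2)
#display(intersect_df)
# Except: IDs present in Source 2 but missing from Source 1
except_df = ids_df2.subtract(ids_df1)
#display(except_df)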

#8
read_df = spark.read.csv("/Volumes/workspace/default/logistics/logistic_data/logistics_source2",inferSchema = True,header=True)
read_df = read_df.where("shipment_id not rlike '[a-zA-Z]'")
read_df = read_df.withColumnRenamed("vehicle_type","vehicle_ty")
group_df = read_df.join(shipments_df,how="inner",on="shipment_id")
#display(group_df)

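# Import Spark's sum/round explicitly: Python's built-in sum/round do not work on columns
# (this deliberately shadows the built-ins for the rest of the notebook)
from pyspark.sql.functions import col,sum,round
#Scenario: The CFO wants a subtotal report at multiple levels: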
#Total Cost by Hub.
total_cost_df = group_df.groupBy("hub_location").agg(sum("shipment_cost").alias("total_cost"))
display(total_cost_df)

#Total Cost by Hub AND Vehicle Type.
total_cost_df1= group_df.groupBy("hub_location","vehicle_type").agg(sum("shipment_cost").alias("total_cost"))
display(total_cost_df1)

#Grand Total.
grand_total_df = group_df.agg(round(sum("shipment_cost"),2).alias("grand_total"))
display(grand_total_df)

#Action: Use cube("hub_location", "vehicle_type") or rollup() to generate all these subtotals in a single query.
cube_df = group_df.cube("hub_location","vehicle_type").agg(sum("shipment_cost").alias("cube_cost")).orderBy("hub_location","cube_cost",ascending=[False,False])
display(cube_df)

## 6. Data Persistence (LOAD) -> Data Publishing & Consumption<br>

Store the results of the inner join, lookup, lookup and enrichment, schema modeling, windowing, analytical functions, set operations, and grouping and aggregation steps in Delta tables.

In [0]:
# saveAsTable() returns None, so the write calls below are not assigned to variables

#Inner join delta table
inner_df = staff_df.join(shipments_df,how="inner",on="shipment_id")
inner_df.write.format("delta").mode("overwrite").saveAsTable("workspace.default.inner_data_table")

#lookup delta table
lookup_df = staff_df.join(master_df,how="semi",on="hub_location")
lookup_df.write.format("delta").mode("overwrite").saveAsTable("workspace.default.lookup_data_table")

#lookup enrichment delta table
lookup_enrich_df =  staff_df1.join(master_df.select("hub_location","latitude","longitude"),how="left",on='hub_location')
lookup_enrich_df.write.format("delta").mode("overwrite").saveAsTable("workspace.default.lookup_enrich_data_table")

#Schema modeling delta table
denormalized_df = staff_df.join(shipments_df,how="full",on="shipment_id")
denormalized_df.write.format("delta").mode("overwrite").saveAsTable("workspace.default.schema_modeling_data_table")

#Windowing delta table
top3_df = top3_df.filter(col("role")== "Driver").withColumn("rn",row_number().over(Window.partitionBy("hub_location").orderBy(desc("shipment_cost")))).where("rn <= 3 ")
top3_df.write.format("delta").mode("overwrite").saveAsTable("workspace.default.windowing_data_table")

#Analytical functions delta table
lag_df1 = lag_df.withColumn("shipment_date",to_date("shipment_date","yyyy-MM-dd")).withColumn("previous_shipment_date",to_date("previous_shipment_date","yyyy-MM-dd")).withColumn("days_elapsed",datediff("shipment_date","previous_shipment_date"))
lag_df1.write.format("delta").mode("overwrite").saveAsTable("workspace.default.analytical_data_table")

#Set operations delta table
union_df = read_df1.unionByName(read_df2,allowMissingColumns=True)
union_df.write.format("delta").mode("overwrite").saveAsTable("workspace.default.set_operation_data_table")

#Grouping and aggregation delta table
total_cost_df = group_df.groupBy("hub_location").agg(sum("shipment_cost").alias("total_cost"))
total_cost_df.write.format("delta").mode("overwrite").saveAsTable("workspace.default.grouping_data_table")

cube_df = group_df.cube("hub_location","vehicle_type").agg(sum("shipment_cost").alias("cube_cost")).orderBy("hub_location","cube_cost",ascending=[False,False])
cube_df.write.format("delta").mode("overwrite").saveAsTable("workspace.default.aggregation_data_table")

## 7. Take a copy of the above notebook and write the equivalent SQL wherever applicable.
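
As a starting point for the SQL rewrite, here is a hedged sketch of the "Top 3 drivers by cost in each hub" query in SQL. It runs against an in-memory SQLite database purely so the snippet is self-contained. The table names `staff` and `shipments` and all rows are assumptions for illustration, and the `SELECT` should carry over to a `%sql` cell largely unchanged once it points at the real tables.

```python
import sqlite3

# In-memory SQLite stands in for the warehouse so the query can run anywhere
# (window functions need SQLite 3.25 or newer). Table names and rows are made up.
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE staff (shipment_id INTEGER, role TEXT, hub_location TEXT);
CREATE TABLE shipments (shipment_id INTEGER, shipment_cost REAL);
INSERT INTO staff VALUES (1,'Driver','Pune'),(2,'Driver','Pune'),(3,'Driver','Pune'),
                         (4,'Driver','Pune'),(5,'Manager','Pune'),(6,'Driver','Delhi');
INSERT INTO shipments VALUES (1,500),(2,900),(3,700),(4,100),(5,999),(6,300);
""")

top3_sql = """
SELECT hub_location, shipment_id, shipment_cost, rn
FROM (
    SELECT s.hub_location, s.shipment_id, d.shipment_cost,
           ROW_NUMBER() OVER (PARTITION BY s.hub_location
                              ORDER BY d.shipment_cost DESC) AS rn
    FROM staff s
    JOIN shipments d ON s.shipment_id = d.shipment_id
    WHERE s.role = 'Driver'
) ranked
WHERE rn <= 3
ORDER BY hub_location, rn
"""
top3 = conn.execute(top3_sql).fetchall()
conn.close()
```

The window function has to be computed in a subquery because SQL does not allow filtering on `rn` in the same `SELECT` that defines it. Swapping `ROW_NUMBER()` for `DENSE_RANK()` gives the tie-preserving variant.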