### Creating the Catalog & Schema

In [0]:
%sql
CREATE CATALOG IF NOT EXISTS logistics_catalog_assign;
CREATE SCHEMA IF NOT EXISTS logistics_catalog_assign.landing_zone;
CREATE VOLUME IF NOT EXISTS logistics_catalog_assign.landing_zone.landing_vol;

### Creating the Directories

In [0]:
dbutils.fs.mkdirs("/Volumes/logistics_catalog_assign/landing_zone/landing_vol/logistics_source1/")
dbutils.fs.mkdirs("/Volumes/logistics_catalog_assign/landing_zone/landing_vol/logistics_source2/")
dbutils.fs.mkdirs("/Volumes/logistics_catalog_assign/landing_zone/landing_vol/logistics_shipment_detail/")

### Programmatically find a couple of data patterns by applying the EDA steps below (File: logistics_source1)

- Apply inferSchema and toDF to create a DF and analyse the actual data.

In [0]:
log_src1_df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/Volumes/logistics_catalog_assign/landing_zone/landing_vol/logistics_source1/logistics_source1")

log_src1_df.show(100,truncate=False)

- Analyse the schema, data types, columns, etc.

In [0]:
log_src1_df.printSchema()
print(log_src1_df.schema)
print(log_src1_df.columns)

- Analyse the duplicate records count and summary of the dataframe.

In [0]:
print(log_src1_df.count())
print(log_src1_df.distinct().count())
display(log_src1_df.summary())

### a. Passive Data Munging (Files: logistics_source1 and logistics_source2)

In [0]:
log_src2_df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/Volumes/logistics_catalog_assign/landing_zone/landing_vol/logistics_source2/logistics_source2")
display(log_src2_df)
from pyspark.sql.functions import col

#shipment_id is non-numeric
log_src1_df.filter(~col("shipment_id").rlike("^[0-9]+$")).select("shipment_id").show(truncate=False)
log_src2_df.filter(~col("shipment_id").rlike("^[0-9]+$")).select("shipment_id").show(truncate=False)

#age is not an integer
log_src1_df.filter(~col("age").rlike("^[0-9]+$")).select("age").show(truncate=False)
log_src2_df.filter(~col("age").rlike("^[0-9]+$")).select("age").show(truncate=False)
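The digit-only pattern used in the checks above can be tried out in plain Python before running it on the DataFrames. This is a minimal sketch: the sample values are hypothetical and are not taken from the source files. One difference worth keeping in mind is that the Spark filter skips NULL values (a NULL compared with `rlike` evaluates to NULL, so the row is not returned), whereas this sketch reports them explicitly.

```python
import re

# Same idea as the rlike pattern in the cell above: digits only, at least one.
NUMERIC = re.compile(r"[0-9]+")

def is_numeric(value):
    """True when value is a non-empty string made only of digits."""
    return value is not None and NUMERIC.fullmatch(value) is not None

# Hypothetical sample values for illustration only.
samples = ["500001", "ten", "", "12a", None]
flagged = [v for v in samples if not is_numeric(v)]
print(flagged)
```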

### b. Active Data Munging (Files: logistics_source1 and logistics_source2)

- Read both files without enforcing schema
- Align them into a single canonical schema: shipment_id, first_name, last_name, age, role, hub_location, vehicle_type, data_source
- Add data_source column with values as: system1, system2 in the respective dataframes

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType
from pyspark.sql.functions import lit
schema =StructType([StructField('shipment_id', IntegerType(), True),
                     StructField('first_name', StringType(), True), 
                     StructField('last_name', StringType(), True), 
                     StructField('age', IntegerType(), True),
                      StructField('role', StringType(), True), 
                      StructField('hub_location', StringType(), True), 
                      StructField('vehicle_type', StringType(), True),
                      StructField('data_source', StringType(), True),
                      StructField('corruptedrows', StringType(), True)
                      ])

source1_df=spark.read.schema(schema).csv(path="/Volumes/logistics_catalog_assign/landing_zone/landing_vol/logistics_source1/logistics_source1",mode='permissive',columnNameOfCorruptRecord="corruptedrows",header=True)
source1_df=source1_df.withColumn("data_source",lit("system1"))
source2_df=spark.read.schema(schema).csv(path="/Volumes/logistics_catalog_assign/landing_zone/landing_vol/logistics_source2/logistics_source2",columnNameOfCorruptRecord="corruptedrows",header=True)
source2_df=source2_df.withColumn("data_source",lit("system2"))

single_canonical_df = source1_df.union(source2_df)
single_canonical_df= single_canonical_df.select(col('shipment_id'),col('first_name'),col('last_name'),col('age'),col('role'),col('hub_location'),col('vehicle_type'),col('data_source'))


##### Cleansing, Scrubbing:
##### Cleansing (removal of unwanted datasets)

- Mandatory Column Check - Drop any record where any of the following columns is NULL: shipment_id, role
- Name Completeness Rule - Drop records where both of the following columns are NULL: first_name, last_name
- Join Readiness Rule - Drop records where the join key is null: shipment_id

In [0]:
print(single_canonical_df.count())
cleanseddf=single_canonical_df.na.drop(how="any",subset=["shipment_id","role"])
print(cleanseddf.count())
display(cleanseddf)

In [0]:
cleanseddf2=cleanseddf.na.drop(how="all",subset=["first_name","last_name"])
print(cleanseddf2.count())

In [0]:
cleanseddf3=cleanseddf2.na.drop(how='all',subset=["shipment_id"])
print(cleanseddf3.count())
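The difference between `how="any"` and `how="all"` in the cells above is easy to mix up, so here is a small plain-Python sketch of the two drop rules. The helper `drop_rows` and the sample rows are hypothetical; they only mimic the behaviour of `na.drop` on a list of dictionaries.

```python
def drop_rows(rows, how, subset):
    """Mimic na.drop on a list of dicts.

    how="any": drop a row if ANY column in subset is None.
    how="all": drop a row only if EVERY column in subset is None.
    """
    check = any if how == "any" else all
    return [r for r in rows if not check(r[c] is None for c in subset)]

# Hypothetical rows for illustration only.
rows = [
    {"shipment_id": 1, "role": "Driver", "first_name": "Rajesh", "last_name": None},
    {"shipment_id": None, "role": "Driver", "first_name": "A", "last_name": "B"},
    {"shipment_id": 2, "role": "Loader", "first_name": None, "last_name": None},
]

# Mandatory column check: shipment_id and role must both be present.
step1 = drop_rows(rows, "any", ["shipment_id", "role"])
# Name completeness rule: drop only when both name parts are missing.
step2 = drop_rows(step1, "all", ["first_name", "last_name"])

print([r["shipment_id"] for r in step1])
print([r["shipment_id"] for r in step2])
```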

Scrubbing (convert raw to tidy)
- Age Defaulting Rule - Fill NULL values in the age column with: -1
- Vehicle Type Default Rule - Fill NULL values in the vehicle_type column with: UNKNOWN
- Invalid Age Replacement - Replace the following values in age: "ten" to -1, "" to -1
- Vehicle Type Normalization - Replace inconsistent vehicle types: truck to LMV, bike to TwoWheeler

In [0]:
cleanseddf4=cleanseddf3.na.fill(-1,['age'])
cleanseddf4.where('age=-1').show()

In [0]:
cleanseddf5=cleanseddf4.na.fill('UNKNOWN',['vehicle_type'])
cleanseddf5.where('vehicle_type=="UNKNOWN"').show()

In [0]:
find_and_replace = {'Truck':'LMV','Bike':'TwoWheeler'}
cleanseddf6=cleanseddf5.na.replace(find_and_replace,subset=['vehicle_type'])
cleanseddf6.where('vehicle_type=="LMV"').show()
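The scrubbing rules can be sketched in plain Python to see how the defaults and the replacement dictionary interact. The function `scrub` and the sample rows are hypothetical. Note that a dictionary lookup, like `na.replace`, matches values exactly, so a lowercase `truck` would not be rewritten by a key spelled `Truck`.

```python
AGE_DEFAULT = -1
VEHICLE_DEFAULT = "UNKNOWN"
# Same keys as the find_and_replace dictionary in the cell above.
VEHICLE_MAP = {"Truck": "LMV", "Bike": "TwoWheeler"}

def scrub(row):
    """Apply the default-fill and replacement rules to one record."""
    out = dict(row)
    if out["age"] is None:
        out["age"] = AGE_DEFAULT
    if out["vehicle_type"] is None:
        out["vehicle_type"] = VEHICLE_DEFAULT
    # Exact, case-sensitive match; unknown values pass through unchanged.
    out["vehicle_type"] = VEHICLE_MAP.get(out["vehicle_type"], out["vehicle_type"])
    return out

print(scrub({"age": None, "vehicle_type": "Truck"}))
print(scrub({"age": 30, "vehicle_type": None}))
print(scrub({"age": 41, "vehicle_type": "truck"}))
```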

##### 3. Standardization, De-Duplication and Replacement / Deletion of Data to make it in a usable format

Creating the shipment details DataFrame

Create a DF by reading data from logistics_shipment_detail_3000.json.
As this is clean JSON data, it doesn't require any cleansing or scrubbing.

In [0]:
logistics_shipment_df = (
    spark.read
         .option("multiline", "true")   
         .option("mode", "PERMISSIVE") 
         .json("/Volumes/logistics_catalog_assign/landing_zone/landing_vol/logistics_shipment_detail/logistics_shipment_detail_3000.json")
)

logistics_shipment_df.show(100,truncate=False)

Add columns
Source File: DF of logistics_shipment_detail_3000.json
- Add domain as 'Logistics', ingestion_timestamp as the current timestamp, and is_expedited as 'False'.

In [0]:
from pyspark.sql.functions import current_timestamp,lit
logistics_shipment_df2 = logistics_shipment_df.withColumn("domain",lit("Logistics")).withColumn("ingestion_timestamp",current_timestamp()).withColumn("is_expedited",lit("False"))
logistics_shipment_df2.show(100,truncate=False)

Column Uniformity:
- role - Convert to lowercase
  - Source File: DF of merged(logistics_source1 & logistics_source2)
- vehicle_type - Convert values to UPPERCASE
  - Source Files: DF of logistics_shipment_detail_3000.json
- hub_location - Convert values to initcap case
  - Source Files: DF of merged(logistics_source1 & logistics_source2)

In [0]:
from pyspark.sql.functions import upper,initcap
cleanseddf7=cleanseddf6.withColumn("vehicle_type",upper(col("vehicle_type"))).withColumn("hub_location",initcap(col("hub_location")))
cleanseddf7.where('data_source == "system2"').show()

Format Standardization:
- Source Files: DF of logistics_shipment_detail_3000.json
  - Convert shipment_date to yyyy-MM-dd
  - Ensure shipment_cost has 2 decimal precision

In [0]:
from pyspark.sql.functions import to_date,date_format
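# Parse the raw value with the "yy-MM-dd" pattern and render it as an "MM-dd-yyyy" string.
# Later cells re-parse shipment_date with "MM-dd-yyyy", so keep the two patterns in sync.
logistics_shipment_df3 = logistics_shipment_df2.withColumn("shipment_date", date_format(to_date(col("shipment_date"), "yy-MM-dd"),"MM-dd-yyyy")).withColumn("shipment_cost", col("shipment_cost").cast("decimal(18,2)"))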
logistics_shipment_df3.show()

Data Type Standardization
Standardizing column data types to fix schema drift and enable mathematical operations.
 - Source File: DF of merged(logistics_source1 & logistics_source2)
   - age: Cast String to Integer
 - Source File: DF of logistics_shipment_detail_3000.json
   - shipment_weight_kg: Cast to Double
   - is_expedited: Cast to Boolean

In [0]:
cleanseddf8 = cleanseddf7.withColumn("age",col('age').cast("int"))
logistics_shipment_df4 = logistics_shipment_df3.withColumn("shipment_weight",col('shipment_weight_kg').cast("double")).withColumn("is_expedited",col('is_expedited').cast("boolean"))
logistics_shipment_df4.show(10000,truncate=False)

Naming Standardization
Source File: DF of merged(logistics_source1 & logistics_source2)
 - Rename: first_name to staff_first_name
 - Rename: last_name to staff_last_name
 - Rename: hub_location to origin_hub_city

In [0]:
cleanseddf9 = cleanseddf8.withColumnRenamed("first_name","staff_first_name").withColumnRenamed("last_name","staff_last_name").withColumnRenamed("hub_location","origin_hub_city")

Reordering columns logically in a better standard format:
Source File: DF of Data from all 3 files
shipment_id (Identifier), staff_first_name (Dimension), staff_last_name (Dimension), role (Dimension), origin_hub_city (Location), shipment_cost (Metric), ingestion_timestamp (Audit)

In [0]:
cleanseddf10 = cleanseddf9.selectExpr('shipment_id','staff_first_name','staff_last_name','age','role','origin_hub_city','vehicle_type','data_source')
cleanseddf10.show(100,truncate=False)

In [0]:
logistics_shipment_df5 = logistics_shipment_df4.selectExpr('shipment_id','domain','cargo_type','source_city','destination_city','order_id','shipment_date','shipment_cost','shipment_weight','shipment_status','is_expedited','ingestion_timestamp')
logistics_shipment_df5.show(100,truncate=False)


Deduplication:

 - Apply Record Level De-Duplication
 - Apply Column Level De-Duplication (Primary Key Enforcement)

In [0]:
cleanseddf11 = cleanseddf10.dropDuplicates()
cleanseddf12 = cleanseddf11.dropDuplicates(['shipment_id'])
display(cleanseddf12)

In [0]:
logistics_shipment_df6 = logistics_shipment_df5.dropDuplicates()
logistics_shipment_df7 = logistics_shipment_df6.dropDuplicates(['shipment_id'])
display(logistics_shipment_df7)

### 2. Data Enrichment - Detailing of data

1. Add Audit Timestamp (load_dt) Source File: DF of logistics_source1 and logistics_source2

 - Scenario: We need to track exactly when this record was ingested into our Data Lakehouse for auditing purposes.
 - Action: Add a column load_dt using the function current_timestamp().

In [0]:
cleanseddf13 = cleanseddf12.withColumn("load_dt",current_timestamp())
cleanseddf13.printSchema()

2. Create Full Name (full_name) Source File: DF of logistics_source1 and logistics_source2

 - Scenario: The reporting dashboard requires a single field for the driver's name instead of separate columns.
 - Action: Create full_name by concatenating first_name and last_name with a space separator.
 - Result: "Rajesh" + " " + "Kumar" -> "Rajesh Kumar"

In [0]:
from pyspark.sql.functions import *
cleanseddf14 = cleanseddf13.withColumn("full_name",concat(col("staff_first_name"),lit(" "),col("staff_last_name")))
display(cleanseddf14)

3. Define Route Segment (route_segment) Source File: DF of logistics_shipment_detail_3000.json

 - Scenario: The logistics team wants to analyze performance based on specific transport lanes (Source to Destination).
 - Action: Combine source_city and destination_city with a hyphen.
 - Result: "Chennai" + "-" + "Pune" -> "Chennai-Pune"

In [0]:
logistics_shipment_df8 = logistics_shipment_df7.withColumn("route_segment", concat(col("source_city"),lit("-"),col("destination_city")))
display(logistics_shipment_df8)

4. Generate Vehicle Identifier (vehicle_identifier) Source File: DF of logistics_shipment_detail_3000.json

 - Scenario: We need a unique tracking code that immediately tells us the vehicle type and the shipment ID.
 - Action: Combine vehicle_type and shipment_id to create a composite key.
 - Result: "Truck" + "_" + "500001" -> "Truck_500001"

In [0]:
cleanseddf15 = cleanseddf14.withColumn("vehicle_identifier",concat(col('vehicle_type'),lit("_"),col('shipment_id')))
display(cleanseddf15)

1. Derive Shipment Year (shipment_year)

 - Scenario: Management needs an annual performance report to compare growth year-over-year.
 - Action: Extract the year component from shipment_date.
 - Result: "2024-04-23" -> 2024


In [0]:
logistics_shipment_df9 = logistics_shipment_df8.withColumn("shipment_year",year(to_date(col('shipment_date'),'MM-dd-yyyy')))
logistics_shipment_df9.show()

2. Derive Shipment Month (shipment_month)

 - Scenario: Analysts want to identify seasonal peaks (e.g., increased volume in December).
 - Action: Extract the month component from shipment_date.
 - Result: "2024-04-23" -> 4 (April)

In [0]:
logistics_shipment_df10 = logistics_shipment_df9.withColumn("shipment_month",month(to_date(col('shipment_date'),'MM-dd-yyyy')))
logistics_shipment_df10.show()
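The year and month extraction can be checked in plain Python. This sketch assumes the date string is in the `MM-dd-yyyy` form that the cells above pass to `to_date`, which corresponds to `%m-%d-%Y` in Python's `strptime`; the helper name `year_and_month` is hypothetical.

```python
from datetime import datetime

def year_and_month(shipment_date, pattern="%m-%d-%Y"):
    """Parse an MM-dd-yyyy string and return (year, month)."""
    parsed = datetime.strptime(shipment_date, pattern)
    return parsed.year, parsed.month

# 23 April 2024, written the way shipment_date is stored at this stage.
print(year_and_month("04-23-2024"))
```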

Flag shipment status (is_expedited)

- Scenario: The Operations team needs to track shipments that are IN_TRANSIT or DELIVERED.
- Action: Flag as 'True' if the shipment_status is IN_TRANSIT or DELIVERED.

In [0]:
logistics_shipment_df11 =logistics_shipment_df10.withColumn("is_expedited",when(col("shipment_status")=="IN_TRANSIT",True).when(col("shipment_status")=="DELIVERED",True).otherwise(col("is_expedited")))
logistics_shipment_df11.show()

**Enrichment/Business Logics (Calculated Fields)**
 - Deriving new metrics and financial indicators using mathematical and date-based operations.
 - Source File: logistics_shipment_detail_3000.json

1. Calculate Unit Cost (cost_per_kg)

 - Scenario: The Finance team wants to analyze the efficiency of shipments by determining the cost incurred per unit of weight.
 - Action: Divide shipment_cost by shipment_weight_kg.
 - Logic: shipment_cost / shipment_weight_kg

In [0]:
logistics_shipment_df12 = logistics_shipment_df11.withColumn("cost_per_kg",col("shipment_cost")/col("shipment_weight")).withColumn("cost_per_kg",col("cost_per_kg").cast("decimal(18,2)"))
logistics_shipment_df12.show()
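A plain-Python sketch of the unit cost rule, using `Decimal` to mirror the two-decimal result. The helper `cost_per_kg` and the sample figures are hypothetical. Returning `None` for a missing or zero weight is an assumption made here to keep the sketch safe; how Spark treats division by zero depends on its ANSI mode setting, so check that separately.

```python
from decimal import Decimal, ROUND_HALF_UP

def cost_per_kg(cost, weight_kg):
    """Return cost / weight rounded to 2 decimals, or None when undefined."""
    if cost is None or weight_kg is None or weight_kg == 0:
        return None
    ratio = Decimal(str(cost)) / Decimal(str(weight_kg))
    return ratio.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)

# Hypothetical shipment: cost 30695.80 for 250 kg.
print(cost_per_kg("30695.80", 250.0))
print(cost_per_kg("100.00", 0))
```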

2. Track Shipment Age (days_since_shipment)

 - Scenario: The Operations team needs to monitor how long it has been since a shipment was dispatched to identify potential delays.
 - Action: Calculate the difference in days between the current_date and the shipment_date.
 - Logic: datediff(current_date(), shipment_date)

In [0]:
logistics_shipment_df13 =logistics_shipment_df12.withColumn("days_since_shipment",datediff(current_date(),to_date(col('shipment_date'),'MM-dd-yyyy')))
logistics_shipment_df13.show()
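The day difference can be sanity-checked in plain Python. In this sketch the current date is passed in as a parameter (`today`) so the result is reproducible; the helper name and the sample dates are hypothetical.

```python
from datetime import date, datetime

def days_since_shipment(shipment_date, today, pattern="%m-%d-%Y"):
    """Whole days between an MM-dd-yyyy shipment date and today."""
    shipped = datetime.strptime(shipment_date, pattern).date()
    return (today - shipped).days

# Shipment on 23 April 2024, checked on 1 May 2024.
print(days_since_shipment("04-23-2024", date(2024, 5, 1)))
```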

3. Compute Tax Liability (tax_amount)

 - Scenario: For invoicing and compliance, we must calculate the Goods and Services Tax (GST) applicable to each shipment.
 - Action: Calculate 18% GST on the total shipment_cost.
 - Logic: shipment_cost * 0.18

In [0]:
logistics_shipment_df14 = logistics_shipment_df13.withColumn("tax_amount",col("shipment_cost")*0.18).withColumn("tax_amount",col("tax_amount").cast("decimal(18,2)"))
logistics_shipment_df14.show()

**Remove/Eliminate (drop, select, selectExpr)**
 - Excluding unnecessary or redundant columns to optimize storage and privacy.
 - Source File: DF of logistics_source1 and logistics_source2

1. Remove Redundant Name Columns

 - Scenario: Since we have already created the full_name column in the Enrichment step, the individual name columns are now redundant and clutter the dataset.
 - Action: Drop the first_name and last_name columns.
 - Logic: df.drop("first_name", "last_name")

In [0]:
cleanseddf16 = cleanseddf15.drop("staff_first_name","staff_last_name")
display(cleanseddf16)

**Splitting & Merging/Melting of Columns**                            
Reshaping columns to extract hidden values or combine fields for better analysis.
Source File: DF of logistics_shipment_detail_3000.json
1. Splitting (Extraction): Breaking one column into multiple to isolate key information.

Split Order Code:
 - Action: Split order_id ("ORD100000") into two new columns:
   - order_prefix ("ORD")
   - order_sequence ("100000")

In [0]:
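# order_id looks like "ORD100000": the first 3 characters are the prefix and the rest is the sequence (substr positions are 1-based).
logistics_shipment_df15 = logistics_shipment_df14.withColumn("order_prefix",col("order_id").substr(1,3)).withColumn("order_sequence",col("order_id").substr(lit(4),length(col("order_id"))))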
display(logistics_shipment_df15)
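The cell above splits by fixed position. As an alternative sketch, the same split can be expressed with a regular expression, which does not depend on the prefix always being three characters long. This is a swapped-in technique shown in plain Python, and the helper `split_order_id` is hypothetical.

```python
import re

# Letters first, then digits, with nothing else in the value.
ORDER_ID = re.compile(r"([A-Za-z]+)([0-9]+)")

def split_order_id(order_id):
    """Return (prefix, sequence), or (None, None) when the shape is unexpected."""
    match = ORDER_ID.fullmatch(order_id or "")
    return (match.group(1), match.group(2)) if match else (None, None)

print(split_order_id("ORD100000"))
print(split_order_id("100000"))
```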

Split Date:
 - Action: Split shipment_date into three separate columns for partitioning:
  - ship_year (2024)
  - ship_month (4)
  - ship_day (23)

In [0]:
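# Parse once, then derive the three partition columns named in the task
# (the existing shipment_year / shipment_month columns are left untouched).
ship_dt = to_date(col('shipment_date'),'MM-dd-yyyy')
logistics_shipment_df16 = logistics_shipment_df15.withColumn("ship_year",year(ship_dt)).withColumn("ship_month",month(ship_dt)).withColumn("ship_day",dayofmonth(ship_dt))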
logistics_shipment_df16.show()

**3. Data Customization & Processing - Application of Tailored Business Specific Rules**

**UDF1: Complex Incentive Calculation**                        
 - **Scenario**: The Logistics Head wants to calculate a "Performance Bonus" for drivers based on tenure and role complexity.

 - **Action**: Create a Python function calculate_bonus(role, age) and register it as a Spark UDF.

- **Logic**:

  - IF Role == 'Driver' AND Age > 50: Bonus = 15% of Salary (Reward for Seniority)        
  - IF Role == 'Driver' AND Age < 30: Bonus = 5% of Salary (Encouragement for Juniors)
  - ELSE: Bonus = 0       
 - **Result**: A new derived column projected_bonus is generated for every row in the dataset.



In [0]:
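def calculate_bonus(role,age):
    # Returns the bonus as a percentage of salary (the canonical schema has no salary column).
    # Skip NULL ages and the -1 placeholder used for unknown ages, so they are not treated as juniors.
    if role!="Driver" or age is None or age<0:
        return 0
    if age>50:
        return 15
    if age<30:
        return 5
    return 0
# Declare the return type explicitly; udf() defaults to StringType otherwise.
bonusUDF = udf(calculate_bonus,IntegerType())
cleanseddf17 = cleanseddf16.withColumn("projected_bonus",bonusUDF(col("role"),col("age"))).withColumn("projected_bonus",col("projected_bonus").cast("decimal(18,2)"))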
cleanseddf17.where('projected_bonus > 0.0').show()
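Because a UDF is ordinary Python, the bonus rule can be unit-tested without Spark before it is registered. The sketch below is a hypothetical standalone version named `calculate_bonus_pct`. Two choices in it are assumptions rather than part of the stated logic: it returns the bonus as a percentage, and it gives no bonus for a missing age or the -1 placeholder used for unknown ages.

```python
def calculate_bonus_pct(role, age):
    """Bonus percentage for a staff member under the stated rule."""
    # Assumption: unknown ages (None or the -1 placeholder) earn no bonus.
    if role != "Driver" or age is None or age < 0:
        return 0
    if age > 50:
        return 15  # reward for seniority
    if age < 30:
        return 5   # encouragement for juniors
    return 0

# Hypothetical cases covering each branch.
cases = [("Driver", 55), ("Driver", 25), ("Driver", 40), ("Manager", 55), ("Driver", -1)]
print([calculate_bonus_pct(role, age) for role, age in cases])
```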

**UDF2: PII Masking (Privacy Compliance)**    
 - **Scenario**: For the analytics dashboard, we must hide the full identity of the staff to comply with privacy laws (GDPR/DPDP), while keeping names recognizable for internal managers.

 - **Business Rule**: Show the first 2 letters, mask the middle characters with ****, and show the last letter.

 - **Action**: Create a UDF mask_identity(name).

Example:

Input: "Rajesh"     
Output: "Ra****h"     
Note: Convert the above UDF logic to a built-in function based transformation to improve performance.

In [0]:
def mask_identity(fullname):
    if fullname is None:
        return None
    if len(fullname) == 0:
        return None
    if len(fullname) < 3:
        return fullname[:1] + "*" + fullname[-1]
    else:
        return fullname[:2] + "****" + fullname[-1]
maskUDF = udf(mask_identity,StringType())
cleanseddf18 = cleanseddf17.withColumn("masked_name",maskUDF(split(col("full_name"),' ')[0]))
cleanseddf18.show()
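The masking rule (first two letters, `****`, last letter) can also be written as a single regular-expression substitution, which is the shape a built-in based version would likely take. The sketch below is plain Python with a hypothetical helper name. It is an assumption, not something verified here, that the same pattern would carry over to Spark's `regexp_replace`, where group references in the replacement are written `$1` and `$2` rather than `\1` and `\2`.

```python
import re

# Capture the first two characters and the last one; everything between is masked.
MASK = re.compile(r"^(.{2}).*(.)$")

def mask_identity(name):
    """Mask a name as first two letters + **** + last letter."""
    if not name:
        return None
    if len(name) < 3:
        # Too short for the full rule; keep the short-name handling from the UDF above.
        return name[:1] + "*" + name[-1]
    return MASK.sub(r"\1****\2", name)

print(mask_identity("Rajesh"))
print(mask_identity("Al"))
```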

**4. Data Core Curation & Processing (Pre-Wrangling)**    
Applying business logic to focus, filter, and summarize data before final analysis.

1. **Select** (Projection)
Source Files: DF of logistics_source1 and logistics_source2

**Scenario**: The Driver App team only needs location data, not sensitive HR info.    
**Action**: Select only first_name, role, and hub_location.

In [0]:
driver_app_df = cleanseddf18.selectExpr("full_name","role","origin_hub_city")
display(driver_app_df.where('origin_hub_city is not null'))

2. **Filter** (Selection)   
Source File: DF of json

**Scenario**: We need a report on active operational problems.  
**Action**: Filter rows where shipment_status is 'DELAYED' or 'RETURNED'.     
**Scenario**: Insurance audit for senior staff.   
**Action**: Filter rows where age > 50.

In [0]:
delay_status_df = logistics_shipment_df16.filter(col('shipment_status').isin("RETURNED","DELAYED"))
delay_status_df.show()
insurance_audit_df = cleanseddf18.filter(col('age')>50)
insurance_audit_df.show()

3. **Derive Flags & Columns** (Business Logic)  
Source File: DF of json

**Scenario**: Identify high-value shipments for security tracking.  
**Action**: Create flag is_high_value = True if shipment_cost > 40,000.
**Scenario**: Flag weekend operations for overtime calculation.   
**Action**: Create flag is_weekend = True if day is Saturday or Sunday.

In [0]:
logistics_shipment_df17 = logistics_shipment_df16.withColumn("is_high_value",when(col("shipment_cost")>40000,True).otherwise(False)).withColumn("is_weekend",when((dayofweek(to_date(col("shipment_date"),"MM-dd-yyyy"))==1) | (dayofweek(to_date(col("shipment_date"),"MM-dd-yyyy"))==7),True).otherwise(False))
logistics_shipment_df17.where('is_high_value==True').show()
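Spark's `dayofweek` numbers days from 1 (Sunday) to 7 (Saturday), which differs from Python's conventions, so the weekend check is worth verifying on known dates. The sketch below converts Python's ISO weekday to the Spark numbering; the helper names are hypothetical.

```python
from datetime import datetime

def spark_dayofweek(d):
    """Map a date to Spark's numbering: 1 = Sunday ... 7 = Saturday."""
    return d.isoweekday() % 7 + 1

def is_weekend(shipment_date, pattern="%m-%d-%Y"):
    """True when an MM-dd-yyyy date falls on Saturday or Sunday."""
    d = datetime.strptime(shipment_date, pattern).date()
    return spark_dayofweek(d) in (1, 7)

# 27 and 28 April 2024 are a Saturday and a Sunday; 23 April 2024 is a Tuesday.
print(is_weekend("04-27-2024"), is_weekend("04-28-2024"), is_weekend("04-23-2024"))
```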


4. **Format** (Standardization)   
Source File: DF of json

**Scenario**: Finance requires readable currency formats.   
**Action**: Format shipment_cost to string like "₹30,695.80".   
**Scenario**: Standardize city names for reporting.   
**Action**: Format source_city to Uppercase (e.g., "chennai" → "CHENNAI").

In [0]:
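# Keep shipment_cost numeric for the sorting and Top-N cells below; store the display string ("₹30,695.80" style) in a separate column.
logistics_shipment_df18 = logistics_shipment_df17.withColumn("shipment_cost_formatted",concat(lit("₹"),format_number(col("shipment_cost"),2))).withColumn("source_city",upper(col("source_city"))).withColumn("destination_city",upper(col("destination_city")))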
logistics_shipment_df18.show()
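The target currency format from the action above can be reproduced in plain Python to confirm the expected string. This sketch uses thousands grouping, matching the "₹30,695.80" example; the helper `format_inr` is hypothetical. Once a cost is formatted this way it is text, so numeric comparisons and sorting should use the unformatted value.

```python
def format_inr(amount):
    """Format a number as rupees with thousands separators and 2 decimals."""
    return f"₹{amount:,.2f}"

print(format_inr(30695.8))
print("chennai".upper())
```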


5. **Group & Aggregate** (Summarization)  
Source Files: DF of logistics_source1 and logistics_source2

**Scenario**: Regional staffing analysis.   
**Action**: Group by hub_location and Count the number of staff.    
**Scenario**: Fleet capacity analysis.  
**Action**: Group by shipment_status and Sum the shipment_weight_kg

In [0]:
location_wise_count= cleanseddf18.groupBy("origin_hub_city").agg(countDistinct("shipment_id").alias("distinct_shipments"))
display(location_wise_count)
vehicle_type_wise_shipment_weight = logistics_shipment_df18.groupBy("shipment_status").agg(sum("shipment_weight").alias("total_weight"))
display(vehicle_type_wise_shipment_weight)

6. **Sorting** (Ordering)  
Source File: DF of json

**Scenario**: Prioritize the most expensive shipments.  
**Action**: Sort by shipment_cost in Descending order.    
**Scenario**: Organize daily dispatch schedule.   
**Action**: Sort by shipment_date (Ascending).

In [0]:
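# shipment_date is an "MM-dd-yyyy" string, so parse it to sort chronologically (ascending); break ties by shipment_cost, highest first.
logistics_shipment_df19=logistics_shipment_df18.sort(to_date(col("shipment_date"),"MM-dd-yyyy").asc(),col("shipment_cost").desc())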
logistics_shipment_df19.show()
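At this stage shipment_date is a string in `MM-dd-yyyy` form, and sorting such strings is not the same as sorting dates, because the month comes first. The plain-Python sketch below shows the difference on three hypothetical dates.

```python
from datetime import datetime

# Hypothetical dates in the MM-dd-yyyy form used for shipment_date.
dates = ["12-01-2023", "01-15-2025", "04-23-2024"]

as_text = sorted(dates)
as_dates = sorted(dates, key=lambda s: datetime.strptime(s, "%m-%d-%Y"))

print(as_text)   # ordered by month digits first, not chronological
print(as_dates)  # chronological order
```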

7. **Limit** (Top-N Analysis)  
Source File: DF of json

**Scenario**: Dashboard snapshot of critical delays.  
**Action**: Filter for 'DELAYED', Sort by Cost, and Limit to top 10 rows.

In [0]:
top_delay_df = logistics_shipment_df19.filter(col('shipment_status') == 'DELAYED').sort(col("shipment_cost").desc()).limit(10)
top_delay_df.show()
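The filter, sort, and limit sequence is a Top-N pattern. As a plain-Python sketch of the same idea, `heapq.nlargest` picks the N most expensive delayed shipments without fully sorting the input. The rows and the helper `top_delayed` are hypothetical, and the costs are numeric so that the ordering is by value.

```python
import heapq

# Hypothetical shipment rows for illustration only.
shipments = [
    {"shipment_id": 1, "shipment_status": "DELAYED", "shipment_cost": 1200.0},
    {"shipment_id": 2, "shipment_status": "DELIVERED", "shipment_cost": 9000.0},
    {"shipment_id": 3, "shipment_status": "DELAYED", "shipment_cost": 4500.0},
    {"shipment_id": 4, "shipment_status": "DELAYED", "shipment_cost": 800.0},
]

def top_delayed(rows, n=10):
    """Return the n most expensive DELAYED shipments, highest cost first."""
    delayed = (r for r in rows if r["shipment_status"] == "DELAYED")
    return heapq.nlargest(n, delayed, key=lambda r: r["shipment_cost"])

print([r["shipment_id"] for r in top_delayed(shipments, n=2)])
```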