###Enterprise Fleet Analytics Pipeline: Focuses on the business outcome (analytics) and the domain (fleet/logistics).

![logistics](logistics_project.png)

Download the data from the Google Drive folder below and upload it into the catalog:
https://drive.google.com/drive/folders/1J3AVJIPLP7CzT15yJIpSiWXshu1iLXKn?usp=drive_link

##**1. Data Munging**

####1. Visually/manually open the file and capture a couple of data patterns (Manual Exploratory Data Analysis)
##### - Null Values
##### - Null Values coming with shipment_id


In [0]:
lst=dbutils.fs.ls('/Volumes/workspace/wd36schema/ingestion_volume/target/')
print(lst)
print(lst[0].path)

####2. Programmatically find a couple of data patterns by applying the EDA steps below (File: logistics_source1)
1. Apply inferSchema and toDF to create a DF and analyse the actual data.
2. Analyse the schema, data types, columns, etc.
3. Analyse the duplicate record count and the summary of the DataFrame.

In [0]:
#create the sparksession
from pyspark.sql import SparkSession
from pyspark.sql.types import *
spark = SparkSession.builder.appName("logistic_usecase").getOrCreate()



In [0]:
#ingest the source1
df = spark.read.csv("dbfs:///Volumes/workspace/wd36schema/ingestion_volume/target/logistics_source1",header=True,inferSchema=True).toDF("shipment_id","first_name","last_name","age","role")
display(df)

In [0]:
df.schema


In [0]:
df.columns

In [0]:
df.printSchema()

In [0]:
display(df.summary())

In [0]:
src1_df = df.dropDuplicates().filter("shipment_id!='ten'")
display(src1_df)
display(src1_df.summary())

In [0]:
# Remove rows whose age is not numeric (e.g. 'ten'): a value containing letters differs between upper() and lower()
from pyspark.sql.functions import *
src1_df = src1_df.filter(upper(col("age")) == lower(col("age"))).where("age <> 'ten'")
display(src1_df)

In [0]:
#ingest the source2
src2_df = spark.read.csv("dbfs:///Volumes/workspace/wd36schema/ingestion_volume/target/logistics_source2",header=True,inferSchema=True).dropDuplicates(subset=["shipment_id"])
display(src2_df)
display(src2_df.distinct())

In [0]:
display(src2_df.summary())

In [0]:
src2_df = src2_df.filter(upper(col("age"))==lower(col("age")))
display(src2_df)

###a. Passive Data Munging -  (File: logistics_source1  and logistics_source2)
Without modifying the data, identify:<br>
Shipment IDs that appear in both master_v1 and master_v2<br>
Records where:<br>
1. shipment_id is non-numeric
2. age is not an integer<br>

Count rows having:
3. fewer columns than expected
4. more columns than expected
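
The checks listed above can be prototyped on raw lines before anything is loaded into a DataFrame. Once a CSV is parsed against fixed column names, short or long rows are no longer easy to tell apart, so row-shape counts are best taken on the raw text. The sketch below is plain Python on hypothetical sample lines; `raw_v1`, `ids_v2`, `EXPECTED_COLUMNS`, and `is_integer` are illustrative names, and the expected column count of 5 is taken from the column list used for logistics_source1.

```python
import csv
import io
import re

EXPECTED_COLUMNS = 5  # shipment_id, first_name, last_name, age, role

# Hypothetical raw lines standing in for logistics_source1 (header excluded)
raw_v1 = """1001,Rajesh,Kumar,34,driver
ten,Anita,Shah,29,loader
1003,Vikram,Rao,ten,driver
1004,Meena,Iyer
1005,Arun,Das,41,driver,extra
"""

# Hypothetical shipment IDs found in logistics_source2
ids_v2 = {"1001", "1005", "2001"}


def is_integer(text):
    """Return True when text is an optionally signed whole number."""
    return re.fullmatch(r"[+-]?\d+", text.strip()) is not None


rows = list(csv.reader(io.StringIO(raw_v1)))

# Row-shape counts
fewer = sum(1 for r in rows if len(r) < EXPECTED_COLUMNS)
more = sum(1 for r in rows if len(r) > EXPECTED_COLUMNS)

# Type checks only make sense on rows with the expected shape
well_formed = [r for r in rows if len(r) == EXPECTED_COLUMNS]
non_numeric_ids = [r[0] for r in well_formed if not is_integer(r[0])]
non_integer_age = [r[0] for r in well_formed if not is_integer(r[3])]

# Shipment IDs present in both sources
common_ids = sorted({r[0] for r in rows} & ids_v2)

print(fewer, more, non_numeric_ids, non_integer_age, common_ids)
```

Nothing is modified here; the script only reports counts and offending keys, which keeps this step passive.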

In [0]:
master_v1 = src1_df.withColumns({"shipment_id":col("shipment_id").cast("int"),"age":col("age").cast("int")})
master_v1.printSchema()
master_v2 = src2_df.withColumn("age",col("age").cast("int"))
master_v2.printSchema()


In [0]:
display(master_v1.unionByName(master_v2,allowMissingColumns=True))

In [0]:
def getSparkSession():
    spark = SparkSession.builder.appName("logistics_usecase").getOrCreate()
    return spark

In [0]:
#Create a Spark Session Object
ss = getSparkSession()




###**b. Active Data Munging** File: logistics_source1 and logistics_source2

#####1. Combining Data + Schema Merging (Structuring)
1. Read both files without enforcing a schema.
2. Align them into a single canonical schema: shipment_id, first_name, last_name, age, role, hub_location, vehicle_type, data_source.
3. Add a data_source column with the values system1 and system2 in the respective DataFrames.

In [0]:
print(ss)
src1 = ss.read.csv("dbfs:///Volumes/workspace/wd36schema/ingestion_volume/target/logistics_source1",header=True)
src2 = ss.read.csv("dbfs:///Volumes/workspace/wd36schema/ingestion_volume/target/logistics_source2",header=True)

src1 = src1.withColumn("data_source",lit("system1"))
src2 = src2.withColumn("data_source",lit("system2"))
display(src1)
display(src2)

In [0]:
src1.printSchema()
src2.printSchema()

In [0]:
df = src1.unionByName(src2,allowMissingColumns=True)
df.printSchema()

In [0]:
# Check the merged schema: inspect the rows where hub_location is populated
display(df.where("hub_location is not null"))

#####2. Cleansing, Scrubbing:
Cleansing (removal of unwanted records)<br>
1. Mandatory Column Check - Drop any record where any of the following columns is NULL: shipment_id, role<br>
2. Name Completeness Rule - Drop records where both of the following columns are NULL: first_name, last_name<br>
3. Join Readiness Rule - Drop records where the join key is NULL: shipment_id<br>

Scrubbing (convert raw to tidy)<br>
4. Age Defaulting Rule - Fill NULL values in the age column with: -1<br>
5. Vehicle Type Default Rule - Fill NULL values in the vehicle_type column with: UNKNOWN<br>
6. Invalid Age Replacement - Replace the following values in age: "ten" to -1, "" to -1<br>
7. Vehicle Type Normalization - Replace inconsistent vehicle types: truck to LMV, bike to TwoWheeler

In [0]:
filtered_df = df.na.drop(how='any', subset=["shipment_id","role"])
print(df.count())
print(filtered_df.count())

filtered_df = filtered_df.na.drop(how='all', subset=["first_name","last_name"])
print(filtered_df.count())
filtered_df = filtered_df.na.drop(how='any',subset=["shipment_id"])
print(filtered_df.count())

In [0]:
scrub_filtered_df = filtered_df.na.fill("-1", subset=['age'])
display(scrub_filtered_df)

In [0]:
scrub_filtered_df = scrub_filtered_df.na.fill("UNKNOWN", subset=['vehicle_type'])
display(scrub_filtered_df)

In [0]:


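# Replace invalid age values; "" is included as the rule requires (empty CSV fields are usually already NULL and covered by the fill above)
find_replace_str = {"ten": "-1", "": "-1"}
scrub_filtered_df = scrub_filtered_df.na.replace(find_replace_str, subset=["age"])
display(scrub_filtered_df)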

In [0]:
display(scrub_filtered_df.groupBy("vehicle_type").agg(count("vehicle_type")))


In [0]:
# The rule maps truck to LMV; the keys must match the casing actually present in the data
vech_type = {"Truck": "LMV", "Bike": "TwoWheeler"}
scrub_filtered_df = scrub_filtered_df.replace(vech_type, subset=["vehicle_type"])
display(scrub_filtered_df)

####3. Standardization, De-Duplication and Replacement / Deletion of Data to make it usable

Creating the shipment details DataFrame <br>
1. Create a DF by reading data from logistics_shipment_detail_3000.json
2. As this is clean JSON data, it doesn't require any cleansing or scrubbing.

In [0]:
order_df = spark.read.option("multiLine", "true").json("/Volumes/workspace/wd36schema/ingestion_volume/target/logistics_shipment_detail_3000.json")
display(order_df)

Standardizations:<br>

1. Add columns<br>
Source File: DF of logistics_shipment_detail_3000.json<br>
domain as 'Logistics', ingestion_timestamp as the current timestamp, is_expedited as 'False'<br>
2. Column Uniformity:<br>
Source File: DF of merged(logistics_source1 & logistics_source2)<br>
role - Convert values to lowercase<br>
Source File: DF of logistics_shipment_detail_3000.json<br>
vehicle_type - Convert values to UPPERCASE<br>
Source File: DF of merged(logistics_source1 & logistics_source2)<br>
hub_location - Convert values to initcap case<br>
3. Format Standardization:<br>
Source File: DF of logistics_shipment_detail_3000.json<br>
Convert shipment_date to yyyy-MM-dd<br>
Ensure shipment_cost has 2 decimal precision<br>
4. Data Type Standardization<br>
Standardizing column data types to fix schema drift and enable mathematical operations.<br>
Source File: DF of merged(logistics_source1 & logistics_source2)<br>
age: Cast String to Integer<br>
Source File: DF of logistics_shipment_detail_3000.json<br>
shipment_weight_kg: Cast to Double<br>
Source File: DF of logistics_shipment_detail_3000.json<br>
is_expedited: Cast to Boolean<br>
5. Naming Standardization<br>
Source File: DF of merged(logistics_source1 & logistics_source2)<br>
Rename: first_name to staff_first_name<br>
Rename: last_name to staff_last_name<br>
Rename: hub_location to origin_hub_city<br>
6. Reordering columns logically in a better standard format:<br>
Source File: DF of data from all 3 files<br>
shipment_id (Identifier), staff_first_name (Dimension), staff_last_name (Dimension), role (Dimension), origin_hub_city (Location), shipment_cost (Metric), ingestion_timestamp (Audit)

In [0]:
order_df = order_df.withColumn("domain", lit("Logistics")).withColumn("ingestion_timestamp",lit(current_timestamp())).withColumn("is_expedited",lit(False))
display(order_df)

In [0]:
scrub_filtered_df = scrub_filtered_df.withColumn("role",lower(col("role")))
display(scrub_filtered_df)

In [0]:
order_df = order_df.withColumn("vehicle_type",upper(col("vehicle_type")))
display(order_df)

In [0]:
order_df.printSchema()

In [0]:
import pyspark.sql.functions as F
scrub_filtered_df = scrub_filtered_df.withColumn("hub_location",F.initcap(F.col("hub_location")))
display(scrub_filtered_df)

In [0]:
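order_df.show()
# The pattern passed to to_date must match how shipment_date is stored in the source; the resulting DateType displays as yyyy-MM-dd
order_df = order_df.withColumn("shipment_date", to_date("shipment_date", "yy-MM-dd"))
display(order_df)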

In [0]:

order_df = order_df.withColumn("shipment_cost",F.col("shipment_cost").cast("decimal(10,2)"))
display(order_df)


In [0]:
scrub_filtered_df = scrub_filtered_df.withColumn("age",col("age").cast("int"))
scrub_filtered_df.printSchema()
display(scrub_filtered_df)

In [0]:
order_df = order_df.withColumn("shipment_weight_kg",F.col("shipment_weight_kg").cast("Double"))
order_df.printSchema()
display(order_df)


In [0]:
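order_df.printSchema()
scrub_filtered_df.printSchema()
# Assign the result back; otherwise the cast only applies to the copy being shown
order_df = order_df.withColumn("is_expedited", F.col("is_expedited").cast("boolean"))
order_df.show()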

In [0]:
scrub_filtered_df = scrub_filtered_df.withColumnRenamed("first_name","staff_first_name").withColumnRenamed("last_name","staff_last_name").withColumnRenamed("hub_location","origin_hub_city")
display(scrub_filtered_df)

In [0]:
order_df = order_df.withColumn("shipment_id",F.col("shipment_id").cast("string"))
merged_df = scrub_filtered_df.join(order_df,how="left",on="shipment_id") \
.select(["shipment_id","staff_first_name","staff_last_name","role","origin_hub_city","shipment_cost","ingestion_timestamp"])
display(merged_df)


In [0]:
order_df.printSchema()
scrub_filtered_df.printSchema()

Deduplication:
1. Apply Record Level De-Duplication
2. Apply Column Level De-Duplication (Primary Key Enforcement)

In [0]:
df = merged_df.dropDuplicates()
display(df)

df = df.dropDuplicates(subset=["shipment_id"])
display(df)

##2. Data Enrichment - Detailing of data
Makes your data rich and detailed <br>

###### Adding of Columns (Data Enrichment)
*Creating new derived attributes to enhance traceability and analytical capability.*

**1. Add Audit Timestamp (`load_dt`)**
Source File: DF of logistics_source1 and logistics_source2<br>
* **Scenario:** We need to track exactly when this record was ingested into our Data Lakehouse for auditing purposes.
* **Action:** Add a column `load_dt` using the function `current_timestamp()`.

**2. Create Full Name (`full_name`)**
Source File: DF of logistics_source1 and logistics_source2<br>
* **Scenario:** The reporting dashboard requires a single field for the driver's name instead of separate columns.
* **Action:** Create `full_name` by concatenating `first_name` and `last_name` with a space separator.
* **Result:** "Rajesh" + " " + "Kumar" -> **"Rajesh Kumar"**

**3. Define Route Segment (`route_segment`)**
Source File: DF of logistics_shipment_detail_3000.json<br>
* **Scenario:** The logistics team wants to analyze performance based on specific transport lanes (Source to Destination).
* **Action:** Combine `source_city` and `destination_city` with a hyphen.
* **Result:** "Chennai" + "-" + "Pune" -> **"Chennai-Pune"**

**4. Generate Vehicle Identifier (`vehicle_identifier`)**
Source File: DF of logistics_shipment_detail_3000.json<br>
* **Scenario:** We need a unique tracking code that immediately tells us the vehicle type and the shipment ID.
* **Action:** Combine `vehicle_type` and `shipment_id` to create a composite key.
* **Result:** "Truck" + "_" + "500001" -> **"Truck_500001"**

In [0]:
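# Space separator as specified above; concat_ws skips a NULL name part instead of returning NULL for the whole value
merged_df.withColumn("load_dt", current_timestamp()) \
    .withColumn("full_name", F.concat_ws(" ", F.col("staff_first_name"), F.col("staff_last_name"))) \
    .show()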

In [0]:
# Hyphen separator as specified above, e.g. Chennai-Pune
order_df = order_df.withColumn("route_segment", F.concat(F.col("source_city"), F.lit("-"), F.col("destination_city")))
display(order_df)


In [0]:
# Underscore separator as specified above: <vehicle_type>_<shipment_id>
order_df = order_df.withColumn("vehicle_identifier", F.concat(F.col("vehicle_type"), F.lit("_"), F.col("shipment_id")))
display(order_df)

In [0]:
# dayofweek() returns 1 for Sunday through 7 for Saturday, so the weekend is days 1 and 7
(
    order_df.withColumn("shipment_year", F.year(F.col("shipment_date")))
    .withColumn("shipment_month", F.month(F.col("shipment_date")))
    .withColumn("day_num", F.dayofweek(F.col("shipment_date")))
    .withColumn("cost_per_kg", try_divide(F.col("shipment_cost"), F.col("shipment_weight_kg")))
    .withColumn("is_weekend", when(col("day_num").isin(1, 7), True).otherwise(False))
    .withColumn("days_since_shipment", datediff(current_date(), F.col("shipment_date")))
    .withColumn("tax_amount", col("shipment_cost") * lit(0.18))
    .show()
)
                              
