In [0]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

from pyspark.sql.functions import expr,to_date,col

# Root of the volume holding the raw logistics extracts; change it here once
# instead of in every reader below.
LOGISTICS_VOLUME_PATH = "/Volumes/workspace/logistics_data/logistics_volume"

# The two CSV extracts are later combined with unionByName, so both are read with
# identical options (header row present, column types inferred).
df_src1_raw = (
    spark.read.options(header=True, inferSchema=True)
    .format("csv")
    .load(f"{LOGISTICS_VOLUME_PATH}/logistics_source1")
)

df_src2_raw = (
    spark.read.options(header=True, inferSchema=True)
    .format("csv")
    .load(f"{LOGISTICS_VOLUME_PATH}/logistics_source2")
)

# multiline=True lets Spark parse JSON records that span several lines.
df_logistics_ship_raw = (
    spark.read.options(header=True, inferSchema=True, multiline=True)
    .format("json")
    .load(f"{LOGISTICS_VOLUME_PATH}/logistics_shipment_detail_3000.json")
)

# Row counts as a quick sanity check that every source loaded (each count() runs a
# Spark job). Labels match the variable names.
print('count of df_src1_raw:-', df_src1_raw.count())
print('count of df_src2_raw:-', df_src2_raw.count())
print('count of df_logistics_ship_raw:-', df_logistics_ship_raw.count())


In [0]:
# Task 1 and 2: combine the two staff extracts, then clean and standardize them.

from pyspark.sql.functions import lit,to_date,col,expr,when,lower,upper,cast,current_timestamp, initcap,count
from pyspark.sql.types import DecimalType

# Tag every row with the extract it came from before combining.
df_src1_clned = df_src1_raw.withColumn("source",lit("system1"))
df_src2_clned = df_src2_raw.withColumn("source",lit("system2"))

# allowMissingColumns=True fills a column that is missing from one extract with nulls.
df_cust_combined = (
    df_src1_clned.unionByName(df_src2_clned, allowMissingColumns=True)
    .select("shipment_id", "first_name", "last_name", "age", "role",
            "hub_location", "vehicle_type", "source")
)

df_customer = (
    df_cust_combined
    # A usable row needs a shipment id, a role and at least one name part.
    .dropna(subset=["shipment_id", "role"])
    .filter(col("first_name").isNotNull() | col("last_name").isNotNull())
    # Keep only purely numeric shipment ids.
    .filter(col("shipment_id").rlike("^[0-9]+$"))
    # One row per shipment (which duplicate survives is arbitrary). Every step below
    # is row-wise, so no further de-duplication is needed afterwards.
    .dropDuplicates(["shipment_id"])
    # Non-numeric or missing ages become the sentinel -1.
    .withColumn("age", when(col("age").rlike("^[0-9]+$"), col("age").cast("int")).otherwise(-1))
    .withColumn(
        "vehicle_type",
        when(col("vehicle_type").isNull(), "UNKNOWN")
        .when(lower(col("vehicle_type")) == "truck", "LMV")
        .when(lower(col("vehicle_type")) == "bike", "TwoWheeler")
        .otherwise(col("vehicle_type")),
    )
    .withColumn("role", lower(col("role")))
    # Upper-case vehicle_type itself (not role), so the mapping above is preserved.
    # Note this also turns the mapped "TwoWheeler" into "TWOWHEELER".
    .withColumn("vehicle_type", upper(col("vehicle_type")))
    .withColumn("hub_location", initcap(col("hub_location")))
    .withColumnsRenamed({
        "hub_location": "origin_hub_city",
        "first_name": "staff_first_name",
        "last_name": "staff_last_name",
    })
)


In [0]:
 
df_log_data_stand = (
    df_logistics_ship_raw
    # Lineage / audit columns.
    .withColumn("domain", lit("logistics"))
    .withColumn("ingestion_timestamp", current_timestamp())
    # Default flag created directly as a boolean, instead of a "False" string that
    # has to be cast afterwards.
    .withColumn("is_expedited", lit(False))
    # NOTE(review): assumes shipment_date strings are yyyy-MM-dd; other formats give
    # null (or may raise under ANSI mode) — confirm against the source file.
    .withColumn("shipment_date", to_date(col("shipment_date"), 'yyyy-MM-dd'))
    .withColumn("shipment_cost", col("shipment_cost").cast(DecimalType(15, 2)))
    .withColumn("shipment_weight_kg", col("shipment_weight_kg").cast("double"))
)

# De-duplication. current_timestamp() returns one value per query, so the
# ingestion_timestamp column does not make otherwise-identical rows distinct.
df_trans_data = df_log_data_stand.dropDuplicates()

  

2. Data Enrichment - Adding Detail to the Data

In [0]:
from pyspark.sql.functions import concat_ws,year,month,dayofweek,day

df_cust_enrich = (
    df_customer
    .withColumn("load_dt", current_timestamp())
    # concat_ws skips nulls, so a missing first or last name does not produce "null".
    .withColumn("full_name", concat_ws(" ", col("staff_first_name"), col("staff_last_name")))
)

df_trans_enrich = (
    df_trans_data
    .withColumn("route_segment", concat_ws("-", col("source_city"), col("destination_city")))
    .withColumn("vehicle_identifier", concat_ws("_", col("vehicle_type"), col("shipment_id")))
    .withColumn("shipment_year", year(col("shipment_date")))
    .withColumn("shipment_month", month(col("shipment_date")))
    .withColumn("shipment_day", day(col("shipment_date")))
    # dayofweek numbers days 1 = Sunday ... 7 = Saturday.
    .withColumn("is_weekend", dayofweek(col("shipment_date")).isin(1, 7))
    # Boolean literals keep is_expedited the boolean type it was cast to during
    # standardization (the previous "TRUE"/"FALSE" strings silently turned it into a
    # string column). Null statuses fall through to False.
    .withColumn(
        "is_expedited",
        when(col("shipment_status").isin("IN_TRANSIT", "DELIVERED"), lit(True)).otherwise(lit(False)),
    )
)

Enrichment / Business Logic (Calculated Fields)

In [0]:
from decimal import Decimal

from pyspark.sql.functions import datediff,current_date
from pyspark.sql.functions import round as spark_round  # aliased so Python's round() is not shadowed

# Flat tax rate applied to shipment_cost. Kept as a Decimal so the Decimal(15, 2)
# cost column is not silently converted to an inexact double.
TAX_RATE = Decimal("0.18")

df_trans_enrich_bl = (
    df_trans_enrich
    # Only divide by a positive weight: division by zero yields null in non-ANSI mode
    # and raises DIVIDE_BY_ZERO in ANSI mode. Zero/negative/null weights give null.
    .withColumn(
        "cost_per_kg",
        when(col("shipment_weight_kg") > 0, col("shipment_cost") / col("shipment_weight_kg")),
    )
    .withColumn("days_since_shipment", datediff(current_date(), col("shipment_date")))
    # Rounded to 2 decimal places, matching the scale of shipment_cost.
    .withColumn("tax_amount", spark_round(col("shipment_cost") * lit(TAX_RATE), 2))
)

Remove/Eliminate Columns (drop, select, selectExpr)<br>
UDF 1: Complex Incentive Calculation

In [0]:
from pyspark.sql.functions import udf, substring, concat, repeat, length
from pyspark.sql.types import IntegerType, StringType

def calculate_bonus(role, age):
    """Return the projected bonus percentage for a staff member.

    Drivers aged 50 or over get 15, drivers under 30 get 5; everyone else,
    including drivers whose age is unknown, gets 0.

    Args:
        role: Role name, compared case-insensitively. None or a non-string gives 0.
        age: Age in years. None or a negative value (the cleaning step stores -1
            for unparseable ages) is treated as unknown and gives 0.

    Returns:
        int: 15, 5 or 0.
    """
    if not isinstance(role, str) or role.lower() != "driver":
        return 0
    # -1 is the invalid-age sentinel; without this guard it would fall into the
    # "under 30" bracket and wrongly earn a bonus.
    if age is None or age < 0:
        return 0
    if age >= 50:
        return 15
    if age < 30:
        return 5
    return 0

def mask_identity(val):
    """Hide everything after the first two characters of ``val`` behind asterisks.

    None is passed through as None. Strings of two characters or fewer come back
    unchanged, because padding them to their own length adds nothing.
    """
    if val is None:
        return None
    # Pad the two visible leading characters back out to the original length with '*'.
    return val[:2].ljust(len(val), "*")

# Expose the plain-Python helpers to Spark as UDFs with explicit return types.
bonus_udf = udf(f=calculate_bonus, returnType=IntegerType())
mask_identity_udf = udf(f=mask_identity, returnType=StringType())

In [0]:
from pyspark.sql.functions import split,substring,length,day,expr

df_cust_en_clean = (
    df_cust_enrich
    # The individual name parts are removed; full_name (built during enrichment) remains.
    .drop("staff_first_name", "staff_last_name")
    .withColumn("projected_bonus", bonus_udf(col("role"), col("age")))
    # NOTE(review): full_name is still present next to masked_name, so the identity is
    # not actually hidden; drop full_name here if masking is meant to protect it.
    .withColumn("masked_name", mask_identity_udf("full_name"))
)

df_trans_enrich_bl1 = (
    df_trans_enrich_bl
    # order_id is split into its first 3 characters and everything after them.
    .withColumn("order_prefix", substring(col("order_id"), 1, 3))
    # SQL substring(str, pos) with no length returns the rest of the string, instead
    # of silently truncating ids longer than 103 characters as a fixed length of 100 did.
    .withColumn("order_sequence", expr("substring(order_id, 4)"))
    .withColumn("route_lane", concat_ws("->", col("source_city"), col("destination_city")))
)

4. Data Core Curation & Processing (Pre-Wrangling)