This Notebook contains the following functions:
  
1. Python UDF Function
2. Generic Framework - Business specific
3. Generic Framework - Common Functions

In [0]:
# Startup banner: tells whoever executes this notebook that the shared helper functions are being loaded.
print("Running Utility Notebook to initialize all functions to use further")

Create a UDF that converts number words to integers, so we don't have to filter out string values or map them by hand with a dictionary such as `word_to_num = {'one': '1', 'two': '2'}`.
E.g. passing "twenty thousand two hundred and one" returns 20201.

In [0]:
%pip install word2number

In [0]:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import IntegerType
from word2number import w2n

def word_to_num(value):
    """Convert a numeric value or an English number phrase to an ``int``.

    Args:
        value: Anything ``int()`` accepts (e.g. ``7``, ``"42"``) or a string of
            number words such as ``"twenty one"``.

    Returns:
        The integer value, or ``None`` when ``value`` is ``None`` or cannot be
        interpreted as a number.
    """
    # Nulls are answered up front so they never reach either parser.
    if value is None:
        return None

    try:
        # Already numeric (an int, a float, or a digit string such as "42").
        return int(value)
    except (TypeError, ValueError, OverflowError):
        # Not directly convertible; fall through to word parsing.
        pass

    # Only strings can hold number words; anything else is unparseable.
    if not isinstance(value, str):
        return None

    try:
        return w2n.word_to_num(value.lower())
    except Exception:
        # Deliberate best-effort: an unparseable value becomes None instead of
        # failing the caller. Unlike a bare ``except``, this still lets
        # KeyboardInterrupt / SystemExit propagate.
        return None

# Spark UDF wrapper around the Python parser defined above, published under an
# explicit `_udf` name so column-level code does not depend on a shadowed function name.
# NOTE(review): results are typed as IntegerType (32-bit); confirm no converted
# value needs a wider type, since one caller casts the result to long.
word_to_num_udf = udf(word_to_num, IntegerType())
# The original rebinding of `word_to_num` to the UDF is kept so existing callers keep working.
word_to_num = word_to_num_udf

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

def standardize_staff(df):
    """Normalise the staff columns of ``df`` and return the resulting DataFrame.

    Steps, in the order they are applied:
      * ``shipment_id`` / ``age``: run through the ``word_to_num`` UDF, then cast
        to ``long`` / ``int`` respectively.
      * ``role``: lower-cased.
      * ``origin_hub_city``: new column holding a title-cased copy of ``hub_location``.
      * ``load_dt``: new column holding the current timestamp.
      * ``hub_location``: title-cased in place.
      * ``first_name`` and ``last_name``: joined with a single space into
        ``staff_full_name``; the two source columns are dropped.

    Args:
        df: Spark DataFrame containing the columns named above.

    Returns:
        The transformed DataFrame.
    """
    # `word_to_num` is the Spark UDF this notebook binds at module level.
    to_number = word_to_num
    return (
        df
        .withColumn("shipment_id", to_number(F.col("shipment_id")).cast("long"))
        .withColumn("age", to_number(F.col("age")).cast("int"))
        .withColumn("role", F.lower("role"))
        .withColumn("origin_hub_city", F.initcap("hub_location"))
        .withColumn("load_dt", F.current_timestamp())
        .withColumn("full_name", F.concat_ws(" ", "first_name", "last_name"))
        .withColumn("hub_location", F.initcap("hub_location"))
        .drop("first_name", "last_name")
        .withColumnRenamed("full_name", "staff_full_name")
    )

def scrub_geotag(df):
    """Title-case the geographic text columns of ``df``.

    ``city_name`` is title-cased in place, and ``masked_hub_location`` is set to
    the title-cased value of ``country``.

    NOTE(review): despite its name, ``masked_hub_location`` is only a
    title-cased copy of ``country``; nothing is masked here. Confirm intent.

    Args:
        df: Spark DataFrame with ``city_name`` and ``country`` columns.

    Returns:
        The DataFrame with both columns applied.
    """
    with_city = df.withColumn("city_name", F.initcap("city_name"))
    return with_city.withColumn("masked_hub_location", F.initcap("country"))

def standardize_shipments(df):
    """Add the standard audit columns and normalise shipment column types.

    Columns written, in order: ``domain`` (constant ``"Logistics"``),
    ``ingestion_timestamp`` (current timestamp), ``is_expedited`` (constant
    ``False``), ``shipment_date`` (parsed as a date with pattern ``yy-MM-dd``),
    ``shipment_cost`` (rounded to 2 decimals) and ``shipment_weight_kg``
    (cast to ``double``).

    NOTE(review): ``yy-MM-dd`` is a two-digit-year pattern; confirm it matches
    the source data.

    Args:
        df: Spark DataFrame of shipments.

    Returns:
        The standardised DataFrame.
    """
    # (target column, expression) pairs, applied strictly in this order.
    column_plan = [
        ("domain", F.lit("Logistics")),
        ("ingestion_timestamp", F.current_timestamp()),
        ("is_expedited", F.lit(False).cast("boolean")),
        ("shipment_date", F.to_date("shipment_date", "yy-MM-dd")),
        ("shipment_cost", F.round("shipment_cost", 2)),
        ("shipment_weight_kg", F.col("shipment_weight_kg").cast("double")),
    ]

    standardized = df
    for target, expression in column_plan:
        standardized = standardized.withColumn(target, expression)
    return standardized

def enrich_shipments(df, tax_rate=0.18, high_value_threshold=50000):
    """Derive route, calendar and cost columns for a shipments DataFrame.

    Columns written, in order:
      * ``route_segment``: ``source_city`` and ``destination_city`` joined with ``-``.
      * ``vehicle_identifier``: ``vehicle_type`` and ``shipment_id`` joined with ``_``.
      * ``shipment_year`` / ``shipment_month``: taken from ``shipment_date``.
      * ``is_weekend``: true when the day of week is 1 (Sunday) or 7 (Saturday).
      * ``is_expedited``: true when ``shipment_status`` is ``IN_TRANSIT`` or ``DELIVERED``.
      * ``cost_per_kg``: ``shipment_cost / shipment_weight_kg`` rounded to 2 decimals,
        NULL when the weight is zero or NULL.
      * ``tax_amount``: ``shipment_cost * tax_rate`` rounded to 2 decimals.
      * ``days_since_shipment``: days between today and ``shipment_date``.
      * ``is_high_value``: true when ``shipment_cost`` exceeds ``high_value_threshold``.

    NOTE(review): ``is_expedited`` is derived purely from the shipment status and
    replaces any existing column of that name; confirm that is the intended meaning.

    Args:
        df: Spark DataFrame with the source columns named above.
        tax_rate: Fraction of ``shipment_cost`` charged as tax. Defaults to 0.18.
        high_value_threshold: Cost above which a shipment is flagged high value.
            Defaults to 50000.

    Returns:
        The enriched DataFrame.
    """
    cost = F.col("shipment_cost")
    weight = F.col("shipment_weight_kg")

    # Only divide when the weight is non-zero. With ANSI SQL mode enabled a zero
    # divisor raises instead of yielding NULL; `when` without `otherwise` gives
    # NULL for the skipped rows, matching the non-ANSI result.
    cost_per_kg = F.when(weight != 0, F.round(cost / weight, 2))

    return (
        df
        .withColumn("route_segment",
            F.concat_ws("-", "source_city", "destination_city"))
        .withColumn("vehicle_identifier",
            F.concat_ws("_", "vehicle_type", "shipment_id"))
        .withColumn("shipment_year", F.year("shipment_date"))
        .withColumn("shipment_month", F.month("shipment_date"))
        .withColumn("is_weekend",
            F.dayofweek("shipment_date").isin([1, 7]))
        .withColumn("is_expedited",
            F.col("shipment_status").isin("IN_TRANSIT", "DELIVERED"))
        .withColumn("cost_per_kg", cost_per_kg)
        .withColumn("tax_amount",
            F.round(cost * tax_rate, 2))
        .withColumn("days_since_shipment",
            F.datediff(F.current_date(), "shipment_date"))
        .withColumn("is_high_value",
            cost > high_value_threshold)
    )

def split_columns(df):
    """Break ``order_id`` and ``shipment_date`` into parts and add a route label.

    Columns written, in order: ``order_prefix`` (first 3 characters of
    ``order_id``), ``order_sequence`` (up to 10 characters starting at
    position 4), ``ship_year``, ``ship_month``, ``ship_day`` (from
    ``shipment_date``) and ``route_lane`` (``source_city`` and
    ``destination_city`` joined with ``->``).

    NOTE(review): ``order_sequence`` is capped at 10 characters, so longer
    order IDs are cut off; confirm that is intended.

    Args:
        df: Spark DataFrame with ``order_id``, ``shipment_date``,
            ``source_city`` and ``destination_city`` columns.

    Returns:
        The DataFrame with the derived columns added.
    """
    # Order identifier pieces.
    with_order_parts = (
        df
        .withColumn("order_prefix", F.substring("order_id", 1, 3))
        .withColumn("order_sequence", F.substring("order_id", 4, 10))
    )

    # Calendar pieces of the shipment date.
    with_date_parts = with_order_parts
    for part_name, extractor in (
        ("ship_year", F.year),
        ("ship_month", F.month),
        ("ship_day", F.dayofmonth),
    ):
        with_date_parts = with_date_parts.withColumn(part_name, extractor("shipment_date"))

    lane = F.concat_ws("->", "source_city", "destination_city")
    return with_date_parts.withColumn("route_lane", lane)

def mask_name(col):
    """Build a masked version of a name column.

    The result is the first two characters, the literal ``****``, then the
    last character of the value.

    NOTE(review): for values of one or two characters the output still
    contains every character of the input, so such names are not hidden.

    Args:
        col: Column (or column name) holding the text to mask.

    Returns:
        A Column expression producing the masked string.
    """
    leading_pair = F.substring(col, 1, 2)
    filler = F.lit("****")
    final_char = F.substring(col, -1, 1)
    return F.concat(leading_pair, filler, final_char)

Generic Functions