In [111]:
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import (
    from_json, col, to_date, concat_ws, lit, monotonically_increasing_id,
    year, month, dayofmonth, dayofweek, quarter, coalesce, row_number
)
from pyspark.sql.types import IntegerType, BooleanType, TimestampType, StringType

# === Start Spark session ===
spark = SparkSession.builder.appName("InPostDataModel_Full").getOrCreate()

# === Step 1: Read input CSV ===
# The "event" column carries a JSON payload per row (parsed in Step 2).
# multiLine lets a quoted value span line breaks; escape='"' makes a doubled
# quote inside a quoted value read as a literal quote character.
df = (
    spark.read
    .options(header=True, escape='"', multiLine=True)
    .csv("/workspaces/inpost_analytics/input/Dane_zadanie_rekrutacyjne.csv")
)

# === Step 2: Dedupe and parse JSON ===
df_unique = df.dropDuplicates()

# Infer the JSON schema from *every* non-null payload, not from a single sample row.
# With one sample, a field that happens to be null there is inferred as string, and a
# field absent there is dropped from the schema. from_json then silently nulls that
# data for all other records, or the nested selects in Step 3 fail to resolve.
# NOTE(review): this is one extra pass over the data; add samplingRatio to
# spark.read.json if that becomes too slow.
event_json = df_unique.select("event").filter(col("event").isNotNull())
inferred_schema = spark.read.json(event_json.rdd.map(lambda row: row[0])).schema
if not inferred_schema.fields:
    # Previously this case surfaced as an opaque IndexError from collect()[0].
    raise ValueError("No parseable JSON payloads found in the 'event' column")

df_parsed = df_unique.withColumn("event_parsed", from_json(col("event"), inferred_schema))
# from_json returns null for a null input; keep only rows that produced a struct.
df_parsed_valid = df_parsed.filter(col("event_parsed").isNotNull())

# === Step 3: Flatten fact table ===
# Fact columns in output order. Each entry is (dotted path inside the parsed
# `event_parsed` struct, Spark type to cast to). The output column is named after
# its path with "." replaced by "_" (e.g. shipping.state.code -> shipping_state_code).
fact_column_specs = [
    ("entry_date", TimestampType()),
    ("event_code", StringType()),
    ("event_date", TimestampType()),
    ("event_nature", StringType()),
    ("event_sub_code", StringType()),
    ("shipping.sign_code", StringType()),
    ("shipping.brand_code_alpha", StringType()),
    ("shipping.collection.prestation_code", StringType()),
    ("shipping.collection.round.codeAgence", IntegerType()),
    ("shipping.collection.round.pays", StringType()),
    ("shipping.sav_folder", BooleanType()),
    ("shipping.parcel_number", IntegerType()),
    ("shipping.parcel_sequence", IntegerType()),
    ("shipping.is_replaced", BooleanType()),
    ("shipping.shipping_id", StringType()),
    ("shipping.shipping_number", StringType()),
    ("shipping.delivery.prestation_code", StringType()),
    ("shipping.delivery.round.codeAgence", IntegerType()),
    ("shipping.delivery.round.pays", StringType()),
    ("shipping.state.code", StringType()),
    ("shipping.state.date", TimestampType()),
    ("shipping.state.nature", StringType()),
    ("shipping.state.sousCode", StringType()),
]

# NOTE(review): IntegerType is 32-bit; values outside that range are not preserved
# by the cast. Confirm that parcel_number and codeAgence always fit.
fact = df_parsed_valid.select(
    *[
        col("event_parsed." + path).cast(spark_type).alias(path.replace(".", "_"))
        for path, spark_type in fact_column_specs
    ],
    # Calendar day of the event; the source of dim_date.date_key (Step 8).
    to_date(col("event_parsed.event_date")).alias("event_date_only"),
)

# === Step 4: Write fact table to disk ===
# `fact` feeds this write plus the four dimension builds below. Without caching,
# each of those five actions would re-read the source CSV, redo the dedupe shuffle
# and re-parse every JSON payload. cache() keeps the flattened rows after their
# first computation.
fact.cache()
# coalesce(1) makes Spark write a single part file inside this output directory.
fact.coalesce(1).write.mode("overwrite").csv("/workspaces/inpost_analytics/output/fact_shipping_events.csv", header=True)

# === Step 5: Build dim_customer ===
# customer_key = "<brand_code_alpha>_<sign_code>".
# concat_ws silently skips null arguments, so (brand="X", sign=null) and
# (brand=null, sign="X") both produced "X": two distinct customers sharing one key.
# Substituting "" for nulls keeps each part in its position ("X_" vs "_X").
# Keys of fully populated rows are unchanged.
# NOTE(review): codes that themselves contain "_" could still collide; confirm they don't.
dim_customer = (
    fact.select(col("shipping_brand_code_alpha"), col("shipping_sign_code"))
    .distinct()
    .withColumn(
        "customer_key",
        concat_ws(
            "_",
            coalesce(col("shipping_brand_code_alpha"), lit("")),
            coalesce(col("shipping_sign_code"), lit("")),
        ),
    )
)

dim_customer.select(
    col("customer_key"),
    col("shipping_brand_code_alpha").alias("brand_code_alpha"),
    col("shipping_sign_code").alias("sign_code")
).coalesce(1).write.mode("overwrite").csv("/workspaces/inpost_analytics/output/dim_customer.csv", header=True)

# === Step 6: Build dim_event_type ===
# One row per distinct (event_code, event_sub_code, event_nature) combination.
# NOTE(review): if a (code, sub_code) pair ever appears with more than one
# event_nature, that pair gets several rows here. Confirm which columns form the key.
dim_event_type = fact.select(
    "event_code",
    "event_sub_code",
    col("event_nature").alias("event_description"),
).dropDuplicates()

dim_event_type.coalesce(1).write.csv(
    "/workspaces/inpost_analytics/output/dim_event_type.csv",
    mode="overwrite",
    header=True,
)

# === Step 7: Build dim_location ===
# Distinct (codeAgence, pays) pairs, tagged with the role they play in a shipment.
# NOTE(review): events without a collection/delivery round contribute a (null, null)
# row here. Filter those out if an "unknown" location is not wanted.
collection_locations = (
    fact.select(
        col("shipping_collection_round_codeAgence").alias("codeAgence"),
        col("shipping_collection_round_pays").alias("pays"),
    )
    .distinct()
    .withColumn("location_type", lit("collection"))
)

delivery_locations = (
    fact.select(
        col("shipping_delivery_round_codeAgence").alias("codeAgence"),
        col("shipping_delivery_round_pays").alias("pays"),
    )
    .distinct()
    .withColumn("location_type", lit("delivery"))
)

# Both inputs are already distinct and carry different location_type literals, so
# their union has no duplicate (codeAgence, pays, location_type) rows. The former
# dropDuplicates only added a shuffle.
#
# location_id: monotonically_increasing_id() is unique but neither consecutive nor
# reproducible. It encodes the partition id, and row placement after the shuffles
# above can change between runs, so a re-run could renumber every location.
# row_number() over a total ordering of the key columns gives stable ids 1..N.
# A window without partitionBy pulls all rows into one partition. That is acceptable
# here because this dimension is presumably small (distinct agency/country/role rows).
location_order = Window.orderBy("location_type", "codeAgence", "pays")
dim_location = collection_locations.union(delivery_locations) \
    .withColumn("location_id", row_number().over(location_order))

dim_location.select(
    col("location_id"),
    col("codeAgence"),
    col("pays"),
    col("location_type")
).coalesce(1).write.mode("overwrite").csv("/workspaces/inpost_analytics/output/dim_location.csv", header=True)

# === Step 8: Build dim_date ===
# One row per distinct event day (a null date_key row appears if any fact row has
# no event_date). day_of_week follows Spark's dayofweek(): 1 = Sunday ... 7 = Saturday.
event_days = fact.select(col("event_date_only").alias("date_key")).distinct()

dim_date = event_days.select(
    "date_key",
    year("date_key").alias("year"),
    quarter("date_key").alias("quarter"),
    month("date_key").alias("month"),
    dayofmonth("date_key").alias("day"),
    dayofweek("date_key").alias("day_of_week"),
)

dim_date.coalesce(1).write.csv(
    "/workspaces/inpost_analytics/output/dim_date.csv",
    mode="overwrite",
    header=True,
)


25/06/06 01:51:13 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
                                                                                

In [112]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, datediff, min as spark_min, max as spark_max, avg, countDistinct
from pyspark.sql.types import TimestampType

# getOrCreate() returns the already-running session if there is one, as the warning
# printed below this cell shows.
spark = SparkSession.builder.appName("InPostMetrics").getOrCreate()

# Reload the fact table from the CSV written in Step 4, so this cell does not depend
# on in-memory state. inferSchema lets Spark derive column types; without it every
# column would be read as a string.
fact = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("/workspaces/inpost_analytics/output/fact_shipping_events.csv")
)

# Filter and get event dates by event type per package
def _milestone_dates(events, condition, date_alias):
    """Return (shipping_shipping_id, event_date as `date_alias`) for rows matching `condition`."""
    return events.where(condition).select(
        "shipping_shipping_id",
        col("event_date").alias(date_alias),
    )


# Sub-codes shared by the sender and courier placement milestones.
_placement_sub_codes = ("APM", "REL")

sender_events = _milestone_dates(
    fact,
    (col("event_code") == "PEC") & col("event_sub_code").isin(*_placement_sub_codes),
    "sender_placement_date",
)
courier_events = _milestone_dates(
    fact,
    (col("event_code") == "TRN") & col("event_sub_code").isin(*_placement_sub_codes),
    "courier_placement_date",
)
recipient_events = _milestone_dates(
    fact,
    col("event_code") == "LIV",
    "recipient_pickup_date",
)

# Aggregate dates per package: get min event date if multiple
from pyspark.sql.functions import first

def _earliest_per_package(events, date_column):
    """Collapse `events` to one row per shipping_shipping_id holding the minimum `date_column`."""
    earliest = spark_min(date_column).alias(date_column)
    return events.groupBy("shipping_shipping_id").agg(earliest)


sender_agg = _earliest_per_package(sender_events, "sender_placement_date")
courier_agg = _earliest_per_package(courier_events, "courier_placement_date")
recipient_agg = _earliest_per_package(recipient_events, "recipient_pickup_date")

# Join all dates together on package ID
from functools import reduce
from pyspark.sql import DataFrame

def _join_on_package(acc, nxt):
    """Inner-join two per-package frames on shipping_shipping_id."""
    return acc.join(nxt, on="shipping_shipping_id", how="inner")


dfs = [sender_agg, courier_agg, recipient_agg]

# Inner joins keep only packages that have all three milestones; the averages
# computed below therefore describe that subset only.
fact_dates = reduce(_join_on_package, dfs)

# Calculate differences in days
# datediff(end, start) counts calendar days and ignores the time of day
# (23:50 -> 00:10 the next day counts as 1).
# NOTE(review): values are not clamped, so out-of-order events give negative durations.
fact_dates = fact_dates.select(
    "*",
    datediff("courier_placement_date", "sender_placement_date").alias("delivery_time_days"),
    datediff("recipient_pickup_date", "sender_placement_date").alias("total_lifetime_days"),
    datediff("recipient_pickup_date", "courier_placement_date").alias("pickup_time_days"),
)

# Calculate averages
# Compute all three averages in one aggregation job. The previous three separate
# select(...).collect() calls each re-ran the whole read -> filter -> groupBy -> join
# pipeline.
avg_row = fact_dates.agg(
    avg("delivery_time_days").alias("avg_delivery_time"),
    avg("total_lifetime_days").alias("avg_total_lifetime"),
    avg("pickup_time_days").alias("avg_pickup_time"),
).first()
avg_delivery_time = avg_row["avg_delivery_time"]
avg_total_lifetime = avg_row["avg_total_lifetime"]
avg_pickup_time = avg_row["avg_pickup_time"]

# Total unique packages across the whole fact table. This is a wider base than the
# averages above, which only cover packages present in fact_dates.
total_packages = fact.select(countDistinct("shipping_shipping_id")).collect()[0][0]


def _format_days(value):
    """Render an average day count with two decimals.

    A global avg() over zero rows yields None, e.g. when no package reached all
    three milestones. Formatting None with ":.2f" raises TypeError, so show "n/a".
    """
    return "n/a" if value is None else f"{value:.2f}"


print(f"Średni czas dostawy paczki (dni): {_format_days(avg_delivery_time)}")
print(f"Średni całkowity czas życia paczki (dni): {_format_days(avg_total_lifetime)}")
print(f"Średni czas odbioru paczki z docelowego punktu (dni): {_format_days(avg_pickup_time)}")
print(f"Całkowita liczba paczek: {total_packages}")


25/06/06 01:51:36 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
                                                                                

Średni czas dostawy paczki (dni): 3.58
Średni całkowity czas życia paczki (dni): 4.67
Średni czas odbioru paczki z docelowego punktu (dni): 1.09
Całkowita liczba paczek: 46019


In [None]:
# Scratch cell, intentionally left empty.