1. Import modules and libraries

In [None]:
import os
import shutil

import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.types import StructType

from src.churn.extract import extract_parking_data

In [None]:
# Create (or reuse, via getOrCreate) the Spark session for this pipeline.
# The master URL can be overridden with the SPARK_MASTER_URL environment variable
# (e.g. "local[*]" for a run without the cluster); it defaults to the standalone
# cluster address this notebook has always used.
SPARK_MASTER_URL = os.environ.get("SPARK_MASTER_URL", "spark://spark-master:7077")

spark = (
    SparkSession.builder
    .appName("job_001_availableGroupParkingPipeline")
    .master(SPARK_MASTER_URL)
    .getOrCreate()
)

2. Load data into a processing environment

In [None]:
# Explicit schema for the raw parking-group records; it is handed to
# extract_parking_data below. All grp_* counter/code fields are nullable integers.
_parking_location_type = StructType([
    StructField("lat", DoubleType(), True),
    StructField("lon", DoubleType(), True),
])
_parking_int_fields = [
    "grp_identifiant",
    "grp_complet",
    "grp_disponible",
    "grp_exploitation",
    "grp_statut",
]
parking_schema = StructType(
    [
        StructField("adresse", StringType(), True),
        StructField("grp_horodatage", TimestampType(), True),
    ]
    + [StructField(field_name, IntegerType(), True) for field_name in _parking_int_fields]
    + [StructField("location", _parking_location_type, True)]
)

# Directory holding the raw input files, and the archive directory also passed to the
# extractor (presumably where processed raw files end up — confirm in extract_parking_data).
path = "/data/raw/"
f_archive = "/data/archive"

In [None]:
# Read the raw parking-group data with the schema defined above. The helper lives in
# src/churn/extract.py; what it does with the archive directory is not visible here.
parking_group_df = extract_parking_data(
    path,
    spark,
    parking_schema,
    f_archive,
)

In [None]:
# Rename the French timestamp column to an English name. No data types change here:
# the schema passed to the extractor already declares grp_horodatage as TimestampType.
# withColumnRenamed is a no-op when the source column is absent, so re-running this
# cell is harmless.
parking_group_df = parking_group_df.withColumnRenamed("grp_horodatage", "grp_timestamp")

In [None]:
# Hourly totals: keep rows whose grp_statut equals 5, bucket them into 1-hour tumbling
# windows on grp_timestamp, and sum grp_disponible inside each window.
# NOTE(review): the meaning of status code 5 is not documented here — confirm.
# NOTE(review): every row falling in a window is summed, so if a group reports several
# times per hour this is a sum of snapshots rather than a point-in-time count — confirm.
status_five_rows = parking_group_df.where(F.col("grp_statut") == 5)
hourly_window = F.window(F.col("grp_timestamp"), "1 hour")
hourly_available_parking = (
    status_five_rows
    .groupBy(hourly_window)
    .agg(F.sum("grp_disponible").alias("total_parkings_available"))
)
hourly_available_parking.show(truncate=False)

In [None]:
# Daily totals: same filter as the hourly aggregate (grp_statut == 5), but rows are
# bucketed into 1-day tumbling windows on grp_timestamp before summing grp_disponible.
# NOTE(review): with no startTime, Spark aligns windows to 1970-01-01 00:00:00 UTC, so
# each "day" runs UTC midnight to UTC midnight, not local midnight — confirm intended.
daily_available_parking = (
    parking_group_df
    .filter(F.col("grp_statut") == 5)
    .groupBy(F.window(F.col("grp_timestamp"), "1 day"))
    .agg(F.sum("grp_disponible").alias("total_parkings_available"))
)
daily_available_parking.show(truncate=False)

In [None]:
# Persist the hourly and daily aggregates as Parquet.
# Primary target: /data/cleaned/{hourly,daily}, written by Spark. coalesce(1) yields a
# single part file per output, which is fine for these small aggregates.
# If that write raises, fall back to collecting each aggregate on the driver with
# toPandas() and writing it with pyarrow under /home/jovyan/work/parquet_output.
# `cleaned_output_root` records which root holds this run's hourly/ and daily/ output.
print(f"hourly_available_parking has {hourly_available_parking.count()} rows")
print(f"daily_available_parking has {daily_available_parking.count()} rows")

cleaned_output_root = "/data/cleaned"
try:
    # The Spark writer creates missing output directories itself, on the machine that
    # actually writes. A driver-side `mkdir -p` is therefore not needed.
    print("Attempting to write to /data/cleaned...")
    hourly_available_parking.coalesce(1).write.format("parquet").mode("overwrite").save("/data/cleaned/hourly")
    daily_available_parking.coalesce(1).write.format("parquet").mode("overwrite").save("/data/cleaned/daily")
    print("Successfully wrote parquet files to /data/cleaned/hourly and /data/cleaned/daily")
except Exception as e:
    # Deliberate best-effort fallback. Report the full error so the root cause is visible.
    print(f"Error writing to /data: {type(e).__name__}: {e}")
    print("Falling back to Pandas + Parquet write from driver...")
    workspace_output = "/home/jovyan/work/parquet_output"
    # Start from an empty directory so files from an earlier run cannot be mixed in.
    if os.path.exists(workspace_output):
        shutil.rmtree(workspace_output)
    os.makedirs(os.path.join(workspace_output, "hourly"), exist_ok=True)
    os.makedirs(os.path.join(workspace_output, "daily"), exist_ok=True)

    # NOTE(review): `window` is a struct column. Confirm that toPandas() + pyarrow
    # round-trips it as a struct, because the SQL cell later selects window.start/window.end.
    print("Converting hourly to Pandas and saving...")
    hourly_pdf = hourly_available_parking.toPandas()
    hourly_pdf.to_parquet(os.path.join(workspace_output, "hourly", "part-00000.parquet"), engine='pyarrow')

    print("Converting daily to Pandas and saving...")
    daily_pdf = daily_available_parking.toPandas()
    daily_pdf.to_parquet(os.path.join(workspace_output, "daily", "part-00000.parquet"), engine='pyarrow')

    cleaned_output_root = workspace_output
    print(f"Wrote fallback parquet files to {workspace_output}")
    print(f"Hourly shape: {hourly_pdf.shape}, Daily shape: {daily_pdf.shape}")

In [None]:
# Load the daily aggregate written by the save cell and register it as the
# `parking_daily` temp view for the SQL cell below.
# The save cell writes either /data/cleaned/daily (Spark write succeeded) or
# /home/jovyan/work/parquet_output/daily (driver-side pandas fallback).
# A fixed `<cwd>/parquet_output/daily` only matches the fallback, and only when the
# notebook runs from /home/jovyan/work. Pointing Spark at a directory without parquet
# files fails schema inference (the UNABLE_TO_INFER_SCHEMA error shown below).
# So probe the known locations and read the one with the most recently written files.
# NOTE(review): assumes these paths are visible from the driver and, in cluster mode,
# from the executors as well (shared volume) — confirm for the driver-only fallback.


def _newest_parquet_mtime(directory):
    """Return the latest modification time of the ``*.parquet`` files directly inside
    ``directory``, or ``None`` if the directory is missing/unreadable or has none."""
    try:
        entries = os.listdir(directory)
    except OSError:
        return None
    return max(
        (
            os.path.getmtime(os.path.join(directory, entry))
            for entry in entries
            if entry.endswith(".parquet")
        ),
        default=None,
    )


daily_parquet_candidates = [
    "/data/cleaned/daily",
    "/home/jovyan/work/parquet_output/daily",
    os.path.join(os.getcwd(), "parquet_output/daily"),  # previous location, kept as a fallback
]
daily_parquet_mtimes = {}
for candidate in daily_parquet_candidates:
    newest = _newest_parquet_mtime(candidate)
    if newest is not None:
        daily_parquet_mtimes[candidate] = newest
if not daily_parquet_mtimes:
    raise FileNotFoundError(
        "No daily parquet files found; looked in: " + ", ".join(daily_parquet_candidates)
    )
# Prefer the freshest output so a stale directory from an earlier run is not picked up.
parquet_daily_path = max(daily_parquet_mtimes, key=daily_parquet_mtimes.get)

parquetDaily = spark.read.parquet(parquet_daily_path)
parquetDaily.createOrReplaceTempView("parking_daily")
print(f"Loaded daily parquet from {parquet_daily_path}")
parquetDaily.show()

AnalysisException: [UNABLE_TO_INFER_SCHEMA] Unable to infer schema for Parquet. It must be specified manually.

In [None]:
# Flatten the `window` struct of the daily aggregate into plain start/end columns,
# querying the `parking_daily` temp view through Spark SQL.
daily_flatten_sql = "SELECT window.start as started, window.end as ended, total_parkings_available FROM parking_daily"
parking_daily = spark.sql(daily_flatten_sql)
parking_daily.show(truncate=False)