In [1]:
import boto3
import json
import time
from dotenv import load_dotenv
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, DoubleType
import os

# Populate the process environment from the local .env file
load_dotenv()

# AWS credentials and region as found in the environment.
# Any variable that is not set comes back as None.
AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN, AWS_REGION = (
    os.environ.get(env_name)
    for env_name in (
        'AWS_ACCESS_KEY_ID',
        'AWS_SECRET_ACCESS_KEY',
        'AWS_SESSION_TOKEN',
        'AWS_REGION',
    )
)

# TODO: define the S3 bucket name here; the stream paths further down currently hard-code "orders-for-dispatch".


In [None]:
# Path to the local Kinesis SDK JAR. Set the KINESIS_SDK_JAR environment
# variable to override the machine-specific default instead of editing it here.
# NOTE(review): this value is not passed to the session builder below — confirm
# whether it should be wired into "spark.jars" or removed.
local_jars = os.getenv(
    "KINESIS_SDK_JAR",
    "/Users/borja/Documents/Somniumrema/projects/de/route_optimizer/jars/aws-java-sdk-kinesis-1.12.364.jar",
)

# S3A credential settings sourced from the environment. They are applied only
# when a value is actually present: an unset variable (None) must not reach
# .config(), where it would be applied as an invalid value (e.g. the text
# "None" as a session token) rather than being omitted. This matters most for
# AWS_SESSION_TOKEN, which is absent when long-lived access keys are used.
s3a_credentials = {
    "spark.hadoop.fs.s3a.access.key": AWS_ACCESS_KEY_ID,  # AWS access key
    "spark.hadoop.fs.s3a.secret.key": AWS_SECRET_ACCESS_KEY,  # AWS secret key
    "spark.hadoop.fs.s3a.session.token": AWS_SESSION_TOKEN,  # Only for temporary credentials
}

spark_builder = (SparkSession.builder
    .master("local[*]")  # Running Spark locally with all available cores
    .appName("DeltaLakeAggregation")  # Application name
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")  # Delta Lake extension
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")  # Delta Catalog
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.4.0,org.apache.hadoop:hadoop-aws:3.3.2,com.amazonaws:aws-java-sdk-bundle:1.11.1026")  # JARs for Delta Lake, S3, and AWS SDK
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")  # S3A file system for S3 access
    .config("spark.hadoop.fs.s3a.endpoint", "s3.amazonaws.com")  # S3 endpoint
    .config("spark.sql.files.maxPartitionBytes", "134217728")  # Max partition size in bytes (128MB)
    .config("spark.driver.memory", "6g")  # Memory allocated to the Spark driver
    .config("spark.executor.memory", "6g")  # Memory allocated to Spark executors
    .config("spark.sql.adaptive.enabled", "false")  # Disabling Adaptive Query Execution for control
    .config("spark.sql.debug.maxToStringFields", "100")  # Increase the max number of fields for debugging output
    .config("spark.databricks.delta.schema.autoMerge.enabled", "true")  # Enable schema auto-merge for Delta Lake
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")  # Enable Arrow optimizations for PySpark
    .config("spark.executor.extraJavaOptions", "-XX:+UseParallelGC")  # Garbage collection optimization
    .config("spark.driver.extraJavaOptions", "-XX:+UseParallelGC"))  # Garbage collection optimization for the driver

# Add only the credentials that were actually provided (skips None and "")
for conf_key, conf_value in s3a_credentials.items():
    if conf_value:
        spark_builder = spark_builder.config(conf_key, conf_value)

spark = spark_builder.getOrCreate()

# Optional: Adjust logging level
spark.sparkContext.setLogLevel("WARN")

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F


# Storage locations used by the bronze -> silver streaming job
BRONZE_PATH = "s3a://orders-for-dispatch/bronze/"
SILVER_PATH = "s3a://orders-for-dispatch/silver/"
SILVER_CHECKPOINT_PATH = "s3a://orders-for-dispatch/checkpoints/silver_stream"

# Open the bronze Delta table as a stream; no explicit schema is supplied
df_stream = (
    spark.readStream
    .format("delta")
    .option("ignoreDeletes", "true")
    .option("ignoreChanges", "true")
    .load(BRONZE_PATH)
)

# Column expressions: hour bucket used as the partition key, and the status
# rewrite that turns 'RECEIVED' into 'READY_FOR_DELIVERY' (other values pass through)
order_hour_expr = F.date_format(F.col("order_timestamp"), "yy-MM-dd-HH")
status_expr = (
    F.when(F.col("status") == "RECEIVED", "READY_FOR_DELIVERY")
    .otherwise(F.col("status"))
)

# Declare a 15 minute watermark on order_timestamp, then attach both columns
df_with_hour_and_status = (
    df_stream
    .withWatermark("order_timestamp", "15 minute")
    .withColumn("order_hour", order_hour_expr)
    .withColumn("status", status_expr)
)

# Append the rows unchanged in structure to the silver Delta table,
# partitioned by order_hour, with a micro-batch triggered every 5 minutes
silver_writer = (
    df_with_hour_and_status.writeStream
    .format("delta")
    .outputMode("append")
    .partitionBy("order_hour")
    .option("checkpointLocation", SILVER_CHECKPOINT_PATH)
    .trigger(processingTime="5 minutes")
)
query = silver_writer.start(SILVER_PATH)

# Block this cell until the streaming query terminates
query.awaitTermination()


In [3]:
# for s in spark.streams.active:
#     s.stop()

In [4]:
# df_stream = spark.readStream \
#     .format("delta") \
#     .option("ignoreDeletes", "true") \
#     .option("ignoreChanges", "true") \
#     .load("s3a://orders-for-dispatch/bronze/")

# df_stream.writeStream \
#     .format("console") \
#     .option("checkpointLocation", "s3a://orders-for-dispatch/checkpoints/bronze_stream") \
#     .start()


In [5]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.streaming import DataStreamWriter




In [6]:


from pyspark.sql.streaming.state import GroupState, GroupStateTimeout


In [8]:
# from pyspark.sql import SparkSession
# from pyspark.sql import functions as F
# from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

# # Define schema for the data
# state_schema = StructType([
#     StructField("order_id", StringType(), True),
#     StructField("customer_id", StringType(), True),
#     StructField("total_weight", DoubleType(), True),
#     StructField("total_volume", DoubleType(), True),
#     StructField("total_price", DoubleType(), True),
#     StructField("order_timestamp", TimestampType(), True),
#     StructField("lat", DoubleType(), True),
#     StructField("lon", DoubleType(), True)
# ])

# # Define thresholds and lorry capacity
# VOLUME_THRESHOLD = 1000  # Minimum volume for dispatch
# WEIGHT_THRESHOLD = 500  # Minimum weight for dispatch
# MAX_VOLUME_CAPACITY = 20000  # Maximum lorry volume capacity
# MAX_WEIGHT_CAPACITY = 5000  # Maximum lorry weight capacity

# # Read the stream from the Delta Lake (Bronze layer)
# df_stream = spark.readStream \
#     .format("delta") \
#     .option("ignoreDeletes", "true") \
#     .option("ignoreChanges", "true") \
#     .load("s3a://orders-for-dispatch/bronze/")



# # Watermark and group orders within a time window, summing volume and weight
# df_with_window = df_stream \
#     .withWatermark("order_timestamp", "1 minute") \
#     .groupBy(
#         F.window("order_timestamp", "10 minutes")  # 10-minute tumbling window
#     ).agg(
#         F.sum("total_volume").alias("accumulated_volume"),
#         F.sum("total_weight").alias("accumulated_weight"),
#         F.collect_list("order_id").alias("order_id"),
#         F.collect_list("customer_id").alias("customer_id"),
#         F.collect_list("total_volume").alias("total_volume"),
#         F.collect_list("total_weight").alias("total_weight"),
#         F.collect_list("lat").alias("lat"),
#         F.collect_list("lon").alias("lon")
#     )

# # UDF to split orders based on lorry capacity
# def split_orders(volume, weight, order_ids, customer_ids, lats, lons):
#     batch = {"order_id": [], "customer_id": [], "lat": [], "lon": [], "total_volume": 0, "total_weight": 0}
#     leftover = {"order_id": [], "customer_id": [], "lat": [], "lon": [], "total_volume": 0, "total_weight": 0}
    
#     for v, w, oid, cid, lat, lon in zip(volume, weight, order_ids, customer_ids, lats, lons):
#         if (batch["total_volume"] + v <= MAX_VOLUME_CAPACITY) and (batch["total_weight"] + w <= MAX_WEIGHT_CAPACITY):
#             batch["order_id"].append(oid)
#             batch["customer_id"].append(cid)
#             batch["lat"].append(lat)
#             batch["lon"].append(lon)
#             batch["total_volume"] += v
#             batch["total_weight"] += w
#         else:
#             leftover["order_id"].append(oid)
#             leftover["customer_id"].append(cid)
#             leftover["lat"].append(lat)
#             leftover["lon"].append(lon)
#             leftover["total_volume"] += v
#             leftover["total_weight"] += w
    
#     return batch, leftover

# # Register UDF
# split_orders_udf = F.udf(split_orders, 
#     StructType([
#         StructField("batch", 
#             StructType([
#                 StructField("order_id", StringType(), True),
#                 StructField("customer_id", StringType(), True),
#                 StructField("lat", StringType(), True),
#                 StructField("lon", StringType(), True),
#                 StructField("total_volume", DoubleType(), True),
#                 StructField("total_weight", DoubleType(), True)
#             ]), 
#         True),
#         StructField("remaining", 
#             StructType([
#                 StructField("order_id", StringType(), True),
#                 StructField("customer_id", StringType(), True),
#                 StructField("lat", StringType(), True),
#                 StructField("lon", StringType(), True),
#                 StructField("total_volume", DoubleType(), True),
#                 StructField("total_weight", DoubleType(), True)
#             ]), 
#         True)
#     ])
# )

# # Apply the split function on each windowed batch of orders
# df_split = df_with_window.withColumn(
#     "split_result", split_orders_udf(
#         F.col("total_volume"), F.col("total_weight"), F.col("order_id"), 
#         F.col("customer_id"), F.col("lat"), F.col("lon")
#     )
# )

# # Extract the batches and the leftovers
# df_batches = df_split.select(
#     F.col("split_result.batch.order_id").alias("order_id"),
#     F.col("split_result.batch.customer_id").alias("customer_id"),
#     F.col("split_result.batch.lat").alias("lat"),
#     F.col("split_result.batch.lon").alias("lon"),
#     F.col("split_result.batch.total_volume").alias("total_volume"),
#     F.col("split_result.batch.total_weight").alias("total_weight"),
#     F.lit("READY_FOR_DISPATCH").alias("status")
# )

# df_leftovers = df_split.select(
#     F.col("split_result.remaining.order_id").alias("order_id"),
#     F.col("split_result.remaining.customer_id").alias("customer_id"),
#     F.col("split_result.remaining.lat").alias("lat"),
#     F.col("split_result.remaining.lon").alias("lon"),
#     F.col("split_result.remaining.total_volume").alias("total_volume"),
#     F.col("split_result.remaining.total_weight").alias("total_weight")
# ).withColumn("status", F.lit("RECEIVED"))

# # Union the batches ready for dispatch and the leftover orders
# df_final = df_batches.unionByName(df_leftovers)

# # Write the result to the Delta Lake (Silver layer)
# query = df_final.writeStream \
#     .format("delta") \
#     .outputMode("append") \
#     .option("checkpointLocation", "s3a://orders-for-dispatch/checkpoints/silver_stream") \
#     .trigger(processingTime="10 seconds") \
#     .start("s3a://orders-for-dispatch/silver/")

# query.awaitTermination()


In [None]:
# from pyspark.sql import SparkSession
# from pyspark.sql import functions as F
# from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

# # Define schema for the data
# state_schema = StructType([
#     StructField("order_id", StringType(), True),
#     StructField("customer_id", StringType(), True),
#     StructField("total_weight", DoubleType(), True),
#     StructField("total_volume", DoubleType(), True),
#     StructField("total_price", DoubleType(), True),
#     StructField("order_timestamp", TimestampType(), True),  # TimestampType for time-based operations
#     StructField("lat", DoubleType(), True),
#     StructField("lon", DoubleType(), True)
# ])

# # Define the thresholds and maximum dispatch capacity
# VOLUME_THRESHOLD = 1000  # Minimum volume for dispatch
# WEIGHT_THRESHOLD = 500  # Minimum weight for dispatch
# MAX_VOLUME_CAPACITY = 20000  # Maximum lorry capacity in terms of volume
# MAX_WEIGHT_CAPACITY = 5000  # Maximum lorry capacity in terms of weight

# # Read the stream from the Delta Lake (Bronze layer)
# df_stream = spark.readStream \
#     .format("delta") \
#     .option("ignoreDeletes", "true") \
#     .option("ignoreChanges", "true") \
#     .load("s3a://orders-for-dispatch/bronze/")

# # Apply a watermark and group by 1-minute tumbling windows, summing up total volume and weight
# df_with_window = df_stream \
#     .withWatermark("order_timestamp", "1 minutes") \
#     .groupBy(
#         F.window("order_timestamp", "1 minutes")  # Time-based tumbling window
#     ).agg(
#         F.sum("total_volume").alias("accumulated_volume"),
#         F.sum("total_weight").alias("accumulated_weight"),
#         F.collect_list("total_volume").alias("total_volume"),
#         F.collect_list("total_weight").alias("total_weight"),
#         F.collect_list("order_id").alias("order_ids"),
#         F.collect_list("customer_id").alias("customer_ids"),
#         F.collect_list("total_price").alias("total_prices"),
#         F.collect_list("lat").alias("lats"),
#         F.collect_list("lon").alias("lons")
#     )

# # Process orders that meet the dispatch threshold and split into batches based on lorry capacity
# df_filtered = df_with_window.filter(
#     (F.col("accumulated_volume") > VOLUME_THRESHOLD) &
#     (F.col("accumulated_weight") > WEIGHT_THRESHOLD)
# )

# # Split the batches into manageable chunks based on the lorry capacity
# df_ready_for_dispatch = df_filtered.withColumn(
#     "ready_volume", F.when(F.col("accumulated_volume") > MAX_VOLUME_CAPACITY, MAX_VOLUME_CAPACITY).otherwise(F.col("accumulated_volume"))
# ).withColumn(
#     "ready_weight", F.when(F.col("accumulated_weight") > MAX_WEIGHT_CAPACITY, MAX_WEIGHT_CAPACITY).otherwise(F.col("accumulated_weight"))
# ).withColumn(
#     "status", F.lit("READY_FOR_DISPATCH")
# )

# # Write the processed stream to the console (for testing purposes)
# query = df_ready_for_dispatch.writeStream \
#     .format("console") \
#     .outputMode("append") \
#     .option("truncate", "false") \
#     .option("checkpointLocation", "s3a://orders-for-dispatch/checkpoints/silver_stream") \
#     .trigger(processingTime="10 seconds") \
#     .start()

# query.awaitTermination()
