# Gold Layer: Streaming Ingestion
## Use Case: User Activity per Event Type per Day
### Aggregate:
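Per day (`event_date`, derived from `event_timestamp`) and per `event_type`: the number of events (`event_count`) and the approximate number of distinct users (`unique_users`).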

### Inputs/Outputs:
Input Silver Table: /mnt/s3mock/silver/realtime/events
Output Gold Table: /mnt/s3mock/gold/realtime/aggregates
Streaming checkpoint: /mnt/s3mock/checkpoints/gold_aggregates

### 1. Init Spark and S3 paths

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import approx_count_distinct, col, count, to_date

# Delta Lake requires its SQL extension and its catalog implementation to be
# registered on the session before any Delta table is read or written.
delta_session_settings = {
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
}

session_builder = SparkSession.builder.appName("GoldAggregation")
for setting_key, setting_value in delta_session_settings.items():
    session_builder = session_builder.config(setting_key, setting_value)
spark = session_builder.getOrCreate()

# Storage locations, all on the s3mock mount.
s3mock_root = "/mnt/s3mock"
silver_path = f"{s3mock_root}/silver/realtime/events"          # streaming Delta source
gold_path = f"{s3mock_root}/gold/realtime/aggregates"          # streaming Delta sink
# Checkpoint dir; the aggregation state store is written under its state/ subdir.
# NOTE(review): the recorded failure is a state-store temp-file rename in this dir
# failing with "Operation not permitted". The mount may not support the rename that
# Spark's checkpoint file manager relies on. Consider a local (non-mounted)
# checkpoint dir. TODO confirm.
checkpoint_path = f"{s3mock_root}/checkpoints/gold_aggregates"


### 2. Core logic

In [2]:
# Stream the silver Delta table as the source of the aggregation.
silver_df = spark.readStream.format("delta").load(silver_path)

# One output row per (event_date, event_type).
# - event_count: every event in the group. count("*") is exact and does not depend
#   on a separate `timestamp` column. approx_count_distinct on a timestamp would
#   merge events that share a timestamp and is only approximate.
# - unique_users: approximate distinct user_ids (HyperLogLog-based estimate).
# event_date is derived with to_date, i.e. in the Spark session time zone.
agg_df = (
    silver_df
    .withColumn("event_date", to_date(col("event_timestamp")))
    .groupBy("event_date", "event_type")
    .agg(
        approx_count_distinct("user_id").alias("unique_users"),
        count("*").alias("event_count"),
    )
)

# Complete mode rewrites the whole gold table on every trigger.
# NOTE(review): complete mode never evicts aggregation state, so state grows with
# the number of distinct (event_date, event_type) pairs.
# NOTE: this aggregation differs from the one that created any existing checkpoint
# (the state schema changed). Spark does not support resuming a stateful query
# after its aggregation changes, so run against a fresh checkpoint_path.
query = (
    agg_df.writeStream
    .format("delta")
    .outputMode("complete")
    .option("checkpointLocation", checkpoint_path)
    .start(gold_path)
)

try:
    query.awaitTermination()
finally:
    # Interrupting the notebook cell only stops waiting; the query keeps running in
    # the background unless stopped. stop() is a no-op if it already terminated.
    query.stop()

StreamingQueryException: [STREAM_FAILED] Query [id = e128a26b-af85-460f-ab6a-14d3b8132816, runId = e4f14841-f3bb-4d30-9ff5-e79a00f4f34c] terminated with exception: Job aborted due to stage failure: Task 24 in stage 5169.0 failed 1 times, most recent failure: Lost task 24.0 in stage 5169.0 (TID 193391) (91abc9f4dbc0 executor driver): java.lang.IllegalStateException: Error committing version 593 into HDFSStateStore[id=(op=0,part=24),dir=file:/mnt/s3mock/checkpoints/gold_aggregates/state/0/24]
	at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.commit(HDFSBackedStateStoreProvider.scala:148)
	at org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerBaseImpl.commit(StreamingAggregationStateManager.scala:89)
	at org.apache.spark.sql.execution.streaming.StateStoreSaveExec.$anonfun$doExecute$7(statefulOperators.scala:436)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:640)
	at org.apache.spark.sql.execution.streaming.StateStoreWriter.timeTakenMs(statefulOperators.scala:143)
	at org.apache.spark.sql.execution.streaming.StateStoreWriter.timeTakenMs$(statefulOperators.scala:143)
	at org.apache.spark.sql.execution.streaming.StateStoreSaveExec.timeTakenMs(statefulOperators.scala:392)
	at org.apache.spark.sql.execution.streaming.StateStoreSaveExec.$anonfun$doExecute$5(statefulOperators.scala:436)
	at org.apache.spark.sql.execution.streaming.state.package$StateStoreOps.$anonfun$mapPartitionsWithStateStore$1(package.scala:68)
	at org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:127)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.io.FileNotFoundException: /mnt/s3mock/checkpoints/gold_aggregates/state/0/24/.593.delta.3ef33e81-3d95-42ff-9344-c5e32620f46c.TID193391.tmp (Operation not permitted)
	at java.base/java.io.FileInputStream.open0(Native Method)
	at java.base/java.io.FileInputStream.open(FileInputStream.java:216)
	at java.base/java.io.FileInputStream.<init>(FileInputStream.java:157)
	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileInputStream.<init>(RawLocalFileSystem.java:146)
	at org.apache.hadoop.fs.RawLocalFileSystem.open(RawLocalFileSystem.java:275)
	at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:976)
	at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:417)
	at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:391)
	at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:341)
	at org.apache.hadoop.fs.RawLocalFileSystem.rename(RawLocalFileSystem.java:505)
	at org.apache.hadoop.fs.FileSystem.rename(FileSystem.java:1624)
	at org.apache.hadoop.fs.DelegateToFileSystem.renameInternal(DelegateToFileSystem.java:206)
	at org.apache.hadoop.fs.AbstractFileSystem.renameInternal(AbstractFileSystem.java:790)
	at org.apache.hadoop.fs.AbstractFileSystem.rename(AbstractFileSystem.java:720)
	at org.apache.hadoop.fs.ChecksumFs.renameInternal(ChecksumFs.java:489)
	at org.apache.hadoop.fs.AbstractFileSystem.rename(AbstractFileSystem.java:720)
	at org.apache.hadoop.fs.FileContext.rename(FileContext.java:1036)
	at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.renameTempFile(CheckpointFileManager.scala:377)
	at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.close(CheckpointFileManager.scala:154)
	at net.jpountz.lz4.LZ4BlockOutputStream.close(LZ4BlockOutputStream.java:198)
	at java.base/java.io.FilterOutputStream.close(FilterOutputStream.java:188)
	at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.finalizeDeltaFile(HDFSBackedStateStoreProvider.scala:450)
	at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$commitUpdates(HDFSBackedStateStoreProvider.scala:320)
	at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.commit(HDFSBackedStateStoreProvider.scala:141)
	... 24 more

Driver stacktrace: