In [0]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType
from pyspark.sql.functions import from_json, col, to_timestamp, last, avg, first, window, expr

# --- Configuration ---
# Kinesis source
region = "us-east-1"
stream_name = "stock_stream"

# S3 sink: the Parquet output and its streaming checkpoint live under one bucket prefix
_s3_streaming_root = "s3://databricks-stock-project-2025-10-02/streaming"
s3_output_path = f"{_s3_streaming_root}/avg_1min/"               # Parquet files with the 1-minute aggregates
checkpoint_path = f"{_s3_streaming_root}/checkpoints/avg_1min/"  # checkpoint used by the S3 writer query
# -------------------------------------------------------

In [0]:
# 1. Schema of the JSON payload carried by each Kinesis record.
# "date" and "timestamp" arrive as strings; "timestamp" is converted to event_time in step 3.
schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in (
        ("ticker", StringType()),
        ("date", StringType()),
        ("timestamp", StringType()),
        ("open", DoubleType()),
        ("high", DoubleType()),
        ("low", DoubleType()),
        ("close", DoubleType()),
        ("volume", LongType()),
    )
])

In [0]:
# 2. Read from Kinesis
# initialPosition=LATEST: a query starting without a checkpoint reads only records produced
# after it starts. Credentials come from the cluster's instance profile, so no keys are passed.
# The record payload is exposed in the binary `data` column, decoded in step 3.
raw_stream_df = (
    spark.readStream
        .format("kinesis")
        .options(
            streamName=stream_name,
            region=region,
            initialPosition="LATEST",
            awsUseInstanceProfile="true",
        )
        .load()
)

In [0]:
# 3. Parse JSON & convert timestamp
# The Kinesis payload (`data`, binary) is cast to a string and parsed with `schema`.
# Records that do not match the schema come out of from_json with NULL fields, and to_timestamp
# returns NULL for strings it cannot parse (non-ANSI mode). Such rows have no usable ticker or
# event time: they would otherwise form a NULL-ticker group in the per-ticker aggregation and
# cannot be placed in an event-time window, so they are dropped here.
json_stream_df = (
    raw_stream_df
        .selectExpr("CAST(data AS STRING) AS json_data")
        .select(from_json(col("json_data"), schema).alias("parsed"))
        .select("parsed.*")
        .withColumn("event_time", to_timestamp(col("timestamp")))
        .filter(col("ticker").isNotNull() & col("event_time").isNotNull())
)

In [0]:
# 4. Latest close per ticker
# last() returns whichever row Spark happens to see last; its result depends on row order, which
# is non-deterministic after a shuffle. max_by(close, event_time) instead returns the close of the
# row with the greatest event_time (ties on event_time are still resolved arbitrarily).
# NOTE: event_time is not a grouping key here, so the watermark cannot evict aggregation state;
# Spark keeps one state entry per ticker for the lifetime of the query.
latest_close_df = (
    json_stream_df
        .withWatermark("event_time", "2 minutes")
        .groupBy("ticker")
        .agg(expr("max_by(close, event_time)").alias("latest_close"))
)


In [0]:
# Live view of the per-ticker latest close (Databricks display() runs this as its own streaming query).
display(latest_close_df)

ticker,latest_close
AAPL,191.23757934570312


In [0]:
# 5. Per-ticker 1-minute tumbling windows on event_time: average close, trade date and last close.
# first()/last() depend on row order, which Spark does not guarantee after a shuffle, so the
# order-sensitive columns are keyed on event_time instead:
#   trade_date = date of the earliest event in the window  (min_by)
#   last_close = close of the latest event in the window   (max_by)
# The 2-minute watermark lets Spark drop a window's state once the watermark passes its end;
# events lagging more than 2 minutes behind the max observed event_time may be ignored.
avg_1min_df = (
    json_stream_df
        .withWatermark("event_time", "2 minutes")
        .groupBy(window(col("event_time"), "1 minute"), col("ticker"))
        .agg(
            avg("close").alias("avg_close"),
            expr("min_by(date, event_time)").alias("trade_date"),
            expr("max_by(close, event_time)").alias("last_close"),
        )
        .select(
            col("ticker"),
            col("trade_date"),
            col("window.start").alias("window_start"),
            col("window.end").alias("window_end"),
            col("avg_close"),
            col("last_close"),
        )
)


In [0]:
# Live view of the 1-minute aggregates. display() runs its own streaming query, with its own
# state, independent of the S3 writer query started below.
display(avg_1min_df)

ticker,trade_date,window_start,window_end,avg_close,last_close
AAPL,2024-04-12,2025-10-11T21:38:00Z,2025-10-11T21:39:00Z,180.76507110595705,195.7513427734375
AAPL,2024-07-01,2025-10-11T21:33:00Z,2025-10-11T21:34:00Z,221.38118515014648,226.1275787353516
AAPL,2024-10-11,2025-10-11T21:22:00Z,2025-10-11T21:23:00Z,228.9244605170356,238.7424774169922
AAPL,2024-09-17,2025-10-11T21:28:00Z,2025-10-11T21:29:00Z,227.05867462158204,223.43682861328125
AAPL,2024-12-18,2025-10-11T21:36:00Z,2025-10-11T21:37:00Z,201.51443716195917,183.72067260742188
AAPL,2024-05-02,2025-10-11T21:32:00Z,2025-10-11T21:33:00Z,195.0284519195557,209.4019317626953
AAPL,2024-08-27,2025-10-11T21:34:00Z,2025-10-11T21:35:00Z,225.31241822854068,235.38450622558597
AAPL,2024-03-27,2025-10-11T21:25:00Z,2025-10-11T21:26:00Z,174.64600372314453,189.7959747314453
AAPL,2024-11-12,2025-10-11T21:29:00Z,2025-10-11T21:30:00Z,233.43522057166467,184.00218200683597
AAPL,2024-01-09,2025-10-11T21:30:00Z,2025-10-11T21:31:00Z,183.87680130004884,167.91416931152344


In [0]:
# 6. Write the 1-minute aggregates to S3 as Parquet, partitioned by ticker.
# A streaming DataFrame can back several queries at once, so this writer runs alongside the
# display() above without any plan "cloning". The handle is kept so the query can be inspected
# (avg_1min_query.status, avg_1min_query.lastProgress) or stopped (avg_1min_query.stop()).
# Append mode emits a window only after the watermark (max event_time - 2 minutes) passes the
# window's end, so rows reach S3 a few minutes behind the events they summarise.
avg_1min_query = (
    avg_1min_df
        .writeStream
        .format("parquet")
        .option("path", s3_output_path)                   # Destination path for streaming output
        .option("checkpointLocation", checkpoint_path)    # Offsets/state for recovery across restarts
        .outputMode("append")                             # Each finalized window is written exactly once
        .partitionBy("ticker")
        .trigger(processingTime="1 minute")               # Start a micro-batch every 1 minute
        .start()
)


<pyspark.sql.streaming.query.StreamingQuery at 0x7f799415e6f0>