In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, explode, arrays_zip
from pyspark.sql.types import StructType, StructField, ArrayType, DoubleType, LongType, StringType

In [5]:
# Initialize Spark Session
spark = SparkSession.builder.appName("StockPriceStream").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")  # Reduce logs

In [6]:
# Define schema for incoming JSON data
schema = StructType([
    StructField("unix_timestamp", ArrayType(LongType()), True),
    StructField("stockname", ArrayType(StringType()), True),
    StructField("open", ArrayType(DoubleType()), True),
    StructField("close", ArrayType(DoubleType()), True),
    StructField("high", ArrayType(DoubleType()), True),
    StructField("low", ArrayType(DoubleType()), True),
    StructField("volume", ArrayType(DoubleType()), True)
])

In [7]:
# Read stream from socket
raw_stream = spark.readStream.format("socket") \
    .option("host", "localhost") \
    .option("port", 3456) \
    .load()

In [8]:
# Parse JSON
parsed_stream = raw_stream.select(from_json(col("value"), schema).alias("data")).select("data.*")

In [9]:
# Zip all arrays together
zipped_stream = parsed_stream.withColumn("zipped", arrays_zip(
    col("unix_timestamp"),
    col("stockname"),
    col("open"),
    col("close"),
    col("high"),
    col("low"),
    col("volume")
))

In [10]:
# Explode into a structured DataFrame
flattened_stream = zipped_stream.select(
    explode(col("zipped")).alias("row")
).select(
    col("row.unix_timestamp").alias("unix_timestamp"),
    col("row.stockname").alias("stockname"),
    col("row.open").alias("open"),
    col("row.close").alias("close"),
    col("row.high").alias("high"),
    col("row.low").alias("low"),
    col("row.volume").alias("volume")
)

In [11]:
update_query = flattened_stream.writeStream \
    .outputMode("append") \
    .format("memory") \
    .queryName("stock_current_data") \
    .start()

append_query = flattened_stream.writeStream \
    .outputMode("append") \
    .format("csv") \
    .option("path", "/home/ayushkhaire/code/dataennginneerinng/stocksly/stream-processing/output") \
    .option("checkpointLocation", "/home/ayushkhaire/code/dataennginneerinng/stocksly/stream-processing/checkpoints") \
    .start()

In [None]:
# Wait for termination
update_query.awaitTermination()
append_query.awaitTermination()

25/03/16 15:40:59 ERROR MicroBatchExecution: Query [id = f73b164b-dbf5-4d33-ad28-5d0189d752f3, runId = 29ccae40-577c-4e14-81cb-e0c79472b961] terminated with error
java.lang.IndexOutOfBoundsException: at 0 deleting 68
	at scala.collection.mutable.ListBuffer.remove(ListBuffer.scala:273)
	at scala.collection.mutable.BufferLike.trimStart(BufferLike.scala:178)
	at scala.collection.mutable.BufferLike.trimStart$(BufferLike.scala:178)
	at scala.collection.mutable.AbstractBuffer.trimStart(Buffer.scala:50)
	at org.apache.spark.sql.execution.streaming.sources.TextSocketMicroBatchStream.commit(TextSocketMicroBatchStream.scala:162)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$commitSources$1(MicroBatchExecution.scala:568)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$commitSources$1$adapted(MicroBatchExecution.scala:565)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at sc