In [1]:
import os
# save using vs code
SCALA_VERSION = '2.12'
SPARK_VERSION = '3.1.3'
# Download Kafka Jar file, this for readStream.format("kafka"), "kafka" is a driver
# kafka driver code is part of Maven Jar file
# https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10_2.12/3.1.3
# pyspark-shell shall download the jar file behind..
os.environ['PYSPARK_SUBMIT_ARGS'] = f'--packages org.apache.spark:spark-sql-kafka-0-10_{SCALA_VERSION}:{SPARK_VERSION} pyspark-shell'

In [2]:
# here we implement windowed candle data for 1 minute
# here we implement windowed candle data for 3 minute
# here we implement windowed candle data for 5 minute

# we use hadoop-vm as hostname, which is running on another system


#     kafka-topics  --create --bootstrap-server hadoop-vm:9092 --replication-factor 1 --partitions 1 --topic stock-ticks 
    
#     kafka-topics  --create --bootstrap-server hadoop-vm:9092 --replication-factor 1 --partitions 1 --topic candles-1min 
#     kafka-console-consumer --bootstrap-server hadoop-vm:9092 --topic  candles-1min  --from-beginning
 
#     kafka-topics  --create --bootstrap-server hadoop-vm:9092 --replication-factor 1 --partitions 1 --topic candles-3min 
#     kafka-console-consumer --bootstrap-server hadoop-vm:9092 --topic  candles-3min  --from-beginning

#     kafka-topics  --create --bootstrap-server hadoop-vm:9092 --replication-factor 1 --partitions 1 --topic candles-5min 
#     kafka-console-consumer --bootstrap-server hadoop-vm:9092 --topic  candles-5min  --from-beginning

In [3]:
#import sys sys.stdout = open("/dev/stdout", "w")

In [4]:
import findspark
findspark.init()

In [5]:
import pyspark


from pyspark.sql import SparkSession
# spark groupBy has default setting for spark.sql.shuffle.partitions as 200
# we set to  4, should NOT be done in production 
spark = SparkSession.builder.master("local[1]")\
                            .config('spark.sql.shuffle.partitions', 4)\
                            .appName("SparkStreamingKafkaCandles-task1").getOrCreate()

22/03/27 07:57:38 WARN Utils: Your hostname, ubuntu-virtual-machine resolves to a loopback address: 127.0.1.1; using 192.168.80.128 instead (on interface ens33)
22/03/27 07:57:38 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/opt/spark-3.1.3-bin-hadoop2.7/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/ubuntu/.ivy2/cache
The jars for the packages stored in: /home/ubuntu/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-2dba46e1-ee98-4bef-9043-5f7c077e64ca;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.1.3 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.1.3 in central
	found org.apache.kafka#kafka-clients;2.6.0 in central
	found com.github.luben#zstd-jni;1.4.8-1 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.8.2 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.commons#commons-pool2;2.6.2 in central
:: resolution report :: resolve 565ms :: artifacts dl 9ms
	:: modules in use:
	com.github.luben#zstd-jni;1.4.8-1 from central in [default]
	org.apache.commons#commons-pool2;2.6.2 from central 

In [6]:
# read from kafka, here spark is consumer for kafka topic called test
# spark streaming works as dataframe/sql
# group.id is consumer group id
# subcribe is kafka topic
# "kafka" driver is not available by default with spark, we need to download it, we did on cell 1
# GROUP ID SHOULD BE CHANGED FOR EVERY PERSON 
kafkaDf = spark.readStream.format("kafka")\
  .option("kafka.bootstrap.servers", "localhost:9092")\
  .option("subscribe", "stock-ticks")\
  .option("group.id", "stock-ticks-groupAK-hdfs")\
  .load()

#.option("timestampFormat", "dd-MMM-yyyy")\

In [7]:
# key and value are binary type, we need to CAST To STRING type
kafkaDf.printSchema()
# timestampType values
# CreateTime:  Timestamp relates to message creation time as set by a Kafka client/producer
# LogAppendTime : Timestamp relates to the time a message was appended to a Kafka log.
# UnknownType

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [8]:
# Kafka value, key are in binary format
# we need to type cast to STRING, 
# we pick only value, timestamp as we need for 10 minute window
ticksDf = kafkaDf.selectExpr("CAST(value AS STRING)", "timestamp")
ticksDf.printSchema() # we get only value as string

root
 |-- value: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)



In [9]:
import pyspark.sql.functions as F
from pyspark.sql.types import StructField, StructType, DoubleType, StringType, LongType, TimestampType

schema = StructType( [
    StructField("symbol", StringType(), True),
    StructField("price", DoubleType(), True),
    StructField("volume", LongType(), True),
    StructField("timestamp", LongType(),  True)
])

In [10]:
# we parse value which is JSON string to kafka struct, 
# this create a new column [replacing existing value which is string] with type
jsonDf = ticksDf.withColumn("value", F.from_json("value", schema))
jsonDf.printSchema()

root
 |-- value: struct (nullable = true)
 |    |-- symbol: string (nullable = true)
 |    |-- price: double (nullable = true)
 |    |-- volume: long (nullable = true)
 |    |-- timestamp: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)



In [11]:
# now extract all the sub fields symbol, price, volume, timestamp into data frame
# exisitng value, timestamp shall be removed. existing timestamp is value produced time, not a stock tick time
# value.timestamp that comes from producer is basically a event time
stockTickDf = jsonDf.select (F.col("value.*")) # extract all sub fields as schema
stockTickDf.printSchema()

root
 |-- symbol: string (nullable = true)
 |-- price: double (nullable = true)
 |-- volume: long (nullable = true)
 |-- timestamp: long (nullable = true)



In [12]:
# we have data coming in stream, each record has timestamp
# we have milli seconds 1647283268253, the actual seconds Mon Mar 14 2022 14:41:08
# now we have to calculate 1 minute window, sum(volume) up to 1 minute
# after conversions, we take timestamp which is truncatd to minute instead of milli seconds, drop temp columns
stockTickDf  = stockTickDf \
                .withColumn("timestampTemp", (F.col("timestamp") / 1000).cast("timestamp"))\
                .withColumn("trade_time", F.date_trunc("minute", F.col("timestampTemp")))\
                .drop("timestamp")\
                .drop("timestampTemp")\
                .withColumnRenamed("trade_time", "timestamp")
              

In [13]:
# from pyspark.sql.functions import date_format

# stockTickDf = stockTickDf.withColumn("Year", date_format("TIMESTAMP", "yyyy"))\
#                      .withColumn("Month", date_format("TIMESTAMP", "MM"))\
#                      .withColumn("Day", date_format("TIMESTAMP", "dd"))\
#                      .withColumn("Hours",date_format("TIMESTAMP","hh"))\
#                      .withColumn("Minutes",date_format("TIMESTAMP","mm"))


In [14]:
#stockTickDf .write.mode("overwrite")\
#            .partitionBy("Year", "Month", "Day","Hours","Minutes","symbol")\
 #           .format("csv")\
  #          .save("hdfs://localhost:9000/stocks-ticks")
            
#stockTickDf .printSchema()
#stockTickDf .show(5)

In [15]:
stockTickDf1 = stockTickDf\
                            .selectExpr("timestamp", "symbol",  "struct(*) as value")\
                            .withWatermark("timestamp", "1 minutes")\
                            .groupBy("symbol", F.window("timestamp", "60 seconds"))\
                          .agg( F.collect_list("value").alias("ticks"))

stockTickDf1.printSchema()

root
 |-- symbol: string (nullable = true)
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- ticks: array (nullable = false)
 |    |-- element: struct (containsNull = false)
 |    |    |-- symbol: string (nullable = true)
 |    |    |-- price: double (nullable = true)
 |    |    |-- volume: long (nullable = true)
 |    |    |-- timestamp: timestamp (nullable = true)



In [16]:
# spark stream with batch output
# spark basically process the data as batch
# for developer convinence, we have api, that can give us finite data frame for writing to file/jsbc/db/custom coding
# process discrete output batch
# this function is called for every update/delete/comlete triggers with result data set as dataframe
# candleBatchDf is BATCH DATAFRAME
# stockTickDf1Min is STREAM DATAFRAME
def processBatchData(candleBatchDf, batch_id):
    print ("process batch called", batch_id, "writing ", candleBatchDf.count())
    # result = candleBatchDf.collect()
    # print(result)
    # candleBatchDf.show(2)
    # write to csv/json/parqeut/database /jdbc etc
    # window is nested column, has its children columns called start and end
    # start and end are part of nested window column
    # append mode will not delete existin data, instead append to existing table
    (
        
     candleBatchDf
        .coalesce(1)
        .write
        .partitionBy("Year", "Month", "Day" , "Hours", "symbol")
        .outputMode("append")
        .format("csv")
        .option("header", True)
        .save( "hdfs://localhost:9000/layers/raw/stocks-ticks-csv")

    )
       
stockTickDf.writeStream.outputMode("append").foreachBatch(processBatchData).start()
    


22/03/27 07:58:37 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-cf12ae63-bc35-4ab2-993a-55227f518751. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.


<pyspark.sql.streaming.StreamingQuery at 0x7f6b64553fd0>

22/03/27 07:58:41 ERROR MicroBatchExecution: Query [id = 94df4ff7-f91e-4e81-ab77-6cacff0730ac, runId = f90b14ce-7c28-41be-9941-addd80fc392d] terminated with error
py4j.Py4JException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
  File "/opt/spark-3.1.3-bin-hadoop2.7/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 2442, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/opt/spark-3.1.3-bin-hadoop2.7/python/pyspark/sql/utils.py", line 196, in call
    raise e
  File "/opt/spark-3.1.3-bin-hadoop2.7/python/pyspark/sql/utils.py", line 193, in call
    self.func(DataFrame(jdf, self.sql_ctx), batch_id)
  File "/tmp/ipykernel_247017/3446248603.py", line 22, in processBatchData
    .partitionBy("Year", "Month", "Day" , "Hours", "symbol")
AttributeError: 'DataFrameWriter' object has no attribute 'outputMode'

	at py4j.Protocol.getReturnValue(Protocol.java:476)
	at py4j.reflection.PythonProxyHandler.

process batch called 0 writing  0


In [17]:
import pyspark.sql.functions as F
# output DataStreamWriter
# copy symbol to _symbol for partition, as spark shall remove the column 
# if we use  the column in partitionBy

(
    stockTickDf
     .withColumn("year", F.date_format(F.col("timestamp"), "yyyy"))
     .withColumn("month", F.date_format(F.col("timestamp"), "MM"))
     .withColumn("day", F.date_format(F.col("timestamp"), "dd"))  
     .withColumn("hour", F.date_format(F.col("timestamp"), "HH"))   
     .withColumn("_symbol", F.col("symbol"))   
    .writeStream
    .trigger(processingTime='65 seconds')\
    .queryName("Write Ticks to CSV trigger by 1 min hour")
    .format("csv")
    .option("path", "hdfs://localhost:9000/layers/raw/stocks-ticks-csv")
    .option("header", True)
    .option("checkpointLocation", "hdfs://localhost:9000/checkpoint/tickscsvtohdfs7")
    .partitionBy("year", "month", "day", "hour", "_symbol")
    .option("truncate", False)
    .start()
)

<pyspark.sql.streaming.StreamingQuery at 0x7f6b64566610>

                                                                                