# In [None]:
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.sql.functions import *
from pyspark.sql.types import *

# NOTE(review): `conf` is created but never passed to the builder below;
# confirm whether anything else still needs it before removing it.
conf = SparkConf()

# Local Spark session using all cores. The Kafka source connector and the
# Kafka client library are added to the classpath through `spark.jars`.
spark = (
    SparkSession.builder
    .appName('Stock Streaming with Spark and Kafka')
    .master('local[*]')
    .config(
        'spark.jars',
        'file:///C://Tools//StructuredStreaming//spark-sql-kafka-0-10_2.11-2.4.0.jar,'
        'file:///C://Tools//StructuredStreaming//kafka-clients-1.1.0.jar',
    )
    .getOrCreate()
)

# Keep console logging to errors only.
spark.sparkContext.setLogLevel('ERROR')
# Use 2 shuffle partitions instead of Spark's default, which suits a small local stream.
spark.conf.set("spark.sql.shuffle.partitions", "2")

# Streaming source: subscribe to the 'stockdata' topic on the local Kafka broker.
# 'latest' only matters when a query starts without a checkpoint; a restarted
# query resumes from its checkpointed offsets instead.
df = (
    spark.readStream
    .format('kafka')
    .options(**{
        'kafka.bootstrap.servers': 'localhost:9092',
        'subscribe': 'stockdata',
        'startingOffsets': 'latest',
    })
    .load()
)

# Kafka delivers key/value as binary; cast both to strings so the JSON payload
# in `value` can be parsed. (`key` is carried along but not used below.)
df_convert = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Explicit schema for the JSON payload. Every field is read as a string.
# NOTE(review): Price/DayChange/Time could be cast to numeric/timestamp types
# downstream if needed; confirm the producer's format first.
df_structure = StructType() \
    .add('Symbol', StringType()) \
    .add('Company', StringType()) \
    .add('Price', StringType()) \
    .add('DayChange', StringType()) \
    .add('Time', StringType())

# Parse the JSON and give the resulting struct a stable name. Without an alias,
# Spark names the column after the expression. That name is `jsontostructs(value)`
# in Spark 2.4, and newer releases generate a different one (e.g.
# `from_json(value)`), so selecting the struct by its generated name is not portable.
df2 = df_convert.select(from_json(col('value'), df_structure).alias('data'))

# Flatten the struct into top-level columns: Symbol, Company, Price, DayChange, Time.
df2_flat = df2.select('data.*')

# Sink: every 60 seconds, append the newly parsed rows as CSV files under
# C:/Stock-Stream. The checkpoint directory records the query's progress
# (including Kafka offsets) so that a restart resumes where the last run stopped.
# For debugging, an alternative is a 'console' sink (outputMode 'update',
# option truncate='false') on the same 60-second trigger.
output = (
    df2_flat.writeStream
    .queryName('StockStreamOut')
    .format('csv')
    .outputMode("append")
    .options(
        path=r'C:/Stock-Stream',
        checkpointLocation=r'C:/Tools/checkpointv03',
    )
    .trigger(processingTime='60 seconds')
    .start()
)

# Block until the streaming query stops or fails.
output.awaitTermination()