In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark.sql.functions import window, col, avg, from_csv, concat, lit, asc
from pyspark.sql.types import StructType, StructField, LongType, StringType, DoubleType, ShortType, FloatType, ByteType, IntegerType
from time import sleep

# Cluster connection and resource settings for this streaming application.
# Each SparkConf setter returns the conf object, so the calls are chained.
sparkConf = (
    SparkConf()
    .setMaster("spark://spark-master:7077")
    .setAppName("Assignment2_Stream_new")
    .set("spark.driver.memory", "10g")
    .set("spark.executor.cores", "1")
    .set("spark.driver.cores", "1")
)

# Build the SparkSession (the entry point to the Spark SQL engine) from that
# configuration, or reuse a session that already exists in this kernel.
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()

# Layout of one CSV record carried in the value of a Kafka message.
# All fields are nullable; the fields are declared in the order in which
# they are matched against the values of the CSV line.
dataSchema = (
    StructType()
    .add("ArrDelay", FloatType(), True)
    .add("ArrTime", FloatType(), True)
    .add("DepDelay", FloatType(), True)
    .add("Dest", StringType(), True)
    .add("Timestamp", LongType(), True)
)

# Streaming source: subscribe to the "input_stream" Kafka topic, starting
# from the earliest available offsets.
StreamKafka = (
    spark.readStream
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": "kafka1:9093",
        "subscribe": "input_stream",
        "startingOffsets": "earliest",
    })
    .load()
)

# Kafka delivers the payload as binary; decode it into the CSV text line.
df = StreamKafka.selectExpr("CAST(value AS STRING)")

# Parse the CSV line into a struct following dataSchema. The struct is given an
# explicit alias so that expanding it below does not depend on the column name
# Spark auto-generates for the from_csv expression.
df1 = df.select(from_csv(df.value, dataSchema.simpleString()).alias("record"))
df1.printSchema()

# Flatten the struct into top-level columns
# (ArrDelay, ArrTime, DepDelay, Dest, Timestamp).
sdf = df1.select("record.*")
#sdf.printSchema()

# Create the event-time column: the long Timestamp value is cast to a Spark
# timestamp (Spark interprets the number as seconds since the Unix epoch).
newsdf = sdf.selectExpr(
    "*",
    "cast(Timestamp as timestamp) as event_time")

#newsdf.printSchema()

# For every 60-second event-time window, arrival time and destination, compute
# the mean arrival delay and mean departure delay; rows are sorted by ArrTime.
avgscoredf = (
    newsdf
    .groupBy(window(col("event_time"), "60 seconds"), "ArrTime", "Dest")
    .agg(
        avg("ArrDelay").alias("avg_ArrDelay"),
        avg("DepDelay").alias("avg_DepDelay"),
    )
    .orderBy(asc("ArrTime"))
)


# .agg(avg("ArrDelay").alias("value"))
# .agg(avg("DepDelay")).alias("value2")
#avgscoredf.printSchema()

# Shape the rows for the Kafka sink, which reads the columns "key" and "value":
#   key   -> "<ArrTime> <Dest>"
#   value -> "<avg_ArrDelay> <avg_DepDelay>"
kafka_key = concat(col("ArrTime"), lit(" "), col("Dest")).alias("key")
kafka_value = (
    concat(col("avg_ArrDelay"), lit(" "), col("avg_DepDelay"))
    .cast("string")
    .alias("value")
)
resultdf = avgscoredf.select(kafka_key, kafka_value)

## resultdf = avgscoredf.select(col("ArrTime").alias("key2"), col("Dest").alias("key"), col("value").cast("string"))
## resultdf = avgscoredf.select(col("ArrTime"), col("Dest"), col("avg_ArrDelay").cast("string"), col("avg_DepDelay").cast("string"))
         
# Streaming sink: on every trigger, write the complete aggregated result to the
# "output_stream" Kafka topic; progress is tracked in the checkpoint directory.
query = (
    resultdf.writeStream
    .format("kafka")
    .outputMode("complete")
    .options(**{
        "kafka.bootstrap.servers": "kafka1:9093",
        "checkpointLocation": "/home/jovyan/checkpoint",
        "topic": "output_stream",
    })
    .start()
)


# Block until the streaming query ends. Cleanup lives in `finally` so that the
# query and the Spark application are released on every exit path: a manual
# interrupt, a normal termination, or a failure of the query (in which case the
# exception still propagates after the cleanup has run).
try:
    query.awaitTermination()
except KeyboardInterrupt:
    # Interrupting the kernel is the expected way to end the stream, so the
    # interrupt is not re-raised; the cleanup below still runs.
    pass
finally:
    query.stop()
    # Stop the spark context
    spark.stop()
    print("Stopped the streaming query and the spark context")


# This works to test whether Kafka is working correctly (it does!)
# query = resultdf.writeStream.queryName("df_per_window") \
# .format("memory") \
# .outputMode("complete").start()

# try:
#     for x in range(100):
#         spark.sql("SELECT * FROM df_per_window").show(30)
#         sleep(10)
# except KeyboardInterrupt:
#     query.stop()
#     spark.stop()
#     print("stop it")

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

+----+--------------------+-----+---------+------+--------------------+-------------+
| key|               value|topic|partition|offset|           timestamp|timestampType|
+----+--------------------+-----+---------+------+--------------------+-------------+
|null|[C3 AF C2 BB C2 B...| word|        0|     0|2021-10-31 10:02:...|            0|
|null|                [0A]| word|        0|     1|2021-10-31 10:02:...|            0|
|null|[4D 61 6E 79 20 6...| word|        0|     2|2021-10-31 10:02:...|            0|
|null|                [0A]| word|        0|     3|2021-10-31 10:02:...|            0|
|null|[49 6E 20 61 20 6...| word|        0|     4|2021-10-31 10:02:...|            0|
|null|                [0A]| w

In [None]:
# Stop the Spark session created in the first cell. This cell can be run on its
# own, e.g. when the first cell ended without stopping the session itself.
spark.stop()