Sample code: reading data from Kafka with PySpark

In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from pyspark.sql.types import *

import os

In [2]:
# Schema of the JSON document carried in each Kafka record's `value`.
# Every field is declared nullable (third StructField argument is True),
# so keys missing from a message simply come back as null.
# NOTE(review): judging by sample values, Arrival_Time looks like epoch
# milliseconds and Creation_Time like epoch nanoseconds — confirm with the producer.
SCHEMA = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("Arrival_Time", LongType()),
        ("Creation_Time", LongType()),
        ("Device", StringType()),
        ("Index", LongType()),
        ("Model", StringType()),
        ("User", StringType()),
        ("gt", StringType()),
        ("x", DoubleType()),
        ("y", DoubleType()),
        ("z", DoubleType()),
    )
])

# `spark.jars.packages` makes Spark resolve the listed Maven coordinates when the
# session starts (downloaded from Maven Central), so the first run needs an
# internet connection. Because `getOrCreate()` returns an already-running session
# unchanged, the package list only takes effect on a fresh kernel.
#
# Only the Spark SQL Kafka connector is needed for `spark.read.format("kafka")`
# (and `spark.readStream.format("kafka")`). Its Scala suffix (_2.11 / _2.12) must
# match the Scala build of Spark. The previous config put a _2.12 DStream
# assembly (which has no PySpark API) next to a _2.11 SQL connector. Mixing
# Scala binary versions on one classpath can fail at runtime.
SPARK_VERSION = "2.4.4"
# NOTE(review): assumes the default Scala 2.11 build of Spark 2.4.4 — confirm
# against the installed Spark jars if you switch distributions.
SCALA_VERSION = "2.11"
KAFKA_PACKAGE = "org.apache.spark:spark-sql-kafka-0-10_{}:{}".format(SCALA_VERSION, SPARK_VERSION)

# NOTE(review): spark.kryoserializer.buffer.max only matters when Kryo is the
# serializer in use; spark.serializer is not set here.
spark = (
    SparkSession.builder
    .appName('demo_app')
    .config("spark.kryoserializer.buffer.max", "512m")
    .config('spark.jars.packages', KAFKA_PACKAGE)
    .getOrCreate()
)

In [3]:
# Kafka connection settings. Each one can be overridden through an environment
# variable, so the notebook runs unchanged inside and outside Docker. The
# defaults are the original hard-coded values:
#   - "kafka:29092" is the broker's internal name in the Docker network.
#   - An external broker used previously:
#     dds2020s-kafka.eastus.cloudapp.azure.com:9092
kafka_server = os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "kafka:29092")
topic = os.environ.get("KAFKA_TOPIC", "activities")

In [4]:
# Batch (non-streaming) read of the topic starting from the earliest available
# offset. failOnDataLoss=false tells the connector not to abort the read when
# offsets are no longer available (e.g. removed by retention). The option is
# given as the string Spark expects.
static_df = (
    spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_server)
    .option("subscribe", topic)
    .option("startingOffsets", "earliest")
    .option("failOnDataLoss", "false")
    .load()
)

# Kafka hands `value` over as binary. Decode it as UTF-8, the standard
# encoding for JSON and a superset of ASCII; US-ASCII would corrupt any
# non-ASCII characters. Then parse it with SCHEMA and flatten the struct
# into top-level columns.
s2 = (
    static_df
    .select(f.from_json(f.decode("value", "UTF-8"), schema=SCHEMA).alias("value"))
    .select("value.*")
)

In [5]:
%%time 
# on my pc, there is a fixed 3 sec time for each of count() and show()  ?!
# this is probably a spark config: https://stackoverflow.com/questions/59916338/why-is-there-a-delay-in-the-launch-of-spark-executors
print("%d records in frame" % s2.count())
s2.show()

546083 records in frame
+-------------+-------------------+--------+-----+------+----+-----+-------------+-------------+-------------+
| Arrival_Time|      Creation_Time|  Device|Index| Model|User|   gt|            x|            y|            z|
+-------------+-------------------+--------+-----+------+----+-----+-------------+-------------+-------------+
|1424686735175|1424686733176178965|nexus4_1|   35|nexus4|   g|stand| 0.0014038086|    5.0354E-4|-0.0124053955|
|1424686735378|1424686733382813486|nexus4_1|   76|nexus4|   g|stand|-0.0039367676|  0.026138306|  -0.01133728|
|1424686735577|1424686733579072031|nexus4_1|  115|nexus4|   g|stand|  0.003540039| -0.034744263| -0.019882202|
|1424686735779|1424688581834321412|nexus4_2|  163|nexus4|   g|stand|  0.002822876|  0.005584717|  0.017318726|
|1424686735982|1424688582035859498|nexus4_2|  203|nexus4|   g|stand| 0.0017547607| -0.018981934| -0.022201538|
|1424686736186|1424686734188508066|nexus4_1|  236|nexus4|   g|stand| 0.0014038086|  0.01