In [9]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr, from_json, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

In [2]:
# Build (or reuse) the Spark session for this notebook, named "smartHome".
# `spark.jars.packages` is given two Maven coordinates: the Spark SQL Kafka
# connector and the MySQL JDBC driver.
# NOTE(review): the Kafka connector is pinned to Scala 2.12 / version 3.3.0 --
# presumably this has to match the installed Spark build; confirm.
spark = SparkSession.builder.appName("smartHome").config(
    "spark.jars.packages",
    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0,mysql:mysql-connector-java:8.0.33",
).getOrCreate()

In [3]:
# Kafka connection settings consumed by the streaming reader below.
kafka_bootstrap_servers = "ed-kafka:29092"  # passed as kafka.bootstrap.servers
kafka_topic = "smart-home"  # passed as the `subscribe` option

In [4]:
# Streaming DataFrame over the Kafka topic configured above.
df_kafka = spark.readStream.format("kafka").options(
    **{
        "kafka.bootstrap.servers": kafka_bootstrap_servers,
        "subscribe": kafka_topic,
        # Start reading from the earliest offsets rather than only new messages.
        "startingOffsets": "earliest",
    }
).load()

In [6]:
# Echo the DataFrame so the cell output lists its column names and types.
df_kafka

DataFrame[key: binary, value: binary, topic: string, partition: int, offset: bigint, timestamp: timestamp, timestampType: int]

In [12]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

# Schema of the JSON payload: every field is a nullable float.
# Field names are listed in the order the columns should appear.
schema = StructType([
    StructField(field_name, FloatType(), True)
    for field_name in (
        "use",
        "gen",
        "House overall",
        "Dishwasher",
        "Furnace 1",
        "Furnace 2",
        "Home office",
        "Fridge",
        "Wine cellar",
        "Garage door",
        "Kitchen 12",
        "Kitchen 14",
        "Kitchen 38",
        "Barn",
        "Well",
        "Microwave",
        "Living room",
        "Solar",
    )
])

In [13]:
# The Kafka `value` column is binary; cast it to a string holding the JSON text.
df_value = df_kafka.select(col("value").cast("string").alias("json_data"))

# Parse the JSON text against `schema` into a single struct column.
df_parsed = df_value.select(
    from_json(df_value["json_data"], schema).alias("parsed_data")
)

# Expand the struct so each schema field becomes a top-level column.
df_final = df_parsed.select(col("parsed_data.*"))

In [14]:
# Echo the parsed DataFrame so the cell output lists the flattened columns.
df_final

DataFrame[use: float, gen: float, House overall: float, Dishwasher: float, Furnace 1: float, Furnace 2: float, Home office: float, Fridge: float, Wine cellar: float, Garage door: float, Kitchen 12: float, Kitchen 14: float, Kitchen 38: float, Barn: float, Well: float, Microwave: float, Living room: float, Solar: float]

In [None]:
# `df_final` comes from `readStream`, so batch actions such as take()/show()
# cannot run on it directly; it has to be executed through writeStream.start().
# In addition, take() only accepts a row count -- `truncate` is a show() option.
#
# Drain what is currently in the topic into an in-memory table, then preview
# the first 100 rows from that table without truncating cell values.
# NOTE: the memory sink keeps every received row on the driver, so this is
# meant for small previews only.
preview_query = (
    df_final.writeStream
    .format("memory")
    .queryName("smart_home_preview")  # name of the in-memory table to query
    .outputMode("append")
    .trigger(once=True)  # process the data available now, then stop
    .start()
)
preview_query.awaitTermination()

spark.table("smart_home_preview").show(100, truncate=False)