In [1]:
import pyspark

In [2]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T

In [3]:
# Build (or reuse) a single-threaded local Spark session with the Kafka source available.
# The spark-sql-kafka connector must match the running Spark version, so derive it from
# the installed pyspark (imported in the first cell) instead of hard-coding a version.
# NOTE(review): the Scala binary suffix must match the Spark build; pip-installed
# PySpark 3.x is built for Scala 2.12 — update this constant if that changes.
SCALA_BINARY_VERSION = "2.12"
KAFKA_PACKAGE = (
    f"org.apache.spark:spark-sql-kafka-0-10_{SCALA_BINARY_VERSION}:{pyspark.__version__}"
)

# getOrCreate() returns an already-running session if one exists, and spark.jars.packages
# is only resolved when the JVM starts: restart the kernel after changing it.
spark = (
    SparkSession.builder.master("local[1]")
    .appName("Tutorial App")
    .config("spark.jars.packages", KAFKA_PACKAGE)
    .getOrCreate()
)

# Spark UI

In [4]:
# Display the session; in Jupyter its rich representation includes a link to the Spark UI.
spark

# Reading Data from Kafka

<font color='blue' size="5" >Defining the JSON message schema for each Kafka topic</font>

In [5]:
# One schema per Kafka topic, describing the JSON payload in each message's `value`.
# Field order matters: it becomes the column order after `select("message_content.*")`.
# `timestamp` is kept as a string here; it is converted to a Spark timestamp after the join.
# All fields are nullable (StructField's default).

moisturemate_schema = T.StructType([
    T.StructField("timestamp", T.StringType()),
    T.StructField("room_id", T.StringType()),
    T.StructField("humidity", T.FloatType()),
    T.StructField("humidity_ratio", T.FloatType()),
])

carbonsense_schema = T.StructType([
    T.StructField("timestamp", T.StringType()),
    T.StructField("room_id", T.StringType()),
    T.StructField("co2", T.FloatType()),
])

# Note: the luxmeter payload lists light_level before room_id.
luxmeter_schema = T.StructType([
    T.StructField("timestamp", T.StringType()),
    T.StructField("light_level", T.FloatType()),
    T.StructField("room_id", T.StringType()),
])

smartthermo_schema = T.StructType([
    T.StructField("timestamp", T.StringType()),
    T.StructField("room_id", T.StringType()),
    T.StructField("temperature", T.FloatType()),
])

<font color='blue' size="5" >Moisturemate Data</font>

In [6]:
# Batch (non-streaming) read of the "moisturemate" topic. The message payload arrives
# as raw bytes in `value`, alongside Kafka metadata columns (topic, partition, offset, ...).
df_moisturemate = (
    spark.read.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "moisturemate")
    .option("failOnDataLoss", "true")
    .load()
)
df_moisturemate.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [7]:
# Decode the Kafka bytes as a string, parse it as JSON with the topic schema, and
# flatten the resulting struct into top-level columns. from_json does not raise on
# unparseable input; it yields nulls instead.
df_moisturemate = df_moisturemate.withColumn(
    "message_content",
    F.from_json(F.col("value").cast("string"), moisturemate_schema),
)
df_moisturemate_minimal = df_moisturemate.select("message_content.*")
df_moisturemate_minimal.toPandas()

Unnamed: 0,timestamp,room_id,humidity,humidity_ratio
0,2023-02-10T10:51:00,kitchen,31.356667,0.005039
1,2023-02-10T10:51:00,bedroom,26.700001,0.00369
2,2023-02-10T10:51:00,bathroom,26.34,0.00364
3,2023-02-10T10:51:00,living_room,24.1,0.004466
4,2023-02-10T10:52:00,kitchen,23.9175,0.003913
5,2023-02-10T10:52:00,bedroom,32.290001,0.004785
6,2023-02-10T10:52:00,bathroom,32.5,0.004865
7,2023-02-10T10:52:00,living_room,30.389999,0.004449


<font color='blue' size="5" >Carbonsense Data</font>

In [8]:
# Batch (non-streaming) read of the "carbonsense" topic. The message payload arrives
# as raw bytes in `value`, alongside Kafka metadata columns (topic, partition, offset, ...).
df_carbonsense = (
    spark.read.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "carbonsense")
    .option("failOnDataLoss", "true")
    .load()
)
df_carbonsense.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [9]:
# Decode the Kafka bytes as a string, parse it as JSON with the topic schema, and
# flatten the resulting struct into top-level columns. from_json does not raise on
# unparseable input; it yields nulls instead.
df_carbonsense = df_carbonsense.withColumn(
    "message_content",
    F.from_json(F.col("value").cast("string"), carbonsense_schema),
)
df_carbonsense_minimal = df_carbonsense.select("message_content.*")
df_carbonsense_minimal.toPandas()

Unnamed: 0,timestamp,room_id,co2
0,2023-02-10T10:51:00,kitchen,617.333313
1,2023-02-10T10:51:00,bedroom,466.5
2,2023-02-10T10:51:00,bathroom,465.5
3,2023-02-10T10:51:00,living_room,685.0
4,2023-02-10T10:52:00,kitchen,847.0
5,2023-02-10T10:52:00,bedroom,679.0
6,2023-02-10T10:52:00,bathroom,671.0
7,2023-02-10T10:52:00,living_room,718.333313
8,2023-02-10T10:53:00,kitchen,1053.0
9,2023-02-10T10:53:00,bedroom,511.5


<font color='blue' size="5" >Luxmeter Data</font>

In [10]:
# Batch (non-streaming) read of the "luxmeter" topic. The message payload arrives
# as raw bytes in `value`, alongside Kafka metadata columns (topic, partition, offset, ...).
df_luxmeter = (
    spark.read.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "luxmeter")
    .option("failOnDataLoss", "true")
    .load()
)
df_luxmeter.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [11]:
# Decode the Kafka bytes as a string, parse it as JSON with the topic schema, and
# flatten the resulting struct into top-level columns. from_json does not raise on
# unparseable input; it yields nulls instead.
df_luxmeter = df_luxmeter.withColumn(
    "message_content",
    F.from_json(F.col("value").cast("string"), luxmeter_schema),
)
df_luxmeter_minimal = df_luxmeter.select("message_content.*")
df_luxmeter_minimal.toPandas()

Unnamed: 0,timestamp,light_level,room_id
0,2023-02-10T10:52:00,433.0,kitchen
1,2023-02-10T10:52:00,0.0,bedroom
2,2023-02-10T10:52:00,14.0,bathroom
3,2023-02-10T10:52:00,0.0,living_room


<font color='blue' size="5" >Smart Thermo Data</font>

In [12]:
# Batch (non-streaming) read of the "smartthermo" topic. The message payload arrives
# as raw bytes in `value`, alongside Kafka metadata columns (topic, partition, offset, ...).
df_smartthermo = (
    spark.read.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "smartthermo")
    .option("failOnDataLoss", "true")
    .load()
)
df_smartthermo.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [13]:
# Decode the Kafka bytes as a string, parse it as JSON with the topic schema, and
# flatten the resulting struct into top-level columns. from_json does not raise on
# unparseable input; it yields nulls instead.
df_smartthermo = df_smartthermo.withColumn(
    "message_content",
    F.from_json(F.col("value").cast("string"), smartthermo_schema),
)
df_smartthermo_minimal = df_smartthermo.select("message_content.*")
df_smartthermo_minimal.toPandas()

Unnamed: 0,timestamp,room_id,temperature
0,2023-02-10T10:50:00,kitchen,71.059998
1,2023-02-10T10:50:00,bedroom,69.599998
2,2023-02-10T10:50:00,bathroom,68.360001
3,2023-02-10T10:50:00,living_room,67.870003
4,2023-02-10T10:52:00,kitchen,71.639999
5,2023-02-10T10:52:00,bedroom,68.699997
6,2023-02-10T10:52:00,bathroom,68.989998
7,2023-02-10T10:52:00,living_room,68.360001
8,2023-02-10T10:53:00,kitchen,72.300003
9,2023-02-10T10:53:00,bedroom,68.900002


<font color='blue' size="5" >Merge data on the "timestamp" and "room_id" columns (inner join: only readings present in all four topics are kept)</font>

In [14]:
# Combine the four sensors into one row per (timestamp, room_id). Inner joins keep only
# pairs reported by ALL four topics; e.g. the 10:50 thermostat readings shown earlier
# have no matching rows in the other topics and do not appear in the merged result.
merged_df = (
    df_smartthermo_minimal
    .join(df_luxmeter_minimal, ["timestamp", "room_id"], "inner")
    .join(df_carbonsense_minimal, ["timestamp", "room_id"], "inner")
    .join(df_moisturemate_minimal, ["timestamp", "room_id"], "inner")
)

In [15]:
# Collect the entire merged result to the driver as a pandas DataFrame for display.
merged_df.toPandas()

Unnamed: 0,timestamp,room_id,temperature,light_level,co2,humidity,humidity_ratio
0,2023-02-10T10:52:00,bathroom,68.989998,14.0,671.0,32.5,0.004865
1,2023-02-10T10:52:00,bedroom,68.699997,0.0,679.0,32.290001,0.004785
2,2023-02-10T10:52:00,kitchen,71.639999,433.0,847.0,23.9175,0.003913
3,2023-02-10T10:52:00,living_room,68.360001,0.0,718.333313,30.389999,0.004449
4,2023-02-10T10:53:00,bathroom,68.519997,0.0,506.5,31.200001,0.004594
5,2023-02-10T10:53:00,bedroom,68.900002,0.0,511.5,31.5,0.004699
6,2023-02-10T10:53:00,kitchen,72.300003,489.0,1053.0,28.200001,0.004724
7,2023-02-10T10:53:00,living_room,68.519997,0.0,474.0,32.900002,0.004846


<font color='blue' size="5" >Checking the data type of the timestamp column</font>

In [16]:
# `timestamp` is still a string here, because the JSON schemas declare it as StringType.
merged_df.printSchema()

root
 |-- timestamp: string (nullable = true)
 |-- room_id: string (nullable = true)
 |-- temperature: float (nullable = true)
 |-- light_level: float (nullable = true)
 |-- co2: float (nullable = true)
 |-- humidity: float (nullable = true)
 |-- humidity_ratio: float (nullable = true)



<font color='blue' size="5" >Converting the timestamp column from string to timestamp type so that rows can be sorted chronologically</font>

In [17]:
# Parse the ISO-8601 string (e.g. 2023-02-10T10:51:00) into a Spark timestamp.
# The pattern has no UTC offset, so values are interpreted in the Spark session time zone.
merged_df = merged_df.withColumn(
    "timestamp",
    F.to_timestamp(F.col("timestamp"), "yyyy-MM-dd'T'HH:mm:ss"),
)

In [18]:
# Confirm the conversion: `timestamp` should now be reported as a timestamp type.
merged_df.printSchema()

root
 |-- timestamp: timestamp (nullable = true)
 |-- room_id: string (nullable = true)
 |-- temperature: float (nullable = true)
 |-- light_level: float (nullable = true)
 |-- co2: float (nullable = true)
 |-- humidity: float (nullable = true)
 |-- humidity_ratio: float (nullable = true)



<font color='blue' size="5" >Sorting the DataFrame by timestamp</font>

In [19]:
# Order chronologically. Spark guarantees no order among rows with equal sort keys,
# so room_id is added as a tiebreaker to make the row order reproducible across runs.
sorted_df = merged_df.orderBy(F.col("timestamp").asc(), F.col("room_id").asc())

In [20]:
# Take the first 8 rows on the Spark side so only those rows are collected to the driver.
# The previous form converted the whole sorted result to pandas and then truncated it.
sorted_df.limit(8).toPandas()

  series = series.astype(t, copy=False)


Unnamed: 0,timestamp,room_id,temperature,light_level,co2,humidity,humidity_ratio
0,2023-02-10 10:52:00,bathroom,68.989998,14.0,671.0,32.5,0.004865
1,2023-02-10 10:52:00,bedroom,68.699997,0.0,679.0,32.290001,0.004785
2,2023-02-10 10:52:00,kitchen,71.639999,433.0,847.0,23.9175,0.003913
3,2023-02-10 10:52:00,living_room,68.360001,0.0,718.333313,30.389999,0.004449
4,2023-02-10 10:53:00,bathroom,68.519997,0.0,506.5,31.200001,0.004594
5,2023-02-10 10:53:00,bedroom,68.900002,0.0,511.5,31.5,0.004699
6,2023-02-10 10:53:00,kitchen,72.300003,489.0,1053.0,28.200001,0.004724
7,2023-02-10 10:53:00,living_room,68.519997,0.0,474.0,32.900002,0.004846
