In [13]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.streaming import StreamingQuery
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.functions import *


from pyspark.sql.functions import expr




In [14]:
 # Create (or reuse) the local Spark session used by every stream below.
 # The Kafka SQL connector is pulled in through spark.jars.packages so that
 # format("kafka") sources and sinks are available.
 spark = (
     SparkSession.builder
     .master("local[1]")
     .appName("Tutorial App")
     .config(
         "spark.jars.packages",
         "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1",
     )
     .getOrCreate()
 )

In [15]:
# Configure (but do not yet load) a streaming Kafka reader for the
# "carbonsense" topic; at this point the name holds a DataStreamReader.
df_carbonsense = (
    spark.readStream
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": "kafka:29092",
        "subscribe": "carbonsense",
    })
)

In [16]:
# Turn the configured reader into a streaming DataFrame.
# NOTE(review): the name is rebound from reader to DataFrame, so this cell
# cannot be re-run on its own without re-running the reader cell first.
df_carbonsense = df_carbonsense.load()

In [17]:
# Show the column types of the freshly loaded Kafka stream (raw key/value
# payload plus topic, partition, offset and timestamp metadata).
df_carbonsense.dtypes


[('key', 'binary'),
 ('value', 'binary'),
 ('topic', 'string'),
 ('partition', 'int'),
 ('offset', 'bigint'),
 ('timestamp', 'timestamp'),
 ('timestampType', 'int')]

In [18]:
# JSON layout of a carbonsense message; every field is nullable.
# NOTE(review): co2 is parsed as a string — confirm whether a numeric type
# is intended downstream.
schema_carbonsense = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("room_id", StringType()),
        ("co2", StringType()),
        ("timestamp", TimestampType()),
    )
])

In [19]:
# Decode the binary Kafka payload as a string and parse it as JSON into a
# struct column named "message_content".
df_carbonsense = df_carbonsense.withColumn(
    "message_content",
    F.from_json(
        F.col("value").cast(T.StringType()),
        schema_carbonsense,
    ),
)


In [20]:
# Flatten the parsed payload into top-level columns
# (room_id, co2, timestamp).
#
# The preceding cell already parsed `value` with schema_carbonsense into the
# "message_content" struct, so that struct is expanded here instead of being
# discarded and the same JSON parsed a second time. The Kafka metadata
# columns are dropped, exactly as before.
df_carbonsense = df_carbonsense.select("message_content.*")


In [21]:
# Display the flattened carbonsense schema.
df_carbonsense

DataFrame[room_id: string, co2: string, timestamp: timestamp]

In [22]:
# Configure (but do not yet load) a streaming Kafka reader for the
# "moisturemate" topic; at this point the name holds a DataStreamReader.
df_moisturemate = (
    spark.readStream
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": "kafka:29092",
        "subscribe": "moisturemate",
    })
)

In [23]:
# Turn the configured reader into a streaming DataFrame.
# NOTE(review): the name is rebound from reader to DataFrame, so this cell
# cannot be re-run on its own without re-running the reader cell first.
df_moisturemate = df_moisturemate.load()

In [24]:
# Show the column types of the freshly loaded Kafka stream (raw key/value
# payload plus topic, partition, offset and timestamp metadata).
df_moisturemate.dtypes


[('key', 'binary'),
 ('value', 'binary'),
 ('topic', 'string'),
 ('partition', 'int'),
 ('offset', 'bigint'),
 ('timestamp', 'timestamp'),
 ('timestampType', 'int')]

In [25]:
# JSON layout of a moisturemate message; every field is nullable.
schema_moisturemate = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("room_id", StringType()),
        ("timestamp", TimestampType()),
        ("humidity", FloatType()),
        ("humidity_ratio", FloatType()),
    )
])

In [26]:
# Decode the binary Kafka payload as a string and parse it as JSON into a
# struct column named "message_content".
df_moisturemate = df_moisturemate.withColumn(
    "message_content",
    F.from_json(
        F.col("value").cast(T.StringType()),
        schema_moisturemate,
    ),
)


In [27]:
# Flatten the parsed payload into top-level columns
# (room_id, timestamp, humidity, humidity_ratio).
#
# The preceding cell already parsed `value` with schema_moisturemate into the
# "message_content" struct, so that struct is expanded here instead of being
# discarded and the same JSON parsed a second time. The Kafka metadata
# columns are dropped, exactly as before.
df_moisturemate = df_moisturemate.select("message_content.*")


In [28]:
# Display the flattened moisturemate schema.
df_moisturemate

DataFrame[room_id: string, timestamp: timestamp, humidity: float, humidity_ratio: float]

In [29]:
# Inner-join the CO2 and humidity streams on identical (timestamp, room_id).
# NOTE(review): neither side defines a watermark, so the join state kept by
# Spark for this stream-stream join is presumably never evicted — confirm
# whether that is acceptable for long-running queries.
df_join_df_carbonsense_and_df_moisturemate = df_carbonsense.join(
    df_moisturemate,
    ["timestamp", "room_id"],
    "inner",
)

In [30]:
# Display the schema of the carbonsense + moisturemate join.
df_join_df_carbonsense_and_df_moisturemate

DataFrame[timestamp: timestamp, room_id: string, co2: string, humidity: float, humidity_ratio: float]

In [31]:
# Configure (but do not yet load) a streaming Kafka reader for the
# "luxmeter" topic; at this point the name holds a DataStreamReader.
df_luxmeter = (
    spark.readStream
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": "kafka:29092",
        "subscribe": "luxmeter",
    })
)

In [32]:
# Turn the configured reader into a streaming DataFrame.
# NOTE(review): the name is rebound from reader to DataFrame, so this cell
# cannot be re-run on its own without re-running the reader cell first.
df_luxmeter = df_luxmeter.load()

In [33]:
# JSON layout of a luxmeter message: a room id plus a string-to-string map
# of measurements (the later cells read its "timestamp" and "light_level"
# entries). Both fields are nullable.
schema_luxmeter = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("room_id", StringType()),
        ("measurements", MapType(StringType(), StringType())),
    )
])

In [34]:
# Decode the binary Kafka payload as a string and parse it as JSON into a
# struct column named "message_content".
df_luxmeter = df_luxmeter.withColumn(
    "message_content",
    F.from_json(
        F.col("value").cast(T.StringType()),
        schema_luxmeter,
    ),
)


In [35]:
# Flatten the parsed payload into top-level columns (room_id, measurements).
#
# The preceding cell already parsed `value` with schema_luxmeter into the
# "message_content" struct, so that struct is expanded here instead of being
# discarded and the same JSON parsed a second time. The Kafka metadata
# columns are dropped, exactly as before.
df_luxmeter = df_luxmeter.select("message_content.*")


In [36]:
# Promote the two entries of the `measurements` map to real columns and drop
# the map itself. Resulting columns: room_id, timestamp, light_level.
#
# `measurements` is map<string,string>, so the extracted timestamp would be a
# string; it is cast to a timestamp explicitly so the join key has the same
# type as the `timestamp` columns of the other sensor streams instead of
# relying on an implicit string/timestamp coercion inside the join.
# The previous dangling line continuation after the last call is removed.
df_luxmeter = df_luxmeter.select(
    "room_id",
    F.col("measurements")["timestamp"].cast("timestamp").alias("timestamp"),
    F.col("measurements")["light_level"].alias("light_level"),
)

In [37]:
# Add the light readings to the CO2 + humidity stream, again matching on
# identical (timestamp, room_id).
# NOTE(review): no watermark is defined for this stream-stream join either.
df_join_df_carbonsense_and_df_moisturemate_and_df_luxmeter = (
    df_join_df_carbonsense_and_df_moisturemate.join(
        df_luxmeter,
        ["timestamp", "room_id"],
        "inner",
    )
)

In [38]:
# Display the schema of the three-sensor join.
df_join_df_carbonsense_and_df_moisturemate_and_df_luxmeter

DataFrame[timestamp: timestamp, room_id: string, co2: string, humidity: float, humidity_ratio: float, light_level: string]

In [39]:
# Configure (but do not yet load) a streaming Kafka reader for the
# "thermo" topic; at this point the name holds a DataStreamReader.
df_thermo = (
    spark.readStream
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": "kafka:29092",
        "subscribe": "thermo",
    })
)

In [40]:
# Turn the configured reader into a streaming DataFrame.
# NOTE(review): the name is rebound from reader to DataFrame, so this cell
# cannot be re-run on its own without re-running the reader cell first.
df_thermo = df_thermo.load()

In [41]:
# Keep only the Kafka payload, decoded from binary to a string column
# named "json_data".
df_thermo = df_thermo.selectExpr("CAST(value AS STRING) AS json_data")

In [42]:
# JSON layout of a thermo message: three maps keyed by a string index, one
# per attribute (timestamp, room_id, temperature).
# NOTE(review): this looks like a column-oriented dump where the same index
# key identifies one reading across the three maps — confirm with producer.
schema_thermo = StructType([
    StructField(field_name, MapType(StringType(), value_type))
    for field_name, value_type in (
        ("timestamp", StringType()),
        ("room_id", StringType()),
        ("temperature", DoubleType()),
    )
])


In [43]:
# Parse the JSON string into a single struct column named "data".
df_thermo = df_thermo.select(
    from_json("json_data", schema_thermo).alias("data")
)

In [44]:
# Display the parsed thermo schema (one struct holding three maps).
df_thermo

DataFrame[data: struct<timestamp:map<string,string>,room_id:map<string,string>,temperature:map<string,double>>]

In [45]:
# One row per entry of the timestamp map: (index, timestamp).
timestamps = df_thermo.select(
    explode(col("data.timestamp")).alias("index", "timestamp")
)


In [46]:
# One row per entry of the room_id map: (index, room_id).
room_ids = df_thermo.select(
    explode(col("data.room_id")).alias("index", "room_id")
)


In [47]:
# One row per entry of the temperature map: (index, temperature).
temperatures = df_thermo.select(
    explode(col("data.temperature")).alias("index", "temperature")
)

In [48]:
# Re-assemble one row per reading: (index, timestamp, room_id, temperature).
#
# The three maps in `data` share their index keys, and those keys repeat in
# every Kafka message. Joining three separately exploded streams on "index"
# alone is a stream-stream self-join that also pairs values coming from
# different messages and keeps join state for every row seen.
# Instead, the keys of the timestamp map are exploded once and each map is
# looked up with that key inside the same row, so values are only ever
# combined within the message they arrived in and no join is needed.
# A key missing from room_id/temperature yields NULL for that column
# (with Spark's default non-ANSI map lookup).
df_thermo = (
    df_thermo
    .select(F.explode(F.map_keys("data.timestamp")).alias("index"), "data")
    .select(
        "index",
        F.col("data.timestamp")[F.col("index")].alias("timestamp"),
        F.col("data.room_id")[F.col("index")].alias("room_id"),
        F.col("data.temperature")[F.col("index")].alias("temperature"),
    )
)

In [49]:
# Display the flattened thermo schema (index, timestamp, room_id, temperature).
df_thermo

DataFrame[index: string, timestamp: string, room_id: string, temperature: double]

In [50]:
# The map index was only needed to line the three attributes up; keep the
# remaining columns (timestamp, room_id, temperature).
df_thermo = df_thermo.select("timestamp", "room_id", "temperature")


In [51]:
# Replace temperature with (temperature - 32) * 5 / 9, i.e. the
# Fahrenheit-to-Celsius formula.
# NOTE(review): assumes the producer reports Fahrenheit — confirm.
df_thermo = df_thermo.withColumn(
    "temperature",
    (F.col("temperature") - 32) * 5 / 9,
)


In [52]:
# Final stream: add the temperature readings to the three-sensor join,
# matching on identical (timestamp, room_id).
# NOTE(review): df_thermo.timestamp is a string while the left side is a
# timestamp, so the comparison relies on implicit coercion; no watermark is
# defined for this stream-stream join either.
df_merge_all_4_sensors = (
    df_join_df_carbonsense_and_df_moisturemate_and_df_luxmeter.join(
        df_thermo,
        ["timestamp", "room_id"],
        "inner",
    )
)

In [54]:
# Start a query that prints every newly joined row to the console
# (append mode); the StreamingQuery handle is kept in OUTPUT.
OUTPUT = (
    df_merge_all_4_sensors.writeStream
    .format("console")
    .outputMode("append")
    .start()
)

In [53]:
# Publish every merged row to the Kafka topic "even".
#
# The Kafka sink only writes a column named `value` (string or binary), and
# the merged frame has none, so the query used to fail as soon as it
# started. Each row is therefore serialised to a JSON document and sent as
# the message value.
# The broker address is the same one every reader in this notebook uses
# ("kafka:29092"); the previous "localhost:9092" did not match it.
# NOTE(review): confirm this is the broker the output should go to.
(df_merge_all_4_sensors
  .select(
      F.to_json(F.struct(*df_merge_all_4_sensors.columns)).alias("value")
  )
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka:29092")
  .option("topic", "even")
  .option("checkpointLocation", "./checkpoint")
  .start())

<pyspark.sql.streaming.StreamingQuery at 0x7f200d7bc490>