In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.streaming import StreamingQuery
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.functions import *

In [2]:
 spark = (
    SparkSession.builder.master("local[1]")
    .appName("Tutorial App")
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1")
    .getOrCreate()
)

In [3]:
# Configure a streaming Kafka reader subscribed to the "carbonsense" topic.
# Nothing is loaded here: `.load()` is called on this reader in a later cell.
carbon_sense = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:29092")
    .option("subscribe", "carbonsense")
)

In [4]:
# Configure a streaming Kafka reader subscribed to the "moisturemate" topic.
# Nothing is loaded here: `.load()` is called on this reader in a later cell.
moisture_mate = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:29092")
    .option("subscribe", "moisturemate")
)

In [5]:
# Turn the configured "carbonsense" reader into a streaming DataFrame.
# NOTE(review): this rebinds `carbon_sense` from the reader to the loaded
# DataFrame, so the cell presumably fails when re-run on its own -- re-run the
# reader cell first.
carbon_sense= carbon_sense.load()

In [6]:
# Turn the configured "moisturemate" reader into a streaming DataFrame.
# NOTE(review): this rebinds `moisture_mate` from the reader to the loaded
# DataFrame, so the cell presumably fails when re-run on its own -- re-run the
# reader cell first.
moisture_mate=moisture_mate.load()

In [7]:
# Schema used to parse the JSON payload of "carbonsense" messages:
# an event timestamp, the room identifier and a CO2 reading.
carbon_sense_schema = (
    StructType()
    .add("timestamp", TimestampType())
    .add("room_id", StringType())
    .add("co2", FloatType())
)

In [8]:
# Schema used to parse the JSON payload of "moisturemate" messages:
# an event timestamp, the room identifier and two humidity readings.
moisture_mate_schema = (
    StructType()
    .add("timestamp", TimestampType())
    .add("room_id", StringType())
    .add("humidity", FloatType())
    .add("humidity_ratio", FloatType())
)

In [9]:

# Attach a `message_content` struct column to each stream: the Kafka `value`
# is cast to a string and parsed as JSON with the matching schema.
# NOTE(review): the later parsing cells select only `value` and re-parse it,
# so this column does not reach the join -- confirm whether this cell is needed.
carbon_sense = carbon_sense.withColumn(
    "message_content",
    F.from_json(F.col("value").cast("string"), carbon_sense_schema),
)
moisture_mate = moisture_mate.withColumn(
    "message_content",
    F.from_json(F.col("value").cast("string"), moisture_mate_schema),
)

In [10]:
# Reduce the "carbonsense" stream to its parsed payload:
#   1. keep only the Kafka `value`, cast to a string;
#   2. parse that JSON string into a struct named `data`;
#   3. expand the struct so each schema field becomes a top-level column.
carbon_sense = (
    carbon_sense
    .selectExpr("CAST(value AS STRING)")
    .select(F.from_json(F.col("value"), carbon_sense_schema).alias("data"))
    .select("data.*")
)

In [11]:
# Reduce the "moisturemate" stream to its parsed payload:
#   1. keep only the Kafka `value`, cast to a string;
#   2. parse that JSON string into a struct named `data`;
#   3. expand the struct so each schema field becomes a top-level column.
# The Kafka record `timestamp` is intentionally not projected: the following
# select keeps only `data`, so it was dropped immediately anyway, and carrying
# it invites confusion with the payload `timestamp` field used for the join.
moisture_mate = (
    moisture_mate
    .selectExpr("CAST(value AS STRING)")
    .select(F.from_json(F.col("value"), moisture_mate_schema).alias("data"))
    .select("data.*")
)

In [12]:
# Inner-join the two sensor streams on rows that share both the room and the
# exact payload timestamp.
# NOTE(review): neither side declares a watermark, so the stream-stream join
# state presumably grows without bound -- consider `withWatermark` on both
# inputs before using this beyond a demo.
joined_df = moisture_mate.join(
    carbon_sense,
    on=["room_id", "timestamp"],
    how="inner",
)

In [13]:
# Start the streaming query: joined rows are written to the console sink in
# append mode. `start()` returns the StreamingQuery handle kept in `query`.
query = (
    joined_df.writeStream
    .format("console")
    .outputMode("append")
    .start()
)

In [14]:
# Display the StreamingQuery handle returned by `start()`.
query

<pyspark.sql.streaming.StreamingQuery at 0x7f81240f3790>