In [1]:
import findspark
# findspark.init() adds the local Spark installation's `pyspark` package to
# sys.path, so the `pyspark` imports in the next cell resolve. It must run first.
findspark.init()

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [3]:
# Single local SparkSession for the demo: 3 worker threads, 2 shuffle partitions,
# and the Kafka connector pulled in through spark.jars.packages.
# NOTE(review): the connector coordinates (Scala 2.11, version 2.4.8) must match
# the installed Spark build — confirm against `spark.version` when upgrading.
# NOTE(review): spark.streaming.stopGracefullyOnShutdown is documented for the
# DStream StreamingContext; it likely has no effect on Structured Streaming
# queries — confirm before relying on it.
spark = (
    SparkSession.builder
    .master("local[3]")
    .appName("Stream Stream Join Demo")
    .config("spark.streaming.stopGracefullyOnShutdown", "true")
    .config("spark.sql.shuffle.partitions", 2)
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.8")
    .getOrCreate()
)

In [4]:
# JSON payload of an `impressions` message. Every field arrives as a nullable
# string; CreatedTime is converted to a timestamp after parsing.
impressionSchema = StructType([
    StructField("InventoryID", StringType(), nullable=True),
    StructField("CreatedTime", StringType(), nullable=True),
    StructField("Campaigner", StringType(), nullable=True),
])

In [5]:
# JSON payload of a `clicks` message. Both fields arrive as nullable strings;
# CreatedTime is converted to a timestamp after parsing.
clickSchema = StructType([
    StructField("InventoryID", StringType(), nullable=True),
    StructField("CreatedTime", StringType(), nullable=True),
])

In [6]:
# Raw Kafka stream for the `impressions` topic, read from the earliest available
# offset. The message payload is carried in the `value` column.
kafkaImpressionDF = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "impressions")
    .option("startingOffsets", "earliest")
    .load()
)

In [7]:
# Decode impression events:
#   1. cast the Kafka payload to a string and parse it with impressionSchema;
#   2. expose InventoryID as ImpressionID (the join key) next to Campaigner;
#   3. replace the raw CreatedTime string with a parsed ImpressionTime timestamp.
impressionsDF = (
    kafkaImpressionDF
    .select(from_json(col("value").cast("string"), impressionSchema).alias("value"))
    .selectExpr(
        "value.InventoryID as ImpressionID",
        "value.CreatedTime",
        "value.Campaigner",
    )
    .withColumn("ImpressionTime", to_timestamp(col("CreatedTime"), "yyyy-MM-dd HH:mm:ss"))
    .drop("CreatedTime")
)

In [8]:
# Raw Kafka stream for the `clicks` topic, read from the earliest available
# offset. The message payload is carried in the `value` column.
kafkaClickDF = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "clicks")
    .option("startingOffsets", "earliest")
    .load()
)

In [9]:
# Decode click events:
#   1. cast the Kafka payload to a string and parse it with clickSchema;
#   2. expose InventoryID as ClickID (the join key);
#   3. replace the raw CreatedTime string with a parsed ClickTime timestamp.
clicksDF = (
    kafkaClickDF
    .select(from_json(col("value").cast("string"), clickSchema).alias("value"))
    .selectExpr(
        "value.InventoryID as ClickID",
        "value.CreatedTime",
    )
    .withColumn("ClickTime", to_timestamp(col("CreatedTime"), "yyyy-MM-dd HH:mm:ss"))
    .drop("CreatedTime")
)

In [11]:
# Join condition and type: pair impressions with clicks on the same inventory id.
# An inner join emits only rows that have a match on both sides.
joinExpr, joinType = "ImpressionID == ClickID", "inner"

In [12]:
# Stream-stream join of impressions and clicks using the condition and type above.
# NOTE(review): neither input sets a watermark in this notebook and the condition
# has no event-time bound, so Spark cannot evict old rows from join state — it
# should be expected to grow for as long as the query runs. Consider
# withWatermark(...) on both sides plus a time-range condition for anything
# beyond a demo.
joinedDF = impressionsDF.join(clicksDF, on=expr(joinExpr), how=joinType)

In [13]:
# Print joined rows to the console in append mode, one micro-batch per minute.
# Progress is checkpointed under the relative directory `chk2`.
# NOTE(review): start() does not block; the query keeps running in the
# background until outputQuery.stop() is called. A checkpoint written by a
# different query shape may be incompatible — clear `chk2` after changing the join.
outputQuery = (
    joinedDF.writeStream
    .format("console")
    .outputMode("append")
    .option("checkpointLocation", "chk2")
    .trigger(processingTime="1 minute")
    .start()
)