## Starting streaming pipeline with Kafka

In [None]:
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType, TimestampType

sparkConf = SparkConf()
sparkConf.setMaster("spark://spark-master:7077")
sparkConf.setAppName("Lab9_Ex3")
sparkConf.set("spark.driver.memory", "2g")
sparkConf.set("spark.executor.cores", "1")
sparkConf.set("spark.driver.cores", "1")

# create the spark session, which is the entry point to Spark SQL engine.
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()

# We need to set the following configuration whenever we need to use GCS.
# Setup hadoop fs configuration for schema gs://
conf = spark.sparkContext._jsc.hadoopConfiguration()
conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
conf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")

# Use the Cloud Storage bucket for temporary BigQuery export data used by the connector.
bucket = "temp_degroup11"
spark.conf.set('temporaryGcsBucket', bucket)

# Define the PySpark schema for the streaming data
data_schema = StructType([
    StructField("Address", StringType(), True),
    StructField("City", StringType(), True),
    StructField("Price", IntegerType(), True),
    StructField("Lot_size", StringType(), True),
    StructField("Living_space_size", StringType(), True),
    StructField("Build_year", StringType(), True),
    StructField("Build_type", StringType(), True),
    StructField("House_type", StringType(), True),
    StructField("Roof", StringType(), True),
    StructField("Rooms", StringType(), True),
    StructField("Toilet", StringType(), True),
    StructField("Floors", StringType(), True),
    StructField("Energy_label", StringType(), True),
    StructField("Position", StringType(), True),
    StructField("Garden", StringType(), True),
    StructField("Estimated_neighbourhood_price_per", StringType(), True),
    StructField("Availability", BooleanType(), True),
    StructField("event_time",TimestampType(), True),
])

#Loading the Cookie
cookie_id_df = spark.read \
      .format("bigquery") \
      .load(" degroup11.group11dataset.cookie_ID_houses")

#Getting the mortgage threshold
price_threshold = cookie_id_df.first()["possible_mortgage_amount"]

# Read the whole dataset as a batch
kafkaStream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka1:9093") \
    .option("failOnDataLoss", "false") \
    .option("subscribe", "mock") \
    .option("startingOffsets", "latest") \
    .load()

df = kafkaStream.selectExpr("CAST(value AS STRING)")

df1 = df.select(from_json(df.value, data_schema.simpleString()))

df1.printSchema()

sdf = df1.select(col("from_json(value).*"))

sdf.printSchema()

top_10_prices_df = sdf \
    .groupBy(window(col("event_time"), "10 seconds"),"Address","City", "Price","Availability") \
    .agg(F.max("event_time").alias("event_time")) \
    .where((col("Price") <= price_threshold) & (col("Availability") == True)) \
    .orderBy("Price", ascending=False)
 
top_10_prices_df = top_10_prices_df.dropDuplicates(["Address", "Price"])


def my_foreach_batch_function(df, batch_id):
    
    df.show()
    df.write.format('bigquery') \
      .option('table', 'degroup11.group11dataset.house_pricing_kafka') \
      .mode("append") \
      .save()

query = top_10_prices_df.writeStream.outputMode("complete") \
                    .trigger(processingTime = '2 seconds').foreachBatch(my_foreach_batch_function).start()

try:
    query.awaitTermination()
except KeyboardInterrupt:
    query.stop()
    # Stop the spark context
    spark.stop()
    print("Stopped the streaming query and the spark context")

root
 |-- from_json(value): struct (nullable = true)
 |    |-- Address: string (nullable = true)
 |    |-- City: string (nullable = true)
 |    |-- Price: integer (nullable = true)
 |    |-- Lot_size: string (nullable = true)
 |    |-- Living_space_size: string (nullable = true)
 |    |-- Build_year: string (nullable = true)
 |    |-- Build_type: string (nullable = true)
 |    |-- House_type: string (nullable = true)
 |    |-- Roof: string (nullable = true)
 |    |-- Rooms: string (nullable = true)
 |    |-- Toilet: string (nullable = true)
 |    |-- Floors: string (nullable = true)
 |    |-- Energy_label: string (nullable = true)
 |    |-- Position: string (nullable = true)
 |    |-- Garden: string (nullable = true)
 |    |-- Estimated_neighbourhood_price_per: string (nullable = true)
 |    |-- Availability: boolean (nullable = true)
 |    |-- event_time: timestamp (nullable = true)

root
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Price: integer (n

In [6]:
spark.stop()