## Merge the Kafka dataset for the relevant individual with their available houses and write the result to a final BigQuery table

In [1]:
import sys
!{sys.executable} -m pip install -q --upgrade pip
!{sys.executable} -m pip install -q google-cloud-bigquery

## Initialize Spark

In [38]:
from pyspark.sql import SparkSession
from pyspark import SparkConf

# Cluster address, application name and driver/executor resources for this
# session. SparkConf setters return the conf itself, so they chain.
sparkConf = (
    SparkConf()
    .setMaster("spark://spark-master:7077")
    .setAppName("BigqueryExample")
    .set("spark.driver.memory", "2g")
    .set("spark.executor.cores", "1")
    .set("spark.driver.cores", "1")
)

# SparkSession is the entry point to the Spark SQL engine; getOrCreate returns
# an existing session if one is already running in this kernel.
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()

# Register the GCS connector classes as the Hadoop filesystem for gs:// paths.
conf = spark.sparkContext._jsc.hadoopConfiguration()
gcs_filesystem_impls = {
    "fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
    "fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
}
for hadoop_key, impl_class in gcs_filesystem_impls.items():
    conf.set(hadoop_key, impl_class)

## Load dataframes

In [39]:
# Load data from BigQuery. Table ids have the form project_id.dataset.table_name
# and are written without surrounding whitespace, so they do not depend on
# the connector trimming its input.
df_kafka = (
    spark.read
    .format("bigquery")
    .load("degroup11.group11dataset.house_pricing_kafka")
    # Drop "window" so the Kafka schema matches df_batch's schema
    # (Address, City, Price, Availability, event_time) for the union below.
    .drop("window")
)
df_kafka.printSchema()

df_batch = (
    spark.read
    .format("bigquery")
    .load("degroup11.group11dataset.available_houses_for_individual")
)
df_batch.printSchema()


root
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Price: long (nullable = true)
 |-- Availability: boolean (nullable = true)
 |-- event_time: timestamp (nullable = true)

root
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Price: long (nullable = true)
 |-- Availability: boolean (nullable = true)
 |-- event_time: timestamp (nullable = true)

825276


In [40]:
# Number of most expensive houses kept for the final table.
TOP_N = 500

df_combined_sorted = (
    # unionByName matches columns by name instead of position, so a column
    # reorder in either source table cannot shuffle values between fields.
    df_batch.unionByName(df_kafka)
    # Keep one row per (Address, Price); rows can repeat across the two sources.
    # Which of the duplicate rows survives is not specified by Spark.
    .dropDuplicates(["Address", "Price"])
    # Address breaks ties on Price so the TOP_N cut-off below is deterministic
    # (many houses share the same price).
    .orderBy("Price", "Address", ascending=[False, True])
)
df_combined_sorted.show()
df_combined = df_combined_sorted.limit(TOP_N)



+--------------------+------------------+------+------------+--------------------+
|             Address|              City| Price|Availability|          event_time|
+--------------------+------------------+------+------------+--------------------+
|       Aekingaweg 12|         Appelscha|825000|        true|2023-12-05 12:01:...|
|        Bikkeldam 24|           Horssen|825000|        true|2023-12-05 12:30:...|
|   Bodenheimstraat 5|               Ede|825000|        true|2023-12-05 12:35:...|
|Distelvlinderstra...|          Aalsmeer|825000|        true|2023-12-05 13:25:...|
|J.W. van Puttestr...|            Ameide|825000|        true|2023-12-05 14:54:...|
|  Jos Colerstraat 10|         Rotterdam|825000|        true|2023-12-05 15:05:...|
|        Lage Maat 11|Wijk bij Duurstede|825000|        true|2023-12-05 15:44:...|
|      Lindestraat 10|    St. Willebrord|825000|        true|2023-12-05 15:57:...|
|Maerten van Heems...|         Beverwijk|825000|        true|2023-12-05 16:05:...|
|   

## Enrich the combined DataFrame with the full house details so that Looker Studio can use it directly, without needing blended data

In [41]:
# Full house details used to enrich the top-houses selection.
# Table id is project_id.dataset.table_name (use your own project id), written
# without surrounding whitespace.
df_housing = (
    spark.read
    .format("bigquery")
    .load("degroup11.group11dataset.house_pricing")
)

In [48]:
from pyspark.sql.functions import col, desc

# A housing row matches when all five columns of df_combined are equal.
join_keys = ["Address", "City", "Price", "Availability", "event_time"]
match_condition = df_combined[join_keys[0]] == df_housing[join_keys[0]]
for join_key in join_keys[1:]:
    match_condition = match_condition & (df_combined[join_key] == df_housing[join_key])

# leftsemi keeps only df_housing's rows (and only its columns) that have a
# match in df_combined, so the result carries the full house details.
df_combined_enriched = (
    df_housing
    .join(df_combined, match_condition, "leftsemi")
    .orderBy(col("Price").desc())
)
df_combined_enriched.printSchema()
df_combined_enriched.count()

root
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Price: long (nullable = true)
 |-- Lot_size: string (nullable = true)
 |-- Living_space_size: string (nullable = true)
 |-- Build_year: string (nullable = true)
 |-- Build_type: string (nullable = true)
 |-- House_type: string (nullable = true)
 |-- Roof: string (nullable = true)
 |-- Rooms: string (nullable = true)
 |-- Toilet: string (nullable = true)
 |-- Floors: string (nullable = true)
 |-- Energy_label: string (nullable = true)
 |-- Position: string (nullable = true)
 |-- Garden: string (nullable = true)
 |-- Estimated_neighbourhood_price_per: double (nullable = true)
 |-- Availability: boolean (nullable = true)
 |-- event_time: timestamp (nullable = true)


500

## Write the merged DataFrame of the top available houses to BigQuery

In [None]:
# Replace the top_houses table with the enriched selection. temporaryGcsBucket
# names the GCS bucket the BigQuery connector uses to stage the written data.
(
    df_combined_enriched.write
    .format("bigquery")
    .option('table', "degroup11.group11dataset.top_houses")
    .option("temporaryGcsBucket", "temp_degroup11")
    .mode("overwrite")
    .save()
)

In [None]:
from google.cloud import bigquery

# BigQuery client for the project that owns the session tables.
client = bigquery.Client(project="degroup11")

# Session tables that are no longer needed: the temporary Kafka table, the
# temporary batch table, and the cookie table (the session is over).
# After this cell the load cells above cannot be re-run until these input
# tables are recreated upstream.
session_tables = (
    "degroup11.group11dataset.house_pricing_kafka",
    "degroup11.group11dataset.available_houses_for_individual",
    "degroup11.group11dataset.cookie_ID_houses",
)
for table_id in session_tables:
    # not_found_ok=True: a table that is already gone is not an error.
    client.delete_table(table_id, not_found_ok=True)