In [39]:
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark.sql.types import StructType, StructField, LongType, StringType, DoubleType
from time import sleep
from pyspark.sql.functions import col, size, length, lit


# Cluster and session configuration. SparkConf setters return the conf itself,
# so the whole configuration is expressed as one chain.
sparkConf = (
    SparkConf()
    .setMaster("spark://spark-master:7077")
    .setAppName("airbnb_reviews")
    .setAll([
        ("spark.driver.memory", "2g"),
        ("spark.executor.cores", "1"),
        ("spark.driver.cores", "1"),
        # Session-wide default checkpoint root for streaming queries that do not set their own.
        ("spark.sql.streaming.checkpointLocation", '/home/jovyan/checkpoint'),
    ])
)

# Entry point to the Spark SQL engine (getOrCreate returns an already-active session if there is one).
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()


# Schema for the streamed review CSV files. Fields are listed in the order the
# CSV columns are expected to appear; every field is nullable.
dataSchema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("index", LongType()),
        ("Renter_ID", LongType()),
        ("Listing_ID", LongType()),
        ("Review", StringType()),
        ("Date", StringType()),
    )
])


# Register the Google Cloud Storage connector implementations for the "gs" scheme
# used by every gs:// path below.
conf = spark.sparkContext._jsc.hadoopConfiguration()
for hadoop_key, impl_class in (
    ("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem"),
    ("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS"),
):
    conf.set(hadoop_key, impl_class)

# Read from source: stream the review CSVs (header row skipped), at most one
# new file per micro-batch.
sdf = (
    spark.readStream
    .schema(dataSchema)
    .option("maxFilesPerTrigger", 1)
    .option("header", "true")
    .csv("gs://assignment2_airbnb/airbnb_data/Reviews")
)

# Upper bound (exclusive) on accepted Listing_ID values.
# NOTE(review): the reason for this cutoff is not stated in the notebook — confirm
# (possibly to drop malformed / oversized IDs).
MAX_LISTING_ID = 10000000000

# Preprocessing: keep only the columns used downstream, drop reviews without text,
# and keep rows whose Listing_ID is below the cutoff (a null Listing_ID makes the
# comparison null, so those rows are dropped as well).
reviews = (
    sdf.select("Renter_ID", "Listing_ID", "Review", "Date")
    .where(col("Review").isNotNull())
    .where(col("Listing_ID") < MAX_LISTING_ID)
)
# Add the city of each listing to the review table.

listings_amsterdam_file_path = 'gs://assignment2_airbnb/airbnb_data/Amsterdam/listings_a.csv'
listings_rotterdam_file_path = 'gs://assignment2_airbnb/airbnb_data/Rotterdam/listings_r.csv'
listings_denhaag_file_path = 'gs://assignment2_airbnb/airbnb_data/DenHaag/listings_dh.csv'


def _read_city_listings(path, city):
    """Read one city's listings CSV (with header row) and tag every row with a City column.

    No schema or inferSchema is given, so all columns come back as strings
    (Spark's CSV default).

    Parameters
    ----------
    path : str
        Location of the listings CSV file.
    city : str
        Literal value stored in the added ``City`` column.

    Returns
    -------
    pyspark.sql.DataFrame
        All columns of the file plus ``City``.
    """
    return (
        spark.read.format("csv").option("header", "true").load(path)
        .withColumn("City", lit(city))
    )


def _listing_city(listings):
    """Project a listings frame to exactly (Listing_ID as long, City), in that order."""
    return listings.select(col("Listing_ID").cast(LongType()).alias("Listing_ID"), col("City"))


df_listings_amsterdam = _read_city_listings(listings_amsterdam_file_path, 'Amsterdam')
df_listings_rotterdam = _read_city_listings(listings_rotterdam_file_path, 'Rotterdam')
df_listings_denhaag = _read_city_listings(listings_denhaag_file_path, 'Den Haag')

# union() resolves columns by POSITION, not name. Unioning the full listing files
# would silently misalign columns if their order differed between cities, so each
# frame is first reduced to the same two named columns. Listing_ID is cast to long
# so the join key has the same type as the review stream's LongType Listing_ID.
df_listings_unfiltered = (
    _listing_city(df_listings_amsterdam)
    .union(_listing_city(df_listings_rotterdam))
    .union(_listing_city(df_listings_denhaag))
)

# Stream-static left outer join: reviews whose listing is in none of the files keep City = null.
reviews = (
    reviews.join(df_listings_unfiltered, reviews.Listing_ID == df_listings_unfiltered.Listing_ID, 'left_outer')
    .select(reviews.Renter_ID, reviews.Listing_ID, reviews.Review, reviews.Date, df_listings_unfiltered.City)
)



# Write to bucket: append each micro-batch of enriched reviews as CSV files (with header).
#
# queryName pins the checkpoint sub-directory under the session-wide
# spark.sql.streaming.checkpointLocation. Without it Spark generates a random
# sub-directory on every start, so a restarted query would restart at batch 0.
# The file sink would then skip batches that its output-side _spark_metadata log
# already records as committed, silently dropping data.
# Note: starting this cell again while the previous query is still active raises
# an error instead of launching a second concurrent writer to the same path.
activityQuery = (
    reviews.writeStream
    .queryName("airbnb_reviews")
    .format("csv")
    .outputMode("append")
    .option("path", "gs://assignment2_airbnb/Results_stream")
    .option("header", "true")
    .start()
)


In [38]:
# Stop every streaming query still running in this session (e.g. activityQuery)
# before shutting the session down, so they terminate through their own stop path
# instead of having the SparkContext torn down underneath them.
for active_query in spark.streams.active:
    active_query.stop()
spark.stop()
