# Big Data source - Airbnb website
We assume that our big data source is the Airbnb website, which continuously produces new reviews, new listings, and new calendar data (the booking plan for the following year). We can therefore model it as a streaming source.

We can **handle** this streaming source with Spark Structured Streaming, which we connected to a MongoDB database through the MongoDB Spark connector (presented in the dedicated notebook).

This notebook contains a simulation of the streaming source, built with `SparkSession.readStream`. We streamed the CSV files (listings.csv, reviews.csv, ...) from a directory, specifying the schema with a `StructType`. In a real-world application we would receive the stream from Kafka or directly from a TCP socket, but Spark Structured Streaming lets us handle those sources in essentially the same way.
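To make the directory behave like a live source, new files have to appear in it over time. The following is a minimal sketch of one way to do that, not the method used in this notebook: it splits a large CSV into small files and places them one by one in the watched directory. The function name `feed_streaming_dir` and its parameters are hypothetical. Each chunk is first written under a hidden name (leading dot) and then renamed, on the assumption that the file source skips hidden files and should only see complete files.

```python
import csv
import time
from pathlib import Path


def feed_streaming_dir(source_csv, target_dir, rows_per_file=1000, delay_seconds=0.0):
    """Split source_csv into small CSV files written one by one into target_dir.

    Every chunk repeats the header row, so each file can be parsed on its own.
    Returns the list of files written, in order.
    """
    target = Path(target_dir)
    target.mkdir(parents=True, exist_ok=True)
    stem = Path(source_csv).stem
    written = []

    def flush(header, rows):
        final_path = target / f"{stem}_{len(written):05d}.csv"
        # Write under a hidden temporary name, then rename, so that a reader
        # watching the directory only ever sees complete files (assumption:
        # the reader ignores names starting with a dot).
        tmp_path = target / f".{final_path.name}.tmp"
        with open(tmp_path, "w", newline="", encoding="utf-8") as out:
            writer = csv.writer(out)
            writer.writerow(header)
            writer.writerows(rows)
        tmp_path.rename(final_path)
        written.append(final_path)
        time.sleep(delay_seconds)  # pause between files to mimic arrival over time

    with open(source_csv, newline="", encoding="utf-8") as src:
        reader = csv.reader(src)
        header = next(reader)
        chunk = []
        for row in reader:
            chunk.append(row)
            if len(chunk) == rows_per_file:
                flush(header, chunk)
                chunk = []
        if chunk:  # leftover rows that did not fill a whole chunk
            flush(header, chunk)
    return written
```

For example, `feed_streaming_dir("reviews.csv", "./Florence/Streaming_Simulation/", rows_per_file=500, delay_seconds=5)` would drop a new 500-row file roughly every five seconds; the source file name and the numbers here are illustrative.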

In [None]:
import findspark
findspark.init()
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql import Row

# specify the app name
appName = "streaming_pyspark"

# the virtual machine has 2 cores enabled, so we run Spark locally
# with 2 worker threads (local[2])
master = "local[2]"

After importing the necessary modules and initializing Spark (with `findspark.init()`), we created a SparkSession connected to MongoDB.

In [None]:
input_uri = "mongodb+srv://analytics:analytics-password@mflix.ryqp8qp.mongodb.net/?retryWrites=true&w=majority"
output_uri = "mongodb+srv://analytics:analytics-password@mflix.ryqp8qp.mongodb.net/?retryWrites=true&w=majority"

# build the session with the app name and master defined above;
# it is called `spark` because the following cells refer to it by that name
spark = SparkSession\
    .builder\
    .appName(appName)\
    .master(master)\
    .config("spark.mongodb.input.uri", input_uri)\
    .config("spark.mongodb.output.uri", output_uri)\
    .config('spark.jars.packages','org.mongodb.spark:mongo-spark-connector_2.12:2.4.2')\
    .getOrCreate()

# Simulation of the streaming source
This is where the actual simulation begins: we defined the schema, read the stream from the CSV files, and checked that the resulting DataFrame reports `isStreaming` as `True`.

We also printed part of the stream to the console to check that the files were being read correctly. Then we wrote the stream to MongoDB.

In [None]:
from pyspark.sql.types import FloatType, StructField, StructType, StringType, DateType, IntegerType

# specify the schema explicitly: we are streaming the csv file, not loading it,
# so Spark cannot scan the whole file to infer the schema
reviewsSchema = StructType([
    StructField("listing_id", IntegerType(), True),
    StructField("id", IntegerType(), True),
    StructField("date", DateType(), True),
    StructField("reviewer_id", IntegerType(), True),
    StructField("reviewer_name", StringType(), True),
    StructField("comment", StringType(), True)
])

# read the reviews csv files as a stream (readStream instead of read),
# picking up at most one new file per trigger
reviews_streamed = spark.readStream\
    .schema(reviewsSchema)\
    .option("maxFilesPerTrigger", 1)\
    .csv("./Florence/Streaming_Simulation/", header=True)

# check that the DataFrame is a streaming one
print(reviews_streamed.isStreaming)

True


In [None]:
# write stream on the console, to see if it works
reviews_streamed.writeStream.format("console").outputMode("append").start()  # IT DOES! :)

# simple group by: number of reviews per date
dfc = reviews_streamed.groupby("date").count()
# dfc.writeStream.outputMode("complete").format("console").start()


22/09/17 17:20:23 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-e35afd63-5b8d-4baa-9197-e19b5b8186d1. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
22/09/17 17:20:24 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
22/09/17 17:20:28 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: listing_id, id, date, reviewer_id, reviewer_name, comments
 Schema: listing_id, _id, date, reviewer_id, reviewer_name, comment
Expected: _id but found: id
CSV file: file:///home/dsbda/DS_project/Florence/Streaming_Simulation/reviews.csv
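The `CSVHeaderChecker` warning above reports that the names in the CSV header and the names in the schema do not line up. A small pre-flight check can surface such mismatches before the stream is started. The helper below is a hypothetical sketch (`header_mismatches` is not part of Spark) that compares the first line of a CSV file with a list of expected column names, position by position.

```python
import csv


def header_mismatches(csv_path, schema_names):
    """Return (position, expected, found) for every column whose name differs.

    A missing column on either side is reported with None in its place.
    """
    with open(csv_path, newline="", encoding="utf-8") as f:
        header = [name.strip() for name in next(csv.reader(f))]
    mismatches = []
    for i in range(max(len(header), len(schema_names))):
        expected = schema_names[i] if i < len(schema_names) else None
        found = header[i] if i < len(header) else None
        if expected != found:
            mismatches.append((i, expected, found))
    return mismatches


# With a PySpark schema, the expected names can be taken from
# StructType.fieldNames(), e.g.:
# header_mismatches("reviews.csv", reviewsSchema.fieldNames())
```

An empty list means header and schema agree. With the header and schema printed in the warning above, the check would flag positions 1 (`_id` vs `id`) and 5 (`comment` vs `comments`).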


In [None]:
reviews_streamed.writeStream.format("com.mongodb.spark.sql.DefaultSource")\
.option("uri", input_uri)\
.option("database", "Florence")\
.option("collection", "reviews")\
.outputMode("append").start()
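Whether a connector accepts a streaming write directly depends on its version. An alternative that relies only on the batch writer is `foreachBatch` (available in Spark 2.4 and later), which hands every micro-batch to a callback as a regular DataFrame. The sketch below shows that pattern under stated assumptions: `make_mongo_batch_writer` is a hypothetical helper, and the checkpoint path in the usage comment is illustrative.

```python
def make_mongo_batch_writer(uri, database, collection):
    """Build a foreachBatch callback that appends each micro-batch to MongoDB."""

    def write_batch(batch_df, batch_id):
        # batch_df is a regular (non-streaming) DataFrame, so the ordinary
        # batch writer of the connector can be used here.
        (batch_df.write
            .format("com.mongodb.spark.sql.DefaultSource")
            .mode("append")
            .option("uri", uri)
            .option("database", database)
            .option("collection", collection)
            .save())

    return write_batch


# Hypothetical usage with the objects defined in this notebook:
# query = (reviews_streamed.writeStream
#          .foreachBatch(make_mongo_batch_writer(input_uri, "Florence", "reviews"))
#          .option("checkpointLocation", "/tmp/reviews_checkpoint")  # assumed path
#          .start())
```

Setting an explicit checkpoint location lets the query resume from where it stopped after a restart, instead of relying on the temporary checkpoint directory mentioned in the warnings above.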

# MongoDB queries and long term storage on Hive databases
Now we can use MongoDB to perform fast, real-time queries on the newly imported data.

We can also periodically (e.g. every month) transfer all new data to HDFS as Hive tables, in order to build a data warehouse for long-term storage and analysis of the entire dataset.
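The periodic transfer could look roughly like the sketch below, which is not code from this project. It assumes a SparkSession created with Hive support enabled (`enableHiveSupport()`), the same connector format used in this notebook, an existing Hive database, and a `date` field on each review. The function names and the table name `florence.reviews` are hypothetical.

```python
from datetime import date


def month_bounds(year, month):
    """Return (first day of the month, first day of the following month)."""
    start = date(year, month, 1)
    end = date(year + 1, 1, 1) if month == 12 else date(year, month + 1, 1)
    return start, end


def export_month_to_hive(spark, uri, year, month, table="florence.reviews"):
    """Append one month of reviews from MongoDB to a Hive table.

    Assumes `spark` was built with Hive support and that the target
    Hive database already exists.
    """
    start, end = month_bounds(year, month)
    reviews = (spark.read
               .format("com.mongodb.spark.sql.DefaultSource")
               .option("uri", uri)
               .option("database", "Florence")
               .option("collection", "reviews")
               .load())
    # Keep only the reviews dated within [start, end)
    monthly = reviews.filter(
        (reviews["date"] >= start.isoformat()) & (reviews["date"] < end.isoformat())
    )
    monthly.write.mode("append").saveAsTable(table)
```

Using a half-open interval (start included, end excluded) keeps consecutive monthly runs from copying the same review twice.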

