# Configuring Spark, PostgreSQL and MongoDB

##### 1. PySpark Imports

In [1]:
import getpass
import os

import psycopg2
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql import Window

##### 2. Configuring SparkSession for PostgreSQL and MongoDB

In [2]:
# Build (or reuse) the SparkSession used by every cell below.
#   * spark.jars.packages pulls in the MongoDB Spark connector by Maven coordinates.
#   * spark.jars adds the PostgreSQL JDBC driver jar from the local filesystem.
# The jar location is machine-specific, so it can be overridden with the
# POSTGRES_JDBC_JAR environment variable; the original path stays as the
# fallback so existing setups keep working without any change.
postgres_jdbc_jar = os.environ.get(
    "POSTGRES_JDBC_JAR",
    "/Users/deependrashekhawat/jars/postgresql-42.2.21.jar",
)

my_spark = (
    SparkSession.builder
    .appName("restaurant")
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1")
    .config("spark.jars", postgres_jdbc_jar)
    .getOrCreate()
)

# Processing Restaurant Data from PostgreSQL DB

##### 1. Creating cursor and executing SQL query to fetch all the restaurant data

In [3]:
# Open the PostgreSQL connection and a cursor for the restaurant query.
# Connection settings come from the standard libpq-style environment variables
# (PGHOST, PGDATABASE, PGUSER, PGPORT), falling back to the values this
# notebook was originally written against.
# The password is deliberately NOT kept in the notebook: it is read from
# PGPASSWORD, and if that is unset the user is prompted for it instead.
conn = psycopg2.connect(
    host=os.environ.get("PGHOST", "localhost"),
    database=os.environ.get("PGDATABASE", "restaurant"),
    user=os.environ.get("PGUSER", "postgres"),
    password=os.environ.get("PGPASSWORD") or getpass.getpass("PostgreSQL password: "),
    port=int(os.environ.get("PGPORT", "5436")),
)
curr = conn.cursor()

In [4]:
# Run the join across restaurantcuisine, restaurants, address and cuisines and
# load every resulting row into memory as a list of tuples.
# The cursor and the connection are released in `finally`, whether or not the
# query succeeds, so no idle PostgreSQL session is left open for the rest of
# the notebook. Re-running this cell therefore requires re-running the
# connection cell first.
try:
    curr.execute("""
SELECT rs.restaurant_id, restaurant_name, street, city, state, postal_code, latitude, longitude, stars, review_count, cuisine_name
FROM restaurantcuisine rs
JOIN restaurants r ON (rs.restaurant_id = r.restaurant_id)
JOIN address a ON (r.address_id = a.address_id)
JOIN cuisines c ON (rs.cuisine_id = c.cuisine_id);
""")
    restaurantCuisineRaw = curr.fetchall()
finally:
    curr.close()
    conn.close()

##### 2. Creating a Raw DataFrame

In [5]:
# Column labels for the fetched tuples, listed in the same order as the
# SELECT list of the restaurant query so each position lines up with its field.
columns = [
    "restaurant_id",
    "restaurant_name",
    "street",
    "city",
    "state",
    "postal_code",
    "latitude",
    "longitude",
    "stars",
    "review_count",
    "cuisine_name",
]

# Only column names are supplied as the schema, so Spark infers the column
# types from the row values.
dfRestCusRaw = my_spark.createDataFrame(restaurantCuisineRaw, schema=columns)

##### 3. Processing Restaurant data to fetch top 10 for each cuisine based on City and State.

In [6]:
# Rank restaurants inside each (city, state, cuisine_name) partition: highest
# stars first, then highest review_count. restaurant_id is the final sort key
# so that rows tied on both stars and review_count always receive the same
# row numbers; without it the restaurants kept at the cut-off could change
# from one run to the next.
window_spec = Window.partitionBy("city", "state", "cuisine_name").orderBy(
    F.col("stars").desc(),
    F.col("review_count").desc(),
    F.col("restaurant_id"),
)

# Maximum number of restaurants kept per (city, state, cuisine_name) partition.
max_number_of_rows_per_partition = 10

# Number the rows per partition, keep the first N, then drop the helper column.
dfRestCusProcessed = (
    dfRestCusRaw
    .withColumn("row_number", F.row_number().over(window_spec))
    .filter(F.col("row_number") <= max_number_of_rows_per_partition)
    .drop("row_number")
)

# City, State and Cuisine based Top 10 Restaurant Recommendation

In [7]:
# Fields stored for each restaurant inside the per-group "restaurants" array.
restaurant_fields = [
    "restaurant_id", "restaurant_name", "street", "city", "state",
    "postal_code", "latitude", "longitude", "stars", "review_count", "cuisine_name",
]

# collect_list gives no ordering guarantee after the groupBy shuffle, so the
# ranking would be lost if the structs were collected as-is. Each restaurant is
# therefore wrapped in a helper struct whose leading fields are the sort keys;
# sort_array compares structs field by field, so sorting descending orders the
# array by stars, then review_count (remaining ties fall back to the
# restaurant struct itself, which keeps the result deterministic).
ranked_restaurant = F.struct(
    F.col("stars").alias("rank_stars"),
    F.col("review_count").alias("rank_review_count"),
    F.struct(*restaurant_fields).alias("restaurant"),
)

# One row per (city, state, cuisine_name) with a "restaurants" array of structs.
# Selecting `ranked.restaurant` on the array of helper structs yields the array
# of inner restaurant structs, so the output columns are the same as before:
# city, state, cuisine_name, restaurants.
dfCityCusRest = (
    dfRestCusProcessed
    .groupBy("city", "state", "cuisine_name")
    .agg(F.sort_array(F.collect_list(ranked_restaurant), asc=False).alias("ranked"))
    .withColumn("restaurants", F.col("ranked.restaurant"))
    .drop("ranked")
)

In [9]:
# Save the per-(city, state, cuisine) restaurant lists to the
# hungryApp.topcityrestaurants MongoDB collection, using "overwrite" save mode.
(
    dfCityCusRest.write
    .format("com.mongodb.spark.sql.DefaultSource")
    .mode("overwrite")
    .option("spark.mongodb.output.uri", "mongodb://localhost:27017/hungryApp.topcityrestaurants")
    .save()
)

# User preference based Restaurant Recommendation

##### 1. Reading User data from MongoDB

In [10]:
# Load the hungryApp.users collection from MongoDB through the Spark connector.
dfUserRaw = (
    my_spark.read
    .format("com.mongodb.spark.sql.DefaultSource")
    .option("spark.mongodb.input.uri", "mongodb://localhost:27017/hungryApp.users")
    .load()
)

# One output row per entry in a user's `preferences`. The exploded column is
# left un-aliased, so it carries Spark's default name `col`, which the join
# cell below selects by that name.
dfUser = dfUserRaw.select(
    F.col("_id"),
    F.col("city"),
    F.col("state"),
    F.explode(F.col("preferences")),
)

##### 2. Joining User and City Cuisine Restaurant DataFrames

In [11]:
# Normalise the user-side join keys: city and the exploded preference (`col`)
# are converted with initcap, and the preference is renamed to cuisine_name.
# NOTE(review): this presumes the restaurant side stores city and cuisine_name
# in the same capitalisation; confirm against the PostgreSQL data.
user_cuisines = dfUser.select(
    F.col("_id"),
    F.initcap(F.col("city")).alias("city"),
    F.col("state"),
    F.initcap(F.col("col")).alias("cuisine_name"),
)

# Left join keeps every user preference row, even when no restaurant list
# exists for that (cuisine_name, city, state) combination.
dfUserJoinCityCusRest = user_cuisines.join(
    dfCityCusRest,
    on=["cuisine_name", "city", "state"],
    how="left",
)

##### 3. Writing top 10 restaurant recommendation for a user to MongoDB

In [12]:
# Fold each user's rows into a single document: `cuisines` is an array of
# {cuisine_name, restaurants} structs, one per preference row of that user.
# The result is saved to hungryApp.restaurantrecommendations with "overwrite"
# save mode.
(
    dfUserJoinCityCusRest
    .groupBy("_id")
    .agg(
        F.collect_list(
            F.struct(F.col("cuisine_name"), F.col("restaurants"))
        ).alias("cuisines")
    )
    .write
    .format("com.mongodb.spark.sql.DefaultSource")
    .mode("overwrite")
    .option("spark.mongodb.output.uri", "mongodb://localhost:27017/hungryApp.restaurantrecommendations")
    .save()
)

# Creating State, City and Cuisine collection for User

##### 1. Writing state values to MongoDB

In [13]:
# Gather the distinct state values into a single `states` array (global
# aggregation, no grouping keys) and save it to hungryApp.states with
# "overwrite" save mode.
(
    dfCityCusRest
    .select(F.col("state"))
    .distinct()
    .agg(F.collect_list(F.col("state")).alias("states"))
    .write
    .format("com.mongodb.spark.sql.DefaultSource")
    .mode("overwrite")
    .option("spark.mongodb.output.uri", "mongodb://localhost:27017/hungryApp.states")
    .save()
)

##### 2. Writing cities for all states to MongoDB

In [14]:
# For every state, gather its distinct cities into a `cities` array and save
# the result to hungryApp.cities with "overwrite" save mode.
(
    dfCityCusRest
    .select(F.col("city"), F.col("state"))
    .dropDuplicates()
    .groupBy("state")
    .agg(F.collect_list(F.col("city")).alias("cities"))
    .write
    .format("com.mongodb.spark.sql.DefaultSource")
    .mode("overwrite")
    .option("spark.mongodb.output.uri", "mongodb://localhost:27017/hungryApp.cities")
    .save()
)

##### 3. Writing Cuisines to MongoDB

In [15]:
# For every (city, state) pair, gather the cuisine names into a `cuisines`
# array and save the result to hungryApp.cuisines with "overwrite" save mode.
# No de-duplication step is applied here: dfCityCusRest is already grouped by
# (city, state, cuisine_name).
(
    dfCityCusRest
    .select(F.col("city"), F.col("state"), F.col("cuisine_name"))
    .groupBy("city", "state")
    .agg(F.collect_list(F.col("cuisine_name")).alias("cuisines"))
    .write
    .format("com.mongodb.spark.sql.DefaultSource")
    .mode("overwrite")
    .option("spark.mongodb.output.uri", "mongodb://localhost:27017/hungryApp.cuisines")
    .save()
)