# Import Modules

In [2]:
# Regular modules
import nltk
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
import pyspark
import matplotlib.pyplot as plt
from wordcloud import WordCloud
import pandas as pd
import re 
import string
%matplotlib inline
import keras
import folium


#spark sql imports
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Row
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import *
from pyspark.sql.functions import unix_timestamp, from_unixtime, to_timestamp, col, round, month, year, udf, date_format, to_date, datediff, lower


#spark ML imports
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer, Word2Vec, OneHotEncoder, StringIndexer, OneHotEncoderEstimator, RFormula
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
import bigdl
from pyspark.mllib.classification import LogisticRegressionWithLBFGS,SVMWithSGD
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.evaluation import BinaryClassificationMetrics, MulticlassMetrics
#!conda install systemml keras tensorflow 
#from systemml.mllearn import Keras2DML
#!conda update -n base -c defaults conda
#!conda install -c johnsnowlabs spark-nlp
#!y
from sparknlp.base import *
from sparknlp.annotator import *
from sparknlp.embeddings import *


#change configuration settings on Spark 
spark = SparkSession.builder.master('yarn-client').appName("local[*]").getOrCreate()
conf = spark.sparkContext._conf.setAll([("spark.sql.crossJoin.enabled", "true"),('spark.executor.memory', '8g'), ('spark.app.name', 'Spark Updated Conf'), ('spark.executor.cores', '32'), ('spark.cores.max', '32'), ('spark.driver.memory','15g'),("spark.jars.packages", "JohnSnowLabs:spark-nlp:2.1.0")])
sqlContext = pyspark.SQLContext(sparkContext=spark.sparkContext, sparkSession=spark)

In [3]:
df_income = spark.read.csv('data/project/kaggle_income.csv',header=True, inferSchema="true")

In [4]:
df_income.printSchema()

root
 |-- id: integer (nullable = true)
 |-- State_Code: integer (nullable = true)
 |-- State_Name: string (nullable = true)
 |-- State_ab: string (nullable = true)
 |-- County: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Place: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- Primary: string (nullable = true)
 |-- Zip_Code: integer (nullable = true)
 |-- Area_Code: string (nullable = true)
 |-- ALand: long (nullable = true)
 |-- AWater: long (nullable = true)
 |-- Lat: double (nullable = true)
 |-- Lon: double (nullable = true)
 |-- Mean: integer (nullable = true)
 |-- Median: integer (nullable = true)
 |-- Stdev: integer (nullable = true)
 |-- sum_w: double (nullable = true)



### Where are the least/most wealthiest places to live?

In [5]:
df_income2 = df_income.groupby(["State_Name","City"]).agg({"Median":"avg"})
df_income2 = df_income2.withColumn("avg(Median)", round(df_income2["avg(Median)"],2).alias("median"))
df_income2.sort(col("avg(Median)").desc()).show(5)
df_income2.sort(col("avg(Median)").asc()).show(5)

+-----------+-----------+-----------+
| State_Name|       City|avg(Median)|
+-----------+-----------+-----------+
|Connecticut|Marlborough|   300000.0|
|   Missouri|    Buffalo|   300000.0|
|    Indiana|  Greentown|   300000.0|
|   Kentucky|    Hickman|   300000.0|
|   Colorado| Wellington|   300000.0|
+-----------+-----------+-----------+
only showing top 5 rows

+--------------+----------+-----------+
|    State_Name|      City|avg(Median)|
+--------------+----------+-----------+
|         Texas|   Anthony|        0.0|
|North Carolina|     Cosby|        0.0|
|        Hawaii|Lanai City|        0.0|
|      Michigan| Dansville|        0.0|
|       Alabama|   Vincent|        0.0|
+--------------+----------+-----------+
only showing top 5 rows



# Find Average income by state and city

In [6]:
df_income3 = df_income.groupby(["City","Zip_Code"]).agg({"Median":"avg"}).alias("median1")
df_income3 = df_income2.select(col("City").alias("City_match"), col("avg(Median)").alias("Median_match"))
df_income4 = df_income2.join(df_income3, df_income2.City== df_income3.City_match, how="left")
df_income5 = df_income4.groupby(["State_Name","City"]).agg({"avg(Median)":"Mean"})
df_income5.show(20)

+-----------+---------------+----------------+
| State_Name|           City|avg(avg(Median))|
+-----------+---------------+----------------+
| California|     San Marino|        171149.5|
| California|    Yorba Linda|        300000.0|
| California|         Orange|       100843.75|
|   Colorado|      Kremmling|        300000.0|
|   Colorado|    Pueblo West|        112403.5|
|   Colorado|           Pine|        194713.5|
|Connecticut|    Glastonbury|        300000.0|
|Connecticut|      Waterbury|         63470.7|
|    Florida|           Alva|         57012.0|
|    Florida|  South Daytona|         37519.5|
|    Georgia|      Morganton|         55045.0|
|    Georgia|          Perry|         64217.0|
|      Idaho|         Salmon|         24559.0|
|      Idaho|   Soda Springs|         62716.0|
|      Idaho|        Heyburn|         47037.0|
|   Illinois|Round Lake Park|         43465.0|
|   Illinois|  East St Louis|         26002.0|
|   Illinois|        Swansea|         35622.0|
|    Indiana|

In [7]:
df_income2.show(200)

+--------------+-------------------+-----------+
|    State_Name|               City|avg(Median)|
+--------------+-------------------+-----------+
|    California|         San Marino|   171149.5|
|    California|        Yorba Linda|   300000.0|
|    California|             Orange|  100843.75|
|      Colorado|          Kremmling|   300000.0|
|      Colorado|        Pueblo West|   112403.5|
|      Colorado|               Pine|   194713.5|
|   Connecticut|        Glastonbury|   300000.0|
|   Connecticut|          Waterbury|    63470.7|
|       Florida|               Alva|    57012.0|
|       Florida|      South Daytona|    37519.5|
|       Georgia|          Morganton|    55045.0|
|       Georgia|              Perry|    64217.0|
|         Idaho|             Salmon|    24559.0|
|         Idaho|       Soda Springs|    62716.0|
|         Idaho|            Heyburn|    47037.0|
|      Illinois|    Round Lake Park|    43465.0|
|      Illinois|      East St Louis|    26002.0|
|      Illinois|    

# Import all yelp data

In [10]:
df_bus = sqlContext.read.json('user/adhamsuliman/data/business.json').dropna(thresh=1,subset=('state','city','business_id',"longitude","latitude"))
df_bus.printSchema()

root
 |-- _corrupt_record: string (nullable = true)
 |-- address: string (nullable = true)
 |-- attributes: struct (nullable = true)
 |    |-- AcceptsInsurance: string (nullable = true)
 |    |-- AgesAllowed: string (nullable = true)
 |    |-- Alcohol: string (nullable = true)
 |    |-- Ambience: string (nullable = true)
 |    |-- BYOB: string (nullable = true)
 |    |-- BYOBCorkage: string (nullable = true)
 |    |-- BestNights: string (nullable = true)
 |    |-- BikeParking: string (nullable = true)
 |    |-- BusinessAcceptsBitcoin: string (nullable = true)
 |    |-- BusinessAcceptsCreditCards: string (nullable = true)
 |    |-- BusinessParking: string (nullable = true)
 |    |-- ByAppointmentOnly: string (nullable = true)
 |    |-- Caters: string (nullable = true)
 |    |-- CoatCheck: string (nullable = true)
 |    |-- Corkage: string (nullable = true)
 |    |-- DietaryRestrictions: string (nullable = true)
 |    |-- DogsAllowed: string (nullable = true)
 |    |-- DriveThru: string 

In [11]:
df_bus.groupby("state").count().sort(col("count").desc()).show(50)

+-----+-----+
|state|count|
+-----+-----+
|   AZ|51112|
|   NV|32660|
|   ON|30128|
|   NC|13319|
|   OH|13179|
|   PA|10065|
|   QC| 8319|
|   AB| 7226|
|   WI| 4613|
|   IL| 1731|
|   SC| 1053|
|   NY|   20|
|   CA|   18|
|   TX|    5|
|  XGM|    4|
|   AL|    3|
|   FL|    3|
|   CT|    3|
|   WA|    3|
|   VA|    2|
|   NE|    2|
|   VT|    2|
|  XWY|    2|
|   AK|    2|
|   GA|    2|
|  BAS|    1|
|   NJ|    1|
|   BC|    1|
|  XGL|    1|
|   TN|    1|
|  DOW|    1|
|   AR|    1|
|  DUR|    1|
|  CON|    1|
+-----+-----+



In [12]:
df_checkin = sqlContext.read.json('user/adhamsuliman/data/checkin.json').dropna(thresh=1, subset='business_id')
df_checkin.printSchema()
df_review = sqlContext.read.json('user/adhamsuliman/data/review.json').dropna(thresh=1, subset=('stars','business_id'))
df_review.printSchema()
df_tip = sqlContext.read.json('user/adhamsuliman/data/tip.json').dropna(thresh=1, subset=('business_id','user_id'))
df_tip.printSchema()
df_user = sqlContext.read.json('user/adhamsuliman/data/user.json')
df_user.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- date: string (nullable = true)

root
 |-- _corrupt_record: string (nullable = true)
 |-- business_id: string (nullable = true)
 |-- cool: long (nullable = true)
 |-- date: string (nullable = true)
 |-- funny: long (nullable = true)
 |-- review_id: string (nullable = true)
 |-- stars: double (nullable = true)
 |-- text: string (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)

root
 |-- business_id: string (nullable = true)
 |-- compliment_count: long (nullable = true)
 |-- date: string (nullable = true)
 |-- text: string (nullable = true)
 |-- user_id: string (nullable = true)



#### Filter tables for strictly Restaurants

In [13]:
df_restaurants = df_bus.filter(df_bus.categories.like('%Restaurants%')|df_bus.categories.like('%Food%'))
df_restaurants_match = df_restaurants.groupby(["City"]).agg(F.count('address'))
df_income6 = df_income5.select(col("City").alias("City_inc"), col("avg(avg(Median))"))
df_res_inc = df_income6.join(df_restaurants_match, df_restaurants_match.City == df_income6.City_inc, how="inner" )
df_res_inc1 = df_res_inc.select(col("City"),col("avg(avg(Median))").alias("Median"),col("count(address)").alias("count")).sort(col("count").asc()) #.show(100)

#### Create a visualization for restaurants displyaing their review ratings for the city of Las Vegas. 

In [14]:
import folium.plugins as plugins
rating_data = df_restaurants.toPandas()
lat = 36.207430
lon = -115.268460
lon_min, lon_max = lon-0.3,lon+0.5
lat_min, lat_max = lat-0.4,lat+0.5
#subset for vegas
ratings_data_vegas=rating_data[(rating_data["longitude"]>lon_min) &\
                    (rating_data["longitude"]<lon_max) &\
                    (rating_data["latitude"]>lat_min) &\
                    (rating_data["latitude"]<lat_max)]


data=[]
#rearranging data to suit the format needed for folium
stars_list=list(rating_data['stars'].unique())
for star in stars_list:
    subset=ratings_data_vegas[ratings_data_vegas['stars']==star]
    data.append(subset[['latitude','longitude']].values.tolist())
#initialize at vegas
lat = 36.127430
lon = -115.138460
zoom_start=11
print("                     Vegas Review heatmap Animation ")

# basic map
m = folium.Map(location=[lat, lon], tiles="OpenStreetMap", zoom_start=zoom_start)
#inprovising the Heatmapwith time plugin to show variations across star ratings 
hm = plugins.HeatMapWithTime(data,max_opacity=0.3,auto_play=True,display_index=True,radius=7)
hm.add_to(m)
m

                     Vegas Review heatmap Animation 


#### Join restaurants and review yelp tables

In [16]:
df_review2 = df_review.select(col("business_id"), col("date").alias("review_date"),col("stars").alias("review_stars"), col("text").alias("review_text"))
df_res_review = df_restaurants1.join(df_review2, df_restaurants.business_id == df_review2.business_id, how="inner"  )
df_res_review.show(5)

+---------------+--------------------+--------------------+--------------------+--------------------+---------+--------------------+-------+--------+-----------+--------------------+-----------+------------+-----+-----+--------------------+-------------------+------------+--------------------+
|_corrupt_record|             address|          attributes|         business_id|          categories|     city|               hours|is_open|latitude|  longitude|                name|postal_code|review_count|stars|state|         business_id|        review_date|review_stars|         review_text|
+---------------+--------------------+--------------------+--------------------+--------------------+---------+--------------------+-------+--------+-----------+--------------------+-----------+------------+-----+-----+--------------------+-------------------+------------+--------------------+
|           null|3930 Las Vegas Bl...|[,,,,,,,,, True, ...|BD18SKv935HDmlKrL...|Food, Chocolatier...|Las Vegas|    

# NLP

### Apply lower case

In [18]:
df_res_review1 = df_res_review.select("review_text","review_stars", "city") 
df_res_review1 = df_res_review1.withColumn("review_text",lower(col('review_text')))
#review1.collect()

### Tokenize text

In [19]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
tokenizer = Tokenizer(inputCol="review_text", outputCol="words")
df_res_review1 = tokenizer.transform(df_res_review1)

### Remove stop words

In [20]:
from pyspark.ml.feature import StopWordsRemover
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
df_res_review2 = remover.transform(df_res_review1)
df_res_review2.collect()

[Row(review_text="this shit is ass!! worst one yet!! food wasn't good, horrible quality! bread looks like it's on crack, not even soft or toasted. 2nd time & its ass.. only go if the other one is closed", review_stars=1.0, city='Las Vegas', words=['this', 'shit', 'is', 'ass!!', 'worst', 'one', 'yet!!', 'food', "wasn't", 'good,', 'horrible', 'quality!', 'bread', 'looks', 'like', "it's", 'on', 'crack,', 'not', 'even', 'soft', 'or', 'toasted.', '2nd', 'time', '&', 'its', 'ass..', 'only', 'go', 'if', 'the', 'other', 'one', 'is', 'closed'], filtered=['shit', 'ass!!', 'worst', 'one', 'yet!!', 'food', 'good,', 'horrible', 'quality!', 'bread', 'looks', 'like', 'crack,', 'even', 'soft', 'toasted.', '2nd', 'time', '&', 'ass..', 'go', 'one', 'closed']),
 Row(review_text="best place for awesome rotisserie chicken in las vegas!! soooo good...like absolutely everything is amazing :)  if you're thinking about spending money at some naty fast food place....take your care to the corner of sahara and jo

### Apply Word2Vec

In [33]:
word2Vec = Word2Vec(vectorSize=20, minCount=10, inputCol="filtered", outputCol="wordVectors")
w2VM = word2Vec.fit(df_res_review2)
nlpdf = w2VM.transform(df_res_review2)
nlpdf = nlpdf.withColumn('review_stars',nlpdf.review_stars-1)
nlpdf = nlpdf.select("review_stars","wordVectors","city")

### Use RFormula to create one feature column

In [28]:
rf = RFormula(formula="review_stars  ~ wordVectors ") #+ city
final_df_rf = rf.fit(nlpdf).transform(nlpdf)
final_df_rf1 = final_df_rf.select("features","label")

### Create an RDD and then use LabeledPoint() to prepare data for input into multinomial logistic regression

In [30]:
final_df_rf1.show(5)
nlpdf1 = final_df_rf1.rdd
nlpdf2 = nlpdf1.map(lambda line: LabeledPoint(line[1],[line[0]]))

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[-0.0764558017253...|  0.0|
|[-0.0979853154919...|  4.0|
|[0.01316816176668...|  0.0|
|[-0.0691151730716...|  1.0|
|[-0.0132922393341...|  0.0|
+--------------------+-----+
only showing top 5 rows



### Split into train and test

In [31]:
splits = nlpdf2.randomSplit([0.8, 0.2])
train_df = splits[0]
test_df = splits[1]

### Fit the model and check accuracy score on test data

In [32]:
# Compute raw scores on the test set
model = LogisticRegressionWithLBFGS.train(train_df, numClasses=9)
predictionAndLabels = test_df.map(lambda lp: (float(model.predict(lp.features)), lp.label))
metrics = MulticlassMetrics(predictionAndLabels)
accuracy = metrics.accuracy
print("Summary Stats")
print("Accuracy = %s" % accuracy)

Summary Stats
Accuracy = 0.5637096774193548
