In [1]:
import html
import os
from operator import add

import folium
import numpy as np
import pandas as pd

from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.feature import RegexTokenizer, CountVectorizer
from pyspark.ml.feature import StopWordsRemover, VectorAssembler
from pyspark.ml.feature import Word2Vec, Word2VecModel
from pyspark.ml.feature import IDF

# Star-imports are kept last, as before, so the names they inject keep
# taking precedence over anything imported above.
from pyspark.sql.functions import *

from pyspark.sql.types import *


In [2]:
# Project directories.
# The root defaults to the original local checkout, but can be redirected with
# the FINALPROJ_DIR environment variable so the notebook runs on another
# machine without editing this cell.
base_dir = os.environ.get('FINALPROJ_DIR', '/Users/jerrygeorge/Desktop/finalproj')

# The trailing '' makes os.path.join emit a trailing separator, which the
# `data + 'file.parquet'` concatenations in later cells rely on.
data = os.path.join(base_dir, 'data', '')
model = os.path.join(base_dir, 'model', '')
output = os.path.join(base_dir, 'output', '')

In [3]:

# Load the three source tables. The reviews are stored as five parquet parts
# that are read together into a single DataFrame.
review_part_paths = [
    data + part_name
    for part_name in ('1.parquet', '2.parquet', '3.parquet', '4.parquet', '5.parquet')
]

bus_df = spark.read.parquet(data + 'business.parquet')
user_df = spark.read.parquet(data + 'users.parquet')
rev_df = spark.read.parquet(*review_part_paths)

In [4]:
# Expose each DataFrame to Spark SQL under a temporary view name.
for view_name, frame in (
    ("businesses", bus_df),
    ("users", user_df),
    ("reviews", rev_df),
):
    frame.createOrReplaceTempView(view_name)


In [5]:
# Keep only the two columns needed to build one text document per business.
review_text_sql = "SELECT business_id, review_text FROM reviews"
rt = spark.sql(review_text_sql)

# Peek at the first six rows.
rt.show(6)

+--------------------+--------------------+
|         business_id|         review_text|
+--------------------+--------------------+
|0W4lkclzZThpx3V65...|Love the staff, l...|
|AEx2SYEUJmTxVVB18...|Super simple plac...|
|VR6GpWIda3SfvPC-l...|Small unassuming ...|
|CKC0-MOWMqoeWf6s-...|Lester's is locat...|
|ACFtxLv8pGrrxMm6E...|Love coming here....|
|s2I_Ni76bjJNK9yG6...|Had their chocola...|
+--------------------+--------------------+
only showing top 6 rows



In [None]:
# Build one text document per business by concatenating all of its reviews.
#
# * Reviews are joined with a single space. Plain string addition glued the
#   last word of one review to the first word of the next, creating bogus
#   tokens for the tokenizer downstream.
# * `rt` is no longer overwritten, so this cell can be re-run safely.
# * collect_list ignores null review texts instead of failing on them.
rb = (
    rt.groupBy('business_id')
      .agg(concat_ws(' ', collect_list('review_text')).alias('text'))
)


In [None]:

# Build the text-feature pipeline:
#   text -> tokens -> tokens without stop words -+-> term counts -> IDF weights ('idf_vec')
#                                                +-> Word2Vec embedding ('word_vec')
# and finally assemble both vectors into a single 'comb_vec' column.
regexTokenizer = RegexTokenizer(
    gaps=False,
    pattern=r'\w+',  # raw string: '\w' is not a valid escape in a normal string literal
    inputCol='text',
    outputCol='token',
)
stopWordsRemover = StopWordsRemover(inputCol='token', outputCol='nostopwrd')
countVectorizer = CountVectorizer(inputCol="nostopwrd", outputCol="rawFeature")
iDF = IDF(inputCol="rawFeature", outputCol="idf_vec")
word2Vec = Word2Vec(vectorSize=100, minCount=5, inputCol='nostopwrd', outputCol='word_vec', seed=123)
vectorAssembler = VectorAssembler(inputCols=['idf_vec', 'word_vec'], outputCol='comb_vec')
pipeline = Pipeline(stages=[regexTokenizer, stopWordsRemover, countVectorizer, iDF, word2Vec, vectorAssembler])

# Fit on the per-business review documents (`rb`). The fitted model is stored
# as `pipeline_mdl`, the name the transform cell reads.
pipeline_mdl = pipeline.fit(rb)

In [None]:
# Run the fitted pipeline over the per-business documents.
# `pipeline_mdl` must be the fitted PipelineModel produced by the pipeline cell.
rb1 = pipeline_mdl.transform(rb)


In [None]:
# Preview the input text next to the feature columns the pipeline produced.
preview_cols = ['text', 'nostopwrd', 'idf_vec', 'word_vec', 'comb_vec']
rb1.select(*preview_cols).show(5)

In [None]:
def Cosine(v1, v2):
    """Cosine similarity between two equal-length numeric vectors.

    Parameters
    ----------
    v1, v2 : sequence of numbers
        Anything ``np.dot`` accepts.

    Returns
    -------
    float
        ``dot(v1, v2) / (|v1| * |v2|)``, or ``0.0`` when either vector has
        zero length. The direction of a zero vector is undefined, so it is
        treated as similar to nothing rather than producing NaN.
    """
    norm1 = np.sqrt(np.dot(v1, v1))
    norm2 = np.sqrt(np.dot(v2, v2))

    # Guard the division: an all-zero vector would otherwise yield NaN and a
    # numpy runtime warning, and NaN scores corrupt any later ranking.
    if norm1 == 0 or norm2 == 0:
        return 0.0

    return np.dot(v1, v2) / norm1 / norm2

In [None]:
# Bring every (business_id, word_vec) pair to the driver as a plain list of
# tuples; the similarity search iterates over this list.
bv = (
    rb1.select('business_id', 'word_vec')
       .rdd
       .map(lambda row: (row[0], row[1]))
       .collect()
)

In [None]:
def SimBusiness(ids, lim=10):
    """Find the businesses most similar to each business in `ids`.

    Similarity is the cosine (see `Cosine`) between the vectors held in the
    driver-side list `bv` of ``(business_id, vector)`` pairs.

    Parameters
    ----------
    ids : iterable of str
        Business ids to find neighbours for. Ids that are not present in
        `bv` are skipped.
    lim : int, default 10
        Maximum number of similar businesses returned per input id.

    Returns
    -------
    pyspark.sql.DataFrame
        Columns ``business_id`` (string), ``score`` (double) and
        ``input_business_id`` (string): up to `lim` rows per input id, never
        including the input business itself. Empty (but correctly typed) when
        nothing matches.
    """
    schema = StructType([
        StructField("business_id", StringType(), True),
        # Cosine scores are fractional, so the column has to be a double.
        StructField("score", DoubleType(), True),
        StructField("input_business_id", StringType(), True),
    ])

    # `bv` already lives on the driver, so rank locally and create a single
    # DataFrame at the end rather than one Spark job plus a union per id.
    vector_by_id = dict(bv)
    rows = []

    for idd in ids:
        iv = vector_by_id.get(idd)
        if iv is None:
            # No vector for this business: nothing to compare against.
            continue

        scored = []
        for other_id, other_vec in bv:
            if other_id == idd:
                continue  # a business is not a recommendation for itself
            score = float(Cosine(iv, other_vec))
            if np.isnan(score):
                continue  # undefined similarity (zero vector); cannot be ranked
            scored.append((other_id, score, idd))

        scored.sort(key=lambda item: item[1], reverse=True)
        rows.extend(scored[:lim])

    return spark.createDataFrame(rows, schema)

In [None]:
def Business(ib):
    """Attach business details to a DataFrame keyed by ``business_id``.

    Parameters
    ----------
    ib : pyspark.sql.DataFrame
        Must contain a ``business_id`` column.

    Returns
    -------
    pyspark.sql.DataFrame
        Every column of `ib` followed by ``business_name``, ``categories``,
        ``stars``, ``review_count``, ``latitude`` and ``longitude`` taken from
        the businesses table. Inner join: rows whose id is not in the
        businesses table are dropped.
    """
    a = ib.alias("a")
    # The businesses DataFrame loaded by this notebook is `bus_df`; the name
    # previously used here was never defined.
    # NOTE(review): assumes `bus_df` carries the detail columns listed below - confirm.
    b = bus_df.alias("b")

    detail_cols = ['business_name', 'categories', 'stars', 'review_count', 'latitude', 'longitude']

    return (
        a.join(b, col("a.business_id") == col("b.business_id"), 'inner')
         .select([col('a.' + name) for name in a.columns] +
                 [col('b.' + name) for name in detail_cols])
    )

In [None]:
def showInMap(df):
    """Plot every business in `df` as a green marker on a folium map.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Needs the columns ``latitude``, ``longitude``, ``business_name``,
        ``stars`` and ``review_count``. It is converted with ``toPandas()``,
        so it should be small.

    Returns
    -------
    folium.Map
        Centred on the mean position of the plotted businesses, or on the
        original fixed default location when there is nothing to plot.
    """
    pdf = df.toPandas()

    # folium refuses NaN coordinates, so rows without a position are skipped
    # rather than aborting the whole map.
    located = pdf.dropna(subset=['latitude', 'longitude'])

    if len(located):
        # Centre on the data so the markers are actually in view.
        centre = [
            located['latitude'].astype(float).mean(),
            located['longitude'].astype(float).mean(),
        ]
    else:
        centre = [39.71, -69.43]

    mp = folium.Map(location=centre, zoom_start=10)

    for _, r in located.iterrows():
        # str() keeps html.escape from failing on a missing (None) name;
        # escaping keeps names from being interpreted as popup markup.
        popup_html = (
            html.escape(str(r["business_name"]))
            + '<br>' + 'Stars: ' + str(r.stars)
            + '<br>' + 'Reviews: ' + str(r.review_count)
        )
        folium.Marker(
            location=[r.latitude, r.longitude],
            popup=popup_html,
            icon=folium.Icon(color='green'),
        ).add_to(mp)

    return mp


In [None]:
def contentrec(ud, lim=10):
    """Content-based recommendations for one user.

    Takes a random sample (at most 5) of the businesses the user rated with
    3 stars or more, prints that sample, looks up the businesses most similar
    to it with `SimBusiness`, drops the sampled businesses themselves and
    returns the best-scoring remainder with business details attached.

    Parameters
    ----------
    ud : str
        The user id to recommend for.
    lim : int, default 10
        Number of neighbours requested per sampled business and maximum
        number of recommendations returned.

    Returns
    -------
    pyspark.sql.DataFrame
        ``business_id``, ``score`` and the detail columns added by `Business`.
    """
    # Filter through the DataFrame API so the user id is passed as a value
    # instead of being spliced into SQL text.
    liked = (
        spark.table("reviews")
             .where((col("stars") >= 3.0) & (col("user_id") == ud))
             .select("business_id")
             .distinct()
    )

    # sample() is re-drawn every time the plan is evaluated. Materialise the
    # sample once so the printed sample, the ids fed to SimBusiness and the
    # exclusion join below all refer to the same businesses.
    bl = [row.business_id for row in liked.sample(False, 0.5).limit(5).collect()]
    urb = spark.createDataFrame(
        [(business_id,) for business_id in bl],
        StructType([StructField("business_id", StringType(), True)]),
    )

    urbd = Business(urb)

    # show the sample details
    print('\nSample:')
    urbd.select(['business_id', 'business_name', 'categories']).show(truncate = False)

    # restaurants similar to the sample
    sbd = SimBusiness(bl, lim)

    # Anti-join: keep only candidates that are not part of the sample itself.
    s = sbd.alias("s")
    r = urb.alias("r")
    j = s.join(r, col("s.business_id") == col("r.business_id"), 'left_outer') \
         .where(col("r.business_id").isNull()) \
         .select([col('s.business_id'), col('s.score')])

    # A business can be a neighbour of several sampled businesses; keep it
    # once, with its best score, so the result has no duplicate rows.
    best = j.groupBy('business_id') \
            .agg({'score': 'max'}) \
            .withColumnRenamed('max(score)', 'score')

    a = best.orderBy("score", ascending = False).limit(lim)

    return Business(a)

In [None]:

# Smoke test: produce recommendations for one known user id.

ud = 'ZWD8UH1T7QXQr0Eq-mcWYg'

# contentrec prints the sampled businesses, then returns the recommendation DataFrame.
cd = contentrec(ud)

print("Businesses recommended")
# Last expression of the cell, so the pandas table is rendered as the cell output.
cd.toPandas()

In [None]:
# Plot the recommended businesses; the returned folium map is the cell output.
showInMap(cd)
