In [2]:
from collections import defaultdict, Counter
from functools import partial
import json
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import os
import pickle
import pandas as pd
from pywaffle import Waffle
import squarify
import scipy
from sklearn.feature_extraction.text import TfidfVectorizer, CountVectorizer
from sklearn.metrics.pairwise import linear_kernel, cosine_similarity
from wordcloud import WordCloud

import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql.window import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.recommendation import ALSModel

In [3]:
spark = SparkSession.builder \
    .appName("MyApp") \
    .master("local") \
    .getOrCreate()

In [4]:
business = spark.read.json("../../temp/yelp_json_yelp_academic_dataset_business.json")
review = spark.read.json("../../temp/yelp_json_yelp_academic_dataset_review.json")
user = spark.read.json("../../temp/yelp_json_yelp_academic_dataset_user.json")

In [5]:
userRatings=review.groupBy("user_id").count().select('user_id')
window = Window.orderBy(col('user_id'))
userRatings = userRatings.withColumn('userid', row_number().over(window))
buiRatings=review.groupBy("business_id").count().select('business_id')
window = Window.orderBy(col('business_id'))
buiRatings = buiRatings.withColumn('businessid', row_number().over(window))
newratings=review.join(userRatings, ['user_id'])
newratings=newratings.join(buiRatings, ['business_id'])
newratings=newratings.withColumn("stars",col("stars").cast(IntegerType()))
newratings=newratings.withColumn("date",to_timestamp("date"))

In [6]:
review.show(5)
business.show(5)
user.show(5)

+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|         business_id|cool|               date|funny|           review_id|stars|                text|useful|             user_id|
+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|XQfwVwDr-v0ZS3_Cb...|   0|2018-07-07 22:09:11|    0|KU_O5udG6zpxOg-Vc...|  3.0|If you decide to ...|     0|mh_-eMZ6K5RLWhZyI...|
|7ATYjTIgM3jUlt4UM...|   1|2012-01-03 15:28:18|    0|BiTunyQ73aT9WBnpR...|  5.0|I've taken a lot ...|     1|OyoGAe7OKpv6SyGZT...|
|YjUWPpI6HXG530lwP...|   0|2014-02-05 20:30:30|    0|saUsX_uimxRlCVr67...|  3.0|Family diner. Had...|     0|8g_iMtfSiwikVnbP2...|
|kxX2SOes4o-D3ZQBk...|   1|2015-01-04 00:01:03|    0|AqPFMleE6RsU23_au...|  5.0|Wow!  Yummy, diff...|     1|_7bHUi9Uuf5__HHc_...|
|e4Vwtrqf-wpJfwesg...|   1|2017-01-14 20:54:15|    0|Sx8TMOWLNuJBWer-0...|  4.0|Cute inter

### Weighted Average <a class="anchor" id="Weighted_average"></a>

In [7]:
from pyspark.sql.functions import avg,count
res=newratings.groupBy('businessid','business_id').agg(avg("stars").alias("meanStar"),count('stars').alias('numRate'))
res=res.filter((col('meanStar')>4) & (col('numRate')>20))
butemp=business.select('business_id','name','categories','address','city','state','is_open')
res=res.join(butemp,['business_id'])
del butemp

C = res.select(mean ('meanStar')).collect()[0][0]
m=20

C, m

(4.408811766467476, 20)

In [8]:
def weighted_rating(x, m=m, C=C):
    v=x.toPandas()['numRate']
    R=x.toPandas()['meanStar']
    return (v/(v+m) * R) + (m/(m+v) * C)

In [9]:
def getUniqueDF(df):
    window_spec = Window.partitionBy('businessid').orderBy(col('stars').desc(),col('date').desc(),length('text').desc())
    df_with_row_number = df.withColumn('row_number', row_number().over(window_spec))
    df_with_row_number=df_with_row_number.filter(col('row_number')==1).select('review_id','business_id','businessid','text','meanStar','name','numRate','categories','address','city','state','is_open')
    return df_with_row_number

In [10]:
print(res.count())
res=res.join(newratings,['businessid','business_id'])
q_res=getUniqueDF(res)
print(q_res.count())

22686
22686


In [11]:
score = weighted_rating(q_res)

q_ress=q_res.toPandas()
q_ress['score']=score

In [12]:
q_ressPD = q_ress.sort_values('score', ascending=False)
q_ressPD[['businessid','business_id', 'name','score','categories','address','city','state','is_open']].head(5)

Unnamed: 0,businessid,business_id,name,score,categories,address,city,state,is_open
14721,56737,NDwoKO79_T49UEKVDlHd3A,Sustainable Wine Tours,4.953705,"Wine Tours, Hotels & Travel, Tours, Transporta...","27 West Anapamu St, Ste 104",Santa Barbara,CA,1
16775,28619,B2Tuf5M1wQhdwAKnD-w7Yw,New Orleans Airboat Tours,4.928681,"Tours, Active Life, Bus Tours, Hotels & Travel...",4757 Orleans Way,Marrero,LA,1
18859,3142,0IjDqJexP6jTH4F_Kg4mrQ,A New Twist Balloons and Face Painting,4.925168,"Event Planning & Services, Face Painting, Fest...",,Boise,ID,1
759,100646,ez4kMLP6OJEIaMbMrrGRdA,New Orleans Secrets Tours,4.920238,"Souvenir Shops, Historical Tours, Arts & Enter...","519 Wilkinson St, Ste 100",New Orleans,LA,1
20553,90747,ak-RiYgUZzM_90c8ONWFzw,Burgundy Blue Photography,4.920082,"Professional Services, Event Planning & Servic...",1100 Anacapa St,Santa Barbara,CA,1


In [13]:
with open('../data/res_scores.pickle', 'wb') as handle:
    pickle.dump(q_ressPD[['businessid','business_id', 'name','score','categories','address','city','state','is_open']], handle, protocol=pickle.HIGHEST_PROTOCOL)

### Content based <a class="anchor" id="Content_based"></a>

In [14]:
q_ressPD.head(5)

Unnamed: 0,review_id,business_id,businessid,text,meanStar,name,numRate,categories,address,city,state,is_open,score
14721,X767MHcDjzoKIVkTZe_9gQ,NDwoKO79_T49UEKVDlHd3A,56737,"Fantastic, first class experience both tasting...",4.983562,Sustainable Wine Tours,365,"Wine Tours, Hotels & Travel, Tours, Transporta...","27 West Anapamu St, Ste 104",Santa Barbara,CA,1,4.953705
16775,JAzjsCSnB8RAZ6g_p9uLcg,B2Tuf5M1wQhdwAKnD-w7Yw,28619,This is a wonderful trip on a warm winter day....,4.965035,New Orleans Airboat Tours,286,"Tours, Active Life, Bus Tours, Hotels & Travel...",4757 Orleans Way,Marrero,LA,1,4.928681
18859,1GHK37bdhKIkMabZPMMHUQ,0IjDqJexP6jTH4F_Kg4mrQ,3142,We've booked from them a few years in a row an...,4.961404,A New Twist Balloons and Face Painting,285,"Event Planning & Services, Face Painting, Fest...",,Boise,ID,1,4.925168
759,3RFoZPJwxqybmZ1IdBKPfw,ez4kMLP6OJEIaMbMrrGRdA,100646,This was such a great experience! Roger was am...,4.967593,New Orleans Secrets Tours,216,"Souvenir Shops, Historical Tours, Arts & Enter...","519 Wilkinson St, Ste 100",New Orleans,LA,1,4.920238
20553,udXsGaCNbhz4FweZnEtXMw,ak-RiYgUZzM_90c8ONWFzw,90747,Okay first off researching and picking a photo...,4.977528,Burgundy Blue Photography,178,"Professional Services, Event Planning & Servic...",1100 Anacapa St,Santa Barbara,CA,1,4.920082


In [15]:
with open('../data/res_df.pickle', 'wb') as handle:
    pickle.dump(q_ressPD[['review_id','business_id','businessid','name']], handle, protocol=pickle.HIGHEST_PROTOCOL)

# Old

In [None]:
from pyspark.ml.feature import Tokenizer, HashingTF, IDF
from pyspark.ml.pipeline import Pipeline

tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(numFeatures=10000,inputCol="words", outputCol="rawFeatures")
idf = IDF(inputCol="rawFeatures", outputCol="features")
pipeline = Pipeline(stages=[tokenizer, hashingTF, idf])
model = pipeline.fit(df_with_row_number)
result = model.transform(df_with_row_number)


In [None]:
result.write.format("json").mode("overwrite").save("../data/results.json")

In [None]:
result=spark.read.json('../data/results.json')

In [None]:
result.show(2)

In [None]:
matrix=result.select('business_id','features')

In [None]:
matrix.show(3)

In [None]:
matrix.write.parquet("../data/tfidfs.parquet")

In [None]:
matrix=spark.read.parquet('../data/tfidfs.parquet')

In [None]:
similarity_udf = udf(lambda x, y: float(x.dot(y)) / (x.numNonzeros() * y.numNonzeros()), DoubleType())
joined_data = matrix.alias('d1').join(matrix.alias('d2'), on=col('d1.business_id') != col('d2.business_id'))
similarity_scores = joined_data.select(col('d1.business_id').alias('business_id_1'), col('d2.business_id').alias('business_id_2'), similarity_udf(col('d1.features'), col('d2.features')).alias('similarity'))
top_related_businesses = similarity_scores.groupBy('business_id_1').agg(collect_list(struct('business_id_2', 'similarity')).alias('related_businesses'))

In [None]:
top_related_businesses = top_related_businesses.withColumn('top10_related_businesses', udf(lambda related_businesses: [x[0] for x in sorted(related_businesses, key=lambda x: x[1], reverse=True)[:5]], ArrayType(StringType()))(col('related_businesses'))).select('business_id_1', 'top10_related_businesses')


In [None]:
top_related_businesses.write.format('json').mode("overwrite").save("../data/related.json")

In [None]:
from pyspark.ml.linalg import SparseVector

num_features=10000
sparse_vectors = matrix.rdd.map(lambda row: row['features'].toArray()) \
                             .map(lambda arr: SparseVector(num_features, [(i, arr[i]) for i in range(num_features)]))

In [None]:
sparse_vectors.take(1)

In [None]:
similarity_matrix = sparse_vectors.cartesian(sparse_vectors).map(lambda x: (x[0][0], x[1][0], x[0][1].dot(x[1][1])))

schema = StructType([
    StructField("id1", IntegerType(), True),
    StructField("id2", IntegerType(), True),
    StructField("similarity", DoubleType(), True)
])

similarity_df = similarity_matrix.toDF(schema)


In [None]:
similarity_df = similarity_df.select("*", monotonically_increasing_id().alias("row_id"))

# Optionally, cache the DataFrame for better performance
similarity_df.cache()

In [None]:
similarity_matrix = sparse_vectors.cartesian(sparse_vectors).map(lambda x: (x[0].dot(x[1]),))

In [None]:
similarity_rows = similarity_matrix.filter(lambda row: len(row) >= 3) \
    .map(lambda row: Row(id1=row[0], id2=row[1], similarity=row[2])).toDF()

In [None]:
from pyspark.sql import Row

# similarity_rows = similarity_matrix.filter(lambda x: len(x) >= 3).map(lambda x: Row(id1=x[0], id2=x[1], similarity=x[2])).collect()

similarity_rows = similarity_matrix.filter(lambda row: len(row) >= 3) \
    .map(lambda row: Row(id1=row[0], id2=row[1], similarity=row[2])).toDF()

# similarity_df = spark.createDataFrame(similarity_rows)

# similarity_df.take(1)

In [None]:
similarity_df.write.parquet("../data/similarity.parquet")

In [None]:
similarity_matrix = sparse_vectors.cartesian(sparse_vectors).map(lambda x: (x[0].dot(x[1]),)).toDF(['similarity'])

# New

In [17]:
a=q_ressPD

In [18]:
tfidf = TfidfVectorizer(stop_words='english')

a['text'] = a['text'].fillna('')

tfidf_matrix = tfidf.fit_transform(a['text'])
tfidf_matrix.shape 

(22686, 32652)

In [None]:
cosine_sim = linear_kernel(tfidf_matrix, tfidf_matrix)
indices = pd.Series(a.index, index=a['name']).drop_duplicates()

In [None]:
def get_recommendations(text, cosine_sim=cosine_sim):
    idx = indices[text]
    sim_scores = list(enumerate(cosine_sim[idx]))
    sim_scores = sorted(sim_scores, key=lambda x: x[1], reverse=True)
    sim_scores = sim_scores[1:6]
    res_indices = [i[0] for i in sim_scores]
    res_similarity = [i[1] for i in sim_scores]

    return pd.DataFrame(zip(a['name'].iloc[res_indices], res_similarity), columns=["name", "similarity"])

In [None]:
scipy.sparse.save_npz('../data/res_matrix.npz', tfidf_matrix)

In [None]:
get_recommendations('Liberties Parcel')

In [None]:
get_recommendations('SkinRN Aesthetics')