In [4]:
from collections import defaultdict, Counter
from functools import partial
import json
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import os
import pickle
import pandas as pd
from pywaffle import Waffle
import squarify
import scipy
from sklearn.feature_extraction.text import TfidfVectorizer, CountVectorizer
from sklearn.metrics.pairwise import linear_kernel, cosine_similarity
from wordcloud import WordCloud

import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql.window import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.recommendation import ALSModel

In [8]:
# Create (or reuse) the Spark session for this notebook, running on the "local" master.
spark = (
    SparkSession.builder
    .appName("MyApp")
    .master("local")
    .getOrCreate()
)

In [14]:
# Load the three Yelp academic dataset JSON files (businesses, reviews, users) as Spark DataFrames.
business = spark.read.json("../../temp/yelp_json_yelp_academic_dataset_business.json")
review = spark.read.json("../../temp/yelp_json_yelp_academic_dataset_review.json")
user = spark.read.json("../../temp/yelp_json_yelp_academic_dataset_user.json")

In [15]:
# Give every user and every business a dense integer id (1..N, ordered by the
# original string id), then attach both ids to each review.
# NOTE(review): these windows have no partitionBy, so each row_number() pass is
# presumably evaluated on a single partition — confirm this is acceptable at full data size.
window = Window.orderBy(col('user_id'))
userRatings = (
    review.groupBy("user_id").count()
    .select('user_id')
    .withColumn('userid', row_number().over(window))
)

window = Window.orderBy(col('business_id'))
buiRatings = (
    review.groupBy("business_id").count()
    .select('business_id')
    .withColumn('businessid', row_number().over(window))
)

# Reviews enriched with the integer ids; stars cast to int and date parsed to a timestamp.
newratings = (
    review
    .join(userRatings, ['user_id'])
    .join(buiRatings, ['business_id'])
    .withColumn("stars", col("stars").cast(IntegerType()))
    .withColumn("date", to_timestamp("date"))
)

In [None]:
# Peek at the first five rows of each raw input table.
review.show(5)
business.show(5)
user.show(5)

### Weighted Average <a class="anchor" id="Weighted_average"></a>

In [None]:
from pyspark.sql.functions import avg, count

# Per-business rating statistics: mean star rating and number of ratings.
res = (
    newratings
    .groupBy('businessid', 'business_id')
    .agg(
        avg("stars").alias("meanStar"),
        count('stars').alias('numRate'),
    )
)

# C: the mean of the per-business mean ratings.
C = res.select(mean('meanStar')).first()[0]
# m: approximate 70th percentile of the per-business rating counts.
# NOTE(review): the relative error passed to approxQuantile is 0.25, which is very
# loose for a 0.7 quantile — confirm this tolerance is intended.
m = res.approxQuantile("numRate", [0.7], 0.25)[0]

C, m

In [None]:
# Keep only the businesses that have at least m ratings, and report how many qualify.
q_res = res.filter(col('numRate') >= m)
q_res.count()

In [None]:
def weighted_rating(x, m=m, C=C):
    """Compute the weighted rating for every row of a Spark DataFrame.

    The score is ``v/(v+m) * R + m/(m+v) * C`` where ``v`` is the row's
    ``numRate`` and ``R`` its ``meanStar``.

    Parameters
    ----------
    x : Spark DataFrame with ``numRate`` and ``meanStar`` columns.
    m : rating-count weight used in the formula (defaults to the notebook-level ``m``).
    C : prior mean rating used in the formula (defaults to the notebook-level ``C``).

    Returns
    -------
    pandas.Series of scores, in the row order of the collected frame.
    """
    # Collect once: each toPandas() call is a separate Spark job, and two separate
    # collects are not guaranteed to return rows in the same order, which would
    # pair a count with the wrong mean.
    stats = x.toPandas()
    v = stats['numRate']
    R = stats['meanStar']
    return (v / (v + m)) * R + (m / (m + v)) * C

In [None]:
# Collect the qualifying businesses once and compute the score on that same frame.
# Computing the score from one collect and attaching it to a different collect
# relies on both Spark jobs returning rows in the same order, which is not guaranteed.
q_ress = q_res.toPandas()
# Same formula as weighted_rating: v/(v+m) * R + m/(m+v) * C.
score = (
    (q_ress['numRate'] / (q_ress['numRate'] + m)) * q_ress['meanStar']
    + (m / (m + q_ress['numRate'])) * C
)
q_ress['score'] = score
# Back to a Spark DataFrame so it can be joined with `business` downstream.
q_ress = spark.createDataFrame(q_ress)

In [None]:
# Sanity check: first five scored businesses.
q_ress.show(5)

In [None]:
# Attach the business metadata, bring everything into pandas and rank by score, best first.
q_ressPD = (
    q_ress.join(business, ['business_id'])
    .toPandas()
    .sort_values('score', ascending=False)
)
q_ressPD[['businessid', 'business_id', 'name', 'score']].head(5)

In [None]:
# Persist the ranked (business_id, name, score) table as a pickle file.
with open('../data/res_scores.pickle', 'wb') as handle:
    pickle.dump(q_ressPD[['business_id', 'name','score']], handle, protocol=pickle.HIGHEST_PROTOCOL)

### Content based <a class="anchor" id="Content_based"></a>

In [None]:
# Peek at the enriched reviews that the content-based section starts from.
newratings.show(5)

In [16]:
# Businesses whose mean star rating is above 4.6, with their rating counts.
new = (
    newratings.groupBy('businessid')
    .agg(avg("stars").alias("meanStar"), count('stars').alias('numRate'))
    .filter(col('meanStar') > 4.6)
)

# All reviews of those businesses, with the business name attached.
temp = (
    new.join(newratings, ['businessid'])
    .join(business.select('business_id', 'name'), ['business_id'])
)
temp.count()

536315

In [None]:
# Read a previously saved JSON dataset and preview it.
# NOTE(review): no cell visible in this notebook writes '../data/res_df.json/' — confirm
# this path exists or whether it should point at one of the datasets written below.
g=spark.read.json('../data/res_df.json/')
g.show(3)

In [None]:
# `temp` already carries the business `name` (it was joined in when `temp` was built).
# Joining `business` again would add a second `name` column and make
# select('name', ...) an ambiguous reference, so select straight from `temp`.
# The rows are unchanged: every row of `temp` already matched a business on business_id.
new_res = temp.select('name', 'review_id')
new_res.show(3)

In [None]:
# Save the (name, review_id) pairs as JSON, replacing any previous export.
new_res.write.format("json").mode("overwrite").save("../data/ress_df.json")

In [None]:
# Number of businesses that passed the mean-star filter.
new.count()

In [17]:
def getUniqueDF(df):
    """Reduce `df` to a single review per business.

    Within each `businessid` the reviews are ranked by stars (highest first),
    then date (latest first), then text length (longest first); only the
    top-ranked review is kept.

    Returns a DataFrame with the columns review_id, business_id, businessid,
    text, meanStar and name.
    """
    best_first = Window.partitionBy('businessid').orderBy(
        col('stars').desc(),
        col('date').desc(),
        length('text').desc(),
    )
    ranked = df.withColumn('row_number', row_number().over(best_first))
    top_review = ranked.filter(col('row_number') == 1)
    return top_review.select('review_id', 'business_id', 'businessid', 'text', 'meanStar', 'name')

In [18]:
# One representative review per business; despite its name, the result no longer
# contains the row_number column (getUniqueDF selects it away).
df_with_row_number=getUniqueDF(temp)
df_with_row_number.count()

21905

In [19]:
# Save the one-review-per-business table as JSON, replacing any previous export.
df_with_row_number.write.format("json").mode("overwrite").save("../data/temp.json")

In [20]:
# Bring the one-review-per-business table into pandas.
# NOTE(review): this rebinds `temp` from a Spark DataFrame to a pandas DataFrame, so
# earlier cells that use `temp` as a Spark DataFrame cannot be re-run after this one.
temp=df_with_row_number.toPandas()
temp.head()

Unnamed: 0,review_id,business_id,businessid,text,meanStar,name
0,g7wkIEW9sBV7xYg2AzkGrw,-5ink0kIoVfuS5Zi_6QBnQ,243,Liberties Parcel is my go-to place to ship and...,4.958904,Liberties Parcel
1,wkRIxZ4H8O_R3qgmmPN22A,-LiECrK7Cunuy0RAaKVmhQ,833,VIP Collision was amazing! Needed a bumper pai...,4.712329,VIP Collision Center
2,NfuW7wxMGIQU657ydGD6jA,-duPMGeNQSCGEUl0s552Cw,1522,Having lived in philly was excited when this o...,4.727273,Rita's Italian Ice & Frozen Custard
3,WKIaEBJ39QK4_u9WexmIbQ,-fgqxSoaPN3QrB7FrIxk7Q,1591,I came to Caliber after being rear ended. The ...,5.0,Caliber Collision
4,22m2a3hDaD3n1eD8sqWMLQ,-nJid5B14dYu1wPtMp149g,1884,Great product made fresh to order.\nI had the ...,4.8125,MJ's Backyard BBQ and Catering


In [21]:
# Persist the (review_id, business_id, name) lookup table as a pickle file.
with open('../data/res_df.pickle', 'wb') as handle:
    pickle.dump(temp[['review_id','business_id','name']], handle, protocol=pickle.HIGHEST_PROTOCOL)

In [22]:
# Round-trip check: reload the pickle written by the previous cell and preview it.
# Only unpickle files you produced yourself; pickle.load can execute arbitrary code.
# NOTE(review): the variable is called `movie` but holds business rows.
with open('../data/res_df.pickle', 'rb') as handle:
    movie = pickle.load(handle)
movie.head()

Unnamed: 0,review_id,business_id,name
0,g7wkIEW9sBV7xYg2AzkGrw,-5ink0kIoVfuS5Zi_6QBnQ,Liberties Parcel
1,wkRIxZ4H8O_R3qgmmPN22A,-LiECrK7Cunuy0RAaKVmhQ,VIP Collision Center
2,NfuW7wxMGIQU657ydGD6jA,-duPMGeNQSCGEUl0s552Cw,Rita's Italian Ice & Frozen Custard
3,WKIaEBJ39QK4_u9WexmIbQ,-fgqxSoaPN3QrB7FrIxk7Q,Caliber Collision
4,22m2a3hDaD3n1eD8sqWMLQ,-nJid5B14dYu1wPtMp149g,MJ's Backyard BBQ and Catering


# Old: Spark ML TF-IDF similarity (superseded by the scikit-learn approach below)

In [None]:
from pyspark.ml.feature import Tokenizer, HashingTF, IDF
from pyspark.ml.pipeline import Pipeline

# Spark ML TF-IDF pipeline over the review text:
# text -> words -> hashed term-frequency vector (10000 slots) -> IDF-weighted features.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(numFeatures=10000,inputCol="words", outputCol="rawFeatures")
idf = IDF(inputCol="rawFeatures", outputCol="features")
pipeline = Pipeline(stages=[tokenizer, hashingTF, idf])
# Fit on, then transform, the one-review-per-business table.
model = pipeline.fit(df_with_row_number)
result = model.transform(df_with_row_number)


In [None]:
# Save the transformed table (including the feature columns) as JSON, replacing any previous export.
result.write.format("json").mode("overwrite").save("../data/results.json")

In [None]:
# Reload the transformed table from the JSON export.
# NOTE(review): vector columns written to JSON presumably come back as plain structs
# rather than ML vectors — confirm before passing `features` to vector-based UDFs.
result=spark.read.json('../data/results.json')

In [None]:
# Preview the reloaded table.
result.show(2)

In [None]:
# Keep only the business key and its TF-IDF feature column.
matrix=result.select('business_id','features')

In [None]:
# Preview the (business_id, features) pairs.
matrix.show(3)

In [None]:
# Save the (business_id, features) pairs as Parquet. Use overwrite mode like every
# other export in this notebook, so re-running the cell does not fail because the
# output path already exists.
matrix.write.mode("overwrite").parquet("../data/tfidfs.parquet")

In [None]:
# Reload the (business_id, features) pairs from the Parquet export.
matrix=spark.read.parquet('../data/tfidfs.parquet')

In [None]:
def _cosine_similarity(x, y):
    """Cosine similarity of two ML vectors; 0.0 when either vector has zero length."""
    # Normalise by the Euclidean norms. Dividing by the number of non-zero entries
    # does not yield a cosine, and either denominator can be zero for an empty document.
    denom = float(x.norm(2)) * float(y.norm(2))
    if denom == 0.0:
        return 0.0
    return float(x.dot(y)) / denom


similarity_udf = udf(_cosine_similarity, DoubleType())

# Pair every business with every other business.
# NOTE(review): this is a full non-equi self-join, so the number of pairs grows
# quadratically with the number of businesses.
joined_data = matrix.alias('d1').join(
    matrix.alias('d2'),
    on=col('d1.business_id') != col('d2.business_id'),
)
similarity_scores = joined_data.select(
    col('d1.business_id').alias('business_id_1'),
    col('d2.business_id').alias('business_id_2'),
    similarity_udf(col('d1.features'), col('d2.features')).alias('similarity'),
)
# For each business, gather all (other business, similarity) pairs into one list.
top_related_businesses = similarity_scores.groupBy('business_id_1').agg(
    collect_list(struct('business_id_2', 'similarity')).alias('related_businesses')
)

In [None]:
def _top_related_ids(related_businesses):
    """Return the ids of the five most similar businesses, most similar first."""
    ranked = sorted(related_businesses, key=lambda pair: pair[1], reverse=True)
    return [pair[0] for pair in ranked[:5]]


# NOTE(review): the output column is named 'top10_related_businesses' but only the
# top 5 ids are kept — confirm which of the two is intended.
# This cell rebinds `top_related_businesses`, so it can only be run once per build of
# that frame (the 'related_businesses' column is dropped by the select).
top_related_businesses = (
    top_related_businesses
    .withColumn(
        'top10_related_businesses',
        udf(_top_related_ids, ArrayType(StringType()))(col('related_businesses')),
    )
    .select('business_id_1', 'top10_related_businesses')
)


In [None]:
# Save the related-business lists as JSON, replacing any previous export.
top_related_businesses.write.format('json').mode("overwrite").save("../data/related.json")

In [None]:
from pyspark.ml.linalg import SparseVector

# Must match the numFeatures used when the TF-IDF features were built.
num_features=10000
# Re-encode each feature vector as a SparseVector, storing only the non-zero entries.
# Listing all `num_features` positions (zeros included) would make every "sparse"
# vector as large as a dense one without changing any dot product.
# The business_id is not carried along: the RDD holds bare vectors.
sparse_vectors = matrix.rdd.map(lambda row: row['features'].toArray()) \
                             .map(lambda arr: SparseVector(num_features, [(i, arr[i]) for i in range(num_features) if arr[i] != 0]))

In [None]:
# Inspect the first re-encoded vector.
sparse_vectors.take(1)

In [None]:
# Pair every element of `sparse_vectors` with every element (including itself) and
# emit an (id1, id2, dot product) triple per pair.
# NOTE(review): the lambda indexes each element as an (id, vector) pair (x[0][0], x[0][1]),
# but `sparse_vectors` as built earlier in this notebook holds bare SparseVector
# objects with no id attached — confirm the intended input before running this cell.
similarity_matrix = sparse_vectors.cartesian(sparse_vectors).map(lambda x: (x[0][0], x[1][0], x[0][1].dot(x[1][1])))

# Schema for the (id1, id2, similarity) triples.
# NOTE(review): both ids are declared IntegerType, while business_id values shown
# elsewhere in this notebook are strings — verify which id is meant here.
schema = StructType([
    StructField("id1", IntegerType(), True),
    StructField("id2", IntegerType(), True),
    StructField("similarity", DoubleType(), True)
])

similarity_df = similarity_matrix.toDF(schema)


In [None]:
# Append a generated `row_id` column to every similarity row.
similarity_df = similarity_df.select("*", monotonically_increasing_id().alias("row_id"))

# Optionally, cache the DataFrame for better performance
similarity_df.cache()

In [None]:
# Rebuild `similarity_matrix` as 1-tuples holding only the dot product of each
# pair of vectors; no ids are kept, so a value can no longer be traced back to its pair.
similarity_matrix = sparse_vectors.cartesian(sparse_vectors).map(lambda x: (x[0].dot(x[1]),))

In [None]:
# Convert similarity tuples with at least three fields into Rows and build a DataFrame.
# NOTE(review): the preceding cell produces 1-tuples, so the len(row) >= 3 filter would
# keep nothing; also `Row` is only imported in the following cell. Confirm whether this
# cell is still needed.
similarity_rows = similarity_matrix.filter(lambda row: len(row) >= 3) \
    .map(lambda row: Row(id1=row[0], id2=row[1], similarity=row[2])).toDF()

In [None]:
from pyspark.sql import Row

# similarity_rows = similarity_matrix.filter(lambda x: len(x) >= 3).map(lambda x: Row(id1=x[0], id2=x[1], similarity=x[2])).collect()

# Same statement as the previous cell, repeated here after importing `Row`.
# NOTE(review): duplicate of the cell above; keep only one of the two.
similarity_rows = similarity_matrix.filter(lambda row: len(row) >= 3) \
    .map(lambda row: Row(id1=row[0], id2=row[1], similarity=row[2])).toDF()

# similarity_df = spark.createDataFrame(similarity_rows)

# similarity_df.take(1)

In [None]:
# Save the pairwise similarities as Parquet. Use overwrite mode like the other
# exports in this notebook, so re-running the cell does not fail because the output
# path already exists.
similarity_df.write.mode("overwrite").parquet("../data/similarity.parquet")

In [None]:
# Rebind `similarity_matrix` to a single-column ('similarity') DataFrame of pairwise
# dot products; the pair ids are not kept.
similarity_matrix = sparse_vectors.cartesian(sparse_vectors).map(lambda x: (x[0].dot(x[1]),)).toDF(['similarity'])

# New: scikit-learn TF-IDF recommendations

In [23]:
# `a` is another name for the pandas table `temp` (an alias, not a copy), so
# changes made through `a` are visible through `temp` as well.
a=temp

In [26]:
# Treat missing review text as an empty document so the vectorizer accepts every row.
a['text'] = a['text'].fillna('')

# TF-IDF over the review text, ignoring English stop words; one row per business.
tfidf = TfidfVectorizer(stop_words='english')
tfidf_matrix = tfidf.fit_transform(a['text'])
tfidf_matrix.shape

(21905, 34792)

In [25]:
# Replace the freshly computed TF-IDF matrix with the copy stored on disk.
# NOTE(review): the cell that saves '../data/res_matrix.npz' appears later in this
# notebook, so on a fresh top-to-bottom run this file must already exist.
tfidf_matrix = scipy.sparse.load_npz('../data/res_matrix.npz')

In [4]:
# All-pairs similarity: dot product of every TF-IDF row with every other row.
# NOTE(review): the result is a dense square matrix with one row and one column per
# business — check the memory footprint at the full data size.
cosine_sim = linear_kernel(tfidf_matrix, tfidf_matrix)

In [None]:
# All-pairs similarity: dot product of every TF-IDF row with every other row.
cosine_sim = linear_kernel(tfidf_matrix, tfidf_matrix)

# Lookup from business name to row position in `a`.
# Business names are not unique. Series.drop_duplicates() would de-duplicate the
# values (the row positions, which are already unique) and leave repeated names in
# the index, making `indices[name]` return several rows. De-duplicate on the name
# instead, keeping the first row for each name.
indices = pd.Series(a.index, index=a['name'])
indices = indices[~indices.index.duplicated(keep='first')]

In [None]:
def get_recommendations(text, cosine_sim=cosine_sim):
    """Return the five businesses most similar to the business named `text`.

    Parameters
    ----------
    text : business name to look up in the notebook-level `indices` series.
    cosine_sim : square similarity matrix whose rows and columns follow the row
        order of `a` (defaults to the notebook-level matrix).

    Returns
    -------
    pandas.DataFrame with columns ``name`` and ``similarity``, most similar first.

    Raises
    ------
    KeyError
        If `text` is not a known business name.
    """
    idx = indices[text]
    # A name shared by several businesses yields a Series; use its first row.
    if isinstance(idx, pd.Series):
        idx = idx.iloc[0]

    scores = np.asarray(cosine_sim[idx])
    # Stable descending order: ties keep ascending row order.
    order = np.argsort(-scores, kind='stable')
    # Drop the query row explicitly rather than assuming it is ranked first; that
    # assumption fails when another row ties with it or its own text is empty.
    order = order[order != idx][:5]

    return pd.DataFrame(
        zip(a['name'].iloc[order], scores[order]),
        columns=["name", "similarity"],
    )

In [None]:
# Persist the sparse TF-IDF matrix so it can be reloaded without refitting the vectorizer.
scipy.sparse.save_npz('../data/res_matrix.npz', tfidf_matrix)

In [None]:
# Example: the five businesses most similar to 'Liberties Parcel'.
get_recommendations('Liberties Parcel')