### Spark Initialization

In [1]:
import findspark
findspark.init()

In [2]:
import sys
import copy
import csv

from string import atoi
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

In [3]:
import numpy as np

In [4]:
conf = SparkConf().setAppName("ContentBased")
conf = conf.setMaster("local[*]")

In [5]:
sc  = SparkContext(conf=conf)

### Train, Test and Output Files

In [6]:
train_file = "/Users/lakshya/Desktop/INF-553/Project/pittsburgh_review_with_text_20_res_lemma_data_train.txt"
test_file = "/Users/lakshya/Desktop/INF-553/Project/pittsburgh_review_with_text_20_res_lemma_data_test.txt"
train_output = '/Users/lakshya/Desktop/INF-553/Project/Pittsburgh_ReviewBased_TFIDF_train_predictions.txt'
test_output = '/Users/lakshya/Desktop/INF-553/Project/Pittsburgh_ReviewBased_TFIDF_test_predictions.txt'

### Load Train and Test Data

In [7]:
trainData = sc.textFile(train_file,use_unicode=False)
testData = sc.textFile(test_file,use_unicode=False)

In [8]:
train_rdd = trainData.mapPartitions(lambda x: csv.reader(x)).map(lambda x: ((x[0], x[1]), float(x[2])))
test_rdd = testData.mapPartitions(lambda x: csv.reader(x)).map(lambda x: ((x[0], x[1]), float(x[2])))

In [9]:
avg_rating = train_rdd.map(lambda x: (x[0][0], x[1])).groupByKey().map(lambda x: (x[0], list(x[1]))).map(lambda x: (x[0], sum(x[1])/len(x[1])))
prod_rating = train_rdd.map(lambda x: (x[0][1], x[1])).groupByKey().map(lambda x: (x[0], list(x[1]))).map(lambda x: (x[0], sum(x[1])/len(x[1])))

In [10]:
train_temp = trainData.mapPartitions(lambda x: csv.reader(x)).map(lambda x: ((x[0], x[1]), 1))

### Load review data

In [11]:
data = sc.textFile(train_file,use_unicode=False)

In [12]:
data.take(6)

['1VVHf1BvtGC0aSCCIjQyiA,K5jY2W5Q3eNnwssV5UZtow,4,2016-11-16,2,2,2,past sunday one several time ive spirit its always eccentric fun time first music performance second hang recent visit sundays bingo bango spirit know pizza good drink also awesome last time get chard margarita time get tomatillo margarita hot ciders its always pleasant surprise see whats menu food drink drink little pricey drawback one coolest things spirit atmosphere its always super strange positive way really never know expect bingo bango definitely family appropriate its fun activity friends maybe even date doesnt mind something ordinary',
 'QYKexxaOJQlseGWmc6soRg,rzByiKaj-bLeLz-zKNBQdg,2,2015-04-13,0,0,0,old cramp build lot enough employees staff keep demand cause long wait time',
 '-ARdx8hOcEWlMDjzwLYZ_g,3cbsPfoUUrysf-M8FI_0IA,4,2014-03-24,6,4,3,live long world without donut menu dont know group nine din three varieties donuts include lemon lavender chocolate espresso zeppolli amaze pepper donut concoction ever m

In [13]:
train_data = data.mapPartitions(lambda x: csv.reader(x)).map(lambda x: ((x[0], x[1]), (x[2],x[7]))).join(train_temp)

In [14]:
train_data.take(3)

[(('3egcdazws_x1wW35jgXfNw', 'gae9LAyt7Qvf_OgAkWASxA'),
  (('4',
    'family always love buca di beppo location food great service always excellent however wait time food delivery final bill keep us remember come return experience buca di beppo wonderful love family style din make reservations years occasion price isnt bad receive well discount coupons everywhere look know say family return'),
   1)),
 (('4wp4XI9AxKNqJima-xahlg', '7O_mNtg37-1sMvQ5xmA8Dw'),
  (('4',
    'appreciate whole vibe old reclaim wood chocolate color wall beautiful crystal chandeliers almost like shouldnt go together go well hear serve lunch want check along something sweet course go roast beef sandwich fresh load good stuff definitely need mayo thats totally give know condiment obsession love bun wasnt hard bite downstairs like sweet treat wonderland glitter lot glitter know theres edible glitter its home base im totally okay past glitter delicious look treat every nook corner cake cookies bon bons even ice cre

### Collect user data from train data (User, Product, Rating)

In [15]:
userReview = train_data.map(lambda x: (x[0][0], x[0][1], x[1][0][0]))

In [16]:
userReviewCollected = userReview.map(lambda x: (x[0], x[1], x[2]))

In [17]:
userReviewCollected.take(5)

[('3egcdazws_x1wW35jgXfNw', 'gae9LAyt7Qvf_OgAkWASxA', '4'),
 ('4wp4XI9AxKNqJima-xahlg', '7O_mNtg37-1sMvQ5xmA8Dw', '4'),
 ('BxDsaVNeWxc5mNyA1HtSHQ', 'oeW0vIYd3rUnAPgmD4fEFg', '5'),
 ('BBg_86FEejn3dNzj0JOR9Q', 'XItYW5ul3OW_AqpT2nDbBQ', '4'),
 ('z2YVGKKcup6mjQmDQ6arEg', 'RqmORv3974ZDC6Zh4nSQwg', '5')]

### Collect product data (Product, Review Text)

In [18]:
prodReview = train_data.map(lambda x: (x[0][1], x[1][0][1])).groupByKey().mapValues(list)

In [19]:
prodReviewCollected = prodReview.map(lambda x: (x[0], " ".join(x[1])))

In [20]:
prodReviewCollected.take(5)

[('5REYrZfsX3m4E3FTwovp5Q',
  'try first time last night pretty good one thing really annoy restaurant see review verde entire restaurant fill reservations can not even seat walk 2 people restaurant empty really avocado corn ceviche mojo criollo nigiri avocado crab delicious mojo criollo favorite everything taste super fresh flavorful serve good size date share wine meh would think place thats like tapas wine bar wine would better order frontera sauvignon blanc often buy liquor store slo wine hm lame hate pay 9 glass wine restaurant easily buy 10 liquor store seem justify least slo bottle service good nice time would come back absolutely love tacos havent try anything else menu taco addiction simply put food delicious however portion extremely small order leave place full youll spend pretty penny wouldnt recommend place youre big eater would terrific spot go show light snack drink food nicely present tapas style really nothing remarkable dont go dinner often would impress presentation 

### TF-IDF vector creation

In [21]:
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import linear_kernel

  _nan_object_mask = _nan_object_array != _nan_object_array


In [22]:
tf = TfidfVectorizer(analyzer='word',ngram_range=(1,1),min_df=0, norm='l2')

Convert user and product rdd to pandas dataframe

In [23]:
spark = SparkSession(sc)

userPandas = userReviewCollected.toDF().toPandas()
prodPandas = prodReviewCollected.toDF().toPandas()

Create TF-IDF vectors on product review text

In [24]:
tfidf_prod = tf.fit_transform(prodPandas['_2'])

In [25]:
tfidf_prod.shape

(3098, 26685)

Add TF-IDF vectors to product dataframe

In [26]:
prodPandas['Vector'] = tfidf_prod.toarray().tolist()

In [27]:
prodPandas.head()

Unnamed: 0,_1,_2,Vector
0,5REYrZfsX3m4E3FTwovp5Q,try first time last night pretty good one thin...,"[0.0048004686407, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0..."
1,HWrbZS1mxVRj2Y2VwMmDMg,oh man word can not describe excite bakeshop i...,"[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
2,MvlQo4bev1eqp1q0HYOLHg,first dance class probably decade probably hav...,"[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
3,X9Bql7RrPU5Mab5-hJsI8A,2nd visit promise first time order feature bur...,"[0.0224731737297, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0..."
4,owO2UkNKk9qrWWd_PTYLDA,feel like ive random experience place regulars...,"[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."


Product category text not needed

In [28]:
del prodPandas['_2']

### Weighted Linear Combination of product vectors for users

In [29]:
userPandas['Vector'] = [[] for _ in range(len(userPandas))]

In [30]:
userPandas['_3'] = userPandas['_3'].astype(float)

In [31]:
for index, row in userPandas.iterrows():
    vector = np.array(prodPandas.loc[prodPandas['_1'] == row['_2'], 'Vector'].values[0])
    rating = row['_3']
    userPandas.at[index,'Vector'] = rating*vector

In [32]:
userPandas.head()

Unnamed: 0,_1,_2,_3,Vector
0,3egcdazws_x1wW35jgXfNw,gae9LAyt7Qvf_OgAkWASxA,4.0,"[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
1,4wp4XI9AxKNqJima-xahlg,7O_mNtg37-1sMvQ5xmA8Dw,4.0,"[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
2,BxDsaVNeWxc5mNyA1HtSHQ,oeW0vIYd3rUnAPgmD4fEFg,5.0,"[0.0444835542224, 0.0264260403666, 0.0, 0.0, 0..."
3,BBg_86FEejn3dNzj0JOR9Q,XItYW5ul3OW_AqpT2nDbBQ,4.0,"[0.022492077401, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0,..."
4,z2YVGKKcup6mjQmDQ6arEg,RqmORv3974ZDC6Zh4nSQwg,5.0,"[0.0783406184889, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0..."


Product and rating column not needed

In [33]:
del userPandas['_2']
del userPandas['_3']

Linear combination of feature vectors

In [34]:
userPandas = userPandas.groupby(['_1']).sum()

Normalize the user feature vectors

In [35]:
from sklearn.preprocessing import Normalizer

In [36]:
for index, row in userPandas.iterrows():
    vector = np.array(row['Vector']).reshape(1, -1)
    transformer = Normalizer().fit(vector)
    userPandas.at[index,'Vector'] = transformer.transform(vector)

In [37]:
userPandas.head()

Unnamed: 0_level_0,Vector
_1,Unnamed: 1_level_1
-0-hVEpwWEcJLJoGq3rE3g,"[[0.00707135105417, 0.000361628795416, 0.0, 0...."
-2OB54nQ6FsGLUM-R1KXnA,"[[0.0081805485151, 0.000732556578107, 0.0, 0.0..."
-ARdx8hOcEWlMDjzwLYZ_g,"[[0.00891272247392, 0.00101404014352, 0.0, 0.0..."
-Pk25bOBsvemFaWKDBVBzA,"[[0.00582838200541, 0.0, 0.0, 0.0, 0.0, 0.0, 0..."
-Q2wBtscwW6JOqlBndji4A,"[[0.00646852989731, 0.0, 0.0, 0.0, 0.001247097..."


### Create user numpy matrix from feature vectors

In [38]:
user_matrix = np.zeros((len(userPandas), tfidf_prod.shape[1]))
idx = 0
for index, row in userPandas.iterrows():
    vector = np.array(row['Vector'])[0]
    user_matrix[idx] = vector
    idx += 1
    

In [39]:
user_matrix

array([[ 0.00707135,  0.00036163,  0.        , ...,  0.        ,
         0.        ,  0.        ],
       [ 0.00818055,  0.00073256,  0.        , ...,  0.        ,
         0.        ,  0.00038655],
       [ 0.00891272,  0.00101404,  0.        , ...,  0.        ,
         0.0003156 ,  0.        ],
       ..., 
       [ 0.0110992 ,  0.        ,  0.        , ...,  0.        ,
         0.        ,  0.        ],
       [ 0.0069594 ,  0.00076676,  0.        , ...,  0.        ,
         0.        ,  0.0004189 ],
       [ 0.0060828 ,  0.00035974,  0.        , ...,  0.        ,
         0.        ,  0.        ]])

### Create product numpy matrix from feature vectors

In [40]:
prod_matrix = np.zeros((len(prodPandas), tfidf_prod.shape[1]))
idx = 0
for index, row in prodPandas.iterrows():
    vector = np.array(row['Vector'])
    prod_matrix[idx] = vector
    idx += 1

In [41]:
prod_matrix

array([[ 0.00480047,  0.        ,  0.        , ...,  0.        ,
         0.        ,  0.        ],
       [ 0.        ,  0.        ,  0.        , ...,  0.        ,
         0.        ,  0.        ],
       [ 0.        ,  0.        ,  0.        , ...,  0.        ,
         0.        ,  0.        ],
       ..., 
       [ 0.        ,  0.        ,  0.        , ...,  0.        ,
         0.        ,  0.        ],
       [ 0.        ,  0.        ,  0.        , ...,  0.        ,
         0.        ,  0.        ],
       [ 0.        ,  0.        ,  0.        , ...,  0.        ,
         0.        ,  0.        ]])

### Compute cosine similarity by taking dot product

In [42]:
similarity_matrix = np.dot(user_matrix, prod_matrix.T)

In [43]:
similarity_matrix.shape

(987, 3098)

### Flatten similarity matrix to relate with user and products

In [44]:
prod = prodPandas['_1'].values
user = userPandas.index.values

In [45]:
zf = similarity_matrix.flatten()
xr = np.repeat(user, prod.size)
yt = np.tile(prod, user.size)
d = np.stack((xr, yt, zf), axis=-1)

In [46]:
d.shape

(3057726, 3)

### Convert similarity matrix to RDD

In [47]:
similarity_rdd = sc.parallelize(d)

In [48]:
similarity_rdd.take(5)

[array([u'-0-hVEpwWEcJLJoGq3rE3g', u'5REYrZfsX3m4E3FTwovp5Q',
        0.47567194032152077], dtype=object),
 array([u'-0-hVEpwWEcJLJoGq3rE3g', u'HWrbZS1mxVRj2Y2VwMmDMg',
        0.18905336787947438], dtype=object),
 array([u'-0-hVEpwWEcJLJoGq3rE3g', u'MvlQo4bev1eqp1q0HYOLHg',
        0.13765104745595919], dtype=object),
 array([u'-0-hVEpwWEcJLJoGq3rE3g', u'X9Bql7RrPU5Mab5-hJsI8A',
        0.40540105082096445], dtype=object),
 array([u'-0-hVEpwWEcJLJoGq3rE3g', u'owO2UkNKk9qrWWd_PTYLDA',
        0.22314458296277775], dtype=object)]

### Use Train and Test Data for predictions

In [49]:
trainData.take(5)

['1VVHf1BvtGC0aSCCIjQyiA,K5jY2W5Q3eNnwssV5UZtow,4,2016-11-16,2,2,2,past sunday one several time ive spirit its always eccentric fun time first music performance second hang recent visit sundays bingo bango spirit know pizza good drink also awesome last time get chard margarita time get tomatillo margarita hot ciders its always pleasant surprise see whats menu food drink drink little pricey drawback one coolest things spirit atmosphere its always super strange positive way really never know expect bingo bango definitely family appropriate its fun activity friends maybe even date doesnt mind something ordinary',
 'QYKexxaOJQlseGWmc6soRg,rzByiKaj-bLeLz-zKNBQdg,2,2015-04-13,0,0,0,old cramp build lot enough employees staff keep demand cause long wait time',
 '-ARdx8hOcEWlMDjzwLYZ_g,3cbsPfoUUrysf-M8FI_0IA,4,2014-03-24,6,4,3,live long world without donut menu dont know group nine din three varieties donuts include lemon lavender chocolate espresso zeppolli amaze pepper donut concoction ever m

Create key on (user, product)

In [50]:
train_rdd = trainData.mapPartitions(lambda x: csv.reader(x)).map(lambda x: ((x[0], x[1]), float(x[2])))
test_rdd = testData.mapPartitions(lambda x: csv.reader(x)).map(lambda x: ((x[0], x[1]), float(x[2])))

In [51]:
similarity_rdd = similarity_rdd.map(lambda x: ((x[0], x[1]), float(x[2])))

In [52]:
similarity_rdd.take(5)

[((u'-0-hVEpwWEcJLJoGq3rE3g', u'5REYrZfsX3m4E3FTwovp5Q'), 0.47567194032152077),
 ((u'-0-hVEpwWEcJLJoGq3rE3g', u'HWrbZS1mxVRj2Y2VwMmDMg'), 0.18905336787947438),
 ((u'-0-hVEpwWEcJLJoGq3rE3g', u'MvlQo4bev1eqp1q0HYOLHg'), 0.13765104745595919),
 ((u'-0-hVEpwWEcJLJoGq3rE3g', u'X9Bql7RrPU5Mab5-hJsI8A'), 0.40540105082096445),
 ((u'-0-hVEpwWEcJLJoGq3rE3g', u'owO2UkNKk9qrWWd_PTYLDA'), 0.22314458296277775)]

Join similarity matrix with train and test RDD to take only similarity values for training and testing

In [53]:
train = similarity_rdd.join(train_rdd)
test = similarity_rdd.join(test_rdd)

In [54]:
train.take(5)

[((u'hHqH_E9FCI_B6WubV0jPYA', u'ZNdV9ytExuxPTXSN8i2xhw'),
  (0.5484030300391493, 3.0)),
 ((u'V0pP_PQnWdtyKpF-pifiaw', u'Fpm3WvqtrAg2ueh_4pz7iA'),
  (0.6575895407217334, 4.0)),
 ((u'IKnLl7SbuP0u6HS34jwHhw', u'guQww9yGHP7rRTea6zTnDg'),
  (0.4790185445032895, 3.0)),
 ((u'4r33dXcE1oYZxjONrhxTiA', u'9gNko6cFCMZbvy1zhJ7-Xg'),
  (0.4673522425623029, 5.0)),
 ((u'Rem81Xoev05aqeA-mFbM4A', u'1LUaZFVMEjodl1tbAGF3sQ'),
  (0.3369241166739786, 4.0))]

Convert RDD to List

In [55]:
train_ratings = train.collect()
test_ratings = test.collect()

### Training Regressor on similarity values

In [56]:
from sklearn.ensemble import RandomForestRegressor
from sklearn.ensemble import AdaBoostRegressor
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.linear_model import LinearRegression
from sklearn.datasets import make_regression
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split
from sklearn.model_selection import GridSearchCV
from sklearn.svm import SVR

Convert data to numpy array for regressor

In [57]:
X_train = []
y_train = []
for ratings in train_ratings:
    X_train.append(ratings[1][0])
    y_train.append(ratings[1][1])

X_train = np.array(X_train)
X_train = X_train.reshape(-1,1)

In [58]:
X_test = []
y_test = []
for ratings in test_ratings:
    X_test.append(ratings[1][0])
    y_test.append(ratings[1][1])

X_test = np.array(X_test)
X_test = X_test.reshape(-1,1)

In [59]:
X_test.shape

(9890, 1)

Train regressor on training data and make prediction on Test data<br>
Computed Mean Squared Error on predicted values

In [60]:
forest = RandomForestRegressor(max_depth=1, n_estimators=50)

rs = GradientBoostingRegressor(loss='ls', learning_rate=0.005, n_estimators=20)

reg = LinearRegression()

rs.fit(X_train, y_train)

from sklearn.metrics import classification_report
from sklearn.metrics import confusion_matrix

expected = y_test
predicted = rs.predict(X_test)

train_expected = y_train
train_predicted = rs.predict(X_train)

print("Training:\n%s" % np.sqrt(mean_squared_error(train_expected, train_predicted)))
print("Result:\n%s" % np.sqrt(mean_squared_error(expected, predicted)))

Training:
1.03137059321
Result:
1.03775086639


### Computed Ratings on missing values

In [61]:
missing_test = test_rdd.subtractByKey(test)

In [62]:
missing_ratings_user = missing_test.map(lambda x: ((x[0][0]), (x[0][1], x[1]))).join(avg_rating).map(lambda x: ((x[0], x[1][0][0]), (x[1][1], x[1][0][1])))

In [63]:
missing_ratings_user.take(5)

[(('70sSlkooEgL_TEjWDQbr3A', 'afXMX5llxcMFzbaPaBBm6A'), (3.12, 4.0)),
 (('pr8_C12oHakeNB4ZPp_dig', '3ZcxnR9YkDVRqqkDJMRWBg'), (4.2, 5.0)),
 (('dz8CFWEWuR_4S1zlZhWCMQ', 'rKh_Nl5edIB9AevqnDmO6g'),
  (3.7291666666666665, 3.0)),
 (('dz8CFWEWuR_4S1zlZhWCMQ', '_VYUU5HPLYasd-xdKLimNA'),
  (3.7291666666666665, 3.0)),
 (('z4MQzyewTRzSoStg0NwL-w', 'lvZOJWiwNymeBhOAgoy11w'),
  (3.911764705882353, 4.0))]

In [64]:
predictions = test.map(lambda x: ((x[0]), (rs.predict(np.array(x[1][0]).reshape(1,-1))[0], x[1][1])))

In [65]:
final_predictions = predictions.union(missing_ratings_user)

In [73]:
final_predictions.count()

10002

In [74]:
mse = final_predictions.map(lambda x: (x[1][0]-x[1][1])**2)

In [75]:
np.sqrt(mse.mean())

1.0382404089064161

In [66]:
final_predictions = final_predictions.collect()

### Save predictions file

In [67]:
with open(test_output, 'w') as f:
    for item in final_predictions:
        f.write(str(item[0][0])+","+str(item[0][1])+","+str(item[1][0])+"\n")

### Making predictions on training data

In [68]:
train_predict = train.map(lambda x: ((x[0]), (rs.predict(np.array(x[1][0]).reshape(1,-1))[0], x[1][1])))

In [81]:
train_predict.take(5)

[((u'cCke3VtFLtqgzkgYSrSE2g', u'BMR_AsSBzTQHqW-SQabI4w'),
  (3.8266873834325938, 4.0)),
 ((u'YE54kKTuqJJPNYWIKIpOEQ', u'RvwZqjdkZ_pER0moPXLZAQ'),
  (3.8386699757233549, 3.0)),
 ((u'4wp4XI9AxKNqJima-xahlg', u'ziJsGjXvidzZWC1I0-SOSg'),
  (3.8386699757233549, 3.0)),
 ((u'U92V-gp13uMZL-sl_naTHA', u'zi6cB_bkswWPLD2k3IVtyg'),
  (3.8266873834325938, 4.0)),
 ((u'rZQCd47n7OwPd71igVX6Og', u'IfHkboQZGPZkKpqHZCW4ag'),
  (3.8306061915479983, 1.0))]

In [82]:
train_predictions = train_predict.map(lambda x: (x[1][0]-x[1][1])**2)

In [None]:
np.sqrt(train_predictions.mean())

In [69]:
train_predict = train_predict.collect()

In [70]:
with open(train_output, 'w') as f:
    for item in train_predict:
        f.write(str(item[0][0])+","+str(item[0][1])+","+str(item[1][0])+"\n")