In [0]:
import os
import json
import gzip
import pandas as pd

In [0]:
from sklearn.feature_extraction.text import TfidfVectorizer, CountVectorizer
from sklearn.neighbors import NearestNeighbors
from sklearn.cluster import KMeans
from sklearn.metrics import adjusted_rand_score

In [0]:
import pickle


In [0]:
# Install a headless OpenJDK 8 (quiet mode, output discarded); the JAVA_HOME
# set later in this notebook points at this JDK's install directory.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [0]:
!wget -q http://apache.mirrors.pair.com/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz

In [0]:
# Unpack the Spark tarball into the current working directory
# (-v lists every extracted file in the cell output).
!tar -xvf spark-2.4.5-bin-hadoop2.7.tgz

In [0]:
!pip install -q pyspark

In [0]:
import os

# Tell PySpark where to find the JVM and the Spark distribution.
# SPARK_HOME names the directory produced by unpacking
# spark-2.4.5-bin-hadoop2.7.tgz (assumed to have been extracted under /content).
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-2.4.5-bin-hadoop2.7",
})

In [0]:
import matplotlib.pyplot as plt

# Inline rendering magic is left disabled here:
# %matplotlib inline
# Apply the "ggplot" style sheet to any figures drawn afterwards.
plt.style.use("ggplot")

In [0]:
# Mount Google Drive at /content/drive; the review dump is read from, and the
# cluster table is written to, paths under '/content/drive/My Drive'.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [0]:

# Parse the gzipped JSON-lines dump: every line is one JSON object.
with gzip.open('/content/drive/My Drive/reviews_amazon.gz') as f:
    data = [json.loads(raw_line.strip()) for raw_line in f]

# number of parsed records, i.e. one per line of the file
print(len(data))

# first parsed record
print(data[0])

151254
{'reviewerID': 'A1VEELTKS8NLZB', 'asin': '616719923X', 'reviewerName': 'Amazon Customer', 'helpful': [0, 0], 'reviewText': 'Just another flavor of Kit Kat but the taste is unique and a bit different.  The only thing that is bothersome is the price.  I thought it was a bit expensive....', 'overall': 4.0, 'summary': 'Good Taste', 'unixReviewTime': 1370044800, 'reviewTime': '06 1, 2013'}


In [0]:
# Build the working frame with just the product id and the review body.
# Columns are selected by NAME rather than by position: positional indices
# depend on the key order of the parsed JSON records and silently pick the
# wrong columns if a record has a different key layout.
df_review = pd.DataFrame(data)[['asin', 'reviewText']]
# Drop rows where either field is missing.
df_review = df_review.dropna()
df_review

Unnamed: 0,asin,reviewText
0,616719923X,Just another flavor of Kit Kat but the taste i...
1,616719923X,I bought this on impulse and it comes from Jap...
2,616719923X,Really good. Great gift for any fan of green t...
3,616719923X,"I had never had it before, was curious to see ..."
4,616719923X,I've been looking forward to trying these afte...
...,...,...
151249,B00KCJRVO2,Delicious gluten-free oatmeal: we tried both t...
151250,B00KCJRVO2,With the many selections of instant oatmeal ce...
151251,B00KCJRVO2,"While I usually review CDs and DVDs, as well a..."
151252,B00KCJRVO2,My son and I enjoyed these oatmeal packets. H...


In [0]:
# Preview the first 500 review texts (the notebook display elides the middle rows).
df_review['reviewText'].head(500)

0      Just another flavor of Kit Kat but the taste i...
1      I bought this on impulse and it comes from Jap...
2      Really good. Great gift for any fan of green t...
3      I had never had it before, was curious to see ...
4      I've been looking forward to trying these afte...
                             ...                        
495    Imagine the horror my son felt as he unwrapped...
496    Daddy delivers is a big way... Just tie a stri...
497    I'm hopping mad just seeing this. We should al...
498    My only complaint with this product is that ot...
499    I bought two, left them alone in the refrigera...
Name: reviewText, Length: 500, dtype: object

In [0]:
# scikit-learn TF-IDF over the review texts with English stop words removed;
# fit_transform returns a sparse document-term matrix.
# NOTE(review): `vec` is only referenced by commented-out code in the cells
# visible here — confirm whether this step is still needed.
vectorizer = TfidfVectorizer(stop_words='english')
vec = vectorizer.fit_transform(df_review["reviewText"])
vec

<151254x78009 sparse matrix of type '<class 'numpy.float64'>'
	with 5356538 stored elements in Compressed Sparse Row format>

In [0]:
# Disabled experiment: k-means with 12 clusters directly on the TF-IDF matrix.
# NOTE(review): presumably the scikit-learn KMeans (n_clusters / init are its
# arguments); if re-enabled it would also rebind `data` — confirm before use.
#data =  vec
#model = KMeans(n_clusters=12, init='k-means++')
#clusters = model.fit_predict(data)


In [0]:
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import *
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml import Pipeline

In [0]:
from pyspark import SparkConf, SparkContext
from pyspark.mllib.feature import HashingTF
from pyspark.mllib.feature import IDF

In [0]:
# Create (or reuse) the SparkSession used to build DataFrames in this notebook.
spark = SparkSession.builder.appName('pandasToSparkDF').getOrCreate()


In [0]:
# Obtain the Spark context for this notebook.
conf = SparkConf().setAppName("recommendation system")
# A SparkSession (and therefore a SparkContext) already exists at this point;
# constructing a second SparkContext raises
# "ValueError: Cannot run multiple SparkContexts at once".
# getOrCreate returns the running context instead (in which case `conf` is
# not applied) and only builds a new one from `conf` when none is active.
sc = SparkContext.getOrCreate(conf=conf)

In [0]:
# Convert the pandas frame to a Spark DataFrame; every column is cast to str
# first, so both `asin` and `reviewText` arrive as string columns.
df = spark.createDataFrame(df_review.astype(str))

In [0]:
# Print the first 20 rows (long strings are truncated in the display).
df.show()

+----------+--------------------+
|      asin|          reviewText|
+----------+--------------------+
|616719923X|Just another flav...|
|616719923X|I bought this on ...|
|616719923X|Really good. Grea...|
|616719923X|I had never had i...|
|616719923X|I've been looking...|
|616719923X|These Kit-kats ar...|
|616719923X|I found these in ...|
|616719923X|Creamy white choc...|
|616719923X|After hearing mix...|
|616719923X|I love green tea,...|
|616719923X|I ordered these i...|
|616719923X|These are definit...|
|616719923X|Yes - this is one...|
|616719923X|I love the green ...|
|616719923X|I love Kit Kat & ...|
|616719923X|I tried this for ...|
|9742356831|This curry paste ...|
|9742356831|I've purchased di...|
|9742356831|I love ethnic foo...|
|9742356831|I started a new d...|
+----------+--------------------+
only showing top 20 rows



In [0]:
# Ask Spark to keep `df` cached; it is used for both pipeline.fit and
# model.transform below.
df.cache()

DataFrame[asin: string, reviewText: string]

In [0]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, CountVectorizer, StopWordsRemover
from pyspark.ml.evaluation import ClusteringEvaluator
from pandas.core.reshape.concat import concat

In [0]:
# Text-featurisation stages, chained through their input/output column names:
# reviewText -> tokens -> stopWordsRemovedTokens -> rawFeatures -> features

# Split each review into tokens.
tokenizer = Tokenizer(
    inputCol="reviewText",
    outputCol="tokens",
)

# Drop stop words from the token lists.
remover = StopWordsRemover(
    inputCol="tokens",
    outputCol="stopWordsRemovedTokens",
)

# Hash the remaining tokens into a 2000-dimensional term-frequency vector.
hashingTF = HashingTF(
    inputCol="stopWordsRemovedTokens",
    outputCol="rawFeatures",
    numFeatures=2000,
)

# Re-weight term frequencies by inverse document frequency (minDocFreq=5).
idf = IDF(
    inputCol="rawFeatures",
    outputCol="features",
    minDocFreq=5,
)

In [0]:
# k-means with 12 clusters on the "features" column produced by the IDF stage.
# The seed is fixed so that cluster ids are reproducible between runs: the
# cluster -> product table is persisted to CSV and later matched against
# predictions from the fitted model, so ids must not change on a re-fit.
kmeans = KMeans(k=12, seed=42)
# Full pipeline: tokenise -> remove stop words -> hash TF -> IDF -> cluster.
pipeline = Pipeline(stages=[tokenizer, remover, hashingTF, idf, kmeans])

In [0]:
# Fit every pipeline stage (including k-means) on the full review DataFrame.
model = pipeline.fit(df)


In [0]:
# Score the same reviews; adds the intermediate columns plus `prediction`
# (the assigned cluster id).
results = model.transform(df)


In [0]:
# Inspect the first rows of the scored DataFrame.
results.show()

+----------+--------------------+--------------------+----------------------+--------------------+--------------------+----------+
|      asin|          reviewText|              tokens|stopWordsRemovedTokens|         rawFeatures|            features|prediction|
+----------+--------------------+--------------------+----------------------+--------------------+--------------------+----------+
|616719923X|Just another flav...|[just, another, f...|  [another, flavor,...|(2000,[150,295,44...|(2000,[150,295,44...|         0|
|616719923X|I bought this on ...|[i, bought, this,...|  [bought, impulse,...|(2000,[43,139,156...|(2000,[43,139,156...|         0|
|616719923X|Really good. Grea...|[really, good., g...|  [really, good., g...|(2000,[54,260,358...|(2000,[54,260,358...|         0|
|616719923X|I had never had i...|[i, had, never, h...|  [never, before,, ...|(2000,[17,123,126...|(2000,[17,123,126...|         0|
|616719923X|I've been looking...|[i've, been, look...|  [looking, forward...|(2000,

In [0]:
# Keep only the product id and its assigned cluster.
df6 = results.select("asin","prediction")


In [0]:
# Bring the (asin, prediction) pairs into a pandas DataFrame for grouping.
df7 = df6.toPandas()

In [0]:
# Collapse to one row per cluster: the product ids of all reviews assigned to
# the cluster are concatenated into a single comma-separated string
# (duplicates are kept, one entry per review).
asin_by_cluster = df7.groupby(['prediction'])['asin']
df7 = asin_by_cluster.agg(','.join).reset_index()

In [0]:
# Give the table its final, human-readable column names; `search` looks rows
# up by 'Cluster Number' and reads 'Product IDs'.
df7 = df7.rename(columns={"prediction": "Cluster Number", "asin": "Product IDs"})
df7

Unnamed: 0,Cluster Number,Product IDs
0,0,"616719923X,616719923X,616719923X,616719923X,61..."
1,1,B001AG6BMO
2,2,"9742356831,B0000531B7,B0000531B7,B0000CFN0Y,B0..."
3,3,"9742356831,B00004S1C5,B00005344V,B00005C2M2,B0..."
4,4,"B000EF18WU,B001EQ4EHE,B0037YP75E,B004TPKAN4,B0..."
5,5,"B00006IUTN,B00006IUTN,B00006IUTN,B00006IUTN,B0..."
6,6,"B0000DBN1H,B0000DBN1L,B0000DBN1L,B0000DBN1L,B0..."
7,7,"B0000531B7,B0000531B7,B0000537AF,B0000CH39R,B0..."
8,8,"B0000CFLCT,B00014DJL2,B00015YTU6,B00027CE2S,B0..."
9,9,"B00028Q45A,B000ED7M2W,B000ED9L6C,B000ED9LDU,B0..."


In [0]:
# Persist the cluster -> product-ids table to Drive.
# index=False keeps the meaningless RangeIndex out of the file; otherwise it
# comes back as a stray "Unnamed: 0" column when the CSV is read again.
df7.to_csv(r'/content/drive/My Drive/recom.csv', index=False)

In [0]:
# Reload the persisted cluster table; from here on `df7` is the copy read
# back from Drive, which is what `search` consults.
df7 = pd.read_csv('/content/drive/My Drive/recom.csv')

In [0]:
def search(a):
  """Return the product IDs of the cluster that the query text `a` falls into.

  The query is wrapped in a one-row Spark DataFrame with a `reviewText`
  column and scored with the module-level fitted `model`; the predicted
  cluster is then looked up in the module-level `df7` table
  ('Cluster Number' -> comma-separated 'Product IDs').

  Args:
    a: query text.

  Returns:
    List of product-ID strings in stored order (duplicates are kept), or an
    empty list when `df7` has no row for the predicted cluster.
  """
  query_df = spark.createDataFrame([(a, )], ['reviewText'])
  # Exactly one row goes in, so the first row carries the prediction.
  cluster = model.transform(query_df).first()['prediction']
  matches = df7.loc[df7['Cluster Number'] == cluster, 'Product IDs']
  if matches.empty:
    # No row for this cluster in the table: report "no recommendations"
    # instead of failing with an IndexError.
    return []
  return matches.iloc[0].split(',')

In [0]:
def unique(a):
  """Return the distinct items of `a`, keeping first-occurrence order.

  Args:
    a: iterable of items (e.g. the list of product IDs from `search`).

  Returns:
    A new list with duplicates removed; the input is not modified.
  """
  items = list(a)
  try:
    # dicts preserve insertion order, so this de-duplicates in linear time
    # instead of re-scanning the result list for every item.
    return list(dict.fromkeys(items))
  except TypeError:
    # Unhashable items (e.g. lists): fall back to the equality-based scan.
    uni = []
    for i in items:
      if i not in uni:
        uni.append(i)
    return uni

In [0]:
# NOTE(review): "Japnese" looks like a typo for "Japanese"; the literal is left
# unchanged because it changes which tokens are hashed — confirm the intent.
a = "Japnese Kit Kat"
# Run the query once (each `search` call triggers a Spark job) and show the
# first 20 distinct product IDs of the predicted cluster.
unique(search(a))[:20]

['616719923X',
 '9742356831',
 'B00004S1C5',
 'B0000531B7',
 'B00005344V',
 'B0000537AF',
 'B00005C2M2',
 'B00006IUTN',
 'B0000CCZYY',
 'B0000CD06J',
 'B0000CDBQN',
 'B0000CDEPD',
 'B0000CETGM',
 'B0000CFLCT',
 'B0000CFLIL',
 'B0000CFN0Y',
 'B0000CFPI2',
 'B0000CH39R',
 'B0000CNU15',
 'B0000CNU1S']