In [1]:
import pyspark

In [2]:
from platform import python_version

print(python_version())

3.7.4


In [3]:
import pyspark
import pyspark.sql.functions as F

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import Window
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType

In [4]:
sc = SparkContext()
sql = SQLContext(sc)

In [5]:
%%time
r_data = sql.read.json('/Users/hka/Desktop/project/test100000.json')

In [6]:
r_data = r_data.drop('helpful', 'reviewTime', 'summary', 'unixReviewTime', 'reviewText')
print(r_data.show(10))

+----------+-------+--------------------+--------------------+
|      asin|overall|          reviewerID|        reviewerName|
+----------+-------+--------------------+--------------------+
|000100039X|    5.0|A10000012B7CGYKOM...|                Adam|
|000100039X|    5.0|      A2S166WSCFIFP5|adead_poet@hotmai...|
|000100039X|    5.0|      A1BM81XB4QHOA3|Ahoro Blethends "...|
|000100039X|    5.0|      A1MOSTXNIO5MPJ|           Alan Krug|
|000100039X|    5.0|      A2XQ5LZHTD4AFT|            Alaturka|
|000100039X|    5.0|      A3V1MKC2BVWY48|         Alex Dawson|
|000100039X|    5.0|      A12387207U8U24|                Alex|
|000100039X|    5.0|      A29TRDMK51GKZR|        Alpine Plume|
|000100039X|    5.0|      A3FI0744PG1WYG|Always Reading "tkm"|
|000100039X|    5.0|      A2LBBQHYLEHM7P|Amazon Customer "...|
+----------+-------+--------------------+--------------------+
only showing top 10 rows

None


In [7]:
print(r_data.select('asin').distinct().count())

2641


In [8]:
r_data.filter(r_data.reviewerName.isNull()).show()

+----------+-------+--------------+------------+
|      asin|overall|    reviewerID|reviewerName|
+----------+-------+--------------+------------+
|0002727463|    5.0| AVXZ5AF18WDHG|        null|
|0007133103|    5.0|A3LXCRJOPYGVWT|        null|
|0007154615|    5.0| A75W6T9I2S8BA|        null|
|0007155417|    5.0| ASQ5357GTL3PV|        null|
|0007156634|    2.0| A3QZQ2ZE8JL7Q|        null|
|0007170602|    4.0| AP4TSMD212B4C|        null|
|0007446977|    4.0|A343C98QJO0JBE|        null|
|0007524404|    5.0|  AZTA5A260B8T|        null|
|000755947X|    3.0|A3IKLTT9JQACCI|        null|
|0025174215|    5.0|A29FPL4ZNJ5RVS|        null|
|0025668609|    5.0|A1CPZQ3MLCYJBH|        null|
|0028609980|    5.0|A1LFIIB1AQ9ER5|        null|
|0028633873|    5.0| APRTK7SCYNOHT|        null|
|0028638840|    4.0|A3LS40E7D4TQS6|        null|
|0028740637|    5.0|A2G7Y6ETCISQ4G|        null|
|0029184657|    4.0| AIYK8D9FTZ790|        null|
|006000150X|    5.0| AZRFX7G4WX2RE|        null|
|0060002107|    3.0|

In [9]:
r_data.columns

['asin', 'overall', 'reviewerID', 'reviewerName']

In [10]:
r_data = r_data.fillna('', subset=['reviewerName'])

In [11]:
r_data.filter(r_data.reviewerName.isNull()).count()

0

In [12]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

In [13]:
indexer = [StringIndexer(inputCol = column, outputCol = column + "index") for column in list(set(r_data.columns) - set(['overall','reviewerName']))]
pipeline = Pipeline(stages=indexer)
transformed = pipeline.fit(r_data).transform(r_data)

In [14]:
transformed.columns

['asin',
 'overall',
 'reviewerID',
 'reviewerName',
 'reviewerIDindex',
 'asinindex']

In [18]:
%%time
als = ALS(maxIter=15, regParam=0.05, userCol = "reviewerIDindex", itemCol = "asinindex", ratingCol = "overall", coldStartStrategy="drop")
als_model = als.fit(transformed)

CPU times: user 28.7 ms, sys: 24.5 ms, total: 53.1 ms
Wall time: 33.4 s


In [30]:
%%time
recommendations = als_model.recommendForAllUsers(5).show(10)
print(recommendations)
print(type(recommendations))

+---------------+--------------------+
|reviewerIDindex|     recommendations|
+---------------+--------------------+
|            148|[[1218, 6.933681]...|
|            463|[[1401, 9.847683]...|
|            471|[[1570, 8.539649]...|
|            496|[[704, 8.5722], [...|
|            833|[[1570, 10.254099...|
|           1088|[[1968, 5.378343]...|
|           1238|[[1796, 6.9148154...|
|           1342|[[1570, 9.958483]...|
|           1580|[[2000, 7.0686593...|
|           1591|[[713, 8.119806],...|
+---------------+--------------------+
only showing top 10 rows

None
<class 'NoneType'>
CPU times: user 3.99 ms, sys: 3.77 ms, total: 7.75 ms
Wall time: 24.5 s


In [28]:
print(recommendations)

None
