In [None]:
# Refresh the apt package index of the notebook VM.
!apt update
# Install a headless Java 8 JDK quietly; stdout is discarded.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Download the Spark 3.1.1 / Hadoop 2.7 binary distribution from the Apache archive.
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop2.7.tgz
# Unpack the archive into the current working directory (-v lists every extracted file).
!tar -xvf spark-3.1.1-bin-hadoop2.7.tgz
# findspark is imported and initialised in a later cell.
!pip install -q findspark
import os
# Point the Java and Spark environment variables at the installs above.
# NOTE(review): SPARK_HOME assumes the tarball was unpacked while the working
# directory was /content — confirm for non-Colab runtimes.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop2.7"

In [None]:
from google.colab import drive
# Mount Google Drive under /content/gdrive; force_remount=True is passed so an
# existing mount is replaced rather than reused.
drive.mount("/content/gdrive", force_remount=True)
# Switch the working directory to the data folder so the relative CSV path read
# later resolves against it.
# NOTE(review): this is a user-specific Drive path; anyone else running the
# notebook has to edit it.
%cd '/content/gdrive/MyDrive/LDS9_K265_TranHoangBach/Week_4/data_day_7'

Mounted at /content/gdrive
/content/gdrive/MyDrive/LDS9_K265_TranHoangBach/Week_4/data_day_7


In [None]:
import findspark
# Called before the pyspark imports below.
# NOTE(review): presumably this is what makes the unpacked Spark's pyspark
# package importable (via SPARK_HOME) — confirm.
findspark.init()

from pyspark import SparkContext
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
# Star imports: later cells use unqualified names such as col, pow, sqrt, avg and
# udf (functions) and ArrayType / StringType (types) that come from these two lines.
# NOTE(review): after this import the name `pow` refers to the Spark column
# function rather than the Python builtin; prefer an aliased import in new code.
from pyspark.sql.functions import *
from pyspark.sql.types import *

import matplotlib.pyplot as plt
import seaborn as sb
import numpy as np
import pandas as pd

%matplotlib inline

In [None]:
# Create (or reuse) the notebook's SparkSession, running locally on all cores.
# NOTE(review): 10g of driver/executor heap plus 10g off-heap and 800 shuffle
# partitions are large for a single local machine — confirm against the
# runtime's available RAM.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("New-Spark")
    .config("spark.memory.fraction", 0.8)
    .config("spark.executor.memory", "10g")
    .config("spark.driver.memory", "10g")
    .config("spark.sql.shuffle.partitions", "800")
    .config("spark.memory.offHeap.enabled", 'true')
    .config("spark.memory.offHeap.size", "10g")
    .getOrCreate()
)
spark

In [None]:
# Read the raw ratings file (no header row, so the columns arrive as _c0, _c1, ...)
# and keep only product, user and rating under descriptive names.
df = (
    spark.read.csv("ratings_Beauty.csv", header=False, inferSchema=True)
    .withColumnRenamed('_c1', 'product_id')
    .withColumnRenamed('_c0', 'user_id')
    .withColumnRenamed('_c2', 'label')
    .select('product_id', 'user_id', 'label')
)
df.show(5)

+----------+--------------+-----+
|product_id|       user_id|label|
+----------+--------------+-----+
|0205616461|A39HTATAQ9V7YF|  5.0|
|0558925278|A3JM6GV9MNOF9X|  3.0|
|0558925278|A1Z513UWSAAO0F|  5.0|
|0733001998|A1WMRR494NWEWV|  4.0|
|0737104473|A3IAAVS479H7M7|  1.0|
+----------+--------------+-----+
only showing top 5 rows



In [None]:
# Total number of rating rows loaded from the CSV.
df.count()

2023070

In [None]:
# Show the column names and the types inferred while reading the CSV.
df.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- label: double (nullable = true)



In [None]:
# Number of distinct products and distinct users that appear in the ratings.
n_product = df.select('product_id').distinct().count()
n_user = df.select('user_id').distinct().count()
print(n_product, n_user)

249274 1210271


In [None]:
# Number of cells a dense product x user rating matrix would contain.
n_product * n_user

301689093254

In [None]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.pipeline import Pipeline

# The id columns are strings (see the printed schema); index each one into a
# numeric column, which the ALS cells below use as itemCol / userCol.
indexer_product = StringIndexer(inputCol='product_id', outputCol='product_idx')
indexer_user = StringIndexer(inputCol='user_id', outputCol='user_idx')

# Fit both indexers on the full dataset, then append product_idx / user_idx to
# every row. The fitted pipeline is kept because its stages are read again later
# to map indices back to the original ids.
pre_pipeline = Pipeline(stages=[indexer_product, indexer_user])
pre_pipeline_fitted = pre_pipeline.fit(df)
final_df = pre_pipeline_fitted.transform(df)

final_df.show(5)

+----------+--------------+-----+-----------+--------+
|product_id|       user_id|label|product_idx|user_idx|
+----------+--------------+-----+-----------+--------+
|0205616461|A39HTATAQ9V7YF|  5.0|   145790.0| 70392.0|
|0558925278|A3JM6GV9MNOF9X|  3.0|   103581.0|265306.0|
|0558925278|A1Z513UWSAAO0F|  5.0|   103581.0|552933.0|
|0733001998|A1WMRR494NWEWV|  4.0|   145791.0|536779.0|
|0737104473|A3IAAVS479H7M7|  1.0|   145792.0| 14679.0|
+----------+--------------+-----+-----------+--------+
only showing top 5 rows



In [None]:
# Tune the model on a ~1% sample (about 20,000 rows) first, then train the tuned
# configuration on the full dataset.
sample_df = final_df.sample(0.01, seed=42)
sample_train_df, sample_test_df = sample_df.randomSplit([0.8, 0.2], seed=42)

# 80/20 train/test split of the full dataset, used for the final model.
train_df, test_df = final_df.randomSplit([0.8, 0.2], seed=42)

In [None]:
# Actual number of rows drawn into the 1% sample.
sample_df.count()

20487

In [None]:
# Mark both sample splits for caching so the fit / transform / evaluate steps
# below can reuse them instead of recomputing the sample each time.
sample_train_df.cache()
sample_test_df.cache()

DataFrame[product_id: string, user_id: string, label: double, product_idx: double, user_idx: double]

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
# NOTE(review): mean_squared_error is not used anywhere in the visible notebook.
from sklearn.metrics import mean_squared_error
import time
# Time the whole fit -> transform -> evaluate sequence on the sample.
tic = time.time()

als = ALS(maxIter=10,           # Number of ALS iterations
          regParam=0.1,        # Regularization strength
          rank=15,              # Number of latent factors per user / item
          numItemBlocks=10,     # Number of blocks the items are partitioned into for parallel computation
          alpha=0.5,            # NOTE(review): not a learning rate. In Spark ML ALS, alpha is the confidence weight for implicit feedback; implicitPrefs is not set here, so it presumably has no effect — confirm
          coldStartStrategy='drop',  # Drop test rows whose user or item has no learned factors
          nonnegative=True,
          userCol='user_idx',     
          itemCol='product_idx',
          ratingCol='label')
model = als.fit(sample_train_df)

predictions = model.transform(sample_test_df)
# predictions.show(5)
# Cache the predictions so follow-up cells that read them do not redo the transform.
predictions.cache()

# No column names are passed to the evaluator, so it relies on its defaults
# (presumably 'label' and 'prediction', matching the columns used in the
# manual RMSE cell further down).
evaluator = RegressionEvaluator(metricName='rmse')
rmse = evaluator.evaluate(predictions)
print('RMSE: {:.4f}'.format(rmse))

toc = time.time()
print('Total time: {:.2f} seconds'.format(toc-tic))

RMSE: 3.1180
Total time: 1404.02 seconds


In [None]:
# Time a repeat evaluation of the sample predictions, which the previous cell cached.
# At this point in a top-to-bottom run `rmse` is the plain float returned by
# evaluator.evaluate(), so calling rmse.collect() here raised AttributeError
# unless a later cell had been executed first. Re-evaluating with the
# evaluator defined in the previous cell keeps this cell runnable in order.
tic = time.time()
rmse = evaluator.evaluate(predictions)
print('RMSE: {:.4f}'.format(rmse))
toc = time.time()
print('Total time: {:.2f} seconds'.format(toc-tic))

[Row(rmse=3.3119045634937643)]
Total time: 0.10 seconds


In [None]:
import pyspark.sql.functions as psf

# Cross-check the evaluator by computing RMSE directly with column expressions:
# squared error per row -> mean squared error -> square root.
tic = time.time()
rmse = (
    predictions
    .withColumn("squarederror",
                psf.pow(psf.col("label") - psf.col("prediction"), psf.lit(2)))
    .agg(psf.avg(psf.col("squarederror")).alias("mse"))
    .withColumn("rmse", psf.sqrt(psf.col("mse")))
)

print(rmse.collect())
# Stop the clock after collect() has run. Previously `toc` was never refreshed
# in this cell, so the printed duration used a stale value from an earlier cell
# and could even be negative.
toc = time.time()
print('Total time: {:.2f} seconds'.format(toc-tic))

[Row(mse=10.968711837690822, rmse=3.3119045634937643)]
Total time: -48.03 seconds


In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
import time
# Time the model fit on the full training split.
tic = time.time()

als = ALS(maxIter=10,           # Number of ALS iterations
          regParam=0.1,         # Regularization strength
          rank=15,              # Number of latent factors per user / item
          numItemBlocks=10,     # Number of blocks the items are partitioned into for parallel computation
          alpha=0.1,            # Implicit-feedback confidence weight (not a learning rate); implicitPrefs is not set, so it presumably has no effect — confirm
          coldStartStrategy='drop',  # Drop test rows whose user or item has no learned factors
          nonnegative=True,
          userCol='user_idx',
          itemCol='product_idx',
          ratingCol='label')
model = als.fit(train_df)

predictions = model.transform(test_df)
# predictions.show(5)
# Cache the predictions, as the sample cell does: two later cells each run an
# action over them and would otherwise recompute the whole transform.
predictions.cache()

toc = time.time()
print('Total fit time: {:.2f} seconds'.format(toc-tic))

Total fit time: 483.44 seconds


In [None]:
# Score the full-data test predictions with Spark's RegressionEvaluator and
# report how long the evaluation takes.
tic = time.time()
evaluator = RegressionEvaluator(metricName='rmse')
rmse = evaluator.evaluate(predictions)
print(f'RMSE: {rmse:.4f}')
toc = time.time()
print(f'Total evaluate time: {toc - tic:.2f} seconds')

RMSE: 1.8035
Total evaluate time: 1505.85 seconds


In [None]:
# Cross-check the evaluator's RMSE on the full-data predictions using column
# expressions. col, pow, sqrt and avg are the Spark SQL functions brought in by
# the star import at the top of the notebook.
tic = time.time()
predictions = (
    predictions
    .withColumn('difference', col('label') - col('prediction'))
    .withColumn('squared_difference', pow(col('difference'), 2))
)
rmse = predictions.select(sqrt(avg(col('squared_difference'))).alias('rmse'))
rmse.show()
toc = time.time()
print(f'Total evaluate time: {toc - tic:.2f} seconds')

+------------------+
|              rmse|
+------------------+
|1.8034768182749659|
+------------------+

Total evaluate time: 1727.85 seconds


In [None]:
# Build the top-5 product recommendations for every user, then print the first
# five result rows, each followed by a blank line.
user_recom = model.recommendForAllUsers(5)
for user in user_recom.head(5):
    print(user, end='\n\n')

In [None]:
# Label lists of the two fitted indexers; the pipeline was built with the
# product indexer as stage 0 and the user indexer as stage 1.
# Presumably labels[i] is the original string id that was assigned index i — confirm.
product_label = pre_pipeline_fitted.stages[0].labels
user_label = pre_pipeline_fitted.stages[1].labels

def convert_product(list_tuple, product_label):
    """Replace product indices with their labels in a list of (index, rating) pairs.

    Args:
        list_tuple: iterable of 2-item sequences ``(product_idx, rating)``.
        product_label: sequence indexable by ``product_idx`` that yields the
            corresponding product label.

    Returns:
        A new list of ``(label, rating)`` tuples in the same order as the input.
    """
    return [(product_label[idx], score) for idx, score in list_tuple]

# UDF that maps every (product index, rating) entry of a recommendations array
# to (product id, rating) using the product label list.
# NOTE(review): the declared return type is an array of string arrays, so the
# numeric rating presumably comes back as a string — confirm, and consider a
# struct type if the rating must stay numeric.
# NOTE(review): both lambdas close over the full label lists; verify that
# shipping them with the UDFs is acceptable at this data size.
convert_recom = udf(lambda x: convert_product(x, product_label), ArrayType(ArrayType(StringType())))
user_recom = user_recom.withColumn('recommendation_product', convert_recom('recommendations'))

# UDF that maps a user index back to the original user id string.
convert_user = udf(lambda x: user_label[x], StringType())
user_recom = user_recom.withColumn('user_id', convert_user('user_idx'))

# Keep only the human-readable columns and bring the result into pandas.
# NOTE(review): toPandas() collects the recommendations for every user onto the
# driver; consider limiting the rows before collecting.
user_recom_by_id = user_recom.select('user_id', 'recommendation_product')
user_recom_by_id.toPandas()