In [67]:
!pip install numpy pandas

[33mYou are using pip version 19.0.3, however version 20.3.3 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.[0m


In [68]:
import pandas as pd 

In [20]:
%%time
import pyspark
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql.types import *
from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.ml.evaluation import RegressionEvaluator
import os

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 pyspark-shell'

conf = pyspark.SparkConf()
conf.setMaster("k8s://https://192.168.49.2:8443")
conf.set("spark.kubernetes.container.image", "kublr/spark-py:2.4.0-hadoop-2.6") 
conf.set("spark.kubernetes.namespace", "project") 
conf.set("spark.kubernetes.authenticate.driver.serviceAccountName", "project") 
conf.set("spark.executor.instances", "4") 
conf.set("spark.driver.host", "pyspark-notebook-spark-driver.project.svc.cluster.local")
conf.set("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0")
conf.set("spark.sql.parquet.compression.codec", "gzip")

sparkSession = SparkSession.builder \
    .master("k8s://https://192.168.49.2:8443") \
    .appName("notebook") \
    .config(conf=conf) \
    .getOrCreate()
#     .config("spark.kubernetes.container.image", "kublr/spark-py:2.4.0-hadoop-2.6") \
#     .config("spark.kubernetes.namespace", "project") \
#     .config("spark.kubernetes.authenticate.driver.serviceAccountName", "project") \
#     .config("spark.executor.instances", "2") \
#     .config("spark.driver.host", "pyspark-notebook-spark-driver.project.svc.cluster.local") \
#     .getOrCreate()
sc = sparkSession.sparkContext

CPU times: user 111 ms, sys: 137 ms, total: 248 ms
Wall time: 31.9 s


In [21]:
HDFS_URI = "hdfs://hdfs-hadoop-hdfs-nn-0.hdfs-hadoop-hdfs-nn.project.svc.cluster.local:9000"

In [51]:
data = sparkSession.read.csv(HDFS_URI + "/ml-100k/u.data", sep='\t',inferSchema=True, enforceSchema=False, columnNameOfCorruptRecord='broken')

In [52]:
newNames = ["userId", "movieId", "rating", "timestamp"]
dfRenamed = data.toDF(*newNames)
dfRenamed.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: integer (nullable = true)
 |-- timestamp: integer (nullable = true)



In [55]:
df2 = dfRenamed.withColumn("rating", dfRenamed["rating"].cast(DoubleType()))
df2.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)



In [57]:
ratings = (df2.select('userId', 'movieId', 'rating')).cache()
print(ratings)
ratings.printSchema()

DataFrame[userId: int, movieId: int, rating: double]
root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)



In [58]:
(train, test) = ratings.randomSplit([0.8, 0.2])

In [60]:
%%time
from pyspark.ml.evaluation import RegressionEvaluator
als = ALS(
            rank = 10,
            maxIter = 15, 
            regParam = 0.01, 
            userCol = "userId", 
            itemCol = "movieId", 
            ratingCol = "rating", 
            coldStartStrategy = "drop",
            implicitPrefs = False
        )

model = als.fit(train)

# Evaluate model by computing RMSE on train data 
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName = 'rmse', labelCol = "rating", predictionCol = "prediction")
RMSE = evaluator.evaluate(predictions)

CPU times: user 137 ms, sys: 43.9 ms, total: 181 ms
Wall time: 53.8 s


In [61]:
print("Root-mean-square error = " + str(RMSE))

Root-mean-square error = 1.107650288605733


In [62]:
# Generate top 10 movie recommendations for each user
userRecs = model.recommendForAllUsers(10)
userRecs.count()
# Generate top 10 user recommendations for each movie
movieRecs = model.recommendForAllItems(10)
movieRecs.count()

1659

In [74]:
userRecs.toPandas()

AttributeError: 'NoneType' object has no attribute 'setCallSite'

In [76]:
%%time
model.save(HDFS_URI + "/model-ml")
# tvsFitted.bestModel.write().overwrite().save(HDFS_URI + "/model-ml")

CPU times: user 19.8 ms, sys: 14.5 ms, total: 34.4 ms
Wall time: 13.3 s


In [26]:
%%time
PipelineModel.load(HDFS_URI + "/model-ml")

CPU times: user 96.3 ms, sys: 161 ms, total: 258 ms
Wall time: 10.3 s


PipelineModel_069fcfa7a99f

In [19]:
sc.stop()