In [1]:
# Tell Jupyter where PySpark is.
# This cell comes before the cell that imports from `pyspark`, so
# findspark.init() has already run by the time those imports execute.
import findspark
findspark.init()

In [2]:
# Import the ALS and LinearRegression estimators, the regression evaluator, and the Spark SQL Row / SparkSession helpers
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.regression import LinearRegression
from pyspark.sql import Row
from pyspark.sql import SparkSession

In [4]:
# Build (or reuse) the SparkSession.
# The SparkSession is the single entry point for interacting with the
# underlying Spark functionality used by the cells below.
spark = (
    SparkSession.builder
    .appName("ALSExample")
    .getOrCreate()
)

In [10]:
# Load the sample ratings file as an RDD of text rows, split every line on
# "::" into its four fields, and turn the result into a DataFrame with
# typed columns (userId, movieId, rating, timestamp).
lines = spark.read.text("/usr/local/spark/data/mllib/als/sample_movielens_ratings.txt").rdd
parts = lines.map(lambda row: row.value.split("::"))
ratingsRDD = parts.map(lambda p: Row(userId=int(p[0]), movieId=int(p[1]),
                                     rating=float(p[2]), timestamp=int(p[3])))
ratings = spark.createDataFrame(ratingsRDD)
# Split the data into a training part (80%) and a test part (20%).
# The seed is pinned so the same split is drawn on every run; without it the
# split -- and therefore the RMSE reported further down -- changes each time
# the notebook is re-executed.
(training, test) = ratings.randomSplit([0.8, 0.2], seed=42)

In [11]:
# Build the recommendation model using ALS on the training data.
# - coldStartStrategy="drop" drops rows whose prediction would be NaN, so the
#   evaluation metric computed later is not NaN.
# - seed is pinned so the fit is repeatable from one run of the notebook
#   to the next.
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop", seed=42)
model = als.fit(training)

In [12]:
# Score the held-out test data with the fitted model and report the
# root-mean-square error between the "rating" and "prediction" columns.
evaluator = RegressionEvaluator(
    predictionCol="prediction",
    labelCol="rating",
    metricName="rmse",
)
predictions = model.transform(test)
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 1.8696232645250332


In [13]:
# Top 5 movie recommendations for each user, and
# top 5 user recommendations for each movie.
userRecs = model.recommendForAllUsers(numItems=5)
movieRecs = model.recommendForAllItems(numUsers=5)

# Display the per-user table first, then the per-movie table.
userRecs.show()
movieRecs.show()

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|    28|[[62, 8.525843], ...|
|    26|[[39, 6.8696904],...|
|    27|[[49, 4.7381926],...|
|    12|[[49, 5.093774], ...|
|    22|[[90, 6.1015964],...|
|     1|[[17, 5.4589934],...|
|    13|[[93, 3.6132293],...|
|     6|[[25, 5.1587577],...|
|    16|[[52, 5.0461907],...|
|     3|[[74, 5.86853], [...|
|    20|[[22, 4.922925], ...|
|     5|[[30, 5.366032], ...|
|    19|[[90, 4.2803607],...|
|    15|[[46, 5.164592], ...|
|    17|[[46, 5.0489945],...|
|     9|[[65, 5.4990354],...|
|     4|[[29, 4.0381455],...|
|     8|[[29, 5.1080737],...|
|    23|[[49, 5.8362117],...|
|     7|[[25, 4.5248313],...|
+------+--------------------+
only showing top 20 rows

+-------+--------------------+
|movieId|     recommendations|
+-------+--------------------+
|     31|[[21, 3.7460542],...|
|     85|[[22, 5.860422], ...|
|     65|[[9, 5.4990354], ...|
|     53|[[8, 5.0997677], ...|
|     78|[[3, 1.2662673], ...|
|     

In [14]:
# Top 5 movie recommendations for a single user: take one distinct value of
# the ALS user column from the ratings and ask the model about just that user.
user = (
    ratings
    .select(als.getUserCol())
    .distinct()
    .limit(1)
)
userSubsetRecs = model.recommendForUserSubset(user, numItems=5)
userSubsetRecs.show()

# Top 5 user recommendations for a single movie, selected the same way
# from the ALS item column.
movie = (
    ratings
    .select(als.getItemCol())
    .distinct()
    .limit(1)
)
movieSubSetRecs = model.recommendForItemSubset(movie, numUsers=5)
movieSubSetRecs.show()

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|    26|[[39, 6.8696904],...|
+------+--------------------+

+-------+--------------------+
|movieId|     recommendations|
+-------+--------------------+
|     26|[[24, 5.111163], ...|
+-------+--------------------+



In [15]:
# Obtain the SparkSession for the linear-regression part of the notebook.
# NOTE(review): getOrCreate() returns an already-active session when there is
# one, so after the ALS cells above this presumably hands back that same
# session rather than starting a new application -- confirm if the app name matters.
spark = (
    SparkSession.builder
    .appName("LinearRegressionWithElasticNet")
    .getOrCreate()
)

In [16]:
# Load the linear-regression training data (libsvm format).
# NOTE(review): this rebinds `training`, which earlier in the notebook held the
# ALS training split; re-running the ALS fit cell after this one would pick up
# this DataFrame instead.
training = (
    spark.read
    .format("libsvm")
    .load("/usr/local/spark/data/mllib/sample_linear_regression_data.txt")
)

In [17]:
# Configure a linear regression estimator and fit it to the training data
lr = LinearRegression(
    maxIter=10,
    regParam=0.3,
    elasticNetParam=0.8,
)
lrModel = lr.fit(training)

# Report the fitted coefficients and intercept
print("Coefficients: %s" % (lrModel.coefficients,))
print("Intercept: %s" % (lrModel.intercept,))

Coefficients: [0.0,0.32292516677405936,-0.3438548034562218,1.9156017023458414,0.05288058680386263,0.765962720459771,0.0,-0.15105392669186682,-0.21587930360904642,0.22025369188813426]
Intercept: 0.1598936844239736


In [18]:
# Pull the training summary from the fitted model and print the number of
# iterations it ran and its root-mean-squared error on the training set
trainingSummary = lrModel.summary
print("numIterations: %d" % (trainingSummary.totalIterations,))
print("RMSE: %f" % (trainingSummary.rootMeanSquaredError,))


numIterations: 7
RMSE: 10.189077
