In [1]:
import findspark
findspark.init()  # called before the pyspark import below; keep this order

from pyspark.sql import SparkSession

# Create (or reuse) the notebook's Spark session, requesting 8g for both the
# executor and the driver.
spark = (
    SparkSession.builder
    .appName("ScalableMovieRecommendations")
    .config("spark.executor.memory", "8g")
    .config("spark.driver.memory", "8g")
    .getOrCreate()
)

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/01/05 23:43:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/01/05 23:43:59 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
22/01/05 23:43:59 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
22/01/05 23:43:59 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.


In [3]:
# The CSV has three columns (Spark reported "Header length: 3, schema size: 2"
# for the previous two-field schema), so declare all of them and project the
# result down to the two columns the notebook uses.
# NOTE(review): column names are assumed to follow the MovieLens header
# (movieId,title,genres) -- confirm against the file. With header=True and an
# explicit schema the fields are applied by position.
movies_df_schema = "movieId integer, title string, genres string"

moviesDF = (
    spark.read
    .csv("../datasets/ml-latest/movies.csv", header=True, schema=movies_df_schema)
    # Keep the DataFrame's public shape unchanged: (ID: int, title: string).
    .selectExpr("movieId as ID", "title")
)

In [5]:
# Echo the DataFrame object itself, then print its first 20 rows with long
# cell values truncated (the defaults of show(), spelled out).
display(moviesDF)
moviesDF.show(n=20, truncate=True)

DataFrame[ID: int, title: string]

+---+--------------------+
| ID|               title|
+---+--------------------+
|  1|    Toy Story (1995)|
|  2|      Jumanji (1995)|
|  3|Grumpier Old Men ...|
|  4|Waiting to Exhale...|
|  5|Father of the Bri...|
|  6|         Heat (1995)|
|  7|      Sabrina (1995)|
|  8| Tom and Huck (1995)|
|  9| Sudden Death (1995)|
| 10|    GoldenEye (1995)|
| 11|American Presiden...|
| 12|Dracula: Dead and...|
| 13|        Balto (1995)|
| 14|        Nixon (1995)|
| 15|Cutthroat Island ...|
| 16|       Casino (1995)|
| 17|Sense and Sensibi...|
| 18|   Four Rooms (1995)|
| 19|Ace Ventura: When...|
| 20|  Money Train (1995)|
+---+--------------------+
only showing top 20 rows



22/01/05 23:45:15 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 3, schema size: 2
CSV file: file:///home/proxslick/spark-movie-lens/datasets/ml-latest/movies.csv


In [6]:
# The CSV has four columns (Spark reported "Header length: 4, schema size: 3"
# for the previous three-field schema), so declare all four and drop the
# unused one before caching.
# NOTE(review): the fourth column is assumed to be the MovieLens `timestamp`
# (epoch seconds) -- confirm against the file header.
ratings_df_schema = "userId integer, movieId integer, rating float, `timestamp` bigint"

ratingsDF = (
    spark.read
    .csv("../datasets/ml-latest/ratings.csv", header=True, schema=ratings_df_schema)
    # Keep the DataFrame's public shape unchanged: (userId, movieId, rating).
    .select("userId", "movieId", "rating")
    .cache()
)

# The first count() also materializes the cached ratings.
ratingsCount = ratingsDF.count()
moviesCount = moviesDF.count()

print('There are {0} ratings and {1} movies in the datasets.'.format(ratingsCount, moviesCount))

22/01/05 23:45:59 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 4, schema size: 3
CSV file: file:///home/proxslick/spark-movie-lens/datasets/ml-latest/ratings.csv
                                                                                

There are 27753444 ratings and 58098 movies in the datasets.


In [7]:
# Echo the DataFrame object itself, then print its first 20 rows with long
# cell values truncated (the defaults of show(), spelled out).
display(ratingsDF)
ratingsDF.show(n=20, truncate=True)

DataFrame[userId: int, movieId: int, rating: float]

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|    307|   3.5|
|     1|    481|   3.5|
|     1|   1091|   1.5|
|     1|   1257|   4.5|
|     1|   1449|   4.5|
|     1|   1590|   2.5|
|     1|   1591|   1.5|
|     1|   2134|   4.5|
|     1|   2478|   4.0|
|     1|   2840|   3.0|
|     1|   2986|   2.5|
|     1|   3020|   4.0|
|     1|   3424|   4.5|
|     1|   3698|   3.5|
|     1|   3826|   2.0|
|     1|   3893|   3.5|
|     2|    170|   3.5|
|     2|    849|   3.5|
|     2|   1186|   3.5|
|     2|   1235|   3.0|
+------+-------+------+
only showing top 20 rows



In [8]:
# One seed shared by the random split here and by the ALS estimator later on.
seed = 42

# 80/20 random split of the ratings into training and test sets.
trainingDF, testDF = ratingsDF.randomSplit(weights=[0.8, 0.2], seed=seed)

print('Training: {0}, test: {1}'.format(
    trainingDF.count(),
    testDF.count(),
))



Training: 22204203, test: 5549241


                                                                                

In [9]:
from pyspark.ml.recommendation import ALS

# Use 16 shuffle partitions for the Spark SQL stages that follow.
spark.conf.set("spark.sql.shuffle.partitions", "16")

# Same ALS configuration as before, supplied through constructor keywords.
# The prediction column is not a constructor argument, so it keeps its setter.
als = ALS(
    rank=12,
    maxIter=2,
    regParam=0.1,
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    coldStartStrategy="drop",
    seed=seed,
).setPredictionCol("predictions")

alsModel = als.fit(trainingDF)

22/01/05 23:48:51 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
22/01/05 23:48:51 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
22/01/05 23:48:52 WARN InstanceBuilder$NativeLAPACK: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
                                                                                

In [10]:
from pyspark.ml.evaluation import RegressionEvaluator

# MSE evaluator comparing the "predictions" column against the true "rating".
regEval = RegressionEvaluator(
    metricName="mse",
    labelCol="rating",
    predictionCol="predictions",
)

# Score the held-out ratings with the fitted ALS model and measure the error.
predictedTestDF = alsModel.transform(testDF)
testMse = regEval.evaluate(predictedTestDF)

print('The model had a MSE on the test set of {0}'.format(testMse))



The model had a MSE on the test set of 0.7466344590904963


                                                                                

In [11]:
# Rename the factor tables' (id, features) columns so they can be joined onto
# the ratings by user and by movie.
userFactors = (
    alsModel.userFactors
    .withColumnRenamed("id", "userId")
    .withColumnRenamed("features", "uFeatures")
)
itemFactors = (
    alsModel.itemFactors
    .withColumnRenamed("id", "movieId")
    .withColumnRenamed("features", "iFeatures")
)

# Attach the item factors first, then the user factors (inner joins).
joinedTrainDF = (
    trainingDF
    .join(itemFactors, on="movieId", how="inner")
    .join(userFactors, on="userId", how="inner")
)
joinedTestDF = (
    testDF
    .join(itemFactors, on="movieId", how="inner")
    .join(userFactors, on="userId", how="inner")
)



In [12]:
from itertools import chain
from pyspark.sql.functions import *
from pyspark.sql.types import *

def concat_arrays(*args):
    """Return a new list holding the elements of every argument, in order.

    Each positional argument must be iterable; with no arguments the result
    is an empty list.
    """
    merged = []
    for arr in args:
        merged.extend(arr)
    return merged
    
# Kept so existing references to the UDF keep working; the feature columns
# below are now built with Spark's native array concat instead.
concat_arrays_udf = udf(concat_arrays, ArrayType(FloatType()))


def _with_concat_features(joinedDF):
    """Return (userId, movieId, features, rating) for a ratings/factors join.

    `features` is the item factor array followed by the user factor array.
    The built-in `concat` (available through the wildcard import from
    pyspark.sql.functions) joins the arrays inside the JVM, so rows are not
    shipped through a Python UDF.
    """
    return joinedDF.select(
        "userId",
        "movieId",
        concat(col("iFeatures"), col("uFeatures")).alias("features"),
        "rating",
    )


concatTrainDF = _with_concat_features(joinedTrainDF)
concatTestDF = _with_concat_features(joinedTestDF)

In [None]:
import tensorflow as tf
import horovod.tensorflow as hvd

# Graph-level TensorFlow seed. Note this is 40, not the `seed` value (42) used
# for the Spark split and ALS earlier in the notebook.
tf.set_random_seed(seed=40)

def model_fn(features, labels, mode, params):
    """Estimator model function: two ReLU dense layers and a linear output.

    Args:
        features: mapping whose "features" entry holds the concatenated
            item+user factor vectors.
        labels: target ratings; used in TRAIN and EVAL modes only.
        mode: one of the tf.estimator.ModeKeys values.
        params: dict with "hidden_layer1" and "hidden_layer2" (layer widths)
            and "learning_rate". An optional "num_features" sets the input
            width and defaults to 24 (two ALS factor vectors of rank 12).

    Returns:
        A tf.estimator.EstimatorSpec for the requested mode.
    """
    print("HVD Size: ", hvd.size())
    # Explicitly specify dimensions; the width is configurable so a different
    # ALS rank does not require editing this function.
    num_features = params.get("num_features", 24)
    features_with_shape = tf.reshape(features["features"], [-1, num_features])

    hidden_layer1 = tf.layers.dense(inputs=features_with_shape, units=params["hidden_layer1"], activation=tf.nn.relu)
    hidden_layer2 = tf.layers.dense(inputs=hidden_layer1, units=params["hidden_layer2"], activation=tf.nn.relu)
    # Single linear output unit, squeezed to one prediction per example.
    predictions = tf.squeeze(tf.layers.dense(inputs=hidden_layer2, units=1, activation=None), axis=-1)

    # If the estimator is running in PREDICT mode, we can stop building our model graph here and simply return
    # our model's inference outputs
    serving_key = tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY
    export_outputs = {serving_key: tf.estimator.export.PredictOutput({"predictions": predictions})}
    if mode == tf.estimator.ModeKeys.PREDICT:
        return tf.estimator.EstimatorSpec(mode=mode, predictions=predictions, export_outputs=export_outputs)

    # Calculate Loss (for both TRAIN and EVAL modes)
    loss = tf.losses.mean_squared_error(labels, predictions)
    if mode == tf.estimator.ModeKeys.TRAIN:
        optimizer = tf.train.AdamOptimizer(learning_rate=params["learning_rate"])
        # Wrap the optimizer with Horovod's distributed optimizer.
        optimizer = hvd.DistributedOptimizer(optimizer)

        train_op = optimizer.minimize(loss=loss, global_step=tf.train.get_global_step())
        return tf.estimator.EstimatorSpec(mode=mode, loss=loss, train_op=train_op,
                                          export_outputs=export_outputs)
    # If running in EVAL mode, add the evaluation metric (RMSE) to our EstimatorSpec so that
    # it is logged when model evaluation runs
    eval_metric_ops = {"rmse": tf.metrics.root_mean_squared_error(labels=labels, predictions=predictions)}
    return tf.estimator.EstimatorSpec(
        mode=mode, loss=loss, eval_metric_ops=eval_metric_ops, export_outputs=export_outputs)

In [None]:
import time

# Flag roughly 20% of the training rows for validation. The draw is seeded
# with the notebook's `seed` so the split is reproducible across runs, and the
# comparison already yields a boolean column, so no when/otherwise is needed.
trainValDF = concatTrainDF.withColumn("isVal", rand(seed=seed) > 0.8)

# Timestamped directory so each run writes to its own model location.
model_dir = "/tmp/horovodDemo/" + str(int(time.time()))
print(model_dir)

In [None]:
from sparkdl.estimators.horovod_estimator.estimator import HorovodEstimator

# Configure the Horovod-backed estimator around model_fn: rows flagged by
# "isVal" are used for validation, and "rating" is the label.
est = HorovodEstimator(
    modelFn=model_fn,
    modelFnParams=dict(hidden_layer1=30, hidden_layer2=20, learning_rate=0.0001),
    modelDir=model_dir,
    featureMapping=dict(features="features"),
    labelCol="rating",
    isValidationCol="isVal",
    batchSize=128,
    maxSteps=20000,
    saveCheckpointsSecs=30,
)

transformer = est.fit(trainValDF)

In [None]:
# Show the network's predictions on the test set next to the true ratings.
display(
    transformer
    .transform(concatTestDF)
    .select("userId", "movieId", "predictions", "rating")
)

In [None]:
# Score the network's test-set predictions with the same MSE evaluator used
# for ALS; this rebinds testMse, replacing the ALS value computed earlier.
testMse = regEval.evaluate(
    transformer.transform(concatTestDF)
)

print('The model had a MSE on the test set of {0}'.format(testMse))