In [1]:
import gdown
import os
import findspark
import logging
from pyspark.sql import SparkSession

In [2]:
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

In [3]:
#=========================================== LOADING DATA ===========================================

In [4]:
ROOT_FOLDER = os.path.dirname(os.getcwd())
DATA_FOLDER = ROOT_FOLDER + "/data/"

In [5]:
DATA_FOLDER

'/Users/mmt6314/My Drive/AI_ML/recommendation-system/data/'

In [6]:
LINKS_CSV = 'links.csv'
MOVIES_CSV = 'movies.csv'
RATINGS_CSV = 'ratings.csv'
TAGS_CSV = 'tags.csv'

In [7]:
data_links = 'https://drive.google.com/uc?id=19cRdbSbDD4lnKAbv6nfwppRXL7kko6HT'
data_movies = 'https://drive.google.com/uc?id=14s8JDudJHGirQT3VYFwp18JBwSX3liYZ'
data_ratings = 'https://drive.google.com/uc?id=1hYYWUHk5hrDsCdj4BJG0UXBhQ_-AiZWO'
data_tags = 'https://drive.google.com/uc?id=1y7px4xin3_9KBvdAnmiz_uH0uBtPo-hH'

In [8]:
logger.info("Initializing spark environment...")
findspark.init()
spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)
logger.info("Spark environment initialized.")

INFO:__main__:Initializing spark environment...
INFO:__main__:Spark environment initialized.


In [9]:
def download_from_drive(url, output_filename):
    output_filepath = DATA_FOLDER + output_filename
    if not os.path.isfile(output_filepath):
        gdown.download(url, output_filepath, quiet=True)

def read_csv_to_df(filename):
    return spark.read.option("header", "true").csv(DATA_FOLDER + filename)

def print_df(name, spark_df):
    print("=========== {} ===========".format(name))
    spark_df.show(5);

def load_data():
    # Download the dataset from the google drive so as to avoid large files in github.
    download_from_drive(data_links, LINKS_CSV)
    download_from_drive(data_movies, MOVIES_CSV)
    download_from_drive(data_ratings, RATINGS_CSV)
    download_from_drive(data_tags, TAGS_CSV)

    # Read date files from spark context
    links_df = read_csv_to_df(LINKS_CSV)
    movies_df = read_csv_to_df(MOVIES_CSV)
    ratings_df = read_csv_to_df(RATINGS_CSV)
    tags_df = read_csv_to_df(TAGS_CSV)

    # Show data head
    print_df('RATINGS', ratings_df);
    print_df('MOVIES', movies_df);
    print_df('TAGS', tags_df);
    print_df('LINKS', links_df);

    return links_df, movies_df, ratings_df, tags_df


In [10]:
links_df, movies_df, ratings_df, tags_df = load_data()

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows

+------+-------+---------------+----------+
|userId|movieId|            tag| timestamp|
+------+-------+---------------+----------+
|     2|  60756|          funny|1445714994|


In [11]:
#=========================================== DATA PREPROCESSING ===========================================

In [12]:
ratings_df = ratings_df.withColumn('userId', ratings_df.userId.cast('int'))
ratings_df = ratings_df.withColumn('movieId', ratings_df.movieId.cast('int'))
ratings_df = ratings_df.withColumn('rating', ratings_df.rating.cast('int'))
ratings_df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: integer (nullable = true)
 |-- timestamp: string (nullable = true)



In [13]:
#=========================================== TRAINING MODEL ===========================================

In [14]:
from pyspark.ml.recommendation import ALS

In [15]:
#Setting model config
config = {
    'train_test': {
        'split_factor': 0.8
    },
    'model_params': {
        'maxIter': 10,
        'regParam': 0.1,
        'rank': 8,
        'seed': 43,
   }
}

In [16]:
split_factor = config["train_test"]["split_factor"]
model_params = config["model_params"]
maxIter, regParam, rank = [model_params[k] for k in ('maxIter', 'regParam', 'rank')]

In [17]:
logger.info("Training the ALS model...")
train_data, test_data = ratings_df.randomSplit([split_factor, 1-split_factor])
print_df("train_data", train_data)
print_df("test_data", test_data)

INFO:__main__:Training the ALS model...


+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|     4|964982703|
|     1|      3|     4|964981247|
|     1|      6|     4|964982224|
|     1|     47|     5|964983815|
|     1|     50|     5|964982931|
+------+-------+------+---------+
only showing top 5 rows

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|    223|     3|964980985|
|     1|    260|     5|964981680|
|     1|    923|     5|964981529|
|     1|   1009|     3|964981775|
|     1|   1030|     3|964982903|
+------+-------+------+---------+
only showing top 5 rows



In [18]:
als = ALS(
    userCol="userId", itemCol="movieId", ratingCol="rating"
    #maxIter=maxIter,
    #regParam=regParam,
    #rank=rank,
    #coldStartStrategy="drop",
    #nonnegative=True
)

In [19]:
model = als.fit(train_data)

In [20]:
#=========================================== MODEL PERFORMANCE ===========================================

In [21]:
from pyspark.ml.evaluation import RegressionEvaluator

In [22]:
predictions = model.transform(test_data)
predictions.show(10)

+------+-------+------+----------+----------+
|userId|movieId|rating| timestamp|prediction|
+------+-------+------+----------+----------+
|   436|    471|     3| 833530187|  3.688673|
|   602|    471|     4| 840876085| 3.4019654|
|   182|    471|     4|1054779644| 3.1394024|
|   474|    471|     3| 974668858| 3.3648813|
|   287|    471|     4|1110231536|  1.604083|
|   469|    471|     5| 965425364| 2.9393153|
|   608|    471|     1|1117161794|  2.583754|
|   541|    471|     3| 835643551| 3.4622846|
|   373|    471|     5| 846830388| 3.4612875|
|   104|    471|     4|1238111129| 3.9173393|
+------+-------+------+----------+----------+
only showing top 10 rows



In [23]:
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
RMSE = evaluator.evaluate(predictions)
print("RMSE = " + str(RMSE))

RMSE = nan


In [24]:
#=========================================== MODEL USAGES ===========================================

In [25]:
from pyspark.sql.functions import col

In [26]:
#Fidnding users for given movie

INPUT_MOVIE_NAME = 'Toy Story (1995)'
NO_OF_USERS = 5

movieId_RDD = movies_df.where(col('title') == INPUT_MOVIE_NAME).select("movieId").rdd.flatMap(lambda x: x)
if not movieId_RDD.isEmpty():
    movieId = int(movieId_RDD.first())
    recommendations_df = model.recommendForItemSubset(spark.createDataFrame([(movieId, )], ['movieId']), NO_OF_USERS)
    recommended_users = (recommendations_df.select('recommendations').rdd
         .map(lambda x: x[0])
         .flatMap(lambda x: x)
         .map(lambda x: [x.userId])
         .toDF(['userId']))
    print('User for which \'{}\' can be recommended: '.format(INPUT_MOVIE_NAME));
    recommended_users.show()
else:
    print('Movie \'{}\' not present in dataset.'.format(INPUT_MOVIE_NAME))


User for which 'Toy Story (1995)' can be recommended: 
+------+
|userId|
+------+
|   258|
|    43|
|   543|
|    53|
|   389|
+------+



In [27]:
#Finding movies for given user
INPUT_USER_ID = 171;
NO_OF_MOVIES = 5;

recommended_movies_df = model.recommendForUserSubset(spark.createDataFrame([(INPUT_USER_ID, )], ['userId']), NO_OF_MOVIES)
if not recommended_movies_df.rdd.isEmpty():
    recommended_movies = (recommended_movies_df.select('recommendations').rdd
         .map(lambda x: x[0])
         .flatMap(lambda x: x)
         .map(lambda x: [x.movieId, round(x.rating, 2)])
         .toDF(['movieId', 'rating']))
    print('Movies which can be recommended for user={}'.format(INPUT_USER_ID));
    recommended_movies.join(movies_df, on=['movieId'], how='left_outer').select('movieId', 'title', 'genres', 'rating').show(truncate=False)
else:
    print('User not found in the database!!!')


Movies which can be recommended for user=171
+-------+-----------------------------------------------------+------------------------+------+
|movieId|title                                                |genres                  |rating|
+-------+-----------------------------------------------------+------------------------+------+
|2936   |Sullivan's Travels (1941)                            |Adventure|Comedy|Romance|5.82  |
|184245 |De platte jungle (1978)                              |Documentary             |5.82  |
|171495 |Cosmos                                               |(no genres listed)      |5.82  |
|26073  |Human Condition III, The (Ningen no joken III) (1961)|Drama|War               |5.82  |
|117531 |Watermark (2014)                                     |Documentary             |5.82  |
+-------+-----------------------------------------------------+------------------------+------+



In [28]:
#=========================================== MODEL SAVE FOR INFERENCE PIPELINE ===========================================

In [29]:
MODEL_DUMP_FOLDER = ROOT_FOLDER + '/model_dumps/'
MODEL_FILE_NAME = 'als_matrix_factorization.pkl'

In [30]:
# Save the model as a pickle in a file
model.write().overwrite().save(MODEL_DUMP_FOLDER + MODEL_FILE_NAME)

In [41]:
print(recommended_users.toJSON())

MapPartitionsRDD[516] at toJavaRDD at NativeMethodAccessorImpl.java:0
