# Movie Recommendation with Databricks Spark and MongoDB Atlas

## High-Level Steps

1. Make sure you have already loaded the `sample_mflix` database by loading the sample dataset.
2. Make sure the `ratings` collection in the `sample_mflix` database is already populated with ratings.
3. Make sure the `movies` collection in the `sample_mflix` database has a `movie_id` field.


### Let's define the connection parameters: connection string, database, and collection names


In [0]:
connectionString = 'mongodb+srv://main_user:main_user@mediatest.5tka5.mongodb.net/'
database = "sample_mflix"
collection_ratings = "ratings"
collection_recommendations = "recommendations"
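
A connection string with embedded credentials is easy to leak when a notebook is shared. As a minimal sketch, assuming the user name, password, and cluster host are supplied through environment variables, the string could be assembled as follows. The variable names `MONGO_USER`, `MONGO_PASSWORD`, and `MONGO_HOST` and the helper `build_connection_string` are hypothetical and not part of the original notebook.

```python
import os
from urllib.parse import quote_plus


def build_connection_string(env=os.environ):
    """Assemble a mongodb+srv URI from environment variables (hypothetical names)."""
    user = quote_plus(env["MONGO_USER"])          # percent-encode reserved characters
    password = quote_plus(env["MONGO_PASSWORD"])  # such as '@' or ':' in the password
    host = env["MONGO_HOST"]                      # cluster host name
    return f"mongodb+srv://{user}:{password}@{host}/"
```

With those variables set, `connectionString = build_connection_string()` could stand in for the literal above.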


### Get the ratings of the users
- Ratings are stored in the `ratings` collection in nested form: all the ratings of a user are stored in a single document.
- They must be flattened into one row per rating (`user_id`, `movie_id`, `rating`) so that the ALS package can train the recommendation model on them.


In [0]:
pipeline = [
    {
        '$unwind': {
            'path': '$ratings'
        }
    }, {
        '$project': {
            'user_id': '$user_id', 
            'movie_id': '$ratings.movie_id', 
            'rating': '$ratings.rating'
        }
    }
]
# Read the ratings through the aggregation pipeline, partitioned with MongoSamplePartitioner
df = (
    spark.read.format('mongo')
    .option("database", database)
    .option("collection", collection_ratings)
    .option("pipeline", pipeline)
    .option("partitioner", "MongoSamplePartitioner")
    .option("spark.mongodb.input.uri", connectionString)
    .load()
)
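
To make the effect of the pipeline concrete, here is a plain Python sketch of what `$unwind` followed by `$project` does to one nested user document. The helper `unwind_ratings` and the sample values are made up for illustration. The `_id` field, which `$project` keeps by default, is left out for brevity.

```python
def unwind_ratings(user_docs):
    """Mimic `$unwind` on `ratings` followed by the `$project` stage above."""
    rows = []
    for doc in user_docs:
        # One output row per element of the nested `ratings` array.
        for entry in doc.get("ratings", []):
            rows.append({
                "user_id": doc["user_id"],
                "movie_id": entry["movie_id"],
                "rating": entry["rating"],
            })
    return rows


# Hypothetical sample document; the values are invented for illustration.
sample = [{"user_id": 1, "ratings": [{"movie_id": 10, "rating": 4},
                                     {"movie_id": 20, "rating": 2}]}]
flat = unwind_ratings(sample)
```

Here `flat` holds two rows for user 1, one per rated movie, which is the one-row-per-rating shape that ALS expects.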



### Set the parameters for the ALS (Alternating Least Squares) algorithm
- We define the column names that ALS uses for the user, the movie (item), and the rating.
- All three are output fields of the aggregation query above.

In [0]:

from pyspark.ml.recommendation import ALS

als = ALS(
    maxIter=10,
    regParam=0.5,
    userCol="user_id",  # output of the aggregation query
    itemCol="movie_id",  # output of the aggregation query
    ratingCol="rating",  # output of the aggregation query
    coldStartStrategy="drop",
)

# A train/test split is shown for reference only; we train on all the data, so it stays commented out
# train, test = df.randomSplit([0.8, 0.2])
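
For intuition about what `maxIter` and `regParam` control, here is a toy rank-1 version of alternating least squares in plain Python. It is a sketch of the general idea only, not Spark's implementation, which uses higher-rank factor vectors and a distributed solver. The function `als_rank1` and the sample ratings are hypothetical.

```python
def als_rank1(ratings, reg=0.01, iters=10):
    """Toy rank-1 ALS over (user, item, rating) triples; returns two factor dicts."""
    user_f = {u: 1.0 for u, _, _ in ratings}
    item_f = {i: 1.0 for _, i, _ in ratings}
    for _ in range(iters):  # plays the role of maxIter
        # Hold the item factors fixed and solve each user's factor in closed form.
        for u in user_f:
            seen = [(item_f[i], r) for uu, i, r in ratings if uu == u]
            user_f[u] = sum(v * r for v, r in seen) / (reg + sum(v * v for v, _ in seen))
        # Hold the user factors fixed and solve each item's factor the same way.
        for i in item_f:
            seen = [(user_f[u], r) for u, ii, r in ratings if ii == i]
            item_f[i] = sum(v * r for v, r in seen) / (reg + sum(v * v for v, _ in seen))
    return user_f, item_f


def predict(user_f, item_f, user, item):
    """Predicted rating is the product of the two learned factors."""
    return user_f[user] * item_f[item]


# Made-up ratings that follow an exact rank-1 pattern.
toy = [("a", "x", 1.0), ("a", "y", 2.0), ("b", "x", 2.0), ("b", "y", 4.0)]
user_f, item_f = als_rank1(toy)
```

The `reg` term (the counterpart of `regParam`) shrinks the factors slightly, so predictions land close to, but not exactly on, the observed ratings.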


### Run the model training with full data

In [0]:
# Train the model
alsModel = als.fit(df)
# We could train on a subset and hold out the rest, but here we fit the model on the whole dataset
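
If a held-out test set were wanted after all, the idea behind the commented-out `randomSplit([0.8, 0.2])` can be sketched in plain Python. The helpers `holdout_split` and `rmse` are hypothetical. Unlike this sketch, Spark's `randomSplit` gives only approximately proportioned splits.

```python
import math
import random


def holdout_split(rows, train_fraction=0.8, seed=42):
    """Shuffle a copy of rows and cut it into (train, test) lists."""
    shuffled = list(rows)
    random.Random(seed).shuffle(shuffled)  # seeded for repeatability
    cut = int(len(shuffled) * train_fraction)
    return shuffled[:cut], shuffled[cut:]


def rmse(pairs):
    """Root-mean-square error over (actual, predicted) rating pairs."""
    return math.sqrt(sum((a - p) ** 2 for a, p in pairs) / len(pairs))
```

The model would then be fitted on the training part only, and `rmse` computed over the actual and predicted ratings of the test part.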

### It's time to see the predictions
- Let's ask the ALS model for 3 recommendations per user.
- Then we show the recommendations for the first 10 users.


In [0]:
recommended_movie_df = alsModel.recommendForAllUsers(3)
recommended_movie_df.show(10, False)


+-------+------------------------------------------------------------+
|user_id|recommendations                                             |
+-------+------------------------------------------------------------+
|26     |[{1532, 3.2443335}, {9064, 3.228116}, {2752, 3.2205803}]    |
|27     |[{12162, 4.135733}, {4176, 4.1316957}, {1927, 4.119496}]    |
|28     |[{11870, 3.5797307}, {14581, 3.412075}, {12154, 3.4117937}] |
|31     |[{10132, 3.9271512}, {14917, 3.7641351}, {2196, 3.7191703}] |
|34     |[{79, 3.6154218}, {2552, 3.5307894}, {4724, 3.5279698}]     |
|44     |[{1927, 3.2515619}, {6388, 3.1706517}, {15938, 3.0852861}]  |
|53     |[{15326, 2.160154}, {79, 2.1144907}, {15092, 2.1051211}]    |
|65     |[{10132, 2.6041791}, {11762, 2.5328312}, {18108, 2.5095086}]|
|76     |[{7602, 2.474003}, {16902, 2.448454}, {20782, 2.4259713}]   |
|78     |[{14943, 3.6943364}, {15247, 3.6115117}, {14017, 3.5994654}]|
+-------+------------------------------------------------------------+
only showing top 10 rows
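
Each row above nests a user's recommendations as a list of `{movie_id, rating}` structs. As a sketch of how that shape could be flattened into one record per recommendation, here is a plain Python version that uses the first row of the output as input. The helper `flatten_recommendations` is hypothetical. In Spark itself the same reshaping is typically done with `pyspark.sql.functions.explode`.

```python
def flatten_recommendations(rows):
    """Turn one row per user into one (user_id, movie_id, score) tuple per recommendation."""
    return [
        (row["user_id"], rec["movie_id"], rec["rating"])
        for row in rows
        for rec in row["recommendations"]
    ]


# First row of the output above, rewritten as a plain dict.
rows = [{
    "user_id": 26,
    "recommendations": [
        {"movie_id": 1532, "rating": 3.2443335},
        {"movie_id": 9064, "rating": 3.228116},
        {"movie_id": 2752, "rating": 3.2205803},
    ],
}]
flat = flatten_recommendations(rows)
```

The `rating` value inside each struct is the model's predicted score, not a rating the user actually gave.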

### Let's save the recommendation results into a MongoDB collection


In [0]:
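# Write the recommendations to MongoDB, replacing any existing contents of the collection
(
    recommended_movie_df.write.format("mongo")
    .option("spark.mongodb.output.uri", connectionString)
    .option("database", database)
    .option("collection", collection_recommendations)
    .mode("overwrite")
    .save()
)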