In [None]:
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

**Init**
- https://docs.google.com/document/d/1JRy6SiN6zs4Ib-n28Qs4jjEmblWQ6S3cc3tpWaj2_pE/edit?usp=sharing

**Reference** : 
- https://medium.com/ymedialabs-innovation/apache-spark-on-a-multi-node-cluster-b75967c8cb2b=0
- https://blog.sicara.com/get-started-pyspark-jupyter-guide-tutorial-ae2fe84f594f
- https://github.com/IBM/elasticsearch-spark-recommender

**Python** :
- version : 3.6

**Spark** :
- version : 2.2

**Elasticsearch**
- version : 5.3

**Elasticsearch-hadoop**
- version : 5.3

# Creating a Scalable Recommender with Apache Spark & Elasticsearch

In this notebook, you will create a recommendation engine using Spark and Elasticsearch. Using some movie rating data,
you will train a collaborative filtering model in Spark and export the trained model to Elasticsearch. Once exported, 
you can test your recommendations by querying Elasticsearch and displaying the results.

### _Prerequisites_

The notebook assumes you have installed Elasticsearch, the Elasticsearch vector-scoring plugin, Apache Spark and the Elasticsearch Spark connector detailed in the [setup steps](https://github.com/MLnick/elasticsearch-spark-recommender-demo/tree/master#steps).

> _Optional:_

> In order to display the images in the recommendation demo, you will need to access [The Movie Database (TMdb) API](https://www.themoviedb.org/documentation/api). Please follow the [instructions](https://developers.themoviedb.org/3/getting-started) to get an API key.

## Overview

You will work through the following steps:

1. Pyspark init
2. Prepare the data
3. Transform Pyspark df to Pandas df
4. Content based Recommendation
5. Convert Pandas df to Pyspark df
6. Load Data into Elasticsearch
7. Train a recommender model on the ratings data
8. Export ALS user and item factor vectors to Elasticsearch and train a collaborative filtering recommendation model using Spark MLlib
9. Save the model to Elasticsearch

## Step 1: Pyspark Init

In [1]:
import findspark

In [2]:
# to make pyspark importable as a regular library
findspark.init()

ValueError: Couldn't find Spark, make sure SPARK_HOME env is set or Spark is in an expected location (e.g. from homebrew installation).
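
The `ValueError` above means `findspark` could not locate a Spark installation. A minimal sketch of one way around it, assuming Spark is already unpacked somewhere on disk: check `SPARK_HOME` first, then fall back to a list of candidate directories. The helper name `resolve_spark_home` and the candidate paths are hypothetical; the resulting directory can be passed to `findspark.init()` as its first argument.

```python
import os

def resolve_spark_home(candidates, environ=os.environ):
    """Return SPARK_HOME if it is set, else the first existing candidate directory, else None."""
    spark_home = environ.get("SPARK_HOME")
    if spark_home:
        return spark_home
    for path in candidates:
        if os.path.isdir(path):
            return path
    return None

# hypothetical install locations; adjust these to your system
CANDIDATES = ["/opt/spark", os.path.expanduser("~/spark")]

spark_home = resolve_spark_home(CANDIDATES)
if spark_home is None:
    print("Spark not found: set SPARK_HOME or extend CANDIDATES")
else:
    print("Using Spark at", spark_home)
    # findspark.init(spark_home)  # uncomment once findspark is installed
```

Exporting `SPARK_HOME` in the shell before starting Jupyter has the same effect and keeps the notebook free of machine-specific paths.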

In [3]:
findspark.add_jars('/home/gene/Desktop/Python/elasticsearch-spark-recommender/elasticsearch-hadoop-5.3.0/dist/elasticsearch-spark-20_2.11-5.3.0.jar')

In [4]:
import pyspark

In [5]:
sc = pyspark.SparkContext(master="local[*]", appName= "PySpark")

In [6]:
sc

In [7]:
from pyspark.sql import SparkSession

In [8]:
spark = SparkSession(sc)

In [9]:
spark

## Step 2: Prepare the data

* This notebook uses the "small" version of the latest MovieLens movie rating dataset, containing about 100,000 ratings of roughly 9,700 movies by about 600 users.
* The latest version of the data can be downloaded at https://grouplens.org/datasets/movielens/latest/
* Download the `ml-latest-small.zip` file and unzip it to a suitable location on your system.

The folder should contain a number of CSV files. We will be using the following files:
* `ratings.csv` - movie rating data
* `links.csv` - external database ids for each movie
* `movies.csv` - movie title and genres

In [None]:
# first import a few utility methods that we'll use later on
from IPython.display import Image, HTML, display

In [10]:
# if you unzipped the data to a different location than that specified in the Journey setup steps
# you can change the path below to point to the correct location
PATH_TO_DATA = "../data/ml-latest-small"

### Load rating and movie data

**Ratings**

The ratings data consists of around 100,000 ratings given by users to movies. Each row of the `DataFrame` consists of a `userId`, `movieId` and `timestamp` for the event, together with the `rating` given by the user to the movie.

In [11]:
# load ratings data
ratings = spark.read.csv(PATH_TO_DATA + "/ratings.csv", header=True, inferSchema=True)
ratings.cache()
print("Number of ratings: %i" % ratings.count())
print("Sample of ratings:")
ratings.show(5)

Number of ratings: 100836
Sample of ratings:
+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows



In [12]:
ratings = ratings.select(
    ratings.userId, ratings.movieId, ratings.rating, (ratings.timestamp.cast("long") * 1000).alias("timestamp"))
ratings.show(5)

+------+-------+------+------------+
|userId|movieId|rating|   timestamp|
+------+-------+------+------------+
|     1|      1|   4.0|964982703000|
|     1|      3|   4.0|964981247000|
|     1|      6|   4.0|964982224000|
|     1|     47|   5.0|964983815000|
|     1|     50|   5.0|964982931000|
+------+-------+------+------------+
only showing top 5 rows
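
The cell above multiplies `timestamp` by 1000, which turns the MovieLens epoch seconds into epoch milliseconds. A likely reason, although the notebook does not state it, is that Elasticsearch `date` fields accept `epoch_millis` values by default. The plain-Python sketch below shows the same conversion on the first rating; the helper names are illustrative.

```python
from datetime import datetime, timezone

def seconds_to_millis(ts_seconds):
    """Convert epoch seconds to epoch milliseconds."""
    return int(ts_seconds) * 1000

def millis_to_utc(ts_millis):
    """Turn epoch milliseconds back into a timezone-aware UTC datetime."""
    return datetime.fromtimestamp(ts_millis / 1000, tz=timezone.utc)

ts_ms = seconds_to_millis(964982703)   # first row of the ratings sample above
print(ts_ms)                           # 964982703000
print(millis_to_utc(ts_ms).isoformat())
```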



**Movies**

The file `movies.csv` contains the `movieId`, `title` and `genres` for each movie. As you can see, the `genres` field is a bit tricky to use, as the genres are in the form of one string delimited by the `|` character: `Adventure|Animation|Children|Comedy|Fantasy`.

In [13]:
# load raw data from CSV
raw_movies = spark.read.csv(PATH_TO_DATA + "/movies.csv", header=True, inferSchema=True)
print("Raw movie data:")
raw_movies.select("movieId", "title", "genres").show(5, truncate=False)

Raw movie data:
+-------+----------------------------------+-------------------------------------------+
|movieId|title                             |genres                                     |
+-------+----------------------------------+-------------------------------------------+
|1      |Toy Story (1995)                  |Adventure|Animation|Children|Comedy|Fantasy|
|2      |Jumanji (1995)                    |Adventure|Children|Fantasy                 |
|3      |Grumpier Old Men (1995)           |Comedy|Romance                             |
|4      |Waiting to Exhale (1995)          |Comedy|Drama|Romance                       |
|5      |Father of the Bride Part II (1995)|Comedy                                     |
+-------+----------------------------------+-------------------------------------------+
only showing top 5 rows



Create a `DataFrame` user-defined function (UDF) to extract this delimited string into a list of genres.

In [14]:
from pyspark.sql.functions import udf
from pyspark.sql.types import *

In [15]:
extract_genres = udf(lambda x: x.lower().split("|"), ArrayType(StringType()))
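
The lambda above assumes every `genres` value is a non-null string; a `None` would raise `AttributeError` inside the UDF. A minimal null-safe sketch in plain Python follows. The function name `split_genres` and the choice to drop the MovieLens `(no genres listed)` placeholder are assumptions, not part of the original notebook.

```python
def split_genres(raw):
    """Split a pipe-delimited genre string into a list of lowercase genres."""
    if raw is None:
        return []
    genres = [g.strip().lower() for g in raw.split("|") if g.strip()]
    # MovieLens marks movies without genres with this placeholder
    return [g for g in genres if g != "(no genres listed)"]

print(split_genres("Adventure|Animation|Children|Comedy|Fantasy"))
print(split_genres(None))
```

If this behaviour is wanted in Spark, the function could be wrapped the same way as the lambda: `udf(split_genres, ArrayType(StringType()))`.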

In [16]:
import re

# define a UDF to extract the release year from the title, and return the new title and year in a struct type
def extract_year_fn(title):
    # look for a four-digit year in parentheses, e.g. "(1995)"
    result = re.search(r"\((\d{4})\)", title)
    if result:
        year = result.group(1)
        # keep the text before the year, minus the separating space
        title = title[:result.start()].rstrip()
        return (title, year)
    # no year found: fall back to "1970" as a string, to match the StringType field below
    return (title, "1970")

extract_year = udf(extract_year_fn, StructType([
    StructField("title", StringType(), True),
    StructField("release_date", StringType(), True)]))
    
# test out our function
s = "Jumanji (1995)"
extract_year_fn(s)

('Jumanji', '1995')

In [229]:
movies = raw_movies.select(
    "movieId",
    extract_year("title").title.alias("title"),
    extract_year("title").release_date.alias("release_date"),
    extract_genres("genres").alias("genres"))
print("Cleaned movie data:")
movies.show(5, truncate=False)

Cleaned movie data:
+-------+---------------------------+------------+-------------------------------------------------+
|movieId|title                      |release_date|genres                                           |
+-------+---------------------------+------------+-------------------------------------------------+
|1      |Toy Story                  |1995        |[adventure, animation, children, comedy, fantasy]|
|2      |Jumanji                    |1995        |[adventure, children, fantasy]                   |
|3      |Grumpier Old Men           |1995        |[comedy, romance]                                |
|4      |Waiting to Exhale          |1995        |[comedy, drama, romance]                         |
|5      |Father of the Bride Part II|1995        |[comedy]                                         |
+-------+---------------------------+------------+-------------------------------------------------+
only showing top 5 rows



Next, join the `links.csv` data to `movies` so that there is an id for _The Movie Database_ corresponding to each movie. You can use this id to retrieve movie poster images when displaying your recommendations later.

### Extend the movie data
**links**

In [18]:
link_data = spark.read.csv(PATH_TO_DATA + "/links.csv", header=True, inferSchema=True)
# join movies with links to get TMDB id
movie_data = movies.join(link_data, movies.movieId == link_data.movieId, how='left')\
    .select(movies.movieId, movies.title, movies.release_date, movies.genres, link_data.tmdbId)
num_movies = movie_data.count()
print("Cleaned movie data with tmdbId links:")
movie_data.show(5, truncate=False)

Cleaned movie data with tmdbId links:
+-------+---------------------------+------------+-------------------------------------------+------+
|movieId|title                      |release_date|genres                                     |tmdbId|
+-------+---------------------------+------------+-------------------------------------------+------+
|1      |Toy Story                  |1995        |Adventure|Animation|Children|Comedy|Fantasy|862   |
|2      |Jumanji                    |1995        |Adventure|Children|Fantasy                 |8844  |
|3      |Grumpier Old Men           |1995        |Comedy|Romance                             |15602 |
|4      |Waiting to Exhale          |1995        |Comedy|Drama|Romance                       |31357 |
|5      |Father of the Bride Part II|1995        |Comedy                                     |11862 |
+-------+---------------------------+------------+-------------------------------------------+------+
only showing top 5 rows



In [19]:
movie_data.dtypes

[('movieId', 'int'),
 ('title', 'string'),
 ('release_date', 'string'),
 ('genres', 'string'),
 ('tmdbId', 'int')]

**Meta Data**

In [20]:
# load raw meta from CSV
raw_meta = spark.read.csv(PATH_TO_DATA + "/meta.csv", header=True, inferSchema=True)
print("Raw meta data:")
raw_meta.select(raw_meta.tmdbId, raw_meta.poster_path).show(5, truncate=False)

Raw meta data:
+------+--------------------------------+
|tmdbId|poster_path                     |
+------+--------------------------------+
|862   |/rhIRbceoE9lR4veEXuwCC2wARtG.jpg|
|8844  |/vzmL6fP7aPKNKPRTFnZmiUfciyV.jpg|
|15602 |/6ksm1sjKMFLbO7UY2i6G1ju9SML.jpg|
|31357 |/16XOMpEaLWkrcPqSQqhTmeJuqQl.jpg|
|11862 |/e64sOI48hQXyru7naBFyssKFxVd.jpg|
+------+--------------------------------+
only showing top 5 rows
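
`poster_path` is a relative path, so it has to be combined with an image host before a poster can be displayed. The sketch below assumes TMDb's usual image URL layout (`https://image.tmdb.org/t/p/<size>/<file>`) and the `w185` size; both are assumptions that should be checked against the TMDb configuration API. `poster_url` is a hypothetical helper, and it also treats the `"NA"` placeholder used later in this notebook as "no poster".

```python
TMDB_IMAGE_BASE = "https://image.tmdb.org/t/p"  # assumed base URL, not taken from the notebook

def poster_url(poster_path, size="w185"):
    """Build a full poster URL, or return None when no poster is known."""
    if not poster_path or poster_path == "NA":
        return None
    return "{}/{}/{}".format(TMDB_IMAGE_BASE, size, poster_path.lstrip("/"))

print(poster_url("/rhIRbceoE9lR4veEXuwCC2wARtG.jpg"))
print(poster_url("NA"))
```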



In [21]:
raw_meta.dtypes

[('tmdbId', 'string'),
 ('poster_path', 'string'),
 ('runtime', 'int'),
 ('budget', 'string'),
 ('popularity', 'string'),
 ('revenue', 'bigint'),
 ('vote_average', 'double'),
 ('vote_count', 'int')]

In [22]:
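# cast each column to the type used downstream
# (revenue is read as bigint, so casting it to int may overflow for very large values)
meta_data = raw_meta.select(
    raw_meta.tmdbId.cast('int'),
    raw_meta.poster_path,
    raw_meta.runtime.cast('int'),
    raw_meta.budget.cast('int'),
    raw_meta.popularity.cast('int'),
    raw_meta.revenue.cast('int'),
    raw_meta.vote_average.cast('float'),
    raw_meta.vote_count.cast('int'))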

In [23]:
meta_data.show(20)

+------+--------------------+-------+--------+----------+---------+------------+----------+
|tmdbId|         poster_path|runtime|  budget|popularity|  revenue|vote_average|vote_count|
+------+--------------------+-------+--------+----------+---------+------------+----------+
|   862|/rhIRbceoE9lR4veE...|     81|30000000|        21|373554033|         7.7|      5415|
|  8844|/vzmL6fP7aPKNKPRT...|    104|65000000|        17|262797249|         6.9|      2413|
| 15602|/6ksm1sjKMFLbO7UY...|    101|       0|        11|        0|         6.5|        92|
| 31357|/16XOMpEaLWkrcPqS...|    127|16000000|         3| 81452156|         6.1|        34|
| 11862|/e64sOI48hQXyru7n...|    106|       0|         8| 76578911|         5.7|       173|
|   949|/zMyfPUelumio3tiD...|    170|60000000|        17|187436818|         7.7|      1886|
| 11860|/jQh15y5YB7bWz1Nt...|    127|58000000|         6|        0|         6.2|       141|
| 45325|/sGO5Qa55p7wTu7FJ...|     97|       0|         2|        0|         5.4|

In [24]:
meta_data.dtypes

[('tmdbId', 'int'),
 ('poster_path', 'string'),
 ('runtime', 'int'),
 ('budget', 'int'),
 ('popularity', 'int'),
 ('revenue', 'int'),
 ('vote_average', 'float'),
 ('vote_count', 'int')]

### Movie Data + Meta Data

In [25]:
movie_cb = movie_data.join(meta_data, movie_data.tmdbId == meta_data.tmdbId, how='left')\
    .select(
        movie_data.movieId, movie_data.tmdbId, movie_data.title,
        movie_data.release_date, movie_data.genres,
        meta_data.poster_path, meta_data.runtime, meta_data.budget,
        meta_data.popularity, meta_data.revenue,
        meta_data.vote_average, meta_data.vote_count)

In [28]:
movie_cb.show(10)

+-------+------+--------------------+------------+--------------------+--------------------+-------+--------+----------+---------+------------+----------+
|movieId|tmdbId|               title|release_date|              genres|         poster_path|runtime|  budget|popularity|  revenue|vote_average|vote_count|
+-------+------+--------------------+------------+--------------------+--------------------+-------+--------+----------+---------+------------+----------+
|      1|   862|           Toy Story|        1995|Adventure|Animati...|/rhIRbceoE9lR4veE...|     81|30000000|        21|373554033|         7.7|      5415|
|      2|  8844|             Jumanji|        1995|Adventure|Childre...|/vzmL6fP7aPKNKPRT...|    104|65000000|        17|262797249|         6.9|      2413|
|      3| 15602|    Grumpier Old Men|        1995|      Comedy|Romance|/6ksm1sjKMFLbO7UY...|    101|       0|        11|        0|         6.5|        92|
|      4| 31357|   Waiting to Exhale|        1995|Comedy|Drama|Romance

In [29]:
movie_cb.dtypes

[('movieId', 'int'),
 ('tmdbId', 'int'),
 ('title', 'string'),
 ('release_date', 'string'),
 ('genres', 'string'),
 ('poster_path', 'string'),
 ('runtime', 'int'),
 ('budget', 'int'),
 ('popularity', 'int'),
 ('revenue', 'int'),
 ('vote_average', 'float'),
 ('vote_count', 'int')]

## Step 3: Spark DataFrame to Pandas DataFrame

In [187]:
df_cb = movie_cb.toPandas()

In [188]:
df_cb.head(2)

Unnamed: 0,movieId,tmdbId,title,release_date,genres,poster_path,runtime,budget,popularity,revenue,vote_average,vote_count
0,1,862.0,Toy Story,1995,Adventure|Animation|Children|Comedy|Fantasy,/rhIRbceoE9lR4veEXuwCC2wARtG.jpg,81.0,30000000.0,21.0,373554033.0,7.7,5415.0
1,2,8844.0,Jumanji,1995,Adventure|Children|Fantasy,/vzmL6fP7aPKNKPRTFnZmiUfciyV.jpg,104.0,65000000.0,17.0,262797249.0,6.9,2413.0


In [189]:
df_cb[df_cb['tmdbId'].isna()]

Unnamed: 0,movieId,tmdbId,title,release_date,genres,poster_path,runtime,budget,popularity,revenue,vote_average,vote_count
624,791,,"Last Klezmer: Leopold Kozlowski, His Life and ...",1994,Documentary,,,,,,,
844,1107,,Loser,1991,Comedy,,,,,,,
2143,2851,,Saturn 3,1980,Adventure|Sci-Fi|Thriller,,,,,,,
3029,4051,,Horrors of Spider Island (Ein Toter Hing im Netz),1960,Horror|Sci-Fi,,,,,,,
5541,26587,,"Decalogue, The (Dekalog)",1989,Crime|Drama|Romance,,,,,,,
5863,32600,,Eros,2004,Drama,,,,,,,
6068,40697,,Babylon 5,1970,Sci-Fi,,,,,,,
7392,79299,,"No. 1 Ladies' Detective Agency, The",2008,Comedy|Crime|Mystery,,,,,,,


In [190]:
df_cb.shape

(9753, 12)

**Fillna**

In [192]:
df_cb.loc[df_cb['tmdbId'].isna(), 'tmdbId'] = 0

In [193]:
df_cb.loc[df_cb['poster_path'].isna(), 'poster_path'] = "NA"

In [194]:
df_cb.loc[df_cb['runtime'].isna(), 'runtime'] = 0

In [195]:
df_cb.loc[df_cb['budget'].isna(), 'budget'] = 0
df_cb.loc[df_cb['popularity'].isna(), 'popularity'] = 0
df_cb.loc[df_cb['revenue'].isna(), 'revenue'] = 0
df_cb.loc[df_cb['vote_average'].isna(), 'vote_average'] = 0
df_cb.loc[df_cb['vote_count'].isna(), 'vote_count'] = 0

In [196]:
df_cb.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 9753 entries, 0 to 9752
Data columns (total 12 columns):
movieId         9753 non-null int64
tmdbId          9753 non-null float64
title           9753 non-null object
release_date    9753 non-null object
genres          9753 non-null object
poster_path     9753 non-null object
runtime         9753 non-null float64
budget          9753 non-null float64
popularity      9753 non-null float64
revenue         9753 non-null float64
vote_average    9753 non-null float64
vote_count      9753 non-null float64
dtypes: float64(7), int64(1), object(4)
memory usage: 914.4+ KB


**Transform dtype**

In [197]:
df_cb['tmdbId'] = df_cb['tmdbId'].astype(int)
df_cb['runtime'] = df_cb['runtime'].astype(int)
df_cb['budget'] = df_cb['budget'].astype(int)
df_cb['popularity'] = df_cb['popularity'].astype(int)
df_cb['revenue'] = df_cb['revenue'].astype(int)
df_cb['vote_count'] = df_cb['vote_count'].astype(int)

In [198]:
df_cb.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 9753 entries, 0 to 9752
Data columns (total 12 columns):
movieId         9753 non-null int64
tmdbId          9753 non-null int64
title           9753 non-null object
release_date    9753 non-null object
genres          9753 non-null object
poster_path     9753 non-null object
runtime         9753 non-null int64
budget          9753 non-null int64
popularity      9753 non-null int64
revenue         9753 non-null int64
vote_average    9753 non-null float64
vote_count      9753 non-null int64
dtypes: float64(1), int64(7), object(4)
memory usage: 914.4+ KB
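
The fill and cast cells above can also be written as two calls. This is a sketch on a two-row stand-in frame rather than on `df_cb` itself: `DataFrame.fillna` accepts a dict of per-column defaults, and `DataFrame.astype` accepts a dict of per-column dtypes. The variable names `df` and `defaults` are illustrative.

```python
import numpy as np
import pandas as pd

# two-row stand-in for df_cb: one complete movie, one with missing metadata
df = pd.DataFrame({
    "movieId": [1, 791],
    "tmdbId": [862.0, np.nan],
    "poster_path": ["/rhIRbceoE9lR4veEXuwCC2wARtG.jpg", None],
    "runtime": [81.0, np.nan],
    "vote_average": [7.7, np.nan],
})

# per-column defaults for missing values
defaults = {"tmdbId": 0, "poster_path": "NA", "runtime": 0, "vote_average": 0}
df = df.fillna(defaults)

# with no NaN left, the id and runtime columns can be cast to integers
df = df.astype({"tmdbId": "int64", "runtime": "int64"})
print(df.dtypes)
```

Filling before casting matters: an integer cast on a column that still contains `NaN` raises an error.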


In [199]:
df_cb.head(5)

Unnamed: 0,movieId,tmdbId,title,release_date,genres,poster_path,runtime,budget,popularity,revenue,vote_average,vote_count
0,1,862,Toy Story,1995,Adventure|Animation|Children|Comedy|Fantasy,/rhIRbceoE9lR4veEXuwCC2wARtG.jpg,81,30000000,21,373554033,7.7,5415
1,2,8844,Jumanji,1995,Adventure|Children|Fantasy,/vzmL6fP7aPKNKPRTFnZmiUfciyV.jpg,104,65000000,17,262797249,6.9,2413
2,3,15602,Grumpier Old Men,1995,Comedy|Romance,/6ksm1sjKMFLbO7UY2i6G1ju9SML.jpg,101,0,11,0,6.5,92
3,4,31357,Waiting to Exhale,1995,Comedy|Drama|Romance,/16XOMpEaLWkrcPqSQqhTmeJuqQl.jpg,127,16000000,3,81452156,6.1,34
4,5,11862,Father of the Bride Part II,1995,Comedy,/e64sOI48hQXyru7naBFyssKFxVd.jpg,106,0,8,76578911,5.7,173


In [200]:
df_cb.shape

(9753, 12)

In [201]:
df_cb['tmdbId'].dtype

dtype('int64')

## Step 4: Content Based Recommendation

In [68]:
import pandas as pd
import numpy as np
from ast import literal_eval
from sklearn.feature_extraction.text import TfidfVectorizer, CountVectorizer
from sklearn.metrics.pairwise import linear_kernel, cosine_similarity

In [232]:
md = pd.read_csv(PATH_TO_DATA + '/movies_metadata.csv')
md.head(2)



Unnamed: 0,adult,belongs_to_collection,budget,genres,homepage,id,imdb_id,original_language,original_title,overview,...,release_date,revenue,runtime,spoken_languages,status,tagline,title,video,vote_average,vote_count
0,False,"{'id': 10194, 'name': 'Toy Story Collection', ...",30000000,"[{'id': 16, 'name': 'Animation'}, {'id': 35, '...",http://toystory.disney.com/toy-story,862,tt0114709,en,Toy Story,"Led by Woody, Andy's toys live happily in his ...",...,1995-10-30,373554033.0,81.0,"[{'iso_639_1': 'en', 'name': 'English'}]",Released,,Toy Story,False,7.7,5415.0
1,False,,65000000,"[{'id': 12, 'name': 'Adventure'}, {'id': 14, '...",,8844,tt0113497,en,Jumanji,When siblings Judy and Peter discover an encha...,...,1995-12-15,262797249.0,104.0,"[{'iso_639_1': 'en', 'name': 'English'}, {'iso...",Released,Roll the dice and unleash the excitement!,Jumanji,False,6.9,2413.0


In [233]:
md['genres'] = md['genres'].fillna('[]').apply(literal_eval).apply(lambda x: [i['name'] for i in x] if isinstance(x, list) else [])

In [234]:
md.head(2)

Unnamed: 0,adult,belongs_to_collection,budget,genres,homepage,id,imdb_id,original_language,original_title,overview,...,release_date,revenue,runtime,spoken_languages,status,tagline,title,video,vote_average,vote_count
0,False,"{'id': 10194, 'name': 'Toy Story Collection', ...",30000000,"[Animation, Comedy, Family]",http://toystory.disney.com/toy-story,862,tt0114709,en,Toy Story,"Led by Woody, Andy's toys live happily in his ...",...,1995-10-30,373554033.0,81.0,"[{'iso_639_1': 'en', 'name': 'English'}]",Released,,Toy Story,False,7.7,5415.0
1,False,,65000000,"[Adventure, Fantasy, Family]",,8844,tt0113497,en,Jumanji,When siblings Judy and Peter discover an encha...,...,1995-12-15,262797249.0,104.0,"[{'iso_639_1': 'en', 'name': 'English'}, {'iso...",Released,Roll the dice and unleash the excitement!,Jumanji,False,6.9,2413.0


In [235]:
# See the EDA notebook for how and why I got these indices.
# These rows have no id.
md = md.drop([19730, 29503, 35587])

**Metadata Based Recommender**

In [236]:
credits = pd.read_csv(PATH_TO_DATA + '/credits_small.csv')
keywords = pd.read_csv(PATH_TO_DATA + '/keywords_small.csv')

In [237]:
keywords['id'] = keywords['id'].astype('int')
credits['id'] = credits['id'].astype('int')
md['id'] = md['id'].astype('int')

In [238]:
md.head(2)

Unnamed: 0,adult,belongs_to_collection,budget,genres,homepage,id,imdb_id,original_language,original_title,overview,...,release_date,revenue,runtime,spoken_languages,status,tagline,title,video,vote_average,vote_count
0,False,"{'id': 10194, 'name': 'Toy Story Collection', ...",30000000,"[Animation, Comedy, Family]",http://toystory.disney.com/toy-story,862,tt0114709,en,Toy Story,"Led by Woody, Andy's toys live happily in his ...",...,1995-10-30,373554033.0,81.0,"[{'iso_639_1': 'en', 'name': 'English'}]",Released,,Toy Story,False,7.7,5415.0
1,False,,65000000,"[Adventure, Fantasy, Family]",,8844,tt0113497,en,Jumanji,When siblings Judy and Peter discover an encha...,...,1995-12-15,262797249.0,104.0,"[{'iso_639_1': 'en', 'name': 'English'}, {'iso...",Released,Roll the dice and unleash the excitement!,Jumanji,False,6.9,2413.0


In [239]:
keywords.head(2)

Unnamed: 0,id,keywords
0,862,"['jealousy', 'toy', 'boy', 'friendship', 'frie..."
1,8844,"['board game', 'disappearance', ""based on chil..."


In [240]:
credits.head(2)

Unnamed: 0,id,director,cast
0,862,John Lasseter,"['Tom Hanks', 'Tim Allen', 'Don Rickles']"
1,8844,Joe Johnston,"['Robin Williams', 'Jonathan Hyde', 'Kirsten D..."


In [241]:
md = md.merge(credits, on='id')
md = md.merge(keywords, on='id')

In [242]:
md.head(2)

Unnamed: 0,adult,belongs_to_collection,budget,genres,homepage,id,imdb_id,original_language,original_title,overview,...,spoken_languages,status,tagline,title,video,vote_average,vote_count,director,cast,keywords
0,False,"{'id': 10194, 'name': 'Toy Story Collection', ...",30000000,"[Animation, Comedy, Family]",http://toystory.disney.com/toy-story,862,tt0114709,en,Toy Story,"Led by Woody, Andy's toys live happily in his ...",...,"[{'iso_639_1': 'en', 'name': 'English'}]",Released,,Toy Story,False,7.7,5415.0,John Lasseter,"['Tom Hanks', 'Tim Allen', 'Don Rickles']","['jealousy', 'toy', 'boy', 'friendship', 'frie..."
1,False,,65000000,"[Adventure, Fantasy, Family]",,8844,tt0113497,en,Jumanji,When siblings Judy and Peter discover an encha...,...,"[{'iso_639_1': 'en', 'name': 'English'}, {'iso...",Released,Roll the dice and unleash the excitement!,Jumanji,False,6.9,2413.0,Joe Johnston,"['Robin Williams', 'Jonathan Hyde', 'Kirsten D...","['board game', 'disappearance', ""based on chil..."


In [243]:
md.shape

(46628, 27)

In [244]:
md = md.rename(columns={"id":"tmdbId"})

In [245]:
md.head(2)

Unnamed: 0,adult,belongs_to_collection,budget,genres,homepage,tmdbId,imdb_id,original_language,original_title,overview,...,spoken_languages,status,tagline,title,video,vote_average,vote_count,director,cast,keywords
0,False,"{'id': 10194, 'name': 'Toy Story Collection', ...",30000000,"[Animation, Comedy, Family]",http://toystory.disney.com/toy-story,862,tt0114709,en,Toy Story,"Led by Woody, Andy's toys live happily in his ...",...,"[{'iso_639_1': 'en', 'name': 'English'}]",Released,,Toy Story,False,7.7,5415.0,John Lasseter,"['Tom Hanks', 'Tim Allen', 'Don Rickles']","['jealousy', 'toy', 'boy', 'friendship', 'frie..."
1,False,,65000000,"[Adventure, Fantasy, Family]",,8844,tt0113497,en,Jumanji,When siblings Judy and Peter discover an encha...,...,"[{'iso_639_1': 'en', 'name': 'English'}, {'iso...",Released,Roll the dice and unleash the excitement!,Jumanji,False,6.9,2413.0,Joe Johnston,"['Robin Williams', 'Jonathan Hyde', 'Kirsten D...","['board game', 'disappearance', ""based on chil..."


In [246]:
md['tmdbId'].dtypes

dtype('int64')

**Using small meta data to train model**

In [86]:
links = pd.read_csv(PATH_TO_DATA + "/links.csv")
links = links[links['tmdbId'].notnull()]['tmdbId'].astype('int')

In [87]:
links.head(5)

0      862
1     8844
2    15602
3    31357
4    11862
Name: tmdbId, dtype: int64

In [88]:
links.shape

(9734,)

In [258]:
smd = md[md['tmdbId'].isin(links)]
smd.shape

(9657, 27)

In [259]:
smd.head(2)

Unnamed: 0,adult,belongs_to_collection,budget,genres,homepage,tmdbId,imdb_id,original_language,original_title,overview,...,spoken_languages,status,tagline,title,video,vote_average,vote_count,director,cast,keywords
0,False,"{'id': 10194, 'name': 'Toy Story Collection', ...",30000000,"[Animation, Comedy, Family]",http://toystory.disney.com/toy-story,862,tt0114709,en,Toy Story,"Led by Woody, Andy's toys live happily in his ...",...,"[{'iso_639_1': 'en', 'name': 'English'}]",Released,,Toy Story,False,7.7,5415.0,John Lasseter,"['Tom Hanks', 'Tim Allen', 'Don Rickles']","['jealousy', 'toy', 'boy', 'friendship', 'frie..."
1,False,,65000000,"[Adventure, Fantasy, Family]",,8844,tt0113497,en,Jumanji,When siblings Judy and Peter discover an encha...,...,"[{'iso_639_1': 'en', 'name': 'English'}, {'iso...",Released,Roll the dice and unleash the excitement!,Jumanji,False,6.9,2413.0,Joe Johnston,"['Robin Williams', 'Jonathan Hyde', 'Kirsten D...","['board game', 'disappearance', ""based on chil..."


In [270]:
# convert string to list

smd['cast'] = smd['cast'].apply(literal_eval)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/indexing.html#indexing-view-versus-copy
  """Entry point for launching an IPython kernel.


In [271]:
smd['cast'][0]

['Tom Hanks', 'Tim Allen', 'Don Rickles']
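
The warning above is pandas' `SettingWithCopyWarning`: `smd` was produced by filtering `md`, so pandas cannot tell whether the assignment is also meant to affect `md`. One common remedy, sketched here on a small stand-in frame (the `_demo` names are hypothetical), is to take an explicit `.copy()` of the filtered rows before assigning to them.

```python
import warnings
from ast import literal_eval

import pandas as pd

md_demo = pd.DataFrame({
    "tmdbId": [862, 8844, 1],
    "cast": ["['Tom Hanks']", "['Robin Williams']", "[]"],
})
links_demo = pd.Series([862, 8844])

# .copy() makes the filtered subset an independent DataFrame
smd_demo = md_demo[md_demo["tmdbId"].isin(links_demo)].copy()

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    # convert the string representation of each list into a real list
    smd_demo["cast"] = smd_demo["cast"].apply(literal_eval)

copy_warnings = [w for w in caught if "SettingWithCopy" in w.category.__name__]
print(len(copy_warnings))
print(smd_demo["cast"].tolist())
```

In this notebook, the equivalent change would be to append `.copy()` to the line that creates `smd`.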

In [273]:
smd['cast'] = smd['cast'].apply(lambda x: [str.lower(i.replace(" ", "")) for i in x])



In [274]:
smd['cast'][0]

['tomhanks', 'timallen', 'donrickles']

In [275]:
# Mention Director 3 times to give it more weight relative to the entire cast.

smd['director'] = smd['director'].astype('str').apply(lambda x: str.lower(x.replace(" ", "")))
smd['director'] = smd['director'].apply(lambda x: [x,x, x])



In [276]:
smd['director'][0]

['johnlasseter', 'johnlasseter', 'johnlasseter']

**Extract Keywords**

In [None]:
# convert string to list

smd['keywords'] = smd['keywords'].apply(literal_eval)

In [277]:
smd['keywords'].head(2)

0    [jealousy, toy, boy, friendship, friends, riva...
1    [board game, disappearance, based on children'...
Name: keywords, dtype: object

In [278]:
s = smd.apply(lambda x: pd.Series(x['keywords']) ,axis=1).stack().reset_index(level=1, drop=True)
s.name = 'keyword'

In [279]:
s.head()

0      jealousy
0           toy
0           boy
0    friendship
0       friends
Name: keyword, dtype: object

In [280]:
s = s.value_counts()
s[:5]

independent film        613
woman director          573
murder                  398
based on novel          336
duringcreditsstinger    330
Name: keyword, dtype: int64

In [281]:
s = s[s > 1]

In [282]:
def filter_keywords(x):
    words = []
    for i in x:
        if i in s:
            words.append(i)
    return words
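
The cells above keep only keywords that occur more than once across all movies, since a keyword seen in a single movie cannot link it to any other. A plain-Python sketch of the same idea with `collections.Counter` follows; the keyword lists are shortened, illustrative data and `keep_frequent` is a hypothetical name.

```python
from collections import Counter

keyword_lists = [
    ["toy", "friendship", "rivalry"],
    ["board game", "disappearance"],
    ["friendship", "board game", "fishing"],
]

# count how often each keyword appears across all movies
counts = Counter(k for keywords in keyword_lists for k in keywords)

# keep only keywords shared by at least two movies
frequent = {k for k, n in counts.items() if n > 1}

def keep_frequent(keywords, allowed=frequent):
    """Drop keywords that are not in the allowed set, preserving order."""
    return [k for k in keywords if k in allowed]

filtered = [keep_frequent(keywords) for keywords in keyword_lists]
print(filtered)
```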

In [283]:
smd['keywords'] = smd['keywords'].apply(filter_keywords)
smd['keywords'] = smd['keywords'].apply(lambda x: [str.lower(i.replace(" ", "")) for i in x])



In [284]:
smd['keywords'].head()

0    [jealousy, toy, boy, friendship, friends, riva...
1    [boardgame, disappearance, basedonchildren'sbo...
2          [fishing, bestfriend, duringcreditsstinger]
3    [basedonnovel, interracialrelationship, single...
4    [baby, midlifecrisis, confidence, aging, daugh...
Name: keywords, dtype: object

In [285]:
smd['soup'] = smd['keywords'] + smd['cast'] + smd['director'] + smd['genres']



In [286]:
smd['soup'].head() # list

0    [jealousy, toy, boy, friendship, friends, riva...
1    [boardgame, disappearance, basedonchildren'sbo...
2    [fishing, bestfriend, duringcreditsstinger, wa...
3    [basedonnovel, interracialrelationship, single...
4    [baby, midlifecrisis, confidence, aging, daugh...
Name: soup, dtype: object

In [287]:
smd['soup'] = smd['soup'].apply(lambda x: ' '.join(x))



In [288]:
smd['soup'].head() # string

0    jealousy toy boy friendship friends rivalry bo...
1    boardgame disappearance basedonchildren'sbook ...
2    fishing bestfriend duringcreditsstinger walter...
3    basedonnovel interracialrelationship singlemot...
4    baby midlifecrisis confidence aging daughter m...
Name: soup, dtype: object

In [289]:
smd['soup'][0]

'jealousy toy boy friendship friends rivalry boynextdoor newtoy toycomestolife tomhanks timallen donrickles johnlasseter johnlasseter johnlasseter Animation Comedy Family'

**Modeling**

In [290]:
# min_df=1 keeps every term (same effect as min_df=0, which newer scikit-learn releases reject)
count = CountVectorizer(analyzer='word', ngram_range=(1, 2), min_df=1, stop_words='english')
count_matrix = count.fit_transform(smd['soup'])

In [291]:
count_matrix.shape

(9657, 112362)

In [292]:
cosine_sim = cosine_similarity(count_matrix, count_matrix)

In [293]:
cosine_sim[0]

array([1.        , 0.02441931, 0.02738955, ..., 0.02934836, 0.03698634,
       0.03049971])
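
To make the two cells above concrete, here is a minimal, dependency-free sketch of what counting tokens and taking cosine similarity amounts to, using three made-up "soup" strings. It counts single words only (the notebook also counts bigrams and drops English stop words), and the helper name `cosine` is illustrative rather than part of scikit-learn.

```python
import math
from collections import Counter

def cosine(a, b):
    """Cosine similarity between two sparse count vectors (Counter objects)."""
    dot = sum(a[t] * b[t] for t in a.keys() & b.keys())
    norm_a = math.sqrt(sum(v * v for v in a.values()))
    norm_b = math.sqrt(sum(v * v for v in b.values()))
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0

# Made-up soups, for illustration only
soups = [
    "toy friendship tomhanks johnlasseter animation",
    "toy rescue tomhanks johnlasseter animation",
    "boardgame robinwilliams joejohnston adventure",
]

# One bag-of-words vector per movie, then the full pairwise similarity matrix
vectors = [Counter(s.split()) for s in soups]
sim = [[cosine(u, v) for v in vectors] for u in vectors]
```

The first two soups share four of their five tokens, so `sim[0][1]` is 4/5 up to floating-point rounding, while the third soup shares nothing with the first and scores 0. Each row of `sim` plays the role of `cosine_sim[idx]` in the notebook.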

In [294]:
smd.head(2)

Unnamed: 0,adult,belongs_to_collection,budget,genres,homepage,tmdbId,imdb_id,original_language,original_title,overview,...,status,tagline,title,video,vote_average,vote_count,director,cast,keywords,soup
0,False,"{'id': 10194, 'name': 'Toy Story Collection', ...",30000000,"[Animation, Comedy, Family]",http://toystory.disney.com/toy-story,862,tt0114709,en,Toy Story,"Led by Woody, Andy's toys live happily in his ...",...,Released,,Toy Story,False,7.7,5415.0,"[johnlasseter, johnlasseter, johnlasseter]","[tomhanks, timallen, donrickles]","[jealousy, toy, boy, friendship, friends, riva...",jealousy toy boy friendship friends rivalry bo...
1,False,,65000000,"[Adventure, Fantasy, Family]",,8844,tt0113497,en,Jumanji,When siblings Judy and Peter discover an encha...,...,Released,Roll the dice and unleash the excitement!,Jumanji,False,6.9,2413.0,"[joejohnston, joejohnston, joejohnston]","[robinwilliams, jonathanhyde, kirstendunst]","[boardgame, disappearance, basedonchildren'sbo...",boardgame disappearance basedonchildren'sbook ...


In [295]:
smd = smd.reset_index()

In [296]:
smd.head(2)

Unnamed: 0,index,adult,belongs_to_collection,budget,genres,homepage,tmdbId,imdb_id,original_language,original_title,...,status,tagline,title,video,vote_average,vote_count,director,cast,keywords,soup
0,0,False,"{'id': 10194, 'name': 'Toy Story Collection', ...",30000000,"[Animation, Comedy, Family]",http://toystory.disney.com/toy-story,862,tt0114709,en,Toy Story,...,Released,,Toy Story,False,7.7,5415.0,"[johnlasseter, johnlasseter, johnlasseter]","[tomhanks, timallen, donrickles]","[jealousy, toy, boy, friendship, friends, riva...",jealousy toy boy friendship friends rivalry bo...
1,1,False,,65000000,"[Adventure, Fantasy, Family]",,8844,tt0113497,en,Jumanji,...,Released,Roll the dice and unleash the excitement!,Jumanji,False,6.9,2413.0,"[joejohnston, joejohnston, joejohnston]","[robinwilliams, jonathanhyde, kirstendunst]","[boardgame, disappearance, basedonchildren'sbo...",boardgame disappearance basedonchildren'sbook ...


In [297]:
smd.shape

(9657, 29)

In [298]:
titles = smd['title']
tmdbId = smd['tmdbId']
indices = pd.Series(smd.index, index=smd['tmdbId'])

In [299]:
def get_recommendations(tmdbid):
    """Return the five most similar movies as 'tmdbId|title|score' strings."""
    try:
        idx = indices[tmdbid]  # row position of this tmdbId in smd / cosine_sim
        sim_scores = list(enumerate(cosine_sim[idx]))  # (row position, similarity) pairs for this movie
        sim_scores = sorted(sim_scores, key=lambda x: x[1], reverse=True)  # sort by similarity (x[1]), highest first
        sim_scores = sim_scores[1:6]  # drop the top entry (normally the movie itself), keep the next five
        movie_indices = [i[0] for i in sim_scores]
        movie_scores = [i[1] for i in sim_scores]

        tmdb_list = list(tmdbId.iloc[movie_indices])
        title_list = list(titles.iloc[movie_indices])
        score_list = movie_scores

        csim_movies = []
        for a, b, c in zip(tmdb_list, title_list, score_list):
            csim_movies.append(str(a) + '|' + b + '|' + str(round(c, 3)))

        return csim_movies
    except Exception:
        # unknown tmdbId (or any other lookup failure): return no recommendations
        return []

In [300]:
get_recommendations(862) # tmdbId|movie title|sim_score

['13925|Luxo Jr.|0.555',
 '863|Toy Story 2|0.512',
 '49013|Cars 2|0.429',
 '920|Cars|0.391',
 "9487|A Bug's Life|0.384"]

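Each recommendation is packed into a single `tmdbId|movie title|sim_score` string. When the values are needed separately again (for example when rendering results), a small parser along these lines could be used. `parse_csim_entry` is a hypothetical helper, not something defined elsewhere in this notebook.

```python
def parse_csim_entry(entry):
    """Split 'tmdbId|title|score' into typed fields.

    Splitting from both ends keeps a title intact even if it contains '|'.
    """
    tmdb_id, rest = entry.split("|", 1)
    title, score = rest.rsplit("|", 1)
    return {"tmdbId": int(tmdb_id), "title": title, "score": float(score)}

# Two entries copied from the output above
recs = [parse_csim_entry(e) for e in ["13925|Luxo Jr.|0.555", "863|Toy Story 2|0.512"]]
```
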
**Get recommendations for all movies**

In [202]:
df_cb.head(2)

Unnamed: 0,movieId,tmdbId,title,release_date,genres,poster_path,runtime,budget,popularity,revenue,vote_average,vote_count
0,1,862,Toy Story,1995,Adventure|Animation|Children|Comedy|Fantasy,/rhIRbceoE9lR4veEXuwCC2wARtG.jpg,81,30000000,21,373554033,7.7,5415
1,2,8844,Jumanji,1995,Adventure|Children|Fantasy,/vzmL6fP7aPKNKPRTFnZmiUfciyV.jpg,104,65000000,17,262797249,6.9,2413


In [203]:
df_final = df_cb.copy()

In [204]:
df_final.head(2)

Unnamed: 0,movieId,tmdbId,title,release_date,genres,poster_path,runtime,budget,popularity,revenue,vote_average,vote_count
0,1,862,Toy Story,1995,Adventure|Animation|Children|Comedy|Fantasy,/rhIRbceoE9lR4veEXuwCC2wARtG.jpg,81,30000000,21,373554033,7.7,5415
1,2,8844,Jumanji,1995,Adventure|Children|Fantasy,/vzmL6fP7aPKNKPRTFnZmiUfciyV.jpg,104,65000000,17,262797249,6.9,2413


In [205]:
df_final['csim_movies'] = df_final['tmdbId'].apply(get_recommendations)

In [206]:
df_final.head(3)

Unnamed: 0,movieId,tmdbId,title,release_date,genres,poster_path,runtime,budget,popularity,revenue,vote_average,vote_count,csim_movies
0,1,862,Toy Story,1995,Adventure|Animation|Children|Comedy|Fantasy,/rhIRbceoE9lR4veEXuwCC2wARtG.jpg,81,30000000,21,373554033,7.7,5415,"[13925|Luxo Jr.|0.555, 863|Toy Story 2|0.512, ..."
1,2,8844,Jumanji,1995,Adventure|Children|Fantasy,/vzmL6fP7aPKNKPRTFnZmiUfciyV.jpg,104,65000000,17,262797249,6.9,2413,"[15139|The Pagemaster|0.433, 9354|Honey, I Shr..."
2,3,15602,Grumpier Old Men,1995,Comedy|Romance,/6ksm1sjKMFLbO7UY2i6G1ju9SML.jpg,101,0,11,0,6.5,92,"[27472|The Odd Couple II|0.553, 41579|Getting ..."


## Step 5: Convert Pandas Dataframe to Spark Dataframe

In [149]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

In [209]:
spark_df = sqlContext.createDataFrame(df_final)

In [210]:
spark_df.show(2)

+-------+------+---------+------------+--------------------+--------------------+-------+--------+----------+---------+-----------------+----------+--------------------+
|movieId|tmdbId|    title|release_date|              genres|         poster_path|runtime|  budget|popularity|  revenue|     vote_average|vote_count|         csim_movies|
+-------+------+---------+------------+--------------------+--------------------+-------+--------+----------+---------+-----------------+----------+--------------------+
|      1|   862|Toy Story|        1995|Adventure|Animati...|/rhIRbceoE9lR4veE...|     81|30000000|        21|373554033|7.699999809265137|      5415|[13925|Luxo Jr.|0...|
|      2|  8844|  Jumanji|        1995|Adventure|Childre...|/vzmL6fP7aPKNKPRT...|    104|65000000|        17|262797249|6.900000095367432|      2413|[15139|The Pagema...|
+-------+------+---------+------------+--------------------+--------------------+-------+--------+----------+---------+-----------------+----------+--

In [211]:
spark_df.dtypes

[('movieId', 'bigint'),
 ('tmdbId', 'bigint'),
 ('title', 'string'),
 ('release_date', 'string'),
 ('genres', 'string'),
 ('poster_path', 'string'),
 ('runtime', 'bigint'),
 ('budget', 'bigint'),
 ('popularity', 'bigint'),
 ('revenue', 'bigint'),
 ('vote_average', 'double'),
 ('vote_count', 'bigint'),
 ('csim_movies', 'array<string>')]

**Convert data types**

In [212]:
spark_df = spark_df.select(
    spark_df.movieId.cast("int"), spark_df.tmdbId.cast("int"), "title",
    "release_date", "genres", "poster_path", spark_df.runtime.cast("int"),
    spark_df.budget.cast("int"), spark_df.popularity.cast("int"), spark_df.revenue.cast("int"),
    spark_df.vote_average.cast("float"), spark_df.vote_count.cast("int"), "csim_movies"
)
print("Cleaned movie data:")
spark_df.show(5)

Cleaned movie data:
+-------+------+--------------------+------------+--------------------+--------------------+-------+--------+----------+---------+------------+----------+--------------------+
|movieId|tmdbId|               title|release_date|              genres|         poster_path|runtime|  budget|popularity|  revenue|vote_average|vote_count|         csim_movies|
+-------+------+--------------------+------------+--------------------+--------------------+-------+--------+----------+---------+------------+----------+--------------------+
|      1|   862|           Toy Story|        1995|Adventure|Animati...|/rhIRbceoE9lR4veE...|     81|30000000|        21|373554033|         7.7|      5415|[13925|Luxo Jr.|0...|
|      2|  8844|             Jumanji|        1995|Adventure|Childre...|/vzmL6fP7aPKNKPRT...|    104|65000000|        17|262797249|         6.9|      2413|[15139|The Pagema...|
|      3| 15602|    Grumpier Old Men|        1995|      Comedy|Romance|/6ksm1sjKMFLbO7UY...|    101|

In [213]:
spark_df.dtypes

[('movieId', 'int'),
 ('tmdbId', 'int'),
 ('title', 'string'),
 ('release_date', 'string'),
 ('genres', 'string'),
 ('poster_path', 'string'),
 ('runtime', 'int'),
 ('budget', 'int'),
 ('popularity', 'int'),
 ('revenue', 'int'),
 ('vote_average', 'float'),
 ('vote_count', 'int'),
 ('csim_movies', 'array<string>')]

In [214]:
spark_df.count()

9753

## Step 6: Load data into Elasticsearch

Now that you have your dataset processed and prepared, you will load it into Elasticsearch.

_Note:_ for the purposes of this demo notebook you have started with an existing example dataset and will load that into Elasticsearch. In practice you may write your event data as well as user and item metadata from your application directly into Elasticsearch.

First test that your Elasticsearch instance is running and you can connect to it using the Python Elasticsearch client.

In [164]:
from elasticsearch import Elasticsearch

# test your ES instance is running

# es = Elasticsearch([
#     {'host': 'localhost'},
#     {'host': 'othernode', 'port': 443, 'url_prefix': 'es', 'use_ssl': True},
# ])

es = Elasticsearch([{'host':'192.168.56.1', 'port':9200}])
es.info(pretty=True)

{'name': 'lsXsT1B',
 'cluster_name': 'elasticsearch',
 'cluster_uuid': 'zncNIjQFTAmcUMfJB7hyMg',
 'version': {'number': '5.3.0',
  'build_hash': '3adb13b',
  'build_date': '2017-03-23T03:31:50.652Z',
  'build_snapshot': False,
  'lucene_version': '6.4.1'},
 'tagline': 'You Know, for Search'}

### Create an Elasticsearch index with mappings for users, movies and rating events

In Elasticsearch, an "index" is roughly similar to a "database", while a "document type" is roughly similar to a "table" in that database. The schema for a document type is called an index mapping.

While Elasticsearch supports dynamic mapping, it's advisable to specify the mapping explicitly when creating an index if you know what your data looks like.

For the purposes of your recommendation engine, this is also necessary so that you can specify a custom analyzer for the field that will hold the recommendation "model" (that is, the factor vectors). This will ensure the vector-scoring plugin will work correctly.

> _Note_ This notebook does not go into detail about the underlying scoring mechanism or the relevant Elasticsearch internals. See the talks and slides in the [Journey Links section](https://github.com/MLnick/elasticsearch-spark-recommender-demo/blob/master/README.md#links) for more detail.
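
As a rough picture of what the custom analyzer does with a factor string, the sketch below imitates a whitespace tokenizer followed by a delimited payload filter in plain Python. It does not call Elasticsearch; it assumes the filter's default `|` delimiter and float-encoded payloads, and the function name `analyze_payloads` is made up for illustration.

```python
def analyze_payloads(text, delimiter="|"):
    """Imitate a whitespace tokenizer followed by a delimited payload filter.

    Returns (term, payload) pairs; payloads are decoded as floats.
    """
    tokens = []
    for token in text.split():                       # whitespace tokenizer
        term, _, payload = token.partition(delimiter)  # term before '|', payload after
        tokens.append((term, float(payload)))
    return tokens

# Each term is a vector position, each payload the value at that position
pairs = analyze_payloads("0|0.25 1|-1.5 2|0.75")
```

In the index, the terms (`0`, `1`, `2`, ...) identify vector dimensions and the payloads carry the factor values, which is why the `factor` field stores term vectors with payloads.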

__References:__
* [Create index request](https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-create-index.html)
* [Delimited payload filter](https://www.elastic.co/guide/en/elasticsearch/reference/2.4/analysis-delimited-payload-tokenfilter.html)
* [Term vectors](https://www.elastic.co/guide/en/elasticsearch/reference/2.4/docs-termvectors.html#_term_information)
* [Mapping](https://www.elastic.co/guide/en/elasticsearch/reference/2.4/mapping.html)

> **_Optional_**

> If you are re-running the notebook and have previously created the `demo` index in Elasticsearch, you should first delete it by running the next cell, before running the index creation cell that follows.

In [None]:
# Delete the Elasticsearch index if it exists
# (ignore=404 turns the call into a no-op when the index is absent)
es.indices.delete(index="demo", ignore=404)

> **Create Elasticsearch index**

In [216]:
create_index = {
    "settings": {
        "analysis": {
            "analyzer": {
                # this configures the custom analyzer we need to parse vectors such that the scoring
                # plugin will work correctly
                "payload_analyzer": {
                    "type": "custom",
                    "tokenizer":"whitespace",
                    "filter":"delimited_payload_filter"
                }
            }
        }
    },
    "mappings": {
        "ratings": {
          # this mapping definition sets up the fields for the rating events
          "properties": {
                "timestamp": {
                    "type": "date"
                },
                "userId": {
                    "type": "integer"
                },
                "movieId": {
                    "type": "integer"
                },
                "rating": {
                    "type": "double"
                }
            }  
        },
        "users": {
            # this mapping definition sets up the metadata fields for the users
            "properties": {
                "userId": {
                    "type": "integer"
                },
                "@model": {
                    # this mapping definition sets up the fields for user factor vectors of our model
                    "properties": {
                        "factor": {
                            "type": "text",
                            "term_vector": "with_positions_offsets_payloads",
                            "analyzer" : "payload_analyzer"
                        },
                        "version": {
                            "type": "keyword"
                        },
                        "timestamp": {
                            "type": "date"
                        }
                    }
                }
            }
        },
        "movies": {
            # this mapping definition sets up the metadata fields for the movies
            "properties": {
                "movieId": {
                    "type": "integer"
                },
                "title": {
                    "type": "keyword"
                },
                "genres": {
                    "type": "keyword"
                },
                "poster_path": {
                    "type": "text"
                },
                "runtime": {
                    "type": "integer"
                },
                "budget": {
                    "type": "integer"
                },
                "popularity": {
                    "type": "integer"
                },
                "revenue": {
                    "type": "integer"
                },
                "vote_average": {
                    "type": "float"
                },
                "vote_count": {
                    "type": "integer"
                },
                "release_date": {
                    "type": "date",
                    "format": "year"
                },
                "tmdbId": {
                    "type": "keyword"
                },
                "csim_movies": {
                    "type": "text"
                },
                "@model": {
                    # this mapping definition sets up the fields for movie factor vectors of our model
                    "properties": {
                        "factor": {
                            "type": "text",
                            "term_vector": "with_positions_offsets_payloads",
                            "analyzer" : "payload_analyzer"
                        },
                        "version": {
                            "type": "keyword"
                        },
                        "timestamp": {
                            "type": "date"
                        }
                    }
                }
            }
        }
    }
}
# create index with the settings and mappings above
es.indices.create(index="demo", body=create_index)

{'acknowledged': True, 'shards_acknowledged': True}

### Load Ratings and Movies DataFrames into Elasticsearch

First you will write the ratings data to Elasticsearch. Notice that you can simply use the Spark Elasticsearch connector to write a `DataFrame` with the native Spark datasource API by specifying `format("es")`.

In [217]:
# write ratings data
ratings.write.format("es").save("demo/ratings")

# check write went ok
print("Dataframe count: %d" % ratings.count())
print("ES index count:  %d" % es.count(index="demo", doc_type="ratings")['count'])

Dataframe count: 100836
ES index count:  100836


In [218]:
# test things out by retrieving a few rating event documents from Elasticsearch
es.search(index="demo", doc_type="ratings", q="*", size=3)

{'took': 3,
 'timed_out': False,
 '_shards': {'total': 5, 'successful': 5, 'failed': 0},
 'hits': {'total': 100836,
  'max_score': 1.0,
  'hits': [{'_index': 'demo',
    '_type': 'ratings',
    '_id': 'AWequLrWBqnZJ6HSmrb6',
    '_score': 1.0,
    '_source': {'userId': 1,
     'movieId': 163,
     'rating': 5.0,
     'timestamp': 964983650000}},
   {'_index': 'demo',
    '_type': 'ratings',
    '_id': 'AWequLrWBqnZJ6HSmrcD',
    '_score': 1.0,
    '_source': {'userId': 1,
     'movieId': 349,
     'rating': 4.0,
     'timestamp': 964982563000}},
   {'_index': 'demo',
    '_type': 'ratings',
    '_id': 'AWequLrWBqnZJ6HSmrcF',
    '_score': 1.0,
    '_source': {'userId': 1,
     'movieId': 362,
     'rating': 5.0,
     'timestamp': 964982588000}}]}}

**Write the movie metadata**

In [219]:
# write movie data, specifying the DataFrame column to use as the id mapping
spark_df.write.format("es").option("es.mapping.id", "movieId").save("demo/movies")

# check load went ok
print("Movie DF count: %d" % spark_df.count())
print("ES index count: %d" % es.count(index="demo", doc_type="movies")['count'])

Movie DF count: 9753
ES index count: 9742
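
The two counts above differ by 11. One likely explanation, not verified here, is that the DataFrame contains repeated `movieId` values: because `es.mapping.id` is set to `movieId`, a later row is written under the same document id as an earlier one and replaces it. The sketch below illustrates the effect with made-up rows and a plain dict standing in for the index.

```python
rows = [
    {"movieId": 1, "title": "Toy Story"},
    {"movieId": 2, "title": "Jumanji"},
    {"movieId": 2, "title": "Jumanji"},   # repeated id, made up for illustration
]

index = {}                                # stand-in for the demo/movies index
for row in rows:
    index[row["movieId"]] = row           # same id: the earlier document is replaced

row_count = len(rows)                     # what DataFrame.count() would report
doc_count = len(index)                    # what the index count would report
duplicate_rows = row_count - doc_count
```

On the real data, comparing `df_final['movieId'].nunique()` with `len(df_final)` would confirm or rule out this explanation.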


Again you can harness the power of search to query the movie metadata:

In [220]:
# test things out by searching for a movie by title (here "Catwalk"; a "matrix" query is left commented out)
# es.search(index="demo", doc_type="movies", q="title:matrix", size=3)
es.search(index="demo", doc_type="movies", q="title:Catwalk", size=3)

{'took': 5,
 'timed_out': False,
 '_shards': {'total': 5, 'successful': 5, 'failed': 0},
 'hits': {'total': 1,
  'max_score': 7.153052,
  'hits': [{'_index': 'demo',
    '_type': 'movies',
    '_id': '108',
    '_score': 7.153052,
    '_source': {'movieId': 108,
     'tmdbId': 89333,
     'title': 'Catwalk',
     'release_date': '1996',
     'genres': 'Documentary',
     'poster_path': '/yhFcbTCnsWjg3nH3PLL6RoltjqS.jpg',
     'runtime': 95,
     'budget': 0,
     'popularity': 0,
     'revenue': 0,
     'vote_average': 7.0,
     'vote_count': 2,
     'csim_movies': ['295538|The Culture High|0.091',
      '22010|Grass|0.084',
      '18603|Hands on a Hard Body: The Documentary|0.081',
      '35199|Wordplay|0.081',
      '14358|Mad Hot Ballroom|0.078']}}]}}

## Step 7: Train a recommender model on the ratings data

Your data is now stored in Elasticsearch and you will use the ratings data to build a collaborative filtering recommendation model.

[Collaborative filtering](https://en.wikipedia.org/wiki/Collaborative_filtering) is a recommendation approach that is effectively based on the "wisdom of the crowd". It makes the assumption that, if two people share similar preferences, then the things that one of them prefers could be good recommendations to make to the other. In other words, if user A tends to like certain movies, and user B shares some of these preferences with user A, then the movies that user A likes, that user B _has not yet seen_, may well be movies that user B will also like.

In a similar manner, we can think about _items_ as being similar if they tend to be rated highly by the same people, on average. 

Hence these models are based on the combined, collaborative preferences and behavior of all users in aggregate. They tend to be very effective in practice (provided you have enough preference data to train the model). The ratings data you have is a form of _explicit preference data_, perfect for training collaborative filtering models.

### Alternating Least Squares

Alternating Least Squares (ALS) is a specific algorithm for solving a type of collaborative filtering model known as [matrix factorization (MF)](https://en.wikipedia.org/wiki/Matrix_decomposition). The core idea of MF is to represent the ratings as a _user-item ratings matrix_. In the diagram below you will see this matrix on the left (with users as _rows_ and movies as _columns_). The entries in this matrix are the ratings given by users to movies.

You may also notice that the matrix has _missing entries_ because not all users have rated all movies. In this situation we refer to the data as _sparse_.

![als-diagram.png](../doc/source/images/als-diagram.png)

MF methods aim to find two much smaller matrices (one representing the _users_ and the other the _items_) that, when multiplied together, reconstruct the original ratings matrix as closely as possible. This is known as _factorizing_ the original matrix, hence the name of the technique.

The two smaller matrices are called _factor matrices_ (or _latent features_). The user and movie factor matrices are illustrated on the right in the diagram above. The idea is that each user factor vector is a compressed representation of the user's preferences and behavior. Likewise, each item factor vector is a compressed representation of the item. Once the model is trained, the factor vectors can be used to make recommendations, which is what you will do in the following sections.
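
A minimal sketch of how factor vectors reproduce ratings: the predicted rating for a user-movie pair is the dot product of the user's factor vector with the movie's factor vector. All numbers below are made up, and the rank is 2 to keep the example readable (the model trained later in this notebook uses rank 20).

```python
# Made-up factor matrices with rank 2 (one row per user / per movie)
user_factors = [[1.0, 0.5],
                [0.2, 1.5]]
movie_factors = [[4.0, 1.0],
                 [1.0, 3.0],
                 [2.0, 2.0]]

def predict(user_vec, movie_vec):
    """Predicted rating = dot product of the two factor vectors."""
    return sum(u * m for u, m in zip(user_vec, movie_vec))

# Reconstructed (dense) ratings matrix: users as rows, movies as columns
predicted = [[predict(u, m) for m in movie_factors] for u in user_factors]
```

Note that `predicted` has an entry for every user-movie pair, including pairs that were missing from the sparse ratings matrix. Those filled-in entries are what the recommender ranks.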

__Further reading:__

* [Spark MLlib Collaborative Filtering](http://spark.apache.org/docs/latest/ml-collaborative-filtering.html)
* [Alternating Least Squares and collaborative filtering](https://datasciencemadesimpler.wordpress.com/tag/alternating-least-squares/)
* [Quora question on Alternating Least Squares](https://www.quora.com/What-is-the-Alternating-Least-Squares-method-in-recommendation-systems-And-why-does-this-algorithm-work-intuition-behind-this)

Fortunately, Spark's MLlib machine learning library has a scalable, efficient implementation of matrix factorization built in, which we can use to train our recommendation model. Next, you will use Spark's ALS to train a model on your ratings data from Elasticsearch.

In [221]:
ratings_from_es = spark.read.format("es").load("demo/ratings")
ratings_from_es.show(5)

+-------+------+-------------------+------+
|movieId|rating|          timestamp|userId|
+-------+------+-------------------+------+
|  68237|   4.0|2015-09-13 22:20:17|   249|
|  68791|   3.0|2012-11-28 21:06:30|   249|
|  69069|   3.5|2015-05-01 20:26:53|   249|
|  69278|   3.5|2012-12-27 03:30:33|   249|
|  71106|   4.0|2012-11-25 07:43:52|   249|
+-------+------+-------------------+------+
only showing top 5 rows



**Modeling**

In [222]:
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import col
als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating", regParam=0.01, rank=20, seed=12)
model = als.fit(ratings_from_es)
model.userFactors.show(5)
model.itemFactors.show(5)

+---+--------------------+
| id|            features|
+---+--------------------+
| 10|[-1.2723869, -1.0...|
| 20|[-0.28433216, -0....|
| 30|[-0.23606953, 0.6...|
| 40|[0.31492335, -0.0...|
| 50|[0.006268716, 0.0...|
+---+--------------------+
only showing top 5 rows

+---+--------------------+
| id|            features|
+---+--------------------+
| 10|[0.07222334, 0.37...|
| 20|[-0.40369913, 0.5...|
| 30|[-0.65819603, -0....|
| 40|[-0.2619177, 0.49...|
| 50|[-0.46155798, 0.1...|
+---+--------------------+
only showing top 5 rows



## Step 8: Export ALS user and item factor vectors to Elasticsearch

Congratulations, you've trained a recommendation model! The next step is to export the model factors (shown in the `DataFrames` above) to Elasticsearch.

In order to store the model in the correct format for the index mappings set up earlier, you will need to create some utility functions. These functions will allow you to convert the raw vectors (which are equivalent to a Python list in the factor `DataFrames` above) to the correct _delimited string format_. This ensures Elasticsearch will parse the vector field in the model correctly using the delimited token filter custom analyzer you configured earlier.

You will also create a function to convert a vector and related metadata (such as the Spark model id and a timestamp) into a `DataFrame` field that matches the `@model` field in the Elasticsearch index mapping.

### Utility functions for converting factor vectors

In [223]:
from pyspark.sql.types import *
from pyspark.sql.functions import udf, lit, current_timestamp, unix_timestamp

def convert_vector(x):
    '''Convert a list or numpy array to delimited token filter format'''
    return " ".join(["%s|%s" % (i, v) for i, v in enumerate(x)])

def reverse_convert(s):
    '''Convert a delimited token filter format string back to list format'''
    return  [float(f.split("|")[1]) for f in s.split(" ")]

def vector_to_struct(x, version, ts):
    '''Convert a vector to a SparkSQL Struct with string-format vector and version fields'''
    return (convert_vector(x), version, ts)

vector_struct = udf(vector_to_struct,
                    StructType([StructField("factor", StringType(), True),
                                StructField("version", StringType(), True),
                                StructField("timestamp", LongType(), True)]))

In [224]:
# test out the vector conversion function
test_vec = model.userFactors.select("features").first().features
print(test_vec)
print()
print(convert_vector(test_vec))

[-1.272386908531189, -1.040471076965332, 0.6506963968276978, -0.7358267307281494, 0.2215612679719925, -0.24009384214878082, -0.03175492584705353, -0.1631995439529419, 0.3178538382053375, -0.18343225121498108, -0.6073395609855652, 0.30508336424827576, 0.44866761565208435, -0.2556912899017334, 0.1966400444507599, -0.0496751107275486, -0.47266942262649536, -0.45564523339271545, -0.3641686737537384, -0.40033015608787537]

0|-1.272386908531189 1|-1.040471076965332 2|0.6506963968276978 3|-0.7358267307281494 4|0.2215612679719925 5|-0.24009384214878082 6|-0.03175492584705353 7|-0.1631995439529419 8|0.3178538382053375 9|-0.18343225121498108 10|-0.6073395609855652 11|0.30508336424827576 12|0.44866761565208435 13|-0.2556912899017334 14|0.1966400444507599 15|-0.0496751107275486 16|-0.47266942262649536 17|-0.45564523339271545 18|-0.3641686737537384 19|-0.40033015608787537
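
The cell above only exercises `convert_vector`. As a quick sanity check that the string format loses no information, the sketch below round-trips a short vector through both helpers. The two functions are repeated (without docstrings) so the snippet runs on its own, and the three values are taken from the vector printed above.

```python
def convert_vector(x):
    return " ".join("%s|%s" % (i, v) for i, v in enumerate(x))

def reverse_convert(s):
    return [float(f.split("|")[1]) for f in s.split(" ")]

vec = [-1.272386908531189, 0.6506963968276978, 0.2215612679719925]
encoded = convert_vector(vec)      # 'position|value' tokens separated by spaces
decoded = reverse_convert(encoded)

# Python 3 formats floats with their shortest round-trippable representation,
# so decoding the string gives back exactly the original values
assert decoded == vec
```
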


### Convert factor vectors to [factor, version, timestamp] form and write to Elasticsearch

In [225]:
ver = model.uid
ts = unix_timestamp(current_timestamp())
movie_vectors = model.itemFactors.select("id", vector_struct("features", lit(ver), ts).alias("@model"))
movie_vectors.select("id", "@model.factor", "@model.version", "@model.timestamp").show(5)

user_vectors = model.userFactors.select("id", vector_struct("features", lit(ver), ts).alias("@model"))
user_vectors.select("id", "@model.factor", "@model.version", "@model.timestamp").show(5)

+---+--------------------+--------------------+----------+
| id|              factor|             version| timestamp|
+---+--------------------+--------------------+----------+
| 10|0|0.0722233429551...|ALS_45528893eb6cb...|1544757569|
| 20|0|-0.403699129819...|ALS_45528893eb6cb...|1544757569|
| 30|0|-0.658196032047...|ALS_45528893eb6cb...|1544757569|
| 40|0|-0.261917710304...|ALS_45528893eb6cb...|1544757569|
| 50|0|-0.461557984352...|ALS_45528893eb6cb...|1544757569|
+---+--------------------+--------------------+----------+
only showing top 5 rows

+---+--------------------+--------------------+----------+
| id|              factor|             version| timestamp|
+---+--------------------+--------------------+----------+
| 10|0|-1.272386908531...|ALS_45528893eb6cb...|1544757569|
| 20|0|-0.284332156181...|ALS_45528893eb6cb...|1544757569|
| 30|0|-0.236069530248...|ALS_45528893eb6cb...|1544757569|
| 40|0|0.3149233460426...|ALS_45528893eb6cb...|1544757569|
| 50|0|0.0062687159515...|ALS_4

## Step 9: Save the model to Elasticsearch

In [226]:
# write data to ES, use:
# - "id" as the column to map to ES movie id
# - "update" write mode for ES, since you want to update new fields only
# - "append" write mode for Spark
movie_vectors.write.format("es") \
    .option("es.mapping.id", "id") \
    .option("es.write.operation", "update") \
    .save("demo/movies", mode="append")

In [227]:
# write data to ES, use:
# - "id" as the column to map to ES user id
# - "index" write mode for ES, since you have not written to the user index previously
# - "append" write mode for Spark
user_vectors.write.format("es") \
    .option("es.mapping.id", "id") \
    .option("es.write.operation", "index") \
    .save("demo/users", mode="append")

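The difference between the two write operations used above can be pictured with a plain dict standing in for an Elasticsearch index. This is only an analogy for the connector's `index` and `update` behaviour, not a model of its internals, and the functions `es_index` and `es_update` are made up for illustration.

```python
def es_index(store, doc_id, doc):
    """'index': create the document, or replace it entirely if the id exists."""
    store[doc_id] = dict(doc)

def es_update(store, doc_id, partial):
    """'update': merge fields into an existing document; fail if it is missing."""
    if doc_id not in store:
        raise KeyError("document %r not found" % doc_id)
    store[doc_id].update(partial)

movies = {}
es_index(movies, 2628, {"title": "Star Wars: Episode I - The Phantom Menace"})
es_update(movies, 2628, {"@model": {"factor": "0|0.5 1|1.3"}})  # metadata is kept

try:
    es_update(movies, 999999, {"@model": {"factor": "0|0.1"}})   # no such document
    missing_update_failed = False
except KeyError:
    missing_update_failed = True
```

This is why the movie vectors are written with `update` (the metadata documents already exist and must keep their other fields), while the user vectors are written with `index` (nothing was written to `demo/users` before).
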
### Check the data was written correctly

You can search for a movie to see if the model factor vector was written correctly. You should see a `'@model': {'factor': '0|...` field in the returned movie document.

In [228]:
# search for a particular sci-fi movie
es.search(index="demo", doc_type="movies", q="star wars phantom menace", size=1)['hits']['hits'][0]

{'_index': 'demo',
 '_type': 'movies',
 '_id': '2628',
 '_score': 12.797813,
 '_source': {'movieId': 2628,
  'tmdbId': 1893,
  'title': 'Star Wars: Episode I - The Phantom Menace',
  'release_date': '1999',
  'genres': 'Action|Adventure|Sci-Fi',
  'poster_path': '/n8V09dDc02KsSN6Q4hC2BX6hN8X.jpg',
  'runtime': 136,
  'budget': 115000000,
  'popularity': 15,
  'revenue': 924317558,
  'vote_average': 6.4,
  'vote_count': 4526,
  'csim_movies': ['1894|Star Wars: Episode II - Attack of the Clones|0.555',
   '1895|Star Wars: Episode III - Revenge of the Sith|0.512',
   '11|Star Wars|0.465',
   '636|THX 1138|0.356',
   '838|American Graffiti|0.283'],
  '@model': {'factor': '0|0.5367742776870728 1|1.315629482269287 2|0.908174991607666 3|-0.17290841042995453 4|0.6622549891471863 5|-0.1442941427230835 6|-0.43587976694107056 7|-0.5659969449043274 8|0.7238355278968811 9|-0.9211186170578003 10|1.0284417867660522 11|0.6716753840446472 12|0.9536047577857971 13|0.07491487264633179 14|-1.7383807897567