# Create enriched movie dataset from The Movie Database API

* MovieLens latest data can be downloaded at http://grouplens.org/datasets/movielens/
* This demo uses the `ml-latest-small` dataset of 100k ratings, 9k movies and 700 users
* Data enrichment requires access to [The Movie Database API](https://www.themoviedb.org/documentation/api)

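**Note:** create the index mappings (Step 1) _before_ loading any data; otherwise Elasticsearch will infer the field types dynamically and the mappings below will not be applied.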

_Using Spark 2.1.0 and Elasticsearch 5.3.0_

## Step 1: Create index mappings in Elasticsearch

References:
* [Create index request](https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-create-index.html)
* [Delimited payload filter](https://www.elastic.co/guide/en/elasticsearch/reference/2.4/analysis-delimited-payload-tokenfilter.html)
* [Term vectors](https://www.elastic.co/guide/en/elasticsearch/reference/2.4/docs-termvectors.html#_term_information)
* [Mapping](https://www.elastic.co/guide/en/elasticsearch/reference/2.4/mapping.html)

In [None]:
from elasticsearch import Elasticsearch
es = Elasticsearch()

# Custom analyzer pairing the whitespace tokenizer with the
# "delimited_payload_filter" token filter; referenced by the "@model.factor" fields.
payload_analysis = {
    "analyzer": {
        "payload_analyzer": {
            "type": "custom",
            "tokenizer": "whitespace",
            "filter": "delimited_payload_filter"
        }
    }
}

# "@model" sub-document shared by the "users" and "movies" types: a text field analyzed
# with the payload analyzer (term vectors with positions, offsets and payloads) plus a
# keyword "version" field.
model_mapping = {
    "properties": {
        "factor": {
            "type": "text",
            "term_vector": "with_positions_offsets_payloads",
            "analyzer": "payload_analyzer"
        },
        "version": {
            "type": "keyword"
        }
    }
}

# one mapping per document type stored in the index
ratings_mapping = {
    "properties": {
        "timestamp": {"type": "date"},
        "userId": {"type": "keyword"},
        "movieId": {"type": "keyword"},
        "rating": {"type": "double"}
    }
}

users_mapping = {
    "properties": {
        "name": {"type": "text"},
        "@model": model_mapping
    }
}

movies_mapping = {
    "properties": {
        "genres": {"type": "keyword"},
        "original_language": {"type": "keyword"},
        "image_url": {"type": "keyword"},
        "release_date": {"type": "date"},
        "popularity": {"type": "double"},
        "@model": model_mapping
    }
}

# full request body: analysis settings plus the three type mappings
create_index = {
    "settings": {
        "analysis": payload_analysis
    },
    "mappings": {
        "ratings": ratings_mapping,
        "users": users_mapping,
        "movies": movies_mapping
    }
}

# create the "demo" index with the settings & mappings assembled above
es.indices.create(index="demo", body=create_index)

## Step 2: Load ratings data into Elasticsearch

In [None]:
# Load data from CSV
ratings = spark.read.csv("data/ml-latest-small/ratings.csv", header=True, inferSchema=True)
# MovieLens stores rating timestamps as *seconds* since the epoch, whereas a plain
# Elasticsearch "date" field parses bare numbers as epoch *milliseconds*. Convert here so
# the ratings are not all indexed as dates in January 1970.
ratings = ratings.withColumn("timestamp", ratings["timestamp"].cast("long") * 1000)
ratings.show(5)

In [None]:
# save the ratings DataFrame to the "demo/ratings" resource using the "es" data source
ratings_writer = ratings.write.format("es")
ratings_writer.save("demo/ratings")

In [None]:
# sanity check: the DataFrame row count should match the number of indexed documents
ratings_df_count = ratings.count()
print("Dataframe count: %d" % ratings_df_count)
ratings_es_count = es.count(index="demo", doc_type="ratings")['count']
print("ES index count:  %d" % ratings_es_count)

## Step 3: Generate random names for each unique user and save to ES

In [None]:
import names
from pyspark.sql.functions import udf
from pyspark.sql.types import *
# UDF producing a random full name; the input column value is ignored and only
# serves to invoke the function once per row
def _random_full_name(_):
    return names.get_full_name()

random_name = udf(_random_full_name, StringType())

In [None]:
# one row per distinct user id, paired with a randomly generated name
unique_user_ids = ratings.select("userId").distinct()
users = unique_user_ids.select("userId", random_name("userId").alias("name"))

# write to "demo/users", using userId as the Elasticsearch document id
users_writer = users.write.format("es").option("es.mapping.id", "userId")
users_writer.save("demo/users")

In [None]:
# sanity check: the users DataFrame row count should match the number of indexed documents
users_df_count = users.count()
print("User DF count: %d" % users_df_count)
users_es_count = es.count(index="demo", doc_type="users")['count']
print("ES index count: %d" % users_es_count)

## Step 4: Enrich movie data with TMDB metadata

**NOTE:** this can take a while, as it makes one HTTP API call per movie!

In [None]:
# UDF turning the raw pipe-delimited genres string into an array of strings
def _split_genres(raw_genres):
    return raw_genres.split("|")

extract_genres = udf(_split_genres, ArrayType(StringType()))

In [None]:
# read the raw movies and links tables from CSV (header row present, column types inferred)
raw_movies = spark.read.csv("data/ml-latest-small/movies.csv", header=True, inferSchema=True)
link_data = spark.read.csv("data/ml-latest-small/links.csv", header=True, inferSchema=True)

# keep id and title, and turn the raw genres string into an array
movies = raw_movies.select("movieId", "title", extract_genres("genres").alias("genres"))

# join on movieId so every movie row also carries its TMDB id
movies_with_links = movies.join(link_data, movies.movieId == link_data.movieId)
movie_data = movies_with_links.select(
    movies.movieId, movies.title, movies.genres, link_data.tmdbId)

# total number of movies, used for progress reporting during enrichment
num_movies = movie_data.count()
movie_data.show(5)

In [None]:
import csv

from requests import HTTPError
import tmdbsimple as tmdb

# API key used by tmdbsimple for every request; replace the placeholder with your own key
tmdb.API_KEY = 'YOUR_KEY'
# base URL for TMDB poster images
IMAGE_URL = 'https://image.tmdb.org/t/p/w500'

In [None]:
# Look up every movie on TMDB and build one document per movie in `enriched`.
# When the TMDB request fails with an HTTP error, the document falls back to the
# MovieLens fields only, so every input row still yields exactly one output document.
data = movie_data.collect()
enriched = []
for i, row in enumerate(data, 1):
    # fallback document built purely from the MovieLens row
    movie = {
        "movieId": row.movieId,
        "title": row.title,
        "originalTitle": row.title,
        "genres": row.genres,
        "overview": "",
        "release_date": "",
        "popularity": 0,
        "original_language": "",
        "image_url": ""
    }
    try:
        # keep the try body to the single call that performs the HTTP request
        m = tmdb.Movies(row.tmdbId).info()
    except HTTPError as e:
        print("Encountered error: %s for movieId=%d title=%s" % (e, row.movieId, row.title))
    else:
        # overlay the TMDB metadata; .get() keeps the fallback value when a field is
        # missing, so one incomplete response cannot abort the whole run
        poster_path = m.get('poster_path')
        movie.update({
            "title": m.get('title', row.title),
            "overview": m.get('overview', ""),
            "release_date": m.get('release_date', ""),
            "popularity": m.get('popularity', 0),
            "original_language": m.get('original_language', ""),
            "image_url": IMAGE_URL + poster_path if poster_path is not None else ""
        })
    enriched.append(movie)
    # progress report after every movie
    print("Enriched movie %s of %s" % (i, num_movies))

### Write enriched movie data to Elasticsearch

In [None]:
# index each enriched movie document into "demo"/"movies", keyed by its movieId
for doc in enriched:
    # drop an empty release date before indexing
    # NOTE(review): presumably because the "date" mapping would reject "" - confirm
    if doc.get('release_date') == "":
        del doc['release_date']
    es.index(index="demo", doc_type="movies", id=doc['movieId'], body=doc)

In [None]:
# check load went ok
# Documents indexed one by one through the Python client only become visible to
# count/search after the index is refreshed; force a refresh so the count is not
# under-reported when this cell runs right after the indexing loop.
es.indices.refresh(index="demo")
print("Movie DF count: %d" % movie_data.count())
print("ES index count: %d" % es.count(index="demo", doc_type="movies")['count'])