
# Create enriched movie dataset from The Movie Database API
* MovieLens latest data can be downloaded at http://grouplens.org/datasets/movielens/
* This demo uses the ml-latest-small dataset of 100k ratings, 9k movies and 700 users
* Data enrichment requires access to The Movie Database API
Note set up index mappings before loading data
## Using Spark 2.1.0 and Elasticsearch 5.3.0
Step 1: Create index mappings in Elasticsearch
References:
* Create index request
* Delimited payload filter
* Term vectors
* Mapping

In [1]:
from elasticsearch import Elasticsearch
es = Elasticsearch()

In [2]:
create_index = {
    "settings": {
        "analysis": {
            "analyzer": {
                "payload_analyzer": {
                    "type": "custom",
                    "tokenizer":"whitespace",
                    "filter":"delimited_payload_filter"
                }
            }
        }
    },
    "mappings": {
        "ratings": {
          "properties": {
                "timestamp": {
                    "type": "date"
                },
                "userId": {
                    "type": "keyword"
                },
                "movieId": {
                    "type": "keyword"
                },
                "rating": {
                    "type": "double"
                }
            }  
        },
        "users": {
            "properties": {
                "name": {
                    "type": "text"
                },
                "@model": {
                    "properties": {
                        "factor": {
                            "type": "text",
                            "term_vector": "with_positions_offsets_payloads",
                            "analyzer" : "payload_analyzer"
                        },
                        "version": {
                            "type": "keyword"
                        }
                    }
                }
            }
        },
        "movies": {
            "properties": {
                "genres": {
                    "type": "keyword"
                },
                "original_language": {
                    "type": "keyword"
                },
                "image_url": {
                    "type": "keyword"       
                },
                "release_date": {
                    "type": "date"
                },
                "popularity": {
                    "type": "double"
                },
                "@model": {
                    "properties": {
                        "factor": {
                            "type": "text",
                            "term_vector": "with_positions_offsets_payloads",
                            "analyzer" : "payload_analyzer"
                        },
                        "version": {
                            "type": "keyword"
                        }
                    }
                }
            }
        }
    }
}
# create index with the settings & mappings above
es.indices.create(index="demo", body=create_index)

{u'acknowledged': True, u'shards_acknowledged': True}


## Step 2: Load ratings data into Elasticsearch

In [3]:
ratings = spark.read.csv("/root/ml-latest-small/ratings.csv", header=True, inferSchema=True)
ratings.show(5)

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|     31|   2.5|1260759144|
|     1|   1029|   3.0|1260759179|
|     1|   1061|   3.0|1260759182|
|     1|   1129|   2.0|1260759185|
|     1|   1172|   4.0|1260759205|
+------+-------+------+----------+
only showing top 5 rows



In [4]:
ratings.write.format("es").save("demo/ratings")

In [5]:
ratings = spark.read.format("es").load("demo/ratings")
ratings.printSchema()
ratings.show(5)

root
 |-- movieId: string (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- userId: string (nullable = true)

+-------+------+--------------------+------+
|movieId|rating|           timestamp|userId|
+-------+------+--------------------+------+
|    253|   3.0|1970-01-10 11:23:...|    50|
|    282|   3.0|1970-01-10 11:23:...|    50|
|    368|   4.0|1970-01-10 11:23:...|    50|
|    480|   4.0|1970-01-10 11:23:...|    50|
|    587|   3.0|1970-01-10 11:23:...|    50|
+-------+------+--------------------+------+
only showing top 5 rows



In [6]:
# check write went ok
print "Dataframe count: %d" % ratings.count()
print "ES index count:  %d" % es.count(index="demo", doc_type="ratings")['count']

Dataframe count: 100004
ES index count:  100004


## Step 3: Generate random names for each unique user and save to ES

In [7]:
import names
from pyspark.sql.functions import udf
from pyspark.sql.types import *
# define UDF to create random user names
random_name = udf(lambda x: names.get_full_name(), StringType())

In [8]:
users = ratings.select("userId").distinct().select("userId", random_name("userId").alias("name"))


In [9]:
users.write.format("es").option("es.mapping.id", "userId").save("demo/users")

In [10]:
# check write went ok
print "User DF count: %d" % users.count()
print "ES index count: %d" % es.count(index="demo", doc_type="users")['count']

User DF count: 671
ES index count: 671


## Step 4: Enrich movie data with TMDB metadata
NOTE this can take a while as it involves HTTP API calls!

In [11]:

# define a UDF to convert the raw genres string to an array
extract_genres = udf(lambda x: x.split("|"), ArrayType(StringType()))

In [12]:

# load raw data from CSV
raw_movies = spark.read.csv("/root/ml-latest-small/movies.csv", header=True, inferSchema=True)
link_data = spark.read.csv("/root/ml-latest-small/links.csv", header=True, inferSchema=True)
# we'll extract the genres to an array
movies = raw_movies.select("movieId", "title", extract_genres("genres").alias("genres"))
# join movies with links to get TMDB id
movie_data = movies.join(link_data, movies.movieId == link_data.movieId)\
    .select(movies.movieId, movies.title, movies.genres, link_data.tmdbId)
num_movies = movie_data.count()
movie_data.show(5)

+-------+--------------------+--------------------+------+
|movieId|               title|              genres|tmdbId|
+-------+--------------------+--------------------+------+
|      1|    Toy Story (1995)|[Adventure, Anima...|   862|
|      2|      Jumanji (1995)|[Adventure, Child...|  8844|
|      3|Grumpier Old Men ...|   [Comedy, Romance]| 15602|
|      4|Waiting to Exhale...|[Comedy, Drama, R...| 31357|
|      5|Father of the Bri...|            [Comedy]| 11862|
+-------+--------------------+--------------------+------+
only showing top 5 rows



In [13]:
import tmdbsimple as tmdb
tmdb.API_KEY = '938891a06f37f0e21195dd56984f4dd2'
# base URL for TMDB poster images
IMAGE_URL = 'https://image.tmdb.org/t/p/w500'
import csv
from requests import HTTPError

In [14]:
data = movie_data.collect()
enriched = []
i = 0
for row in data:
    try:
        m = tmdb.Movies(row.tmdbId).info()
        poster_url = IMAGE_URL + m['poster_path'] if 'poster_path' in m and m['poster_path'] is not None else ""
        movie = {
            "movieId": row.movieId,
            "title": m['title'],
            "originalTitle": row.title,
            "genres": row.genres,
            "overview": m['overview'],
            "release_date": m['release_date'],
            "popularity": m['popularity'],
            "original_language": m['original_language'],
            "image_url": poster_url
        }
        enriched.append(movie)
    except HTTPError as e:
        print "Encountered error: %s for movieId=%d title=%s" % (e, row.movieId, row.title)
        movie = {
            "movieId": row.movieId,
            "title": row.title,
            "originalTitle": row.title,
            "genres": row.genres,
            "overview": "",
            "release_date": "",
            "popularity": 0,
            "original_language": "",
            "image_url": ""
        }
        enriched.append(movie)
    i += 1
    if i % 1 == 0: print "Enriched movie %s of %s" % (i, num_movies)

Enriched movie 1 of 9125
Enriched movie 2 of 9125
Enriched movie 3 of 9125
Enriched movie 4 of 9125
Enriched movie 5 of 9125
Enriched movie 6 of 9125
Enriched movie 7 of 9125
Enriched movie 8 of 9125
Enriched movie 9 of 9125
Enriched movie 10 of 9125
Enriched movie 11 of 9125
Enriched movie 12 of 9125
Enriched movie 13 of 9125
Enriched movie 14 of 9125
Enriched movie 15 of 9125
Enriched movie 16 of 9125
Enriched movie 17 of 9125
Enriched movie 18 of 9125
Enriched movie 19 of 9125
Enriched movie 20 of 9125
Enriched movie 21 of 9125
Enriched movie 22 of 9125
Enriched movie 23 of 9125
Enriched movie 24 of 9125
Enriched movie 25 of 9125
Enriched movie 26 of 9125
Enriched movie 27 of 9125
Enriched movie 28 of 9125
Enriched movie 29 of 9125
Enriched movie 30 of 9125
Enriched movie 31 of 9125
Enriched movie 32 of 9125
Enriched movie 33 of 9125
Enriched movie 34 of 9125
Enriched movie 35 of 9125
Enriched movie 36 of 9125
Enriched movie 37 of 9125
Enriched movie 38 of 9125
Enriched movie 39 of 

Write enriched movie data to Elasticsearch

In [15]:
for m in enriched:
    if 'release_date' in m and m['release_date'] == "": m.pop('release_date')
    es.index("demo", "movies", id=m['movieId'], body=m)

In [16]:

# check load went ok
print "Movie DF count: %d" % movie_data.count()
print "ES index count: %d" % es.count(index="demo", doc_type="movies")['count']

Movie DF count: 9125
ES index count: 9122
