In [66]:
import cgi
import requests
import urllib
from urllib.request import urlretrieve
from bs4 import BeautifulSoup
from pathlib import Path
import time
import os
from elasticsearch import helpers, Elasticsearch
import csv
import gzip
import pandas as pd
import json

## Data Pipeline
### 1. Data Retrieval
According to IMDB documentation, the datasets are updated once a day. Therefore a batch download of the dataset once a day is sufficient.
This can be done by a background job on a server that runs the code below (e.g. with *cron* or the Python *schedule* library).

The code treats the dataset links as APIs, i.e. it issues a GET request for each dataset once a day. This assumes a certain amount of reliability from IMDB.

*(The alternative is to scrape the website itself, which is trivial given the site's current structure but far less robust than the approach above.)*

Key points:
 - The data is kept compressed for deep storage and is not overwritten: each day's files go into their own dated directory. This acts as a form of version control, which IMDB itself does not offer.
 - If the connection to IMDB fails, the code below makes up to 5 attempts, waiting between them.
 - If all attempts are exhausted, a notification should be sent as a fallback so that engineers can resolve the issue.


In [85]:
# Raw IMDB dumps, one gzipped TSV per table (IMDB refreshes them daily).
datasets = [
    f"https://datasets.imdbws.com/{table}.tsv.gz"
    for table in (
        "name.basics",
        "title.akas",
        "title.basics",
        "title.crew",
        "title.episode",
        "title.principals",
        "title.ratings",
    )
]

# Elasticsearch index mapping files, listed in the same order as `datasets`
# (entry i belongs to dataset i).
# NOTE(review): the suffixes are inconsistent ("mapping" vs "mappings") and
# "eposides" looks like a typo -- presumably these match the files on disk;
# confirm before renaming anything.
mappings = [
    "mappings/" + fname
    for fname in (
        "name_basics_mappings.json",
        "title_akas_mapping.json",
        "title_basics_mapping.json",
        "title_crew_mappings.json",
        "title_eposides_mappings.json",
        "title_principals_mapping.json",
        "title_ratings_mappings.json",
    )
]

In [None]:
# This code block should be run once a day in an automated process.
# It downloads every URL in `datasets` into a directory named after today's
# date (YYYYMMDD). On a network/HTTP error the whole batch is retried, up to
# `attempts` times; if every attempt fails, the last error is re-raised so the
# scheduler sees the failure (after the notification hook below).
attempts = 5
retry_delay_seconds = 120  # wait 2 minutes before retry
request_timeout_seconds = 60  # per-socket-operation timeout, avoids hanging forever

# Compute the target directory once, so a retry that happens to cross
# midnight still writes the whole batch into the same directory.
timestr = time.strftime("%Y%m%d")

for attempt in range(attempts):
    try:
        # Create subdirectory to save today's data in
        os.makedirs(timestr, exist_ok=True)

        # Download data files to storage
        for link in datasets:
            # Stream the body to disk in chunks instead of buffering the
            # whole (potentially large) dump in memory.
            with requests.get(link, stream=True, timeout=request_timeout_seconds) as response:
                # Treat HTTP errors (404, 5xx, ...) as failures instead of
                # silently saving the error page under a .gz name.
                response.raise_for_status()
                with open(os.path.join(timestr, link.split("/")[-1]), "wb") as out_file:
                    for chunk in response.iter_content(chunk_size=1024 * 1024):
                        out_file.write(chunk)
    except requests.exceptions.RequestException as e:
        if attempt == attempts - 1:
            # Notify admins/engineers that data retrieval for the day was skipped.
            # TODO: hook up the notification channel here.
            raise
        print(f"attempt {attempt + 1}/{attempts} failed: {e}; "
              f"retrying in {retry_delay_seconds}s")
        time.sleep(retry_delay_seconds)
    else:
        print("success")
        break

### 2. Cleaning and Building indices

Compressed data sitting on a server is of little use on its own. The next step in the data pipeline is to make the data easily accessible, both for analytics and for product integration. The code below can also be run once a day, after step 1.

#### A. Cleaning data

The IMDB data uses `\N` to represent a null value. This is problematic for Elasticsearch: a literal `\N` string cannot be indexed into a typed (e.g. numeric) field. In addition, a field's `null_value` replacement must have the same type as the field in the mapping. For example, `\N` in the `startYear` field can be turned into -1 (an integer that no real record uses).


#### B. Building indices

 - Note the size of each dataset so that the number of shards can be tuned if needed (for performance).
 - In a cluster it's a good idea to use replicas (faster searches and resilience to downtime).


In [None]:
# Snapshot directory written by step 1 (YYYYMMDD). Hard-coded to one day here;
# for the daily automated run this presumably should be today's date,
# e.g. time.strftime("%Y%m%d") -- TODO confirm.
directory = "20201119"
es = Elasticsearch(host='localhost', port=9200)

# IMDB encodes missing values as the literal two-character string \N.
IMDB_NULL = r"\N"


def _clean_rows(rows):
    """Yield each row dict with the IMDB null marker replaced by ``None``.

    ``None`` is serialised as JSON ``null``, which Elasticsearch treats as a
    missing value, or replaces with the field's ``null_value`` when the
    mapping defines one.
    """
    for row in rows:
        yield {key: (None if value == IMDB_NULL else value) for key, value in row.items()}


# Create or update index for each dataset. `datasets` and `mappings` are
# listed in the same order, so pair them explicitly: each index gets its own
# mapping rather than the first one.
for dataset, mapping_path in zip(datasets, mappings):
    name = dataset.split("/")[-1]
    # e.g. "title.akas.tsv.gz" -> "title_akas"
    index_name = "_".join(name.split(".")[:2])
    # Load corresponding mapping and create index if not already created
    if not es.indices.exists(index=index_name):
        with open(mapping_path, encoding='UTF-8') as mapping_file:
            es.indices.create(index=index_name, body=json.load(mapping_file))
    # unzip raw file and bulk index
    with gzip.open(os.path.join(directory, name), "rt", encoding='UTF-8') as f:
        # The TSVs are not CSV-quoted: titles can contain literal '"'
        # characters, so quote processing must be disabled or fields get merged.
        reader = csv.DictReader(f, delimiter='\t', quoting=csv.QUOTE_NONE)
        helpers.bulk(es, index=index_name, actions=_clean_rows(reader))