# Launch TMDB Distributed Download Fleet

```
https://github.com/hudsonmendes/lambda-tmdb-distributed-downloader
MENDES, Hudson
14th May, 2020
London, UK
```

## Summary

The `lambda TMDB distributed downloader` is an **[AWS Lambda Function](https://aws.amazon.com/lambda/)** hooked to an **[AWS SQS Queue](https://aws.amazon.com/sqs/)**.

To launch our download fleet, we send messages to **SQS**; the Lambda function then picks them up and starts the download process.

This notebook reads the **IMDB Titles Dataset** to determine the download partitions (based on the `year` and the two-character `initial` of each title) and sends one message for each of those partitions.
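
For context, Lambda hands SQS messages to the function as an event whose `Records` list carries each message's text under `body`. The sketch below shows how a handler might unpack the partition messages sent by this notebook. It is an illustration only: `download_partition` is a hypothetical stand-in, and the repository's actual handler is not shown here.

```python
import json


def download_partition(year, initial):
    """Hypothetical stand-in for the real download logic."""
    return f"{year}/{initial}"


def handler(event, context=None):
    """Unpack each SQS record and process the partition it describes."""
    processed = []
    for record in event.get("Records", []):
        # Each record's body is the JSON string that was sent to the queue.
        message = json.loads(record["body"])
        processed.append(download_partition(message["year"], message["initial"]))
    return processed
```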

## Environment

### Settings

In [1]:
lambda_name          = 'hudsonmendes-tmdb-downloader-lambda'
queue_name           = 'hudsonmendes-tmdb-downloader-queue'
datalake_bucket_name = 'hudsonmendes-datalake'

### Dependencies

In [2]:
%%bash
pip install -U pandas
pip install -r requirements.txt

Requirement already up-to-date: pandas in /Users/hudsonmendes/.pyenv/versions/3.6.10/lib/python3.6/site-packages (1.0.3)


### `tdd` deployment

We must first deploy and prepare our AWS infrastructure, which we do using the `tdd` CLI:

In [3]:
!python tdd deploy \
    --lambda_name $lambda_name \
    --queue_name $queue_name \
    --datalake_bucket_name $datalake_bucket_name

Deploy, cleaning up environment
Deploy, saving dependencies locally
ERROR: awscli 1.18.46 has requirement botocore==1.15.46, but you'll have botocore 1.16.11 which is incompatible.
Deploy, adding code to ZIP
Deploy, ensure queue hudsonmendes-tmdb-downloader-queue
Deploy, ensure lambda hudsonmendes-tmdb-downloader-lambda
Deploy, link lambda hudsonmendes-tmdb-downloader-lambda to queue hudsonmendes-tmdb-downloader-queue
Deploy, cleaning up environment
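
The "link lambda ... to queue" step in the log above corresponds to what AWS calls an event source mapping. The internals of `tdd deploy` are not shown in this notebook, so the following is only a sketch of how such a link could be made with boto3 clients. The function name `link_lambda_to_queue` and the `batch_size` default are assumptions for illustration.

```python
def link_lambda_to_queue(sqs_client, lambda_client, queue_name, lambda_name, batch_size=1):
    """Subscribe a Lambda function to an SQS queue via an event source mapping."""
    # Resolve the queue name to its URL, then to its ARN.
    queue_url = sqs_client.get_queue_url(QueueName=queue_name)["QueueUrl"]
    attributes = sqs_client.get_queue_attributes(
        QueueUrl=queue_url, AttributeNames=["QueueArn"]
    )
    queue_arn = attributes["Attributes"]["QueueArn"]
    # The mapping tells Lambda to poll the queue and invoke the function.
    return lambda_client.create_event_source_mapping(
        EventSourceArn=queue_arn,
        FunctionName=lambda_name,
        BatchSize=batch_size,
    )


# Usage (requires AWS credentials; not run here):
# import boto3
# link_lambda_to_queue(boto3.client("sqs"), boto3.client("lambda"),
#                      "hudsonmendes-tmdb-downloader-queue",
#                      "hudsonmendes-tmdb-downloader-lambda")
```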


### Imports

In [4]:
import os
import boto3
import json
import pandas as pd
from urllib.request import urlopen
from tqdm.notebook import tqdm
from tdd.pipeline import IMDbMovie

### Storage

In [5]:
IMDB_MOVIES_URL  = 'https://datasets.imdbws.com/title.basics.tsv.gz'
IMDB_MOVIES_PATH = '/tmp/title.basics.tsv.gz'

In [6]:
if not os.path.isfile(IMDB_MOVIES_PATH):
    with urlopen(IMDB_MOVIES_URL) as res:
        with open(IMDB_MOVIES_PATH, 'wb+') as out_file:
            out_file.write(res.read())
IMDB_MOVIES_PATH

'/tmp/title.basics.tsv.gz'
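
The cell above reads the whole response into memory before writing it out. As an alternative sketch, the download can be streamed to disk in chunks and renamed into place only once it is complete. The helper name `download_if_missing` and the `.part` suffix are assumptions, not part of the original notebook.

```python
import os
import shutil
from urllib.request import urlopen


def download_if_missing(url, path):
    """Stream `url` to `path` unless the file already exists; return `path`."""
    if not os.path.isfile(path):
        tmp_path = path + ".part"
        with urlopen(url) as res, open(tmp_path, "wb") as out_file:
            # Copy in chunks instead of holding the whole file in memory.
            shutil.copyfileobj(res, out_file)
        # Rename only after a complete download, so a partial file
        # is never mistaken for the finished dataset.
        os.replace(tmp_path, path)
    return path
```

It would be called as `download_if_missing(IMDB_MOVIES_URL, IMDB_MOVIES_PATH)`.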

### AWS SQS

In [7]:
sqs = boto3.resource('sqs')
queue = sqs.get_queue_by_name(QueueName=queue_name)

## Data

### IMDB, Titles

In [8]:
df = pd.read_csv(IMDB_MOVIES_PATH, delimiter='\t', header=0)



In [9]:
df['initial'] = df['primaryTitle'].map(IMDbMovie.get_initial_from)

In [10]:
df['year'] = df['startYear'].map(IMDbMovie.get_year_from)
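
The `IMDbMovie` helpers live in the `tdd` package and are not shown in this notebook. To give a feel for the mapping, here is a hypothetical sketch of what such helpers might do, inferred only from the outputs below (two-character upper-case initials such as `Z_`, and years that may be `NaN`). The names `initial_from` and `year_from` and their exact rules are assumptions; the real implementation may differ.

```python
import math
import re


def initial_from(title):
    """Hypothetical: first two characters, upper-cased; anything outside 0-9/A-Z becomes '_'."""
    if not isinstance(title, str) or not title.strip():
        return None
    prefix = title.strip()[:2].upper()
    return re.sub(r"[^0-9A-Z]", "_", prefix)


def year_from(value):
    r"""Hypothetical: parse a year; IMDb's '\N' missing-value placeholder becomes NaN."""
    try:
        return float(int(value))
    except (TypeError, ValueError):
        return math.nan
```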

### Fleet Partitions

In [11]:
initials = set()
for initial in tqdm(df.initial.unique()):
    if initial and len(initial.strip()) == 2:
        initials.add(initial)
initials = sorted(initials)
pd.DataFrame(initials)





|      | 0   |
|------|-----|
| 0    | 00  |
| 1    | 01  |
| 2    | 02  |
| 3    | 03  |
| 4    | 04  |
| ...  | ... |
| 1243 | ZX  |
| 1244 | ZY  |
| 1245 | ZZ  |
| 1246 | Z_  |


In [12]:
import math
years = set()
for year in tqdm(df.year.unique()):
    if year and not math.isnan(year):
        years.add(int(year))
years = sorted(years)
pd.DataFrame(years)





|     | 0    |
|-----|------|
| 0   | 1874 |
| 1   | 1878 |
| 2   | 1881 |
| 3   | 1883 |
| 4   | 1885 |
| ... | ...  |
| 142 | 2024 |
| 143 | 2025 |
| 144 | 2026 |
| 145 | 2027 |


In [16]:
partitions = zip(years, initials)
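
Note that `zip` pairs the i-th year with the i-th initial and stops at the shorter list, so it yields at most 146 pairs here (one per year). If the intent is one partition per `(year, initial)` combination, as the Summary suggests, `itertools.product` would give all 146 × 1247 = 182,062 of them. A small sketch with toy data shows the difference:

```python
from itertools import product

years = [1874, 1878, 1881]
initials = ["00", "01"]

# zip pairs items by position and stops at the shorter input.
zipped = list(zip(years, initials))

# product yields every (year, initial) combination.
combined = list(product(years, initials))

print(len(zipped), len(combined))  # 2 6
```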

In [17]:
partitions

<zip at 0x15aa317c8>

## Launching

### First Batch

Here we schedule just a few messages, to see if everything is alright.

In [18]:
messages_max = 3
messages_count = 0
for year, initial in partitions:
    print((year, initial))
    message = { 'year': year, 'initial': initial }
    body = json.dumps(message)
    queue.send_message(MessageBody=body)
    messages_count += 1
    if messages_count >= messages_max:
        break

(1874, '00')
(1878, '01')
(1881, '02')
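
Because `partitions` is a `zip` iterator, the three pairs sent above have been consumed: a later loop over `partitions` resumes at the fourth pair instead of resending the first three. The sketch below reproduces this with toy data and uses `itertools.islice` in place of the manual counter:

```python
from itertools import islice

partitions = zip([1874, 1878, 1881, 1883], ["00", "01", "02", "03"])

# Take the first three pairs without a manual counter.
first_batch = list(islice(partitions, 3))

# The same iterator now resumes at the fourth pair.
remaining = list(partitions)
```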


### Everything

In [None]:
for year, initial in partitions:
    print((year, initial))
    message = { 'year': year, 'initial': initial }
    body = json.dumps(message)
    queue.send_message(MessageBody=body)
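
Sending one message per request can be slow for a large number of partitions. SQS accepts up to 10 messages per batch request, and the boto3 `Queue` resource exposes this as `send_messages(Entries=...)`. The helper below is a sketch of that approach; the name `send_in_batches` is an assumption, and it does not inspect the `Failed` entries of each response, which a production version should retry.

```python
import json


def send_in_batches(queue, partitions, batch_size=10):
    """Send partition messages through `queue.send_messages`, `batch_size` at a time."""
    batch, sent = [], 0
    for index, (year, initial) in enumerate(partitions):
        body = json.dumps({"year": year, "initial": initial})
        # Ids only need to be unique within a single batch request.
        batch.append({"Id": str(index), "MessageBody": body})
        if len(batch) == batch_size:
            queue.send_messages(Entries=batch)
            sent += len(batch)
            batch = []
    # Flush whatever is left over after the last full batch.
    if batch:
        queue.send_messages(Entries=batch)
        sent += len(batch)
    return sent
```

With the objects defined in this notebook, the call would be `send_in_batches(queue, partitions)`.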