# Los Simpsons con Big Data

## ETL

Implementación de la primera parte de la arquitectura. En la práctica se puede **prototipar** este ETL en un cuaderno de Jupyter para experimentar rápido y dejar algo funcional. Al finalizar, lo más adecuado es implementarlo en un script utilizando prácticas de **código de producción**.

### Extracción

Se desea automatizar la extracción de datos y no hacer un típico _copy and paste_, para lo cual se va a implementar un en un webscrapper que haga un crawl sobre las tablas del HTML.

En la aplicación se va a utilizar un _webscrapper_ que ya está implementado en Python dentro de funciones de la librería `pandas`.

In [13]:
import os
import yaml

import numpy as np
import pandas as pd

import boto3
import awswrangler as wr

# Open project config file
with open("config.yml", "r") as file:
    config = yaml.safe_load(file); file.close()

In [8]:
# Scrapp wikipedia url
PATH_EPISODES = config['scrapper']['url']
simpsons_raw = pd.read_html(PATH_EPISODES)

# Print total scrapped objects
len(simpsons_raw)

23

### Transformación

Haciendo una inspección rápida de la página de Wikipedia, se puede identificar que existe 20 tablas de información relacionada a los episodios. Sin embargo, es necesario hacer alguna modificaciones a cada una antes de incorporarlas a AWS S3.

In [9]:
# List of seasons and movies index
seasons = np.arange(
    config['scrapper']['seasons'][0]
    ,config['scrapper']['seasons'][1]+1)
movie = config['scrapper']['movie']

# File name counter
k = 1
for idx in seasons:
    # Skip movie from ETL
    if idx == movie:
        continue
    
    # Build file name
    name = f'season{str(k).zfill(2)}.csv'

    # Data transformation
    transformed_table = (
        # Filter data object
        simpsons_raw[idx]
        # Rename table columns
        .rename(columns=config['scrapper']['names'])
        .assign(
            # Convert datestr to datetime
            air_date = lambda df_: pd.to_datetime(df_.air_date),
            # Remove " from title name
            title = lambda df_: df_.title.str.replace('"', ''),
            # Remove wikipedia reference box
            viewers = lambda df_: df_.viewers.str.extract(r'([0-9]*.[0-9]*)')
        ))

    # Save table in project dir
    transformed_table.to_csv(os.path.join(config['path']['episodes'], name))
    k += 1

print('Job done!')


Job done!


### Carga

Como último paso del Pipeline, se hace la carga de datos al _data lake_ para que puedan ser consumidos por algún otro usuario o servicio.

In [10]:
# AWS S3 client and bucket
s3 = boto3.client('s3')
BUCKET = config['etl']['target_bucket']

# Files to upload
files = [
    x for x in os.listdir(config['path']['episodes'])
    if x.__contains__('.csv')
    ]
files.sort()

# Upload tables to cloud
for file in files:
    # Build file path
    file = os.path.join(config['path']['episodes'], file)
    
    # Upload to AWS S3
    s3.upload_file(
        Filename=file,
        Bucket=BUCKET,
        Key=file)

print('Job done!')

Job done!


## ELT

In [17]:
file = os.path.join(config['path']['queries'], 'create_table.sql')

with open(file, 'r') as q_file:
    query = q_file.read()
q_file.close

query

'CREATE EXTERNAL TABLE IF NOT EXISTS `simpsons`.`<name>` (\n    `id_overall`    string,\n    `id_season`     string,\n    `title`         string,\n    `director`      string,\n    `writer`        string,\n    `air_date`      string,\n    `code`          string,\n    `viewers`       string\n)\nCOMMENT "Catalog of guests in Simpsons Seasons."\nROW FORMAT SERDE \'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe\'\nWITH SERDEPROPERTIES (\'field.delim\' = \',\')\nSTORED AS INPUTFORMAT \'org.apache.hadoop.mapred.TextInputFormat\' OUTPUTFORMAT \'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat\'\nLOCATION \'s3://itam-analytics-dante/simpsons/raw/episodes/\'\nTBLPROPERTIES (\'classification\' = \'csv\', "skip.header.line.count"="1");'

In [18]:
test = query.replace('<name>', 'season01')
test

'CREATE EXTERNAL TABLE IF NOT EXISTS `simpsons`.`season01` (\n    `id_overall`    string,\n    `id_season`     string,\n    `title`         string,\n    `director`      string,\n    `writer`        string,\n    `air_date`      string,\n    `code`          string,\n    `viewers`       string\n)\nCOMMENT "Catalog of guests in Simpsons Seasons."\nROW FORMAT SERDE \'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe\'\nWITH SERDEPROPERTIES (\'field.delim\' = \',\')\nSTORED AS INPUTFORMAT \'org.apache.hadoop.mapred.TextInputFormat\' OUTPUTFORMAT \'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat\'\nLOCATION \'s3://itam-analytics-dante/simpsons/raw/episodes/\'\nTBLPROPERTIES (\'classification\' = \'csv\', "skip.header.line.count"="1");'

In [19]:
query

'CREATE EXTERNAL TABLE IF NOT EXISTS `simpsons`.`<name>` (\n    `id_overall`    string,\n    `id_season`     string,\n    `title`         string,\n    `director`      string,\n    `writer`        string,\n    `air_date`      string,\n    `code`          string,\n    `viewers`       string\n)\nCOMMENT "Catalog of guests in Simpsons Seasons."\nROW FORMAT SERDE \'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe\'\nWITH SERDEPROPERTIES (\'field.delim\' = \',\')\nSTORED AS INPUTFORMAT \'org.apache.hadoop.mapred.TextInputFormat\' OUTPUTFORMAT \'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat\'\nLOCATION \'s3://itam-analytics-dante/simpsons/raw/episodes/\'\nTBLPROPERTIES (\'classification\' = \'csv\', "skip.header.line.count"="1");'