# DAG - Movie Review Sentiment Data Warehouse

## Summary

The present documents lays out the Data Pipeline of how the Sentiment Analysis DAG should be performed by the DAG. Once we are certain about the algorithm, the code explored here must be transformed in a DAG and deployed to Airflow, so that it can be scheduled to perform the backfill and the continuous processing accordingly.

## Dependencies

In [1]:
!pip install -q -U requests
!pip install -q -U boto3
!pip install -q -U ipython-sql
!pip install -q -U psycopg2-binary
!pip install -q -U tensorflow
!pip install -q -U matplotlib
!pip install -q -U reportlab

[31mERROR: After October 2020 you may experience errors when installing or updating packages. This is because pip will change the way that it resolves dependency conflicts.

We recommend you use --use-feature=2020-resolver to test your packages with the new resolver before it becomes the default.

awscli 1.18.69 requires botocore==1.16.19, but you'll have botocore 1.17.39 which is incompatible.[0m


In [2]:
import boto3
import zipfile
import json
import os
import io
import sys
import time
import psycopg2
import requests
import numpy as np
import tensorflow as tf
import pandas as pd
import matplotlib.pyplot as plt
from urllib.request import urlopen
from urllib.parse import urlparse
from tqdm.notebook import tqdm

## Settings

### Redshift Connection Url

**Note:** Copy this URL is generated in the main notebook, generated during the Redshift installation.

In [3]:
redshift_url = 'postgresql://redshift:r4d$hifT@35.178.10.130:5439/dw_movie_review_sentiment'
redshift_url

'postgresql://redshift:r4d$hifT@35.178.10.130:5439/dw_movie_review_sentiment'

### Redshift Copy Role

**Note:** Copy this URL is generated in the main notebook, generated during the Redshift installation.

Redshift must have a *service role*, with *read only access to S3* in order to perform the copy operation. Please insert here the name of your role that shall be used by the **`COPY`** command.

In [4]:
iam_role = 'arn:aws:iam::673962978035:role/hudsonmendes-dw-movie-sentiment-analysis-role'
iam_role

'arn:aws:iam::673962978035:role/hudsonmendes-dw-movie-sentiment-analysis-role'

### Staging Bucket

**Redshift** does not read from zip packages. Even though it can **S3** folders and can collect files found in them, the files need to be individually placed in uncompressed format, or individually compressed (e.g.: _GZIP_ format). 

Please use the section bellow to determine the name of the bucket that you will use as your staging for the files that will later be injested by Redshift into the staging tables.

**Important:** bucket names are unique for AWS and you must choose one you have write access to.

In [5]:
staging_bucket = 'hudsonmendes-dw'

## URLs and Paths

### Movies Source And Staging Folder

In [6]:
tmdb_movies_source_url  = 'https://hudsonmendes-datalake.s3.eu-west-2.amazonaws.com/kaggle/hudsonmendes/tmdb-movies-with-imdb_id.zip'
tmdb_movies_staging_folder = f'tmdb-movies-with-imdb_id/'
(tmdb_movies_source_url, tmdb_movies_staging_folder)

('https://hudsonmendes-datalake.s3.eu-west-2.amazonaws.com/kaggle/hudsonmendes/tmdb-movies-with-imdb_id.zip',
 'tmdb-movies-with-imdb_id/')

### Reviews Source and Staging Folder

In [7]:
tmdb_reviews_source_url  = 'https://hudsonmendes-datalake.s3.eu-west-2.amazonaws.com/kaggle/hudsonmendes/tmdb-reviews.zip'
tmdb_reviews_staging_folder = f'tmdb-reviews'
(tmdb_reviews_source_url, tmdb_reviews_staging_folder)

('https://hudsonmendes-datalake.s3.eu-west-2.amazonaws.com/kaggle/hudsonmendes/tmdb-reviews.zip',
 'tmdb-reviews')

### IMDb Cast (Links)

In [8]:
imdb_cast_source_url = 'https://datasets.imdbws.com/title.principals.tsv.gz'
imdb_cast_staging_path = f'imdb-cast/imdb-cast-{int(time.time())}.tsv.gz'
(imdb_cast_source_url, imdb_cast_staging_path)

('https://datasets.imdbws.com/title.principals.tsv.gz',
 'imdb-cast/imdb-cast-1597096418.tsv.gz')

### IMDb Cast (Names)

In [9]:
imdb_names_source_url = 'https://datasets.imdbws.com/name.basics.tsv.gz'
imdb_names_staging_path = f'imdb-cast/imdb-cast-names{int(time.time())}.tsv.gz'
(imdb_names_source_url, imdb_names_staging_path)

('https://datasets.imdbws.com/name.basics.tsv.gz',
 'imdb-cast/imdb-cast-names1597096418.tsv.gz')

## Pipeline

This Pipeline is composed by the following steps:

1. Unzip files from the **Data Lake** into our **Staging S3**
2. **`COPY`** data from **Staging S3** to out **Redshift Staging Tables**
3. _Extract, Transform and Load_ the data into our **Data Warehouse Dimensions Table**
4. **Classify Sentiment** of Movie Reviews using our Model, generating our **Data Warehouse Facts Table**

### Upload files within ZIP to S3 Staging

In [10]:
def stage_zip_files_to_s3(
        source_url,
        destination_bucket,
        destination_folder):
    s3 = boto3.client('s3')
    
    print(f'Downloading "{source_url}", please wait...')
    with urlopen(source_url) as res:
        buffer = io.BytesIO(res.read())
        file_zip = zipfile.ZipFile(buffer)
        print('Download completed.')

        print(f'Uploading each file in "{source_url}" to s3://{destination_bucket}/{destination_folder}')
        for inner_file_name in tqdm(file_zip.namelist(), 'extracting to s3'):
            inner_file_buffer = file_zip.read(inner_file_name) 
            s3.put_object(
                Bucket=staging_bucket,
                Key=os.path.join(destination_folder, inner_file_name),
                Body=inner_file_buffer)
            print(f'[ok] {inner_file_name}')

In [11]:
stage_zip_files_to_s3(
    source_url=tmdb_movies_source_url,
    destination_bucket=staging_bucket,
    destination_folder=tmdb_movies_staging_folder)

Downloading "https://hudsonmendes-datalake.s3.eu-west-2.amazonaws.com/kaggle/hudsonmendes/tmdb-movies-with-imdb_id.zip", please wait...
Download completed.
Uploading each file in "https://hudsonmendes-datalake.s3.eu-west-2.amazonaws.com/kaggle/hudsonmendes/tmdb-movies-with-imdb_id.zip" to s3://hudsonmendes-dw/tmdb-movies-with-imdb_id/


HBox(children=(FloatProgress(value=0.0, description='extracting to s3', max=21.0, style=ProgressStyle(descript…

[ok] tmdb-movies-2000.json
[ok] tmdb-movies-2001.json
[ok] tmdb-movies-2002.json
[ok] tmdb-movies-2003.json
[ok] tmdb-movies-2004.json
[ok] tmdb-movies-2005.json
[ok] tmdb-movies-2006.json
[ok] tmdb-movies-2007.json
[ok] tmdb-movies-2008.json
[ok] tmdb-movies-2009.json
[ok] tmdb-movies-2010.json
[ok] tmdb-movies-2011.json
[ok] tmdb-movies-2012.json
[ok] tmdb-movies-2013.json
[ok] tmdb-movies-2014.json
[ok] tmdb-movies-2015.json
[ok] tmdb-movies-2016.json
[ok] tmdb-movies-2017.json
[ok] tmdb-movies-2018.json
[ok] tmdb-movies-2019.json
[ok] tmdb-movies-2020.json



In [12]:
stage_zip_files_to_s3(
    source_url=tmdb_reviews_source_url,
    destination_bucket=staging_bucket,
    destination_folder=tmdb_reviews_staging_folder)

Downloading "https://hudsonmendes-datalake.s3.eu-west-2.amazonaws.com/kaggle/hudsonmendes/tmdb-reviews.zip", please wait...
Download completed.
Uploading each file in "https://hudsonmendes-datalake.s3.eu-west-2.amazonaws.com/kaggle/hudsonmendes/tmdb-reviews.zip" to s3://hudsonmendes-dw/tmdb-reviews


HBox(children=(FloatProgress(value=0.0, description='extracting to s3', max=21.0, style=ProgressStyle(descript…

[ok] tmdb-movies-2000-reviews.json
[ok] tmdb-movies-2001-reviews.json
[ok] tmdb-movies-2002-reviews.json
[ok] tmdb-movies-2003-reviews.json
[ok] tmdb-movies-2004-reviews.json
[ok] tmdb-movies-2005-reviews.json
[ok] tmdb-movies-2006-reviews.json
[ok] tmdb-movies-2007-reviews.json
[ok] tmdb-movies-2008-reviews.json
[ok] tmdb-movies-2009-reviews.json
[ok] tmdb-movies-2010-reviews.json
[ok] tmdb-movies-2011-reviews.json
[ok] tmdb-movies-2012-reviews.json
[ok] tmdb-movies-2013-reviews.json
[ok] tmdb-movies-2014-reviews.json
[ok] tmdb-movies-2015-reviews.json
[ok] tmdb-movies-2016-reviews.json
[ok] tmdb-movies-2017-reviews.json
[ok] tmdb-movies-2018-reviews.json
[ok] tmdb-movies-2019-reviews.json
[ok] tmdb-movies-2020-reviews.json



### Upload `.tsv.gz`  to S3 Staging

In [13]:
# =============================================================================================
# Source: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/s3-uploading-files.html
# =============================================================================================

import os
import sys
import threading

class ProgressPercentage(object):

    def __init__(self, buffer):
        self.buffer = buffer
        self._size = sys.getsizeof(buffer)
        self.pbar = tqdm(total=self._size, unit='B', unit_scale=True, desc='uploading')
        self._lock = threading.Lock()

    def __call__(self, bytes_amount):
        with self._lock:
            self.pbar.update(bytes_amount)
            
    def close(self):
        self.pbar.update(self._size - self.pbar.n)
        self.pbar.close()

In [14]:
def stage_file_to_s3(
        source_url,
        destination_bucket,
        destination_path):

    s3 = boto3.client('s3')
    file_name = os.path.basename(source_url)
    file_size = int(urlopen(source_url).info().get('Content-Length', -1))
    pbar = tqdm(total=file_size, unit='B', unit_scale=True, desc='downloading')
    req = requests.get(source_url, stream=True)
    buffer = io.BytesIO()
    for chunk in req.iter_content(chunk_size=1024):
        if chunk:
            buffer.write(chunk)
            pbar.update(1024)
    pbar.close()
    
    pbar = ProgressPercentage(buffer)
    buffer.seek(0)
    s3.upload_fileobj(
        Fileobj=buffer,
        Bucket=staging_bucket,
        Key=destination_path,
        Callback=pbar)
    pbar.close()
    print(f'[ok] {file_name}')

In [15]:
stage_file_to_s3(
    source_url=imdb_cast_source_url,
    destination_bucket=staging_bucket,
    destination_path=imdb_cast_staging_path)

HBox(children=(FloatProgress(value=0.0, description='downloading', max=331029653.0, style=ProgressStyle(descri…




HBox(children=(FloatProgress(value=0.0, description='uploading', max=342837631.0, style=ProgressStyle(descript…


[ok] title.principals.tsv.gz


In [16]:
stage_file_to_s3(
    source_url=imdb_names_source_url,
    destination_bucket=staging_bucket,
    destination_path=imdb_names_staging_path)

HBox(children=(FloatProgress(value=0.0, description='downloading', max=200576057.0, style=ProgressStyle(descri…




HBox(children=(FloatProgress(value=0.0, description='uploading', max=214031359.0, style=ProgressStyle(descript…


[ok] name.basics.tsv.gz


### From S3 Staging to Reshift Staging Tables

In [17]:
%load_ext sql

In [18]:
%sql $redshift_url

(psycopg2.OperationalError) could not connect to server: Operation timed out
	Is the server running on host "35.178.10.130" and accepting
	TCP/IP connections on port 5439?

(Background on this error at: http://sqlalche.me/e/13/e3q8)
Connection info needed in SQLAlchemy format, example:
               postgresql://username:password@hostname/dbname
               or an existing connection: dict_keys([])


#### IMDb Movies

In [19]:
tmdb_movies_staging_url = f's3://{staging_bucket}/{tmdb_movies_staging_folder}'

In [20]:
%%sql
drop table if exists staging_tmdb_movies;

create table staging_tmdb_movies (
    id                integer,
    video             boolean,
    vote_count        bigint,
    vote_average      numeric(10, 6),
    title             varchar(256),
    release_date      timestamp,
    original_language varchar(10), 
    original_title    varchar(256),
    genre_ids         varchar(1024),
    backdrop_path     varchar(1024),
    adult             boolean,
    overview          varchar(10000),
    poster_path       varchar(1024),
    popularity        numeric(10, 6),
    id_imdb           varchar(32)
);

copy public.staging_tmdb_movies
from :tmdb_movies_staging_url
iam_role :iam_role
region 'eu-west-2'
format as json 'auto';

Environment variable $DATABASE_URL not set, and no connect string given.
Connection info needed in SQLAlchemy format, example:
               postgresql://username:password@hostname/dbname
               or an existing connection: dict_keys([])


#### IMDb Movie Reviews

In [21]:
tmdb_reviews_staging_url = f's3://{staging_bucket}/{tmdb_reviews_staging_folder}'

In [22]:
%%sql
drop table if exists staging_tmdb_reviews;

create table staging_tmdb_reviews (
    author   varchar(256),
    content  varchar(40000),
    id       varchar(40),
    url      varchar(256),
    movie_id integer
);

copy public.staging_tmdb_reviews
from :tmdb_reviews_staging_url
iam_role :iam_role
region 'eu-west-2'
format as json 'auto';

Environment variable $DATABASE_URL not set, and no connect string given.
Connection info needed in SQLAlchemy format, example:
               postgresql://username:password@hostname/dbname
               or an existing connection: dict_keys([])


#### IMDb Cast (Links)

In [23]:
imdb_cast_staging_url = f's3://{staging_bucket}/{imdb_cast_staging_path}'

In [24]:
%%sql
drop table if exists staging_imdb_cast;

create table staging_imdb_cast (
    tconst     varchar(40),
    ordering   varchar(10),
    nconst     varchar(40),
    category   varchar(256),
    job        varchar(1024),
    characters varchar(1024)
);

copy public.staging_imdb_cast
from :imdb_cast_staging_url
iam_role :iam_role
region 'eu-west-2'
delimiter '\t'
gzip;

Environment variable $DATABASE_URL not set, and no connect string given.
Connection info needed in SQLAlchemy format, example:
               postgresql://username:password@hostname/dbname
               or an existing connection: dict_keys([])


#### IMDb Cast (Names)

In [25]:
imdb_names_staging_url = f's3://{staging_bucket}/{imdb_names_staging_path}'

In [26]:
%%sql
drop table if exists staging_imdb_names;

create table staging_imdb_names (
    nconst            varchar(40),
    primaryname       varchar(256),
    birthyear         varchar(10),
    deathyear         varchar(10),
    primaryprofession varchar(256),
    knownfortitles    varchar(256)
);

copy public.staging_imdb_names
from :imdb_names_staging_url
iam_role :iam_role
region 'eu-west-2'
delimiter '\t'
gzip;

Environment variable $DATABASE_URL not set, and no connect string given.
Connection info needed in SQLAlchemy format, example:
               postgresql://username:password@hostname/dbname
               or an existing connection: dict_keys([])


### From Redshift Staging to Dimension Tables

#### `dim_dates`

In [27]:
%%sql
truncate table dim_dates;

insert into dim_dates
select release_date as date_id,
       datepart(year, release_date)  as year,
       datepart(month, release_date) as month,
       datepart(day, release_date)   as day
from (
    select distinct release_date
    from staging_tmdb_movies
    where not release_date is null);

Environment variable $DATABASE_URL not set, and no connect string given.
Connection info needed in SQLAlchemy format, example:
               postgresql://username:password@hostname/dbname
               or an existing connection: dict_keys([])


#### `dim_films`

In [28]:
%%sql
truncate table dim_films;

insert into dim_films (film_id, date_id, title)
select id, release_date, title
from staging_tmdb_movies
where not release_date is null;

Environment variable $DATABASE_URL not set, and no connect string given.
Connection info needed in SQLAlchemy format, example:
               postgresql://username:password@hostname/dbname
               or an existing connection: dict_keys([])


#### `dim_cast`

In [29]:
%%sql
truncate table dim_cast;

insert into dim_cast (cast_id, film_id, full_name)
select imdbc.nconst, tmdbm.id, imdbn.primaryname
from staging_imdb_cast as imdbc
  inner join staging_imdb_names as imdbn on imdbc.nconst = imdbn.nconst
  inner join staging_tmdb_movies as tmdbm on tmdbm.id_imdb = imdbc.tconst
where not release_date is null
  and imdbc.category in ('actor', 'actress');

Environment variable $DATABASE_URL not set, and no connect string given.
Connection info needed in SQLAlchemy format, example:
               postgresql://username:password@hostname/dbname
               or an existing connection: dict_keys([])


#### `dim_reviews`

In [30]:
%%sql
truncate table dim_reviews;

insert into dim_reviews (review_id, film_id, text)
select id, movie_id, content
from staging_tmdb_reviews;

Environment variable $DATABASE_URL not set, and no connect string given.
Connection info needed in SQLAlchemy format, example:
               postgresql://username:password@hostname/dbname
               or an existing connection: dict_keys([])


### Facts Table

#### Initial Load

In [31]:
%%sql
truncate table fact_film_review_sentiments;

insert into fact_film_review_sentiments (
    date_id,
    film_id,
    review_id,
    review_sentiment_class)
select df.date_id, df.film_id, dr.review_id, 0
from dim_films as df
    inner join dim_reviews as dr on df.film_id = dr.film_id

Environment variable $DATABASE_URL not set, and no connect string given.
Connection info needed in SQLAlchemy format, example:
               postgresql://username:password@hostname/dbname
               or an existing connection: dict_keys([])


In [32]:
%%sql
truncate table fact_cast_review_sentiments;

insert into fact_cast_review_sentiments (
    date_id,
    cast_id,
    review_id,
    review_sentiment_class)
select df.date_id, dc.cast_id, dr.review_id, 0
from dim_cast as dc
    inner join dim_films as df on dc.film_id = df.film_id
    inner join dim_reviews as dr on dc.film_id = dr.film_id

Environment variable $DATABASE_URL not set, and no connect string given.
Connection info needed in SQLAlchemy format, example:
               postgresql://username:password@hostname/dbname
               or an existing connection: dict_keys([])


#### Classification Model

In [33]:
model = tf.saved_model.load('./model/movie-sentiment-classifier')
model

<tensorflow.python.saved_model.load.Loader._recreate_base_user_object.<locals>._UserObject at 0x13c405b70>

In [34]:
from tensorflow import keras
tokenizer = None
with open('./model/movie-sentiment-classifier/assets/tokenizer.json', 'r', encoding='utf-8') as tokenizer_file:
    tokenizer_json = json.dumps(json.load(tokenizer_file))
    tokenizer = keras.preprocessing.text.tokenizer_from_json(tokenizer_json)
tokenizer

<keras_preprocessing.text.Tokenizer at 0x13d3fcba8>

#### Classification

In [35]:
db = psycopg2.connect(redshift_url)
db

OperationalError: could not connect to server: Operation timed out
	Is the server running on host "35.178.10.130" and accepting
	TCP/IP connections on port 5439?


In [None]:
reviews = []
cur = db.cursor()
cur.execute('select review_id, text from dim_reviews')
for review_id, text in tqdm(cur.fetchall(), desc='reviews'):
    reviews.append({'review_id': review_id, 'text': text })
cur.close()

In [None]:
review_texts = [ r['text'] for r in reviews ]
review_seqs  = tokenizer.texts_to_sequences(review_texts)
review_seqs  = tf.keras.preprocessing.sequence.pad_sequences(review_seqs, maxlen=500, dtype='float32', padding='post', value=0)
(len(reviews), len(review_texts), review_seqs.shape)

In [None]:
review_preds = model(inputs=review_seqs)
len(review_preds)

In [None]:
def update_review_sentiment_class_for(table):
    cur = db.cursor()
    try:
        sql = f"""update {table}
                 set review_sentiment_class = %s
                 where review_id = %s"""
        batch = []
        pbar = tqdm(enumerate(review_preds), total=len(review_preds), desc=table)
        for i, review_pred in pbar:
            review_id = reviews[i]['review_id']
            review_sentiment = -1 if np.argmax(review_preds[i]) else 1
            batch.append((review_sentiment, review_id))
            if len(batch) % 200 == 0:
                cur.executemany(sql, batch)
                db.commit()
                batch.clear()
                pbar.refresh()
        if len(batch) > 0:
            cur.executemany(sql, batch)
            db.commit()
            batch.clear()
            pbar.refresh()
        pbar.close()
        cur.close()
    except Exception as e:
        db.rollback()
        raise e

In [None]:
update_review_sentiment_class_for('fact_film_review_sentiments')

In [None]:
update_review_sentiment_class_for('fact_cast_review_sentiments')

### Quality Checks

#### Counts

##### Dates

In [None]:
%%sql dates_source_count <<
select count(distinct release_date)
from staging_tmdb_movies

In [None]:
%%sql dates_dest_count <<
select count(date_id)
from dim_dates

In [None]:
print((dates_source_count[0][0], dates_dest_count[0][0]))
assert dates_source_count == dates_dest_count

##### Films

In [None]:
%%sql films_source_count <<
select count(id_imdb)
from staging_tmdb_movies
where not release_date is null

In [None]:
%%sql films_dest_count <<
select count(film_id)
from dim_films

In [None]:
print((films_source_count[0][0], films_dest_count[0][0]))
assert films_source_count == films_dest_count

##### Reviews

In [None]:
%%sql reviews_source_count <<
select count(id)
from staging_tmdb_reviews

In [None]:
%%sql reviews_dest_count <<
select count(review_id)
from dim_reviews

In [None]:
print((reviews_source_count[0][0], reviews_dest_count[0][0]))
assert reviews_source_count == reviews_dest_count

##### Cast

In [None]:
%%sql cast_source_count <<
select count(imdbc.nconst)
from staging_imdb_cast as imdbc
  inner join staging_tmdb_movies as tmdbm on imdbc.tconst = tmdbm.id_imdb
  inner join staging_imdb_names as imdbn on imdbc.nconst = imdbn.nconst
where not release_date is null
  and imdbc.category in ('actor', 'actress');

In [None]:
%%sql cast_dest_count <<
select count(cast_id)
from dim_cast

In [None]:
print((cast_source_count[0][0], cast_dest_count[0][0]))
assert cast_source_count == cast_dest_count

##### Film Facts

In [None]:
%%sql film_fact_source_count <<
select count(0)
from dim_films as df
    inner join dim_reviews as dr on df.film_id = dr.film_id

In [None]:
%%sql film_fact_dest_count <<
select count(0)
from fact_film_review_sentiments

In [None]:
print((film_fact_source_count[0][0], film_fact_dest_count[0][0]))
assert film_fact_source_count == film_fact_dest_count

##### Cast Facts

In [None]:
%%sql cast_fact_source_count <<
select count(0)
from dim_cast as dc
    inner join dim_reviews as dr on dc.film_id = dr.film_id

In [None]:
%%sql cast_fact_dest_count <<
select count(0)
from fact_cast_review_sentiments

In [None]:
print((cast_fact_source_count[0][0], cast_fact_dest_count[0][0]))
assert cast_fact_source_count == cast_fact_dest_count

#### Existence

##### Films in Facts

In [None]:
%%sql films_not_in_facts <<
select df.film_id
from dim_films as df
  inner join dim_reviews as dr on df.film_id = dr.film_id
where df.film_id not in (select f.film_id from fact_film_review_sentiments as f)

In [None]:
assert not films_not_in_facts

##### Reviews in Facts

In [None]:
%%sql reviews_not_in_facts <<
select dr.review_id
from dim_reviews as dr
where dr.review_id not in (select f.review_id from fact_film_review_sentiments as f)

In [None]:
assert not reviews_not_in_facts

##### Cast in Facts

In [None]:
%%sql cast_not_in_facts <<
select dc.cast_id
from dim_cast as dc
  inner join dim_reviews as dr on dc.film_id = dr.film_id
where dc.cast_id not in (select f.cast_id from fact_cast_review_sentiments as f)

In [None]:
assert not cast_not_in_facts

#### Ranges

##### Sentiment Classes

In [None]:
%%sql sentiment_classes <<
select distinct review_sentiment_class
from fact_film_review_sentiments
union
select distinct review_sentiment_class
from fact_cast_review_sentiments

In [None]:
print(set([ x[0] for x in sentiment_classes ]))
assert set([ x[0] for x in sentiment_classes ]) == set([-1, 1])

## Analysis

### Top 10 Films

In [None]:
%%sql
select df.title, sum(f.review_sentiment_class) as sentiment
from fact_film_review_sentiments as f
  inner join dim_films as df on f.film_id = df.film_id
  inner join dim_reviews as dr on f.review_id = dr.review_id
group by df.title
order by sentiment desc
limit 10;

### 10 Worst Films

In [None]:
%%sql
select df.title, sum(f.review_sentiment_class) as sentiment
from fact_film_review_sentiments as f
  inner join dim_films as df on f.film_id = df.film_id
  inner join dim_reviews as dr on f.review_id = dr.review_id
group by df.title
order by sentiment asc
limit 10;

### Top 10 Actors in Films with Best Reviews

In [None]:
%%sql
select dc.full_name, sum(f.review_sentiment_class) as sentiment
from fact_cast_review_sentiments as f
  inner join dim_cast as dc on f.cast_id = dc.cast_id
  inner join dim_reviews as dr on f.review_id = dr.review_id
group by dc.full_name
order by sentiment desc
limit 10;

### Sentiment Over Time

In [None]:
%%sql
select dt.year, sum(f.review_sentiment_class) as sentiment
from fact_film_review_sentiments as f
    inner join dim_dates as dt on f.date_id = dt.date_id
group by dt.year
order by dt.year asc;

## Compilation

### Sentiment Normalization

In [None]:
sentiment_scale = 2.5
def sentiment_normalizer(max_sentiment):
    return lambda x: round(sentiment_scale + (sentiment_scale * x / max_sentiment), 2)

### Sentiment Per Films

In [None]:
%%sql film_sentiments <<
select df.title, sum(f.review_sentiment_class) as sentiment
from fact_film_review_sentiments as f
  inner join dim_films as df on f.film_id = df.film_id
group by df.title
order by sentiment desc;

In [None]:
df_films = pd.DataFrame(film_sentiments, columns=['title', 'sentiment'])
df_films = df_films.set_index('title')
df_films.describe()

In [None]:
df_films_normalizer = sentiment_normalizer(max(df_films.sentiment))
df_films['normalized_sentiment'] = df_films['sentiment'].map(df_films_normalizer)
df_films.head()

### Sentiment Per Actor

In [None]:
%%sql cast_sentiments <<
select dc.full_name, sum(f.review_sentiment_class) as sentiment
from fact_cast_review_sentiments f
  inner join dim_cast as dc on f.cast_id = dc.cast_id
group by dc.full_name
order by sentiment desc;

In [None]:
df_cast = pd.DataFrame(cast_sentiments, columns=['full_name', 'sentiment'])
df_cast = df_cast.set_index('full_name')
df_cast_normalizer = sentiment_normalizer(max(df_cast.sentiment))
df_cast['normalized_sentiment'] = df_cast['sentiment'].map(df_cast_normalizer)
df_cast.head()

### Sentiment per Year

In [None]:
%%sql year_sentiments <<
select dt.year, sum(f.review_sentiment_class) as sentiment
from fact_film_review_sentiments as f
    inner join dim_dates as dt on f.date_id = dt.date_id
group by dt.year
order by dt.year asc;

In [None]:
df_year = pd.DataFrame(year_sentiments, columns=['year', 'sentiment'])
df_year = df_year.set_index('year')
df_year.transpose()

## Publishing

### Title

In [None]:
from datetime import datetime

In [None]:
year  = str(datetime.now().year).rjust(4, '0')
month = str(datetime.now().month).rjust(2, '0')
day   = str(datetime.now().day).rjust(2, '0')
title = f'TMDb, Film Review Sentiment Analysis ({year}-{month}-{day})'
title

### Chart: Review Distribution per Films

In [None]:
film_review_distro_path = './images/film_review_distro_fig.png'
film_review_distro_fig = df_films[['sentiment']] \
    .plot \
    .density(bw_method=1, grid=True, figsize=(7, 3)) \
    .get_figure()
film_review_distro_fig.savefig(film_review_distro_path, format='png')

### Chart: Sentiment over Time

In [None]:
year_review_distro_path = './images/year_review_distro_fig.png'
year_review_distro_fig = df_year \
    .plot \
    .bar(grid=True, figsize=(7, 4)) \
    .get_figure()
year_review_distro_fig.savefig(year_review_distro_path, format='png')

### Table: Top 10 Films

In [None]:
top_10_films= df_films[['normalized_sentiment']].head(10)
top_10_films

### Table: Worst 10 Films

In [None]:
worst_10_films= df_films[['normalized_sentiment']] \
    .tail(10) \
    .sort_values(by='normalized_sentiment', ascending=True)
worst_10_films

### Table: Top 10 Cast

In [None]:
top_10_cast = df_cast[['normalized_sentiment']].head(10)
top_10_cast

### PDF

In [None]:
from reportlab.lib.units import cm
from reportlab.lib.utils import ImageReader
from reportlab.lib.pagesizes import A4
from reportlab.lib.styles import getSampleStyleSheet
from reportlab.platypus import SimpleDocTemplate, Paragraph, Image, Table, TableStyle, Spacer, PageBreak
from reportlab.lib.colors import black

doc = SimpleDocTemplate('report.pdf', pagesize=A4, rightMargin=cm, leftMargin=cm, topMargin=cm, bottomMargin=cm)
doc.title = title
width, height = A4

style_title = getSampleStyleSheet()["title"]
style_h1 = getSampleStyleSheet()["h1"]
style_normal = getSampleStyleSheet()["bu"]
style_grid = TableStyle([
    ('GRID', (0, 0), (-1, -1), 1, black),
    ('ALIGN', (1, 0), (-1, -1), 'RIGHT')])

br = Spacer(width, 20)

elements = []
elements.append(Paragraph(title, style=style_title))
elements.append(Image('./images/header.png', width-(2*cm), 220))
elements.append(br)

elements.append(Paragraph('Executive Summary', style=style_h1))
elements.append(Paragraph(f'The top film in our database, accorindg to TMDB reviews is <strong>{df_films.head(1).index[0]}</strong>', style=style_normal))
elements.append(br)

elements.append(Paragraph('Top 10 Films', style=style_h1))
elements.append(Paragraph('Here are the top 10 films in our database, according to the sentiment found in the TMDb reviews, ranging from 0 (negative) to 5 (positive).', style=style_normal))
elements.append(Table(top_10_films.copy().reset_index().to_numpy().tolist(), style=style_grid))
elements.append(br)

elements.append(Paragraph('Worst 10 Films', style=style_h1))
elements.append(Paragraph('Here are the worst 10 films in our database, according to the sentiment found in the TMDb reviews, ranging from 0 (negative) to 5 (positive).', style=style_normal))
elements.append(Table(worst_10_films.copy().reset_index().to_numpy().tolist(), style=style_grid))
elements.append(br)

elements.append(Paragraph('Review Sentiment Distibution', style=style_h1))
elements.append(Image(film_review_distro_path))
elements.append(br)

elements.append(Paragraph('Top 10 Actors/Actresses in Best Reviewed Films', style=style_h1))
elements.append(Paragraph('The ranking bellow is of actors that worked in films with positive reviews. Reviews are not made directly to actors, but to their films', style=style_normal))
elements.append(Table(top_10_cast.copy().reset_index().to_numpy().tolist(), style=style_grid))
elements.append(br)

elements.append(Paragraph('IMDb Average Voting vs TMDb Sentiment Reviews', style=style_h1))
elements.append(Image(year_review_distro_path))

doc.build(elements)