# Extract-Transform-Load (ETL) Notebook

This notebook reads the gzipped raw IMDb datasets, extracts them into the 'extracted_files' folder, filters them, and stores the result as a SQLite database file.

## Extraction Steps

In [1]:
import os
import gzip
import pandas as pd
import numpy as np
import sqlite3

In [2]:
# Project folders used throughout the notebook.
# NOTE(review): the base folder is the working directory minus its last 10 characters;
# presumably this strips a '/notebooks' suffix -- confirm, since it silently breaks if the
# notebook is launched from any other directory.
base_folder = os.getcwd()[:-10]
data_sources = os.path.join(base_folder, 'data_sources')
raw_files, extracted_files = (
    os.path.join(data_sources, subfolder)
    for subfolder in ('raw_files', 'extracted_files')
)

In [3]:
# Gzipped IMDB dumps this notebook works with (file names inside the raw_files folder).
files_zipped = [
    'title.ratings.tsv.gz',
    'title.basics.tsv.gz',
    'title.crew.tsv.gz',
    'name.basics.tsv.gz',
]

In [4]:
# Extract every IMDB dump that is not yet available as a plain TSV, then load the four
# tables into DataFrames (df_ratings, df_basics, df_crew, df_names).

# Create the target folder on first use so the listing below cannot fail.
os.makedirs(extracted_files, exist_ok=True)
files_in_folder = os.listdir(extracted_files)

# Number of bytes decompressed per read; keeps memory use flat for the multi-GB dumps.
EXTRACT_CHUNK_BYTES = 1024 * 1024

for file in files_zipped:
    extracted_name = file[:-3]  # strip the '.gz' suffix
    if extracted_name in files_in_folder:
        # Already extracted by a previous run.
        continue

    gzipped_file_path = os.path.join(raw_files, file)
    extracted_file_path = os.path.join(extracted_files, extracted_name)
    # Write to a temporary name first so an interrupted run never leaves a truncated
    # TSV that a later run would mistake for a complete extraction.
    partial_file_path = extracted_file_path + '.part'

    # Copy raw bytes instead of decoding text: no dependence on the platform's default
    # encoding, and the archive is never held in memory as a whole.
    with gzip.open(gzipped_file_path, 'rb') as gzipped_file, \
            open(partial_file_path, 'wb') as extracted_file:
        while True:
            chunk = gzipped_file.read(EXTRACT_CHUNK_BYTES)
            if not chunk:
                break
            extracted_file.write(chunk)

    os.replace(partial_file_path, extracted_file_path)

# Load each table exactly once from its extracted TSV.
df_ratings = pd.read_csv(os.path.join(extracted_files, 'title.ratings.tsv'), sep='\t')
df_basics = pd.read_csv(os.path.join(extracted_files, 'title.basics.tsv'), sep='\t')
df_crew = pd.read_csv(os.path.join(extracted_files, 'title.crew.tsv'), sep='\t')
df_names = pd.read_csv(os.path.join(extracted_files, 'name.basics.tsv'), sep='\t')


## Cleaning Steps

### Basics File

In [5]:
# Preview the first five rows of the title table as loaded.
df_basics.head(n=5)

Unnamed: 0,tconst,titleType,primaryTitle,originalTitle,isAdult,startYear,endYear,runtimeMinutes,genres
0,tt0000001,short,Carmencita,Carmencita,0,1894,\N,1,"Documentary,Short"
1,tt0000002,short,Le clown et ses chiens,Le clown et ses chiens,0,1892,\N,5,"Animation,Short"
2,tt0000003,short,Pauvre Pierrot,Pauvre Pierrot,0,1892,\N,4,"Animation,Comedy,Romance"
3,tt0000004,short,Un bon bock,Un bon bock,0,1892,\N,12,"Animation,Short"
4,tt0000005,short,Blacksmith Scene,Blacksmith Scene,0,1893,\N,1,"Comedy,Short"


In [6]:
# Keep only the rows whose titleType is 'movie'.
is_movie = df_basics['titleType'] == 'movie'
df_basics = df_basics[is_movie]

In [7]:
# Remove the originalTitle and endYear columns; the result is bound to a new frame
# rather than mutated in place.
df_basics = df_basics.drop(columns=['originalTitle', 'endYear'])

In [8]:
# Discard titles where startYear, runtimeMinutes or genres still holds the '\N'
# placeholder that this notebook treats as null.
missing_marker = "\\N"
has_start_year = df_basics['startYear'] != missing_marker
has_runtime = df_basics['runtimeMinutes'] != missing_marker
has_genres = df_basics['genres'] != missing_marker
df_basics = df_basics[has_start_year & has_runtime & has_genres]

In [9]:
# Cast startYear and runtimeMinutes to 32-bit integers.
# df_basics is a filtered slice at this point, so assigning columns on it directly would
# raise pandas' SettingWithCopyWarning; astype returns a fresh frame instead.
df_basics = df_basics.astype({'startYear': 'int32', 'runtimeMinutes': 'int32'})

In [10]:
# Keep only movies whose startYear lies between 1970 and 2022, both bounds inclusive
# (Series.between includes both endpoints by default).
df_basics = df_basics[df_basics['startYear'].between(1970, 2022)]

In [44]:
# Turn any remaining '\N' placeholders into NaN and rename the key column to title_id.
df_basics = (
    df_basics
    .replace('\\N', np.nan)
    .rename(columns={'tconst': 'title_id'})
)
# Discard titles that have no primaryTitle.
df_basics = df_basics.dropna(subset=['primaryTitle'])

### Linking tables (many to many relationship)

In [12]:
# Linking table with one (title_id, genre_name) row per genre of each title:
# split the comma-separated genres string, expand it to one row per entry, trim
# surrounding whitespace and renumber the index.
df_movie_genres_exploded = (
    df_basics[['title_id', 'genres']]
    .assign(genre_name=lambda frame: frame['genres'].map(lambda joined: joined.split(',')))
    .explode('genre_name')
    .loc[:, ['title_id', 'genre_name']]
    .assign(genre_name=lambda frame: frame['genre_name'].str.strip())
    .reset_index(drop=True)
)


In [13]:
# Preview the first five title/genre pairs.
df_movie_genres_exploded.head(n=5)

Unnamed: 0,title_id,genre_name
0,tt0013274,Documentary
1,tt0015724,Drama
2,tt0015724,Mystery
3,tt0015724,Romance
4,tt0028248,Musical


In [14]:
# Single-column lookup table holding each distinct genre name once,
# in order of first appearance in the linking table.
unique_genres = df_movie_genres_exploded['genre_name'].unique()
df_genres_table = pd.DataFrame({'genre_name': unique_genres})

In [15]:
# Preview the first five genre names.
df_genres_table.head(n=5)

Unnamed: 0,genre_name
0,Documentary
1,Drama
2,Mystery
3,Romance
4,Musical


In [16]:
# The genres now live in the linking table, so remove the column from the title table.
df_basics = df_basics.drop(columns=['genres'])

In [17]:
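# Disabled alternative (kept for reference): for titles with more than one genre, keep only the first. Not runnable as-is, because the 'genres' column is dropped in the previous cell.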
#df_basics['genres'] = df_basics['genres'].apply(lambda x: x.split(',')[0] if ',' in str(x) else x)

### Crew File

In [18]:
# Preview the first five rows of the crew table as loaded.
df_crew.head(n=5)

Unnamed: 0,tconst,directors,writers
0,tt0000001,nm0005690,\N
1,tt0000002,nm0721526,\N
2,tt0000003,nm0721526,\N
3,tt0000004,nm0721526,\N
4,tt0000005,nm0005690,\N


In [19]:
# Remove the writers column from the crew table.
df_crew = df_crew.drop(columns=['writers'])

In [20]:
# Turn the '\N' placeholder into NaN across the crew table.
df_crew = df_crew.replace({'\\N': np.nan})

In [21]:
# Align the crew column names with the SQL schema: tconst -> title_id, directors -> name_id.
df_crew = df_crew.rename(columns={'tconst': 'title_id', 'directors': 'name_id'})

In [None]:
# Reshape the crew table to one (title_id, name_id) row per director of each title.
#
# Titles without a director hold NaN in name_id at this point. They are dropped first:
# casting NaN to str yields the literal string 'nan', which would otherwise be exploded
# into a bogus director id and pass the NOT NULL constraint of the crew table.
df_crew = (
    df_crew
    .dropna(subset=['name_id'])
    # Split the comma-separated director ids into a list per title.
    .assign(name_id=lambda frame: frame['name_id'].astype(str).str.split(','))
    # One row per (title, director).
    .explode('name_id')
    # Ensure director ids are stripped of whitespace, if any.
    .assign(name_id=lambda frame: frame['name_id'].str.strip())
    # (title_id, name_id) is the crew table's primary key, so each pair may appear once.
    .drop_duplicates(subset=['title_id', 'name_id'])
    .reset_index(drop=True)
)

In [23]:
# Preview the first five title/director pairs.
df_crew.head(n=5)

Unnamed: 0,title_id,name_id
0,tt0000001,nm0005690
1,tt0000002,nm0721526
2,tt0000003,nm0721526
3,tt0000004,nm0721526
4,tt0000005,nm0005690


### Names File

In [24]:
# Preview the first five rows of the names table as loaded.
df_names.head(n=5)

Unnamed: 0,nconst,primaryName,birthYear,deathYear,primaryProfession,knownForTitles
0,nm0000001,Fred Astaire,1899,1987,"soundtrack,actor,miscellaneous","tt0027125,tt0050419,tt0053137,tt0072308"
1,nm0000002,Lauren Bacall,1924,2014,"actress,soundtrack","tt0117057,tt0075213,tt0038355,tt0037382"
2,nm0000003,Brigitte Bardot,1934,\N,"actress,soundtrack,music_department","tt0049189,tt0054452,tt0056404,tt0057345"
3,nm0000004,John Belushi,1949,1982,"actor,soundtrack,writer","tt0080455,tt0072562,tt0077975,tt0078723"
4,nm0000005,Ingmar Bergman,1918,2007,"writer,director,actor","tt0050986,tt0069467,tt0083922,tt0050976"


In [25]:
# Use the same key name as the crew table (name_id) so the two can be related later.
df_names = df_names.rename({'nconst': 'name_id'}, axis='columns')

In [26]:
# Remove the columns that are not stored in the names table of the database.
unused_name_columns = ['deathYear', 'primaryProfession', 'knownForTitles']
df_names = df_names.drop(columns=unused_name_columns)

In [39]:
# Turn the '\N' placeholder into NaN, then discard people without a primaryName.
df_names = (
    df_names
    .replace('\\N', np.nan)
    .dropna(subset=['primaryName'])
)

In [40]:
# Preview the first five rows of the cleaned names table.
df_names.head(n=5)

Unnamed: 0,name_id,primaryName,birthYear
0,nm0000001,Fred Astaire,1899
1,nm0000002,Lauren Bacall,1924
2,nm0000003,Brigitte Bardot,1934
3,nm0000004,John Belushi,1949
4,nm0000005,Ingmar Bergman,1918


### Ratings File

In [29]:
# Preview the first five rows of the ratings table as loaded.
df_ratings.head(n=5)

Unnamed: 0,tconst,averageRating,numVotes
0,tt0000001,5.7,2014
1,tt0000002,5.7,270
2,tt0000003,6.5,1937
3,tt0000004,5.5,178
4,tt0000005,6.2,2712


In [30]:
# Turn the '\N' placeholder into NaN and rename the key column to title_id.
df_ratings = (
    df_ratings
    .replace('\\N', np.nan)
    .rename(columns={'tconst': 'title_id'})
)

## Creating SQL tables

In [None]:
# Location of the SQLite database file produced by this notebook.
db_directory = os.path.join(base_folder, 'data')
# sqlite3.connect creates a missing database file but not a missing parent folder,
# so make sure the folder exists before any cell connects.
os.makedirs(db_directory, exist_ok=True)
db_path_relative = os.path.join(db_directory, 'imdb_movies.db')


Base de datos creada/conectada en: /Users/leonardomichelramirez/code/PersonalProjects/ML-Movie_recommendator_v2/data/imdb_movies.db


In [47]:
# Open (or create) the database and (re)create all six tables from scratch.
# The connection is intentionally left open on success: the data-insertion cell reuses it.
conn = sqlite3.connect(db_path_relative)
print(f"Base de datos creada/conectada en: {os.path.abspath(db_path_relative)}")
cursor = conn.cursor()

# Each table is dropped first so the cell can be re-run against an existing database.
sql_schema_query = """
DROP TABLE IF EXISTS names;

CREATE TABLE names (
    name_id VARCHAR(50) NOT NULL,
    primaryName VARCHAR(256) NOT NULL,
    birthYear INT,
    PRIMARY KEY (name_id)
);

DROP TABLE IF EXISTS genres;

CREATE TABLE genres (
    genre_name VARCHAR(100) NOT NULL,
    PRIMARY KEY (genre_name)
);

DROP TABLE IF EXISTS titles;

CREATE TABLE titles (
    title_id VARCHAR(50) NOT NULL,
    titleType VARCHAR(50) NOT NULL,
    primaryTitle VARCHAR(300) NOT NULL,
    isAdult INT,
    startYear INT,
    runtimeMinutes INT,
    PRIMARY KEY (title_id)
);

DROP TABLE IF EXISTS crew;

CREATE TABLE crew (
    title_id VARCHAR(50) NOT NULL,
    name_id VARCHAR(50) NOT NULL,
    PRIMARY KEY (title_id, name_id),
    FOREIGN KEY (title_id) REFERENCES titles (title_id),
    FOREIGN KEY (name_id) REFERENCES names (name_id)
);

DROP TABLE IF EXISTS ratings;

CREATE TABLE ratings (
    title_id VARCHAR(50) NOT NULL,
    averageRating FLOAT NOT NULL,
    numVotes INT NOT NULL,
    PRIMARY KEY (title_id),
    FOREIGN KEY (title_id) REFERENCES titles (title_id)
);


DROP TABLE IF EXISTS titles_genres;

CREATE TABLE titles_genres (
    title_id VARCHAR(50) NOT NULL,
    genre_name VARCHAR(100) NOT NULL,
    PRIMARY KEY (title_id, genre_name),
    FOREIGN KEY (title_id) REFERENCES titles (title_id),
    FOREIGN KEY (genre_name) REFERENCES genres (genre_name)
);


"""

try:
    cursor.executescript(sql_schema_query)
    conn.commit()
    print("Tablas creadas/recreadas correctamente.")
except sqlite3.Error as e:
    print(f"Error al crear tablas: {e}")
    conn.close()
    # Re-raise instead of calling exit(): inside a notebook exit() shuts the kernel down,
    # whereas an exception stops the run at this cell and keeps the traceback visible.
    raise

Base de datos creada/conectada en: /Users/leonardomichelramirez/code/PersonalProjects/ML-Movie_recommendator_v2/data/imdb_movies.db
Tablas creadas/recreadas correctamente.


## Data Insertion

In [48]:
# Load every DataFrame into its table, parents first, through the connection opened by
# the schema cell. The connection is closed at the end whether or not the load succeeds.
try:
    # crew and ratings were built from the complete IMDB dumps, while titles only holds
    # the filtered movies (and names only people with a name). Keep just the rows whose
    # keys exist in the parent tables so the foreign keys declared in the schema hold.
    crew_rows = df_crew[
        df_crew['title_id'].isin(df_basics['title_id'])
        & df_crew['name_id'].isin(df_names['name_id'])
    ]
    ratings_rows = df_ratings[df_ratings['title_id'].isin(df_basics['title_id'])]

    # (table name, rows) in insertion order: main tables first, then the tables that
    # reference them.
    tables_to_load = [
        ('names', df_names),                          # main table
        ('genres', df_genres_table),                  # main table
        ('titles', df_basics),                        # main table
        ('crew', crew_rows),                          # link table: names + titles
        ('titles_genres', df_movie_genres_exploded),  # link table: titles + genres
        ('ratings', ratings_rows),                    # depends on titles
    ]

    for table_name, rows in tables_to_load:
        rows.to_sql(table_name, conn, if_exists='append', index=False)
        print(f"Datos insertados en la tabla '{table_name}'.")

    # Confirm all insertions.
    conn.commit()
    print("\n¡Todos los datos han sido insertados y confirmados en la base de datos!")

except sqlite3.Error as e:
    print(f"\n¡Error durante la inserción de datos! Error: {e}")
    # NOTE(review): pandas' to_sql appears to commit after each table it writes, so this
    # rollback presumably only undoes the table that failed, not the earlier ones -- confirm.
    conn.rollback()
finally:
    conn.close()
    print("Conexión a la base de datos cerrada.")

Datos insertados en la tabla 'names'.
Datos insertados en la tabla 'genres'.
Datos insertados en la tabla 'titles'.
Datos insertados en la tabla 'crew'.
Datos insertados en la tabla 'titles_genres'.
Datos insertados en la tabla 'ratings'.

¡Todos los datos han sido insertados y confirmados en la base de datos!
Conexión a la base de datos cerrada.


## Test Queries

In [49]:
# Reconnect to the finished database and read the names of all tables it contains.
conn = sqlite3.connect(db_path_relative)
print(f"Base de datos creada/conectada en: {os.path.abspath(db_path_relative)}")
cursor = conn.cursor()

# fetchall() returns every result row of the catalogue query.
tables = cursor.execute("SELECT name FROM sqlite_master WHERE type='table';").fetchall()

Base de datos creada/conectada en: /Users/leonardomichelramirez/code/PersonalProjects/ML-Movie_recommendator_v2/data/imdb_movies.db


In [50]:
# Show the table names fetched from sqlite_master.
tables

[('names',),
 ('genres',),
 ('titles',),
 ('crew',),
 ('ratings',),
 ('titles_genres',)]

In [51]:
# Sanity check: read ten rows of the titles table back into a DataFrame.
test_query = """
select *
from titles t
limit 10;
"""

df_titles_sample = pd.read_sql_query(sql=test_query, con=conn)

In [52]:
# Display the ten sample rows read back from the titles table.
df_titles_sample

Unnamed: 0,title_id,titleType,primaryTitle,isAdult,startYear,runtimeMinutes
0,tt0013274,movie,Istoriya grazhdanskoy voyny,0,2021,94
1,tt0015724,movie,Dama de noche,0,1993,102
2,tt0028248,movie,Shipmates o' Mine,0,2022,87
3,tt0035423,movie,Kate & Leopold,0,2001,118
4,tt0036606,movie,"Another Time, Another Place",0,1983,118
5,tt0038687,movie,Let There Be Light,0,1980,58
6,tt0039442,movie,"Habla, mudita",0,1973,88
7,tt0044952,movie,Nagarik,0,1977,127
8,tt0054724,movie,I Eat Your Skin,0,1971,92
9,tt0057461,movie,La rosa de los vientos,0,1983,84
