# Libraries

In [1]:
import json
import os
import re
from pathlib import Path
from urllib.parse import quote_plus

import chardet
import numpy as np
import pandas as pd
# libraries to connect and upload with mysql
import pymysql
from sqlalchemy import create_engine

# Data collection

In [12]:
def importFiles(list, dirPath = r"../data/raw"):
    """Load every supported file named in ``list`` and return them as one DataFrame.

    Parameters
    ----------
    list : iterable of str
        File names relative to ``dirPath``. (The name shadows the built-in
        ``list``; it is kept so existing callers keep working.)
    dirPath : str, optional
        Directory that contains the files. Defaults to ``"../data/raw"``.

    Returns
    -------
    pandas.DataFrame
        The single frame when only one file was loaded, otherwise all frames
        stacked in the given order with a fresh 0..n-1 index.

    Raises
    ------
    ValueError
        If none of the given names is a ``.csv``, ``.json`` or ``.js`` file.

    Notes
    -----
    * ``.csv`` files: the encoding is detected with ``chardet`` and the
      separator is sniffed by the python parsing engine (``sep = None``).
    * ``.json`` / ``.js`` files are read with ``pd.read_json``.
    * Files with any other extension are skipped.
    """
    frames = []

    for name in list:
        path = os.path.join(dirPath, name)
        suffix = Path(path).suffix.lower()

        if suffix == ".csv":
            # Detect the encoding from the raw bytes; the context manager
            # guarantees the handle is closed again.
            with open(path, "rb") as f:
                result = chardet.detect(f.read())
            frames.append(
                pd.read_csv(path, encoding = result["encoding"], sep = None, engine = "python", decimal = ".")
            )
        elif suffix in (".json", ".js"):
            frames.append(pd.read_json(path, precise_float = True))
        # Any other extension is not a dataset we know how to read: skip it.

    if not frames:
        raise ValueError(f"No .csv/.json/.js file could be loaded from {dirPath!r}: {list!r}")

    if len(frames) == 1:
        return frames[0]

    # Returning inside the loop would silently drop every file after the first.
    return pd.concat(frames, ignore_index = True)

In [11]:
# Group the raw file names by streaming platform, based on the platform name
# that appears in each file name.
dirPath = r"../data/raw"
list_amazon = []
list_disney = []
list_hulu = []
list_netflix = []

# Checked in this order; a file is assigned to the first platform it matches.
files_by_platform = {
    "amazon": list_amazon,
    "disney": list_disney,
    "hulu": list_hulu,
    "netflix": list_netflix,
}

# os.listdir() returns entries in arbitrary order; sorting keeps the load
# order (and therefore the row order downstream) reproducible between runs.
for f in sorted(os.listdir(dirPath)):
    if not os.path.isfile(os.path.join(dirPath, f)):
        continue
    lowered_name = f.lower()  # match "Netflix_titles.csv" as well as "netflix_titles.csv"
    for platform_name, bucket in files_by_platform.items():
        if platform_name in lowered_name:
            bucket.append(f)
            break

In [13]:
# Load the raw files of each platform into its own dataframe
amazon, disney, hulu, netflix = [
    importFiles(file_names)
    for file_names in (list_amazon, list_disney, list_hulu, list_netflix)
]

In [14]:
# Tag every row with the name of the platform it was loaded from
for source_name, source_frame in (
    ("amazon", amazon),
    ("disney", disney),
    ("hulu", hulu),
    ("netflix", netflix),
):
    source_frame["platform"] = source_name

In [15]:
# We concatenate the four platform dataframes into one.
# A single pd.concat call copies the data once, instead of re-copying the
# growing frame on every pairwise concatenation. Each frame keeps its own row
# labels here; the index is rebuilt later.
df_stream = pd.concat([amazon, disney, hulu, netflix])

# Exploratory data analysis

In [16]:
# Preview the first three rows of the combined dataframe
df_stream.head(3)

Unnamed: 0,show_id,type,title,director,cast,country,date_added,release_year,rating,duration,listed_in,description,platform
0,s1,Movie,The Grand Seduction,Don McKellar,"Brendan Gleeson, Taylor Kitsch, Gordon Pinsent",Canada,"March 30, 2021",2014,,113 min,"Comedy, Drama",A small fishing village must procure a local d...,amazon
1,s2,Movie,Take Care Good Night,Girish Joshi,"Mahesh Manjrekar, Abhay Mahajan, Sachin Khedekar",India,"March 30, 2021",2018,13+,110 min,"Drama, International",A Metro Family decides to fight a Cyber Crimin...,amazon
2,s3,Movie,Secrets of Deception,Josh Webber,"Tom Sizemore, Lorenzo Lamas, Robert LaSardo, R...",United States,"March 30, 2021",2017,,74 min,"Action, Drama, Suspense",After a man discovers his wife is cheating on ...,amazon


In [17]:
# Column names, non-null counts and dtypes of the combined dataframe
df_stream.info()

<class 'pandas.core.frame.DataFrame'>
Index: 22998 entries, 0 to 8806
Data columns (total 13 columns):
 #   Column        Non-Null Count  Dtype 
---  ------        --------------  ----- 
 0   show_id       22998 non-null  object
 1   type          22998 non-null  object
 2   title         22998 non-null  object
 3   director      14738 non-null  object
 4   cast          17677 non-null  object
 5   country       11499 non-null  object
 6   date_added    13444 non-null  object
 7   release_year  22998 non-null  int64 
 8   rating        22134 non-null  object
 9   duration      22516 non-null  object
 10  listed_in     22998 non-null  object
 11  description   22994 non-null  object
 12  platform      22998 non-null  object
dtypes: int64(1), object(12)
memory usage: 2.5+ MB


In [18]:
# Number of missing values per column
df_stream.isna().sum()

show_id             0
type                0
title               0
director         8260
cast             5321
country         11499
date_added       9554
release_year        0
rating            864
duration          482
listed_in           0
description         4
platform            0
dtype: int64

### Hypothesis: some values in the "rating" column actually belong in the "duration" column.

In [19]:
# Recover durations that were entered in the 'rating' column:
# where 'duration' is null and 'rating' contains "min" or "Season",
# 'duration' takes the value of 'rating'. Every other row keeps its 'duration'.
# This is done with a vectorized mask instead of a row-by-row apply.
rating_text = df_stream["rating"].astype(str)
looks_like_duration = (
    rating_text.str.contains("min", regex = False)
    | rating_text.str.contains("Season", regex = False)
)
misplaced_duration = df_stream["duration"].isnull() & looks_like_duration
df_stream["duration"] = df_stream["duration"].mask(misplaced_duration, df_stream["rating"])

In [20]:
# Re-count the missing values per column after the repair
df_stream.isna().sum()

show_id             0
type                0
title               0
director         8260
cast             5321
country         11499
date_added       9554
release_year        0
rating            864
duration          187
listed_in           0
description         4
platform            0
dtype: int64

As you can see, the nulls for "duration" decreased from 482 to 187.

# Data preprocessing (Feature Engineering)

In [21]:
# Remove the columns that are not needed for this exercise
unused_columns = ["show_id", "director", "country", "date_added", "rating", "description"]
df_stream = df_stream.drop(columns = unused_columns)

In [22]:
# Rebuild the row index as 0..n-1 and discard the old labels (drop = True),
# then preview the first three rows of the result.
df_stream = df_stream.reset_index(drop = True)
df_stream.head(3)

Unnamed: 0,type,title,cast,release_year,duration,listed_in,platform
0,Movie,The Grand Seduction,"Brendan Gleeson, Taylor Kitsch, Gordon Pinsent",2014,113 min,"Comedy, Drama",amazon
1,Movie,Take Care Good Night,"Mahesh Manjrekar, Abhay Mahajan, Sachin Khedekar",2018,110 min,"Drama, International",amazon
2,Movie,Secrets of Deception,"Tom Sizemore, Lorenzo Lamas, Robert LaSardo, R...",2017,74 min,"Action, Drama, Suspense",amazon


In [23]:
# Split "duration" at its first space into two columns:
# the leading token stays in "duration", the remainder goes to "type_duration".
duration_parts = df_stream["duration"].str.split(" ", n = 1, expand = True)
df_stream["duration"], df_stream["type_duration"] = duration_parts[0], duration_parts[1]

In [24]:
# Count the nulls that are still left in each column
df_stream.isna().sum()

type                0
title               0
cast             5321
release_year        0
duration          187
listed_in           0
platform            0
type_duration     187
dtype: int64

In [25]:
# Replace every remaining null, in all columns, with the placeholder string "undefined".
# "duration" still holds text at this point, so its nulls become "undefined" as well.
df_stream = df_stream.fillna("undefined")

In [26]:
# Turn "duration" into integers in three steps:
#   1. parse the text as numbers; anything that cannot be parsed becomes NaN
#   2. replace those NaN values with 0
#   3. cast the resulting floats to int
df_stream["duration"] = (
    pd.to_numeric(df_stream["duration"], errors="coerce")
    .fillna(0)
    .astype(int)
)

In [27]:
# Use the current row index as the identifier column "idStream"
df_stream["idStream"] = df_stream.index

In [28]:
# Put the columns in their final order, with the identifier first
column_order = [
    "idStream",
    "type",
    "title",
    "cast",
    "release_year",
    "duration",
    "type_duration",
    "listed_in",
    "platform",
]
df_stream = df_stream.reindex(columns = column_order)

In [29]:
# Remove leading/trailing blank spaces from the text columns, in case there are any
for text_column in ("type", "title", "type_duration"):
    df_stream[text_column] = df_stream[text_column].str.strip()

In [30]:
# Finally we check for duplicate data.
# "idStream" is copied from the row index, so it is different on every row;
# including it in the comparison means no row can ever be reported as a
# duplicate. We therefore compare on every column except "idStream".
content_columns = [column for column in df_stream.columns if column != "idStream"]
duplicated = df_stream[df_stream.duplicated(subset = content_columns)].count()
print(duplicated)

idStream         0
type             0
title            0
cast             0
release_year     0
duration         0
type_duration    0
listed_in        0
platform         0
dtype: int64


## We split the dataframe into separate "actors" and "genre" (`df_gender`) dataframes for future SQL queries

### **Actors**

In [31]:
# Keep only the title id and its raw cast string
df_actors = df_stream.loc[:, ["idStream", "cast"]].drop_duplicates()

In [32]:
# Split the comma-separated cast string into a list, then give every list
# element its own row; each row keeps the idStream it came from.
df_actors = (
    df_actors
    .assign(cast = df_actors["cast"].str.split(","))
    .explode("cast")
)

In [33]:
# Remove leading/trailing blank spaces from each name, in case there are any
df_actors = df_actors.assign(cast = df_actors["cast"].str.strip())

In [34]:
# Preview the first six rows: one row per (idStream, cast member)
df_actors.head(6)

Unnamed: 0,idStream,cast
0,0,Brendan Gleeson
0,0,Taylor Kitsch
0,0,Gordon Pinsent
1,1,Mahesh Manjrekar
1,1,Abhay Mahajan
1,1,Sachin Khedekar


In [35]:
# Row count, non-null counts and dtypes of the actors dataframe
df_actors.info()

<class 'pandas.core.frame.DataFrame'>
Index: 119713 entries, 0 to 22997
Data columns (total 2 columns):
 #   Column    Non-Null Count   Dtype 
---  ------    --------------   ----- 
 0   idStream  119713 non-null  int64 
 1   cast      119713 non-null  object
dtypes: int64(1), object(1)
memory usage: 2.7+ MB


### **Genre** (`df_gender`)

In [36]:
# Keep only the title id and its raw "listed_in" string
df_gender = df_stream.loc[:, ["idStream", "listed_in"]].drop_duplicates()

In [37]:
# Split the comma-separated "listed_in" string into a list, then give every
# list element its own row; each row keeps the idStream it came from.
df_gender = (
    df_gender
    .assign(listed_in = df_gender["listed_in"].str.split(","))
    .explode("listed_in")
)

In [38]:
# Remove leading/trailing blank spaces from each value, in case there are any
df_gender = df_gender.assign(listed_in = df_gender["listed_in"].str.strip())

In [39]:
# Rename the column "listed_in" to "genre"
df_gender = df_gender.rename(columns = {"listed_in": "genre"})

In [40]:
# Preview the first four rows: one row per (idStream, genre)
df_gender.head(4)

Unnamed: 0,idStream,genre
0,0,Comedy
0,0,Drama
1,1,Drama
1,1,International


In [41]:
# Row count, non-null counts and dtypes of the genre dataframe
df_gender.info()

<class 'pandas.core.frame.DataFrame'>
Index: 48303 entries, 0 to 22997
Data columns (total 2 columns):
 #   Column    Non-Null Count  Dtype 
---  ------    --------------  ----- 
 0   idStream  48303 non-null  int64 
 1   genre     48303 non-null  object
dtypes: int64(1), object(1)
memory usage: 1.1+ MB


## We make the last modifications to df_stream

In [42]:
# Rename the column "type" to "category"
df_stream = df_stream.rename(columns = {"type": "category"})

In [43]:
# Remove "cast" and "listed_in" from the main table
df_stream = df_stream.drop(columns = ["cast", "listed_in"])

In [44]:
# Final structure of the main table: columns, non-null counts and dtypes
df_stream.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 22998 entries, 0 to 22997
Data columns (total 7 columns):
 #   Column         Non-Null Count  Dtype 
---  ------         --------------  ----- 
 0   idStream       22998 non-null  int64 
 1   category       22998 non-null  object
 2   title          22998 non-null  object
 3   release_year   22998 non-null  int64 
 4   duration       22998 non-null  int64 
 5   type_duration  22998 non-null  object
 6   platform       22998 non-null  object
dtypes: int64(3), object(4)
memory usage: 1.2+ MB


In [45]:
# Preview the first three rows of the main table
df_stream.head(3)

Unnamed: 0,idStream,category,title,release_year,duration,type_duration,platform
0,0,Movie,The Grand Seduction,2014,113,min,amazon
1,1,Movie,Take Care Good Night,2018,110,min,amazon
2,2,Movie,Secrets of Deception,2017,74,min,amazon


# We connect and upload to the database

In [46]:
# Connection settings come from environment variables so that no credential is
# stored in the notebook. Only the password is mandatory; the other settings
# fall back to the local development defaults.
db_password = os.environ.get("STREAM_DB_PASSWORD")
if db_password is None:
    raise RuntimeError("Set the STREAM_DB_PASSWORD environment variable before running this cell")

db_user = os.environ.get("STREAM_DB_USER", "root")
db_host = os.environ.get("STREAM_DB_HOST", "localhost")
db_port = os.environ.get("STREAM_DB_PORT", "33061")
db_name = os.environ.get("STREAM_DB_NAME", "StreamETL_DB")

# quote_plus escapes characters such as "@" or "/" that would otherwise break the URL
engine = create_engine(
    f"mysql+pymysql://{quote_plus(db_user)}:{quote_plus(db_password)}@{db_host}:{db_port}/{db_name}"
)

# We are trying to connect to the database.
# The context manager closes the connection even if something fails after it is opened.
try:
    with engine.connect():
        print("Successful connection")
except Exception as e:
    print(f"Connection error: {e}")

Successful connection


In [74]:
# Upload the three dataframes to their tables through sqlalchemy.
# NOTE(review): if_exists = 'append' inserts the rows again each time this
# cell is run - confirm the tables are empty before re-running.
upload_options = {"con": engine, "index": False, "if_exists": 'append'}
df_stream.to_sql("streaming", **upload_options)
df_gender.to_sql("gender", **upload_options)
df_actors.to_sql("actors", **upload_options)

119713