ETL

In [1]:

# Se importan las librerias a utilizar
import pandas as pd
import sqlalchemy as db
from sqlalchemy import text
from pymongo import MongoClient


In [65]:
#Creamos la conexion a Base datos transaccional de netflix para cargar los datos 
engine = db.create_engine("mysql://root:root@172.16.5.4:3310/db_movies_netflix_transact")
conn = engine.connect()

# Cargamos datos a la dimension Movie

In [66]:
#Hacemos un query donde traemos informacion de la peliculas guardadas en la base de datos transaccional de Netflix
#Asignamos un nombre para la tabla donde almacenaremoslos datos tomados de la base de datos 
#Agrupamos los datos 

query = """
SELECT 
    movie.movieID as movieID, movie.movieTitle as title, movie.releaseDate as releaseDate, 
    gender.name as gender , person.name as participantName, participant.participantRole as roleparticipant 
FROM movie 
INNER JOIN participant 
ON movie.movieID=participant.movieID
INNER JOIN person
ON person.personID = participant.personID
INNER JOIN movie_gender 
ON movie.movieID = movie_gender.movieID
INNER JOIN gender 
ON movie_gender.genderID = gender.genderID
"""

In [67]:
#Leemos los datos que obtuvimps del query anterior 
movies_data=pd.read_sql(query, con=conn) #Hacemos la lactura del query 
movies_data["movieID"]=movies_data["movieID"].astype('int') #Cambiamos el tipo de datos de "MovieID" para que sean de tipo entero
movies_data

Unnamed: 0,movieID,title,releaseDate,gender,participantName,roleparticipant
0,80192187,Triple Frontier,2019-04-12,Action,Joseph Chavez Pineda,Actor
1,80210920,The Mother,2023-01-05,Drama,Maria Alejandra Navarro,Actor
2,81157374,Run,2021-05-21,Adventure,aria Lopez Gutierrez,Director


In [68]:
#Leemos datos de un archivo csv 
movies_award=pd.read_csv("./data/Awards_movie.csv") #creamos un archivo y jalamos la informacion en el archivo .csv
movies_award["movieID"]=movies_award["movieID"].astype('int') #Cambio el tipo de dato de MovieID a tipo INT
movies_award.rename(columns={"Aware":"Award"}, inplace=True) #renombramos la columna "Aware" con el nombre de "Award" 
movies_award

Unnamed: 0,movieID,IdAward,Award
0,80210920,0,Oscar
1,81157374,1,Grammy
2,80192187,2,Oscar


In [69]:
#Con los dos tablas obtenidas de diferentes fuentes creamos un Merge entre las tablas 
movie_data=pd.merge(movies_data,movies_award, left_on="movieID", right_on="movieID") 
movie_data

Unnamed: 0,movieID,title,releaseDate,gender,participantName,roleparticipant,IdAward,Award
0,80192187,Triple Frontier,2019-04-12,Action,Joseph Chavez Pineda,Actor,2,Oscar
1,80210920,The Mother,2023-01-05,Drama,Maria Alejandra Navarro,Actor,0,Oscar
2,81157374,Run,2021-05-21,Adventure,aria Lopez Gutierrez,Director,1,Grammy


In [71]:
#Creamos una nueva conexion con el warehouse dw_Netflix para guardar informacion en mysql.
 
engine = db.create_engine("mysql://root:root@172.16.5.4:3310/dw_netflix")
conn = engine.connect()

In [72]:
#Renombramos las columnas como la tablas que se encuentran en dw_netflix para evitar que nos de errores 
movie_data = movie_data.rename(columns={'releaseDate': 'releaseMovie', 'Award': 'awardMovie'})
movie_data

Unnamed: 0,movieID,title,releaseMovie,gender,participantName,roleparticipant,IdAward,awardMovie
0,80192187,Triple Frontier,2019-04-12,Action,Joseph Chavez Pineda,Actor,2,Oscar
1,80210920,The Mother,2023-01-05,Drama,Maria Alejandra Navarro,Actor,0,Oscar
2,81157374,Run,2021-05-21,Adventure,aria Lopez Gutierrez,Director,1,Grammy


In [73]:
#Eliminamos la columna IdAward
movie_data = movie_data.drop(columns=['IdAward'])

In [74]:
movie_data

Unnamed: 0,movieID,title,releaseMovie,gender,participantName,roleparticipant,awardMovie
0,80192187,Triple Frontier,2019-04-12,Action,Joseph Chavez Pineda,Actor,Oscar
1,80210920,The Mother,2023-01-05,Drama,Maria Alejandra Navarro,Actor,Oscar
2,81157374,Run,2021-05-21,Adventure,aria Lopez Gutierrez,Director,Grammy


In [19]:
#Insertamos los registro en la Dimension Movie de la BD de Netflix 
movie_data.to_sql('dimMovie',conn, if_exists='append', index=False)

IntegrityError: (MySQLdb.IntegrityError) (1062, "Duplicate entry '80192187' for key 'dimMovie.PRIMARY'")
[SQL: INSERT INTO `dimMovie` (`movieID`, title, `releaseMovie`, gender, `participantName`, roleparticipant, `awardMovie`) VALUES (%s, %s, %s, %s, %s, %s, %s)]
[parameters: [(80192187, 'Triple Frontier', datetime.date(2019, 4, 12), 'Action', 'Joseph Chavez Pineda', 'Actor', 'Oscar'), (80210920, 'The Mother', datetime.date(2023, 1, 5), 'Drama', 'Maria Alejandra Navarro', 'Actor', 'Oscar'), (81157374, 'Run', datetime.date(2021, 5, 21), 'Adventure', 'aria Lopez Gutierrez', 'Director', 'Grammy')]]
(Background on this error at: https://sqlalche.me/e/20/gkpj)

# Cargamos datos a la dimension USER

In [75]:
#Cargamos los datos de la dimension usuario de un archivo csv 
users = pd.read_csv("./data/users.csv", sep='|')
users

Unnamed: 0,idUser,username,country,subscription
0,1002331,user123,USA,Premium
1,1002332,gamerGirl97,Canada,Basic
2,1002333,techMaster,UK,Premium
3,1002334,soccerFan,Brazil,Basic
4,1002335,travelBug,Australia,Premium
5,1002336,musicLover,France,Basic
6,1002337,foodie88,Italy,Premium
7,1002338,bookWorm23,Germany,Basic
8,1002339,fitnessJunk,Mexico,Premium
9,10023310,movieBuff,Japan,Basic


In [76]:
#De igual manera renombramos las columnas 
users = users.rename(columns={'idUser': 'userID'})
users


Unnamed: 0,userID,username,country,subscription
0,1002331,user123,USA,Premium
1,1002332,gamerGirl97,Canada,Basic
2,1002333,techMaster,UK,Premium
3,1002334,soccerFan,Brazil,Basic
4,1002335,travelBug,Australia,Premium
5,1002336,musicLover,France,Basic
6,1002337,foodie88,Italy,Premium
7,1002338,bookWorm23,Germany,Basic
8,1002339,fitnessJunk,Mexico,Premium
9,10023310,movieBuff,Japan,Basic


In [16]:
#Cargamos los datos a la Dimesion USer ID
users.to_sql('dimUser',conn,if_exists='append', index=False)

IntegrityError: (MySQLdb.IntegrityError) (1062, "Duplicate entry '1002331' for key 'dimUser.PRIMARY'")
[SQL: INSERT INTO `dimUser` (`userID`, username, country, subscription) VALUES (%s, %s, %s, %s)]
[parameters: [(1002331, 'user123', 'USA', 'Premium'), (1002332, 'gamerGirl97', 'Canada', 'Basic'), (1002333, 'techMaster', 'UK', 'Premium'), (1002334, 'soccerFan', 'Brazil', 'Basic'), (1002335, 'travelBug', 'Australia', 'Premium'), (1002336, 'musicLover', 'France', 'Basic'), (1002337, 'foodie88', 'Italy', 'Premium'), (1002338, 'bookWorm23', 'Germany', 'Basic')  ... displaying 10 of 20 total bound parameter sets ...  (10023319, 'homeChef', 'NewZealand', 'Premium'), (10023320, 'yogiMaster', 'Thailand', 'Basic')]]
(Background on this error at: https://sqlalche.me/e/20/gkpj)

# Cargamos datos a la tabla de hechos

In [77]:
#Copiamos los valores de userID y movieID para poder crear una tabla de hechos 
users_id=users["userID"]
movies_id=movie_data["movieID"]

In [83]:
#Hacemos un merge de los 20 usuarios con las 3 peliculas
# el cross permite crear 3 registro de cada UserId, uno por cada movieID.
#Esta será nuestra tabla de hechos  |
watchs_data=pd.merge(users_id,movies_id, how="cross")
watchs_data

Unnamed: 0,userID,movieID
0,1002331,80192187
1,1002331,80210920
2,1002331,81157374
3,1002332,80192187
4,1002332,80210920
5,1002332,81157374
6,1002333,80192187
7,1002333,80210920
8,1002333,81157374
9,1002334,80192187


In [84]:
#Generamos datos aleatorios de rating y de fechas

import random
from datetime import datetime, timedelta
import random

def gen_rating():
    # Generar un número aleatorio entre 0 y 5 con 1 solo decimal
    numero_aleatorio = round(random.uniform(0, 5), 1)
    # Mostrar el número aleatorio
    return numero_aleatorio

def gen_timestamp():
    # Generar un timestamp aleatorio dentro de un rango específico
    start_date = datetime(2024, 1, 15)
    end_date = datetime(2024, 4, 6)

    # Calcular un valor aleatorio entre start_date y end_date
    random_date = start_date + timedelta(seconds=random.randint(0, int((end_date - start_date).total_seconds())))

    # Mostrar el timestamp aleatorio
    return random_date

In [23]:
#Con los datos aleatorios generados agregamos una columna de rating y una columna de fecha 
# y se los agregamos a nuestra tabla de hechos. 
#watchs_data["rating"]=watchs_data["movieID"].apply(lambda x: gen_rating())
#watchs_data["timestamp"]=watchs_data["userID"].apply(lambda x: gen_timestamp())

In [118]:
rating=[]
date=[]
for i in range(60):
    rating.append(gen_rating())
    date.append(gen_timestamp())
    

In [119]:
randf=pd.DataFrame((zip(rating,date)),columns=['rating','timestamp'])

In [120]:
CONNECTION_STRING = "mongodb+srv://andreigaled3:txmnM3oUgWMsnxFy@cluster0.qdxwmr6.mongodb.net/?retryWrites=true&w=majority&appName=Cluster0"
client = MongoClient(CONNECTION_STRING)

In [121]:
dbname=client["ramdon_data"]
randf_to_dict = randf.to_dict("records")
dbname["random"].insert_many(randf_to_dict)

InsertManyResult([ObjectId('6680f7de2c5e16c1d35d6dbe'), ObjectId('6680f7de2c5e16c1d35d6dbf'), ObjectId('6680f7de2c5e16c1d35d6dc0'), ObjectId('6680f7de2c5e16c1d35d6dc1'), ObjectId('6680f7de2c5e16c1d35d6dc2'), ObjectId('6680f7de2c5e16c1d35d6dc3'), ObjectId('6680f7de2c5e16c1d35d6dc4'), ObjectId('6680f7de2c5e16c1d35d6dc5'), ObjectId('6680f7de2c5e16c1d35d6dc6'), ObjectId('6680f7de2c5e16c1d35d6dc7'), ObjectId('6680f7de2c5e16c1d35d6dc8'), ObjectId('6680f7de2c5e16c1d35d6dc9'), ObjectId('6680f7de2c5e16c1d35d6dca'), ObjectId('6680f7de2c5e16c1d35d6dcb'), ObjectId('6680f7de2c5e16c1d35d6dcc'), ObjectId('6680f7de2c5e16c1d35d6dcd'), ObjectId('6680f7de2c5e16c1d35d6dce'), ObjectId('6680f7de2c5e16c1d35d6dcf'), ObjectId('6680f7de2c5e16c1d35d6dd0'), ObjectId('6680f7de2c5e16c1d35d6dd1'), ObjectId('6680f7de2c5e16c1d35d6dd2'), ObjectId('6680f7de2c5e16c1d35d6dd3'), ObjectId('6680f7de2c5e16c1d35d6dd4'), ObjectId('6680f7de2c5e16c1d35d6dd5'), ObjectId('6680f7de2c5e16c1d35d6dd6'), ObjectId('6680f7de2c5e16c1d35d6d

In [122]:
rating_collection = dbname['random']
random_rating = rating_collection.find({})
random_rating

<pymongo.cursor.Cursor at 0x76444fc02680>

In [123]:
df_random_rating = pd.DataFrame(random_rating)
df_random_rating

Unnamed: 0,_id,rating,timestamp
0,6680f7de2c5e16c1d35d6dbe,2.9,2024-02-26 14:52:01
1,6680f7de2c5e16c1d35d6dbf,1.6,2024-02-05 13:04:31
2,6680f7de2c5e16c1d35d6dc0,2.2,2024-03-05 14:47:40
3,6680f7de2c5e16c1d35d6dc1,0.3,2024-02-20 23:32:58
4,6680f7de2c5e16c1d35d6dc2,1.7,2024-03-09 11:02:44
5,6680f7de2c5e16c1d35d6dc3,4.1,2024-01-27 08:04:11
6,6680f7de2c5e16c1d35d6dc4,4.6,2024-03-22 18:52:15
7,6680f7de2c5e16c1d35d6dc5,1.0,2024-02-09 03:15:12
8,6680f7de2c5e16c1d35d6dc6,3.0,2024-02-21 12:16:42
9,6680f7de2c5e16c1d35d6dc7,1.0,2024-02-26 06:01:00


In [124]:
df_random_rating = df_random_rating.drop(columns=['_id'])
df_random_rating

Unnamed: 0,rating,timestamp
0,2.9,2024-02-26 14:52:01
1,1.6,2024-02-05 13:04:31
2,2.2,2024-03-05 14:47:40
3,0.3,2024-02-20 23:32:58
4,1.7,2024-03-09 11:02:44
5,4.1,2024-01-27 08:04:11
6,4.6,2024-03-22 18:52:15
7,1.0,2024-02-09 03:15:12
8,3.0,2024-02-21 12:16:42
9,1.0,2024-02-26 06:01:00


In [50]:
#watchs_data

In [131]:
watchs_data = pd.concat([watchs_data,df_random_rating],axis=1)
watchs_data

Unnamed: 0,userID,movieID,rating,timestamp
0,1002331,80192187,2.9,2024-02-26 14:52:01
1,1002331,80210920,1.6,2024-02-05 13:04:31
2,1002331,81157374,2.2,2024-03-05 14:47:40
3,1002332,80192187,0.3,2024-02-20 23:32:58
4,1002332,80210920,1.7,2024-03-09 11:02:44
5,1002332,81157374,4.1,2024-01-27 08:04:11
6,1002333,80192187,4.6,2024-03-22 18:52:15
7,1002333,80210920,1.0,2024-02-09 03:15:12
8,1002333,81157374,3.0,2024-02-21 12:16:42
9,1002334,80192187,1.0,2024-02-26 06:01:00


In [132]:
# Cargamos los datos a la tabla de hechos
watchs_data.to_sql("FactWatchs", conn, if_exists='append', index=False)
watchs_data

Unnamed: 0,userID,movieID,rating,timestamp
0,1002331,80192187,2.9,2024-02-26 14:52:01
1,1002331,80210920,1.6,2024-02-05 13:04:31
2,1002331,81157374,2.2,2024-03-05 14:47:40
3,1002332,80192187,0.3,2024-02-20 23:32:58
4,1002332,80210920,1.7,2024-03-09 11:02:44
5,1002332,81157374,4.1,2024-01-27 08:04:11
6,1002333,80192187,4.6,2024-03-22 18:52:15
7,1002333,80210920,1.0,2024-02-09 03:15:12
8,1002333,81157374,3.0,2024-02-21 12:16:42
9,1002334,80192187,1.0,2024-02-26 06:01:00


In [None]:
### SET FOREIGN_KEY_CHECKS = 0;
### SET FOREIGN_KEY_CHECKS = 1;

