# ***PipeLine ETL***

## 1. Conexión a MySQL

Imports

In [1]:
import mysql.connector
import pandas as pd

Conexion a SQL

In [2]:
con = mysql.connector.connect(user = "root", password = "1234", host = "127.0.0.1")
cursor = con.cursor()

## 2. Crear Base de Datos y Tablas

In [18]:
cursor.execute("DROP DATABASE f1_db")

In [19]:
cursor.execute("CREATE DATABASE IF NOT EXISTS f1_db")


In [20]:
cursor.execute("USE f1_db")

Función auxiliar para show * tables

In [21]:
def mostrar_sql(sentencia_sql):
    cursor.execute(sentencia_sql)
    rows = cursor.fetchall()
    for row in rows:
        print(row)

### Tabla Piloto

In [22]:
sql = """
CREATE TABLE PILOTO (
	driverId INT PRIMARY KEY,
    driverRef VARCHAR(50),
    driverNumber INT,
    driverCode VARCHAR(3),
    forename VARCHAR(50),
    surname VARCHAR(50),
    dob DATE,
    nationality VARCHAR(50),
    url VARCHAR(255)
    );
"""
cursor.execute(sql)

In [23]:
sentencia_sql = "SHOW TABLES;"
mostrar_sql(sentencia_sql)

('piloto',)


### Tabla Equipo

In [24]:
sql = """
CREATE TABLE EQUIPO(
	constructorId INT PRIMARY KEY,
    constructorRef VARCHAR(50),
    constructorName VARCHAR(70),
    nationality VARCHAR(50),
    url VARCHAR(255)
    );
"""
cursor.execute(sql)

In [25]:
sentencia_sql = "SHOW TABLES;"
mostrar_sql(sentencia_sql)

('equipo',)
('piloto',)


### Tabla Circuito

In [26]:
sql = """
CREATE TABLE CIRCUITO(
	circuitId INT PRIMARY KEY,
    circuitRef VARCHAR(50),
    circuitName VARCHAR(70),
    location VARCHAR(50),
    country VARCHAR(50),
    lat DECIMAL(10, 6),
    lng DECIMAL(10, 6),
    alt INT,
    url VARCHAR(255)
    );
"""
cursor.execute(sql)

In [27]:
sentencia_sql = "SHOW TABLES;"
mostrar_sql(sentencia_sql)

('circuito',)
('equipo',)
('piloto',)


### Tabla Carrera

In [28]:
sql = """
CREATE TABLE CARRERA(
	raceId INT PRIMARY KEY,
    raceYear INT,
    raceRound INT,
    circuitId INT,
    FOREIGN KEY (circuitId) REFERENCES CIRCUITO(circuitId),
    raceName VARCHAR(100),
    raceDate DATE,
    raceTime TIME,
    url VARCHAR(255)
    );
"""
cursor.execute(sql)

In [29]:
sentencia_sql = "SHOW TABLES;"
mostrar_sql(sentencia_sql)

('carrera',)
('circuito',)
('equipo',)
('piloto',)


### Tabla Resultado

In [30]:
sql = """
CREATE TABLE RESULTADO(
	resultId INT PRIMARY KEY,
    raceId INT,
    FOREIGN KEY (raceId) REFERENCES CARRERA(raceId),
    driverId INT,
    FOREIGN KEY (driverId) REFERENCES PILOTO(driverId),
    constructorId INT,
	FOREIGN KEY (constructorId) REFERENCES EQUIPO(constructorId),
    grid INT,
    positionResult INT,
    points DECIMAL(6,2),
    laps INT,
    totalTime VARCHAR(50)
    );
"""
cursor.execute(sql)

In [31]:
sentencia_sql = "SHOW TABLES;"
mostrar_sql(sentencia_sql)

('carrera',)
('circuito',)
('equipo',)
('piloto',)
('resultado',)


### Tabla Clasificación

In [32]:
sql = """
CREATE TABLE CLASIFICACION(
	qualifyingId INT PRIMARY KEY,
	raceId INT,
	FOREIGN KEY (raceId) REFERENCES CARRERA(raceId),
    driverId INT,
    FOREIGN KEY (driverId) REFERENCES PILOTO(driverId),
    constructorId INT,
	FOREIGN KEY (constructorId) REFERENCES EQUIPO(constructorId),
	qualifyingPosition INT,
    q1 VARCHAR(50),
    q2 VARCHAR(50),
    q3 VARCHAR(50)
    );
""" 
cursor.execute(sql) 

In [33]:
sentencia_sql = "SHOW TABLES;"
mostrar_sql(sentencia_sql)

('carrera',)
('circuito',)
('clasificacion',)
('equipo',)
('piloto',)
('resultado',)


### Tabla info_circuito (scraping)


In [34]:
sql = """
CREATE TABLE INFO_CIRCUITO(
    circuitId INT PRIMARY KEY,  
    FOREIGN KEY (circuitId) REFERENCES CIRCUITO(circuitId),
    capacity VARCHAR(50),
    opened_year INT,            
    length VARCHAR(50),         
    turns INT
);
"""
cursor.execute(sql)

In [35]:
sentencia_sql = "SHOW TABLES;"
mostrar_sql(sentencia_sql)

('carrera',)
('circuito',)
('clasificacion',)
('equipo',)
('info_circuito',)
('piloto',)
('resultado',)


### Tabla info_equipo (scraping)

In [36]:
sql = """
CREATE TABLE INFO_EQUIPO(
	constructorId INT PRIMARY KEY,
    FOREIGN KEY (constructorId) REFERENCES EQUIPO(constructorId),
    championships INT,
    race_wins INT,
    pole_positions INT,
    fastest_laps INT 
    );
"""
cursor.execute(sql)

In [37]:
sentencia_sql = "SHOW TABLES;"
mostrar_sql(sentencia_sql)

('carrera',)
('circuito',)
('clasificacion',)
('equipo',)
('info_circuito',)
('info_equipo',)
('piloto',)
('resultado',)


## 3. Limpieza

### 3.1. Funciones auxiliares

#### 3.1.1. Limpiar capacity (circuito)

In [38]:
def limpiar_capacity(valor):
    #Si el valor es nulo o está vacío
    if pd.isna(valor) or valor == "":
        return None
    
    # Pasar el valor a string
    valor = str(valor)

    # Si hay texto que no indica número
    if "open seating" in valor.lower():
        return None
    
    # Recorrer caracter por caracter y sacar el primer número
    numero = ""
    digito_encontrado = False

    for char in valor:
        if char.isdigit():
            numero += char
            digito_encontrado = True
        elif char == "," and digito_encontrado:
            # Ignorar esta coma, es parte del número, ej: 125,000
            continue
        elif digito_encontrado:
            # Ya tenemos digito y encuentra otra cosa distinta
            break

    # Si encontramos número, pasar a int
    if numero:
        return int(numero)
    
    return None

#### 3.1.2. Limpiar opened_year (circuito)

In [39]:
def limpiar_opened_year(valor):
    # Si el avlor es nulo o está vacío
    if pd.isna(valor) or valor == "":
        return None

    # Pasar el valor a string
    valor = str(valor)

    # Buscar el primer grupo de 4 dígitos
    digitos = ""

    for char in valor:
        if char.isdigit():
            digitos += char
            if len(digitos) == 4:
                # comprobar si es un año válido
                anio = int (digitos)
                if 1900 <= anio <= 2030:
                    return anio
                else:
                # El año no era válido, hay que seguir buscando
                    digitos = ""

    return None

#### 3.1.3. Limpiar length (circuito)

In [40]:
# En esta funcion voy a buscar donde está "km" y voy a sacar el valor que esté justo antes
def limpiar_length(valor):
    # Si el valor es nulo o está vacío
    if pd.isna(valor) or valor == "":
        return None

    # Pasar el valor a string
    valor = str(valor)

    pos_km = valor.lower().find("km")

    if pos_km == -1: 
        return None

    
    parte_antes = valor[:pos_km]


    numero = ""
    dentro_corchete = False

    for char in reversed(parte_antes):
        if char == "]":
            dentro_corchete = True
            continue
        
        elif char == "[":
            dentro_corchete = False
            continue

        elif dentro_corchete:
            continue

        if char.isdigit() or char == ".":
            numero = char + numero
        elif numero:
            #Ya encontré numero y hay otra cosa
            break


    #Convertir a float
    if numero and numero != ".":
        return float(numero)

    return None

    

#### 3.1.4. Limpiar turns (circuito)

In [41]:
def limpiar_turns(valor):
    # Si el valor es nulo o está vacío
    if pd.isna(valor) or valor == "":
        return None

    # Pasar el valor a string
    valor = str(valor)

    # Sacar el priemr numero
    numero = ""

    for char in valor:
        if char.isdigit():
            numero += char
        elif numero:
            break

    if numero:
        return int(numero)
    
    return None

#### 3.1.5. Limpiar valor numérico (general)

In [42]:
def limpiar_valor_numerico(valor):
    # convertir NaN a None y float a int si es posible
    if pd.isna(valor):
        return None

    if float(valor) == int(valor):
        return int (valor)
    
    return valor

### 3.2. Limpiar datos

Cargar los csv de scraping a dataframes

In [72]:
df_circuitos_scraped = pd.read_csv("../data/scraped/Final_Scraped_Circuitos.csv")
df_equipos_scraped = pd.read_csv("../data/scraped/Final_Scraped_Equipos.csv")

#### 3.2.1. Limpiar df_circuitos_scraped

In [73]:
df_circuitos_scraped["capacity"] = df_circuitos_scraped["capacity"].apply(limpiar_capacity)
df_circuitos_scraped["opened_year"] = df_circuitos_scraped["opened_year"].apply(limpiar_opened_year)
df_circuitos_scraped["length"] = df_circuitos_scraped["length"].apply(limpiar_length)
df_circuitos_scraped["turns"] = df_circuitos_scraped["turns"].apply(limpiar_turns)

#### 3.2.2. Limpiar df_equipos_scraped

In [74]:
df_equipos_scraped["championships"] = df_equipos_scraped["championships"].apply(limpiar_valor_numerico)
df_equipos_scraped["race_wins"] = df_equipos_scraped["race_wins"].apply(limpiar_valor_numerico)
df_equipos_scraped["pole_positions"] = df_equipos_scraped["pole_positions"].apply(limpiar_valor_numerico)
df_equipos_scraped["fastest_laps"] = df_equipos_scraped["fastest_laps"].apply(limpiar_valor_numerico)

Este bloque de código de abajo es para pasar los valores vacíos a None, ya que al insertar en las dos tablas de scraped me daba error. Lo que pasaba es que me limpiaba los /N pero no los valores vacíos

In [75]:
import numpy as np

#Los vacios del scraping de equipos los paso a 0
df_equipos_scraped = df_equipos_scraped.fillna(0)

# Los vacios del scraping de circuitos los paso a None
df_circuitos_scraped = df_circuitos_scraped.replace({np.nan: None})

#### 3.2.3. Limpiar df_drivers

In [46]:
df_piloto = pd.read_csv("../data/raw/used/drivers.csv")

df_piloto = df_piloto.rename(columns ={
    "number": "driverNumber",
    "code" : "driverCode"
})
df_piloto = df_piloto.replace("\\N", None)

#### 3.2.4. Limpiar df_equipo

In [47]:
df_equipo = pd.read_csv("../data/raw/used/constructors.csv")

df_equipo = df_equipo.rename(columns={
    "name" : "constructorName"
})

#### 3.2.5. Limpiar df_circuito

In [48]:
df_circuito = pd.read_csv("../data/raw/used/circuits.csv")

df_circuito = df_circuito.rename(columns={
    "name" : "circuitName"
})

#### 3.2.6. Limpiar df_carrera

In [49]:
df_carrera = pd.read_csv("../data/raw/used/races.csv")

df_carrera = df_carrera.rename(columns={
    "year": "raceYear",
    "round" : "raceRound",
    "name" : "raceName",
    "date": "raceDate",
    "time" : "raceTime"
})
df_carrera = df_carrera.replace("\\N", None)

#### 3.2.7. Limpiar df_resultado

In [50]:
df_resultado = pd.read_csv("../data/raw/used/results.csv")

df_resultado = df_resultado.rename(columns={
    "position" : "positionResult",
    "time" : "totalTime"
})

df_resultado = df_resultado.replace("\\N", None)

#### 3.2.8. Limpiar df_clasificacion

In [51]:
df_clasificacion = pd.read_csv("../data/raw/used/qualifying.csv")

df_clasificacion = df_clasificacion.rename(columns={
    "qualifyId": "qualifyingId",
    "position": "qualifyingPosition"
})

df_clasificacion = df_clasificacion.replace("\\N", None)

## 4. Insertar datos

### 4.1. Insertar en PILOTO

In [52]:
for index, row in df_piloto.iterrows():
    sql = """
    INSERT INTO PILOTO (driverId, driverRef, driverNumber, driverCode, forename, surname, dob, nationality, url)
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
    """

    valores = (row["driverId"], row["driverRef"], row["driverNumber"], row["driverCode"],
               row["forename"], row["surname"], row["dob"], row["nationality"], row["url"])

    cursor.execute(sql,valores)


con.commit()

### 4.2. Insertar en EQUIPO

In [53]:
for index, row in df_equipo.iterrows():
    sql = """
    INSERT INTO EQUIPO (constructorId, constructorRef, constructorName, nationality, url)
    VALUES (%s, %s, %s, %s, %s)
    """

    valores = (row["constructorId"], row["constructorRef"], row["constructorName"], row["nationality"],
               row["url"])

    cursor.execute(sql,valores)


con.commit()

### 4.3. Insertar en CIRCUITO

In [54]:
for index, row in df_circuito.iterrows():
    sql = """
    INSERT INTO CIRCUITO (circuitId, circuitRef, circuitName, location, country, lat, lng, alt, url)
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
    """

    valores = (row["circuitId"], row["circuitRef"], row["circuitName"], row["location"],
               row["country"], row["lat"], row["lng"], row["alt"], row["url"])

    cursor.execute(sql,valores)


con.commit()

### 4.4. Insertar en CARRERA

In [55]:
for index, row in df_carrera.iterrows():
    sql = """
    INSERT INTO CARRERA (raceId, raceYear, raceRound, circuitId, raceName, raceDate, raceTime, url)
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
    """

    valores = (row["raceId"], row["raceYear"], row["raceRound"], row["circuitId"],
               row["raceName"], row["raceDate"], row["raceTime"], row["url"])

    cursor.execute(sql,valores)


con.commit()

### 4.5. Insertar en RESULTADO

In [56]:
for index, row in df_resultado.iterrows():
    sql = """
    INSERT INTO RESULTADO (resultId, raceId, driverId, constructorId, grid, positionResult, points, laps, totalTime)
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
    """

    valores = (row["resultId"], row["raceId"], row["driverId"], row["constructorId"],
               row["grid"], row["positionResult"], row["points"], row["laps"], row["totalTime"])

    cursor.execute(sql,valores)


con.commit()

### 4.6. Insertar en CLASIFICACION

In [57]:
for index, row in df_clasificacion.iterrows():
    sql = """
    INSERT INTO CLASIFICACION (qualifyingId, raceId, driverId, constructorId, qualifyingPosition, q1, q2, q3)
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
    """

    valores = (row["qualifyingId"], row["raceId"], row["driverId"], row["constructorId"],
               row["qualifyingPosition"], row["q1"], row["q2"], row["q3"])

    cursor.execute(sql,valores)


con.commit()

### 4.7. Insertar en INFO_CIRCUITO

In [78]:
for index, row in df_circuitos_scraped.iterrows():
    sql = """
    INSERT INTO INFO_CIRCUITO (circuitId, capacity, opened_year, length, turns)
    VALUES (%s, %s, %s, %s, %s)
    """

    valores = (row["circuitId"], row["capacity"], row["opened_year"], row["length"],
               row["turns"])
    
    cursor.execute(sql, valores)

con.commit()

### 4.8. Insertar en INFO_EQUIPO

In [79]:
for index, row in df_equipos_scraped.iterrows():
    sql = """
    INSERT INTO INFO_EQUIPO (constructorId, championships, race_wins, pole_positions, fastest_laps)
    VALUES (%s, %s, %s, %s, %s)
    """
    
    valores = (row["constructorId"], row["championships"], row["race_wins"], row["pole_positions"],
               row["fastest_laps"])
    
    cursor.execute(sql, valores)

con.commit()