# Segunda Entrega Curso Data Engineering Coderhouse (ETL)

## Importamos Librerías

In [26]:
# Importamos las librerías necesarias
import pandas as pd  # Para manipulación de datos
import numpy as np  # Para operaciones numéricas

import matplotlib.pyplot as plt  # Para la visualización de datos
import seaborn as sns  # Para la visualización de datos

sns.set_style(
    "whitegrid"
)  # Establecemos el estilo de los gráficos de seaborn a 'whitegrid'
plt.style.use(
    "fivethirtyeight"
)  # Establecemos el estilo de los gráficos de matplotlib a 'fivethirtyeight'

# Para leer datos de acciones de Yahoo
from pandas_datareader.data import DataReader
import yfinance as yf  # yfinance es una herramienta para descargar datos históricos de Yahoo Finance
from pandas_datareader import data as pdr

yf.pdr_override()  # Sobreescribimos los métodos de pandas_datareader.data para que utilicen yfinance

# Para trabajar con marcas de tiempo (timestamps)
from datetime import datetime
import os

## La API que será consumida es de Yahoo Finance, para ver el comportamiento financiero de algunas empresas TOP del mercado.

### EXTRACCIÓN

In [27]:
# Lista de empresa Top (24 en total)
tech_list = [
    "AAPL",
    "MSFT",
    "AMZN",
    "JPM",
    "COST",
    "GOOGL",
    "AXP",
    "WMT",
    "NVDA",
    "DAL",
    "DIS",
    "MAR",
    "NKE",
    "KO",
    "SBUX",
    "FDX",
    "PG",
    "HD",
    "PFE",
    "CRM",
    "TGT",
    "NFLX",
    "TM",
]
# Tomamos datos desde hace 10 años hasta la fecha actual
end = datetime.now()
start = datetime(end.year - 10, end.month, end.day)

# Descargamos los datos de las acciones de las empresas de la lista
for stock in tech_list:
    globals()[stock] = yf.download(stock, start, end)

# Creamos una lista de los códigos de las empresas
company_list = [
    AAPL,
    MSFT,
    AMZN,
    JPM,
    COST,
    GOOGL,
    AXP,
    WMT,
    NVDA,
    DAL,
    DIS,
    MAR,
    NKE,
    KO,
    SBUX,
    FDX,
    PG,
    HD,
    PFE,
    CRM,
    TGT,
    NFLX,
    TM,
]

# Lista de los nombres de la compañía
company_name = [
    "APPLE",
    "MICROSOFT",
    "AMAZON.COM",
    "JPMORGAN CHASE",
    "COSTCO WHOLESALE",
    "ALPHABET",
    "AMERICAN EXPRESS",
    "WALMART",
    "NVIDIA",
    "DELTA AIR LINES",
    "WALT DISNEY",
    "MARRIOTT INTERNATIONAL",
    "NIKE",
    "COCA-COLA",
    "STARBUCKS",
    "FEDEX",
    "PROCTER & GAMBLE",
    "HOME DEPOT",
    "PFIZER",
    "SALESFORCE",
    "TARGET",
    "NETFLIX",
    "TOYOTA MOTOR",
]

# Creamos un DataFrame con los datos de las acciones de las empresas
for company, com_name in zip(company_list, company_name):
    company["company_name"] = com_name

# Concatenamos los datos de las acciones de las empresas en un solo DataFrame
df = pd.concat(company_list, axis=0)
print(df.shape)
df.tail(10)

[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%*******

(57891, 7)





Unnamed: 0_level_0,Open,High,Low,Close,Adj Close,Volume,company_name
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
2024-05-06,233.619995,235.679993,233.570007,235.600006,235.600006,208200,TOYOTA MOTOR
2024-05-07,233.279999,233.279999,230.910004,231.259995,231.259995,248900,TOYOTA MOTOR
2024-05-08,231.119995,232.429993,227.0,231.779999,231.779999,371600,TOYOTA MOTOR
2024-05-09,227.509995,228.029999,226.729996,227.240005,227.240005,329000,TOYOTA MOTOR
2024-05-10,220.929993,221.059998,218.139999,218.779999,218.779999,563900,TOYOTA MOTOR
2024-05-13,217.100006,217.130005,215.300003,215.639999,215.639999,466000,TOYOTA MOTOR
2024-05-14,216.5,217.610001,216.210007,217.119995,217.119995,268500,TOYOTA MOTOR
2024-05-15,218.899994,219.940002,218.300003,219.550003,219.550003,280900,TOYOTA MOTOR
2024-05-16,216.839996,217.039993,215.600006,215.630005,215.630005,296800,TOYOTA MOTOR
2024-05-17,219.929993,220.419998,219.089996,219.759995,219.759995,265700,TOYOTA MOTOR


### TRANSFORMACIÓN

In [28]:
df.reset_index(inplace=True)
df.head(10)

Unnamed: 0,Date,Open,High,Low,Close,Adj Close,Volume,company_name
0,2014-05-20,21.589643,21.657143,21.454643,21.596786,19.103121,234836000,APPLE
1,2014-05-21,21.565357,21.667856,21.502144,21.653929,19.153671,196859600,APPLE
2,2014-05-22,21.664286,21.780357,21.575001,21.688213,19.183987,200760000,APPLE
3,2014-05-23,21.6875,21.954643,21.659643,21.933214,19.400707,232209600,APPLE
4,2014-05-27,21.995714,22.352142,21.986786,22.343929,19.763992,348866000,APPLE
5,2014-05-28,22.357857,22.493929,22.277857,22.286072,19.712809,315481600,APPLE
6,2014-05-29,22.423214,22.745358,22.420357,22.692142,20.071999,376474000,APPLE
7,2014-05-30,22.785,23.006071,22.460714,22.607143,19.996815,564020800,APPLE
8,2014-06-02,22.64143,22.672501,22.232143,22.451786,19.859394,369350800,APPLE
9,2014-06-03,22.445,22.812143,22.4375,22.769285,20.14024,292709200,APPLE


Establecemos los nombres da la columnas en minúsculas y separados por guión bajo

In [29]:
# Renombrar las columnas del DataFrame
df.rename(
    columns={
        "Date": "date",
        "Open": "open",
        "High": "high",
        "Low": "low",
        "Close": "close",
        "Adj Close": "adj_close",
        "Volume": "volume",
        "company_name": "company_name",
    },
    inplace=True,
)

In [30]:
df.columns  # Revisamos las columnas del DataFrame

Index(['date', 'open', 'high', 'low', 'close', 'adj_close', 'volume',
       'company_name'],
      dtype='object')

Revisión de datos nulos y tipo de datos

In [31]:
def verificar_tipo_datos_y_nulos(df):
    """
    Realiza un análisis de los tipos de datos y la presencia de valores nulos en un DataFrame.

    Esta función toma un DataFrame como entrada y devuelve un resumen que incluye información sobre
    los tipos de datos en cada columna, el porcentaje de valores no nulos y nulos, así como la
    cantidad de valores nulos por columna.

    Parameters:
        df (pandas.DataFrame): El DataFrame que se va a analizar.

    Returns:
        pandas.DataFrame: Un DataFrame que contiene el resumen de cada columna, incluyendo:
        - 'nombre_campo': Nombre de cada columna.
        - 'tipo_datos': Tipos de datos únicos presentes en cada columna.
        - 'no_nulos_%': Porcentaje de valores no nulos en cada columna.
        - 'nulos_%': Porcentaje de valores nulos en cada columna.
        - 'nulos': Cantidad de valores nulos en cada columna.
    """

    mi_dict = {
        "nombre_campo": [],
        "tipo_datos": [],
        "no_nulos_%": [],
        "nulos_%": [],
        "nulos": [],
    }

    for columna in df.columns:
        porcentaje_no_nulos = (df[columna].count() / len(df)) * 100
        mi_dict["nombre_campo"].append(columna)
        mi_dict["tipo_datos"].append(df[columna].apply(type).unique())
        mi_dict["no_nulos_%"].append(round(porcentaje_no_nulos, 2))
        mi_dict["nulos_%"].append(round(100 - porcentaje_no_nulos, 2))
        mi_dict["nulos"].append(df[columna].isnull().sum())

    df_info = pd.DataFrame(mi_dict)

    return df_info.sort_values(ascending=False, by="nulos_%")

In [32]:
verificar_tipo_datos_y_nulos(df)

Unnamed: 0,nombre_campo,tipo_datos,no_nulos_%,nulos_%,nulos
0,date,[<class 'pandas._libs.tslibs.timestamps.Timest...,100.0,0.0,0
1,open,[<class 'float'>],100.0,0.0,0
2,high,[<class 'float'>],100.0,0.0,0
3,low,[<class 'float'>],100.0,0.0,0
4,close,[<class 'float'>],100.0,0.0,0
5,adj_close,[<class 'float'>],100.0,0.0,0
6,volume,[<class 'int'>],100.0,0.0,0
7,company_name,[<class 'str'>],100.0,0.0,0


Conclusiones

* Se observa que no hay valores nulos dentro del dataset
* Se transformará la columna `date` de Timestamps a datetime

In [34]:
df.date = pd.to_datetime(df.date)  # Convertimos la columna 'date' a tipo datetime

In [33]:
df.dtypes

date            datetime64[ns]
open                   float64
high                   float64
low                    float64
close                  float64
adj_close              float64
volume                   int64
company_name            object
dtype: object

Verificamos los duplicados

In [36]:
# Verificar duplicados en todas las columnas
duplicados = df.duplicated().sum()
duplicados

0

Conclusiones

* No se encuentran valores duplicados en el dataset

### A continuación se muestra el diccionario de datos del dataset.

Este diccionario de datos describe las columnas proporcionadas por la API de Yahoo Finance.

| Columna       | Descripción                                                                 | Tipo de Dato | Ejemplo       |
|---------------|-----------------------------------------------------------------------------|--------------|---------------|
| `date`        | La fecha de la cotización.                                                  | Fecha        | `2023-05-20`  |
| `open`        | El precio de apertura de la acción en la fecha especificada.                | Decimal      | `150.25`      |
| `high`        | El precio más alto de la acción durante la fecha especificada.              | Decimal      | `155.00`      |
| `low`         | El precio más bajo de la acción durante la fecha especificada.              | Decimal      | `148.50`      |
| `close`       | El precio de cierre de la acción en la fecha especificada.                  | Decimal      | `152.75`      |
| `adj_close`   | El precio de cierre ajustado de la acción, ajustado por dividendos y splits.| Decimal      | `151.30`      |
| `volume`      | El número de acciones que cambiaron de manos durante la fecha especificada. | Entero       | `3500000`     |
| `company_name`| El nombre de la compañía cuyas acciones se están cotizando.                 | Cadena de texto | `Apple Inc.` |


In [37]:
# Supongamos que tienes un DataFrame llamado "df" que deseas exportar a un archivo .parquet
df.to_parquet("../data/data_extraida.parquet")

## Carga en RDS (Redshift)

Verificamos la conexion

In [26]:
import psycopg2

host = "data-engineer-cluster.cyhh5bfevlmn.us-east-1.redshift.amazonaws.com"
username = "criparrame_coderhouse"
password = os.getenv("REDSHIFT_PASSWORD")
database = "data-engineer-database"
port = "5439"


try:
    conn = psycopg2.connect(
        host=host, dbname=database, user=username, password=password, port="5439"
    )
    print("Connected to Redshift successfully!")

except Exception as e:
    print("Unable to connect to Redshift.")
    print(e)

Connected to Redshift successfully!


In [27]:
# Importamos las librerías necesarias
from sqlalchemy import create_engine, text
import pandas as pd
import os

# Definimos los detalles de la conexión a la base de datos Redshift
redshift_conn = {
    "host": "data-engineer-cluster.cyhh5bfevlmn.us-east-1.redshift.amazonaws.com",
    "username": "criparrame_coderhouse",
    "password": os.getenv("REDSHIFT_PASSWORD"),
    "database": "data-engineer-database",
    "port": "5439",
}


# Definimos la función para cargar los datos
def cargar_data(file_path, redshift_conn, batch_size=1000):
    # Creamos la conexión a la base de datos
    engine = create_engine(
        f'redshift+psycopg2://{redshift_conn["username"]}:{redshift_conn["password"]}@{redshift_conn["host"]}:{redshift_conn["port"]}/{redshift_conn["database"]}'
    )

    # Leemos los datos desde el archivo .parquet
    df = pd.read_parquet(file_path)

    # Ejecutamos una consulta SQL para crear la tabla 'stock_data' si no existe
    with engine.connect() as conn:
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS stock_data (
                date DATE NOT NULL,
                "open" FLOAT,
                high FLOAT,
                low FLOAT,
                close FLOAT,
                adj_close FLOAT,
                volume INT,
                company_name VARCHAR(70)
            )
            """
        )

        # Construimos la consulta de inserción de datos
        query = """
            INSERT INTO stock_data (date, "open", high, low, close, adj_close, volume, company_name)
            VALUES (:date, :open, :high, :low, :close, :adj_close, :volume, :company_name)
        """

        # Convertimos el DataFrame a una lista de diccionarios
        data_list = df.to_dict(orient="records")

        # Dividimos la lista en fragmentos más pequeños
        fragmentos = [
            data_list[i : i + batch_size] for i in range(0, len(data_list), batch_size)
        ]

        # Insertamos cada fragmento en la tabla
        for fragmento in fragmentos:
            for record in fragmento:
                conn.execute(text(query), **record)

    # Imprimimos el DataFrame 'df' después de cargar los datos
    print("Los datos se han cargado correctamente en la tabla de Redshift.")

In [None]:
# Llamamos a la función cargar_data con la ruta al archivo .parquet
cargar_data("../data/data_extraida.parquet", redshift_conn)