In [1]:
import pandas as pd
from sqlalchemy import create_engine, text
from dotenv import load_dotenv
import os

# Cargar variables del archivo .env
load_dotenv()

True

# Conexión a la base de datos

In [2]:
# Variables de conexión desde .env
db_user = os.getenv('DB_USER')
db_password = os.getenv('DB_PASSWORD')
db_host = os.getenv('DB_HOST')
db_port = os.getenv('DB_PORT')
db_name = os.getenv('DB_NAME')

# Crear la URL de la base de datos
DATABASE_URL = f"postgresql://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}"

# Crear el engine
engine = create_engine(DATABASE_URL)

# Crear una conexión cruda (raw connection)
connection = engine.raw_connection()

# Consulta a la base de datos

In [3]:
try:
    # Consulta SQL para unir los datos de `stock_prices` con los nombres de `companies`
    query = """
    SELECT sp.date, sp.open, sp.high, sp.low, sp.close, sp.volume, c.name AS Name
    FROM stock_prices sp
    JOIN companies c ON sp.company_id = c.id
    ORDER BY sp.date;
    """
    
    # Ejecutar la consulta y cargar los resultados en un DataFrame
    df = pd.read_sql(query, connection)

finally:
    # Cerrar la conexión cruda
    connection.close()

# Mostrar los primeros 5 registros
print(df)

  df = pd.read_sql(query, connection)


              date   open   high    low  close    volume   name
0       2013-02-08  66.14  66.83  65.97  66.60    659052    XEC
1       2013-02-08  14.65  14.83  14.61  14.66   7731572     WU
2       2013-02-08  34.69  34.92  34.68  34.88  18425772    WFC
3       2013-02-08  75.02  75.99  74.96  75.85    911179    ZBH
4       2013-02-08  27.01  27.64  27.01  27.09   1206284    XYL
...            ...    ...    ...    ...    ...       ...    ...
619529  2018-08-02  50.00  50.00  50.00  50.00        50   APTV
619530  2018-08-02  50.00  50.00  50.00  50.00        50    AGN
619531  2018-08-02  50.00  50.00  50.00  50.00        50  DISCA
619532  2018-08-02  50.00  50.00  50.00  50.00        50    DPS
619533  2018-08-02  50.00  50.00  50.00  50.00        50    DXC

[619534 rows x 7 columns]


In [4]:
# Imprimimos la fecha maxima
print(f'Esta la fecha maxima antes de la insercion del dato nuevo: {df.date.max()}')

Esta la fecha maxima antes de la insercion del dato nuevo: 2018-08-02


In [5]:
# Identificar duplicados
duplicados = df.duplicated().sum()
print(f"Filas duplicadas: {duplicados}")

Filas duplicadas: 0


# Borrar y volver a crear las tablas

In [6]:
with engine.connect() as connection:
    # Eliminar las tablas (DROP)
    connection.execute("DROP TABLE IF EXISTS stock_prices;")
    connection.execute("DROP TABLE IF EXISTS companies;")

    # Crear las tablas nuevamente
    connection.execute("""
    CREATE TABLE IF NOT EXISTS companies (
        id SERIAL PRIMARY KEY,
        name VARCHAR(10) NOT NULL UNIQUE
    );
    """)

    connection.execute("""
    CREATE TABLE IF NOT EXISTS stock_prices (
        id SERIAL PRIMARY KEY,
        company_id INTEGER REFERENCES companies(id),
        date DATE NOT NULL,
        open NUMERIC(10, 2),
        high NUMERIC(10, 2),
        low NUMERIC(10, 2),
        close NUMERIC(10, 2),
        volume BIGINT
    );
    """)

print("Tablas eliminadas y recreadas.")

Tablas eliminadas y recreadas.


  connection.execute("DROP TABLE IF EXISTS stock_prices;")


# DAG

In [7]:
DAILY_DATA='../'+os.getenv('DAILY_STOCK_DATA')

def load_csv_to_dataframe():
    # Lista para almacenar los DataFrames
    dataframes = []
    
    # Iterar sobre los archivos en la carpeta
    for filename in os.listdir(DAILY_DATA):
        if filename.endswith('.csv'):
            file_path = os.path.join(DAILY_DATA, filename)
            df = pd.read_csv(file_path)
            
            # Convertir nombres de columnas a minúsculas
            df.columns = df.columns.str.lower()
            
            # Convertir la columna 'date' a tipo datetime (si existe)
            if 'date' in df.columns:
                df['date'] = pd.to_datetime(df['date'], errors='coerce')
            
            dataframes.append(df)
    
    # Concatenar todos los DataFrames en uno solo
    if dataframes:
        combined_df = pd.concat(dataframes, ignore_index=True)
        return combined_df
    else:
        return pd.DataFrame()  # Retorna un DataFrame vacío si no hay archivos

In [8]:
# Llamar la funcion para cargar los datos diarios de stock
latest_stock_df = load_csv_to_dataframe()
latest_stock_df

Unnamed: 0,date,open,high,low,close,volume,name
0,2018-08-02,50,50,50,50,50,TPR
1,2018-08-02,50,50,50,50,50,SYMC
2,2018-08-02,50,50,50,50,50,FLR
3,2018-08-02,50,50,50,50,50,RHI
4,2018-08-02,50,50,50,50,50,MMC
...,...,...,...,...,...,...,...
500,2018-08-02,50,50,50,50,50,VNO
501,2018-08-02,50,50,50,50,50,RSG
502,2018-08-02,50,50,50,50,50,AAL
503,2018-08-02,50,50,50,50,50,MSFT


In [9]:
def get_latest_dates():
    # Consulta SQL
    query = """
    SELECT c.name, MAX(sp.date) as latest_date
    FROM stock_prices sp
    JOIN companies c ON sp.company_id = c.id
    GROUP BY c.name;
    """
    
    # Ejecutar la consulta usando SQLAlchemy
    with engine.connect() as connection:
        result = connection.execute(text(query))
        
        # Convertir los resultados a un diccionario {company_name: latest_date}
        latest_dates_dict = {row['name']: row['latest_date'] for row in result}

    return latest_dates_dict

In [10]:
# Llamar a la función para obtener los datos
latest_dates_dict = get_latest_dates()
latest_dates_dict

{}

In [11]:
def filter_new_data(latest_stock_df, latest_dates_dict):
    """
    Filtra las filas de latest_stock_df que tienen fechas más recientes que las ya registradas en la base de datos.
    """
    # Filtrar el DataFrame para obtener solo los datos más recientes
    filtered_df = latest_stock_df[latest_stock_df.apply(
        lambda row: pd.Timestamp(row['date']) > pd.Timestamp(latest_dates_dict.get(row['name'], '1900-01-01')), axis=1)]
    
    return filtered_df

In [12]:
new_stock_data = filter_new_data(latest_stock_df, latest_dates_dict)
new_stock_data

Unnamed: 0,date,open,high,low,close,volume,name
0,2018-08-02,50,50,50,50,50,TPR
1,2018-08-02,50,50,50,50,50,SYMC
2,2018-08-02,50,50,50,50,50,FLR
3,2018-08-02,50,50,50,50,50,RHI
4,2018-08-02,50,50,50,50,50,MMC
...,...,...,...,...,...,...,...
500,2018-08-02,50,50,50,50,50,VNO
501,2018-08-02,50,50,50,50,50,RSG
502,2018-08-02,50,50,50,50,50,AAL
503,2018-08-02,50,50,50,50,50,MSFT


In [13]:
def insert_data_to_db(new_stock_data):
    """
    Inserta los datos nuevos en las tablas `companies` y `stock_prices`.
    """
    with engine.connect() as connection:
        # Paso 1: Insertar compañías nuevas en la tabla `companies`
        for company in new_stock_data['name'].unique():
            query = text("""
            INSERT INTO companies (name)
            VALUES (:company)
            ON CONFLICT (name) DO NOTHING
            """)
            connection.execute(query, {"company": company})
        
        # Paso 2: Insertar los precios en la tabla `stock_prices`
        for _, row in new_stock_data.iterrows():
            # Obtener el company_id de la tabla `companies`
            query = text("""
            SELECT id FROM companies WHERE name = :company_name
            """)
            company_id = connection.execute(query, {"company_name": row['name']}).scalar()
            
            # Insertar en `stock_prices`
            query = text("""
            INSERT INTO stock_prices (company_id, date, open, high, low, close, volume)
            VALUES (:company_id, :date, :open, :high, :low, :close, :volume)
            """)
            connection.execute(query, {
                "company_id": company_id,
                "date": row['date'],
                "open": row['open'],
                "high": row['high'],
                "low": row['low'],
                "close": row['close'],
                "volume": row['volume']
            })

In [14]:
# Llamar a la función para insertar los datos nuevos
insert_data_to_db(new_stock_data)

In [15]:
try:
    # Consulta SQL para unir los datos de `stock_prices` con los nombres de `companies`
    query = """
    SELECT sp.date, sp.open, sp.high, sp.low, sp.close, sp.volume, c.name AS Name
    FROM stock_prices sp
    JOIN companies c ON sp.company_id = c.id
    ORDER BY sp.date;
    """
    
    # Ejecutar la consulta y cargar los resultados en un DataFrame
    df_updated = pd.read_sql(query, connection)

finally:
    # Cerrar la conexión cruda
    connection.close()

# Mostrar los primeros 5 registros
print(df_updated)

  df_updated = pd.read_sql(query, connection)


AttributeError: 'Connection' object has no attribute 'cursor'

In [7]:
# Imprimimos la fecha maxima
print(f'Esta la fecha maxima antes de la insercion del dato nuevo: {df_updated.date.max()}')

Esta la fecha maxima antes de la insercion del dato nuevo: 2018-08-02


In [8]:
# Identificar duplicados
duplicados = df_updated.duplicated().sum()
print(f"Filas duplicadas: {duplicados}")

Filas duplicadas: 0
