In [1]:
import os
import pandas as pd

archivos = [f for f in os.listdir('.') if f.endswith('.csv') and f.startswith('finanzas_mes_')]

dataframes = []
for fname in archivos:
    df = pd.read_csv(fname)  # file is in current folder
    print(f"Archivo {fname} leído con {len(df)} registros")
    dataframes.append(df)

df_total = pd.concat(dataframes, ignore_index=True)
print(f"Total registros extraídos: {len(df_total)}")

Archivo finanzas_mes_1.csv leído con 100100 registros
Archivo finanzas_mes_2.csv leído con 100100 registros
Archivo finanzas_mes_3.csv leído con 100100 registros
Archivo finanzas_mes_4.csv leído con 100100 registros
Archivo finanzas_mes_5.csv leído con 100100 registros
Archivo finanzas_mes_6.csv leído con 100100 registros
Archivo finanzas_mes_7.csv leído con 100100 registros
Total registros extraídos: 700700


In [2]:
df_total.head()

Unnamed: 0,id,salario,gastos,fecha,correo
0,861.0,2685.0,3714.0,26/01/2024,usuario3072@universidad.edu
1,1295.0,7834.0,3619.0,2024-01-06,usuario3106@universidad.edu
2,1131.0,1446.0,2393.0,2024-01-18,usuario4130@universidad.edu
3,1096.0,5852.0,455.0,06/01/2024,usuario2235@universidad.edu
4,1639.0,2988.0,3458.0,2024-01-23,usuario1724@universidad.edu


In [3]:
df_total.isna().any(axis=0)

id          True
salario     True
gastos      True
fecha      False
correo     False
dtype: bool

In [4]:
registros_antes = len(df_total)
df_total = df_total.dropna(subset=['id', 'fecha'])
print("Registros eliminados por id/fecha nulos:", registros_antes - len(df_total))

Registros eliminados por id/fecha nulos: 70


In [5]:
df_total

Unnamed: 0,id,salario,gastos,fecha,correo
0,861.0,2685.0,3714.0,26/01/2024,usuario3072@universidad.edu
1,1295.0,7834.0,3619.0,2024-01-06,usuario3106@universidad.edu
2,1131.0,1446.0,2393.0,2024-01-18,usuario4130@universidad.edu
3,1096.0,5852.0,455.0,06/01/2024,usuario2235@universidad.edu
4,1639.0,2988.0,3458.0,2024-01-23,usuario1724@universidad.edu
...,...,...,...,...,...
700695,2168.0,4394.0,3821.0,07/07/2024,usuario1332@universidad.edu
700696,2373.0,4189.0,89.0,2024-07-15,usuario301@universidad.edu
700697,1696.0,1269.0,30.0,07-07-2024,usuario461@universidad.edu
700698,2030.0,6119.0,924.0,2024-07-21,usuario3669@universidad.edu


In [14]:
df_total['gastos'].isna().sum()

np.int64(0)

In [7]:
df_total["gastos"] = df_total["gastos"].fillna(0)

In [8]:
df_total['gastos'].isna().sum()

np.int64(0)

In [9]:
df_total = df_total[df_total['salario'] > 0]
df_total = df_total[df_total['gastos'] >= 0]

In [10]:

def estandarizar_fecha(valor):
    if pd.isnull(valor):
        return pd.NaT

    valor = str(valor).strip()

    # Caso 1: ISO estándar YYYY-MM-DD
    try:
        if "-" in valor and len(valor.split("-")[0]) == 4:
            return pd.to_datetime(valor, format="%Y-%m-%d", errors="coerce")
    except:
        pass

    # Caso 2: Formato europeo DD/MM/YYYY
    if "/" in valor:
        try:
            return pd.to_datetime(valor, format="%d/%m/%Y", errors="coerce")
        except:
            pass

    # Caso 3: Formato americano MM-DD-YYYY
    if "-" in valor:
        try:
            return pd.to_datetime(valor, format="%m-%d-%Y", errors="coerce")
        except:
            pass

    return pd.NaT


df_total["fecha_estandar"] = df_total["fecha"].apply(estandarizar_fecha)

In [11]:
df_total[['fecha','fecha_estandar']]# = pd.to_datetime(df_total['fecha'], errors='coerce')

Unnamed: 0,fecha,fecha_estandar
0,26/01/2024,2024-01-26
1,2024-01-06,2024-01-06
2,2024-01-18,2024-01-18
3,06/01/2024,2024-01-06
4,2024-01-23,2024-01-23
...,...,...
700695,07/07/2024,2024-07-07
700696,2024-07-15,2024-07-15
700697,07-07-2024,2024-07-07
700698,2024-07-21,2024-07-21


In [12]:
df_total['fecha_estandar'].isna().sum()

np.int64(0)

In [13]:
df_total.drop(columns='fecha', inplace=True)

In [None]:
import hashlib

def hash_correo(correo):
    if pd.isnull(correo):
        return None
    return hashlib.sha256(correo.encode("utf-8")).hexdigest()

df_total["correo_hash"] = df_total["correo"].apply(hash_correo)

# Eliminamos el correo original
df_total = df_total.drop(columns=["correo"])

In [None]:
df_total.head()

In [None]:
df_total['utilidad'] = df_total['salario'] - df_total['gastos']

In [None]:
import duckdb

conn = duckdb.connect('dw.duckdb')

# Eliminar la tabla si existe para asegurar que se cree con el esquema actualizado
conn.execute("DROP TABLE IF EXISTS fact_finanzas_etl");

# Crear la tabla con el esquema actualizado (7 columnas incluyendo fecha_carga)
conn.execute("""
CREATE TABLE fact_finanzas_etl (
    id INTEGER,
    salario DOUBLE,
    gastos DOUBLE,
    fecha DATE,
    utilidad DOUBLE,
    correo_hash STRING,
    fecha_carga TIMESTAMP
)
""")


In [None]:


print("Tablas en la base de datos:")
# Consultar las tablas existentes en la base de datos
# Utilizamos 'PRAGMA show_tables;' para DuckDB
# Si fuera SQLite, sería 'SELECT name FROM sqlite_master WHERE type='table';'
# O en DuckDB también se puede usar 'SHOW TABLES;'
tables_df = conn.execute("SHOW TABLES;").fetchdf()
display(tables_df)

# Si quieres ver el esquema de una tabla específica, por ejemplo, 'fact_finanzas_etl'
print("Esquema de la tabla fact_finanzas_etl:")
schema_etl_df = conn.execute("DESCRIBE fact_finanzas_etl;").fetchdf()
display(schema_etl_df)

# Si quieres ver algunas filas de una tabla, por ejemplo, 'fact_finanzas_etl'
print("Primeras 5 filas de fact_finanzas_etl:")
fact_finanzas_etl_df = conn.execute("SELECT * FROM fact_finanzas_etl LIMIT 5;").fetchdf()
display(fact_finanzas_etl_df)

# Cerrar la conexión cuando hayas terminado
conn.close()


In [None]:
from datetime import datetime
import duckdb
import pandas as pd

conn = duckdb.connect('dw.duckdb')

# Añadir la columna fecha_carga al DataFrame antes de registrarlo
df_total['fecha_carga'] = datetime.now()

conn.register("df_transformado", df_total)

conn.execute("""
INSERT INTO fact_finanzas_etl
SELECT id, salario, gastos, fecha_estandar AS fecha, utilidad, correo_hash, fecha_carga
FROM df_transformado
""")
df_total2=conn.execute("SELECT * FROM df_transformado LIMIT 5;").fetchdf()
display(df_total2)
conn.unregister("df_transformado")

conn.close()


In [None]:


conn = duckdb.connect("dw.duckdb")

# Crear tabla staging cargando CSV directamente
conn.execute("""
CREATE OR REPLACE TABLE staging_raw AS
SELECT *
FROM read_csv_auto('staging/*.csv')
""")

print("Datos crudos cargados en staging_raw")

In [None]:
conn.execute("""
CREATE OR REPLACE TABLE fact_finanzas_elt AS
SELECT
    id,

    -- Validación salario
    salario,

    -- Imputación gastos
    COALESCE(gastos, 0) AS gastos,

    -- Estandarización fecha
    CASE
        WHEN regexp_matches(fecha, '^[0-9]{4}-[0-9]{2}-[0-9]{2}$')
            THEN CAST(fecha AS DATE)
        WHEN regexp_matches(fecha, '^[0-9]{2}/[0-9]{2}/[0-9]{4}$')
            THEN STRPTIME(fecha, '%d/%m/%Y')
        WHEN regexp_matches(fecha, '^[0-9]{2}-[0-9]{2}-[0-9]{4}$')
            THEN STRPTIME(fecha, '%m-%d-%Y')
        ELSE NULL
    END AS fecha,

    -- Hash SHA256 dentro del motor
    CASE
        WHEN correo IS NOT NULL
            THEN sha256(correo)
        ELSE NULL
    END AS correo_hash,

    -- Regla de negocio
    salario - COALESCE(gastos, 0) AS utilidad

FROM staging_raw
WHERE
    id IS NOT NULL
    AND fecha IS NOT NULL
    AND salario > 0
    AND COALESCE(gastos, 0) >= 0
""")

print("Transformación ELT completada")

In [None]:
conn.execute("""
SELECT COUNT(*) FROM fact_finanzas_elt
""").fetchall()

In [None]:
import time
import os
from datetime import datetime

# ========================== #
# INICIO MEDICIÓN ETL
# ========================== #
start = time.time()

conn = duckdb.connect('dw.duckdb')
# Borramos la tabla para asegurar que el esquema de 7 columnas se aplique correctamente en cada medición.
# Si la intención es APPEND desde una tabla pre-existente, esta línea debería ser eliminada después de la primera ejecución.
conn.execute("DROP TABLE IF EXISTS fact_finanzas_etl")

dataframes = []

# Construir las URLs de los archivos CSV en GitHub
github_base_url = "https://raw.githubusercontent.com/jazaineam1/BigData2026/refs/heads/main/Airflow/staging/"
meses = [1, 2, 3, 4, 5, 6, 7]
github_csv_urls = [f"{github_base_url}finanzas_mes_{mes}.csv" for mes in meses]

# ========================== #
# 1️ EXTRACCIÓN
# ========================== #
# Leer directamente desde las URLs de GitHub
for url in github_csv_urls:
    try:
        df = pd.read_csv(url)
        dataframes.append(df)
    except Exception as e:
        print(f"Error al leer {url}: {e}")

df_total = pd.concat(dataframes, ignore_index=True)

# ========================== #
#  TRANSFORMACIÓN
# ========================== #

#  Eliminación campos críticos
df_total = df_total.dropna(subset=['id', 'fecha'])

#  Imputación controlada
df_total['gastos'] = df_total['gastos'].fillna(0)

#  Validación estructural
df_total = df_total[df_total['salario'] > 0]
df_total = df_total[df_total['gastos'] >= 0]

#  Estandarización fecha
def estandarizar_fecha(valor):
    if pd.isnull(valor):
        return pd.NaT

    valor = str(valor).strip()

    if "-" in valor and len(valor.split("-")[0]) == 4:
        return pd.to_datetime(valor, format="%Y-%m-%d", errors="coerce")

    if "/" in valor:
        return pd.to_datetime(valor, format="%d/%m/%Y", errors="coerce")

    if "-" in valor:
        return pd.to_datetime(valor, format="%m-%d-%Y", errors="coerce")

    return pd.NaT

df_total['fecha_estandar'] = df_total['fecha'].apply(estandarizar_fecha)

# Eliminar fechas inválidas
df_total = df_total.dropna(subset=['fecha_estandar'])

#  Hash SHA256 del correo
def hash_correo(correo):
    if pd.isnull(correo):
        return None
    return hashlib.sha256(correo.encode("utf-8")).hexdigest()

df_total['correo_hash'] = df_total['correo'].apply(hash_correo)

# Eliminar columnas originales sensibles
df_total = df_total.drop(columns=['correo', 'fecha'])

#  Regla de negocio
df_total['utilidad'] = df_total['salario'] - df_total['gastos']

# Añadir la columna fecha_carga
df_total['fecha_carga'] = datetime.now()

#  Deduplicación global
df_total = df_total.drop_duplicates()

# ========================== #
# 3️ CARGA
# ========================== #

# Primero, asegurarse de que la tabla exista con el esquema correcto (7 columnas)
conn.execute("""
CREATE TABLE fact_finanzas_etl (
    id INTEGER,
    salario DOUBLE,
    gastos DOUBLE,
    fecha DATE,
    utilidad DOUBLE,
    correo_hash STRING,
    fecha_carga TIMESTAMP
)
""")

conn.register("df_etl", df_total)

conn.execute("""
INSERT INTO fact_finanzas_etl
SELECT
    id,
    salario,
    gastos,
    fecha_estandar AS fecha,
    utilidad,
    correo_hash,
    fecha_carga
FROM df_etl
""")

conn.unregister("df_etl")
conn.close()

# ========================== #
# FIN MEDICIÓN
# ========================== #
etl_time = time.time() - start
print("Tiempo ETL completo:", round(etl_time, 3), "segundos")

In [None]:

start = time.time()

conn = duckdb.connect('dw.duckdb')

# Limpieza previa de staging_raw
conn.execute("DROP TABLE IF EXISTS staging_raw")
# Borramos la tabla fact_finanzas_elt para asegurar que el esquema de 7 columnas se aplique correctamente en cada medición.
# Si la intención es APPEND desde una tabla pre-existente, esta línea debería ser eliminada después de la primera ejecución.
conn.execute("DROP TABLE IF EXISTS fact_finanzas_elt")

# Construir las URLs de los archivos CSV en GitHub
github_base_url = "https://raw.githubusercontent.com/jazaineam1/BigData2026/refs/heads/main/Airflow/staging/"
meses = [1, 2, 3, 4, 5, 6, 7]
github_csv_urls = [f"{github_base_url}finanzas_mes_{mes}.csv" for mes in meses]

# ========================== #
# 1 LOAD (Extract + Load)
# ========================== #
# DuckDB puede leer múltiples URLs directamente. Convertimos la lista de URLs a una cadena separada por comas.
csv_urls_str = ", ".join([f"'{url}'" for url in github_csv_urls])

conn.execute(f"""
CREATE OR REPLACE TABLE staging_raw AS -- Usamos OR REPLACE para asegurar que staging_raw siempre esté limpia
SELECT *
FROM read_csv_auto([{csv_urls_str}])
""")

# ========================== #
# TRANSFORM (Dentro del motor)
# ========================== #

# Primero, aseguramos que la tabla final exista con el esquema correcto (7 columnas)
conn.execute("""
CREATE TABLE fact_finanzas_elt (
    id INTEGER,
    salario DOUBLE,
    gastos DOUBLE,
    fecha DATE,
    correo_hash STRING,
    utilidad DOUBLE,
    fecha_carga TIMESTAMP
)
""")

conn.execute("""
INSERT INTO fact_finanzas_elt
SELECT DISTINCT
    id,
    salario,
    COALESCE(gastos, 0) AS gastos,
    -- Estandarización fecha
    CASE
        WHEN regexp_matches(fecha, '^[0-9]{4}-[0-9]{2}-[0-9]{2}$')
            THEN CAST(fecha AS DATE)
        WHEN regexp_matches(fecha, '^[0-9]{2}/[0-9]{2}/[0-9]{4}$')
            THEN STRPTIME(fecha, '%d/%m/%Y')
        WHEN regexp_matches(fecha, '^[0-9]{2}-[0-9]{2}-[0-9]{4}$')
            THEN STRPTIME(fecha, '%m-%d-%Y')
        ELSE NULL
    END AS fecha,
    -- Hash SHA256
    CASE
        WHEN correo IS NOT NULL
            THEN sha256(correo)
        ELSE NULL
    END AS correo_hash,
    -- Regla de negocio
    salario - COALESCE(gastos, 0) AS utilidad,
    CURRENT_TIMESTAMP AS fecha_carga
FROM staging_raw
WHERE
    id IS NOT NULL
    AND fecha IS NOT NULL
    AND salario > 0
    AND COALESCE(gastos, 0) >= 0
""")

elt_time = time.time() - start

conn.close() # Cerrar la conexión después de usarla

print("Tiempo ELT completo:", round(elt_time, 3), "segundos")

In [None]:
!pip install apache-airflow --quiet
