In [135]:
# Librerias y Dependencias
# =================================================
from __future__ import annotations

import argparse
import csv
import datetime as dt
import os
from datetime import datetime
from pathlib import Path
from typing import Iterable, List, Tuple, Optional

import psycopg2
from dotenv import load_dotenv

In [136]:
# SQL statements for inserts and updates
# ====================================================

# Seeds the single 'global' stats row (count 0, sum 0.0, min/max NULL); a no-op when the row already exists.
INIT_STATS_ROW_SQL = """
INSERT INTO stats(name, cnt, ssum, smin, smax)
VALUES('global', 0, 0.0, NULL, NULL)
ON CONFLICT (name) DO NOTHING;
"""


# Bulk insert of transaction rows. The lone %s is the placeholder that
# psycopg2.extras.execute_values expands into the list of row tuples.
# NOTE(review): the transactions table created in this notebook only has the SERIAL primary key,
# so ON CONFLICT DO NOTHING does not appear to de-duplicate re-loaded rows -- confirm.
INSERT_TX_SQL = """
INSERT INTO transactions (timestamp, price, user_id, source_file)
VALUES %s
ON CONFLICT DO NOTHING
"""

# Folds one batch's aggregates into a stats row.
# Parameter order: count, sum, min, min, max, max, name
# (min and max are each passed twice: once for the NULL branch, once for LEAST/GREATEST).
UPDATE_STATS_SQL = """
UPDATE stats
SET
  cnt = cnt + %s,
  ssum = ssum + %s,
  smin = CASE WHEN smin IS NULL THEN %s ELSE LEAST(smin, %s) END,
  smax = CASE WHEN smax IS NULL THEN %s ELSE GREATEST(smax, %s) END
WHERE name = %s
"""

# Reads the running aggregates for the stats row with the given name.
GET_STATS_SQL = "SELECT cnt, ssum, smin, smax FROM stats WHERE name = %s"



# Returns a row when the given file name is already recorded in ingestion_log.
CHECK_FILE_SQL = "SELECT 1 FROM ingestion_log WHERE file_name = %s;"


# Records (or refreshes) the ingestion log entry for a file.
# Parameter order: file_name, rows_loaded, loaded_at.
LOG_FILE_SQL = """
INSERT INTO ingestion_log(file_name, rows_loaded, loaded_at)
VALUES (%s, %s, %s)
ON CONFLICT (file_name)
DO UPDATE SET rows_loaded = EXCLUDED.rows_loaded,
              loaded_at   = EXCLUDED.loaded_at;
"""


# Database schema
# ===============================================
# One idempotent CREATE TABLE statement per table, keyed by table name.
DDL = {
    # Raw transaction rows; source_file records which CSV each row came from.
    "transactions": """
        CREATE TABLE IF NOT EXISTS transactions (
            id SERIAL PRIMARY KEY,
            timestamp TIMESTAMP NOT NULL,
            price DOUBLE PRECISION NOT NULL,
            user_id TEXT NOT NULL,
            source_file TEXT NOT NULL
        );
    """,
    # One row per ingested file.
    "ingestion_log": """
        CREATE TABLE IF NOT EXISTS ingestion_log (
            file_name   TEXT PRIMARY KEY,
            rows_loaded INTEGER NOT NULL,
            loaded_at   TIMESTAMP NOT NULL
        );
    """,
    # Running aggregates; the CHECK constraint only admits the row named 'global'.
    "stats": """
        CREATE TABLE IF NOT EXISTS stats (
            name TEXT PRIMARY KEY CHECK (name = 'global'),
            cnt  BIGINT NOT NULL,
            ssum DOUBLE PRECISION,
            smin DOUBLE PRECISION,
            smax DOUBLE PRECISION 
        );
    """,
}




def init_db(conn):
    """Initialise the database: create the tables and seed the stats row.

    Runs every statement in ``DDL`` followed by ``INIT_STATS_ROW_SQL`` and
    commits them together.

    Args:
        conn: An open DB-API connection (psycopg2 in this notebook).

    Raises:
        Exception: Whatever the driver raises. The transaction is rolled back
            first so the connection is left usable rather than in an aborted
            transaction.
    """
    try:
        with conn.cursor() as cur:
            for ddl_statement in DDL.values():
                cur.execute(ddl_statement)
            cur.execute(INIT_STATS_ROW_SQL)
        conn.commit()
    except Exception:
        conn.rollback()
        raise

In [143]:
# Librerias y Dependencias
# =======================================================
import os
import csv
import psycopg2
from psycopg2.extras import execute_values
from dateutil import parser as dateparser
from decimal import Decimal, InvalidOperation



# Global settings
# ==================================================
CSV_DIR = "./datos"                  # directory that holds the input CSV files
MICROBATCH_SIZE = 8                  # rows per micro-batch; set to 1 to insert and update stats row by row
SOURCE_PREFIX = "2012-"              # source files are assumed to be named 2012-1.csv ... 2012-5.csv
STATS_NAME = 'global'                # name of the stats row that is updated and printed



# 1. PostgreSQL connection settings
# =====================================================
# Values are read from the standard libpq environment variables so credentials
# do not have to live in the notebook. The fallbacks are the previous local
# development values, so an unconfigured environment behaves as before.
DB_CONF = {
    "host": os.environ.get("PGHOST", "localhost"),
    "port": int(os.environ.get("PGPORT", "5432")),
    "dbname": os.environ.get("PGDATABASE", "mb"),
    "user": os.environ.get("PGUSER", "postgres"),
    "password": os.environ.get("PGPASSWORD", "user"),
}



def find_source_files(directory, prefix=None):
    """Return the source CSV file names found in ``directory``, in natural order.

    A file qualifies when its name ends with ``.csv``, is not ``validation.csv``
    (case-insensitive) and starts with ``prefix``.

    Args:
        directory: Directory to list.
        prefix: Required file-name prefix. Defaults to the module-level
            ``SOURCE_PREFIX`` when omitted.

    Returns:
        list[str]: Bare file names (no directory part), sorted so numeric parts
        compare as numbers: ``2012-2.csv`` comes before ``2012-10.csv``.
    """
    if prefix is None:
        prefix = SOURCE_PREFIX

    def natural_key(file_name):
        stem = os.path.splitext(file_name)[0]
        # Tag every part so numbers are only ever compared with numbers and text
        # with text; comparing a bare int against a str would raise TypeError.
        return [
            (0, int(part)) if part.isdecimal() else (1, part)
            for part in stem.split('-')
        ]

    candidates = [
        name
        for name in os.listdir(directory)
        if name.endswith('.csv')
        and name.lower() != 'validation.csv'
        and name.startswith(prefix)
    ]
    return sorted(candidates, key=natural_key)


def parse_row(row):
    """Convert one CSV record into a ``(timestamp, price, user_id)`` tuple.

    Accepted column aliases: ``timestamp``/``time``/``date`` for the timestamp
    and ``user_id``/``userid``/``user`` for the user.

    Args:
        row: Mapping of column name to raw string value (a ``csv.DictReader`` row).

    Returns:
        tuple: ``(datetime, Decimal, str)``.

    Raises:
        ValueError: If the price is empty, not a number or not finite, or if the
            timestamp or user id is missing. Those columns are NOT NULL in the
            transactions table, so such a row could never be stored; rejecting
            it here lets the caller skip one row instead of failing the batch.
            An unparseable timestamp propagates the parser's own error.
    """
    ts_raw = row.get('timestamp') or row.get('time') or row.get('date')
    price_raw = row.get('price')
    user_id = row.get('user_id') or row.get('userid') or row.get('user')

    if not price_raw or price_raw.strip() == '':
        raise ValueError("price empty")

    # Commas are dropped, i.e. treated as thousands separators ("1,234.50" -> 1234.50).
    cleaned = price_raw.replace(',', '').strip()
    try:
        price = Decimal(cleaned)
    except InvalidOperation:
        raise ValueError(f"price invalid: {price_raw}") from None
    # Decimal happily parses "NaN" and "Infinity"; both would break min/max and the running sum.
    if not price.is_finite():
        raise ValueError(f"price invalid: {price_raw}")

    if not ts_raw or not ts_raw.strip():
        raise ValueError("timestamp empty")
    if not user_id or not user_id.strip():
        raise ValueError("user_id empty")

    ts = dateparser.parse(ts_raw)
    return ts, price, user_id

def _flush_batch(conn, cur, batch):
    """Insert one micro-batch, fold its aggregates into the stats row and commit; return its size."""
    prices = [record[1] for record in batch]
    execute_values(cur, INSERT_TX_SQL, batch, template="(%s, %s, %s, %s)")
    b_min, b_max = min(prices), max(prices)
    # min and max are each passed twice because UPDATE_STATS_SQL uses them in both CASE branches.
    cur.execute(UPDATE_STATS_SQL, (len(batch), sum(prices), b_min, b_min, b_max, b_max, STATS_NAME))
    # The insert and the stats update share one commit so they cannot drift apart.
    conn.commit()
    return len(batch)


def process_file(conn, filepath, source_file_name, microbatch_size=100):
    """Load one CSV file into the database in micro-batches.

    Files already present in the ingestion log are skipped. Rows that
    ``parse_row`` rejects are reported and skipped. Each micro-batch is inserted,
    folded into the stats row and committed before the next one starts; once the
    whole file has been read it is recorded in the ingestion log.

    Args:
        conn: Open psycopg2 connection.
        filepath: Path of the CSV file to read (UTF-8, with a header row).
        source_file_name: Name stored with every row and in the ingestion log.
        microbatch_size: Number of parsed rows that triggers a flush.

    Returns:
        int: Number of rows sent to the database for this file, or 0 when the
        file was skipped.

    Raises:
        Exception: Any I/O or database error. The current transaction is rolled
            back and the cursor closed before the error propagates.
            NOTE(review): micro-batches committed before the failure stay
            committed and the file is not logged, so a re-run would load those
            rows again -- confirm whether that is acceptable.
    """
    inserted_total = 0
    batch = []

    try:
        with conn.cursor() as cur:
            # Skip files that were already fully processed.
            cur.execute(CHECK_FILE_SQL, (source_file_name,))
            if cur.fetchone():
                print(f"[SKIP] Archivo {source_file_name} ya fue procesado previamente.")
                return 0

            with open(filepath, newline='', encoding='utf-8') as f:
                for row_idx, row in enumerate(csv.DictReader(f), start=1):
                    try:
                        ts, price, user_id = parse_row(row)
                    except Exception as e:
                        print(f"[WARN] Skipping row {row_idx} in {source_file_name}: {e}")
                        continue

                    batch.append((ts, price, user_id, source_file_name))
                    if len(batch) >= microbatch_size:
                        flushed = _flush_batch(conn, cur, batch)
                        inserted_total += flushed
                        print(f"[INFO] Inserted batch of {flushed} rows from {source_file_name}. Total inserted so far for this file: {inserted_total}")
                        batch.clear()

            # Rows left over after the last full micro-batch.
            if batch:
                flushed = _flush_batch(conn, cur, batch)
                inserted_total += flushed
                print(f"[INFO] Inserted final batch of {flushed} rows from {source_file_name}. File total: {inserted_total}")

            # Record the file in the ingestion log.
            cur.execute(LOG_FILE_SQL, (source_file_name, inserted_total, datetime.now()))
            conn.commit()
    except Exception:
        # Leave the connection usable instead of stuck in an aborted transaction.
        conn.rollback()
        raise

    print(f"[OK] Archivo {source_file_name} procesado con {inserted_total} registros insertados.")
    return inserted_total

def print_stats(conn):
    """Print the running statistics row: count, sum, min, max and the derived mean.

    The mean is shown as None when the count is zero/missing or the sum is NULL.
    Prints a notice instead when the stats row does not exist.
    """
    cursor = conn.cursor()
    cursor.execute(GET_STATS_SQL, (STATS_NAME,))
    record = cursor.fetchone()

    if not record:
        print("No stats row found.")
    else:
        n_rows, total, lowest, highest = record
        # Only divide when there is at least one row and a non-NULL sum.
        can_average = bool(n_rows) and n_rows > 0 and total is not None
        average = total / n_rows if can_average else None
        print(f"STATS -> rows: {n_rows}, sum: {total}, min: {lowest}, max: {highest}, mean: {average}")

    cursor.close()



In [144]:
def main():
    """Run the whole pipeline: create the schema, load the source files, then validation.csv.

    Steps:
        1. Create the tables and seed the stats row.
        2. Load every source file found in ``CSV_DIR``, printing the running
           statistics after each one and once more after all of them.
        3. Load ``validation.csv`` when it exists and print the statistics again.

    The connection is closed on every exit path, including a failure while
    creating the schema.
    """
    conn = psycopg2.connect(**DB_CONF)
    try:
        # Create the tables inside the try block so a failure here still closes the connection.
        init_db(conn)

        # --- Step 1: process the source CSV files only ---
        files = find_source_files(CSV_DIR)
        if not files:
            print("No se encontraron archivos de origen.")
            return

        for fname in files:
            fullpath = os.path.join(CSV_DIR, fname)
            print(f"\n=== Procesando archivo {fname} ===")
            inserted = process_file(conn, fullpath, fname, microbatch_size=MICROBATCH_SIZE)
            print(f"[DONE] Archivo {fname} procesado. Filas insertadas: {inserted}")
            # Show the running statistics after each file.
            print_stats(conn)

        # --- Step 2: print the accumulated statistics ---
        print("\n>>> Estadísticas acumuladas después de cargar todos los archivos 2012-*.csv")
        print_stats(conn)

        # --- Step 3: process validation.csv ---
        val_path = os.path.join(CSV_DIR, "validation.csv")
        if os.path.exists(val_path):
            print("\n=== Procesando archivo validation.csv ===")
            inserted = process_file(conn, val_path, "validation.csv", microbatch_size=MICROBATCH_SIZE)
            print(f"[DONE] Archivo validation.csv procesado. Filas insertadas: {inserted}")

            # --- Step 4: print the statistics after validation ---
            print("\n>>> Estadísticas después de cargar validation.csv")
            print_stats(conn)
        else:
            print("\n[WARN] No se encontró validation.csv en la carpeta de datos.")

    finally:
        conn.close()


In [145]:
# Entry point: run the pipeline only when this code is executed directly, not when it is imported.
if __name__ == "__main__":
    main()


=== Procesando archivo 2012-1.csv ===
[INFO] Inserted batch of 8 rows from 2012-1.csv. Total inserted so far for this file: 8
[WARN] Skipping row 12 in 2012-1.csv: price empty
[INFO] Inserted batch of 8 rows from 2012-1.csv. Total inserted so far for this file: 16
[WARN] Skipping row 21 in 2012-1.csv: price empty
[INFO] Inserted final batch of 4 rows from 2012-1.csv. File total: 20
[OK] Archivo 2012-1.csv procesado con 20 registros insertados.
[DONE] Archivo 2012-1.csv procesado. Filas insertadas: 20
STATS -> rows: 20, sum: 1193.0, min: 14.0, max: 97.0, mean: 59.65

=== Procesando archivo 2012-2.csv ===
[INFO] Inserted batch of 8 rows from 2012-2.csv. Total inserted so far for this file: 8
[INFO] Inserted batch of 8 rows from 2012-2.csv. Total inserted so far for this file: 16
[INFO] Inserted batch of 8 rows from 2012-2.csv. Total inserted so far for this file: 24
[INFO] Inserted final batch of 5 rows from 2012-2.csv. File total: 29
[OK] Archivo 2012-2.csv procesado con 29 registros i