<a href="https://colab.research.google.com/github/GermanMacias/gerencia-y-proyecto-para-ciencia-de-datos/blob/main/etl_power_consumption_colab_autoconfig.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>


# ETL  UCI Household Power Consumption

repo: `GermanMacias/gerencia-y-proyecto-para-ciencia-de-datos`

Incluye *fallbacks* automáticos para intentar estas rutas en GitHub RAW (en este orden):
1. `household_power_consumption.zip` en la raíz
2. `data/household_power_consumption.zip`
3. `household_power_consumption.txt` en la raíz
4. `data/household_power_consumption.txt`

Genera **Parquet particionado (bronze, silver)** y **agregados (gold)**.


## 0) Dependencias

In [1]:

!pip -q install pyarrow fastparquet pandas gdown requests


[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/1.8 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━[0m[90m╺[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.2/1.8 MB[0m [31m4.5 MB/s[0m eta [36m0:00:01[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m[90m━━━━━━━━━━━━[0m [32m1.3/1.8 MB[0m [31m16.8 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.8/1.8 MB[0m [31m17.0 MB/s[0m eta [36m0:00:00[0m
[?25h

## 1) Configuración (prellenada)

In [2]:

# Repo y candidatos RAW ya preconfigurados
GITHUB_USER_REPO = "GermanMacias/gerencia-y-proyecto-para-ciencia-de-datos"
CANDIDATES = [
    "https://raw.githubusercontent.com/{}/main/household_power_consumption.zip",
    "https://raw.githubusercontent.com/{}/main/data/household_power_consumption.zip",
    "https://raw.githubusercontent.com/{}/main/household_power_consumption.txt",
    "https://raw.githubusercontent.com/{}/main/data/household_power_consumption.txt",
]
CANDIDATES = [u.format(GITHUB_USER_REPO) for u in CANDIDATES]

# Nombre esperado dentro del zip (si aplica)
INNER_TXT_NAME = "household_power_consumption.txt"


## 2) Descarga desde GitHub (con fallbacks) o subida manual

In [3]:

import os, io, zipfile, requests
from pathlib import Path

data_dir = Path('/content/data')
data_dir.mkdir(parents=True, exist_ok=True)

def try_download(url):
    try:
        print(f"Intentando: {url}")
        r = requests.get(url, timeout=60)
        if r.ok and r.content and len(r.content) > 1024:  # >1KB
            fn = url.split('/')[-1].split('?')[0]
            local_path = str(data_dir / fn)
            with open(local_path, 'wb') as f:
                f.write(r.content)
            print(f"Descargado → {local_path} ({len(r.content)/1e6:.2f} MB)")
            return local_path
        else:
            print(f"Falló: status {r.status_code}")
    except Exception as e:
        print("Error:", e)
    return None

local_path = None
for url in CANDIDATES:
    local_path = try_download(url)
    if local_path:
        break

if not local_path:
    print("\nNo se pudo descargar automáticamente. Sube el archivo manualmente (.zip o .txt).")
    from google.colab import files
    uploaded = files.upload()
    if uploaded:
        fn = next(iter(uploaded))
        local_path = str(data_dir / fn)
        with open(local_path, 'wb') as f:
            f.write(uploaded[fn])

if not local_path:
    raise RuntimeError("No hay archivo de entrada.")

is_zip = local_path.lower().endswith('.zip')
print("Archivo local:", local_path, "| is_zip:", is_zip)


Intentando: https://raw.githubusercontent.com/GermanMacias/gerencia-y-proyecto-para-ciencia-de-datos/main/household_power_consumption.zip
Falló: status 404
Intentando: https://raw.githubusercontent.com/GermanMacias/gerencia-y-proyecto-para-ciencia-de-datos/main/data/household_power_consumption.zip
Falló: status 404
Intentando: https://raw.githubusercontent.com/GermanMacias/gerencia-y-proyecto-para-ciencia-de-datos/main/household_power_consumption.txt
Falló: status 404
Intentando: https://raw.githubusercontent.com/GermanMacias/gerencia-y-proyecto-para-ciencia-de-datos/main/data/household_power_consumption.txt
Falló: status 404

No se pudo descargar automáticamente. Sube el archivo manualmente (.zip o .txt).


Saving individual+household+electric+power+consumption.zip to individual+household+electric+power+consumption.zip
Archivo local: /content/data/individual+household+electric+power+consumption.zip | is_zip: True


## 3) Ingesta y limpieza (chunked) → **bronze**

In [4]:

import pandas as pd
import numpy as np
from datetime import datetime
from io import BytesIO

bronze_dir = data_dir / 'bronze_parquet'
silver_dir = data_dir / 'silver_parquet'
bronze_dir.mkdir(exist_ok=True, parents=True)
silver_dir.mkdir(exist_ok=True, parents=True)

expected_cols = [
    'Date','Time','Global_active_power','Global_reactive_power','Voltage',
    'Global_intensity','Sub_metering_1','Sub_metering_2','Sub_metering_3'
]

def parse_and_clean(df: pd.DataFrame) -> pd.DataFrame:
    df.columns = [c.strip() for c in df.columns]
    miss = [c for c in expected_cols if c not in df.columns]
    if miss:
        raise ValueError(f"Faltan columnas: {miss}")
    # '?' o vacío → NaN
    for c in expected_cols:
        df[c] = df[c].replace({'?': np.nan, '': np.nan})
    # timestamp dd/mm/yyyy HH:MM:SS
    df['timestamp'] = pd.to_datetime(
        df['Date'].astype(str).str.strip() + ' ' + df['Time'].astype(str).str.strip(),
        format='%d/%m/%Y %H:%M:%S',
        errors='coerce'
    )
    # numéricos (coma→punto)
    num_cols = [c for c in expected_cols if c not in ['Date','Time']]
    for c in num_cols:
        df[c] = df[c].astype(str).str.replace(',', '.', regex=False)
        df[c] = pd.to_numeric(df[c], errors='coerce')
    df = df[~df['timestamp'].isna()].copy()
    df['year'] = df['timestamp'].dt.year
    df['month'] = df['timestamp'].dt.month
    return df

def write_partitioned_parquet(df: pd.DataFrame, base_dir: Path):
    for (y,m), g in df.groupby(['year','month']):
        part_dir = base_dir / f"year={y}/month={m:02d}"
        part_dir.mkdir(parents=True, exist_ok=True)
        ts_min = g['timestamp'].min().strftime('%Y%m%d%H%M%S')
        ts_max = g['timestamp'].max().strftime('%Y%m%d%H%M%S')
        out_path = part_dir / f"part_{ts_min}_{ts_max}.parquet"
        g.drop(columns=['year','month']).to_parquet(out_path, index=False)

def iter_chunks(local_path, is_zip, inner_name):
    if is_zip:
        with zipfile.ZipFile(local_path) as z:
            with z.open(inner_name) as f:
                data = f.read()
                bio = BytesIO(data)
                for chunk in pd.read_csv(bio, sep=';', chunksize=200_000, dtype=str, low_memory=False):
                    yield chunk
    else:
        for chunk in pd.read_csv(local_path, sep=';', chunksize=200_000, dtype=str, low_memory=False):
            yield chunk

rows = 0
for i, raw_chunk in enumerate(iter_chunks(local_path, is_zip, INNER_TXT_NAME), start=1):
    print(f"Chunk {i}...")
    df = parse_and_clean(raw_chunk)
    rows += len(df)
    write_partitioned_parquet(df, bronze_dir)
print(f"Bronze filas: {rows:,}")


Chunk 1...
Chunk 2...
Chunk 3...
Chunk 4...
Chunk 5...
Chunk 6...
Chunk 7...
Chunk 8...
Chunk 9...
Chunk 10...
Chunk 11...
Bronze filas: 2,075,259


## 4) **silver**: variable `active_not_submetered` y guardado

In [8]:

import glob, pandas as pd

def augment_and_write_silver(bronze_dir: Path, silver_dir: Path):
    files = glob.glob(str(bronze_dir / "year=*/month=*/part_*.parquet"))
    total = 0
    for fp in files:
        df = pd.read_parquet(fp)
        df['active_not_submetered'] = (
    (df['Global_active_power'] * 1000.0 / 60.0)
    - df['Sub_metering_1'].fillna(0)
    - df['Sub_metering_2'].fillna(0)
    - df['Sub_metering_3'].fillna(0)
)

        df['year'] = df['timestamp'].dt.year
        df['month'] = df['timestamp'].dt.month
        write_partitioned_parquet(df, silver_dir)
        total += len(df)
    print(f"Silver filas: {total:,}")

augment_and_write_silver(bronze_dir, silver_dir)


Silver filas: 2,075,259


## 5) **gold**: agregados por hora y día

In [9]:

gold_dir = data_dir / 'gold_parquet'
gold_dir.mkdir(parents=True, exist_ok=True)

def build_gold(silver_dir: Path, gold_dir: Path):
    import glob, pandas as pd
    files = glob.glob(str(silver_dir / "year=*/month=*/part_*.parquet"))
    hourly_list, daily_list = [], []
    agg_cols = ['Global_active_power','Global_reactive_power','Voltage',
                'Global_intensity','Sub_metering_1','Sub_metering_2','Sub_metering_3','active_not_submetered']
    for fp in files:
        df = pd.read_parquet(fp)
        df['hour_ts'] = df['timestamp'].dt.floor('H')
        df['day'] = df['timestamp'].dt.date
        hourly = df.groupby('hour_ts')[agg_cols].agg(['mean','sum']).reset_index()
        daily = df.groupby('day')[agg_cols].agg(['mean','sum']).reset_index()
        hourly_list.append(hourly); daily_list.append(daily)
    if hourly_list:
        pd.concat(hourly_list, ignore_index=True).to_parquet(gold_dir / "hourly_aggregates.parquet", index=False)
        print("gold horario listo")
    if daily_list:
        pd.concat(daily_list, ignore_index=True).to_parquet(gold_dir / "daily_aggregates.parquet", index=False)
        print("gold diario listo")

build_gold(silver_dir, gold_dir)


  df['hour_ts'] = df['timestamp'].dt.floor('H')
  df['hour_ts'] = df['timestamp'].dt.floor('H')
  df['hour_ts'] = df['timestamp'].dt.floor('H')
  df['hour_ts'] = df['timestamp'].dt.floor('H')
  df['hour_ts'] = df['timestamp'].dt.floor('H')
  df['hour_ts'] = df['timestamp'].dt.floor('H')
  df['hour_ts'] = df['timestamp'].dt.floor('H')
  df['hour_ts'] = df['timestamp'].dt.floor('H')
  df['hour_ts'] = df['timestamp'].dt.floor('H')
  df['hour_ts'] = df['timestamp'].dt.floor('H')
  df['hour_ts'] = df['timestamp'].dt.floor('H')
  df['hour_ts'] = df['timestamp'].dt.floor('H')
  df['hour_ts'] = df['timestamp'].dt.floor('H')
  df['hour_ts'] = df['timestamp'].dt.floor('H')
  df['hour_ts'] = df['timestamp'].dt.floor('H')
  df['hour_ts'] = df['timestamp'].dt.floor('H')
  df['hour_ts'] = df['timestamp'].dt.floor('H')
  df['hour_ts'] = df['timestamp'].dt.floor('H')
  df['hour_ts'] = df['timestamp'].dt.floor('H')
  df['hour_ts'] = df['timestamp'].dt.floor('H')
  df['hour_ts'] = df['timestamp'].dt.flo

gold horario listo
gold diario listo


## 6) Checks rápidos

In [10]:

import glob, pandas as pd

def count_rows(base_dir: Path):
    files = glob.glob(str(base_dir / "year=*/month=*/part_*.parquet"))
    total, tmin, tmax = 0, None, None
    for fp in files:
        df = pd.read_parquet(fp, columns=['timestamp'])
        total += len(df)
        if len(df):
            mi, ma = df['timestamp'].min(), df['timestamp'].max()
            tmin = mi if tmin is None else min(tmin, mi)
            tmax = ma if tmax is None else max(tmax, ma)
    return total, tmin, tmax

b_count, b_min, b_max = count_rows(bronze_dir)
s_count, s_min, s_max = count_rows(silver_dir)
print(f"Bronze: {b_count:,} filas | {b_min} → {b_max}")
print(f"Silver: {s_count:,} filas | {s_min} → {s_max}")

files = glob.glob(str(silver_dir / "year=*/month=*/part_*.parquet"))
if files:
    df = pd.read_parquet(files[0]).head(10)
    df
else:
    print("Sin particiones para mostrar.")


Bronze: 2,075,259 filas | 2006-12-16 17:24:00 → 2010-11-26 21:02:00
Silver: 2,075,259 filas | 2006-12-16 17:24:00 → 2010-11-26 21:02:00
