In [None]:
# Extracción y almacenamiento de datos desde la API de la EIA

"""
Este script realiza la extracción de datos sobre consumo y ventas de electricidad en EE.UU.
Se implementan dos tipos de extracción:
- Full: Descarga todos los datos históricos.
- Incremental: Descarga solo los datos del último día.

Los datos se almacenan en un Data Lake siguiendo la estructura de capas:
- Bronze: Datos crudos obtenidos de la API.
- Silver: Datos procesados y limpios.
- Gold: Datos listos para análisis.
"""

In [1]:
#Instalación de bibliotecas para solicitudes HTTP y trabajar con DeltaLake
!pip install requests deltalake

Collecting deltalake
  Downloading deltalake-0.25.5-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (5.5 kB)
Downloading deltalake-0.25.5-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (45.9 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m45.9/45.9 MB[0m [31m11.0 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: deltalake
Successfully installed deltalake-0.25.5


In [2]:
#Importación de librerias y módulos para su uso en las extracciones
import os
import requests
import pandas as pd
from pprint import pprint
from configparser import ConfigParser
from datetime import datetime, timedelta
from deltalake import write_deltalake, DeltaTable
import numpy as np
from google.colab import files


In [3]:
#Para realizar la extracción es necesario el access token, lo he archivado
#junto con la entrega de este Colab.
#Solo es necesario cargar "pipeline.conf" desde el cuadro "Elegir archivos"

files.upload()  # Esto abrirá el cuadro de diálogo para seleccionar el archivo


with open('pipeline.conf', 'r') as file:
    access_token = file.read()
print(f"Token de acceso cargado: {access_token}")


Saving pipeline.conf to pipeline.conf
Token de acceso cargado: [tmdb_api]
access_token = WrHnbEfGfLwR67MJKoxvuYvBMJDHLkh8dXqGZmau


In [4]:
# Función para obtener credenciales de la API
def get_api_config():
  parser = ConfigParser()
  parser.read("pipeline.conf")
  api_credentials = parser["tmdb_api"]
  return dict(api_credentials)

In [5]:
# Definición de los endpoints incremental y full respectivamente
ENDPOINTS = {
    "electricity_consumption": "https://api.eia.gov/v2/electricity/rto/region-sub-ba-data/data/",
    "electricity_sales": "https://api.eia.gov/v2/electricity/retail-sales/data/"
}

In [6]:
# Obtención de token de acceso
access_token = get_api_config()
api_credentials = get_api_config()
print(api_credentials)

{'access_token': 'WrHnbEfGfLwR67MJKoxvuYvBMJDHLkh8dXqGZmau'}


In [7]:
# Extracción Incremental

BASE_URL = ENDPOINTS["electricity_consumption"]
access_token = api_credentials["access_token"]

fecha_actual = datetime.now()
fecha_inicio = (fecha_actual - timedelta(days=1)).strftime('%Y-%m-%d')  # Últimas 24 horas
fecha_fin = fecha_actual.strftime('%Y-%m-%d')

params = {
    "api_key": access_token,
    "frequency": "hourly",
    "data[]": "value",
    "start": fecha_inicio,
    "end": fecha_fin
}

response = requests.get(BASE_URL, params=params)

if response.status_code == 200:
    data = response.json()
    if "response" in data and "data" in data["response"]:
        df = pd.DataFrame(data["response"]["data"])

        if 'value' in df.columns:
            df['value'] = pd.to_numeric(df['value'], errors='coerce')
            df['period'] = pd.to_datetime(df['period'])
            df = df.sort_values(by='period', ascending=False)

        df['fecha'] = df['period'].dt.date
        df['hora'] = df['period'].dt.hour

        print(df[['period', 'subba', 'subba-name', 'value']].head())

        path = "/content/datalake/bronze/eia/electricity_subba"

        if os.path.exists(path):
            existing_df = DeltaTable(path).to_pandas()
            df = pd.concat([existing_df, df]).drop_duplicates().reset_index(drop=True)

        write_deltalake(path, df, mode="append", partition_by=["fecha", "hora"])
    else:
        print("No se encontraron valores.")
else:
    print(f"Error en la solicitud: {response.status_code}, {response.text}")

# - Se extraen datos de consumo eléctrico con frecuencia horaria.
# - Debido a la naturaleza dinámica de los datos, se usa una extracción incremental.
# - Se almacena en Delta Lake con partición por fecha y hora para facilitar consultas temporales.
# - Se usa el modo "append" para mantener el histórico de consumo eléctrico.




       period subba                                  subba-name  value
0  2025-04-10  ZONA                                        West   1860
15 2025-04-10  KACY       Kansas City Board of Public Utilities    233
27 2025-04-10    WR                               Westar Energy   3360
26 2025-04-10  WFEC        Western Farmers Electric Cooperative   1071
25 2025-04-10  WAUE  Western Area Power Upper Great Plains East   3792


In [8]:
#Extracción Full

BASE_URL = ENDPOINTS["electricity_sales"]
access_token = api_credentials["access_token"]

fecha_actual = datetime.now().year
fecha_inicio = "2014-01-01"  # Se mantiene el inicio fijo en el año 2000
fecha_fin = f"{fecha_actual}-01-01"  # Año actual dinámico

params = {
    "api_key": access_token,
    "frequency": "annual",
    "data[]": "customers",
    "facets[stateid][]": ["FL"], # Filtrado por estado de Florida
    "start": fecha_inicio,
    "end": fecha_fin
}

response = requests.get(BASE_URL, params=params)

if response.status_code == 200:
    data = response.json()
    if "response" in data and "data" in data["response"]:
        df = pd.DataFrame(data["response"]["data"])
        print(df.head(10))

        # Renombrar columnas
        df.rename(columns={'stateid': 'State'}, inplace=True)

        # Convertir la columna 'customers' a numérica
        df['stateDescription'] = pd.to_numeric(df['stateDescription'], errors='coerce')

        # Verificación del tipo de datos y valores en la columna 'customers'
        print("Valores en 'customers' antes de la limpieza:")
        print(df['customers'].unique())

        # Convertir cualquier valor "None", 'None', o vacío en NaN
        df['customers'] = df['customers'].apply(lambda x: np.nan if str(x).strip().lower() == "none" or pd.isnull(x) else x)

        # Eliminar filas donde la columna 'customers' sea NaN o tenga valores no válidos
        df = df.dropna(subset=['customers'])

        # Verificación después de la limpieza
        print("Valores en 'customers' después de la limpieza:")
        print(df['customers'].unique())

        path = "/content/datalake/bronze/eia/electricity_retail_sales"

        if os.path.exists(path):
            existing_df = DeltaTable(path).to_pandas()
            df = pd.concat([existing_df, df]).drop_duplicates(subset=None, keep='first', inplace=False).reset_index(drop=True)

        write_deltalake(path, df, mode="overwrite", schema_mode="merge")


    else:
        print("No se encontraron datos.")
else:
    print(f"Error en la solicitud: {response.status_code}, {response.text}")


# - Se extraen datos anuales sobre la cantidad de clientes de electricidad en Florida.
# - Como los datos cambian solo una vez al año, se utiliza una extracción completa.
# - Se almacena en Delta Lake con el modo "overwrite" para actualizar los datos sin conservar versiones antiguas.


  period stateid stateDescription sectorid      sectorName customers  \
0   2023      FL          Florida      ALL     all sectors  11541551   
1   2023      FL          Florida      COM      commercial   1292988   
2   2023      FL          Florida      IND      industrial     26085   
3   2022      FL          Florida      ALL     all sectors  11372598   
4   2022      FL          Florida      COM      commercial   1282170   
5   2022      FL          Florida      IND      industrial     23673   
6   2022      FL          Florida      OTH           other      None   
7   2022      FL          Florida      RES     residential  10066753   
8   2022      FL          Florida      TRA  transportation         2   
9   2023      FL          Florida      OTH           other      None   

       customers-units  
0  number of customers  
1  number of customers  
2  number of customers  
3  number of customers  
4  number of customers  
5  number of customers  
6  number of customers  
7  numb

In [9]:
# Almacenamiento BRONZE
def almacenar_datos(data, layer="bronze"):
    if not data:
        print("No hay datos para almacenar.")
        return
    df = pd.DataFrame(data["response"]["data"])
    path = f"data_lake/{layer}/nuclear_outages/"
    os.makedirs(path, exist_ok=True)
    df.to_csv(f"{path}outages_{datetime.today().strftime('%Y-%m-%d')}.csv", index=False)
    print(f"Datos almacenados en {path}")

In [10]:
# Almacenamiento SILVER
def process_bronze_to_silver():
    bronze_path = "/content/datalake/bronze/eia/electricity_subba"
    silver_path = "/content/datalake/silver/eia/electricity_subba"

    if not os.path.exists(bronze_path):
        print("No hay datos en Bronze.")
        return

    df = DeltaTable(bronze_path).to_pandas()

    # Transformaciones: limpieza y normalización
    df = df.dropna()
    df['value'] = pd.to_numeric(df['value'], errors='coerce')
    df['period'] = pd.to_datetime(df['period'])
    df = df.sort_values(by='period', ascending=False)
    df = df.drop_duplicates()

    # Reseteo el índice para evitar duplicado
    df.reset_index(drop=True, inplace=True)

    write_deltalake(silver_path, df, mode="overwrite", partition_by=["fecha"])
    print("Datos procesados a Silver.")

#Almacenamiento GOLD
def process_silver_to_gold():
    silver_path = "/content/datalake/silver/eia/electricity_subba"
    gold_path = "/content/datalake/gold/eia/electricity_aggregates"

    if not os.path.exists(silver_path):
        print("No hay datos en Silver.")
        return

    df = DeltaTable(silver_path).to_pandas()

    # Promedio de valores por día
    df_gold = df.groupby("fecha")["value"].mean().reset_index()
    df_gold.rename(columns={"value": "avg_value"}, inplace=True)

    # Reseteo índice para evitar duplicado
    df_gold.reset_index(drop=True, inplace=True)

    write_deltalake(gold_path, df_gold, mode="overwrite")
    print("Datos procesados a Gold.")

# Ejecutar los procesos
process_bronze_to_silver()
process_silver_to_gold()

Datos procesados a Silver.
Datos procesados a Gold.
