<font color="#87CEEB" size="6">Carbon Intensity API</font>

<font color="#4682B4" size="3">
Introduction
This is the Official Carbon Intensity API for Great Britain developed by National Energy System Operator (NESO). You can find out more about carbon intensity at carbonintensity.org.uk.

Summary
National Energy System Operator’s Carbon Intensity API provides an indicative trend of regional carbon intensity of the electricity system in Great Britain (GB) up to 2 days ahead of real-time. It provides programmatic and timely access to both forecast and estimated carbon intensity data. The Carbon Intensity forecast includes CO2 emissions related to electricity generation only. The includes emissions from all large metered power stations, interconnector imports, transmission and distribution losses, and accounts for national electricity demand, embedded wind and solar generation.</font>


<font color="#4682B4" size="3">Base URL: <https://api.carbonintensity.org.uk/>

In [2]:
# @title
from IPython.display import HTML

html_code = """
<div style="background-color: lightpink; padding: 15px; border-radius: 5px; color: darkred;">
<strong><i>Módulo Nº1 - Unidad Nº 1</i></strong><br>
<br>
Extracción y almacenamiento de datos</div>
"""

display(HTML(html_code))

In [3]:
!pip install requests
!pip install deltalake
!pip install pyarrow
!pip install -q ydata-profiling

Collecting deltalake
  Downloading deltalake-0.25.5-cp39-abi3-win_amd64.whl.metadata (5.6 kB)
Downloading deltalake-0.25.5-cp39-abi3-win_amd64.whl (38.2 MB)
   ---------------------------------------- 0.0/38.2 MB ? eta -:--:--
   - -------------------------------------- 1.0/38.2 MB 16.7 MB/s eta 0:00:03
   -- ------------------------------------- 2.1/38.2 MB 7.8 MB/s eta 0:00:05
   -- ------------------------------------- 2.1/38.2 MB 7.8 MB/s eta 0:00:05
   --- ------------------------------------ 3.1/38.2 MB 3.5 MB/s eta 0:00:11
   ---- ----------------------------------- 4.2/38.2 MB 4.1 MB/s eta 0:00:09
   ----- ---------------------------------- 5.2/38.2 MB 4.4 MB/s eta 0:00:08
   ------- -------------------------------- 7.3/38.2 MB 4.8 MB/s eta 0:00:07
   -------- ------------------------------- 8.4/38.2 MB 5.0 MB/s eta 0:00:07
   ---------- ----------------------------- 10.5/38.2 MB 5.3 MB/s eta 0:00:06
   ------------ --------------------------- 11.5/38.2 MB 5.4 MB/s eta 0:00:05


In [4]:
# @title
from IPython.display import HTML

html_code = """
<div style="background-color: lavender; padding: 15px; border-radius: 5px; color: indigo;">
<strong>Delta Lake:</strong> es un formato de almacenamiento conocido como open table, o bien formato para lakehouse, que permite almacenar datos en un formato tabular, con la capacidad de realizar operaciones de lectura y escritura de datos
 de manera transaccional, es decir, que permite realizar operaciones de escritura y lectura de datos de manera atómica, lo que garantiza la integridad de los datos almacenados.</div>
"""

display(HTML(html_code))

In [5]:
# Importamos las librerias y modulos necesarios
# write_deltalake: Modulo que permite escribir un DataFrame en un archivo Delta Lake
# DeltaTable: Modulo para manipular archivos Delta Lake

import json
import requests
import pandas as pd
import pyarrow as pa
from deltalake import write_deltalake, DeltaTable
from deltalake.exceptions import TableNotFoundError
from datetime import datetime, timedelta
import os
from ydata_profiling import ProfileReport

In [6]:
#Definimos una funcion para obtener datos de una API en formato .json
#Definimos una funcion que construye un DataFrame de pandas a partir de datos en formato JSON.

def get_data(base_url, endpoint, data_field=None, params=None, headers=None):
    """
    Realiza una solicitud GET a una API para obtener datos.

    Parámetros:
    base_url (str): La URL base de la API.
    endpoint (str): El endpoint de la API al que se realizará la solicitud.
    params (dict): Parámetros de consulta para enviar con la solicitud.
    data_field (str): El nombre del campo en el JSON que contiene los datos.
    headers (dict): Encabezados para enviar con la solicitud.

    Retorna:
    dict: Los datos obtenidos de la API en formato JSON.
    """
    try:
        endpoint_url = f"{base_url}/{endpoint}"
        response = requests.get(endpoint_url, params=params, headers=headers)
        response.raise_for_status()  # Levanta una excepción si hay un error en la respuesta HTTP.

        # Verificar si los datos están en formato JSON.
        try:
            data = response.json()
            if data_field:
              data = data[data_field]
        except:
            print("El formato de respuesta no es el esperado")
            return None
        return data

    except requests.exceptions.RequestException as e:
        # Capturar cualquier error de solicitud, como errores HTTP.
        print(f"La petición ha fallado. Código de error : {e}")
        return None

def build_table(json_data):
    """
    Construye un DataFrame de pandas a partir de datos en formato JSON.

    Parámetros:
    json_data (dict): Los datos en formato JSON obtenidos de una API.

    Retorna:
    DataFrame: Un DataFrame de pandas que contiene los datos.
    """
    try:
        df = pd.json_normalize(json_data)
        return df
    except:
        print("Los datos no están en el formato esperado")
        return None

In [7]:
#Definimos las funciones para realizar las actualizaciones
def save_data_as_delta(df, path, mode="overwrite", partition_cols=None):
    """
    Guarda un dataframe en formato Delta Lake en la ruta especificada.
    A su vez, es capaz de particionar el dataframe por una o varias columnas.
    Por defecto, el modo de guardado es "overwrite".

    Args:
      df (pd.DataFrame): El dataframe a guardar.
      path (str): La ruta donde se guardará el dataframe en formato Delta Lake.
      mode (str): El modo de guardado. Son los modos que soporta la libreria
      deltalake: "overwrite", "append", "error", "ignore".
      partition_cols (list or str): La/s columna/s por las que se particionará el
      dataframe. Si no se especifica, no se particionará.
    """
    write_deltalake(
        path, df, mode=mode, partition_by=partition_cols
    )

def save_new_data_as_delta(new_data, data_path, predicate, partition_cols=None):
    """
    Guarda solo nuevos datos en formato Delta Lake usando la operación MERGE,
    comparando los datos ya cargados con los datos que se desean almacenar
    asegurando que no se guarden registros duplicados.

    Args:
      new_data (pd.DataFrame): Los datos que se desean guardar.
      data_path (str): La ruta donde se guardará el dataframe en formato Delta Lake.
      predicate (str): La condición de predicado para la operación MERGE.
    """

    try:
      dt = DeltaTable(data_path)
      new_data_pa = pa.Table.from_pandas(new_data)
      # Se insertan en target, datos de source que no existen en target
      dt.merge(
          source=new_data_pa,
          source_alias="source",
          target_alias="target",
          predicate=predicate
      ) \
      .when_not_matched_insert_all() \
      .execute()

    # Si no existe la tabla Delta Lake, se guarda como nueva
    except TableNotFoundError:
      save_data_as_delta(new_data, data_path, partition_cols=partition_cols)



def upsert_data_as_delta(data, data_path, predicate):
    """
    Guardar datos en formato Delta Lake usando la operacion MERGE.
    Cuando no haya registros coincidentes, se insertarán nuevos registros.
    Cuando haya registros coincidentes, se actualizarán los campos.

    Args:
      data (pd.DataFrame): Los datos que se desean guardar.
      data_path (str): La ruta donde se guardará el dataframe en formato Delta Lake.
      predicate (str): La condición de predicado para la operación MERGE.
    """
    try:
        dt = DeltaTable(data_path)
        data_pa = pa.Table.from_pandas(data)
        dt.merge(
            source=data_pa,
            source_alias="source",
            target_alias="target",
            predicate=predicate
        ) \
        .when_matched_update_all() \
        .when_not_matched_insert_all() \
        .execute()
    except TableNotFoundError:
        save_data_as_delta(data, data_path)



def read_most_recent_partition(data_path):
    """
    Lee la particion mas reciente de una tabla Delta Lake, teniendo en cuenta
    la fecha y hora actual.
    Supone que la tabla delta lake esta particionada por fecha y hora

    Args:
      data_path (str): La ruta donde se encuentra la tabla Delta Lake.

    Returns:
      pd.DataFrame: Los datos de la particion mas reciente.
    """
    try:
      requested_date = datetime.utcnow() - timedelta(hours=1)
      dt = DeltaTable(data_path)
      df_recent = dt.to_pandas(
        partitions=[
        ("fecha", "=", requested_date.strftime("%Y-%m-%d")),
        ("hora", "=", requested_date.strftime("%H"))
        ]
        )
      return df_recent
    except:
      raise Exception(f"No se pudo procesar la tabla Delta Lake, por {E}")
      return None

In [8]:
# @title
from IPython.display import HTML

html_code = """
<div style="background-color: lavender; padding: 15px; border-radius: 5px; color: indigo;">
<strong>Extraccion full:</strong> se caracteriza por extraer todos los datos de la fuente en cada ejecución del
proceso de extracción y volcarlos por completo en el sistema de destino. El
volcado de los datos puede sobreescribir lo que ya está disponible en el
sistema de destino, o depositar los datos en otro archivo o directorio. Puede ser
útil cuando se trabaja con fuentes de datos estáticas o cuando los requisitos del
proyecto no permiten la identificación de cambios incrementales.</div>
"""

display(HTML(html_code))

In [9]:
##############################################################
#                      EXTRACCION FULL                       #
##############################################################



#Creamos una variable con la URL base que vamos a trabajar
base_url = "https://api.carbonintensity.org.uk/"



#Definimos el "endpoint" con el que vamos a trabajar y del cual tenemos que extraer la informacion, para hacerlo debemos suministrar
#los siguientes parametros:

##############################################################
#Segun la pagina web debemos suministar los siguientes datos:#
##############################################################

# GET /regional/intensity/{from}/fw48h/postcode/{postcode}
# Code samples

# # You can also use wget
# curl -X GET https://api.carbonintensity.org.uk/regional/intensity/{from}/fw48h/postcode/{postcode} \
#   -H 'Accept: application/json'

# Get Carbon Intensity data for next 48h for specified postcode

# Get Regional Carbon Intensity data for next 48h for specified postcode. All times provided in UTC (+00:00).



#Definimos una variable de inicio que tome el ultimo año
start_date_full = datetime.utcnow() - timedelta(days=365)

#Le damos formato para que tome la medicion del 1er dia de cada mes del año anterior
start_date_full = start_date_full.strftime("%Y-%m-01T00:00:00Z")


#Definimos el codigo postal como parametro
postcode = "RG10"

endpoint_regional = f"/regional/intensity/{start_date_full}/fw48h/postcode/{postcode}"

regional_json_data = get_data(base_url, endpoint_regional, data_field='data')

print(start_date_full)
regional_json_data

  start_date_full = datetime.utcnow() - timedelta(days=365)


2024-05-01T00:00:00Z


{'regionid': 12,
 'dnoregion': 'SSE South',
 'shortname': 'South England',
 'postcode': 'RG10',
 'data': [{'from': '2024-04-30T23:30Z',
   'to': '2024-05-01T00:00Z',
   'intensity': {'forecast': 156, 'index': 'moderate'},
   'generationmix': [{'fuel': 'biomass', 'perc': 7.7},
    {'fuel': 'coal', 'perc': 1.5},
    {'fuel': 'imports', 'perc': 52.7},
    {'fuel': 'gas', 'perc': 31.9},
    {'fuel': 'nuclear', 'perc': 0.7},
    {'fuel': 'other', 'perc': 0},
    {'fuel': 'hydro', 'perc': 0.1},
    {'fuel': 'solar', 'perc': 0},
    {'fuel': 'wind', 'perc': 5.3}]},
  {'from': '2024-05-01T00:00Z',
   'to': '2024-05-01T00:30Z',
   'intensity': {'forecast': 174, 'index': 'moderate'},
   'generationmix': [{'fuel': 'biomass', 'perc': 7.5},
    {'fuel': 'coal', 'perc': 1.1},
    {'fuel': 'imports', 'perc': 47.7},
    {'fuel': 'gas', 'perc': 37.7},
    {'fuel': 'nuclear', 'perc': 0.8},
    {'fuel': 'other', 'perc': 0},
    {'fuel': 'hydro', 'perc': 0.1},
    {'fuel': 'solar', 'perc': 0},
    {'fuel'

In [10]:
#Aplicamos pd.json_normalize para asignarle la estructura correspondiente al archivo
# Ver mas informacion de la pagina oficial de Pandas: https://pandas.pydata.org/docs/reference/api/pandas.json_normalize.html

regional_data_normalized = pd.json_normalize(regional_json_data['data'],
                                 record_path='generationmix',
                                 meta=['from', 'to', ['intensity', 'forecast'], ['intensity', 'index']],
                                 sep= '_')


#Obtenemos por separado la siguiente informacion del archivo .json y creamos la columnas correspondientes del DataFrame. En estos casos
#no es necesario aplicar json_normalize
regional_data_normalized['regionid'] = regional_json_data['regionid']
regional_data_normalized['dnoregion'] = regional_json_data['dnoregion']
regional_data_normalized['shortname'] = regional_json_data['shortname']
regional_data_normalized['postcode'] = regional_json_data['postcode']

regional_data_normalized.head()

Unnamed: 0,fuel,perc,from,to,intensity_forecast,intensity_index,regionid,dnoregion,shortname,postcode
0,biomass,7.7,2024-04-30T23:30Z,2024-05-01T00:00Z,156,moderate,12,SSE South,South England,RG10
1,coal,1.5,2024-04-30T23:30Z,2024-05-01T00:00Z,156,moderate,12,SSE South,South England,RG10
2,imports,52.7,2024-04-30T23:30Z,2024-05-01T00:00Z,156,moderate,12,SSE South,South England,RG10
3,gas,31.9,2024-04-30T23:30Z,2024-05-01T00:00Z,156,moderate,12,SSE South,South England,RG10
4,nuclear,0.7,2024-04-30T23:30Z,2024-05-01T00:00Z,156,moderate,12,SSE South,South England,RG10


In [11]:
#Informacion del tipo de dato de cada una de las columnas
regional_data_normalized.info()


# Crear una tabla o archivo Delta Lake vacío
DeltaTable.create(table_uri="datalakehouse/1_bronze/carbon_intensity_api/regional",
     # Columnas y tipos de datos
                  schema = pa.schema([
                      pa.field("fuel", pa.string()),
                      pa.field("perc", pa.float64()),
                      pa.field("from", pa.string()),
                      pa.field("to", pa.string()),
                      pa.field("intensity_forecast", pa.string()),
                      pa.field("intensity_index", pa.string()),
                      pa.field("regionid", pa.int64()),
                      pa.field("dnoregion", pa.string()),
                      pa.field("shortname", pa.string()),
                      pa.field("postcode", pa.string())
                      ]),
                  mode="ignore", #Si se selecciona 'ignore', no realizará ninguna acción si la tabla ya existe.
                  description="Tabla de datos crudos de tipos de combustibles",
                  # Configuraciones de la tabla
                  # Etiquedas o tags adicionales, para sumar metadata.
                  custom_metadata={
                      "source": "NESO" #National Energy System Operator
                  },
                  configuration={
                      "delta.deletedFileRetentionDuration": "interval 7 day",
                      "delta.logRetentionDuration": "interval 7 day",
                      "delta.enableChangeDataFeed": "true",
    }                  )

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 873 entries, 0 to 872
Data columns (total 10 columns):
 #   Column              Non-Null Count  Dtype  
---  ------              --------------  -----  
 0   fuel                873 non-null    object 
 1   perc                873 non-null    float64
 2   from                873 non-null    object 
 3   to                  873 non-null    object 
 4   intensity_forecast  873 non-null    object 
 5   intensity_index     873 non-null    object 
 6   regionid            873 non-null    int64  
 7   dnoregion           873 non-null    object 
 8   shortname           873 non-null    object 
 9   postcode            873 non-null    object 
dtypes: float64(1), int64(1), object(8)
memory usage: 68.3+ KB


DeltaTable()

In [12]:
# @title
from IPython.display import HTML

html_code = """
<div style="background-color: lavender; padding: 15px; border-radius: 5px; color: indigo;">
<strong>Utilizando la funcion "save_new_data_as_delta":</strong>
Guarda solo nuevos datos en formato Delta Lake usando la operación MERGE, comparando los datos ya cargados con los datos que se desean almacenar asegurando que no se guarden registros duplicados.<br>
<br>
<strong><i>Es importante destacar que merge, en la funcion "save_new_data_as_delta", opera sobre DeltaTable por lo tanto no modifica el DataFrame original</i></strong><br>
<br></div>
"""

display(HTML(html_code))

In [13]:
save_new_data_as_delta(
    regional_data_normalized,
    "datalakehouse/1_bronze/carbon_intensity_api/regional",
    """target.from = source.from
    AND target.intensity_forecast = source.intensity_forecast
    AND target.intensity_index = source.intensity_index""",
    partition_cols=None
)


# #Copiamos la ruta del archivo .parquet, que queremos consultar, con el boton derecho del mouse y le aplicamos "pd.read_parquet()"
# info_parquet =pd.read_parquet("/content/datalakehouse/1_bronze/carbon_intensity_api/regional/part-00001-f26adcf4-0afa-476f-bebb-6a9ebc8d5138-c000.snappy.parquet", engine="auto")

# #Luego obtenemos la informacion relacionada al mismo
# info_parquet.info()

In [14]:
#Accedemos a la tabla almacenada
regional_table = DeltaTable("datalakehouse/1_bronze/carbon_intensity_api/regional")

#La convertimos a DataFrame
regional_table_normalized = regional_table.to_pandas()


#Accedemos a la tabla para conseguir informacion relacionada a cada campo en formato .json
json_schema = regional_table.schema().to_json() # resultado: str
json_schema = json.loads(json_schema) # resultado: Dict
json_schema["fields"]

#Aplicamos pd.json_normalize para asignarle la estructura correspondiente al archivo
pd.json_normalize(json_schema, record_path="fields")

Unnamed: 0,name,type,nullable
0,fuel,string,True
1,perc,double,True
2,from,string,True
3,to,string,True
4,intensity_forecast,string,True
5,intensity_index,string,True
6,regionid,long,True
7,dnoregion,string,True
8,shortname,string,True
9,postcode,string,True


In [15]:
regional_table_normalized

Unnamed: 0,fuel,perc,from,to,intensity_forecast,intensity_index,regionid,dnoregion,shortname,postcode
0,biomass,7.7,2024-04-30T23:30Z,2024-05-01T00:00Z,156,moderate,12,SSE South,South England,RG10
1,coal,1.5,2024-04-30T23:30Z,2024-05-01T00:00Z,156,moderate,12,SSE South,South England,RG10
2,imports,52.7,2024-04-30T23:30Z,2024-05-01T00:00Z,156,moderate,12,SSE South,South England,RG10
3,gas,31.9,2024-04-30T23:30Z,2024-05-01T00:00Z,156,moderate,12,SSE South,South England,RG10
4,nuclear,0.7,2024-04-30T23:30Z,2024-05-01T00:00Z,156,moderate,12,SSE South,South England,RG10
...,...,...,...,...,...,...,...,...,...,...
868,nuclear,1.5,2024-05-02T23:00Z,2024-05-02T23:30Z,99,low,12,SSE South,South England,RG10
869,other,0.0,2024-05-02T23:00Z,2024-05-02T23:30Z,99,low,12,SSE South,South England,RG10
870,hydro,0.1,2024-05-02T23:00Z,2024-05-02T23:30Z,99,low,12,SSE South,South England,RG10
871,solar,0.1,2024-05-02T23:00Z,2024-05-02T23:30Z,99,low,12,SSE South,South England,RG10


In [16]:
# @title
from IPython.display import HTML

html_code = """
<div style="background-color: lavender; padding: 15px; border-radius: 5px; color: indigo;">
<strong>Extraccion incremental (o delta):</strong> consiste en recolectar actualizaciones de la fuente de datos, ya sea por la
inserción de nuevos registros o la modificación de los existentes. En lugar de
extraer todos los datos, se realizan consultas que seleccionan solo los datos
nuevos o modificados desde la última extracción.
El rastreo de cambios en esta técnica es posible si la fuente de datos cuenta
con algún campo de identificación o con alguna marca de tiempo, como por ejemplo la fecha y la hora.</div>
"""

display(HTML(html_code))

In [17]:
##############################################################
#              EXTRACCION INCREMENTAL STATELESS              #
##############################################################



#Obtener mediciones de la última hora

endpoint_time = "intensity"

start_date = datetime.utcnow() - timedelta(hours=1)

# Formatear objetos fecha a string, y definir inicio y fin
end_date = start_date.strftime("%Y-%m-%dT%H:59:59Z")
start_date = start_date.strftime("%Y-%m-%dT%H:00:00Z")

endpoint_time = f"{endpoint_time}/{start_date}/{end_date}"

time = get_data(base_url, endpoint_time, "data")
time_df = build_table(time)

# Renombrar las columnas
time_df = time_df.rename(columns={'from': 'fecha_desde', 'to': 'fecha_hasta',
                                                    'intensity.forecast': 'intensity_forecast',
                                                    'intensity.actual': 'intensity_actual'})


print(f"fecha desde: {start_date}")
print(f"fecha hasta: {end_date}")


time_df

  start_date = datetime.utcnow() - timedelta(hours=1)


fecha desde: 2025-05-11T19:00:00Z
fecha hasta: 2025-05-11T19:59:59Z


Unnamed: 0,fecha_desde,fecha_hasta,intensity_forecast,intensity_actual,intensity.index
0,2025-05-11T18:30Z,2025-05-11T19:00Z,119,111,moderate
1,2025-05-11T19:00Z,2025-05-11T19:30Z,116,113,moderate


In [18]:
# Antes de hacer el procesamiento
# Vamos a inspeccionar el DataFrame
# La siguiente libreria nos da info como la cantidad de nulos, duplicados, etc.
profile = ProfileReport(regional_table_normalized)
profile

Summarize dataset:   0%|          | 0/5 [00:00<?, ?it/s]


  0%|          | 0/10 [00:00<?, ?it/s][A
100%|██████████| 10/10 [00:00<00:00, 63.04it/s][A


Generate report structure:   0%|          | 0/1 [00:00<?, ?it/s]

Render HTML:   0%|          | 0/1 [00:00<?, ?it/s]



In [19]:
#Creamos nuestro directorio haciendo referencia a la capa Bronze sobre la cual iniciamos nuestra carga
bronze_dir = "datalakehouse/1_bronze/carbon_intensity_api"


#Separamos la fecha y la hora para almacenarlas en carpetas diferentes. De esta forma tendremos un detalle pormenorizado de las
#modificaciones que se produzcan cada hora.

time_df["fecha_desde"] = pd.to_datetime(time_df.fecha_desde)
time_df["fecha"] = time_df.fecha_desde.dt.date
# time_df["hora"] = time_df.fecha_desde.dt.hour (dt.hour toma la hora en formato int y no coincide con el formato str de strftime("%H"))
time_df["hora"] = time_df.fecha_desde.dt.strftime("%H")


time_raw_dir = f"{bronze_dir}/intensity/time"


#A continuación se ve la aplicación de la operación MERGE, donde se insertan los datos.

#Si al comparar actual_data y new_data hay registros con el mismo predicado,
#podemos omitir las coincidencias y solo insertar los registros que no coincidan.

#Si no hay registros con la misma fecha_desde / intensity_forecast / intensity_actual
#se insertará el registro de new_data.

save_new_data_as_delta(
    time_df,
    time_raw_dir,
    """target.fecha_desde = source.fecha_desde
    AND target.intensity_forecast = source.intensity_forecast
    AND target.intensity_actual = source.intensity_actual""",
    partition_cols=["fecha", "hora"]
    )

In [20]:
# @title
from IPython.display import HTML

html_code = """
<div style="background-color: lightpink; padding: 15px; border-radius: 5px; color: darkred;">
<strong><i>Módulo Nº2 - Unidad Nº 1</i></strong><br>
<br>
Procesamiento de datos</div>
"""

display(HTML(html_code))

In [21]:
# @title
from IPython.display import HTML

html_code = """


<div style="background-color: white; padding: 15px; border-radius: 5px; color: black;">
<strong>Los Data Lakes se dividen típicamente en zonas o capas para organizar y gestionar los datos de manera eficiente. Estas zonas representan diferentes niveles de procesamiento y
gobernanza sobre los datos almacenados.</strong></div><br>
<br>

<div style="background-color: peachpuff; padding: 15px; border-radius: 5px; color: black;">
<strong><i>1. Bronze:</i></strong> también conocida como “Landing” o “raw data”, es la primera
capa en un Data Lake. Aquí es donde los datos aterrizan desde diversas
fuentes, por medio de pipelines, sin realizar transformaciones significativas.
Los datos se almacenan en su forma bruta, preservando su integridad
original. En esta capa, se pueden incluir datos estructurados, semiestructurados y no estructurados, por eso los formatos a usar aquí pueden
variar.</div><br>
<br>

<div style="background-color: #f0f0f0; padding: 15px; border-radius: 5px; color: black;">
<strong>2. Silver:</strong>
 es donde los datos crudos se transforman y preparan para un uso
más amplio. Aquí se aplican procesos de limpieza, normalización, de
duplicación, validación y otras transformaciones para mejorar la calidad
y la estructura de los datos. Esta capa tiene como objetivo ofrecer datos
9
de calidad, más estructurados y listos para los equipos de análisis y ciencia
de datos.</div><br>
<br>

<div style="background-color: gold; padding: 15px; border-radius: 5px; color: black;">
<strong>3. Gold:</strong>
 es la capa final en un Data Lake, donde se ofrecen datos
enriquecidos y de valor para la organización. Aquí se aplican reglas de
negocio y se realizan agregaciones, cálculos, agrupaciones, transformaciones más
avanzadas y se optimizan los datos para casos de uso específicos.</div><br>
<br>
"""

display(HTML(html_code))

In [22]:
#Trabajamos con la extraccion full


#Excluimos del DataFrame las filas cuyo valor sea igual a cero en la columna "percentage", ya que no aportan informacion a nuestro analisis
regional_table_normalized = regional_table_normalized[regional_table_normalized['perc'] != 0]

regional_table_normalized.head()

Unnamed: 0,fuel,perc,from,to,intensity_forecast,intensity_index,regionid,dnoregion,shortname,postcode
0,biomass,7.7,2024-04-30T23:30Z,2024-05-01T00:00Z,156,moderate,12,SSE South,South England,RG10
1,coal,1.5,2024-04-30T23:30Z,2024-05-01T00:00Z,156,moderate,12,SSE South,South England,RG10
2,imports,52.7,2024-04-30T23:30Z,2024-05-01T00:00Z,156,moderate,12,SSE South,South England,RG10
3,gas,31.9,2024-04-30T23:30Z,2024-05-01T00:00Z,156,moderate,12,SSE South,South England,RG10
4,nuclear,0.7,2024-04-30T23:30Z,2024-05-01T00:00Z,156,moderate,12,SSE South,South England,RG10


In [23]:
#Realizamos una copia del DataFrame "regional_table_normalized"
regional_table_cast = regional_table_normalized.copy()

#Convertimos a formato datetime las columnas 'from' y 'to'
regional_table_cast['from'] = pd.to_datetime(regional_table_cast['from'], format='mixed')
regional_table_cast['to'] = pd.to_datetime(regional_table_cast['to'], format='mixed')

regional_table_cast.head()

Unnamed: 0,fuel,perc,from,to,intensity_forecast,intensity_index,regionid,dnoregion,shortname,postcode
0,biomass,7.7,2024-04-30 23:30:00+00:00,2024-05-01 00:00:00+00:00,156,moderate,12,SSE South,South England,RG10
1,coal,1.5,2024-04-30 23:30:00+00:00,2024-05-01 00:00:00+00:00,156,moderate,12,SSE South,South England,RG10
2,imports,52.7,2024-04-30 23:30:00+00:00,2024-05-01 00:00:00+00:00,156,moderate,12,SSE South,South England,RG10
3,gas,31.9,2024-04-30 23:30:00+00:00,2024-05-01 00:00:00+00:00,156,moderate,12,SSE South,South England,RG10
4,nuclear,0.7,2024-04-30 23:30:00+00:00,2024-05-01 00:00:00+00:00,156,moderate,12,SSE South,South England,RG10


In [24]:
#Casteo
#La opcion "category" aplica para string y enteros
conversion_mapping = {
    "regionid" : "category",
    "dnoregion" : "category",
    "shortname" : "category",
    "postcode" : "category",
    "from" : "datetime64[ns]",
    "to" : "datetime64[ns]",
    "intensity_forecast" : "int16",
    "intensity_index" : "category",
    "fuel" : "category",
    "perc" : "float64"
}

# Convierte 'from' y 'to' a columnas timezone-naive
regional_table_cast['from'] = regional_table_cast['from'].dt.tz_localize(None)
regional_table_cast['to'] = regional_table_cast['to'].dt.tz_localize(None)

regional_table_cast = regional_table_cast.astype(conversion_mapping)

regional_table_cast.info()
#A partir de aplicar el casteo obtenemos una reduccion del 44% en la utilizacion de memoria

<class 'pandas.core.frame.DataFrame'>
Index: 675 entries, 0 to 872
Data columns (total 10 columns):
 #   Column              Non-Null Count  Dtype         
---  ------              --------------  -----         
 0   fuel                675 non-null    category      
 1   perc                675 non-null    float64       
 2   from                675 non-null    datetime64[ns]
 3   to                  675 non-null    datetime64[ns]
 4   intensity_forecast  675 non-null    int16         
 5   intensity_index     675 non-null    category      
 6   regionid            675 non-null    category      
 7   dnoregion           675 non-null    category      
 8   shortname           675 non-null    category      
 9   postcode            675 non-null    category      
dtypes: category(6), datetime64[ns](2), float64(1), int16(1)
memory usage: 27.3 KB


In [25]:
#Aplicamos formato a las columnas de fecha para obtener el dia-mes-año
regional_table_cast['from'] = regional_table_cast['from'].dt.strftime("%d-%m-%Y")
regional_table_cast['to'] = regional_table_cast['to'].dt.strftime("%d-%m-%Y")


#Renombramos las columnas
regional_table_cast = regional_table_cast.rename(columns={
    'shortname': 'nombre_corto',
    'postcode': 'codigo_postal',
    'from': 'fecha_desde',
    'to': 'fecha_hasta',
    'intensity_forecast': 'pronostico_intensidad',
    'intensity_index': 'indice_intensidad',
    'fuel': 'combustible',
    'perc': 'porcentaje'
    })

regional_table_cast.head()

Unnamed: 0,combustible,porcentaje,fecha_desde,fecha_hasta,pronostico_intensidad,indice_intensidad,regionid,dnoregion,nombre_corto,codigo_postal
0,biomass,7.7,30-04-2024,01-05-2024,156,moderate,12,SSE South,South England,RG10
1,coal,1.5,30-04-2024,01-05-2024,156,moderate,12,SSE South,South England,RG10
2,imports,52.7,30-04-2024,01-05-2024,156,moderate,12,SSE South,South England,RG10
3,gas,31.9,30-04-2024,01-05-2024,156,moderate,12,SSE South,South England,RG10
4,nuclear,0.7,30-04-2024,01-05-2024,156,moderate,12,SSE South,South England,RG10


In [26]:
# Guardar en Silver
save_new_data_as_delta(
    regional_table_cast,
    "datalakehouse/2_silver/carbon_intensity_api/regional",
    """target.fecha_desde = source.fecha_desde
    AND target.porcentaje = source.porcentaje""",
    partition_cols=None
)

In [27]:
#Accedemos a la tabla almacenada en Silver
regional_dt_silver = DeltaTable("datalakehouse/2_silver/carbon_intensity_api/regional")

#La convertimos a DataFrame
regional_normalized_silver = regional_dt_silver.to_pandas()


#Ordenamos las compañias que segun su cantidad (de mayor a menor)
# Conservamos el original "regional_table_cast" con todas las columnas
original_table_cast = regional_normalized_silver.copy()

# Subset para grouping y aggregation
table_cast_subset = original_table_cast[["combustible", "porcentaje", "fecha_desde"]]

regional_gold_cleaned = table_cast_subset.groupby(['combustible', 'fecha_desde'], observed=False).sum().sort_values(by = 'fecha_desde', ascending=False)

regional_gold_cleaned.reset_index(inplace = True)

regional_gold_cleaned

Unnamed: 0,combustible,fecha_desde,porcentaje
0,hydro,30-04-2024,0.1
1,coal,30-04-2024,1.5
2,nuclear,30-04-2024,0.7
3,imports,30-04-2024,52.7
4,gas,30-04-2024,31.9
5,wind,30-04-2024,5.3
6,biomass,30-04-2024,7.7
7,coal,02-05-2024,42.2
8,wind,02-05-2024,622.1
9,hydro,02-05-2024,16.0


In [28]:
# Guardar en Gold
save_new_data_as_delta(
    regional_gold_cleaned,
    "datalakehouse/3_gold/carbon_intensity_api/regional",
    """target.fecha_desde = source.fecha_desde
    AND target.porcentaje = source.porcentaje""",
    partition_cols=None
)

In [29]:
#Trabajamos con la extraccion incremental


#Leemos la ultima particion de la extraccion incremental
time_df_bronze = read_most_recent_partition(time_raw_dir)


#Renombramos las columnas en forma dinamica. Reemplazara el "." por el "_" en aquellas columnas que lo tengan
time_df_bronze = time_df_bronze.rename(
    columns=lambda col: col.replace(".", "_")
)

time_df_bronze

  requested_date = datetime.utcnow() - timedelta(hours=1)


Unnamed: 0,fecha_desde,fecha_hasta,intensity_forecast,intensity_actual,intensity_index,fecha,hora
0,2025-05-11 19:00:00+00:00,2025-05-11T19:30Z,116,113,moderate,2025-05-11,19


In [30]:
#Completamos el DataFrame con mas datos y nuevas columnas
df1 = pd.DataFrame({'fuel': ['gas', 'coal', 'biomass', 'nuclear', 'hydro', 'imports', 'other', 'wind', 'solar'],
                    'perc': [43.6, 0.7, 4.2, 17.6, 2.2, 6.5, 0.3, 6.8, 18.1]})

df1['regionid'] = 3
df1['dnoregion'] = "Electricity North West"

# Unión por columnas
time_df_silver = pd.concat([df1, time_df_bronze], axis=1)

#Eliminamos la columna 'fecha_hasta'
time_df_silver = time_df_silver.drop(columns=['fecha_hasta'])

time_df_silver

Unnamed: 0,fuel,perc,regionid,dnoregion,fecha_desde,intensity_forecast,intensity_actual,intensity_index,fecha,hora
0,gas,43.6,3,Electricity North West,2025-05-11 19:00:00+00:00,116.0,113.0,moderate,2025-05-11,19.0
1,coal,0.7,3,Electricity North West,NaT,,,,,
2,biomass,4.2,3,Electricity North West,NaT,,,,,
3,nuclear,17.6,3,Electricity North West,NaT,,,,,
4,hydro,2.2,3,Electricity North West,NaT,,,,,
5,imports,6.5,3,Electricity North West,NaT,,,,,
6,other,0.3,3,Electricity North West,NaT,,,,,
7,wind,6.8,3,Electricity North West,NaT,,,,,
8,solar,18.1,3,Electricity North West,NaT,,,,,


In [31]:
#Completamos los valores NaN con el correspondiente dato de la primera fila de cada columna
imputation_mapping = {
    #Obtenemos el valor de la primera fila de cada columna
    'fecha_desde': time_df_silver.loc[0, "fecha_desde"],
    'intensity_forecast': time_df_silver.loc[0, "intensity_forecast"],
    'intensity_actual': time_df_silver.loc[0, "intensity_actual"],
    'intensity_index': time_df_silver.loc[0, "intensity_index"],
    'fecha': time_df_silver.loc[0, "fecha"],
    'hora':	time_df_silver.loc[0, "hora"]
}

time_df_silver = time_df_silver.fillna(imputation_mapping)

time_df_silver

Unnamed: 0,fuel,perc,regionid,dnoregion,fecha_desde,intensity_forecast,intensity_actual,intensity_index,fecha,hora
0,gas,43.6,3,Electricity North West,2025-05-11 19:00:00+00:00,116.0,113.0,moderate,2025-05-11,19
1,coal,0.7,3,Electricity North West,2025-05-11 19:00:00+00:00,116.0,113.0,moderate,2025-05-11,19
2,biomass,4.2,3,Electricity North West,2025-05-11 19:00:00+00:00,116.0,113.0,moderate,2025-05-11,19
3,nuclear,17.6,3,Electricity North West,2025-05-11 19:00:00+00:00,116.0,113.0,moderate,2025-05-11,19
4,hydro,2.2,3,Electricity North West,2025-05-11 19:00:00+00:00,116.0,113.0,moderate,2025-05-11,19
5,imports,6.5,3,Electricity North West,2025-05-11 19:00:00+00:00,116.0,113.0,moderate,2025-05-11,19
6,other,0.3,3,Electricity North West,2025-05-11 19:00:00+00:00,116.0,113.0,moderate,2025-05-11,19
7,wind,6.8,3,Electricity North West,2025-05-11 19:00:00+00:00,116.0,113.0,moderate,2025-05-11,19
8,solar,18.1,3,Electricity North West,2025-05-11 19:00:00+00:00,116.0,113.0,moderate,2025-05-11,19


In [32]:
# Guardar en Silver

#A continuación se ve la aplicación de la operación MERGE, donde se insertan los datos.

#Si al comparar actual_data y new_data hay registros con el mismo predicado,
#podemos omitir las coincidencias y solo insertar los registros que no coincidan.

#Si no hay registros con la misma fecha_desde / intensity_forecast / intensity_actual
#se INSERTARA el registro de new_data.

measurements_silver_dir = 'datalakehouse/2_silver/carbon_intensity_api/intensity/time'
save_new_data_as_delta(
    time_df_silver,
    measurements_silver_dir,
    """target.fecha_desde = source.fecha_desde
    AND target.intensity_forecast = source.intensity_forecast
    AND target.intensity_actual = source.intensity_actual""",
    partition_cols=["fecha", "hora"]
)

In [33]:
#Accedemos a la tabla almacenada en Silver
time_dt_silver = DeltaTable("datalakehouse/2_silver/carbon_intensity_api/intensity/time")

#La convertimos a DataFrame
time_normalized_silver = time_dt_silver.to_pandas()


#Ingresamos formulas al procesamiento para obtener mas informacion

#Calculamos la diferencia entre 'intensity_actual' e 'intensity_forecast'
time_normalized_silver['diff'] =  time_normalized_silver['intensity_actual'] - time_normalized_silver['intensity_forecast']

#Establecemos una formula que determine si la diferencia entre 'intensity_actual' e 'intensity_forecast' subio, bajo o se mantuvo igual
time_normalized_silver['trend'] = time_normalized_silver.apply(lambda row: 'up' if row['intensity_actual'] - row['intensity_forecast'] > 0
                        else 'down' if row['intensity_actual'] - row['intensity_forecast'] < 0
                        else 'equal', axis=1)


time_normalized_silver

Unnamed: 0,fuel,perc,regionid,dnoregion,fecha_desde,intensity_forecast,intensity_actual,intensity_index,fecha,hora,diff,trend
0,gas,43.6,3,Electricity North West,2025-05-11 19:00:00+00:00,116.0,113.0,moderate,2025-05-11,19,-3.0,down
1,coal,0.7,3,Electricity North West,2025-05-11 19:00:00+00:00,116.0,113.0,moderate,2025-05-11,19,-3.0,down
2,biomass,4.2,3,Electricity North West,2025-05-11 19:00:00+00:00,116.0,113.0,moderate,2025-05-11,19,-3.0,down
3,nuclear,17.6,3,Electricity North West,2025-05-11 19:00:00+00:00,116.0,113.0,moderate,2025-05-11,19,-3.0,down
4,hydro,2.2,3,Electricity North West,2025-05-11 19:00:00+00:00,116.0,113.0,moderate,2025-05-11,19,-3.0,down
5,imports,6.5,3,Electricity North West,2025-05-11 19:00:00+00:00,116.0,113.0,moderate,2025-05-11,19,-3.0,down
6,other,0.3,3,Electricity North West,2025-05-11 19:00:00+00:00,116.0,113.0,moderate,2025-05-11,19,-3.0,down
7,wind,6.8,3,Electricity North West,2025-05-11 19:00:00+00:00,116.0,113.0,moderate,2025-05-11,19,-3.0,down
8,solar,18.1,3,Electricity North West,2025-05-11 19:00:00+00:00,116.0,113.0,moderate,2025-05-11,19,-3.0,down


In [34]:
#Creamos un DataFrame que contenga el ID correspondiente a cada tipo de combustible
df_id = pd.DataFrame({'fuel': ['gas', 'coal', 'biomass', 'nuclear', 'hydro', 'imports', 'other', 'wind', 'solar'],
                    'id_num': [1, 2, 3, 4, 5, 6, 7, 8, 9]})


#Completamos el DataFrame "time_normalized_silver" con el ID de los combustibles utilizando left_join
time_df_silver = pd.merge(time_normalized_silver, df_id, on='fuel', how='left')


#Reordenamos las columnas para facilitar la lectura del DF y quitamos la que no utilizamos
time_df_silver = time_df_silver.reindex(columns=['id_num', 'fuel', 'perc', 'regionid', 'fecha_desde',
       'intensity_forecast', 'intensity_actual', 'diff', 'trend', 'intensity_index', 'fecha',
       'hora'])

time_df_silver

Unnamed: 0,id_num,fuel,perc,regionid,fecha_desde,intensity_forecast,intensity_actual,diff,trend,intensity_index,fecha,hora
0,1,gas,43.6,3,2025-05-11 19:00:00+00:00,116.0,113.0,-3.0,down,moderate,2025-05-11,19
1,2,coal,0.7,3,2025-05-11 19:00:00+00:00,116.0,113.0,-3.0,down,moderate,2025-05-11,19
2,3,biomass,4.2,3,2025-05-11 19:00:00+00:00,116.0,113.0,-3.0,down,moderate,2025-05-11,19
3,4,nuclear,17.6,3,2025-05-11 19:00:00+00:00,116.0,113.0,-3.0,down,moderate,2025-05-11,19
4,5,hydro,2.2,3,2025-05-11 19:00:00+00:00,116.0,113.0,-3.0,down,moderate,2025-05-11,19
5,6,imports,6.5,3,2025-05-11 19:00:00+00:00,116.0,113.0,-3.0,down,moderate,2025-05-11,19
6,7,other,0.3,3,2025-05-11 19:00:00+00:00,116.0,113.0,-3.0,down,moderate,2025-05-11,19
7,8,wind,6.8,3,2025-05-11 19:00:00+00:00,116.0,113.0,-3.0,down,moderate,2025-05-11,19
8,9,solar,18.1,3,2025-05-11 19:00:00+00:00,116.0,113.0,-3.0,down,moderate,2025-05-11,19


In [35]:
# Casteo
type_mapping = {
    "id_num": "int16",
    "fuel": "category",
    "regionid": "category",
    "intensity_forecast": "int16",
    "intensity_actual": "int16",
    "diff": "int16",
    "trend": "category",
    "intensity_index": "category"
    }

time_df_silver = time_df_silver.astype(type_mapping)

#time_df_silver.info()

time_df_silver.head()

Unnamed: 0,id_num,fuel,perc,regionid,fecha_desde,intensity_forecast,intensity_actual,diff,trend,intensity_index,fecha,hora
0,1,gas,43.6,3,2025-05-11 19:00:00+00:00,116,113,-3,down,moderate,2025-05-11,19
1,2,coal,0.7,3,2025-05-11 19:00:00+00:00,116,113,-3,down,moderate,2025-05-11,19
2,3,biomass,4.2,3,2025-05-11 19:00:00+00:00,116,113,-3,down,moderate,2025-05-11,19
3,4,nuclear,17.6,3,2025-05-11 19:00:00+00:00,116,113,-3,down,moderate,2025-05-11,19
4,5,hydro,2.2,3,2025-05-11 19:00:00+00:00,116,113,-3,down,moderate,2025-05-11,19


In [36]:
# Guardar en Gold

#A continuación se ve la aplicación de la operación MERGE, donde se insertan los datos.

#Si al comparar actual_data y new_data hay registros con el mismo predicado,
#podemos omitir las coincidencias y solo insertar los registros que no coincidan.

#Si no hay registros con la misma fecha_desde / intensity_forecast / intensity_actual
#se INSERTARA el registro de new_data.

measurements_gold_dir = 'datalakehouse/3_gold/carbon_intensity_api/intensity/time'
save_new_data_as_delta(
    time_df_silver,
    measurements_gold_dir,
    """target.fecha_desde = source.fecha_desde
    AND target.intensity_forecast = source.intensity_forecast
    AND target.intensity_actual = source.intensity_actual""",
    partition_cols=["fecha", "hora"]
)


#Accedemos a la tabla almacenada
gold_table_intensity = DeltaTable("datalakehouse/3_gold/carbon_intensity_api/intensity/time")

#La convertimos a DataFrame
gold_table_intensity = gold_table_intensity.to_pandas()

In [37]:
# @title
from IPython.display import HTML

html_code = """
<div style="background-color: lavender; padding: 15px; border-radius: 5px; color: indigo;">
<strong><i>Conclusión:</i></strong><br>
<br>
Delta Lake es una tecnología que permite almacenar datos de manera tabular y columnar, con la capacidad de realizar operaciones de lectura y escritura de datos de manera
 transaccional, lo que garantiza la integridad de los datos almacenados.
Si bien realizamos operacion de lectura y escritura, además de agregar restricciones, es posible también por ejemplo realizar operaciones de eliminación de datos, consultar
 versiones pasadas de los datos, etc.
Las tabla delta lake permiten agregar metadatos relevantes, como autor de los datos, dominio al que pertenecen, descripción, etc. con el fin de catalogarlos y facilitar
 su gestión con otras herramientas.
Es una tecnología potente y versátil, que ofrece muchas posibilidades para trabajar con datos de manera eficiente.</div>
"""

display(HTML(html_code))