# Facundo Aranda TP

## Carbon Intensity API(https://api.carbonintensity.org.uk/) 

Esta API contiene datos sobre emisiones de carbono del Reino Unido, los cuales vamos a extraer, procesar y almacenar en un DeltaLake.

## Librerias

### Instalacion de librerias

Primero vamos a instalar las librerias necesarias.

In [None]:
%pip install requests
%pip install deltalake
%pip install pyarrow




[notice] A new release of pip is available: 24.2 -> 24.3.1
[notice] To update, run: python.exe -m pip install --upgrade pip



Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 24.2 -> 24.3.1
[notice] To update, run: python.exe -m pip install --upgrade pip


Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 24.2 -> 24.3.1
[notice] To update, run: python.exe -m pip install --upgrade pip


Note: you may need to restart the kernel to use updated packages.


ERROR: Ignored the following versions that require a different python version: 4.0.0 Requires-Python >=3.7, <3.11; 4.1.0 Requires-Python >=3.7, <3.12; 4.1.1 Requires-Python >=3.7, <3.12; 4.1.2 Requires-Python >=3.7, <3.12; 4.10.0 Requires-Python <3.13,>=3.7; 4.11.0 Requires-Python <3.13,>=3.7; 4.12.0 Requires-Python <3.13,>=3.7; 4.2.0 Requires-Python >=3.7, <3.12; 4.3.0 Requires-Python >=3.7, <3.12; 4.3.1 Requires-Python >=3.7, <3.12; 4.3.2 Requires-Python >=3.7, <3.12; 4.4.0 Requires-Python >=3.7, <3.12; 4.5.0 Requires-Python >=3.7, <3.12; 4.5.1 Requires-Python >=3.7, <3.12; 4.6.0 Requires-Python >=3.7, <3.12; 4.6.1 Requires-Python >=3.7, <3.12; 4.6.2 Requires-Python >=3.7, <3.12; 4.6.3 Requires-Python >=3.7, <3.12; 4.6.4 Requires-Python >=3.7, <3.12; 4.6.5 Requires-Python >=3.7, <3.12; 4.7.0 Requires-Python >=3.7, <3.13; 4.8.3 Requires-Python <3.13,>=3.7; 4.9.0 Requires-Python <3.13,>=3.7
ERROR: Could not find a version that satisfies the requirement ydata-profiling (from versions: n

### Importación de librerias

Y ahora vamos a importar lo necesario de las mismas.

In [None]:
import requests
import pandas as pd
import pyarrow as pa
from deltalake import write_deltalake, DeltaTable
from deltalake.exceptions import TableNotFoundError
from datetime import datetime, timedelta


## Funciones

Las siguientes son las funciones que vamos a necesitar para el procedimiento.

In [12]:
def get_data(base_url, endpoint, data_field=None, params=None, headers=None):
    """
    Realiza una solicitud GET a una API para obtener datos.

    Parámetros:
    base_url (str): La URL base de la API.
    endpoint (str): El endpoint de la API al que se realizará la solicitud.
    data_field (str): Atribudo del json de respuesta donde estará la lista
    de objetos con los datos que requerimos
    params (dict): Parámetros de consulta para enviar con la solicitud.
    headers (dict): Encabezados para enviar con la solicitud.

    Retorna:
    dict: Los datos obtenidos de la API en formato JSON.
    """
    try:
        endpoint_url = f"{base_url}/{endpoint}"
        response = requests.get(endpoint_url, params=params, headers=headers)
        response.raise_for_status()  # Levanta una excepción si hay un error en la respuesta HTTP.

        # Verificar si los datos están en formato JSON.
        try:
            data = response.json()
            if data_field:
              data = data[data_field]
        except:
            print("El formato de respuesta no es el esperado")
            return None
        return data

    except requests.exceptions.RequestException as e:
        # Capturar cualquier error de solicitud, como errores HTTP.
        print(f"La petición ha fallado. Código de error : {e}")
        return None

def build_table(json_data, record_path=None):
    """
    Construye un DataFrame de pandas a partir de datos en formato JSON.

    Parámetros:
    json_data (dict): Los datos en formato JSON obtenidos de una API.

    Retorna:
    DataFrame: Un DataFrame de pandas que contiene los datos.
    """
    try:
        df = pd.json_normalize(
            json_data,
            record_path)
        return df
    except:
        print("Los datos no están en el formato esperado")
        return None

def save_data_as_delta(df, path, mode="overwrite", partition_cols=None):
    """
    Guarda un dataframe en formato Delta Lake en la ruta especificada.
    A su vez, es capaz de particionar el dataframe por una o varias columnas.
    Por defecto, el modo de guardado es "overwrite".

    Args:
      df (pd.DataFrame): El dataframe a guardar.
      path (str): La ruta donde se guardará el dataframe en formato Delta Lake.
      mode (str): El modo de guardado. Son los modos que soporta la libreria
      deltalake: "overwrite", "append", "error", "ignore".
      partition_cols (list or str): La/s columna/s por las que se particionará el
      dataframe. Si no se especifica, no se particionará.
    """
    write_deltalake(
        path, df, mode=mode, partition_by=partition_cols
    )

def save_new_data_as_delta(new_data, data_path, predicate, partition_cols=None):
    """
    Guarda solo nuevos datos en formato Delta Lake usando la operación MERGE,
    comparando los datos ya cargados con los datos que se desean almacenar
    asegurando que no se guarden registros duplicados.

    Args:
      new_data (pd.DataFrame): Los datos que se desean guardar.
      data_path (str): La ruta donde se guardará el dataframe en formato Delta Lake.
      predicate (str): La condición de predicado para la operación MERGE.
    """

    try:
      dt = DeltaTable(data_path)
      new_data_pa = pa.Table.from_pandas(new_data)
      # Se insertan en target, datos de source que no existen en target
      dt.merge(
          source=new_data_pa,
          source_alias="source",
          target_alias="target",
          predicate=predicate
      ) \
      .when_not_matched_insert_all() \
      .execute()

    # Si no existe la tabla Delta Lake, se guarda como nueva
    except TableNotFoundError:
      save_data_as_delta(new_data, data_path, partition_cols=partition_cols)

def upsert_data_as_delta(data, data_path, predicate):
    """
    Guardar datos en formato Delta Lake usando la operacion MERGE.
    Cuando no haya registros coincidentes, se insertarán nuevos registros.
    Cuando haya registros coincidentes, se actualizarán los campos.

    Args:
      data (pd.DataFrame): Los datos que se desean guardar.
      data_path (str): La ruta donde se guardará el dataframe en formato Delta Lake.
      predicate (str): La condición de predicado para la operación MERGE.
    """
    try:
        dt = DeltaTable(data_path)
        data_pa = pa.Table.from_pandas(data)
        dt.merge(
            source=data_pa,
            source_alias="source",
            target_alias="target",
            predicate=predicate
        ) \
        .when_matched_update_all() \
        .when_not_matched_insert_all() \
        .execute()
    except TableNotFoundError:
        save_data_as_delta(data, data_path)

## Seteo de variables

Vamos a establecer ciertas variables fijas que se utilizaran en todo el programa.

### URL Base

Ahora vamos a establecer la URL base de la API.

In [13]:
base_url = "https://api.carbonintensity.org.uk"

### Header de la API

In [14]:
headers = {
  'Accept': 'application/json'
}

### Directorio de guardado general

In [None]:
bronze_dir = "datalake/bronze/carbonintensity_api"
silver_dir = "datalake/silver/carbonintensity_api"
gold_dir = "datalake/gold/carbonintensity_api"

# Facundo Aranda TP 1

## Extracción Full

Vamos a realizar una extracción full para obtener los ids y los nombres de los operadores de distribución de energia de todas las regiones, con los nombres de esas regiones(pseudonimos), de la que brinda datos la api. 

Vamos a hacer extracción full debido a que son pocos datos y no suelen cambiar, además, vamos a agregar constrains para que el id no pueda ser igual o menor a 0 y para que los nombres y los pseudonimos no puedan estar vacios.

En este caso no vamos a particionar ya que me quedaria un archivo por particion, seria innecesario. 

### Obtener datos de todas las regiones

In [16]:
# Obtener todas las regiones
endpoint = "regional"

regions = get_data(base_url, endpoint, data_field="data", params={}, headers=headers)
regions

[{'from': '2024-11-23T04:00Z',
  'to': '2024-11-23T04:30Z',
  'regions': [{'regionid': 1,
    'dnoregion': 'Scottish Hydro Electric Power Distribution',
    'shortname': 'North Scotland',
    'intensity': {'forecast': 14, 'index': 'very low'},
    'generationmix': [{'fuel': 'biomass', 'perc': 0},
     {'fuel': 'coal', 'perc': 0},
     {'fuel': 'imports', 'perc': 0},
     {'fuel': 'gas', 'perc': 3.7},
     {'fuel': 'nuclear', 'perc': 0},
     {'fuel': 'other', 'perc': 0},
     {'fuel': 'hydro', 'perc': 2.2},
     {'fuel': 'solar', 'perc': 0},
     {'fuel': 'wind', 'perc': 94.2}]},
   {'regionid': 2,
    'dnoregion': 'SP Distribution',
    'shortname': 'South Scotland',
    'intensity': {'forecast': 35, 'index': 'low'},
    'generationmix': [{'fuel': 'biomass', 'perc': 1.4},
     {'fuel': 'coal', 'perc': 0},
     {'fuel': 'imports', 'perc': 6.4},
     {'fuel': 'gas', 'perc': 0.9},
     {'fuel': 'nuclear', 'perc': 21},
     {'fuel': 'other', 'perc': 0},
     {'fuel': 'hydro', 'perc': 0.6}

### Crear DataFrame 

In [30]:
# Crear DataFrame
df_all_regions = build_table(regions[0]['regions'])

# Obtener no mas los detalles de cada region
df_regions = df_all_regions[["regionid", "dnoregion", "shortname"]].copy()
df_regions.head()

Unnamed: 0,regionid,dnoregion,shortname
0,1,Scottish Hydro Electric Power Distribution,North Scotland
1,2,SP Distribution,South Scotland
2,3,Electricity North West,North West England
3,4,NPG North East,North East England
4,5,NPG Yorkshire,Yorkshire


### Guardar DeltaLake

En este caso vamos a usar la operación UPSERT, asi podremos cambiar el nombre del operador o el pseudonimo de la región mediante el id o agregar datos que no se encuentren. Vamos a utilizar este metodo ya que no me importa sobreescribir la información de los nombres en este caso, además seria algo que no pasaria seguido debido a que la fuente de datos es dentro de todo estatica y no son una gran cantidad de datos.

Si es la primera vez, la propia función va a utilizar el modo overwrite para crearlo.

In [31]:
regions_raw_dir = f"{bronze_dir}/regions"
upsert_data_as_delta(
    df_regions,
    regions_raw_dir,
    "target.regionid = source.regionid"
    )

In [32]:
DeltaTable(regions_raw_dir).to_pandas()

Unnamed: 0,regionid,dnoregion,shortname
0,2,SP Distribution,South Scotland
1,4,NPG North East,North East England
2,6,SP Manweb,North Wales & Merseyside
3,8,WPD West Midlands,West Midlands
4,11,WPD South West,South West England
5,15,England,England
6,17,Wales,Wales
7,1,Scottish Hydro Electric Power Distribution,North Scotland
8,9,WPD East Midlands,East Midlands
9,13,UKPN London,London


### Constrains 

Ahora vamos a agregar constrains para que el id tenga que ser mayor a 0 y que no pueda tener campos nulos ni en el nombre ni en el psedonimo.

In [32]:
dt = DeltaTable(regions_raw_dir)
dt.alter.add_constraint(
    {"regionid_gt_0": "regionid > 0"}
)
dt.alter.add_constraint(
    {"dnoregion_not_empty": "LENGTH(dnoregion) > 0"}
)
dt.alter.add_constraint(
    {"shortname_not_empty": "LENGTH(shortname) > 0"}
)

In [33]:
# Creamos un DataFrame con datos no válidos
data_no_valid = {
    "regionid": [20, 0],
    "dnoregion": ["Germany", "France"],
    "shortname": ["GER", "FR"]
}
df_no_valid = pd.DataFrame(data_no_valid)
df_no_valid

Unnamed: 0,regionid,dnoregion,shortname
0,20,Germany,GER
1,0,France,FR


In [34]:
# Intentamos escribir los datos no válidos en la tabla Delta
upsert_data_as_delta(
    df_no_valid,
    regions_raw_dir,
    "target.regionid = source.regionid"
    )

DeltaProtocolError: Invariant violations: ["Check or Invariant (regionid > 0) violated by value in row: [0, France, FR]"]

In [35]:
DeltaTable(regions_raw_dir).to_pandas()

Unnamed: 0,regionid,dnoregion,shortname
0,1,Scottish Hydro Electric Power Distribution,North Scotland
1,2,SP Distribution,South Scotland
2,3,Electricity North West,North West England
3,4,NPG North East,North East England
4,5,NPG Yorkshire,Yorkshire
5,6,SP Manweb,North Wales & Merseyside
6,7,WPD South Wales,South Wales
7,8,WPD West Midlands,West Midlands
8,9,WPD East Midlands,East Midlands
9,10,UKPN East,East England


In [36]:
# Creamos un DataFrame con datos válidos
data_valid = {
    "regionid": [10, 20],
    "dnoregion": ["Germany", "France"],
    "shortname": ["GER", "FR"]
}
df_valid = pd.DataFrame(data_valid)
df_valid

Unnamed: 0,regionid,dnoregion,shortname
0,10,Germany,GER
1,20,France,FR


In [37]:
# Intentamos escribir los datos válidos en la tabla Delta
upsert_data_as_delta(
    df_valid,
    regions_raw_dir,
    "target.regionid = source.regionid"
    )

In [38]:
DeltaTable(regions_raw_dir).to_pandas()

Unnamed: 0,regionid,dnoregion,shortname
0,2,SP Distribution,South Scotland
1,4,NPG North East,North East England
2,6,SP Manweb,North Wales & Merseyside
3,8,WPD West Midlands,West Midlands
4,11,WPD South West,South West England
5,15,England,England
6,17,Wales,Wales
7,1,Scottish Hydro Electric Power Distribution,North Scotland
8,9,WPD East Midlands,East Midlands
9,13,UKPN London,London


## Extracción Incremental

Vamos a realizar una extracción incremental para obtener los datos del porcentaje de emisión de carbono del Reino Unido según el tipo de combustible, de la última hora.

La extracción va a ser incremental debido a que solo me interesa obtener los datos nuevos, para lo cual vamos a usar como identificación la hora y el dia.

En este caso si que vamos a particionar por fecha y hora, asi tendremos los datos un poco mas organizados. Además, aplicaremos una constrain para hacer que solo admita valores iguales o mayores a 0 en el porcentaje.

### Obtener datos de los porcentajes de generacion de carbono según los distintos tipos de combustible del Reino Unido de la última hora(horario UTC)

In [24]:
# Obtener Datos de generación según combustible de la ultima hora
start_date = datetime.utcnow() - timedelta(hours=1)

end_date = start_date.strftime("%Y-%m-%dT%H:59:59Z")
start_date = start_date.strftime("%Y-%m-%dT%H:00:00Z")

endpoint = f"generation/{start_date}/{end_date}"

generationmix = get_data(base_url, endpoint, data_field="data", params={}, headers=headers)
generationmix

  start_date = datetime.utcnow() - timedelta(hours=1)


[{'from': '2024-11-23T03:30Z',
  'to': '2024-11-23T04:00Z',
  'generationmix': [{'fuel': 'biomass', 'perc': 10.3},
   {'fuel': 'coal', 'perc': 0},
   {'fuel': 'imports', 'perc': 13.9},
   {'fuel': 'gas', 'perc': 9},
   {'fuel': 'nuclear', 'perc': 16.8},
   {'fuel': 'other', 'perc': 0},
   {'fuel': 'hydro', 'perc': 0.4},
   {'fuel': 'solar', 'perc': 0},
   {'fuel': 'wind', 'perc': 49.7}]},
 {'from': '2024-11-23T04:00Z',
  'to': '2024-11-23T04:30Z',
  'generationmix': [{'fuel': 'biomass', 'perc': 10.3},
   {'fuel': 'coal', 'perc': 0},
   {'fuel': 'imports', 'perc': 13.9},
   {'fuel': 'gas', 'perc': 8.9},
   {'fuel': 'nuclear', 'perc': 16.7},
   {'fuel': 'other', 'perc': 0},
   {'fuel': 'hydro', 'perc': 0.4},
   {'fuel': 'solar', 'perc': 0},
   {'fuel': 'wind', 'perc': 49.8}]}]

### Crear DataFrame

In [34]:
df_all_generationmix = build_table(generationmix)

# Explode la columna 'generationmix'
df_all_generationmix_exploded = df_all_generationmix.explode('generationmix')

# Normalizamos la columna 'generationmix' (diccionarios 'fuel' y 'perc')
df_normalized = pd.json_normalize(df_all_generationmix_exploded['generationmix'])

# Reiniciamos los índices para ambos DataFrames
df_all_generationmix_exploded = df_all_generationmix_exploded.drop(columns=['generationmix', 'to']).reset_index(drop=True)
df_normalized = df_normalized.reset_index(drop=True)

# Concatenamos los DataFrames (ahora con índices reiniciados)
df_generationmix = pd.concat([df_all_generationmix_exploded, df_normalized], axis=1)

# Renombramos la columna 'from' a 'timestamp_measured'
df_generationmix.rename(columns={'from': 'timestamp_measured'}, inplace=True)

# Mostrar las primeras filas del DataFrame
df_generationmix.head()

Unnamed: 0,timestamp_measured,fuel,perc
0,2024-11-23T03:30Z,biomass,10.3
1,2024-11-23T03:30Z,coal,0.0
2,2024-11-23T03:30Z,imports,13.9
3,2024-11-23T03:30Z,gas,9.0
4,2024-11-23T03:30Z,nuclear,16.8


### Guardar DeltaLake

Primero vamos a separar fecha y hora.

In [35]:
df_generationmix["timestamp_measured"] = pd.to_datetime(df_generationmix.timestamp_measured)
df_generationmix["fecha"] = df_generationmix.timestamp_measured.dt.date
df_generationmix["hora"] = df_generationmix.timestamp_measured.dt.hour

print(df_generationmix)

          timestamp_measured     fuel  perc       fecha  hora
0  2024-11-23 03:30:00+00:00  biomass  10.3  2024-11-23     3
1  2024-11-23 03:30:00+00:00     coal   0.0  2024-11-23     3
2  2024-11-23 03:30:00+00:00  imports  13.9  2024-11-23     3
3  2024-11-23 03:30:00+00:00      gas   9.0  2024-11-23     3
4  2024-11-23 03:30:00+00:00  nuclear  16.8  2024-11-23     3
5  2024-11-23 03:30:00+00:00    other   0.0  2024-11-23     3
6  2024-11-23 03:30:00+00:00    hydro   0.4  2024-11-23     3
7  2024-11-23 03:30:00+00:00    solar   0.0  2024-11-23     3
8  2024-11-23 03:30:00+00:00     wind  49.7  2024-11-23     3
9  2024-11-23 04:00:00+00:00  biomass  10.3  2024-11-23     4
10 2024-11-23 04:00:00+00:00     coal   0.0  2024-11-23     4
11 2024-11-23 04:00:00+00:00  imports  13.9  2024-11-23     4
12 2024-11-23 04:00:00+00:00      gas   8.9  2024-11-23     4
13 2024-11-23 04:00:00+00:00  nuclear  16.7  2024-11-23     4
14 2024-11-23 04:00:00+00:00    other   0.0  2024-11-23     4
15 2024-

Ahora vamos a guardar el DeltaLake mediante la operación MERGE, sin sobreescribir los datos, para solamente agregar los datos nuevos que aparezcan. Esto debido a que son datos que se actualizan constantemente y de los cuales me interesa mantener un registro. 

El target para esta operación va a ser por la fecha y hora, y por el tipo de combustible. De esta manera solo cargara registros nuevos para cada tipo de combustible. 

Además, vamos a particionar por fecha y hora, para tener un mayor orden de los mismos.

In [36]:
generation_mix_raw_dir = f"{bronze_dir}/generation_mix"
save_new_data_as_delta(
    df_generationmix,
    generation_mix_raw_dir,
    """target.timestamp_measured = source.timestamp_measured
    AND target.fuel = source.fuel""",
    partition_cols=["fecha", "hora"]
    )

### Constrains

Ahora vamos a agregar constrains para que el porcentaje de emision del combustible no pueda ser menor a 0.

In [11]:
dt2 = DeltaTable(generation_mix_raw_dir)
dt2.alter.add_constraint(
    {"perc_not_negative": "perc >= 0"}
)

DeltaError: Generic DeltaTable error: Constraint with name: perc_not_negative already exists

In [12]:
# Creamos un DataFrame con datos no válidos
data_no_valid_2 = {
    "timestamp_measured": ["2018-05-15T11:30Z", "2018-05-15T11:30Z"],
    "fuel": ["biomass", "coal"],
    "perc": [-1, 3]
}
df_no_valid_2 = pd.DataFrame(data_no_valid_2)
df_no_valid_2

Unnamed: 0,timestamp_measured,fuel,perc
0,2018-05-15T11:30Z,biomass,-1
1,2018-05-15T11:30Z,coal,3


In [13]:
df_no_valid_2["timestamp_measured"] = pd.to_datetime(df_no_valid_2.timestamp_measured)
df_no_valid_2["fecha"] = df_no_valid_2.timestamp_measured.dt.date
df_no_valid_2["hora"] = df_no_valid_2.timestamp_measured.dt.hour

In [14]:
# Intentamos escribir los datos no válidos en la tabla Delta
save_new_data_as_delta(
    df_no_valid_2,
    generation_mix_raw_dir,
    """target.timestamp_measured = source.timestamp_measured
    AND target.fuel = source.fuel""",
    partition_cols=["fecha", "hora"]
    )

DeltaProtocolError: Invariant violations: ["Check or Invariant (perc >= 0) violated by value in row: [2018-05-15T11:30:00Z, biomass, -1.0, 2018-05-15, 11]"]

In [37]:
DeltaTable(generation_mix_raw_dir).to_pandas()

Unnamed: 0,timestamp_measured,fuel,perc,fecha,hora
0,2024-11-23 03:30:00+00:00,other,0.0,2024-11-23,3
1,2024-11-23 03:30:00+00:00,nuclear,16.8,2024-11-23,3
2,2024-11-23 03:30:00+00:00,biomass,10.3,2024-11-23,3
3,2024-11-23 03:30:00+00:00,coal,0.0,2024-11-23,3
4,2024-11-23 03:30:00+00:00,imports,13.9,2024-11-23,3
5,2024-11-23 03:30:00+00:00,gas,9.0,2024-11-23,3
6,2024-11-23 03:30:00+00:00,solar,0.0,2024-11-23,3
7,2024-11-23 03:30:00+00:00,wind,49.7,2024-11-23,3
8,2024-11-23 03:30:00+00:00,hydro,0.4,2024-11-23,3
9,2024-11-23 04:00:00+00:00,biomass,10.3,2024-11-23,4


# Facundo Aranda TP 2

## Procesamiento del DeltaLake "Regions"

Para este DeltaLake vamos a renombrar las columnas para que sean mas claras y a convertir los tipos de datos de las mismas para que ocupen menos espacio, además de ordenar los valores por id. 

### Conversion de tipo de datos

In [42]:
regions_dt = DeltaTable(regions_raw_dir).to_pandas()

regions_dt.info(memory_usage='deep')


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 19 entries, 0 to 18
Data columns (total 3 columns):
 #   Column     Non-Null Count  Dtype 
---  ------     --------------  ----- 
 0   regionid   19 non-null     int64 
 1   dnoregion  19 non-null     object
 2   shortname  19 non-null     object
dtypes: int64(1), object(2)
memory usage: 2.6 KB


In [45]:
conversion_mapping = {
    "regionid": "int8",
    "dnoregion": "string",
    "shortname": "string"
    }

regions_dt = regions_dt.astype(conversion_mapping)

regions_dt.info(memory_usage='deep')

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 19 entries, 0 to 18
Data columns (total 3 columns):
 #   Column     Non-Null Count  Dtype 
---  ------     --------------  ----- 
 0   regionid   19 non-null     int8  
 1   dnoregion  19 non-null     string
 2   shortname  19 non-null     string
dtypes: int8(1), string(2)
memory usage: 2.4 KB


### Renombrar Columnas y Ordenar por ID

In [47]:
regions_dt = regions_dt.rename(
    columns={
        "regionid": "id_region",
        "dnoregion": "operador_red_electrica",
        "shortname": "pseudonimo_region"
    }
)

regions_dt = regions_dt.sort_values(by='id_region').reset_index(drop=True)

regions_dt

Unnamed: 0,id_region,operador_red_electrica,pseudonimo_region
0,1,Scottish Hydro Electric Power Distribution,North Scotland
1,2,SP Distribution,South Scotland
2,3,Electricity North West,North West England
3,4,NPG North East,North East England
4,5,NPG Yorkshire,Yorkshire
5,6,SP Manweb,North Wales & Merseyside
6,7,WPD South Wales,South Wales
7,8,WPD West Midlands,West Midlands
8,9,WPD East Midlands,East Midlands
9,10,UKPN East,East England


### Guardar DeltaLake

Como la información mantiene la estructura de la capa anterior vamos a usar el mismo metodo para guardarlo, el metodo "upsert" 

In [48]:
regions_silver_dir = f"{silver_dir}/regions"
upsert_data_as_delta(
    regions_dt,
    regions_silver_dir,
    "target.id_region = source.id_region"
    )

In [49]:
DeltaTable(regions_silver_dir).to_pandas()

Unnamed: 0,id_region,operador_red_electrica,pseudonimo_region
0,1,Scottish Hydro Electric Power Distribution,North Scotland
1,2,SP Distribution,South Scotland
2,3,Electricity North West,North West England
3,4,NPG North East,North East England
4,5,NPG Yorkshire,Yorkshire
5,6,SP Manweb,North Wales & Merseyside
6,7,WPD South Wales,South Wales
7,8,WPD West Midlands,West Midlands
8,9,WPD East Midlands,East Midlands
9,10,UKPN East,East England


## Procesamiento del DeltaLake "Generation_mix"

Para este DeltaLake vamos a realizar varias cosas:
1) Agregar columna minutos y agregar columna boolena "emision_cero" cuando algun combustible de 0 en porcentaje de emision. Ademas de borrar la columna timestamp_measured porque ya vamos a tener el tiempo mas organizado
2) Cambio de nombre las columnas para que sean mas intuitivas
3) Conversión de tipos de datos para que ocupen menos espacio
4) Formatear las fechas en formato(dd/mm/aaaa) y las horas y minutos para que siempre sean 2 digitos
5) Agrupar por tipo de combustible, mostrando un promedio historico del porcentaje de cada combustible

### Agregar y Eliminar Columnas

In [92]:
generation_mix_dt = DeltaTable(generation_mix_raw_dir).to_pandas()

generation_mix_dt["minutos"] = generation_mix_dt.timestamp_measured.dt.minute

generation_mix_dt["emision_cero"] = generation_mix_dt.perc == 0

generation_mix_dt = generation_mix_dt.drop(columns=['timestamp_measured'])

generation_mix_dt

Unnamed: 0,fuel,perc,fecha,hora,minutos,emision_cero
0,other,0.0,2024-11-23,3,30,True
1,nuclear,16.8,2024-11-23,3,30,False
2,biomass,10.3,2024-11-23,3,30,False
3,coal,0.0,2024-11-23,3,30,True
4,imports,13.9,2024-11-23,3,30,False
5,gas,9.0,2024-11-23,3,30,False
6,solar,0.0,2024-11-23,3,30,True
7,wind,49.7,2024-11-23,3,30,False
8,hydro,0.4,2024-11-23,3,30,False
9,biomass,10.3,2024-11-23,4,0,False


### Cambiar nombres de columnas

In [93]:
generation_mix_dt = generation_mix_dt.rename(
    columns={
        "fuel": "combustible",
        "perc": "porcentaje_emision"
    }
)

generation_mix_dt

Unnamed: 0,combustible,porcentaje_emision,fecha,hora,minutos,emision_cero
0,other,0.0,2024-11-23,3,30,True
1,nuclear,16.8,2024-11-23,3,30,False
2,biomass,10.3,2024-11-23,3,30,False
3,coal,0.0,2024-11-23,3,30,True
4,imports,13.9,2024-11-23,3,30,False
5,gas,9.0,2024-11-23,3,30,False
6,solar,0.0,2024-11-23,3,30,True
7,wind,49.7,2024-11-23,3,30,False
8,hydro,0.4,2024-11-23,3,30,False
9,biomass,10.3,2024-11-23,4,0,False


### Convertir tipo de dato

In [94]:
generation_mix_dt.info(memory_usage='deep')

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 36 entries, 0 to 35
Data columns (total 6 columns):
 #   Column              Non-Null Count  Dtype  
---  ------              --------------  -----  
 0   combustible         36 non-null     object 
 1   porcentaje_emision  36 non-null     float64
 2   fecha               36 non-null     object 
 3   hora                36 non-null     int32  
 4   minutos             36 non-null     int32  
 5   emision_cero        36 non-null     bool   
dtypes: bool(1), float64(1), int32(2), object(2)
memory usage: 4.0 KB


In [None]:
generation_mix_dt['fecha'] = pd.to_datetime(generation_mix_dt['fecha'])

conversion_mapping_mix = {
    "combustible": "string",
    "hora": "int8",
    "minutos": "int8"
    }

generation_mix_dt = generation_mix_dt.astype(conversion_mapping_mix)

generation_mix_dt.info(memory_usage='deep')

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 36 entries, 0 to 35
Data columns (total 6 columns):
 #   Column              Non-Null Count  Dtype         
---  ------              --------------  -----         
 0   combustible         36 non-null     string        
 1   porcentaje_emision  36 non-null     float64       
 2   fecha               36 non-null     datetime64[ns]
 3   hora                36 non-null     int8          
 4   minutos             36 non-null     int8          
 5   emision_cero        36 non-null     bool          
dtypes: bool(1), datetime64[ns](1), float64(1), int8(2), string(1)
memory usage: 2.7 KB


Unnamed: 0,combustible,porcentaje_emision,fecha,hora,minutos,emision_cero
0,other,0.0,2024-11-23,3,30,True
1,nuclear,16.8,2024-11-23,3,30,False
2,biomass,10.3,2024-11-23,3,30,False
3,coal,0.0,2024-11-23,3,30,True
4,imports,13.9,2024-11-23,3,30,False
5,gas,9.0,2024-11-23,3,30,False
6,solar,0.0,2024-11-23,3,30,True
7,wind,49.7,2024-11-23,3,30,False
8,hydro,0.4,2024-11-23,3,30,False
9,biomass,10.3,2024-11-23,4,0,False


### Formatear fechas, horas y minutos

In [96]:
generation_mix_dt['hora'] = generation_mix_dt['hora'].apply(lambda x: f"{x:02d}")
generation_mix_dt['minutos'] = generation_mix_dt['minutos'].apply(lambda x: f"{x:02d}")
generation_mix_dt['fecha'] = generation_mix_dt.fecha.dt.strftime("%d/%m/%Y")

generation_mix_dt

Unnamed: 0,combustible,porcentaje_emision,fecha,hora,minutos,emision_cero
0,other,0.0,23/11/2024,3,30,True
1,nuclear,16.8,23/11/2024,3,30,False
2,biomass,10.3,23/11/2024,3,30,False
3,coal,0.0,23/11/2024,3,30,True
4,imports,13.9,23/11/2024,3,30,False
5,gas,9.0,23/11/2024,3,30,False
6,solar,0.0,23/11/2024,3,30,True
7,wind,49.7,23/11/2024,3,30,False
8,hydro,0.4,23/11/2024,3,30,False
9,biomass,10.3,23/11/2024,4,0,False


### Guardar DeltaLake de este formato

Como la información mantiene la estructura de la capa anterior vamos a usar el mismo metodo para guardarlo, el metodo "merge" 

In [97]:
generation_mix_silver_dir = f"{silver_dir}/generation_mix"
save_new_data_as_delta(
    generation_mix_dt,
    generation_mix_silver_dir,
    """target.fecha = source.fecha
    AND target.hora = source.hora
    AND target.minutos = source.minutos
    AND target.combustible = source.combustible""",
    partition_cols=["fecha", "hora"]
    )

In [98]:
DeltaTable(generation_mix_silver_dir).to_pandas()

Unnamed: 0,combustible,porcentaje_emision,fecha,hora,minutos,emision_cero
0,biomass,6.8,09/11/2024,16,30,False
1,coal,0.0,09/11/2024,16,30,True
2,imports,12.9,09/11/2024,16,30,False
3,gas,58.3,09/11/2024,16,30,False
4,nuclear,13.5,09/11/2024,16,30,False
5,other,0.0,09/11/2024,16,30,True
6,hydro,2.3,09/11/2024,16,30,False
7,solar,0.1,09/11/2024,16,30,False
8,wind,6.0,09/11/2024,16,30,False
9,biomass,10.3,23/11/2024,4,0,False


### Agregación agrupando por combustible para mostrar el promedio historico

In [99]:
generation_mix_historico_dt = generation_mix_dt.groupby('combustible').agg({
     'porcentaje_emision': 'mean'
}).rename(
    columns={
        'porcentaje_emision': 'promedio_porcentaje_emision_historico'
    }
)

generation_mix_historico_dt

Unnamed: 0_level_0,promedio_porcentaje_emision_historico
combustible,Unnamed: 1_level_1
biomass,8.525
coal,0.0
gas,33.775
hydro,1.35
imports,13.3
nuclear,15.125
other,0.0
solar,0.025
wind,27.9


### Guardar Delta Lake del promedio historico

Para este caso vamos a usar la operacion "overwrite" ya que al ser un porcentaje general de datos historicos no me interesa ir guardando la variacion de estos, solo el valor actual, ademas siempre que vaya a cambiar cualquiera de los datos van a terminar cambiando todos ya que todos los porcentajes varian a la vez, es por esto que uso overwrite para sobreescribir todo directamente.

In [None]:
generation_mix_historico_gold_dir = f"{gold_dir}/generation_mix_historico"
save_data_as_delta(
    generation_mix_historico_dt,
    generation_mix_historico_gold_dir
    )

In [None]:
DeltaTable(generation_mix_historico_gold_dir).to_pandas()

Unnamed: 0,promedio_porcentaje_emision_historico,combustible
0,8.525,biomass
1,0.0,coal
2,33.775,gas
3,1.35,hydro
4,13.3,imports
5,15.125,nuclear
6,0.0,other
7,0.025,solar
8,27.9,wind
