# **ETL process on Great Britain's Carbon Intensity Data**

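This is a basic ETL (Extract, Transform, Load) process built with Python libraries. The data is extracted from a public API (the GB Carbon Intensity API, https://api.carbonintensity.org.uk), cleaned and transformed with pandas, and loaded into a SQLite database.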

> ***1. Data Acquisition:***





In [1]:
#H: import necessary libraries
import requests as rq
import pandas as pd
import datetime as dt
import json

In [2]:
#H: a function that checks the format and validity of a date
def verif_date_validity(date):
  """Validate a 'YYYY-MM-DD' date string and return it in canonical form.

  Parameters
  ----------
  date : str
      The date to check, expected as 'YYYY-MM-DD'.

  Returns
  -------
  str or None
      The date re-formatted as zero-padded 'YYYY-MM-DD' (e.g. '2023-5-3'
      becomes '2023-05-03'), or None when the input is not a string in that
      format or is not a real calendar date (e.g. '2023-02-30'). A message is
      printed in the None case; nothing is raised.
  """
  try:
    # strptime checks both the format and the calendar; strftime normalises the
    # result, so callers should use the returned value rather than their input.
    formatted_date = dt.datetime.strptime(date, '%Y-%m-%d').strftime('%Y-%m-%d')
  except (ValueError, TypeError):
    # TypeError: strptime only accepts str (None, numbers or date objects raise it)
    print(f"Validation Error: Date '{date}' is not in 'YYYY-MM-DD' format or is not a valid calendar date.") #H: if not valid return an error msg
    return None
  return formatted_date

In [3]:
#H: defining a function for data extraction with error handling (we get the json data -> put it in a dataframe for future processing, we also handle exception in case of a problem)

def get_co2_intensity_data(date): #H: we give the date (YYYY-MM-DD) as an argument to help extracting a specific day data
  """Fetch one day of carbon-intensity records from the GB Carbon Intensity API.

  Sends ``GET https://api.carbonintensity.org.uk/intensity/date/{date}`` and
  turns the ``data`` list of the JSON response into a DataFrame.

  Parameters
  ----------
  date : str
      Day to fetch, as 'YYYY-MM-DD'. It is validated and normalised with
      ``verif_date_validity``; the normalised value is the one requested.

  Returns
  -------
  pandas.DataFrame
      One row per element of the response's ``data`` list (columns exactly as
      returned by the API), or an empty DataFrame on any failure: invalid
      date, network/HTTP error, non-JSON body, an API ``error`` payload, or no
      records. Failures are reported with ``print`` and never raised.
  """
  #----------- H: checking date format (if not valid return an empty df)
  validated_date = verif_date_validity(date)
  if not validated_date:
    print(f"Extraction Failed: Invalid date format '{date}'.")
    return pd.DataFrame()

  #----------- H: send the request (network / HTTP problems -> message + empty df)
  try:
    # request the normalised date so e.g. '2023-5-3' is sent as '2023-05-03'
    resp = rq.get(f"https://api.carbonintensity.org.uk/intensity/date/{validated_date}", timeout=10)
    resp.raise_for_status() #H: Raises HTTPError, if one occurred

  except rq.exceptions.ConnectionError as conn_err:
    print(f"Connection Error during extraction for {validated_date}: {conn_err}")
    print("\n Check your internet connection, DNS resolution, or firewall settings.")
    return pd.DataFrame()

  except rq.exceptions.Timeout as timeout_err:
    print(f"Timeout Error during extraction for {validated_date}: {timeout_err}")
    print("\n The API did not respond within the set time. Retry.")
    return pd.DataFrame()

  except rq.exceptions.HTTPError as http_err:
    stat_code = resp.status_code  # resp is bound: only raise_for_status() raises HTTPError here
    print(f"HTTP Error during extraction for {validated_date}: {http_err} [Status Code: {stat_code}]")

    if stat_code == 404:
      print("The requested date might be out of the API's available range.")
    elif stat_code == 429:
      print("You might have hit the API rate limit. Retry after some time.")
    elif stat_code >= 500:
      print("Internal server error.")
    return pd.DataFrame()

  except rq.exceptions.RequestException as req_err:
    print(f"There was an ambiguous exception that occurred while handling your request for {validated_date}: {req_err}")
    return pd.DataFrame()

  except Exception as e:
    print(f"An unhandled error occurred during extraction for {validated_date}: {e}")
    return pd.DataFrame()

  #----------- H: decode the JSON body
  # Kept in its own try: depending on the requests version, resp.json() raises
  # json.JSONDecodeError, simplejson's JSONDecodeError, or
  # requests.exceptions.JSONDecodeError (which is also a RequestException and
  # would otherwise be swallowed by the generic handler above). All of them are
  # ValueError subclasses, so this handler is reached in every case.
  try:
    data = resp.json() #H: put the json returned in a data var
  except ValueError as json_err:
    print(f"Error decoding JSON response during extraction for {validated_date}: {json_err}")
    print(f"\n The API might have returned non-JSON content. Response start: {resp.text[:200]}...")
    return pd.DataFrame()

  #----------- H: interpret the payload
  try:
    #H: make sure the data is not the error json response of the api
    if isinstance(data, dict) and 'error' in data:
      error_info = data['error'] if isinstance(data['error'], dict) else {}
      error_code = error_info.get('code', 'No code')
      error_message = error_info.get('message', 'No specific message.')
      print(f"API returned an error in JSON: Code={error_code}, Message='{error_message}'")
      return pd.DataFrame()

    #H: verify the existence of the data (a non-empty 'data' entry)
    if isinstance(data, dict) and data.get('data'):
      co2_intensity_df = pd.DataFrame(data['data'])
      print(f"Successfully extracted {len(co2_intensity_df)} records for {validated_date}.")
      return co2_intensity_df

    print(f"No 'data' records found or unexpected response structure for {validated_date}. Response: {json.dumps(data, indent=2)[:500]}...")
    return pd.DataFrame()

  except Exception as e:
    print(f"An unhandled error occurred during extraction for {validated_date}: {e}")
    return pd.DataFrame()


In [4]:
#H: this one to make sure we get the data of 1 day from (00:00 to 00:00) so we will be doing 2 api calls
# For more details about why check the documentation

def get_co2_intensity_data_2days(date): #H: this will return 2 Dataframes to be concatenated after in the Cleaning phase
  """Fetch the raw records of the target day and of the day after it.

  The following day's records are fetched too because, per the note above,
  they are needed to complete the target day's last hour; the cleaning phase
  concatenates both frames and cuts out the target day.

  Parameters
  ----------
  date : str
      Target day as 'YYYY-MM-DD'.

  Returns
  -------
  list[pandas.DataFrame]
      ``[target_day_df, next_day_df]`` (either may be empty if its extraction
      failed), or an empty list when ``date`` is invalid.
  """
  target = verif_date_validity(date)
  if not target:
    print(f"Failed to get raw daily data: Invalid target date format '{date}'.")
    return []

  print(f"\n--- Data extraction for: {target} ---")

  #H: the calendar day right after the target, in the same 'YYYY-MM-DD' form
  following_day = dt.datetime.strptime(target, '%Y-%m-%d').date() + dt.timedelta(days=1)
  following = following_day.strftime('%Y-%m-%d')

  #H: one API call per day, target day first, then the next day
  frames = [get_co2_intensity_data(day) for day in (target, following)]

  if any(not frame.empty for frame in frames):
    print(f"Data extraction completed for {target} and {following}.\n Cleaning will start.")
  else:
    print(f"No data extracted for {target} or {following}. Both DataFrames are empty.")

  return frames


> ***2. Data Cleaning:***

In [5]:
#H: cleaning and structuring the data
def clean_co2_intensity_data(dfs_list, date):
  """Combine, validate and flatten raw API frames into one day of records.

  Steps: concatenate the non-empty frames; parse 'from'/'to' as UTC datetimes
  and drop rows where either cannot be parsed; drop duplicate 'from'
  timestamps (first kept); keep rows whose 'from' lies in
  [date 00:00 UTC, next day 00:00 UTC); flatten the nested 'intensity' dicts;
  coerce the forecast/actual intensities to numbers; fill missing actual
  values with the forecast.

  Parameters
  ----------
  dfs_list : list[pandas.DataFrame]
      Raw frames, e.g. from ``get_co2_intensity_data_2days``. They are
      expected to have 'from' and 'to' columns and, normally, an 'intensity'
      column of dicts with 'forecast', 'actual' and 'index' keys.
  date : str
      Target day as 'YYYY-MM-DD'.

  Returns
  -------
  pandas.DataFrame
      The day's records: the original columns minus 'intensity', plus
      'intensity_forecast', 'intensity_actual' and 'intensity_index' (always
      present). Returns an empty DataFrame when the date is invalid, no
      non-empty frame is given, or no row falls inside the target day.
  """

  #----------- H: validate the date
  validated_date = verif_date_validity(date)
  if not validated_date:
    print(f"Cleaning Failed: Invalid target date format '{date}'.")
    return pd.DataFrame()

  print(f"\n--- Starting Cleaning Phase for: {date} ---")

  #----------- H: keep only non-empty dataframes (they are the only ones contributing rows;
  # recent pandas versions also warn about empty entries passed to concat)
  non_empty_dfs = [df for df in (dfs_list or []) if not df.empty]
  if not non_empty_dfs:
    print("No DataFrames provided for cleaning!")
    return pd.DataFrame()

  #----------- H: concatenate the dataframes
  combined_df = pd.concat(non_empty_dfs, ignore_index=True)

  #----------- H: making sure the from and to are always UTC datetime objects (unparsable -> NaT)
  combined_df['from'] = pd.to_datetime(combined_df['from'], errors='coerce', utc=True)
  combined_df['to'] = pd.to_datetime(combined_df['to'], errors='coerce', utc=True)

  #----------- H: remove records with unparsable timestamps
  combined_df_len = len(combined_df)
  combined_df = combined_df.dropna(subset=['from', 'to'])
  if len(combined_df) < combined_df_len:
    print(f"Informative Message: {combined_df_len - len(combined_df)} rows were removed due to unparsable 'from' or 'to' timestamps.")
  else:
    print("Informative Message: All 'from'/'to' timestamps parsed successfully.")

  #----------- H: remove duplicated records if exist (since from should be unique we are using it here)
  valid_combined_df_len = len(combined_df)
  combined_df = combined_df.drop_duplicates(subset=['from'])
  if len(combined_df) < valid_combined_df_len:
    print(f"Informative Message: {valid_combined_df_len - len(combined_df)} duplicate rows removed based on 'from' unicity.")
  else:
    print("Informative Message: No duplicate rows found based on 'from' timestamp after initial combine.")

  #---------- H: get the day records from target_day 00:00 to next_day 00:00 UTC (this is the main goal that we did 2 calls for)
  target_date_obj = dt.datetime.strptime(validated_date, '%Y-%m-%d').date()
  target_day_start = dt.datetime.combine(target_date_obj, dt.time.min).replace(tzinfo=dt.timezone.utc)
  target_day_end = target_day_start + dt.timedelta(days=1)

  filtered_dataFr = combined_df[
      (combined_df['from'] >= target_day_start) &
      (combined_df['from'] < target_day_end)
  ].copy()

  if filtered_dataFr.empty:
    print(f"Cleaning aborted: No data found for the full 00:00-23:59:59 period of {validated_date} after filtering.")
    return pd.DataFrame()
  print(f"Data successfully filtered to {len(filtered_dataFr)} records for {validated_date}.")

  #----------- H: working on the composed intensity field
  filtered_dataFr = _expand_intensity_column(filtered_dataFr)

  #H: convert forecast/actual to numbers (normally already numeric; anything unparsable becomes NaN)
  for column in ('intensity_forecast', 'intensity_actual'):
    filtered_dataFr[column] = pd.to_numeric(filtered_dataFr[column], errors='coerce')

  #----------- H: lastly, fill missing actual values with the forecast
  missing_actual = int(filtered_dataFr['intensity_actual'].isnull().sum())
  if missing_actual > 0:
    # Plain column assignment: a chained `df[col].fillna(..., inplace=True)`
    # does not update the DataFrame under pandas Copy-on-Write.
    filtered_dataFr['intensity_actual'] = filtered_dataFr['intensity_actual'].fillna(filtered_dataFr['intensity_forecast'])
    print(f"Informative Message: Replaced {missing_actual} missing 'intensity_actual' values with 'intensity_forecast' values.")
  else:
    print("Informative Message: No missing 'intensity_actual' values found.")

  #---------- H: finally returning the cleaned dataframe
  print(f"Cleaning phase complete for {validated_date}. Final record count: {len(filtered_dataFr)}.")
  return filtered_dataFr


def _expand_intensity_column(day_df):
  """Return ``day_df`` with its nested 'intensity' dicts flattened into columns.

  Dict keys become columns ('forecast' -> 'intensity_forecast', 'actual' ->
  'intensity_actual', 'index' -> 'intensity_index'; any other key keeps its
  json_normalize name). Rows whose 'intensity' is not a dict get missing
  values. The three intensity columns are always present, even when no row
  holds a dict or the 'intensity' column is absent.
  """
  if 'intensity' in day_df.columns:
    raw_intensity = day_df['intensity']
  else:
    raw_intensity = pd.Series(None, index=day_df.index, dtype=object)
  data_without_intensity = day_df.drop(columns=['intensity'], errors='ignore')

  valid_intensity_mask = raw_intensity.apply(lambda x: isinstance(x, dict))
  if valid_intensity_mask.any():
    valid_intensity = raw_intensity[valid_intensity_mask]
    intensity_details_df = pd.json_normalize(valid_intensity.tolist())
    intensity_details_df.index = valid_intensity.index  # realign with the source rows
    intensity_details_df = intensity_details_df.rename(columns={
        'forecast': 'intensity_forecast',
        'actual': 'intensity_actual',
        'index': 'intensity_index'
    })
  else:
    intensity_details_df = pd.DataFrame(index=day_df.index)

  # later steps read these columns unconditionally
  for column in ('intensity_forecast', 'intensity_actual', 'intensity_index'):
    if column not in intensity_details_df.columns:
      intensity_details_df[column] = None

  # axis=1 concat aligns on the index, so non-dict rows get NaN details
  return pd.concat([data_without_intensity, intensity_details_df], axis=1)

> ***3. Data Transformation:***



**Note**: *Before starting this step, I finalized the design of my database schema; this phase prepares the DataFrame so that it is as ready as possible for the final loading.*

In [6]:
#H: preparing the data to be loaded in a database
def transform__co2_intensity_data(cleanedData):
  """Reshape cleaned records into the columns of the ``carbon_intensity_records`` layout.

  The input DataFrame is not modified; all work is done on a copy.

  Parameters
  ----------
  cleanedData : pandas.DataFrame
      Output of the cleaning step: datetime 'from'/'to' columns (UTC when
      produced by ``clean_co2_intensity_data``), numeric 'intensity_actual' /
      'intensity_forecast' and a text 'intensity_index'.

  Returns
  -------
  pandas.DataFrame
      Columns 'from_date', 'from_time', 'to_date', 'to_time' (date/time parts
      of 'from'/'to' in their own timezone), 'intensity_actual',
      'intensity_forecast', 'forcast_precision' (actual minus forecast) and
      'intensity_level_id' (nullable Int64: 1 'very low' ... 5 'very high',
      NA for any other label). An empty DataFrame for empty input.
  """
  #----------- H: verify the dataframe is not empty first
  if cleanedData.empty:
    print("No data for transformation!")
    return pd.DataFrame()

  # work on a copy so the caller's cleaned DataFrame keeps its original columns
  records = cleanedData.copy()

  #----------- H: separate date and times
  records['from_date'] = records['from'].dt.date
  records['from_time'] = records['from'].dt.time
  records['to_date'] = records['to'].dt.date
  records['to_time'] = records['to'].dt.time

  #---------- H: calculate the difference between the actual and forecast intensity : forcast_precision
  records['forcast_precision'] = records['intensity_actual'] - records['intensity_forecast']

  #---------- H: the intensity_index is changed to a number (ids of the intensity_levels table)
  intensity_index_mapping = {
      "very low": 1,
      "low": 2,
      "moderate": 3,
      "high": 4,
      "very high": 5
  }
  records['intensity_level_id'] = records['intensity_index'].map(intensity_index_mapping).astype('Int64')

  #---------- H: keep only the columns needed for further processing
  return records[['from_date', 'from_time', 'to_date', 'to_time',
                  'intensity_actual', 'intensity_forecast', 'forcast_precision',
                  'intensity_level_id']]

**Note:** *Since my database will also contain a daily intensity statistics table, some extra computations are needed. Their results are not stored in the transformed DataFrame but in a separate one, which the following function builds.*

In [7]:
#H: more processing of the data
def get_daily_co2_intensity_stats(transformedData):
  """Summarise one day of transformed records into a single-row DataFrame.

  Parameters
  ----------
  transformedData : pandas.DataFrame
      Output of ``transform__co2_intensity_data`` for a single day; reads
      'from_date', 'from_time', 'intensity_actual', 'intensity_forecast' and
      'forcast_precision'.

  Returns
  -------
  pandas.DataFrame
      One row with 'date' (the first row's 'from_date'), the min and max of
      'intensity_actual' with the 'from_time' of their first occurrence, the
      mean of 'forcast_precision' and the means of the actual and forecast
      intensities. If every 'intensity_actual' is missing, the min/max values
      and times are None instead of raising. Empty input gives an empty
      DataFrame.
  """

  #----------- H: verify the dataframe is not empty first
  if transformedData.empty:
    print("No data for anaytics!")
    return pd.DataFrame()

  ##----------- H: else we specify the day date and proceed
  day_date = transformedData['from_date'].iloc[0]

  #----------- H: get the min and max intensities of a day with the time of each one
  # positional index so the lookup does not depend on the frame's index labels
  actual = transformedData['intensity_actual'].reset_index(drop=True)
  if actual.notna().any():
    min_pos = actual.idxmin()  # first position of the minimum, NaN skipped
    max_pos = actual.idxmax()
    min_intensity = actual.iloc[min_pos]
    min_intensity_time = transformedData['from_time'].iloc[min_pos]
    max_intensity = actual.iloc[max_pos]
    max_intensity_time = transformedData['from_time'].iloc[max_pos]
  else:
    # idxmin/idxmax have no valid position when every value is missing
    min_intensity = min_intensity_time = None
    max_intensity = max_intensity_time = None

  #----------- H: the forecasting precision and intensities (actual and forecast) means of a day
  precision_mean = transformedData['forcast_precision'].mean()
  avg_actual_intensity = transformedData['intensity_actual'].mean()
  avg_forecast_intensity = transformedData['intensity_forecast'].mean()

  #----------- H: organize the final dataframe
  daily_stats_dataFr = pd.DataFrame([{
      'date': day_date,
      'min_intensity_actual': min_intensity,
      'min_intensity_time': min_intensity_time,
      'max_intensity_actual': max_intensity,
      'max_intensity_time': max_intensity_time,
      'mean_forecast_precision': precision_mean,
      'avg_actual_intensity': avg_actual_intensity,
      'avg_forecast_intensity': avg_forecast_intensity
  }])

  print(f" The daily statistics are calculated for {day_date}")
  return daily_stats_dataFr

> ***4. Data Loading:***

In [8]:
import sqlite3

In [9]:
#H: defining the function for db creation
def create_database_schema(db_path):
    """Create the SQLite tables used by the pipeline (safe to call repeatedly).

    Creates, if missing:
      * ``intensity_levels``: lookup table seeded with ids 1-5
        ('very low' ... 'very high'), the same ids the transformation step
        assigns;
      * ``carbon_intensity_records``: one row per record, unique on
        (from_date, from_time) so that re-loading a day with
        ``INSERT OR IGNORE`` does not duplicate rows;
      * ``carbon_intensity_daily_stats``: one row per day, keyed by date.

    Parameters
    ----------
    db_path : str
        Path of the SQLite database file (created if absent; its directory
        must already exist).

    sqlite3 errors are printed, not raised; nothing is returned. Because of
    ``CREATE TABLE IF NOT EXISTS``, a table that already exists keeps its old
    definition.
    """
    conn = None  # so the finally-block is safe even if connect() itself fails
    try:
        conn = sqlite3.connect(db_path)
        cursor = conn.cursor()

        print("Creating database schema")

        #-------- H: create and fill intensity_levels table
        cursor.execute("""CREATE TABLE IF NOT EXISTS intensity_levels (int_level_id INTEGER PRIMARY KEY,int_level_name TEXT UNIQUE NOT NULL)""")
        intensity_levels_data = [
            (1, "very low"),
            (2, "low"),
            (3, "moderate"),
            (4, "high"),
            (5, "very high")
        ]
        cursor.executemany("INSERT OR IGNORE INTO intensity_levels (int_level_id, int_level_name) VALUES (?, ?)", intensity_levels_data)
        print("intensity_levels table created/filled.")

        #-------- H: create carbon_intensity_records table
        # The UNIQUE constraint is what makes the loader's INSERT OR IGNORE skip
        # already-loaded records (id_record alone never conflicts).
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS carbon_intensity_records (
                id_record INTEGER PRIMARY KEY,
                from_date TEXT NOT NULL,
                from_time TEXT NOT NULL,
                to_time TEXT NOT NULL,
                intensity_actual REAL,
                intensity_forecast REAL,
                forcast_precision REAL,
                intensity_level_id INTEGER,
                FOREIGN KEY (intensity_level_id) REFERENCES intensity_levels(int_level_id),
                UNIQUE (from_date, from_time)
            )
        """)
        print("carbon_intensity_records table created.")

        #-------- H: create carbon_intensity_daily_stats table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS carbon_intensity_daily_stats (
                date TEXT PRIMARY KEY,
                min_intensity_actual REAL,
                min_intensity_time TEXT,
                max_intensity_actual REAL,
                max_intensity_time TEXT,
                mean_forecast_precision REAL,
                avg_actual_intensity REAL,
                avg_forecast_intensity REAL
            )
        """)
        print("carbon_intensity_daily_stats table created.")

        conn.commit()
        print("Informative Message: Database schema creation complete.")

    except sqlite3.Error as e:
        print(f"Error: Database schema creation failed: {e}")
    finally:
        if conn is not None:
            conn.close()

In [10]:
#H: filling the main table carbon_intensity_records
def load_data_to_carbon_intensity_records(data,db_path):
  """Insert transformed records into the ``carbon_intensity_records`` table.

  Parameters
  ----------
  data : pandas.DataFrame
      Output of ``transform__co2_intensity_data``; must contain 'from_date',
      'from_time', 'to_time', 'intensity_actual', 'intensity_forecast',
      'forcast_precision' and 'intensity_level_id' (other columns ignored).
  db_path : str
      Path of the SQLite database; the table must already exist.

  All rows are written with ``INSERT OR IGNORE`` and committed together.
  Dates and times are stored as their ``str()`` form; missing values
  (None / NaN / pd.NA) are stored as NULL. On a sqlite3 error the message is
  printed and nothing is committed. Returns None.
  """

  #----------- H: check data availability
  if data.empty:
    print("No data to load.")
    return

  def _sql_value(value):
    # sqlite3 cannot bind pd.NA (e.g. from the nullable Int64 level id) nor
    # numpy integer scalars; map NA to NULL and numpy scalars to Python ones.
    if value is None or pd.isna(value):
      return None
    return value.item() if hasattr(value, 'item') else value

  #H: select columns to match the table schema and build one parameter tuple per row
  columns = ['from_date', 'from_time', 'to_time',
             'intensity_actual', 'intensity_forecast', 'forcast_precision',
             'intensity_level_id']
  params = [
      (str(from_date), str(from_time), str(to_time),
       _sql_value(actual), _sql_value(forecast), _sql_value(precision), _sql_value(level_id))
      for from_date, from_time, to_time, actual, forecast, precision, level_id
      in data[columns].itertuples(index=False, name=None)
  ]

  sql_insert = """
  INSERT OR IGNORE INTO carbon_intensity_records (
      from_date, from_time, to_time, intensity_actual, intensity_forecast,
      forcast_precision, intensity_level_id
  ) VALUES (?, ?, ?, ?, ?, ?, ?)
  """

  conn = None

  #------------ H: transformed data loaded to table + handling errors
  try:
    conn = sqlite3.connect(db_path)
    changes_before = conn.total_changes
    conn.executemany(sql_insert, params)
    conn.commit()
    # ignored (already present) rows do not add to total_changes
    rows_inserted_count = conn.total_changes - changes_before
    print(f"Loaded {rows_inserted_count} new/updated records.")

  except sqlite3.Error as e:
    print(f"Error loading data: {e}")
  finally:
    if conn is not None:
      conn.close()

In [11]:
#H: filling the carbon_intensity_daily_stats table
def load_daily_stats_to_table(data,db_path):
  """Insert or replace rows of the ``carbon_intensity_daily_stats`` table.

  Parameters
  ----------
  data : pandas.DataFrame
      Output of ``get_daily_co2_intensity_stats``; must contain 'date',
      'min_intensity_actual', 'min_intensity_time', 'max_intensity_actual',
      'max_intensity_time', 'mean_forecast_precision', 'avg_actual_intensity'
      and 'avg_forecast_intensity'.
  db_path : str
      Path of the SQLite database; the table must already exist.

  Rows are written with ``INSERT OR REPLACE`` (keyed by the table's primary
  key, 'date') and committed together. Dates/times are stored as text;
  missing values (None / NaN / pd.NA), including missing times, are stored as
  NULL. On a sqlite3 error the message is printed and nothing is committed.
  Returns None.
  """

  #----------- H: check data availability
  if data.empty:
    print("No data to load.")
    return

  def _sql_value(value):
    # map NA to NULL and numpy scalars to Python ones so sqlite3 can bind them
    if value is None or pd.isna(value):
      return None
    return value.item() if hasattr(value, 'item') else value

  def _sql_text(value):
    # a missing time becomes NULL rather than the literal string 'None'
    value = _sql_value(value)
    return None if value is None else str(value)

  columns = ['date', 'min_intensity_actual', 'min_intensity_time', 'max_intensity_actual',
             'max_intensity_time', 'mean_forecast_precision', 'avg_actual_intensity',
             'avg_forecast_intensity']
  params = [
      (str(day), _sql_value(min_val), _sql_text(min_time), _sql_value(max_val),
       _sql_text(max_time), _sql_value(precision), _sql_value(avg_actual), _sql_value(avg_forecast))
      for day, min_val, min_time, max_val, max_time, precision, avg_actual, avg_forecast
      in data[columns].itertuples(index=False, name=None)
  ]

  sql_upsert = """
  INSERT OR REPLACE INTO carbon_intensity_daily_stats (
      date, min_intensity_actual, min_intensity_time, max_intensity_actual,
      max_intensity_time, mean_forecast_precision, avg_actual_intensity,
      avg_forecast_intensity
  ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
  """

  conn = None
  try:
    conn = sqlite3.connect(db_path)
    conn.executemany(sql_upsert, params)
    conn.commit()
    print(f"Loaded/updated {len(params)} daily statistics records.")

  except sqlite3.Error as e:
    print(f"Error loading daily statistics data: {e}")
  finally:
    if conn is not None:
      conn.close()

> ***5. Apply the whole ETL Pipeline on an example:***

In [None]:
#H: do the complete process for a specific date
if __name__ == "__main__":
  import os  # local import: only needed here to build the database path

  #H: defining the required arguments
  date = '2023-05-30'
  db_directory = '/data'
  # os.path.join adds the separator; plain '+' gave '/dataco2_intensity_db.db'
  db_path = os.path.join(db_directory, 'co2_intensity_db.db')


  #H: the ETL Process
  print("------ Startig the ELT Process -------")

  print("\n ____________________| E: EXTRACT |____________________")
  print("\n ---------- Extracting data from Carbon Intensity API (Great Britain) -----------------")
  api_dataFrames_list = get_co2_intensity_data_2days(date)

  print("\n ____________________| T: TRANSFORM |____________________")
  print(  "\n ---------- Cleaning the extracted data -----------------")
  cleaned_data = clean_co2_intensity_data(api_dataFrames_list, date)

  print(  "\n ---------- Transforming data -----------------")
  transformed_main_records = transform__co2_intensity_data(cleaned_data)

  print( "\n ----------- Setting the daily statistics -------------")
  daily_stats = get_daily_co2_intensity_stats(transformed_main_records)

  print("\n ____________________| L: LOAD |____________________")
  print(  "\n ---------- Creating the database schema -----------------")
  create_database_schema(db_path)

  print("\n ----------- Loading data to tables -------------------")
  load_data_to_carbon_intensity_records(transformed_main_records,db_path)
  load_daily_stats_to_table(daily_stats,db_path)

------ Startig the ELT Process -------

 ____________________| E: EXTRACT |____________________

 ---------- Extracting data from Carbon Intensity API (Great Britain) -----------------

--- Data extraction for: 2023-05-30 ---
Successfully extracted 48 records for 2023-05-30.
Successfully extracted 48 records for 2023-05-31.
Data extraction completed for 2023-05-30 and 2023-05-31.
 Cleaning will start.

 ____________________| T: TRANSFORM |____________________

 ---------- Cleaning the extracted data -----------------

--- Starting Cleaning Phase for: 2023-05-30 ---
Informative Message: All 'from'/'to' timestamps parsed successfully.
Informative Message: No duplicate rows found based on 'from' timestamp after initial combine.
Data successfully filtered to 48 records for 2023-05-30.
Informative Message: No missing 'intensity_actual' values found.
Cleaning phase complete for 2023-05-30. Final record count: 48.

 ---------- Transforming data -----------------

 ----------- Setting the dail