In [1]:
# default_exp retrieval

# Electric Insights API Wrapper

This notebook walks through the usage of the electric insights API wrapper script which is contained within the same repository

<br>

### Imports

In [3]:
#exports
import pandas as pd
import numpy as np

import os
import json
import typer
import requests
import xmltodict
from datetime import date
from warnings import warn
from tqdm import tqdm

In [4]:
from IPython.display import JSON

<br>

We'll also specify the directory we wish to save any data to

In [5]:
data_dir = '../data'

<br>

### Data Retrieval

#### Single Stream

We'll being by retrieving data for a single stream only, starting with just the raw JSON response

In [6]:
#exports
def query_API(start_date:str, end_date:str, stream:str, time_group='30m'):
    """
    'Query API' makes the call to Electric Insights and returns the JSON response

    Parameters:
        start_date: Start date for data given as a string in the form '%Y-%m-%d'
        end_date: End date for data given as a string in the form '%Y-%m-%d'
        stream: One of 'prices_ahead', 'prices', 'temperatures', 'emissions', or 'generation-mix'
        time_group: One of '30m', '1h', '1d' or '7d'. The default is '30m'
    """

    # Checking stream is an EI endpoint
    possible_streams = ['prices_ahead', 'prices', 'temperatures', 'emissions', 'generation-mix']
    assert stream in possible_streams, f"Stream must be one of {''.join([stream+', ' for stream in possible_streams])[:-2]}"

    # Checking time_group will be accepted by API
    possible_time_groups = ['30m', '1h', '1d', '7d']
    assert time_group in possible_time_groups, f"Time group must be one of {''.join([time_group+', ' for time_group in possible_time_groups])[:-2]}"

    # Formatting dates
    format_dt = lambda dt: date.strftime(dt, '%Y-%m-%d') if isinstance(dt, date) else dt
    start_date = format_dt(start_date)
    end_date = format_dt(end_date)

    # Running query and parsing response
    response = requests.get(f'http://drax-production.herokuapp.com/api/1/{stream}?date_from={start_date}&date_to={end_date}&group_by={time_group}')
    r_json = response.json()

    return r_json

<br>

We can convert this response to a dataframe, however this doesn't handle the nested columns well

In [7]:
start_date = '2020-01-01'
end_date = '2020-01-31'
stream = 'emissions'
time_group = '30m'

r_json = query_API(start_date=start_date, end_date=end_date, stream=stream, time_group=time_group)
df = pd.DataFrame.from_dict(r_json)

df.head()

Unnamed: 0,start,end,value
0,2020-01-01T00:00:00Z,2020-01-01T00:30:00Z,"{'totalInTperh': 4445.670000000001, 'totalInGp..."
1,2020-01-01T00:30:00Z,2020-01-01T01:00:00Z,"{'totalInTperh': 4668.45, 'totalInGperkWh': 16..."
2,2020-01-01T01:00:00Z,2020-01-01T01:30:00Z,"{'totalInTperh': 4562.072, 'totalInGperkWh': 1..."
3,2020-01-01T01:30:00Z,2020-01-01T02:00:00Z,"{'totalInTperh': 4342.128, 'totalInGperkWh': 1..."
4,2020-01-01T02:00:00Z,2020-01-01T02:30:00Z,"{'totalInTperh': 4123.677, 'totalInGperkWh': 1..."


<br>

We can create a function that will take a specified column and extract the nested dataframe

In [8]:
#exports
def dict_col_to_cols(df:pd.DataFrame, value_col='value'):
    """Checks the `value_col`, if it contains dictionaries these are transformed into new columns which then replace it"""

    ## Checks the value col is found in the dataframe
    if value_col not in df.columns:
        return df

    if isinstance(df.loc[0, value_col], dict):
        df_values = pd.DataFrame(df[value_col].to_dict()).T
        df[df_values.columns] = df_values
        df = df.drop(columns=[value_col])

    return df

In [9]:
df = dict_col_to_cols(df)

df.head(3)

Unnamed: 0,start,end,totalInTperh,totalInGperkWh
0,2020-01-01T00:00:00Z,2020-01-01T00:30:00Z,4445.67,162.398904
1,2020-01-01T00:30:00Z,2020-01-01T01:00:00Z,4668.45,167.502063
2,2020-01-01T01:00:00Z,2020-01-01T01:30:00Z,4562.072,166.268387


<br>

Unfortunately however this doesn't handle repeated nesting of dictionaries, we'll create a wrapper that does

In [10]:
#exports
def clean_nested_dict_cols(df):
    """Unpacks columns contining nested dictionaries"""
    # Calculating columns that are still dictionaries
    s_types = df.iloc[0].apply(lambda val: type(val))
    cols_with_dicts = s_types[s_types == dict].index

    while len(cols_with_dicts) > 0:
        for col_with_dicts in cols_with_dicts:
            # Extracting dataframes from dictionary columns
            df = dict_col_to_cols(df, col_with_dicts)

            # Recalculating columns that are still dictionaries
            s_types = df.iloc[0].apply(lambda val: type(val))
            cols_with_dicts = s_types[s_types == dict].index
            
    return df

In [11]:
df = clean_nested_dict_cols(df)

df.head()

Unnamed: 0,start,end,totalInTperh,totalInGperkWh
0,2020-01-01T00:00:00Z,2020-01-01T00:30:00Z,4445.67,162.398904
1,2020-01-01T00:30:00Z,2020-01-01T01:00:00Z,4668.45,167.502063
2,2020-01-01T01:00:00Z,2020-01-01T01:30:00Z,4562.072,166.268387
3,2020-01-01T01:30:00Z,2020-01-01T02:00:00Z,4342.128,163.38526
4,2020-01-01T02:00:00Z,2020-01-01T02:30:00Z,4123.677,159.122875


<br>

Next we'll process the datetime index

In [12]:
#exports
def set_dt_idx(df:pd.DataFrame, idx_name='local_datetime'):
    """
    Converts the start datetime to UK local time, then sets it as the index and removes the original datetime columns
    """

    idx_dt = pd.DatetimeIndex(pd.to_datetime(df['start'], utc=True)).tz_convert('Europe/London')
    idx_dt.name = idx_name

    df.index = idx_dt
    df = df.drop(columns=['start', 'end'])

    return df

def create_df_dt_rng(start_date, end_date, freq='30T', tz='Europe/London', dt_str_template='%Y-%m-%d'):
    """
    Creates a dataframe mapping between local datetimes and electricity market dates/settlement periods
    """
    
    # Creating localised datetime index
    s_dt_rng = pd.date_range(start_date, end_date, freq=freq, tz=tz)
    s_dt_SP_count = pd.Series(0, index=s_dt_rng).resample('D').count()

    # Creating SP column
    SPs = []
    for num_SPs in list(s_dt_SP_count):
        SPs += list(range(1, num_SPs+1))

    # Creating datetime dataframe
    df_dt_rng = pd.DataFrame(index=s_dt_rng)
    df_dt_rng.index.name = 'local_datetime'

    # Adding query call cols
    df_dt_rng['SP'] = SPs
    df_dt_rng['date'] = df_dt_rng.index.strftime(dt_str_template)

    return df_dt_rng

def clean_df_dts(df):
    """Cleans the datetime index of the passed DataFrame"""
    df = set_dt_idx(df)
    df = df[~df.index.duplicated()] 

    df_dt_rng = create_df_dt_rng(df.index.min(), df.index.max())
    df = df.reindex(df_dt_rng.index)

    df['SP'] = df_dt_rng['SP'] # Adding settlement period designation
    
    return df

In [13]:
df = clean_df_dts(df)

df.head()

Unnamed: 0_level_0,totalInTperh,totalInGperkWh,SP
local_datetime,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
2020-01-01 00:00:00+00:00,4445.67,162.398904,1
2020-01-01 00:30:00+00:00,4668.45,167.502063,2
2020-01-01 01:00:00+00:00,4562.072,166.268387,3
2020-01-01 01:30:00+00:00,4342.128,163.38526,4
2020-01-01 02:00:00+00:00,4123.677,159.122875,5


<br>

We'll now combine all of the previous steps and add some column renaming where we want to tidy them up a bit

In [14]:
#exports
def retrieve_stream_df(start_date:str, end_date:str, stream:str, time_group='30m', renaming_dict={}):
    """
    Makes the call to Electric Insights and parses the response into a dataframe which is returned

    Parameters:
        start_date: Start date for data given as a string in the form '%Y-%m-%d'
        end_date: End date for data given as a string in the form '%Y-%m-%d'
        stream: One of 'prices_ahead', 'prices_ahead', 'prices', 'temperatures' or 'emissions'
        time_group: One of '30m', '1h', '1d' or '7d'. The default is '30m'
        renaming_dict: Mapping from old to new column names
    """

    # Calling data and parsing into dataframe
    r_json = query_API(start_date, end_date, stream, time_group)
    df = pd.DataFrame.from_dict(r_json)

    # Handling entrys which are dictionarys
    df = clean_nested_dict_cols(df)

    # Setting index as localised datetime, reindexing with all intervals and adding SP
    df = clean_df_dts(df)

    # Renaming value col
    if 'value' in df.columns:
        df = df.rename(columns={'value':stream})

    if 'referenceOnly' in df.columns:
        df = df.drop(columns=['referenceOnly'])

    df = df.rename(columns=renaming_dict)

    return df

In [15]:
start_date = '2009-01-01'
end_date = '2009-01-02'
stream = 'generation-mix'

renaming_dict = {
    'pumpedStorage' : 'pumped_storage',
    'northernIreland' : 'northern_ireland',
    'windOnshore': 'wind_onshore',
    'windOffshore': 'wind_offshore'
}

df = retrieve_stream_df(start_date, end_date, stream, renaming_dict=renaming_dict)

df.head()

Unnamed: 0_level_0,nuclear,biomass,coal,gas,hydro,wind,solar,demand,pumped_storage,wind_onshore,wind_offshore,belgian,dutch,french,ireland,northern_ireland,irish,SP
local_datetime,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1
2009-01-01 00:00:00+00:00,6.973,0,17.65,11.9,0.246,0.148,0,38.329,-0.404,,,0.0,0.0,1.977,0.0,0.0,-0.161,1
2009-01-01 00:30:00+00:00,6.968,0,17.77,12.031,0.245,0.157,0,38.461,-0.527,,,0.0,0.0,1.977,0.0,0.0,-0.16,2
2009-01-01 01:00:00+00:00,6.97,0,18.07,11.754,0.246,0.147,0,37.986,-1.018,,,0.0,0.0,1.977,0.0,0.0,-0.16,3
2009-01-01 01:30:00+00:00,6.969,0,18.022,11.162,0.246,0.148,0,36.864,-1.269,,,0.0,0.0,1.746,0.0,0.0,-0.16,4
2009-01-01 02:00:00+00:00,6.96,0,17.998,10.812,0.246,0.16,0,36.18,-1.566,,,0.0,0.0,1.73,0.0,0.0,-0.16,5


<br>

#### Multiple Streams

We'll now create further functionality for retrieving all of the streams and combining them, before doing so we'll create a helper function for checking the streams are allowed

In [16]:
#exports
def check_streams(streams='*'):
    """
    Checks that the streams given are a list containing only possible streams, or is all streams - '*'.
    """
    
    possible_streams = ['prices_ahead', 'prices', 'temperatures', 'emissions', 'generation-mix']

    if isinstance(streams, list):
        unrecognised_streams = list(set(streams) - set(possible_streams))

        if len(unrecognised_streams) == 0:
            return streams 
        else:
            unrecognised_streams_to_print = ''.join(["'"+stream+"', " for stream in unrecognised_streams])[:-2]
            raise ValueError(f"Streams {unrecognised_streams_to_print} could not be recognised, must be one of: {', '.join(possible_streams)}")

    elif streams=='*':
        return possible_streams 

    else:
        raise ValueError(f"Streams could not be recognised, must be one of: {', '.join(possible_streams)}")

In [17]:
streams = check_streams()

streams

['prices_ahead', 'prices', 'temperatures', 'emissions', 'generation-mix']

<br>

By default all streams are returned but if we provide a list it will be checked

In [18]:
streams = check_streams(['prices', 'emissions'])

streams

['prices', 'emissions']

<br>

However, if we try to check a list containing a stream that doesn't exist we should receive an error

In [19]:
try:
    _ = check_streams(['not_a_stream'])
    print('Success!')
except Exception as e:
    print('Error!\n\n'+str(e))

Error!

Streams 'not_a_stream' could not be recognised, must be one of: prices_ahead, prices, temperatures, emissions, generation-mix


<br>

Next we'll create a wrapper for downloading and combining all of the streams together

In [20]:
#exports
def retrieve_streams_df(start_date:str, end_date:str, streams='*', time_group='30m', renaming_dict={}):
    """
    Makes the calls to Electric Insights for the given streams and parses the responses into a dataframe which is returned

    Parameters:
        start_date: Start date for data given as a string in the form '%Y-%m-%d'
        end_date: End date for data given as a string in the form '%Y-%m-%d'
        streams: Contains 'prices_ahead', 'prices_ahead', 'prices', 'temperatures' or 'emissions', or is given as all, '*'
        time_group: One of '30m', '1h', '1d' or '7d'. The default is '30m'
    """

    df = pd.DataFrame()
    streams = check_streams(streams)

    for stream in streams:
        df_stream = retrieve_stream_df(start_date, end_date, stream, renaming_dict=renaming_dict)           
        df[df_stream.columns] = df_stream

    return df

In [21]:
streams = '*'
renaming_dict = {
    'pumpedStorage' : 'pumped_storage',
    'northernIreland' : 'northern_ireland',
    'windOnshore': 'wind_onshore',
    'windOffshore': 'wind_offshore',
    'prices_ahead' : 'day_ahead_price',
    'prices' : 'imbalance_price',
    'temperatures' : 'temperature',
    'totalInGperkWh' : 'gCO2_per_kWh',
    'totalInTperh' : 'TCO2_per_h'
}

df = retrieve_streams_df(start_date, end_date, streams, renaming_dict=renaming_dict)

df.head()

Unnamed: 0_level_0,day_ahead_price,SP,imbalance_price,valueSum,temperature,TCO2_per_h,gCO2_per_kWh,nuclear,biomass,coal,...,demand,pumped_storage,wind_onshore,wind_offshore,belgian,dutch,french,ireland,northern_ireland,irish
local_datetime,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
2009-01-01 00:00:00+00:00,58.05,1,74.74,74.74,-0.6,21278,555,6.973,0,17.65,...,38.329,-0.404,,,0.0,0.0,1.977,0.0,0.0,-0.161
2009-01-01 00:30:00+00:00,56.33,2,74.89,74.89,-0.6,21442,558,6.968,0,17.77,...,38.461,-0.527,,,0.0,0.0,1.977,0.0,0.0,-0.16
2009-01-01 01:00:00+00:00,52.98,3,76.41,76.41,-0.6,21614,569,6.97,0,18.07,...,37.986,-1.018,,,0.0,0.0,1.977,0.0,0.0,-0.16
2009-01-01 01:30:00+00:00,50.39,4,37.73,37.73,-0.6,21320,578,6.969,0,18.022,...,36.864,-1.269,,,0.0,0.0,1.746,0.0,0.0,-0.16
2009-01-01 02:00:00+00:00,48.7,5,59.0,59.0,-0.6,21160,585,6.96,0,17.998,...,36.18,-1.566,,,0.0,0.0,1.73,0.0,0.0,-0.16


<br>

Now we're ready to retrieve all of the streams in one, which we'll do for all years that data is available, then we'll save the resulting DataFrame.

In [22]:
#exports
def get_EI_data(
    start_date,
    end_date,
    streams='*',
    batch_freq='3M',
    renaming_dict={
        'pumpedStorage' : 'pumped_storage',
        'northernIreland' : 'northern_ireland',
        'windOnshore': 'wind_onshore',
        'windOffshore': 'wind_offshore',
        'prices_ahead' : 'day_ahead_price',
        'prices' : 'imbalance_price',
        'temperatures' : 'temperature',
        'totalInGperkWh' : 'gCO2_per_kWh',
        'totalInTperh' : 'TCO2_per_h'
    }
):
    # Preparing batch dates
    *batch_start_dates, post_batch_start_date = pd.date_range(start_date, end_date, freq=f'{batch_freq}S').strftime('%Y-%m-%d')
    pre_batch_end_date, *batch_end_dates = (pd.date_range(start_date, end_date, freq=batch_freq)+pd.Timedelta(days=1)).strftime('%Y-%m-%d')

    batch_date_pairs = list(zip(batch_start_dates, batch_end_dates))

    if start_date != pre_batch_end_date:
        batch_date_pairs = [(start_date, pre_batch_end_date)] + batch_date_pairs

    if end_date != post_batch_start_date:
        end_date = (pd.to_datetime(end_date) + pd.Timedelta(days=1)).strftime('%Y-%m-%d')
        batch_date_pairs = batch_date_pairs + [(post_batch_start_date, end_date)]

    # Retrieving data
    df = pd.DataFrame()

    for batch_start_date, batch_end_date in tqdm(batch_date_pairs):
        df_batch = retrieve_streams_df(batch_start_date, batch_end_date, streams, renaming_dict=renaming_dict)
        df = df.append(df_batch)  
        
    return df

In [23]:
start_date = '2020-12-01'
end_date = '2020-12-31'

df_EI = get_EI_data(start_date, end_date)

df_EI.head()

100%|████████████████████████████████████████████████████████████████████████████████████| 2/2 [00:08<00:00,  4.26s/it]


Unnamed: 0_level_0,day_ahead_price,SP,imbalance_price,valueSum,temperature,TCO2_per_h,gCO2_per_kWh,nuclear,biomass,coal,...,demand,pumped_storage,wind_onshore,wind_offshore,belgian,dutch,french,ireland,northern_ireland,irish
local_datetime,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
2020-12-01 00:00:00+00:00,39.14,1,54.75,54.75,6.1,2901.478,105.685061,6.214,3.122,0,...,27.454003,0.0,2.827155,0.196851,0.804,0.806,1.596,0.0,0.0,-0.776
2020-12-01 00:30:00+00:00,40.29,2,57.7,57.7,6.1,3142.538,114.112277,6.221,3.132,0,...,27.539,0.0,3.465114,5.653972,0.804,0.804,1.606,0.0,0.0,-0.66
2020-12-01 01:00:00+00:00,42.9,3,53.0,53.0,6.1,3080.11,113.48965,6.302,3.128,0,...,27.140008,0.0,3.24411,5.600729,0.804,0.806,1.606,0.0,0.0,-0.51
2020-12-01 01:30:00+00:00,42.61,4,42.61,42.61,6.1,2916.672,109.214109,6.332,3.13,0,...,26.706,0.0,3.141496,5.532231,0.804,0.806,1.606,0.0,0.0,-0.488
2020-12-01 02:00:00+00:00,41.64,5,41.64,41.64,6.1,2928.562,110.787678,6.334,3.133,0,...,26.434005,0.0,2.981147,5.599126,0.804,0.804,1.606,0.0,0.0,-0.41


<br>

### Data Saving

We'll now create some helper functions for saving the data in batches

In [24]:
#exports
get_EI_files = lambda data_dir: [f for f in os.listdir(data_dir) if 'csv' in f]

def bulk_retrieval(start_year=2009, end_year=2020, data_dir='data'):
    """Retrieves and saves in batches of years
    """
    EI_files = get_EI_files(data_dir)
    
    for year in range(start_year, end_year+1):
        if f'electric_insights_{year}.csv' not in EI_files:
            start_date, end_date = f'{year}-01-01 00:00', f'{year}-12-31 23:30'
            df_EI = get_EI_data(start_date, end_date)
            df_EI.to_csv(f'{data_dir}/electric_insights_{year}.csv')
        
    return 

In [25]:
bulk_retrieval(end_year=2020, data_dir=data_dir)

<br>

We'll write some checks to ensure that there are no gaps within the data

In [26]:
#exports
def check_for_gappy_data(data_dir):
    EI_files = get_EI_files(data_dir)

    for EI_file in tqdm(EI_files):
        year = int(EI_file.split('_')[-1].split('.')[0])
        
        df_EI_year = pd.read_csv(f'{data_dir}/{EI_file}')
        current_ts = pd.Timestamp.now()

        df_EI_year = df_EI_year.set_index('local_datetime')
        df_EI_year.index = pd.to_datetime(df_EI_year.index, utc=True).tz_convert('Europe/London')
        
        if year < current_ts.year:
            missing_dates = list(set(pd.date_range(f'{year}-01-01 00:00', f'{year}-12-31 23:30', freq='30T', tz='Europe/London')) - set(df_EI_year.index))
        else:
            missing_dates = list(set(pd.date_range(f'{year}-01-01 00:00', df_EI_year.index.max().tz_convert(None), freq='30T', tz='Europe/London')) - set(df_EI_year.index))
            
        if len(missing_dates) > 0:
            warn(f'There are {len(missing_dates)} missing dates in the {year} dataframe')

In [27]:
check_for_gappy_data(data_dir)

100%|██████████████████████████████████████████████████████████████████████████████████| 13/13 [00:06<00:00,  2.05it/s]


<br>

We'll now create a helper function that updates the dataset for the year we're currently in

In [28]:
#exports
def retrieve_latest_data(data_dir):
    EI_files = get_EI_files(data_dir)
    EI_years_downloaded = [int(f.split('_')[-1].split('.')[0]) for f in EI_files]

    current_ts = pd.Timestamp.now(tz='Europe/London')

    if current_ts.year not in EI_years_downloaded:
        start_date, end_date = f'{current_ts.year}-01-01 00:00', current_ts.strftime('%Y-%m-%d %H:%M')
        df_EI = get_EI_data(start_date, end_date)
        df_EI.to_csv(f'{data_dir}/electric_insights_{current_ts.year}.csv')

    else:
        df_EI = pd.read_csv(f'{data_dir}/electric_insights_{current_ts.year}.csv')

        df_EI = df_EI.set_index('local_datetime')
        df_EI.index = pd.to_datetime(df_EI.index, utc=True).tz_convert('Europe/London')
        dt_rng = pd.date_range(df_EI.index.max(), current_ts, freq='30T', tz='Europe/London')

        if dt_rng.size > 1:
            start_date = dt_rng[0] - pd.Timedelta(days=7)
            end_date = dt_rng[-1]

            try:
                df_EI_latest = get_EI_data(start_date, end_date)
                df_EI_trimmed = df_EI.drop(list(set(df_EI_latest.index) - (set(df_EI_latest.index) - set(df_EI.index))))
                df_EI_combined = df_EI_trimmed.append(df_EI_latest)
                df_EI_combined.to_csv(f'{data_dir}/electric_insights_{current_ts.year}.csv')
            except:
                warn(f'Could not retrieve any new data between {start_date} and {end_date}')

In [29]:
retrieve_latest_data(data_dir)

100%|████████████████████████████████████████████████████████████████████████████████████| 2/2 [00:02<00:00,  1.26s/it]


<br>

### Packaging

The last stage in our data retrieval process is to package it up as a Frictionless Data Package. We'll begin by creating a function that constructs the metadata for each resource within the package.

In [30]:
#exports
def year_to_resource(year=2009):
    resource = {
      "name": f"electric-insights-{year}",
      "path": f"electric_insights_{year}.csv",
      "profile": "tabular-data-resource",
      "schema": {
        "fields": [
          {
            "name": "local_datetime",
            "type": "datetime",
            "format": "default",
            "title": "Local Datetime",
            "description": "Datetime index in the `Europe/London` timezone"
          },
          {
            "name": "day_ahead_price",
            "type": "number",
            "format": "default",
            "title": "Day Ahead Price",
            "description": "Price of electricity on the day-ahead market exchanges"
          },
          {
            "name": "SP",
            "type": "integer",
            "format": "default",
            "title": "Settlement Period",
            "description": "Half hour settlement period. Normally 1-48 apart from during clock changes when there will be 46 or 50 settlement periods."
          },
          {
            "name": "imbalance_price",
            "type": "number",
            "format": "default",
            "title": "Balancing Market Price",
            "description": "Price of electricity on the balancing market"
          },
          {
            "name": "valueSum",
            "type": "number",
            "format": "default",
            "title": "Value Sum",
            "description": "Unknown"
          },
          {
            "name": "temperature",
            "type": "number",
            "format": "default",
            "title": "Temperature",
            "description": "The temperature at noon averaged over the whole country"
          },
          {
            "name": "TCO2_per_h",
            "type": "integer",
            "format": "default",
            "title": "Tonnes CO2 per Hour",
            "description": "Tonnes of CO2 released each hour"
          },
          {
            "name": "gCO2_per_kWh",
            "type": "integer",
            "format": "default",
            "description": "Carbon intensity on the GB power grid",
            "title": "Grams CO2 per KiloWatt Hour"
          },
          {
            "name": "nuclear",
            "type": "number",
            "format": "default",
            "title": "Nuclear Output (GW)",
            "description": "Power output from nuclear plants in GB"
          },
          {
            "name": "biomass",
            "type": "number",
            "format": "default",
            "title": "Biomass Output (GW)",
            "description": "Power output from biomass plants in GB"
          },
          {
            "name": "coal",
            "type": "number",
            "format": "default",
            "title": "Coal Output (GW)",
            "description": "Power output from coal plants in GB"
          },
          {
            "name": "gas",
            "type": "number",
            "format": "default",
            "title": "Gas Output (GW)",
            "description": "Power output from gas plants in GB"
          },
          {
            "name": "hydro",
            "type": "number",
            "format": "default",
            "title": "Hydro Output (GW)",
            "description": "Power output from hydro plants in GB"
          },
          {
            "name": "wind",
            "type": "number",
            "format": "default",
            "title": "Wind Output (GW)",
            "description": "Power output from wind plants in GB"
          },
          {
            "name": "solar",
            "type": "number",
            "format": "default",
            "title": "Solar Output (GW)",
            "description": "Power output from solar plants in GB"
          },
          {
            "name": "demand",
            "type": "number",
            "format": "default",
            "title": "Demand (GW)",
            "description": "Total demand for power in GB"
          },
          {
            "name": "pumped_storage",
            "type": "number",
            "format": "default",
            "title": "Pumped Storage (GW)",
            "description": "Power output from Pumped Storage plants in GB"
          },
          {
            "name": "wind_onshore",
            "type": "number",
            "format": "default",
            "title": "Onshore Wind Output (GW)",
            "description": "Power output from onshore wind plants in GB"
          },
          {
            "name": "wind_offshore",
            "type": "number",
            "format": "default",
            "title": "Offshore Wind Output (GW)",
            "description": "Power output from offshore wind plants in GB"
          },
          {
            "name": "belgian",
            "type": "number",
            "format": "default",
            "title": "Belgian Interconnector (GW)",
            "description": "Power flow in the Belgian Interconnector"
          },
          {
            "name": "dutch",
            "type": "number",
            "format": "default",
            "title": "Dutch Interconnector (GW)",
            "description": "Power flow in the Dutch Interconnector"
          },
          {
            "name": "french",
            "type": "number",
            "format": "default",
            "title": "French Interconnector (GW)",
            "description": "Power flow in the French Interconnector"
          },
          {
            "name": "ireland",
            "type": "number",
            "format": "default",
            "title": "Ireland Interconnector (GW)",
            "description": "Power flow in the Ireland Interconnector"
          },
          {
            "name": "northern_ireland",
            "type": "number",
            "format": "default",
            "title": "Northern Ireland Interconnector (GW)",
            "description": "Power flow in the Northern Ireladn Interconnector"
          },
          {
            "name": "irish",
            "type": "number",
            "format": "default",
            "title": "Irish Interconnector (GW)",
            "description": "Net flow of the two Irish interconnectors"
          }
        ]
      }
    }
    
    return resource

In [31]:
JSON(year_to_resource())

<IPython.core.display.JSON object>

<br>

Next we'll construct the full data package

In [32]:
#exports
def construct_datapackage(start_year=2009, end_year=2021):
    datapackage = {
      "profile": "tabular-data-package",
      "resources": [year_to_resource(year) for year in range(start_year, end_year+1)],
      "keywords": [
        "electric insights"
      ],
      "contributors": [
        {
          "title": "Drax",
          "role": "data-source"
        },
        {
          "title": "Bourn, Ayrton",
          "role": "maintainer"
        }
      ],
      "name": "electric-insights",
      "title": "Electric Insights",
      "homepage": "https://github.com/AyrtonB/Electric-Insights"
    }
    
    return datapackage

In [33]:
datapackage = construct_datapackage()

JSON(datapackage)

<IPython.core.display.JSON object>

<br>

Lastly we'll save it as a `.json`

In [34]:
with open(f'{data_dir}/datapackage.json', 'w') as f:
    json.dump(datapackage, f)

<br>

### CLI

We want to make this process as replicable as possible, as well as simple to integrate into GitHub actions work flows. To this end we'll create a CLI for the data retrieval, saving, and packaging.

In [35]:
#exports
app = typer.Typer()

In [36]:
#exports
@app.command()
def download(data_dir='data', start_year=2009, end_year=None):
    if end_year is None:
        end_year = pd.Timestamp.now().year
        
    bulk_retrieval(start_year=start_year, end_year=end_year-1, data_dir=data_dir)
    check_for_gappy_data(data_dir)
    retrieve_latest_data(data_dir)

    datapackage = construct_datapackage(start_year=start_year, end_year=end_year)

    with open(f'{data_dir}/datapackage.json', 'w') as f:
        json.dump(datapackage, f)
    
    return

In [38]:
download(data_dir)

100%|██████████████████████████████████████████████████████████████████████████████████| 13/13 [00:05<00:00,  2.42it/s]
100%|████████████████████████████████████████████████████████████████████████████████████| 2/2 [00:03<00:00,  1.55s/it]


In [39]:
#exports
if __name__ == '__main__' and '__file__' in globals():
    app()

In [44]:
#hide
from nbdev.export import notebook2script
notebook2script('01-retrieval.ipynb')

Converted 01-retrieval.ipynb.
