# SEMO & SEMOpx File Retrieval

In [20]:
import concurrent.futures
import pandas as pd
import requests
from tqdm import tqdm
from typing import Dict, Any, Optional, List

class APIFetcher:
    """
    Download every page of a paginated JSON API into one data frame.

    The fetcher reads two things from each JSON response: the ``items`` list
    (the rows of a page) and ``pagination.totalPages`` (how many pages exist).
    """

    def __init__(self, base_url: str, params: Dict[str, Any], timeout: float = 30.0):
        """
        Parameters
        ----------
        base_url : str
            The base URL of the API.
        params : Dict[str, Any]
            The query parameters sent with every request.
        timeout : float, optional
            Seconds to wait for each HTTP request before giving up, so that a
            stalled connection cannot block the download forever.
        """
        self.base_url = base_url
        self.params = params
        self.timeout = timeout

    def fetch_page(self, page: int) -> Optional[pd.DataFrame]:
        """
        Fetch a single page from the API.

        Parameters
        ----------
        page : int
            The page number to fetch.

        Returns
        -------
        Optional[pd.DataFrame]
            The normalized ``items`` of the page, or None if the request
            failed, the body was not valid JSON, or it had no ``items`` entry.
        """
        # Copy so that concurrent calls never mutate the shared parameter dict.
        params = self.params.copy()
        params['page'] = page
        try:
            response = requests.get(self.base_url, params=params, timeout=self.timeout)
            response.raise_for_status()
            items = response.json()['items']
        except (requests.RequestException, ValueError, KeyError, TypeError) as e:
            # ValueError covers undecodable JSON; KeyError/TypeError cover a
            # payload that does not have the expected {"items": [...]} shape.
            print(f"Error occurred on page {page}: {e}")
            return None
        return pd.json_normalize(items)

    def get_data(self) -> pd.DataFrame:
        """
        Get all data from the API.

        Pages are downloaded concurrently but concatenated in page order, so
        the row order follows the ordering requested from the API regardless
        of which download finishes first. Pages that fail are reported by
        ``fetch_page`` and skipped.

        Returns
        -------
        pd.DataFrame
            The rows of all successfully fetched pages with the added
            ``FullURL`` column and a flattened ``Group`` column. An empty
            data frame is returned when no page yielded any rows.

        Raises
        ------
        requests.RequestException
            If the initial request that discovers the page count fails.
        """
        initial_response = requests.get(self.base_url, params=self.params, timeout=self.timeout)
        initial_response.raise_for_status()
        total_pages = initial_response.json()['pagination']['totalPages']

        pages: Dict[int, pd.DataFrame] = {}
        with concurrent.futures.ThreadPoolExecutor() as executor:
            # Remember which page each future belongs to so the results can be
            # put back into page order after completing out of order.
            future_to_page = {
                executor.submit(self.fetch_page, page): page
                for page in range(1, total_pages + 1)
            }
            for future in tqdm(concurrent.futures.as_completed(future_to_page), total=len(future_to_page), desc="Downloading"):
                data_frame = future.result()
                if data_frame is not None:
                    pages[future_to_page[future]] = data_frame

        # pd.concat raises on an empty list, so bail out early when there is
        # nothing to combine (no pages, all pages failed, or all were empty).
        frames = [pages[page] for page in sorted(pages) if not pages[page].empty]
        if not frames:
            return pd.DataFrame()

        final_data_frame = pd.concat(frames, ignore_index=True)
        # NOTE(review): base_url is used verbatim as the prefix, giving
        # "<base_url>/documents/<ResourceName>" - confirm that this is the
        # real download location of the documents.
        final_data_frame['FullURL'] = self.base_url + "/documents/" + final_data_frame['ResourceName']

        # Flatten list-valued groups into a comma-separated string and strip
        # any literal square brackets left in string values.
        groups = final_data_frame['Group'].apply(lambda x: ', '.join(x) if isinstance(x, list) else x)
        final_data_frame['Group'] = (
            groups.str.replace('[', '', regex=False).str.replace(']', '', regex=False)
        )

        return final_data_frame

    @staticmethod
    def split_by_report_name(data: pd.DataFrame) -> Dict[str, pd.DataFrame]:
        """
        Split a data frame by report name.

        Parameters
        ----------
        data : pd.DataFrame
            The data frame to split.

        Returns
        -------
        Dict[str, pd.DataFrame]
            A dictionary where the keys are the unique report names (in order
            of first appearance) and the values are data frames with the rows
            of that report. Empty if ``data`` has no ``ReportName`` column.
        """
        # An empty download result has no columns at all; there is nothing to split.
        if 'ReportName' not in data.columns:
            return {}
        return {report_name: data[data['ReportName'] == report_name] for report_name in data['ReportName'].unique()}


In [21]:
# Both endpoints accept the same query parameters; only the report groups differ.
def _static_report_params(groups):
    """Build the query parameters of a static-reports request for ``groups``."""
    return {
        'sort_by': 'PublishTime',
        'order_by': 'DESC',
        'name': '',
        'group[]': groups,
        'date_from': '',
        'date_to': '',
        'page_size': 1000,
    }


semo_fetcher = APIFetcher(
    'https://reports.sem-o.com/api/v1/documents/static-reports',
    _static_report_params([
        'Capacity Qualification',
        'Capacity Auctions',
        'Forecast Data',
        'Inputs, Commerical and Technical Offer Data',
        'Registration',
        'Settlement Data',
    ]),
)
semopx_fetcher = APIFetcher(
    'https://reports.semopx.com/api/v1/documents/static-reports',
    _static_report_params(['REMIT Reports', 'Market Data']),
)

# Download the document listing of each API
data_semo = semo_fetcher.get_data()
data_semopx = semopx_fetcher.get_data()

# Stack both listings into one frame with a fresh index
combined_data = pd.concat([data_semo, data_semopx], ignore_index=True)

# One data frame per report name, keyed by that name
dfs_dict = APIFetcher.split_by_report_name(combined_data)


Downloading: 100%|██████████| 61/61 [00:26<00:00,  2.34it/s]
Downloading: 100%|██████████| 147/147 [00:45<00:00,  3.25it/s]


# Writing file metadata to the database

In [23]:
import sqlite3
import re
from typing import Dict

def clean_table_name(name: str) -> str:
    """
    Clean a table name by replacing every non-word character with an
    underscore and prefixing an underscore when the name starts with a digit.

    Parameters
    ----------
    name : str
        The name of the table.

    Returns
    -------
    str
        The cleaned name of the table.
    """
    # Raw string: "\W" and "\d" are regex classes, not string escapes. The
    # second alternative matches the empty position in front of a leading
    # digit, so the substitution inserts an underscore there.
    return re.sub(r'\W|^(?=\d)', '_', name)

def write_to_sqlite(db_name: str, dfs: Dict[str, pd.DataFrame]) -> None:
    """
    Write dataframes to SQLite database.

    Each dataframe is written to its own table, replacing a table of the same
    name if one exists. A failure on one table is printed and does not stop
    the remaining tables from being written.

    Parameters
    ----------
    db_name : str
        The name of the SQLite database.
    dfs : Dict[str, pd.DataFrame]
        A dictionary of dataframes to be written to the database.
        The keys are table names (passed through ``clean_table_name`` first)
        and the values are corresponding dataframes.
    """
    conn = sqlite3.connect(db_name)
    try:
        for table_name, df in dfs.items():
            # Label used in the error message if cleaning the name itself
            # fails (e.g. a key that is not a string).
            cleaned_table_name = table_name
            try:
                cleaned_table_name = clean_table_name(table_name)
                # Use `if_exists="replace"` to replace the existing table if it exists
                df.to_sql(cleaned_table_name, conn, if_exists="replace", index=False)
            except Exception as e:
                print(f"Error occurred when writing to table {cleaned_table_name}: {e}")
    finally:
        # Always release the connection, even if iteration is interrupted.
        conn.close()

# Persist every per-report data frame as its own table in trading.db
write_to_sqlite(db_name="trading.db", dfs=dfs_dict)


# Testing to view table names

In [24]:

def get_table_names(db_name: str) -> None:
    """
    Print all table names in an SQLite database.

    The names are printed as the list of one-element tuples returned by the
    query against ``sqlite_master``.

    Parameters
    ----------
    db_name : str
        The name of the SQLite database.
    """
    conn = sqlite3.connect(db_name)
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
        print(cursor.fetchall())
    finally:
        # Close the connection even when the query raises.
        conn.close()

# List the tables that now exist in trading.db
get_table_names(db_name="trading.db")


[('Imbalance Price Supporting Information Report',), ('Imbalance Price Report (Imbalance Pricing Period)',), ('Daily Load Forecast Summary',), ('RTIC OPERATIONAL SCHEDULE REPORT',), ('Aggregated Contracted Quantities for Demand',), ('Aggregated Contracted Quantities for Generation',), ('Hourly Dispatch Instructions Report',), ('Aggregated Contracted Quantities for Wind',), ('Hourly SO Interconnector Trades Report',), ('Outturn Availability',), ('System Shortfall Imbalance Index and System Imbalance Flattening Factor',), ('Net Imbalance Volume Forecast',), ('Forecast Imbalance',), ('Balancing and Imbalance Market Cost View',), ('Imbalance Price Report (Imbalance Settlement Period)',), ('Aggregated Final Physical Notifications',), ('Anonymised Inc/Dec Curves',), ('Aggregated Wind Forecast',), ('Four Day Aggregated Rolling Wind Unit Forecast',), ('Four Day Rolling Wind Power Unit Forecast',), ('LTS Operational Schedule Report',), ('Capacity Payments by Unit',), ('Capacity Payments by Mark

# Testing to view successful table population

In [27]:
def read_from_sqlite(db_name: str, table_name: str) -> Optional[pd.DataFrame]:
    """
    Read a table from SQLite database into a DataFrame.

    Parameters
    ----------
    db_name : str
        The name of the SQLite database.
    table_name : str
        The name of the table to be read from the database. It is inserted
        into the query text as-is, so a name containing spaces or other
        special characters must already be quoted by the caller.

    Returns
    -------
    Optional[pd.DataFrame]
        The DataFrame obtained from reading the table, or None if the read
        failed (the error is printed).
    """
    conn = sqlite3.connect(db_name)
    try:
        return pd.read_sql_query(f"SELECT * FROM {table_name}", conn)
    except Exception as e:
        print(f"Error occurred when reading table {table_name}: {e}")
        return None
    finally:
        # Runs on both the success and the error path.
        conn.close()

# Load one report table and preview its first rows. The inner quotes are part
# of the SQL text, so the name with spaces is read as a single identifier.
df = read_from_sqlite(db_name="trading.db", table_name="'Daily Load Forecast Summary'")

df.head()


Unnamed: 0,_id,Date,PublishTime,DateRetention,DPuG_ID,ReportName,Group,Dynamic,ResourceName,FullURL
0,647f8cbe9620d91362226ff6,2023-06-06,2023-06-06T19:20:21,2023-06-06,BM-010,Daily Load Forecast Summary,Forecast Data,1,PUB_DailyLoadFcst_202306061920.xml,https://reports.sem-o.com/api/v1/documents/sta...
1,647f8cbe9620d91362226f35,2023-06-06,2023-06-06T19:15:16,2023-06-06,BM-010,Daily Load Forecast Summary,Forecast Data,1,PUB_DailyLoadFcst_202306061915.xml,https://reports.sem-o.com/api/v1/documents/sta...
2,647f893b9620d90c1d1745cd,2023-06-06,2023-06-06T19:05:20,2023-06-06,BM-010,Daily Load Forecast Summary,Forecast Data,1,PUB_DailyLoadFcst_202306061905.xml,https://reports.sem-o.com/api/v1/documents/sta...
3,647f893b9620d90c1d17450c,2023-06-06,2023-06-06T19:00:15,2023-06-06,BM-010,Daily Load Forecast Summary,Forecast Data,1,PUB_DailyLoadFcst_202306061900.xml,https://reports.sem-o.com/api/v1/documents/sta...
4,647f85b79620d9054911f4bf,2023-06-06,2023-06-06T18:50:15,2023-06-06,BM-010,Daily Load Forecast Summary,Forecast Data,1,PUB_DailyLoadFcst_202306061850.xml,https://reports.sem-o.com/api/v1/documents/sta...
