In [24]:
import asyncio
import httpx
from dataclasses import dataclass, field
from typing import Dict, List
import pandas as pd
from asyncio import Semaphore
import os

import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logging.getLogger("httpx").setLevel(logging.WARNING)  # Suppress httpx detailed logs


@dataclass
class ReportURLs:
    report_list_path: str = "report_list_short.csv"
    reports: pd.DataFrame = field(init=False)
    urls: Dict[str, List[str]] = field(default_factory=dict, init=False)
    data: Dict[str, pd.DataFrame] = field(default_factory=dict, init=False)
    semaphore: Semaphore = field(default=Semaphore(50), init=False)

    async def async_init(self):
        logging.info("Initializing ReportURLs class.")
        self.reports = pd.read_csv(self.report_list_path, dtype={'report_id': str})
        await self.generate_all_urls_async()
        await self.fetch_and_merge_data()

    def create_api_url(self, report_num, page="1", page_size="5000"):
        start_time = "2010-11-18T00:00"
        end_time = "2030-02-18T23:59"
        sort_by = "StartTime"
        order_by = "ASC"
        participant_name = ""
        resource_name = ""
        resource_type = ""
        base_url = f"https://reports.sem-o.com/api/v1/dynamic/BM-{report_num}?"

        url = (
            f"{base_url}"
            f"StartTime=%3E%3D{start_time}&"
            f"EndTime=%3C%3D{end_time}&"
            f"sort_by={sort_by}&"
            f"order_by={order_by}&"
            f"ParticipantName={participant_name}&"
            f"ResourceName={resource_name}&"
            f"ResourceType={resource_type}&"
            f"page={page}&"
            f"page_size={page_size}"
        )

        return url

    async def get_total_pages(self, url):
        async with httpx.AsyncClient() as client:
            response = await client.get(url)
            response_data = response.json()
            total_pages = response_data['pagination']['totalPages']
            return total_pages

    async def generate_all_urls_async(self):
        logging.info("Asynchronously generating all URLs for reports.")
        self.urls = {}
        tasks = []
        for index, row in self.reports.iterrows():
            report_id = row['report_id']
            report_name = row['name']
            task = asyncio.create_task(self.fetch_total_pages_and_generate_urls(report_id, report_name))
            tasks.append(task)
        await asyncio.gather(*tasks)

    async def fetch_total_pages_and_generate_urls(self, report_id, report_name):
        initial_url = self.create_api_url(report_id)
        total_pages = await self.get_total_pages(initial_url)
        report_urls = [self.create_api_url(report_id, page=str(page)) for page in range(1, total_pages + 1)]
        self.urls[report_name] = report_urls
    
    async def fetch_with_retry(self, url, url_index, total_urls, report_name):
        retries = 3
        backoff_factor = 0.5
        async with self.semaphore:
            for attempt in range(1, retries + 1):
                try:
                    async with httpx.AsyncClient(timeout=20.0) as client:
                        response = await client.get(url)
                        response.raise_for_status()
                        logging.info(f"Request {url_index}/{total_urls} completed for {report_name}.")
                        return response.json()
                except (httpx.ReadTimeout, httpx.ConnectTimeout) as e:
                    logging.warning(f"Timeout on attempt {attempt} for {url}: {e}")
                    if attempt == retries:
                        logging.error(f"Max retries reached for {url}.")
                        return None
                    sleep_duration = backoff_factor * (2 ** (attempt - 1))
                    logging.info(f"Retrying in {sleep_duration} seconds...")
                    await asyncio.sleep(sleep_duration)
                except httpx.RequestError as e:
                    logging.error(f"Request error for {url}: {e}")
                    return None

    async def fetch_and_merge_data(self):
        logging.info("Starting to fetch and merge data for all reports.")
        for report_name, urls in self.urls.items():
            logging.info(f"Processing {len(urls)} URLs for {report_name}.")
            tasks = [self.fetch_with_retry(url, i+1, len(urls), report_name) for i, url in enumerate(urls)]
            pages_data = await asyncio.gather(*tasks)
            combined_data = self.combine_data(pages_data)
            if combined_data:
                df = pd.DataFrame(combined_data)
                self.save_data_to_csv(df, report_name)  # Call save method immediately after data is ready
                logging.info(f"Completed download and merging for {report_name}: {len(combined_data)} items fetched.")
            else:
                logging.warning(f"No data found for {report_name}.")

    def combine_data(self, pages_data):
        all_data = []
        for page_data in pages_data:
            if page_data and 'items' in page_data:
                all_data.extend(page_data['items'])
        return all_data

    def save_data_to_csv(self, df, report_name):
        output_dir = "Output_Data"
        if not os.path.exists(output_dir):
            os.makedirs(output_dir)
        file_path = os.path.join(output_dir, f"{report_name}.csv")
        df.to_csv(file_path, index=False)
        logging.info(f"Saved {report_name} data to {file_path}.")

In [26]:
report_urls = ReportURLs()
await report_urls.async_init()

2024-02-18 23:51:57,042 - INFO - Initializing ReportURLs class.
2024-02-18 23:51:57,044 - INFO - Asynchronously generating all URLs for reports.
2024-02-18 23:52:01,457 - INFO - Starting to fetch and merge data for all reports.
2024-02-18 23:52:01,458 - INFO - Processing 1 URLs for Trading_Day_Ex_Rate.
2024-02-18 23:52:02,820 - INFO - Request 1/1 completed for Trading_Day_Ex_Rate.
2024-02-18 23:52:02,824 - INFO - Saved Trading_Day_Ex_Rate data to Output_Data\Trading_Day_Ex_Rate.csv.
2024-02-18 23:52:02,825 - INFO - Completed download and merging for Trading_Day_Ex_Rate: 188 items fetched.
2024-02-18 23:52:02,825 - INFO - Processing 1 URLs for Daily_Load_Forecast_Summary.
2024-02-18 23:52:03,242 - INFO - Request 1/1 completed for Daily_Load_Forecast_Summary.
2024-02-18 23:52:03,265 - INFO - Saved Daily_Load_Forecast_Summary data to Output_Data\Daily_Load_Forecast_Summary.csv.
2024-02-18 23:52:03,266 - INFO - Completed download and merging for Daily_Load_Forecast_Summary: 4512 items fetc

In [5]:
# Summarise how many page URLs were generated for each report.
for report_name, page_urls in report_urls.urls.items():
    print(f"{len(page_urls)} URL's generated for {report_name}")

1 URL's generated for Unit_Under_Test
1 URL's generated for Trading_Day_Ex_Rate
281 URL's generated for Dispatch_Quantity
12 URL's generated for Aggregated_Contracted_Generation_Quantities
1 URL's generated for Imbalance_Price_Report(Imbalance_Settlement_Period)
2 URL's generated for Aggregated_Wind_Forecast
12 URL's generated for Aggregated_Contracted_Demand_Quantities
66 URL's generated for Daily_Dispatch_Instructions_D+1
1 URL's generated for Interconnector_Flows_&_Residual_Capacity
12 URL's generated for Aggregated_Contracted_Wind_Quantities
1 URL's generated for Demand_Control
1 URL's generated for Forecast_Imbalance
1 URL's generated for Balance_&_Imbalance_Market_Cost
1 URL's generated for Average_System_Frequency
2 URL's generated for Four_Day_Aggregated_Wind_Forecast
1 URL's generated for Daily_Load_Forecast_Summary
6 URL's generated for Imbalance_Price_Report(Imbalance_Pricing_Period)
89 URL's generated for Final_Physical_Notificaiton
137 URL's generated for Forecast_Availabi

# Original Web Scraper

In [None]:
import os
import pandas as pd
from datetime import datetime
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.chrome import ChromeDriverManager
from selenium.common.exceptions import TimeoutException, ElementClickInterceptedException
import time

def main():
    """Run the legacy Selenium pipeline: download every report, then merge everything into Output_Data."""
    # dirname(abspath(<bare file name>)) is just the normalised working directory.
    project_root = os.path.abspath(os.curdir)
    setup_and_download_reports(project_root)
    merge_data(project_root)

def setup_and_download_reports(current_directory):
    """Create a fresh, versioned download folder and download every report listed in ``report_list.csv``.

    Args:
        current_directory: Project root that holds ``report_list.csv``; ``Downloads/`` is created beneath it.
    """
    report_list_csv = os.path.join(current_directory, 'report_list.csv')
    target_dir = create_versioned_download_directory(
        create_main_download_directory(current_directory)
    )
    download_reports(target_dir, report_list_csv)

def merge_data(current_directory):
    """Merge per-report CSV files from every download folder into ``Output_Data``.

    Every ``Downloads/*Downloaded_Data_*`` sub-folder is scanned, plus the existing
    ``Output_Data`` folder (so previously merged history is kept). CSV files that share a
    file name are concatenated and de-duplicated. They are sorted by ``StartTime`` (or
    ``TradeDate``) when that column exists, then written to ``Output_Data/<file name>``.
    Non-CSV files, such as Chrome's partial ``.crdownload`` files, and empty CSVs are skipped.

    Args:
        current_directory: Project root containing ``Downloads`` and ``Output_Data``.
    """
    base_dir = os.path.join(current_directory, 'Downloads')
    output_dir = os.path.join(current_directory, 'Output_Data')
    os.makedirs(output_dir, exist_ok=True)

    # Collect full folder paths. Downloads/ may not exist if only the API client has been run.
    data_folders = []
    if os.path.isdir(base_dir):
        data_folders = [
            os.path.join(base_dir, folder)
            for folder in sorted(os.listdir(base_dir))
            if 'Downloaded_Data_' in folder and os.path.isdir(os.path.join(base_dir, folder))
        ]
    data_folders.append(output_dir)  # Include Output_Data for merging

    # file name -> frames read for it; concatenated once per file instead of once per read.
    frames_by_file = {}
    for folder_path in data_folders:
        for file_name in sorted(os.listdir(folder_path)):
            file_path = os.path.join(folder_path, file_name)
            if not (file_name.lower().endswith('.csv') and os.path.isfile(file_path)):
                continue
            try:
                new_df = pd.read_csv(file_path)
            except pd.errors.EmptyDataError:
                print(f'Skipping empty file {file_path}.')
                continue
            frames_by_file.setdefault(file_name, []).append(new_df)

    for file_name, frames in frames_by_file.items():
        df = pd.concat(frames, ignore_index=True).drop_duplicates()
        sort_column = 'StartTime' if 'StartTime' in df.columns else 'TradeDate' if 'TradeDate' in df.columns else None
        if sort_column:
            df = df.sort_values(by=sort_column)

        df.to_csv(os.path.join(output_dir, file_name), index=False)

    print(f'Merged files are saved in {output_dir}.')

def create_main_download_directory(current_directory):
    """Return ``<current_directory>/Downloads``, creating the directory first if nothing exists at that path."""
    target = os.path.join(current_directory, "Downloads")
    if os.path.exists(target):
        return target
    os.makedirs(target)
    return target

def create_versioned_download_directory(downloads_main_dir):
    """Create and return the first unused ``Downloaded_Data_<dd_mm_YYYY>_<n>`` folder (n = 1, 2, ...).

    Args:
        downloads_main_dir: Parent directory in which the versioned folder is created.
    """
    stamp = datetime.now().strftime("%d_%m_%Y")
    version = 1
    candidate = os.path.join(downloads_main_dir, f"Downloaded_Data_{stamp}_{version}")
    while os.path.exists(candidate):
        version += 1
        candidate = os.path.join(downloads_main_dir, f"Downloaded_Data_{stamp}_{version}")
    os.makedirs(candidate)
    return candidate

def _wait_for_downloads(download_dir, timeout=30.0, poll_interval=0.5):
    """Poll until ``download_dir`` has no Chrome partial files (``.crdownload``); return False on timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if not any(name.endswith('.crdownload') for name in os.listdir(download_dir)):
            return True
        time.sleep(poll_interval)
    return False

def download_reports(download_dir, csv_file):
    """Download each report listed in ``csv_file`` into ``download_dir`` using headless Chrome.

    The CSV must provide ``report_id``, ``name`` and ``chart?`` columns. When ``chart?`` is
    ``y``/``Y``, the report table is clicked before the download icon. A report whose
    elements cannot be clicked is reported and skipped. The browser is always closed,
    even if an unexpected error occurs.

    Args:
        download_dir: Directory Chrome saves downloaded files into.
        csv_file: Path to the report list CSV.
    """
    dataframe = pd.read_csv(csv_file, dtype={'report_id': str})
    chrome_options = Options()
    chrome_options.add_argument("--headless")
    chrome_options.add_experimental_option("prefs", {
        "download.default_directory": download_dir,
        "download.prompt_for_download": False,
        "safebrowsing.enabled": False,
    })

    service = Service(ChromeDriverManager().install())
    driver = webdriver.Chrome(service=service, options=chrome_options)
    reports_downloaded = 0

    try:
        for _, row in dataframe.iterrows():
            report_id = row['report_id']
            report_name = row['name']
            # A blank 'chart?' cell is read as NaN (a float), so coerce to str before comparing.
            chart_required = str(row['chart?']).strip().lower() == 'y'
            url = f"https://www.sem-o.com/market-data/dynamic-reports/#BM-{report_id}"
            driver.get(url)

            try:
                if chart_required:
                    wait_and_click(driver, By.ID, "dynamic-reports-table")
                wait_and_click(driver, By.CSS_SELECTOR, ".icon.icon-download")
                print(f"Downloaded report ID: {report_id} - {report_name}.")
                reports_downloaded += 1
            except (TimeoutException, ElementClickInterceptedException) as e:
                print(f"Failed to interact with the elements for report ID {report_id} - {report_name}. Reason: {e}")
                continue
            time.sleep(0.5)

        # Quitting the browser right after the last click can abort downloads still in progress.
        if not _wait_for_downloads(download_dir):
            print(f"Warning: some downloads in {download_dir} may be incomplete.")
    finally:
        driver.quit()

    print(f"Successfully downloaded {reports_downloaded}/{len(dataframe)} reports.")

def wait_and_click(driver, by_method, selector):
    """Wait up to 10 s for the element at (``by_method``, ``selector``) to be clickable, then click it.

    If the native click is intercepted by another element, the same element is clicked
    via JavaScript instead. A ``TimeoutException`` from the wait propagates to the caller.
    """
    clickable = EC.element_to_be_clickable((by_method, selector))
    target = WebDriverWait(driver, 10).until(clickable)
    try:
        target.click()
    except ElementClickInterceptedException:
        driver.execute_script("arguments[0].click();", target)

# Inside a Jupyter kernel __name__ is "__main__", so executing this cell runs the full scrape + merge.
if __name__ == "__main__":
    main()
