In [1]:
import os
import sys


In [2]:
# Show the directory the kernel starts in (expected: the notebooks folder).
# Plain Python call instead of the IPython ``pwd`` automagic, which ignores
# any argument and is undefined outside IPython.
os.getcwd()

'f:\\Electricity-Demand-Predictor\\notebooks'

In [3]:
# Move from the notebooks folder up to the project root. The guard keeps this
# cell idempotent: re-running it must not climb another directory level.
if os.path.basename(os.getcwd()) == "notebooks":
    os.chdir(os.pardir)

In [4]:
# Make the ``src`` directory under the current working directory importable.
# The membership check avoids stacking duplicate entries when the cell is re-run.
src_dir = os.path.join(os.getcwd(), "src")
if src_dir not in sys.path:
    sys.path.append(src_dir)

In [5]:
from electron.utils.helpers import *
from electron.constants import *

In [6]:
from pathlib import Path
from dataclasses import dataclass
from datetime import datetime, timedelta

In [7]:
@dataclass(frozen=True)
class DataIngestionConfig:
    """Immutable settings for the data-ingestion stage.

    Instances are assembled by ``ConfigurationManager.get_data_ingestion_config``
    from the ``data_ingestion`` section of the config file and the ``dates``
    section of the params file. ``frozen=True`` makes attribute assignment
    after construction raise an error.
    """
    # Stage root directory, read from ``data_ingestion.root_dir``.
    root_dir: Path
    # API endpoint string, read from ``data_ingestion.source_api``.
    source_api: str
    # Directory where the per-day JSON responses are stored.
    raw_data_path: Path
    # Path of the combined CSV written at the end of the download.
    data_file: Path
    # First day to download, parsed later with ``pd.to_datetime``.
    start_date: str
    # Last day to download (inclusive), parsed later with ``pd.to_datetime``.
    end_date: str

In [8]:
class ConfigurationManager:
    """Read the project's YAML files and hand out typed stage configurations."""

    def __init__(
        self,
        config_filepath=CONFIG_PATH,
        params_filepath=PARAMS_PATH,
        schema_filepath=SCHEMA_PATH,
    ):
        """
        Load the config, params and schema files and prepare the artifacts root.

        Args:
            config_filepath: Location of the main configuration YAML.
            params_filepath: Location of the parameters YAML.
            schema_filepath: Location of the schema YAML.
        """
        self.config = read_yaml(config_filepath)
        self.params = read_yaml(params_filepath)
        self.schema = read_yaml(schema_filepath)

        # The artifacts root is passed to the project's directory helper.
        artifacts_root = self.config.artifacts_root
        create_directories([artifacts_root])

    def get_data_ingestion_config(self) -> DataIngestionConfig:
        """
        Build the data-ingestion settings from the loaded YAML content.

        Returns:
            DataIngestionConfig: Paths and API endpoint from the
            ``data_ingestion`` config section, plus the date range from the
            ``dates`` params section.
        """
        ingestion_section = self.config.data_ingestion
        date_range = self.params.dates

        return DataIngestionConfig(
            root_dir=Path(ingestion_section.root_dir),
            source_api=ingestion_section.source_api,
            raw_data_path=Path(ingestion_section.raw_data_path),
            data_file=Path(ingestion_section.data_file),
            start_date=date_range.start_date,
            end_date=date_range.end_date,
        )

In [9]:
import pandas as pd
import requests
import json
from electron import logger
from sklearn.preprocessing import LabelEncoder
from pathlib import Path



class DataIngestion:
    """
    Download hourly electricity-demand data day by day and merge it into one CSV.

    Each day in the configured range is fetched from the EIA API (parent facet
    ``NYIS``), cached as a JSON file under ``config.raw_data_path`` and re-used
    on later runs. The combined, label-encoded data is written to
    ``config.data_file``.

    The API key is read from the environment variable named by
    ``API_KEY_ENV_VAR``; it is never stored in the source or written to logs.
    """

    # Environment variable that must hold the EIA API key.
    API_KEY_ENV_VAR = "EIA_API_KEY"
    # Endpoint used when ``config.source_api`` is empty.
    DEFAULT_API_URL = "https://api.eia.gov/v2/electricity/rto/region-sub-ba-data/data/"
    # Seconds to wait for the API before giving up on a single request.
    REQUEST_TIMEOUT = 30

    def __init__(self, config: DataIngestionConfig):
        """
        Store the configuration and make sure the output directories exist.

        Args:
            config (DataIngestionConfig): Paths, endpoint and date range to use.
        """
        self.config = config
        self.label_encoder = LabelEncoder()

        # Create directories if they don't exist
        self.config.raw_data_path.mkdir(parents=True, exist_ok=True)
        self.config.data_file.parent.mkdir(parents=True, exist_ok=True)

        logger.info(f"DataIngestion initialized with config: {config}")

    def _get_api_url(self, year, month, day):
        """
        Generate API URL and parameters for fetching electricity data.

        Args:
            year (int): Year
            month (int): Month
            day (int): Day

        Returns:
            tuple: (url, params) for API request

        Raises:
            RuntimeError: If the API key environment variable is not set.
        """
        api_key = os.environ.get(self.API_KEY_ENV_VAR)
        if not api_key:
            raise RuntimeError(
                f"Environment variable {self.API_KEY_ENV_VAR} is not set; "
                "an EIA API key is required to download data"
            )

        # Prefer the endpoint from the configuration over the built-in default.
        url = self.config.source_api or self.DEFAULT_API_URL
        day_start = datetime(year, month, day)
        params = {
            "frequency": "hourly",
            "data[0]": "value",
            "sort[0][column]": "period",
            "sort[0][direction]": "desc",
            "facets[parent][0]": "NYIS",
            "offset": 0,
            "length": 5000,
            "start": day_start.strftime("%Y-%m-%d"),
            "end": (day_start + timedelta(days=1)).strftime("%Y-%m-%d"),
            "api_key": api_key,
        }
        return url, params

    def _fetch_and_save_json(self, year, month, day) -> Path:
        """
        Fetch data from API and save as JSON file.

        Args:
            year (int): Year
            month (int): Month
            day (int): Day

        Returns:
            Path: Path to saved JSON file or None if failed

        Raises:
            RuntimeError: If the API key environment variable is not set
                (raised before any request is attempted).
        """
        url, params = self._get_api_url(year, month, day)
        try:
            logger.info(f"Fetching data for {year}-{month:02d}-{day:02d}")
            # A timeout prevents one stalled connection from hanging the whole run.
            response = requests.get(url, params=params, timeout=self.REQUEST_TIMEOUT)
            response.raise_for_status()
            data = response.json()

            save_path = self.config.raw_data_path / f"hourly_demand_{year}-{month:02d}-{day:02d}.json"
            with open(save_path, 'w', encoding='utf-8') as f:
                json.dump(data, f, indent=4)

            logger.info(f"Saved data to {save_path}")
            return save_path

        except requests.RequestException as e:
            # requests error messages can contain the full request URL,
            # query string included, so mask the key before logging.
            message = str(e).replace(str(params["api_key"]), "***")
            logger.error(f"Failed to fetch data for {year}-{month:02d}-{day:02d}: {message}")
            return None
        except Exception as e:
            logger.error(f"Unexpected error while fetching data for {year}-{month:02d}-{day:02d}: {e}")
            return None

    def _read_single_day(self, file_path: Path) -> pd.DataFrame:
        """
        Read one cached JSON file without encoding the sub-region names.

        Args:
            file_path (Path): Path to JSON file

        Returns:
            pd.DataFrame: Columns ``date`` (UTC datetime), ``subba`` and
            ``demand``; an empty DataFrame if the file is unusable.
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)

            if 'response' in data and 'data' in data['response']:
                df = pd.DataFrame(data['response']['data'])
            else:
                logger.warning(f"Unexpected structure in file {file_path}")
                return pd.DataFrame()

            if df.empty:
                logger.warning(f"No data in file {file_path}")
                return pd.DataFrame()

            if 'subba' not in df.columns:
                logger.warning(f"'subba' column missing in {file_path}")
                return pd.DataFrame()

            # Select and rename columns
            df = df[['period', 'subba', 'value']].rename(
                columns={'period': 'date', 'value': 'demand'}
            )

            # Rows without a sub-region cannot be encoded; drop them explicitly.
            missing_region = df['subba'].isna()
            if missing_region.any():
                logger.warning(f"Dropping {int(missing_region.sum())} rows without 'subba' in {file_path}")
            df = df.loc[~missing_region].copy()

            # Convert date to datetime
            df['date'] = pd.to_datetime(df['date'], utc=True)

            logger.info(f"Loaded {len(df)} records from {file_path}")
            return df

        except Exception as e:
            logger.error(f"Error loading data from {file_path}: {e}")
            return pd.DataFrame()

    def _encode_sub_regions(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Replace the ``subba`` names with integer codes from ``self.label_encoder``.

        The encoder is re-fitted on the frame it is given, so the codes are only
        comparable within that frame.

        Args:
            df (pd.DataFrame): Frame with ``date``, ``subba`` and ``demand``.

        Returns:
            pd.DataFrame: Columns ``date``, ``sub_region_code``, ``demand``.
        """
        encoded = df.copy()
        encoded['sub_region_code'] = self.label_encoder.fit_transform(encoded['subba']).astype('int64')
        return encoded[['date', 'sub_region_code', 'demand']]

    def _load_single_day(self, file_path: Path) -> pd.DataFrame:
        """
        Load and process data from a single JSON file.

        The sub-region codes are fitted on this file alone. ``download`` does not
        use this method; it encodes once over all days so codes stay consistent.

        Args:
            file_path (Path): Path to JSON file

        Returns:
            pd.DataFrame: Processed DataFrame
        """
        df = self._read_single_day(file_path)
        if df.empty:
            return pd.DataFrame()
        try:
            return self._encode_sub_regions(df)
        except Exception as e:
            logger.error(f"Error loading data from {file_path}: {e}")
            return pd.DataFrame()

    def download(self) -> pd.DataFrame:
        """
        Download electricity demand data for the specified date range.

        Days already cached as JSON are not fetched again. Days that fail to
        download or parse are skipped. The sub-region encoder is fitted once on
        the combined data so a code always refers to the same sub-region.

        Returns:
            pd.DataFrame: Combined DataFrame with all data

        Raises:
            Exception: Any unexpected error is logged and re-raised.
        """
        try:
            start = pd.to_datetime(self.config.start_date, utc=True)
            end = pd.to_datetime(self.config.end_date, utc=True)

            logger.info(f"Starting data download from {start.date()} to {end.date()}")

            total_days = (end - start).days + 1
            daily_frames = []

            for processed_days, current_date in enumerate(pd.date_range(start, end, freq="D"), start=1):
                year, month, day = current_date.year, current_date.month, current_date.day

                # Use raw_data_path for JSON files
                file_path = self.config.raw_data_path / f"hourly_demand_{year}-{month:02d}-{day:02d}.json"

                if not file_path.exists():
                    file_path = self._fetch_and_save_json(year, month, day)

                # ``file_path`` is None when the fetch failed; skip that day.
                if file_path:
                    df = self._read_single_day(file_path)
                    if not df.empty:
                        daily_frames.append(df)

                # Progress logging (also reached for days that failed)
                if processed_days % 30 == 0:
                    logger.info(f"Processed {processed_days}/{total_days} days")

            if not daily_frames:
                logger.warning("No data collected.")
                return pd.DataFrame()

            combined = pd.concat(daily_frames, ignore_index=True)

            # Encode once over every day so the mapping is stable across files.
            final_df = self._encode_sub_regions(combined)

            # Sort by date, then sub-region, for a deterministic row order
            final_df = final_df.sort_values(['date', 'sub_region_code']).reset_index(drop=True)

            # Save to CSV
            final_df.to_csv(self.config.data_file, index=False)
            logger.info(f"Final dataset saved to {self.config.data_file}")
            logger.info(f"Final dataset shape: {final_df.shape}")
            logger.info(f"Date range: {final_df['date'].min()} to {final_df['date'].max()}")

            return final_df

        except Exception as e:
            logger.error(f"Error during data download: {e}")
            raise

In [10]:
# Run the ingestion stage end to end: load settings, build the stage, download.
try:
    config = ConfigurationManager()
    ingestion_config = config.get_data_ingestion_config()
    ingestion = DataIngestion(config=ingestion_config)
    df = ingestion.download()

except Exception as e:
    # Use the project logger imported with the DataIngestion cell; ``exception``
    # logs at ERROR level and attaches the traceback before the re-raise.
    logger.exception(f"An error occurred during data ingestion: {e}")
    raise

[2025-07-01 17:54:10,648: INFO: helpers: yaml file: config_file\config.yaml loaded successfully]
[2025-07-01 17:54:10,652: INFO: helpers: yaml file: config_file\params.yaml loaded successfully]
[2025-07-01 17:54:10,657: INFO: helpers: yaml file: config_file\schema.yaml loaded successfully]
[2025-07-01 17:54:10,659: INFO: helpers: created directory at: artifacts]
[2025-07-01 17:54:10,661: INFO: 1207651540: DataIngestion initialized with config: DataIngestionConfig(root_dir=WindowsPath('data_ingestion'), source_api='https://api.eia.gov/v2/electricity/rto/region-sub-ba-data/data/', raw_data_path=WindowsPath('artifacts/data_ingestion/raw_data'), data_file=WindowsPath('artifacts/data_ingestion/data/electricity_demand.csv'), start_date='2023-01-01', end_date='2023-12-31')]
[2025-07-01 17:54:10,668: INFO: 1207651540: Starting data download from 2023-01-01 to 2023-12-31]
[2025-07-01 17:54:10,673: INFO: 1207651540: Fetching data for 2023-01-01]
[2025-07-01 17:54:13,490: INFO: 1207651540: Saved 