In [1]:
import os
import sys
# Work from the parent of the notebook's folder so the relative paths used below
# (config files, artifacts, data) resolve from there.
# Guarded with a kernel-level flag: without it, every re-run of this cell in the
# same kernel would climb one directory higher and break those relative paths.
if not globals().get("_CWD_MOVED_TO_PARENT", False):
    os.chdir('../')
    _CWD_MOVED_TO_PARENT = True

In [2]:
from WattPredictor.utils.helpers import *
from WattPredictor.constants import *
from WattPredictor.utils.exception import *
from WattPredictor import logger

In [3]:
from pathlib import Path
from dataclasses import dataclass
from datetime import datetime, timedelta

In [None]:
@dataclass(frozen=True)
class FeatureStoreConfig:
    """Immutable settings needed to log in to a Hopsworks project.

    Instances are frozen, so fields cannot be reassigned after construction.
    """
    # Passed as `project=` to `hopsworks.login` by FeatureStore.connect.
    hopsworks_project_name: str
    # Passed as `api_key_value=` to `hopsworks.login`; ConfigurationManager reads
    # it from the `hopsworks_api_key` environment variable.
    hopsworks_api_key: str

In [5]:
@dataclass(frozen=True)
class DataIngestionConfig:
    """Immutable settings consumed by the DataIngestion step.

    Instances are frozen, so fields cannot be reassigned after construction.
    """
    # Root directory of the ingestion step; created when the config is built.
    root_dir: Path
    # Directory holding the cached per-day electricity JSON responses.
    elec_raw_data: Path
    # Directory holding the cached weather CSV files.
    wx_raw_data: Path
    # URL used for electricity requests (read from the `elec_api` env variable).
    elec_api: str
    # URL used for weather requests (read from the `wx_api` env variable).
    wx_api: str
    # Sent as the `api_key` query parameter of electricity requests
    # (read from the `elec_api_key` env variable).
    elec_api_key: str
    # Destination CSV for the merged electricity + weather dataset.
    data_file: Path
    # First and last day to download; parsed with `pd.to_datetime(..., utc=True)`.
    start_date: str
    end_date: str

In [6]:
class ConfigurationManager:
    """Read the project's YAML files and build typed config objects from them.

    Args:
        config_filepath: Path to the main configuration YAML.
        params_filepath: Path to the parameters YAML.
        schema_filepath: Path to the schema YAML; it is loaded only if the file exists.

    Attributes:
        config: Parsed main configuration.
        params: Parsed parameters.
        schema: Parsed schema, or ``None`` when the schema file is absent.
    """

    def __init__(self, 
                 config_filepath=CONFIG_PATH,
                 params_filepath=PARAMS_PATH,
                 schema_filepath=SCHEMA_PATH):

        self.config = read_yaml(config_filepath)
        self.params = read_yaml(params_filepath)

        # Always define `schema` so later attribute access cannot fail, and accept
        # plain string paths as well as Path objects for the existence check.
        self.schema = None
        if schema_filepath is not None and Path(schema_filepath).exists():
            self.schema = read_yaml(schema_filepath)

        create_directories([self.config.artifacts_root])

    @staticmethod
    def _require_env(name: str) -> str:
        """Return environment variable ``name`` or raise a descriptive ``KeyError``."""
        try:
            return os.environ[name]
        except KeyError:
            raise KeyError(
                f"Required environment variable '{name}' is not set "
                "(has the .env file been loaded?)"
            ) from None

    def get_data_ingestion_config(self) -> DataIngestionConfig:
        """Build the data-ingestion settings.

        Paths come from the ``data_ingestion`` section of the config file, the date
        range from the ``dates`` section of the params file, and the API endpoints
        and key from the ``elec_api``, ``wx_api`` and ``elec_api_key`` environment
        variables. The ingestion root directory is created as a side effect.

        Raises:
            KeyError: If one of the required environment variables is missing.
        """
        config = self.config.data_ingestion
        params = self.params.dates

        create_directories([config.root_dir])

        data_ingestion_config = DataIngestionConfig(
            root_dir=Path(config.root_dir),
            elec_raw_data=Path(config.elec_raw_data),
            wx_raw_data=Path(config.wx_raw_data),
            elec_api=self._require_env('elec_api'),
            wx_api=self._require_env('wx_api'),
            elec_api_key=self._require_env('elec_api_key'),
            data_file=Path(config.data_file),
            start_date=params.start_date,
            end_date=params.end_date
        )

        return data_ingestion_config
    
    def get_feature_store_config(self) -> FeatureStoreConfig:
        """Build the Hopsworks connection settings.

        The project name comes from the ``feature_store`` section of the config
        file and the API key from the ``hopsworks_api_key`` environment variable.

        Raises:
            KeyError: If ``hopsworks_api_key`` is not set in the environment.
        """
        config = self.config.feature_store

        # Only the fields FeatureStoreConfig declares are passed. Supplying an
        # undeclared keyword (such as a host) makes the dataclass constructor
        # raise TypeError.
        feature_store_config = FeatureStoreConfig(
            hopsworks_project_name=config.hopsworks_project_name,
            hopsworks_api_key=self._require_env('hopsworks_api_key')
        )

        return feature_store_config

In [7]:
import hopsworks
import pandas as pd
import sys
import os
from WattPredictor.utils.exception import CustomException
from WattPredictor import logger

class FeatureStore:
    """Small wrapper around a Hopsworks project's feature store and dataset API.

    The constructor logs in immediately. Most methods convert any failure into
    ``CustomException``; ``delete_file`` is deliberately best-effort.
    """

    # Hopsworks dataset directory that receives uploaded artifacts.
    ARTIFACTS_DIR = "Resources/wattpredictor_artifacts"

    def __init__(self, config):
        """Store ``config`` and connect to Hopsworks.

        Args:
            config: Object exposing ``hopsworks_project_name`` and
                ``hopsworks_api_key``.

        Raises:
            CustomException: If connecting fails.
        """
        self.config = config
        # connect() already wraps failures in CustomException; wrapping again here
        # would only nest one CustomException inside another.
        self.connect()

    def connect(self):
        """Log in to the configured project and cache its feature store and dataset API.

        Raises:
            CustomException: If the login or either handle lookup fails.
        """
        try:
            self.project = hopsworks.login(
                project=self.config.hopsworks_project_name,
                api_key_value=self.config.hopsworks_api_key
            )
            self.feature_store = self.project.get_feature_store()
            self.dataset_api = self.project.get_dataset_api()
            logger.info(f"Connected to Hopsworks Feature Store: {self.config.hopsworks_project_name}")
        except Exception as e:
            raise CustomException(e, sys)

    def create_feature_group(self, name, df, primary_key, event_time, description):
        """Insert ``df`` into version 1 of feature group ``name``, creating it if needed.

        Args:
            name: Feature group name.
            df: Data to write.
            primary_key: List of primary-key column names (used on creation).
            event_time: Name of the event-time column (used on creation).
            description: Human-readable description (used on creation).

        Raises:
            CustomException: If creating the group or inserting the data fails.
        """
        try:
            # Only the lookup may fail quietly. Previously a bare `except:` also
            # swallowed errors raised by insert() on an existing group and then
            # tried to create that group again, hiding the real failure.
            try:
                fg = self.feature_store.get_feature_group(name=name, version=1)
            except Exception as lookup_error:
                logger.info(f"Feature Group '{name}' lookup failed: {lookup_error}")
                fg = None

            if fg is not None:
                logger.info(f"Feature Group '{name}' already exists. Inserting data instead.")
            else:
                logger.info(f"Feature Group '{name}' does not exist. Creating new one.")
                fg = self.feature_store.get_or_create_feature_group(
                    name=name,
                    version=1,
                    primary_key=primary_key,
                    event_time=event_time,
                    description=description,
                    online_enabled=False
                )

            # insert() is the documented way to persist both an existing group and
            # one returned by get_or_create_feature_group, so one call covers both.
            fg.insert(df)

            logger.info(f"Feature Group '{name}' created/updated successfully")

        except Exception as e:
            raise CustomException(e, sys)

    def create_feature_view(self, name: str, feature_group_name: str, features: list):
        """Create (or fetch) version 1 of feature view ``name`` over the given features.

        Args:
            name: Feature view name.
            feature_group_name: Feature group (version 1) to select from.
            features: Feature names passed to ``fg.select``.

        Raises:
            CustomException: If the feature group lookup or view creation fails.
        """
        try:
            fg = self.feature_store.get_feature_group(name=feature_group_name, version=1)
            self.feature_store.get_or_create_feature_view(
                name=name,
                version=1,
                query=fg.select(features),
                description=f"Feature View for {name}"
            )
            logger.info(f"Feature View '{name}' created successfully")
        except Exception as e:
            raise CustomException(e, sys)

    def upload_file_safely(self, local_path: str, target_name: str):
        """
        Upload file to Hopsworks dataset storage.
        If it already exists, it will be overwritten.

        Raises:
            CustomException: If the upload fails.
        """
        try:
            # NOTE(review): the second argument may be interpreted by Hopsworks as a
            # destination *directory* rather than a file path - confirm.
            self.dataset_api.upload(
                local_path,
                f"{self.ARTIFACTS_DIR}/{target_name}",
                overwrite=True 
            )
            logger.info(f"Uploaded file to Feature Store: {target_name}")
        except Exception as e:
            raise CustomException(e, sys)

    def delete_file(self, target_name: str):
        """
        Delete file from Hopsworks dataset storage.
        Only use this if you want to clean up files manually.

        Best-effort: failures are logged (with their cause) and never raised.
        """
        full_path = f"{self.ARTIFACTS_DIR}/{target_name}"
        try:
            # Hopsworks documents `remove(path)` on its dataset API; fall back to the
            # originally used `delete` for clients that expose that name instead.
            remove = getattr(self.dataset_api, "remove", None) or self.dataset_api.delete
            remove(full_path)
            logger.warning(f"Deleted file from Feature Store: {target_name}")
        except Exception as e:
            # Not raising here to allow safe cleanup, but the cause is logged so
            # that real errors are not mistaken for "file not found".
            logger.warning(f"File not found or already deleted: {target_name} ({e})")

    def get_training_data(self, feature_view_name: str):
        """Return ``(X, y)`` training data from version 1 of the named feature view.

        Raises:
            CustomException: If the view lookup or data retrieval fails.
        """
        try:
            fv = self.feature_store.get_feature_view(name=feature_view_name, version=1)
            X, y = fv.training_data()
            logger.info(f"Retrieved training data from Feature View '{feature_view_name}'")
            return X, y
        except Exception as e:
            raise CustomException(e, sys)

In [8]:
import os
import json
import pandas as pd
import requests
from datetime import datetime, timedelta
import openmeteo_requests
import requests_cache
from retry_requests import retry
from WattPredictor import logger
from WattPredictor.utils.helpers import create_directories, save_json, load_json
from WattPredictor.utils.exception import CustomException
from pathlib import Path
from dotenv import load_dotenv

# HTTP session with a response cache named '.cache'; entries expire after 3600 seconds.
cache_session = requests_cache.CachedSession('.cache', expire_after=3600)
# Same session wrapped with retries (5 attempts, backoff factor 0.2); DataIngestion
# hands it to the Open-Meteo client.
retry_session = retry(cache_session, retries=5, backoff_factor=0.2)
# Presumably loads variables from a local .env file into os.environ, which
# ConfigurationManager reads for API endpoints and keys - confirm the .env location.
load_dotenv()

class DataIngestion:
    """Download hourly electricity-demand and weather data, merge them on their
    timestamp, and publish the result to the Feature Store and a local CSV.

    Raw API responses are cached on disk (one JSON per day for electricity, one
    CSV per date range for weather) and reused on later runs.
    """

    # Seconds to wait for the electricity API; without a timeout a stalled
    # connection would block the download forever.
    REQUEST_TIMEOUT = 30

    # Location used for the weather query (presumably New York City - confirm).
    LATITUDE = 40.7128
    LONGITUDE = -74.0060

    # Hourly weather variables. The same sequence drives both the request and
    # the positional decoding of the response, so the two cannot drift apart.
    WEATHER_VARIABLES = ("temperature_2m", "weather_code", "relative_humidity_2m", "wind_speed_10m")

    def __init__(self, config: DataIngestionConfig, feature_store_config: FeatureStoreConfig):
        """
        Initialize DataIngestion with configuration and Feature Store settings.

        Args:
            config (DataIngestionConfig): Configuration for data ingestion.
            feature_store_config (FeatureStoreConfig): Configuration for Hopsworks Feature Store.
        """
        self.config = config
        self.feature_store_config = feature_store_config
        self.feature_store = FeatureStore(feature_store_config)
        self.openmeteo = openmeteo_requests.Client(session=retry_session)

    def _elec_get_api_url(self, year, month, day):
        """
        Construct API URL and parameters for electricity data.

        Args:
            year (int): Year of data.
            month (int): Month of data.
            day (int): Day of data.

        Returns:
            tuple: URL and parameters dictionary.
        """
        return self.config.elec_api, {
            "frequency": "hourly",
            "data[0]": "value",
            "sort[0][column]": "period",
            "sort[0][direction]": "desc",
            "facets[parent][0]": "NYIS",
            "offset": 0,
            "length": 5000,
            "start": f"{year}-{month:02d}-{day:02d}",
            # The window ends on the following day.
            "end": (datetime(year, month, day) + timedelta(days=1)).strftime("%Y-%m-%d"),
            "api_key": self.config.elec_api_key
        }

    def _wx_get_api_url(self, start_date, end_date):
        """
        Construct API URL and parameters for weather data.

        Args:
            start_date (datetime): Start date.
            end_date (datetime): End date.

        Returns:
            tuple: URL and parameters dictionary.
        """
        return self.config.wx_api, {
            "latitude": self.LATITUDE,
            "longitude": self.LONGITUDE,
            "start_date": start_date.strftime("%Y-%m-%d"),
            "end_date": end_date.strftime("%Y-%m-%d"),
            "hourly": list(self.WEATHER_VARIABLES),
            "timeformat": "unixtime",
            "timezone": "America/New_York"
        }

    @staticmethod
    def _elec_records(payload):
        """Return the ``response.data`` rows of an electricity payload as a DataFrame, or None if absent."""
        if 'response' in payload and 'data' in payload['response']:
            return pd.DataFrame(payload['response']['data'])
        return None

    def _fetch_electricity(self, year, month, day):
        """Return one day's electricity rows, from the JSON cache if usable, else from the API."""
        file_path = self.config.elec_raw_data / f"hourly_demand_{year}-{month:02d}-{day:02d}.json"
        if file_path.exists():
            cached = self._elec_records(load_json(file_path))
            if cached is not None:
                return cached
            # A cached file without the expected structure falls through to a re-download.

        url, params = self._elec_get_api_url(year, month, day)
        response = requests.get(url, params=params, timeout=self.REQUEST_TIMEOUT)
        response.raise_for_status()
        data = response.json()

        create_directories([self.config.elec_raw_data])
        save_json(file_path, data)

        fetched = self._elec_records(data)
        return fetched if fetched is not None else pd.DataFrame()

    def _fetch_weather(self, start_date, end_date):
        """Return hourly weather for the range, from the CSV cache if present, else from the API."""
        file_path = self.config.wx_raw_data / f"weather_data_{start_date.strftime('%Y-%m-%d')}_to_{end_date.strftime('%Y-%m-%d')}.csv"
        if file_path.exists():
            return pd.read_csv(file_path)

        url, params = self._wx_get_api_url(start_date, end_date)
        responses = self.openmeteo.weather_api(url, params=params)
        hourly = responses[0].Hourly()

        # Rebuild the hourly timestamps from the response's start, end and interval.
        hourly_data = {
            "date": pd.date_range(
                start=pd.to_datetime(hourly.Time(), unit="s", utc=True),
                end=pd.to_datetime(hourly.TimeEnd(), unit="s", utc=True),
                freq=pd.Timedelta(seconds=hourly.Interval()),
                inclusive="left"
            )
        }
        # Variables are read by position, in the order they were requested.
        for index, variable in enumerate(self.WEATHER_VARIABLES):
            hourly_data[variable] = hourly.Variables(index).ValuesAsNumpy()

        df = pd.DataFrame(data=hourly_data)
        create_directories([self.config.wx_raw_data])
        df.to_csv(file_path, index=False)
        return df

    def _fetch_data(self, data_type, *args):
        """
        Fetch data from APIs or load from local storage.

        Args:
            data_type (str): Type of data ("electricity" or "weather").
            *args: Arguments for fetching data (year, month, day for electricity; start_date, end_date for weather).

        Returns:
            pd.DataFrame: Fetched data. Empty when the request fails, the cached
            file is empty, the payload has no rows, or ``data_type`` is unknown.

        Raises:
            CustomException: On any other unexpected error.
        """
        try:
            if data_type == "electricity":
                return self._fetch_electricity(*args)
            if data_type == "weather":
                return self._fetch_weather(*args)

            logger.warning(f"Unknown data type requested: {data_type}")
            return pd.DataFrame()

        except requests.RequestException as e:
            logger.error(f"API request failed for {data_type} data: {e}")
            return pd.DataFrame()
        except pd.errors.EmptyDataError as e:
            logger.error(f"Empty data error for {data_type} data: {e}")
            return pd.DataFrame()
        except Exception as e:
            logger.error(f"Unexpected error fetching {data_type} data: {e}")
            raise CustomException(e, sys)

    def download(self):
        """
        Download electricity and weather data, merge, and save to Feature Store.

        Returns:
            pd.DataFrame: The merged dataset, or an empty DataFrame when either
            source returned nothing or the merge produced no rows.

        Raises:
            CustomException: If any step fails unexpectedly.
        """
        try:
            start = pd.to_datetime(self.config.start_date, utc=True)
            end = pd.to_datetime(self.config.end_date, utc=True)

            # One electricity request per calendar day in [start, end].
            elec_frames = []
            for current in pd.date_range(start, end, freq="D"):
                daily = self._fetch_data("electricity", current.year, current.month, current.day)
                if not daily.empty:
                    elec_frames.append(daily)

            wx_df = self._fetch_data("weather", start, end)

            if not elec_frames or wx_df.empty:
                logger.warning("No data fetched, returning empty DataFrame")
                return pd.DataFrame()

            # Each daily request covers [day, day + 1], so adjacent days can return
            # the same boundary rows if the API treats `end` as inclusive. Dropping
            # exact duplicate rows loses no information.
            elec_df = pd.concat(elec_frames, ignore_index=True).drop_duplicates(ignore_index=True)

            # Bring both sides to timezone-aware UTC timestamps before joining.
            elec_df['date'] = pd.to_datetime(elec_df['period'], utc=True)
            wx_df['date'] = pd.to_datetime(wx_df['date'], utc=True)

            combined_df = pd.merge(elec_df, wx_df, on="date", how="inner")

            if combined_df.empty:
                logger.warning("Merged dataset is empty, check data alignment")
                return pd.DataFrame()

            logger.info(f"Merged data shape: {combined_df.shape}")

            # Normalise column names. Strip first: once spaces have been replaced
            # with underscores there is no surrounding whitespace left to trim.
            combined_df.columns = (
                combined_df.columns.str.strip()
                .str.lower()
                .str.replace("-", "_", regex=False)
                .str.replace(" ", "_", regex=False)
            )

            # Save to Feature Store
            self.feature_store.create_feature_group(
                name="elec_wx_demand",
                df=combined_df,
                primary_key=["subba"],
                event_time="date",
                description="Merged electricity demand and weather data for WattPredictor"
            )

            create_directories([self.config.data_file.parent])
            combined_df.to_csv(self.config.data_file, index=False)
            logger.info(f"Dataset saved to {self.config.data_file} and Feature Store")

            return combined_df

        except Exception as e:
            logger.error(f"Error during download: {e}")
            raise CustomException(e, sys)

In [9]:
# Run the ingestion step end to end: load the configuration, build both config
# objects, then download, merge and publish the data.
try:
    config_manager = ConfigurationManager()

    # Settings for the ingestion itself and for the Hopsworks connection.
    ingestion_config = config_manager.get_data_ingestion_config()
    feature_store_config = config_manager.get_feature_store_config()

    ingestion = DataIngestion(
        feature_store_config=feature_store_config,
        config=ingestion_config,
    )
    ingestion.download()
except Exception as error:
    raise CustomException(error, sys)

[2025-07-11 14:37:34,842: INFO: helpers: yaml file: config_file\config.yaml loaded successfully]
[2025-07-11 14:37:34,854: INFO: helpers: yaml file: config_file\params.yaml loaded successfully]
[2025-07-11 14:37:34,859: INFO: helpers: yaml file: config_file\schema.yaml loaded successfully]
[2025-07-11 14:37:34,861: INFO: helpers: created directory at: artifacts]
[2025-07-11 14:37:34,862: INFO: helpers: created directory at: data]
[2025-07-11 14:37:34,865: INFO: external: Initializing external client]
[2025-07-11 14:37:34,866: INFO: external: Base URL: https://c.app.hopsworks.ai:443]
[2025-07-11 14:37:37,837: INFO: python: Python Engine initialized.]

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1237149
[2025-07-11 14:37:40,358: INFO: 385296184: Connected to Hopsworks Feature Store: JavithNaseem]
[2025-07-11 14:37:42,934: INFO: helpers: created directory at: data\raw\elec_data]
[2025-07-11 14:37:42,940: INFO: helpers: json file saved at: data\raw\elec_data\hour

Uploading Dataframe: 100.00% |██████████| Rows 40909/40909 | Elapsed Time: 00:09 | Remaining Time: 00:00


Launching job: elec_wx_demand_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1237149/jobs/named/elec_wx_demand_1_offline_fg_materialization/executions
[2025-07-11 14:44:05,400: INFO: 385296184: Feature Group 'elec_wx_demand' created/updated successfully]
[2025-07-11 14:44:05,403: INFO: helpers: created directory at: data\processed]
[2025-07-11 14:44:05,752: INFO: 3241335765: Dataset saved to data\processed\elec_wx_demand.csv and Feature Store]
