In [48]:
# Altergo platform endpoints the results are sent to.
import getpass
import os

factory_api = "https://altergo.io/"
iot_api = "https://iot.altergo.io/"
# Read the API key from the environment (prompting when it is unset) instead of hard-coding it:
# a key written into the notebook is exposed to anyone who can read the file or its history.
# NOTE(review): the key that used to be hard-coded in this cell should be treated as leaked and rotated.
api_key = os.environ.get("ALTERGO_API_KEY") or getpass.getpass("Altergo API key: ")
# CAISO PNODE_IDs to query and the platform assets (serial numbers) they are paired with, by position.
node_names = ["SANTIAGO_6_N026"]
target_assets = ["CA-ISO SANTIAGO"]

In [49]:
!pip install -qqq git+https://bitbucket.org/freemens/ion_sdk@master

In [50]:
import collections
import json
import os
import time
import warnings
from datetime import datetime, timedelta, timezone
from io import BytesIO
from typing import Tuple
from zipfile import ZipFile

import numpy as np
import pandas as pd
import requests

# from api.caiso import CAISO
from ion_sdk.edison_api.edison_api import Client
from ion_sdk.edison_api.models.factory import getSensorByName

In [51]:
# Hand the Altergo connection settings to the ion_sdk client through environment variables.
os.environ.update(
    {
        "ALTERGO_FACTORY_API": factory_api,
        "ALTERGO_IOT_API": iot_api,
        "ALTERGO_API_KEY": api_key,
    }
)

In [52]:
class CAISO():
    """
    Helper for pulling market data for one pricing node from CAISO OASIS.

    Every query goes to the OASIS ``SingleZip`` endpoint and the first file of the returned zip
    archive is parsed. Most ``get_*`` methods return a DataFrame indexed by the parsed
    ``INTERVALSTARTTIME_GMT`` column and sorted by time.

    Attributes:
        base_url (str): Root URL of the OASIS API.
        node (str): PNODE_ID used by the node-specific queries.
        AS_REGIONS (list[str]): Ancillary Service regions the node belongs to.
        timeout (float | None): Seconds ``requests`` may wait on OASIS before raising;
            ``None`` waits indefinitely.
    """

    # Class-level default so instances that bypass ``__init__`` still have a timeout.
    timeout = 300.0

    # OASIS timestamp format; the literal "-0000" offset declares the value to be UTC.
    _OASIS_DATETIME_FORMAT = "%Y%m%dT%H:%M-0000"

    def __init__(self, node: str, timeout: float = 300.0):
        """
        Validate ``node`` against the CAISO AS region map and look up its AS regions.

        Args:
            node (str): PNODE_ID with CAISO.
            timeout (float): Per-request timeout in seconds passed to ``requests.get``
                (``None`` disables it).

        Raises:
            ValueError: If ``node`` is not listed in the AS region map for the past day.
        """
        self.base_url = "http://oasis.caiso.com/oasisapi/"
        self.timeout = timeout
        if not self.check_CAISO_node_validity(node):
            raise ValueError(f"PNODE_ID {node} is not a known node with the CAISO System.")
        self.node = node
        self.AS_REGIONS = self.get_node_regions()

    # ------------------------------------------------------------------ request helpers

    @staticmethod
    def _utc_now() -> datetime:
        """Return the current time as a timezone-aware UTC datetime."""
        return datetime.now(timezone.utc)

    @classmethod
    def _format_datetime(cls, moment: datetime) -> str:
        """
        Render ``moment`` in the OASIS query format.

        Naive datetimes are taken to already be in UTC (the contract of the public methods);
        aware datetimes are converted to UTC first so the "-0000" suffix is accurate.
        """
        if moment.tzinfo is not None:
            moment = moment.astimezone(timezone.utc)
        return moment.strftime(cls._OASIS_DATETIME_FORMAT)

    def _build_url(self, queryname: str, start_datetime: datetime, end_datetime: datetime, *params: str) -> str:
        """Build a ``SingleZip`` URL: query name, time window, then ``params`` in the given order."""
        query = "&".join(
            [
                f"queryname={queryname}",
                f"startdatetime={self._format_datetime(start_datetime)}",
                f"enddatetime={self._format_datetime(end_datetime)}",
                *params,
            ]
        )
        return f"{self.base_url}SingleZip?{query}"

    def _fetch(self, queryname: str, start_datetime: datetime, end_datetime: datetime, *params: str) -> tuple[str, bytes]:
        """
        Run one OASIS query and return ``(file_name, content)`` of the first file in the zip response.
        """
        response = requests.get(
            self._build_url(queryname, start_datetime, end_datetime, *params),
            timeout=self.timeout,
        )
        # Closing the archive releases its handle on the in-memory buffer.
        with ZipFile(BytesIO(response.content)) as archive:
            first_name = archive.namelist()[0]
            return first_name, archive.read(first_name)

    def _fetch_csv(self, queryname: str, start_datetime: datetime, end_datetime: datetime, *params: str) -> pd.DataFrame:
        """Run one OASIS query and parse the first file of the response as CSV."""
        _, content = self._fetch(queryname, start_datetime, end_datetime, *params)
        return pd.read_csv(BytesIO(content))

    # ------------------------------------------------------------------ frame helpers

    @staticmethod
    def _time_indexed(df: pd.DataFrame, value_column: str, new_name: str) -> pd.DataFrame:
        """
        Keep ``value_column`` of ``df``, indexed by parsed ``INTERVALSTARTTIME_GMT`` and renamed to ``new_name``.

        The result is not sorted; callers sort it.
        """
        # .copy() so the datetime conversion below never writes into a view of ``df``.
        frame = df.loc[:, ["INTERVALSTARTTIME_GMT", value_column]].copy()
        frame["INTERVALSTARTTIME_GMT"] = pd.to_datetime(frame["INTERVALSTARTTIME_GMT"])
        return frame.set_index("INTERVALSTARTTIME_GMT").rename(columns={value_column: new_name})

    def _sum_over_regions(self, df: pd.DataFrame, selector: pd.Series, column_name: str):
        """
        Sum the ``MW`` values selected by ``selector`` across the node's AS regions.

        Returns:
            pd.DataFrame | None: One column named ``column_name``, or ``None`` when no region has data.
        """
        region_series = []
        for region in self.AS_REGIONS:
            subset = df[selector & (df["ANC_REGION"] == region)]
            series = self._time_indexed(subset, "MW", "MW").sort_index()["MW"]
            if not series.empty:
                region_series.append(series)
        if not region_series:
            return None
        # Series are aligned on their timestamps when added together.
        return pd.DataFrame(sum(region_series)).rename(columns={"MW": column_name})

    @staticmethod
    def _concat_columns(frames: list, description: str) -> pd.DataFrame:
        """
        Join per-column frames side by side and sort by time.

        Raises:
            ValueError: If ``frames`` is empty (no matching data was returned).
        """
        if not frames:
            raise ValueError(f"No {description} data was returned for the requested period.")
        return pd.concat(frames, axis=1).sort_index()

    def _lmp(self, queryname: str, start_datetime: datetime, end_datetime: datetime, version: int, market: str, value_column: str, column_name: str) -> pd.DataFrame:
        """Fetch a node LMP query and return its ``LMP`` rows as a single time-indexed column."""
        df = self._fetch_csv(
            queryname,
            start_datetime,
            end_datetime,
            f"version={version}",
            f"market_run_id={market}",
            f"node={self.node}",
            "resultformat=6",
        )
        return self._time_indexed(df[df["LMP_TYPE"] == "LMP"], value_column, column_name).sort_index()

    def _as_by_type(self, df: pd.DataFrame, description: str) -> pd.DataFrame:
        """One ``<ANC_TYPE>_MW`` column per ancillary service type, summed over the node's AS regions."""
        frames = []
        for anc_type in df["ANC_TYPE"].unique():
            summed = self._sum_over_regions(df, df["ANC_TYPE"] == anc_type, f"{anc_type}_MW")
            if summed is not None:
                frames.append(summed)
        return self._concat_columns(frames, description)

    @staticmethod
    def _net_schedule(schedules: pd.DataFrame) -> pd.DataFrame:
        """
        Compute Export + Generation - Import from ``SCHEDULE``/``MW`` rows.

        ``+=`` keeps the index of the Import rows (pandas in-place arithmetic reindexes the
        result like the left operand), so timestamps without an Import row are dropped.
        """
        net = -1 * schedules.loc[schedules["SCHEDULE"] == "Import", ["MW"]]
        for schedule in schedules["SCHEDULE"].unique():
            if schedule in ("Export", "Generation"):
                net += schedules.loc[schedules["SCHEDULE"] == schedule, ["MW"]]
        return net

    # ------------------------------------------------------------------ public API

    def check_CAISO_node_validity(self, node: str) -> bool:
        """
        Check whether ``node`` appears in the CAISO AS region map for the past day (UTC).

        Args:
            node (str): PNODE_ID for the specific CAISO node.

        Returns:
            bool: True if the node is listed in the region map, False otherwise.
        """
        now = self._utc_now()
        region_map = self._fetch_csv(
            "ATL_AS_REGION_MAP", now - timedelta(days=1), now, "version=1", "resultformat=6"
        )
        return node in region_map["PNODE_ID"].to_list()

    def get_node_regions(self) -> list[str]:
        """
        Get the Ancillary Service regions the node participates in, using the next day's (UTC) region map.

        Returns:
            list[str]: ``AS_REGION_ID`` values listed for ``self.node``.
        """
        now = self._utc_now()
        region_map = self._fetch_csv(
            "ATL_AS_REGION_MAP", now, now + timedelta(days=1), "version=1", "resultformat=6"
        )
        return region_map[region_map["PNODE_ID"] == self.node]["AS_REGION_ID"].to_list()

    def get_DAM_LMP(self, start_datetime: datetime, end_datetime: datetime) -> pd.DataFrame:
        """
        Get the Day Ahead Market (DAM) Locational Marginal Prices (LMP) for the node.

        Args:
            start_datetime (datetime): Start of the window in UTC (naive values are treated as UTC).
            end_datetime (datetime): End of the window in UTC.

        Returns:
            pd.DataFrame: Column ``DAM_LMP`` indexed by interval start time.
        """
        return self._lmp("PRC_LMP", start_datetime, end_datetime, 1, "DAM", "MW", "DAM_LMP")

    def get_RTM_LMP(self, start_datetime: datetime, end_datetime: datetime) -> pd.DataFrame:
        """
        Get the Real Time Market (RTM) interval Locational Marginal Prices (LMP) for the node.

        Args:
            start_datetime (datetime): Start of the window in UTC (naive values are treated as UTC).
            end_datetime (datetime): End of the window in UTC.

        Returns:
            pd.DataFrame: Column ``RTM_LMP`` indexed by interval start time.
        """
        return self._lmp("PRC_INTVL_LMP", start_datetime, end_datetime, 1, "RTM", "MW", "RTM_LMP")

    def get_CD_RTM_LMP(self, start_datetime: datetime, end_datetime: datetime) -> pd.DataFrame:
        """
        Get the Contingency Dispatch Real Time Market (RTM) Locational Marginal Prices (LMP) for the node.

        Args:
            start_datetime (datetime): Start of the window in UTC (naive values are treated as UTC).
            end_datetime (datetime): End of the window in UTC.

        Returns:
            pd.DataFrame: Column ``CD_RTM_LMP`` indexed by interval start time.

        Raises:
            ValueError: If the response has no price columns (no contingency dispatch in the window).
        """
        df = self._fetch_csv(
            "PRC_CD_INTVL_LMP",
            start_datetime,
            end_datetime,
            "version=1",
            "market_run_id=RTM",
            f"node={self.node}",
            "resultformat=6",
        )
        try:
            prices = self._time_indexed(df[df["LMP_TYPE"] == "LMP"], "MW", "CD_RTM_LMP")
        except KeyError as err:
            raise ValueError(
                f"There is no Contingency Dispatch Data between {self._format_datetime(start_datetime)} and {self._format_datetime(end_datetime)}"
            ) from err
        return prices.sort_index()

    def get_HASP_LMP(self, start_datetime: datetime, end_datetime: datetime) -> pd.DataFrame:
        """
        Get the Hour Ahead Scheduling Process (HASP) Locational Marginal Prices (LMP) for the node.

        Args:
            start_datetime (datetime): Start of the window in UTC (naive values are treated as UTC).
            end_datetime (datetime): End of the window in UTC.

        Returns:
            pd.DataFrame: Column ``HASP_LMP`` indexed by interval start time.
        """
        return self._lmp("PRC_HASP_LMP", start_datetime, end_datetime, 3, "HASP", "MW", "HASP_LMP")

    def get_RTPD_LMP(self, start_datetime: datetime, end_datetime: datetime) -> pd.DataFrame:
        """
        Get the Real Time Pre-Dispatch (RTPD) Locational Marginal Prices (LMP) for the node.

        Args:
            start_datetime (datetime): Start of the window in UTC (naive values are treated as UTC).
            end_datetime (datetime): End of the window in UTC.

        Returns:
            pd.DataFrame: Column ``RTPD_LMP`` (from the ``PRC`` field) indexed by interval start time.
        """
        return self._lmp("PRC_RTPD_LMP", start_datetime, end_datetime, 3, "RTPD", "PRC", "RTPD_LMP")

    def get_AS_LMP(self, start_datetime: datetime, end_datetime: datetime) -> pd.DataFrame:
        """
        Get Day Ahead Market Ancillary Service (AS) prices for the node, summed over its AS regions.

        Per the original author, the CAISO API returns prices for one day prior to the start
        datetime (midnight Pacific time reported in UTC).

        Args:
            start_datetime (datetime): Start of the window in UTC (naive values are treated as UTC).
            end_datetime (datetime): End of the window in UTC.

        Returns:
            pd.DataFrame: One ``<ANC_TYPE>_MW`` column per service type, indexed by interval start time.

        Raises:
            ValueError: If none of the node's AS regions has data.
        """
        df = self._fetch_csv(
            "PRC_AS",
            start_datetime,
            end_datetime,
            "version=1",
            "market_run_id=DAM",
            "anc_type=ALL",
            "anc_region=ALL",
            "resultformat=6",
        )
        return self._as_by_type(df, "DAM ancillary service price")

    def get_AS_RTM_LMP(self, start_datetime: datetime, end_datetime: datetime) -> pd.DataFrame:
        """
        Get Real Time Market Ancillary Service (AS) interval prices for the node, summed over its AS regions.

        Per the original author, this query only yields the 4 prices immediately after the start datetime.

        Args:
            start_datetime (datetime): Start of the window in UTC (naive values are treated as UTC).
            end_datetime (datetime): End of the window in UTC.

        Returns:
            pd.DataFrame: One ``<ANC_TYPE>_MW`` column per service type, indexed by interval start time.

        Raises:
            ValueError: If none of the node's AS regions has data.
        """
        df = self._fetch_csv(
            "PRC_INTVL_AS",
            start_datetime,
            end_datetime,
            "version=1",
            "market_run_id=RTM",
            "anc_type=ALL",
            "anc_region=ALL",
            "resultformat=6",
        )
        return self._as_by_type(df, "RTM ancillary service price")

    def get_SLD_FCST(self, start_datetime: datetime, end_datetime: datetime, market: str) -> pd.DataFrame:
        """
        Get the system demand forecast for the CA ISO-TAC area.

        Args:
            start_datetime (datetime): Start of the window in UTC (naive values are treated as UTC).
            end_datetime (datetime): End of the window in UTC.
            market (str): One of "DAM", "2DA", "7DA", "RTM" or "RTD". "RTM" and "RTD" both request
                ``market_run_id=RTM`` with ``execution_type=RTD``.

        Returns:
            pd.DataFrame: Column ``<market>_MW`` indexed by interval start time.

        Raises:
            ValueError: If ``market`` is unknown or the response contains no CSV file.
        """
        warnings.warn("Demand Forecast can only find demand forecast for the CA ISO-TAC area.")

        if market in ["DAM", "2DA", "7DA"]:
            params = ("version=1", f"market_run_id={market}", "resultformat=6")
        elif market in ["RTM", "RTD"]:
            params = ("version=1", "market_run_id=RTM", "resultformat=6", "execution_type=RTD")
        else:
            raise ValueError(f"Unknown input for market {market}. It should be one of 'DAM', '2DA', '7DA', 'RTM', or 'RTD'.")

        file_name, content = self._fetch("SLD_FCST", start_datetime, end_datetime, *params)
        # Queries without data return a non-CSV file in the archive.
        if not file_name.endswith("csv"):
            raise ValueError(f"There is no valid file between {start_datetime.strftime('%Y-%m-%d %H:%M:00')} and {end_datetime.strftime('%Y-%m-%d %H:%M:00')}.")
        df = pd.read_csv(BytesIO(content))
        tac_area = df[df["TAC_AREA_NAME"] == "CA ISO-TAC"]
        return self._time_indexed(tac_area, "MW", f"{market}_MW").sort_index()

    def get_SLD_REN_FCST(self, start_datetime: datetime, end_datetime: datetime, market: str) -> pd.DataFrame:
        """
        Get the system renewable generation forecast per renewable type and trading hub.

        Args:
            start_datetime (datetime): Start of the window in UTC (naive values are treated as UTC).
            end_datetime (datetime): End of the window in UTC.
            market (str): One of "DAM", "RTPD" or "RTD".

        Returns:
            pd.DataFrame: One ``<RENEWABLE_TYPE>_<TRADING_HUB>_MW`` column per combination with data,
            outer-joined on interval start time.

        Raises:
            ValueError: If ``market`` is unknown or no combination has data.
        """
        if market not in ["DAM", "RTPD", "RTD"]:
            raise ValueError(f"Unknown input for market {market}. It should be one of 'DAM', 'RTPD', or 'RTD'.")

        df = self._fetch_csv(
            "SLD_REN_FCST",
            start_datetime,
            end_datetime,
            "version=1",
            f"market_run_id={market}",
            "resultformat=6",
        )
        frames = []
        for renew_type in df["RENEWABLE_TYPE"].unique():
            for trading_hub in df["TRADING_HUB"].unique():
                subset = df[(df["TRADING_HUB"] == trading_hub) & (df["RENEWABLE_TYPE"] == renew_type)]
                frame = self._time_indexed(subset, "MW", f"{renew_type}_{trading_hub}_MW")
                if not frame.empty:
                    frames.append(frame)
        if not frames:
            raise ValueError("No renewable forecast data was returned for the requested period.")

        combined = frames[0]
        for frame in frames[1:]:
            combined = combined.join(frame, how="outer")
        return combined.sort_index()

    def get_AS_REQ(self, start_datetime: datetime, end_datetime: datetime, market: str) -> pd.DataFrame:
        """
        Get Ancillary Service (AS) requirements, summed over the node's AS regions.

        Args:
            start_datetime (datetime): Start of the window in UTC (naive values are treated as UTC).
            end_datetime (datetime): End of the window in UTC.
            market (str): "DAM" or "RTM".

        Returns:
            pd.DataFrame: One column per ``XML_DATA_ITEM`` (and ``ANC_TYPE``) with data, indexed by
            interval start time.
            NOTE(review): columns are named by XML_DATA_ITEM alone; if one item occurs for several
            ANC_TYPEs the columns share a name.

        Raises:
            ValueError: If ``market`` is unknown or none of the node's AS regions has data.
        """
        if market not in ["DAM", "RTM"]:
            raise ValueError(f"Unknown input for market {market}. It should be one of 'DAM', or 'RTM'.")

        df = self._fetch_csv(
            "AS_REQ",
            start_datetime,
            end_datetime,
            "version=1",
            f"market_run_id={market}",
            "anc_type=ALL",
            "anc_region=ALL",
            "resultformat=6",
        )
        frames = []
        for anc_type in df["ANC_TYPE"].unique():
            for xml_item in df["XML_DATA_ITEM"].unique():
                selector = (df["ANC_TYPE"] == anc_type) & (df["XML_DATA_ITEM"] == xml_item)
                summed = self._sum_over_regions(df, selector, f"{xml_item}")
                if summed is not None:
                    frames.append(summed)
        return self._concat_columns(frames, "ancillary service requirement")

    def get_AS_RESULTS(self, start_datetime: datetime, end_datetime: datetime, market: str) -> pd.DataFrame:
        """
        Get Ancillary Service (AS) results, summed over the node's AS regions.

        Args:
            start_datetime (datetime): Start of the window in UTC (naive values are treated as UTC).
            end_datetime (datetime): End of the window in UTC.
            market (str): "DAM" or "RTM".

        Returns:
            pd.DataFrame: One column per ``XML_DATA_ITEM`` with data, indexed by interval start time.

        Raises:
            ValueError: If ``market`` is unknown or none of the node's AS regions has data.
        """
        if market not in ["DAM", "RTM"]:
            raise ValueError(f"Unknown input for market {market}. It should be one of 'DAM', or 'RTM'.")

        df = self._fetch_csv(
            "AS_RESULTS",
            start_datetime,
            end_datetime,
            "version=1",
            f"market_run_id={market}",
            "anc_type=ALL",
            "anc_region=ALL",
            "resultformat=6",
        )
        frames = []
        for xml_item in df["XML_DATA_ITEM"].unique():
            summed = self._sum_over_regions(df, df["XML_DATA_ITEM"] == xml_item, f"{xml_item}")
            if summed is not None:
                frames.append(summed)
        return self._concat_columns(frames, "ancillary service result")

    def get_AS_MILEAGE_CALC(self, start_datetime: datetime, end_datetime: datetime) -> pd.DataFrame:
        """
        Get Ancillary Service (AS) mileage calculations for the ``AS_CAISO_EXP`` region.

        Args:
            start_datetime (datetime): Start of the window in UTC (naive values are treated as UTC).
            end_datetime (datetime): End of the window in UTC.

        Returns:
            pd.DataFrame: One column per ``XML_DATA_TYPE`` (and ``ANC_TYPE``) with data, indexed by
            interval start time.

        Raises:
            ValueError: If no mileage data was returned.
        """
        df = self._fetch_csv(
            "AS_MILEAGE_CALC",
            start_datetime,
            end_datetime,
            "version=1",
            "anc_type=ALL",
            "resultformat=6",
        )
        frames = []
        for anc_type in df["ANC_TYPE"].unique():
            for xml_type in df["XML_DATA_TYPE"].unique():
                subset = df[(df["ANC_TYPE"] == anc_type) & (df["ANC_REGION"] == "AS_CAISO_EXP") & (df["XML_DATA_TYPE"] == xml_type)]
                series = self._time_indexed(subset, "MW", "MW").sort_index()["MW"]
                if not series.empty:
                    frames.append(pd.DataFrame(series).rename(columns={"MW": f"{xml_type}"}))
        return self._concat_columns(frames, "ancillary service mileage")

    def get_ENE_SLRS(self, start_datetime: datetime, end_datetime: datetime, market: str) -> pd.DataFrame:
        """
        Fetch the CAISO-wide (``Caiso_Totals``) schedule for a market within the given window.

        Args:
            start_datetime (datetime): Start of the window in UTC (naive values are treated as UTC).
            end_datetime (datetime): End of the window in UTC.
            market (str): One of "DAM", "HASP", "RUC" or "RTM".

        Returns:
            pd.DataFrame: Column ``<market>_MW`` indexed by interval start time. For "DAM" it is the
            ``Load`` schedule; otherwise Export + Generation - Import.
            NOTE(review): a physical balance usually gives load as Generation + Import - Export;
            confirm the OASIS sign convention for Import/Export before relying on this value.

        Raises:
            ValueError: If ``market`` is unknown.
        """
        if market not in ["DAM", "HASP", "RUC", "RTM"]:
            raise ValueError(f"Unknown input for market {market}. It should be one of 'DAM', 'HASP', 'RUC', or 'RTM'.")

        df = self._fetch_csv(
            "ENE_SLRS",
            start_datetime,
            end_datetime,
            "version=1",
            f"market_run_id={market}",
            "tac_zone_name=ALL",
            "schedule=ALL",
            "resultformat=6",
        )
        df["INTERVALSTARTTIME_GMT"] = pd.to_datetime(df["INTERVALSTARTTIME_GMT"])
        df = df.set_index("INTERVALSTARTTIME_GMT").sort_index()
        if market == "DAM":
            schedule = df[(df["TAC_ZONE_NAME"] == "Caiso_Totals") & (df["SCHEDULE"] == "Load")].loc[:, ["MW"]]
        else:
            non_load = df[(df["TAC_ZONE_NAME"] == "Caiso_Totals") & (df["SCHEDULE"] != "Load")].loc[:, ["MW", "SCHEDULE"]]
            schedule = self._net_schedule(non_load)
        return schedule.rename(columns={"MW": f"{market}_MW"})


In [53]:
def read_configurations(config_fp: str) -> Tuple[list, list]:
    """
    Return the asset serial numbers and CAISO nodes to poll.

    NOTE: ``config_fp`` is currently ignored. The serial numbers and nodes come from the
    notebook globals ``target_assets`` and ``node_names``, and the platform URLs / API key are
    exported as environment variables in an earlier cell rather than read from the file.

    Args:
        config_fp (str): Filepath to the configuration file (unused, kept for compatibility).

    Returns:
        serial_numbers [list]: Copy of ``target_assets``.
        nodes [list]: Copy of ``node_names``; ``run`` pairs ``nodes[i]`` with ``serial_numbers[i]``.
    """
    # Return copies so callers can modify the lists without altering the notebook globals.
    serial_numbers = list(target_assets)
    nodes = list(node_names)
    return serial_numbers, nodes

def create_session(serial_numbers: list[str], blueprint_name: str = "California Independent System Operator (CAISO) Node"):
    """
    Connect to the Altergo platform and resolve one asset per serial number, creating missing ones.

    Args:
        serial_numbers (list[str]): Serial numbers of the assets to fetch.
        blueprint_name (str): Blueprint used to create an asset that does not exist yet.

    Returns:
        tuple: ``(edApi, assets)``, i.e. the platform ``Client`` and the assets in the same order
        as ``serial_numbers``.

    Raises:
        ValueError: If ``ALTERGO_API_KEY`` is not set, or if any asset still cannot be found
            after an attempt to create it.
    """
    api_key_value = os.environ.get("ALTERGO_API_KEY")
    if api_key_value is None:
        raise ValueError("Cannot find API Key in the environment.")
    edApi = Client(api_key_value)

    assets = []
    for sn in serial_numbers:
        asset = edApi.getAsset(sn)
        if asset is None:
            edApi.createAssetBySerial(blueprint_name, sn)
            # Re-fetch after creation; the return value of createAssetBySerial is not relied upon.
            asset = edApi.getAsset(sn)
        assets.append(asset)

    # An explicit check instead of `assert`, which is stripped under `python -O` and would let
    # None assets through to the upload loop.
    missing = [sn for sn, asset in zip(serial_numbers, assets) if asset is None]
    if missing:
        raise ValueError(f"A serial number could not be found on the platform: {missing}")

    return edApi, assets

def _log_failure(label: str, exc: Exception, start=None, end=None) -> None:
    """Print an exception caught in one fetch/upload step, followed by a line naming the step.

    Args:
        label (str): Name of the dataset/step that failed (e.g. "DAM_LMP").
        exc (Exception): The caught exception.
        start (datetime, optional): Start of the queried window, if the step had one.
        end (datetime, optional): End of the queried window, if the step had one.
    """
    print(exc)
    if start is None or end is None:
        print(f"{label} gave an exception.")
    else:
        print(
            f"{label} gave an exception for the time range "
            f"{start.strftime('%y-%m-%d %H:%M:%S')} to {end.strftime('%y-%m-%d %H:%M:%S')}."
        )


def _window(days: int) -> Tuple[datetime, datetime]:
    """Return a ``(start, end)`` query window anchored at the current time.

    A non-negative ``days`` looks forward (now -> now + days). A negative ``days`` looks back
    (now + days -> now).

    NOTE(review): ``datetime.now()`` is host-local time, but at least ``CAISO.get_ENE_SLRS``
    formats its timestamps with a ``-0000`` (UTC) offset. That is only correct on a host running
    in UTC. Confirm this, or switch to UTC timestamps.
    """
    now = datetime.now()
    shifted = now + timedelta(days=days)
    return (now, shifted) if days >= 0 else (shifted, now)


def _sensor_columns(asset, sensor_name: str) -> list:
    """Return the source-column list stored as JSON in the ``format`` field of the asset's ``sensor_name`` sensor."""
    return json.loads(getSensorByName(asset.model, sensor_name).format)


def _pack_sensor_array(df: pd.DataFrame, cols: list, sensor_name: str) -> pd.DataFrame:
    """Collapse ``cols`` of ``df`` into a single list-valued column named ``sensor_name``.

    Each row's values for ``cols``, in the given order, become one list. ``cols`` are then dropped
    and any other columns of ``df`` are kept. ``df`` itself is not modified.
    """
    packed = df.copy()
    # Working on a copy avoids SettingWithCopyWarning when ``df`` is a column slice of another
    # frame. A plain column assignment stores one list per row.
    packed[sensor_name] = [list(row) for _, row in df[cols].iterrows()]
    return packed.drop(columns=cols)


def _push_to_platform(edApi, asset, df: pd.DataFrame) -> None:
    """Make ``df``'s index timezone-naive UTC and upload every column of ``df`` for ``asset``."""
    # tz_convert(None) converts to UTC and then drops the timezone information.
    df.index = df.index.tz_convert(tz=None)
    asset.df = df
    edApi.updateSensorDataByDirectInsert(asset, asset.df.columns)


def _upload_prices(edApi, node, asset) -> None:
    """Upload the past day's energy LMPs and ancillary-service prices for one node."""
    start, end = _window(-1)

    # Energy LMPs are uploaded exactly as returned by the CAISO getters.
    for label in ("DAM_LMP", "HASP_LMP", "RTPD_LMP", "RTM_LMP"):
        try:
            _push_to_platform(edApi, asset, getattr(node, f"get_{label}")(start, end))
        except Exception as e:
            _log_failure(label, e, start, end)

    # Ancillary-service prices are each packed into one array-valued sensor.
    for sensor_name in ("AS_LMP", "AS_RTM_LMP"):
        try:
            df = getattr(node, f"get_{sensor_name}")(start, end)
            cols = _sensor_columns(asset, sensor_name)
            _push_to_platform(edApi, asset, _pack_sensor_array(df[cols], cols, sensor_name))
        except Exception as e:
            _log_failure(sensor_name, e, start, end)


def _upload_load_forecasts(edApi, node, asset) -> None:
    """Fetch the DAM/2DA/7DA/RTM system-load forecasts and upload them merged as SLD_FCST."""
    frames = []
    # (market, window in days): the DA forecasts look ahead, RTM looks back over the past day.
    for market, days in (("DAM", 1), ("2DA", 2), ("7DA", 7), ("RTM", -1)):
        start, end = _window(days)
        try:
            frames.append(node.get_SLD_FCST(start, end, market))
        except Exception as e:
            _log_failure(f"SLD_FCST in {market}", e, start, end)

    try:
        merged = pd.concat(frames, axis=1)
        cols = _sensor_columns(asset, "SLD_FCST")
        _push_to_platform(edApi, asset, _pack_sensor_array(merged, cols, "SLD_FCST"))
    except Exception as e:
        _log_failure("SLD_FCST while merging", e)


def _upload_renewable_forecasts(edApi, node, asset) -> None:
    """Upload the DAM (next day), RTPD and RTD (past day) renewable forecasts as <market>_REN_FCST."""
    for market, days in (("DAM", 1), ("RTPD", -1), ("RTD", -1)):
        start, end = _window(days)
        sensor_name = f"{market}_REN_FCST"
        try:
            df = node.get_SLD_REN_FCST(start, end, market)
            cols = _sensor_columns(asset, sensor_name)
            # Pad a missing Wind_ZP26_MW column with NaN. Presumably it is part of the sensor's
            # column list and is not always present in the response (confirm).
            if "Wind_ZP26_MW" not in df.columns:
                df["Wind_ZP26_MW"] = np.nan  # np.NaN was removed in NumPy 2.0
            _push_to_platform(edApi, asset, _pack_sensor_array(df, cols, sensor_name))
        except Exception as e:
            _log_failure(f"SLD_REN_FCST in {market}", e, start, end)


def _upload_as_requirements(edApi, node, asset) -> None:
    """Upload DAM (next day) and RTM (past day) ancillary-service requirements as <market>_AS_REQ."""
    for market, days in (("DAM", 1), ("RTM", -1)):
        start, end = _window(days)
        sensor_name = f"{market}_AS_REQ"
        try:
            df = node.get_AS_REQ(start, end, market)
            cols = _sensor_columns(asset, sensor_name)
            _push_to_platform(edApi, asset, _pack_sensor_array(df[cols], cols, sensor_name))
        except Exception as e:
            _log_failure(f"AS_REQ in {market}", e, start, end)


def _upload_mileage_and_load_schedules(edApi, node, asset) -> None:
    """Upload the past day's AS mileage (AS_MILEAGE_CALC) and ENE_SLRS schedules for one node."""
    start, end = _window(-1)

    try:
        df = node.get_AS_MILEAGE_CALC(start, end)
        cols = _sensor_columns(asset, "AS_MILEAGE_CALC")
        _push_to_platform(edApi, asset, _pack_sensor_array(df, cols, "AS_MILEAGE_CALC"))
    except Exception as e:
        _log_failure("AS_MILEAGE_CALC", e, start, end)

    try:
        # get_ENE_SLRS returns one "<market>_MW" column per market; they are merged side by side.
        merged = pd.concat(
            [node.get_ENE_SLRS(start, end, market) for market in ("DAM", "HASP", "RUC", "RTM")],
            axis=1,
        )
        cols = _sensor_columns(asset, "ENE_SLRS")
        _push_to_platform(edApi, asset, _pack_sensor_array(merged[cols], cols, "ENE_SLRS"))
    except Exception as e:
        _log_failure("ENE_SLRS", e, start, end)


def _upload_node(edApi, node, asset) -> None:
    """Fetch every CAISO dataset for one node and upload it to ``asset``; each dataset fails independently."""
    _upload_prices(edApi, node, asset)
    _upload_load_forecasts(edApi, node, asset)
    _upload_renewable_forecasts(edApi, node, asset)
    _upload_as_requirements(edApi, node, asset)
    _upload_mileage_and_load_schedules(edApi, node, asset)


def run(poll_interval_s: float = 300.0):
    """
    Poll CAISO OASIS for every configured node and push the results to the Altergo platform.

    Runs forever. Each cycle fetches LMPs, ancillary-service prices and requirements,
    system-load and renewable forecasts, AS mileage and ENE_SLRS for every (node, asset) pair.
    Each dataset is uploaded independently, so a failing one is printed and skipped. The cycle
    then sleeps for whatever remains of ``poll_interval_s``.

    Args:
        poll_interval_s (float): Target length of one polling cycle in seconds. Defaults to 300
            (5 minutes).

    Raises:
        ValueError: If the number of nodes and serial numbers differ, or if ``create_session`` /
            ``CAISO`` reject the configuration.
    """
    serial_numbers, node_ids = read_configurations("config.json")
    edApi, assets = create_session(serial_numbers)
    if len(node_ids) != len(assets):
        # zip() below would otherwise silently drop the unmatched entries.
        raise ValueError(
            f"Got {len(node_ids)} node(s) but {len(assets)} asset(s); each node needs exactly one asset."
        )
    # CAISO.__init__ validates each PNODE_ID and raises ValueError for unknown nodes.
    nodes = [CAISO(node_id) for node_id in node_ids]

    while True:
        cycle_start = time.monotonic()
        for node, asset in zip(nodes, assets):
            _upload_node(edApi, node, asset)
        # Sleep for what is left of the interval. A cycle that overran must not pass a negative
        # value to time.sleep(), which raises ValueError and would stop the poller.
        time.sleep(max(0.0, poll_interval_s - (time.monotonic() - cycle_start)))


In [54]:
# Start the polling loop. run() loops with `while True` and never returns on its own;
# stop it by interrupting the kernel, which surfaces as a KeyboardInterrupt.
run()


Preparing payload
Payload sent successfully!


Preparing payload
Payload sent successfully!


Preparing payload
Payload sent successfully!


Preparing payload
Payload sent successfully!


Preparing payload
Payload sent successfully!


Preparing payload
Payload sent successfully!





KeyboardInterrupt: 