This notebook explores both the Model Index and OPC UA scripts and contains examples of all the functions used to make requests to the Model Index API and OPC UA API servers.

### Import Libraries

In [None]:
# Import the required packages
import pandas as pd
import os
import json
import datetime
import concurrent.futures
from dotenv import load_dotenv
from pathlib import Path
from dateutil.relativedelta import relativedelta

### Import Scripts

In [None]:
# Import model index functions
from pyprediktormapclient.model_index import ModelIndex

# Import OPC UA functions
from pyprediktormapclient.opc_ua import OPC_UA

# Import Analytics Helper
from pyprediktormapclient.analytics_helper import AnalyticsHelper

# Import "Dataframer" Tools
from pyprediktormapclient.shared import *

# import AUTH_CLIENT
from pyprediktormapclient.auth_client import AUTH_CLIENT

In [None]:
# Consider obtaining the environment variables from a .env file if you are running this locally from source.
dotenv_path = Path(".env")
load_dotenv(dotenv_path=dotenv_path)

# Credentials and service endpoints used by the cells below.
# Each lookup raises KeyError if the variable is not set.
# NOTE(review): USERNAME is often already defined by the operating system;
# confirm the value read here is the one intended from .env.
username = os.environ["USERNAME"]
password = os.environ["PASSWORD"]
opcua_rest_url = os.environ["OPC_UA_REST_URL"]
opcua_server_url = os.environ["OPC_UA_SERVER_URL"]
model_index_url = os.environ["MODEL_INDEX_URL"]
ory_url = os.environ["ORY_URL"]


In [None]:
# Getting ory bearer token
# The same auth_client instance is handed to ModelIndex and OPC_UA below.
auth_client = AUTH_CLIENT(rest_url=ory_url, username=username, password=password)
auth_client.request_new_ory_token()

In [None]:
# Connecting to ModelIndex APIs (reuses the authenticated session of auth_client)
model = ModelIndex(url=model_index_url, auth_client=auth_client, session=auth_client.session)

# Listed sites on the model index api server
namespaces = model.get_namespace_array()
# Types of Objects
object_types_json = model.get_object_types()
object_types = AnalyticsHelper(object_types_json)
# Namespace list that is passed to the OPC UA client below
namespace_list = object_types.namespaces_as_list(namespaces)

# Initiate the OPC UA API with a fixed namespace list
opc_data = OPC_UA(rest_url=opcua_rest_url, opcua_url=opcua_server_url, namespaces=namespace_list, auth_client=auth_client)

### Download data from modelindex api

In [None]:
# Unique types of Objects: keep only the Id and Name columns and drop repeated rows
object_types_unique = object_types.dataframe[["Id", "Name"]].drop_duplicates()
# Last expression of the cell, so the notebook displays it
object_types_unique

In [None]:
# To get the objects of a type
sites_json = model.get_objects_of_type("SiteType")

# Send the returned JSON into a normalizer to get Id, Type, Name, Props and Vars as columns
sites = AnalyticsHelper(sites_json)
# Display the names of the returned sites
sites.list_of_names()

In [None]:
# Analytics helper: show the variables of the sites as a dataframe
sites.variables_as_dataframe()

In [None]:
# Display the ids of the returned sites; the next cell picks one by index
sites.list_of_ids()

In [None]:
# Selecting the first site (index 0 of the id list)
first_site_id = sites.list_of_ids()[0]
# Alternative: hard-code a site id instead of taking the first one
# first_site_id = '14:1:BE.DK-ADU'
first_site_id

In [None]:
# Get all stringsets for one park: descendants of type "StringSetType"
# below the selected site, requested with the "PV_Assets" argument
string_sets_for_first_park_as_json = model.get_object_descendants(
    "StringSetType", [first_site_id], "PV_Assets"
)
string_sets = AnalyticsHelper(string_sets_for_first_park_as_json)
# Display the normalized result
string_sets.dataframe

### Query Parameters

In [None]:
# Query window: 13 November 2023, from 00:00 to 23:59.
start_time = datetime.datetime(year=2023, month=11, day=13, hour=0, minute=0)
end_time = start_time.replace(hour=23, minute=59)

# Aggregation settings: one value per minute (interval given in milliseconds).
agg_name = "Average"
pro_interval = 60_000

# Variables to read: the "DCPower" variable of every string set found above.
variable_list = string_sets.variables_as_list(["DCPower"])

### Batching with Async Refactoring

In [None]:
import logging
import asyncio
import aiohttp
from aiohttp import ClientSession
from asyncio import Semaphore
from datetime import timedelta
from typing import Dict, List, Tuple

# Configure INFO-level logging with timestamp, level and message,
# and create the module-level logger used by the functions below.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

def generate_time_batches(start_time: datetime, end_time: datetime, pro_interval: int, max_data_points: int) -> Tuple[float, int, float, int]:
    """Compute the batch sizes needed to stay near ``max_data_points`` per request.

    This is a plain (synchronous) function: it does no I/O, and its callers
    index/unpack the result directly without awaiting it.

    Args:
        start_time: Start of the requested time range.
        end_time: End of the requested time range.
        pro_interval: Processing interval in milliseconds.
        max_data_points: Target number of data points per request.

    Returns:
        A tuple ``(total_time_range_ms, max_variables_per_batch,
        time_batch_size_ms, max_time_batches)``.

    Raises:
        ValueError: If the time range is empty or reversed, or if
            ``pro_interval`` or ``max_data_points`` is not positive.
    """
    if pro_interval <= 0:
        raise ValueError("pro_interval must be a positive number of milliseconds")
    if max_data_points <= 0:
        raise ValueError("max_data_points must be positive")

    total_time_range_ms = (end_time - start_time).total_seconds() * 1000
    if total_time_range_ms <= 0:
        # An empty range would otherwise end in a ZeroDivisionError below.
        raise ValueError("end_time must be later than start_time")

    # Number of aggregated values each variable yields over the whole range.
    estimated_intervals = total_time_range_ms / pro_interval

    # Short ranges: several variables fit in one request.
    max_variables_per_batch = max(1, int(max_data_points / estimated_intervals))
    # Long ranges: the range is cut into several equally sized windows.
    max_time_batches = max(1, int(estimated_intervals / max_data_points))

    time_batch_size_ms = total_time_range_ms / max_time_batches

    return total_time_range_ms, max_variables_per_batch, time_batch_size_ms, max_time_batches

def generate_variable_batches(start_time, end_time, pro_interval, variable_list: List[Dict[str, str]], max_data_points, agg_name: str = None) -> List:
    """Split the variables into request-sized batches of ``ReadValueIds`` entries.

    Each variable is wrapped as ``{"NodeId": var, "AggregateName": agg_name}``
    and the wrapped list is cut into chunks so that one chunk, read over the
    whole time range, stays near ``max_data_points`` values.

    Args:
        start_time: Start of the requested time range.
        end_time: End of the requested time range.
        pro_interval: Processing interval in milliseconds.
        variable_list: Node ids to read.
        max_data_points: Target number of data points per request.
        agg_name: Aggregate name to attach to every variable. When omitted,
            the notebook-level ``agg_name`` variable is used, as before.

    Returns:
        A list of batches, each a list of ``ReadValueIds`` dictionaries.
        An empty ``variable_list`` yields an empty list.
    """
    if agg_name is None:
        # Backward compatibility: earlier versions always read the global.
        agg_name = globals()["agg_name"]

    # Computed locally so the function does not depend on another helper
    # being awaited or not.
    total_time_range_ms = (end_time - start_time).total_seconds() * 1000
    estimated_intervals = total_time_range_ms / pro_interval
    max_variables_per_batch = max(1, int(max_data_points / estimated_intervals))

    extended_variables = [{"NodeId": var, "AggregateName": agg_name} for var in variable_list]

    return [
        extended_variables[i:i + max_variables_per_batch]
        for i in range(0, len(extended_variables), max_variables_per_batch)
    ]

def _prepare_body(
    start_time: datetime,
    end_time: datetime,
    pro_interval: int,
    variable_list: List[Dict[str, str]], 
    agg_name: str,
    ) -> Dict:
    """
    Prepare the request body for one historical-aggregated API call.

    The body covers exactly the given time window and exactly the given
    variables. Splitting a large query into several windows or variable
    groups is left to the caller, which then calls this once per request.
    The previous version tried to batch internally but could only return a
    single body, so it discarded every batch except one.

    Args:
        start_time: Start of the window to request.
        end_time: End of the window to request.
        pro_interval: Processing interval in milliseconds.
        variable_list: Node ids to read in this request.
        agg_name: Aggregate name applied to every variable.

    Returns:
        The base body of the notebook-level ``opc_data`` client extended
        with the time window, processing interval and ``ReadValueIds``.
    """
    read_value_ids = [{"NodeId": var, "AggregateName": agg_name} for var in variable_list]

    return {
        **opc_data.body,
        "StartTime": start_time.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
        "EndTime": end_time.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
        "ProcessingInterval": pro_interval,
        "ReadValueIds": read_value_ids,
        "AggregateName": agg_name
    }
    
def process_batch(content: dict) -> pd.DataFrame:
    """Flatten one API response into a single dataframe.

    Every entry of ``content["HistoryReadResults"]`` contributes the rows of
    its ``DataValues``; the fields of its ``NodeId`` are repeated on each of
    those rows as ``HistoryReadResults.NodeId.<field>`` columns.

    Returns:
        The concatenated rows with a fresh 0..n-1 index, or an empty
        dataframe when the response holds no results.
    """

    def frame_for(result: dict) -> pd.DataFrame:
        # One frame per node: its values plus the identifying NodeId fields.
        frame = pd.json_normalize(result["DataValues"])
        for field, field_value in result["NodeId"].items():
            frame[f"HistoryReadResults.NodeId.{field}"] = field_value
        return frame

    frames = [frame_for(result) for result in content["HistoryReadResults"]]

    if not frames:
        return pd.DataFrame()

    return pd.concat(frames).reset_index(drop=True)
    
async def make_async_api_request(opc_data, start_time:datetime, end_time:datetime,
        pro_interval: int, variable_list: List[Dict[str, str]], agg_name: str,
        semaphore, max_retries: int = 3, retry_delay: int = 5) -> pd.DataFrame:
    """Make one API request for the given time range and variable list.

    The request is retried on ``aiohttp.ClientError`` with an exponentially
    growing delay (``retry_delay * 2 ** attempt`` seconds).

    Args:
        opc_data: OPC UA client supplying ``rest_url``, ``headers`` and the
            ``_check_content`` validator.
        start_time: Start of the window to request.
        end_time: End of the window to request.
        pro_interval: Processing interval in milliseconds.
        variable_list: Node ids to read in this request.
        agg_name: Aggregate name applied to every variable.
        semaphore: Semaphore limiting the number of concurrent requests.
        max_retries: Maximum number of attempts; must be at least 1.
        retry_delay: Base delay in seconds between attempts.

    Returns:
        The response flattened into a dataframe by ``process_batch``.

    Raises:
        ValueError: If ``max_retries`` is smaller than 1.
        RuntimeError: If the last attempt fails; the original
            ``aiohttp.ClientError`` is attached as the cause.
    """
    if max_retries < 1:
        # Without any attempt there would be no response to process.
        raise ValueError("max_retries must be at least 1")

    async with semaphore:
        body = _prepare_body(
            start_time, 
            end_time, 
            pro_interval, 
            variable_list,
            agg_name
        )
        for attempt in range(max_retries):
            try:
                async with ClientSession() as session:
                    async with session.post(
                        # Use the URL of the client that was passed in, the
                        # same way the other request helpers in this notebook do.
                        f"{opc_data.rest_url}values/historicalaggregated",
                        json=body,
                        headers=opc_data.headers
                    ) as response:
                        response.raise_for_status()
                        content = await response.json()
                        break
            except aiohttp.ClientError as e:
                if attempt < max_retries - 1:
                    wait_time = retry_delay * (2 ** attempt)
                    logger.warning(f"Request failed. Retrying in {wait_time} seconds...")
                    await asyncio.sleep(wait_time)
                else:
                    logger.error(f"Max retries reached. Error: {e}")
                    # Keep the original exception as the cause for debugging.
                    raise RuntimeError(f'Error message {e}') from e

        opc_data._check_content(content)

        return process_batch(content)
    
async def process_api_response(opc_data, start_time:datetime, end_time:datetime,
        pro_interval: int, variable_list: List[Dict[str, str]], agg_name: str,
        max_concurrent_requests: int = 10, max_data_points: int = 10000) -> pd.DataFrame:
    """Request the data in batches, concurrently, and return one dataframe.

    The time range is cut into equally sized windows and the variables into
    groups so that a single request stays near ``max_data_points`` values.
    One request is issued per (variable group, window) pair; at most
    ``max_concurrent_requests`` run at the same time.

    Args:
        opc_data: OPC UA client used for the requests and for ``_process_df``.
        start_time: Start of the requested time range.
        end_time: End of the requested time range.
        pro_interval: Processing interval in milliseconds.
        variable_list: Node ids to read.
        agg_name: Aggregate name applied to every variable.
        max_concurrent_requests: Upper bound on simultaneous requests.
        max_data_points: Target number of data points per request.

    Returns:
        The combined, column-renamed dataframe, or an empty dataframe when
        there is nothing to request.

    Raises:
        ValueError: If ``end_time`` is not later than ``start_time``.
    """
    if not variable_list:
        return pd.DataFrame()

    total_time_range_ms = (end_time - start_time).total_seconds() * 1000
    if total_time_range_ms <= 0:
        raise ValueError("end_time must be later than start_time")

    # Same sizing rule as the non-refactored batching functions in this notebook.
    estimated_intervals = total_time_range_ms / pro_interval
    max_variables_per_batch = max(1, int(max_data_points / estimated_intervals))
    max_time_batches = max(1, int(estimated_intervals / max_data_points))
    time_batch_size_ms = total_time_range_ms / max_time_batches

    time_windows = [
        (
            start_time + timedelta(milliseconds=index * time_batch_size_ms),
            start_time + timedelta(milliseconds=min((index + 1) * time_batch_size_ms, total_time_range_ms)),
        )
        for index in range(max_time_batches)
    ]
    variable_batches = [
        variable_list[i:i + max_variables_per_batch]
        for i in range(0, len(variable_list), max_variables_per_batch)
    ]

    semaphore = Semaphore(max_concurrent_requests)
    tasks = [
        make_async_api_request(opc_data, window_start, window_end, pro_interval, variables, agg_name, semaphore)
        for variables in variable_batches
        for window_start, window_end in time_windows
    ]
    results = await asyncio.gather(*tasks)

    combined_df = pd.concat(results, ignore_index=True)
    columns = {
        "Value.Type": "ValueType",
        "Value.Body": "Value",
        "StatusCode.Symbol": "StatusSymbol",
        "StatusCode.Code": "StatusCode",
        "SourceTimestamp": "Timestamp",
        "HistoryReadResults.NodeId.IdType": "IdType",
        "HistoryReadResults.NodeId.Id": "Id",
        "HistoryReadResults.NodeId.Namespace": "Namespace",
    }
    return opc_data._process_df(combined_df, columns)
        
async def get_historical_aggregated_values_async(
    opc_data,
    start_time: datetime,
    end_time: datetime,
    pro_interval: int,
    variable_list: List[Dict[str, str]],
    agg_name: str,
) -> pd.DataFrame:
    """Entry point: fetch historical aggregated values as a dataframe.

    Thin wrapper that forwards all arguments unchanged to
    ``process_api_response`` and returns its result.
    """
    return await process_api_response(
        opc_data,
        start_time,
        end_time,
        pro_interval,
        variable_list,
        agg_name,
    )

In [None]:
# 1 day aggregated historical inverter data in asyncio process
one_days_historic_inverter_data2 = await get_historical_aggregated_values_batch_time_vars_async(
    start_time=start_time,
    end_time=end_time,
    pro_interval=60*1000,
    agg_name="Average",
    variable_list=string_sets.variables_as_list(["DCPower"])
)
one_days_historic_inverter_data2

### Batching with Async

In [None]:
import logging
import asyncio
import aiohttp
from aiohttp import ClientSession
from asyncio import Semaphore
from datetime import timedelta

# Configure INFO-level logging with timestamp, level and message,
# and create the module-level logger used by the functions below.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

In [None]:
async def get_historical_aggregated_values_batch_time_vars_async(
    self, 
    start_time: datetime, 
    end_time: datetime, 
    pro_interval: int, 
    agg_name: str, 
    variable_list: list, 
    max_data_points: int = 10000, 
    max_retries: int = 3, 
    retry_delay: int = 5, 
    max_concurrent_requests: int = 10
) -> pd.DataFrame:
    """Request historical aggregated values from the OPC UA server with batching.

    The time range is cut into equally sized windows and the variables into
    groups so that one request stays near ``max_data_points`` values. One
    request is sent per (variable group, window) pair, at most
    ``max_concurrent_requests`` at a time, each retried on
    ``aiohttp.ClientError`` with exponential back-off.

    Args:
        self: OPC UA client supplying ``body``, ``rest_url``, ``headers``,
            ``_check_content`` and ``_process_df``.
        start_time: Start of the requested time range.
        end_time: End of the requested time range.
        pro_interval: Processing interval in milliseconds.
        agg_name: Aggregate name applied to every variable.
        variable_list: Node ids to read.
        max_data_points: Target number of data points per request.
        max_retries: Maximum attempts per request; must be at least 1.
        retry_delay: Base delay in seconds between attempts.
        max_concurrent_requests: Upper bound on simultaneous requests.

    Returns:
        The combined, column-renamed dataframe, or an empty dataframe when
        no variables were given or no batch returned any rows.

    Raises:
        ValueError: If ``max_retries`` is smaller than 1.
        RuntimeError: If a request still fails on its last attempt.
    """
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")
    if not variable_list:
        # Nothing to request; avoids pd.concat failing on an empty list.
        return pd.DataFrame()

    total_time_range_ms = (end_time - start_time).total_seconds() * 1000
    estimated_intervals = total_time_range_ms / pro_interval

    max_variables_per_batch = max(1, int(max_data_points / estimated_intervals))
    max_time_batches = max(1, int(estimated_intervals / max_data_points))
    time_batch_size_ms = total_time_range_ms / max_time_batches

    extended_variables = [{"NodeId": var, "AggregateName": agg_name} for var in variable_list]
    variable_batches = [
        extended_variables[i:i + max_variables_per_batch]
        for i in range(0, len(extended_variables), max_variables_per_batch)
    ]

    semaphore = Semaphore(max_concurrent_requests)

    async def fetch_batch(variables, time_batch):
        """Request one (variable group, window) pair and return its rows."""
        async with semaphore:
            batch_start_ms = time_batch * time_batch_size_ms
            batch_end_ms = min((time_batch + 1) * time_batch_size_ms, total_time_range_ms)
            batch_start = start_time + timedelta(milliseconds=batch_start_ms)
            batch_end = start_time + timedelta(milliseconds=batch_end_ms)

            body = {
                **self.body,
                "StartTime": batch_start.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
                "EndTime": batch_end.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
                "ProcessingInterval": pro_interval,
                "ReadValueIds": variables,
                "AggregateName": agg_name
            }

            for attempt in range(max_retries):
                try:
                    async with ClientSession() as session:
                        async with session.post(
                            f"{self.rest_url}values/historicalaggregated",
                            json=body,
                            headers=self.headers
                        ) as response:
                            response.raise_for_status()
                            content = await response.json()
                            break
                except aiohttp.ClientError as e:
                    if attempt < max_retries - 1:
                        wait_time = retry_delay * (2 ** attempt)
                        logger.warning(f"Request failed. Retrying in {wait_time} seconds...")
                        await asyncio.sleep(wait_time)
                    else:
                        logger.error(f"Max retries reached. Error: {e}")
                        # Keep the original exception as the cause.
                        raise RuntimeError(f'Error message {e}') from e

            self._check_content(content)

            df_list = []
            for item in content["HistoryReadResults"]:
                df = pd.json_normalize(item["DataValues"])
                for key, value in item["NodeId"].items():
                    df[f"HistoryReadResults.NodeId.{key}"] = value
                df_list.append(df)

            if not df_list:
                # Always return a dataframe so the combine step never sees None.
                return pd.DataFrame()
            return pd.concat(df_list).reset_index(drop=True)

    tasks = [
        fetch_batch(variables, time_batch)
        for variables in variable_batches
        for time_batch in range(max_time_batches)
    ]

    results = await asyncio.gather(*tasks)
    frames = [frame for frame in results if not frame.empty]
    if not frames:
        return pd.DataFrame()

    logger.info("Combining all batches...")
    combined_df = pd.concat(frames, ignore_index=True)
    columns = {
        "Value.Type": "ValueType",
        "Value.Body": "Value",
        "StatusCode.Symbol": "StatusSymbol",
        "StatusCode.Code": "StatusCode",
        "SourceTimestamp": "Timestamp",
        "HistoryReadResults.NodeId.IdType": "IdType",
        "HistoryReadResults.NodeId.Id": "Id",
        "HistoryReadResults.NodeId.Namespace": "Namespace",
    }
    return self._process_df(combined_df, columns)

In [None]:
# 1 day aggregated historical data
# The OPC UA client is passed as the function's first ("self") argument;
# the remaining values come from the "Query Parameters" cell.
one_day_historical_data = await get_historical_aggregated_values_batch_time_vars_async(
    opc_data,
    start_time=start_time,
    end_time=end_time,
    pro_interval=pro_interval,
    agg_name=agg_name,
    variable_list=variable_list,
    max_data_points=10000,
    max_concurrent_requests=35
)
one_day_historical_data

### Batching with Async for Raw Historical Data

In [None]:
from typing import Dict, List, Any, Union, Optional

In [None]:
async def get_raw_historical_values_batch_time_vars_async(
    self, 
    start_time: datetime, 
    end_time: datetime, 
    variable_list: list, 
    limit_start_index: Union[int, None] = None, 
    limit_num_records: Union[int, None] = None,
    max_data_points: int = 10000, 
    max_retries: int = 3, 
    retry_delay: int = 5, 
    max_concurrent_requests: int = 10
) -> pd.DataFrame:
    """Request raw historical values from the OPC UA server with batching.

    One request is sent to ``values/historical`` per (variable group, time
    window) pair, at most ``max_concurrent_requests`` at a time, each
    retried on ``aiohttp.ClientError`` with exponential back-off.

    Args:
        self: OPC UA client supplying ``body``, ``rest_url``, ``headers``,
            ``_check_content`` and ``_process_df``.
        start_time: Start of the requested time range.
        end_time: End of the requested time range.
        variable_list: Node ids to read.
        limit_start_index: Start index of the optional ``Limit`` clause.
        limit_num_records: Record count of the optional ``Limit`` clause.
            The clause is only sent when both limit values are given.
        max_data_points: Value used to size the batches.
        max_retries: Maximum attempts per request; must be at least 1.
        retry_delay: Base delay in seconds between attempts.
        max_concurrent_requests: Upper bound on simultaneous requests.

    Returns:
        The combined, column-renamed dataframe, or an empty dataframe when
        no variables were given or no batch returned any rows.

    Raises:
        ValueError: If ``max_retries`` is smaller than 1.
        RuntimeError: If a request still fails on its last attempt.
    """
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")
    if not variable_list:
        # Nothing to request; avoids pd.concat failing on an empty list.
        return pd.DataFrame()

    total_time_range_ms = (end_time - start_time).total_seconds() * 1000
    # NOTE(review): raw reads have no processing interval, so the range is
    # divided by max_data_points here; confirm this sizing heuristic is the
    # intended one.
    estimated_intervals = total_time_range_ms / max_data_points

    max_variables_per_batch = max(1, int(max_data_points / estimated_intervals))
    max_time_batches = max(1, int(estimated_intervals / max_data_points))
    time_batch_size_ms = total_time_range_ms / max_time_batches

    extended_variables = [{"NodeId": var} for var in variable_list]
    variable_batches = [
        extended_variables[i:i + max_variables_per_batch]
        for i in range(0, len(extended_variables), max_variables_per_batch)
    ]

    semaphore = Semaphore(max_concurrent_requests)

    async def fetch_batch(variables, time_batch):
        """Request one (variable group, window) pair and return its rows."""
        async with semaphore:
            batch_start_ms = time_batch * time_batch_size_ms
            batch_end_ms = min((time_batch + 1) * time_batch_size_ms, total_time_range_ms)
            batch_start = start_time + timedelta(milliseconds=batch_start_ms)
            batch_end = start_time + timedelta(milliseconds=batch_end_ms)

            body = {
                **self.body,
                "StartTime": batch_start.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
                "EndTime": batch_end.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
                "ReadValueIds": variables,
            }

            if limit_start_index is not None and limit_num_records is not None:
                body["Limit"] = {"StartIndex": limit_start_index, "NumRecords": limit_num_records}

            for attempt in range(max_retries):
                try:
                    async with ClientSession() as session:
                        async with session.post(
                            f"{self.rest_url}values/historical",
                            json=body,
                            headers=self.headers
                        ) as response:
                            response.raise_for_status()
                            content = await response.json()
                            break
                except aiohttp.ClientError as e:
                    if attempt < max_retries - 1:
                        wait_time = retry_delay * (2 ** attempt)
                        logger.warning(f"Request failed. Retrying in {wait_time} seconds...")
                        await asyncio.sleep(wait_time)
                    else:
                        logger.error(f"Max retries reached. Error: {e}")
                        # Keep the original exception as the cause.
                        raise RuntimeError(f'Error message {e}') from e

            self._check_content(content)

            df_list = []
            for item in content["HistoryReadResults"]:
                df = pd.json_normalize(item["DataValues"])
                for key, value in item["NodeId"].items():
                    df[f"HistoryReadResults.NodeId.{key}"] = value
                df_list.append(df)

            if not df_list:
                # Always return a dataframe so the combine step never sees None.
                return pd.DataFrame()
            return pd.concat(df_list).reset_index(drop=True)

    tasks = [
        fetch_batch(variables, time_batch)
        for variables in variable_batches
        for time_batch in range(max_time_batches)
    ]

    results = await asyncio.gather(*tasks)
    frames = [frame for frame in results if not frame.empty]
    if not frames:
        return pd.DataFrame()

    logger.info("Combining all batches...")
    combined_df = pd.concat(frames, ignore_index=True)
    columns = {
        "Value.Type": "ValueType",
        "Value.Body": "Value",
        "SourceTimestamp": "Timestamp",
        "HistoryReadResults.NodeId.IdType": "IdType",
        "HistoryReadResults.NodeId.Id": "Id",
        "HistoryReadResults.NodeId.Namespace": "Namespace",
    }
    return self._process_df(combined_df, columns)

In [None]:
# 1 day raw historical data
# The OPC UA client is passed as the function's first ("self") argument;
# no Limit clause is requested because both limit arguments are left unset.
one_day_raw_historical_data = await get_raw_historical_values_batch_time_vars_async(
    opc_data,
    start_time=start_time,
    end_time=end_time,
    variable_list=variable_list,
    max_data_points=10000,
    max_concurrent_requests=35
)
one_day_raw_historical_data

### Async with ClientPool

In [None]:
import asyncio
import aiohttp
from aiohttp import ClientSession
from asyncio import Semaphore
from typing import List, Dict, Any
from datetime import datetime, timedelta
import pandas as pd
import logging
from pydantic import AnyUrl, ValidationError

In [None]:
class ClientPool:
    """Fixed-size pool of ``aiohttp.ClientSession`` objects.

    Sessions are handed out with ``get_client`` and must be returned with
    ``release_client``. ``close_all`` closes the idle sessions and marks the
    pool as closed; a session that is still checked out at that moment is
    closed when it is released instead of being put back, so it cannot leak.
    """

    def __init__(self, num_clients: int, rest_url: str, headers: Dict[str, str]):
        """Create ``num_clients`` sessions bound to ``rest_url`` and ``headers``."""
        self.clients = asyncio.Queue()
        # Set by close_all(); makes release_client() close instead of re-queue.
        self._closed = False
        for _ in range(num_clients):
            self.clients.put_nowait(aiohttp.ClientSession(base_url=rest_url, headers=headers))
        self.num_clients = num_clients

    async def get_client(self):
        """Wait for an idle session and return it."""
        return await self.clients.get()

    async def release_client(self, client):
        """Return a session to the pool, or close it if the pool is closed."""
        if self._closed:
            await client.close()
            return
        await self.clients.put(client)

    async def close_all(self):
        """Close every idle session and mark the pool as closed."""
        self._closed = True
        while not self.clients.empty():
            client = self.clients.get_nowait()
            await client.close()

In [None]:
async def request_from_api_async(
    client_pool: "ClientPool",
    method: str,
    endpoint: str,
    data: str = None,
    params: Dict[str, Any] = None,
    extended_timeout: bool = False,
) -> Dict[str, Any]:
    """Send one GET or POST request using a session from ``client_pool``.

    Args:
        client_pool: Pool to borrow the session from; the session is always
            returned to it, also when the request fails.
        method: ``"GET"`` or ``"POST"``.
        endpoint: Path of the endpoint, passed to the session as the URL.
        data: Request payload; only sent with POST requests.
        params: Query parameters.
        extended_timeout: Use a total timeout of 300 seconds instead of 30.

    Returns:
        The decoded JSON response, or
        ``{"error": "Non-JSON response", "content": <text>}`` when the
        response's Content-Type is not JSON.

    Raises:
        ValueError: If ``method`` is neither ``"GET"`` nor ``"POST"``.
        aiohttp.ClientError: If the request fails or returns an error status.
    """
    # Validate before borrowing a session. The previous code raised pydantic's
    # ValidationError with a bare string, which cannot be constructed that way.
    if method not in ("GET", "POST"):
        raise ValueError("Unsupported method")

    timeout = aiohttp.ClientTimeout(total=300 if extended_timeout else 30)
    client = await client_pool.get_client()

    try:
        async with client.request(
            method,
            endpoint,
            # GET requests never carried a payload.
            data=data if method == "POST" else None,
            params=params,
            timeout=timeout,
        ) as response:
            response.raise_for_status()
            if 'application/json' in response.headers.get('Content-Type', ''):
                return await response.json()
            return {"error": "Non-JSON response", "content": await response.text()}
    finally:
        await client_pool.release_client(client)

In [None]:
async def get_historical_aggregated_values_batch_time_vars_async(
    self,
    start_time: datetime,
    end_time: datetime,
    pro_interval: int,
    agg_name: str,
    variable_list: List[str],
    max_data_points: int = 100000,
    max_retries: int = 3,
    retry_delay: int = 5,
    max_concurrent_requests: int = 55
) -> pd.DataFrame:
    """Request historical aggregated values in batches through a session pool.

    The time range is cut into equally sized windows and the variables into
    groups so that one request stays near ``max_data_points`` values. The
    requests share a ``ClientPool`` of ``max_concurrent_requests`` sessions,
    which is closed again before the function returns or raises.

    Args:
        self: OPC UA client supplying ``body``, ``rest_url``, ``headers``,
            ``json_serial``, ``_check_content`` and ``_process_df``.
        start_time: Start of the requested time range.
        end_time: End of the requested time range.
        pro_interval: Processing interval in milliseconds.
        agg_name: Aggregate name applied to every variable.
        variable_list: Node ids to read.
        max_data_points: Target number of data points per request.
        max_retries: Maximum attempts per request; must be at least 1.
        retry_delay: Base delay in seconds between attempts.
        max_concurrent_requests: Upper bound on simultaneous requests and
            number of pooled sessions.

    Returns:
        The combined, column-renamed dataframe, or an empty dataframe when
        no variables were given or no batch returned any rows.

    Raises:
        ValueError: If ``max_retries`` is smaller than 1.
        RuntimeError: If a request still fails on its last attempt.
    """
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
    logger = logging.getLogger(__name__)

    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")
    if not variable_list:
        # Return before any session is opened for a query with no variables.
        return pd.DataFrame()

    extended_variables = [{"NodeId": var, "AggregateName": agg_name} for var in variable_list]
    total_time_range_ms = (end_time - start_time).total_seconds() * 1000
    estimated_intervals = total_time_range_ms / pro_interval
    max_variables_per_batch = max(1, int(max_data_points / estimated_intervals))
    variable_batches = [
        extended_variables[i:i + max_variables_per_batch]
        for i in range(0, len(extended_variables), max_variables_per_batch)
    ]
    max_time_batches = max(1, int(estimated_intervals / max_data_points))
    time_batch_size_ms = total_time_range_ms / max_time_batches

    semaphore = Semaphore(max_concurrent_requests)
    client_pool = ClientPool(max_concurrent_requests, self.rest_url, self.headers)

    async def fetch_batch(variables, time_batch):
        """Request one (variable group, window) pair and return its rows."""
        async with semaphore:
            batch_start_ms = time_batch * time_batch_size_ms
            batch_end_ms = min((time_batch + 1) * time_batch_size_ms, total_time_range_ms)
            batch_start = start_time + timedelta(milliseconds=batch_start_ms)
            batch_end = start_time + timedelta(milliseconds=batch_end_ms)

            body = {
                **self.body,
                "StartTime": batch_start.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
                "EndTime": batch_end.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
                "ProcessingInterval": pro_interval,
                "ReadValueIds": variables,
                "AggregateName": agg_name
            }

            for attempt in range(max_retries):
                try:
                    content = await request_from_api_async(
                        client_pool,
                        method="POST",
                        endpoint="/values/historicalaggregated",
                        data=json.dumps(body, default=self.json_serial),
                        extended_timeout=True
                    )
                    break
                except (aiohttp.ClientError, ValidationError) as e:
                    if attempt < max_retries - 1:
                        wait_time = retry_delay * (2 ** attempt)
                        logger.warning(f"Request failed. Retrying in {wait_time} seconds...")
                        await asyncio.sleep(wait_time)
                    else:
                        logger.error(f"Max retries reached. Error: {e}")
                        # Keep the original exception as the cause.
                        raise RuntimeError(f'Error message {e}') from e

            self._check_content(content)

            df_list = []
            for item in content["HistoryReadResults"]:
                df = pd.json_normalize(item["DataValues"])
                for key, value in item["NodeId"].items():
                    df[f"HistoryReadResults.NodeId.{key}"] = value
                df_list.append(df)

            if not df_list:
                # Always return a dataframe so the combine step never sees None.
                return pd.DataFrame()
            return pd.concat(df_list).reset_index(drop=True)

    tasks = [
        fetch_batch(variables, time_batch)
        for variables in variable_batches
        for time_batch in range(max_time_batches)
    ]

    try:
        results = await asyncio.gather(*tasks)
        frames = [frame for frame in results if not frame.empty]
        if not frames:
            return pd.DataFrame()

        logger.info("Combining all batches...")
        combined_df = pd.concat(frames, ignore_index=True)
        columns = {
            "Value.Type": "ValueType",
            "Value.Body": "Value",
            "StatusCode.Symbol": "StatusSymbol",
            "StatusCode.Code": "StatusCode",
            "SourceTimestamp": "Timestamp",
            "HistoryReadResults.NodeId.IdType": "IdType",
            "HistoryReadResults.NodeId.Id": "Id",
            "HistoryReadResults.NodeId.Namespace": "Namespace",
        }
        return self._process_df(combined_df, columns)
    finally:
        await client_pool.close_all()

In [None]:
import datetime

In [None]:
# 1 day aggregated historical data
one_day_historical_data = await get_historical_aggregated_values_batch_time_vars_async(
    opc_data,
    start_time=(datetime.datetime.now() - datetime.timedelta(30)),
    end_time=(datetime.datetime.now() - datetime.timedelta(29)),
    pro_interval=60*1000,
    agg_name="Average",
    variable_list=string_sets.variables_as_list(["DCPower"]),
    max_data_points=10000,
    max_concurrent_requests=100
)
one_day_historical_data

### Async with Data Handler

In [None]:
import asyncio
import aiohttp
import pandas as pd
import sqlite3
import tempfile
import os
import json
from asyncio import Semaphore
from typing import List, Dict, Any
from datetime import datetime, timedelta
import logging
import pyarrow as pa
import pyarrow.parquet as pq

In [None]:
class DataHandler:
    """Temporary on-disk store for batch dataframes.

    Every batch gets a row in a SQLite table inside a fresh temporary
    directory. Batches of up to ``max_memory_rows`` rows are stored as JSON
    text in that row; larger ones are written to a Parquet file in the same
    directory and the row holds the file path instead.
    """

    def __init__(self, max_memory_rows=10000):
        """Create the temporary directory, the database and its table."""
        self.max_memory_rows = max_memory_rows
        self.temp_dir = tempfile.mkdtemp()
        self.db_path = os.path.join(self.temp_dir, 'temp_data.db')
        self.conn = sqlite3.connect(self.db_path)
        self.conn.execute('''CREATE TABLE IF NOT EXISTS temp_data
                             (id INTEGER PRIMARY KEY AUTOINCREMENT,
                              batch_id TEXT,
                              data TEXT)''')

    async def save_data(self, batch_id: str, data: pd.DataFrame):
        """Store ``data`` under ``batch_id`` and commit."""
        if len(data) <= self.max_memory_rows:
            # Store small datasets directly in SQLite
            self.conn.execute("INSERT INTO temp_data (batch_id, data) VALUES (?, ?)",
                              (batch_id, data.to_json()))
        else:
            # Stream larger datasets to Parquet file
            file_path = os.path.join(self.temp_dir, f"batch_{batch_id}.parquet")
            table = pa.Table.from_pandas(data)
            pq.write_table(table, file_path)

            # Store file path in SQLite
            self.conn.execute("INSERT INTO temp_data (batch_id, data) VALUES (?, ?)",
                              (batch_id, file_path))
        self.conn.commit()

    async def get_data(self, batch_id: str) -> "pd.DataFrame | None":
        """Return the dataframe stored under ``batch_id``, or None if absent."""
        from io import StringIO

        cursor = self.conn.execute("SELECT data FROM temp_data WHERE batch_id = ?", (batch_id,))
        result = cursor.fetchone()
        if not result:
            return None

        data = result[0]
        if data.startswith('{'):  # JSON data
            # Wrap the text in a buffer: passing literal JSON strings to
            # read_json is deprecated in recent pandas versions.
            return pd.read_json(StringIO(data))
        # Otherwise the column holds the path of a Parquet file
        return pd.read_parquet(data)

    def cleanup(self):
        """Close the database and delete the temporary directory."""
        self.conn.close()
        for file in os.listdir(self.temp_dir):
            os.remove(os.path.join(self.temp_dir, file))
        os.rmdir(self.temp_dir)

In [None]:
async def get_historical_aggregated_values_batch_time_vars_data_async(
    self,
    start_time: datetime,
    end_time: datetime,
    pro_interval: int,
    agg_name: str,
    variable_list: List[str],
    max_data_points: int = 1000,
    max_retries: int = 3,
    retry_delay: int = 5,
    max_concurrent_requests: int = 10
) -> pd.DataFrame:
    """Request historical aggregated values in batches, staging them on disk.

    Each batch is fetched through a ``ClientPool`` and saved in a temporary
    ``DataHandler`` store. When all batches are in, they are read back,
    combined and returned; the pool and the temporary store are always
    released afterwards. The previous version skipped the read-back step,
    so it deleted the collected data and returned None.

    Args:
        self: OPC UA client supplying ``body``, ``rest_url``, ``headers``,
            ``json_serial``, ``_check_content`` and ``_process_df``.
        start_time: Start of the requested time range.
        end_time: End of the requested time range.
        pro_interval: Processing interval in milliseconds.
        agg_name: Aggregate name applied to every variable.
        variable_list: Node ids to read.
        max_data_points: Target number of data points per request.
        max_retries: Maximum attempts per request; must be at least 1.
        retry_delay: Base delay in seconds between attempts.
        max_concurrent_requests: Upper bound on simultaneous requests and
            number of pooled sessions.

    Returns:
        The combined, column-renamed dataframe, or an empty dataframe when
        no variables were given or no batch returned any rows.

    Raises:
        ValueError: If ``max_retries`` is smaller than 1.
        RuntimeError: If a request still fails on its last attempt.
    """
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
    logger = logging.getLogger(__name__)

    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")
    if not variable_list:
        # Return before any session or temporary store is created.
        return pd.DataFrame()

    extended_variables = [{"NodeId": var, "AggregateName": agg_name} for var in variable_list]
    total_time_range_ms = (end_time - start_time).total_seconds() * 1000
    estimated_intervals = total_time_range_ms / pro_interval
    max_variables_per_batch = max(1, int(max_data_points / estimated_intervals))
    variable_batches = [
        extended_variables[i:i + max_variables_per_batch]
        for i in range(0, len(extended_variables), max_variables_per_batch)
    ]
    max_time_batches = max(1, int(estimated_intervals / max_data_points))
    time_batch_size_ms = total_time_range_ms / max_time_batches

    semaphore = Semaphore(max_concurrent_requests)
    client_pool = ClientPool(max_concurrent_requests, self.rest_url, self.headers)
    data_handler = DataHandler()

    async def process_batch(vid, variables, time_batch):
        """Fetch one (variable group, window) pair, store it, return its id."""
        async with semaphore:
            batch_start_ms = time_batch * time_batch_size_ms
            batch_end_ms = min((time_batch + 1) * time_batch_size_ms, total_time_range_ms)
            batch_start = start_time + timedelta(milliseconds=batch_start_ms)
            batch_end = start_time + timedelta(milliseconds=batch_end_ms)

            body = {
                **self.body,
                "StartTime": batch_start.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
                "EndTime": batch_end.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
                "ProcessingInterval": pro_interval,
                "ReadValueIds": variables,
                "AggregateName": agg_name
            }

            for attempt in range(max_retries):
                try:
                    content = await request_from_api_async(
                        client_pool,
                        method="POST",
                        endpoint="/values/historicalaggregated",
                        data=json.dumps(body, default=self.json_serial),
                        extended_timeout=True
                    )
                    break
                except (aiohttp.ClientError, ValidationError) as e:
                    if attempt < max_retries - 1:
                        wait_time = retry_delay * (2 ** attempt)
                        logger.warning(f"Request failed. Retrying in {wait_time} seconds...")
                        await asyncio.sleep(wait_time)
                    else:
                        logger.error(f"Max retries reached. Error: {e}")
                        # Keep the original exception as the cause.
                        raise RuntimeError(f'Error message {e}') from e

            self._check_content(content)

            df_result = pd.json_normalize(
                content, 
                record_path=['HistoryReadResults', 'DataValues'], 
                meta=[['HistoryReadResults', 'NodeId', 'IdType'], 
                    ['HistoryReadResults', 'NodeId','Id'],
                    ['HistoryReadResults', 'NodeId','Namespace']]
            )
            # Window index and variable-group index together identify a batch.
            batch_id = f"{time_batch}_{vid}"
            await data_handler.save_data(batch_id, df_result)
            return batch_id

    tasks = [
        process_batch(vid,variables, time_batch)
        for vid,variables in enumerate(variable_batches)
        for time_batch in range(max_time_batches)
    ]

    try:
        batch_ids = await asyncio.gather(*tasks)

        # Read the staged batches back before the store is deleted below.
        frames = []
        for batch_id in batch_ids:
            df = await data_handler.get_data(batch_id)
            if df is not None and not df.empty:
                frames.append(df)
        if not frames:
            return pd.DataFrame()

        logger.info("Combining all batches...")
        combined_df = pd.concat(frames, ignore_index=True)
        columns = {
            "Value.Type": "ValueType",
            "Value.Body": "Value",
            "StatusCode.Symbol": "StatusSymbol",
            "StatusCode.Code": "StatusCode",
            "SourceTimestamp": "Timestamp",
            "HistoryReadResults.NodeId.IdType": "IdType",
            "HistoryReadResults.NodeId.Id": "Id",
            "HistoryReadResults.NodeId.Namespace": "Namespace",
        }
        return self._process_df(combined_df, columns)
    finally:
        await client_pool.close_all()
        data_handler.cleanup()

In [None]:
# 1 day aggregated historical data
# The OPC UA client is passed as the function's first ("self") argument;
# the remaining values come from the "Query Parameters" cell.
one_day_historical_data = await get_historical_aggregated_values_batch_time_vars_data_async(
    opc_data,
    start_time=start_time,
    end_time=end_time,
    pro_interval=pro_interval,
    agg_name=agg_name,
    variable_list=variable_list,
    max_data_points=20000,
    max_concurrent_requests=50
)
one_day_historical_data

### Async with parquet data handler for large data

In [None]:
import asyncio
import aiohttp
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from datetime import datetime, timedelta
import json
from typing import List, Dict, Any
import logging
from asyncio import Semaphore
from aiohttp import TCPConnector
from tenacity import retry, stop_after_attempt, wait_exponential
from concurrent.futures import ThreadPoolExecutor

import tracemalloc
tracemalloc.start()

logger = logging.getLogger(__name__)

In [None]:
class AsyncParquetWriter:
    """Append DataFrames to one parquet file without blocking the event loop.

    The pyarrow ``ParquetWriter`` is opened lazily on the first ``write`` so the
    file schema can be taken from the first DataFrame. The blocking
    ``write_table`` and ``close`` calls run on a private thread pool.
    """

    def __init__(self, filename):
        # Path of the parquet file to create.
        self.filename = filename
        # Opened on the first write; None while no file is open.
        self.writer = None
        self.executor = ThreadPoolExecutor(max_workers=10)

    async def write(self, df):
        """Convert *df* to an Arrow table and append it to the parquet file."""
        loop = asyncio.get_running_loop()
        table = pa.Table.from_pandas(df)
        if self.writer is None:
            self.writer = pq.ParquetWriter(self.filename, table.schema)
        await loop.run_in_executor(self.executor, self.writer.write_table, table)

    async def close(self):
        """Close the parquet file (if one is open) and release the worker threads.

        The thread pool used so far is shut down so its threads do not linger.
        A fresh, idle pool is installed in its place, so a later ``write`` on
        the same object keeps working exactly as before.
        """
        try:
            if self.writer:
                loop = asyncio.get_running_loop()
                await loop.run_in_executor(self.executor, self.writer.close)
                self.writer = None
        finally:
            retired, self.executor = self.executor, ThreadPoolExecutor(max_workers=10)
            # Any close task has already been awaited, so there is nothing to wait for.
            retired.shutdown(wait=False)

class DataHandler:
    """Route DataFrames to one parquet file per batch id under ``base_path``."""

    def __init__(self, base_path):
        # Directory that receives the ``batch_<id>.parquet`` files.
        self.base_path = base_path
        # batch id -> AsyncParquetWriter, created on first use.
        self.writers = {}
        # The parquet writer does not create missing directories, so make sure
        # the target exists before the first batch is saved.
        if base_path:
            os.makedirs(base_path, exist_ok=True)

    async def save_data(self, batch_id: str, data: pd.DataFrame):
        """Append *data* to the parquet file belonging to *batch_id*."""
        if batch_id not in self.writers:
            self.writers[batch_id] = AsyncParquetWriter(f"{self.base_path}/batch_{batch_id}.parquet")
        await self.writers[batch_id].write(data)

    async def close_all(self):
        """Close every writer that was opened by ``save_data``."""
        for writer in self.writers.values():
            await writer.close()

In [None]:
async def get_historical_aggregated_values_batch_time_vars_data_async_parquet(
    self,
    start_time: datetime,
    end_time: datetime,
    pro_interval: int,
    agg_name: str,
    variable_list: List[str],
    max_data_points: int = 100000,
    max_retries: int = 3,
    retry_delay: int = 5,
    max_concurrent_requests: int = 50
) -> List[str]:
    """Fetch historical aggregated values in batches and write each batch to parquet.

    Variables are grouped into batches of
    ``max(1, int(max_data_points / estimated_intervals))`` and the time range is
    cut into ``max(1, int(estimated_intervals / max_data_points))`` equal
    slices. Every (variable batch, time slice) pair is requested concurrently,
    limited by ``max_concurrent_requests``, and saved through a ``DataHandler``
    rooted at ``pqfiles``. Results are not combined into a single DataFrame;
    each batch stays in its own parquet file.

    Args:
        self: Object providing ``rest_url``, ``headers``, ``body``,
            ``json_serial`` and ``_check_content`` (passed explicitly because
            this is a free function in the notebook).
        start_time: Start of the requested range.
        end_time: End of the requested range.
        pro_interval: Processing interval in milliseconds.
        agg_name: Aggregate name sent with every variable.
        variable_list: Node ids to read.
        max_data_points: Budget used to size variable and time batches.
        max_retries: Attempts per request; values below 1 are treated as 1.
        retry_delay: Base delay in seconds, doubled after every failed attempt.
        max_concurrent_requests: Upper bound on in-flight requests.

    Returns:
        The ids (``"<time_batch>_<variable_batch>"``) of the batches written.

    Raises:
        RuntimeError: When a request still fails after the last attempt.
    """
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
    logger = logging.getLogger(__name__)

    extended_variables = [{"NodeId": var, "AggregateName": agg_name} for var in variable_list]
    total_time_range_ms = (end_time - start_time).total_seconds() * 1000
    estimated_intervals = total_time_range_ms / pro_interval
    max_variables_per_batch = max(1, int(max_data_points / estimated_intervals))
    variable_batches = [extended_variables[i:i + max_variables_per_batch] for i in range(0, len(extended_variables), max_variables_per_batch)]
    max_time_batches = max(1, int(estimated_intervals / max_data_points))
    time_batch_size_ms = total_time_range_ms / max_time_batches

    # Always try at least once so that `content` is bound after the retry loop.
    attempts = max(1, max_retries)

    semaphore = Semaphore(max_concurrent_requests)
    client_pool = ClientPool(max_concurrent_requests, self.rest_url, self.headers)
    data_handler = DataHandler(base_path="pqfiles")

    async def process_batch(vid, variables, time_batch):
        """Request one (variable batch, time slice) pair and persist the result."""
        async with semaphore:
            batch_start_ms = time_batch * time_batch_size_ms
            batch_end_ms = min((time_batch + 1) * time_batch_size_ms, total_time_range_ms)
            batch_start = start_time + timedelta(milliseconds=batch_start_ms)
            batch_end = start_time + timedelta(milliseconds=batch_end_ms)

            body = {
                **self.body,
                "StartTime": batch_start.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
                "EndTime": batch_end.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
                "ProcessingInterval": pro_interval,
                "ReadValueIds": variables,
                "AggregateName": agg_name
            }

            for attempt in range(attempts):
                try:
                    content = await request_from_api_async(
                        client_pool,
                        method="POST",
                        endpoint="/values/historicalaggregated",
                        data=json.dumps(body, default=self.json_serial),
                        extended_timeout=True
                    )
                    break
                except (aiohttp.ClientError, ValidationError) as e:
                    if attempt < attempts - 1:
                        # Exponential back-off: retry_delay, 2x, 4x, ...
                        wait_time = retry_delay * (2 ** attempt)
                        logger.warning(f"Request failed. Retrying in {wait_time} seconds...")
                        await asyncio.sleep(wait_time)
                    else:
                        logger.error(f"Max retries reached. Error: {e}")
                        # Keep the original exception attached for debugging.
                        raise RuntimeError(f'Error message {e}') from e

            self._check_content(content)

            df_result = pd.json_normalize(
                content,
                record_path=['HistoryReadResults', 'DataValues'],
                meta=[['HistoryReadResults', 'NodeId', 'IdType'],
                    ['HistoryReadResults', 'NodeId','Id'],
                    ['HistoryReadResults', 'NodeId','Namespace']]
            )
            batch_id = f"{time_batch}_{vid}"
            await data_handler.save_data(batch_id, df_result)
            return batch_id

    tasks = [
        process_batch(vid, variables, time_batch)
        for vid, variables in enumerate(variable_batches)
        for time_batch in range(max_time_batches)
    ]

    try:
        batch_ids = await asyncio.gather(*tasks)
        return list(batch_ids)
    finally:
        # Close the parquet writers even if closing the HTTP clients fails.
        try:
            await client_pool.close_all()
        finally:
            await data_handler.close_all()

In [None]:
# 1 day aggregated historical data
# Reads 2024-06-01 00:00 to 2024-06-02 00:00; each batch is written to its own
# parquet file under ./pqfiles by the function's DataHandler.
# Top-level `await` is only valid inside a notebook/IPython session.
one_day_historical_data = await get_historical_aggregated_values_batch_time_vars_data_async_parquet(
    opc_data,
    start_time=datetime(2024,6,1,00,00),
    end_time=datetime(2024,6,2,00,00),
    pro_interval=pro_interval,
    agg_name=agg_name,
    variable_list=variable_list,
    max_data_points=50000,
    max_concurrent_requests=50
)
one_day_historical_data

### Stringset data

In [None]:
def get_historical_aggregated_values(opc_data,
    start_time, 
    end_time, 
    pro_interval, 
    agg_name, 
    variable_list
) -> pd.DataFrame:
    """Request aggregated history for all variables in a single API call.

    The request body and the raw response are printed for inspection before
    the response is flattened and handed to ``opc_data._process_df``.
    """
    timestamp_format = "%Y-%m-%dT%H:%M:%SZ"
    node_ids = opc_data._get_variable_list_as_list(variable_list)
    read_value_ids = [{"NodeId": node_id, "AggregateName": agg_name} for node_id in node_ids]

    # Start from the client's base body and add the request-specific fields.
    body = dict(opc_data.body)
    body.update(
        {
            "StartTime": start_time.strftime(timestamp_format),
            "EndTime": end_time.strftime(timestamp_format),
            "ProcessingInterval": pro_interval,
            "AggregateName": agg_name,
            "ReadValueIds": read_value_ids,
        }
    )
    print(body)

    payload = json.dumps(body, default=opc_data.json_serial)
    content = request_from_api(
        rest_url=opcua_rest_url,
        method="POST",
        endpoint="values/historicalaggregated",
        data=payload,
        headers=opc_data.headers,
        extended_timeout=True,
    )
    print(content)

    node_id_path = ['HistoryReadResults', 'NodeId']
    df_result = pd.json_normalize(
        content,
        record_path=['HistoryReadResults', 'DataValues'],
        meta=[node_id_path + [field] for field in ('IdType', 'Id', 'Namespace')],
    )

    # Column renames passed on to opc_data._process_df.
    columns = {
        "Value.Type": "ValueType",
        "Value.Body": "Value",
        "StatusCode.Symbol": "StatusSymbol",
        "StatusCode.Code": "StatusCode",
        "SourceTimestamp": "Timestamp",
        "HistoryReadResults.NodeId.IdType": "IdType",
        "HistoryReadResults.NodeId.Id": "Id",
        "HistoryReadResults.NodeId.Namespace": "Namespace",
    }
    return opc_data._process_df(df_result, columns)

In [None]:
# Query parameters shared by the cells below: a one-day window that ended
# 29 days ago, 10-minute (600000 ms) averages of the string sets' DCPower.
# `datetime` is the class here (rebound by `from datetime import datetime, timedelta`
# earlier in the notebook), so `datetime.datetime.now()` would raise AttributeError.
start_time = datetime.now() - timedelta(30)
end_time = datetime.now() - timedelta(29)
pro_interval = 600000
agg_name = "Average"
variable_list = string_sets.variables_as_list(["DCPower"])

In [None]:
def get_historical_aggregated_values(opc_data,
                                     start_time, 
                                     end_time, 
                                     pro_interval, 
                                     agg_name, 
                                     variable_list,
                                     batch_size: int = 100) -> pd.DataFrame:
    """Request aggregated history in sequential batches of variables.

    Args:
        opc_data: OPC UA client providing ``body``, ``headers``, ``json_serial``,
            ``_get_variable_list_as_list`` and ``_process_df``.
        start_time: Start of the requested range.
        end_time: End of the requested range.
        pro_interval: Processing interval in milliseconds.
        agg_name: Aggregate name sent with every variable.
        variable_list: Variables to read.
        batch_size: Number of variables per request (default 100).

    Returns:
        The result of ``opc_data._process_df`` on the concatenated responses.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")

    node_ids = opc_data._get_variable_list_as_list(variable_list)

    # The time window is identical for every batch, so format it once.
    start_str = start_time.strftime("%Y-%m-%dT%H:%M:%SZ")
    end_str = end_time.strftime("%Y-%m-%dT%H:%M:%SZ")

    frames = []
    for offset in range(0, len(node_ids), batch_size):
        batch = node_ids[offset:offset + batch_size]
        body = {
            **opc_data.body,
            "StartTime": start_str,
            "EndTime": end_str,
            "ProcessingInterval": pro_interval,
            "AggregateName": agg_name,
            "ReadValueIds": [{"NodeId": var, "AggregateName": agg_name} for var in batch],
        }

        content = request_from_api(
            rest_url=opcua_rest_url,
            method="POST",
            endpoint="values/historicalaggregated",
            data=json.dumps(body, default=opc_data.json_serial),
            headers=opc_data.headers,
            extended_timeout=True,
        )

        frames.append(
            pd.json_normalize(
                content,
                record_path=['HistoryReadResults', 'DataValues'],
                meta=[
                    ['HistoryReadResults', 'NodeId', 'IdType'],
                    ['HistoryReadResults', 'NodeId', 'Id'],
                    ['HistoryReadResults', 'NodeId', 'Namespace'],
                ],
            )
        )

    # Concatenate once instead of re-copying the accumulated frame per batch.
    combined_df = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()

    # Column renames passed on to opc_data._process_df.
    columns = {
        "Value.Type": "ValueType",
        "Value.Body": "Value",
        "StatusCode.Symbol": "StatusSymbol",
        "StatusCode.Code": "StatusCode",
        "SourceTimestamp": "Timestamp",
        "HistoryReadResults.NodeId.IdType": "IdType",
        "HistoryReadResults.NodeId.Id": "Id",
        "HistoryReadResults.NodeId.Namespace": "Namespace",
    }

    return opc_data._process_df(combined_df, columns)

In [None]:
# Run the batched version with the parameters defined in the cell above;
# the returned DataFrame is displayed as the cell output.
get_historical_aggregated_values(opc_data,
                                     start_time, 
                                     end_time, 
                                     pro_interval, 
                                     agg_name, 
                                     variable_list)

In [None]:
import hashlib
import concurrent.futures

In [None]:
def get_historical_aggregated_values(opc_data, start_time, end_time, pro_interval, agg_name, variable_list, batch_size: int = 150) -> pd.DataFrame:
    """Request aggregated history with one worker thread per variable batch.

    Args:
        opc_data: OPC UA client providing ``body``, ``headers``, ``json_serial``,
            ``_get_variable_list_as_list`` and ``_process_df``.
        start_time: Start of the requested range.
        end_time: End of the requested range.
        pro_interval: Processing interval in milliseconds.
        agg_name: Aggregate name sent with every variable.
        variable_list: Variables to read.
        batch_size: Number of variables per request (default 150).

    Returns:
        The result of ``opc_data._process_df`` on the concatenated responses,
        with batches kept in the order of ``variable_list``.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")

    node_ids = opc_data._get_variable_list_as_list(variable_list)
    batches = [node_ids[i:i + batch_size] for i in range(0, len(node_ids), batch_size)]

    def process_batch(batch):
        """POST one batch of variables and flatten the response."""
        extended_variables = [{"NodeId": var, "AggregateName": agg_name} for var in batch]
        body = {
            **opc_data.body,
            "StartTime": start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
            "EndTime": end_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
            "ProcessingInterval": pro_interval,
            "AggregateName": agg_name,
            "ReadValueIds": extended_variables
        }
        content = request_from_api(
            rest_url=opcua_rest_url,
            method="POST",
            endpoint="values/historicalaggregated",
            data=json.dumps(body, default=opc_data.json_serial),
            headers=opc_data.headers,
            extended_timeout=True
        )
        return pd.json_normalize(
            content,
            record_path=['HistoryReadResults', 'DataValues'],
            meta=[['HistoryReadResults', 'NodeId', 'IdType'], ['HistoryReadResults', 'NodeId', 'Id'], ['HistoryReadResults', 'NodeId', 'Namespace']]
        )

    # executor.map yields results in submission order, so the combined frame
    # no longer depends on which request happens to finish first.
    with concurrent.futures.ThreadPoolExecutor() as executor:
        dataframes = list(executor.map(process_batch, batches))

    combined_df = pd.concat(dataframes, ignore_index=True) if dataframes else pd.DataFrame()

    # Column renames passed on to opc_data._process_df.
    columns = {
        "Value.Type": "ValueType",
        "Value.Body": "Value",
        "StatusCode.Symbol": "StatusSymbol",
        "StatusCode.Code": "StatusCode",
        "SourceTimestamp": "Timestamp",
        "HistoryReadResults.NodeId.IdType": "IdType",
        "HistoryReadResults.NodeId.Id": "Id",
        "HistoryReadResults.NodeId.Namespace": "Namespace",
    }

    return opc_data._process_df(combined_df, columns)

In [None]:
# Scratch: rebuild the ReadValueIds payload outside the function for inspection.
# NOTE: the module-level name `vars` shadows the builtin of the same name.
vars = opc_data._get_variable_list_as_list(variable_list)
extended_variables = [{"NodeId": var, "AggregateName": agg_name} for var in vars]

In [None]:
# Scratch: assemble the full request body (all variables, no batching) and display it.
body = {
    **opc_data.body,
    "StartTime": start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
    "EndTime": end_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
    "ProcessingInterval": pro_interval,
    "AggregateName": agg_name,
    "ReadValueIds": extended_variables
}
body

In [None]:
# Run whichever definition of get_historical_aggregated_values was executed last
# with the current notebook parameters.
get_historical_aggregated_values(opc_data,
                                     start_time, 
                                     end_time, 
                                     pro_interval, 
                                     agg_name, 
                                     variable_list)

In [None]:
# Same request over the last month (window ends now).
# `datetime` must be the class here, i.e. `from datetime import datetime` must have run.
start_time = datetime.now() - relativedelta(months=1)
end_time = datetime.now()
get_historical_aggregated_values(opc_data,
                                     start_time, 
                                     end_time, 
                                     pro_interval, 
                                     agg_name, 
                                     variable_list)

In [None]:
# History data for 1 day, 10 min aggregate - stringsets
# NOTE(review): the comment says stringsets but the query uses `inverters` — confirm which is intended.
# `datetime` is the class at this point in the notebook, so call `datetime.now()` directly.
history_agg = opc_data.get_historical_aggregated_values(
    start_time=(datetime.now() - timedelta(30)),
    end_time=(datetime.now() - timedelta(29)),
    pro_interval=600000,
    agg_name="Average",
    variable_list=inverters.variables_as_list(["DCPower"]),
)
history_agg

In [None]:
import copy
import math
from pydantic import BaseModel, AnyUrl
from datetime import timedelta
import asyncio
import aiohttp

In [None]:
class Variables(BaseModel):
    """Helper class to parse the values APIs.

    Variables are described in https://reference.opcfoundation.org/v104/Core/docs/Part3/8.2.1/

        Variables:
            Id: str - Id of the signal, e.g. SSO.EG-AS.WeatherSymbol
            Namespace: int - Namespace on the signal, e.g. 2.
            IdType: int - IdTypes described in https://reference.opcfoundation.org/v104/Core/docs/Part3/8.2.3/.
    """
    # Identifier of the signal.
    Id: str
    # Namespace the signal lives in.
    Namespace: int
    # Kind of identifier (see the OPC UA IdType reference linked above).
    IdType: int

In [None]:
async def make_async_api_request(opc_data, start_time: datetime, end_time: datetime, pro_interval: int, agg_name: str, variable_list: list[Variables]) -> dict:
    """POST one historical-aggregated request and return the decoded JSON body.

    The request covers the whole ``variable_list`` and the whole time range in
    a single call. No client-side timeout is applied.
    """
    # Convert pydantic models into plain structures the API accepts.
    node_ids = opc_data._get_variable_list_as_list(variable_list)
    read_value_ids = [{"NodeId": node_id, "AggregateName": agg_name} for node_id in node_ids]

    timestamp_format = "%Y-%m-%dT%H:%M:%SZ"
    body = copy.deepcopy(opc_data.body)
    body.update(
        {
            "StartTime": start_time.strftime(timestamp_format),
            "EndTime": end_time.strftime(timestamp_format),
            "ProcessingInterval": pro_interval,
            "ReadValueIds": read_value_ids,
            "AggregateName": agg_name,
        }
    )

    async with aiohttp.ClientSession() as session:
        request = session.post(
            f"{opcua_rest_url}values/historicalaggregated",
            data=json.dumps(body, default=opc_data.json_serial),
            headers=opc_data.headers,
            timeout=aiohttp.ClientTimeout(total=None),
        )
        async with request as response:
            response.raise_for_status()
            return await response.json()

In [None]:
# Scratch: keep only the first five variables for a small test request.
vars = opc_data._get_variable_list_as_list(variable_list)
vars1 = vars[0:5]

In [None]:
# Scratch: ReadValueIds entries for the five-variable sample; displays their count.
extended_variables = [
            {
                    "NodeId": var,
                    "AggregateName": agg_name,
            }
            for var in vars1
]
len(extended_variables)

In [None]:
# Scratch: build the request body the same way make_async_api_request does and display it.
body = copy.deepcopy(opc_data.body)
body["StartTime"] = start_time.strftime("%Y-%m-%dT%H:%M:%SZ")
body["EndTime"] = end_time.strftime("%Y-%m-%dT%H:%M:%SZ")
body["ProcessingInterval"] = pro_interval
body["ReadValueIds"] = extended_variables
body["AggregateName"] = agg_name
body

In [None]:
# Scratch: show the request URL (the stray trailing comma made this a 1-tuple).
f"{opcua_rest_url}values/historicalaggregated"

In [None]:
# Scratch: serialise the request body exactly as it is sent to the API.
data=json.dumps(body, default=opc_data.json_serial)

In [None]:
# Display the serialised JSON payload.
data

In [None]:
# Round-trip the payload back into a dict to inspect what was serialised.
data_dict = json.loads(data)

In [None]:
# Pull the variable entries out of the decoded payload.
read_value_ids = data_dict['ReadValueIds']

In [None]:
# Number of variables contained in the payload.
len(read_value_ids)

In [None]:
# Scratch: display the HTTP headers the client sends with each request.
headers=opc_data.headers
headers

In [None]:
# Scratch: total=None disables aiohttp's overall request timeout.
timeout=aiohttp.ClientTimeout(total=None) 
timeout

In [None]:
# Scratch: send the sample request directly (same call as in make_async_api_request)
# and keep the decoded JSON in `content`. Top-level `async with` needs a notebook session.
async with aiohttp.ClientSession() as session:
    async with session.post(
        f"{opcua_rest_url}values/historicalaggregated",
        data=json.dumps(body, default=opc_data.json_serial),
        headers=opc_data.headers,
        timeout=aiohttp.ClientTimeout(total=None)  
    ) as response:
        # Raise on 4xx/5xx instead of trying to decode an error page.
        response.raise_for_status()
        content = await response.json()

In [None]:
# Display the decoded API response.
content

In [None]:
def generate_time_batches(start_time: datetime, end_time: datetime, pro_interval: int, batch_size: int) -> list[tuple]:
    """Split ``[start_time, end_time]`` into consecutive request windows.

    The number of data points in the range is derived from ``pro_interval``;
    the points are then spread evenly over as few windows as possible such that
    no window holds more than ``batch_size`` points.

    Args:
        start_time: Start of the overall range.
        end_time: End of the overall range.
        pro_interval: Processing interval in milliseconds.
        batch_size: Maximum number of data points per window.

    Returns:
        ``(window_start, window_end)`` tuples in chronological order. No window
        ends after ``end_time``. The list is empty when the range contains no
        data point (``end_time`` before ``start_time``).
    """
    pro_interval_seconds = pro_interval / 1000
    total_data_points = ((end_time - start_time).total_seconds() // pro_interval_seconds) + 1
    if total_data_points <= 0:
        # Reversed range: nothing to request (and avoids dividing by zero batches).
        return []

    total_batches = math.ceil(total_data_points / batch_size)
    actual_batch_size = math.ceil(total_data_points / total_batches)
    one_interval = timedelta(seconds=pro_interval_seconds)

    time_batches = []
    for i in range(total_batches):
        batch_start = start_time + timedelta(seconds=(i * actual_batch_size * pro_interval_seconds))
        batch_end = start_time + timedelta(seconds=((i + 1) * actual_batch_size * pro_interval_seconds)) - one_interval
        # Rounding the batch size up can push the last window past the
        # requested range, so clamp it to end_time.
        time_batches.append((batch_start, min(batch_end, end_time)))

    return time_batches

In [None]:
def generate_variable_batches(variable_list: list[Variables], batch_size: int) -> list[list[Variables]]:
    """Cut ``variable_list`` into consecutive slices of at most ``batch_size`` items."""
    batches = []
    for offset in range(0, len(variable_list), batch_size):
        batches.append(variable_list[offset:offset + batch_size])
    return batches

In [None]:
def process_api_response(opc_data, response: dict) -> pd.DataFrame:
    """Flatten one historical-aggregated API response into a DataFrame.

    Args:
        opc_data: Object whose ``_get_value_type(int)`` returns a dict with a
            ``"type"`` entry describing the numeric value type.
        response: Decoded JSON response containing ``HistoryReadResults``.

    Returns:
        One row per data value with the columns ``ValueType``, ``Value``,
        ``StatusSymbol``, ``StatusCode``, ``Timestamp``, ``IdType``, ``Id`` and
        ``Namespace``.

    Raises:
        KeyError: If an expected column is missing from the response.
    """
    df_result = pd.json_normalize(
        response,
        record_path=['HistoryReadResults', 'DataValues'],
        meta=[
            ['HistoryReadResults', 'NodeId', 'IdType'],
            ['HistoryReadResults', 'NodeId', 'Id'],
            ['HistoryReadResults', 'NodeId', 'Namespace'],
        ],
    )

    def _type_name(type_code):
        """Translate a numeric type code into its name; leave missing values alone."""
        if pd.isna(type_code):
            return type_code
        return str(opc_data._get_value_type(int(type_code)).get("type"))

    # Build the column in one go instead of writing strings cell by cell into
    # a numeric column.
    df_result["Value.Type"] = df_result["Value.Type"].map(_type_name)

    # The node's IdType and Id are separate columns; the previous mapping sent
    # IdType to "Id" and left the real Id column unrenamed.
    return df_result.rename(
        columns={
            "Value.Type": "ValueType",
            "Value.Body": "Value",
            "StatusCode.Symbol": "StatusSymbol",
            "StatusCode.Code": "StatusCode",
            "SourceTimestamp": "Timestamp",
            "HistoryReadResults.NodeId.IdType": "IdType",
            "HistoryReadResults.NodeId.Id": "Id",
            "HistoryReadResults.NodeId.Namespace": "Namespace",
        },
        errors="raise",
    )

In [None]:
async def get_historical_aggregated_values_async(
    opc_data,
    start_time: datetime,
    end_time: datetime,
    pro_interval: int,
    agg_name: str,
    variable_list: list[Variables],
    batch_size: int = 1000
) -> pd.DataFrame:
    """Request historical aggregated values from the OPC UA server with batching.

    ``batch_size`` is used twice: as the maximum number of data points per time
    window and as the maximum number of variables per request. One request is
    issued for every (time window, variable batch) pair and all requests run
    concurrently.

    Returns:
        The processed responses concatenated into one DataFrame, or an empty
        DataFrame when there is nothing to request.
    """
    time_batches = generate_time_batches(start_time, end_time, pro_interval, batch_size)
    variable_batches = generate_variable_batches(variable_list, batch_size)

    # One task per (time window, variable batch) pair.
    tasks = [
        asyncio.create_task(
            make_async_api_request(opc_data, time_batch_start, time_batch_end, pro_interval, agg_name, variable_sublist)
        )
        for time_batch_start, time_batch_end in time_batches
        for variable_sublist in variable_batches
    ]

    # gather keeps the responses in task order.
    responses = await asyncio.gather(*tasks)

    result_list = [process_api_response(opc_data, batch_response) for batch_response in responses]
    if not result_list:
        # pd.concat raises on an empty list (e.g. empty variable_list).
        return pd.DataFrame()

    return pd.concat(result_list, ignore_index=True)

In [None]:
# 1 day aggregated historical inverter data in asyncio process
one_days_historic_inverter_data2 = await get_historical_aggregated_values_async(
    opc_data,
    start_time=(datetime.datetime.now() - datetime.timedelta(30)),
    end_time=(datetime.datetime.now() - datetime.timedelta(29)),
    pro_interval=60*1000,
    agg_name="Average",
    variable_list=string_sets.variables_as_list(["DCPower"]),
    batch_size=100
)
one_days_historic_inverter_data2

In [None]:
async def generate_time_chunks(start_time: datetime, end_time: datetime, chunk_duration_minutes: int = 60):
    """Yield consecutive ``(chunk_start, chunk_end)`` windows covering the range.

    Args:
        start_time: Start of the overall range.
        end_time: End of the overall range; the last chunk is cut off here.
        chunk_duration_minutes: Length of each chunk in minutes (default 60).

    Raises:
        ValueError: If ``chunk_duration_minutes`` is not positive, which would
            otherwise make the generator loop forever.
    """
    if chunk_duration_minutes <= 0:
        raise ValueError("chunk_duration_minutes must be positive")

    delta = timedelta(minutes=chunk_duration_minutes)
    current_time = start_time
    while current_time < end_time:
        chunk_end_time = min(current_time + delta, end_time)
        yield (current_time, chunk_end_time)
        current_time = chunk_end_time

In [None]:
async def make_async_api_request(opc_data, start_time: datetime, end_time: datetime, pro_interval: int, agg_name: str, variable_list: list[Variables], max_data_points=500) -> list[dict]:
    """Request aggregated history chunked by variables and by time.

    The variables are sent five at a time and the time range is walked in the
    chunks produced by the async generator ``generate_time_chunks``. Requests
    are issued one after another over a single HTTP session.

    Args:
        opc_data: OPC UA client providing ``body``, ``headers``, ``json_serial``
            and ``_get_variable_list_as_list``.
        start_time: Start of the requested range.
        end_time: End of the requested range.
        pro_interval: Processing interval in milliseconds.
        agg_name: Aggregate name sent with every variable.
        variable_list: Variables to read.
        max_data_points: Currently unused; kept so existing calls keep working.

    Returns:
        The decoded JSON responses, one per (variable chunk, time chunk) pair.
    """
    # Invariant for every request, so build them once.
    url = f"{opcua_rest_url}values/historicalaggregated"
    no_timeout = aiohttp.ClientTimeout(total=None)

    async def fetch_data_for_time_period(session, vars_chunk, start, end):
        """Fetch data for a given time period and chunk of variables."""
        body = copy.deepcopy(opc_data.body)
        body["StartTime"] = start.strftime("%Y-%m-%dT%H:%M:%SZ")
        body["EndTime"] = end.strftime("%Y-%m-%dT%H:%M:%SZ")
        body["ProcessingInterval"] = pro_interval
        body["ReadValueIds"] = [{"NodeId": var, "AggregateName": agg_name} for var in vars_chunk]
        body["AggregateName"] = agg_name

        async with session.post(
            url,
            # Same serialiser as the other request helpers in this notebook,
            # instead of silently stringifying unknown objects.
            data=json.dumps(body, default=opc_data.json_serial),
            headers=opc_data.headers,
            timeout=no_timeout,
        ) as response:
            response.raise_for_status()
            return await response.json()

    # Creating a new variable list to remove pydantic models
    node_ids = opc_data._get_variable_list_as_list(variable_list)
    chunk_size = 5  # Chunk size for node IDs
    vars_chunks = [node_ids[i:i + chunk_size] for i in range(0, len(node_ids), chunk_size)]

    all_responses = []
    async with aiohttp.ClientSession() as session:
        for vars_chunk in vars_chunks:
            # Generate time chunks for the given time period
            async for start, end in generate_time_chunks(start_time, end_time):
                content = await fetch_data_for_time_period(session, vars_chunk, start, end)
                all_responses.append(content)
    return all_responses

In [None]:
async def make_async_api_request(opc_data, start_time: datetime, end_time: datetime, pro_interval: int, agg_name: str, variable_list: list[Variables]) -> list[dict]:
    """Request aggregated history for the full time range, 150 variables per call.

    Requests are issued one after another over a single HTTP session.

    Returns:
        The decoded JSON responses, one per variable chunk, in request order.
    """
    # Creating a new variable list to remove pydantic models
    node_ids = opc_data._get_variable_list_as_list(variable_list)

    chunk_size = 150
    vars_chunks = [node_ids[i:i + chunk_size] for i in range(0, len(node_ids), chunk_size)]

    # Identical for every chunk, so compute them once.
    url = f"{opcua_rest_url}values/historicalaggregated"
    no_timeout = aiohttp.ClientTimeout(total=None)
    start_str = start_time.strftime("%Y-%m-%dT%H:%M:%SZ")
    end_str = end_time.strftime("%Y-%m-%dT%H:%M:%SZ")

    all_responses = []
    async with aiohttp.ClientSession() as session:
        for vars_chunk in vars_chunks:
            body = copy.deepcopy(opc_data.body)
            body["StartTime"] = start_str
            body["EndTime"] = end_str
            body["ProcessingInterval"] = pro_interval
            body["ReadValueIds"] = [{"NodeId": var, "AggregateName": agg_name} for var in vars_chunk]
            body["AggregateName"] = agg_name

            async with session.post(
                url,
                # Same serialiser as the other request helpers in this notebook,
                # instead of silently stringifying unknown objects.
                data=json.dumps(body, default=opc_data.json_serial),
                headers=opc_data.headers,
                timeout=no_timeout,
            ) as response:
                response.raise_for_status()
                all_responses.append(await response.json())

    return all_responses

In [None]:
from datetime import datetime, timedelta
from typing import List, Tuple

In [None]:
def generate_time_chunks(start_time: datetime, end_time: datetime, interval_hours: int) -> List[Tuple[datetime, datetime]]:
    """Split the range into consecutive chunks of ``interval_hours`` hours.

    Args:
        start_time: Start of the overall range.
        end_time: End of the overall range; the last chunk is cut off here.
        interval_hours: Length of each chunk in hours.

    Returns:
        ``(chunk_start, chunk_end)`` tuples in chronological order; empty when
        ``start_time`` is not before ``end_time``.

    Raises:
        ValueError: If ``interval_hours`` is not positive, which would
            otherwise make the loop run forever.
    """
    if interval_hours <= 0:
        raise ValueError("interval_hours must be positive")

    delta = timedelta(hours=interval_hours)
    current_time = start_time
    chunks = []

    while current_time < end_time:
        chunk_end_time = min(current_time + delta, end_time)
        chunks.append((current_time, chunk_end_time))
        current_time += delta

    return chunks

In [None]:
# 1 day aggregated historical inverter data in asyncio process
one_days_historic_inverter_data2 = await make_async_api_request(
    opc_data,
    start_time=(datetime.datetime.now() - datetime.timedelta(30)),
    end_time=(datetime.datetime.now() - datetime.timedelta(29)),
    pro_interval=60*1000,
    agg_name="Average",
    variable_list=string_sets.variables_as_list(["DCPower"])
)
one_days_historic_inverter_data2