In [46]:
import json
import os
from datetime import datetime

import numpy as np
import pandas as pd
import psycopg2

### DB connection

In [47]:
# Attribute names to select from asset_datapoint (matched against attribute_name).
attribute_names = ['energyImportTotal', 'temperature', 'energy', 'currentPrice']

# Entity ids (matched against asset_datapoint.entity_id) mapped to the readable
# labels that later replace them in the pivoted column names.
idMap = dict(
    [
        ("4gBLwNljLBlc0oYpBdpGkD", "ht_wp_1_id"),
        ("475wGcmfSE0L1wguboGfpW", "ht_wp_2_id"),
        ("43wZIInlpD6YFsEc8TkwlN", "weather"),
        ("30cjCSTbgsokSNalNoBbJh", "energie_prijs"),
    ]
)

# Parallel lists of the ids and their labels, in idMap insertion order.
id_keys = [*idMap]
id_names = [*idMap.values()]

# Number of rows requested per FETCH when paging through the result cursor.
batch_size = 500_000

# Connection settings are read from the standard libpq environment variables so
# that credentials never live in the notebook (or in its saved outputs / git history).
# Host, port, user and database fall back to the previous values; the password has
# no fallback on purpose, so a missing PGPASSWORD fails loudly with a KeyError.
# NOTE: the password that used to be hardcoded here should be treated as leaked
# and rotated on the server.
conn = psycopg2.connect(
    host=os.environ.get("PGHOST", "ec2-63-35-201-247.eu-west-1.compute.amazonaws.com"),
    port=os.environ.get("PGPORT", "5432"),
    user=os.environ.get("PGUSER", "postgres"),
    password=os.environ["PGPASSWORD"],
    database=os.environ.get("PGDATABASE", "openremote"),
)

# Single cursor reused by the fetch cell below.
cur = conn.cursor()


### Procedure 

process_asset

-- Pivots asset_datapoint from long format (timestamp, entity_id, attribute_name, value)
-- to wide format (one FLOAT8 column per "<entity_id>_<attribute_name>" pair) and opens
-- result_set on the pivoted rows.
-- id_keys, attribute_names and result_set are not declared in this body; presumably they
-- are the procedure's parameters (two filter arrays and a refcursor) -- confirm against
-- the CREATE PROCEDURE header.
DECLARE
    dynamic_sql TEXT;
    column_list TEXT;
BEGIN
   
    -- Build the column definition list for the crosstab output: one identifier-quoted
    -- "<entity_id>_<attribute_name>" FLOAT8 entry per distinct pair matching the filters.
    -- NOTE(review): this list must be in the same order as the category query below
    -- (ORDER BY atributename_id); string_agg(DISTINCT ...) has no explicit ORDER BY here,
    -- so confirm the two orders always agree, otherwise values land under wrong columns.
    SELECT string_agg(DISTINCT format('%I FLOAT8', ad.entity_id || '_' || ad.attribute_name), ', ')
    INTO column_list
    FROM asset_datapoint ad
    WHERE ad.entity_id = ANY(id_keys) 
    AND ad.attribute_name = ANY(attribute_names);

    -- No matching rows leaves column_list NULL; abort rather than build an invalid query.
    IF column_list IS NULL THEN
        RAISE EXCEPTION 'No data to pivot';
    END IF;

    -- Assemble the crosstab call (crosstab is presumably provided by the tablefunc
    -- extension -- confirm it is installed). The first inner query supplies
    -- (timestamp, category, value) rows, the second the ordered list of categories.
    -- %L injects the two filter arrays as quoted literals, %s the column definitions.
    dynamic_sql := format(
        'SELECT * FROM crosstab(
            $$SELECT ad.timestamp, ad.entity_id || ''_'' || ad.attribute_name AS atributename_id, ad.value::FLOAT8
              FROM asset_datapoint ad
              WHERE ad.entity_id = ANY(%L) 
              AND ad.attribute_name = ANY(%L)
              ORDER BY ad.timestamp$$,
            $$SELECT DISTINCT ad.entity_id || ''_'' || ad.attribute_name AS atributename_id
              FROM asset_datapoint ad
              WHERE ad.entity_id = ANY(%L) 
              AND ad.attribute_name = ANY(%L)
              ORDER BY atributename_id$$
        ) AS pivot_table(timestamp TIMESTAMP, %s)',
        id_keys, attribute_names, id_keys, attribute_names, column_list);

    -- Bind the cursor to the dynamic query so the caller can FETCH from it in batches.
    OPEN result_set FOR EXECUTE dynamic_sql;
END;

### Procedure description

The `process_asset` SQL procedure dynamically pivots data from the `asset_datapoint` table, transforming a long-format dataset (with `entity_id`, `attribute_name`, and `value` columns) into a wide-format dataset with one column per entity/attribute pair, keyed by timestamp. It exposes the pivoted result set through a cursor, ready for further analysis.

### Fetching data using procedure

In [48]:
cursor_name = "asset_data_cursor"
processed_batches = []
column_names = []

try:
    # The procedure opens a server-side refcursor named `cursor_name`. Paging through
    # it with FETCH (instead of OFFSET queries) is faster and cannot skip records.
    # NOTE(review): a refcursor only lives for the enclosing transaction, so this
    # presumably relies on the connection not being in autocommit mode -- confirm.
    cur.execute("CALL process_asset(%s, %s, %s);", (id_keys, attribute_names, cursor_name))

    while True:
        # Pull the next batch of at most `batch_size` rows from the refcursor.
        cur.execute(f"FETCH {batch_size} FROM {cursor_name};")
        rows = cur.fetchall()

        # Take the column names from the cursor metadata before the emptiness check,
        # so they are still known if the very first batch comes back empty.
        column_names = [desc[0] for desc in (cur.description or [])]

        if not rows:
            break

        # Pack the batch into a data frame; batches are combined after the loop.
        processed_batches.append(pd.DataFrame(rows, columns=column_names))

    cur.execute(f"CLOSE {cursor_name};")
finally:
    # Release the cursor and the connection even when a statement above fails,
    # so a failed run does not leave an open session on the server.
    cur.close()
    conn.close()

# Combine all batches into one data frame. pd.concat raises ValueError on an empty
# list, so fall back to an empty frame (with the fetched column names) instead.
if processed_batches:
    processed_data = pd.concat(processed_batches, ignore_index=True)
else:
    processed_data = pd.DataFrame(columns=column_names)

In [49]:
# Preview the first rows of the pivoted frame to sanity-check the fetch.
processed_data.head()

Unnamed: 0,timestamp,30cjCSTbgsokSNalNoBbJh_currentPrice,43wZIInlpD6YFsEc8TkwlN_temperature,475wGcmfSE0L1wguboGfpW_energyImportTotal,4gBLwNljLBlc0oYpBdpGkD_energyImportTotal
0,2023-12-16 00:00:01.870,67.199997,,,
1,2023-12-16 00:00:01.877,75.16,,,
2,2023-12-16 01:00:00.235,55.64,,,
3,2023-12-16 02:00:01.620,56.0,,,
4,2023-12-16 03:00:01.894,58.0,,,


### Change column names from idKeys to idNames

In [50]:
# Map every column label that contains an entity id to the same label with the id
# swapped for its readable name from idMap.
column_rename_map = {}
for asset_id in id_keys:
    for label in processed_data.columns:
        if asset_id in label:
            column_rename_map[label] = label.replace(asset_id, idMap[asset_id])

# Apply the new labels to the existing frame (mutates processed_data in place).
processed_data.rename(columns=column_rename_map, inplace=True)

In [51]:
# Preview again to confirm the entity-id prefixes were replaced by readable names.
processed_data.head()

Unnamed: 0,timestamp,energie_prijs_currentPrice,weather_temperature,ht_wp_2_id_energyImportTotal,ht_wp_1_id_energyImportTotal
0,2023-12-16 00:00:01.870,67.199997,,,
1,2023-12-16 00:00:01.877,75.16,,,
2,2023-12-16 01:00:00.235,55.64,,,
3,2023-12-16 02:00:01.620,56.0,,,
4,2023-12-16 03:00:01.894,58.0,,,


In [52]:
# Summary statistics for each column of the pivoted frame.
processed_data.describe()

Unnamed: 0,timestamp,energie_prijs_currentPrice,weather_temperature,ht_wp_2_id_energyImportTotal,ht_wp_1_id_energyImportTotal
count,510771,31296.0,27476.0,254895.0,246876.0
mean,2024-07-28 04:55:52.351399680,102.07255,13.015815,311609.868197,189379.281161
min,2023-12-16 00:00:01.870000,-200.0,-3.37,0.0,0.0
25%,2024-05-08 11:00:42.588999936,79.9,7.29,297054.5,174148.0
50%,2024-07-06 15:23:17.817999872,101.0,13.7,310912.0,184260.0
75%,2024-10-20 22:39:41.895000064,123.99,18.22,325953.0,201127.25
max,2025-02-04 03:58:03.445000,550.0,32.29,354738.0,232357.0
std,,48.372422,6.990488,21225.049924,18810.211377


In [53]:
# Show the dtype of each column.
print(processed_data.dtypes)

timestamp                       datetime64[ns]
energie_prijs_currentPrice             float64
weather_temperature                    float64
ht_wp_2_id_energyImportTotal           float64
ht_wp_1_id_energyImportTotal           float64
dtype: object
