In [1]:
# NOTE: everything in this cell is disabled code kept inside a bare string
# literal, so none of it executes. Because the string is the cell's last
# expression, the notebook echoes its repr as the cell output.
"""
# Load the file
df = pd.read_parquet(os.path.join(data_path, infer_data_file))

print(df.shape)
print(df.head(10))
print(df.tail(10))

columns_to_drop = ['SMid', 'Po', 'P1o', 'P2o', 'P3o', 'Ei', 'Ei1', 'Ei2', 'Eo', 'Eo1', 'Eo2']
for col in columns_to_drop:
    if col in df.columns:
        df = df.drop(columns=[col])

df = df.rename(columns={
    'P1i': 'powerl1',
    'P2i': 'powerl2',
    'P3i': 'powerl3',
    'I1': 'currentl1',
    'I2': 'currentl2',
    'I3': 'currentl3',
    'V1': 'voltagel1',
    'V2': 'voltagel2',
    'V3': 'voltagel3',
})
"""

"\n# Load the file\ndf = pd.read_parquet(os.path.join(data_path, infer_data_file))\n\nprint(df.shape)\nprint(df.head(10))\nprint(df.tail(10))\n\ncolumns_to_drop = ['SMid', 'Po', 'P1o', 'P2o', 'P3o', 'Ei', 'Ei1', 'Ei2', 'Eo', 'Eo1', 'Eo2']\nfor col in columns_to_drop:\n    if col in df.columns:\n        df = df.drop(columns=[col])\n\ndf = df.rename(columns={\n    'P1i': 'powerl1',\n    'P2i': 'powerl2',\n    'P3i': 'powerl3',\n    'I1': 'currentl1',\n    'I2': 'currentl2',\n    'I3': 'currentl3',\n    'V1': 'voltagel1',\n    'V2': 'voltagel2',\n    'V3': 'voltagel3',\n})\n"

In [2]:
import pandas as pd
from datetime import datetime, timedelta
import os
from config_loader import load_config, logger
from joblib import load
import torch
import numpy as np
from torch.utils.data import DataLoader, TensorDataset
import json

In [3]:
# Load the project configuration and select the active environment.
config, config_dir = load_config()
env = config['Settings']['environment']
data_path = config[env]['data_path']

_data_cfg = config['Data']
_inference_cfg = config['Inference']

# Directory and file names taken from the [Data] section.
models_dir = _data_cfg['models_dir']
scalers_dir = _data_cfg['scalers_dir']
training_dataset_file = _data_cfg['training_dataset_file']
demo_dataset_ground_truth_file = _data_cfg['demo_dataset_ground_truth_file']
model_file = _data_cfg['model_file']
column_names_file = _data_cfg['training_dataset_columns_file']
input_scaler_file = _data_cfg['input_scaler_file']
target_scalers_file = _data_cfg['target_scalers_file']

# Notebook-local overrides: the input and output parquet names are fixed here
# instead of being read from config['Data']['infer_data_file'] and
# config['Data']['inferred_data_file'].
# (Alternative input used previously: 'mqtt_data_daily.parquet'.)
infer_data_file = 'data/mqtt_data_whole.parquet'
inferred_data_file = 'real_inferred_whole.parquet'

# Values from the [Inference] section.
inference_timestamp = _inference_cfg['inference_timestamp']
batch_size = int(_inference_cfg['batch_size'])


def _under_data_path(relative):
    """Join ``relative`` onto the environment's data directory."""
    return os.path.join(data_path, relative)


# Absolute locations derived from the names above.
model_path = _under_data_path(models_dir)
inferred_data_path = _under_data_path(inferred_data_file)
infer_data_path = _under_data_path(infer_data_file)
input_scaler_path = _under_data_path(input_scaler_file)
target_scalers_path = _under_data_path(scalers_dir)

# The fitted input scaler is loaded once and shared by the functions below.
input_scaler = load(input_scaler_path)

# Inference runs on the CPU.
device = torch.device('cpu')

[2025-07-03 10:30:40] INFO: Transfer time: 1900-01-01 11:50:00, Inference time: 1900-01-01 11:53:00


In [4]:
# Read the training dataset column names
with open(os.path.join(data_path, column_names_file), 'r') as file:
    column_names_json = json.load(file)

# Create the list of appliances dynamically from the model files.
# Only '*.pt' entries are considered: any other directory entry (hidden files,
# checkpoints, sub-directories) would otherwise be turned into a bogus
# appliance name and fail later when its model is loaded. os.listdir() returns
# entries in arbitrary order, so the names are sorted to keep runs reproducible.
# File name -> appliance: strip '.pt', drop the last '_'-separated token,
# then turn the remaining underscores into spaces and title-case the result.
appliances_list = [
    f.replace('.pt', '').rsplit('_', 1)[0].replace('_', ' ').title()
    for f in sorted(os.listdir(model_path))
    if f.endswith('.pt')
]


In [5]:
def prepare_inference_input(filename):
    """
    Load a smart-meter parquet file and turn it into scaled model input.

    filename: path of the parquet file to read; it must contain a 'timestamp'
              column and the nine per-phase power/current/voltage columns.

    Returns (X, ts): X is whatever the module-level ``input_scaler`` returns
    from ``transform`` for the nine feature columns, and ts is the 'timestamp'
    column as a Series with a fresh 0..n-1 index.
    """
    # Raw meter column -> training-schema column. The insertion order of this
    # mapping is also the feature order handed to the scaler.
    schema = {
        'P1i': 'powerl1', 'P2i': 'powerl2', 'P3i': 'powerl3',
        'I1': 'currentl1', 'I2': 'currentl2', 'I3': 'currentl3',
        'V1': 'voltagel1', 'V2': 'voltagel2', 'V3': 'voltagel3',
    }
    # Meter fields that are not model inputs; absent ones are skipped silently.
    unused = ['SMid', 'Pi', 'Po', 'P1o', 'P2o', 'P3o', 'Ei', 'Ei1', 'Ei2', 'Eo', 'Eo1', 'Eo2']

    frame = (
        pd.read_parquet(filename)
        .drop(columns=unused, errors='ignore')
        .rename(columns=schema)
    )

    # Pull the timestamps out so that only feature columns remain.
    ts = frame.pop('timestamp').reset_index(drop=True)

    # Selecting by name fixes the column order; pandas raises KeyError here
    # if any expected column is missing from the file.
    features = frame[list(schema.values())]

    # Normalize with the scaler loaded in the configuration cell.
    return input_scaler.transform(features), ts


def run_inference(day_input, app):
    """
    Run one appliance's TorchScript model over the prepared input.

    day_input: array-like of scaled features; a leading axis of size 1 is
               added before it is fed to the model.
    app: appliance display name; lower-cased with spaces replaced by '_' and
         suffixed with ``model_file`` to locate the model under ``model_path``.

    Returns a NumPy array with the model outputs of all batches concatenated
    along axis 0, with negative values clamped to zero.
    """
    model_stem = app.lower().replace(' ', '_')
    model = torch.jit.load(os.path.join(model_path, model_stem + model_file))
    model.eval()

    # Add the leading axis, then wrap the tensor so it can be iterated in batches.
    batched_input = np.expand_dims(day_input, axis=0)
    inputs = torch.tensor(batched_input, dtype=torch.float32).to(device)
    loader = DataLoader(TensorDataset(inputs), batch_size=batch_size, shuffle=False)

    with torch.no_grad():
        # Clamp to be non-negative (power >= 0) before converting to NumPy.
        outputs = [
            model(batch.to(device)).clamp(min=0.0).cpu().numpy()
            for (batch,) in loader
        ]

    return np.concatenate(outputs, axis=0)

def melt_dataframe(df):
    """
    Reshape a wide frame (one column per appliance) into long format.

    df: DataFrame with a 'timestamp' column; every other column is treated as
        an appliance series.

    Returns a DataFrame sorted by timestamp with the columns
    timestamp, appliance, value, date, minute, hour and month.
    """
    # One row per (timestamp, appliance) pair.
    long_df = df.melt(id_vars=['timestamp'], var_name='appliance', value_name='value')

    # Parse the timestamps once and derive the calendar fields from them.
    stamps = pd.to_datetime(long_df['timestamp'])
    long_df = long_df.assign(
        timestamp=stamps,
        date=stamps.dt.date,
        minute=stamps.dt.minute,
        hour=stamps.dt.hour,
        month=stamps.dt.to_period('M'),
    )

    return long_df.sort_values(by=['timestamp'])


def compute_other_column(p_df, sm_df):
    """
    Add an 'Other' column: smart-meter total power minus predicted appliance power.

    p_df: predictions with a 'timestamp' column plus one numeric column per
          appliance.
    sm_df: smart-meter readings with a 'timestamp' column and per-phase power
           columns, named either 'powerl1'..'powerl3' (training schema) or
           'P1i'..'P3i' (raw meter export); matching is case-insensitive.

    Returns a new DataFrame holding p_df's columns plus 'Other' (negative
    values clipped to 0), restricted to timestamps present in both frames and
    sorted by timestamp. Neither input frame is modified.
    """
    import warnings

    # assign() returns copies, so the caller's frames keep their original
    # 'timestamp' columns instead of being overwritten in place.
    p_df = (
        p_df.assign(timestamp=pd.to_datetime(p_df['timestamp']))
        .sort_values('timestamp')
        .reset_index(drop=True)
    )
    sm_df = (
        sm_df.assign(timestamp=pd.to_datetime(sm_df['timestamp']))
        .sort_values('timestamp')
        .reset_index(drop=True)
    )

    # Sum predicted appliance power per timestamp (exclude 'timestamp' column)
    appliance_cols = p_df.columns.difference(['timestamp'])
    p_df['total_pred_power'] = p_df[appliance_cols].sum(axis=1)

    # Locate the per-phase power columns. The training-schema names are
    # preferred; the raw meter names are the fallback so that an un-renamed
    # parquet file does not silently produce a total of zero.
    phase_name_sets = (
        ('powerl1', 'powerl2', 'powerl3'),
        ('p1i', 'p2i', 'p3i'),
    )
    phase_cols = []
    for names in phase_name_sets:
        phase_cols = [col for col in sm_df.columns if str(col).lower() in names]
        if phase_cols:
            break
    if not phase_cols:
        warnings.warn(
            "No per-phase power columns found in smart meter data; 'Other' will be 0",
            stacklevel=2,
        )

    # Sum smart meter phases to get total power per timestamp
    sm_df['total_sm_power'] = sm_df[phase_cols].sum(axis=1)

    # Merge on timestamp to align rows
    merged = pd.merge(p_df, sm_df[['timestamp', 'total_sm_power']], on='timestamp', how='inner')

    # 'Other' = smart meter total - sum of predicted appliances, floored at zero
    merged['Other'] = (merged['total_sm_power'] - merged['total_pred_power']).clip(lower=0)

    return merged.drop(columns=['total_pred_power', 'total_sm_power'])


def append_predictions(ts, predictions_dict):
    """
    Inverse-scale the predictions, add an 'Other' column, and append the
    result in long format to the parquet file at ``inferred_data_path``.

    ts: timestamps, one per predicted timestep (len = total timesteps)
    predictions_dict: dict of {appliance_name: np.ndarray of shape (total_timesteps,)}
                      or (1, seq_len, 1) / (batch, seq_len, 1)

    Returns None. The target scaler of each appliance is loaded from
    ``target_scalers_path`` and the smart-meter data is re-read from
    ``infer_data_path``.
    """
    pred_df = pd.DataFrame({'timestamp': ts})

    for app, pred in predictions_dict.items():
        appliance_name = app.lower().replace(' ', '_')

        # Drop singleton axes so the prediction becomes a flat series
        pred = np.squeeze(pred)

        # Inverse scale (the scaler is applied to a single-column 2-D array)
        target_scaler = load(os.path.join(target_scalers_path, appliance_name + target_scalers_file))
        pred_reshaped = pred.reshape(-1, 1)
        pred_inverse = target_scaler.inverse_transform(pred_reshaped).flatten()

        # Key the column by this iteration's appliance. Using a module-level
        # name here would write every appliance into one and the same column.
        pred_df[app] = pred_inverse

    # Compute 'Other' column
    sm_df = pd.read_parquet(infer_data_path)
    pred_df = compute_other_column(pred_df, sm_df)

    # Melt and save
    pred_df = melt_dataframe(pred_df)

    # Load previous data if exists
    if os.path.exists(inferred_data_path):
        try:
            existing_df = pd.read_parquet(inferred_data_path)
            pred_df = pd.concat([existing_df, pred_df], ignore_index=True)
        except Exception as ex:
            # NOTE(review): after a failed read the file is still overwritten
            # below with the new rows only - confirm this is intended.
            logger.error(f"Error loading existing file: {ex}")

    pred_df.to_parquet(inferred_data_path, index=False)
    logger.info(f"[{datetime.now()}] Predictions for appliances {list(predictions_dict.keys())} appended to {inferred_data_path}")



# Driver: prepare the scaled input once, run every appliance model on it,
# then inverse-scale and persist all predictions in a single call.
input_data, timestamps = prepare_inference_input(infer_data_path)
predictions = {}
# NOTE(review): `appliance` stays bound at module level after this loop;
# check for functions that read it as a global before renaming it.
for appliance in appliances_list:
    # The same prepared input is passed to each appliance's model.
    predictions_np = run_inference(input_data, app=appliance)
    predictions[appliance] = predictions_np

# Appends to the parquet file at `inferred_data_path`.
append_predictions(timestamps, predictions)

KeyboardInterrupt: 