In [1]:
import torch

In [2]:
#extract korean dataset
from load_korean_data import extract_zips

extract_zips()

Done extracting Korean data


In [3]:
import pandas as pd

df_madalena_comfort = pd.read_csv('datasets/madalena_comfort.csv')

df_madalena_comfort.head()

Unnamed: 0,date,Room,CO2[ppm],PM4[ug/m3],Lighting[lux],T_in[°C],RH [%],PM10[ug/m3],PM2_5[ug/m3],PM1[ug/m3],PM0_5[ug/m3],T_out [°C]
0,2024-07-01 00:00:00,E145,624.23,65.43,0.0,28.46,55.51,65.44,65.42,65.22,56.81,24.5
1,2024-07-01 01:00:00,E145,629.0,66.77,0.0,28.48,56.39,66.78,66.76,66.56,57.97,24.0
2,2024-07-01 02:00:00,E145,640.13,67.78,0.0,28.45,56.74,67.79,67.77,67.57,58.85,22.3
3,2024-07-01 03:00:00,E145,639.77,68.0,0.0,28.39,57.08,68.01,67.98,67.78,59.04,22.0
4,2024-07-01 04:00:00,E145,643.31,67.25,0.03,28.35,57.57,67.26,67.24,67.04,58.39,21.5


In [16]:
from glob import glob
import os
from multiprocessing import Pool
import gc


def optimize_memory(df):

    for col in df.select_dtypes(include=["int64", "float64"]).columns:
        df[col] = pd.to_numeric(df[col], downcast="integer")  # Convert int64 → int32
        df[col] = pd.to_numeric(df[col], downcast="float")    # Convert float64 → float32
    return df

def load_house_data(house_path, save_dir="datasets/korean_processed"):
    os.makedirs(save_dir, exist_ok=True)
    house_id = os.path.basename(house_path)
    sub_folders = [d for d in os.listdir(house_path) if os.path.isdir(os.path.join(house_path, d))]

    if not sub_folders:
        print(f"No sub-folder found in {house_path}")
        return None
    house_sub_folder = os.path.join(house_path, sub_folders[0])
    for timestamp_folder in sorted(os.listdir(house_sub_folder)):
        full_timestamp_path = os.path.join(house_sub_folder, timestamp_folder)
        print(f"Timestamp: {timestamp_folder}")
        if os.path.isdir(full_timestamp_path):
            daily_appliance_df_list = []
            daily_total_load_df = None
            for file in glob(os.path.join(full_timestamp_path, "*.parquet.gzip")):
                appliance_name = os.path.basename(file).replace(".parquet.gzip", "")

                df = pd.read_parquet(file, engine="pyarrow")

                # ✅ Convert timestamp correctly
                df["timestamp"] = pd.to_datetime(df["timestamp"] / 1000, unit="s")

                # ✅ Optimize memory usage
                df = optimize_memory(df)  
                if "total" in appliance_name.lower():
                    daily_total_load_df = df
                else:
                    df["appliance"] = appliance_name
                    daily_appliance_df_list.append(df)

            if daily_appliance_df_list:
                daily_appliance_df = pd.concat(daily_appliance_df_list, ignore_index=True)
                daily_appliance_df = daily_appliance_df.pivot_table(
                    index="timestamp",
                    columns="appliance",
                    values=["active_power", "reactive_power"]
                )
                daily_appliance_df.columns = [f"{col[1]}_{col[0]}" for col in daily_appliance_df.columns]
                daily_appliance_df = daily_appliance_df.reset_index()

                if daily_total_load_df is not None:
                    daily_appliance_df = daily_appliance_df.merge(daily_total_load_df, on="timestamp", suffixes=("", "_total"))

                daily_appliance_df.to_parquet(f"{save_dir}/{house_id}_{timestamp_folder}.parquet", compression="gzip")
                print(f"✅ Processed {house_id} for day {timestamp_folder}")

                del daily_appliance_df_list, daily_total_load_df
                gc.collect()
    print(f"🎯 Finished processing {house_id}")

# def process_house(house_id):
#     house_path = os.path.join("datasets/korean_extracted", house_id)

#     if os.path.isdir(house_path):
#         print(f"🔄 Processing {house_id}...")
#         return house_id, load_house_data(house_path)

#     return house_id, None

def process_all_houses():
    extracted_data_dir = "datasets/korean_extracted"
    processed_data_dir = "datasets/korean_processed"

    os.makedirs(processed_data_dir, exist_ok=True) 
    houses = [d for d in os.listdir(extracted_data_dir) if os.path.isdir(os.path.join(extracted_data_dir, d))]
    for house_id in houses:
        house_path = os.path.join(extracted_data_dir, house_id)
        if os.path.isdir(house_path):
            print(f"📊 Processing house {house_id}...")

            house_df = load_house_data(house_path)

            if house_df is not None:
                # Save each house’s data separately
                house_df.to_parquet(f"{processed_data_dir}/{house_id}.parquet", compression="gzip")
                print(f"✅ Finished processing {house_id}")
#     with Pool(processes=4) as pool:  # Adjust based on CPU cores
#         results = pool.map(process_house, houses)

#     all_houses_data = {house_id: df for house_id, df in results if df is not None}

#     print(f"✅ Loaded {len(all_houses_data)} houses' data in parallel!")
#     return all_houses_data
# extracted_korean_path = 'datasets/korean_extracted'
# all_houses_data = {}

def resample_to_5Hz(df):
    processed_data_dir = "datasets/processed_data"
    resampled_data_dir = "datasets/resampled_data"

    os.makedirs(resampled_data_dir, exist_ok=True)

    for house_file in os.listdir(processed_data_dir):
        if house_file.endswith(".parquet"):
            house_path = os.path.join(processed_data_dir, house_file)
            house_df = pd.read_parquet(house_path)

            # Set timestamp as index and resample
            house_df = house_df.set_index("timestamp")
            house_df_5Hz = house_df.resample("200ms").mean().reset_index()

            # Save resampled data
            house_df_5Hz.to_parquet(f"{resampled_data_dir}/{house_file}", compression="gzip")
            print(f"✅ Resampled {house_file} to 5Hz")
    # df = df.set_index("timestamp")  # Ensure timestamp is the index
    # df_5Hz = df.resample("200ms").mean().reset_index()  # 5Hz resampling
    # return df_5Hz


In [5]:
def create_final_dataset():
    all_houses_data = process_all_houses()

    for house_id, df in all_houses_data.items():
        df_5Hz = resample_to_5Hz(df)
        df_5Hz.to_parquet(f"datasets/processed_data/{house_id}_5Hz.parquet", compression="gzip")
        print(f"📁 Saved downsampled data for {house_id}")

    # Merge all houses into a final dataset
    # final_dataset = pd.concat([resample_to_5Hz(df) for df in all_houses_data.values()], ignore_index=True)
    # final_dataset.to_parquet("final_microgrid_5Hz.parquet", compression="gzip")

    # print("✅ Final dataset created and saved!")

In [17]:
extract_zips()  # Step 1: Extract ZIPs (only if needed)
load_house_data("datasets/korean_extracted/enertalk-dataset-00")  # Step 2: Process each house individually
resample_to_5Hz() 

Done extracting Korean data
Timestamp: 20161101
✅ Processed enertalk-dataset-00 for day 20161101
Timestamp: 20161102
✅ Processed enertalk-dataset-00 for day 20161102
Timestamp: 20161103
✅ Processed enertalk-dataset-00 for day 20161103
Timestamp: 20161104
✅ Processed enertalk-dataset-00 for day 20161104
Timestamp: 20161105
✅ Processed enertalk-dataset-00 for day 20161105
Timestamp: 20161106
✅ Processed enertalk-dataset-00 for day 20161106
Timestamp: 20161107
✅ Processed enertalk-dataset-00 for day 20161107
Timestamp: 20161109
✅ Processed enertalk-dataset-00 for day 20161109
Timestamp: 20161110
✅ Processed enertalk-dataset-00 for day 20161110
Timestamp: 20161111
✅ Processed enertalk-dataset-00 for day 20161111
Timestamp: 20161112
✅ Processed enertalk-dataset-00 for day 20161112
Timestamp: 20161113
✅ Processed enertalk-dataset-00 for day 20161113
Timestamp: 20161114
✅ Processed enertalk-dataset-00 for day 20161114
Timestamp: 20161115
✅ Processed enertalk-dataset-00 for day 20161115
Timest

TypeError: resample_to_5Hz() missing 1 required positional argument: 'df'