In [25]:
import pandas as pd
import numpy as np
import os
import shutil
import math
import json


# Command-line argument parsing
#parser = argparse.ArgumentParser(description="Generate test sets based on device difficulty.")
#parser.add_argument('--num_devices', type=int, default=2, help='Total number of devices to include across both difficulty groups (easy and hard).')

#args = parser.parse_args()
# Number of device files copied into each output group.
num_output_devices = 12  # args.num_devices

# Input folders, resolved relative to the current working directory.
device_folder = "deviceFiles"
price_folder = "priceFiles"
solar_folder = "solarFiles"

# One output folder per group (a-e), nested under outputs/<num_output_devices>/.
output_folder_a = f"outputs/{num_output_devices}/output_folder_a_{num_output_devices}"
output_folder_b = f"outputs/{num_output_devices}/output_folder_b_{num_output_devices}"
output_folder_c = f"outputs/{num_output_devices}/output_folder_c_{num_output_devices}"
output_folder_d = f"outputs/{num_output_devices}/output_folder_d_{num_output_devices}"
output_folder_e = f"outputs/{num_output_devices}/output_folder_e_{num_output_devices}"

# Create the output folders (and missing parents); existing folders are reused.
for output_folder in (
    output_folder_a,
    output_folder_b,
    output_folder_c,
    output_folder_d,
    output_folder_e,
):
    os.makedirs(output_folder, exist_ok=True)

# Every directory entry is listed, in whatever order the OS returns them.
price_files = os.listdir(price_folder)
solar_files = os.listdir(solar_folder)

# Pick one price file at random; the solar file name is set to the same name.
# NOTE(review): assumes the solar folder holds a file with the identical name - confirm.
random_price_index = np.random.randint(len(price_files))
price_file = price_files[random_price_index]
solar_file = price_file

# Devices: gather per-file statistics of the 'value' column.
device_files = os.listdir(device_folder)

devices_stats = []

for device_file in device_files:
    device_path = os.path.join(device_folder, device_file)
    device_values = pd.read_csv(device_path)['value']

    # Selection criteria, stored together with the filename:
    #   duration    - number of non-missing samples (low duration is harder to plan)
    #   peak        - maximum value (used later to spread devices over peak levels)
    #   variability - sample standard deviation of the values
    devices_stats.append({
        'filename': device_file,
        'duration': device_values.count(),
        'peak': device_values.max(),
        'variability': device_values.std(),
    })

# Step 1: Sort the devices by peak power, highest peak first.
devices_stats_sorted_peak = sorted(devices_stats, key=lambda x: x['peak'], reverse=True)

# Step 2: Split the sorted devices into exactly 4 contiguous quartiles.
# Stepping through the list with `num_devices // 4` produced more than four
# chunks whenever the count was not divisible by 4, and raised ValueError
# (zero step) for fewer than 4 devices. divmod gives the base size plus a
# remainder; the first `quartile_remainder` quartiles each take one extra
# device, so quartiles may be empty only when there are fewer than 4 devices.
num_devices = len(devices_stats_sorted_peak)
quartile_size, quartile_remainder = divmod(num_devices, 4)

quartiles = []
quartile_start = 0
for quartile_index in range(4):
    extra = 1 if quartile_index < quartile_remainder else 0
    quartile_end = quartile_start + quartile_size + extra
    quartiles.append(devices_stats_sorted_peak[quartile_start:quartile_end])
    quartile_start = quartile_end

# Step 3: Within each quartile, order devices by standard deviation, highest first.
quartiles = [
    sorted(quartile, key=lambda x: x['variability'], reverse=True)
    for quartile in quartiles
]

# a: Round-robin over the quartiles, always taking the FIRST remaining device
# (the one with the highest variability in that quartile). This yields test
# sets with peak-power variability between devices and high variability
# within devices.
group_a = []

# Copy each quartile list so popping does not modify `quartiles` itself
# (the device dicts are shared, only the lists are copied).
quartiles_tmp = [list(quartile) for quartile in quartiles]

# Stop when enough devices are collected OR every quartile is exhausted.
# Without the second condition the loop never terminated when fewer devices
# were available than requested.
while len(group_a) < num_output_devices and any(quartiles_tmp):
    for quartile in quartiles_tmp:
        # Skip empty quartiles and stop taking devices once the target is reached.
        if quartile and len(group_a) < num_output_devices:
            group_a.append(quartile.pop(0))

# Copy files to output folder a
for device in group_a:
    shutil.copy(os.path.join(device_folder, device['filename']), output_folder_a)

# b: Round-robin over the quartiles, always taking the LAST remaining device
# (the one with the lowest variability in that quartile). This yields test
# sets with peak-power variability between devices and little variability
# within devices.
group_b = []

# Copy each quartile list so popping does not modify `quartiles` itself
# (the device dicts are shared, only the lists are copied).
quartiles_tmp = [list(quartile) for quartile in quartiles]

# Stop when enough devices are collected OR every quartile is exhausted.
# Without the second condition the loop never terminated when fewer devices
# were available than requested.
while len(group_b) < num_output_devices and any(quartiles_tmp):
    for quartile in quartiles_tmp:
        # Skip empty quartiles and stop taking devices once the target is reached.
        if quartile and len(group_b) < num_output_devices:
            group_b.append(quartile.pop(-1))

# Copy files to output folder b
for device in group_b:
    shutil.copy(os.path.join(device_folder, device['filename']), output_folder_b)

# c: The devices with the highest standard deviation, most variable first.
devices_stats_sorted_variability = sorted(devices_stats, key=lambda x: x['variability'], reverse=True)

# Take up to num_output_devices devices. A slice clamps to the list length,
# so having fewer devices than requested no longer raises IndexError.
# max(..., 0) keeps a non-positive request selecting nothing.
group_c = devices_stats_sorted_variability[:max(num_output_devices, 0)]

# Copy files to output folder c
for device in group_c:
    shutil.copy(os.path.join(device_folder, device['filename']), output_folder_c)

# d: The devices with the lowest standard deviation, least variable first.
devices_stats_sorted_variability = sorted(devices_stats, key=lambda x: x['variability'])

# Take up to num_output_devices devices. A slice clamps to the list length,
# so having fewer devices than requested no longer raises IndexError.
# max(..., 0) keeps a non-positive request selecting nothing.
group_d = devices_stats_sorted_variability[:max(num_output_devices, 0)]

# Copy files to output folder d
for device in group_d:
    shutil.copy(os.path.join(device_folder, device['filename']), output_folder_d)

# e: Randomized set of devices.
# The shuffle is done in place, so `devices_stats` itself is reordered from
# here on.
np.random.shuffle(devices_stats)

# Take up to num_output_devices devices from the shuffled list. A slice clamps
# to the list length, so having fewer devices than requested no longer raises
# IndexError. max(..., 0) keeps a non-positive request selecting nothing.
group_e = devices_stats[:max(num_output_devices, 0)]

# Copy files to output folder e
for device in group_e:
    shutil.copy(os.path.join(device_folder, device['filename']), output_folder_e)

print("Files copied to output folders successfully.")

# Calculate standard deviation for all price files
def calculate_std_deviation(prices):
    """Return the population standard deviation of *prices*.

    The squared deviations from the mean are averaged over ``len(prices)``
    (not ``len(prices) - 1``) before the square root is taken.

    Args:
        prices: A non-empty sequence of numbers.

    Returns:
        The standard deviation as a float.

    Raises:
        ZeroDivisionError: If *prices* is empty.
    """
    count = len(prices)
    mean_price = sum(prices) / count
    squared_deviations = [(price - mean_price) ** 2 for price in prices]
    return math.sqrt(sum(squared_deviations) / count)

# (filename, standard deviation of its 'Price' entries) for every price file.
price_std_devs = []

# A dedicated loop variable is used so the randomly selected `price_file`
# chosen earlier is not overwritten by this scan.
for candidate_price_file in price_files:
    with open(os.path.join(price_folder, candidate_price_file), 'r') as file:
        data = json.load(file)
    # The file is closed before computing; each entry must provide a 'Price' key.
    prices = [entry['Price'] for entry in data]
    std_deviation = calculate_std_deviation(prices)
    price_std_devs.append((candidate_price_file, std_deviation))

# Find the file with the highest and lowest standard deviation
highest_std_file = max(price_std_devs, key=lambda x: x[1])
lowest_std_file = min(price_std_devs, key=lambda x: x[1])

print(f'\n\nFile with the highest standard deviation: {highest_std_file[0]} ({highest_std_file[1]})')
print(f'File with the lowest standard deviation: {lowest_std_file[0]} ({lowest_std_file[1]})\n\n')

# Calculate solar order
def calculate_solar_metric(solar_values):
    """Return the solar ordering metric for a series of values.

    The metric is the sum of absolute differences between consecutive values
    minus twice the maximum value.

    Args:
        solar_values: A non-empty array-like of numbers.

    Returns:
        The metric as a NumPy scalar.
    """
    step_changes = np.diff(solar_values)
    total_difference = np.sum(np.abs(step_changes))
    return total_difference - (2 * np.max(solar_values))

# (filename, solar metric) for every solar file whose name does not start with '_'.
solar_metrics = []

# A dedicated loop variable is used so the module-level `solar_file`
# assigned earlier is not overwritten by this scan.
for candidate_solar_file in solar_files:
    if candidate_solar_file.startswith('_'):
        continue
    solar_df = pd.read_csv(os.path.join(solar_folder, candidate_solar_file), delimiter=';')
    # Missing forecast values are dropped before the metric is computed.
    solar_values = solar_df['Day-ahead 6PM forecast'].dropna().astype(float).values
    metric = calculate_solar_metric(solar_values)
    solar_metrics.append((candidate_solar_file, metric))

# Find the file with the highest and lowest metric
highest_metric_file = max(solar_metrics, key=lambda x: x[1])
lowest_metric_file = min(solar_metrics, key=lambda x: x[1])

print(f'File with the highest solar metric: {highest_metric_file[0]} ({highest_metric_file[1]})')
print(f'File with the lowest solar metric: {lowest_metric_file[0]} ({lowest_metric_file[1]})')


Files copied to output folders successfully.


File with the highest standard deviation: pricing.json (30.680415667907035)
File with the lowest standard deviation: pricingFlat.json (0.0)


File with the highest solar metric: 15_may_normalized.csv (15.563999999999965)
File with the lowest solar metric: 11_may_normalized.csv (0.0)
