In [11]:
from typing import Optional, Tuple
import logging
import yaml
import glob
import pandas as pd

## Load variables from config file

In [12]:
def load_config(config_path) -> dict:
    with open(config_path, "r") as config_file:
        return yaml.safe_load(config_file)

config = load_config("config.yaml")

for key in ['training_start_date', 'training_end_date', 'validation_end_date', 'test_end_date']:
    config[key] = pd.to_datetime(config[key]).tz_localize('UTC')


## Load data

In [None]:
def add_time_features(df, time_col):
    df['day_of_week'] = df[time_col].dt.dayofweek
    df['day_of_month'] = df[time_col].dt.day
    df['month'] = df[time_col].dt.month
    return df

In [13]:
def process_region(region, config) -> Tuple[Optional[pd.DataFrame], Optional[pd.DataFrame]]:
    try:
        instance_info_file = f"{config['data_folder']}/instance_info_{region}.csv"
        prices_files = glob.glob(f"{config['data_folder']}/prices_{region}_*.csv")
        
        instance_info_df = pd.read_csv(instance_info_file)
        instance_info_df['id_instance'] = instance_info_df['id']
        instance_info_df = instance_info_df.set_index('id_instance')
        instance_info_df = instance_info_df.drop('id', axis=1)
        
        prices_df_list = []
        for file in prices_files:
            df = pd.read_csv(file)
            df[config['time_col']] = pd.to_datetime(df[config['time_col']]).dt.floor('4h')
            prices_df_list.append(df)
        
        prices_df = pd.concat(prices_df_list, ignore_index=True)
        prices_df = prices_df[(prices_df[config['time_col']] >= config['training_start_date']) & 
                              (prices_df[config['time_col']] <= config['test_end_date'])]
        
        complete_time_range = pd.date_range(start=config['training_start_date'], 
                                            end=config['test_end_date'], freq='4h')
        
        id_instances = prices_df['id_instance'].unique()
        complete_time_df = pd.DataFrame({config['time_col']: complete_time_range})
        complete_time_df = complete_time_df.assign(key=1).merge(
            pd.DataFrame({'id_instance': id_instances, 'key': 1}), on='key').drop('key', axis=1)
        
        prices_df = pd.merge(complete_time_df, prices_df, on=[config['time_col'], 'id_instance'], how='left')
        prices_df[config['target_col']] = prices_df.groupby('id_instance')[config['target_col']].ffill()
        
        grouped_prices_df = prices_df.groupby(['id_instance', config['time_col']]).agg({config['target_col']: 'mean'}).reset_index()
        grouped_prices_df = prices_df.dropna(subset=[config['target_col']])

        instance_info_df['modifiers'] = instance_info_df['modifiers'].fillna('').apply(lambda x: sorted(list(x)))
        
        return instance_info_df, grouped_prices_df
    except Exception as e:
        print(f"Error processing region {region}: {str(e)}")
        return None, None

In [14]:
prices_dfs = []
instance_info_dfs = []
for region in config['regions']:
    instance_info_df, prices_df = process_region(region, config)
    if prices_df is not None:
        prices_dfs.append(prices_df)
        instance_info_dfs.append(instance_info_df)

combined_prices_df = pd.concat(prices_dfs, ignore_index=True)
combined_instance_info_df = pd.concat(instance_info_dfs)

# Merge with instance info DataFrame
general_df = pd.merge(combined_instance_info_df, combined_prices_df, on='id_instance', how='right')

# Data validation
assert not general_df.empty, "The resulting dataframe is empty"
assert general_df[config['target_col']].notna().all(), f"Missing values in {config['target_col']}"
assert general_df[config['time_col']].notna().all(), f"Missing values in {config['time_col']}"

print(general_df.head())
print(f"Data loaded. Total number of instances: {general_df['id_instance'].nunique()}. Total number of records: {general_df.shape[0]}. Number of features: {general_df.shape[1]}")  

training_df = general_df[(general_df[config['time_col']] >= config['training_start_date']) & (general_df[config['time_col']] <= config['training_end_date'])]
validation_df = general_df[(general_df[config['time_col']] > config['training_end_date']) & (general_df[config['time_col']] <= config['validation_end_date'])]
test_df = general_df[(general_df[config['time_col']] > config['validation_end_date']) & (general_df[config['time_col']] <= config['test_end_date'])]

# tener en cuenta instancias que no tienen datos en el periodo de entrenamiento. buen test para el modelo

Data loaded. Total number of instances: 3596. Total number of records: 6022485. Number of features: 15


In [15]:
print(general_df.head())


   id_instance          region av_zone  instance_type instance_family  \
0        11900  ap-northeast-1       c    r6i.2xlarge               r   
1        39977  ap-northeast-1       a  r7iz.32xlarge               r   
2         2192  ap-northeast-1       c   r7a.24xlarge               r   
3        44472  ap-northeast-1       d   m5n.24xlarge               m   
4        48316  ap-northeast-1       d   m7a.48xlarge               m   

   generation modifiers  size  vcpu     memory architectures  \
0           6       [i]   2.0     8    65536.0    ['x86_64']   
1           7    [i, z]  32.0   128  1048576.0    ['x86_64']   
2           7       [a]  24.0    96   786432.0    ['x86_64']   
3           5       [n]  24.0    96   393216.0    ['x86_64']   
4           7       [a]  48.0   192   786432.0    ['x86_64']   

  product_description  on_demand_price           price_timestamp  spot_price  
0          Linux/UNIX          0.60800 2024-08-01 00:00:00+00:00      0.2599  
1          Linux/U