In [3]:
import pandas as pd
import numpy as np

# Function to generate random dataset
def generate_test_dataset(num_rows: int) -> pd.DataFrame:
    # Generate a range of datetimes
    datetimes = pd.date_range(start="2020-01-01", periods=num_rows, freq="H").astype(str)
    
    data = {
        "Datetime": datetimes,
        "Temperature": np.random.uniform(-20, 40, num_rows),
        "Humidity": np.random.uniform(0, 100, num_rows),
        "WindSpeed": np.random.uniform(0, 32, num_rows),
        "GeneralDiffuseFlows": np.random.uniform(0, 1000, num_rows),
        "DiffuseFlows": np.random.uniform(0, 1000, num_rows),
        "PowerConsumption_Zone1": np.random.uniform(0, 10000, num_rows),
        "PowerConsumption_Zone2": np.random.uniform(0, 10000, num_rows),
        "PowerConsumption_Zone3": np.random.uniform(0, 10000, num_rows)
    }
    
    return pd.DataFrame(data)

# Generate a sample dataset
random_dataset = generate_test_dataset(100)  # Generate 100 rows of data
print(random_dataset.head())

              Datetime  Temperature   Humidity  WindSpeed  \
0  2020-01-01 00:00:00    11.829344  30.880622  30.360938   
1  2020-01-01 01:00:00    29.154886  38.133578   7.481179   
2  2020-01-01 02:00:00    23.362978  57.679549  17.435908   
3  2020-01-01 03:00:00    31.083194  83.469354  13.093886   
4  2020-01-01 04:00:00    25.229984  32.815848   7.667718   

   GeneralDiffuseFlows  DiffuseFlows  PowerConsumption_Zone1  \
0           328.591293    399.367633             5720.378957   
1           230.901116    542.826966             5583.450918   
2           404.239627    220.301485             8288.914924   
3           180.456382    274.905705             6614.632861   
4            90.626333    450.753111             2190.434571   

   PowerConsumption_Zone2  PowerConsumption_Zone3  
0             7604.581893             8777.217575  
1             5033.148829             9196.579473  
2             8152.540637             5980.977412  
3             4121.746930             86

  datetimes = pd.date_range(start="2020-01-01", periods=num_rows, freq="H").astype(str)


In [5]:
import ray 

ds = ray.data.from_pandas(random_dataset)

  return bound(*args, **kwds)
2024-06-28 15:29:58,113	INFO worker.py:1586 -- Connecting to existing Ray cluster at address: 10.0.1.209:6379...
2024-06-28 15:29:58,139	INFO worker.py:1762 -- Connected to Ray cluster. View the dashboard at [1m[32m10.0.1.209:8265 [39m[22m


In [9]:
# COMPUTING NEW FEATURES
import pandas as pd

class BatchTransformer:
    def __init__(self):
        pass

    @staticmethod
    def categorize_time_of_day(hour):
        if 6 <= hour < 12:
            return 'Morning'
        elif 12 <= hour < 18:
            return 'Afternoon'
        elif 18 <= hour < 24:
            return 'Evening'
        else:
            return 'Night'

    @staticmethod
    def categorize_season(month):
        if month in [12, 1, 2]:
            return 'Winter'
        elif month in [3, 4, 5]:
            return 'Spring'
        elif month in [6, 7, 8]:
            return 'Summer'
        else:
            return 'Autumn'

    def transform(self, batch):
        batch['Datetime'] = pd.to_datetime(batch['Datetime'])
        batch['Year'] = batch['Datetime'].dt.year
        batch['Month'] = batch['Datetime'].dt.month
        batch['Day'] = batch['Datetime'].dt.day
        batch['Hour'] = batch['Datetime'].dt.hour
        batch['TimeOfDay'] = batch['Hour'].apply(self.categorize_time_of_day)
        batch['Weekday'] = batch['Datetime'].dt.weekday
        batch['IsWeekend'] = batch['Weekday'].apply(lambda x: 1 if x >= 5 else 0)
        batch['Season'] = batch['Month'].apply(self.categorize_season)
        batch['Year'] = batch['Year'].astype(int)
        batch['Weekday'] = batch['Weekday'].astype(int)
        batch['IsWeekend'] = batch['IsWeekend'].astype(int)
        return batch

# Instantiate the transformer
transformer = BatchTransformer()

# Assuming `ds` is your Ray dataset
transformed_ds = ds.map_batches(transformer.transform, batch_format="pandas")

# _________________________________________________________
# REMOVING UNNECESSARY COLUMNS
ds_updated = transformed_ds.drop_columns(["Datetime"])

df_updated = ds_updated.to_pandas()
# df_updated.head()



# _________________________________________________________
# ENCODING THE CATEGORICAL COLUMNS
import ray
import pandas as pd

# Assuming 'ds' is your Ray dataset

# Define the function to apply one-hot encoding
def encode_categorical_columns(batch):
    # Specify the columns you want to encode
    categorical_columns = ['TimeOfDay', 'Season']
    
    # Apply one-hot encoding to the specified columns
    batch_encoded = pd.get_dummies(batch, columns=categorical_columns)
    
    # Convert the resulting DataFrame to integer type
    batch_encoded = batch_encoded.astype(int)
    
    return batch_encoded

# Apply the function to each batch of the dataset
ds_encoded = ds_updated.map_batches(encode_categorical_columns, batch_format="pandas")

# Convert the iterable returned by iter_batches to an iterator
batches_iterator = iter(ds_encoded.iter_batches(batch_size=6, batch_format="pandas"))

# Now you can use next() to get the first batch
first_batch = next(batches_iterator)

# Display the first batch
# print(first_batch)



# _________________________________________________________
df_encoded = ds_encoded.to_pandas()
# df_encoded

# import modin.pandas as pd

# # Assuming df_encoded is loaded in a way that it's a Modin DataFrame

# # Define aggregation functions
# aggregation_functions = {
#     'Temperature': ['mean'],
#     'Humidity': ['mean'],
#     'WindSpeed': ['mean'],
#     'GeneralDiffuseFlows': ['mean'],
#     'DiffuseFlows': ['mean'],
#     'PowerConsumption_Zone1': ['sum'],
#     'PowerConsumption_Zone2': ['sum'],
#     'PowerConsumption_Zone3': ['sum'],
#     # 'IsHoliday': ['first'],
#     'Weekday': ['first'],
#     'IsWeekend': ['first'],
#     'TimeOfDay_Afternoon': ['first'],
#     'TimeOfDay_Evening': ['first'],
#     'TimeOfDay_Morning': ['first'],
#     'TimeOfDay_Night': ['first'],
#     'Season_Autumn': ['first'],
#     'Season_Spring': ['first'],
#     'Season_Summer': ['first'],
#     'Season_Winter': ['first']
# }

# # Group by Year, Month, Day, Hour and aggregate
# df_grouped = df_encoded.groupby(['Year', 'Month', 'Day', 'Hour']).agg(aggregation_functions)

# # Correct column names
# df_grouped.columns = ['_'.join(col) if isinstance(col, tuple) else col for col in df_grouped.columns]
# df_grouped = df_grouped.reset_index()

# # Display the result
# # df_grouped.head(25)


# # _________________________________________________________
# # LAGGING THE COLUMNS

# import modin.pandas as pd

# # Assuming df_grouped is loaded or converted to a Modin DataFrame

# # Columns to create lag features
# columns_to_lag = [
#     'Temperature_mean', 'Humidity_mean', 'WindSpeed_mean', 'GeneralDiffuseFlows_mean', 
#     'DiffuseFlows_mean', 'PowerConsumption_Zone1_sum', 'PowerConsumption_Zone2_sum', 
#     'PowerConsumption_Zone3_sum'
# ]

# # Lag values (24, 48, 72 hours)
# lags = [4, 8, 12, 24, 48]

# # Create lag features as before
# df_lagged = df_grouped.copy()

# for col in columns_to_lag:
#     for lag in lags:
#         df_lagged[f'{col}_lag{lag}'] = df_grouped[col].shift(lag)

# # Replace NaN values with 0
# df_lagged.fillna(0, inplace=True)

# # Display the result
# # df_lagged.head()



# # _________________________________________________________
# # FEATURE SCALING

# # List of columns to scale
# cols_to_scale = [
#     "Temperature_mean", "Humidity_mean", "WindSpeed_mean", 
#     "GeneralDiffuseFlows_mean", "DiffuseFlows_mean", "PowerConsumption_Zone1_sum", "PowerConsumption_Zone2_sum", "PowerConsumption_Zone3_sum", 
#     "Temperature_mean_lag4", "Temperature_mean_lag8", "Temperature_mean_lag12", "Temperature_mean_lag24", "Temperature_mean_lag48", 
#     "Humidity_mean_lag4", "Humidity_mean_lag8", "Humidity_mean_lag12", "Humidity_mean_lag24", "Humidity_mean_lag48",
#     "WindSpeed_mean_lag4", "WindSpeed_mean_lag8", "WindSpeed_mean_lag12", "WindSpeed_mean_lag24", "WindSpeed_mean_lag48", 
#     "GeneralDiffuseFlows_mean_lag4", "GeneralDiffuseFlows_mean_lag8", "GeneralDiffuseFlows_mean_lag12", "GeneralDiffuseFlows_mean_lag24", "GeneralDiffuseFlows_mean_lag48",
#     "DiffuseFlows_mean_lag4", "DiffuseFlows_mean_lag8", "DiffuseFlows_mean_lag12", "DiffuseFlows_mean_lag24", "DiffuseFlows_mean_lag48", 
#     "PowerConsumption_Zone1_sum_lag4", "PowerConsumption_Zone1_sum_lag8", "PowerConsumption_Zone1_sum_lag12", "PowerConsumption_Zone1_sum_lag24", "PowerConsumption_Zone1_sum_lag48",
#     "PowerConsumption_Zone2_sum_lag4", "PowerConsumption_Zone2_sum_lag8", "PowerConsumption_Zone2_sum_lag12", "PowerConsumption_Zone2_sum_lag24", "PowerConsumption_Zone2_sum_lag48", 
#     "PowerConsumption_Zone3_sum_lag4", "PowerConsumption_Zone3_sum_lag8", "PowerConsumption_Zone3_sum_lag12", "PowerConsumption_Zone3_sum_lag24",  "PowerConsumption_Zone3_sum_lag48"
# ]


# # Convert the DataFrame to a Ray Dataset
# ds = ray.data.from_pandas(df_lagged)

# # Define a function to apply MinMaxScaler to a pandas DataFrame
# def scale_partition(df, cols_to_scale):
#     scaler = MinMaxScaler()
#     df[cols_to_scale] = scaler.fit_transform(df[cols_to_scale])
#     return df

# # Use Ray's map_batches to apply the scaling function across partitions
# # map_batches expects a function that operates on pandas DataFrames (or Arrow tables)
# scaled_ds = feature_ds.map_batches(lambda batch: scale_partition(batch, cols_to_scale), batch_format="pandas")

# # Optionally, convert the scaled Ray Dataset back to a pandas DataFrame
# scaled_df = scaled_ds.to_pandas()

2024-06-28 15:35:51,059	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-06-28_13-39-03_152976_1135/logs/ray-data
2024-06-28 15:35:51,059	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(BatchTransformer.transform)->MapBatches(drop_columns)]


- MapBatches(BatchTransformer.transform)->MapBatches(drop_columns) 1:   0%|             | 0/1 [00:00<?, ?it/s]

Running 0:   0%|                                                                        | 0/1 [00:00<?, ?it/s]

2024-06-28 15:35:51,131	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-06-28_13-39-03_152976_1135/logs/ray-data
2024-06-28 15:35:51,132	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(BatchTransformer.transform)->MapBatches(drop_columns)->MapBatches(encode_categorical_columns)]


- MapBatches(BatchTransformer.transform)->MapBatches(drop_columns)->MapBatches(encode_categorical_columns) 1: 

Running 0:   0%|                                                                        | 0/1 [00:00<?, ?it/s]

2024-06-28 15:35:51,280	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-06-28_13-39-03_152976_1135/logs/ray-data
2024-06-28 15:35:51,281	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(BatchTransformer.transform)->MapBatches(drop_columns)->MapBatches(encode_categorical_columns)]


- MapBatches(BatchTransformer.transform)->MapBatches(drop_columns)->MapBatches(encode_categorical_columns) 1: 

Running 0:   0%|                                                                        | 0/1 [00:00<?, ?it/s]

In [12]:
import ray
import pandas as pd

# Assuming 'ds' is your Ray dataset

# Define the function to apply one-hot encoding
def encode_categorical_columns(batch):
    # Specify the columns you want to encode
    categorical_columns = ['TimeOfDay', 'Season']
    
    # Apply one-hot encoding to the specified columns
    batch_encoded = pd.get_dummies(batch, columns=categorical_columns)
    
    # Convert the resulting DataFrame to integer type
    batch_encoded = batch_encoded.astype(int)
    
    return batch_encoded

# Apply the function to each batch of the dataset
ds_encoded = ds_updated.map_batches(encode_categorical_columns, batch_format="pandas")

# Convert the iterable returned by iter_batches to an iterator
batches_iterator = iter(ds_encoded.iter_batches(batch_size=6, batch_format="pandas"))

# Now you can use next() to get the first batch
first_batch = next(batches_iterator)

# Display the first batch
print(first_batch)
df_encoded = ds_encoded.to_pandas()
df_encoded.head()

2024-06-28 15:38:10,309	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-06-28_13-39-03_152976_1135/logs/ray-data
2024-06-28 15:38:10,311	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(BatchTransformer.transform)->MapBatches(drop_columns)->MapBatches(encode_categorical_columns)]


- MapBatches(BatchTransformer.transform)->MapBatches(drop_columns)->MapBatches(encode_categorical_columns) 1: 

Running 0:   0%|                                                                        | 0/1 [00:00<?, ?it/s]

2024-06-28 15:38:10,376	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-06-28_13-39-03_152976_1135/logs/ray-data
2024-06-28 15:38:10,376	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(BatchTransformer.transform)->MapBatches(drop_columns)->MapBatches(encode_categorical_columns)]


   Temperature  Humidity  WindSpeed  GeneralDiffuseFlows  DiffuseFlows  \
0           11        30         30                  328           399   
1           29        38          7                  230           542   
2           23        57         17                  404           220   
3           31        83         13                  180           274   
4           25        32          7                   90           450   
5           28        98         13                  185           479   

   PowerConsumption_Zone1  PowerConsumption_Zone2  PowerConsumption_Zone3  \
0                    5720                    7604                    8777   
1                    5583                    5033                    9196   
2                    8288                    8152                    5980   
3                    6614                    4121                    8621   
4                    2190                    8599                    2999   
5                  

- MapBatches(BatchTransformer.transform)->MapBatches(drop_columns)->MapBatches(encode_categorical_columns) 1: 

Running 0:   0%|                                                                        | 0/1 [00:00<?, ?it/s]

Unnamed: 0,Temperature,Humidity,WindSpeed,GeneralDiffuseFlows,DiffuseFlows,PowerConsumption_Zone1,PowerConsumption_Zone2,PowerConsumption_Zone3,Year,Month,Day,Hour,Weekday,IsWeekend,TimeOfDay_Afternoon,TimeOfDay_Evening,TimeOfDay_Morning,TimeOfDay_Night,Season_Winter
0,11,30,30,328,399,5720,7604,8777,2020,1,1,0,2,0,0,0,0,1,1
1,29,38,7,230,542,5583,5033,9196,2020,1,1,1,2,0,0,0,0,1,1
2,23,57,17,404,220,8288,8152,5980,2020,1,1,2,2,0,0,0,0,1,1
3,31,83,13,180,274,6614,4121,8621,2020,1,1,3,2,0,0,0,0,1,1
4,25,32,7,90,450,2190,8599,2999,2020,1,1,4,2,0,0,0,0,1,1
