# London Smart Meters (Simulation with TensorFlow/Keras)


In [1]:
import pandas as pd
import numpy as np
from typing import Dict, List, Tuple

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
from tensorflow.keras.optimizers import Adam
from tensorflow import convert_to_tensor

import flwr as fl
from flwr.common import Metrics 
# Evaluation metrics that can be returned by clients or aggregated by the server during the federated learning process.


from flwr.simulation.ray_transport.utils import enable_tf_gpu_growth # to enable TensorFlow to grow its GPU memory allocation dynamically

import tensorflow as tf

VERBOSE = 0


2024-07-24 16:16:42,445	INFO util.py:154 -- Missing packages: ['ipywidgets']. Run `pip install -U ipywidgets`, then restart the notebook server for rich notebook output.


In [2]:
def get_model():
    """Build and compile the stacked-LSTM regression model.

    The network is four LSTM layers with 20, 15, 10 and 5 units, each followed
    by a Dropout(0.2) layer, and a final Dense(1) output. The first three LSTM
    layers return full sequences so that the next LSTM layer receives a
    sequence; the fourth returns only its last output.

    Returns:
        A compiled Keras ``Sequential`` model (Adam with learning rate 0.01,
        mean-squared-error loss, mean-absolute-error metric).
    """
    stacked_layers = [
        # Sequences of any length with one value per time step
        LSTM(20, return_sequences=True, input_shape=(None, 1)),
        Dropout(0.2),
        LSTM(15, return_sequences=True),
        Dropout(0.2),
        LSTM(10, return_sequences=True),
        Dropout(0.2),
        LSTM(5),  # 4th LSTM layer: emits a single vector per sample
        Dropout(0.2),
        Dense(1),  # single regression output
    ]
    model = Sequential(stacked_layers)

    model.compile(
        optimizer=Adam(learning_rate=0.01),
        loss='mean_squared_error',
        metrics=['mean_absolute_error'],
    )
    return model

In [3]:
class FlowerClient(fl.client.NumPyClient):
    """Flower NumPy client that trains and evaluates the model from ``get_model``.

    Each client owns a freshly built model plus its own training and
    validation arrays. The metric reported under the key ``"accuracy"`` is the
    model's ``mean_absolute_error`` metric (lower is better); the key name is
    kept because the aggregation function reads it.
    """

    def __init__(self, x_train, y_train, x_val, y_val) -> None:
        """Create the local model and store this client's data.

        Args:
            x_train: Training features.
            y_train: Training targets.
            x_val: Validation features.
            y_val: Validation targets.
        """
        self.model = get_model()
        self.x_train, self.y_train = x_train, y_train
        self.x_val, self.y_val = x_val, y_val

    def get_parameters(self, config):
        """Return the current local model weights."""
        return self.model.get_weights()

    def fit(self, parameters, config):
        """Train the received parameters on the locally held training set.

        Args:
            parameters: Model weights to start from.
            config: Per-round configuration. The optional keys ``"batch_size"``
                and ``"local_epochs"`` override the defaults (512 and 10).

        Returns:
            Tuple ``(updated_weights, num_training_examples, metrics)`` where
            ``metrics`` holds the loss and MAE of the last local epoch.
        """
        # Update local model parameters
        self.model.set_weights(parameters)

        # Hyperparameters for this round; fall back to the fixed values when
        # the server does not send any.
        batch_size = int(config.get("batch_size", 512))
        epochs = int(config.get("local_epochs", 10))

        history = self.model.fit(
            self.x_train,
            self.y_train,
            batch_size=batch_size,
            epochs=epochs,
            verbose=0,
        )

        # Report the metrics of the LAST epoch: they describe the weights that
        # are actually returned (index 0 would be the first epoch).
        results = {
            "loss": float(history.history["loss"][-1]),
            "accuracy": float(history.history["mean_absolute_error"][-1]),
        }
        return self.model.get_weights(), len(self.x_train), results

    def evaluate(self, parameters, config):
        """Evaluate the received parameters on the locally held validation set.

        Args:
            parameters: Model weights to evaluate.
            config: Per-round configuration (currently unused).

        Returns:
            Tuple ``(loss, num_validation_examples, metrics)`` where
            ``metrics["accuracy"]`` is the mean absolute error.
        """
        # Update local model with global parameters
        self.model.set_weights(parameters)

        # Silent evaluation, consistent with fit(); batch size given by keyword
        results = self.model.evaluate(
            self.x_val, self.y_val, batch_size=32, verbose=0
        )
        num_examples_test = len(self.x_val)
        return float(results[0]), num_examples_test, {"accuracy": float(results[1])}

To use the preprocessed dataset that contains only the LCLid (unique consumer ID) and KWH/hh (per hour) (hourly energy consumption) columns, replace the following cell with the code snippet below:

hourly_data = pd.read_csv("Preprocessed_data2013_2.csv", dtype={'LCLid': np.int16, 'KWH/hh (per hour) ': np.float64})


In [4]:
# Load the preprocessed 2013 readings, fixing a compact dtype for every column.
# The target column name 'KWH/hh (per hour) ' ends with a space; the same
# spelling is used wherever the column is accessed in this notebook.
hourly_data = pd.read_csv(
    "Preprocessed_data2013.csv",
    dtype={
        'LCLid': np.int16,
        'KWH/hh (per hour) ': np.float64,
        'dayoftheyear': np.int16,
        'hour': np.int8,
        'is_weekend': np.int8,
    },
)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 37078054 entries, 0 to 37078053
Data columns (total 5 columns):
 #   Column              Dtype  
---  ------              -----  
 0   LCLid               int16  
 1   dayoftheyear        int16  
 2   hour                int8   
 3   is_weekend          int8   
 4   KWH/hh (per hour)   float64
dtypes: float64(1), int16(2), int8(2)
memory usage: 495.0 MB


In [6]:
# Number of meters used in the project. Here it is the largest LCLid found in
# the dataset; replace it with a fixed value such as 100 to use fewer meters.
# NOTE(review): train_test_validate visits LCLid values 0 .. NUM_OF_METERS - 1,
# so a meter whose LCLid equals this maximum is never visited - confirm how
# the IDs are numbered.
NUM_OF_METERS =   hourly_data['LCLid'].max() #  100
# Number of federated clients; here one client per meter. Change NUM_CLIENTS
# to 30 (with NUM_OF_METERS = 100) for FL with the stacked LSTM model using
# 30 clients and 100 meters.
NUM_CLIENTS = NUM_OF_METERS


In [7]:
def train_test_validate(data, num_meters=None, num_clients=None):
    """Split each meter's rows into train/validation/test and group meters per client.

    For every meter id ``i`` in ``range(num_meters)`` the rows with
    ``LCLid == i`` are split by position: the first 80% go to train, the next
    10% to validation and the rest to test. Meters are assigned to client
    partitions in id order, ``num_meters // num_clients`` meters per partition;
    the last partition also receives any remaining meters. Test rows of all
    meters are pooled into one frame.

    Args:
        data: DataFrame with an ``LCLid`` column.
        num_meters: Number of meter ids to process. Defaults to the
            module-level ``NUM_OF_METERS``.
        num_clients: Number of client partitions to build. Defaults to the
            module-level ``NUM_CLIENTS``.

    Returns:
        Dict with keys ``"train"`` and ``"validation"`` (lists of DataFrames,
        one per partition) and ``"test"`` (a single DataFrame). Every returned
        frame has a fresh 0-based index.
    """
    if num_meters is None:
        num_meters = NUM_OF_METERS
    if num_clients is None:
        num_clients = NUM_CLIENTS
    # Work with plain ints even if NumPy scalars were supplied
    num_meters = int(num_meters)
    num_clients = int(num_clients)

    # Number of meters per client (the last client may get more)
    part_size = num_meters // num_clients

    # Row positions of every meter, found in one pass instead of scanning the
    # whole frame once per meter.
    rows_by_meter = data.groupby('LCLid', sort=False).indices
    no_rows = data.iloc[0:0]  # stand-in for a meter id without any rows

    def _concat(frames):
        # One concatenation per partition instead of growing a frame in a loop
        return pd.concat(frames, ignore_index=True)

    train_l, val_l = [], []              # finished partitions
    current_train, current_val = [], []  # pieces of the partition being filled
    test_parts = []                      # test pieces of all meters

    for meter_id in range(num_meters):
        positions = rows_by_meter.get(meter_id)
        meter_rows = no_rows if positions is None else data.iloc[positions]

        # Split positions: train | validation | test
        val_split = int(len(meter_rows) * 0.8)
        test_split = int(len(meter_rows) * 0.9)

        current_train.append(meter_rows.iloc[:val_split])
        current_val.append(meter_rows.iloc[val_split:test_split])
        test_parts.append(meter_rows.iloc[test_split:])

        # Close the partition once it is full, except for the last client,
        # which keeps collecting all remaining meters.
        if len(current_train) >= part_size and len(train_l) < num_clients - 1:
            train_l.append(_concat(current_train))
            val_l.append(_concat(current_val))
            current_train, current_val = [], []

    if current_train:
        # Last partition (index reset like every other partition)
        train_l.append(_concat(current_train))
        val_l.append(_concat(current_val))

    # With no meters there is nothing to concatenate; return an empty frame
    test = _concat(test_parts) if test_parts else no_rows.reset_index(drop=True)

    return {"train": train_l, "test": test, "validation": val_l}

In [8]:
def get_client_fn(partition):
    """Build the client factory used by the simulation.

    Args:
        partition: Dict whose ``"train"`` and ``"validation"`` entries are
            lists of DataFrames indexed by client id.

    Returns:
        A function ``client_fn(cid)`` that constructs the client for the
        given client id.
    """
    target_column = 'KWH/hh (per hour) '

    def _as_tensors(frame):
        # Features are every column except the target; both become tensors
        features = tf.convert_to_tensor(np.asarray(frame.drop(columns=[target_column])))
        targets = tf.convert_to_tensor(np.asarray(frame[target_column]))
        return features, targets

    def client_fn(cid: str) -> fl.client.Client:
        """Construct a FlowerClient holding the partition that belongs to ``cid``."""
        client_index = int(cid)

        x_train_tensor, y_train_tensor = _as_tensors(partition["train"][client_index])
        x_val_tensor, y_val_tensor = _as_tensors(partition["validation"][client_index])

        client = FlowerClient(x_train_tensor, y_train_tensor, x_val_tensor, y_val_tensor)
        return client.to_client()

    return client_fn


def weighted_average(metrics: List[Tuple[int, Metrics]]) -> Metrics:
    """Aggregate the clients' ``"accuracy"`` metric, weighted by example count.

    Args:
        metrics: One ``(num_examples, metrics_dict)`` pair per client, where
            ``metrics_dict`` contains the key ``"accuracy"``.

    Returns:
        ``{"accuracy": weighted_mean}``, or an empty dict when the total
        number of examples is zero (no weighted mean can be computed).
    """
    total_examples = sum(num_examples for num_examples, _ in metrics)
    if total_examples == 0:
        # No results, or only clients without examples: avoid dividing by zero
        return {}

    # Weight each client's metric by the number of examples it was computed on
    weighted_sum = sum(num_examples * m["accuracy"] for num_examples, m in metrics)
    return {"accuracy": weighted_sum / total_examples}


def get_evaluate_fn(testset):
    """Build the server-side (centralised) evaluation function.

    Args:
        testset: DataFrame holding the feature columns and the target column
            ``'KWH/hh (per hour) '``.

    Returns:
        A function ``evaluate(server_round, parameters, config)`` that returns
        ``(loss, {"accuracy": metric})`` for the given parameters.
    """
    target_column = 'KWH/hh (per hour) '

    def evaluate(
        server_round: int,
        parameters: fl.common.NDArrays,
        config: Dict[str, fl.common.Scalar],
    ):
        # Fresh model carrying the parameters that are to be evaluated
        global_model = get_model()
        global_model.set_weights(parameters)

        features = tf.convert_to_tensor(np.asarray(testset.drop(columns=[target_column])))
        targets = tf.convert_to_tensor(np.asarray(testset[target_column]))

        scores = global_model.evaluate(features, targets, verbose=VERBOSE)
        return scores[0], {"accuracy": scores[1]}

    return evaluate

In [9]:
# NOTE(review): the same name is already imported in the first cell
from flwr.simulation.ray_transport.utils import enable_tf_gpu_growth
# Enable GPU growth in the main process before any TensorFlow work is done here
enable_tf_gpu_growth()

# Build the per-client train/validation partitions and the pooled test set
df_dataset = train_test_validate(hourly_data)

# Get the whole test set for centralised evaluation
centralized_testset = df_dataset["test"]


In [10]:
# Create FedAvg strategy
strategy = fl.server.strategy.FedAvg(
    fraction_fit=0.5,  # Sample 50% of available clients for training
    fraction_evaluate=0.1,  # Sample 10% of available clients for evaluation
    min_fit_clients=40,  # Never sample less than 40 clients for training
    min_evaluate_clients=10,  # Never sample less than 10 clients for evaluation
    min_available_clients=int(
        NUM_CLIENTS * 0.75
    ),  # Wait until at least 75% of the clients are available
    evaluate_metrics_aggregation_fn=weighted_average,  # aggregates federated metrics
    evaluate_fn=get_evaluate_fn(centralized_testset),  # global evaluation function
)

# With a dictionary, you tell Flower's VirtualClientEngine that each
# client needs exclusive access to these many resources in order to run
# NOTE(review): in the logged run (12 CPUs) this setting produced a pool of
# only 1 actor - consider a smaller value if more parallelism is wanted.
client_resources = {"num_cpus": 8.0}

# Start simulation; the returned history holds the per-round losses and metrics
history = fl.simulation.start_simulation(
    client_fn=get_client_fn(df_dataset),
    num_clients=NUM_CLIENTS,
    config=fl.server.ServerConfig(num_rounds=10),
    strategy=strategy,
    client_resources=client_resources,
    actor_kwargs={
            "on_actor_init_fn": enable_tf_gpu_growth  # Enable GPU growth upon actor init
            # does nothing if `num_gpus` in client_resources is 0.0
        },
)

[92mINFO [0m:      Starting Flower simulation, config: num_rounds=10, no round_timeout
2024-07-24 16:19:03,023	INFO worker.py:1752 -- Started a local Ray instance.
[92mINFO [0m:      Flower VCE: Ray initialized with resources: {'object_store_memory': 4332198297.0, 'memory': 8664396596.0, 'node:127.0.0.1': 1.0, 'GPU': 1.0, 'node:__internal_head__': 1.0, 'CPU': 12.0, 'accelerator_type:G': 1.0}
[92mINFO [0m:      Optimize your simulation with Flower VCE: https://flower.ai/docs/framework/how-to-run-simulations.html
[92mINFO [0m:      Flower VCE: Resources for each Virtual Client: {'num_cpus': 8.0}
[92mINFO [0m:      Flower VCE: Creating VirtualClientEngineActorPool with 1 actors
[92mINFO [0m:      [INIT]
[92mINFO [0m:      Requesting initial parameters from one random client
[36m(pid=38528)[0m 2024-07-24 16:19:06.918708: I tensorflow/core/util/port.cc:113] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off error

In [None]:
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots

print(f"{history.metrics_centralized = }")

# Per-round values aggregated from the clients' evaluate() results.
# Despite the variable names, these are the DISTRIBUTED metrics, and the
# "accuracy" entry holds the mean absolute error. The names are kept so that
# other cells using them keep working.
global_accuracy_centralised = history.metrics_distributed["accuracy"]
global_loss_centralised = history.losses_distributed
rounds = [data[0] for data in global_accuracy_centralised]
loss = [data[1] for data in global_loss_centralised]
# MAE is plotted in its own units; it is an error, not a fraction, so it must
# not be scaled by 100 like a percentage accuracy.
acc = [data[1] for data in global_accuracy_centralised]

fig = make_subplots(rows=2, cols=1, shared_xaxes=True, 
                    subplot_titles=("MEAN ABSOLUTE ERROR", "MEAN SQUARED ERROR"))

# Add scatter plot for MAE
fig.add_trace(go.Scatter(x=rounds, y=acc, mode='markers', name='MAE'), row=1, col=1)
# Add line plot for MAE
fig.add_trace(go.Scatter(x=rounds, y=acc, mode='lines', name='MAE Line'), row=1, col=1)

# Add scatter plot for loss (MSE)
fig.add_trace(go.Scatter(x=rounds, y=loss, mode='markers', name='MSE'), row=2, col=1)
# Add line plot for loss (MSE)
fig.add_trace(go.Scatter(x=rounds, y=loss, mode='lines', name='MSE Line'), row=2, col=1)

# Update layout
fig.update_layout(
    height=800,  # Height of the figure
    title_text=f"SMART METERS - {NUM_CLIENTS} clients with {int(0.5 * NUM_CLIENTS)} sampled clients per round",
)

# Update x-axis for all subplots
fig.update_xaxes(title_text="Round", row=2, col=1)
# Update y-axis for each subplot
fig.update_yaxes(title_text="MEAN ABSOLUTE ERROR", row=1, col=1)
fig.update_yaxes(title_text="MEAN SQUARED ERROR", row=2, col=1)

# Show the plot
fig.show()