In [1]:
from sklearn.model_selection import train_test_split
import pandas as pd
import numpy as np
import os.path as path

from keras.models import Sequential
from keras.layers import Dense, Dropout, Activation, Embedding, Flatten
from keras.layers import LSTM, SimpleRNN, GRU, Bidirectional, BatchNormalization, Conv1D, MaxPooling1D, Reshape, GlobalAveragePooling1D

2023-08-06 16:30:21.312483: I tensorflow/core/util/port.cc:110] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2023-08-06 16:30:21.647335: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX_VNNI FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [2]:
ROOT_DIR = path.abspath("../../data/datasets")

def csvfile(root_dir, train):
    """Build the path of one UNSW-NB15 split CSV located in ``root_dir``.

    The mapping is deliberately swapped: a truthy ``train`` selects the file
    named "testing" (it holds more data, so it is used for training) and a
    falsy ``train`` selects the file named "training".
    """
    if train:
        split_name = "testing"
    else:
        split_name = "training"
    file_name = "UNSW_NB15_" + split_name + "-set.csv"
    return path.join(root_dir, file_name)

# NOTE: the boolean is swapped relative to the file names -- csvfile(..., True)
# resolves to "UNSW_NB15_testing-set.csv" and csvfile(..., False) to
# "UNSW_NB15_training-set.csv".
train = pd.read_csv(csvfile(ROOT_DIR, True))
test = pd.read_csv(csvfile(ROOT_DIR, False))

## Config

In [3]:
# Number of simulated clients; also the number of contiguous data partitions.
NUM_CLIENTS = 3
# Server address; in the visible cells it is only echoed in client_fn's log line.
S_ADDR = "127.0.0.1:8080"
# Set to 1 to print per-feature statistics while outliers are being capped.
DEBUG = 0
# Directory the server writes the global model weights to after each round.
MODEL_CKPT_DIR=path.abspath("../../model_ckpt/")

## Global Vars

In [4]:
# Federated-session counter; BFedHistory copies its value at construction time.
fed_session = 1
# NOTE(review): nothing in the visible cells appends to this list.
histories = []

## Preprocessing

In [5]:
# Columns removed from both splits before modelling; 'label' is kept as target.
# NOTE: train/test are reassigned in place, so re-running this cell without
# re-running the load cell fails because the columns are already gone.
list_drop = ['id', 'attack_cat']
train = train.drop(list_drop, axis=1)
test = test.drop(list_drop, axis=1)

# Stack both splits into one frame. Each split keeps its own row index, so
# index labels repeat in df (the displayed index ends at 82330, len is 257673).
df = pd.concat([train, test])
len(df)

257673

## Removing outliers

In [6]:
# Select numeric columns. This is a snapshot: the writes to `df` below do not
# change `df_numeric`, so every statistic is taken from the un-capped data.
df_numeric = df.select_dtypes(include=[np.number])

# Quantile used as the upper cap for heavy-tailed features.
OUTLIER_QUANTILE = 0.95

# Cap outliers: a feature is treated as heavy-tailed when its maximum exceeds
# both 10x its median and the absolute value 10; every value at or above the
# cap quantile is then replaced by that quantile.
for feature in df_numeric.columns:
    column = df_numeric[feature]
    col_max = column.max()
    col_median = column.median()
    cap = column.quantile(OUTLIER_QUANTILE)
    exceeds_median = col_max > 10 * col_median

    if DEBUG == 1:
        print(feature)
        print('max = '+str(col_max))
        # The printed value is the 0.95 quantile, so it is labelled as such.
        print('95th = '+str(cap))
        print('median = '+str(col_median))
        print(exceeds_median)
        print('----------------------------------------------------')

    if exceeds_median and col_max > 10:
        df[feature] = np.where(df[feature] < cap, df[feature], cap)

df

Unnamed: 0,dur,proto,service,state,spkts,dpkts,sbytes,dbytes,rate,sttl,...,ct_src_dport_ltm,ct_dst_sport_ltm,ct_dst_src_ltm,is_ftp_login,ct_ftp_cmd,ct_flw_http_mthd,ct_src_ltm,ct_srv_dst,is_sm_ips_ports,label
0,0.121478,tcp,-,FIN,6.0,4.0,258.0,172.0,74.087490,252,...,1.0,1.0,1.0,0,0,0.0,1.0,1.0,0,0
1,0.649902,tcp,-,FIN,14.0,38.0,734.0,33044.0,78.473372,62,...,1.0,1.0,2.0,0,0,0.0,1.0,6.0,0,0
2,1.623129,tcp,-,FIN,8.0,16.0,364.0,13186.0,14.170161,62,...,1.0,1.0,3.0,0,0,0.0,2.0,6.0,0,0
3,1.681642,tcp,ftp,FIN,12.0,12.0,628.0,770.0,13.677108,62,...,1.0,1.0,3.0,1,1,0.0,2.0,1.0,0,0
4,0.449454,tcp,-,FIN,10.0,6.0,534.0,268.0,33.373826,254,...,2.0,1.0,34.0,0,0,0.0,2.0,34.0,0,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
82327,0.000005,udp,-,INT,2.0,0.0,104.0,0.0,200000.005100,254,...,1.0,1.0,2.0,0,0,0.0,2.0,1.0,0,0
82328,1.106101,tcp,-,FIN,20.0,8.0,13454.0,354.0,24.410067,254,...,1.0,1.0,1.0,0,0,0.0,3.0,2.0,0,0
82329,0.000000,arp,-,INT,1.0,0.0,46.0,0.0,0.000000,0,...,1.0,1.0,1.0,0,0,0.0,1.0,1.0,1,0
82330,0.000000,arp,-,INT,1.0,0.0,46.0,0.0,0.000000,0,...,1.0,1.0,1.0,0,0,0.0,1.0,1.0,1,0


## Other unused pruning

In [7]:

# # Apply log to features > 50 unique values.
# df_numeric = df.select_dtypes(include=[np.number])
# for feature in df_numeric.columns:
#     if DEBUG == 1:
#         print(feature)
#         print('nunique = '+str(df_numeric[feature].nunique()))
#         print(df_numeric[feature].nunique()>50)
#         print('----------------------------------------------------')
#     if df_numeric[feature].nunique()>50:
#         if df_numeric[feature].min()==0:
#             df[feature] = np.log(df[feature]+1)
#         else:
#             df[feature] = np.log(df[feature])

# # Reduce labels of categorical features
# df_cat = df.select_dtypes(exclude=[np.number])
# for feature in df_cat.columns:
#     if DEBUG == 1:
#         print(feature)
#         print('nunique = '+str(df_cat[feature].nunique()))
#         print(df_cat[feature].nunique()>6)
#         print(df[feature].value_counts().head().index)
#         print(sum(df[feature].isin(df[feature].value_counts().head().index)))
#         print('----------------------------------------------------')
    
#     if df_cat[feature].nunique()>6:
#         df[feature] = np.where(df[feature].isin(df[feature].value_counts().head().index), df[feature], '-')


## Encoding and Normalisations

In [8]:

# One hot encoding
# Each listed categorical column is replaced by its indicator columns, which
# are appended after the remaining columns in the order given in `cols` and
# named "<column>_<value>".
cols = ['proto', 'service', 'state']
df = pd.get_dummies(df, columns=cols, prefix=cols, drop_first=False)


In [9]:

# Normalise
#Function to min-max normalize
def normalize(df, cols):
    """
    Min-max scale the listed columns into the [0, 1] range.

    @param df pandas DataFrame (left unmodified)
    @param cols iterable of column names to scale
    @return a copy of df in which every listed column whose max is greater
            than its min is replaced by (value - min) / (max - min) as float;
            all other columns are copied through unchanged
    """
    scaled = df.copy()  # the caller's frame must stay untouched
    for column in cols:
        as_float = df[column].astype('float')
        low = as_float.min()
        high = as_float.max()
        # Constant columns are skipped, which also avoids dividing by zero.
        if high > low:
            scaled[column] = (as_float - low) / (high - low)
    return scaled

# Every column is passed in, so the 'label' column goes through the same
# min-max scaling as the feature columns.
new_train_df = normalize(df, df.columns)
new_train_df

Unnamed: 0,dur,spkts,dpkts,sbytes,dbytes,rate,sttl,dttl,sload,dload,...,state_CLO,state_CON,state_ECO,state_FIN,state_INT,state_PAR,state_REQ,state_RST,state_URN,state_no
0,0.043209,0.081967,0.064516,0.017424,0.005205,0.000222,0.988235,1.000000,0.000053,0.002142,...,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0
1,0.231166,0.213115,0.612903,0.052867,1.000000,0.000235,0.243137,0.992126,0.000031,0.126990,...,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0
2,0.577335,0.114754,0.258065,0.025316,0.399044,0.000043,0.243137,0.992126,0.000006,0.015365,...,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0
3,0.598148,0.180328,0.193548,0.044974,0.023302,0.000041,0.243137,0.992126,0.000010,0.000847,...,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0
4,0.159868,0.147541,0.096774,0.037975,0.008110,0.000100,0.996078,0.992126,0.000032,0.001005,...,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
82327,0.000002,0.016393,0.000000,0.005957,0.000000,0.600000,0.996078,0.000000,0.312000,0.000000,...,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0
82328,0.393432,0.311475,0.129032,1.000000,0.010713,0.000073,0.996078,0.992126,0.000465,0.000565,...,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0
82329,0.000000,0.000000,0.000000,0.001638,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,...,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0
82330,0.000000,0.000000,0.000000,0.001638,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,...,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0


## Partitioning the data across clients

In [10]:
def partition(num_clients: int, cid: int, df: pd.DataFrame, random_state=None):
    """Split one client's contiguous slice of ``df`` into train and test sets.

    ``df`` is cut into ``num_clients`` consecutive slices of
    ``len(df) // num_clients`` rows; the last client additionally receives the
    remainder rows. The slice is then split 70/30 with ``train_test_split``.

    @param num_clients total number of clients (must be >= 1)
    @param cid 1-based client id, ``1 <= cid <= num_clients``
    @param df frame holding the features plus a "label" column
    @param random_state forwarded to ``train_test_split``; the default ``None``
           keeps the split random on every call, pass an int for a
           reproducible split
    @return ``[X_train, X_test, y_train, y_test]`` as returned by
            ``train_test_split``
    @raise ValueError if ``num_clients`` < 1 or ``cid`` is out of range
    """
    if num_clients < 1:
        raise ValueError(f"num_clients must be >= 1, got {num_clients}")
    # Without this check a negative cid would silently slice from the tail of
    # the frame and hand a client somebody else's rows.
    if not 1 <= cid <= num_clients:
        raise ValueError(f"cid must be in [1, {num_clients}], got {cid}")

    rows_per_client = len(df) // num_clients
    start = (cid - 1) * rows_per_client
    if cid == num_clients:
        # The last client absorbs the rows left over by the integer division.
        part = df.iloc[start:, :]
    else:
        part = df.iloc[start:start + rows_per_client, :]

    y = part["label"]
    X = part.drop(["label"], axis=1)
    return train_test_split(X, y, test_size=0.3, random_state=random_state)

## K-Fold validation

In [11]:
from sklearn.model_selection import cross_validate
from imblearn.over_sampling import RandomOverSampler
oversample = RandomOverSampler(sampling_strategy='minority')

# kfold = StratifiedKFold(n_splits=2,shuffle=True,random_state=42)
# kfold.get_n_splits(X_train, y_train)

## Model

In [12]:
import keras.backend as K

def sensitivity(y_true, y_pred):
    """Keras metric: rounded true positives over rounded actual positives.

    ``K.epsilon()`` is added to the denominator so a batch without positive
    labels yields 0 instead of a division by zero.
    """
    hits = K.round(K.clip(y_true * y_pred, 0, 1))
    actual_positives = K.round(K.clip(y_true, 0, 1))
    return K.sum(hits) / (K.sum(actual_positives) + K.epsilon())

def specificity(y_true, y_pred):
    """Keras metric: rounded true negatives over rounded actual negatives.

    A sample counts as a true negative when both ``1 - y_true`` and
    ``1 - y_pred`` round to 1. ``K.epsilon()`` in the denominator keeps a
    batch without negative labels from dividing by zero.
    """
    # The product must be parenthesised: written without parentheses the
    # expression parses as `1 - y_true - y_pred`, which only coincides with
    # the product (after clipping) when y_true is exactly 0 or 1.
    true_negatives = K.sum(K.round(K.clip((1 - y_true) * (1 - y_pred), 0, 1)))
    possible_negatives = K.sum(K.round(K.clip(1 - y_true, 0, 1)))
    return true_negatives / (possible_negatives + K.epsilon())

In [13]:
batch_size = 32

def get_big_model():
    """Build and compile the larger Conv1D + two-stage bidirectional-LSTM model.

    The network expects inputs of shape (196, 1), ends in a single sigmoid
    unit, and is compiled with binary cross-entropy, Adam, and the accuracy /
    sensitivity / specificity metrics.
    """
    stack = [
        # Stage 1: convolution over the feature axis, pooled and normalised.
        Conv1D(64, kernel_size=64, padding="same",activation="relu",input_shape=(196, 1)),
        MaxPooling1D(pool_size=(10)),
        BatchNormalization(),
        Bidirectional(LSTM(64, return_sequences=False)),
        # Stage 2: the 128-wide BiLSTM output is turned back into a sequence
        # so it can be pooled and fed through a second BiLSTM.
        Reshape((128, 1), input_shape = (128, )),
        MaxPooling1D(pool_size=(5)),
        BatchNormalization(),
        Bidirectional(LSTM(128, return_sequences=False)),
        # Classification head.
        Dropout(0.6),
        Dense(1),
        Activation('sigmoid'),
    ]
    model = Sequential(stack)
    model.compile(
        loss='binary_crossentropy',
        optimizer='adam',
        metrics=['accuracy', sensitivity, specificity],
    )
    return model

def get_med_cnn():
    """Build and compile the medium Conv1D + bidirectional-LSTM binary classifier.

    The network expects inputs of shape (196, 1), ends in a single sigmoid
    unit, and is compiled with binary cross-entropy, Adam, and the accuracy /
    sensitivity / specificity metrics.
    """
    stack = [
        Conv1D(64, kernel_size=64, padding='same', activation='relu', input_shape=(196, 1)),
        MaxPooling1D(pool_size=(10)),
        BatchNormalization(),
        Bidirectional(LSTM(64, return_sequences=False)),
        Dropout(0.6),
        Dense(1),
        Activation('sigmoid'),
    ]
    model = Sequential(stack)
    model.compile(
        loss='binary_crossentropy',
        optimizer='adam',
        metrics=['accuracy', sensitivity, specificity],
    )
    return model

In [14]:
# from sklearn import metrics
# from sklearn.svm import LinearSVC


# oos_pred = []
# model = LinearSVC(gamma=0.6)

# cross_validate(model, X, y, cv=10)
# for train_index, test_index in kfold.split(X_train, y_train):
#     train_X, test_X = X_train.iloc[train_index], X_train.iloc[test_index]
#     train_y, test_y = y_train.iloc[train_index], y_train.iloc[test_index]
    
#     print("train index:", train_index)
#     print("test index:", test_index)
#     print(train_y.value_counts())
    
#     train_X_over,train_y_over= oversample.fit_resample(train_X, train_y)
#     print(train_y_over.value_counts())
    
#     x_columns_train = new_train_df.columns.drop('label')
#     x_train_array = train_X_over[x_columns_train].values
#     x_train_1=np.reshape(x_train_array, (x_train_array.shape[0], x_train_array.shape[1], 1))
    
#     y_train_1 = train_y_over.values # Classification
    
#     x_columns_test = new_train_df.columns.drop('label')
#     x_test_array = test_X[x_columns_test].values
#     x_test_2=np.reshape(x_test_array, (x_test_array.shape[0], x_test_array.shape[1], 1))
    
#     y_test_2 = test_y.values # Classification
    

#     model.fit(x_train_1, y_train_1,validation_data=(x_test_2,y_test_2), epochs=15)
    
#     pred = model.predict(x_test_2)
#     pred = np.argmax(pred,axis=1)
#     y_eval = y_test_2.astype('int')
#     score = metrics.accuracy_score(y_eval, pred)
#     oos_pred.append(score)
#     print("Validation score: {}".format(score))

In [15]:
# oos_pred

In [16]:
# from sklearn.metrics import confusion_matrix
# import numpy as np
# from scipy import interp
# import matplotlib.pyplot as plt
# from itertools import cycle
# from sklearn.metrics import roc_curve, auc

# pred1 = model.predict(X_test)
# # Plot linewidth.
# lw = 2

## Client

In [27]:
from typing import Dict
import flwr as fl
from flwr.common import Config, Scalar
import tensorflow as tf

import ipfshttpclient2 as ipfshttpclient


client_received_param = None
client_trained_param = None

class BFLClient(fl.client.NumPyClient):
    """Flower NumPy client that trains and evaluates a Keras model on one data
    partition and exposes a lazily created IPFS client through its properties."""

    def __init__(self, cid: int, model: tf.keras.Model, x_train, y_train, x_test, y_test) -> None:
        """Store the model and the client's train/test data.

        NOTE(review): ``cid`` is annotated as int, but client_fn in this
        notebook passes the string client id -- confirm which is intended.
        """
        self.model: Sequential = model
        self.cid = cid
        self.x_train = x_train
        self.y_train = y_train
        self.x_test = x_test
        self.y_test = y_test
        # Created on first use by get_ipfs_client().
        self._ipfs_client: ipfshttpclient.client.Client = None

    def get_ipfs_client(self):
        """Return the IPFS client, connecting on the first call only."""
        # Explicit None check: the decision must not depend on how the client
        # object happens to evaluate in a boolean context.
        if self._ipfs_client is None:
            self._ipfs_client = ipfshttpclient.connect()
        return self._ipfs_client

    def get_properties(self, config: Config) -> Dict[str, Scalar]:
        """Return the IPFS client, the client id and the derived peer domain name.

        NOTE(review): "ipfs_client" holds a client object rather than a plain
        scalar value; BFLServer.fit in this notebook relies on receiving the
        object itself -- confirm this is only used where properties are not
        serialised.
        """
        return {"ipfs_client": self.get_ipfs_client(), "cid": self.cid, "domain_name": f"peer0.org{self.cid}.example.com"}

    def get_parameters(self, config):
        """Return the current model weights."""
        return self.model.get_weights()

    def set_parameters_from_file(self, file):
        """Load model weights from ``file`` and return ``load_weights``' result."""
        return self.model.load_weights(file)

    def fit(self, parameters, config):
        """Load the given weights, train for 5 epochs and return the new weights.

        Returns ``(weights, number of training rows, {})``.
        """
        self.model.set_weights(parameters)
        # Training is requested on the first GPU device.
        with tf.device('/device:gpu:0'):
            self.model.fit(x=self.x_train, y=self.y_train, epochs=5, batch_size=32)
        return self.model.get_weights(), len(self.x_train), {}

    def evaluate(self, parameters, config):
        """Evaluate on the local test split.

        Weights are replaced only when ``parameters`` is non-empty. Returns
        ``(loss, number of test rows, metrics)`` where every metric value is a
        plain Python float.
        """
        if parameters is not None and len(parameters) > 0:
            self.model.set_weights(parameters)
        # Local names deliberately differ from the module-level metric
        # functions `sensitivity` / `specificity` to avoid shadowing them.
        loss, accuracy, sens, spec = self.model.evaluate(self.x_test, self.y_test)
        metrics = {
            "accuracy": float(accuracy),
            "sensitivity": float(sens),
            "specificity": float(spec),
        }
        return float(loss), len(self.x_test), metrics

## FedAvg Strategy

In [18]:
from typing import Dict, List, Tuple
import flwr as fl
from flwr.common import FitRes, Parameters, Scalar
from flwr.server.client_proxy import ClientProxy
from flwr.server.strategy import FedAvg
import numpy as np

param_storer = None

class BFedAvg(FedAvg):
    """FedAvg strategy that also carries a save path and a federated-session id."""

    def __init__(self, *args, save_path, **kwargs):
        """Forward all FedAvg arguments and remember ``save_path``.

        ``save_path`` is keyword-only and required.
        """
        super().__init__(*args, **kwargs)
        self.save_path = save_path
        # Defined up front so get_fed_session() returns None, instead of
        # raising AttributeError, when no session has been set yet.
        self.fed_session = None

    def set_fed_session(self, fed_session: int):
        """Record the id of the federated session this strategy is used in."""
        self.fed_session = fed_session

    def get_fed_session(self):
        """Return the recorded session id, or None if none has been set."""
        return self.fed_session

## BFLServer

### History

In [19]:
"""Training history retrieved from the blockchain ledger."""

from functools import reduce
from typing import Dict, List, Tuple

from flwr.common.typing import Scalar
from flwr.server import History

class BFedHistory(History):
    """History class for training and/or evaluation metrics collection.

    Extends Flower's ``History`` with the owning client's name, the algorithm
    label and the federated-session id in effect when the history was created.
    """

    def __init__(self, client_name, algorithm) -> None:
        """Create an empty history.

        ``client_name`` and ``algorithm`` are stored as given. The session id
        is read from the notebook-level ``fed_session`` variable at
        construction time.
        """
        # Let the base class set up whatever state it owns first.
        super().__init__()
        self.client_name = client_name
        # Initialised explicitly as well, so every field this class reads in
        # __repr__ exists independently of what the base initialiser defines.
        self.losses_distributed: List[Tuple[int, float]] = []
        self.losses_centralized: List[Tuple[int, float]] = []
        self.metrics_distributed_fit: Dict[str, List[Tuple[int, Scalar]]] = {}
        self.metrics_distributed: Dict[str, List[Tuple[int, Scalar]]] = {}
        self.metrics_centralized: Dict[str, List[Tuple[int, Scalar]]] = {}
        self.current_fed_session: int = fed_session
        self.algorithm: str = algorithm

    def add_loss_distributed(self, server_round: int, loss: float) -> None:
        """Add one loss entry (from distributed evaluation)."""
        self.losses_distributed.append((server_round, loss))

    def add_metrics_distributed(
        self, server_round: int, metrics: Dict[str, Scalar]
    ) -> None:
        """Add metrics entries (from distributed evaluation).

        Every key/value pair is recorded as-is; values are not filtered by type.
        """
        for key, value in metrics.items():
            self.metrics_distributed.setdefault(key, []).append((server_round, value))

    def __repr__(self) -> str:
        """Return a text summary containing only the non-empty sections."""
        rep = ""
        if self.losses_distributed:
            rep += "History (loss, distributed):\n" + "".join(
                f"\tround {server_round}: {loss}\n"
                for server_round, loss in self.losses_distributed
            )
        if self.losses_centralized:
            rep += "History (loss, centralized):\n" + "".join(
                f"\tround {server_round}: {loss}\n"
                for server_round, loss in self.losses_centralized
            )
        if self.metrics_distributed_fit:
            rep += "History (metrics, distributed, fit):\n" + str(
                self.metrics_distributed_fit
            )
        if self.metrics_distributed:
            rep += "History (metrics, distributed, evaluate):\n" + str(
                self.metrics_distributed
            )
        if self.metrics_centralized:
            rep += "History (metrics, centralized):\n" + str(self.metrics_centralized)
        return rep

### Server

In [33]:
import timeit
import flwr as fl
from flwr.server import Server
from flwr.client import Client
from flwr.common.typing import GetPropertiesIns

from flwr.common.logger import log
from logging import INFO

ipfs_cid = None

class BFLServer(Server):
    """Flower server whose training loop also checkpoints the global model to IPFS.

    After every round the current global weights are loaded into a
    ``get_med_cnn()`` model, written under ``MODEL_CKPT_DIR`` and added to
    IPFS through the IPFS client exposed by the associated client's properties.
    """

    def __init__(self, associated_client_id: str, algorithm_name: str, **kwargs):
        """Create the server.

        ``associated_client_id`` is the client-manager key of the client whose
        properties supply the IPFS client and the domain name;
        ``algorithm_name`` is stored in the returned history; ``kwargs`` are
        forwarded unchanged to ``Server``.
        """
        Server.__init__(self, **kwargs)
        self.associated_client_id: str = associated_client_id
        self.algorithm = algorithm_name

    def fit(self, num_rounds: int, timeout: float | None) -> BFedHistory:
        """Run federated averaging for a number of rounds.

        Each round runs fit, then distributed evaluation, then saves the
        global weights and adds the file to IPFS. When all rounds are done the
        notebook-level ``fed_session`` counter is advanced by one.

        Returns the BFedHistory holding the distributed losses and metrics.
        """
        # Both names are notebook-level state that this method updates.
        global fed_session, ipfs_cid
        import os  # local import: the notebook only imports os.path (as `path`)

        associated_client = self.client_manager().all()[self.associated_client_id]

        properties_res = associated_client.get_properties(
            ins=GetPropertiesIns({}), timeout=timeout
        )
        props = properties_res.properties

        history = BFedHistory(props["domain_name"], self.algorithm)
        session_id = history.current_fed_session
        # Tell the strategy which session it is part of. This has to be a real
        # method call: an annotated-assignment statement
        # (`self.strategy: <expr>`) is never evaluated inside a function.
        # Strategies without the hook are left alone.
        set_session = getattr(self.strategy, "set_fed_session", None)
        if callable(set_session):
            set_session(session_id)

        # Initialize parameters
        log(INFO, "Initializing global parameters")
        self.parameters = self._get_initial_parameters(timeout=timeout)

        # One model instance is enough for checkpointing: set_weights()
        # overwrites all of its weights every round.
        checkpoint_model = get_med_cnn()
        os.makedirs(MODEL_CKPT_DIR, exist_ok=True)

        log(INFO, "FL starting")
        start_time = timeit.default_timer()

        for current_round in range(1, num_rounds + 1):
            # Train model and replace previous global model
            res_fit = self.fit_round(
                server_round=current_round,
                timeout=timeout,
            )
            if res_fit is not None:
                parameters_prime, _, _ = res_fit  # fit_metrics_aggregated
                if parameters_prime:
                    self.parameters = parameters_prime

            # Evaluate model on a sample of available clients
            res_fed = self.evaluate_round(server_round=current_round, timeout=timeout)
            if res_fed is not None:
                loss_fed, evaluate_metrics_fed, _ = res_fed
                if loss_fed is not None:
                    history.add_loss_distributed(
                        server_round=current_round, loss=loss_fed
                    )
                    history.add_metrics_distributed(
                        server_round=current_round, metrics=evaluate_metrics_fed
                    )

            # Persist this round's global model locally, then publish it to IPFS.
            checkpoint_model.set_weights(fl.common.parameters_to_ndarrays(self.parameters))
            file_name = f"notebook_gmodel_rc{session_id}_r{current_round}.keras"
            file_path = path.join(MODEL_CKPT_DIR, file_name)
            checkpoint_model.save_weights(file_path)
            ipfs_cid = props["ipfs_client"].add(file_path)

            log(INFO, f"Client {associated_client.cid}: Global model saved in IPFS with hash {ipfs_cid['Hash']}")

        # Advance the notebook-level counter so the next run gets a new session id.
        fed_session = session_id + 1

        # Bookkeeping
        end_time = timeit.default_timer()
        elapsed = end_time - start_time
        log(INFO, "FL finished in %s", elapsed)
        return history

## Load Data

In [21]:
# Client ids are the strings "1" .. str(NUM_CLIENTS).
client_ids = list(map(str, range(1, NUM_CLIENTS + 1)))

# Pre-compute each client's train/test split once, keyed by its string id;
# partition() itself takes the 1-based integer id.
client_data = {
    client_id: partition(NUM_CLIENTS, int(client_id), new_train_df)
    for client_id in client_ids
}

In [22]:
from flwr.common.typing import Metrics
from typing import List, Tuple

def client_fn(cid: str):
    """Create the BFLClient for client ``cid``.

    A new ``get_med_cnn()`` model is built on every call and paired with the
    train/test split stored for ``cid`` in ``client_data``.
    """
    fresh_model = get_med_cnn()
    print(f"Loading data for client {cid}")
    features_fit, features_eval, labels_fit, labels_eval = client_data[cid]
    # Start client
    print(f"Client {cid} connecting to server {S_ADDR}")
    return BFLClient(
        cid,
        fresh_model,
        x_train=features_fit,
        y_train=labels_fit,
        x_test=features_eval,
        y_test=labels_eval,
    )

def eval_metrics_aggregation_fn(results: List[Tuple[int, "Metrics"]]):
    """Aggregate client accuracies into one example-weighted average.

    ``results`` holds one ``(num_examples, metrics)`` pair per client, and each
    ``metrics`` dict must contain an "accuracy" entry. Returns
    ``{"accuracy": weighted_mean}``, or an empty dict when the clients report
    no examples at all (there is nothing to average in that case).
    """
    total_examples = sum(num for num, _ in results)
    if total_examples == 0:
        # Avoids a ZeroDivisionError for an empty result list or all-zero counts.
        return {}

    # Weigh accuracy of each client by number of examples used
    weighted_accuracy = sum(metric["accuracy"] * num for num, metric in results)
    return {"accuracy": weighted_accuracy / total_examples}

# Strategy shared by the custom server and the simulation entry point.
# NOTE(review): save_path is stored on the strategy but is not read anywhere
# in the visible cells.
strategy = BFedAvg(
    save_path="./test_save",
    evaluate_metrics_aggregation_fn=eval_metrics_aggregation_fn
)

## Simulation

In [23]:
from flwr.server.client_manager import SimpleClientManager

# Run one federated round with NUM_CLIENTS simulated clients. The same
# strategy object is handed both to the custom BFLServer and to
# start_simulation; client '1' is the server's associated client.
# NOTE(review): the captured output of this cell shows Ray killing one fit
# task because the node ran low on memory ("fit_round 1 received 2 results
# and 1 failures"). client_resources is None here; the Ray message suggests
# reducing task parallelism by requesting more CPUs per task -- consider
# setting client_resources accordingly.
fl.simulation.start_simulation(
    client_fn = client_fn,
    clients_ids = [str(i) for i in range(1, NUM_CLIENTS+1)],
    server = BFLServer('1', "BiLSTM", client_manager=SimpleClientManager(), strategy=strategy),
    strategy = strategy,
    num_clients = NUM_CLIENTS,
    config = fl.server.ServerConfig(num_rounds=1),
    client_resources=None,
)

INFO flwr 2023-08-06 16:30:27,585 | app.py:146 | Starting Flower simulation, config: ServerConfig(num_rounds=1, round_timeout=None)
2023-08-06 16:30:29,554	INFO worker.py:1636 -- Started a local Ray instance.
INFO flwr 2023-08-06 16:30:30,958 | app.py:180 | Flower VCE: Ray initialized with resources: {'CPU': 12.0, 'memory': 2507784192.0, 'object_store_memory': 1253892096.0, 'node:172.19.27.35': 1.0}
[2m[36m(launch_and_get_properties pid=4213)[0m 2023-08-06 16:30:34.193518: E tensorflow/compiler/xla/stream_executor/cuda/cuda_driver.cc:266] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
INFO flwr 2023-08-06 16:30:34,775 | 727629419.py:29 | Initializing global parameters
INFO flwr 2023-08-06 16:30:34,778 | server.py:273 | Requesting initial parameters from one random client


[2m[36m(launch_and_get_properties pid=4213)[0m Loading data for client 1
[2m[36m(launch_and_get_properties pid=4213)[0m Client 1 connecting to server 127.0.0.1:8080


INFO flwr 2023-08-06 16:30:35,374 | server.py:277 | Received initial parameters from one random client
INFO flwr 2023-08-06 16:30:35,375 | 727629419.py:32 | FL starting
DEBUG flwr 2023-08-06 16:30:35,376 | server.py:218 | fit_round 1: strategy sampled 3 clients (out of 3)


[2m[36m(launch_and_get_parameters pid=4213)[0m Loading data for client 1
[2m[36m(launch_and_get_parameters pid=4213)[0m Client 1 connecting to server 127.0.0.1:8080
[2m[36m(launch_and_fit pid=4213)[0m Loading data for client 1
[2m[36m(launch_and_fit pid=4213)[0m Client 1 connecting to server 127.0.0.1:8080


ERROR flwr 2023-08-06 16:30:40,320 | ray_client_proxy.py:87 | Task was killed due to the node running low on memory.
Memory on the node (IP: 172.19.27.35, ID: 015ce9b31b4df4822c77af44052e29105309be326ff2c29ac4e87a85) where the task (task ID: ad3c9a1dd6ce0f0984f410fa62a0086e3c4ac5ab01000000, name=launch_and_fit, pid=4209, memory used=0.20GB) was running was 7.24GB / 7.58GB (0.955179), which exceeds the memory usage threshold of 0.95. Ray killed this worker (ID: fea398306d6605b4a1619c78bfe6bde052cbe2826327294c82f6e2cd) because it was the most recently scheduled task; to see more information about memory usage on this node, use `ray logs raylet.out -ip 172.19.27.35`. To see the logs of the worker, use `ray logs worker-fea398306d6605b4a1619c78bfe6bde052cbe2826327294c82f6e2cd*out -ip 172.19.27.35. Top 10 memory users:
PID	MEM(GB)	COMMAND
343	1.96	/home/dylonwong/miniconda3/envs/fedlearn/bin/python -m ipykernel_launcher --ip=127.0.0.1 --stdin=900...
257	1.14	/home/dylonwong/.vscode-server/bi

[2m[36m(launch_and_fit pid=4213)[0m Epoch 1/5


[2m[36m(launch_and_fit pid=4213)[0m Please report this to the TensorFlow team. When filing the bug, set the verbosity to 10 (on Linux, `export AUTOGRAPH_VERBOSITY=10`) and attach the full output.
[2m[36m(launch_and_fit pid=4213)[0m Cause: Unknown node type <gast.gast.Import object at 0x7fa55819cb20>
[2m[36m(launch_and_fit pid=4213)[0m Please report this to the TensorFlow team. When filing the bug, set the verbosity to 10 (on Linux, `export AUTOGRAPH_VERBOSITY=10`) and attach the full output.
[2m[36m(launch_and_fit pid=4213)[0m Cause: Unknown node type <gast.gast.Import object at 0x7fa55819d750>


   5/1879 [..............................] - ETA: 26s - loss: 0.6563 - accuracy: 0.5688 - sensitivity: 0.2942 - specificity: 0.7882    
  13/1879 [..............................] - ETA: 25s - loss: 0.5859 - accuracy: 0.6755 - sensitivity: 0.3859 - specificity: 0.8920
  22/1879 [..............................] - ETA: 24s - loss: 0.5239 - accuracy: 0.7301 - sensitivity: 0.5203 - specificity: 0.8850
  30/1879 [..............................] - ETA: 24s - loss: 0.4857 - accuracy: 0.7531 - sensitivity: 0.6122 - specificity: 0.8589
  34/1879 [..............................] - ETA: 24s - loss: 0.4724 - accuracy: 0.7583 - sensitivity: 0.6262 - specificity: 0.8593
  38/1879 [..............................] - ETA: 24s - loss: 0.4552 - accuracy: 0.7706 - sensitivity: 0.6610 - specificity: 0.8558
  42/1879 [..............................] - ETA: 24s - loss: 0.4401 - accuracy: 0.7798 - sensitivity: 0.6870 - specificity: 0.8538




  50/1879 [..............................] - ETA: 25s - loss: 0.4127 - accuracy: 0.7925 - sensitivity: 0.6977 - specificity: 0.8662
  59/1879 [..............................] - ETA: 24s - loss: 0.3909 - accuracy: 0.8067 - sensitivity: 0.7298 - specificity: 0.8673
  67/1879 [>.............................] - ETA: 24s - loss: 0.3781 - accuracy: 0.8139 - sensitivity: 0.7470 - specificity: 0.8680
  75/1879 [>.............................] - ETA: 24s - loss: 0.3635 - accuracy: 0.8233 - sensitivity: 0.7672 - specificity: 0.8687
  79/1879 [>.............................] - ETA: 24s - loss: 0.3594 - accuracy: 0.8256 - sensitivity: 0.7739 - specificity: 0.8672
  87/1879 [>.............................] - ETA: 24s - loss: 0.3521 - accuracy: 0.8279 - sensitivity: 0.7775 - specificity: 0.8688
  96/1879 [>.............................] - ETA: 23s - loss: 0.3429 - accuracy: 0.8324 - sensitivity: 0.7918 - specificity: 0.8662
 106/1879 [>.............................] - ETA: 23s - loss: 0.3330 - accur

[2m[36m(launch_and_fit pid=4210)[0m 2023-08-06 16:30:58.467437: E tensorflow/compiler/xla/stream_executor/cuda/cuda_driver.cc:266] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected


 247/1879 [==>...........................] - ETA: 22s - loss: 0.2709 - accuracy: 0.8669 - sensitivity: 0.8538 - specificity: 0.8792
 257/1879 [===>..........................] - ETA: 21s - loss: 0.2697 - accuracy: 0.8671 - sensitivity: 0.8543 - specificity: 0.8793
 262/1879 [===>..........................] - ETA: 21s - loss: 0.2669 - accuracy: 0.8690 - sensitivity: 0.8562 - specificity: 0.8811
 272/1879 [===>..........................] - ETA: 21s - loss: 0.2637 - accuracy: 0.8709 - sensitivity: 0.8610 - specificity: 0.8808
 282/1879 [===>..........................] - ETA: 21s - loss: 0.2619 - accuracy: 0.8713 - sensitivity: 0.8627 - specificity: 0.8807
[2m[36m(launch_and_fit pid=4210)[0m Loading data for client 2
[2m[36m(launch_and_fit pid=4210)[0m Client 2 connecting to server 127.0.0.1:8080
 286/1879 [===>..........................] - ETA: 21s - loss: 0.2609 - accuracy: 0.8718 - sensitivity: 0.8641 - specificity: 0.8804
 294/1879 [===>..........................] - ETA: 21s - los

[2m[36m(launch_and_fit pid=4210)[0m Please report this to the TensorFlow team. When filing the bug, set the verbosity to 10 (on Linux, `export AUTOGRAPH_VERBOSITY=10`) and attach the full output.
[2m[36m(launch_and_fit pid=4210)[0m Cause: Unknown node type <gast.gast.Import object at 0x7f91b82b8310>
[2m[36m(launch_and_fit pid=4210)[0m Please report this to the TensorFlow team. When filing the bug, set the verbosity to 10 (on Linux, `export AUTOGRAPH_VERBOSITY=10`) and attach the full output.
[2m[36m(launch_and_fit pid=4210)[0m Cause: Unknown node type <gast.gast.Import object at 0x7f91b82b8190>


   1/1879 [..............................] - ETA: 2:09:25 - loss: 0.9144 - accuracy: 0.1875 - sensitivity: 0.1379 - specificity: 0.6667
   9/1879 [..............................] - ETA: 27s - loss: 0.3693 - accuracy: 0.8333 - sensitivity: 0.8624 - specificity: 0.0741   
  17/1879 [..............................] - ETA: 26s - loss: 0.3124 - accuracy: 0.8695 - sensitivity: 0.9272 - specificity: 0.0392
  24/1879 [..............................] - ETA: 27s - loss: 0.2754 - accuracy: 0.8906 - sensitivity: 0.9484 - specificity: 0.0660
  32/1879 [..............................] - ETA: 26s - loss: 0.2313 - accuracy: 0.9131 - sensitivity: 0.9613 - specificity: 0.1745
  40/1879 [..............................] - ETA: 26s - loss: 0.2087 - accuracy: 0.9250 - sensitivity: 0.9690 - specificity: 0.2229
  44/1879 [..............................] - ETA: 26s - loss: 0.2038 - accuracy: 0.9283 - sensitivity: 0.9719 - specificity: 0.2367
  52/1879 [..............................] - ETA: 26s - loss: 0.1854 

[2m[33m(raylet)[0m [2023-08-06 16:31:29,522 E 4149 4149] (raylet) node_manager.cc:3069: 1 Workers (tasks / actors) killed due to memory pressure (OOM), 0 Workers crashed due to other reasons at node (ID: 015ce9b31b4df4822c77af44052e29105309be326ff2c29ac4e87a85, IP: 172.19.27.35) over the last time period. To see more information about the Workers killed on this node, use `ray logs raylet.out -ip 172.19.27.35`
[2m[33m(raylet)[0m 
[2m[33m(raylet)[0m Refer to the documentation on how to address the out of memory issue: https://docs.ray.io/en/latest/ray-core/scheduling/ray-oom-prevention.html. Consider provisioning more memory on this node or reducing task parallelism by requesting more CPUs per task. To adjust the kill threshold, set the environment variable `RAY_memory_usage_threshold` when starting Ray. To disable worker killing, set the environment variable `RAY_memory_monitor_refresh_ms` to zero.


 390/1879 [=====>........................] - ETA: 24s - loss: 0.1694 - accuracy: 0.9109 - sensitivity: 0.9125 - specificity: 0.9113
 396/1879 [=====>........................] - ETA: 24s - loss: 0.1692 - accuracy: 0.9112 - sensitivity: 0.9130 - specificity: 0.9116
 402/1879 [=====>........................] - ETA: 24s - loss: 0.1688 - accuracy: 0.9115 - sensitivity: 0.9135 - specificity: 0.9116
 408/1879 [=====>........................] - ETA: 24s - loss: 0.1690 - accuracy: 0.9111 - sensitivity: 0.9139 - specificity: 0.9106
 415/1879 [=====>........................] - ETA: 24s - loss: 0.1687 - accuracy: 0.9112 - sensitivity: 0.9128 - specificity: 0.9115
 422/1879 [=====>........................] - ETA: 24s - loss: 0.1690 - accuracy: 0.9110 - sensitivity: 0.9120 - specificity: 0.9118
 426/1879 [=====>........................] - ETA: 24s - loss: 0.1689 - accuracy: 0.9110 - sensitivity: 0.9112 - specificity: 0.9122
 434/1879 [=====>........................] - ETA: 23s - loss: 0.1691 - accur

DEBUG flwr 2023-08-06 16:33:31,088 | server.py:232 | fit_round 1 received 2 results and 1 failures
DEBUG flwr 2023-08-06 16:33:31,097 | server.py:168 | evaluate_round 1: strategy sampled 3 clients (out of 3)


[2m[36m(launch_and_evaluate pid=4213)[0m Loading data for client 3
[2m[36m(launch_and_evaluate pid=4213)[0m Client 3 connecting to server 127.0.0.1:8080


[2m[36m(launch_and_evaluate pid=4213)[0m Please report this to the TensorFlow team. When filing the bug, set the verbosity to 10 (on Linux, `export AUTOGRAPH_VERBOSITY=10`) and attach the full output.[32m [repeated 2x across cluster] (Ray deduplicates logs by default. Set RAY_DEDUP_LOGS=0 to disable log deduplication, or see https://docs.ray.io/en/master/ray-observability/ray-logging.html#log-deduplication for more options.)[0m
[2m[36m(launch_and_evaluate pid=4213)[0m Cause: Unknown node type <gast.gast.Import object at 0x7fa558340ee0>[32m [repeated 2x across cluster][0m


  1/806 [..............................] - ETA: 2:22:19 - loss: 0.6815 - accuracy: 0.5312 - sensitivity: 0.6471 - specificity: 0.4000
  1/806 [..............................] - ETA: 2:22:19 - loss: 0.6815 - accuracy: 0.5312 - sensitivity: 0.6471 - specificity: 0.4000
  1/806 [..............................] - ETA: 2:22:19 - loss: 0.6815 - accuracy: 0.5312 - sensitivity: 0.6471 - specificity: 0.4000
  8/806 [..............................] - ETA: 6s - loss: 0.5957 - accuracy: 0.7109 - sensitivity: 0.9922 - specificity: 0.4806     
 17/806 [..............................] - ETA: 5s - loss: 0.6860 - accuracy: 0.6011 - sensitivity: 0.7611 - specificity: 0.3711   
 25/806 [..............................] - ETA: 5s - loss: 0.6604 - accuracy: 0.6837 - sensitivity: 0.9874 - specificity: 0.4790
 34/806 [>.............................] - ETA: 4s - loss: 0.7066 - accuracy: 0.5983 - sensitivity: 0.7842 - specificity: 0.3437
 42/806 [>.............................] - ETA: 5s - loss: 0.6587 - accura

[2m[36m(launch_and_evaluate pid=4213)[0m Please report this to the TensorFlow team. When filing the bug, set the verbosity to 10 (on Linux, `export AUTOGRAPH_VERBOSITY=10`) and attach the full output.[32m [repeated 4x across cluster][0m
[2m[36m(launch_and_evaluate pid=4213)[0m Cause: Unknown node type <gast.gast.Import object at 0x7fa57acf0fd0>[32m [repeated 4x across cluster][0m


  1/806 [..............................] - ETA: 11:38 - loss: 0.7779 - accuracy: 0.5938 - sensitivity: 0.5806 - specificity: 1.0000
  1/806 [..............................] - ETA: 11:38 - loss: 0.7779 - accuracy: 0.5938 - sensitivity: 0.5806 - specificity: 1.0000
  1/806 [..............................] - ETA: 11:38 - loss: 0.7779 - accuracy: 0.5938 - sensitivity: 0.5806 - specificity: 1.0000
  1/806 [..............................] - ETA: 11:38 - loss: 0.7779 - accuracy: 0.5938 - sensitivity: 0.5806 - specificity: 1.0000
  1/806 [..............................] - ETA: 11:38 - loss: 0.7779 - accuracy: 0.5938 - sensitivity: 0.5806 - specificity: 1.0000
  1/806 [..............................] - ETA: 11:38 - loss: 0.7779 - accuracy: 0.5938 - sensitivity: 0.5806 - specificity: 1.0000
  1/806 [..............................] - ETA: 11:38 - loss: 0.7779 - accuracy: 0.5938 - sensitivity: 0.5806 - specificity: 1.0000
  1/806 [..............................] - ETA: 11:38 - loss: 0.7779 - accur

DEBUG flwr 2023-08-06 16:34:10,874 | server.py:182 | evaluate_round 1 received 3 results and 0 failures




2023-08-06 16:34:11.004405: I tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc:982] could not open file to read NUMA node: /sys/bus/pci/devices/0000:01:00.0/numa_node
Your kernel may have been built without NUMA support.
2023-08-06 16:34:11.229941: I tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc:982] could not open file to read NUMA node: /sys/bus/pci/devices/0000:01:00.0/numa_node
Your kernel may have been built without NUMA support.
2023-08-06 16:34:11.230016: I tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc:982] could not open file to read NUMA node: /sys/bus/pci/devices/0000:01:00.0/numa_node
Your kernel may have been built without NUMA support.
2023-08-06 16:34:11.236805: I tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc:982] could not open file to read NUMA node: /sys/bus/pci/devices/0000:01:00.0/numa_node
Your kernel may have been built without NUMA support.
2023-08-06 16:34:11.236898: I tensorflow/compile



2023-08-06 16:34:13.671563: I tensorflow/core/common_runtime/executor.cc:1197] [/device:CPU:0] (DEBUG INFO) Executor start aborting (this does not indicate an error and you can ignore this message): INVALID_ARGUMENT: You must feed a value for placeholder tensor 'gradients/split_2_grad/concat/split_2/split_dim' with dtype int32
	 [[{{node gradients/split_2_grad/concat/split_2/split_dim}}]]
2023-08-06 16:34:13.673431: I tensorflow/core/common_runtime/executor.cc:1197] [/device:CPU:0] (DEBUG INFO) Executor start aborting (this does not indicate an error and you can ignore this message): INVALID_ARGUMENT: You must feed a value for placeholder tensor 'gradients/split_grad/concat/split/split_dim' with dtype int32
	 [[{{node gradients/split_grad/concat/split/split_dim}}]]
2023-08-06 16:34:13.674327: I tensorflow/core/common_runtime/executor.cc:1197] [/device:CPU:0] (DEBUG INFO) Executor start aborting (this does not indicate an error and you can ignore this message): INVALID_ARGUMENT: You mus

History (loss, distributed):
	round 1: 0.6839785774548849
History (metrics, distributed, evaluate):
{'accuracy': [(1, 0.6426963607470194)]}

In [24]:
# Open an IPFS HTTP API client with ipfshttpclient's default connection settings;
# a later cell uses it to fetch the stored model file.
ipfs_client = ipfshttpclient.connect()

In [32]:
# Fetch the file referenced by `ipfs_cid` from IPFS and store it in the
# configured checkpoint directory (MODEL_CKPT_DIR from the Config cell).
ipfs_model_path = path.join(MODEL_CKPT_DIR, "from_ipfs.keras")
with open(ipfs_model_path, 'wb') as f:
    # Leaving the `with` block flushes and closes the file, so no explicit
    # flush()/close() calls are needed.
    f.write(ipfs_client.cat(f"/ipfs/{ipfs_cid['Hash']}"))

# Build a fresh client and load its parameters from the downloaded file.
client = client_fn('2')
client.set_parameters_from_file(ipfs_model_path)

# The evaluated result should match the one shown in the simulation output.

client.evaluate(None, {})

2023-08-06 17:03:08.661328: I tensorflow/core/common_runtime/executor.cc:1197] [/device:CPU:0] (DEBUG INFO) Executor start aborting (this does not indicate an error and you can ignore this message): INVALID_ARGUMENT: You must feed a value for placeholder tensor 'gradients/split_2_grad/concat/split_2/split_dim' with dtype int32
	 [[{{node gradients/split_2_grad/concat/split_2/split_dim}}]]
2023-08-06 17:03:08.662846: I tensorflow/core/common_runtime/executor.cc:1197] [/device:CPU:0] (DEBUG INFO) Executor start aborting (this does not indicate an error and you can ignore this message): INVALID_ARGUMENT: You must feed a value for placeholder tensor 'gradients/split_grad/concat/split/split_dim' with dtype int32
	 [[{{node gradients/split_grad/concat/split/split_dim}}]]
2023-08-06 17:03:08.664001: I tensorflow/core/common_runtime/executor.cc:1197] [/device:CPU:0] (DEBUG INFO) Executor start aborting (this does not indicate an error and you can ignore this message): INVALID_ARGUMENT: You mus

Loading data for client 2
Client 2 connecting to server 127.0.0.1:8080


2023-08-06 17:03:09.229953: I tensorflow/core/common_runtime/executor.cc:1197] [/device:CPU:0] (DEBUG INFO) Executor start aborting (this does not indicate an error and you can ignore this message): INVALID_ARGUMENT: You must feed a value for placeholder tensor 'gradients/split_2_grad/concat/split_2/split_dim' with dtype int32
	 [[{{node gradients/split_2_grad/concat/split_2/split_dim}}]]
2023-08-06 17:03:09.230845: I tensorflow/core/common_runtime/executor.cc:1197] [/device:CPU:0] (DEBUG INFO) Executor start aborting (this does not indicate an error and you can ignore this message): INVALID_ARGUMENT: You must feed a value for placeholder tensor 'gradients/split_grad/concat/split/split_dim' with dtype int32
	 [[{{node gradients/split_grad/concat/split/split_dim}}]]
2023-08-06 17:03:09.231740: I tensorflow/core/common_runtime/executor.cc:1197] [/device:CPU:0] (DEBUG INFO) Executor start aborting (this does not indicate an error and you can ignore this message): INVALID_ARGUMENT: You mus



(0.6662592887878418,
 25768,
 {'accuracy': 0.652437150478363,
  'sensitivity': 0.6529966592788696,
  'specificity': 0.5681878328323364})

In [None]:
import pickle
import os

def load_model(path: str):
    """Load a pickled object from ``path``.

    Args:
        path: Filesystem path of the pickle file to read.

    Returns:
        The unpickled object, or ``None`` when the file cannot be opened or
        its contents cannot be unpickled (empty, truncated or corrupt file).

    Note:
        ``pickle.load`` can execute arbitrary code embedded in the file, so
        only call this on checkpoints that come from a trusted source.
    """
    try:
        with open(path, 'rb') as file:
            # SECURITY: unpickling runs code from the file; trusted inputs only.
            return pickle.load(file)
    except (OSError, EOFError, pickle.UnpicklingError) as exc:
        # Report an unreadable or corrupt file the same way as a missing one,
        # and include the cause so the failure is diagnosable.
        print(f"Error loading file at {path}: {exc}")
        return None
    
# prm = load_model(os.path.abspath("../../model_ckpt/model_r3.ckpt"))


## Hashing

In [None]:
# Server side: concatenate the tensor byte strings held by `param_storer`.
server_param = b''.join(param_storer.tensors)

# Client side: convert the same parameters to ndarrays and back again, then
# concatenate the resulting tensor byte strings for comparison.
client_param = fl.common.parameters_to_ndarrays(param_storer)
bytes_client_param = b''.join(
    fl.common.parameter.ndarrays_to_parameters(client_param).tensors
)
# client_1.evaluate(, {})

In [None]:
# Equal SHA-256 digests mean the two serialized byte strings are identical.
aggregated_digest = hs.sha256(server_param).hexdigest()
client_digest = hs.sha256(bytes_client_param).hexdigest()
print("Aggregated", aggregated_digest)
print("Client received", client_digest)