__Federated Learning__



In [108]:
import random

import sklearn.metrics
import torch
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
device = "cuda" if torch.cuda.is_available() else "cpu"

__Data Preprocessing__

In [169]:
df = pd.read_csv("mushrooms.csv")
df = pd.get_dummies(df, drop_first=True).astype(float)
y = df.class_p
X = df.drop('class_p', axis=1)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.33, random_state=239)
X_train = torch.from_numpy(X_train.to_numpy()).to(device, dtype=torch.float32)
X_test = torch.from_numpy(X_test.to_numpy()).to(device, dtype=torch.float32)
y_train = torch.from_numpy(y_train.to_numpy()).to(device, dtype=torch.float32)
y_test = torch.from_numpy(y_test.to_numpy()).to(device, dtype=torch.float32)

In [188]:
class GradientPair:
    
    def __init__(self, dw: torch.Tensor, db: torch.Tensor):
        self.dw: torch.Tensor = dw
        self.db: torch.Tensor = db
        

class LogisticRegressionModel:
    def __init__(self, train_sample):
        self.weights: torch.Tensor = torch.from_numpy(
            np.zeros(train_sample.size(dim=0))
        ).to(dtype=torch.float64).to(device)
        self.bias = torch.Tensor([0]).to(dtype=torch.float64).to(device)
        
    @staticmethod
    def fromWeights(train_sample, w, b):
        model = LogisticRegressionModel(train_sample)
        model.weights = w
        model.bias = b
        return model
        
    def calculateProbability(self, x: torch.Tensor) -> torch.Tensor:
        # print(self.weights.dtype)
        
        return 1 / (
                1 + torch.exp(
                    -1 * self.weights.T @ x - self.bias
                )
        )
    
    def calculateLossAndGradient(self, x: torch.Tensor, y: torch.Tensor) -> tuple[torch.Tensor, GradientPair]:
        param_num = x.size(dim=0)
        prediction = self.calculateProbability(x)
        loss = (
                       -1 / param_num) * (y * torch.log(prediction)
        ) + (
                (torch.Tensor([1]).to(device) - y) * torch.log(torch.Tensor([1]).to(device) - prediction)
        )
        
        wGrad = (1 / param_num) * (
            (prediction - y) * x
        )
        bGrad = (1 / param_num) * (
            (prediction - y)
        )
        return loss, GradientPair(wGrad, bGrad)
    
    def updateWeights(self, delta_w: torch.Tensor, delta_b: torch.Tensor) -> None:
        self.weights += delta_w
        self.bias += delta_b
        
    def predict(self, X):
        return 0 if self.calculateProbability(X) <= 0.5 else 1
        
    def test(self, X_test, y_test):
        results = []
        for i in range(X_test.size(dim = 0)):
            arguments = X_test[0]
            results.append(self.predict(arguments))
        return accuracy_score(y_test.to("cpu"), results)
            

In [189]:
class FederatedLearningNode:

    def __init__(self, alpha, p, n, arguments, labels):
        """
        
        :param alpha: learning rate for mode 
        :param p: probability of choosing node training
        :param n: number of nodes participating in training
        :param arguments: Train data arguments
        :param labels: Train data labels
        """
        
        self.alpha = alpha
        self.p = p
        self.n = n
        self.arguments = arguments
        self.labels = labels
        
        self.local_model = LogisticRegressionModel(self.arguments[0])
        self.index = 0
        
        
        
    def localTrainStep(self):
        """
        THIS METHOD MUST BE OVERWRITTEN FOR SPECIFIC ALGORITHM
        :return: 
        """
        pass
    

class FederatedLearningAlgorithm:
    def __init__(self, arguments: pd.DataFrame = None, labels: pd.Series = None, alpha: float = 0.1, lam: float = 0.01, p: float = 0.5, k: int = 1000, n: int = 3):
        """
            :param alpha: float in [0; 1] Learning rate of gradient descent
            :param lam: float in [0; +inf); difference parameter for psi-quadratic penalty. If lam -> 0 => several models are local only, is lam -> +inf, all local models are the same
            :param p: float in [0; 1]; probability of choosing the local step instead og aggregation. p -> 0 => only local model training without aggregation; p -> 1 => only aggregation without training (useless)
            :param k: integer number of learning iterations
            :param n: integer amount of nodes participating in the learning
            :param arguments: pandas.Dataframe with train data arguments
            :param labels: pandas.Series with train data labels
        """
        assert arguments.shape[0] == labels.size
        self.arguments = torch.from_numpy(arguments.to_numpy()).to(device)
        self.labels = torch.from_numpy(labels.to_numpy()).to(device)
        self.alpha = alpha
        self.lam = lam
        self.p = p
        self.k = k
        self.n = n
        self.nodes = []
        print(type(self.arguments))
        self.commonModel = LogisticRegressionModel(self.arguments[0])
        
        self.constructNodes()
        
    
    def constructNodes(self) -> None:
        """
        THIS METHOD MUST BE OVERWRITTEN IN EACH ALGORITHM CLASS TO CONSTRUCT THE LIST OF NODES WITH APPROPRIEATE INSTANCES
        :return: 
        """
        pass
    
    
    def train(self) -> None:
        for i in range(self.k):
            localTraining = random.random() < self.p
            if localTraining:
                for node in self.nodes:
                    node.localTrainStep()
            else:
                self.aggregateResults()
            
    def aggregateResults(self) -> None:
        """
        THIS METHOD MUST BE OVERWRITTEN FOR SPECIFIC ALGORITHM
        :return: 
        """
        pass
        
    
        


In [190]:
class L2GDNode (FederatedLearningNode):
    
    def localTrainStep(self):
        gradient_result = self.local_model.calculateLossAndGradient(
            self.arguments[self.index],
            self.labels[self.index]
        )[1]
        nabla_f_w = gradient_result.dw
        nabla_f_b = gradient_result.db
        delta_w = -1 * self.alpha / (self.n * (1 - self.p)) * nabla_f_w
        delta_b = -1 * self.alpha / (self.n * (1 - self.p)) * nabla_f_b
        self.local_model.updateWeights(delta_w, delta_b)


class L2GD(FederatedLearningAlgorithm):
    
    def constructNodes(self) -> None:
        self.nodes = [
            L2GDNode(
                self.alpha,
                self.p,
                self.n,
                self.arguments[self.labels.size(dim=0) // self.n * i : self.labels.size(dim=0) // self.n * (i + 1)],
                self.labels[self.labels.size(dim=0) // self.n * i : self.labels.size(dim=0) // self.n * (i + 1)]
            )
            for i in range(self.n)
        ]
        
    def aggregateResults(self) -> None:
        w_mean = torch.zeros(self.arguments[0].size(dim = 0)).to(device)
        b_mean = torch.zeros(1).to(device)
        for node in self.nodes:
            w_mean += 1 / self.n * node.local_model.weights
            b_mean += 1 / self.n * node.local_model.bias
        for node in self.nodes:
            al_np = self.alpha * self.lam / self.n / self.p
            node.local_model.weights *= (1 - al_np)
            node.local_model.bias *= (1 - al_np)
            node.local_model.updateWeights(
                al_np * w_mean,
                al_np * b_mean
            )
            
        test_model = LogisticRegressionModel.fromWeights(self.arguments[0], w_mean, b_mean)
        print(
            test_model.test(
                X_test, y_test
            )
        )
        








In [191]:
l2gd = L2GD(
    arguments=X,
    labels=y
)

<class 'torch.Tensor'>


In [192]:
l2gd.train()

0.5128683327116748
0.5128683327116748
0.5128683327116748
0.5128683327116748
0.5128683327116748
0.5128683327116748
0.5128683327116748
0.5128683327116748
0.5128683327116748
0.5128683327116748
0.5128683327116748
0.5128683327116748
0.5128683327116748
0.5128683327116748
0.5128683327116748


KeyboardInterrupt: 