# Federated learning

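* As covered in class, federated learning trains a model on each edge device using its local dataset and sends only the resulting weights to the server.
* The raw local dataset never leaves the device, which greatly reduces privacy exposure.
* Training computation runs on the edge devices, so the load on the server is reduced.

* In short, multiple <b>clients</b> cooperate to train a single ML model.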

# Server-side FedAvg implementation
* The ```Federated.py``` file below is the server-side script for federated learning.

# Step 1. Client Selection
* For now, we assume every client is already idle or charging, and that the server always needs every client.

![image.png](attachment:image.png)
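
The notebook assumes every client takes part in every round. A more general setup samples only a fraction of the clients each round. The following is a minimal sketch of that idea; `select_clients`, `fraction`, and `seed` are hypothetical names that do not appear in `Federated.py`.

```python
import random

def select_clients(client_ids, fraction=1.0, seed=None):
    """Pick a random subset of clients for one round (hypothetical helper)."""
    if not 0.0 < fraction <= 1.0:
        raise ValueError("fraction must be in (0, 1]")
    rng = random.Random(seed)
    ids = list(client_ids)
    # Always select at least one client.
    k = max(1, round(fraction * len(ids)))
    return sorted(rng.sample(ids, k))

all_clients = list(range(5))
# fraction=1.0 reproduces the notebook's assumption: every client participates.
print(select_clients(all_clients, fraction=1.0))
# A smaller fraction picks a random subset for the round.
print(select_clients(all_clients, fraction=0.4, seed=0))
```

With `fraction=1.0` the result is simply the full client list, which matches the behavior assumed in this step.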

# Step 2. Broadcast the current global model
* The current global weights are delivered to each client through the ```get_server_weight``` function of ```FederatedServer```.
![image.png](attachment:image.png)
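
Weights travel between server and clients as JSON, so NumPy arrays have to be converted to nested lists and back. The sketch below shows one possible round trip. `encode_weights` and `decode_weights` are hypothetical helper names; the encoder mirrors the `NumpyEncoder` class used by the client later in this notebook.

```python
import json
import numpy as np

class NumpyEncoder(json.JSONEncoder):
    """Serialize np.ndarray values as nested lists."""
    def default(self, o):
        if isinstance(o, np.ndarray):
            return o.tolist()
        return super().default(o)

def encode_weights(weights):
    # weights is a list of per-layer arrays, or None before the first round.
    return json.dumps(weights, cls=NumpyEncoder)

def decode_weights(payload):
    data = json.loads(payload)
    if data is None:
        return None
    # Rebuild one float32 array per layer.
    return [np.array(layer, dtype=np.float32) for layer in data]

server_weight = [np.ones((2, 3), dtype=np.float32), np.zeros(3, dtype=np.float32)]
restored = decode_weights(encode_weights(server_weight))
print([w.shape for w in restored])
print(decode_weights(encode_weights(None)))
```

The `None` case matters because the server has no global weights until the first aggregation finishes.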

# Step 3. Local model updates at clients
* Handled in the ```Client``` class.
* Each client trains the model on the data it holds locally.
![image.png](attachment:image.png)
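
To make the local update step concrete, here is a toy sketch in which full-batch gradient descent on a linear model stands in for the `model.fit` call on the CNN. The function `local_update`, the synthetic data, and the learning rate are all assumptions made for illustration.

```python
import numpy as np

def mse(w, x, y):
    """Mean squared error of the linear model x @ w."""
    return float(np.mean((x @ w - y) ** 2))

def local_update(global_w, x, y, epochs=10, lr=0.1):
    """Start from the global weights and train on local data only."""
    w = global_w.copy()  # never modify the received global weights in place
    for _ in range(epochs):
        grad = 2.0 * x.T @ (x @ w - y) / len(y)
        w -= lr * grad
    return w

rng = np.random.default_rng(0)
x = rng.normal(size=(64, 2))      # synthetic local features
y = x @ np.array([1.5, -0.5])     # synthetic local targets
global_w = np.zeros(2)

local_w = local_update(global_w, x, y)
print("loss before:", mse(global_w, x, y))
print("loss after: ", mse(local_w, x, y))
```

Only `local_w` would be sent to the server; `x` and `y` stay on the client.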


# Step 4. Aggregation
* Handled in the ```Client``` class.
* Each client sends its trained weights to the server.
* For now, we assume communication never fails.
![image.png](attachment:image.png)
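
On the server side, aggregation can only start once every client has uploaded. The sketch below illustrates that barrier with a hypothetical `RoundCollector` class. Note that it is a variant of the notebook's approach: `FederatedServer` increments a `done_clients` counter, while this sketch counts distinct client ids so a repeated upload is not counted twice.

```python
class RoundCollector:
    """Collects one upload per client and reports when the round is complete."""

    def __init__(self, client_number):
        self.client_number = client_number
        self.local_weights = {}

    def submit(self, client_id, weight):
        # Keying by client id means a repeated upload overwrites the old one.
        self.local_weights[client_id] = weight
        return len(self.local_weights) == self.client_number

collector = RoundCollector(client_number=3)
print(collector.submit(0, [0.1]))  # False: 1 of 3 clients done
print(collector.submit(0, [0.2]))  # False: client 0 again, still 1 of 3
print(collector.submit(1, [0.3]))  # False: 2 of 3
print(collector.submit(2, [0.4]))  # True: all clients uploaded
```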

# Step 5. Global model update at the server
* The ```FedAvg``` function in ```Federated.py``` computes the weighted average of the clients' weights.
* When training is complete, the ```save``` function writes the history to a file.
![Picture1.png](attachment:Picture1.png)

![image.png](attachment:image.png)
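
As a small worked example of the averaging step, the sketch below aggregates two clients with a single layer each. The standalone function `fed_avg` and the toy numbers are assumptions for illustration; the dictionary layout (client id to list of per-layer arrays) follows `cls.local_weights` and `cls.num_data`.

```python
import numpy as np

def fed_avg(local_weights, num_data):
    """Average per-layer weights, weighting each client by its sample count."""
    total = sum(num_data[cid] for cid in local_weights)
    # Any client's weights can serve as the shape template.
    first = next(iter(local_weights.values()))
    averaged = [np.zeros_like(layer, dtype=np.float32) for layer in first]
    for cid, layers in local_weights.items():
        share = num_data[cid] / total
        for i, layer in enumerate(layers):
            averaged[i] += share * layer
    return averaged

local_weights = {
    "a": [np.array([1.0, 1.0], dtype=np.float32)],
    "b": [np.array([3.0, 5.0], dtype=np.float32)],
}
num_data = {"a": 100, "b": 300}

# Client "b" holds 75% of the data, so the result sits closer to its weights:
# 0.25 * [1, 1] + 0.75 * [3, 5] = [2.5, 4.0]
print(fed_avg(local_weights, num_data)[0])
```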

# FederatedAveraging algorithm

![image.png](attachment:image.png)
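
In symbols, the server update can be written as follows. The notation is an assumption chosen to match the code: $K$ is `client_number`, $n_k$ is `num_data[k]`, $n$ is `total_num_data`, and $w_{t+1}^{k}$ is the weight uploaded by client $k$ in round $t$.

$$
w_{t+1} = \sum_{k=1}^{K} \frac{n_k}{n}\, w_{t+1}^{k}, \qquad n = \sum_{k=1}^{K} n_k
$$

When every client holds the same number of samples, $n_k / n = 1/K$ and the update reduces to a plain mean of the client weights.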

# views.py and urls.py explained
![image.png](attachment:image.png)

# Federated.py explained

![image.png](attachment:image.png)

In [6]:
%%writefile FL_Server/app/Federated.py
import copy
import numpy as np
import tensorflow as tf
import json
from app import numpy_encoder
import os

class FederatedServer:
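    client_number = 5 # total number of clients
    server_weight = None # weights currently stored on the server
    local_weights = {} # client id -> list of per-layer weights received from that client

    experiment = 1 # uniform split by default

    done_clients = 0 # number of clients that finished the current round
    server_round = 0 # current round
    max_round = 5 # number of rounds to run
    total_num_data = 0 # total number of training samples across clients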

    num_data = {}
    client_model_accuracy = {}
    server_model_accuracy = []

    model = tf.keras.models.Sequential([
                    tf.keras.layers.Conv2D(32, kernel_size=(3, 3), activation='relu', input_shape=(28, 28, 1)),
                    tf.keras.layers.Conv2D(64, kernel_size=(3, 3), activation='relu'),
                    tf.keras.layers.MaxPooling2D(pool_size=(2, 2)),
                    tf.keras.layers.Dropout(0.25),
                    tf.keras.layers.Flatten(),
                    tf.keras.layers.Dense(128, activation='relu'),
                    tf.keras.layers.Dropout(0.5),
                    tf.keras.layers.Dense(10, activation='softmax')
            ])
    model.compile(optimizer=tf.keras.optimizers.SGD(), loss=tf.keras.losses.SparseCategoricalCrossentropy(), metrics=['accuracy'])

    @classmethod
    def initialize(cls, client_num, experiment, max_round):
        cls.client_number = client_num
        cls.experiment = experiment
        cls.max_round = max_round
        cls.client_model_accuracy = {}
        cls.reset() # reset the variables when initialized
        return "Initialized server"

    @classmethod
    def update_num_data(cls, client_id, num_data):
        cls.total_num_data += num_data
        cls.num_data[client_id] = num_data
        return f"Number of data for {client_id} updated"

    @classmethod
    def update(cls, client_id, local_weight):
        local_weight = list(map(lambda weight: np.array(weight, dtype=np.float32), local_weight))
        cls.local_weights[client_id] = local_weight
        cls.evaluateClientModel(client_id, local_weight)
        cls.done_clients += 1 # increment current count

        if cls.done_clients == cls.client_number:
            cls.FedAvg() # fed avg
            cls.evaluateServerModel()
            cls.next_round()

        if cls.server_round == cls.max_round: # federated learning finished
            cls.save() # save all history into json file
            cls.reset()

    @classmethod
    def FedAvg(cls):
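        """
        cls.local_weights is a dictionary mapping client id to that client's weights.

        - The layer shapes are not known in advance, so take one client's weights from
          cls.local_weights, build zero-filled accumulators with np.zeros_like,
          and add the weighted client weights into them.

        - `weight` must be a list of np.arrays, one per layer.

        - Implement the FedAvg algorithm (weighted average).
        - cls.num_data holds the number of samples of each client.
        - cls.total_num_data holds the total number of samples.
        """
        ### TODO ###
        # Use any client's weights as the shape template; indexing with [0] only works if a key equal to 0 exists.
        template = next(iter(cls.local_weights.values()))
        weight = list(map(lambda block: np.zeros_like(block, dtype=np.float32), template))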

        for client_id, client_weight in cls.local_weights.items():
            client_num_data = cls.num_data[client_id]

            for i in range(len(weight)):
                weighted_weight = client_weight[i] * (client_num_data/cls.total_num_data)
                weight[i] += weighted_weight
        ### TODO ###
        cls.set_server_weight(weight)

    @classmethod
    def evaluateClientModel(cls, client_id, weight):
        cls.model.set_weights(cls.local_weights[client_id]) # change to local weight

        mnist = tf.keras.datasets.mnist
        (_, _), (test_images, test_labels) = mnist.load_data()
        n = len(test_images)
        indices = np.random.choice([i for i in range(n)], n//10)

        test_images = test_images[indices]
        test_labels = test_labels[indices]
        test_images = test_images / 255
        test_images = test_images.reshape(-1,28, 28, 1)

        acc = cls.model.evaluate(test_images, test_labels)

        if client_id not in cls.client_model_accuracy:
            cls.client_model_accuracy[client_id] = []

        cls.client_model_accuracy[client_id].append(acc[1])

        if cls.server_weight is not None:
            cls.model.set_weights(cls.server_weight) # revert to server weight

    @classmethod
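    def evaluateServerModel(cls):
        # Load the freshly averaged weights; otherwise cls.model still holds the weights set before FedAvg ran.
        cls.model.set_weights(cls.server_weight)
        mnist = tf.keras.datasets.mnist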
        (_, _), (test_images, test_labels) = mnist.load_data()
        n = len(test_images)
        indices = np.random.choice([i for i in range(n)], n//10)

        test_images = test_images[indices]
        test_labels = test_labels[indices]
        test_images = test_images / 255
        test_images = test_images.reshape(-1,28, 28, 1)

        acc = cls.model.evaluate(test_images, test_labels)[1] # evaluate returns [loss, accuracy]; index 1 is accuracy
        # each index corresponds to a round
        cls.server_model_accuracy.append(acc)

    @classmethod
    def next_round(cls):
        cls.done_clients = 0 # reset current
        cls.server_round += 1 # proceed
        cls.total_num_data = 0 # total number of samples, recounted every round
        cls.num_data = {}

    @classmethod
    def save(cls):
        result = {"client number": cls.client_number,
                  "experiment":cls.experiment,
                  "max round": cls.max_round,
                  "clients acc" : cls.client_model_accuracy,
                  "server acc" : cls.server_model_accuracy, 
                 "final weight": list(weight.tolist() for weight in cls.server_weight)}
        from time import gmtime, strftime
        timestamp = strftime("%Y%m%d_%H%M%S", gmtime())
        with open("../Logs/"+timestamp+".json", 'w') as f:
            json.dump(result, f)
        print("################################################")
        print("#Json file saved as ../Logs/", timestamp+".json#")
        print("################################################")

    @classmethod
    def reset(cls):
        cls.client_model_accuracy = {}
        cls.server_model_accuracy = []
        cls.server_weight = None
        cls.local_weights = {}
        cls.done_clients = 0
        cls.server_round = 0
        cls.num_data = {}
        cls.total_num_data = 0

    @classmethod
    def set_server_weight(cls, weight):
        cls.server_weight = weight

    @classmethod
    def get_server_weight(cls):
        return cls.server_weight

    @classmethod
    def get_done_clients(cls):
        return cls.done_clients

    @classmethod
    def get_server_round(cls):
        return cls.server_round

Overwriting FL_Server/app/Federated.py


# Client 클래스

* 아래는 학습을 위한 ```Client``` 클래스이다.

![image.png](attachment:image.png)

In [1]:
import argparse
import json
import threading
import time
from random import random
import numpy as np
import requests
import tensorflow as tf

class NumpyEncoder(json.JSONEncoder): # inherits JSONEncoder 
    def default(self, o):
        if isinstance(o, np.ndarray):
            return o.tolist()
        return json.JSONEncoder.default(self, o)

class Client:
    def __init__(self, max_round: int, time_delay = 5, suppress=True, num_samples=600, client_id = 0, experiment = 1):
        '''
        Urls
        '''
        self.base_url = "http://147.47.200.178:9103/" # Base Url
        self.put_weight_url =  self.base_url + "put_weight/" + str(client_id)
        self.get_weight_url =  self.base_url + "get_server_weight" # Url that we send or fetch weight parameters
        self.round_url =  self.base_url + "get_server_round" 

        '''
        Initial setup
        '''
        self.experiment = experiment
        self.client_id = client_id
        self.time_delay = time_delay
        self.suppress = suppress
        self.global_round = self.request_global_round()
        self.current_round = 0
        self.max_round = max_round # Set the maximum number of rounds
        
        '''
        Downloads MNIST dataset and prepares (train_x, train_y), (test_x, test_y)
        '''
        self.train_images, self.train_labels, self.test_images, self.test_labels = self.prepare_images()
        self.split_train_images, self.split_train_labels = self.data_split(num_samples)
        self.local_data_num = len(self.split_train_labels)
        
        '''
        Builds model
        '''
        self.model = self.build_cnn_model()
        
    def prepare_images(self):
        mnist = tf.keras.datasets.mnist
        (train_images, train_labels), (test_images, test_labels) = mnist.load_data()
        train_images, test_images = train_images / 255, test_images / 255
        
        # For CNN, add dummy channel to feed the images to CNN
        train_images=train_images.reshape(-1,28, 28, 1)
        test_images=test_images.reshape(-1,28, 28, 1)
        return train_images, train_labels, test_images, test_labels
    
    def build_cnn_model(self):
        # This model definition must match the one on the server (Federated.py)
        model = tf.keras.models.Sequential([
            tf.keras.layers.Conv2D(32, kernel_size=(3, 3), activation='relu', input_shape=(28, 28, 1)),
            tf.keras.layers.Conv2D(64, (3, 3), activation='relu'),
            tf.keras.layers.MaxPooling2D(pool_size=(2, 2)),
            tf.keras.layers.Dropout(0.25),
            tf.keras.layers.Flatten(),
            tf.keras.layers.Dense(128, activation='relu'),
            tf.keras.layers.Dropout(0.5),
            tf.keras.layers.Dense(10, activation='softmax')
        ])

        model.compile(optimizer=tf.keras.optimizers.SGD(),
                      loss=tf.keras.losses.SparseCategoricalCrossentropy(),
                      metrics=['accuracy'])
        return model
        
    def data_split(self, num_samples):
        train_index_list = [[], [], [], [], [], [], [], [], [], []]
        test_index_list = [[], [], [], [], [], [], [], [], [], []]
        for i, v in enumerate(self.train_labels):
            train_index_list[v].append(i)

        for i, v in enumerate(self.test_labels):
            test_index_list[v].append(i)

        
        split_train_images = []
        split_train_labels = []

        ### TODO ###
        """
        experiment 1: Uniform data split: draw the same number of samples from every class
                   2: Random data split 1: class counts differ, but the total is the same
                   3: Random data split 2: class counts differ and the total also differs
                   4: Skewed: some classes have far more samples than the others
        
        Complete the algorithm so the dataset is split appropriately for experiments 1, 2, 3, and 4.
        
        For each experiment:
            1. Store the total number of samples in self.local_data_num
            2. Store the split images in split_train_images
               and the labels in split_train_labels
        """
        if self.experiment == 1: #uniform data split
            # all 
            self.local_data_num = num_samples
            
            for i in range(len(train_index_list)):
                indices = train_index_list[i]
                random_indices = np.random.choice(indices, size=num_samples//10)
                
                split_train_images.extend(self.train_images[random_indices])
                split_train_labels.extend(self.train_labels[random_indices])
            

        elif self.experiment == 2: # Randomly selected, equally sized dataset
            self.local_data_num = num_samples
            random_indices = np.random.choice([i for i in range(len(self.train_labels))], size=num_samples)
            split_train_images = self.train_images[random_indices]
            split_train_labels = self.train_labels[random_indices]

        
            
        elif self.experiment == 3: # Randomly selected, differently sized dataset
            n = np.random.randint(1, num_samples)
            self.local_data_num = n
            random_indices = np.random.choice([i for i in range(len(self.train_labels))], size=n)
            split_train_images = self.train_images[random_indices]
            split_train_labels = self.train_labels[random_indices]
            
     
  
        elif self.experiment == 4: #Skewed
            temp = [i for i in range(10)]
            # replace=False keeps the skewed classes distinct, so no class is drawn twice
            skewed_numbers = np.random.choice(temp, np.random.randint(1, 10), replace=False)
            non_skewed_numbers = list(set(temp)-set(skewed_numbers))
            N = 0
          
            for i in skewed_numbers:
                n = np.random.randint(60, 110)
                N += n
                
                indices = train_index_list[i]
                random_indices = np.random.choice(indices, size=n)
                
                split_train_images.extend(self.train_images[random_indices])
                split_train_labels.extend(self.train_labels[random_indices])
                
                
            for i in non_skewed_numbers:
                n = np.random.randint(1, 10)
                N += n
                
                indices = train_index_list[i]
                random_indices = np.random.choice(indices, size=n)
                
                split_train_images.extend(self.train_images[random_indices])
                split_train_labels.extend(self.train_labels[random_indices])
      
            
            self.local_data_num = N
        
        ### TODO ###
        
        split_train_images = np.array(split_train_images)
        split_train_labels = np.array(split_train_labels)
        return split_train_images, split_train_labels

        
        
    def update_total_num_data(self, num_data):
        """
        num_data : the number of training images that the current client has
        
        update the total number of training images that is stored in the server
        """
        update_num_data_url =  self.base_url + "update_num_data/"+str(self.client_id)+"/"+str(num_data)
        requests.get(update_num_data_url)
        

    
    def request_global_round(self):
        """
        result : Current global round that the server is in
        """
        result = requests.get(self.round_url)
        result = result.json()
        return result
    
    def request_global_weight(self):
        """
        global_weight : Up-to-date version of the model parameters
        """
        result = requests.get(self.get_weight_url)
        result_data = result.json()
        
        global_weight = None
        if result_data is not None:
            global_weight = []
            for i in range(len(result_data)):
                temp = np.array(result_data[i], dtype=np.float32)
                global_weight.append(temp)
            
        return global_weight

    def upload_local_weight(self, local_weight):
        """
        local_weight : the local weight that current client has converged to
        
        Add current client's weights to the server (Server accumulates these from multiple clients and computes the global weight)
        """
        local_weight_to_json = json.dumps(local_weight, cls=NumpyEncoder)
        requests.put(self.put_weight_url, data=local_weight_to_json)
        
    def train_local_model(self):
        """
        local_weight : local weight of the current client after training
        """
        global_weight = self.request_global_weight()
        if global_weight is not None:
            global_weight = list(map(lambda weight: np.array(weight), global_weight))
            self.model.set_weights(global_weight)
            
        
        self.model.fit(self.split_train_images, self.split_train_labels, epochs=10, batch_size=8, verbose=0)
        local_weight = self.model.get_weights()
        return local_weight
    
    def task(self):
        """
        Federated learning task
        1. If the current round has reached the max round that we set, finish
        2. If the global round equals this client's round, the client trains and uploads
        3. Otherwise, wait until the other clients finish
        """
        
        #this is for executing on multiple devices
        self.global_round = self.request_global_round()
        
        if self.current_round >= self.max_round:
            print("Client", str(self.client_id), "finished")
            return 

        if self.global_round == self.current_round: #need update 
            print("Client", str(self.client_id), "needs update")
            self.split_train_images, self.split_train_labels = self.data_split(num_samples=self.local_data_num)
            self.update_total_num_data(self.local_data_num)        
            local_weight = self.train_local_model()
            self.upload_local_weight(local_weight)
            self.current_round += 1
            time.sleep(self.time_delay)
            return self.task()

        else: #need to wait until other clients finish
            print("Client", str(self.client_id), "needs wait")
            time.sleep(self.time_delay)
            return self.task()
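
To see how the split strategy changes what a client trains on, the sketch below compares the class histogram of a uniform split against a purely random one. It uses synthetic labels as a stand-in for MNIST, and `uniform_split` and `class_histogram` are hypothetical helpers. Unlike `data_split` above, which relies on the default of `np.random.choice` and therefore samples with replacement, this sketch samples without replacement.

```python
import numpy as np

def uniform_split(labels, num_samples, rng):
    """Draw the same number of indices from each of the 10 classes."""
    per_class = num_samples // 10
    picked = []
    for digit in range(10):
        candidates = np.flatnonzero(labels == digit)
        picked.extend(rng.choice(candidates, size=per_class, replace=False))
    return np.array(picked)

def class_histogram(labels, indices):
    """Count how many selected samples fall into each class."""
    return np.bincount(labels[indices], minlength=10)

rng = np.random.default_rng(0)
labels = rng.integers(0, 10, size=5000)  # synthetic stand-in for MNIST labels

uniform_idx = uniform_split(labels, 600, rng)
random_idx = rng.choice(len(labels), size=600, replace=False)

print("uniform:", class_histogram(labels, uniform_idx))
print("random: ", class_histogram(labels, random_idx))
```

The uniform split yields exactly 60 samples per class, while the random split only matches that on average.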

# Running federated learning locally
* Before starting federated learning, initialize the server with the cells below.

In [2]:
CLIENT_NUM = 5
EXPERIMENT = 1
MAX_ROUND = 5

In [3]:
import requests
init = requests.get(f"http://147.47.200.178:9103/initialize/{CLIENT_NUM}/{EXPERIMENT}/{MAX_ROUND}")

In [4]:
init.text

'Initialized server'

In [5]:
clients = []

for i in range(CLIENT_NUM):
    client = Client(max_round =MAX_ROUND, 
                    time_delay = 5, 
                    num_samples = 600,  
                    suppress=True, 
                    client_id = i, 
                    experiment = EXPERIMENT)
    clients.append(client) #retain references to the clients

for client in clients:
    thread = threading.Thread(target=client.task)
    thread.start()

Client 1 needs update
Client 4 Client 0 needs update
Client 3 needs update
Client 2 needs update
needs update
Client 2 needs update
Client 1 needs update
Client 3 needs update
Client 4 needs update
Client 0 needs update
Client 2 needs wait
Client 3 needs update
Client 2 needs update
Client 1 needs update
Client 4 needs update
Client 0 needs update
Client 3 needs wait
Client 2 needs update
Client 1 needs update
Client 3 needs update
Client 4 needs update
Client 0 needs update
Client 2 needs wait
Client 2 needs wait
Client 1 needs update
Client 2 needs update
Client 3 needs update
Client 4 needs update
Client 0 needs update


Exception in thread Thread-5:
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/urllib3/connectionpool.py", line 699, in urlopen
    httplib_response = self._make_request(
  File "/usr/local/lib/python3.8/dist-packages/urllib3/connectionpool.py", line 394, in _make_request
    conn.request(method, url, **httplib_request_kw)
  File "/usr/local/lib/python3.8/dist-packages/urllib3/connection.py", line 234, in request
    super(HTTPConnection, self).request(method, url, body=body, headers=headers)
  File "/usr/lib/python3.8/http/client.py", line 1252, in request
    self._send_request(method, url, body, headers, encode_chunked)
  File "/usr/lib/python3.8/http/client.py", line 1298, in _send_request
    self.endheaders(body, encode_chunked=encode_chunked)
  File "/usr/lib/python3.8/http/client.py", line 1247, in endheaders
    self._send_output(message_body, encode_chunked=encode_chunked)
  File "/usr/lib/python3.8/http/client.py", line 1046, in _send_output


Exception in thread Thread-7:
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/urllib3/connectionpool.py", line 699, in urlopen
    httplib_response = self._make_request(
  File "/usr/local/lib/python3.8/dist-packages/urllib3/connectionpool.py", line 394, in _make_request
    conn.request(method, url, **httplib_request_kw)
  File "/usr/local/lib/python3.8/dist-packages/urllib3/connection.py", line 234, in request
    super(HTTPConnection, self).request(method, url, body=body, headers=headers)
  File "/usr/lib/python3.8/http/client.py", line 1252, in request
    self._send_request(method, url, body, headers, encode_chunked)
  File "/usr/lib/python3.8/http/client.py", line 1298, in _send_request
    self.endheaders(body, encode_chunked=encode_chunked)
  File "/usr/lib/python3.8/http/client.py", line 1247, in endheaders
    self._send_output(message_body, encode_chunked=encode_chunked)
  File "/usr/lib/python3.8/http/client.py", line 1046, in _send_output


* When training finishes, the server saves the results as a .json file in the Logs folder.
* The json file contains the following fields:
    * client number : int
    * experiment : int
    * max round : int
    * clients acc : {client id: list(floats)}
    * server acc : list(floats)
    * final weight : list(list(floats))



* An example follows.

![image.png](attachment:image.png)

```{"client number": 5, "experiment": 1, "max round": 3, "clients acc": {"2": [0.8640000224113464, 0.8709999918937683, 0.8949999809265137], "0": [0.8899999856948853, 0.8539999723434448, 0.9049999713897705], "1": [0.8830000162124634, 0.8659999966621399, 0.8999999761581421], "3": [0.9039999842643738, 0.8550000190734863, 0.8790000081062317], "4": [0.8889999985694885, 0.8799999952316284, 0.9070000052452087]}, "server acc": [0.8889999985694885, 0.6710000038146973, 0.890999972820282]
```
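
A log in this format can be summarized with a few lines of Python. The sketch below is one possible way to do it; `summarize` is a hypothetical helper and the accuracy values in `raw` are made up for illustration, not taken from a real run. Client ids are strings because JSON object keys are always strings.

```python
import json
from statistics import mean

def summarize(log):
    """Extract final-round accuracies from a result log."""
    final_client = {cid: accs[-1] for cid, accs in log["clients acc"].items()}
    return {
        "rounds": len(log["server acc"]),
        "final server acc": log["server acc"][-1],
        "mean final client acc": mean(list(final_client.values())),
        "best client": max(final_client, key=final_client.get),
    }

# Made-up miniature log with the same keys as the server's output (without "final weight").
raw = (
    '{"client number": 2, "experiment": 1, "max round": 2, '
    '"clients acc": {"0": [0.5, 0.75], "1": [0.25, 0.5]}, '
    '"server acc": [0.5, 0.75]}'
)
print(summarize(json.loads(raw)))
```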

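* The ```plot.py``` file in the Logs folder plots the results and saves them as an image.
    * ```python3 plot.py -n <json filename>```

* As in the Coral lab, you can copy the png file to the host machine to view it.
    * Press ```Ctrl + p, q``` to detach from the container, then run ```docker cp <server container>:/home/ambient_fl/Logs/<output image file> .```


* For the assignment, after copying the file to the host machine (the school server), you must transfer it again to your local machine.
    * ```scp -P 22222 owner@147.47.200.178:<image file> .```

* You can then view an image like the following on your local machine.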

![20211118_062555.png](attachment:20211118_062555.png)