# Jetson Nano를 사용한 Federated Learning 학습 과제
* 이전 실습에서 Django를 가지고 서버를 만들고 Federated learning을 구현해보는 실습을 진행하였다.
* 이번 과제에서는 Jetson Nano를 사용해서 Federated Learning을 진행해야 한다.

* 우선 필요한 패키지를 설치한다.

In [1]:
!pip install -q -r ../requirements.txt

ERROR: tensorflow-gpu 2.6.0 has requirement absl-py~=0.10, but you'll have absl-py 0.9.0 which is incompatible.
ERROR: tensorflow-gpu 2.6.0 has requirement gast==0.4.0, but you'll have gast 0.2.2 which is incompatible.
ERROR: tensorflow-gpu 2.6.0 has requirement grpcio<2.0,>=1.37.0, but you'll have grpcio 1.27.2 which is incompatible.
ERROR: tensorflow-gpu 2.6.0 has requirement h5py~=3.1.0, but you'll have h5py 2.10.0 which is incompatible.
ERROR: tensorflow-gpu 2.6.0 has requirement keras~=2.6, but you'll have keras 2.3.1 which is incompatible.
ERROR: tensorflow-gpu 2.6.0 has requirement keras-preprocessing~=1.1.2, but you'll have keras-preprocessing 1.1.0 which is incompatible.
ERROR: tensorflow-gpu 2.6.0 has requirement numpy~=1.19.2, but you'll have numpy 1.18.4 which is incompatible.
ERROR: tensorflow-gpu 2.6.0 has requirement six~=1.15.0, but you'll have six 1.14.0 which is incompatible.
[31mERROR: t

In [7]:
%%writefile jetson_client.py
# Usage : python jetson_client.py --ip IP --p PORT
import matplotlib.pyplot as plt
import argparse
import json
import os
import threading
import time
from random import random
import numpy as np
import requests
import tensorflow as tf

# Ask TensorFlow's native logger for level '2' to reduce console noise.
# NOTE(review): tensorflow is already imported above this line; the variable is
# usually read when TensorFlow loads, so it may need to be set before the import
# to have any effect — confirm.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2' # Quiet tensorflow log messages


class NumpyEncoder(json.JSONEncoder):
    """JSON encoder that writes numpy arrays as (nested) lists."""

    def default(self, o):
        """Convert ``o`` to a JSON-friendly value.

        numpy arrays become lists via ``tolist()``; every other type is handed
        to the base encoder, which raises ``TypeError`` for unsupported objects.
        """
        if not isinstance(o, np.ndarray):
            return super().default(o)
        return o.tolist()

class Client:
    def __init__(self, max_round: int, time_delay = 5, suppress=True, num_samples=600):
        """
        @params:
            max_round : the maximum number of rounds this client trains (arbitrary integer)
            time_delay : seconds to wait before the next check of the server round (positive integer).
                         Increase it when one round of training takes much longer than the delay:
                         polling the server after this client has already uploaded its parameters
                         only adds network overhead, so a larger delay gives more stable
                         communication at the cost of a longer total run time.
            suppress : when True, model evaluation runs with verbose=0
            num_samples : number of local training images requested from data_split

        @return:
            None : Sets up the server urls (built from the module-level IP and PORT names)
                   Fetches the client's id, the experiment type and the global round from the server
                   Downloads MNIST and draws the local split
                   Builds the CNN model
        """
        # Plain settings that need no communication.
        self.time_delay = time_delay
        self.suppress = suppress
        self.max_round = max_round  # Upper bound on the number of training rounds
        self.current_round = 0

        # Server endpoints.
        base_url = f"http://{IP}:{PORT}/"
        self.weight_url = f"{base_url}weight"  # upload / download model parameters
        self.round_url = f"{base_url}round"  # round counter used for synchronization
        self.id_url = f"{base_url}get_id"  # hands out this client's id
        self.total_num_data_url = f"{base_url}total_num"  # number of data points over all clients
        self.experiment_url = f"{base_url}experiment"
        self.accuracy_url = f"{base_url}accuracy"

        # Initial handshake with the server.
        self.fed_id = self.request_fed_id()
        self.experiment = self.request_experiment()  # which data split regime to use
        self.global_round = self.request_global_round()

        # MNIST data: (train_x, train_y), (test_x, test_y).
        self.train_images, self.train_labels = None, None
        self.test_images, self.test_labels = None, None
        self.prepare_images()

        # Local split of the training data.
        self.train_index_list = None
        self.test_index_list = None
        self.split_train_images = []
        self.split_train_labels = []
        self.local_data_num = 0
        self.data_split(num_samples=num_samples)

        # Model.
        self.model = None
        self.build_cnn_model()
        
    def prepare_images(self):
        """
        @params:
            None

        @return:
            None : Loads MNIST through tf.keras, divides the pixel values by 255 and
                   reshapes both image sets to (-1, 28, 28, 1) so the CNN receives a
                   channel axis. Labels are stored unchanged.
        """
        (train_x, train_y), (test_x, test_y) = tf.keras.datasets.mnist.load_data()

        self.train_labels = train_y
        self.test_labels = test_y

        # Scale first, then append the dummy channel dimension expected by Conv2D.
        self.train_images = (train_x / 255).reshape(-1, 28, 28, 1)
        self.test_images = (test_x / 255).reshape(-1, 28, 28, 1)
            
    
    def build_cnn_model(self):
        """
        @params:
            None

        @return:
            None : Stores a compiled CNN (two conv layers, max pooling, dropout,
                   one hidden dense layer, 10-way softmax) in self.model
        """
        layers = tf.keras.layers
        stack = [
            layers.Conv2D(32, kernel_size=(3, 3), activation='relu', input_shape=(28, 28, 1)),
            layers.Conv2D(64, (3, 3), activation='relu'),
            layers.MaxPooling2D(pool_size=(2, 2)),
            layers.Dropout(0.25),
            layers.Flatten(),
            layers.Dense(128, activation='relu'),
            layers.Dropout(0.5),
            layers.Dense(10, activation='softmax'),
        ]
        model = tf.keras.models.Sequential(stack)
        model.compile(
            optimizer=tf.keras.optimizers.SGD(),
            loss=tf.keras.losses.SparseCategoricalCrossentropy(),
            metrics=['accuracy'],
        )
        self.model = model
        
    def data_split(self, num_samples):
        """
        @params: 
            num_samples : The number of sample images in each client. This value is used for equally
                          sized dataset
        
        @return: 
            None : Split the dataset depending on the self.experiment value
           
                If self.experiment is 1: Uniform data split: We take equal amount of data from each class (iid)
                If self.experiment is 2: Random data split1: We take equal amount of data, but not uniformly distributed across classes
                If self.experiment is 3: Random data split2: We take different amount of data and not uniformly distributed across classes
                If self.experiment is 4: Skewed: We take disproportionate amount of data for some classes
                        
        """
        if self.train_index_list is None or self.test_index_list is None:
            self.train_index_list = [[], [], [], [], [], [], [], [], [], []]
            self.test_index_list = [[], [], [], [], [], [], [], [], [], []]
            for i, v in enumerate(self.train_labels):
                self.train_index_list[v].append(i)

            for i, v in enumerate(self.test_labels):
                self.test_index_list[v].append(i)

        
        self.split_train_images = []
        self.split_train_labels = []
        
        if self.experiment == 1: #uniform data split
            self.local_data_num = num_samples
            


            for i in range(len(self.train_index_list)):
                indices = self.train_index_list[i]
                random_indices = np.random.choice(indices, size=num_samples//10)
                
                self.split_train_images.extend(self.train_images[random_indices])
                self.split_train_labels.extend(self.train_labels[random_indices])
            

        elif self.experiment == 2: # Randomly selected, equally sized dataset
            self.local_data_num = num_samples
            random_indices = np.random.choice([i for i in range(len(self.train_labels))], size=num_samples)
            self.split_train_images = self.train_images[random_indices]
            self.split_train_labels = self.train_labels[random_indices]

            counts = [0 for _ in range(10)]
            
            for label in self.train_labels[random_indices]:
                counts[label] += 1
            



        elif self.experiment == 3: # Randomly selected, differently sized dataset
            n = np.random.randint(1, num_samples)
            self.local_data_num = n
            random_indices = np.random.choice([i for i in range(len(self.train_labels))], size=n)
            self.split_train_images = self.train_images[random_indices]
            self.split_train_labels = self.train_labels[random_indices]
            

            counts = [0 for _ in range(10)]
            
            for label in self.train_labels[random_indices]:
                counts[label] += 1
            


            

        elif self.experiment == 4: #Skewed
            skewed_numbers = np.random.choice([i for i in range(10)], np.random.randint(1, 10))
            non_skewed_numbers = list(set([i for i in range(10)])-set(skewed_numbers))
            N = 0
            
            counts = [0 for _ in range(10)]
            
            for i in skewed_numbers:
                n = np.random.randint(50, 60)
                N += n
                
                indices = self.train_index_list[i]
                random_indices = np.random.choice(indices, size=n)
                
                self.split_train_images.extend(self.train_images[random_indices])
                self.split_train_labels.extend(self.train_labels[random_indices])
                
                counts[i] += n
            
                
            for i in non_skewed_numbers:
                n = np.random.randint(1, 10)
                N += n
                
                indices = self.train_index_list[i]
                random_indices = np.random.choice(indices, size=n)
                
                self.split_train_images.extend(self.train_images[random_indices])
                self.split_train_labels.extend(self.train_labels[random_indices])
                
                counts[i] += n
            
            
            
            self.local_data_num = N
        
        else:
            print("Pick from 1,2,3,4")
            return 
    
        self.split_train_images = np.array(self.split_train_images)
        self.split_train_labels = np.array(self.split_train_labels)
        
        self.update_total_num_data(self.local_data_num)    

        
        
    def update_total_num_data(self, num_data):
        """
        @params:
            num_data : number of training images held by this client

        @return:
            None : PUTs the JSON-encoded count to the server's total_num url
        """
        requests.put(self.total_num_data_url, data=json.dumps(num_data))
    
    def request_total_num_data(self):
        """
        @params:
            None

        @return:
            result : the response body of the total_num url parsed as an int
                     (total number of training images over all clients)
        """
        response = requests.get(self.total_num_data_url)
        return int(response.text)

    def request_fed_id(self):
        """
        @params:
            None

        @return:
            result : decoded JSON body of the get_id url, i.e. the client id assigned by the server
        """
        return requests.get(self.id_url).json()
    
    def request_global_round(self):
        """
        @params:
            None

        @return:
            result : decoded JSON body of the round url, i.e. the round the server is currently in
        """
        return requests.get(self.round_url).json()
    
    def request_experiment(self):
        """
        @params:
            None

        @return:
            result : experiment number reported by the server as an int,
                     or 1 when the server answers with JSON null
        """
        payload = requests.get(self.experiment_url).json()
        return 1 if payload is None else int(payload)
    
    def request_global_weight(self):
        """
        @params:
            None

        @return:
            global_weight : list with one float32 numpy array per entry of the server's
                            JSON answer, or None when the server answers with JSON null
        """
        payload = requests.get(self.weight_url).json()
        if payload is None:
            return None

        return [np.array(payload[k], dtype=np.float32) for k in range(len(payload))]

    def upload_local_weight(self, local_weight=[]):
        """
        @params:
            local_weight : the weights this client wants to contribute (numpy arrays are
                           serialized through NumpyEncoder)

        @return:
            None : PUTs the JSON-encoded weights to the server's weight url
        """
        body = json.dumps(local_weight, cls=NumpyEncoder)
        requests.put(self.weight_url, data=body)
        
    def upload_local_accuracy(self, accuracy):
        """
        @params:
            accuracy : evaluation result to report (must be JSON serializable)

        @return:
            None : PUTs {'acc': accuracy, 'id': <client id>} as JSON to the accuracy url
        """
        report = json.dumps({'acc': accuracy, 'id': self.fed_id})
        requests.put(self.accuracy_url, data=report)
        
    def validation(self, local_weight=[]):
        """
        @params: 
            local_weight : the current client's weights
        
        @return: 
            acc : test accuracy of the current client's model
        """
        if local_weight is not None:
            self.model.set_weights(local_weight)
            acc = self.model.evaluate(self.test_images, self.test_labels, verbose=0 if self.suppress else 1)
            self.upload_local_accuracy(acc)
            e = {out: acc[i] for i, out in enumerate(self.model.metrics_names)}

            return acc
        
    def train_local_model(self):
        """
        @params: 
            None
        
        @return: 
            local_weight : local weight of the current client after training
        """
        global_weight = self.request_global_weight()
        if global_weight != None:
            global_weight = np.array(global_weight)
            self.model.set_weights(global_weight)
        
        self.model.fit(self.split_train_images, self.split_train_labels, epochs=10, batch_size=16, verbose=0)
        N = self.request_total_num_data()
        
        local_weight = np.multiply(self.model.get_weights(), (self.local_data_num/N))
        return local_weight
    
    def task(self):
        """
        @params: 
            None
        
        @return: 
            None : Delayed execution of Federated Learning task
                  1. Check the client's current round
                      1.1. If the current round is 
        """
        
        #this is for executing on multiple devices
        self.global_round = self.request_global_round()

        if self.current_round >= self.max_round:
            print(f"Client {self.fed_id} finished")
            return 

        if self.global_round == self.current_round: #need update 
            global_weight = self.request_global_weight()

            local_weight = self.train_local_model()

            acc = self.validation(local_weight)

            self.upload_local_weight(local_weight)

            self.current_round += 1

            time.sleep(self.time_delay)

            return self.task()

        else: #need to wait until other clients finish
            time.sleep(self.time_delay * 2)
            return self.task()

        '''#this is for executing on multiple devices
        else:
            #this is for executing on one device
            self.global_round = self.request_global_round()
            


            if self.global_round == self.current_round: #need update 
                start = time.time()
                if not self.suppress:
                    print("Request global weight...")
                global_weight = self.request_global_weight()
                if not self.suppress:
                    print("Global weight request done")

                if not self.suppress:
                    print("Training local model...")
                local_weight = self.train_local_model()
                if not self.suppress:
                    print("Training done")

                acc = self.validation(local_weight)


                if not self.suppress:
                    print("Uploading local weight...")
                self.upload_local_weight(local_weight)
                if not self.suppress:
                    print("Weight upload done")

                if not self.suppress:
                    print("=========================")
                end = time.time()

                self.current_round += 1

                threading.Timer(self.time_delay, self.task, [multiple_devices]).start()

            else: #need to wait until other clients finish
                threading.Timer(self.time_delay*2, self.task, [multiple_devices]).start()
        #this is for executing on one device'''
        
def resolve_server_address(ip_arg, port_arg=None, default_port=8000):
    """Return the (host, port) pair of the federated-learning server.

    ``ip_arg`` may be a bare host or ``host:port``. An explicit ``port_arg`` wins
    over a port embedded in ``ip_arg``; ``default_port`` is used when neither is
    given. 8000 is the default port of Django's development server.
    NOTE(review): confirm the default against the actual deployment.
    """
    host, _, embedded_port = ip_arg.partition(":")
    if port_arg is not None:
        return host, port_arg
    if embedded_port:
        return host, int(embedded_port)
    return host, default_port


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Usage --ip {ip} --p {port} --max {max round} --delay {time delay} --num {num samples}")
    parser.add_argument("--ip", type=str, help="base ip address (host or host:port)", default="127.0.0.1")
    parser.add_argument("--p", type=int, help="server port", default=None)
    parser.add_argument("--max", type=int, help="max round", default=5)
    parser.add_argument("--delay", type=int, help="time delay", default=5)
    parser.add_argument("--num", type=int, help="num samples", default=600)

    args = parser.parse_args()

    # Client.__init__ builds its urls from the module-level IP and PORT names.
    IP, PORT = resolve_server_address(args.ip, args.p)

    client = Client(max_round=args.max, time_delay=args.delay, num_samples=args.num)
    client.task()

Writing jetson_client.py


In [9]:
!pip install paramiko

Collecting paramiko
  Downloading paramiko-2.7.2-py2.py3-none-any.whl (206 kB)
     |████████████████████████████████| 206 kB 15.0 MB/s eta 0:00:01
Collecting bcrypt>=3.1.3
  Downloading bcrypt-3.2.0-cp36-abi3-manylinux2010_x86_64.whl (63 kB)
     |████████████████████████████████| 63 kB 1.6 MB/s  eta 0:00:01
Collecting pynacl>=1.0.1
  Downloading PyNaCl-1.4.0-cp35-abi3-manylinux1_x86_64.whl (961 kB)
     |████████████████████████████████| 961 kB 24.8 MB/s eta 0:00:01
Installing collected packages: bcrypt, pynacl, paramiko
Successfully installed bcrypt-3.2.0 paramiko-2.7.2 pynacl-1.4.0


In [13]:
!pip install ping3

Collecting ping3
  Downloading ping3-3.0.2-py3-none-any.whl (12 kB)
Installing collected packages: ping3
Successfully installed ping3-3.0.2


In [15]:
%%writefile jetson_server.py
import getpass
import json
import time

import paramiko
import requests
from ping3 import ping

###### JETSON NANO ADDRESS ######
# Address shared by the Jetson Nano boards; one "IP:port" entry is built per port.
JETSON_IP = "147.47.200.209"
# Inclusive port range, kept as strings and converted with int() by the Jetson class.
MIN_PORT = "20101"
MAX_PORT = "20136"
# NOTE(review): EXPERIMENT, MAX_ROUND and TIME_DELAY are not referenced in the
# visible part of this file; the script passes the lowercase experiment /
# max_round / time_delay variables defined further down, which hold different
# values — confirm which set is intended.
EXPERIMENT = 1
MAX_ROUND = 5
TIME_DELAY = 5
# SSH credentials used to log in to every board.
# NOTE(review): credentials are hard-coded in source.
USERNAME = "jetson"
PASSWORD = "jetson"

###### SERVER ADDRESS ######
# Passed to jetson_client.py as --ip; note that it already contains the port.
SERVER_IP = "147.47.200.178:9103"

# (Resets the server and changes the total number of clients recorded in the server)
def initialize_server(client_number, server_address=None):
    """Reset the federated-learning server and tell it how many clients take part.

    Args:
        client_number: number of clients the server should record.
        server_address: "host:port" of the server; defaults to SERVER_IP.

    Raises:
        TypeError: if client_number is not an int.
        RuntimeError: if the server does not acknowledge the reset or the
            client-number update with the expected answer.
    """
    if not isinstance(client_number, int):
        raise TypeError(f"client_number must be an int, got {type(client_number).__name__}")

    address = SERVER_IP if server_address is None else server_address
    base_url = f"http://{address}/"

    reset_result = requests.get(base_url + "reset")
    client_result = requests.put(base_url + "client_num", data=json.dumps(client_number))

    # Explicit check instead of assert: asserts are stripped when Python runs with -O.
    if reset_result.text != "Request OK" or client_result.text != "Request PUT OK":
        raise RuntimeError("Server Init Failed")
    print("Server reset success")
    
max_round = 3 # Any positive integer (not tested for extremely large value)
experiment = 1 # 1,2,3,4
time_delay = 5 # time in seconds to wait until retry (not tested for extremely large value)
suppress = False # set to True to hide all output

# initialize_server requires the number of participating clients.
# NOTE(review): this uses the size of the configured port range, i.e. it assumes
# every port between MIN_PORT and MAX_PORT belongs to a board that will join.
# If fewer boards are reachable, pass the real count instead.
expected_clients = int(MAX_PORT) - int(MIN_PORT) + 1
initialize_server(expected_clients)

#Instantiate the clients and create Threads to execute them
#client.task is recursively called within until all clients finish training (including itself)

"""
Initialize multiple Jetson Nanos and start FL
"""
class Jetson:
    """
    Launch jetson_client.py on a range of Jetson Nano boards over SSH.

    Every board is addressed as "IP:port". The port is used as that board's SSH port.
    NOTE(review): this assumes per-board port forwarding on a shared IP — confirm
    against the lab network.
    """

    def __init__(self, IP='147.47.200.22', min_port = "20101", max_port="20202"):
        """
        @params:
            IP : address shared by the boards
            min_port : first port of the range (string or int)
            max_port : last port of the range, inclusive (string or int)

        @return:
            None : builds self.addresses and probes them to fill self.available

        @raises:
            ValueError : if max_port is smaller than min_port
        """
        first, last = int(min_port), int(max_port)
        if first > last:
            raise ValueError("max port must be >= min port")

        self.addresses = [f"{IP}:{port}" for port in range(first, last + 1)]
        self.available = []  # addresses whose host answered the ping
        self._sessions = []  # (address, SSHClient, stdout) of clients that were launched

        self.test_jetson_nano()

    @staticmethod
    def _split_address(address):
        """Split "host:port" into (host, int port); a bare host gets the SSH default 22."""
        host, separator, port = address.partition(":")
        return host, int(port) if separator else 22

    def __ping(self, address):
        """Return True when the host part of ``address`` answers an ICMP ping."""
        # ICMP has no ports, so only the host can be probed.
        host, _ = self._split_address(address)
        resp = ping(host)

        # ping3 returns the delay on success, None on timeout and False on error.
        return resp is not None and resp is not False

    def __initialize(self, address, experiment, max_round, time_delay, num_samples):
        """
        Connect to one board over SSH and start jetson_client.py there without
        waiting for it to finish.

        ``experiment`` is accepted for interface compatibility but not forwarded:
        the command line built here has no experiment option.

        @return:
            True when the client was started, False when the board could not be reached
        """
        host, port = self._split_address(address)

        cli = paramiko.SSHClient()
        # NOTE(review): AutoAddPolicy accepts unknown host keys without verification.
        cli.set_missing_host_key_policy(paramiko.AutoAddPolicy)

        try:
            cli.connect(host, port=port, username=USERNAME, password=PASSWORD)
            stdin, stdout, stderr = cli.exec_command(f"python jetson_client.py --ip {SERVER_IP} --max {max_round} --delay {time_delay} --num {num_samples}")
        except (paramiko.SSHException, OSError) as err:
            print(f"Could not start the client on {address}: {err}")
            cli.close()
            return False

        # Do not read the output here: reading blocks until the remote client exits,
        # and a client only exits after every other client has taken part in each
        # round. All clients therefore have to be started before any output is read.
        self._sessions.append((address, cli, stdout))
        return True

    def start_federated_learning(self):
        """Wait for every launched client to finish, print its output and close its SSH session."""
        for address, cli, stdout in self._sessions:
            try:
                lines = stdout.readlines()
                print(''.join(lines))
            finally:
                cli.close()
        self._sessions = []

    def test_jetson_nano(self):
        """Probe every configured address and store the reachable ones in self.available."""
        # Rebuilt from scratch so that repeated calls do not add duplicates.
        self.available = [address for address in self.addresses if self.__ping(address)]

    def initialize_jetson_nano(self, experiment, max_round, time_delay, num_samples=600):
        """
        Start the client on every available board.

        @return:
            number of boards on which the client was started
        """
        started = 0
        for address in self.available:
            if self.__initialize(address, experiment, max_round, time_delay, num_samples):
                started += 1
        return started

    def fetch_results(self):
        """Return the accuracies collected by the server."""
        # NOTE(review): FederatedServer is not defined or imported in the visible part
        # of this file, so this line raises NameError unless it is provided elsewhere.
        return FederatedServer.accuracies
    
    
# Discover the boards, start one client per board and time the whole run.
start = time.time()
jetson = Jetson(IP=JETSON_IP, min_port=MIN_PORT, max_port=MAX_PORT)

# initialize_jetson_nano takes num_samples and has no suppress parameter.
# 600 is the default of jetson_client.py's --num option.
num_samples = 600
jetson.initialize_jetson_nano(experiment=experiment, max_round=max_round, time_delay=time_delay, num_samples=num_samples)
jetson.start_federated_learning()

results = jetson.fetch_results()
end = time.time()

print(f"Federated learning took {end-start} seconds")

import numpy as np
import matplotlib.pyplot as plt
# Build a (client, round) matrix of accuracies.
# NOTE(review): assumes results maps client id -> {round -> accuracy} with
# non-negative integer keys, as the two size computations of the original code
# imply (its fill loop indexed results[round][id] instead) — confirm against the
# server's data layout.
rounds = max(max(per_round.keys()) for per_round in results.values()) + 1
ids = max(results.keys()) + 1
result = np.zeros((ids, rounds))  # +1 above: the largest key must be a valid index

for cur_id, per_round in results.items():
    for round_, accuracy in per_round.items():
        result[cur_id, round_] = accuracy
        
def get_result(id, round):
    """Look up the entry of the module-level ``result`` matrix at row ``id``, column ``round``."""
    position = (id, round)
    return result[position]

def plot_accuracy():
    """Plot one accuracy curve per row of the module-level ``result`` matrix.

    The x axis is the column index of ``result`` (the round); every curve is
    labelled with its row index (the client id).
    """
    rounds = np.arange(result.shape[1])
    for fed_id, accs in enumerate(result):
        plt.plot(rounds, accs, label=f"{fed_id}")
    # One tick per round; set once, after all curves are drawn.
    plt.xticks(rounds)
    plt.title("Accuracies of the clients (%)")
    plt.xlabel("rounds")
    plt.ylabel("accuracy (%)")
    plt.legend()
    plt.show()
    
# Draw the accuracy chart as the last step of the script.
plot_accuracy()

Overwriting jetson_server.py


# HW {#}
* 실습에서 컴퓨터 하나로 Federated learning을 진행해 보았다.
* 이번 과제는 미리 설치해 놓은 여러 대의 Jetson Nano를 사용해서 실제로 네트워크 상에서 Federated learning을 진행하는 것이다. 
* 4개의 실험 (experiment:1,2,3,4)을 진행해보고, 결과를 보고서로 작성하여 제출.
* 보고서에는 실습에서의 결과와의 비교가 포함되어야 한다. 