# Gossip Learning Overview

# Implementation

## Dummy Network Implementation

In [1]:
import time
import threading
import traceback

# Packet class
class Packet:
    """A unit of data travelling between two simulated network addresses."""

    def __init__(self, src, dest, data):
        """
        Args:
            src: sender address.
            dest: recipient address.
            data: arbitrary payload.
        """
        self.src = src
        self.dest = dest
        self.data = data

    def __str__(self):
        # Explicit str() conversion (!s) on every field so non-string addresses
        # (e.g. ints) format instead of raising TypeError on concatenation.
        return f"src={self.src!s},dest={self.dest!s},data={self.data!s}"

class DummyNet:
    """Simulated network node that delivers packets between in-process peers.

    Packets move through three FIFO lists:

    * ``outbox``   -- filled by :meth:`send`, drained by the sender thread,
      which appends each packet to the destination neighbor's ``receiver``;
    * ``receiver`` -- drained by this node's receiver thread into ``inbox``;
    * ``inbox``    -- consumed by :meth:`receive`.

    Construction is two-stage: create every node, then call
    :meth:`init_network` on each with a registry of all nodes.
    """

    # Seconds a worker thread sleeps when its buffer is empty, so idle nodes
    # do not busy-spin a CPU core.
    POLL_INTERVAL = 0.001

    def __init__(self, address, neighbor_addrs=None):
        """Create an unconnected node.

        Args:
            address: this node's address; also its key in the registry.
            neighbor_addrs: addresses this node may send to (default: none).
        """
        self.ip = address
        # None instead of a ``[]`` default: a default list would be a single
        # object shared by every node constructed without neighbors.
        self.neighbor_addrs = [] if neighbor_addrs is None else neighbor_addrs
        self.outbox = []
        self.receiver = []
        self.inbox = []
        self.active = True

    def init_network(self, registry):
        """Second-stage init: resolve neighbors, then start send/receive threads."""
        self.build_neighbors(registry)
        self.init_sender()
        self.init_receiver()

    def kill(self):
        """Clear the active flag; both worker threads exit at their next loop check."""
        self.active = False

    def print_(self, message):
        """Network reporting function: print ``message`` prefixed with this node's address."""
        output = "NET::[" + str(self.ip) + "]::"
        try:
            output = output + str(message)
        except Exception:
            output = output + "Message not printable."
        print(output)

    def build_neighbors(self, registry):
        """Build ``self.neighbors``, mapping each neighbor address to its node object.

        Args:
            registry: dict from address to DummyNet. Raises KeyError if a
                neighbor address is missing from it.
        """
        self.neighbors = {}
        for addr in self.neighbor_addrs:
            self.neighbors[addr] = registry[addr]

    def init_sender(self):
        """Start the background thread that drains ``outbox``."""
        # daemon=True: a node that is never killed must not keep the
        # interpreter alive forever.
        threading.Thread(target=self.__send, daemon=True).start()

    def init_receiver(self):
        """Start the background thread that drains ``receiver`` into ``inbox``."""
        threading.Thread(target=self.__receive, daemon=True).start()

    def __send(self):
        """Sender loop: deliver queued packets to neighbors until killed."""
        while self.active:
            if not self.outbox:
                # Nothing queued: yield the CPU instead of spinning.
                time.sleep(self.POLL_INTERVAL)
                continue
            try:
                packet = self.outbox.pop(0)
                self.neighbors[packet.dest].receiver.append(packet)
                self.print_("Sent: " + str(packet))
            except Exception:
                # Keep the thread alive; a packet whose delivery failed is dropped.
                self.print_("Sending error has occurred.")
                traceback.print_exc()

    def __receive(self):
        """Receiver loop: move delivered packets into ``inbox`` until killed."""
        while self.active:
            if not self.receiver:
                time.sleep(self.POLL_INTERVAL)
                continue
            try:
                packet = self.receiver.pop(0)
                self.inbox.append(packet)
                self.print_("Received: " + str(packet))
            except Exception:
                self.print_("Receiving error has occurred.")
                traceback.print_exc()

    def send(self, payload, address):
        """Application-layer send: wrap ``payload`` in a Packet and queue it for ``address``."""
        packet = Packet(self.ip, address, payload)
        self.outbox.append(packet)

    def receive(self):
        """Application-layer receive: pop the oldest packet, or return None if the inbox is empty."""
        try:
            return self.inbox.pop(0)
        except IndexError:
            return None

## Learning Modules

In [7]:
from __future__ import absolute_import, division, print_function, unicode_literals


import tensorflow as tf
import numpy as np

# Positions inside a client data share; ModelIncubator.rsplit produces
# shares as (data_size, x_data, y_data) tuples.
X_INDEX = 1
Y_INDEX = 2
DATA_INDEX = 0

NUM_EPOCHS = 2

class Model:
    """Local learner for one client: a compiled Keras model plus its data share."""

    def __init__(self, data):
        """
        Args:
            data: a (data_size, x_data, y_data) share, indexed with
                DATA_INDEX / X_INDEX / Y_INDEX.
        """
        self.data = data
        self.model = self.create_model()
        # (data_size, weights) snapshot published after each step.
        self.sharing_model = None

    def step(self):
        """Train for NUM_EPOCHS on the local share and snapshot the weights to share.

        Returns:
            The History object returned by ``fit`` (previously discarded; callers
            that ignore the return value are unaffected).
        """
        history = self.model.fit(self.data[X_INDEX], self.data[Y_INDEX], epochs=NUM_EPOCHS)
        # Call get_weights(): storing the bound method itself would publish a
        # live accessor rather than a snapshot of the weights after this step.
        self.sharing_model = (self.data[DATA_INDEX], self.model.get_weights())
        return history

    def aggregate(self, recv_list):
        """Merge peer models into this one (not implemented yet; currently a no-op).

        Args:
            recv_list: list of (data size, weights) tuples from other nodes.
        """
        pass

    def create_model(self):
        """Select which model-construction function this learner uses."""
        return self.standardNN()

    def standardNN(self):
        """Build a small dense classifier for 28x28 inputs with 10 output logits."""
        model = tf.keras.models.Sequential([
            tf.keras.layers.Flatten(input_shape=(28, 28)),
            tf.keras.layers.Dense(32, activation='relu'),
            tf.keras.layers.Dropout(0.2),
            tf.keras.layers.Dense(10)
        ])
        # from_logits=True because the final Dense layer has no softmax.
        loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
        model.compile(optimizer='adam', loss=loss_fn, metrics=['accuracy'])
        return model

class ModelIncubator:
    """Load a dataset and partition its training split into per-client shares."""

    def __init__(self, data_ratios):
        """
        Args:
            data_ratios: fraction of the training set given to each client, in order.
        """
        self.x_train, self.x_test, self.y_train, self.y_test = self.get_dataset()
        self.data_shares = self.rsplit(self.x_train, self.y_train, nonIID=True, ratios=data_ratios)

    def get_dataset(self):
        """Select which dataset-loading function is used."""
        return self.get_mnist()

    def get_mnist(self):
        """Download MNIST and scale pixel values to [0, 1].

        Returns:
            (x_train, x_test, y_train, y_test)
        """
        print("\nDownloading MNIST data...")
        mnist = tf.keras.datasets.mnist
        (x_train, y_train), (x_test, y_test) = mnist.load_data()
        x_train, x_test = x_train / 255.0, x_test / 255.0
        return x_train, x_test, y_train, y_test

    def rsplit(self, x_train, y_train, nonIID=False, ratios=None):
        """Cut the training data into consecutive slices sized by ``ratios``.

        Args:
            x_train, y_train: sliceable sequences of equal length.
            nonIID: currently unused; the split is always ratio-based
                (kept for interface compatibility).
            ratios: per-client fractions. If they sum to ~1 the last client
                receives every remaining sample.

        Returns:
            List of (data_size, x_slice, y_slice) tuples, one per ratio.

        Raises:
            ValueError: if ``ratios`` is None.
        """
        print("\nSplitting data into different clients...")
        if ratios is None:
            raise ValueError("rsplit requires per-client ratios.")
        print("\tAssigning ranges of data...")
        accumulations = np.cumsum(np.asarray(ratios, dtype=float))
        print(accumulations)
        total = len(x_train)
        markers = [int(marker) for marker in accumulations * total]
        # Floating-point accumulation can land just below 1.0 (e.g. ten 0.1
        # shares sum to 0.9999999999999999); int() would then drop the final
        # sample(s), so pin the last marker to the end of the data.
        if markers and np.isclose(accumulations[-1], 1.0):
            markers[-1] = total
        print(markers)
        print("\tStoring subsets of data into each client...")
        dataSplits = []
        starts = [0] + markers[:-1]
        for start, end in zip(starts, markers):
            x_data = x_train[start:end]
            y_data = y_train[start:end]
            dataSplits.append((len(x_data), x_data, y_data))
        return dataSplits
    
# Smoke run: split the training data 50/25/25 across three clients and
# train the first client's model once.
mi = ModelIncubator(data_ratios=[0.5, 0.25, 0.25])
m = Model(mi.data_shares[0])
m.step()


Downloading MNIST data...

Splitting data into different clients...
	Assigning ranges of data...
[0.5  0.75 1.  ]
[30000, 45000, 60000]
	Storing subsets of data into each client...
Train on 30000 samples
Epoch 1/2
Epoch 2/2


## Client

In [2]:
import secrets

# A single record.
class Record:
    """Guest-book entry: the weights last seen from a peer, with a countdown."""

    def __init__(self, ip, weights, expiry):
        """
        Args:
            ip: peer address.
            weights: weights received from that peer.
            expiry: number of steps remaining before the record expires.
        """
        self.ip = ip
        self.weights = weights
        self.expiry = expiry

    def step(self, s=1):
        """Advance the countdown by ``s`` steps (previously ``s`` was ignored).

        Returns:
            True once the record has expired (expiry <= 0), else False.
        """
        self.expiry -= s
        return self.expiry <= 0

# Holds and manages records.
class GuestBook:
    """Records keyed by peer address, each expiring after a number of steps."""

    def __init__(self):
        self.records = {}

    def encounter(self, ip, weights, expiry):
        """Store (or replace) the record for ``ip``."""
        self.records[ip] = Record(ip, weights, expiry)

    def step(self):
        """Advance every record one step and remove those that expire."""
        # Collect first, delete afterwards: popping from a dict while
        # iterating it raises RuntimeError. Each record's step() is still
        # called exactly once per GuestBook.step().
        expired = [ip for ip, record in self.records.items() if record.step()]
        for ip in expired:
            del self.records[ip]
                

class Client:
    """Gossip-learning participant attached to one network node.

    ``model`` is currently a placeholder string; ``model_ready`` gates whether
    :meth:`transmit_model` actually sends it.
    """

    def __init__(self, netNode=None, ip=None, neighbor_addrs=None):
        """Attach to ``netNode``, or build a fresh DummyNet from ``ip``/``neighbor_addrs``."""
        if netNode is not None:
            self.net = netNode
        else:
            # NOTE(review): init_network() is not called for a node created
            # here, so it has no neighbors or worker threads until the caller
            # wires it up.
            self.net = DummyNet(ip, neighbor_addrs)
        # Placeholder payload until real model weights are exchanged.
        self.model = "hi it's me from " + str(self.net.ip)
        self.model_ready = False
        # Active flag (not read anywhere in this class yet).
        self.active = True

        # Client data
        self.data = None

    def print_(self, message):
        """Client reporting function: print ``message`` prefixed with this client's address."""
        output = "CLIENT::[" + str(self.net.ip) + "]::"
        try:
            output = output + str(message)
        except Exception:
            output = output + "Message not printable."
        print(output)

    def process(self):
        """Run one state-machine step.

        If a packet is waiting in the inbox, treat it as a peer model to
        aggregate (marking the local model not ready); otherwise "train"
        locally and mark the model ready to share.
        """
        packet = self.net.receive()
        if packet is not None:
            self.model_ready = False
            self.print_("Processing model from " + str(packet.src))
            self.print_("Aggregating model with new input.")
        else:
            self.print_("Training model.")
            self.model_ready = True

    def transmit(self, payload, target_addr):
        """Lowest-level transmission: hand ``payload`` to the network node."""
        self.net.send(payload, target_addr)

    def transmit_model(self, recipient):
        """Send the model to ``recipient`` if it is ready; otherwise report that it is not."""
        if self.model_ready:
            self.transmit(self.model, recipient)
            # str(): a non-string address must not raise after the send succeeded.
            self.print_("Transmitting model to " + str(recipient))
        else:
            self.print_("Model still processing.")

    def select_random_recv(self):
        """Pick a random neighbor address to send to."""
        return secrets.choice(list(self.net.neighbors.keys()))

## Execution Functions

In [3]:
import threading

class Console:
    """Interactive command loop that drives the simulated clients.

    Commands:
      exit               stop the loop
      step [ip]          run one process() step on every client, or only on ``ip``
      neighborhood       print every client address
      exchange ip1 ip2   ip1 and ip2 send their models to each other
      flood [ip]         every client (or only ``ip``) sends its model to all neighbors
    """

    def __init__(self, clientDict):
        """
        Args:
            clientDict: dict from address to Client.
        """
        self.clients = clientDict

    def run(self, reader=None):
        """Read and execute commands until ``exit`` or end of input.

        Args:
            reader: callable taking a prompt and returning one command line;
                defaults to the built-in ``input``. EOFError or
                KeyboardInterrupt from it ends the loop.
        """
        if reader is None:
            reader = input
        while True:
            try:
                line = reader(">>")
            except (EOFError, KeyboardInterrupt):
                # EOFError subclasses Exception: left to the generic handler
                # below it would be caught on every iteration once stdin is
                # closed, spinning this loop forever.
                break
            # split() without an argument also drops empty tokens from
            # repeated spaces.
            cmd = line.split()
            if not cmd:
                continue
            if cmd[0] == "exit":
                break
            try:
                if cmd[0] == "step":
                    if len(cmd) == 1:
                        self.step()
                    else:
                        self.istep(cmd[1])
                elif cmd[0] == "neighborhood":
                    print(self.get_all_addrs())
                elif cmd[0] == "exchange":
                    self.exchange(cmd[1], cmd[2])
                elif cmd[0] == "flood":
                    if len(cmd) == 1:
                        self.floodall()
                    else:
                        self.flood(cmd[1])
                else:
                    print("Command does not exist.")
            except Exception as e:
                print("Command did not work. Check arguments.")
                print(e)

    # SYSTEM LEVEL COMMANDS
    def get_all_addrs(self):
        """Return the list of client addresses, formatted as a string."""
        return str(list(self.clients.keys()))

    def step(self):
        """Run one process() step on every client."""
        for client in self.clients.values():
            client.process()

    def floodall(self):
        """Have every client send its model to each of its neighbors."""
        for client in self.clients.values():
            for neighbor in client.net.neighbors.keys():
                client.transmit_model(neighbor)

    # INTERNODE LEVEL COMMANDS
    def exchange(self, ip1, ip2):
        """Have ``ip1`` and ``ip2`` send their models to each other."""
        self.clients[ip1].transmit_model(ip2)
        self.clients[ip2].transmit_model(ip1)

    def flood(self, ip):
        """Have client ``ip`` send its model to each of its neighbors."""
        for neighbor in self.clients[ip].net.neighbors.keys():
            self.clients[ip].transmit_model(neighbor)

    # NODE LEVEL COMMANDS
    def istep(self, ip):
        """Run one process() step on client ``ip`` only."""
        self.clients[ip].process()

## Execution

In [4]:
import time
from datetime import datetime

# Create a graph: each address maps to the addresses it can send to.
graph = {
    "10.0.0.1": ["10.0.0.2", "10.0.0.3"],
    "10.0.0.2": ["10.0.0.1", "10.0.0.3"],
    "10.0.0.3": ["10.0.0.1", "10.0.0.2"],
}
print("Created network graph.")

# Create nodes for the virtual network, then wire them up in a second pass:
# every node must be in the registry before any node resolves its neighbors.
ipRegistry = {addr: DummyNet(addr, neighbors) for addr, neighbors in graph.items()}
for node in ipRegistry.values():
    node.init_network(ipRegistry)
print("Registered nodes in network graph.")

# Create clients
clientDict = {ip: Client(netNode=node) for ip, node in ipRegistry.items()}
print("Clients created and linked to nodes.")

# clientDict["10.0.0.1"].transmit(str(time.time()), "10.0.0.2")
# time.sleep(2.5)
# clientDict["10.0.0.2"].transmit(str(time.time()), "10.0.0.3")
# time.sleep(2.5)

# print("Begin experiment.")
# for i in range(10):
#     # NOTE: transmit_model requires a recipient address.
#     secrets.choice(list(clientDict.values())).transmit_model()
#     time.sleep(1)

# Start execution. try/finally guarantees the node threads are stopped even
# if the console loop raises.
console = Console(clientDict)
try:
    console.run()
finally:
    print("Ending experiment.")
    # Kill all nodes
    for node in ipRegistry.values():
        node.kill()


Created network graph.
Registered nodes in network graph.
Clients created and linked to nodes.
>>hi
Command does not exist.
>>exchange 10.0.0.1 10.0.0.2
CLIENT::[10.0.0.1]::Model still processing.
CLIENT::[10.0.0.2]::Model still processing.
>>step
CLIENT::[10.0.0.1]::Training model.
CLIENT::[10.0.0.2]::Training model.
CLIENT::[10.0.0.3]::Training model.
>>exchange 10.0.0.1 10.0.0.2
CLIENT::[10.0.0.1]::Transmitting model to 10.0.0.2NET::[10.0.0.1]::Sent: src=10.0.0.1,dest=10.0.0.2,data=hi it's me from 10.0.0.1NET::[10.0.0.2]::Received: src=10.0.0.1,dest=10.0.0.2,data=hi it's me from 10.0.0.1


CLIENT::[10.0.0.2]::Transmitting model to 10.0.0.1
NET::[10.0.0.2]::Sent: src=10.0.0.2,dest=10.0.0.1,data=hi it's me from 10.0.0.2
NET::[10.0.0.1]::Received: src=10.0.0.2,dest=10.0.0.1,data=hi it's me from 10.0.0.2
>>step
CLIENT::[10.0.0.1]::Processing model from 10.0.0.2
CLIENT::[10.0.0.1]::Aggregating model with new input.
CLIENT::[10.0.0.2]::Processing model from 10.0.0.1
CLIENT::[10.0.0.2]::Ag