# Gossip Learning Overview

# Implementation

## Dummy Network Implementation

In [1]:
import time
import threading
import traceback

# Packet class
class Packet:
    def __init__(self, src, dest, data):
        self.src = src
        self.dest = dest
        self.data = data
    def __str__(self):
        return "src=" + self.src + ",dest=" + self.dest + ",data=" + str(self.data)

class DummyNet:
    def __init__(self, address, neighbor_addrs = []):
        self.ip = address
        self.neighbor_addrs = neighbor_addrs
        self.outbox = []
        self.inbox = []
        self.active = True
        
    # Second stage initialization to build the 'network connections'.
    def init_network(self, registry):
        # Build neighbors
        self.build_neighbors(registry)
        # Init sender and receiver processes
        self.init_sender()
        self.init_receiver()
        
    # Used to set active flag such that send/receive processes terminate.
    def kill(self):
        self.active = False
        
    # Networ reporting function.
    def print_(self, message):
        # Network level print messaging
        output = "NET::[" + str(self.ip) + "]::"
        try:
            output = output + str(message)
        except:
            output = output + "Message not printable."
        print(output)
        
    # Given registry, builds neighbor dictionary.
    def build_neighbors(self, registry):
        # The registry is built as a dictionary with key IP address an entry DummyNet object
        self.neighbors = {}
        for addr in self.neighbor_addrs:
            self.neighbors[addr] = registry[addr]
    
    # Starts sender service.
    def init_sender(self):
        threading.Thread(target=self.__send, args=()).start()
        pass
    
    # Starts receiver service.
    def init_receiver(self):
        threading.Thread(target=self.__receive, args=()).start()
        pass
    
    # Network layer send function.
    def __send(self):
        while self.active:
            # Send packet, if failed, print exception.
            try:
                if len(self.outbox):
                    packet = self.outbox.pop(0)
                    self.neighbors[packet.dest].inbox.append(packet)
                    self.print_("Sent: " + str(packet))
            except Exception as e:
                self.print_("Sending error has occurred.")
                traceback.print_exc()
    
    # Receiving/processing function.
    def __receive(self):
        while self.active:
            try:
                if len(self.inbox):
                    packet = self.inbox.pop(0)
                    self.print_("Received: " + str(packet))
            except Exception as e:
                self.print_("Receiving error has occurred.")
                traceback.print_exc()
        pass
    
    # Application layer send function.
    def send(self, payload, address):
        # Create packet
        packet = Packet(self.ip, address, payload)
        # Load packet into outbox
        self.outbox.append(packet)
        
    # Application layer receive function, gets from inbox buffer.
    def receive(self):
        # If buffer is not empty, return packet, else return None.
        if len(inbox):
            return inbox.pop(0)
        else:
            return None

## Client

In [2]:
import secrets

# A single record.
class Record:
    def __init__(self, ip, weights, expiry):
        self.ip = ip
        self.weights = weights
        self.expiry = expiry
    def step(self, s=1):
        self.expiry -= 1
        if self.expiry <= 0:
            return True
        else:
            return False

# Holds and manages records and requests.
class GuestBook:
    def __init__(self):
        self.records = {}
    def encounter(self, ip, weights, expiry):
        self.records[ip] = Record(ip, weights, expiry)
    def step(self):
        # Increment another step and reduce expirations across records.
        for ip in self.records.keys():
            # If incremental step results in expiration, remove record.
            if self.records[ip].step():
                expunged = self.records.pop(ip)
                

class Client:
    def __init__(self, netNode=None, ip=None, neighbor_addrs=None):
        if netNode is not None:
            self.net = netNode
        else:
            self.net = DummyNet(ip, neighbor_addrs)
        self.model = "hi it's me from " + self.net.ip
            
    # Client reporting function
    def print_(self, message):
        # Client level print messaging
        output = "CLIENT::[" + str(self.net.ip) + "]::"
        try:
            output = output + str(message)
        except:
            output = output + "Message not printable."
        print(output)
    
    # Lowest level client transmission function.
    def transmit(self, payload, target_addr):
        self.net.send(payload, target_addr)
        
    # Model transfer function.
    def transmit_model(self):
        # Select recipient
        recipient = self.select_random_recv()
        # Transmit model to recipient
        self.transmit(self.model, recipient)
        self.print_("Transmitting model to " + recipient)
    
    # Select random recipient.
    def select_random_recv(self):
        return secrets.choice(list(self.net.neighbors.keys()))

## Execution

In [3]:
import time
from datetime import datetime

# Create a graph
graph = {}
graph["10.0.0.1"] = ["10.0.0.2", "10.0.0.3"]
graph["10.0.0.2"] = ["10.0.0.1", "10.0.0.3"]
graph["10.0.0.3"] = ["10.0.0.1", "10.0.0.2"]
print("Created network graph.")

# Create nodes for the virtual network
ipRegistry = {}
for addr in graph.keys():
    newNode = DummyNet(addr, graph[addr])
    ipRegistry[addr] = newNode
# Build network (decentralized)
for addr in graph.keys():
    ipRegistry[addr].init_network(ipRegistry)
print("Registered nodes in network graph.")

# Create clients
clientDict = {}
for ip in ipRegistry.keys():
    clientDict[ip] = Client(netNode=ipRegistry[ip])
print("Clients created and linked to nodes.")

# clientDict["10.0.0.1"].transmit(str(time.time()), "10.0.0.2")
# time.sleep(2.5)
# clientDict["10.0.0.2"].transmit(str(time.time()), "10.0.0.3")
# time.sleep(2.5)

print("Begin experiment.")
for i in range(10):
    secrets.choice(list(clientDict.values())).transmit_model()
    time.sleep(1)

print("Ending experiment.")
# Kill all nodes
for addr in graph.keys():
    ipRegistry[addr].kill()


Created network graph.
Registered nodes in network graph.
Clients created and linked to nodes.
Begin experiment.
CLIENT::[10.0.0.2]::Transmitting model to 10.0.0.1
NET::[10.0.0.2]::Sent: src=10.0.0.2,dest=10.0.0.1,data=hi it's me from 10.0.0.2
NET::[10.0.0.1]::Received: src=10.0.0.2,dest=10.0.0.1,data=hi it's me from 10.0.0.2
CLIENT::[10.0.0.2]::Transmitting model to 10.0.0.1
NET::[10.0.0.2]::Sent: src=10.0.0.2,dest=10.0.0.1,data=hi it's me from 10.0.0.2NET::[10.0.0.1]::Received: src=10.0.0.2,dest=10.0.0.1,data=hi it's me from 10.0.0.2

CLIENT::[10.0.0.3]::Transmitting model to 10.0.0.1
NET::[10.0.0.3]::Sent: src=10.0.0.3,dest=10.0.0.1,data=hi it's me from 10.0.0.3
NET::[10.0.0.1]::Received: src=10.0.0.3,dest=10.0.0.1,data=hi it's me from 10.0.0.3
CLIENT::[10.0.0.2]::Transmitting model to 10.0.0.3NET::[10.0.0.2]::Sent: src=10.0.0.2,dest=10.0.0.3,data=hi it's me from 10.0.0.2NET::[10.0.0.3]::Received: src=10.0.0.2,dest=10.0.0.3,data=hi it's me from 10.0.0.2


CLIENT::[10.0.0.3]::Transmi