
# Conflicts and versions 

Conflicts are an essential part of any distributed system. 
Conflicts arise when there are two or more valid versions of the same message.

As we've seen from the previous notebooks, the network can be unreliable: there may be longer delays. But what about faults/conflicts from the clients/message producers? Moreover, a message might have multiple valid versions.  It could be that peers received two versions of the message, both being signed and valid. What version to choose? What to do when an order of the message is important? 



In [1]:
# Initialize the experiment:
import networkx as nx
import p2psimpy as p2p
import warnings
warnings.filterwarnings('ignore')

# Load the previous experiment configurations
exper = p2p.BaseSimulation.load_experiment(expr_dir='crash_gossip')

Locations, topology, peer_services, serv_impl = exper


## Client generating conflicting information

Let's assign first adversary nodes, we will assign randomly: 

In [2]:
import random
import string

from p2psimpy import BaseMessage, GossipMessage, MessageProducer, PullGossipService

from p2psimpy.consts import TEMPERED
from p2psimpy.config import Config, Func, Dist

class Transaction(BaseMessage):
    pass 

class ConflictMessageProducer(MessageProducer):
    
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        
        self.balance = 100 
        
        
    def _generate_tx(self):        
        msg_phash = ''.join(random.choices(string.ascii_uppercase, k=20))
        diff = random.randint(1, 9)        
        data = {'hash': msg_phash, 'balance': self.balance-diff, 'diff': diff}
        tx = Transaction(self.peer, data)
        
        msg_id = '_'.join((str(self.peer.peer_id), str(self.counter)))
        msg_ttl = self.init_ttl
        return GossipMessage(self.peer, msg_id, tx, msg_ttl, 
            pre_task=self.pre_task, post_task=self.post_task)
    
    def produce_transaction(self):
        # generate new transaction
        msg = self._generate_tx()
        if self.balance - msg.data.data['diff'] < 0:
            return
        
        cons = list(self.peer.connections.keys())
        m_ix = len(cons) // 2
        for p in cons[:m_ix]:
            self.peer.send(p, msg)
            
        # Generate conflicting message as if previous transaction hasn't happened  
        msg = self._generate_tx()
        for p in cons[m_ix:]:
            self.peer.send(p, msg)
        
        self.peer.store('msg_time', str(self.peer.peer_id) +'_' + str(self.counter), self.peer.env.now)
        self.peer.store('msg_data', str(self.peer.peer_id) + '_' + str(self.counter), msg)
        self.balance -= msg.data.data['diff']
        self.counter+=1

def validate_task(msg, peer):
    # time it takes to verify the signature
    crypto_verify = Dist('norm', (1, 0.2)) 
    # time to verify the message data
    msg_verify = Dist("lognorm", (0.49512563, 4.892564, 0.0425785)) 
    
    yield peer.env.timeout(crypto_verify.get() + msg_verify.get())
    if isinstance(msg, Transaction):        
        tx = msg.data
        if tx == TEMPERED or tx['balance'] < 0:
            # You can decide what to do in this case.
            return False
    return True


class MsgConfig(Config):
    pre_task = Func(validate_task)
    
peer_services['client'].service_map['MessageProducer'] = MsgConfig
serv_impl['MessageProducer'] = ConflictMessageProducer
serv_impl['RangedPullGossipService'] = PullGossipService

## Run simulation 

Let's see how adversarial agents together with crashing nodes affect the message dissemination. 

In [3]:
peer_services

{'client': PeerType(config=<class 'p2psimpy.config.PeerConfig'>, service_map={'BaseConnectionManager': None, 'MessageProducer': <class '__main__.MsgConfig'>}),
 'peer': PeerType(config=<class 'p2psimpy.config.PeerConfig'>, service_map={'BaseConnectionManager': None, 'RandomDowntime': <class 'p2psimpy.config.DowntimeConfig'>, 'RangedPullGossipService': <class 'p2psimpy.config.GossipConfig'>})}

In [4]:
# Init Graph
sim = p2p.BaseSimulation(Locations, topology, peer_services, serv_impl)
sim.run(5_200)

## Message analysis

Let's see how this fraction of adverserial nodes affected the network. 


In [5]:
import pandas as pd

def message_data(sim, peer_id, storage_name):
    store = sim.peers[peer_id].storage[storage_name].txs
    for msg_id, tx in store.items():
        client_id, msg_num = msg_id.split('_')
        client_tx = sim.peers[int(client_id)].storage[storage_name].txs[msg_id]
        yield (int(msg_num), tx.data == client_tx.data)
        
def get_gossip_table(sim, storage_name, func):
    return pd.DataFrame({k: dict(func(sim, k, storage_name)) 
                         for k in set(sim.types_peers['peer'])}).sort_index()

    
df = get_gossip_table(sim, 'msg_data', message_data)
df

Unnamed: 0,1,2,3,4,5,6,7,8,9,10,...,16,17,18,19,20,21,22,23,24,25
1,True,False,False,False,True,True,False,True,False,False,...,False,True,False,True,True,False,False,False,False,False
2,True,False,False,True,True,False,False,False,False,False,...,False,True,False,False,False,False,False,False,False,True
3,True,False,True,True,True,False,False,False,False,False,...,False,True,False,False,False,False,False,False,False,False
4,True,False,False,False,True,True,False,False,False,False,...,False,True,False,True,False,False,True,False,True,False
5,True,False,False,True,True,True,True,False,False,False,...,False,True,True,True,True,True,True,True,True,False
6,True,False,True,False,True,True,False,False,False,True,...,False,True,False,True,True,False,False,False,False,False
7,True,False,False,False,True,True,False,True,False,False,...,True,True,False,True,True,True,False,False,False,True
8,False,False,False,False,False,False,False,False,False,False,...,False,False,False,False,False,False,False,False,False,False
9,False,True,False,True,True,True,True,False,True,False,...,True,True,False,True,True,True,False,False,True,True
10,True,False,False,True,True,True,True,True,False,False,...,True,True,False,False,True,True,False,False,True,True


In [6]:
df[df==False].count()

1      5
2     16
3     13
4      9
5      2
6      5
7      9
8     10
9     13
10    20
11     8
12    10
13    15
14    19
15     2
16    13
17     2
18    13
19     6
20     6
21     9
22    13
23    15
24    13
25     7
dtype: int64

In [7]:
sim.peers[3].storage['msg_data'].txs

{'26_1': GossipMessage:Transaction:{'hash': 'KLBKOFKOGSHEVIKSQQNQ', 'balance': 99, 'diff': 1},
 '26_2': GossipMessage:Transaction:{'hash': 'BGZHEQRXJEEDVNPXZIUR', 'balance': 93, 'diff': 4},
 '26_3': GossipMessage:Transaction:{'hash': 'LWIRSDMBAAGYKFCFIGQF', 'balance': 83, 'diff': 9},
 '26_4': GossipMessage:Transaction:{'hash': 'IQKRQTFYTKVPSOYLIZHO', 'balance': 78, 'diff': 5},
 '26_5': GossipMessage:Transaction:{'hash': 'UQCTVIJADJIGTFTETPWO', 'balance': 79, 'diff': 3},
 '26_6': GossipMessage:Transaction:{'hash': 'HOUKUJGNBRCFJEDVBZED', 'balance': 73, 'diff': 3},
 '26_8': GossipMessage:Transaction:{'hash': 'BUECAEHGMMGNZVVMOLNG', 'balance': 59, 'diff': 7},
 '26_7': GossipMessage:Transaction:{'hash': 'IQYUSFTTRCKDUHTLECXS', 'balance': 65, 'diff': 8},
 '26_9': GossipMessage:Transaction:{'hash': 'VPABWHZRTJSZPJAUUPZX', 'balance': 58, 'diff': 2},
 '26_10': GossipMessage:Transaction:{'hash': 'MNEWXBQMOZOUWLRNSCKN', 'balance': 55, 'diff': 1},
 '26_11': GossipMessage:Transaction:{'hash': 'HJR

In [8]:
sim.peers[17].storage['msg_data'].txs

{'26_1': GossipMessage:Transaction:{'hash': 'PSVVXPHTHSBTMCCFAUWB', 'balance': 97, 'diff': 3},
 '26_2': GossipMessage:Transaction:{'hash': 'XRCEGWOETTRDZDXLNFMA', 'balance': 92, 'diff': 5},
 '26_3': GossipMessage:Transaction:{'hash': 'LWIRSDMBAAGYKFCFIGQF', 'balance': 83, 'diff': 9},
 '26_4': GossipMessage:Transaction:{'hash': 'MOMDIXBIBSVDVIQHTSLA', 'balance': 82, 'diff': 1},
 '26_5': GossipMessage:Transaction:{'hash': 'LBQEVNKDPCNVQPHHEIUU', 'balance': 76, 'diff': 6},
 '26_6': GossipMessage:Transaction:{'hash': 'HOUKUJGNBRCFJEDVBZED', 'balance': 73, 'diff': 3},
 '26_7': GossipMessage:Transaction:{'hash': 'ITNQWTRNGZCYQAVYSSMP', 'balance': 66, 'diff': 7},
 '26_9': GossipMessage:Transaction:{'hash': 'QTRIEJMLRJEYTTPAFTFM', 'balance': 56, 'diff': 4},
 '26_10': GossipMessage:Transaction:{'hash': 'UONGCLKDHJOXXQTQDABY', 'balance': 50, 'diff': 6},
 '26_8': GossipMessage:Transaction:{'hash': 'BUECAEHGMMGNZVVMOLNG', 'balance': 59, 'diff': 7},
 '26_11': GossipMessage:Transaction:{'hash': 'NXU

Peers see different versions of the same message!
This is an issue as it might violate integrity guarantees. For example, peers might have a different view on the client's balance. 

How to fix this? 
- One way to solve this is to use a (consensus algorithm)[https://en.wikipedia.org/wiki/Consensus_(computer_science)]


# Consensus algorithm


The consensus is a process that allows achieving a consistent view on a value (agreement). 
Some of the peers may fail or be unreliable, so consensus protocols must be fault-tolerant or resilient. The peers must communicate with one another and agree on a single value.

The consensus problem is fundamental in all distributed systems. One approach to generating consensus is for all processes (agents) to agree on a majority value. In this context, a majority requires at least one more than half of the available votes (where each process is given a vote). However, one or more faulty processes may skew the resultant outcome such that consensus may not be reached or reached incorrectly.



#  Exercise 

In this notebook we ask to implement a consensus service and show that all honest peers accept the same value.  
You can assume that an elected leader is never faulty. 
Here are some poissible algorithms you can consider: 
- **Majority Voting**. Fully connected network. Send votes to all nodes. Choose the value based on majority/super-majority. How many rounds/phases do you need?  
- **Majority Voting with Neighbours**. Connected network. Send votes to all neighbors. Choose the value based on majority/super-majority of your neighbors. How many rounds/phases you need for full convergence? 
- **Consensus through a lottery**. Send transactions through gossip. Everybody runs some lottery mechanism: that both takes time and chooses one or several nodes. This(these) nodes decide which version of the transaction to pick and send through gossip a decided version of transactions (block). How many rounds of lottery you need for convergence? 

**You can choose any of the above algorithm or propose your own**


# The actual message sending and simulation was a bit above my python skills, 
# so I simulated some two consensus algorithms using simply using lists and the peers storage and accessing the other peers storage as 'sending' a message. 
# I hope the intention is clear!

In [211]:
# the first one is a consensus algorithm by majority voting with neighbors

# for all peer in the peer group
# send the message 1 to neighbours

import statistics as stat
from collections import Counter
from itertools import takewhile
import numpy as np
import random

# create a network 
def prepare_topology(num_peers=25, num_clients=0):    
    # Create network topology
    G = nx.erdos_renyi_graph(num_peers, 0.4)   
    nx.relabel_nodes(G, {k: k+1 for k in G.nodes()} ,copy=False)
    
    # Connect the client node to a random peer
    client_edges = [(i, choice(list(G.nodes()))) for i in range(num_peers+1, num_clients+num_peers+1)]
    G.add_edges_from(client_edges)

    types_map = {k: 'peer' if k < num_peers+1 else 'client' for k in G.nodes()}
    # Assign a peer type to the peers 
    nx.set_node_attributes(G, types_map , 'type')
    return G



# this function update the values of the neigbhouring nodes to update its own based on the majority
def update_value_from_neighbours(G, list_total, real_nodes, faulty_nodes):
    for peer_id in real_nodes:
        #if decided[peer_id] is not True:
            list_values=[]
            for neighbor in G.neighbors(peer_id+1):
                if neighbor<26:
                    list_values.append(list_total[neighbor-1])
                    #list_neighbors[peer_id].append(neighbor)
            test_list1 = Counter(list_values) 
            new_value = test_list1.most_common(1)[0][0]
            list_total[peer_id]=new_value
            
     #faulty peers choose and send a random value
    for peer_id in faulty_nodes:
        list_total[peer_id]=random.choice(value_set)
        
    return list_total

# reiterating until all the nodes have the same value. (this would not be possible normally, because the nodes cannot see the
#other values and therefore would not know where to stop)
def reiterate_till_convergence(G,list_total,rounds, real, faulty):
    list_total_new= update_value_from_neighbours(G, list_total, real, faulty)
    rounds=rounds+1
    test_list2 = Counter(list_total_new[real])
    if test_list2.most_common()[0][1]<len(list_total_new[real]):
        return reiterate_till_convergence(G,list_total_new,rounds, real, faulty)
    else:
        return list_total_new[real],rounds


def divide_peers(num_peers=25, honest=0.8):
    data =list(range(0, num_peers))
    random.shuffle(data)
    split=int(25*honest)
    honest_peers = data[:split]
    faulty_peers = data[split:]
    return honest_peers, faulty_peers

#set variables 
rounds=0
value_set= ['a', 'b', 'c', 'd' ]
num_peers=25

G=prepare_topology(num_peers)
honest_peers, faulty_peers= divide_peers(num_peers)
value_list=np.array(random.choices(value_set, k=25))

outcome = reiterate_till_convergence(G , value_list, rounds, honest_peers, faulty_peers)
print(outcome)


# you can see that with this network and the message spread of the previous simulation, it takes about 2 to 3 rounds until 
#the network is actually converged, but this changes a lot when changes the amount of values, neighbours or faulty nodes
# however, like this the nodes do not know when to stop themselves
# an important part that this misses is that the nodes themselves need to know when the honest nodes has converged on a single value




(array(['c', 'c', 'c', 'c', 'c', 'c', 'c', 'c', 'c', 'c', 'c', 'c', 'c',
       'c', 'c', 'c', 'c', 'c', 'c', 'c'], dtype='<U1'), 2)


In [212]:
# in this example, I (tried to) simulate the random byzantine agreement as was discussed in the lecture
# this is in principle a fully connected network, and I simulated several dishonest nodes
# the dishonest nodes simply pick a random value each round 


# this causes faulty nodes, where the messages don't arrive to node they are sent to
def get_faulty_broadcasted_set(list_total, peer_nr, faulty):
    inputs=np.random.choice(25, 25-faulty, replace=False)
    return [list_total[k] for k in inputs]


#creating the proposal value based on the input from the other nodes
def update_value_from_neighbours(list_total, honest_peers, faulty_peers, faulty):
    new_values=random.choices(value_set, k=25)
    print(new_values)
    for peer_id in honest_peers:
        list_values=get_faulty_broadcasted_set(list_total, peer_id, faulty)
        test_list1 = Counter(list_values) 
        if test_list1.most_common()[0][1]>(25+(25-len(honest_peers)))/2:
            new_values[peer_id]=test_list1.most_common()[0][0]                           
        else:
            new_values[peer_id]='idk'
    return new_values

#deciding whether enough nodes have the same proposal values
def check_prosopal(list_total,decided, honest_peers, faulty_peers, faulty):
    new_values = update_value_from_neighbours(list_total, honest_peers, faulty_peers, faulty)
    for peer_id in honest_peers:
        if decided[peer_id]==False:
            list_values=get_faulty_broadcasted_set(new_values, peer_id, faulty)
            test_list1 = Counter(list_values) 
            if test_list1.most_common()[0][1]>5 and test_list1.most_common()[0][0] is not 'idk':
                list_total[peer_id]=test_list1.most_common()[0][0]       
                if test_list1.most_common()[0][1]>15 and test_list1.most_common()[0][0] is not 'idk':
                    decided[peer_id]=True
            else:
                list_total[peer_id]=random.choice(value_set)
    return list_total, decided

                                           
#iterating to completion                                          
def iterate_to_completion(list_total, rounds, decided, honest_peers, faulty_peers, failing):
    list_total, decided = check_prosopal(list_total, decided, honest_peers, faulty_peers, failing)
    if not all(x==True for x in decided[honest_peers]):
        rounds=rounds+1 
        return iterate_to_completion(list_total, rounds, decided, honest_peers, faulty_peers, failing)
    else:
        rounds=rounds+1 
        return list_total, decided[honest_peers], rounds

#setup of the honest vs dishonest nodes
def divide_peers(num_peers=25, honest=0.8):
    data =list(range(0, num_peers))
    random.shuffle(data)
    split=int(25*honest)
    honest_peers = data[:split]
    faulty_peers = data[split:]
    return honest_peers, faulty_peers


#set variables 
rounds=0
value_set= ['a', 'b']
num_peers=25
#amount of peers that fail to reach the other peer each round
failing=4

# prepare peer set
honest_peers, faulty_peers= divide_peers(num_peers, 21/25)
value_list=np.array(random.choices(value_set, k=25))
decided=np.array([False]*25)


print(iterate_to_completion(value_list, rounds, decided, honest_peers, faulty_peers, failing))

# this algorithm takes longer on average, all the nodes are connected with each other. 
#This is because here every nodes can decide when to stop iterating because the network has converged.


['b', 'a', 'b', 'b', 'a', 'b', 'b', 'a', 'a', 'b', 'b', 'b', 'b', 'b', 'a', 'a', 'a', 'a', 'a', 'b', 'a', 'b', 'a', 'b', 'a']
['b', 'a', 'b', 'a', 'a', 'b', 'b', 'b', 'b', 'b', 'a', 'b', 'b', 'b', 'b', 'b', 'a', 'a', 'b', 'b', 'a', 'a', 'b', 'b', 'a']
['a', 'b', 'b', 'a', 'a', 'b', 'a', 'b', 'b', 'a', 'b', 'a', 'b', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'a', 'b']
['a', 'a', 'b', 'a', 'a', 'b', 'b', 'b', 'b', 'a', 'b', 'a', 'b', 'b', 'a', 'a', 'a', 'a', 'a', 'b', 'b', 'a', 'b', 'b', 'b']
['a', 'b', 'a', 'b', 'a', 'b', 'b', 'b', 'a', 'a', 'a', 'b', 'b', 'b', 'a', 'a', 'a', 'a', 'b', 'a', 'a', 'a', 'b', 'a', 'b']
['a', 'a', 'a', 'a', 'a', 'b', 'b', 'a', 'a', 'a', 'b', 'a', 'a', 'b', 'a', 'b', 'a', 'a', 'b', 'b', 'a', 'b', 'b', 'a', 'b']
['b', 'a', 'b', 'b', 'a', 'a', 'b', 'b', 'a', 'a', 'a', 'b', 'a', 'a', 'a', 'a', 'b', 'b', 'b', 'b', 'b', 'b', 'a', 'b', 'b']
['b', 'a', 'a', 'a', 'b', 'b', 'b', 'b', 'b', 'b', 'a', 'b', 'b', 'a', 'b', 'a', 'a', 'b', 'b', 'a', 'b', 'a', 'b', 'a