# Laboratorio 2025
### Análisis y diseño de algoritmos distribuidos en redes
### Andrés Montoro 5.169.779-1


## Importación de módulos

In [None]:
import os
import random
import time
from pprint import pformat

import numpy as np
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
from pydistsim import NetworkGenerator, Simulation
from pydistsim.algorithm.node_algorithm import NodeAlgorithm, StatusValues
from pydistsim.algorithm.node_wrapper import NodeAccess
from pydistsim.logging import set_log_level, LogLevels, enable_logger, disable_logger, logger
from pydistsim.message import Message
from pydistsim.restrictions.communication import BidirectionalLinks
from pydistsim.restrictions.knowledge import InitialDistinctValues
from pydistsim.restrictions.reliability import TotalReliability
from pydistsim.restrictions.topological import Connectivity

# from pydistsim.gui import drawing as draw
# %matplotlib inline
# from matplotlib import pyplot as plt

# Kept last so that any name it exports keeps taking precedence, as before.
from utils import *

set_log_level(LogLevels.INFO)
# disable_logger()
enable_logger()

Para empezar, se definen algunos hiperparámetros que determinan la ejecución del notebook. Pueden modificarse para variar las pruebas.

In [None]:
############
#### OM ####
############
max_f = 4                                               # Largest number of traitors Oral Messages is tested with. The experiments use n = 3f + 1 nodes; f=4 can take between one and two hours.
traitor_strategy = ByzantineBehavior.Behavior.CONFUSER  # Betrayal strategy used by the traitors; ByzantineBehavior.Behavior offers several, e.g. ByzantineBehavior.Behavior.LIER
verbose_OM = False                                      # Whether to write a detailed log of each Oral Messages run to the results folder




#############
#### 2PC ####
#############
n_2PC = 10          # Number of nodes in the 2PC algorithm: n-1 participants and 1 coordinator
its_2PC = 20        # Number of iterations of the 2PC experiment
verbose_2PC = True  # Whether to write a detailed log of each 2PC run to the results folder

# Points at which a 2PC node is given a chance to crash:
between_Prepares_2PC = False        # while sending Prepare messages. Default is False
between_timeout_aborts_2PC = False  # while sending Abort after a timeout. Default is False
before_OK_2PC = True                # before sending its OK message. Default is True
between_Commits_2PC = False         # while sending Commit messages. Default is False



#############
#### 3PC ####
#############
n_3PC = 10          # Number of nodes in the 3PC algorithm: n-1 participants and 1 coordinator
its_3PC = 20        # Number of iterations of the 3PC experiment
verbose_3PC = True  # Whether to write a detailed log of each 3PC run to the results folder

# Points at which a 3PC node is given a chance to crash:
between_Prepares_3PC = False            # while sending Prepare messages. Default is False
between_OKtimeout_aborts_3PC = False    # while sending Abort after the OK timeout. Default is False
before_OK_3PC = True                    # before sending its OK message. Default is True
between_PreCommits_3PC = True           # while sending PreCommit messages. Default is True
between_Readytimeout_aborts_3PC = False # while sending Abort after the Ready timeout (PreCommit stage). Default is False
before_Ready_3PC = True                 # before sending its Ready message. Default is True
between_Alerts_3PC = False              # while ordering a commit after detecting that the coordinator failed. Default is False
between_Commits_3PC = True              # while sending Commit messages. NOTE(review): the original comment documents the default as False, but the value set here is True - confirm which is intended.




###########################
###### Byzantine 3PC ######
###########################
n_B3PC = 20
Its_B3PC = 20       # NOTE(review): capitalised unlike its_2PC / its_3PC; presumably the iteration count - confirm before renaming.
verbose_B3PC = 20   # NOTE(review): the other verbose_* flags are booleans; 20 is merely truthy - confirm the intended value.


## Parte 1: Problema de los Generales Bizantinos

### 1. Implementación básica del problema

- Simule un conjunto de n generales (nodos) que deben decidir si atacar o retirarse.
- Permita que hasta f generales sean bizantinos, es decir, que puedan enviar mensajes contradictorios.
- Los generales leales deberán acordar una decisión común, cumpliendo las condiciones de consistencia y validez.

### 2. Protocolos de comunicación


- Implemente el protocolo recursivo propuesto por Lamport, Shostak y Pease (OM(f)).

In [None]:
class ByzantineGenerals(NodeAlgorithm):
    """Recursive Oral Messages algorithm OM(f) for the Byzantine Generals problem.

    The first node in sorted order is the commander and ``f`` randomly sampled
    nodes (the commander may be among them) are traitors. All nodes share one
    ``Siege`` object through their memory and register their final decision
    on it.

    Values travel in ``Data`` packets tagged with a ``path``: the tuple of
    ``unique_value`` ids of the generals that relayed the value so far. A loyal
    general relays while ``len(path) < f`` and, once it holds every value of a
    layer, folds that layer into its parent with ``majority`` until the root
    path ``()`` is decided.
    """

    def __init__(self, simulation, traitor_strategy, *args, f=None, verbose=False, **kwargs):
        """Configure the algorithm.

        Args:
            simulation: Forwarded to ``NodeAlgorithm.__init__``.
            traitor_strategy: Strategy handed to each traitor's ``ByzantineBehavior``.
            f: Number of traitors, also used as the recursion depth. It must be
                supplied: ``initializer`` passes it to ``random.sample``.
            verbose: When true, ``log`` stores messages in ``self.logging``.
        """
        super().__init__(simulation, *args, **kwargs)
        self.f = f
        # Presumably incremented by the send helpers that receive ``algorithm=self``.
        self.messages_counter = 0
        self.verbose = verbose
        # Always defined (it used to exist only when verbose) so that reading
        # ``alg.logging`` after a non-verbose run yields an empty list.
        self.logging = []
        self.traitor_strategy = traitor_strategy
        # self.max_messages = self.network.size() ** (f + 2)

    default_params = {
        "Value" : "Value",
    }

    class Status(StatusValues):
        COMMANDER = "COMMANDER"
        GENERAL = "GENERAL"
        DONE = "DONE"
        TRAITOR = "TRAITOR"
    S_init = (Status.COMMANDER, Status.GENERAL, Status.TRAITOR)
    S_term = (Status.DONE,)

    algorithm_restrictions = (
        BidirectionalLinks,
        Connectivity,
        TotalReliability,
        InitialDistinctValues
    )

    def initializer(self):
        """Assign roles, set up each node's memory and wake up the commander."""
        self.apply_restrictions()
        traidores = random.sample(list(self.network.nodes()), self.f)
        siege = Siege(len(self.network.nodes()) - self.f, self.log)
        commander_general = self.network.nodes_sorted()[0]
        for node in self.network.nodes():
            node.memory["siege"] = siege
            node.memory["values_tree"] = {}
            node.memory["decision_tree"] = {}
            # Everybody except the node itself and the commander.
            node.memory["liutenants"] = self.network.nodes() - {node, commander_general}
            node.status = self.Status.GENERAL
        commander_general.status = self.Status.COMMANDER
        # Applied last on purpose: a sampled commander ends up as TRAITOR.
        for traitor in traidores:
            traitor.status = self.Status.TRAITOR
            traitor.memory["behavior"] = ByzantineBehavior(self.traitor_strategy)
        commander_general.push_to_inbox(Message(meta_header=NodeAlgorithm.INI, destination=commander_general))

    def log(self, msg):
        """Store ``msg`` in ``self.logging`` when verbose logging is enabled."""
        if self.verbose:
            self.logging.append(msg)

    def get_included_liutenants(self, node: NodeAccess, path):
        """Return the lieutenants of ``node`` whose id does not appear in ``path``."""
        already_visited = [
            neigh
            for neigh in node.memory["liutenants"]
            if neigh.memory["unique_value"] in path
        ]
        self.log(f"[General {node.memory['unique_value']}] de {node.memory['liutenants']}, descarto {already_visited}")
        return node.memory["liutenants"] - set(already_visited)

    def OM_commander(self, node : NodeAccess, packet : Data):
        """Send ``packet`` to every lieutenant not yet on ``packet.path``."""
        liutenants = self.get_included_liutenants(node, packet.path)
        self.log(f"[General {node.memory['unique_value']}] Enviando mensaje con data {packet} por el path {packet.path} a los tenientes {liutenants}")
        send_and_count(
            node,
            datos=packet,
            dest=liutenants,
            msj_type=self.default_params["Value"],
            algorithm=self,
        )

    def checks(self, node: NodeAccess, message = None):
        """Log the incoming message (if any) and the role of ``node``."""
        self.log("\n\n")
        if message:
            datos : Data = message.data
            self.log(f"[General {node.memory['unique_value']}] Le llega el mensaje con data {datos}")
        if node.status == self.Status.TRAITOR:
            self.log(f"[Traidor {node.memory['unique_value']}]")
        elif node.status == self.Status.COMMANDER:
            self.log(f"\n\n[General {node.memory['unique_value']}] Observes they should {node.memory['values_tree'][()]}")

        # if self.messages_counter > self.max_messages:
        #     raiseError("Cota superior de mensajes excedida. Imprima logging para mas detalles.")

    def get_layer_decisions(self, node: NodeAccess, path):
        """Return the values of the layer ``path`` belongs to, once it is complete.

        The layer is made of the ``decision_tree`` entries whose key has the
        same length as ``path`` and shares all but its last element. An empty
        list is returned while values are still missing.
        """
        layer = len(path)
        nodos = {key: value for (key, value)
                in node.memory["decision_tree"].items()
                if (len(key) == layer and key[:layer-1] == path[:layer-1])
                }
        ronda = nodos.values()
        # Lieutenants not on the path, plus this node and the last relayer.
        awaiting_values = len(self.get_included_liutenants(node,path)) + 2
        self.log(f"[General {node.memory['unique_value']}] En el path {path}, esperando {awaiting_values} valores, recibidos {len(ronda)}: {list(ronda)}")
        if len(ronda) == (awaiting_values):
            return ronda
        return []

    def _carry_out(self, node: NodeAccess, decision):
        """Register ``decision`` on the shared siege and mark ``node`` as DONE."""
        siege : Siege = node.memory["siege"]
        if decision == GeneralDecision.ATTACK:
            siege.attack(node)
        else:
            siege.retreat(node)
        node.status = self.Status.DONE

    @Status.COMMANDER
    def spontaneously(self, node: NodeAccess, _: Message):
        """Loyal commander: observe the siege, broadcast the order and act on it."""
        siege : Siege = node.memory["siege"]
        decision = node.memory["values_tree"][()] = siege.observe(node)
        self.checks(node)
        datos = Data((), decision)
        self.OM_commander(node, datos)
        self._carry_out(node, decision)

    @Status.COMMANDER
    def receiving(self, node: NodeAccess, message: Message):
        """The commander is not expected to receive messages."""
        error("COMMANDER::receiving", message)

    @Status.TRAITOR
    def spontaneously(self, node: NodeAccess, _: Message):
        """Traitorous commander: let its behavior pick what each lieutenant gets."""
        self.checks(node)
        behavior : ByzantineBehavior = node.memory["behavior"]
        behavior.send(
            node =node,
            algorithm = self,
            path = (),
            destination = self.get_included_liutenants(node, ()),
            header = self.default_params["Value"],
            rcvd_decision = GeneralDecision.RETREAT
        )
        node.status = self.Status.DONE

    @Status.TRAITOR
    def receiving(self, node: NodeAccess, message: Message):
        """Traitorous lieutenant: relay through its behavior while the siege is ongoing."""
        self.checks(node, message)
        siege : Siege = node.memory["siege"]
        if siege.state != Siege.State.ONGOING:
            node.status = self.Status.DONE
            return
        datos : Data = message.data
        path = datos.path
        new_path =  path + (node.memory["unique_value"],)
        behavior : ByzantineBehavior = node.memory["behavior"]
        behavior.send(
            node=node,
            algorithm=self,
            path=new_path,
            destination=self.get_included_liutenants(node, path),
            header=self.default_params["Value"],
            rcvd_decision=datos.value
        )

    @Status.GENERAL
    def receiving(self, node: NodeAccess, message: Message):
        """Loyal lieutenant: relay while depth remains, otherwise fold majorities."""
        # TODO: a duplicated id in the path could expose a malicious sender, since
        # loyal generals agree never to resend over an already used path.
        self.checks(node, message)
        datos : Data = message.data
        path = datos.path
        m = self.f - len(path)
        if m > 0:
            # m > 0 means len(path) < f, so the extended path never exceeds f ids.
            recursive_path = path + (node.memory["unique_value"],)
            node.memory["decision_tree"][recursive_path] = datos.value
            self.OM_commander(node, Data(recursive_path, datos.value))
        elif m == 0:
            node.memory["decision_tree"][path] = datos.value
            ronda = self.get_layer_decisions(node, path)
            # Climb towards the root for as long as each layer is complete.
            while path != () and len(ronda) != 0:
                maj = majority(ronda)
                path = path[:-1]
                node.memory["decision_tree"][path] = maj
                self.log(f"[General {node.memory['unique_value']}] Decision en path {path}: {maj}")
                ronda = self.get_layer_decisions(node, path)
            if path == ():
                self.log(f"Arbol de decisiones del General {node.memory['unique_value']}:\n{pformat(node.memory['decision_tree'], width=80, sort_dicts=True)}")
                self._carry_out(node, node.memory["decision_tree"][()])
        # m < 0 (path longer than f) is deliberately ignored.

    @Status.DONE
    def default(self, *args, **kwargs):
        """Terminated nodes ignore every event."""
        pass

- Mida el número total de mensajes intercambiados y el tiempo necesario para llegar al consenso.

In [None]:
# Repetitions per number of traitors; anything not listed runs once.
_ITERATIONS_BY_LAYER = {1: 100, 2: 100, 3: 50, 4: 1}


def get_no_iters(layer):
    """Return how many times the experiment is repeated for ``layer`` traitors."""
    return _ITERATIONS_BY_LAYER.get(layer, 1)

# Oral Messages experiment: for every f in 1..max_f run OM(f) on a complete
# network of n = 3f + 1 nodes and report consensus, wall-clock time and the
# number of messages exchanged.
if verbose_OM:
    # The per-run log files go here; create the folder instead of failing on open().
    os.makedirs("results", exist_ok=True)

for layer in range(1, max_f + 1):
    print(f"Corriendo capa {layer}")
    f = layer
    n = 3*f + 1
    its = get_no_iters(layer)
    for i in range(its):
        logger.info(f"Iteracion {i}")
        net_gen = NetworkGenerator(directed=False)
        net = net_gen.generate_complete_network(n)
        sim = Simulation(net, check_restrictions=True)
        sim.algorithms = ((ByzantineGenerals, {"f": f, "verbose" : verbose_OM, "traitor_strategy": traitor_strategy}),)

        # Only the simulation itself is timed.
        begin = time.perf_counter()
        sim.run()
        end = time.perf_counter()

        alg = sim.algorithms[0]
        # Every node stores the same Siege object, so any node will do.
        siege = list(net.nodes())[0].memory["siege"]
        if siege.state not in {Siege.State.SUCCESSFUL, Siege.State.ABORTED}:
            logger.error("Consenso fallido")
        else:
            logger.info("Consenso exitoso")
        logger.info(f"Resultado del asedio: {siege.state}")
        logger.info(f"Tiempo de ejecucion: {end - begin} segundos")
        logger.info(f"Se intercambiaron {alg.messages_counter} mensajes")
        logger.info("\n\n")

        if verbose_OM:
            with open(f"results/OM_f={layer}_it={i}.txt", "w") as res:
                for log in alg.logging:
                    print(log, file=res)


### 3. Análisis

- Determine experimentalmente el número mínimo de nodos necesario para alcanzar consenso frente a f fallos bizantinos.
- Verifique la condición teórica n ≥ 3f + 1.

In [None]:
def _om_run_reaches_consensus(n: int, f: int) -> bool:
    """Run OM(f) once on a complete network of ``n`` nodes and report consensus."""
    net_gen = NetworkGenerator(directed=False)
    net = net_gen.generate_complete_network(n)
    sim = Simulation(net, check_restrictions=True)
    # NOTE(review): verbose is hard-coded to True although the log is never
    # written here; kept as in the original.
    sim.algorithms = ((ByzantineGenerals, {"f": f, "verbose" : True, "traitor_strategy" : traitor_strategy}),)
    sim.run()
    # Every node stores the same Siege object, so any node will do.
    siege : Siege = list(net.nodes())[0].memory["siege"]
    return siege.state in {Siege.State.SUCCESSFUL, Siege.State.ABORTED}


def find_minimum_generals_for_consensus(f : int, its : int = None):
    """Search for the smallest n at which OM(f) reached consensus in every run.

    Candidates n = f + 1, f + 2, ..., 4f + 1 are tried in order; a candidate is
    accepted when all ``its`` runs end with the siege SUCCESSFUL or ABORTED, and
    is abandoned at the first run that does not.

    Args:
        f: Number of traitors.
        its: Runs per candidate. When omitted, ``get_no_iters(f)`` is used
            (the default ``None`` previously crashed in ``range``).

    Returns:
        The first accepted n, or ``None`` if no candidate was accepted.
    """
    if its is None:
        its = get_no_iters(f)
    print(f"------------------------------------------------ Buscando n minimo para f = {f} ------------------------------------------------")
    protective_max = 4*f
    found = None
    # The original loop incremented before testing, hence the inclusive 4f + 1.
    for n_search in range(f + 1, protective_max + 2):
        print(f"Probando con n = {n_search}")
        # all() stops at the first failed run, like the original break.
        if all(_om_run_reaches_consensus(n_search, f) for _ in range(its)):
            found = n_search
            break
    print("---------------------------------------------------------------------------------------------------------------------------------")
    return found


# Check n >= 3f + 1 experimentally for every f in 1..max_f. Logging is muted
# during the search and restored even if a run raises.
disable_logger()
try:
    fs = np.arange(1, max_f+1)

    for fi in fs:
        iti = get_no_iters(fi)
        n_found = find_minimum_generals_for_consensus(fi, iti)
        if n_found is None:
            print(f"No se encontro n minimo para f = {fi} en el espacio de busqueda permitido\n")
        elif n_found >= 3*fi + 1:
            print(f"Se cumple la condicion para f = {fi}: {n_found} ≥ 3*{fi} + 1\n")
        else:
            print(f"No se cumple la condicion para f = {fi}: {n_found} < 3*{fi} + 1\n")
finally:
    enable_logger()

## Parte 2: Protocolos de Commit de Transacciones

### 1. Implementación del protocolo 2PC (Two-Phase Commit)

- Simule el proceso de coordinación entre un coordinador y varios participantes.

In [None]:
class TwoPhaseCommit(NodeAlgorithm):
    """Two-Phase Commit between one coordinator and crash-prone participants.

    The first node in sorted order is the coordinator. It broadcasts Prepare,
    waits for an OK from every neighbor (or for the OK timeout) and then
    broadcasts Commit or Abort. Every send goes through the node's
    ``CrashBehavior``, which may report a failed send; the node then crashes.
    Participants record their outcome on a ``Siege`` object shared through
    node memory (attack = commit, retreat = abort).
    """

    def __init__(self, simulation, crash_place, verbose = False, waiting_time = 15, *args, **kwargs):
        """Configure the algorithm.

        Args:
            simulation: Forwarded to ``NodeAlgorithm.__init__``.
            crash_place: Sequence of four values passed as ``chance`` to
                ``CrashBehavior.send``: [0] Prepare broadcast, [1] Abort
                broadcast after the OK timeout, [2] participant OK,
                [3] Commit broadcast.
            verbose: When true, ``log`` stores messages in ``self.logging``.
            waiting_time: Delay handed to ``set_alarm`` for the OK timeout.
        """
        super().__init__(simulation, *args, **kwargs)
        self.messages_counter = 0
        self.verbose = verbose
        # Always defined (it used to exist only when verbose) so that reading
        # ``alg.logging`` after a non-verbose run yields an empty list.
        self.logging = []
        self.waiting_time = waiting_time
        self.crash_place = crash_place

    default_params = {
        "Prepare" : "Prepare", #Can you commit this transaction?
        "OK" : "OK", #I can commit the transaction
        "Commit" : "Commit", #Commit the transaction
        "Abort" : "Abort", #Abort the transaction
        "OKTimeout" : "OKTimeout", #Timeout waiting for participant's OKs
    }

    class Status(StatusValues):
        COORDINATOR = "COORDINATOR"
        PARTICIPANT = "PARTICIPANT"
        POST_PREPARE_COORDINATOR = "POST_PREPARE_COORDINATOR"
        POST_PREPARE_PARTICIPANT = "POST_PREPARE_PARTICIPANT"
        SUCCESS = "SUCCESS"
        CRASHED = "CRASHED"
    S_init = (Status.PARTICIPANT, Status.COORDINATOR,)
    S_term = (Status.SUCCESS, Status.CRASHED)

    algorithm_restrictions = (
        BidirectionalLinks,
        Connectivity,
        # TotalReliability is intentionally not required: messages may be lost.
        InitialDistinctValues
    )

    def initializer(self):
        """Pick the coordinator, share the transaction and wake the coordinator up."""
        self.apply_restrictions()
        coordinator = self.network.nodes_sorted()[0]
        transaction = Siege(len(self.network.nodes()) - 1, self.log)
        for node in self.network.nodes():
            node.memory["transaction"] = transaction
            node.memory["coordinator"] = coordinator
            node.memory["faulty_behavior"] = CrashBehavior(node, self.log)
            node.status = self.Status.PARTICIPANT
        coordinator.memory["oks"] = {}
        coordinator.status = self.Status.COORDINATOR
        coordinator.push_to_inbox(Message(meta_header=NodeAlgorithm.INI, destination=coordinator))

    def log(self, msg):
        """Store ``msg`` in ``self.logging`` when verbose logging is enabled."""
        if self.verbose:
            self.logging.append(msg)

    def crash(self, node : NodeAccess):
        """Mark ``node`` as crashed; participant crashes are also recorded on the transaction.

        The coordinator can crash while in POST_PREPARE_COORDINATOR (during the
        Abort or Commit broadcast), so both coordinator statuses are exempted,
        as ThreePhaseCommit.crash does.
        """
        coordinator_statuses = {
            self.Status.COORDINATOR,
            self.Status.POST_PREPARE_COORDINATOR,
        }
        if node.status not in coordinator_statuses:
            node.memory["transaction"].crash(node)
        node.status = self.Status.CRASHED

    def _send_or_crash(self, node: NodeAccess, header, destination, chance):
        """Send through the node's CrashBehavior; crash the node if the send fails.

        Returns:
            True when the send succeeded, False when the node has been crashed.
        """
        behavior : CrashBehavior = node.memory["faulty_behavior"]
        success = behavior.send(
            node=node,
            algorithm=self,
            header=header,
            destination=destination,
            chance=chance
        )
        if not success:
            self.crash(node)
            return False
        return True

    #################################################################################################################################
    ##############################################             STAGE 1                       #######################################
    #################################################################################################################################

    @Status.COORDINATOR
    def spontaneously(self, node: NodeAccess, _: Message):
        """Coordinator start: broadcast Prepare and arm the OK timeout."""
        self.log(f"[Node {node.memory['unique_value']}] is the coordinator")
        if not self._send_or_crash(node, self.default_params["Prepare"], node.neighbors(), self.crash_place[0]):
            return
        m = Message(
            destination=node.memory["coordinator"],
            header=self.default_params["OKTimeout"]
            )
        node.memory["OKAlarm"] = self.set_alarm(node, self.waiting_time, m)
        node.status = self.Status.POST_PREPARE_COORDINATOR

    @Status.POST_PREPARE_COORDINATOR
    def alarm(self, node: NodeAccess, message: Message):
        """OK timeout: the coordinator gives up and broadcasts Abort."""
        if node.status != self.Status.POST_PREPARE_COORDINATOR:
            self.log("No deberia pasar por aca")
        if message.header != self.default_params["OKTimeout"]:
            # The label used to read "::receiving", copied from the message handler.
            error("POST_PREPARE_COORDINATOR::alarm", message)
            return
        self.log(f"[Node {node.memory['unique_value']}] Coordinator not waiting any longer for OKs, aborting transaction")
        if self._send_or_crash(node, self.default_params["Abort"], node.neighbors(), self.crash_place[1]):
            node.status = self.Status.SUCCESS

    @Status.PARTICIPANT
    def receiving(self, node: NodeAccess, message: Message):
        """Participant before voting: answer Prepare with OK, or obey an Abort."""
        if message.header == self.default_params["Prepare"]:
            if self._send_or_crash(node, self.default_params["OK"], node.memory["coordinator"], self.crash_place[2]):
                node.status = self.Status.POST_PREPARE_PARTICIPANT
        elif message.header == self.default_params["Abort"]:
            transaction : Siege = node.memory["transaction"]
            transaction.retreat(node)
            node.status = self.Status.SUCCESS
        else:
            error("PARTICIPANT::receiving", message)

    #################################################################################################################################
    ##############################################             STAGE 2                       #######################################
    #################################################################################################################################

    @Status.POST_PREPARE_COORDINATOR
    def receiving(self, node: NodeAccess, message: Message):
        """Collect OKs; once every neighbor has answered, broadcast Commit."""
        if message.header != self.default_params["OK"]:
            error("POST_PREPARE_COORDINATOR::receiving", message)
            return
        self.log(f"[Node {node.memory['unique_value']}] Received OK from label {message.source}")
        node.memory["oks"][message.source] = True
        if len(node.memory["oks"]) != len(node.neighbors()):
            return
        self.disable_alarm(node.memory["OKAlarm"])
        if self._send_or_crash(node, self.default_params["Commit"], node.neighbors(), self.crash_place[3]):
            node.status = self.Status.SUCCESS

    @Status.POST_PREPARE_PARTICIPANT
    def receiving(self, node: NodeAccess, message: Message):
        """Participant after voting OK: apply the coordinator's Commit or Abort."""
        # Participants are assumed not to fail once they have confirmed they can commit.
        transaction : Siege = node.memory["transaction"]
        if message.header == self.default_params["Commit"]:
            transaction.attack(node)
            node.status = self.Status.SUCCESS
        elif message.header == self.default_params["Abort"]:
            transaction.retreat(node)
            node.status = self.Status.SUCCESS
        else:
            error("POST_PREPARE_PARTICIPANT::receiving", message)

    @Status.SUCCESS
    def default(self, *args, **kwargs):
        """Finished nodes ignore every event."""
        pass

    @Status.CRASHED
    def default(self, *args, **kwargs):
        """Crashed nodes ignore every event."""
        pass


#### Pruebas

Forzar crash en momentos concretos:

In [None]:
# 2PC experiment: run the protocol its_2PC times with the configured crash
# points and classify each outcome from the shared transaction state.
crash_places_2PC = [between_Prepares_2PC, between_timeout_aborts_2PC, before_OK_2PC, between_Commits_2PC]
if verbose_2PC:
    # The per-run log files go here; create the folder instead of failing on open().
    os.makedirs("results", exist_ok=True)

for i in range(its_2PC):
    net_gen = NetworkGenerator(directed=False)
    net = net_gen.generate_complete_network(n_2PC)
    sim = Simulation(net, check_restrictions=True)
    sim.algorithms = ((TwoPhaseCommit, {"verbose": verbose_2PC, "crash_place": crash_places_2PC}),)
    sim.reset()
    sim.run()
    alg = sim.algorithms[0]
    # Every node stores the same transaction object, so any node will do.
    transaction : Siege = list(net.nodes())[0].memory["transaction"]
    if verbose_2PC:
        with open(f"results/2PC_it={i}.txt", "w") as res:
            for log in alg.logging:
                print(log, file=res)
    if transaction.state == Siege.State.FAILED:
        logger.info(f"Iteracion {i} perdió atomicidad")
    elif transaction.state == Siege.State.ONGOING:
        logger.info(f"Iteracion {i} terminó en deadlock")
    else:
        logger.info(f"Iteracion {i} exitosa. Todos los nodos" + (" abortaron" if transaction.state == Siege.State.ABORTED else " commitearon la transacción") )
    logger.info("\n")

### 2. Extensión al protocolo 3PC (Three-Phase Commit)

- Mejore el protocolo anterior incorporando una fase intermedia para evitar bloqueos permanentes.

In [None]:
class ThreePhaseCommit(NodeAlgorithm):
    def __init__(self, simulation, crash_place, verbose=False, waiting_time = 15,*args, **kwargs):
        """Configure the algorithm.

        Args:
            simulation: Forwarded to ``NodeAlgorithm.__init__``.
            crash_place: Sequence indexed by the handlers and passed as
                ``chance`` to ``CrashBehavior.send``.
            verbose: When true, ``log`` stores messages in ``self.logging``.
            waiting_time: Base delay handed to ``set_alarm`` for the timeouts.
        """
        super().__init__(simulation, *args, **kwargs)
        self.messages_counter = 0
        self.waiting_time = waiting_time
        self.crash_place = crash_place
        self.verbose = verbose
        # Always defined (it used to exist only when verbose) so that reading
        # ``alg.logging`` after a non-verbose run yields an empty list.
        self.logging = []
        
    # Message and alarm headers used by the protocol.
    default_params = {
        "Prepare" : "Prepare", # Coordinator -> neighbors: can you commit this transaction?
        "OK" : "OK", # Participant -> coordinator: I can commit the transaction
        "PreCommit" : "PreCommit", # Coordinator -> neighbors: get ready to commit the transaction
        "Ready" : "Ready", # I'm ready to commit the transaction
        "Commit" : "Commit", # Commit the transaction
        "Abort" : "Abort", # Abort the transaction
        "OKTimeout" : "OKTimeout", # Coordinator alarm: timed out waiting for all the participants' OKs
        "PreCommitTimeout" : "PreCommitTimeout", # Participant alarm: timed out waiting for the coordinator's PreCommit
        "ReadyTimeout" : "ReadyTimeout", # Coordinator alarm: timed out waiting for all the participants' Readys
        "DoCommitTimeout" : "DoCommitTimeout", # Participant alarm: timed out waiting for the coordinator's Commit

    }

    class Status(StatusValues):
        # Initial roles.
        COORDINATOR = "COORDINATOR"
        PARTICIPANT = "PARTICIPANT"
        # After Prepare: the coordinator waits for OKs, a participant has sent its OK.
        POST_PREPARE_COORDINATOR = "POST_PREPARE_COORDINATOR" 
        POST_PREPARE_PARTICIPANT = "POST_PREPARE_PARTICIPANT" 
        # After PreCommit.
        PRE_COMMIT_COORDINATOR = "PRE_COMMIT_COORDINATOR"
        PRE_COMMIT_PARTICIPANT = "PRE_COMMIT_PARTICIPANT"
        RECOVERING = "RECOVERING"
        # Terminal states.
        SUCCESS = "SUCCESS"
        CRASHED = "CRASHED"
    # Statuses a node may start in, and statuses in which a node is finished.
    S_init = (Status.PARTICIPANT, Status.COORDINATOR,)
    S_term = (Status.SUCCESS, Status.CRASHED)

    # Restrictions applied by initializer() through apply_restrictions().
    algorithm_restrictions = (
        BidirectionalLinks,
        Connectivity, 
        # TotalReliability is intentionally not required: messages may be lost.
        InitialDistinctValues 
    )
        
    def initializer(self):
        """Pick the coordinator, fill every node's memory and wake the coordinator up."""
        self.apply_restrictions()
        leader = self.network.nodes_sorted()[0]
        # One object shared by all nodes, built with the node count minus one.
        shared_transaction = Siege(len(self.network.nodes()) - 1, self.log)
        for member in self.network.nodes():
            member.memory["self"] = member
            member.memory["coordinator"] = leader
            member.memory["faulty_behavior"] = CrashBehavior(member, self.log)
            member.memory["transaction"] = shared_transaction
            member.status = self.Status.PARTICIPANT
        # The coordinator overrides the participant role and tracks the answers it gets.
        leader.memory["oks"] = {}
        leader.memory["readys"] = {}
        leader.status = self.Status.COORDINATOR
        wake_up = Message(meta_header=NodeAlgorithm.INI, destination=leader)
        leader.push_to_inbox(wake_up)

    def log(self, msg):
        """Store ``msg`` in ``self.logging``; does nothing unless verbose is set."""
        if not self.verbose:
            return
        self.logging.append(msg)

    def crash(self, node : NodeAccess):
        """Mark ``node`` as crashed; non-coordinator crashes are also recorded on the transaction."""
        self.log(f"[Node {node.memory['unique_value']}] Crashed")
        coordinator_statuses = {
            self.Status.COORDINATOR,
            self.Status.PRE_COMMIT_COORDINATOR,
            self.Status.POST_PREPARE_COORDINATOR,
        }
        crashed_as_coordinator = node.status in coordinator_statuses
        if not crashed_as_coordinator:
            node.memory["transaction"].crash(node)
        node.status = self.Status.CRASHED



####################################################################################################################################
##############################################             ETAPA 1                       ###########################################
####################################################################################################################################

    @Status.COORDINATOR
    def spontaneously(self, node: NodeAccess, _: Message):
        """Coordinator start: broadcast Prepare and arm the OK timeout."""
        crash_behavior : CrashBehavior = node.memory["faulty_behavior"]
        prepares_sent = crash_behavior.send(
            node=node,
            algorithm=self,
            header=self.default_params["Prepare"],
            destination=node.neighbors(),
            chance=self.crash_place[0],
        )
        if not prepares_sent:
            # A failed send means the coordinator crashed during the broadcast.
            self.crash(node)
            return
        self.log(f"[Node {node.memory['unique_value']}] is the coordinator")
        ok_timeout = Message(
            header=self.default_params["OKTimeout"],
            destination=node.memory["coordinator"],
        )
        node.memory["OKAlarm"] = self.set_alarm(node, self.waiting_time, ok_timeout)
        node.status = self.Status.POST_PREPARE_COORDINATOR


    @Status.POST_PREPARE_COORDINATOR
    def alarm(self, node: NodeAccess, message: Message):
        """OK timeout: the coordinator gives up and broadcasts Abort."""
        if message.header != self.default_params["OKTimeout"]:
            error("POST_PREPARE_COORDINATOR::alarm", message)
            return
        self.log(f"[Node {node.memory['unique_value']}] Coordinator not waiting any longer for OKs, aborting transaction")
        crash_behavior : CrashBehavior = node.memory["faulty_behavior"]
        aborts_sent = crash_behavior.send(
            node=node,
            algorithm=self,
            header=self.default_params["Abort"],
            destination=node.neighbors(),
            chance=self.crash_place[1],
        )
        if aborts_sent:
            node.status = self.Status.SUCCESS
        else:
            self.crash(node)


    @Status.PARTICIPANT
    def receiving(self, node: NodeAccess, message: Message):
        """Participant before voting: answer Prepare with OK, or obey an Abort."""
        header = message.header
        if header == self.default_params["Abort"]:
            pending : Siege = node.memory["transaction"]
            pending.retreat(node)
            node.status = self.Status.SUCCESS
            return
        if header != self.default_params["Prepare"]:
            error("PARTICIPANT::receiving", message)
            return

        crash_behavior : CrashBehavior = node.memory["faulty_behavior"]
        ok_sent = crash_behavior.send(
            node=node,
            algorithm=self,
            header=self.default_params["OK"],
            destination=node.memory["coordinator"],
            chance=self.crash_place[2],
        )
        if not ok_sent:
            self.crash(node)
            return
        # Self-addressed alarm, armed with twice the coordinator's waiting time.
        precommit_timeout = Message(
            header=self.default_params["PreCommitTimeout"],
            destination=node.memory["self"],
        )
        node.memory["PreCommitAlarm"] = self.set_alarm(node, 2*self.waiting_time, precommit_timeout)
        node.status = self.Status.POST_PREPARE_PARTICIPANT


    @Status.POST_PREPARE_PARTICIPANT
    def alarm(self, node: NodeAccess, message: Message):
        """PreCommit timeout at a participant: currently a no-op."""
        if message.header != self.default_params["PreCommitTimeout"]:
            error("POST_PREPARE_PARTICIPANT::alarm", message)
        # On a PreCommitTimeout nothing is done yet; a participant could take
        # over as a second coordinator here.




####################################################################################################################################
##############################################             ETAPA 2                       ###########################################
####################################################################################################################################

    @Status.POST_PREPARE_COORDINATOR
    def receiving(self, node: NodeAccess, message: Message):
        """Collect OKs; once every neighbor has answered, broadcast PreCommit."""
        if message.header != self.default_params["OK"]:
            error("POST_PREPARE_COORDINATOR::receiving", message)
            return
        node.memory["oks"][message.source] = True
        if len(node.memory["oks"]) != len(node.neighbors()):
            return

        crash_behavior : CrashBehavior = node.memory["faulty_behavior"]
        precommits_sent = crash_behavior.send(
            node=node,
            algorithm=self,
            header=self.default_params["PreCommit"],
            destination=node.neighbors(),
            chance=self.crash_place[3],
        )
        if not precommits_sent:
            # With the default settings this can end in deadlock, if the crash
            # happens before a single PreCommit reaches a participant that
            # could take over.
            self.crash(node)
            return
        self.disable_alarm(node.memory["OKAlarm"])
        ready_timeout = Message(
            header=self.default_params["ReadyTimeout"],
            destination=node.memory["coordinator"],
        )
        # The Ready alarm is stored under the same "OKAlarm" key as the OK alarm.
        node.memory["OKAlarm"] = self.set_alarm(node, self.waiting_time, ready_timeout)
        node.status = self.Status.PRE_COMMIT_COORDINATOR
        

    @Status.PRE_COMMIT_COORDINATOR
    def alarm(self, node: NodeAccess, message: Message):
        """Abort the transaction when the coordinator's ReadyTimeout alarm fires.

        On ``ReadyTimeout`` the coordinator logs the decision and sends Abort
        to all neighbors through its crash behavior. If that send reports
        failure the node crashes; otherwise it ends in SUCCESS. Any other
        header is reported through ``error``.
        """
        if message.header == self.default_params["ReadyTimeout"]:
            self.log(f"[Node {node.memory['unique_value']}] Coordinator not waiting any longer for Readys, aborting transaction")
            behavior : CrashBehavior = node.memory["faulty_behavior"]
            success = behavior.send(
                node=node,
                algorithm=self,
                header=self.default_params["Abort"],
                destination=node.neighbors(),
                chance=self.crash_place[4]
            )
            if not success:
                self.crash(node)
                return
            node.status = self.Status.SUCCESS
        else:
            # Report under this handler's own status; the label used to be the
            # copy-pasted "POST_PREPARE_COORDINATOR::alarm", which pointed
            # debugging at the wrong handler.
            error("PRE_COMMIT_COORDINATOR::alarm", message)

    @Status.POST_PREPARE_PARTICIPANT
    def receiving(self, node: NodeAccess, message: Message):
        """React to the coordinator's answer after this participant voted.

        The PreCommit alarm is cancelled for every incoming message, before the
        header is inspected. Then:

        * PreCommit: arm a DoCommitTimeout alarm (3x ``waiting_time``), send
          Ready to the coordinator and move to PRE_COMMIT_PARTICIPANT; if the
          send reports failure the node crashes instead.
        * Commit: apply ``attack`` on the transaction and finish in SUCCESS.
        * Abort: apply ``retreat`` on the transaction and finish in SUCCESS.
        * Anything else is reported through ``error``.
        """
        self.disable_alarm(node.memory["PreCommitAlarm"])

        params = self.default_params
        header = message.header
        if header == params["PreCommit"]:
            commit_timeout = Message(
                destination=node.memory["self"],
                header=params["DoCommitTimeout"],
            )
            node.memory["DoCommitAlarm"] = self.set_alarm(node, 3 * self.waiting_time, commit_timeout)
            crash_behavior: CrashBehavior = node.memory["faulty_behavior"]
            ready_sent = crash_behavior.send(
                node=node,
                algorithm=self,
                header=params["Ready"],
                destination=node.memory["coordinator"],
                chance=self.crash_place[5],
            )
            if ready_sent:
                node.status = self.Status.PRE_COMMIT_PARTICIPANT
            else:
                self.crash(node)
        elif header == params["Commit"]:
            # Risky if there are Byzantine nodes or bugs: a Commit is accepted
            # here without this node having seen a PreCommit.
            node.memory["transaction"].attack(node)
            node.status = self.Status.SUCCESS
        elif header == params["Abort"]:
            node.memory["transaction"].retreat(node)
            node.status = self.Status.SUCCESS
        else:
            error("POST_PREPARE_PARTICIPANT::receiving", message)


    @Status.PRE_COMMIT_PARTICIPANT
    def alarm(self, node: NodeAccess, message: Message):
        """Commit unilaterally when the DoCommitTimeout alarm fires.

        The participant sends Commit to every neighbor except the coordinator
        through its crash behavior. If that send reports failure the node
        crashes; otherwise it applies ``attack`` on the transaction and ends in
        SUCCESS. Any other header is reported through ``error``.
        """
        if message.header != self.default_params["DoCommitTimeout"]:
            error("PRE_COMMIT_PARTICIPANT::alarm", message)
            return

        crash_behavior: CrashBehavior = node.memory["faulty_behavior"]
        commit_sent = crash_behavior.send(
            node=node,
            algorithm=self,
            header=self.default_params["Commit"],
            destination=node.neighbors() - {node.memory["coordinator"]},
            chance=self.crash_place[6],
        )
        if not commit_sent:
            self.crash(node)
            return

        node.memory["transaction"].attack(node)
        node.status = self.Status.SUCCESS




#####################################################################################################################################
##############################################             ETAPA 3                       ############################################
#####################################################################################################################################

    @Status.PRE_COMMIT_COORDINATOR
    def receiving(self, node: NodeAccess, message: Message):
        """Collect Ready messages and broadcast Commit once all have arrived.

        Each Ready is recorded in ``node.memory["readys"]`` keyed by its
        source. When the number of entries equals the number of neighbors,
        Commit is sent through the node's crash behavior: a failed send crashes
        the node, a successful one ends it in SUCCESS. Any header other than
        Ready is reported through ``error``.
        """
        if message.header != self.default_params["Ready"]:
            error("PRE_COMMIT_COORDINATOR::receiving", message)
            return

        acks = node.memory["readys"]
        acks[message.source] = True
        if len(acks) != len(node.neighbors()):
            # Not every neighbor has answered yet.
            return

        crash_behavior: CrashBehavior = node.memory["faulty_behavior"]
        commit_sent = crash_behavior.send(
            node=node,
            algorithm=self,
            header=self.default_params["Commit"],
            destination=node.neighbors(),
            chance=self.crash_place[7],
        )
        if not commit_sent:
            self.crash(node)
            return
        node.status = self.Status.SUCCESS



    @Status.PRE_COMMIT_PARTICIPANT
    def receiving(self, node: NodeAccess, message: Message):
        """Apply the final decision received by a pre-committed participant.

        The DoCommit alarm is cancelled for every incoming message. Commit
        triggers ``attack`` and Abort triggers ``retreat`` on the transaction,
        both ending in SUCCESS; any other header is reported through ``error``.
        """
        self.disable_alarm(node.memory["DoCommitAlarm"])

        params = self.default_params
        if message.header == params["Commit"]:
            resolve = node.memory["transaction"].attack
        elif message.header == params["Abort"]:
            resolve = node.memory["transaction"].retreat
        else:
            error("PRE_COMMIT_PARTICIPANT::receiving", message)
            return

        resolve(node)
        node.status = self.Status.SUCCESS


    # TODO: implement recovery
    @Status.RECOVERING
    def default(self, *args, **kwargs):
        """Ignore every event delivered to a node in RECOVERING status."""
        return None


    @Status.SUCCESS
    def default(self, *args, **kwargs):
        """Ignore every event delivered to a node in SUCCESS status."""
        return None

    @Status.CRASHED
    def default(self, *args, **kwargs):
        """Ignore every event delivered to a node in CRASHED status."""
        return None


#### Pruebas

In [None]:
# Crash-chance hyperparameters; the position in this list is the index each
# ThreePhaseCommit handler reads from ``self.crash_place``.
crash_places_3PC = [
    between_Prepares_3PC,
    between_OKtimeout_aborts_3PC,
    before_OK_3PC,
    between_PreCommits_3PC,
    between_Readytimeout_aborts_3PC,
    before_Ready_3PC,
    between_Alerts_3PC,
    between_Commits_3PC,
]

# Run the protocol ``its_3PC`` times on a fresh complete network and report
# how each run ended.
for i in range(its_3PC):
    net_gen = NetworkGenerator(directed=False)
    net = net_gen.generate_complete_network(n_3PC)
    sim = Simulation(net, check_restrictions=True)
    algorithm_kwargs = {"verbose": verbose_3PC, "crash_place": crash_places_3PC}
    sim.algorithms = ((ThreePhaseCommit, algorithm_kwargs),)
    sim.reset()
    sim.run()
    alg = sim.algorithms[0]
    # Every node stores the same transaction object, so any node will do.
    transaction : Siege = list(net.nodes())[0].memory["transaction"]

    if verbose_3PC:
        # Dump the algorithm's log, one entry per line.
        with open(f"results/3PC_it={i}.txt", "w") as res:
            res.writelines(str(log) + "\n" for log in alg.logging)

    final_state = transaction.state
    if final_state == Siege.State.FAILED:
        summary = f"Iteracion {i} perdió atomicidad"
    elif final_state == Siege.State.ONGOING:
        summary = f"Iteracion {i} terminó en deadlock"
    else:
        if final_state == Siege.State.ABORTED:
            verdict = " abortaron"
        else:
            verdict = " commitearon la transacción"
        summary = f"Iteracion {i} exitosa. Todos los nodos" + verdict
    logger.info(summary)
    logger.info("\n")

### 3. Integración con escenarios bizantinos

- Modifique el protocolo para tolerar fallos bizantinos, incorporando firmas digitales y verificación de mensajes.

No dio el tiempo para hacer una implementación correcta, pues faltó integrar las primitivas de bizantinos con el tipo de mensajes que envían los participantes de un protocolo CP. Sin embargo, acá está la idea de por dónde iría.

In [None]:
class ByzantineThreePhaseCommit(NodeAlgorithm):
    """Three-phase commit sketch in which a random subset of participants is Byzantine.

    The first node of ``network.nodes_sorted()`` is the coordinator and all
    other nodes are participants. Loyal nodes share one Ed25519 private key
    while every traitor receives its own freshly generated key; the
    coordinator verifies incoming messages against the shared key.

    NOTE(review): a failed verification only logs the event and drops the
    sender from ``memory["participants"]``; the handlers still count the
    message afterwards.
    """

    def __init__(self, simulation, traitor_strategy, verbose=False, waiting_time=15, *args, **kwargs):
        """Create the algorithm.

        Args:
            simulation: forwarded to ``NodeAlgorithm.__init__``.
            traitor_strategy: strategy handed to ``ByzantineBehavior`` for
                every traitor.
            verbose: when true, ``log`` appends its messages to
                ``self.logging``.
            waiting_time: base delay used when arming the timeout alarms.
        """
        super().__init__(simulation, *args, **kwargs)
        self.messages_counter = 0
        self.waiting_time = waiting_time
        self.traitor_strategy = traitor_strategy
        self.verbose = verbose
        # Always create the buffer so that reading ``logging`` never raises
        # AttributeError; it simply stays empty when ``verbose`` is false.
        self.logging = []

    default_params = {
        "Prepare" : "Prepare", # Can you commit this transaction?
        "OK" : "OK", # I can commit the transaction
        "PreCommit" : "PreCommit", # Get ready for committing the transaction
        "Ready" : "Ready", # I'm ready for committing the transaction
        "Commit" : "Commit", # Commit the transaction
        "Abort" : "Abort", # Abort the transaction
        "OKTimeout" : "OKTimeout", # Coordinator timeout waiting for the participants' OKs
        "PreCommitTimeout" : "PreCommitTimeout", # Participant timeout waiting for the coordinator's PreCommit
        "ReadyTimeout" : "ReadyTimeout", # Coordinator timeout waiting for the participants' Readys
        "DoCommitTimeout" : "DoCommitTimeout", # Participant timeout waiting for the coordinator's Commit
    }

    class Status(StatusValues):
        COORDINATOR = "COORDINATOR"
        PARTICIPANT = "PARTICIPANT"
        POST_PREPARE_COORDINATOR = "POST_PREPARE_COORDINATOR"
        POST_PREPARE_PARTICIPANT = "POST_PREPARE_PARTICIPANT"
        PRE_COMMIT_COORDINATOR = "PRE_COMMIT_COORDINATOR"
        PRE_COMMIT_PARTICIPANT = "PRE_COMMIT_PARTICIPANT"
        RECOVERING = "RECOVERING"
        SUCCESS = "SUCCESS"
        CRASHED = "CRASHED"
    S_init = (Status.PARTICIPANT, Status.COORDINATOR,)
    S_term = (Status.SUCCESS, Status.CRASHED)

    algorithm_restrictions = (
        BidirectionalLinks,
        Connectivity,
        #TotalReliability,  # messages may be lost
        InitialDistinctValues
    )

    def initializer(self):
        """Pick coordinator and traitors, fill every node's memory and start the run.

        A random fraction of the participants (never all of them, because the
        fraction is drawn from ``[0, 1)``) becomes traitors. The shared
        ``Siege`` transaction is sized with the number of loyal participants.
        """
        # NOTE(review): ``random`` is not among the notebook's explicit imports
        # (it may arrive through ``from utils import *``); importing it here
        # keeps this method self-contained.
        import random

        self.apply_restrictions()
        porcentaje = random.random()
        cantidad = int((self.network.number_of_nodes() - 1) * porcentaje)
        ordered_nodes = self.network.nodes_sorted()
        participantes = ordered_nodes[1:]
        traidores = random.sample(participantes, cantidad)
        coordinator = ordered_nodes[0]
        transaction = Siege(len(self.network.nodes()) - 1 - cantidad, self.log)
        private_key = Ed25519PrivateKey.generate()
        for node in self.network.nodes():
            node.memory["self"] = node
            node.memory["coordinator"] = coordinator
            node.memory["transaction"] = transaction
            # Traitors get a different private key than the one loyal nodes share.
            node.memory["private_key"] = private_key if node not in traidores else Ed25519PrivateKey.generate()
            node.memory["signature"] = b"My signature"
            node.memory["behavior"] = NonFaultyBehavior()
            node.memory["participants"] = set(self.network.nodes()) - {coordinator}
            node.status = self.Status.PARTICIPANT
        for traitor in traidores:
            if self.verbose:
                self.log(f"[Node {traitor.memory['unique_value']}] is a traitor")
            traitor.memory["behavior"] = ByzantineBehavior(self.traitor_strategy)

        coordinator.memory["oks"] = {}
        coordinator.memory["readys"] = {}
        coordinator.status = self.Status.COORDINATOR
        coordinator.push_to_inbox(Message(meta_header=NodeAlgorithm.INI, destination=coordinator))

    def log(self, msg):
        """Append ``msg`` to ``self.logging`` when verbose mode is enabled."""
        if self.verbose:
            self.logging.append(msg)

    def crash(self, node : NodeAccess):
        """Mark ``node`` as CRASHED, notifying the transaction unless it is a coordinator."""
        if node.status not in {self.Status.COORDINATOR, self.Status.PRE_COMMIT_COORDINATOR, self.Status.POST_PREPARE_COORDINATOR}:
            node.memory["transaction"].crash(node)
        node.status = self.Status.CRASHED

    def check_digital_firm(self, node: NodeAccess, message: Message):
        """Verify the signature carried by ``message`` against the receiver's key.

        On failure the event is logged and the sender is dropped from
        ``node.memory["participants"]``.

        Returns:
            ``True`` when verification succeeds, ``False`` when the signature
            is invalid.
        """
        try:
            data : EncryptedData = message.data
            public_key = node.memory["private_key"].public_key()
            # presumably ``data.value`` is the sender's Ed25519 signature over
            # the payload stored under memory["signature"] - TODO confirm
            # against EncryptedData in utils.
            public_key.verify(data.value, node.memory["signature"])
        except InvalidSignature:
            self.log(f"[Node {node.memory['unique_value']}] Received message with invalid signature from label {message.source}")
            # discard() instead of remove(): a second invalid message from the
            # same sender (or a source that is not in the set) must not raise
            # KeyError inside the handler.
            node.memory["participants"].discard(message.source)
            return False
        # Signature is valid.
        return True

    ####################################################################################################################################
    ##############################################             STAGE 1                       ###########################################
    ####################################################################################################################################

    @Status.COORDINATOR
    def spontaneously(self, node: NodeAccess, _: Message):
        """Start the protocol: send Prepare to all neighbors and arm the OK timeout."""
        behavior : Behavior = node.memory["behavior"]
        behavior.send(
            node=node,
            algorithm=self,
            header=self.default_params["Prepare"],
            destination=node.neighbors())
        self.log(f"[Node {node.memory['unique_value']}] is the coordinator")
        m = Message(
            destination=node.memory["coordinator"],
            header=self.default_params["OKTimeout"]
            )
        node.memory["OKAlarm"] = self.set_alarm(node, self.waiting_time, m)
        node.status = self.Status.POST_PREPARE_COORDINATOR


    @Status.POST_PREPARE_COORDINATOR
    def alarm(self, node: NodeAccess, message: Message):
        """On OKTimeout, send Abort to all neighbors and finish in SUCCESS."""
        if message.header == self.default_params["OKTimeout"]:
            self.log(f"[Node {node.memory['unique_value']}] Coordinator not waiting any longer for OKs, aborting transaction")
            behavior : Behavior = node.memory["behavior"]
            behavior.send(
                node=node,
                algorithm=self,
                header=self.default_params["Abort"],
                destination=node.neighbors()
            )
            node.status = self.Status.SUCCESS
        else:
            error("POST_PREPARE_COORDINATOR::alarm", message)


    @Status.PARTICIPANT
    def receiving(self, node: NodeAccess, message: Message):
        """Answer Prepare with OK and arm the PreCommit timeout; on Abort, retreat."""
        if message.header == self.default_params["Prepare"]:
            behavior : Behavior = node.memory["behavior"]
            behavior.send(
                node=node,
                algorithm=self,
                header=self.default_params["OK"],
                destination={node.memory["coordinator"]}
            )
            m = Message(
                destination=node.memory["self"],
                header=self.default_params["PreCommitTimeout"]
                )
            node.memory["PreCommitAlarm"] = self.set_alarm(node, 2*self.waiting_time, m)
            node.status = self.Status.POST_PREPARE_PARTICIPANT
        elif message.header == self.default_params["Abort"]:
            transaction : Siege = node.memory["transaction"]
            transaction.retreat(node)
            node.status = self.Status.SUCCESS
        else:
            error("PARTICIPANT::receiving", message)


    @Status.POST_PREPARE_PARTICIPANT
    def alarm(self, node: NodeAccess, message: Message):
        """Ignore PreCommitTimeout; report any other alarm through ``error``."""
        if message.header == self.default_params["PreCommitTimeout"]:
            # A participant could take over as a second coordinator here.
            pass
        else:
            error("POST_PREPARE_PARTICIPANT::alarm", message)


    ####################################################################################################################################
    ##############################################             STAGE 2                       ###########################################
    ####################################################################################################################################

    @Status.POST_PREPARE_COORDINATOR
    def receiving(self, node: NodeAccess, message: Message):
        """Collect OKs; once every neighbor answered, send PreCommit and arm ReadyTimeout."""
        # NOTE(review): the verification result is not used to reject the
        # message; an OK with an invalid signature is still counted below.
        self.check_digital_firm(node, message)
        if message.header == self.default_params["OK"]:
            node.memory["oks"][message.source] = True
            if len(node.memory["oks"]) == len(node.neighbors()):
                behavior : Behavior = node.memory["behavior"]
                behavior.send(
                    node=node,
                    algorithm=self,
                    header=self.default_params["PreCommit"],
                    destination=node.neighbors()
                )
                self.disable_alarm(node.memory["OKAlarm"])
                m = Message(
                    destination=node.memory["coordinator"],
                    header=self.default_params["ReadyTimeout"]
                    )
                # The ReadyTimeout alarm handle reuses the "OKAlarm" slot.
                node.memory["OKAlarm"] = self.set_alarm(node, self.waiting_time, m)
                node.status = self.Status.PRE_COMMIT_COORDINATOR
        else:
            error("POST_PREPARE_COORDINATOR::receiving", message)


    @Status.PRE_COMMIT_COORDINATOR
    def alarm(self, node: NodeAccess, message: Message):
        """On ReadyTimeout, send Abort to all neighbors and finish in SUCCESS."""
        if message.header == self.default_params["ReadyTimeout"]:
            self.log(f"[Node {node.memory['unique_value']}] Coordinator not waiting any longer for Readys, aborting transaction")
            behavior : Behavior = node.memory["behavior"]
            behavior.send(
                node=node,
                algorithm=self,
                header=self.default_params["Abort"],
                destination=node.neighbors()
            )
            node.status = self.Status.SUCCESS
        else:
            # Report under this handler's own status; the label used to be the
            # copy-pasted "POST_PREPARE_COORDINATOR::alarm".
            error("PRE_COMMIT_COORDINATOR::alarm", message)

    @Status.POST_PREPARE_PARTICIPANT
    def receiving(self, node: NodeAccess, message: Message):
        """Handle the coordinator's answer after this participant sent its OK.

        The PreCommit alarm is cancelled for every incoming message. On
        PreCommit the node arms the DoCommit timeout and sends Ready; a node
        with a ``ByzantineBehavior`` is then marked CRASHED, any other node
        moves to PRE_COMMIT_PARTICIPANT. Commit and Abort finish the node in
        SUCCESS after ``attack`` / ``retreat``.
        """
        self.disable_alarm(node.memory["PreCommitAlarm"])
        if message.header == self.default_params["PreCommit"]:
            m = Message(
                destination=node.memory["self"],
                header=self.default_params["DoCommitTimeout"]
                )
            node.memory["DoCommitAlarm"] = self.set_alarm(node, 3*self.waiting_time, m)
            behavior : Behavior = node.memory["behavior"]

            behavior.send(
                node=node,
                algorithm=self,
                header=self.default_params["Ready"],
                destination={node.memory["coordinator"]}
            )
            if isinstance(behavior, ByzantineBehavior):
                node.status = self.Status.CRASHED
            else:
                node.status = self.Status.PRE_COMMIT_PARTICIPANT

        elif message.header == self.default_params["Commit"]:
            node.memory["transaction"].attack(node)
            node.status = self.Status.SUCCESS
        elif message.header == self.default_params["Abort"]:
            node.memory["transaction"].retreat(node)
            node.status = self.Status.SUCCESS
        else:
            error("POST_PREPARE_PARTICIPANT::receiving", message)


    @Status.PRE_COMMIT_PARTICIPANT
    def alarm(self, node: NodeAccess, message: Message):
        """On DoCommitTimeout, send Commit to all neighbors, attack and finish in SUCCESS."""
        if message.header == self.default_params["DoCommitTimeout"]:
            behavior : Behavior = node.memory["behavior"]
            behavior.send(
                node=node,
                algorithm=self,
                header=self.default_params["Commit"],
                destination=node.neighbors()
            )
            node.memory["transaction"].attack(node)
            node.status = self.Status.SUCCESS
        else:
            error("PRE_COMMIT_PARTICIPANT::alarm", message)


    #####################################################################################################################################
    ##############################################             STAGE 3                       ############################################
    #####################################################################################################################################

    @Status.PRE_COMMIT_COORDINATOR
    def receiving(self, node: NodeAccess, message: Message):
        """Collect Readys; once every neighbor answered, send Commit and finish."""
        # NOTE(review): as with the OKs, an invalid signature does not stop
        # the Ready from being counted below.
        self.check_digital_firm(node, message)
        if message.header == self.default_params["Ready"]:
            node.memory["readys"][message.source] = True
            if len(node.memory["readys"]) == len(node.neighbors()):
                behavior : Behavior = node.memory["behavior"]
                sent = behavior.send(
                    node=node,
                    algorithm=self,
                    header=self.default_params["Commit"],
                    destination=node.neighbors()
                )
                # NOTE(review): this is the only handler in the class that
                # inspects the result of send(); confirm that Behavior.send
                # returns a truthy value on success, otherwise the coordinator
                # is marked CRASHED here.
                if not sent:
                    self.crash(node)
                    return
                node.status = self.Status.SUCCESS
        else:
            error("PRE_COMMIT_COORDINATOR::receiving", message)



    @Status.PRE_COMMIT_PARTICIPANT
    def receiving(self, node: NodeAccess, message: Message):
        """Apply the final decision: attack on Commit, retreat on Abort."""
        self.disable_alarm(node.memory["DoCommitAlarm"])
        if message.header == self.default_params["Commit"]:
            node.memory["transaction"].attack(node)
            node.status = self.Status.SUCCESS
        elif message.header == self.default_params["Abort"]:
            node.memory["transaction"].retreat(node)
            node.status = self.Status.SUCCESS
        else:
            error("PRE_COMMIT_PARTICIPANT::receiving", message)


    @Status.SUCCESS
    def default(self, *args, **kwargs):
        """Ignore every event delivered to a node in SUCCESS status."""
        pass

    @Status.CRASHED
    def default(self, *args, **kwargs):
        """Ignore every event delivered to a node in CRASHED status."""
        pass


- Simule casos en los que un participante miente sobre su voto o altera mensajes.

In [None]:
# Run the Byzantine three-phase commit ``Its_B3PC`` times on a fresh complete
# network and report how each run ended.
for i in range(Its_B3PC):
    net_gen = NetworkGenerator(directed=False)
    net = net_gen.generate_complete_network(n_B3PC)
    sim = Simulation(net, check_restrictions=True)
    sim.algorithms = ((ByzantineThreePhaseCommit, {"verbose": verbose_B3PC, "traitor_strategy" : traitor_strategy}),)
    sim.reset()
    sim.run()
    alg = sim.algorithms[0]
    # Every node stores the same transaction object, so any node will do.
    transaction : Siege = list(net.nodes())[0].memory["transaction"]
    # Gate the dump on the flag actually passed to this algorithm. It used to
    # test ``verbose_3PC``, which could skip the dump although logging was
    # enabled, or read ``alg.logging`` although it had never been created.
    if verbose_B3PC:
        with open(f"results/B3PC_it={i}.txt", "w") as res:
            for log in alg.logging:
                print(log, file=res)
    if transaction.state == Siege.State.FAILED:
        logger.info(f"Iteracion {i} perdió atomicidad")
    elif transaction.state == Siege.State.ONGOING:
        logger.info(f"Iteracion {i} terminó en deadlock")
    else:
        logger.info(f"Iteracion {i} exitosa. Todos los nodos" + (" abortaron" if transaction.state == Siege.State.ABORTED else " commitearon la transacción") )
    logger.info("\n")