### Utilities

In [None]:
import os
# Names of the directories handed to make_directories below; "forward_images" and
# "backward_images" are used elsewhere in this file as targets for saved figures.
folders = ["random_smart_images", "random_naive_images", "example_smart_images", "example_naive_images", "forward_images", "backward_images"]

def make_directories(input_list):
    """
    Create one directory under the current working directory for each given name.

    For every name a message is printed telling whether the directory has been
    created or was already there.

    :param input_list: iterable of directory names (relative to './')
    """
    # Iterate over the argument, not over the module-level `folders` list
    for name in input_list:
        dirpath = os.path.join('./', name)
        try:
            os.mkdir(dirpath)
        except FileExistsError:
            print('Directory {} already exists'.format(dirpath))
        else:
            print('Directory {} created'.format(dirpath))

# Create the directories listed in `folders` (relative to the current working directory)
make_directories(folders)

### graph.py

In [None]:
from typing import Set, Tuple, Dict
import random


class GameGraph:
    """
    Directed graph used as the arena of a two-player reachability/safety game.

    The graph is stored twice: as a successor map (``straight``) and as a
    predecessor map (``transpose``) in which the predecessors of a node are
    split according to the player that controls them.
    """
    REACHABILITY_PLAYER = "Ron"
    SAFETY_PLAYER = "Simon"

    def __init__(self, reach_nodes: Set[int], safety_nodes: Set[int], edges: Set[Tuple[int, int]]):
        """
        Build the arena from the nodes of each player and a set of edges.

        Every node must be controlled by either the Reachability or the Safety player.
        Example of the data structures (nodes 1 and 2 controlled by "Ron", 3 and 4 by "Simon"):
          straight = {1: {2, 3, 4}, 2: {4}, 3: {2}, 4: set()}
          transpose = {1: {"Ron": set(), "Simon": set()},
                       2: {"Ron": {1}, "Simon": {3}},
                       3: {"Ron": {1}, "Simon": set()},
                       4: {"Ron": {1, 2}, "Simon": set()}}

        :param reach_nodes: nodes controlled by the reachability player
        :param safety_nodes: nodes controlled by the safety player
        :param edges: (u, v) pairs, each meaning an edge u -> v; pairs rejected by
                      add_edge (unknown node, self loop, duplicate) are silently skipped
        """
        # All the nodes: union of the nodes controlled by the two players
        self.nodes: Set[int] = reach_nodes.union(safety_nodes)
        # Straight graph: node -> set of its successors
        self.straight: Dict[int, Set[int]] = dict()
        # Transpose graph (edges inverted): node -> {player name -> predecessors controlled by that player}
        self.transpose: Dict[int, Dict[str, Set[int]]] = dict()

        # The two sets are stored by reference (they are not copied)
        self.reachability_player_nodes: Set[int] = reach_nodes
        self.safety_player_nodes: Set[int] = safety_nodes

        # Every node gets an (initially empty) entry in both representations
        for n in self.nodes:
            self.straight[n] = set()
            self.transpose[n] = {GameGraph.REACHABILITY_PLAYER: set(), GameGraph.SAFETY_PLAYER: set()}
        for u, v in edges:
            self.add_edge(u, v)

    def add_edge(self, u: int, v: int) -> bool:
        """
        Adds a new edge to the graph, but only if the two nodes are already in it
        :param u: a node
        :param v: a node, different from u (self loops are rejected)
        :return: True if the add was successful, False otherwise (unknown node,
                 u == v, or edge already present)
        """
        if u not in self or v not in self or u == v or v in self.straight[u]:
            return False
        # add (u -> v) edge to straight graph
        self.straight[u].add(v)

        # Add (v -> u) edge to transpose graph, according to u's controller.
        # u is known to belong to one of the two players thanks to the check above.
        if u in self.reachability_player_nodes:
            owner = GameGraph.REACHABILITY_PLAYER
        else:
            owner = GameGraph.SAFETY_PLAYER
        self.transpose[v][owner].add(u)
        return True

    def get_outbound_nodes(self, u: int) -> Set[int]:
        """
        Get all nodes reachable from u with an outbound edge
        :param u: a node in the graph
        :return: the set of reachable nodes (the internal set, not a copy)
        """
        return self.straight[u]

    def get_inbound_nodes(self, u: int, player: str) -> Set[int]:
        """
        Get all nodes that have an edge to the given node and that are controlled by the given player
        :param u: a node in the graph
        :param player: GameGraph.REACHABILITY_PLAYER or GameGraph.SAFETY_PLAYER
        :return: the set of predecessors of u controlled by the given player (the internal set, not a copy)
        """
        return self.transpose[u][player]

    def __contains__(self, u):
        """Return True if u is controlled by one of the two players."""
        return u in self.reachability_player_nodes or u in self.safety_player_nodes

    def controller(self, u):
        """
        Return the name of the player controlling u.
        Any node that is not in the safety player's set is reported as controlled
        by the reachability player.
        """
        return GameGraph.SAFETY_PLAYER if u in self.safety_player_nodes else GameGraph.REACHABILITY_PLAYER

    @staticmethod
    def generate_random_graph(N: int, E: int, self_edges=True, no_isolated=False):
        """
        Generate a random arena with nodes 0..N-1.

        Every ordered pair (i, j) is selected independently with probability
        E / N**2, so about E pairs are sampled. Nodes 0..k-1 are given to the
        reachability player and the others to the safety player, with k drawn
        uniformly in [1, N - 1].

        :param N: number of nodes (at least 2, one per player)
        :param E: expected number of sampled pairs
        :param self_edges: if False, pairs (i, i) are discarded here; note that
                           add_edge rejects self loops anyway, so the returned
                           graph never contains them
        :param no_isolated: if True, every node without an outgoing edge towards
                            another node receives one extra random edge, either
                            leaving or entering it
        :return: the generated GameGraph
        :raises ValueError: if N < 2
        """
        if N < 2:
            raise ValueError("N must be at least 2: each player needs at least one node")
        nodes = set(range(N))
        # Selection probability of a single pair, clamped to [0, 1].
        # (The ratio must be E / N^2: the inverse is >= 1 for any sensible E.)
        prob = min(1.0, max(0.0, E / N ** 2))
        matrix = [[1 if random.random() < prob else 0 for _ in range(N)] for _ in range(N)]

        if no_isolated:
            for i in range(N):
                row = matrix[i]
                # The diagonal is ignored: a self loop does not survive add_edge
                if sum(row) == row[i]:
                    j = random.randint(0, N - 1)
                    while j == i:   # this loop avoids adding a self edge to an isolated node
                        j = random.randint(0, N - 1)
                    if random.random() < 0.5:
                        matrix[i][j] = 1   # add a random outgoing edge
                    else:
                        matrix[j][i] = 1   # add a random incoming edge
        edges = {(i, j)
                 for i in range(N)
                 for j in range(N)
                 if matrix[i][j] == 1 and (i != j or self_edges)}

        k = random.randint(1, N - 1)  # at least one node for each player
        reach_nodes = set(range(k))  # {n | n in nodes and n < k}
        safe_nodes = nodes.difference(reach_nodes)
        return GameGraph(reach_nodes, safe_nodes, edges)

    @staticmethod
    def old_generate_random_graph(N: int, E: int, self_edges=True, no_isolated=False):
        """
        Older generator: samples random pairs until E distinct ones are collected.

        :param N: number of nodes (at least 2, one per player)
        :param E: number of distinct pairs to sample; sampled self loops count
                  towards E but are dropped by add_edge when the graph is built
        :param self_edges: if False, pairs (u, u) are never sampled
        :param no_isolated: if True, every node with neither inbound nor outbound
                            edges receives one random edge
        :return: the generated GameGraph
        :raises ValueError: if N < 2 or if E exceeds the number of distinct pairs
        """
        if N < 2:
            raise ValueError("N must be at least 2: each player needs at least one node")
        max_edges = N * N if self_edges else N * (N - 1)
        if E > max_edges:
            # Without this check the sampling loop below would never terminate
            raise ValueError(f"Cannot sample {E} distinct edges: at most {max_edges} exist")

        nodes = set(range(N))  # 0..N-1
        k = random.randint(1, N - 1)  # at least one node for each player
        reach_nodes = {n for n in nodes if n < k}
        safe_nodes = nodes.difference(reach_nodes)
        edges = set()
        while len(edges) < E:
            u = random.randint(0, N - 1)
            v = random.randint(0, N - 1)
            if u == v and not self_edges:
                continue
            edges.add((u, v))

        graph = GameGraph(reach_nodes, safe_nodes, edges)

        if no_isolated:
            for n in graph.nodes:
                isolated = not (graph.get_inbound_nodes(n, GameGraph.REACHABILITY_PLAYER)
                                or graph.get_inbound_nodes(n, GameGraph.SAFETY_PLAYER)
                                or graph.get_outbound_nodes(n))
                if not isolated:
                    continue
                v = random.randint(0, N - 1)
                # keep sampling v until it is different from n
                while v == n:
                    v = random.randint(0, N - 1)

                # choose edge direction randomly
                if random.randint(0, 1):
                    graph.add_edge(n, v)
                else:
                    graph.add_edge(v, n)
        return graph

### Graph test function

In [None]:
# Hand-written arena with nodes 0..8: build it and print both of its representations.
example_edges = {
    (0, 1), (0, 3),
    (1, 0), (1, 2),
    (2, 1), (2, 5),
    (3, 4), (3, 6),
    (4, 0), (4, 7), (4, 8),
    (5, 7),
    (6, 7),
    (7, 6), (7, 8),
    (8, 5),
}
s = {0, 2, 4, 5, 6}  # nodes passed as safety_nodes
r = {1, 3, 7, 8}  # nodes passed as reach_nodes
example_graph = GameGraph(r, s, example_edges)
# Successor map first, predecessor map second, each followed by a blank separator
for _adjacency in (example_graph.straight, example_graph.transpose):
    print(_adjacency)
    print("\n")

### graph_simple.py

In [None]:
from typing import Set, Tuple, Dict

class GameGraphSimple:
    """
    Simplified game arena: like GameGraph, but the transpose graph stores the
    predecessors of a node in a single set, without splitting them by controller.
    """
    REACHABILITY_PLAYER = "Ron"
    SAFETY_PLAYER = "Simon"

    def __init__(self, reach_nodes: Set[int], safety_nodes: Set[int], edges: Set[Tuple[int, int]]):
        """
        :param reach_nodes: nodes controlled by the reachability player
        :param safety_nodes: nodes controlled by the safety player
        :param edges: (u, v) pairs, each meaning an edge u -> v; both endpoints
                      must belong to one of the two node sets
        """
        # Straight graph: node -> successors
        self.straight: Dict[int, Set[int]] = dict()
        # Transpose graph: node -> predecessors
        self.transpose: Dict[int, Set[int]] = dict()
        # Kept for backward compatibility: this class never fills nor reads it
        self.controlled_nodes: Dict[str, Set[int]] = dict()

        for n in reach_nodes.union(safety_nodes):
            self.straight[n] = set()
            self.transpose[n] = set()

        # The two sets are stored by reference (they are not copied)
        self.reachability_player_nodes = reach_nodes
        self.safety_player_nodes = safety_nodes

        for u, v in edges:  # (u -> v)
            self.add_edge(u, v)

    def add_edge(self, u: int, v: int):
        """
        Add the edge u -> v. No validation is performed: self loops are accepted
        and an unknown node raises KeyError.
        """
        # add (u -> v) edge to straight graph
        self.straight[u].add(v)
        # Add (v -> u) edge to transpose graph
        self.transpose[v].add(u)

    def get_outbound_nodes(self, u: int) -> Set[int]:
        """
        Get all nodes reachable from u with an outbound edge
        :param u: a node in the graph
        :return: the set of reachable nodes
        """
        return self.straight[u]

    def get_inbound_nodes(self, u: int) -> Set[int]:
        """
        Get all nodes that have an outbound edge to the given node, whoever controls them
        :param u: a node in the graph
        :return: the set of predecessors of u
        """
        return self.transpose[u]

    @property
    def nodes(self) -> Set[int]:
        """All the nodes of the graph (a new set is built at every access)."""
        return self.safety_player_nodes.union(self.reachability_player_nodes)

    def controller(self, u):
        """
        Return the name of the player controlling u.
        Any node that is not in the safety player's set is reported as controlled
        by the reachability player.
        """
        # Use this class' own constants so that the class does not depend on GameGraph
        if u in self.safety_player_nodes:
            return GameGraphSimple.SAFETY_PLAYER
        return GameGraphSimple.REACHABILITY_PLAYER

    @staticmethod
    def from_regular_graph(G: "GameGraph"):
        """
        Build a GameGraphSimple with the same edges and node ownership as G.
        :param G: object exposing `straight`, `reachability_player_nodes` and
                  `safety_player_nodes`; the two node sets are shared with G, not copied
        :return: the new GameGraphSimple
        """
        edges = {(n, m) for n, successors in G.straight.items() for m in successors}
        return GameGraphSimple(G.reachability_player_nodes, G.safety_player_nodes, edges)


### draw_graph.py

In [None]:
import networkx as nx
import matplotlib.pyplot as plt

def draw_graph(graph, positions=None, target=None, win=None, image_name='graph.png'):
    """
    Draw the game graph with NetworkX and save the picture to a file.

    :param graph: object exposing `nodes`, `get_outbound_nodes(n)` and `controller(n)`
    :param positions: node layout to reuse; when missing or empty a spring layout is computed
    :param target: nodes painted with the 'target' colour (checked before `win`)
    :param win: nodes painted with the 'win' colour
    :param image_name: path handed to plt.savefig
    :return: the layout used for the drawing, so that later calls can reuse it
    """
    target = [] if target is None else target
    win = [] if win is None else win

    # Mirror the game graph into a NetworkX directed graph: nodes first, then edges
    G = nx.DiGraph()
    G.add_nodes_from(graph.nodes)
    edges = [(n, m) for n in graph.nodes for m in graph.get_outbound_nodes(n)]
    G.add_edges_from(edges)

    # Colour of a node: by special set if it belongs to one, by controller otherwise
    col_map = {
        GameGraph.REACHABILITY_PLAYER: 'red',
        GameGraph.SAFETY_PLAYER: 'green',
        'target': 'blue',
        'win': 'purple',
    }
    # Shape of a node by controller
    shape_map = {
        GameGraph.REACHABILITY_PLAYER: 'r',
        GameGraph.SAFETY_PLAYER: 's',
    }

    colors = []
    shapes = []  # collected but not passed to NetworkX: node_shape is disabled below
    for n in G.nodes():
        owner = graph.controller(n)
        shapes.append(shape_map[owner])
        if n in target:
            key = 'target'
        elif n in win:
            key = 'win'
        else:
            key = owner
        colors.append(col_map[key])

    if not positions:
        positions = nx.spring_layout(G, k=0.75, iterations=20)

    # Nodes, labels and edges are drawn on the current matplotlib figure
    nx.draw_networkx_nodes(G, positions, cmap=plt.get_cmap('jet'),
                           node_color=colors, node_size=500)  # , node_shape=shapes)
    nx.draw_networkx_labels(G, positions)
    nx.draw_networkx_edges(G, positions, edgelist=edges, arrows=True)
    # The figure is written to disk (not shown), then cleared for the next drawing
    plt.savefig(image_name)
    plt.clf()
    return positions


### safety_game.py

In [None]:
import random
from typing import Set, Tuple, List, Dict
import time
import math

def safety_game(G: GameGraph, target_safe: Set[int], target_reach: Set[int], threshold=float('Inf'),
                draw=False, image_name="smart_{}.png", positions=None):
    """
    Implementation of the "Multiple-perspective algorithm".

    The game is set up as a safety game; at every iteration the size of the
    current winning set is compared with `threshold` to decide whether to refine
    it from the safety player's point of view (step forward) or from the
    reachability player's point of view (step backward).

    :param G: the game arena
    :param target_safe: initial winning set of the safety player (it is copied, not modified)
    :param target_reach: only used as the `target` highlighted in the drawings
    :param threshold: a forward step is performed while len(win) <= threshold, a backward step otherwise
    :param draw: if True, a picture is saved before the first iteration and after each one
    :param image_name: format string for the picture file names; receives the iteration number
    :param positions: node layout handed to draw_graph
    :return: (winning set of the safety player, list of the performed steps,
             each being 'forward' or 'backward')
    """
    win: Set[int] = target_safe.copy()
    # Everything outside the winning set is, so far, winning for the reachability player
    lose: Set[int] = G.nodes.difference(win)
    # Nodes added by the last backward step ("current set"); initially the whole losing set
    last_force_reach: Set[int] = lose.copy()
    steps = []

    if draw:
        draw_graph(G, target=target_reach, image_name=image_name.format(0), positions=positions)

    iteration = 0
    while True:
        iteration += 1
        print(f"At the iteration {iteration}, the size of the winning set for the safety player is {len(win)}.")
        size_before = len(win)

        if size_before <= threshold:
            # Small winning set: refine it directly
            steps.append('forward')
            forced = step_forward(G, win)
            # Nodes of `win` that are not forced become losing; the others are kept
            lose |= win ^ forced
            win = win & forced
        else:
            # Large winning set: grow the losing set instead
            steps.append('backward')
            forced = step_backward(G, lose, last_force_reach)
            lose |= forced
            win -= forced
            last_force_reach = forced

        if draw:
            draw_graph(G, target=target_reach, win=win, positions=positions,
                       image_name=image_name.format(iteration))

        # Fixpoint test on the cardinalities only: cheaper than comparing the sets
        if len(win) == size_before:
            return win, steps


def step_forward(G: GameGraph, win: Set[int]) -> Set[int]:
    """
    Fundamental step from the safety player's point of view.

    :param G: the game arena
    :param win: current winning set of the safety player
    :return: the union of the sets returned by force_safe_forward and force_reach_forward
    """
    by_safety: Set[int] = force_safe_forward(G, win)
    by_reach: Set[int] = force_reach_forward(G, win)
    return by_safety | by_reach


def force_safe_forward(G: GameGraph, win) -> Set[int]:
    """
    Safety player's component of the forward force set.

    :param G: the game arena
    :param win: current winning set of the safety player
    :return: the nodes of `win` controlled by the safety player that either have
             no outgoing edge or have at least one successor inside `win`
    """
    kept: Set[int] = set()
    for node in win.intersection(G.safety_player_nodes):
        successors = G.get_outbound_nodes(node)
        # No move at all, or at least one move that stays inside the winning set
        if not successors or not successors.isdisjoint(win):
            kept.add(node)
    return kept


def force_reach_forward(G: GameGraph, win) -> Set[int]:
    """
    Reachability player's component of the forward force set.

    :param G: the game arena
    :param win: current winning set of the safety player
    :return: the nodes of `win` controlled by the reachability player whose
             successors all lie inside `win` (nodes without successors included)
    """
    candidates = win.intersection(G.reachability_player_nodes)
    return {node for node in candidates if G.get_outbound_nodes(node).issubset(win)}


def step_backward(G: GameGraph, lose, last_force_reach) -> Set[int]:
    """
    Fundamental step from the reachability player's point of view.

    Since the game is set up as a safety game, the winning set of the
    reachability player is called `lose`.

    :param G: the game arena
    :param lose: current winning set of the reachability player
    :param last_force_reach: the set handed to force_reach_backward as current set
    :return: the union of the sets returned by force_reach_backward and force_safe_backward
    """
    by_reach: Set[int] = force_reach_backward(G, lose, last_force_reach)
    by_safety: Set[int] = force_safe_backward(G, lose)
    return by_reach | by_safety


def force_reach_backward(G: GameGraph, lose: Set[int], last_force_reach: Set[int]) -> Set[int]:
    """
    Reachability player's component of the backward force set.

    :param G: the game arena
    :param lose: current winning set of the reachability player
    :param last_force_reach: the nodes whose predecessors are inspected (the "current set")
    :return: the nodes controlled by the reachability player that have an edge
             towards a node of `last_force_reach` and are not already in `lose`
    """
    # Only the predecessors of the current set are visited, and the ones already
    # in `lose` are filtered out on the fly
    return {
        pred
        for node in last_force_reach
        for pred in G.get_inbound_nodes(node, GameGraph.REACHABILITY_PLAYER)
        if pred not in lose
    }


def force_safe_backward(G: GameGraph, lose: Set[int]) -> Set[int]:
    """
    Safety player's component of the backward force set.

    :param G: the game arena
    :param lose: current winning set of the reachability player
    :return: the nodes controlled by the safety player, not in `lose`, that have
             an edge towards `lose` and whose successors all lie inside `lose`
    """
    forced: Set[int] = set()
    # Predecessors already examined: each one is checked at most once
    seen: Set[int] = set()
    for node in lose:
        for pred in G.get_inbound_nodes(node, GameGraph.SAFETY_PLAYER):
            if pred in seen or pred in lose:
                continue
            seen.add(pred)
            # Every move of the safety player from `pred` ends in the losing set
            if G.get_outbound_nodes(pred).issubset(lose):
                forced.add(pred)
    return forced


def get_graph_from_slides():
    """
    Build the graph of the example provided in the course slides.

    :return: (the GameGraph, the safety player's target set)
    """
    safety_owned = {0, 2, 4, 5, 6}
    reach_owned = {1, 3, 7, 8}
    # Adding (1, 3) would make all nodes reachable
    arcs = {
        (0, 1), (0, 3),
        (1, 0), (1, 2),
        (2, 1), (2, 5),
        (3, 4), (3, 6),
        (4, 0), (4, 7), (4, 8),
        (5, 7),
        (6, 7),
        (7, 6), (7, 8),
        (8, 5),
    }
    target_safe = {0, 1, 2, 3, 6, 7, 8}
    return GameGraph(reach_owned, safety_owned, arcs), target_safe


def example_from_slides():
    """
    Solve the example provided in the course slides with the smart algorithm
    (threshold set to half of the nodes, rounded up) and print the result and the
    elapsed time.
    """
    example_graph, target_safe = get_graph_from_slides()
    target_reach = [n for n in example_graph.nodes if n not in target_safe]
    threshold = math.ceil(len(example_graph.nodes) / 2)

    print("==========\n Computing safety set for the example graph from the slides")

    print("\n\n\n----- Smart Algorithm:")
    example_smart_t0 = time.perf_counter()
    # safety_game returns (winning set, steps): only the winning set is reported here
    safe_smart_example, _ = safety_game(example_graph, target_safe, target_reach, threshold=threshold)
    example_smart_t1 = time.perf_counter()
    print(
        f"\tThe smart algorithm found {len(safe_smart_example)} safe nodes in {example_smart_t1 - example_smart_t0:0.4f} seconds.")
    print(f"The safe nodes are: [{safe_smart_example}]")

    print("======= End of safety computation for example graph from the slides =======\n\n")


def random_graph_safety(no_isolated=False, random_seed = 103):
    """
    Solve the same random safety game three times (forward only, backward only,
    optimized) and print the size of the resulting safe set and the elapsed time
    of each run.

    The size of the graph (N, E) and of the target set (T) are fixed inside the function.

    :param no_isolated: forwarded to GameGraph.generate_random_graph
    :param random_seed: NOTE(review): currently unused, the seeding call is commented out
    """
    print("\n\n\n======= Begin safety computation for random graph ===")

    import os
    import random
    # seed=103, N=1000000, E=5000000 and T=50000 are a good example. 39 s to generate, 52 s to solve (40 iterations)
    # seed=103, N=10000, E=50000 and T=500 are a good example
    # seed=103, N=50, E=100 and T=5 are a good example (instantaneous)
    # random.seed(103)
    N = 102
    E = 150
    T = 52
    target_reach = set()
    while len(target_reach) < T:
        target_reach.add(random.randint(0, N - 1))
    target_safe = {n for n in range(N) if n not in target_reach}
    if T < 100: print(f"Target: {target_reach}\n")
    print("len(target_safe)")
    print(len(target_safe))
    print("len(target_reach)")
    print(len(target_reach))
    # Pictures are produced only for small graphs
    draw = N < 50

    graph_gen_ti = time.perf_counter()
    print(f"\tGenerating random graph with {N} nodes, {E} edges and {T} target nodes.")

    random_graph = GameGraph.generate_random_graph(N, E, no_isolated=no_isolated)
    positions = None
    if draw:
        # The pictures are saved inside these folders: make sure they exist
        for image_dir in ("forward_images", "backward_images", "optimized_images"):
            os.makedirs(image_dir, exist_ok=True)
        positions = draw_graph(random_graph, target=target_reach, image_name="graph.png")

    # With the threshold equal to the number of nodes only forward steps are performed
    threshold_forward = len(random_graph.nodes)

    graph_gen_tf = time.perf_counter()
    print(f"\tRandom graph generated in {graph_gen_tf - graph_gen_ti:0.4f} seconds.")
    print(f"\t\tPlayer 'Reachability' controls {len(random_graph.reachability_player_nodes)}")
    print(f"\t\tPlayer 'Safety' controls {len(random_graph.safety_player_nodes)}\n\n")

    print("\n\n\n----- Forward Algorithm:")
    forward_t0 = time.perf_counter()

    # safety_game returns (winning set, steps): only the winning set is reported here
    if draw:
        safe_forward, _ = safety_game(random_graph, target_safe, target_reach, threshold=threshold_forward,
                                      draw=draw, image_name="forward_images/step_{}.png", positions=positions)
    else:
        safe_forward, _ = safety_game(random_graph, target_safe, target_reach, threshold=threshold_forward)

    forward_t1 = time.perf_counter()
    print(
        f"\tThe forward algorithm found {len(safe_forward)} safe nodes in {forward_t1 - forward_t0:0.4f} seconds.")
    if len(safe_forward) <= 100: print(f"The safe nodes are: [{safe_forward}]")

    print("======= End of safety computation for Forward =======\n\n")

    # With threshold 1 backward steps are performed as long as the winning set has more than one node
    threshold_backward = 1

    print("\n\n\n----- Backward Algorithm:")
    backward_t0 = time.perf_counter()

    if draw:
        safe_backward, _ = safety_game(random_graph, target_safe, target_reach, threshold=threshold_backward,
                                       draw=True, image_name="backward_images/step_{}.png", positions=positions)
    else:
        safe_backward, _ = safety_game(random_graph, target_safe, target_reach, threshold=threshold_backward)

    backward_t1 = time.perf_counter()
    print(
        f"\tThe backward algorithm found {len(safe_backward)} safe nodes in {backward_t1 - backward_t0:0.4f} seconds.")
    if len(safe_backward) <= 100: print(f"The safe nodes are: [{safe_backward}]")

    print("\n\n\n----- Optimized Algorithm:")
    t0 = time.perf_counter()
    threshold = math.ceil(len(random_graph.nodes) / 2)
    if draw:
        safe, _ = safety_game(random_graph, target_safe, target_reach, threshold=threshold,
                              draw=draw, image_name="optimized_images/step_{}.png", positions=positions)
    else:
        safe, _ = safety_game(random_graph, target_safe, target_reach, threshold=threshold)

    t1 = time.perf_counter()
    print(
        f"\tThe optimized algorithm found {len(safe)} safe nodes in {t1 - t0:0.4f} seconds.")
    if len(safe) <= 100: print(f"The safe nodes are: [{safe}]")

    print("======= End of safety computation for Optimized =======\n\n")

    print("======= End of safety computation for random graph =======\n\n")


def generate_random_graph(num_nodes: int, num_edges: int, target_safe_size: int) -> Tuple[
    GameGraph, Set[int], Set[int]]:
    """
    Generate a random arena (built with no_isolated=True) together with a random
    target set.

    :param num_nodes: number of nodes of the graph
    :param num_edges: number of edges requested to GameGraph.generate_random_graph
    :param target_safe_size: number of distinct nodes drawn as the safety player's target
                             (a negative value gives an empty target)
    :return: (graph, target_safe, target_reach), where target_reach is the
             complement of target_safe in 0..num_nodes-1
    :raises ValueError: if target_safe_size is greater than num_nodes
    """
    if target_safe_size > num_nodes:
        # Drawing more distinct nodes than the existing ones is impossible
        raise ValueError(
            f"target_safe_size ({target_safe_size}) cannot exceed num_nodes ({num_nodes})")
    # Sampling without replacement always terminates, whatever the requested size
    target_safe = set(random.sample(range(num_nodes), max(target_safe_size, 0)))
    target_reach = {n for n in range(num_nodes) if n not in target_safe}

    graph_gen_ti = time.perf_counter()
    print(
        f"Generating random graph with {num_nodes} nodes, {num_edges} edges and {target_safe_size} target-safe nodes.")
    random_graph = GameGraph.generate_random_graph(num_nodes, num_edges, no_isolated=True)
    graph_gen_tf = time.perf_counter()
    print(f"\tRandom graph generated in {graph_gen_tf - graph_gen_ti:0.4f} seconds.")
    print(f"\t\tPlayer 'Reachability' controls {len(random_graph.reachability_player_nodes)}")
    print(f"\t\tPlayer 'Safety' controls {len(random_graph.safety_player_nodes)}\n\n")

    return random_graph, target_safe, target_reach


def randfloat(a: float, b: float):
    """Return a random float computed as a + (b - a) * r, with r drawn by random.random()."""
    # random.uniform evaluates exactly this expression
    return random.uniform(a, b)


def test(num_tests: int, num_nodes_min: int, num_nodes_max,
         avg_edges_per_node_min: float, avg_edges_per_node_max: float,
         target_safe_ratio_min: float, target_safe_ratio_max: float, random_seed=103):
    """
    Launch several experiments on random graphs and print the aggregated statistics.

    For every experiment the number of nodes, the average number of edges per
    node and the ratio of target-safe nodes are drawn uniformly in the given ranges.

    :param num_tests: number of experiments (at least 1)
    :param num_nodes_min: minimum number of nodes (inclusive)
    :param num_nodes_max: maximum number of nodes (inclusive)
    :param avg_edges_per_node_min: lower bound of the edges/nodes ratio
    :param avg_edges_per_node_max: upper bound of the edges/nodes ratio
    :param target_safe_ratio_min: lower bound of the target-safe/nodes ratio
    :param target_safe_ratio_max: upper bound of the target-safe/nodes ratio
    :param random_seed: seed of the random number generator
    :raises ValueError: if num_tests is smaller than 1
    """
    if num_tests < 1:
        raise ValueError("num_tests must be at least 1")
    # Seed the generator (assigning to random.seed would replace the function instead)
    random.seed(random_seed)

    # `statistics` is a list of dictionaries, one per experiment
    statistics = []
    # target_safe_size / num_nodes of every experiment
    target_safe_ratios = []
    for i in range(1, num_tests + 1):
        print(f"\n\n===================\n"
              f"== EXPERIMENT {i} ==\n"
              "===================\n\n")
        num_nodes: int = random.randint(num_nodes_min, num_nodes_max)
        num_edges: int = math.ceil(num_nodes * randfloat(avg_edges_per_node_min, avg_edges_per_node_max))
        target_safe_size: int = math.floor(num_nodes * randfloat(target_safe_ratio_min, target_safe_ratio_max))
        target_safe_ratios.append(target_safe_size / num_nodes)
        statistics.append(random_graph_safety_experiment(num_nodes, num_edges, target_safe_size))

    is_always_faster = all(s['is_optimized_faster'] for s in statistics)

    def average_percent(key: str) -> float:
        # A generator (not a set) is summed: equal values must all be counted
        return sum(s[key] for s in statistics) / num_tests * 100.0

    avg_time_saving_wrt_forward = average_percent('time_saving_wrt_forward')
    avg_time_saving_wrt_backward = average_percent('time_saving_wrt_backward')
    avg_time_saving_wrt_best = average_percent('time_saving_wrt_best')

    # Target sizes are averaged over all the experiments, not taken from the last one only
    target_safe_size_percent = sum(target_safe_ratios) / num_tests * 100
    target_reach_size_percent = 100 - target_safe_size_percent

    print(f"\n\n\n ==== FINAL RESULTS ===\n"
          f" - The optimized algorithm is {'' if is_always_faster else 'not'} always faster than the naive versions!\n\n"
          f" - The average time saving wrt the forward algorithm is {avg_time_saving_wrt_forward:0.2f}%\n"
          f" - The average time saving wrt the backward algorithm is {avg_time_saving_wrt_backward:0.2f}%\n"
          f" - The average time saving wrt the best naive algorithm is {avg_time_saving_wrt_best:0.2f}%\n"
          f" - The average target safe size is {target_safe_size_percent:0.2f}% the size of the entire set of nodes in the graph.\n"
          f" - The average target reach size is {target_reach_size_percent:0.2f}% the size of the entire set of nodes in the graph.")

def random_graph_safety_experiment(num_nodes: int, num_edges: int, target_safe_size: int):
    """
    Run the forward, backward and optimized algorithms on the same random game
    and collect the timing statistics. Employed by the function "test".

    :param num_nodes: number of nodes of the random graph
    :param num_edges: number of edges requested for the random graph
    :param target_safe_size: size of the safety player's target set
    :return: dictionary with the keys 'forward_time', 'backward_time',
             'optimized_time', 'is_optimized_faster', 'time_saving_wrt_forward',
             'time_saving_wrt_backward', 'time_saving_wrt_best',
             'forward_steps_in_optimized' and 'backward_steps_in_optimized'
    """
    random_graph: GameGraph
    target_safe: Set[int]
    target_reach: Set[int]
    random_graph, target_safe, target_reach = generate_random_graph(num_nodes, num_edges, target_safe_size)
    num_graph_nodes = len(random_graph.nodes)
    # Threshold = number of nodes -> forward steps only; threshold = 1 -> backward
    # steps as long as the winning set has more than one node
    forward_results = safety_run(random_graph, target_safe, target_reach, num_graph_nodes, 'Forward')
    backward_results = safety_run(random_graph, target_safe, target_reach, 1, 'Backward')
    threshold = math.floor(num_graph_nodes / 2)
    optimized_results = safety_run(random_graph, target_safe, target_reach, threshold,
                                   'Optimized')
    print("\n\n========== The threshold has been set to: " + str(threshold) + " ==========")

    # safety_run returns (iterations, forward steps, backward steps, elapsed time)
    forward_time = forward_results[3]
    backward_time = backward_results[3]
    optimized_time = optimized_results[3]
    best_naive_time: float = min(forward_time, backward_time)

    def relative_saving(baseline: float) -> float:
        # Fraction of `baseline` saved by the optimized run (0.0 for a zero baseline)
        return (baseline - optimized_time) / baseline if baseline else 0.0

    statistics: Dict[str, float] = dict()
    statistics['forward_time'] = forward_time
    statistics['backward_time'] = backward_time
    statistics['optimized_time'] = optimized_time

    # The optimized run counts as faster when it is not slower than both naive
    # runs by more than the tolerance; being much faster must count as well.
    tolerance: float = 0.1
    statistics['is_optimized_faster'] = (optimized_time <= forward_time + tolerance
                                         and optimized_time <= backward_time + tolerance)

    statistics['time_saving_wrt_forward'] = relative_saving(forward_time)
    statistics['time_saving_wrt_backward'] = relative_saving(backward_time)
    statistics['time_saving_wrt_best'] = relative_saving(best_naive_time)

    statistics['forward_steps_in_optimized'] = optimized_results[1]
    statistics['backward_steps_in_optimized'] = optimized_results[2]

    return statistics


def safety_run(random_graph: GameGraph, target_safe: Set[int], target_reach: Set[int], threshold: int, run_name: str):
    """
    Run safety_game once, timing it and printing a report.
    Employed by the function "random_graph_safety_experiment".

    :param random_graph: the game arena
    :param target_safe: target set of the safety player
    :param target_reach: target set of the reachability player
    :param threshold: threshold handed to safety_game
    :param run_name: label used in the printed messages
    :return: (number of iterations, number of forward steps, number of backward steps,
             elapsed time in seconds)
    """
    print(f"\n\n======= {run_name} Algorithm: =======")
    t0 = time.perf_counter()

    safe, steps = safety_game(random_graph, target_safe, target_reach, threshold=threshold)

    t1 = time.perf_counter()
    total_time = t1 - t0
    print(
        f"\nThe {run_name} algorithm found {len(safe)} safe nodes in {len(steps)} iterations over {total_time:0.4f} "
        f"seconds.")

    print(f"\n======= End of safety computation for {run_name} =======\n\n")

    # Count every occurrence: collecting the steps into a set would give at most 1
    forward_steps: int = steps.count('forward')
    backward_steps: int = len(steps) - forward_steps
    # `steps` is a list in chronological order: the most recent steps are at the end
    print("Sequence of steps performed: " + str(tuple(steps)))

    return len(steps), forward_steps, backward_steps, total_time

### Run Test

In [None]:
# Parameters of the experiment campaign
num_tests = 10
# Range of the number of nodes of each random graph
num_nodes_min = 5000
num_nodes_max = 6500
# Range of the edges/nodes ratio
avg_edges_per_node_min = 1
avg_edges_per_node_max = 5
# Range of the target-safe/nodes ratio
target_safe_ratio_min = 0.01
target_safe_ratio_max = 0.5
random_seed = 2102

test(
    num_tests=num_tests,
    num_nodes_min=num_nodes_min,
    num_nodes_max=num_nodes_max,
    avg_edges_per_node_min=avg_edges_per_node_min,
    avg_edges_per_node_max=avg_edges_per_node_max,
    target_safe_ratio_min=target_safe_ratio_min,
    target_safe_ratio_max=target_safe_ratio_max,
    random_seed=random_seed,
)