# Community Deception

## Import Libraries

In [1]:
# Used to refer to the current class in type hints
from __future__ import annotations
# Only from python 3.11
# from typing import Self

from typing import List, Tuple, Annotated, ValueRange

import networkx as nx
import numpy as np

import torch
import torch.nn as nn

from collections import namedtuple, deque

import random
import math


## Deception Score

**Deception Score**: Given a community $\mathcal{C}$ and a community structure $C = \{ G_1, G_2, \dots, G_K \}$ found by some community detection algorithm $\mathcal{A}_D$, the community deception score is defined as:

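$H(\mathcal{C}, C) = \left( 1 - \frac{\vert S(\mathcal{C}) \vert - 1}{\vert \mathcal{C} \vert - 1} \right) \times \left( \frac{1}{2} \left( 1 - \max_{G_i \in C} \{ \mathcal{R} (G_i, \mathcal{C}) \} \right) + \frac{1}{2} \left( 1 - \frac{ \sum_{G_i \in S(\mathcal{C})} \mathcal{P} (G_i, \mathcal{C}) }{\vert S(\mathcal{C}) \vert} \right) \right)$

where $S(\mathcal{C}) = \{ G_i \in C : G_i \cap \mathcal{C} \neq \emptyset \}$ is the set of detected communities that contain at least one member of $\mathcal{C}$,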

with **Recall** $\mathcal{R}$ and **Precision** $\mathcal{P}$ defined as:

- $\mathcal{R} (G_i, \mathcal{C}) = \frac{\# \mathcal{C} \text{'s members in } G_i \text{ found by } \mathcal{A}_D}{\vert \mathcal{C} \vert} \quad \forall G_i \in C$

- $\mathcal{P} (G_i, \mathcal{C}) = \frac{\# \mathcal{C} \text{'s members in } G_i \text{ found by } \mathcal{A}_D}{\vert G_i \vert} \quad \forall G_i : G_i \cap \mathcal{C} \neq \emptyset$

In [5]:
class DeceptionScore(object):
    """Deception score of a target community against a detected structure.

    The score combines how much the target community is spread over the
    detected communities with how well it is hidden inside them (recall and
    precision of the detected communities with respect to the target).

    Node collections are treated as sets: duplicated node ids are ignored.
    """

    def __init__(
        self,
        community: List[int],
        community_structure: List[List[int]]) -> None:
        """Store the target community and the detected community structure.

        Parameters
        ----------
        community : List[int]
            Nodes of the community to hide.
        community_structure : List[List[int]]
            Communities found by a community detection algorithm.
        """
        self.community = community
        self.community_structure = community_structure

    @staticmethod
    def recall(G_i: List[int], community: List[int]) -> float:
        """Calculate recall score of a community G_i

        Parameters
        ----------
        G_i : List[int]
            Community found by a community detection algorithm.
        community : List[int]
            Target community the recall is measured against.

        Returns
        -------
        float
            Fraction of the target community's members that are in G_i;
            0.0 when the target community is empty.
        """
        target = set(community)
        if len(target) == 0:
            # 0/0 is undefined; report "nothing recalled" instead of crashing
            return 0.0
        # Number of members in G_i that are also in our community
        members_in_G_i = len(target & set(G_i))
        return members_in_G_i / len(target)

    @staticmethod
    def precision(G_i: List[int], community: List[int]) -> float:
        """Calculate precision score of a community G_i

        Parameters
        ----------
        G_i : List[int]
            Community found by a community detection algorithm.
        community : List[int]
            Target community the precision is measured against.

        Returns
        -------
        float
            Fraction of G_i's members that belong to the target community;
            0.0 when G_i is empty.
        """
        detected = set(G_i)
        if len(detected) == 0:
            # 0/0 is undefined; an empty community contains no target member
            return 0.0
        # Number of members in G_i that are also in our community
        members_in_G_i = len(set(community) & detected)
        return members_in_G_i / len(detected)

    def deception_score(self) -> float:
        """Calculate deception score of a community detection algorithm.

        Returns
        -------
        float
            Deception score of a community detection algorithm.

        Raises
        ------
        ValueError
            If the target community is empty, or if no community of the
            structure contains any member of the target community.
        """
        target = set(self.community)
        if not target:
            raise ValueError("community must contain at least one node")

        # S(C): detected communities sharing at least one node with the target
        S_C = [G_i for G_i in self.community_structure if target & set(G_i)]
        if not S_C:
            raise ValueError(
                "no community in community_structure overlaps the community")

        num_S_C = len(S_C)
        num_C = len(target)

        max_recall = max(self.recall(G_i, self.community) for G_i in S_C)
        avg_precision = sum(
            self.precision(G_i, self.community) for G_i in S_C) / num_S_C

        # The spread term is (|S(C)| - 1) / (|C| - 1). For a one-node
        # community the ratio is 0/0; by convention it is taken as 0 here
        # (a single node cannot be spread over several communities).
        if num_C > 1:
            spread = (num_S_C - 1) / (num_C - 1)
        else:
            spread = 0.0

        deception_score = (1 - spread) \
            * (0.5 * (1 - max_recall) + 0.5 * (1 - avg_precision))
        return deception_score

## Graph Structure

In [2]:
class GraphStructure(object):
    """Array-based snapshot of an undirected NetworkX graph.

    Edges are kept as an ``(n_edges, 2)`` integer array
    (``edge_pairs_unravel``) and as its flattened view (``edge_pairs``).
    Node labels are generated as ``0 .. n_nodes - 1``, so the source graph
    is assumed to use exactly those integer labels.
    """

    def __init__(self, graph: nx.Graph) -> None:
        """
        Constructor of the GraphStructure class.

        Parameters
        ----------
        graph : nx.Graph
            Graph to be converted to a GraphStructure.
        """
        self.n_nodes = graph.number_of_nodes()
        self.nodes_label = np.arange(self.n_nodes)
        self.nodes_list = set(self.nodes_label)

        # Materialise the edges first so that an edgeless graph yields an
        # empty (0, 2) array instead of failing while unpacking zip(*edges)
        edges = list(graph.edges())
        self.n_edges = len(edges)

        # Matrix of edge endpoints, Nx2, with N=n_edges: row k is (u_k, v_k)
        self.edge_pairs_unravel = np.asarray(
            edges, dtype=np.int32).reshape(self.n_edges, 2)

        # Single contiguous flattened array [u1,v1,u2,v2,...]
        self.edge_pairs = np.ravel(self.edge_pairs_unravel)

        self.first_node = None
        self.second_node = None

        # Budgets below this threshold are treated as exhausted
        self.budget_eps = 1e-5

    def to_networkx(self) -> nx.Graph:
        """
        Convert the GraphStructure to a NetworkX graph.

        Returns
        -------
        graph : nx.Graph
            NetworkX graph with the same nodes and edges.
        """
        graph = nx.Graph()
        # tolist() yields plain Python ints, so node keys have a uniform type
        graph.add_edges_from(self.edge_pairs_unravel.tolist())
        # Also register the nodes that have no incident edge
        graph.add_nodes_from(int(node) for node in self.nodes_list)
        return graph

    def has_edge(self, first_node: int, second_node: int) -> bool:
        """Check if the graph has an edge between first_node and second_node.

        The graph is undirected, so the edge is looked up in both
        orientations.

        Parameters
        ----------
        first_node : int
            First node of the edge.
        second_node : int
            Second node of the edge.

        Returns
        ----------
        bool
            True if the graph has an edge between first_node and second_node.
        """
        pairs = self.edge_pairs_unravel
        if pairs.size == 0:
            return False
        sources, targets = pairs[:, 0], pairs[:, 1]
        # Both endpoints must match on the SAME row; a plain
        # `[u, v] in array` would be true if any single entry matched
        forward = (sources == first_node) & (targets == second_node)
        backward = (sources == second_node) & (targets == first_node)
        return bool(np.any(forward | backward))

    def add_edge(
        self,
        first_node: int,
        second_node: int) -> Tuple[GraphStructure, int]:
        """
        Add an edge to the graph, between first_node and second_node.

        Parameters
        ----------
        first_node : int
            First node to connect.
        second_node : int
            Second node to connect.

        Returns
        -------
        s2v_graph : GraphStructure
            New GraphStructure with the added edge, between first_node
            and second_node. The current object is left unchanged.
        cost : int
            Budget cost of the operation, always 1.
        """
        # Round-trip through NetworkX: convert, add the edge, convert back
        nx_graph = self.to_networkx()
        nx_graph.add_edge(first_node, second_node)
        s2v_graph = GraphStructure(nx_graph)
        return s2v_graph, 1

    def remove_edge(
        self,
        first_node: int,
        second_node: int) -> Tuple[GraphStructure, int]:
        """
        Remove the edge between first_node and second_node from the graph.

        Parameters
        ----------
        first_node : int
            First node of the edge to remove.
        second_node : int
            Second node of the edge to remove.

        Returns
        -------
        s2v_graph : GraphStructure
            New GraphStructure with the removed edge, between first_node
            and second_node. The current object is left unchanged.
        cost : int
            Budget cost of the operation, always 1.
        """
        # Round-trip through NetworkX; nx.Graph.remove_edge is what reports
        # a missing edge
        nx_graph = self.to_networkx()
        nx_graph.remove_edge(first_node, second_node)
        s2v_graph = GraphStructure(nx_graph)
        return s2v_graph, 1

    def get_banned_edge_actions(
        self,
        community: List[int],
        budget=None) -> None:
        """
        Compute the list of edges banned to be removed and added.
            - We can add a new edge (u,v), iff u is in the community and v is
                not, or viceversa.
            - We can remove an edge (u,v), iff u and v are both in the community.

        The result is stored in ``self.banned_edges_add`` and
        ``self.banned_edges_remove`` as lists of ordered pairs ``(u, v)``
        with ``u != v``; self-pairs never appear in either list.

        Parameters
        ----------
        community : List[int]
            List of nodes representing the community to hide.
        budget : float, optional
            Remaining edge budget. When given and below ``self.budget_eps``
            every pair is banned for both operations, by default None.
        """
        nodes = self.nodes_list

        if budget is not None and budget < self.budget_eps:
            # Budget exhausted: no action is allowed any more. Besides
            # `banned_actions`, also fill the two per-operation lists so
            # they are always defined after this call.
            self.banned_actions = nodes
            every_pair = [(u, v) for u in nodes for v in nodes if u != v]
            self.banned_edges_add = every_pair
            self.banned_edges_remove = list(every_pair)
            return

        # Set membership keeps the O(n^2) pair scan from becoming O(n^3)
        members = set(community)

        # List of edges banned to be removed
        banned_edges_remove = []
        # List of edges banned to be added
        banned_edges_add = []

        # Iterate through all ordered pairs of distinct nodes
        for u in nodes:
            u_inside = u in members
            for v in nodes:
                if u == v:
                    continue
                v_inside = v in members
                if not u_inside and not v_inside:
                    # Both outside the community: neither removal nor
                    # addition is allowed
                    banned_edges_remove.append((u, v))
                    banned_edges_add.append((u, v))
                elif u_inside and v_inside:
                    # Both inside the community: addition is not allowed
                    banned_edges_add.append((u, v))
                else:
                    # Exactly one endpoint inside: removal is not allowed
                    banned_edges_remove.append((u, v))

        self.banned_edges_add = banned_edges_add
        self.banned_edges_remove = banned_edges_remove


In [7]:
class GraphEnviroment(object):
    """Environment that rewires edges of several graphs under an edge budget.

    Each graph gets a budget proportional to its number of edges; every
    applied action (adding or removing one edge) consumes part of it.
    """

    def __init__(
        self,
        objective_function:DeceptionScore,
        beta: Annotated[float, ValueRange(1.0, 100.0)]) -> None:
        """Constructor for GraphEnviroment

        Parameters
        ----------
        objective_function : DeceptionScore
            DeceptionScore object used as the objective function
        beta : float
            Percentage of edges to rewire/update, real number between 1 and 100
        """
        self.objective_function = objective_function
        self.beta = beta
        # Factor applied to the initial objective values when training
        self.rewards_scale_multiplier = 10 # 0, 10, 100

    @staticmethod
    def apply_action(
        graph: GraphStructure,
        community: List[int],
        action: Tuple[int, int],
        remaining_budget: float) -> Tuple[GraphStructure, float]:
        """Applies the action to the graph, if there is an edge between the two
        nodes, it removes it, otherwise it adds it

        Parameters
        ----------
        graph : GraphStructure
            GraphStructure object to apply the action to
        community : List[int]
            List of nodes in the community
        action : Tuple[int, int]
            Tuple containing the indices of the nodes to rewire
        remaining_budget : float
            Budget still available for this graph before the action

        Returns
        -------
        graph : GraphStructure
            Graph returned by the add/remove operation
        updated_budget : float
            Remaining budget after paying the cost of the action
        """
        first_node, second_node = action[0], action[1]
        # An existing edge is removed, a missing one is added
        if graph.has_edge(first_node, second_node):
            graph, edge_cost = graph.remove_edge(first_node, second_node)
        else:
            graph, edge_cost = graph.add_edge(first_node, second_node)
        updated_budget = remaining_budget - edge_cost

        # Recompute the banned actions with the budget left AFTER the
        # action, so that an exhausted budget is detected immediately
        graph.get_banned_edge_actions(community, updated_budget)
        return graph, updated_budget

    @staticmethod
    def get_edge_budget(
        graphs: List[GraphStructure],
        beta: float) -> np.ndarray:
        """Computes the edge budget for each graph

        Parameters
        ----------
        graphs : List[GraphStructure]
            List of GraphStructure objects, i.e. graphs to compute the edge
            budget for
        beta : float
            Percentage of edges to rewire/update

        Returns
        -------
        np.ndarray
            float32 array with one edge budget per graph, each equal to
            ``ceil(n_edges * beta / 100)``
        """
        edge_budget = np.zeros(len(graphs), dtype=np.float32)
        for i, graph in enumerate(graphs):
            edge_budget[i] = int(math.ceil((graph.n_edges * beta / 100)))
        return edge_budget

    def get_remaining_budget(self, i:int) -> float:
        """Computes the remaining edge budget for graph i

        Parameters
        ----------
        i : int
            Index of the graph

        Returns
        -------
        float
            Remaining edge budget for graph i
        """
        # Attribute names match the ones created in setup()
        return self.edge_budget[i] - self.used_edge_budget[i]

    def setup(
        self,
        graphs: List[GraphStructure],
        community: List[int],
        initial_objective_function_values: List[float],
        training=False) -> None:
        """Setup function for the environment

        Parameters
        ----------
        graphs : List[GraphStructure]
            List of GraphStructure objects
        community : List[int]
            Community to hide
        initial_objective_function_values : List[float]
            Initial objective function values for each graph
        training : bool, optional
            Whether the environment is used for training, by default False
        """
        self.graphs = graphs
        self.community = community
        self.n_steps = 0

        n_graphs = len(self.graphs)
        self.edge_budget = self.get_edge_budget(self.graphs, self.beta)
        # Builtin float/bool: the np.float / np.bool aliases were removed
        # from NumPy
        self.used_edge_budget = np.zeros(n_graphs, dtype=float)
        self.exhausted_budget = np.zeros(n_graphs, dtype=bool)

        for i, graph in enumerate(self.graphs):
            graph.first_node = None
            graph.second_node = None
            graph.get_banned_edge_actions(community, self.edge_budget[i])

        self.training = training

        # Row 0: reference objective values, row 1: values after the last
        # applied action
        self.objective_function_values = np.zeros((2, n_graphs), dtype=float)
        self.objective_function_values[0, :] = initial_objective_function_values

        self.rewards = np.zeros(n_graphs, dtype=float)

        if self.training:
            self.objective_function_values[0, :] = np.multiply(
                self.objective_function_values[0, :],
                self.rewards_scale_multiplier)

    def step(
        self,
        actions: List[Tuple[int, int]]) -> Tuple[np.ndarray, bool]:
        """Step function for the environment

        Applies one action per graph. Graphs whose budget is exhausted are
        skipped; a banned action leaves the graph untouched and yields a
        reward of 0.

        Parameters
        ----------
        actions : List[Tuple[int, int]]
            One action per graph: the pair of nodes whose edge is toggled

        Returns
        -------
        Tuple[np.ndarray, bool]
            Tuple containing the per-graph rewards and whether the episode
            is done, i.e. every graph has exhausted its budget
        """
        # Loop over all graphs
        for i, graph in enumerate(self.graphs):
            # Check if the graph has been exhausted
            if self.exhausted_budget[i]:
                continue

            remaining_budget = self.get_remaining_budget(i)
            if remaining_budget <= 0:
                # Nothing left to spend (e.g. a graph that started with a
                # zero budget)
                self.exhausted_budget[i] = True
                continue

            action = (actions[i][0], actions[i][1])

            # The action toggles the edge, so the relevant ban list depends
            # on whether the edge currently exists
            if graph.has_edge(action[0], action[1]):
                banned = graph.banned_edges_remove
            else:
                banned = graph.banned_edges_add

            if action in banned:
                # If the action is invalid, set the reward to 0
                self.rewards[i] = 0
                continue

            # Take action and keep the graph it returns
            graph, updated_budget = self.apply_action(
                graph, self.community, action, remaining_budget)
            self.graphs[i] = graph
            # Charge the budget exactly once, by the cost actually paid
            self.used_edge_budget[i] += (remaining_budget - updated_budget)

            # Update the objective function value
            # NOTE(review): this requires `objective_function` to be
            # callable with the graph; confirm how the objective object is
            # meant to be evaluated on a graph.
            self.objective_function_values[1, i] = self.objective_function(graph)
            # Update the reward
            # NOTE(review): reference value minus new value, and only the
            # reference row is scaled when training - confirm the intended
            # sign and scaling.
            self.rewards[i] = self.objective_function_values[0, i] \
                - self.objective_function_values[1, i]
            # Check if the budget has been exhausted
            if self.used_edge_budget[i] >= self.edge_budget[i]:
                self.exhausted_budget[i] = True

        return self.rewards, bool(np.all(self.exhausted_budget))

## Graph Embedding