In [3]:
# Requires networkx version >= 3.2 (for find_cliques)
import networkx as nx
import random
import math

In [4]:
# Values of hyper parameters as mentioned in the research paper.
# These are module-level globals: the Bat class and the functions defined in
# the cells below read them directly instead of taking them as arguments.

p = 0.01    # propagation probability (per-edge activation probability used by LIE)

no_of_iterations = 100    # Maximum number of cycles/iterations (outer loop of CDBA)
k = 5    # Size of seed set (length of every bat's location and velocity)
N = 30    # Number of bats created by generate_bat_population

alpha = 0.3    # loudness is multiplied by alpha in Bat.update_loudness
beta = 3    # beta * k = the size of CandidatesPool (not referenced by the code below; generate_candidate_pool hard-codes 3)
gamma = 0.3    # exponent coefficient of the pulse-rate schedule in Bat.update_pulse_rate
mu = 0.5    # not referenced by the code below
nu = 0.5    # not referenced by the code below

fmin = 0    # lower bound of a bat's frequency
fmax = 2    # upper bound of a bat's frequency; (fmax - fmin + 1) / 2 is the velocity threshold

In [5]:
class Bat:
    """One bat of the discrete bat algorithm: a candidate seed set plus its search state.

    Attributes:
        nodes: the node labels of the graph the bat was built from.
        n: number of nodes in that graph.
        pulse_rate0 / pulse_rate: initial and current pulse emission rate.
        loudness: current loudness.
        frequency: value drawn once from [fmin, fmax].
        location: list of ``k`` distinct node labels (the seed set).
        velocity: list of ``k`` 0/1 flags; a 1 marks a position of ``location``
            that ``update_location`` replaces with a random node.

    Reads the module-level hyper-parameters ``k``, ``fmin``, ``fmax``,
    ``alpha`` and ``gamma``.
    """

    def __init__(self, graph: nx.Graph):
        # Keep the real node labels so seeds are always valid graph nodes,
        # whatever the labelling scheme (1..n, 0..n-1, strings, ...).
        self.nodes = list(graph.nodes())
        self.n = len(self.nodes)
        self.initialize_pulse_rate()
        self.initialize_loudness()
        self.initialize_frequency(fmin, fmax)
        self.initialize_location(self.n, k)
        self.initialize_velocity(k)

    def _candidate_nodes(self, n: int) -> list:
        """Return the labels a location may be drawn from.

        Uses the labels recorded from the graph when they are available and
        consistent with ``n``; otherwise falls back to the labels 1..n.
        """
        nodes = getattr(self, "nodes", None)
        if nodes is not None and len(nodes) == n:
            return list(nodes)
        return list(range(1, n + 1))

    def initialize_pulse_rate(self):
        """Draw the initial pulse rate uniformly from [0, 1]."""
        self.pulse_rate0 = random.uniform(0, 1)
        self.pulse_rate = self.pulse_rate0

    def initialize_loudness(self):
        """Draw the initial loudness uniformly from [0, 1]."""
        self.loudness = random.uniform(0, 1)

    def initialize_frequency(self, fmin: float, fmax: float):
        """Draw the frequency uniformly from [fmin, fmax]."""
        self.frequency = fmin + (fmax - fmin) * random.uniform(0, 1)

    def initialize_location(self, n: int, k: int):
        """Pick ``k`` distinct nodes as the initial seed set.

        Raises:
            ValueError: if ``k`` exceeds the number of available nodes.
        """
        self.location = random.sample(self._candidate_nodes(n), k)

    def initialize_velocity(self, k: int):
        """Initialise the velocity as ``k`` random 0/1 flags."""
        self.velocity = [random.randint(0, 1) for _ in range(k)]

    def update_loc_vel_with(self, best_location: list):
        """Update the velocity against ``best_location``, then move the bat."""
        self.update_velocity(best_location)
        self.update_location()

    def update_velocity(self, best_location: list):
        """Recompute the 0/1 velocity flags from the current best location.

        For each position i the frequency is added to the old flag when
        ``best_location[i]`` is contained in this bat's location; the sum is
        then thresholded at ``(fmax - fmin + 1) / 2``.
        """
        # NOTE(review): a set flag makes update_location re-roll position i,
        # and the flag can only be set when best_location[i] is already part
        # of this bat's seed set - confirm this matches the paper's rule.
        threshold = (fmax - fmin + 1) / 2
        own_nodes = set(self.location)

        for i in range(k):
            value = self.velocity[i]
            if best_location[i] in own_nodes:
                value += self.frequency
            self.velocity[i] = 0 if value < threshold else 1

    def update_location(self):
        """Replace every position whose velocity flag is set with a random unused node."""
        occupied = set(self.location)
        candidates = self._candidate_nodes(self.n)

        # Without a free node the rejection sampling below would never stop.
        if occupied.issuperset(candidates):
            return

        for i in range(k):
            if self.velocity[i]:
                replacement = random.choice(candidates)

                # Re-draw until the node is not already part of the seed set.
                while replacement in occupied:
                    replacement = random.choice(candidates)

                occupied.remove(self.location[i])
                self.location[i] = replacement
                occupied.add(replacement)

    def update_loudness(self):
        """Scale the loudness by ``alpha``."""
        self.loudness *= alpha

    def update_pulse_rate(self, g: int):
        """Set the pulse rate to ``pulse_rate0 * (1 - exp(-gamma * g))`` for iteration ``g``."""
        self.pulse_rate = self.pulse_rate0 * (1 - math.exp(-gamma * g))

In [6]:
# Local Influence Estimate/LIE function
def LIE(graph: nx.Graph, bat: Bat):
    """Estimate the influence of ``bat.location`` within two hops of the seeds.

    The estimate is ``k + (1 + c) * sigma_1`` where ``sigma_1`` sums, over the
    one-hop neighbours, the probability of being activated by at least one
    adjacent seed, and ``c`` is ``p`` times the number of links between the
    two-hop and one-hop areas, divided by the size of the one-hop area.
    Returns ``k`` when the seeds have no neighbour outside the seed set.
    Reads the module-level ``k`` and ``p``.
    """
    seeds = set(bat.location)

    def neighbours_outside_seeds(sources):
        # Union of the neighbourhoods of `sources`, seeds excluded.
        found = set()
        for source in sources:
            found.update(set(graph.neighbors(source)))
        return found - seeds

    first_hop = neighbours_outside_seeds(seeds)
    if len(first_hop) == 0:
        return k
    second_hop = neighbours_outside_seeds(first_hop)

    # Expected number of activated one-hop neighbours.
    expected_first_hop = 0
    for neighbour in first_hop:
        stays_inactive = 1
        for seed_node in seeds:
            if graph.has_edge(neighbour, seed_node):
                stays_inactive *= (1 - p)
        expected_first_hop += (1 - stays_inactive)

    # p-weighted count of links from the two-hop area into the one-hop area.
    weighted_links = 0
    for far_node in second_hop:
        links = sum(1 for neighbour in first_hop if graph.has_edge(far_node, neighbour))
        weighted_links += p * links

    amplification = weighted_links / len(first_hop) + 1
    return k + amplification * expected_first_hop

In [7]:
def generate_candidate_pool(graph: nx.Graph)->list[int]:
    """Build the pool of candidate seed nodes from a greedy clique partition.

    Repeatedly takes the remaining node of highest degree, finds the largest
    maximal clique containing it, records that clique and removes its nodes,
    until no node is left. Candidates are then drawn from the cliques:

    * at most ``3 * k`` cliques: a share of each clique proportional to its size;
    * between ``3 * k`` and ``4 * k`` cliques: one random node per clique;
    * otherwise: one node from each of the first ``3 * k`` cliques plus one
      node from each of ``k`` randomly chosen remaining cliques.

    The input graph is not modified; a copy is partitioned. Reads the
    module-level ``k``.
    """
    # NOTE(review): the literal 3 presumably corresponds to the `beta`
    # hyper-parameter - confirm before parameterising.
    working = graph.copy()

    n = len(working.nodes())
    remaining = set(working.nodes())

    # Disjoint cliques, in the order they were peeled off the graph
    cliques = []

    while len(remaining) > 0:
        # Node with maximum degree among the nodes still in the graph
        hub = max(remaining, key=lambda node: working.degree(node))

        # Largest maximal clique that contains the hub
        largest = max(nx.find_cliques(working, nodes=[hub]), key=len)
        cliques.append(largest)

        working.remove_nodes_from(largest)
        remaining.difference_update(largest)

    clique_num = len(cliques)
    candidate_pool = []

    if clique_num <= 3 * k:
        for clique in cliques:
            # A clique cannot contribute more nodes than it has; without the
            # clamp random.sample raises ValueError whenever 3 * k > n.
            quota = min(len(clique), math.ceil(len(clique) * 3 * k / n))
            candidate_pool.extend(random.sample(clique, quota))

    elif clique_num < 4 * k:
        for clique in cliques:
            candidate_pool.append(random.choice(clique))

    else:
        for clique in cliques[:3 * k]:
            candidate_pool.append(random.choice(clique))

        for clique in random.sample(cliques[3 * k:], k):
            candidate_pool.append(random.choice(clique))

    return candidate_pool

In [8]:
def random_explore(candidate_pool: list[int]):
    """Return ``k`` distinct entries of ``candidate_pool`` chosen at random.

    Reads the module-level ``k``; ``random.sample`` raises ``ValueError`` when
    the pool holds fewer than ``k`` entries.
    """
    explored_seed_set = random.sample(candidate_pool, k)
    return explored_seed_set

In [9]:
def generate_bat_population(graph: nx.Graph)->list[Bat]:
    """Create ``N`` freshly initialised bats for ``graph`` (``N`` is module-level)."""
    return [Bat(graph) for _ in range(N)]

In [10]:
def get_best_global_location(graph: nx.Graph, population: list[Bat]):
    """Return the location of the bat with the highest LIE score.

    Ties are resolved in favour of the bat that appears first in
    ``population``. The result is a copy of that bat's location, so it keeps
    its value when the bat later moves or its location list is reordered.

    Raises:
        IndexError: if ``population`` is empty.
    """
    best_bat = population[0]
    best_lie = LIE(graph, best_bat)

    for bat in population[1:]:
        lie = LIE(graph, bat)

        if lie > best_lie:
            best_lie = lie
            best_bat = bat

    # Copy: bats mutate their location lists in place.
    return list(best_bat.location)

In [11]:
def local_search(graph: nx.Graph, bat: Bat):
    """Greedily improve ``bat.location`` in place and return the same bat.

    The seeds are first sorted by ascending degree. Then, for each position,
    every neighbour of the seed at that position which is not itself a seed is
    tried as a replacement; a replacement is kept only when it strictly
    raises the LIE score. Reads the module-level ``k``.
    """
    seeds = bat.location
    seeds.sort(key=graph.degree)

    for position in range(k):
        # Candidates are fixed before any swap is tried at this position.
        candidates = list(set(graph.neighbors(seeds[position])) - set(seeds))

        best_lie = LIE(graph, bat)
        kept_node = seeds[position]

        for candidate in candidates:
            seeds[position] = candidate
            candidate_lie = LIE(graph, bat)

            if candidate_lie > best_lie:
                best_lie = candidate_lie
                kept_node = candidate
            else:
                # Not an improvement: restore the best node found so far.
                seeds[position] = kept_node

    return bat

In [12]:
def CDBA(graph: nx.Graph):
    """Run the discrete bat algorithm and return the best seed set of the final population.

    Each iteration moves every bat relative to the current best location,
    optionally refines it with ``local_search`` (when a random draw exceeds
    its pulse rate) and optionally tries a seed set drawn from the candidate
    pool (when a random draw is below its loudness). Loudness and pulse rate
    are then updated. Reads the module-level ``no_of_iterations``.

    Returns:
        A list of ``k`` node labels.
    """
    # NOTE(review): the best location is recomputed from the current
    # population after every iteration, so a better seed set found in an
    # earlier iteration can be lost once its bat moves on.
    population = generate_bat_population(graph)
    # Copy so the reference solution does not change while bats mutate their
    # own location lists in place.
    best_location = list(get_best_global_location(graph, population))
    candidate_pool = generate_candidate_pool(graph)

    for i in range(no_of_iterations):
        for j in range(len(population)):

            bat = population[j]
            bat.update_loc_vel_with(best_location)

            if random.random() > bat.pulse_rate:
                bat = local_search(graph, bat)

            if random.random() < bat.loudness:
                explorer = Bat(graph)
                # Evaluate the location that is actually adopted: assign the
                # candidate-pool seed set before comparing scores.
                explorer.location = random_explore(candidate_pool)

                if LIE(graph, explorer) > LIE(graph, bat):
                    population[j] = explorer
                    # Later updates must reach the bat kept in the population.
                    bat = explorer

            bat.update_loudness()
            bat.update_pulse_rate(i)

        best_location = list(get_best_global_location(graph, population))

    return best_location

In [14]:
# Seed the RNG so that re-running this cell rebuilds the same graph and
# reproduces the same seed set.
RANDOM_SEED = 42
random.seed(RANDOM_SEED)

graph = nx.Graph()
no_of_nodes = 50
no_of_edges = 250

# A simple undirected graph cannot hold more than n * (n - 1) / 2 edges;
# asking for more would make the sampling loop below run forever.
max_possible_edges = no_of_nodes * (no_of_nodes - 1) // 2
if no_of_edges > max_possible_edges:
    raise ValueError("no_of_edges exceeds the number of possible edges")

# Constructing the graph: nodes are labelled 1..no_of_nodes
graph.add_nodes_from(range(1, no_of_nodes + 1))

# Edges are stored as (smaller, larger) so that (u, v) and (v, u) count as the
# same undirected edge and exactly no_of_edges distinct edges are created.
included = set()

while len(included) < no_of_edges:
    u = random.randint(1, no_of_nodes)
    v = random.randint(1, no_of_nodes)
    edge = (min(u, v), max(u, v))

    # Skip self-loops and edges that already exist.
    if u == v or edge in included:
        continue

    graph.add_edge(*edge)
    included.add(edge)

seed_set = CDBA(graph)

print(seed_set)

[5, 19, 27, 40, 22]
