In [None]:
import networkx as nx


def valid_degree(G):
    '''
    计算节点有效度。
    计算方式为：
    1.将最大度值及其对应节点作为有效度的初始值及其初始节点。并将初始节点的邻居节点作为“用于比较的对象”。
    2.比较剩余节点的邻居节点与“用于比较的对象”中节点的差异性，将具有最大差异性的节点作为有效节点中的下一个节点，其有效度的值为差异性的值。
    差异性是指自身邻居节点中与用于较的对象中不同节点的个数。
    3.将2中所确定节点的邻居节点中的差异性节点更新到“用于比较的对象”。
    4.重复2和3获取每个节点的差异性值
    '''
    
    node_degree = dict(G.degree())

    # 获取具有最大度值的节点
    max_node = max(node_degree, key=node_degree.get)

    # 将度数最大的节点作为初始化出现节点
    emerged_nodes = set(G.neighbors(max_node))
    valid_degree_ = {node : 0 for node in G.nodes()}
    valid_degree[max_node] = node_degree[max_node]

    
    transacted_nodes = {max_node}
    untransacted_nodes = set(G.nodes()) - transacted_nodes

    while(untransacted_nodes):
        current_valid = {}
        for node in untransacted_nodes:
                
            # 计算当前节点的有效性
            current_valid[node] = len(set(G.neighbors(node)) - emerged_nodes)
            
        # 获取未处理节点中具有最大有效值的节点
        max_node = max(current_valid, key=current_valid.get) 
        valid_degree_[max_node] = current_valid[max_node]
        
        # 将最大有效性节点的邻居节点更新到已出现的节点里
        emerged_nodes.update(set(G.neighbors(max_node)))
        
        # 更新已经处理过的和未处理过的节点
        transacted_nodes.add(max_node)
        untransacted_nodes.remove(max_node)
        
        # 当网络中所有节点都已经出现过则令之后节点的有效度值全为0
        if len(emerged_nodes) == len(G.nodes()):
            for node in untransacted_nodes:
                valid_degree_[node] = 0
            return valid_degree_

    return valid_degree_
    

In [None]:
def diameter_network(graph: nx.Graph) -> int:
    '''
    计算网络直径
    '''
    
    if nx.is_connected(graph):
        
        # 计算并打印网络的直径
        diameter = nx.diameter(graph)
    else:
        
        # 获取最大连通子图
        largest_component = max(nx.connected_components(graph), key=len)  # 最大连通子图的节点集合
        subgraph = graph.subgraph(largest_component)
        # 计算最大连通子图的直径
        diameter = nx.diameter(subgraph)
    
    return diameter

In [None]:
import math

@functools.lru_cache(maxsize=None)
def communicability_matrix_with_belta (graph: nx.Graph, belta: float) -> list[list[float]]:
    '''
    计算感染率为belta下的可通信矩阵X
    '''

    infected_probability = belta
    A = nx.to_numpy_array(graph)  # 图graph的邻接矩阵
    diam = diameter_network(graph)  # 图的网络直径
    
    # 初始化通信概率矩阵 X
    X = np.zeros_like(A, dtype=np.float64)
    for n in range(diam + 1):
        A_n_power = np.linalg.matrix_power(A, n)  # 计算 A 的 n 次方
        n_infected_probability = infected_probability ** n  # 距离为n时的感染率
        
        X += (n_infected_probability / math.factorial(n)) * A_n_power

    return np.real(X)

In [None]:
import functools
@functools.lru_cache(maxsize=None)
def node_to_index(graph: nx.Graph) -> dict[int, int]:
    '''
    获取节点对应的索引
    '''
    
    return {node: i for i, node in enumerate(graph.nodes())}

In [None]:
@functools.lru_cache(maxsize=None)
def valid_degree_with_self_degree(graph:nx.Graph)->dict['node', 'value']:
    '''
    计算带有自身度值的有效度
    '''
    
    degree = dict(graph.degree())
    temp_valid_degree = valid_degree(graph)  # 存储每个节点的有效度
    node_to_idx = node_to_index(graph)  # dict{'node':index}
    improved_valid_degree = {}
    for node in node_to_idx.keys():

        # 第一次改进节点有效性，度值+有效值
        improved_valid_degree[node] = temp_valid_degree[node] + degree[node]
    
    return improved_valid_degree

In [None]:
def valid_degree_consider_comm(graph: nx.Graph, i:'node', belta: float) -> float:
    '''
    计算考虑通信矩阵的节点i的有效度
    '''

    node_to_idx = node_to_index(graph)  # dict{'node':index}
    X = communicability_matrix_with_belta(graph, belta)  # 获取通信力矩阵X
    improved_valide_degree = valid_degree_with_self_degree(graph)
    i_improved_valide_degree = 0  # 将节点i改进的有效度置为0
    i_index = node_to_idx[i]  # 获得节点i的索引
    for node in node_to_idx.keys():
        j_index = node_to_idx[node]
        i_improved_valide_degree += X[i_index][j_index] * improved_valide_degree[node]
        
    return i_improved_valide_degree

In [None]:
def all_valid_degree_consider_comm(graph: nx.Graph, belta: float)-> dict['node','value']:
    '''
    计算所有节点的有效度，考虑通信矩阵
    '''
    
    valid_degree_centrality = {}
    X = communicability_matrix_with_belta(graph, belta)  # 获取通信力矩阵X
    node_to_idx = node_to_index(graph)  # dict{'node':index}
    for node in node_to_idx.keys():
        valid_degree_centrality[node] = valid_degree_consider_comm(graph, node, belta)

    return valid_degree_centrality