# 一些基础性的内容

In [88]:
import numpy as np
import networkx as nx
from itertools import chain, combinations
import math
import pandas as pd

# 示例邻接矩阵（无向图）
A = np.array([
	[0, 1, 0, 0, 0, 0],
	[1, 0, 1, 0, 0, 0],
	[0, 1, 0, 0, 0, 0],
	[0, 0, 0, 0, 1, 1],
	[0, 0, 0, 1, 0, 1],
	[0, 0, 0, 1, 1, 0]
])

# 将邻接矩阵转换为 NetworkX 图
G = nx.from_numpy_array(A)
# 找到子图
nx.to_numpy_array(G.subgraph([2,3,4]))
# 找到所有连通子图（返回的是节点集合的列表）
components = list(nx.connected_components(G))
print(components)


[{0, 1, 2}, {3, 4, 5}]


- 集合比较

In [73]:
a = frozenset([1, 3])
b = frozenset([3, 1])
a==b

True

# 定义给定的graph $g$ 下，对应的群体的 total payoff 的计算 （后面需要个人自己定义）

In [74]:
def characteristic_function_cooperation(cooperation_set=[1,2,3]):
	"""
	计算如果一群人成功合作了，产生多少的价值
	
	参数:
	cooperation_set (list): 合作集，默认为 [1, 2, 3]
	
	返回:
	int: 合作集的特征函数值
	"""

	value_dict = {
		frozenset([1]): 0,
		frozenset([2]): 0,
		frozenset([3]): 0,
		frozenset([1, 3]): 6,
		frozenset([2, 3]): 6,
		frozenset([1, 2]): 12,
		frozenset([1, 2, 3]): 12
	}
	if not isinstance(cooperation_set, frozenset):
		cooperation_set = frozenset(cooperation_set)

	if cooperation_set not in value_dict:
		return 0

	return value_dict[cooperation_set]

def graph_based_characteristic_function(g, coalition_set=[1,2,3], characteristic_function=characteristic_function_cooperation):
	"""
	计算图的特征函数。
	
	参数:
	g (networkx.Graph): 给定的合作图 g \in GR
	
	返回:
	dict: 图的特征函数，键为节点，值为该节点所在连通子图的标识符
	"""
	# 根据给定的g，查看可行的最大的合作网络
	subgraph = G.subgraph(coalition_set)
	# 所有人员的总payoff
	value = 0
	for coalition in nx.connected_components(subgraph):
		coalition = frozenset(coalition)
		value += characteristic_function(coalition)

	return value


# 计算Graph Shapley Value

In [76]:
def powerset(iterable, drop_last=True):
    """
    计算所有的子集（幂集），可选地排除最后一个空集。

    Args:
        iterable (_type_): compute the powerset of this iterable
        drop_last (bool, optional): _description_. Defaults to True. Ignore the full set.

    Returns:
        _type_: a list of all subsets of the iterable
    """
    s = list(iterable)
    if drop_last:
        return list(chain.from_iterable(combinations(s, r) for r in range(len(s))))
    else:
        return list(chain.from_iterable(combinations(s, r) for r in range(len(s)+1)))

In [79]:
def graph_shapley_value(g, characteristic_function=graph_based_characteristic_function):
    """
    计算图的Shapley值。
    
    参数:
    g (networkx.Graph): 给定的合作图 g \in GR
    characteristic_function (function): 图的特征函数，默认为 graph_based_characteristic_function
    
    返回:
    dict: 图的Shapley值，键为节点，值为该节点的Shapley值
    """
    # 找到所有参与分配的节点
    all_nodes = list(g.nodes())
    # 计算所有可能的合作
    all_potential_coalition_sets = powerset(all_nodes, drop_last=False)
    coalition_set_payoffs = {
        frozenset(coalition_set): characteristic_function(g, coalition_set=coalition_set)
        for coalition_set in all_potential_coalition_sets
    }

    
    shapley_value = {node: 0 for node in all_nodes}
    
    n = len(all_nodes)
    
    for node in all_nodes:
        for coalition_set, value in coalition_set_payoffs.items():
            if node in coalition_set:
                # 计算节点在该合作集中的边际贡献
                without_node_coalition_set = coalition_set - frozenset([node])
                if without_node_coalition_set in coalition_set_payoffs:
                    marginal_contribution = value - coalition_set_payoffs[without_node_coalition_set]
                    shapley_value[node] += marginal_contribution * math.factorial(len(without_node_coalition_set)) * math.factorial(n - len(coalition_set))
        shapley_value[node] /= math.factorial(n)
    return shapley_value, coalition_set_payoffs

# 测试

In [97]:
G = nx.Graph()
G.add_nodes_from([1,2,3])
G.add_edges_from([(1,2), (2,3)])

# 测试Shapley value的计算是否正确
shapley_value, all_potential_coaltition_set_payoffs = graph_shapley_value(G, characteristic_function=graph_based_characteristic_function)
print("Shapley value:", shapley_value)
payoff_sheet = pd.DataFrame.from_dict(all_potential_coaltition_set_payoffs, orient='index', columns=['payoff']).reset_index().rename(columns={'index': 'coalition_set'})
payoff_sheet.iloc[:, 0] = payoff_sheet.iloc[:, 0].apply(lambda x: list(x))
payoff_sheet

Shapley value: {1: 4.0, 2: 7.0, 3: 1.0}


Unnamed: 0,coalition_set,payoff
0,[],0
1,[1],0
2,[2],0
3,[3],0
4,"[1, 2]",12
5,"[1, 3]",0
6,"[2, 3]",6
7,"[1, 2, 3]",12
