# Single Graph Drawing

#### Load packages

In [1]:
import networkx as nx
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np

#### Load data

In [2]:
# Sample of the Astronomy workbook used for the whole analysis (two sheets)
data_astronomy1 = pd.read_excel('data/Astronomy.xlsx', sheet_name='강민철')
data_astronomy2 = pd.read_excel('data/Astronomy.xlsx', sheet_name='강지헌')

# Drop the unused 'Name' column
data_astronomy1.drop('Name', axis=1, inplace=True)
data_astronomy2.drop('Name', axis=1, inplace=True)

# Merge all dataframes into one dataframe.
# ignore_index=True gives every row a unique label. Without it both sheets
# keep their own row labels, and the label-based drop below would also remove
# an unrelated row of the other sheet that happens to share the same label.
data_astronomy = pd.concat(
    [data_astronomy1, data_astronomy2], axis=0, ignore_index=True
)

# Remove the first row recorded for ID 53 (and only that row).
# Raises IndexError when no row has ID 53.
# NOTE(review): the reason this row is excluded is not visible here - confirm.
first_row_id_53 = data_astronomy.loc[data_astronomy['ID'] == 53].index[0]
data_astronomy.drop(first_row_id_53, inplace=True)

In [3]:
# Load three sheets of the Sampling workbook; the 'Name' column is removed
# from each sheet as soon as it is read.
data_sampling1, data_sampling2, data_sampling3 = (
    pd.read_excel('data/Sampling.xlsx', sheet_name=sheet).drop(columns='Name')
    for sheet in ('강지헌', '신아현', '신수연')
)

# Stack the sheets row-wise into one dataframe (each sheet keeps its own
# row labels).
data_sampling = pd.concat([data_sampling1, data_sampling2, data_sampling3])

In [4]:
# Load three sheets of the Database workbook; the 'Name' column is removed
# from each sheet as soon as it is read.
data_database1, data_database2, data_database3 = (
    pd.read_excel('data/Database.xlsx', sheet_name=sheet).drop(columns='Name')
    for sheet in ('신수연', '양연선', '김나영')
)

# Stack the sheets row-wise into one dataframe (each sheet keeps its own
# row labels).
data_database = pd.concat([data_database1, data_database2, data_database3])

#### Process graphs
- 이름 형식: `<Domain>_<Modality>_<ID>`
- Domain: `Astronomy`, `Sampling`, `Database`

In [5]:
# Build one directed graph per student ID from the Astronomy rows.
graphs_astronomy = {}

for _, sub_df in data_astronomy.groupby('ID'):
    # Graph name follows <Domain>_<Modality>_<ID>
    graph_name = f"Astronomy_{sub_df['Mod.'].iloc[0]}_{sub_df['ID'].iloc[0]}"
    G = nx.DiGraph()

    for _, row in sub_df.iterrows():
        start_node = row['Start']
        # Register the start node first so that a row without 'End' still
        # produces an (isolated) node that can carry its label.
        G.add_node(start_node)

        if pd.notna(row['End']):
            # 'End' is a comma-separated list of target nodes. Strip both
            # sides ("a, b" must give "b", not " b") and skip empty pieces
            # left by a trailing comma.
            for end_node in (piece.strip() for piece in row['End'].split(',')):
                if end_node:
                    G.add_edge(start_node, end_node)

        # P.Knowledge label:  O -> 1 (true)  |  anything else -> 0 (false).
        # Applied to every start node, including ones without outgoing edges.
        # Nodes that only ever appear in 'End' get no label.
        G.nodes[start_node]['P.Knowledge'] = 1 if row['P.Knowledge'] == 'O' else 0

    # Save the graph
    graphs_astronomy[graph_name] = G

In [6]:
# Build one directed graph per student ID from the Sampling rows.
graphs_sampling = {}

for _, sub_df in data_sampling.groupby('ID'):
    # Graph name follows <Domain>_<Modality>_<ID>
    graph_name = f"Sampling_{sub_df['Mod.'].iloc[0]}_{sub_df['ID'].iloc[0]}"
    G = nx.DiGraph()

    for _, row in sub_df.iterrows():
        start_node = row['Start']
        # Register the start node first so that a row without 'End' still
        # produces an (isolated) node that can carry its label.
        G.add_node(start_node)

        if pd.notna(row['End']):
            # 'End' is a comma-separated list of target nodes. Strip both
            # sides ("a, b" must give "b", not " b") and skip empty pieces
            # left by a trailing comma.
            for end_node in (piece.strip() for piece in row['End'].split(',')):
                if end_node:
                    G.add_edge(start_node, end_node)

        # P.Knowledge label:  O -> 1 (true)  |  anything else -> 0 (false).
        # Applied to every start node, including ones without outgoing edges.
        # Nodes that only ever appear in 'End' get no label.
        G.nodes[start_node]['P.Knowledge'] = 1 if row['P.Knowledge'] == 'O' else 0

    # Save the graph
    graphs_sampling[graph_name] = G

In [7]:
# Build one directed graph per student ID from the Database rows.
graphs_database = {}

for _, sub_df in data_database.groupby('ID'):
    # Graph name follows <Domain>_<Modality>_<ID>
    graph_name = f"Database_{sub_df['Mod.'].iloc[0]}_{sub_df['ID'].iloc[0]}"
    G = nx.DiGraph()

    for _, row in sub_df.iterrows():
        start_node = row['Start']
        # Register the start node first so that a row without 'End' still
        # produces an (isolated) node that can carry its label.
        G.add_node(start_node)

        if pd.notna(row['End']):
            # 'End' is a comma-separated list of target nodes. Strip both
            # sides ("a, b" must give "b", not " b") and skip empty pieces
            # left by a trailing comma.
            for end_node in (piece.strip() for piece in row['End'].split(',')):
                if end_node:
                    G.add_edge(start_node, end_node)

        # P.Knowledge label:  O -> 1 (true)  |  anything else -> 0 (false).
        # Applied to every start node, including ones without outgoing edges.
        # Nodes that only ever appear in 'End' get no label.
        G.nodes[start_node]['P.Knowledge'] = 1 if row['P.Knowledge'] == 'O' else 0

    # Save the graph
    graphs_database[graph_name] = G

#### Process multi-graphs
params:

- `graphs_dict`: 도메인별 그래프 딕셔너리
- `selected_G`: 멀티그래프 생성 시 제외할 그래프

In [8]:
def create_multigraph(graphs_dict, selected_G=None):
    """Merge the graphs of one domain into a single ``nx.MultiDiGraph``.

    Args:
        graphs_dict: Mapping of graph name -> graph for one domain.
        selected_G: Name of a graph to leave out of the merge. With the
            default ``None`` only a graph stored under the key ``None``
            would be skipped.

    Returns:
        A ``nx.MultiDiGraph`` holding every node of the merged graphs and one
        parallel edge per edge of each merged graph. Node and edge attributes
        are not copied.
    """
    merged = nx.MultiDiGraph()
    for graph_name, graph in graphs_dict.items():
        if graph_name == selected_G:
            continue
        merged.add_nodes_from(graph.nodes())
        merged.add_edges_from(graph.edges())
    return merged

In [9]:
# Build one full multigraph per domain (no graph is excluded)
multigraph_astronomy_full, multigraph_sampling_full, multigraph_database_full = (
    create_multigraph(domain_graphs)
    for domain_graphs in (graphs_astronomy, graphs_sampling, graphs_database)
)

#### Methods for centrality metrics
(Multi-graph를 digraph로 변환하여 계산)

1. `cal_node_betweenness`
2. `cal_node_closeness`
3. `cal_node_degree`

*(각각 상위 3개의 항목을 추출함)*

In [10]:
def cal_node_betweenness(G, top_n=3):
    """Return the ``top_n`` nodes of ``G`` with the highest betweenness centrality.

    ``G`` is first converted with ``nx.DiGraph(G)``, so the centrality is
    computed on a simple directed graph even when a multigraph is passed in.

    Args:
        G: Graph to analyse (anything ``nx.DiGraph`` accepts).
        top_n: Number of highest-scoring nodes to keep. Defaults to 3.

    Returns:
        A dict mapping node -> centrality, ordered from highest to lowest
        score. Ties keep the order in which networkx reported the nodes. The
        dict has fewer than ``top_n`` entries when the graph has fewer nodes.

    Raises:
        ValueError: If ``top_n`` is negative.
    """
    if top_n < 0:
        # A negative slice bound would silently drop the *lowest* nodes instead.
        raise ValueError(f"top_n must be non-negative, got {top_n}")

    values = nx.betweenness_centrality(nx.DiGraph(G))
    top_nodes = sorted(values.items(), key=lambda item: item[1], reverse=True)[:top_n]
    return dict(top_nodes)

def cal_node_closeness(G, top_n=3):
    """Return the ``top_n`` nodes of ``G`` with the highest closeness centrality.

    ``G`` is first converted with ``nx.DiGraph(G)``, so the centrality is
    computed on a simple directed graph even when a multigraph is passed in.

    Args:
        G: Graph to analyse (anything ``nx.DiGraph`` accepts).
        top_n: Number of highest-scoring nodes to keep. Defaults to 3.

    Returns:
        A dict mapping node -> centrality, ordered from highest to lowest
        score. Ties keep the order in which networkx reported the nodes. The
        dict has fewer than ``top_n`` entries when the graph has fewer nodes.

    Raises:
        ValueError: If ``top_n`` is negative.
    """
    if top_n < 0:
        # A negative slice bound would silently drop the *lowest* nodes instead.
        raise ValueError(f"top_n must be non-negative, got {top_n}")

    values = nx.closeness_centrality(nx.DiGraph(G))
    top_nodes = sorted(values.items(), key=lambda item: item[1], reverse=True)[:top_n]
    return dict(top_nodes)

def cal_node_degree(G, top_n=3):
    """Return the ``top_n`` nodes of ``G`` with the highest degree centrality.

    ``G`` is first converted with ``nx.DiGraph(G)``, so the centrality is
    computed on a simple directed graph even when a multigraph is passed in.

    Args:
        G: Graph to analyse (anything ``nx.DiGraph`` accepts).
        top_n: Number of highest-scoring nodes to keep. Defaults to 3.

    Returns:
        A dict mapping node -> centrality, ordered from highest to lowest
        score. Ties keep the order in which networkx reported the nodes. The
        dict has fewer than ``top_n`` entries when the graph has fewer nodes.

    Raises:
        ValueError: If ``top_n`` is negative.
    """
    if top_n < 0:
        # A negative slice bound would silently drop the *lowest* nodes instead.
        raise ValueError(f"top_n must be non-negative, got {top_n}")

    values = nx.degree_centrality(nx.DiGraph(G))
    top_nodes = sorted(values.items(), key=lambda item: item[1], reverse=True)[:top_n]
    return dict(top_nodes)

#### Analysis for centrality metrics

In [11]:
def fill_df(graph_name, graph):
    """Append the top centrality nodes of ``graph`` as one row of ``df_centrality``.

    The row holds ``graph_name`` under 'Graph' and, for each of the metrics
    Betweenness, Closeness and Degree, the ranked nodes and their scores under
    the tuple keys ``(metric, 'node<i>')`` and ``(metric, 'value<i>')``.

    Args:
        graph_name: Label stored in the 'Graph' column.
        graph: Graph handed to the three ``cal_node_*`` helpers.

    Returns:
        None. The module-level ``df_centrality`` is extended in place.
    """
    global df_centrality

    rankings = (
        ('Betweenness', cal_node_betweenness(graph)),
        ('Closeness', cal_node_closeness(graph)),
        ('Degree', cal_node_degree(graph)),
    )

    record = {'Graph': graph_name}
    for metric, ranked_nodes in rankings:
        for rank, (node, score) in enumerate(ranked_nodes.items(), start=1):
            record[(metric, f'node{rank}')] = node
            record[(metric, f'value{rank}')] = score

    # The next free integer label is the current number of rows.
    next_label = len(df_centrality)
    df_centrality.loc[next_label] = record

In [12]:
# Result table: a 'Graph' column followed by one (metric, field) column for
# every metric / rank combination.
columns = pd.MultiIndex.from_product([
    ['Betweenness', 'Closeness', 'Degree'],
    ['node1', 'value1', 'node2', 'value2', 'node3', 'value3'],
])
df_centrality = pd.DataFrame(columns=['Graph', *columns.tolist()])

# One row per domain, computed on the full multigraph of that domain
for domain_label, domain_multigraph in (
    ('Astronomy', multigraph_astronomy_full),
    ('Sampling', multigraph_sampling_full),
    ('Database', multigraph_database_full),
):
    fill_df(domain_label, domain_multigraph)

# Save the result to an Excel file.
# WARNING: an existing file at this path is overwritten without asking!
df_centrality.to_excel("result/All_centrality.xlsx", index=False)

#### Methods for consensus metrics

1. `cal_edge_consensus`
2. `cal_subgraph_coverage`
3. `cal_collective_shortest_path`
4. `cal_communicability`

In [13]:
def cal_edge_consensus(student_graph: nx.Graph, multi_graph: nx.MultiDiGraph) -> float:
    """Average number of multigraph edges matching each student edge.

    For every edge ``(u, v)`` of ``student_graph`` the number of parallel
    ``u -> v`` edges in ``multi_graph`` is counted; the counts are summed and
    divided by the number of student edges. The result can exceed 1.

    Args:
        student_graph: Graph of a single student.
        multi_graph: Aggregated multigraph to compare against.

    Returns:
        The average described above, or 0 when ``student_graph`` has no edges.
    """
    student_edges = student_graph.edges()
    edge_total = len(student_edges)
    if not edge_total:
        return 0

    shared_total = sum(multi_graph.number_of_edges(u, v) for u, v in student_edges)
    return shared_total / edge_total

def cal_subgraph_coverage(student_graph: nx.Graph, multi_graph: nx.MultiDiGraph) -> float:
    """Share of the multigraph's edges that the student graph covers.

    The numerator counts each student edge that exists in ``multi_graph``
    once; the denominator is ``len(multi_graph.edges())``, which counts
    parallel edges individually.

    Args:
        student_graph: Graph of a single student.
        multi_graph: Aggregated multigraph to compare against.

    Returns:
        The ratio described above, or 0 when ``multi_graph`` has no edges.
    """
    multi_edge_total = len(multi_graph.edges())
    if not multi_edge_total:
        return 0

    covered = sum(1 for edge in student_graph.edges() if multi_graph.has_edge(*edge))
    return covered / multi_edge_total

def cal_collective_shortest_path(student_graph: nx.Graph, multi_graph: nx.MultiDiGraph) -> float:
    """Mean absolute gap between student and multigraph shortest-path lengths.

    Only (source, target) pairs for which a shortest-path length is reported
    in both graphs contribute to the mean.

    NOTE(review): ``all_pairs_dijkstra_path_length`` also reports each node's
    distance to itself (0), so those pairs are averaged in - confirm that
    this is intended.

    Args:
        student_graph: Graph of a single student.
        multi_graph: Aggregated multigraph to compare against.

    Returns:
        The mean absolute difference, or ``float('inf')`` when the two graphs
        share no such pair.
    """
    reference_lengths = dict(nx.all_pairs_dijkstra_path_length(multi_graph))
    student_lengths = dict(nx.all_pairs_dijkstra_path_length(student_graph))

    gap_sum = 0
    pair_count = 0
    for source, student_row in student_lengths.items():
        reference_row = reference_lengths.get(source)
        if reference_row is None:
            # Source node does not exist in the multigraph.
            continue
        for target, student_length in student_row.items():
            if target not in reference_row:
                continue
            gap_sum += abs(student_length - reference_row[target])
            pair_count += 1

    if pair_count == 0:
        return float('inf')
    return gap_sum / pair_count

def cal_communicability(student_graph: nx.Graph, multi_graph: nx.MultiDiGraph) -> float:
    """Mean log-smoothed communicability gap between student graph and multigraph.

    Both graphs are made undirected, ``nx.communicability_exp`` is evaluated
    on each, and for every (source, target) pair present in both results
    ``log1p(|student - multi|)`` is averaged.

    Args:
        student_graph: Graph of a single student.
        multi_graph: Aggregated multigraph to compare against.

    Returns:
        The mean smoothed difference, or ``float('inf')`` when the two
        results share no pair.
    """
    student_comm = nx.communicability_exp(student_graph.to_undirected())
    multi_comm = nx.communicability_exp(nx.Graph(multi_graph))

    smoothed_sum = 0
    pair_count = 0
    for source, student_row in student_comm.items():
        multi_row = multi_comm.get(source)
        if multi_row is None:
            # Source node does not exist in the multigraph.
            continue
        for target, student_value in student_row.items():
            if target not in multi_row:
                continue
            gap = abs(student_value - multi_row[target])
            smoothed_sum += np.log1p(gap)  # logarithm smoothing
            pair_count += 1

    if pair_count == 0:
        return float('inf')
    return smoothed_sum / pair_count

#### Analysis for consensus metrics

In [14]:
# Empty result tables for the consensus analysis: one per domain, indexed by
# "ID", with one column per consensus metric.
columns = ["ID", "edge_consensus", "subgraph_coverage", "collective_shortest_path", "communicability"]
consen_astronomy, consen_sampling, consen_database = (
    pd.DataFrame(columns=columns).set_index("ID") for _ in range(3)
)

In [15]:
# Score every Astronomy graph against the full Astronomy multigraph.
# Column order: edge_consensus, subgraph_coverage, collective_shortest_path,
# communicability.
for graph_name, G in graphs_astronomy.items():
    try:
        scores = [
            metric(G, multigraph_astronomy_full)
            for metric in (
                cal_edge_consensus,
                cal_subgraph_coverage,
                cal_collective_shortest_path,
                cal_communicability,
            )
        ]
    except nx.NetworkXNoPath as e:
        # Report the graph and move on; no row is stored for it.
        print(f"Path error at {graph_name}: {e}")
    else:
        consen_astronomy.loc[graph_name] = scores

# Save the result to an Excel file.
# WARNING: an existing file at this path is overwritten without asking!
consen_astronomy.to_excel("result/Astronomy_consen.xlsx", index=True)

In [16]:
# Score every Sampling graph against the full Sampling multigraph.
# Column order: edge_consensus, subgraph_coverage, collective_shortest_path,
# communicability.
for graph_name, G in graphs_sampling.items():
    try:
        scores = [
            metric(G, multigraph_sampling_full)
            for metric in (
                cal_edge_consensus,
                cal_subgraph_coverage,
                cal_collective_shortest_path,
                cal_communicability,
            )
        ]
    except nx.NetworkXNoPath as e:
        # Report the graph and move on; no row is stored for it.
        print(f"Path error at {graph_name}: {e}")
    else:
        consen_sampling.loc[graph_name] = scores

# Save the result to an Excel file.
# WARNING: an existing file at this path is overwritten without asking!
consen_sampling.to_excel("result/Sampling_consen.xlsx", index=True)

In [17]:
# Score every Database graph against the full Database multigraph.
# Column order: edge_consensus, subgraph_coverage, collective_shortest_path,
# communicability.
for graph_name, G in graphs_database.items():
    try:
        scores = [
            metric(G, multigraph_database_full)
            for metric in (
                cal_edge_consensus,
                cal_subgraph_coverage,
                cal_collective_shortest_path,
                cal_communicability,
            )
        ]
    except nx.NetworkXNoPath as e:
        # Report the graph and move on; no row is stored for it.
        print(f"Path error at {graph_name}: {e}")
    else:
        consen_database.loc[graph_name] = scores

# Save the result to an Excel file.
# WARNING: an existing file at this path is overwritten without asking!
consen_database.to_excel("result/Database_consen.xlsx", index=True)