***

*Course:* [Math 444](https://people.math.wisc.edu/~hlyu36/GNiDS/) - Graphs and Networks in Data Science (MMiDS) - Spring 2025

*Author:* [Hanbaek Lyu](https://hanbaeklyu.com), Department of Mathematics, University of Wisconsin-Madison  

***

In [None]:
### Load some packages 

import networkx as nx # for handling graphs/networks 
import numpy as np # for basic scientific computing 
import matplotlib.pyplot as plt # for plotting
import matplotlib.gridspec as gridspec
from tqdm import trange

# Barabasi-Albert model

In [None]:
def BA(G0=None, m0=1, m=1, n=100, alpha=1):
    # Barabasi-Albert model with baseline graph G = single node with m0 self-loops 
    # Each new node has m edges pointing to some nodes in the existing graph 
    # alpha=1 -> preferential attachment: The head of each new directed edge is chosen randomly with probability 
    # proportional to the degree
    # alpha=0 ->: Uniform attachment: The head of each new directed edge is chosen uniformly at random
    # alpha \notin\{0,1} -> nonlinear preferential attachment: The head of each new directed edge is chosen 
    # randomly with probability proportional to the degree^alpha
    
    if G0 is not None: 
        G = G0
    else: 
        G = nx.MultiGraph() # baseline graph with a single node and m0 self-loops 
        for i in np.arange(m0):
            G.add_edge(1,1)
        
    for s in np.arange(1,n):
        for j in np.arange(m):
            # form a degree distribution 
            degrees = np.asarray([G.degree(n)**(alpha) for n in G.nodes()])
            deg_dist = degrees*(1/np.sum(degrees))
            v = np.random.choice(G.nodes(), p=deg_dist)
            G.add_edge(s,v)
        
    return G
    

# Random Walk Attachement model

In [None]:
def RWA(G0=None, m0=1, m=1, n=100, p=0.9):
    # Random Walk Attachement model with baseline graph G = single node with m0 self-loops 
    # Each new node has m edges pointing to some nodes in the existing graph 
    # When adding a new node $u$, we add edges {u,v_1}, \dots, {u,v_m} 
    # v_1 is uniformly chosen among all nodes in G
    # v_2 is uniformly chosen among all neighbors of v_1 with probability p; 
    # (p is the probability of closing a triangle)
    # with the rest of probability, v_2 is chosen the same way as v_1
    # Do the same for the rest of v_3,..,v_m. 
    
    if G0 is not None: 
        G = G0
    else: 
        G = nx.MultiGraph() # baseline graph with a single node and m0 self-loops 
        for i in np.arange(m0):
            G.add_edge(1,1)
        
    for s in np.arange(1,n-m0):
        v = np.random.choice(G.nodes())
        for j in np.arange(m):
            U = np.random.rand()
            if (j == 1) or (U>p):
                candidates = list(G.nodes())
            else: 
                candidates = list(G.neighbors(v))
            if (len(candidates)>1) and (s in candidates):
                candidates.remove(s)
            v = np.random.choice(candidates)
            G.add_edge(s,v)
        
    return G
    

In [None]:
G = RWA(p=0.5)
nx.draw(G)

In [None]:
fig = plt.figure(figsize=[11,7], constrained_layout=False)
network_list = [""]
n = 500
m=3
p_list = [0, 0.3, 0.7, 1]
sample_size = 100

outer_grid = gridspec.GridSpec(nrows=3, ncols=len(p_list)+1, wspace=0.2, hspace=0.3)

G = nx.MultiGraph() # baseline graph with a single node and a self-loop
G.add_edge(1,1)

for i in trange(len(p_list)+1): 
    clustering_list = [] 
    path_length_list = []
    for j in np.arange(sample_size):
        if i < len(p_list):
            G = RWA(m0=1, m=m, n=n, p = p_list[i])
            title = r"RWA (n={}, $m$={}, $p$={})".format(n, m, p_list[i])   
        else:
            alpha = 1
            G = BA(m0=1, m=m, n=n, alpha = alpha)
            title = r"BA (n={}, $m$={}, $\alpha$={})".format(n, m, alpha)

        # convert multigraph from BA to simple graph
        G00 = nx.Graph()
        for e in G.edges():    
            G00.add_edge(e[0],e[1])
        G = G00

        clustering_list.append(nx.average_clustering(G))
        if nx.is_connected(G):
            path_length_list.append(nx.average_shortest_path_length(G))
        else:
            path_length_list.append(0)

    
    inner_grid = outer_grid[0,i].subgridspec(1, 1, wspace=0, hspace=0)
    ax = fig.add_subplot(inner_grid[0, 0])
    edges = G.edges()
    weights = [0.5 for e in G.edges]
    nx.draw(G, with_labels=False, width=weights, node_size=20, ax=ax, label='Graph', pos=nx.circular_layout(G))
    #ax.legend()
    ax.set_title(title, fontsize=8)

    
    inner_grid1 = outer_grid[1,i].subgridspec(1, 1, wspace=0.1, hspace=0.1)
    ax = fig.add_subplot(inner_grid1[0, 0])
    ax.hist(clustering_list, bins='auto', alpha=0.7, label='BA', edgecolor="k", histtype='stepfilled', density=True)
    #ax.legend()
    ax.set_title(r"Avg. Cl. Coeff.", fontsize=8)
    
    degrees = [G.degree(n) for n in G.nodes()]
    inner_grid1 = outer_grid[2,i].subgridspec(1, 1, wspace=0, hspace=0)
    ax = fig.add_subplot(inner_grid1[0, 0])
    ax.hist(degrees, bins='auto', alpha=0.7, label='RWA', edgecolor="k", histtype='stepfilled', density=True)
    ax.set_yscale('log')
    ax.set_xscale('log')
    ax.set_title(r"Degree dist.", fontsize=8)

    
    #inner_grid2 = outer_grid[2,i].subgridspec(1, 1, wspace=0, hspace=0)
    #ax = fig.add_subplot(inner_grid2[0, 0])
    #ax.hist(path_length_list, bins='auto', alpha=0.7, label='BA', edgecolor="k", histtype='stepfilled', density=True)
    #ax.legend()
    #ax.set_title(r"Avg. path length", fontsize=8)
    



plt.savefig("RWA_comparison2", bbox_inches="tight")

In [None]:
fig = plt.figure(figsize=[11,5], constrained_layout=False)
network_list = [""]
n_list=[10, 30, 50, 100]
m=3
p=1
outer_grid = gridspec.GridSpec(nrows=2, ncols=len(n_list), wspace=0.3, hspace=0.1)

G = nx.MultiGraph() # baseline graph with a single node and a self-loop
G.add_edge(1,1)

for i in np.arange(len(n_list)): 
    for j in np.arange(2):
        if j == 0:
            G = RWA(m0=1, m=m, n=n_list[i], p=0.9)
            title = r"RWA ($m$={}, $n$={}, $p$={})".format(m, n_list[i], p)
        else: 
            G = BA(m0=1, m=m, n=n_list[i], alpha=1)
            title = r"BA ($m$={}, $n$={}, $\alpha$={})".format(m, n_list[i], 1)
        G00 = nx.Graph()
        
        for e in G.edges():    
            G00.add_edge(e[0],e[1])
        G0 = G00
        degrees = [G.degree(n) for n in G.nodes()]

        inner_grid = outer_grid[j,i].subgridspec(1, 1, wspace=0, hspace=0)
        ax = fig.add_subplot(inner_grid[0, 0])
        #edges = G.edges()
        #weights = [0.5 for u,v in G.edges]
        nx.draw(G, with_labels=False, width=weights, node_size=20, ax=ax, label='Graph')
        #ax.legend()
        title = title + "\n avg. cl. coeff.={}".format(np.round(nx.average_clustering(G0),3))
        ax.set_title(title, fontsize=8)
    



plt.savefig("RWA_comparison0", bbox_inches="tight")

# Random walk on graphs

In [None]:
def RW(G, x0=None, steps=1, return_history=False):
    # simple symmetric random walk on graph G 
    # initialization at x0
    if x0 is None:
        x = np.random.choice(G.nodes())
    else:
        x = x0
    
    history = []
    for i in np.arange(steps):
        if len(list(G.neighbors(x))) == 1:
            print("RW is stuck at isolated node")
            x = np.random.choice(G.nodes()) # re-initialize uniformly at random
        else: 
            x = np.random.choice(list(G.neighbors(x)))

        if return_history:
            history.append(x)
        
    if not return_history: 
        return x 
    else: 
        return history

In [None]:
history = RW(G, x0=None, steps=20, return_history=True)

In [None]:
d = [1,2,3]

In [None]:
stubs_list = []
for i in np.arange(len(d)):
    for j in np.arange(d[i]):
        stubs_list.append([i,j])

In [None]:
stubs_list

In [None]:
np.random.choice(np.asarray(len(stubs_list)), 2)

In [None]:
np.asarray([len(s) for s in stubs_list])

In [None]:
ss = np.random.choice(np.asarray(100), 2)
s1 = ss[0]
s2 = ss[1]

In [None]:
ss

In [None]:
s1

In [None]:
s2

# Configuration model

## Model construction

In [None]:
def CM(d):
    d = list(d)
    stubs_list = []
    
    # (1) Append node index i for each of its stubs
    for i in np.arange(len(d)):
        for j in np.arange(d[i]):
            stubs_list.append(i)
    
    G = nx.MultiGraph()
    
    while len(stubs_list) > 0:
        # Randomly pick two distinct stubs
        ss = np.random.choice(np.arange(len(stubs_list)), 2, replace=False)
        s1, s2 = ss
        half_edge1 = stubs_list[s1]  # (2)
        half_edge2 = stubs_list[s2]  # (3)
        
        G.add_edge(half_edge1, half_edge2)  # (4)
        
        # Remove the matched stubs (careful with order)
        for index in sorted([s1, s2], reverse=True):
            stubs_list.pop(index)
    
    return G

In [None]:
d = [3]*50

In [None]:
nx.draw(CM(d))

In [None]:
fig = plt.figure(figsize=[12,4], constrained_layout=False)
network_list = [""]
n_list=[10, 50, 100]
outer_grid = gridspec.GridSpec(nrows=1, ncols=len(n_list), wspace=0.3, hspace=0.1)

r = 3
clustering_list = [] 
path_length_list = []

for i in np.arange(len(n_list)): 
    deg_dist = [r]*n_list[i]
    G = CM(d=deg_dist)
    title = r"CM (${}$-regular, $n$={})".format(r, n_list[i])
    
    #degrees = [G.degree(n) for n in G.nodes()]

    inner_grid = outer_grid[0,i].subgridspec(1, 1, wspace=0, hspace=0)
    ax = fig.add_subplot(inner_grid[0, 0])
    #edges = G.edges()
    #weights = [0.5 for u,v in G.edges]
    nx.draw(G, with_labels=False, width=0.5, node_size=20, ax=ax, label='Graph')
    #ax.legend()
    #title = title + "\n avg. cl. coeff.={}".format(np.round(nx.average_clustering(G0),3))
    ax.set_title(title, fontsize=10)


plt.savefig("CM_0", bbox_inches="tight")

## Hypothesis testing: Subgraph in Caltech

In [None]:
def p_value(x0, data_list):
    # computes p-value = P(random data point from data distribution > x0)
    excess_list = [x for x in data_list if x>=x0]
    return len(excess_list)/len(data_list)
    

In [None]:
## Load Caltech Facebook graph

# Initialize graph 
G = nx.Graph()

# Load COVID_PPI.txt edgelist
path = "Caltech.txt"
edgelist = list(np.genfromtxt(path, delimiter=",", dtype=str)) 

# Add in all edges in the edgelist to G
for e in edgelist:
    G.add_edge(e[0], e[1]) 
    
# Print out the number of nodes and edges in G

print("Number of nodes:", len(list(G.nodes())))
print("Number of edges:", len(list(G.edges())))
print("Avg. clustering coefficient:", nx.average_clustering(G0))
print("Avg. shortest path length:", nx.average_shortest_path_length(G0))
print("edge density:", nx.density(G))
G0 = G.subgraph(sorted(nx.connected_components(G), key=len, reverse=True)[0])

# Compute the number of connected compoenents
G0 = G.subgraph(sorted(nx.connected_components(G), key=len, reverse=True)[0])

nx.draw(G0)

In [None]:
# Perform a random walk and take an induced subgraph on a random walk trajectory 
RW_trajectory = RW(G0, steps=100, return_history=True)
H = G.subgraph(nodes=RW_trajectory)
print("Number of nodes:", len(list(H.nodes())))
print("Number of edges:", len(list(H.edges())))
nx.draw(H)

In [None]:
fig = plt.figure(figsize=[10,4], constrained_layout=False)
outer_grid = gridspec.GridSpec(nrows=1, ncols=2, wspace=0.3, hspace=0.1)

degrees = [H.degree(v) for v in H.nodes()]
G = CM(d=degrees)

inner_grid = outer_grid[0,0].subgridspec(1, 1, wspace=0, hspace=0)
ax = fig.add_subplot(inner_grid[0, 0])
nx.draw(H, with_labels=False, width=weights, node_size=20, ax=ax)
ax.set_title("Caltech Subgraph", fontsize=10)

inner_grid = outer_grid[0,1].subgridspec(1, 1, wspace=0, hspace=0)
ax = fig.add_subplot(inner_grid[0, 0])
nx.draw(G, with_labels=False, width=weights, node_size=20, ax=ax)
ax.set_title("Configuration Model", fontsize=10)

plt.savefig("CM_3", bbox_inches="tight")

In [None]:
from tqdm import trange

n_samples = 1000

statistics_list = ["avg. deg.", "avg. clustering coeff.", "avg. path length", "max matching"]

stats_list0 = []
stats_list1 = [] 
stats_list2 = []
stats_list3 = []

degrees = [H.degree(v) for v in H.nodes()]

for i in trange(n_samples): 
    G = CM(d = degrees)
    
    # convert multigraph G into a simple graph G00 
    G00 = nx.Graph()
    for e in G.edges():    
        G00.add_edge(e[0],e[1])
    G0 = G00
    
    avg_deg = np.mean(np.asarray([G.degree(v) for v in G.nodes()]))
    stats_list0.append(avg_deg)
    stats_list1.append(nx.average_clustering(G00))
    if nx.is_connected(G):
        stats_list2.append(nx.average_shortest_path_length(G))
    else:
        stats_list2.append(0)
        
    stats_list3.append(len(nx.maximal_matching(G00)))  

stats_list_all = [stats_list0, stats_list1, stats_list2, stats_list3]

In [None]:
fig = plt.figure(figsize=[11,2], constrained_layout=False)

test_0 = np.mean(np.asarray(degrees))
test_1 = nx.average_clustering(H)
test_2 = nx.average_shortest_path_length(H)
test_3 = len(nx.maximal_matching(H))

outer_grid = gridspec.GridSpec(nrows=1, ncols=len(statistics_list), wspace=0.2, hspace=0.3)

for i in np.arange(len(statistics_list)): 
    if i == 0:
        test_0 = np.mean(np.asarray(degrees))
    elif i == 1:
        test_0 = nx.average_clustering(H)
    elif i == 2:
        test_0 = nx.average_shortest_path_length(H)
    elif i == 3:
        test_0 = len(nx.maximal_matching(H))
    
    inner_grid = outer_grid[0,i].subgridspec(1, 1, wspace=0.1, hspace=0.1)
    ax = fig.add_subplot(inner_grid[0, 0])
    p0 = p_value(test_0, stats_list_all[i])
    ax.hist(stats_list_all[i], bins='auto', alpha=0.7, label="CM", edgecolor="k", histtype='stepfilled', density=True)
    #ax.legend()
    ax.set_title(statistics_list[i], fontsize=8)
    ax.axvline(x=test_0, color='r', label=r"$p$-value={}".format(p0))
    ax.legend(fontsize=9)

plt.suptitle("Hypothesis testing for a sugraph of Caltech FB graph", y=1.1)
plt.savefig("CM_hypothesis0", bbox_inches="tight")

## Exercise 4: Hypothesis Testing for ER and BA Models

In [None]:
# ===== Exercise 4(i): ER Hypothesis =====

# Load Caltech graph
G = nx.Graph()
path = "Caltech.txt"
edgelist = list(np.genfromtxt(path, delimiter=",", dtype=str)) 
for e in edgelist:
    G.add_edge(e[0], e[1]) 

G0 = G.subgraph(sorted(nx.connected_components(G), key=len, reverse=True)[0])

# Random walk subgraph H
RW_trajectory = RW(G0, steps=100, return_history=True)
H = G.subgraph(nodes=RW_trajectory)
n = len(H.nodes())
m = len(H.edges())

p_mle = 2 * m / (n * (n - 1))

n_samples = 1000

statistics_list = ["avg. clustering coeff."]
stats_list_er = []

for i in trange(n_samples): 
    G_er = nx.erdos_renyi_graph(n, p_mle)
    if not nx.is_connected(G_er):
        G_er = G_er.subgraph(sorted(nx.connected_components(G_er), key=len, reverse=True)[0])
    stats_list_er.append(nx.average_clustering(G_er))

test_stat = nx.average_clustering(H)

plt.hist(stats_list_er, bins=30, alpha=0.7, label="ER Samples")
plt.axvline(test_stat, color='red', label=f"Observed: {test_stat:.3f}")
plt.legend()
plt.title("Exercise 4(i): ER Model Hypothesis Test")
plt.show()

print("P-value (ER Model):", p_value(test_stat, stats_list_er))


In [None]:
# ===== Exercise 4(ii): BA Hypothesis =====

m_ba = max(1, int(m / n))

stats_list_ba = []

for i in trange(n_samples): 
    G_ba = nx.barabasi_albert_graph(n, m_ba)
    if not nx.is_connected(G_ba):
        G_ba = G_ba.subgraph(sorted(nx.connected_components(G_ba), key=len, reverse=True)[0])
    stats_list_ba.append(nx.average_clustering(G_ba))

plt.hist(stats_list_ba, bins=30, alpha=0.7, label="BA Samples")
plt.axvline(test_stat, color='red', label=f"Observed: {test_stat:.3f}")
plt.legend()
plt.title("Exercise 4(ii): BA Model Hypothesis Test")
plt.show()

print("P-value (BA Model):", p_value(test_stat, stats_list_ba))


## Hypothesis testing: CM graph

In [None]:
from tqdm import trange

degrees = [3]*100
H = CM(d = degrees)
H00 = nx.Graph()
for e in G.edges():    
    H00.add_edge(e[0],e[1])
H = H00

n_samples = 1000

statistics_list = ["avg. deg.", "avg. clustering coeff.", "avg. path length", "max matching"]

stats_list0 = []
stats_list1 = [] 
stats_list2 = []
stats_list3 = []

degrees = [H.degree(v) for v in H.nodes()]

for i in trange(n_samples): 
    G = CM(d = degrees)
    
    # convert multigraph G into a simple graph G00 
    G00 = nx.Graph()
    for e in G.edges():    
        G00.add_edge(e[0],e[1])
    G0 = G00
    
    avg_deg = np.mean(np.asarray([G.degree(v) for v in G.nodes()]))
    stats_list0.append(avg_deg)
    stats_list1.append(nx.average_clustering(G00))
    if nx.is_connected(G):
        stats_list2.append(nx.average_shortest_path_length(G))
    else:
        stats_list2.append(0)
        
    stats_list3.append(len(nx.maximal_matching(G00)))  

stats_list_all = [stats_list0, stats_list1, stats_list2, stats_list3]

In [None]:
fig = plt.figure(figsize=[11,2], constrained_layout=False)

test_0 = np.mean(np.asarray(degrees))
test_1 = nx.average_clustering(H)
test_2 = nx.average_shortest_path_length(H)
test_3 = len(nx.maximal_matching(H))

outer_grid = gridspec.GridSpec(nrows=1, ncols=len(statistics_list), wspace=0.2, hspace=0.3)

for i in np.arange(len(statistics_list)): 
    if i == 0:
        test_0 = np.mean(np.asarray(degrees))
    elif i == 1:
        test_0 = nx.average_clustering(H)
    elif i == 2:
        test_0 = nx.average_shortest_path_length(H)
    elif i == 3:
        test_0 = len(nx.maximal_matching(H))
    
    inner_grid = outer_grid[0,i].subgridspec(1, 1, wspace=0.1, hspace=0.1)
    ax = fig.add_subplot(inner_grid[0, 0])
    p0 = p_value(test_0, stats_list_all[i])
    ax.hist(stats_list_all[i], bins='auto', alpha=0.7, label="CM", edgecolor="k", histtype='stepfilled', density=True)
    #ax.legend()
    ax.set_title(statistics_list[i], fontsize=8)
    ax.axvline(x=test_0, color='r', label=r"$p$-value={}".format(p0))
    ax.legend(fontsize=9)

plt.suptitle("Hypothesis testing for a CM graph", y=1.1)
plt.savefig("CM_hypothesis1", bbox_inches="tight")