In [None]:
#| output: false

!pip install --upgrade ts2vg

In [None]:
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import numpy as np
import neurokit2 as nk
import yfinance as yf
import pandas as pd
import networkx as nx
import scienceplots
import math

from sklearn import preprocessing
from tqdm import tqdm
from ts2vg import NaturalVG, HorizontalVG
from scipy.spatial import distance

%matplotlib inline

In [None]:
plt.style.use(['science', 'notebook', 'grid']) # стиль, що використовуватиметься
                                               # для виведення рисунків

size = 22
params = {
    'figure.figsize': (8, 6),            # встановлюємо ширину та висоту рисунків за замовчуванням
    'font.size': size,                   # розмір фонтів рисунку
    'lines.linewidth': 2,                # товщина ліній
    'axes.titlesize': 'small',           # розмір титулки над рисунком
    'axes.labelsize': size,              # розмір підписів по осям
    'legend.fontsize': size,             # розмір легенди
    'xtick.labelsize': size,             # розмір розмітки по осі Ох
    'ytick.labelsize': size,             # розмір розмітки по осі Ох
    "font.family": "Serif",              # сімейство стилів підписів 
    "font.serif": ["Times New Roman"],   # стиль підпису
    'savefig.dpi': 500,                  # якість збережених зображень
    'axes.grid': False                   # побудова сітки на самому рисунку
}

plt.rcParams.update(params)              # оновлення стилю згідно налаштувань

In [None]:
symbol = '^GSPC'
start = '1980-01-01'                                 # символ індексу
end = '2025-08-22'                                   # кінцевий період
data = yf.download(symbol, start=start, end=end)     # вивантажуємо дані
time_ser = data['Close'].copy()                      # зберігаємо саме ціни закриття
date_in_num = mdates.date2num(time_ser.index)

symbol = 'S&P 500'
xlabel = 'time, days'    # підпис по вісі Ох 
ylabel = symbol          # підпис по вісі Оу

In [None]:
#| label: fig-dji-init 
#| fig-cap: "Динаміка щоденних значень індексу Біткоїна"

fig, ax = plt.subplots()                   # Створюємо порожній графік
ax.plot(time_ser.index, time_ser.values)   # Додаємо дані до графіка
ax.legend([symbol])                        # Додаємо легенду
ax.set_xlabel(xlabel)                      # Встановимо підпис по вісі Ох
ax.set_ylabel(ylabel)                      # Встановимо підпис по вісі Oy

plt.xticks(rotation=45)                    # оберт позначок по осі Ох на 45 градусів

plt.savefig(f'{symbol}.jpg')               # Зберігаємо графік 
plt.show();                                # Виводимо графік

In [None]:
def transformation(signal, ret_type):

    for_graph = signal.copy()

    if ret_type == 1:       # Зважаючи на вид ряду, виконуємо
                            # необхідні перетворення
        pass
    elif ret_type == 2:
        for_graph = for_graph.diff()
    elif ret_type == 3:
        for_graph = for_graph.pct_change()
    elif ret_type == 4:
        for_graph = for_graph.pct_change()
        for_graph -= for_graph.mean()
        for_graph /= for_graph.std()
    elif ret_type == 5: 
        for_graph = for_graph.pct_change()
        for_graph -= for_graph.mean()
        for_graph /= for_graph.std()
        for_graph = for_graph.abs()
    elif ret_type == 6:
        for_graph -= for_graph.mean()
        for_graph /= for_graph.std()

    for_graph = for_graph.dropna().values.squeeze()

    return for_graph

In [None]:
# ---- VAQ-VG helpers ----
def _rolling_quantile_transform(x: np.ndarray, window: int) -> np.ndarray:
    """rank/(W+1) per rolling window (expanding until window is filled)."""
    n = len(x)
    q = np.empty(n, dtype=float)
    for i in range(n):
        s = max(0, i - window + 1)
        block = x[s:i+1]
        # rank of last element within block
        r = np.argsort(np.argsort(block))[-1] + 1
        q[i] = r / (len(block) + 1.0)
    return q

def _rolling_mad_of_returns(x: np.ndarray, window: int) -> np.ndarray:
    """robust local volatility proxy (MAD of first differences)."""
    dx = np.diff(x, prepend=x[0])
    n = len(dx)
    out = np.empty(n, dtype=float)
    for i in range(n):
        s = max(0, i - window + 1)
        block = dx[s:i+1]
        med = np.median(block)
        out[i] = np.median(np.abs(block - med)) + 1e-12
    return out

def _visible_vaq(q: np.ndarray, sigma: np.ndarray, kappa: float, i: int, j: int) -> bool:
    qi, qj = q[i], q[j]
    ti, tj = i, j
    tau = kappa * math.sqrt(sigma[i]**2 + sigma[j]**2)
    denom = (tj - ti)
    for k in range(i+1, j):
        q_line = qj + (qi - qj) * (tj - k) / denom
        slack  = tau * (tj - k) / denom
        if q[k] >= q_line + slack:
            return False
    return True

def build_vaqvg(
    fragm_vals: np.ndarray,
    kappa: float = 0.25,
    directed: bool = False,
    orientation: str = "lr",   # "lr" (left→right), "rl" (right→left), "both", "sign"
) -> nx.Graph | nx.DiGraph:
    """
    Volatility-Adaptive Quantile Visibility Graph (VAQ-VG).

    directed=False  → undirected (classic behavior)
    directed=True   → DiGraph with orientation:
        "lr"   : add edges i->j for i<j when visible (left→right in time)
        "rl"   : add edges j->i for i<j (right→left)
        "both" : add both directions
        "sign" : direct towards rising endpoint: if x_j >= x_i add i->j else j->i
    """
    W = len(fragm_vals)
    q = _rolling_quantile_transform(fragm_vals, window=W)
    sigma = _rolling_mad_of_returns(fragm_vals, window=W)

    G = nx.DiGraph() if directed else nx.Graph()
    G.add_nodes_from(range(W))

    for i in range(W-1):
        for j in range(i+1, W):
            if _visible_vaq(q, sigma, kappa, i, j):
                if not directed:
                    G.add_edge(i, j)
                else:
                    if orientation == "lr":
                        G.add_edge(i, j)
                    elif orientation == "rl":
                        G.add_edge(j, i)
                    elif orientation == "both":
                        G.add_edge(i, j); G.add_edge(j, i)
                    elif orientation == "sign":
                        if fragm_vals[j] >= fragm_vals[i]:
                            G.add_edge(i, j)
                        else:
                            G.add_edge(j, i)
                    else:
                        raise ValueError(f"Unknown orientation: {orientation}")
    return G


def annotate_edge_angles(nxg: nx.Graph, fragm_vals: np.ndarray) -> None:
    """If graph came from ts2vg, annotate angles for VAE."""
    for u, v in nxg.edges():
        theta = math.atan2((fragm_vals[v] - fragm_vals[u]), (v - u))
        nxg[u][v]["theta"] = float(theta)

In [None]:
def visibility_angle_entropy(nxg: nx.Graph, bins: int = 24, normalize=True) -> float:
    
    thetas = []
    for u, v, d in nxg.edges(data=True):
        if "theta" in d:
            thetas.append(d["theta"])
        else:
            theta = math.atan2((v - u), (v - u))  # fallback: ~45deg
            thetas.append(theta)
    if len(thetas) == 0:
        return 0.0
    hist, _ = np.histogram(thetas, bins=bins, range=(-math.pi/2, math.pi/2))
    p = hist.astype(float) / max(1, hist.sum())
    p = p[p > 0]
    H = float(-(p * np.log(p)).sum())

    if not normalize:
        return H
    
    # Normalize to [0,1] by dividing by log(#non-empty bins)
    return H / math.log(len(p)) if len(p) > 1 else 0.0

def square_motif_density(nxg: nx.Graph) -> float:
    """Normalized 4-cycle count via common neighbors (fast, counts all C4)."""
    n = nx.number_of_nodes(nxg)
    if n < 4: return 0.0
    A  = nx.to_numpy_array(nxg, dtype=np.int8, weight=None)
    A2 = A @ A  # common neighbors between pairs
    c4 = 0
    N  = A.shape[0]
    for i in range(N):
        for j in range(i+1, N):
            cn = int(A2[i, j])
            if cn >= 2:
                c4 += math.comb(cn, 2)
    c4 //= 2  # opposite pairs double-count
    denom = math.comb(n, 4)
    return float(c4) / denom if denom > 0 else 0.0

In [None]:
signal = time_ser.copy()
ret_type = 1            # вид ряду: 1 - вихідний, 
                        # 2 - детрендований (різниця між теп. значенням та попереднім)
                        # 3 - прибутковості звичайні, 
                        # 4 - стандартизовані прибутковості, 
                        # 5 - абсолютні значення (волатильності)
                        # 6 - стандартизований ряд

for_graph = transformation(signal, ret_type) # перетворення сигналу

# гіперпараметри для нових режимів
kappa = 0.25           # VAQ-VG slack

window = 75            # розмір вікна
tstep = 1              # крок вікна
graph_type = 'classic'   # тип графу: classic, horizontal, vaqvg

length = len(time_ser)

comp_idx = pd.DataFrame(data=time_ser.values[window:length:tstep], index=time_ser.index[window:length:tstep], columns=[symbol])

In [None]:
index_begin = 1700
index_end = 2200

date = date_in_num[index_begin:index_end]

if graph_type == 'classic':
    g = NaturalVG(directed=None).build(for_graph[index_begin:index_end], xs=date)
    pos1 = g.node_positions()
    nxg = g.as_networkx()
if graph_type == 'horizontal':
    g = HorizontalVG(directed=None).build(for_graph[index_begin:index_end], xs=date)
    pos1 = g.node_positions()
    nxg = g.as_networkx()
if graph_type == 'vaqvg':
    g = build_vaqvg(for_graph[index_begin:index_end], kappa=kappa)
    pos1 = g.node_positions()
    nxg = g.as_networkx()    

graph_plot_options = {
    'with_labels': False,
    'node_size': 0,
    'node_color': [(0, 0, 0, 1)],
    'edge_color': [(0, 0, 0, 0.15)],
}

In [None]:
fig, ax = plt.subplots(1, 1)

nx.draw_networkx(nxg, ax=ax, pos=pos1, **graph_plot_options)
ax.tick_params(bottom=True, labelbottom=True)
ax.plot(time_ser.index[index_begin:index_end], for_graph[index_begin:index_end], label=fr"{ylabel}")
ax.set_xlabel(xlabel)
ax.set_ylabel(f"Visibility connection for {ylabel}")
ax.legend(loc='upper right')
ax.tick_params(axis='x', labelrotation=45)

plt.savefig(f"Time_ser_connections_symbol={symbol}_idx_beg={index_begin}_\
            idx_end={index_end}_sertype={ret_type}_network_type={graph_type}.jpg", bbox_inches="tight", dpi=1000)

plt.show()

In [None]:
fig, ax = plt.subplots(1, 2, figsize=(15, 8))

nx.draw_networkx(nxg, ax=ax[0], pos=pos1, **graph_plot_options)
ax[0].tick_params(bottom=True, labelbottom=True)
ax[0].plot(time_ser.index[index_begin:index_end], for_graph[index_begin:index_end], label=fr"{ylabel}")
ax[0].set_title(f'Visibility connections for {ylabel}', pad=10)
ax[0].set_xlabel(xlabel)
ax[0].set_ylabel(f"{ylabel}")
ax[0].legend(loc='upper right')
ax[0].tick_params(axis='x', labelrotation=45)


ax[1].set_title(f'Graph representation for {symbol}', pad=10)

# визначаємо позицію вузлів на графі
pos2 = nx.spring_layout(nxg, k=0.15, iterations=100)

# розраховуємо ступеневу центральність
degCent = nx.degree_centrality(nxg)

# створити список розмірів вершин на основі ступеневої центральності
node_sizes = [v*100 for v in degCent.values()]

# кольори вузлів на основі їх ступеневої центральності
node_colors = [v for v in degCent.values()]

# будуємо граф
nx.draw_networkx(nxg, ax=ax[1], pos=pos2,
                node_size=node_sizes,  
                node_color=node_colors,
                with_labels=False,
                cmap=plt.get_cmap('plasma'))

# присвоюємо мінімальне та максимальне значення 
# ступеневої центральності для побудови теплової шкали
vmin = np.asarray(list(degCent.values())).min()
vmax = np.asarray(list(degCent.values())).max()

sm = plt.cm.ScalarMappable(cmap=plt.get_cmap('plasma'), 
                           norm=plt.Normalize(vmin=vmin, vmax=vmax))
cb = plt.colorbar(sm, ax=ax[1])
cb.set_label('Degree centrality')

plt.savefig(f"Time_ser_connections_symbol={symbol}_idx_beg={index_begin}_\
            idx_end={index_end}_sertype={ret_type}_network_type={graph_type}.jpg", bbox_inches="tight", dpi=1000)

plt.show()

Як ми можемо бачити з представленого рисунку, три послідовних зростання та спадання ціни BTC у 2021-2022 роках характеризуються доволі високим ступенем видимості в передкризовий період. Також дані піки утворюють орієнтовно 3 кластери із високою ступеневою центральністю. Крахові події на криптовалютному ринку можна розглядати як графи переважного приєднання, де, можливо, ключову роль у цих підйомах та спадах можуть відігравати один або декілька "китів" ринку, котрі чинять найбільший вплив на ринок і спрямовують вектор уваги всіх трейдерів у тому чи іншому напрямі.  

### Віконна процедура

Далі будемо спостерігати за тим, як змінюються властивості мережі з плином часу. Для цього використаємо добре знайому нам процедуру рухомого вікна. У рамках цієї процедури дослідимо графодинаміку як спектральних, так і топологічних показників. 

Для побудови парної динаміки конкретного індикатора та досліджуваного ряду визначимо функцію `plot_pair`:

In [None]:
def plot_pair(x_values, 
              y1_values,
              y2_values,  
              y1_label, 
              y2_label,
              x_label, 
              file_name, clr="magenta"):

    fig, ax = plt.subplots()

    ax2 = ax.twinx()
    ax2.spines.right.set_position(("axes", 1.03))

    p1, = ax.plot(x_values, 
                  y1_values, 
                  "b-", label=fr"{y1_label}")
    p2, = ax2.plot(x_values,
                   y2_values, 
                   color=clr, 
                   label=y2_label)

    ax.set_xlabel(x_label)
    ax.set_ylabel(f"{y1_label}")
    ax.yaxis.label.set_color(p1.get_color())
    ax2.yaxis.label.set_color(p2.get_color())

    tkw = dict(size=2, width=1.5)

    ax.tick_params(axis='x', rotation=35, **tkw)
    ax.tick_params(axis='y', colors=p1.get_color(), **tkw)
    ax2.tick_params(axis='y', colors=p2.get_color(), **tkw)
    ax2.legend(handles=[p1, p2])

    plt.savefig(file_name + ".jpg")
        
    plt.show();

In [None]:
AlgebraicCon = []
GraphEnergy = []
SpecMoment_3 = []
SpecRadius = []
SpecGap = []
NaturalConnectivity = []
GraphCompIdx = []

In [None]:
for i in tqdm(range(0,length-window,tstep)):
    # відбираємо фрагмент
    fragm = time_ser.iloc[i:i+window].copy()  

    # виконуємо процедуру трансформації ряду 
    fragm = transformation(fragm, ret_type)
    
    if graph_type == 'classic':
        g = NaturalVG(directed=None).build(fragm)
        pos = g.node_positions()
        nxg = g.as_networkx()
        annotate_edge_angles(nxg, fragm)

    elif graph_type == 'horizontal':
        g = HorizontalVG(directed=None).build(fragm)
        pos = g.node_positions()
        nxg = g.as_networkx()
        annotate_edge_angles(nxg, fragm)

    elif graph_type == 'vaqvg':
        nxg = build_vaqvg(fragm, kappa=kappa)   # наш VAQ-VG
        annotate_edge_angles(nxg, fragm)

    else:
        raise ValueError(f"Unknown graph_type: {graph_type}")
    
    # спектр власних значень матриці суміжності
    adj_spectrum = nx.adjacency_spectrum(nxg).real

    # сортуємо власні значення в порядку зростання
    sorted_adj_spectrum = np.sort(adj_spectrum)
    
    # розраховуємо алгебраїчну зв'язність
    alg_con = nx.algebraic_connectivity(nxg, normalized=True, method='tracemin_lu') 

    # розраховуємо енергію графа
    graph_en = np.sum(np.abs(adj_spectrum))

    # розраховуємо спектральний розрив
    spec_gap = sorted_adj_spectrum[-1] - sorted_adj_spectrum[-2]

    # розраховуємо спектральний радіус
    spec_rad = np.max(np.abs(adj_spectrum))

    # розраховуємо спектральний момент
    spec_mom_3 = np.mean(adj_spectrum ** 3)

    # розраховуємо природню зв'язність
    nat_con = np.log(np.mean(np.exp(adj_spectrum)))

    # індекс складності графа 
    largest_eigenvalue = float(max(adj_spectrum))
    n = nx.number_of_nodes(nxg)
    c = (largest_eigenvalue - 2*np.cos(np.pi/(n+1)))/(n - 1 - 2*np.cos(np.pi/(n+1)))
    gic = 4*c*(1-c)

    
    AlgebraicCon.append(alg_con)
    GraphEnergy.append(graph_en)
    SpecRadius.append(spec_rad)
    SpecGap.append(spec_gap)
    SpecMoment_3.append(spec_mom_3)
    NaturalConnectivity.append(nat_con)
    GraphCompIdx.append(gic)

In [None]:
ind_names = ['algebraic_conn', 'graph_energy', 'spectral_radius', 
             'spectral_grap', 'spectral_moment_3', 'natural_connectivity', 'graph_complexity_index']

indicators = [AlgebraicCon, GraphEnergy, SpecRadius, 
              SpecGap, SpecMoment_3, NaturalConnectivity, GraphCompIdx]

measure_labels = [r'$\lambda_2$', r'$E$', r'$R$', r'$\delta$', r'$m_3$', r'$N_c$', r'$GIC$']

file_names = []

for i in range(len(ind_names)):
    name = f"{ind_names[i]}_symbol={symbol}_wind={window}_step={tstep}_seriestype={ret_type}_graph_type={graph_type}"
    comp_idx[f"{ind_names[i]}_{window}"] = indicators[i]
    np.savetxt(name + ".txt", indicators[i])
    file_names.append(name)

In [None]:
plot_pair(time_ser.index[window:length:tstep],
          time_ser.values[window:length:tstep],
          indicators[0], 
          ylabel, 
          measure_labels[0],
          xlabel,
          file_names[0],
          clr="magenta")

In [None]:
plot_pair(time_ser.index[window:length:tstep],
          time_ser.values[window:length:tstep],
          indicators[1], 
          ylabel, 
          measure_labels[1],
          xlabel,
          file_names[1],
          clr="crimson")

In [None]:
plot_pair(time_ser.index[window:length:tstep],
          time_ser.values[window:length:tstep],
          indicators[2], 
          ylabel, 
          measure_labels[2],
          xlabel,
          file_names[2],
          clr="orange")

In [None]:
plot_pair(time_ser.index[window:length:tstep],
          time_ser.values[window:length:tstep],
          indicators[3], 
          ylabel, 
          measure_labels[3],
          xlabel,
          file_names[3],
          clr="darkgreen")

In [None]:
plot_pair(time_ser.index[window:length:tstep],
          time_ser.values[window:length:tstep],
          indicators[4], 
          ylabel, 
          measure_labels[4],
          xlabel,
          file_names[4],
          clr="chocolate")

In [None]:
plot_pair(time_ser.index[window:length:tstep],
          time_ser.values[window:length:tstep],
          indicators[5], 
          ylabel, 
          measure_labels[5],
          xlabel,
          file_names[5],
          clr="black")

In [None]:
plot_pair(time_ser.index[window:length:tstep],
          time_ser.values[window:length:tstep],
          indicators[6], 
          ylabel, 
          measure_labels[6],
          xlabel,
          file_names[6],
          clr="crimson")

In [None]:
DegreeMax = []
GlobalEigenvectorCentrality = []
GlobalClosenessCentrality = []
GlobalInformationCentrality = []
GlobalBetweennessCentrality = []
GlobalHarmonicCentrality = []

In [None]:
for i in tqdm(range(0,length-window,tstep)):
    # відбираємо фрагмент
    fragm = time_ser.iloc[i:i+window].copy()  

    # виконуємо процедуру трансформації ряду 
    fragm = transformation(fragm, ret_type)
    
    if graph_type == 'classic':
        g = NaturalVG(directed=None).build(fragm)
        pos = g.node_positions()
        nxg = g.as_networkx()
        annotate_edge_angles(nxg, fragm)

    elif graph_type == 'horizontal':
        g = HorizontalVG(directed=None).build(fragm)
        pos = g.node_positions()
        nxg = g.as_networkx()
        annotate_edge_angles(nxg, fragm)

    elif graph_type == 'vaqvg':
        nxg = build_vaqvg(fragm, kappa=kappa)   # наш VAQ-VG
        annotate_edge_angles(nxg, fragm)

    else:
        raise ValueError(f"Unknown graph_type: {graph_type}")
    
    # максимальний ступінь вершини
    deg_max = max(dict(nxg.degree()).values())

    # середній ступінь впливовості
    glob_eigenvector_centrality = np.mean(list(nx.eigenvector_centrality_numpy(nxg).values()))

    # середній ступінь близькості
    glob_closeness_centrality = np.mean(list(nx.closeness_centrality(nxg).values()))

    # середній ступінь інформаційності
    glob_information_centrality = np.mean(list(nx.information_centrality(nxg).values()))

    # максимальний ступінь посередництва
    glob_betweenness_centrality = np.max(list(nx.betweenness_centrality(nxg).values()))

    # середній ступінь гармонійності
    glob_harm_centrality = np.mean(list(nx.harmonic_centrality(nxg).values()))

    DegreeMax.append(deg_max)
    GlobalEigenvectorCentrality.append(glob_eigenvector_centrality)
    GlobalClosenessCentrality.append(glob_closeness_centrality)
    GlobalInformationCentrality.append(glob_information_centrality)
    GlobalBetweennessCentrality.append(glob_betweenness_centrality)
    GlobalHarmonicCentrality.append(glob_harm_centrality)

In [None]:
ind_names = ['DegreeMax', 'GlobalEigenvectorCentrality', 'GlobalClosenessCentrality', 
             'GlobalInformationCentrality', 'GlobalBetweennessCentrality', 'GlobalHarmonicCentrality']

indicators = [DegreeMax, GlobalEigenvectorCentrality, GlobalClosenessCentrality, 
              GlobalInformationCentrality, GlobalBetweennessCentrality, GlobalHarmonicCentrality]

measure_labels = [r'$D_{max}$', r'$X$', r'$C$', r'$I$', r'$B$', r'$GHc$']

file_names = []

for i in range(len(ind_names)):
    name = f"{ind_names[i]}_symbol={symbol}_wind={window}_step={tstep}_seriestype={ret_type}_graph_type={graph_type}"
    comp_idx[f"{ind_names[i]}_{window}"] = indicators[i]
    np.savetxt(name + ".txt", indicators[i])
    file_names.append(name)

In [None]:
#| label: fig-deg  
#| fig-cap: "Динаміка індексу BTC та максимального ступеня вершини"

plot_pair(time_ser.index[window:length:tstep],
          time_ser.values[window:length:tstep],
          indicators[0], 
          ylabel, 
          measure_labels[0],
          xlabel,
          file_names[0],
          clr="magenta")

In [None]:
plot_pair(time_ser.index[window:length:tstep],
          time_ser.values[window:length:tstep],
          indicators[1], 
          ylabel, 
          measure_labels[1],
          xlabel,
          file_names[1],
          clr="crimson")

In [None]:
plot_pair(time_ser.index[window:length:tstep],
          time_ser.values[window:length:tstep],
          indicators[2], 
          ylabel, 
          measure_labels[2],
          xlabel,
          file_names[2],
          clr="orange")

In [None]:
plot_pair(time_ser.index[window:length:tstep],
          time_ser.values[window:length:tstep],
          indicators[3], 
          ylabel, 
          measure_labels[3],
          xlabel,
          file_names[3],
          clr="darkgreen")

In [None]:
plot_pair(time_ser.index[window:length:tstep],
          time_ser.values[window:length:tstep],
          indicators[4], 
          ylabel, 
          measure_labels[4],
          xlabel,
          file_names[4],
          clr="chocolate")

In [None]:
plot_pair(time_ser.index[window:length:tstep],
          time_ser.values[window:length:tstep],
          indicators[5], 
          ylabel, 
          measure_labels[5],
          xlabel,
          file_names[5],
          clr="black")

In [None]:
Assortativity = []
AvgDegreeConnectivity = []

In [None]:
for i in tqdm(range(0,length-window,tstep)):
    # відбираємо фрагмент
    fragm = time_ser.iloc[i:i+window].copy()  

    # виконуємо процедуру трансформації ряду 
    fragm = transformation(fragm, ret_type)

    if graph_type == 'classic':
        g = NaturalVG(directed='left_to_right').build(fragm)
        pos = g.node_positions()
        nxg_dir = g.as_networkx()
        annotate_edge_angles(nxg_dir, fragm)

    elif graph_type == 'horizontal':
        g = HorizontalVG(directed='left_to_right').build(fragm)
        pos = g.node_positions()
        nxg_dir = g.as_networkx()
        annotate_edge_angles(nxg, fragm)

    elif graph_type == 'vaqvg':
        nxg_dir = build_vaqvg(fragm, kappa=0.25, directed=True, orientation="lr")   # наш VAQ-VG
        annotate_edge_angles(nxg_dir, fragm)

    else:
        raise ValueError(f"Unknown graph_type: {graph_type}")
    
    # розрахунок асортативності
    assort = nx.degree_pearson_correlation_coefficient(nxg_dir)

    # середня степенева зв'язність
    avg_deg_con = np.mean(list(nx.average_degree_connectivity(nxg_dir, source="in", target="in").values()))

    Assortativity.append(assort)
    AvgDegreeConnectivity.append(avg_deg_con)

In [None]:
ind_names = ['Assortativity', 'AvgDegreeConnectivity']

indicators = [Assortativity, AvgDegreeConnectivity]

measure_labels = [r'$r$', r'$\langle d_{nn}^{w} \rangle$']

file_names = []

for i in range(len(ind_names)):
    name = f"{ind_names[i]}_symbol={symbol}_wind={window}_step={tstep}_seriestype={ret_type}_graph_type={graph_type}"
    comp_idx[f"{ind_names[i]}_{window}"] = indicators[i]
    np.savetxt(name + ".txt", indicators[i])
    file_names.append(name)

In [None]:
plot_pair(time_ser.index[window:length:tstep],
          time_ser.values[window:length:tstep],
          indicators[1], 
          ylabel, 
          measure_labels[1],
          xlabel,
          file_names[1],
          clr="darkorange")

In [None]:
plot_pair(time_ser.index[window:length:tstep],
          time_ser.values[window:length:tstep],
          indicators[0], 
          ylabel, 
          measure_labels[0],
          xlabel,
          file_names[0],
          clr="darkgreen")

In [None]:
Transitivity = []
AvgClustering = []

In [None]:
for i in tqdm(range(0,length-window,tstep)):
    # відбираємо фрагмент
    fragm = time_ser.iloc[i:i+window].copy()  

    # виконуємо процедуру трансформації ряду 
    fragm = transformation(fragm, ret_type)
    
    if graph_type == 'classic':
        g = NaturalVG(directed=None).build(fragm)
        pos = g.node_positions()
        nxg = g.as_networkx()

    elif graph_type == 'horizontal':
        g = HorizontalVG(directed=None).build(fragm)
        pos = g.node_positions()
        nxg = g.as_networkx()

    elif graph_type == 'vaqvg':
        nxg = build_vaqvg(fragm, kappa=kappa)   # наш VAQ-VG

    else:
        raise ValueError(f"Unknown graph_type: {graph_type}")

    # транзитивність
    trans = nx.transitivity(nxg)

    # глобальний коефіцієнт кластеризації
    avg_clust = nx.average_clustering(nxg)
    
    Transitivity.append(trans)
    AvgClustering.append(avg_clust)

In [None]:
ind_names = ['AvgClustering', 'Transitivity']

indicators = [AvgClustering, Transitivity]

measure_labels = [r'$\langle C_3 \rangle$', r'$T$']

file_names = []

for i in range(len(ind_names)):
    name = f"{ind_names[i]}_symbol={symbol}_wind={window}_step={tstep}_seriestype={ret_type}_graph_type={graph_type}"
    comp_idx[f"{ind_names[i]}_{window}"] = indicators[i]
    np.savetxt(name + ".txt", indicators[i])
    file_names.append(name)

In [None]:
plot_pair(time_ser.index[window:length:tstep],
          time_ser.values[window:length:tstep],
          indicators[0], 
          ylabel, 
          measure_labels[0],
          xlabel,
          file_names[0],
          clr="magenta")

In [None]:
plot_pair(time_ser.index[window:length:tstep],
          time_ser.values[window:length:tstep],
          indicators[1], 
          ylabel, 
          measure_labels[1],
          xlabel,
          file_names[1],
          clr="crimson")

In [None]:
Density = []

In [None]:
for i in tqdm(range(0,length-window,tstep)):
    # відбираємо фрагмент
    fragm = time_ser.iloc[i:i+window].copy()  

    # виконуємо процедуру трансформації ряду 
    fragm = transformation(fragm, ret_type)
    
    if graph_type == 'classic':
        g = NaturalVG(directed=None).build(fragm)
        pos = g.node_positions()
        nxg = g.as_networkx()

    elif graph_type == 'horizontal':
        g = HorizontalVG(directed=None).build(fragm)
        pos = g.node_positions()
        nxg = g.as_networkx()

    elif graph_type == 'vaqvg':
        nxg = build_vaqvg(fragm, kappa=kappa)   # наш VAQ-VG

    else:
        raise ValueError(f"Unknown graph_type: {graph_type}")
    
    # розрахунок щільності
    dens = nx.density(nxg)

    Density.append(dens)

In [None]:
ind_names = ['Density']

indicators = [Density]

measure_labels = [r'$\rho$']

file_names = []

for i in range(len(ind_names)):
    name = f"{ind_names[i]}_symbol={symbol}_wind={window}_step={tstep}_seriestype={ret_type}_graph_type={graph_type}"
    comp_idx[f"{ind_names[i]}_{window}"] = indicators[i]
    np.savetxt(name + ".txt", indicators[i])
    file_names.append(name)

In [None]:
plot_pair(time_ser.index[window:length:tstep],
          time_ser.values[window:length:tstep],
          indicators[0], 
          ylabel, 
          measure_labels[0],
          xlabel,
          file_names[0],
          clr="black")

In [None]:
Diameter = []
Radius = []

In [None]:
for i in tqdm(range(0,length-window,tstep)):
    # відбираємо фрагмент
    fragm = time_ser.iloc[i:i+window].copy()  

    # виконуємо процедуру трансформації ряду 
    fragm = transformation(fragm, ret_type)
    
    if graph_type == 'classic':
        g = NaturalVG(directed=None).build(fragm)
        pos = g.node_positions()
        nxg = g.as_networkx()

    elif graph_type == 'horizontal':
        g = HorizontalVG(directed=None).build(fragm)
        pos = g.node_positions()
        nxg = g.as_networkx()

    elif graph_type == 'vaqvg':
        nxg = build_vaqvg(fragm, kappa=kappa)   # наш VAQ-VG

    else:
        raise ValueError(f"Unknown graph_type: {graph_type}")
    
    # розрахунок діаметра
    diameter = nx.diameter(nxg)
    
    # розрахунок радіуса
    rad = nx.radius(nxg)
    
    Diameter.append(diameter)
    Radius.append(rad)

In [None]:
ind_names = ['Diameter', 'Radius']

indicators = [Diameter, Radius]

measure_labels = [r'$diam$', r'rad']

file_names = []

for i in range(len(ind_names)):
    name = f"{ind_names[i]}_symbol={symbol}_wind={window}_step={tstep}_seriestype={ret_type}_graph_type={graph_type}"
    comp_idx[f"{ind_names[i]}_{window}"] = indicators[i]
    np.savetxt(name + ".txt", indicators[i])
    file_names.append(name)

In [None]:
plot_pair(time_ser.index[window:length:tstep],
          time_ser.values[window:length:tstep],
          indicators[0], 
          ylabel, 
          measure_labels[0],
          xlabel,
          file_names[0],
          clr="magenta")

In [None]:
plot_pair(time_ser.index[window:length:tstep],
          time_ser.values[window:length:tstep],
          indicators[1], 
          ylabel, 
          measure_labels[1],
          xlabel,
          file_names[1],
          clr="crimson")

In [None]:
GlobalEfficiency = []

In [None]:
for i in tqdm(range(0,length-window,tstep)):
    # відбираємо фрагмент
    fragm = time_ser.iloc[i:i+window].copy()  

    # виконуємо процедуру трансформації ряду 
    fragm = transformation(fragm, ret_type)
    
    if graph_type == 'classic':
        g = NaturalVG(directed=None).build(fragm)
        pos = g.node_positions()
        nxg = g.as_networkx()

    elif graph_type == 'horizontal':
        g = HorizontalVG(directed=None).build(fragm)
        pos = g.node_positions()
        nxg = g.as_networkx()

    elif graph_type == 'vaqvg':
        nxg = build_vaqvg(fragm, kappa=kappa)   # наш VAQ-VG

    else:
        raise ValueError(f"Unknown graph_type: {graph_type}")

    # розрахунок глобальної ефективності
    glob_eff = nx.global_efficiency(nxg)
    
    #LocalEfficiency.append(local_eff)
    GlobalEfficiency.append(glob_eff)

In [None]:
ind_names = ['GlobalEfficiency']

indicators = [GlobalEfficiency]

measure_labels = [r'$E_{glob}$']

file_names = []

for i in range(len(ind_names)):
    name = f"{ind_names[i]}_symbol={symbol}_wind={window}_step={tstep}_seriestype={ret_type}_graph_type={graph_type}"
    comp_idx[f"{ind_names[i]}_{window}"] = indicators[i]
    np.savetxt(name + ".txt", indicators[i])
    file_names.append(name)

In [None]:
plot_pair(time_ser.index[window:length:tstep],
          time_ser.values[window:length:tstep],
          indicators[0], 
          ylabel, 
          measure_labels[0],
          xlabel,
          file_names[0],
          clr="indigo")

In [None]:
AvgPathLength = []

In [None]:
for i in tqdm(range(0,length-window,tstep)):
    # відбираємо фрагмент
    fragm = time_ser.iloc[i:i+window].copy()  

    # виконуємо процедуру трансформації ряду 
    fragm = transformation(fragm, ret_type)
    
    if graph_type == 'classic':
        g = NaturalVG(directed=None).build(fragm)
        pos = g.node_positions()
        nxg = g.as_networkx()

    elif graph_type == 'horizontal':
        g = HorizontalVG(directed=None).build(fragm)
        pos = g.node_positions()
        nxg = g.as_networkx()

    elif graph_type == 'vaqvg':
        nxg = build_vaqvg(fragm, kappa=kappa)   # наш VAQ-VG

    else:
        raise ValueError(f"Unknown graph_type: {graph_type}")
    
    # розрахунок середньої довжини найкоротшого шляху
    avg_path_len = nx.average_shortest_path_length(nxg)
   
    AvgPathLength.append(avg_path_len)

In [None]:
ind_names = ['AvgPathLength']

indicators = [AvgPathLength]

measure_labels = [r'$ApLen$']

file_names = []

for i in range(len(ind_names)):
    name = f"{ind_names[i]}_symbol={symbol}_wind={window}_step={tstep}_seriestype={ret_type}_graph_type={graph_type}"
    comp_idx[f"{ind_names[i]}_{window}"] = indicators[i]
    np.savetxt(name + ".txt", indicators[i])
    file_names.append(name)

In [None]:
plot_pair(time_ser.index[window:length:tstep],
          time_ser.values[window:length:tstep],
          indicators[0], 
          ylabel, 
          measure_labels[0],
          xlabel,
          file_names[0],
          clr="deeppink")

In [None]:
def small_world(G, random_seed=0):
    num_graphs = 10
    counter = 0
    V = nx.number_of_nodes(G)
    E = nx.number_of_edges(G)
    C_rand = 0
    L_rand = 0
    
    while counter < num_graphs:
        erdos_renyi_graph = nx.gnm_random_graph(V, E, random_seed)
        if not nx.is_connected(erdos_renyi_graph):
            c, l, num_subgraphs = 0, 0, 0
            for connected_component_nodes in nx.connected_components(erdos_renyi_graph):
                subgraph = erdos_renyi_graph.subgraph(connected_component_nodes)
                num_subgraphs += 1
                c += nx.average_clustering(subgraph)
                l += nx.average_shortest_path_length(subgraph)
            c /= num_subgraphs
            l /= num_subgraphs
        else:
            c, l = nx.average_clustering(erdos_renyi_graph), nx.average_shortest_path_length(erdos_renyi_graph)
        C_rand += c
        L_rand += l
        counter += 1
        
    C_g, L_g = nx.average_clustering(G), nx.average_shortest_path_length(G)
    C_rand /= num_graphs
    L_rand /= num_graphs
    return (C_g/C_rand)/(L_g/L_rand)

In [None]:
Small_Worldness = []

In [None]:
for i in tqdm(range(0,length-window,tstep)):
    # відбираємо фрагмент
    fragm = time_ser.iloc[i:i+window].copy()  

    # виконуємо процедуру трансформації ряду 
    fragm = transformation(fragm, ret_type)
    
    if graph_type == 'classic':
        g = NaturalVG(directed=None).build(fragm)
        pos = g.node_positions()
        nxg = g.as_networkx()

    elif graph_type == 'horizontal':
        g = HorizontalVG(directed=None).build(fragm)
        pos = g.node_positions()
        nxg = g.as_networkx()

    elif graph_type == 'vaqvg':
        nxg = build_vaqvg(fragm, kappa=kappa)   # наш VAQ-VG

    else:
        raise ValueError(f"Unknown graph_type: {graph_type}")
    
    # розрахунок малосвітовості для графа видимості
    sw = small_world(nxg)
   
    Small_Worldness.append(sw)

In [None]:
ind_names = ['Small-Worldness']

indicators = [Small_Worldness]

measure_labels = [r'$S$']

file_names = []

for i in range(len(ind_names)):
    name = f"{ind_names[i]}_symbol={symbol}_wind={window}_step={tstep}_seriestype={ret_type}_graph_type={graph_type}"
    comp_idx[f"{ind_names[i]}_{window}"] = indicators[i]
    np.savetxt(name + ".txt", indicators[i])
    file_names.append(name)

In [None]:
plot_pair(time_ser.index[window:length:tstep],
          time_ser.values[window:length:tstep],
          indicators[0], 
          ylabel, 
          measure_labels[0],
          xlabel,
          file_names[0],
          clr="tomato")

In [None]:
def tsp_approx(G):
    T = nx.minimum_spanning_tree(G)
    dfs = nx.dfs_preorder_nodes(T, 0)
    node_list = []
    for item in dfs:
        node_list.append(item)
    node_list.append(0)
    path = [0]
    for i in range(len(node_list) - 1):
        path.pop()
        path += nx.dijkstra_path(G, node_list[i], node_list[i + 1])
    total_cost = len(path)
    return total_cost

In [None]:
Travelling_Prob = []

In [None]:
for i in tqdm(range(0,length-window,tstep)):
    # відбираємо фрагмент
    fragm = time_ser.iloc[i:i+window].copy()  

    # виконуємо процедуру трансформації ряду 
    fragm = transformation(fragm, ret_type)
    
    if graph_type == 'classic':
        g = NaturalVG(directed=None).build(fragm)
        pos = g.node_positions()
        nxg = g.as_networkx()

    elif graph_type == 'horizontal':
        g = HorizontalVG(directed=None).build(fragm)
        pos = g.node_positions()
        nxg = g.as_networkx()

    elif graph_type == 'vaqvg':
        nxg = build_vaqvg(fragm, kappa=kappa)   # наш VAQ-VG

    else:
        raise ValueError(f"Unknown graph_type: {graph_type}")
    
    # розрахунок вартості задачі комівояжера
    tsp = tsp_approx(nxg)
   
    Travelling_Prob.append(tsp)

In [None]:
ind_names = ['Travelling-Problem']

indicators = [Travelling_Prob]

measure_labels = [r'$TSP$']

file_names = []

for i in range(len(ind_names)):
    name = f"{ind_names[i]}_symbol={symbol}_wind={window}_step={tstep}_seriestype={ret_type}_graph_type={graph_type}"
    comp_idx[f"{ind_names[i]}_{window}"] = indicators[i]
    np.savetxt(name + ".txt", indicators[i])
    file_names.append(name)

In [None]:
plot_pair(time_ser.index[window:length:tstep],
          time_ser.values[window:length:tstep],
          indicators[0], 
          ylabel, 
          measure_labels[0],
          xlabel,
          file_names[0],
          clr="darkorange")

In [None]:
VisibilityAngleEntropy = []
SquareMotifDensity     = []

In [None]:
for i in tqdm(range(0,length-window,tstep)):
    # відбираємо фрагмент
    fragm = time_ser.iloc[i:i+window].copy()  
    
    # виконуємо процедуру трансформації ряду 
    fragm = transformation(fragm, ret_type)
    
    if graph_type == 'classic':
        g = NaturalVG(directed=None).build(fragm)
        pos = g.node_positions()
        nxg = g.as_networkx()
        annotate_edge_angles(nxg, fragm)

    elif graph_type == 'horizontal':
        g = HorizontalVG(directed=None).build(fragm)
        pos = g.node_positions()
        nxg = g.as_networkx()
        annotate_edge_angles(nxg, fragm)

    elif graph_type == 'vaqvg':
        nxg = build_vaqvg(fragm, kappa=kappa)   # наш VAQ-VG
        annotate_edge_angles(nxg, fragm)

    else:
        raise ValueError(f"Unknown graph_type: {graph_type}")
    
    vae = visibility_angle_entropy(nxg, bins=25, normalize=True)
    smd = square_motif_density(nxg)

    
    VisibilityAngleEntropy.append(vae)
    SquareMotifDensity.append(smd)

In [None]:
ind_names = ['vis_angle_ent', 'square_motif_density']

indicators = [VisibilityAngleEntropy, SquareMotifDensity]

measure_labels = [r'$VAE$', r'$SMD$']

file_names = []

for i in range(len(ind_names)):
    name = f"{ind_names[i]}_symbol={symbol}_wind={window}_step={tstep}_seriestype={ret_type}_graph_type={graph_type}"
    comp_idx[f"{ind_names[i]}_{window}"] = indicators[i]
    np.savetxt(name + ".txt", indicators[i])
    file_names.append(name)

In [None]:
plot_pair(time_ser.index[window:length:tstep],
          time_ser.values[window:length:tstep],
          indicators[0], 
          ylabel, 
          measure_labels[0],
          xlabel,
          file_names[0],
          clr="red")

In [None]:
plot_pair(time_ser.index[window:length:tstep],
          time_ser.values[window:length:tstep],
          indicators[1], 
          ylabel, 
          measure_labels[1],
          xlabel,
          file_names[1],
          clr="black")

In [None]:
comp_idx.to_csv(f'{symbol}_VisibilityGraph_{graph_type}_{window}_{tstep}.csv', index_label='Date')