2D-TokamakDisruption-Simulator    by  J. Kawamura (ITER)

This simulator is a toymodel to test disruption detection tools in tokamaks with circular toroidal geometry. The first module simulates the magnetic field instability (Kink Instability). The second module simulates the central anomaly in the plasma density (values ​​greater than 0.7 cause a void in the central density that results in a strong temperature gradient) as a result of the Kink instability. The third module presents the evolution of a temperature anomaly characterizing the disruption that can compromise the stability of the fusion process in a simple tokamak. Techniques that show good performance in detecting and predicting these anomalies can be applied in more realistic simulations using the JOREK package.


---



Module 01 (V2-Nov24) - Kink Instability

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import imageio.v2 as imageio
import os

# 2D-Domain
nx, ny = 64, 64
x = np.linspace(-1.0, 1.0, nx)
y = np.linspace(-1.0, 1.0, ny)
X, Y = np.meshgrid(x, y, indexing='ij')
R = np.sqrt(X**2 + Y**2)
Theta = np.arctan2(Y, X)

# Initial Magnetic Field: B_theta ~ pinch, Bz ~ uniforme
Bz = np.ones_like(R) * 1.0
Btheta0 = 1 / (1 + 10 * R**2)  # centred profile

# Helicoidal Perturbation m=1
kink_amp = 0.1
perturb = kink_amp * np.cos(Theta) * np.exp(-10 * R**2)

# Artificial Dissipation
timesteps = 60
gamma = 0.05  # Dissipation Coefficient
frames = []

if not os.path.exists("frames"): os.makedirs("frames")

for t in range(timesteps):
    phase = 2 * np.pi * t / timesteps
    kink = perturb * np.sin(phase)

    # B_theta with temporary perturbation and imposed dissipation
    Btheta = Btheta0 * np.exp(-gamma * t) + kink

    # Cartesian Magnetic Components
    Bx = -Btheta * np.sin(Theta) + 0.05 * Bz * np.cos(Theta)
    By =  Btheta * np.cos(Theta) + 0.05 * Bz * np.sin(Theta)

    # Visualization
    plt.figure(figsize=(6, 6))
    plt.streamplot(x, y, Bx.T, By.T, color=np.sqrt(Bx**2 + By**2).T, cmap='inferno', density=1.5)
    plt.title(f'Plasma Instability (1,1) from Reconnexion \nFrame {t}')
    plt.axis('equal')
    plt.axis('off')
    filename = f"frames/frame_{t:03d}.png"
    plt.savefig(filename, bbox_inches='tight')
    plt.close()
    frames.append(imageio.imread(filename))

# Saving GIF
imageio.mimsave("kink_rec_instability.gif", frames, duration=0.1)

# Cleaning temporary images (optional)
for f in os.listdir("frames"):
    os.remove(os.path.join("frames", f))
os.rmdir("frames")

print("GIF save as kink_rec_instability.gif")

In [None]:
from IPython.display import Image, display

# Caminho do GIF salvo
gif_path = "kink_rec_instability.gif"

# Exibir no notebook
display(Image(filename=gif_path))

Module 02 (V2-Nov24) - Plasma Density Central Anomaly

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import imageio
import os

# Parameters
nx, nr = 64, 64
x = np.linspace(-1, 1, nx)
r = np.linspace(-1, 1, nr)
X, Y = np.meshgrid(x, r, indexing='ij')
R = np.sqrt(X**2 + Y**2)
dt, nt = 1e-3, 60  # shorter simulation for demo

# Initial plasma density profile (Gaussian centered)
density0 = 1.0 * np.exp(-5 * R**2)

# Prepare for saving frames
frames_dir = "frames_disruption"
os.makedirs(frames_dir, exist_ok=True)
filenames = []

# Temporal loop simulating a disruption (growing anomaly at center)
for it in range(nt):
    time_factor = it / nt
    anomaly = 0.75 * np.exp(-50 * R**2) * time_factor  # grows in center
    density = density0 * (1 - anomaly)

    # Save color plot of density
    fig, ax = plt.subplots(figsize=(6, 5))
    im = ax.imshow(density.T, origin='lower', extent=[x.min(), x.max(), r.min(), r.max()],
                   cmap='RdBu_r', aspect='equal', vmin=0, vmax=1.0)
    ax.set_title(f'Plasma Density at t={it}')
    ax.set_xlabel('x')
    ax.set_ylabel('y')
    plt.colorbar(im, ax=ax, label='Density')
    fname = os.path.join(frames_dir, f"frame_{it:03d}.png")
    plt.savefig(fname)
    plt.close()
    filenames.append(fname)

# Create GIF
gif_filename = "plasma_density_anomaly.gif"
with imageio.get_writer(gif_filename, mode='I', duration=0.1) as writer:
    for fname in filenames:
        image = imageio.imread(fname)
        writer.append_data(image)

# Clean up frames
for fname in filenames:
    os.remove(fname)
os.rmdir(frames_dir)

print(f"GIF saved as {gif_filename}")


In [None]:
from IPython.display import Image, display

# Path to the generated GIF
gif_path = "plasma_density_anomaly.gif"

# Display the GIF
display(Image(filename=gif_path))

Module 03 (V2-Nov24) - Plasma Temperature Anomaly (Disruption)

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from scipy.ndimage import zoom, sobel, center_of_mass
from scipy.stats import skew, kurtosis
from skimage.measure import shannon_entropy
from numpy.fft import ifft2
from matplotlib.ticker import MaxNLocator

# --------- Diffusive Noise 2D ---------
def diffusive_noise_2d(shape, alpha=2.0):
    ny, nx = shape
    kx = np.fft.fftfreq(nx).reshape(1, nx)
    ky = np.fft.fftfreq(ny).reshape(ny, 1)
    k = np.sqrt(kx**2 + ky**2)
    k[0, 0] = 1
    amplitude = 1 / (k ** alpha)
    noise = np.random.normal(size=shape) + 1j * np.random.normal(size=shape)
    red_spectrum = noise * amplitude
    red_noise = np.real(ifft2(red_spectrum))
    red_noise = (red_noise - red_noise.min()) / (red_noise.max() - red_noise.min())
    return red_noise

# --------- Multifractal Noise 2D ---------
def multifractal_noise_2d(size=(64, 64), levels=6, p=0.15):
    field = np.ones((1, 1))
    for _ in range(levels):
        field = np.kron(field, np.ones((2, 2)))
        shape = field.shape
        mask = np.random.rand(*shape) < 0.5
        weights = np.where(mask, p, 1 - p)
        field *= weights
    if field.shape != size:
        zoom_factor = (size[0] / field.shape[0], size[1] / field.shape[1])
        field = zoom(field, zoom_factor)
    field = (field - field.min()) / (field.max() - field.min())
    return field

# --------- Plasma Temperature Anomaly Evolution ---------
def generate_synthetic_plasma_sequence(seq_len=128, size=(64, 64), anomaly_start=110, p=0.28, seed=42):
    np.random.seed(seed)
    sequence = []
    mean_temps = []
    std_temps = []
    entropy_temps = []
    energy_temps = []
    gradient_mags = []
    skews = []
    kurtoses = []
    centroids = []

    for t in range(seq_len):
        base = np.exp(-((np.indices(size)[0] - size[0] // 2)**2 +
                        (np.indices(size)[1] - size[1] // 2)**2) / 300)

        if t >= anomaly_start:
            x, y = 20 + (t - anomaly_start) * 2, 20 + (t - anomaly_start) * 2
            anomaly = np.exp(-((np.indices(size)[0] - x)**2 +
                               (np.indices(size)[1] - y)**2) / (20 - t + anomaly_start + 1))
            base += 0.5 * anomaly

        base += 0.01 * diffusive_noise_2d(size)
        base *= 1 + 0.3 * multifractal_noise_2d(size=size, levels=6, p=p)
        base = (base - base.min()) / (base.max() - base.min())

        # Métricas
        mean_temps.append(base.mean())
        std_temps.append(base.std())
        entropy_temps.append(shannon_entropy(base))
        energy_temps.append(np.sum(base**2))
        gradient_mags.append(np.mean(np.hypot(sobel(base, axis=0), sobel(base, axis=1))))
        skews.append(skew(base.ravel()))
        kurtoses.append(kurtosis(base.ravel()))
        centroids.append(center_of_mass(base))

        sequence.append(base)

    # Distância entre centros de massa consecutivos
    centroid_dists = [np.linalg.norm(np.array(centroids[i+1]) - np.array(centroids[i]))
                      for i in range(len(centroids)-1)]
    centroid_dists.insert(0, 0.0)

    return (np.array(sequence), np.array(mean_temps),
            np.array(std_temps), np.array(entropy_temps),
            np.array(energy_temps), np.array(gradient_mags),
            np.array(skews), np.array(kurtoses),
            np.array(centroid_dists))

# --------- Visualization: Sequence ---------
def plot_sequence(sequence, cols=5):
    n_frames = sequence.shape[0]
    rows = n_frames // cols + int(n_frames % cols > 0)
    fig, axes = plt.subplots(rows, cols, figsize=(15, 3 * rows))
    axes = axes.flatten()
    for i in range(n_frames):
        axes[i].imshow(sequence[i], cmap='hot', interpolation='nearest')
        axes[i].set_title(f"Frame {i+1}")
        axes[i].axis('off')
    for j in range(n_frames, rows * cols):
        axes[j].axis('off')
    plt.tight_layout()
    plt.show()

# --------- Visualization: Mean Temperature ---------
def plot_mean_temperature_curve(mean_temps, p=None):
    global_mean = mean_temps.mean()
    lower = global_mean * 0.95
    upper = global_mean * 1.05

    fig, ax = plt.subplots(figsize=(8, 4))
    ax.plot(mean_temps, marker='o', color='blue', label='Snapshot Mean Temp.')
    ax.axhline(lower, color='red', linestyle='--', label='Limit -5%')
    ax.axhline(upper, color='red', linestyle='--', label='Limit +5%')
    ax.axhline(global_mean, color='black', linestyle='-', label='Global Mean')
    ax.fill_between(range(len(mean_temps)), lower, upper, color='red', alpha=0.1)

    ax.set_title(f"Mean Temp within ±5% Range (centered at global mean){f' | p = {p}' if p is not None else ''}")
    ax.set_xlabel("Snapshot (t)")
    ax.set_ylabel("Normalized Mean Temperature")
    ax.legend()
    ax.grid(True)
    ax.xaxis.set_major_locator(MaxNLocator(integer=True))
    plt.tight_layout()
    plt.show()

# --------- Visualization: All Metrics ---------
def plot_temporal_metrics(metrics_dict, anomaly_start=None):
    fig, ax = plt.subplots(len(metrics_dict), 1, figsize=(10, 2.2 * len(metrics_dict)), sharex=True)
    for i, (label, data) in enumerate(metrics_dict.items()):
        ax[i].plot(data, label=label)
        if anomaly_start is not None:
            ax[i].axvline(anomaly_start, color='r', linestyle='--', label='Anomalia')
        ax[i].legend()
        ax[i].grid(True)
    ax[-1].set_xlabel("Frame")
    plt.tight_layout()
    plt.show()

# --------- Execução principal ---------
# if __name__ == "__main__":
#     results = generate_synthetic_plasma_sequence()
#     sequence, mean, std, entropy, energy, gradient, skewness, kurt, centroid_d = results
#     plot_sequence(sequence)
#     plot_mean_temperature_curve(mean)
#     plot_temporal_metrics({
#         "Média": mean,
#         "Desvio Padrão": std,
#         "Entropia": entropy,
#         "Energia": energy,
#         "Gradiente": gradient,
#         "Skewness": skewness,
#         "Curtose": kurt,
#         "Δ Centro de Massa": centroid_d,
#     }, anomaly_start=110)


In [None]:
anomaly_start=110
results = generate_synthetic_plasma_sequence(p=0.6, anomaly_start=anomaly_start)
sequence, mean, std, entropy, energy, gradient, skewness, kurt, centroid_d = results

In [None]:
!pip install git+https://github.com/rsautter/GPA

In [None]:
sequence[0].shape

In [None]:
from GPA import GPA

image = sequence[0]

# normaliza pelo valor maximo
im = np.array(image).astype(float)[2:-2,2:-2]

# tol eh a tolerancia da tecnica
ga = GPA(tol = 0.05)
ga(im)
sym = np.array(ga.symmetricalP)
asym = np.array(ga.asymmetricalP)

dx, dy = np.gradient(im)
abs = (dx**2+dy**2)**0.5
dx,dy = dx/abs,dy/abs

#mostra a imagem
plt.figure(figsize=(4,4))
plt.imshow(im,cmap='gray',origin='lower')

#mostra a grade gradiente total
plt.quiver(dy,dx,color='b',scale=27,width=0.0025,headwidth=5.5)
#plt.legend(fontsize=8,loc=1)
plt.show()

In [None]:
# tol eh a tolerancia da tecnica
ga = GPA(tol = 0.05)

# observacao: G1_Classic eh a primeira medida proposta para o GPA (NC-NV)/NV
ga(im,moment=["G1_Classic", "G1","G2","G3","G4"])

In [None]:
def calculateGPAMoments(mat):
    mat = np.array(mat).astype(np.float64)
    moments = ga(mat, moment=['G1', 'G2', 'G3', 'G4'])
    moments = {key :
        round(moments[key], 3) if type(moments[key]) != np.dtype(np.complex128)
        else complex(np.round(moments[key].real,3),np.round(moments[key].imag,3)) for key in moments}
    return moments

In [None]:
moments_tokamak = [calculateGPAMoments(frame) for frame in sequence]

In [None]:
import pandas as pd

df_moments_tokamak = pd.DataFrame(moments_tokamak)

df_moments_tokamak['G4_real'] = df_moments_tokamak['G4'].apply(lambda x: x.real)
df_moments_tokamak['G4_imag'] = df_moments_tokamak['G4'].apply(lambda x: x.imag)

In [None]:
import matplotlib.pyplot as plt

# Define colors for each moment
colors = {
    'G1': 'tab:blue',
    'G2': 'tab:orange',
    'G3': 'tab:green',
    'G4_real': 'tab:red',
    'G4_imag': 'tab:purple'
}

# Create the figure and a single axis
fig, ax = plt.subplots(figsize=(14, 5))

# Plot the moments of the Pmodel
ax.plot(df_moments_tokamak['G1'], label='Pmodel G1', color=colors['G1'], linestyle='--')
ax.plot(df_moments_tokamak['G2'], label='Pmodel G2', color=colors['G2'], linestyle='--')
ax.plot(df_moments_tokamak['G3'], label='Pmodel G3', color=colors['G3'], linestyle='--')
# ax.plot(df_moments_tokamak['G4_real'], label='Pmodel Re(G4)', color=colors['G4_real'], linestyle='--')
# ax.plot(df_moments_tokamak['G4_imag'], label='Pmodel Im(G4)', color=colors['G4_imag'], linestyle='--')

# Add vertical line indicating the start of the anomaly
ax.axvline(x=anomaly_start, color='red', linestyle=':', linewidth=2, label='Anomaly Start')

# Configure plot
ax.set_title('Evolution of GPA Moments (Pmodel)')
ax.set_xlabel('Frame')
ax.set_ylabel('Moment Value')
ax.legend(ncol=2, fontsize='small')
ax.grid(True)

# Adjust layout and show plot
plt.tight_layout()
plt.show()


In [None]:
import matplotlib.pyplot as plt

# Define colors for each moment
colors = {
    'G1': 'tab:blue',
    'G2': 'tab:orange',
    'G3': 'tab:green',
    'G4_real': 'tab:red',
    'G4_imag': 'tab:purple'
}

# Normalize columns to [0, 1] using min-max scaling
cols_to_normalize = ['G1', 'G2', 'G3']  # Adicione 'G4_real', 'G4_imag' se quiser
df_norm = df_moments_tokamak.copy()
for col in cols_to_normalize:
    min_val = df_norm[col].min()
    max_val = df_norm[col].max()
    df_norm[col] = (df_norm[col] - min_val) / (max_val - min_val)

# Create the figure and a single axis
fig, ax = plt.subplots(figsize=(14, 5))

# Plot the normalized moments
ax.plot(df_norm['G1'], label='Pmodel G1', color=colors['G1'], linestyle='--')
ax.plot(df_norm['G2'], label='Pmodel G2', color=colors['G2'], linestyle='--')
ax.plot(df_norm['G3'], label='Pmodel G3', color=colors['G3'], linestyle='--')
# ax.plot(df_norm['G4_real'], label='Pmodel Re(G4)', color=colors['G4_real'], linestyle='--')
# ax.plot(df_norm['G4_imag'], label='Pmodel Im(G4)', color=colors['G4_imag'], linestyle='--')

# Add vertical line indicating the start of the anomaly
ax.axvline(x=anomaly_start, color='red', linestyle=':', linewidth=2, label='Anomaly Start')

# Configure plot
ax.set_title('Evolution of Normalized GPA Moments (Pmodel)')
ax.set_xlabel('Frame')
ax.set_ylabel('Normalized Moment Value')
ax.legend(ncol=2, fontsize='small')
ax.grid(True)

# Adjust layout and show plot
plt.tight_layout()
plt.show()


In [None]:
import matplotlib.pyplot as plt

# Define color for G2
color_g2 = 'tab:orange'

# Normaliza G2 para o intervalo [0, 1]
df_norm = df_moments_tokamak.copy()
g2_min = df_norm['G2'].min()
g2_max = df_norm['G2'].max()
df_norm['G2'] = (df_norm['G2'] - g2_min) / (g2_max - g2_min)

# Cria a figura e o eixo
fig, ax = plt.subplots(figsize=(14, 5))

# Plota o G2 normalizado
ax.plot(df_norm['G2'], label='Pmodel G2 (normalized)', color=color_g2, linestyle='--')

# Linha vertical indicando o início da anomalia
ax.axvline(x=anomaly_start, color='red', linestyle=':', linewidth=2, label='Anomaly Start')

# Configurações do gráfico
ax.set_title('Evolution of Normalized GPA Moment G2 (Pmodel)')
ax.set_xlabel('Frame')
ax.set_ylabel('Normalized G2 Value')
ax.legend(fontsize='small')
ax.grid(True)

# Ajusta layout e exibe o gráfico
plt.tight_layout()
plt.show()


In [None]:
import pandas as pd

# Cria uma cópia e normaliza G2
df_norm = df_moments_tokamak.copy()
g2_min = df_norm['G2'].min()
g2_max = df_norm['G2'].max()
df_norm['G2_normalized'] = (df_norm['G2'] - g2_min) / (g2_max - g2_min)

# Salva somente a coluna G2 normalizada no CSV
df_norm[['G2_normalized']].to_csv('g2_normalized.csv', index_label='Frame')


In [None]:
plot_sequence(sequence)

In [None]:
plot_mean_temperature_curve(mean)

In [None]:
import matplotlib.pyplot as plt

# Valores de p para testar
p_values = [0.1, 0.2, 0.3, 0.4, 0.6, 0.7, 0.8, 0.9]

for p in p_values:
    results = generate_synthetic_plasma_sequence(p=p)
    sequence, mean, std, entropy, energy, gradient, skewness, kurt, centroid_d = results
    plot_mean_temperature_curve(mean, p=p)


In [None]:
results = generate_synthetic_plasma_sequence(p=0.6)
sequence, mean, std, entropy, energy, gradient, skewness, kurt, centroid_d = results

In [None]:
mean = np.array(mean)

# Normalização padrão (0 para mínimo, 1 para máximo)
norm = (mean - np.min(mean)) / (np.max(mean) - np.min(mean))

# Inversão: agora o menor vira 1 e o maior vira 0
norm_invertido = 1 - norm

In [None]:
plot_mean_temperature_curve(norm_invertido)

In [None]:
import pandas as pd
import numpy as np

df = pd.DataFrame({'values': norm_invertido})
df.to_csv('norm_invertido.csv', index=False)

In [None]:
plot_temporal_metrics({
    "Média": mean,
    "Desvio Padrão": std,
    "Entropia": entropy,
    "Energia": energy,
    "Gradiente": gradient,
    "Skewness": skewness,
    "Curtose": kurt,
    "Δ Centro de Massa": centroid_d,
}, anomaly_start=110)

In [None]:
#XP-model adapted from Meneveau & Sreenevasan, 1987 & Malara et al., 2016
#Valid Ranges for p, based on Didier-Sornette's Theory
#Author: R.R.Rosa, R. Sautter and  N. Joshi
#Version: 1.1
#Date: 23/08/2022

import numpy as np
#import statistics as stat
from matplotlib import pyplot


def pmodel(noValues=1024, p=0.3, slope=[]):
    print(noValues)
    noOrders = int(np.ceil(np.log2(noValues)))
    noValues = int(noValues)
    noValuesGenerated = 2**noOrders

    dx = np.array([1])
    for n in range(noOrders):
        dx = next_step_1d(dx, p)

    if (slope):
        fourierCoeff = fractal_spectrum_1d(noValues, slope/2)
        meanVal = np.mean(dx)
        stdy = np.std(dx)
        x = np.fft.ifft(dx - meanVal)
        phase = np.angle(x)
        x = fourierCoeff*np.exp(1j*phase)
        x = np.fft.fft(x).real
        x *= stdy/np.std(x)
        x += meanVal
    else:
        x = dx

    return x[0:noValues], dx[0:noValues]


def next_step_1d(dx, p):
    y2 = np.zeros(dx.size*2)
    sign = np.random.rand(1, dx.size) - 0.5
    sign /= np.abs(sign)
    y2[0:2*dx.size:2] = dx + sign*(1-2*p)*dx
    y2[1:2*dx.size+1:2] = dx - sign*(1-2*p)*dx

    return y2


def fractal_spectrum_1d(noValues, slope):
    ori_vector_size = noValues
    ori_half_size = ori_vector_size//2
    a = np.zeros(ori_vector_size)

    for t2 in range(ori_half_size):
        index = t2
        t4 = 1 + ori_vector_size - t2
        if (t4 >= ori_vector_size):
            t4 = t2
        coeff = (index + 1)**slope
        a[t2] = coeff
        a[t4] = coeff

    a[1] = 0

    return a



In [None]:
serie = np.array(mean)

In [None]:
def flutuacoes_por_escala(serie, max_scale=6):
    """Calcula variância das diferenças em múltiplas escalas logarítmicas"""
    variancias = []
    escalas = [2**i for i in range(1, max_scale+1)]
    for s in escalas:
        diffs = serie[s:] - serie[:-s]
        variancias.append(np.var(diffs))
    return escalas, variancias

def plot_pdf_deltas(serie, label):
    deltas = np.diff(serie)
    plt.hist(deltas, bins=30, density=True, alpha=0.5, label=label)

def espectro_potencia(serie, label):
    n = len(serie)
    if n == 0:
        print(f'Série {label} vazia, pulando espectro.')
        return
    f = np.fft.rfftfreq(n)
    fft = np.fft.rfft(serie - np.mean(serie))
    psd = np.abs(fft)**2

    if len(f) <= 1:
        print(f'Série {label} muito curta para espectro.')
        return

    plt.loglog(f[1:], psd[1:], label=label)  # pula f=0


ps = np.linspace(0.5, 0.6, 20)

for p in ps:
    serie_p = pmodel(len(serie),p)[1]

    # Flutuações por escala
    escalas, var_real = flutuacoes_por_escala(serie)
    _, var_p = flutuacoes_por_escala(serie_p)

    plt.figure(figsize=(15, 4))

    plt.subplot(1, 3, 1)
    plot_pdf_deltas(serie, 'Real')
    plot_pdf_deltas(serie_p, f'P-model p={p:.4f}')
    plt.title("PDF dos Incrementos")
    plt.legend()

    plt.subplot(1, 3, 2)
    espectro_potencia(serie, 'Real')
    espectro_potencia(serie_p, f'P-model p={p:.4f}')
    plt.title("Espectro de Potência")
    plt.legend()

    plt.subplot(1, 3, 3)
    plt.plot(np.log2(escalas), np.log2(var_real), label='Real')
    plt.plot(np.log2(escalas), np.log2(var_p), label=f'P-model p={p:.4f}')
    plt.title('Log-Log: Escala vs Variância')
    plt.xlabel('log2(Escala)')
    plt.ylabel('log2(Variância)')
    plt.legend()

    plt.tight_layout()
    plt.show()
