In [None]:
import numpy as np
import matplotlib.pyplot as plt
from scipy.signal import czt
import time
import random
import japanize_matplotlib


# ==============================================================================
# --- 1. パラメータ設定関数 ---
# ==============================================================================
def setup_parameters():
    """各種パラメータを設定し、辞書として返す"""
    params = {
        # --- 物理仕様 ---
        "fs": 10000,              # サンプリング周波数 (Hz)
        "L": 1.0,                 # 信号長 (s)
        "A": 1.0,                 # 振幅
        # --- 周波数仕様 ---
        "f_c": 500.0,            # 中心周波数 (Hz)
        "target_bandwidth": 5.0,  # 目標帯域幅 (Hz)
        # --- 焼きなまし法 パラメータ ---
        "n_iterations_sa": 10000,  # 探索回数
        "initial_temp": 100.0,    # 初期温度
        "cooling_rate": 0.999,   # 冷却率
        "target_amplitude": 2000.0,  # 最適化の目標振幅
        # --- FFT関連 ---
        "n_fft_high_res": None,   # 高解像度FFT点数 (後で計算)
    }
    # 信号サンプル数を計算
    params["n_samples"] = int(params["L"] * params["fs"])
    # 高解像度FFT点数を設定
    params["n_fft_high_res"] = params["n_samples"] * 16
    return params


# ==============================================================================
# --- 2. 初期波形とドメインの生成関数 ---
# ==============================================================================
def generate_initial_waveforms(params):
    """初期値となるLFMチャープ信号、比較用の単一周波数信号、およびドメインを生成する"""
    print("基準となるチャープ信号からドメインを検出中...")
    n_samples = params["n_samples"]
    t = np.linspace(0, params["L"], n_samples, endpoint=False)

    # (A) 最適化の初期値となるLFMチャープ信号
    f_start = params["f_c"] - params["target_bandwidth"] / 2
    f_end = params["f_c"] + params["target_bandwidth"] / 2
    inst_phase_chirp = 2 * np.pi * \
        (f_start * t + (f_end - f_start) / (2 * params["L"]) * t**2)
    s_t_chirp_rect = params["A"] * np.sign(np.sin(inst_phase_chirp))
    # s_t_chirp_rect = params["A"] * \
    #     np.sign(np.sin(2 * np.pi * params["f_c"] * t))

    # (B) 比較用の単一周波数信号 (矩形波)
    s_t_base = params["A"] * np.sign(np.sin(2 * np.pi * params["f_c"] * t))

    # (C) チャープ信号からドメイン（符号が一定の区間）を検出
    change_points = np.where(np.diff(s_t_chirp_rect) != 0)[0] + 1
    domain_boundaries = np.concatenate(([0], change_points, [n_samples]))

    domains = []
    for i in range(len(domain_boundaries) - 1):
        start_idx, end_idx = int(domain_boundaries[i]), int(
            domain_boundaries[i+1])
        if start_idx < end_idx:
            domains.append((start_idx, end_idx))

    print(f"検出されたドメイン数: {len(domains)}")
    return s_t_chirp_rect, s_t_base, domains


# ==============================================================================
# --- 3. 近傍解の生成関数（対称性維持）---
# ==============================================================================
def create_symmetric_neighbor(current_signal, domains_list):
    """
    現在の信号から、時間対称性を維持した近傍解（一部のドメインを反転させた信号）を生成する。
    信号の前半の区間を反転させ、それと鏡合わせになる後半の区間も同時に反転させる。
    """
    neighbor_signal = current_signal.copy()
    n_domains = len(domains_list)
    n_half = n_domains // 2  # ドメイン数の半分（前半部分）

    if n_half < 2:
        # 区間フリップができないほどドメインが少ない場合は、単一ドメインを対称にフリップ
        if n_half > 0:
            domain_idx_to_flip = random.randint(0, n_half - 1)
            # 前半を反転
            start, end = domains_list[domain_idx_to_flip]
            neighbor_signal[start:end] *= -1
            # 対称な後半も反転
            symmetric_idx_to_flip = n_domains - 1 - domain_idx_to_flip
            start, end = domains_list[symmetric_idx_to_flip]
            neighbor_signal[start:end] *= -1
        return neighbor_signal  # 変更なし or 単一フリップ

    # 反転させるドメインの開始インデックスと終了インデックスを「前半から」ランダムに選ぶ
    start_domain_idx = random.randint(0, n_half - 2)
    end_domain_idx = random.randint(start_domain_idx + 1, n_half - 1)

    # --- 前半部分のフリップ ---
    for domain_idx in range(start_domain_idx, end_domain_idx + 1):
        start, end = domains_list[domain_idx]
        neighbor_signal[start:end] *= -1

    # --- 対称な後半部分のフリップ ---
    symmetric_start_idx = n_domains - 1 - end_domain_idx
    symmetric_end_idx = n_domains - 1 - start_domain_idx
    for domain_idx in range(symmetric_start_idx, symmetric_end_idx + 1):
        start, end = domains_list[domain_idx]
        neighbor_signal[start:end] *= -1

    return neighbor_signal


# ==============================================================================
# --- 4. コスト計算関数 ---
# ==============================================================================
def calculate_cost_czt(signal, params, czt_params):
    """Zoom FFT (CZT) を用いてコスト（目標振幅との二乗誤差）を計算する"""
    spectrum = np.abs(czt(signal, **czt_params))
    return np.sum((spectrum - params["target_amplitude"])**2)


# ==============================================================================
# --- 5. 焼きなまし法 実行関数 ---
# ==============================================================================
def run_simulated_annealing(initial_signal, domains, params):
    """焼きなまし法を実行して最適な波形を見つける"""
    print(f"\n焼きなまし法（対称ドメイン区間反転）による最適化開始...")

    current_signal = initial_signal.copy()
    temp = params["initial_temp"]

    # --- Zoom FFT (CZT) のパラメータを事前に計算 ---
    f_start_zoom = params["f_c"] - params["target_bandwidth"] / 2
    f_end_zoom = params["f_c"] + params["target_bandwidth"] / 2
    # 解析点数mを計算 (高解像度FFTの周波数分解能を基準)
    m_zoom = int(params["target_bandwidth"] /
                 (params["fs"] / params["n_fft_high_res"]))
    if m_zoom == 0:
        m_zoom = 1  # 帯域幅が狭すぎる場合のエラー防止

    czt_params = {
        "m": m_zoom,
        "w": np.exp(-1j * 2 * np.pi * (f_end_zoom - f_start_zoom) / (m_zoom * params["fs"])),
        "a": np.exp(1j * 2 * np.pi * f_start_zoom / params["fs"])
    }

    # 初期コストの計算
    current_cost = calculate_cost_czt(current_signal, params, czt_params)
    best_signal = current_signal.copy()
    best_cost = current_cost

    start_time = time.time()
    # メインループ
    for i in range(params["n_iterations_sa"]):
        # 対称性を維持した近傍解を生成
        neighbor_signal = create_symmetric_neighbor(current_signal, domains)

        # コストを計算
        neighbor_cost = calculate_cost_czt(neighbor_signal, params, czt_params)

        # 遷移判定 (メトロポリス基準)
        cost_diff = neighbor_cost - current_cost
        if cost_diff < 0 or random.random() < np.exp(-cost_diff / temp):
            current_signal = neighbor_signal
            current_cost = neighbor_cost

        # 最良解の更新
        if current_cost < best_cost:
            best_signal = current_signal
            best_cost = current_cost

        # 温度を更新
        temp *= params["cooling_rate"]

        # 途中経過の表示
        if (i + 1) % 1000 == 0:
            elapsed_time = time.time() - start_time
            print(
                f"  Iteration {i+1}/{params['n_iterations_sa']}: Cost = {best_cost:.4g}, Temp = {temp:.4g} ({elapsed_time:.2f}s)")

    print("最適化完了。")
    return best_signal

In [None]:
# ==============================================================================
# --- 7. メイン実行ブロック ---
# ==============================================================================
# 1. パラメータを読み込む
parameters = setup_parameters()

# 2. 初期波形、比較用波形、ドメインを生成する
initial_s_t, base_s_t, domain_list = generate_initial_waveforms(parameters)

# 3. 焼きなまし法で最適化を実行する
optimized_s_t = run_simulated_annealing(
    initial_s_t, domain_list, parameters)

In [None]:
# ==============================================================================
# --- 7. 結果の保存関数 (NEW) ---
# ==============================================================================
def save_results_npy(initial_signal, optimized_signal, params):
    """
    最適化結果を.npy形式で保存する。
    ファイル名にタイムスタンプと主要パラメータを含める。
    """
    timestamp = time.strftime("%Y%m%d-%H%M%S")
    bw = params['target_bandwidth']
    amp = params['target_amplitude']

    # ファイル名を生成
    filename_optimized = f"optimized_signal_{timestamp}_bw{bw}_amp{amp}.npy"
    filename_initial = f"initial_signal_{timestamp}_bw{bw}_amp{amp}.npy"

    # データを保存
    np.save(filename_optimized, optimized_signal)
    np.save(filename_initial, initial_signal)

    print(f"\n結果を.npyファイルに保存しました:")
    print(f"  - 最適化後信号: {filename_optimized}")
    print(f"  - 初期信号: {filename_initial}")


# save_results_npy(initial_s_t, optimized_s_t, parameters)

In [None]:
# ==============================================================================
# --- 6. 結果のプロット・分析関数 ---
# ==============================================================================
def plot_and_analyze_results(initial_signal, base_signal, optimized_signal, params):
    """最適化結果をプロットし、追加の分析（反転ドメインの可視化など）を行う。"""
    print("\n結果のプロットと分析を開始...")
    n_samples = params["n_samples"]
    t = np.linspace(0, params["L"], n_samples, endpoint=False)
    n_fft = params["n_fft_high_res"]
    freq_shifted = np.fft.fftshift(np.fft.fftfreq(n_fft, d=1/params["fs"]))
    s_f_initial = np.fft.fftshift(np.fft.fft(initial_signal, n_fft))
    s_f_base = np.fft.fftshift(np.fft.fft(base_signal, n_fft))
    s_f_optimized = np.fft.fftshift(np.fft.fft(optimized_signal, n_fft))

    fig, axes = plt.subplots(3, 2, figsize=(16, 18))
    fig.suptitle('焼きなまし法による波形最適化と構造分析', fontsize=18)

    axes[0, 0].plot(t, initial_signal)
    axes[0, 0].set_title('初期波形 (LFMチャープ)')
    axes[0, 0].set_xlabel('時間 (s)')
    axes[0, 0].set_ylabel('振幅')
    axes[0, 0].grid(True)

    axes[0, 1].plot(t, optimized_signal, color='r')
    axes[0, 1].set_title('最適化後の波形')
    axes[0, 1].set_xlabel('時間 (s)')
    axes[0, 1].grid(True)

    axes[1, 0].plot(freq_shifted, np.abs(s_f_base),
                    label='基準 (単一周波数)', alpha=0.6, linestyle=':')
    axes[1, 0].plot(freq_shifted, np.abs(s_f_initial),
                    label='初期 (LFM)', alpha=0.6)
    axes[1, 0].plot(freq_shifted, np.abs(s_f_optimized),
                    label='最適化後', color='r', alpha=0.9)
    axes[1, 0].set_title('周波数スペクトル (全体像)')
    axes[1, 0].set_xlabel('周波数 (Hz)')
    axes[1, 0].set_ylabel('振幅スペクトル')
    axes[1, 0].legend()
    axes[1, 0].grid(True)

    axes[1, 1].plot(freq_shifted, np.abs(s_f_base),
                    label='Reference (Single Frequency)', alpha=0.6, linestyle=':')
    axes[1, 1].plot(freq_shifted, np.abs(s_f_initial),
                    label='Initial (LFM)', alpha=0.6)
    axes[1, 1].plot(freq_shifted, np.abs(s_f_optimized),
                    label='Optimized', color='r', alpha=0.9)
    axes[1, 1].axhline(y=params["target_amplitude"],
                       color='g', linestyle='--', label='Target Amplitude')
    axes[1, 1].set_title('Frequency Spectrum (Target Bandwidth)')
    axes[1, 1].set_xlabel('Frequency (Hz)')
    axes[1, 1].set_xlim(params["f_c"] - 20, params["f_c"] + 20)
    axes[1, 1].legend()
    axes[1, 1].grid(True)

    flipped_mask = (initial_signal * optimized_signal)
    axes[2, 0].plot(t, flipped_mask, color='m')
    axes[2, 0].set_title('反転ドメインの可視化 (+1: 維持, -1: 反転)')
    axes[2, 0].set_xlabel('時間 (s)')
    axes[2, 0].set_ylabel('符号の変化')
    axes[2, 0].set_ylim(-1.2, 1.2)
    axes[2, 0].grid(True)

    def get_pulse_widths(signal):
        zero_crossings = np.where(np.diff(np.sign(signal)))[0]
        pulse_widths = np.diff(zero_crossings) / params["fs"] * 1e3
        time_points = (t[zero_crossings[:-1]] + t[zero_crossings[1:]]) / 2
        return time_points, pulse_widths

    t_initial, w_initial = get_pulse_widths(initial_signal)
    # t_optimized, w_optimized = get_pulse_widths(optimized_signal)

    # ★★★ 変更点: プロットを線から散布図（scatter）に変更 ★★★
    axes[2, 1].scatter(t_initial, w_initial, s=10, label='初期 (LFM)', alpha=0.7)
    # axes[2, 1].scatter(t_optimized, w_optimized, s=10,
    #                    label='最適化後', color='r', alpha=0.5)
    axes[2, 1].set_title('瞬時パルス幅（ゼロクロス間隔）の変化')
    axes[2, 1].set_xlabel('時間 (s)')
    axes[2, 1].set_ylabel('パルス幅 (ms)')
    axes[2, 1].legend()
    axes[2, 1].grid(True)

    plt.tight_layout(rect=[0, 0, 1, 0.96])
    plt.show()

In [None]:
import os


def load_all_signals(optimized_filepath: str, params: dict):
    """
    最適化後信号、初期信号、基準信号を読み込み・復元する。
    """
    optimized_signal = np.load(optimized_filepath)

    initial_filename = os.path.basename(optimized_filepath).replace(
        "optimized_signal_", "initial_signal_", 1
    )
    initial_filepath = os.path.join(
        os.path.dirname(optimized_filepath), initial_filename)
    initial_signal = np.load(initial_filepath)

    t = np.linspace(0, params["L"], params["n_samples"], endpoint=False)
    base_signal = params["A"] * np.sign(np.sin(2 * np.pi * params["f_c"] * t))

    return initial_signal, base_signal, optimized_signal

In [None]:
import sys

params = setup_parameters()
# initial_s_t, base_s_t, optimized_s_t = load_all_signals(
#     "optimized_signal_20250731-022737_bw5.0_amp1500.0.npy", params)
print(len(initial_s_t), len(base_s_t), len(optimized_s_t))
plot_and_analyze_results(initial_s_t, base_s_t, optimized_s_t, params)