# ナッシュ均衡ベースのニューラルネットワークPruning

このノートブックでは、ナッシュ均衡の概念を用いたニューラルネットワークのPruning（枝刈り）を実装します。

## 概要
- **参加度変数（Participation Variable）** `s ∈ [0,1]` を用いて各フィルタの重要度を学習
- **L1正則化**と**L2正則化**を組み合わせてスパース性を促進
- ResNet50V2アーキテクチャに適用してCIFAR-10データセットで評価

## 環境設定
TensorFlowのGPUメモリ管理を最適化するための環境変数を設定します。

In [None]:
import os
os.environ['TF_GPU_ALLOCATOR'] = 'cuda_malloc_async'
os.environ['TF_FORCE_GPU_ALLOW_GROWTH'] = 'true'

## ライブラリのインポート

必要なライブラリをインポートします：
- `tensorflow`: 深層学習フレームワーク
- `numpy`, `matplotlib`: 数値計算と可視化
- `MinMaxNorm`: 参加度変数の制約（0-1の範囲に制限）

In [None]:
import tensorflow as tf
from tensorflow.keras import layers, models, mixed_precision
import numpy as np
import matplotlib.pyplot as plt
from tensorflow.keras.constraints import MinMaxNorm
from tensorflow.python.framework.convert_to_constants import convert_variables_to_constants_v2_as_graph

## GPU設定とMixed Precision

- **GPUメモリ動的確保**: メモリスタベーションを防ぐため、必要に応じてメモリを割り当て
- **Mixed Precision (混合精度)**: `float16`と`float32`を組み合わせて計算速度を向上させつつ、数値安定性を維持

## FLOPs計算関数

モデルの計算量（FLOPs: Floating Point Operations）を測定する関数です。

FLOPsはモデルの推論速度の指標となり、Pruningの効果を評価する際に重要です。

In [None]:
# GPUメモリ動的確保（スタベーション対策）
gpus = tf.config.list_physical_devices('GPU')
if gpus:
    try:
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
    except RuntimeError as e:
        print(e)
        
# 高速化設定 (Mixed Precision)
mixed_precision.set_global_policy('mixed_float16')
print("Mixed Precision Policy:", mixed_precision.global_policy())

print("TensorFlow Version:", tf.__version__)

## データセット準備

CIFAR-10データセットを読み込み、前処理を行います：

- **データ拡張**: リサイズ（32×32→64×64）、左右反転、明るさ・コントラスト調整
- **正規化**: ピクセル値を0-1の範囲に正規化
- **バッチ処理**: `tf.data`パイプラインを使用して効率的にデータを読み込み

## ParticipatingConv2D層の実装

### 参加度変数（Participation Variable）`s`について

各フィルタに**参加度変数** `s ∈ [0,1]` を導入します：
- `s = 1`: フィルタが完全に参加（通常の畳み込み）
- `s = 0`: フィルタが非参加（実質的にpruning）
- `0 < s < 1`: フィルタの出力がスケールされる

### 正則化項

損失関数に以下の2つのペナルティ項を追加します：

#### L2正則化項
$$\beta \sum_{i} \|W_i\|^2_2 \cdot s_i^2$$

- `β`: L2ペナルティ係数
- `W_i`: フィルタ`i`の重み
- `s_i`: フィルタ`i`の参加度
- 参加度が大きいフィルタほど、重みのL2ノルムにペナルティがかかる

#### L1正則化項（スパース性促進）
$$\gamma \sum_{i} |s_i|$$

- `γ`: L1ペナルティ係数
- 参加度を直接スパース化することで、不要なフィルタを`0`に近づける

### 損失関数の全体

$$\mathcal{L} = \mathcal{L}_{task} + \beta \sum_{i} \|W_i\|^2_2 \cdot s_i^2 + \gamma \sum_{i} |s_i|$$

ここで、`L_task`はタスク損失（分類の場合はクロスエントロピー）です。

In [None]:
# 2. FLOPs計算関数 

def calculate_flops(model):
    input_signature = [tf.TensorSpec(shape=(1,) + model.input_shape[1:], dtype=tf.float32)]
    full_model = tf.function(lambda x: model(x))
    concrete_func = full_model.get_concrete_function(input_signature)
    frozen_func, _ = convert_variables_to_constants_v2_as_graph(concrete_func)
    run_meta = tf.compat.v1.RunMetadata()
    opts = tf.compat.v1.profiler.ProfileOptionBuilder.float_operation()
    flops = tf.compat.v1.profiler.profile(
        graph=frozen_func.graph,
        run_meta=run_meta, 
        cmd='op', 
        options=opts
    )
    return flops.total_float_ops

## 実装コード

以下のセルで、ParticipatingConv2D層、ResNet50V2アーキテクチャ、Pruning統計コールバックを実装します。

### 実装の構成
1. **ParticipatingConv2D層**: 参加度変数付きの畳み込み層
2. **ResNet50V2アーキテクチャ**: Bottleneck Blockとモデル構築関数
3. **Pruning統計コールバック**: 学習中のPruning統計を記録

In [None]:
# 3. データセット準備 (CPU負荷分散版)
BATCH_SIZE = 16
(X_train, y_train), (X_test, y_test) = tf.keras.datasets.cifar10.load_data()

# 正規化
X_train = X_train.astype('float32') / 255.0
X_test = X_test.astype('float32') / 255.0

# データ拡張関数 (tf.imageを使用)
def augment(image, label):
    image = tf.image.resize(image, [64, 64])  # 32x32 → 64x64
    image = tf.image.random_flip_left_right(image)  # これも入れた方が良い
    image = tf.image.random_brightness(image, max_delta=0.1)
    image = tf.image.random_contrast(image, lower=0.9, upper=1.1)
    return image, label

# パイプライン構築
train_ds = tf.data.Dataset.from_tensor_slices((X_train, y_train)) \
    .shuffle(5000) \
    .map(augment, num_parallel_calls=tf.data.AUTOTUNE) \
    .batch(BATCH_SIZE) \
    .prefetch(tf.data.AUTOTUNE)

# テストデータもリサイズが必要
def resize_only(image, label):
    image = tf.image.resize(image, [64, 64])
    return image, label

test_ds = tf.data.Dataset.from_tensor_slices((X_test, y_test)) \
    .map(resize_only, num_parallel_calls=tf.data.AUTOTUNE) \
    .batch(BATCH_SIZE) \
    .prefetch(tf.data.AUTOTUNE)

In [None]:

import tensorflow as tf
from tensorflow.keras import layers, models
import numpy as np

# Participating Conv2D Layer (参加度変数付き畳み込み層)

class ParticipatingConv2D(layers.Layer):
    """
    ナッシュ均衡pruning用の畳み込み層
    各フィルタに参加度変数 s ∈ [0,1] を持つ
    """
    def __init__(self, filters, kernel_size, strides=1, padding='same', 
                 beta=0.05, gamma=0.05, use_bias=False, **kwargs):
        super().__init__(**kwargs)
        self.filters = filters
        self.kernel_size = kernel_size if isinstance(kernel_size, tuple) else (kernel_size, kernel_size)
        self.strides = strides
        self.padding = padding.upper()
        self.use_bias = use_bias
        
        # ペナルティ係数
        self.beta = beta   # L2 penalty
        self.gamma = gamma # L1 penalty (sparsity)
        
    def build(self, input_shape):
        # 畳み込みカーネル
        self.kernel = self.add_weight(
            name='kernel',
            shape=(*self.kernel_size, input_shape[-1], self.filters),
            initializer='glorot_uniform',
            trainable=True
        )
        
        if self.use_bias:
            self.bias = self.add_weight(
                name='bias',
                shape=(self.filters,),
                initializer='zeros',
                trainable=True
            )
        
        # 参加度変数 (各フィルタに1つ)
        self.participation = self.add_weight(
        name='participation',
        shape=(self.filters,),
        initializer='ones',
        trainable=True,
        constraint=MinMaxNorm(min_value=0.0, max_value=1.0)
        )

        
    def call(self, inputs, training=None):
        # 畳み込み実行
        if self.padding == 'SAME':
            y = tf.nn.conv2d(inputs, self.kernel, strides=self.strides, padding='SAME')
        else:
            y = tf.nn.conv2d(inputs, self.kernel, strides=self.strides, padding='VALID')
        
        if self.use_bias:
            y = tf.nn.bias_add(y, self.bias)
        
        # 参加度を掛ける (各フィルタの出力をスケール)
        y = y * self.participation
        
        # 訓練時のみペナルティを追加
        if training:
            l2_per_filter = tf.reduce_sum(
                tf.square(self.kernel), axis=[0, 1, 2]
            )

            l2_penalty = self.beta * tf.reduce_sum(
                l2_per_filter * tf.square(self.participation)
            )

            l1_penalty = self.gamma * tf.reduce_sum(
                tf.abs(self.participation)
            )

            # ★ ここが重要
            self.add_loss(tf.cast(l2_penalty, tf.float32))
            self.add_loss(tf.cast(l1_penalty, tf.float32))

        
        return y
    
    def get_active_filters(self, threshold=0.01):
        """参加度がthreshold以上のフィルタ数を返す"""
        return tf.reduce_sum(tf.cast(self.participation > threshold, tf.int32)).numpy()
    
    def get_sparsity(self, threshold=0.01):
        """pruningされたフィルタの割合"""
        active = self.get_active_filters(threshold)
        return 1.0 - (active / self.filters)

### ResNet50V2アーキテクチャの構築

`bottleneck_block_participating`関数と`build_resnet50_v2_participating`関数を定義します。

これらの関数は、ParticipatingConv2D層を使用してResNet50V2アーキテクチャを構築します。

In [None]:
# ResNet50V2 with Participating Convolutions

def bottleneck_block_participating(x, filters, stride=1, beta=0.05, gamma=0.05):
    """
    Participating Conv2D を使った Bottleneck Block
    """
    shortcut = x
    
    # Pre-activation
    pre_act = layers.BatchNormalization()(x)
    pre_act = layers.Activation("relu")(pre_act)
    
    # Shortcut調整
    if stride > 1 or x.shape[-1] != filters * 4:
        # ショートカットは通常のConvでOK
        shortcut = layers.Conv2D(filters * 4, 1, strides=stride, use_bias=False)(pre_act)
    
    # Main path with participating convolutions
    # 1x1 Conv (圧縮)
    m = ParticipatingConv2D(filters, 1, beta=beta, gamma=gamma)(pre_act)
    m = layers.BatchNormalization()(m)
    m = layers.Activation("relu")(m)
    
    # 3x3 Conv (特徴抽出) - ここが重要
    m = layers.ZeroPadding2D(padding=1)(m)
    m = ParticipatingConv2D(filters, 3, strides=stride, padding='valid', 
                           beta=beta, gamma=gamma)(m)
    m = layers.BatchNormalization()(m)
    m = layers.Activation("relu")(m)
    
    # 1x1 Conv (復元)
    m = ParticipatingConv2D(filters * 4, 1, beta=beta, gamma=gamma)(m)
    
    return layers.Add()([shortcut, m])


def build_resnet50_v2_participating(input_shape=(48, 48, 3), classes=10, 
                                   beta=0.05, gamma=0.05):
    """
    Nash equilibrium pruning用のResNet50V2
    """
    inputs = layers.Input(input_shape)
    
    # Stem (最初は通常のConv)
    x = layers.Conv2D(64, 3, strides=1, padding="same", use_bias=False)(inputs)
    
    # Stage 1
    for _ in range(3):
        x = bottleneck_block_participating(x, 64, stride=1, beta=beta, gamma=gamma)
    
    # Stage 2
    x = bottleneck_block_participating(x, 128, stride=2, beta=beta, gamma=gamma)
    for _ in range(3):
        x = bottleneck_block_participating(x, 128, stride=1, beta=beta, gamma=gamma)
    
    # Stage 3
    x = bottleneck_block_participating(x, 256, stride=2, beta=beta, gamma=gamma)
    for _ in range(5):
        x = bottleneck_block_participating(x, 256, stride=1, beta=beta, gamma=gamma)
    
    # Stage 4
    x = bottleneck_block_participating(x, 512, stride=2, beta=beta, gamma=gamma)
    for _ in range(2):
        x = bottleneck_block_participating(x, 512, stride=1, beta=beta, gamma=gamma)
    
    # Head
    x = layers.BatchNormalization()(x)
    x = layers.Activation("relu")(x)
    x = layers.GlobalAveragePooling2D()(x)
    outputs = layers.Dense(classes, activation="softmax", dtype='float32')(x)
    
    return models.Model(inputs, outputs, name="ResNet50V2_Nash_Pruning")

### Pruning統計コールバックの実装

`PruningStatsCallback`クラスを定義します。このコールバックは各エポック後に参加度統計を記録し、Pruningの進行状況を監視します。

In [None]:
# Pruning Statistics Callback

class PruningStatsCallback(tf.keras.callbacks.Callback):
    """
    各エポック後に参加度統計を記録するコールバック
    """
    def __init__(self, threshold=0.01):
        super().__init__()
        self.threshold = threshold
        self.history = {
            'active_filters': [],
            'sparsity': [],
            'mean_participation': []
        }
    
    def on_epoch_end(self, epoch, logs=None):
        total_filters = 0
        active_filters = 0
        participation_sum = 0.0
        
        for layer in self.model.layers:
            if isinstance(layer, ParticipatingConv2D):
                total_filters += layer.filters
                active_filters += layer.get_active_filters(self.threshold)
                participation_sum += tf.reduce_sum(layer.participation).numpy()
        
        sparsity = 1.0 - (active_filters / total_filters) if total_filters > 0 else 0.0
        mean_participation = participation_sum / total_filters if total_filters > 0 else 0.0
        
        self.history['active_filters'].append(active_filters)
        self.history['sparsity'].append(sparsity)
        self.history['mean_participation'].append(mean_participation)
        
        print(f"\n[Pruning Stats] Active: {active_filters}/{total_filters} "
              f"({(1-sparsity)*100:.1f}%), Mean s: {mean_participation:.4f}")

## モデル構築とFLOPs測定

Baselineモデルを構築し、計算量（FLOPs）とパラメータ数を測定します。

この時点では`beta`と`gamma`を小さく設定（`1e-5`）して、ほぼ通常のモデルとして動作させます。

In [None]:
# 5. モデル構築とFLOPs測定
model = build_resnet50_v2_participating(
    input_shape=(64, 64, 3),
    classes=10,
    beta=1e-5,
    gamma=1e-5
)


print("\n" + "="*40)
print("【Baseline (Normal) モデル計算量】")
print("="*40)
try:
    flops_val = calculate_flops(model)
    params = model.count_params()
    print(f"パラメータ数: {params:,}")
    print(f"FLOPs: {flops_val / 10**9:.4f} G (ギガ)")
except Exception as e:
    print(f"FLOPs計算エラー: {e}")
print("="*40 + "\n")

## モデルの学習

モデルをコンパイルし、訓練を実行します。

### 設定
- **オプティマイザ**: Adam
- **損失関数**: Sparse Categorical Crossentropy
- **コールバック**:
  - `PruningStatsCallback`: 各エポック後にPruning統計を記録
  - `EarlyStopping`: 検証損失が改善しない場合に訓練を早期終了（patience=15）

### 注意
`add_loss()`メソッドにより、L1/L2正則化項が自動的に損失関数に追加されます。

In [None]:
# 6. 学習実行
pruning_stats = PruningStatsCallback(threshold=0.01)
# 1. compile
model.compile(
    optimizer='adam',
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy']
)

# 2. callbacks
pruning_stats = PruningStatsCallback(threshold=0.01)
early_stop = tf.keras.callbacks.EarlyStopping(
    monitor="val_loss",
    patience=15,
    restore_best_weights=True
)

# 3. fit（1回だけ）
history = model.fit(
    train_ds,
    validation_data=test_ds,
    epochs=50,
    callbacks=[early_stop, pruning_stats]
)


## 学習結果の確認

学習履歴から基本的な統計情報を表示します。

In [None]:
# 7. 結果の可視化（個別保存版）
acc = history.history['accuracy']
val_acc = history.history['val_accuracy']
loss = history.history['loss']
val_loss = history.history['val_loss']
epochs_range = range(len(acc))

print(f"\n{'='*50}")
print(f"学習終了: 全{len(acc)}エポック (0-{len(acc)-1})")
print(f"最終 Val Accuracy: {val_acc[-1]:.4f}")
print(f"最終 Val Loss: {val_loss[-1]:.4f}")
print(f"{'='*50}\n")

## Accuracyの可視化

訓練データと検証データのAccuracyを時系列でプロットします。

In [None]:
# Accuracyグラフ
plt.figure(figsize=(8, 5))
plt.plot(epochs_range, acc, label='Training Accuracy')
plt.plot(epochs_range, val_acc, label='Validation Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.title(f'Baseline Accuracy (Final: {val_acc[-1]:.4f})')
plt.legend(loc='lower right')
plt.grid(True, alpha=0.3)
plt.xlim(0, len(acc))
plt.xticks(range(0, len(acc), 2))
plt.tight_layout()
plt.savefig('baseline_accuracy.png', dpi=150, bbox_inches='tight')
plt.show()


## Lossの可視化

訓練データと検証データのLossを時系列でプロットします。

In [None]:
# Lossグラフ
plt.figure(figsize=(8, 5))
plt.plot(epochs_range, loss, label='Training Loss')
plt.plot(epochs_range, val_loss, label='Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title(f'Baseline Loss (Final: {val_loss[-1]:.4f})')
plt.legend(loc='upper right')
plt.grid(True, alpha=0.3)
plt.xlim(0, len(acc))
plt.xticks(range(0, len(acc), 2))
plt.tight_layout()
plt.savefig('baseline_loss.png', dpi=150, bbox_inches='tight')
plt.show()

## モデルの保存

学習済みモデルと可視化結果を保存します。

- **モデルファイル**: `resnet50v2_baseline.h5`
- **グラフ**: `baseline_accuracy.png`, `baseline_loss.png`

これらのファイルは後の比較実験で使用されます。

In [None]:
# 保存（後の比較用）
model.save("resnet50v2_baseline.h5")
print("\n保存完了:")
print("  - baseline_accuracy.png")
print("  - baseline_loss.png")
print("  - resnet50v2_baseline.h5")