# Lecture 3 Homework

## Assignment

Using what you learned in this lesson, classify the fashion version of MNIST (Fashion MNIST, 10 classes) with a multilayer perceptron.

For details on Fashion MNIST, see the link below.

Fashion MNIST: https://github.com/zalandoresearch/fashion-mnist

### Target

Accuracy 85%

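### Rules

- The training data is provided as `x_train` and `t_train`, and the test data as `x_test`.
- Express predicted labels as class labels from 0 to 9, not as one-hot vectors.
- **Do not use any training data other than the `x_train` and `t_train` specified in the cell below.**
- **Implement the multilayer perceptron algorithm itself with NumPy only, using the Lecture 3 exercises as a reference.** (Do not use sklearn, tensorflow, or similar libraries.)
    - Using sklearn functions in the data preprocessing step (for example, `sklearn.model_selection.train_test_split`) is fine.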
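
As a small illustration of the label rule above, the sketch below converts network outputs (one row of class probabilities per sample) into integer class labels with `argmax`. The array `y_prob` is made-up example data, not output from the assignment model.

```python
import numpy as np

# Hypothetical softmax outputs for 3 samples and 10 classes (illustrative values only).
y_prob = np.full((3, 10), 0.05)
y_prob[0, 2] = 0.55
y_prob[1, 9] = 0.55
y_prob[2, 0] = 0.55

# argmax over the class axis yields labels in 0-9 rather than one-hot vectors.
labels = y_prob.argmax(axis=1)
print(labels)  # [2 9 0]
```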

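### Submission

- Submit two files.
    1. Save your predicted labels for the test data (`x_test`) as `submission_pred.csv` and submit it by **selecting "第3回 ニューラルネットワーク基礎" (Lecture 3: Neural Network Basics) in the Omnicampus homework tab**.
    2. Save the corresponding Python code as `submission_code.py` and submit it by **selecting "第3回 ニューラルネットワーク基礎 (code)" in the Omnicampus homework tab**. Do not upload the Python file itself; copy and paste the code into the "提出内容" (submission content) field.

- Scoring uses item 1 only; item 2 is used to check your code (code from top-scoring submissions may be made public). If you change your code, **resubmit both 1 and 2**.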

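### Evaluation

- Submissions are evaluated by the accuracy of the predicted labels against `t_test`.
- Submissions are scored immediately and the Leader Board is updated (the scoring schedule will be announced separately).
- The score at the deadline is used as the final evaluation.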
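
The accuracy metric above can be sketched in NumPy as the fraction of predicted labels that match the true labels. The helper name `accuracy` and the label arrays are illustrative assumptions; the actual grading code is not shown in this notebook.

```python
import numpy as np

def accuracy(t_true, t_pred):
    """Fraction of predictions that match the true labels."""
    t_true = np.asarray(t_true)
    t_pred = np.asarray(t_pred)
    return float((t_true == t_pred).mean())

# Made-up labels for illustration only.
t_true = np.array([0, 1, 2, 3, 4])
t_pred = np.array([0, 1, 2, 0, 4])
print(accuracy(t_true, t_pred))  # 0.8
```

The same quantity can be computed on the validation split during training to estimate whether the 85% target is within reach.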

### Mounting Google Drive

In [None]:
from google.colab import drive
drive.mount('/content/drive')

### Loading the data (do not modify this cell)

In [None]:
import os
import numpy as np
import pandas as pd
from sklearn.utils import shuffle
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
import inspect


# Training data
x_train = np.load('drive/MyDrive/Colab Notebooks/DLBasics2025_colab/Lecture03/data/x_train.npy')
t_train = np.load('drive/MyDrive/Colab Notebooks/DLBasics2025_colab/Lecture03/data/y_train.npy')

# Test data
x_test = np.load('drive/MyDrive/Colab Notebooks/DLBasics2025_colab/Lecture03/data/x_test.npy')

# Preprocessing (normalization, one-hot encoding)
x_train, x_test = x_train / 255., x_test / 255.
x_train, x_test = x_train.reshape(x_train.shape[0], -1), x_test.reshape(x_test.shape[0], -1)
t_train = np.eye(N=10)[t_train.astype("int32").flatten()]

In [1]:
import os
import numpy as np
import pandas as pd
from sklearn.utils import shuffle
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
import inspect


# Training data
x_train = np.load('x_train.npy')
t_train = np.load('y_train.npy')

# Test data
x_test = np.load('x_test.npy')

# Preprocessing (normalization, one-hot encoding)
x_train, x_test = x_train / 255., x_test / 255.
x_train, x_test = x_train.reshape(x_train.shape[0], -1), x_test.reshape(x_test.shape[0], -1)
t_train = np.eye(N=10)[t_train.astype("int32").flatten()]
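
The preprocessing lines above flatten each image into a vector and turn integer labels into one-hot rows by indexing an identity matrix. A minimal sketch on toy arrays follows; the shapes `(3, 28, 28)` and `(3, 1)` are assumptions for illustration, not a statement about the contents of the `.npy` files.

```python
import numpy as np

# Toy stand-ins for images and labels (assumed shapes, illustrative values).
x = np.zeros((3, 28, 28))
t = np.array([[3], [0], [9]])

# Flatten every image to a single row of 28 * 28 = 784 features.
x_flat = x.reshape(x.shape[0], -1)

# Row i of the 10x10 identity matrix is the one-hot vector for class i,
# so indexing with the flattened labels builds the one-hot matrix.
t_onehot = np.eye(10)[t.astype("int32").flatten()]

print(x_flat.shape)             # (3, 784)
print(t_onehot.shape)           # (3, 10)
print(t_onehot.argmax(axis=1))  # [3 0 9]
```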

### Implementing the multilayer perceptron

In [2]:
# Split the data into training and validation sets
x_train, x_val, t_train, t_val =\
    train_test_split(x_train, t_train, test_size=10000)
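
In the cell above, an integer `test_size` tells `train_test_split` to hold out that many samples, and no `random_state` is passed, so the split differs between runs. For comparison, here is a NumPy-only sketch of a seeded hold-out split. The helper `holdout_split` and its `seed` argument are hypothetical names introduced for this example.

```python
import numpy as np

def holdout_split(x, t, n_val, seed=0):
    """Shuffle indices once and hold out n_val samples for validation."""
    rng = np.random.RandomState(seed)
    idx = rng.permutation(x.shape[0])
    val_idx, train_idx = idx[:n_val], idx[n_val:]
    return x[train_idx], x[val_idx], t[train_idx], t[val_idx]

# Toy data: 10 samples with 2 features; row i of x starts with 2 * i.
x = np.arange(20).reshape(10, 2)
t = np.arange(10)
x_tr, x_va, t_tr, t_va = holdout_split(x, t, n_val=3, seed=42)
print(x_tr.shape, x_va.shape)  # (7, 2) (3, 2)
```

Because the same index permutation is applied to both arrays, every input row stays paired with its label.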

In [3]:
def np_log(x):
    return np.log(np.clip(x, 1e-10, 1e+10))


def create_batch(data, batch_size):
    """
    Split the data into mini-batches along the first axis.

    :param data: np.ndarray, input data
    :param batch_size: int, batch size
    """
    num_batches, mod = divmod(data.shape[0], batch_size)
    batched_data = np.split(data[: batch_size * num_batches], num_batches)
    if mod:
        batched_data.append(data[batch_size * num_batches:])

    return batched_data
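
To see how `create_batch` handles a sample count that is not a multiple of the batch size, the sketch below repeats the function so the example runs on its own and applies it to a toy array.

```python
import numpy as np

def create_batch(data, batch_size):
    # Same logic as the cell above, repeated here so the example is self-contained.
    num_batches, mod = divmod(data.shape[0], batch_size)
    batched_data = np.split(data[: batch_size * num_batches], num_batches)
    if mod:
        # The leftover samples form one final, smaller batch.
        batched_data.append(data[batch_size * num_batches:])
    return batched_data

data = np.arange(20).reshape(10, 2)  # 10 samples, 2 features
batches = create_batch(data, batch_size=4)
print([b.shape for b in batches])  # [(4, 2), (4, 2), (2, 2)]
```

Note that when there are fewer samples than `batch_size`, `num_batches` is 0 and `np.split` is asked for zero sections, which raises an error; the function assumes at least one full batch.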

In [None]:
# Try changing the seed value to see what happens.
rng = np.random.RandomState(1234)
random_state = 42


# Advanced: look up activation functions not covered in this lecture and try implementing them
def relu(x):
    return np.maximum(0, x)


def deriv_relu(x):
    return (x > 0).astype(x.dtype)


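def softmax(x):
    # Subtract the row-wise max for numerical stability.
    # Use a new array (not `x -= ...`) so the caller's input is not modified in place.
    x = x - x.max(axis=1, keepdims=True)
    x_exp = np.exp(x)
    return x_exp / x_exp.sum(axis=1, keepdims=True)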


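def deriv_softmax(x):
    # Element-wise (diagonal) derivative only. Model.backward never calls this
    # for the output layer, which receives delta = y - t directly.
    s = softmax(x)
    return s * (1 - s)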


def crossentropy_loss(t, y):
    return (-np.sum(t * np_log(y), axis=1)).mean()

class Dense:
    def __init__(self, in_dim, out_dim, function, deriv_function):
        # Draw initial weights from the seeded generator `rng` so that changing the seed has an effect
        self.W = rng.uniform(low=-0.08, high=0.08, size=(in_dim, out_dim)).astype("float64")
        self.b = np.zeros(out_dim).astype("float64")
        self.function = function
        self.deriv_function = deriv_function
        
        self.x = None
        self.u = None

        self.dW = None
        self.db = None

        self.params_idxs = np.cumsum([self.W.size, self.b.size])
    
    def __call__(self, x):
        self.x = x
        self.u = np.matmul(self.x, self.W) + self.b
        h = self.function(self.u)
        return h
    
    def b_prop(self, delta, W):
        self.delta = self.deriv_function(self.u) * np.matmul(delta, W.T)
        return self.delta
    
    def compute_grad(self):
        batch_size = self.delta.shape[0]
        self.dW = np.matmul(self.x.T, self.delta) / batch_size
        self.db = np.matmul(np.ones(batch_size), self.delta) / batch_size
    
    def get_params(self):
        return np.concatenate([self.W.ravel(), self.b], axis=0)
    
    def set_params(self, params):
        _W, _b = np.split(params, self.params_idxs[:-1])
        self.W = _W.reshape(self.W.shape)
        self.b = _b

    def get_grads(self):
        return np.concatenate([self.dW.ravel(), self.db], axis=0)

class Model:
    def __init__(self, hidden_dims, activation_functions, deriv_functions):
        self.layers = []
        for i in range(len(hidden_dims) - 2):
            self.layers.append(Dense(hidden_dims[i], hidden_dims[i + 1], activation_functions[i], deriv_functions[i]))
        self.layers.append(Dense(hidden_dims[-2], hidden_dims[-1], activation_functions[-1], deriv_functions[-1]))
    
    def __call__(self, x):
        return self.forward(x)

    def forward(self, x):
        for layer in self.layers:
            x = layer(x)
        return x
    
    def backward(self, delta):
        batch_size = delta.shape[0]

        for i, layer in enumerate(self.layers[::-1]):
            if i == 0:
                layer.delta = delta
                layer.compute_grad()
            else:
                delta = layer.b_prop(delta, W)
                layer.compute_grad()
            
            W = layer.W
    
    def update(self, eps=0.01):
        for layer in self.layers:
            layer.W -= eps * layer.dW
            layer.b -= eps * layer.db
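

# Hyperparameters (train_model reads batch_size as a global;
# n_epochs is reassigned before training)
lr = 0.01
n_epochs = 100
batch_size = 32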
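
`Model.backward` starts from `delta = y - t` at the output layer instead of calling `deriv_softmax`. This works because, for softmax followed by cross-entropy with one-hot targets, the gradient of the per-sample loss with respect to the pre-activation is exactly `y - t`; the division by the batch size happens later in `compute_grad`. The sketch below checks that identity numerically with central finite differences. The random logits and targets are illustrative, and `loss_sum` is a helper introduced only for this check.

```python
import numpy as np

def softmax(x):
    x = x - x.max(axis=1, keepdims=True)  # shift for numerical stability
    e = np.exp(x)
    return e / e.sum(axis=1, keepdims=True)

def loss_sum(u, t):
    # Cross-entropy summed over the batch (not averaged), as a function of the logits u.
    return -np.sum(t * np.log(softmax(u)))

rng = np.random.RandomState(0)
u = rng.randn(4, 10)                        # illustrative logits
t = np.eye(10)[rng.randint(0, 10, size=4)]  # illustrative one-hot targets

analytic = softmax(u) - t  # the delta used for the output layer

# Central finite differences, one logit at a time.
numeric = np.zeros_like(u)
h = 1e-5
for idx in np.ndindex(*u.shape):
    u_plus, u_minus = u.copy(), u.copy()
    u_plus[idx] += h
    u_minus[idx] -= h
    numeric[idx] = (loss_sum(u_plus, t) - loss_sum(u_minus, t)) / (2 * h)

print(np.abs(analytic - numeric).max() < 1e-6)  # True
```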


### Training the model

In [29]:
def train_model(mlp, x_train, t_train, x_val, t_val, n_epochs=10):
    best_valid_cost = float('inf')
    patience = 0
    patience_threshold = 3
    improvement_threshold = 1e-4
    eps_reduction_factor = 0.8
    eps_minimum_threshold = 0.001
    eps = 0.05
    for epoch in range(n_epochs):
        losses_train = []
        losses_valid = []
        train_num = 0
        train_true_num = 0
        valid_num = 0
        valid_true_num = 0

        x_train, t_train = shuffle(x_train, t_train)
        x_train_batches, t_train_batches = create_batch(x_train, batch_size), create_batch(t_train, batch_size)

        x_val, t_val = shuffle(x_val, t_val)
        x_val_batches, t_val_batches = create_batch(x_val, batch_size), create_batch(t_val, batch_size)

        # Train the model
        for x, t in zip(x_train_batches, t_train_batches):
            # Forward pass
            y = mlp(x)

            # Compute the loss
            loss = crossentropy_loss(t, y)
            losses_train.append(loss.tolist())

            # Update the parameters
            delta = y - t
            mlp.backward(delta)
            mlp.update(eps)

            # Count correct predictions for the accuracy
            acc = accuracy_score(t.argmax(axis=1), y.argmax(axis=1), normalize=False)
            train_num += x.shape[0]
            train_true_num += acc

        # Evaluate the model
        for x, t in zip(x_val_batches, t_val_batches):
            # Forward pass
            y = mlp(x)

            # Compute the loss
            loss = crossentropy_loss(t, y)
            losses_valid.append(loss.tolist())

            # Count correct predictions for the accuracy
            acc = accuracy_score(t.argmax(axis=1), y.argmax(axis=1), normalize=False)
            valid_num += x.shape[0]
            valid_true_num += acc

        print('EPOCH: {}, Train [Loss: {:.3f}, Accuracy: {:.3f}], Valid [Loss: {:.3f}, Accuracy: {:.3f}]'.format(
            epoch,
            np.mean(losses_train),
            train_true_num/train_num,
            np.mean(losses_valid),
            valid_true_num/valid_num
        ))

        valid_cost = np.mean(losses_valid)
        if best_valid_cost - valid_cost > improvement_threshold:
            best_valid_cost = valid_cost
            patience = 0
        else:
            patience += 1

        if patience > patience_threshold and eps*eps_reduction_factor > eps_minimum_threshold:
            eps *= eps_reduction_factor
            patience = 0
            print('Learning rate reduced to {:.5f}'.format(eps))

mlp = Model(
    hidden_dims=[784, 100, 100, 10],
    activation_functions=[relu, relu, softmax],
    deriv_functions=[deriv_relu, deriv_relu, deriv_softmax]
)

n_epochs = 50
train_model(mlp, x_train, t_train, x_val, t_val, n_epochs)

EPOCH: 0, Train [Loss: 0.726, Accuracy: 0.738], Valid [Loss: 0.538, Accuracy: 0.811]
EPOCH: 1, Train [Loss: 0.458, Accuracy: 0.836], Valid [Loss: 0.567, Accuracy: 0.799]
EPOCH: 2, Train [Loss: 0.407, Accuracy: 0.852], Valid [Loss: 0.407, Accuracy: 0.854]
EPOCH: 3, Train [Loss: 0.376, Accuracy: 0.863], Valid [Loss: 0.386, Accuracy: 0.858]
EPOCH: 4, Train [Loss: 0.356, Accuracy: 0.870], Valid [Loss: 0.385, Accuracy: 0.860]
EPOCH: 5, Train [Loss: 0.339, Accuracy: 0.876], Valid [Loss: 0.342, Accuracy: 0.876]
EPOCH: 6, Train [Loss: 0.327, Accuracy: 0.879], Valid [Loss: 0.336, Accuracy: 0.877]
EPOCH: 7, Train [Loss: 0.313, Accuracy: 0.885], Valid [Loss: 0.341, Accuracy: 0.872]
EPOCH: 8, Train [Loss: 0.304, Accuracy: 0.887], Valid [Loss: 0.339, Accuracy: 0.872]
EPOCH: 9, Train [Loss: 0.293, Accuracy: 0.891], Valid [Loss: 0.350, Accuracy: 0.869]
EPOCH: 10, Train [Loss: 0.285, Accuracy: 0.894], Valid [Loss: 0.348, Accuracy: 0.875]
Learning rate reduced to 0.04000
EPOCH: 11, Train [Loss: 0.268, 
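
The learning-rate rule inside `train_model` can be replayed on its own. The sketch below copies the plateau logic into a hypothetical helper, `step_schedule`, and feeds it the validation losses for epochs 0 to 10 from the log above. These are rounded values, so it only approximates the actual run.

```python
def step_schedule(valid_costs, eps=0.05, factor=0.8, eps_min=0.001,
                  patience_threshold=3, improvement_threshold=1e-4):
    """Replay the plateau rule from train_model on a list of validation losses."""
    best = float("inf")
    patience = 0
    history = []
    for cost in valid_costs:
        # An epoch counts as an improvement only if it beats the best loss by the threshold.
        if best - cost > improvement_threshold:
            best = cost
            patience = 0
        else:
            patience += 1
        # After more than patience_threshold stalled epochs, shrink the learning rate.
        if patience > patience_threshold and eps * factor > eps_min:
            eps *= factor
            patience = 0
        history.append(eps)
    return history

# Validation losses for epochs 0-10, copied (rounded) from the log above.
costs = [0.538, 0.567, 0.407, 0.386, 0.385, 0.342, 0.336, 0.341, 0.339, 0.350, 0.348]
history = step_schedule(costs)
print(["{:.3f}".format(e) for e in history])
```

The best validation loss is reached at epoch 6, epochs 7 to 10 are four stalled epochs in a row, and the rate drops from 0.05 to about 0.04 after epoch 10, which matches the "Learning rate reduced" line in the log.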

In [30]:
t_pred = []
for x in x_test:
    # Forward pass on a single sample (add a batch dimension)
    x = x[np.newaxis, :]
    y = mlp(x)

    # Convert the model output to a scalar predicted label
    pred = y.argmax(1).tolist()

    t_pred.extend(pred)

submission = pd.Series(t_pred, name='label')
submission.to_csv('submission_pred.csv', header=True, index_label='id')
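
The loop above runs one forward pass per test sample. Since `mlp` accepts a whole array of rows, the same predictions can usually be produced in larger chunks. A sketch under that assumption follows; `predict_in_batches` is a hypothetical helper, and `toy_model` is a stand-in linear map used only so the example runs without the trained network.

```python
import numpy as np

def predict_in_batches(model, x, batch_size=1000):
    """Run the forward pass chunk by chunk and return integer class labels."""
    preds = []
    for start in range(0, x.shape[0], batch_size):
        y = model(x[start:start + batch_size])
        preds.append(y.argmax(axis=1))
    return np.concatenate(preds)

# Stand-in for the trained network: a fixed random linear map (illustration only).
rng = np.random.RandomState(0)
W = rng.randn(784, 10)

def toy_model(x):
    return x @ W

x_demo = rng.rand(2500, 784)
labels = predict_in_batches(toy_model, x_demo)
print(labels.shape)  # (2500,)
```

With the real model, `predict_in_batches(mlp, x_test)` would return the array that is then saved as the `label` column.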