### 前準備


In [8]:
from collections.abc import Callable
from typing import Literal, Self, Protocol

import ibis
from ibis import _ as col
import jax.numpy as jnp
from jax import random
import jaxtyping as jnpt
import numpy as np
import nptyping as npt
import pandas as pl
import plotly.express as px
import plotly.graph_objects as go

In [9]:
# Use Polars as the default ibis backend for this notebook.
ibis.set_backend("polars")
# Turn on ibis' interactive mode for displaying table expressions in cell outputs.
ibis.options.interactive = True

# 2 値分類


### テスト関数


In [10]:
class Classifier[N: int, P: int](Protocol):
    """Structural interface shared by the binary classifiers in this notebook.

    The plotting helpers only rely on the attributes and methods declared
    here. The type parameters ``N`` and ``P`` are presumably meant to
    correspond to the sample and feature dimensions named in the array shape
    annotations.
    """

    learning_rate: float  # shown in the loss-plot title
    n_epochs: int  # shown in the loss-plot title
    loss_by_epochs: list[float]  # one loss value per training epoch; plotted by plot_classifier_loss

    def fit(self, X: jnpt.Float32[jnpt.Array, "N P"], y: jnpt.Int32[jnpt.Array, "N 1"]) -> Self:
        """Train on the feature matrix ``X`` and the label column ``y``.

        The return annotation is ``Self`` so calls can be chained.
        """
        ...

    def predict(self, X: jnpt.Float32[jnpt.Array, "N P"]) -> jnpt.Int32[jnpt.Array, "N"]:
        """Return one integer class label per row of ``X``."""
        ...

In [11]:
def show_fig(fig: go.Figure, title: str) -> None:
    """Apply the notebook's standard layout to ``fig`` and display it.

    Args:
        fig: Plotly figure to display; its layout is updated in place.
        title: Title text for the figure.
    """
    # Every figure in the notebook shares the same 700x500 canvas.
    layout_options = {"title": title, "height": 500, "width": 700}
    fig.update_layout(**layout_options)
    fig.show()

In [12]:
def plot_classifier_loss[N: int, P: int](trained_classifier: Classifier[N, P]) -> None:
    misclassification_df: pl.DataFrame = ibis.memtable(
        {
            "Epochs": list(range(1, len(trained_classifier.loss_by_epochs) + 1)),
            "Loss": trained_classifier.loss_by_epochs,
        }
    ).execute()

    show_fig(
        fig=px.line(misclassification_df, x="Epochs", y="Loss", markers=True),
        title=f"{type(trained_classifier).__name__} - Learning rate {trained_classifier.learning_rate}, Epochs {trained_classifier.n_epochs}",
    )

In [13]:
def plot_decision_regions[N: int, P: int](  # TODO: Refactoring
    X: jnpt.Float[jnpt.Array, "N P"],
    y: jnpt.Int[jnpt.Array, "1 P"],
    classifier: Classifier[N, P],
    resolution: float = 0.02,
) -> None:
    x1_min, x1_max = X[:, 0].min() - 1, X[:, 0].max() + 1
    x2_min, x2_max = X[:, 1].min() - 1, X[:, 1].max() + 1
    xx1, xx2 = jnp.meshgrid(jnp.arange(x1_min, x1_max, resolution), jnp.arange(x2_min, x2_max, resolution))
    Z = classifier.predict(jnp.array([xx1.ravel(), xx2.ravel()]).T)
    Z = Z.reshape(xx1.shape)

    contour = go.Contour(x=xx1[0], y=xx2[:, 0], z=Z, showscale=False, colorscale="Viridis")

    scatter = go.Scatter(
        x=X[:, 0],
        y=X[:, 1],
        mode="markers",
        marker={"color": y, "colorscale": "Viridis", "line_width": 1},
        showlegend=False,
    )

    fig = go.Figure(data=[contour, scatter])
    fig.update_layout(
        title="Decision regions",
        xaxis_title="sepal length [cm]",
        yaxis_title="petal length [cm]",
        legend_title="Classes",
        margin={"l": 50, "r": 50, "b": 100, "t": 100, "pad": 4},
        height=500,
        width=700,
    )
    fig.show()

### テストデータ


In [14]:
# Relative path to the Iris CSV file.
iris_dataset_path = "../data/iris.csv"
iris_table = ibis.read_csv(iris_dataset_path)
# Preview the first rows as the cell output.
iris_table.head()

In [15]:
# Keep only the Setosa and Versicolor rows for the two-class problem and plot them.
two_class_rows = iris_table.filter(col.variety.isin(("Setosa", "Versicolor")))
iris_df: pl.DataFrame = two_class_rows.execute()
iris_scatter = px.scatter(iris_df, x="sepal.length", y="petal.length", color="variety")
show_fig(fig=iris_scatter, title="Iris dataset")

In [16]:
# Select the feature rows with the same variety filter that the label table uses,
# so features and labels stay aligned regardless of the row order in the CSV
# (taking the first 100 rows only works when the file is sorted by variety).
X_table = iris_table.filter(col.variety.isin(("Setosa", "Versicolor"))).select("sepal.length", "petal.length")
X_table.info()

In [17]:
# Encode the two retained varieties as integer class labels: Setosa -> 0, Versicolor -> 1.
y_table = (
    iris_table.filter(col.variety.isin(("Setosa", "Versicolor")))
    .select("variety")
    .mutate(
        variety=(
            ibis.case()
            .when(col.variety == "Setosa", 0)
            .when(col.variety == "Versicolor", 1)
            .else_(None)
            .end()
        )
    )
)
# Show how many samples fall into each class.
y_table["variety"].value_counts()

In [18]:
# Materialize the ibis tables and convert them to JAX arrays.
# Per the cell output below, X is float32 with two feature columns and y is an int8 column vector.
X = jnp.array(X_table.execute().to_numpy())
y = jnp.array(y_table.execute().to_numpy())
X[:5], y[:5]

(Array([[5.1, 1.4],
        [4.9, 1.4],
        [4.7, 1.3],
        [4.6, 1.5],
        [5. , 1.4]], dtype=float32),
 Array([[0],
        [0],
        [0],
        [0],
        [0]], dtype=int8))

In [19]:
# Standardize each feature column: subtract its mean and divide by its standard deviation.
X_std = (X - jnp.mean(X, axis=0)) / jnp.std(X, axis=0)
X_std[:5]

Array([[-0.58106565, -1.0129777 ],
       [-0.89430845, -1.0129777 ],
       [-1.2075521 , -1.0823122 ],
       [-1.3641734 , -0.94364315],
       [-0.73768705, -1.0129777 ]], dtype=float32)

## パーセプトロン


### 論理モデル


In [20]:
# A single binary value; used as the output type of the logic-gate perceptrons.
type Bit = Literal[0, 1]

In [21]:
def perceptron[N: int, P: int](
    weights: npt.NDArray[npt.Shape["1, P"], npt.Float64], bias: float
) -> Callable[[npt.NDArray[npt.Shape["N, P"], npt.Float64]], Bit]:
    def _wrapper(inputs: npt.NDArray[npt.Shape["N, P"], npt.Float64]) -> Bit:
        net_input = weights @ inputs + bias
        return 0 if net_input < 0 else 1

    return _wrapper

In [22]:
nand = perceptron(np.array((-1.0, -1.0)), 1.5)
or_ = perceptron(np.array((1.0, 1.0)), -0.5)
and_ = perceptron(np.array((1.0, 1.0)), -1.5)


def xor[N: int, P: int](inputs: npt.NDArray[npt.Shape["N, P"], npt.Float64]) -> Bit:
    return and_(np.array((nand(inputs), or_(inputs))))

In [23]:
# The four possible two-bit inputs.
zero_zero = np.array((0, 0))
zero_one = np.array((0, 1))
one_zero = np.array((1, 0))
one_one = np.array((1, 1))

# Print one truth-table row per gate, in the order NAND, OR, AND, XOR.
input_pairs = (zero_zero, zero_one, one_zero, one_one)
for gate in (nand, or_, and_, xor):
    print(*(gate(pair) for pair in input_pairs))

1 1 1 0
0 1 1 1
0 0 0 1
0 1 1 0


### 学習モデル


In [24]:
class Perceptron[N: int, P: int]:
    __slots__ = (
        "learning_rate",
        "n_epochs",
        "__prng_key",
        "__weights",
        "__bias",
        "loss_by_epochs",
    )

    def __init__(self, learning_rate: float = 0.01, n_epochs: int = 50, random_seed: int = 0) -> None:
        self.learning_rate = learning_rate
        self.n_epochs = n_epochs
        self.__prng_key = random.PRNGKey(random_seed)

    def fit(self, X: jnpt.Float32[jnpt.Array, "N P"], y: jnpt.Int32[jnpt.Array, "N 1"]) -> Self:
        n_features = X.shape[1]
        self.__initialize_parameters(n_features)
        self.loss_by_epochs: list[float] = []
        self.__train_by_epochs(X, y)
        return self

    def __initialize_parameters(self, n_features: int) -> None:
        # 重みを0で初期化してしまうと、各特徴量の重みが0または学習率倍になり学習率により各特徴量の重みの比が変化しない
        self.__weights = random.normal(self.__prng_key, shape=(n_features,))  # 小さい乱数値であれば何でも良い
        self.__bias = jnp.float32(0.0)

    def __train_by_epochs(self, X: jnpt.Float32[jnpt.Array, "N P"], y: jnpt.Int32[jnpt.Array, "N 1"]) -> None:
        for _ in range(self.n_epochs):
            self.__train_by_samples(X, y)

    def __train_by_samples(self, X: jnpt.Float32[jnpt.Array, "N P"], y: jnpt.Int32[jnpt.Array, "N 1"]) -> None:
        n_misclassifications = 0
        for features, target_label in zip(X, y, strict=True):
            label_difference = self.__calculate_label_difference(features, target_label)
            self.__update_parameters(features, label_difference)
            is_misclassified = bool(label_difference)
            n_misclassifications += int(is_misclassified)
        self.loss_by_epochs.append(n_misclassifications)

    def __calculate_label_difference(
        self, features: jnpt.Float32[jnpt.Array, "P"], target_label: jnpt.Int32[jnpt.Array, "1"]
    ) -> Literal[-1, 0, 1]:
        target_label_val = target_label.item()
        pred_label = self.predict(features)
        pred_label_val = pred_label.item()
        return target_label_val - pred_label_val

    def predict(self, X: jnpt.Float32[jnpt.Array, "N P"]) -> jnpt.Int32[jnpt.Array, "N"]:
        return jnp.where(self.__net_input(X) < 0, 0, 1)  # printすると整数値が出力されるが実際にはjnp.ndarrayを返す

    def __net_input(self, X: jnpt.Float32[jnpt.Array, "N P"]) -> jnpt.Float32[jnpt.Array, "N 1"]:
        return X @ self.__weights + self.__bias

    def __update_parameters(
        self, features: jnpt.Float32[jnpt.Array, "P"], label_difference: Literal[-1, 0, 1]
    ) -> None:
        parameter_update = self.learning_rate * label_difference
        self.__weights += parameter_update * features
        self.__bias += parameter_update

### テスト


In [25]:
# Train a perceptron on the raw (unstandardized) features, then plot the
# misclassification count per epoch and the resulting decision regions.
ppn: Perceptron[Literal[100], Literal[2]] = Perceptron(learning_rate=0.1, n_epochs=10)
trained_ppn = ppn.fit(X, y)
plot_classifier_loss(trained_ppn)
plot_decision_regions(X, y, trained_ppn)

## ADALINE (ADAptive LInear NEuron)


### 学習モデル


In [26]:
class AdalineGD[N: int, P: int]:
    __slots__ = (
        "learning_rate",
        "n_epochs",
        "__prng_key",
        "__weights",
        "__bias",
        "loss_by_epochs",
    )

    def __init__(self, learning_rate: float = 0.01, n_epochs: int = 50, random_seed: int = 0) -> None:
        self.learning_rate = learning_rate
        self.n_epochs = n_epochs
        self.__prng_key = random.PRNGKey(random_seed)

    def fit(self, X: jnpt.Float32[jnpt.Array, "N P"], y: jnpt.Int32[jnpt.Array, "N 1"]) -> Self:
        n_features = X.shape[1]
        self.__initialize_parameters(n_features)
        self.loss_by_epochs: list[float] = []
        self.__train_by_epochs(X, y)
        return self

    def __initialize_parameters(self, n_features: int) -> None:
        # 重みを0で初期化してしまうと、各特徴量の重みが0または学習率倍になり学習率により各特徴量の重みの比が変化しない
        self.__weights: jnpt.Float32[jnpt.Array, "P"] = random.normal(
            self.__prng_key, shape=(n_features,)
        )  # 小さい乱数値であれば何でも良い
        self.__bias = jnp.float32(0.0)

    def __train_by_epochs(self, X: jnpt.Float32[jnpt.Array, "N P"], y: jnpt.Int32[jnpt.Array, "N 1"]) -> None:
        for _ in range(self.n_epochs):
            errors = self.__calculate_errors(X, y)
            self.__update_parameters(X, errors)
            n_samples = X.shape[0]
            self.loss_by_epochs.append(self.__calculate_mse(errors, n_samples))

    def __calculate_errors(
        self, X: jnpt.Float32[jnpt.Array, "N P"], y: jnpt.Int32[jnpt.Array, "N 1"]
    ) -> jnpt.Float32[jnpt.Array, "N"]:
        return y.flatten() - self.__activation(self.__net_input(X))

    def __activation(self, net_input: jnpt.Float32[jnpt.Array, "N 1"]) -> jnpt.Float32[jnpt.Array, "N"]:
        return net_input

    def __net_input(self, X: jnpt.Float32[jnpt.Array, "N P"]) -> jnpt.Float32[jnpt.Array, "N"]:
        return X @ self.__weights + self.__bias

    def __update_parameters(self, X: jnpt.Float32[jnpt.Array, "N P"], errors: jnpt.Float32[jnpt.Array, "N 1"]) -> None:
        n_samples = X.shape[0]
        self.__weights += 2.0 * self.learning_rate * X.T @ errors / n_samples
        self.__bias += 2.0 * self.learning_rate * errors.mean()

    def __calculate_mse(self, errors: jnpt.Float32[jnpt.Array, "N 1"], n_samples: int) -> float:
        mse_array = (errors**2).sum() / n_samples
        return mse_array.item()

    def predict(self, X: jnpt.Float32[jnpt.Array, "N P"]) -> jnpt.Int32[jnpt.Array, "N"]:
        return jnp.where(
            (self.__activation(self.__net_input(X)) >= 0.5), 1, 0
        )  # printすると整数値が出力されるが実際にはjnp.ndarrayを返す

In [27]:
class AdalineSGD[N: int, P: int]:
    __slots__ = (
        "learning_rate",
        "n_epochs",
        "__prng_key",
        "__is_weights_initialized",
        "__enabled_shuffling",
        "__weights",
        "__bias",
        "loss_by_epochs",
    )

    def __init__(
        self, learning_rate: float = 0.01, n_epochs: int = 50, random_seed: int = 0, shuffle: bool = True
    ) -> None:
        self.learning_rate = learning_rate
        self.n_epochs = n_epochs
        self.__prng_key = random.PRNGKey(random_seed)
        self.__is_weights_initialized = False
        self.__enabled_shuffling = shuffle

    def fit(self, X: jnpt.Float32[jnpt.Array, "N P"], y: jnpt.Int32[jnpt.Array, "N 1"]) -> Self:
        self.__initialize_parameters(n_features=X.shape[1])
        self.loss_by_epochs: list[float] = []
        self.__train_by_epochs(X, y)
        return self

    def __initialize_parameters(self, n_features: int) -> None:
        # 重みを0で初期化してしまうと、各特徴量の重みが0または学習率倍になり学習率により各特徴量の重みの比が変化しない
        self.__weights: jnpt.Float32[jnpt.Array, "P"] = random.normal(
            self.__prng_key, shape=(n_features,)
        )  # 小さい乱数値であれば何でも良い
        self.__bias = jnp.float32(0.0)
        self.__is_weights_initialized = True

    def __train_by_epochs(self, X: jnpt.Float32[jnpt.Array, "N P"], y: jnpt.Int32[jnpt.Array, "N 1"]) -> None:
        for _ in range(self.n_epochs):
            self.__train_by_samples(X, y)

    def __train_by_samples(self, X: jnpt.Float32[jnpt.Array, "N P"], y: jnpt.Int32[jnpt.Array, "N 1"]) -> None:
        if self.__enabled_shuffling:
            X, y = self.__shuffle(X, y)
        losses: list[float] = []
        for features, target_label in zip(X, y, strict=True):
            error = self.__calculate_error(features, target_label)
            self.__update_parameters(features, error)
            losses.append(self.__calculate_loss(error))
        self.loss_by_epochs.append(self.__calculate_avg_loss(losses))

    def __shuffle(
        self, X: jnpt.Float32[jnpt.Array, "N P"], y: jnpt.Int32[jnpt.Array, "N 1"]
    ) -> tuple[jnpt.Float32[jnpt.Array, "N P"], jnpt.Int32[jnpt.Array, "N 1"]]:
        n_samples = X.shape[0]
        shuffled_indices = random.permutation(self.__prng_key, n_samples)
        return X[shuffled_indices], y[shuffled_indices]

    def __calculate_error(
        self, features: jnpt.Float32[jnpt.Array, "P"], target_label: jnpt.Int32[jnpt.Array, "1"]
    ) -> float:
        return (target_label - self.__activation(self.__net_input(features))).item()

    def __activation(self, net_input: jnpt.Float32[jnpt.Array, "1"]) -> jnpt.Float32[jnpt.Array, "1"]:
        return net_input

    def __net_input(self, features: jnpt.Float32[jnpt.Array, "P"]) -> jnpt.Float32[jnpt.Array, "1"]:
        return features @ self.__weights + self.__bias

    def __update_parameters(self, features: jnpt.Float32[jnpt.Array, "P"], error: float) -> None:
        self.__weights += 2.0 * self.learning_rate * error * features
        self.__bias += 2.0 * self.learning_rate * error

    def __calculate_loss(self, error: float) -> float:
        return error**2

    def __calculate_avg_loss(self, losses: list[float]) -> float:
        return jnp.array(losses).mean().item()

    def fit_more(self, X: jnpt.Float32[jnpt.Array, "N P"], y: jnpt.Int32[jnpt.Array, "N 1"]) -> Self:
        if not self.__is_weights_initialized:
            self.__initialize_parameters(n_features=X.shape[1])
        for features, target_label in zip(X, y, strict=True):
            self.__update_parameters(features, self.__calculate_error(features, target_label))
        return self

    def predict(self, X: jnpt.Float32[jnpt.Array, "N P"]) -> jnpt.Int32[jnpt.Array, "N"]:
        return jnp.where(
            (self.__activation(self.__net_input(X)) >= 0.5), 1, 0
        )  # printすると整数値が出力されるが実際にはjnp.ndarrayを返す

### テスト


In [28]:
# Compare the loss curves of two learning rates (0.1 vs 0.0001) on the raw, unstandardized features.
adaline1: AdalineGD[Literal[100], Literal[2]] = AdalineGD(learning_rate=0.1, n_epochs=15).fit(X, y)
plot_classifier_loss(adaline1)
adaline2: AdalineGD[Literal[100], Literal[2]] = AdalineGD(learning_rate=0.0001, n_epochs=15).fit(X, y)
plot_classifier_loss(adaline2)

In [29]:
# Train batch-gradient-descent ADALINE on the standardized features and plot its loss and decision regions.
ada_gd: AdalineGD[Literal[100], Literal[2]] = AdalineGD(learning_rate=0.1, n_epochs=20).fit(X_std, y)
plot_classifier_loss(ada_gd)
plot_decision_regions(X_std, y, ada_gd)

In [30]:
# Train stochastic-gradient-descent ADALINE on the standardized features and plot its loss and decision regions.
ada_sgd: AdalineSGD[Literal[100], Literal[2]] = AdalineSGD(learning_rate=0.1, n_epochs=20).fit(X_std, y)
plot_classifier_loss(ada_sgd)
plot_decision_regions(X_std, y, ada_sgd)