In [1]:
import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.mixture import GaussianMixture
from sklearn.preprocessing import StandardScaler
from sklearn.utils.validation import check_array, check_X_y

import japanize_matplotlib  # noqa
import matplotlib.pyplot as plt
import seaborn as sns  # データ可視化ライブラリ
from lightning.pytorch import seed_everything

from sklearn.decomposition import PCA
from scipy import stats
import pandas as pd
from sklearn.cluster import KMeans
from abc import ABCMeta, abstractmethod

from numpy.typing import NDArray

from typing import Optional

# Use matplotlib's "ggplot" style for every figure drawn in this notebook.
plt.style.use("ggplot")
# Fix the random seed through Lightning's helper.
# NOTE(review): presumably this seeds the global Python/NumPy/PyTorch generators —
# confirm against the Lightning documentation.
seed_everything(8)


A module that was compiled using NumPy 1.x cannot be run in
NumPy 2.0.1 as it may crash. To support both 1.x and 2.x
versions of NumPy, modules must be compiled with NumPy 2.0.
Some module may need to rebuild instead e.g. with 'pybind11>=2.12'.

If you are a user of the module, the easiest solution will be to
downgrade to 'numpy<2' or try to upgrade the affected module.
We expect that some modules will need time to support NumPy 2.

Traceback (most recent call last):  File "<frozen runpy>", line 198, in _run_module_as_main
  File "<frozen runpy>", line 88, in _run_code
  File "c:\Users\HaruMomozu\Desktop\momozu\ABtesting\.venv\Lib\site-packages\ipykernel_launcher.py", line 18, in <module>
    app.launch_new_instance()
  File "c:\Users\HaruMomozu\Desktop\momozu\ABtesting\.venv\Lib\site-packages\traitlets\config\application.py", line 1075, in launch_instance
    app.start()
  File "c:\Users\HaruMomozu\Desktop\momozu\ABtesting\.venv\Lib\site-packages\ipykernel\kernelapp.py", line 739, in

8

# データの前処理

In [2]:
# Outlier removal
def remove_outliers_zscore(
    data: pd.DataFrame, metric: str, threshold: float = 2
) -> pd.DataFrame:
    """Drop the rows whose ``metric`` value is a z-score outlier.

    The z-score uses the mean and the population standard deviation (ddof=0)
    of the finite values of ``data[metric]``; a row is kept when
    ``|z| < threshold``.

    Args:
        data: Input frame; it is not modified.
        metric: Name of the numeric column to screen.
        threshold: Upper bound (exclusive) on the absolute z-score.

    Returns:
        A new frame holding only the kept rows, with their original index.

    Notes:
        * Rows whose ``metric`` value is NaN or infinite are dropped, but they
          no longer poison the mean/std and cause every row to be dropped.
        * When the finite values are all (numerically) identical there is no
          spread to measure, so no row is treated as an outlier instead of
          every row being dropped through NaN z-scores.
    """
    values = data[metric].to_numpy(dtype=float)
    usable = np.isfinite(values)
    if not usable.any():
        # Nothing to compute statistics from: return an empty selection.
        return data[usable]

    center = values[usable].mean()
    spread = values[usable].std()  # ddof=0

    # Zero (or round-off sized) spread: z-scores are undefined, keep all usable rows.
    if spread <= np.finfo(float).eps * max(1.0, abs(center)):
        return data[usable]

    keep = usable.copy()
    keep[usable] = np.abs(values[usable] - center) / spread < threshold
    return data[keep]

In [3]:
# Load the first dataset (NHANES age-prediction CSV).
# NOTE(review): this is an absolute, machine-specific path — the cell only runs
# on a machine where this exact file exists.
df1 = pd.read_csv(
    R"C:\Users\HaruMomozu\Documents\オンラインデータ\NHANES_age_prediction.csv"
)
# Drop the two columns that are not used below.
df1 = df1.drop(columns=["SEQN", "age_group"])

# Target column for this dataset.
obj1 = "BMXBMI"
# Feature columns. The descriptions are translated from the author's notes and
# have not been checked against the NHANES codebook.
features_list1 = [
    "RIDAGEYR",  # age (continuous)
    "RIAGENDR",  # sex (1: Male, 2: Female)
    "PAQ605",  # exercise (1: exercises regularly, 2: does not exercise)
    "LBXGLU",  # fasting blood glucose (continuous)
    "DIQ010",  # diabetes (0: no, 1: yes) — NOTE(review): confirm the coding
    "LBXGLT",  # "oral health status" (continuous) per the original note — NOTE(review): possibly an oral glucose test value; confirm
    "LBXIN",  # blood insulin level (continuous)
]
# Exclude the rows whose PAQ605 value is 7.0.
# NOTE(review): presumably a non-answer code — confirm against the codebook.
df1 = df1[df1["PAQ605"] != 7.0]

In [4]:
# df2 = pd.read_csv(
#     R"C:\Users\HaruMomozu\Documents\オンラインデータ\OnlineNewsPopularity\OnlinenewsPopularity.csv"
# )
# df2 = df2.drop(columns=["url"])
# df2 = df2.drop(columns=[" timedelta"])

# obj2 = " shares"
# features_list2 = [col for col in list(df2.columns) if col != " shares"]

In [5]:
# df3 = pd.read_csv(
#     R"C:\Users\HaruMomozu\Documents\オンラインデータ\USCensus1990.data.txt",
#     delimiter=",",
# )

# obj3 = "iFertil"
# features_list3 = [col for col in list(df3.columns) if col != obj3]
# features_list3_20 = features_list3[:20]
# print(features_list3_20)

In [6]:
# df4 = pd.read_csv(
#     R"C:\Users\HaruMomozu\Documents\aug_first_cpn_data_for_ab_test_sensibility_tsukuba.csv"
# )

# obj4 = "GMV"
# features_list4 = [
#     "hist_4_day_buy_num",
#     "hist_4_day_gmv",
#     "his_4_day_is_buy",
#     "hist_30_day_buy_days",
#     "hist_30_day_buy_num",
#     "hist_30_day_gmv",
#     "hist_30_day_buy_recency",
#     "hist_30_day_pay_days",
#     "hist_30_day_atpu",
#     "hist_30_day_gpv",
#     "hist_30_day_pay_recency",
#     "hist_30_day_list_days",
#     "hist_30_day_list_num",
#     "hist_30_day_list_recency",
#     "hist_30_day_like_count",
#     "hist_30_day_like_count_not_deleted",
#     "hist_30_day_like_recency",
# ]

In [7]:
df = df1  # dataset to analyse (switch here to use another dataset)
obj = obj1  # name of the target column
features_list = features_list1  # names of the feature columns

# Remove the target outliers and renumber the rows right away, BEFORE X and y are
# derived. Resetting the index afterwards (as was done previously) left y on the
# filtered, gappy index while X_scaled got a fresh 0..n-1 index, so any
# label-based alignment between the two would silently mismatch.
df = remove_outliers_zscore(df, obj).reset_index(drop=True)

X = df[features_list]
# Standardise the numeric feature columns.
scaler = StandardScaler()
scaled_features = scaler.fit_transform(X)
X_scaled = pd.DataFrame(scaled_features, columns=features_list, index=df.index)

y = df[obj]  # target variable, on the same index as df and X_scaled

# FSSEM でクラスタリング

Wrapperクラス

In [8]:
# NOTE: both names are assigned again in the cell that builds the Wrapper instances.
clusters = 5  # number of clusters
n_features_to_select = 7  # number of features to select

In [24]:
class Wrapper(BaseEstimator, TransformerMixin):
    """Greedy forward feature-subset selection wrapped around a clustering model.

    Features are added one at a time; each candidate subset is scored by
    fitting a clustering model on it (``CRIT``). After the search, a final
    model is fitted on the selected subset and its cluster labels are stored.

    Parameters:
        n_features_to_select: Maximum number of features to select.
        n_clusters: Number of clusters / mixture components.
        criterion: ``"tr"`` (trace of ``S_W^{-1} S_B``) or ``"ml"``
            (score reported by the fitted model).
        clustering_method: ``"em"`` (GaussianMixture) or ``"kmeans"`` (KMeans).
        random_state: Seed passed to every clustering model.

    Attributes set by ``FSS``:
        selected_features_: Column indices of the selected features, in
            selection order.
        final_model_: Model fitted on the selected columns.
        final_cluster_assignments_: Labels predicted by ``final_model_``.
    """

    _CRITERIA = ("tr", "ml")
    _CLUSTERING_METHODS = ("em", "kmeans")

    def __init__(
        self,
        n_features_to_select: int,
        n_clusters: int,
        criterion: str = "ml",
        clustering_method: str = "em",
        random_state: int = 0,
    ):
        self.n_features_to_select = n_features_to_select  # number of features
        self.n_clusters = n_clusters  # number of clusters
        self.criterion = criterion  # feature-selection criterion
        self.clustering_method = clustering_method  # clustering method
        self.random_state = random_state

    def _check_options(self) -> None:
        """Raise ``ValueError`` for an unknown clustering method or criterion."""
        if self.clustering_method not in self._CLUSTERING_METHODS:
            raise ValueError(f"Unknown clustering method: {self.clustering_method}")
        if self.criterion not in self._CRITERIA:
            raise ValueError(f"Unknown criterion: {self.criterion}")

    def _make_model(self):
        """Create an unfitted clustering model for the configured method."""
        if self.clustering_method == "em":
            return GaussianMixture(
                n_components=self.n_clusters, random_state=self.random_state
            )
        if self.clustering_method == "kmeans":
            return KMeans(n_clusters=self.n_clusters, random_state=self.random_state)
        raise ValueError(f"Unknown clustering method: {self.clustering_method}")

    def FSS(self, X: pd.DataFrame, y: pd.DataFrame) -> "Wrapper":
        """Run the forward search and fit the final clustering model.

        Args:
            X: Feature matrix (rows are samples). It is converted to an array
                by ``check_X_y``, so selected features are column positions.
            y: Target values. They are validated together with ``X`` but are
                not otherwise used by the search.

        Returns:
            ``self``, with ``selected_features_``, ``final_model_`` and
            ``final_cluster_assignments_`` set.

        Raises:
            ValueError: If ``criterion`` or ``clustering_method`` is unknown.
                This is checked before any model is fitted; previously such a
                configuration failed inside ``CRIT`` with an
                ``UnboundLocalError``.

        Notes:
            ``best_score`` is initialised once and never reset between rounds,
            so a feature is added only if it beats the best score of all
            previous rounds. When no remaining candidate does, the search stops
            and fewer than ``n_features_to_select`` features are returned.
        """
        self._check_options()
        X, y = check_X_y(X, y)

        n_features = X.shape[1]  # total number of features
        self.selected_features_ = []

        current_features = []
        remaining_features = list(range(n_features))
        best_score = -np.inf  # deliberately shared by all rounds (see Notes)

        while len(current_features) < self.n_features_to_select:
            best_feature = None

            for feature in remaining_features:
                # Score the current subset extended by this one candidate.
                score = self.CRIT(X[:, current_features + [feature]])
                if score > best_score:
                    best_score = score
                    best_feature = feature

            if best_feature is None:
                # No candidate improved on the best score so far: stop early.
                break

            current_features.append(best_feature)
            remaining_features.remove(best_feature)
            self.selected_features_ = current_features

        # Cluster on the selected feature subset.
        final_features = X[:, self.selected_features_]
        self.final_model_ = self._make_model()
        self.final_model_.fit(final_features)
        self.final_cluster_assignments_ = self.final_model_.predict(final_features)

        return self

    def CRIT(self, X: NDArray) -> float:
        """Score one candidate feature subset (larger is treated as better).

        A fresh model of the configured method is fitted on ``X``.

        * ``"em"`` + ``"ml"``: ``GaussianMixture.score(X)``.
        * ``"em"`` + ``"tr"``: trace of ``S_W^{-1} S_B`` built from the fitted
          weights, means and covariances.
        * ``"kmeans"`` + ``"ml"``: ``-KMeans.score(X)``.
        * ``"kmeans"`` + ``"tr"``: trace of ``S_W^{-1} S_B`` built from the
          fitted labels and centres.

        Args:
            X: Samples restricted to the candidate feature columns.

        Returns:
            The criterion value.

        Raises:
            ValueError: If ``criterion`` or ``clustering_method`` is unknown.
        """
        self._check_options()
        X = np.asarray(X)

        model = self._make_model()
        model.fit(X)

        if self.clustering_method == "em":
            if self.criterion == "ml":
                return model.score(X)
            return self._trace_criterion_em(model)

        if self.criterion == "ml":
            # NOTE(review): scikit-learn documents KMeans.score as the opposite
            # of the K-means objective, so this negation yields a value that
            # grows as the fit gets worse, while FSS maximises the score.
            # Kept as-is to preserve results — confirm the intended sign.
            return -model.score(X)
        return self._trace_criterion_kmeans(model, X)

    @staticmethod
    def _trace_criterion_em(em) -> float:
        """Trace of ``S_W^{-1} S_B`` from a fitted Gaussian mixture."""
        means = em.means_  # component mean vectors
        covariances = em.covariances_  # component covariance matrices
        weights = em.weights_  # mixing proportions

        # Mixture mean; np.newaxis broadcasts the weights over the feature axis.
        overall_mean = np.sum(weights[:, np.newaxis] * means, axis=0)
        centered = means - overall_mean

        S_W = np.sum(weights[:, np.newaxis, np.newaxis] * covariances, axis=0)
        S_B = np.sum(
            weights[:, np.newaxis, np.newaxis]
            * np.einsum("...i,...j->...ij", centered, centered),
            axis=0,
        )
        return np.trace(np.linalg.solve(S_W, S_B))

    def _trace_criterion_kmeans(self, kmeans, X: NDArray) -> float:
        """Trace of ``S_W^{-1} S_B`` from a fitted K-means model.

        Cluster sizes (counts) weight both the within- and between-cluster
        scatter matrices.
        """
        labels = kmeans.labels_
        cluster_centers = kmeans.cluster_centers_
        n_dims = X.shape[1]

        sw_i_list = []
        for i in range(self.n_clusters):
            cluster_points = X[labels == i]

            if cluster_points.shape[0] <= 2:
                # Clusters with at most two points get a constant 1e-7 matrix
                # instead of a covariance estimate.
                sw_i = np.zeros((n_dims, n_dims)) + 1e-7
            else:
                # atleast_2d keeps the single-feature case a 1x1 matrix.
                sw_i = np.atleast_2d(np.cov(cluster_points, rowvar=False)) * np.sum(
                    labels == i
                )
            sw_i_list.append(sw_i)

        # Within-cluster scatter: sum over all clusters.
        S_W = np.sum(sw_i_list, axis=0)

        # Between-cluster scatter.
        overall_mean = np.mean(X, axis=0)
        S_B = sum(
            np.sum(labels == i)
            * np.outer(
                cluster_centers[i] - overall_mean,
                cluster_centers[i] - overall_mean,
            )
            for i in range(self.n_clusters)
        )

        return np.trace(np.linalg.solve(S_W, S_B))

    def get_feature_index_out(self) -> NDArray:
        """Return the indices of the selected features as an array."""
        return np.array(self.selected_features_)

    def get_final_cluster_assignments(self) -> NDArray:
        """Return the cluster labels produced by the final model."""
        return self.final_cluster_assignments_

Wrapperクラス確認

In [27]:
import warnings
from sklearn.exceptions import ConvergenceWarning


clusters = 5
n_features_to_select = 5  # number of features to select

# One Wrapper per (clustering method, criterion) pair, built from a single spec
# table. Each row is (dictionary key, clustering method, criterion).
instance_dict = {
    key: Wrapper(
        n_features_to_select=n_features_to_select,
        n_clusters=clusters,
        criterion=criterion,
        clustering_method=clustering_method,
        random_state=0,
    )
    for key, clustering_method, criterion in (
        ("fssem_tr", "em", "tr"),
        ("fssem_ml", "em", "ml"),
        ("fsskmeans_tr", "kmeans", "tr"),
        ("fsskmeans_ml", "kmeans", "ml"),
    )
}

# The individual names stay available for cells that refer to them directly.
fssem_tr = instance_dict["fssem_tr"]
fssem_ml = instance_dict["fssem_ml"]
fsskmeans_tr = instance_dict["fsskmeans_tr"]
fsskmeans_ml = instance_dict["fsskmeans_ml"]

In [28]:
# Results of every Wrapper configuration, keyed by the names used in instance_dict.
selected_features_index_dict = {}  # name -> indices of the selected features
cluster_label_dict = {}  # name -> cluster label of every row
cluster_size_dict = {}  # name -> number of rows per cluster label
for name, instance in instance_dict.items():
    # Run the forward selection and the final clustering for this configuration.
    instance.FSS(X_scaled, y)
    selected_features_index = instance.get_feature_index_out()
    selected_features_index_dict[name] = selected_features_index
    cluster_label = instance.get_final_cluster_assignments()
    cluster_label_dict[name] = cluster_label
    # Second element of np.unique(..., return_counts=True): count per distinct label.
    cluster_size = np.unique(cluster_label, return_counts=True)[1]
    cluster_size_dict[name] = cluster_size

    print(selected_features_index)

[4 1]
[4 2 1]
[4 2]
[3 6 0 1 4]


# 層化抽出

In [12]:
class BaseAllocation(metaclass=ABCMeta):  # abstract base class (ABC)
    """Common interface of the sample-allocation strategies.

    Args:
        n_samples: Total sample size.
        H: Number of clusters (strata).
        random_state: Random seed; stored on the instance.
        criterion: Feature-selection criterion, ``"tr"`` or ``"ml"``.
        clustering_method: Clustering method, ``"em"`` or ``"kmeans"``.
    """

    def __init__(
        self,
        n_samples: int,
        H: int,
        random_state: int,
        criterion: str,
        clustering_method: str,
    ):
        self.n_samples = n_samples
        self.H = H
        self.random_state = random_state
        self.criterion = criterion
        self.clustering_method = clustering_method

    @abstractmethod
    def solve(self, X: NDArray, y: NDArray) -> NDArray:
        """Solve the sample allocation.

        Args:
            X (NDArray): Data (N x M)
            y (NDArray): Target variable (N)

        Raises:
            NotImplementedError: When a concrete class does not implement it.

        Returns:
            NDArray: Sample size of every cluster (H, )

        Note:
            M: number of features
            H: number of clusters
        """
        # Concrete classes must override solve.
        raise NotImplementedError

    def clustering(self, X: NDArray) -> tuple[NDArray, NDArray]:
        """Look up the precomputed clustering for this criterion/method pair.

        No clustering is run here: the labels and sizes are read from the
        module-level ``cluster_label_dict`` / ``cluster_size_dict`` under the
        key ``"fss<clustering_method>_<criterion>"``. ``X`` is accepted for
        interface compatibility but is not used.

        Args:
            X: Data (unused).

        Returns:
            ``(cluster_label, cluster_size)``; both are also stored on the
            instance as ``self.cluster_label`` and ``self.N``.

        Raises:
            ValueError: If the criterion/method pair is not one of the four
                supported combinations (previously this surfaced as an
                ``UnboundLocalError``).
        """
        if self.clustering_method not in ("em", "kmeans") or self.criterion not in (
            "tr",
            "ml",
        ):
            raise ValueError(
                "Unsupported combination: "
                f"criterion={self.criterion!r}, "
                f"clustering_method={self.clustering_method!r}"
            )

        key = f"fss{self.clustering_method}_{self.criterion}"
        cluster_label = cluster_label_dict[key]
        cluster_size = cluster_size_dict[key]

        # Keep them as instance attributes for solve().
        print(cluster_size)
        self.cluster_label = cluster_label
        self.N = cluster_size
        return cluster_label, cluster_size

In [13]:
class RandomAllocation(BaseAllocation):
    """Simple random sampling baseline: the whole data set is one cluster."""

    # Concrete implementation of the abstract method.
    def solve(self, X: NDArray, y: NDArray) -> NDArray:
        """Return the total sample size as a one-element allocation."""
        return np.array([self.n_samples])  # e.g. n = [sample size]

    def clustering(self, X: NDArray) -> tuple[NDArray, NDArray]:
        """Assign every row to cluster 0.

        Args:
            X: Data; only its number of rows is used.

        Returns:
            ``(cluster_label, cluster_size)`` where ``cluster_label`` is an
            integer array of zeros with one entry per row and
            ``cluster_size`` is ``[number of rows]``. As in the base class,
            both are also stored as ``self.cluster_label`` and ``self.N``
            (the override previously skipped this), and the labels are now
            integers rather than floats.
        """
        cluster_label = np.zeros(X.shape[0], dtype=int)
        cluster_size = np.array([cluster_label.shape[0]])
        self.cluster_label = cluster_label
        self.N = cluster_size
        return cluster_label, cluster_size

In [14]:
class ProportionalAllocation(BaseAllocation):
    """Allocate the sample to the clusters in proportion to their sizes."""

    def solve(self, X: NDArray, y: NDArray) -> NDArray:
        """Split ``n_samples`` proportionally to the cluster sizes ``self.N``.

        ``clustering`` must have been called first, because it sets ``self.N``.

        Args:
            X: Data (unused).
            y: Target variable (unused).

        Returns:
            Integer sample size of every cluster.

        Notes:
            * Rounding can make the total differ from ``n_samples`` in either
              direction; the difference is absorbed by the cluster with the
              largest allocation (previously only an excess was corrected, so
              the allocation could fall short of ``n_samples``).
            * Every cluster receives at least 2 samples; the units added for
              that are taken from the cluster with the largest allocation.
        """
        n: NDArray = np.round(self.N / self.N.sum() * self.n_samples).astype(int)

        # Absorb the rounding drift so that the allocation sums to n_samples.
        drift = self.n_samples - n.sum()
        if drift != 0:
            n[np.argmax(n)] += drift

        # Raise allocations below 2 up to 2, paid for by the largest cluster.
        for i in range(len(n)):
            shortfall = 2 - n[i]
            if shortfall > 0:
                n[i] = 2
                n[np.argmax(n)] -= shortfall

        return n

In [15]:
class PostStratification(BaseAllocation):
    """Policy whose allocation is a single entry: the total sample size."""

    def solve(self, X: NDArray, y: NDArray) -> NDArray:
        """Return ``[n_samples]``; the sample is not split per cluster here."""
        return np.array([self.n_samples])  # e.g. n = [sample size]

In [16]:
class OptimalAllocation(BaseAllocation):
    """Greedy allocation driven by the within-cluster variance of the target.

    Args:
        n_samples: Total sample size.
        H: Number of clusters.
        m: Minimum sample size of every cluster (H, ).
        M: Maximum sample size of every cluster (H, ); when ``None`` the
            cluster sizes are used.
        random_state: Random seed; stored on the instance.
        criterion: Feature-selection criterion.
        clustering_method: Clustering method.
    """

    def __init__(
        self,
        n_samples: int,
        H: int,
        m: NDArray,  # lower bound of the sample sizes
        M: Optional[NDArray] = None,  # upper bound of the sample sizes (may be None)
        random_state: int = 0,
        criterion: str = "ml",
        clustering_method: str = "kmeans",
    ):
        # Initialise the BaseAllocation part.
        super().__init__(n_samples, H, random_state, criterion, clustering_method)
        self.m = m  # minimum sample size per cluster (H, )
        self.M = M  # maximum sample size per cluster (H, ); cluster size when None

    def solve(self, X: NDArray, y: NDArray) -> NDArray:
        """Compute the allocation.

        ``clustering`` must have been called first: ``self.cluster_label`` and
        ``self.N`` are read here.

        Args:
            X: Data (unused).
            y: Target variable (N, ).

        Returns:
            Sample size of every cluster (H, ).

        Raises:
            AssertionError: If the result violates the total-size, minimum or
                (explicit) maximum constraint.
        """
        # S: variance of the target inside each cluster (H, )
        S = np.array([np.var(y[self.cluster_label == h]) for h in range(self.H)])
        d = (self.N**2) * S  # (H, )
        n = self._simple_greedy(n=self.m.copy(), d=d)

        # Constraint check.
        self._check_constraints(n)

        return n

    def _simple_greedy(self, n: NDArray, d: NDArray) -> NDArray:
        """Add one unit at a time to the cluster with the most negative ``d/(n+1) - d/n``.

        Starts from ``n`` (modified in place and returned). Stops when the
        total reaches ``n_samples`` or every cluster has hit its upper bound.
        The loop condition is ``<`` rather than ``!=`` so that a starting
        allocation already above ``n_samples`` is returned unchanged, and
        rejected by ``_check_constraints``, instead of being grown further.
        ``n`` is expected to be positive everywhere because ``d / n`` is
        evaluated.
        """
        M = self.M.copy() if self.M is not None else self.N.copy()
        I = np.arange(self.H)  # noqa  # indices of clusters that can still grow
        while n.sum() < self.n_samples and len(I) != 0:
            # Change of the objective when one more unit goes to each candidate.
            delta = d[I] / (n[I] + 1) - d[I] / n[I]
            h_star = I[np.argmin(delta)]

            if n[h_star] + 1 <= M[h_star]:
                n[h_star] = n[h_star] + 1
            else:
                # h_star is saturated: remove it from the candidates.
                I = I[I != h_star]  # noqa

        return n

    def _check_constraints(self, n: NDArray):
        """Assert the total-size, minimum and explicit maximum constraints."""
        assert (
            n.sum() <= self.n_samples
        ), f"Total sample size is over than {self.n_samples}"
        assert np.all(n >= self.m), "Minimum sample size constraint is not satisfied"
        if self.M is not None:
            assert np.all(
                n <= self.M
            ), "Maximum sample size constraint is not satisfied"

In [17]:
def estimate_y_mean(n: NDArray, cluster_label: NDArray, y: NDArray) -> NDArray:
    """Draw a stratified sample and estimate the mean of the target.

    For cluster ``h = 0 .. len(n) - 1``, ``n[h]`` values are drawn without
    replacement from the targets labelled ``h`` (global NumPy RNG), and the
    sample means are combined with weights equal to the cluster shares.

    Args:
        n (NDArray): Sample size of every cluster (H, )
        cluster_label (NDArray): Cluster label of every row (N, )
        y (NDArray): Target variable (N, )

    Returns:
        The weighted sum of the per-cluster sample means.

    Note:
        N: number of rows
        H: number of clusters
    """
    # Cluster sizes in the population, taken from the counts of the distinct labels.
    _, stratum_sizes = np.unique(cluster_label, return_counts=True)
    stratum_weights = stratum_sizes / stratum_sizes.sum()

    estimate = 0
    for stratum, quota in enumerate(n):
        members = y[cluster_label == stratum]
        # Random draw of `quota` units inside the cluster.
        drawn: NDArray = np.random.choice(members, quota, replace=False)
        estimate += drawn.mean() * stratum_weights[stratum]

    return estimate


def estimate_y_mean_post(n: NDArray, cluster_label: NDArray, y: NDArray) -> float:
    """Draw a simple random sample and estimate the mean by post-stratification.

    ``n[0]`` rows are drawn without replacement from the whole data set
    (global NumPy RNG). The sampled rows are grouped by cluster and the group
    means are combined with the population shares of the clusters.

    Args:
        n (NDArray): One-element array holding the total sample size.
        cluster_label (NDArray): Cluster label of every row (N, )
        y (NDArray): Target variable (N, )

    Returns:
        The post-stratified estimate of the mean.

    Notes:
        * A cluster with no sampled row cannot contribute a mean. The weights
          of the clusters that do appear are renormalised to sum to one;
          previously the missing weight was dropped, which pulled the estimate
          toward zero.
        * Weights are matched to clusters through the sorted distinct labels,
          so labels need not be the integers ``0 .. H-1``.
        * With an empty sample the result is ``0.0``, as before.
    """
    labels = np.asarray(cluster_label)
    y_array = np.asarray(y)

    # stratum_of_row[i] is the position of row i's label among the distinct labels.
    _, stratum_of_row, N = np.unique(labels, return_inverse=True, return_counts=True)
    stratum_of_row = stratum_of_row.reshape(-1)
    weights = N / N.sum()

    indices = np.arange(N.sum())
    n_indices = np.random.choice(indices, n[0], replace=False)
    n_label = stratum_of_row[n_indices]
    sampled_y = y_array[n_indices]

    y_hat = 0.0
    covered_weight = 0.0  # total weight of the clusters present in the sample
    for h in np.unique(n_label):
        y_sample_mean = sampled_y[n_label == h].mean()  # mean of the sampled rows
        y_hat += y_sample_mean * weights[h]
        covered_weight += weights[h]

    if covered_weight == 0:
        return y_hat
    return y_hat / covered_weight

In [18]:
N_SAMPLES = 1000  # total sample size
H = clusters  # number of clusters; with too many clusters the proportional allocation does not work well
N_TRIALS = 1000  # number of simulation trials
m_VALUE = 2  # minimum sample size per cluster
RANDOM_STATE = 0  # random seed handed to the policies
CRITERION = "ml"
CLUSTERING_METHOD = "kmeans"

# Define the strategies to compare.
policies: list[BaseAllocation] = [
    RandomAllocation(
        n_samples=N_SAMPLES,
        H=H,
        random_state=RANDOM_STATE,
        criterion=CRITERION,
        clustering_method=CLUSTERING_METHOD,
    ),
    ProportionalAllocation(
        n_samples=N_SAMPLES,
        H=H,
        random_state=RANDOM_STATE,
        criterion=CRITERION,
        clustering_method=CLUSTERING_METHOD,
    ),
    PostStratification(
        n_samples=N_SAMPLES,
        H=H,
        random_state=RANDOM_STATE,
        criterion=CRITERION,
        clustering_method=CLUSTERING_METHOD,
    ),
    OptimalAllocation(
        n_samples=N_SAMPLES,
        H=H,
        random_state=RANDOM_STATE,
        m=np.full(H, m_VALUE),
        M=None,
        criterion=CRITERION,
        clustering_method=CLUSTERING_METHOD,
    ),
]

# Solve the per-cluster sample sizes with every strategy.
allocations: list[dict] = []  # one result dict per strategy
for policy in policies:
    # Obtain the clustering this policy works on.
    cluster_label, _ = policy.clustering(X_scaled)
    n = policy.solve(X_scaled, y)
    allocations.append(
        {
            "policy": policy.__class__.__name__,
            "n": n,
            "cluster_label": cluster_label,
        }
    )

# Estimate the mean of the target from the sample sizes of every strategy.
# NOTE(review): random_state is only recorded as a trial label here; it is not
# used to seed the draws, which come from the global NumPy generator. Re-running
# this cell without restarting the kernel therefore gives different numbers.
y_hats = []
for random_state in range(N_TRIALS):
    for allocation in allocations:
        # PostStratification samples from the whole data set; the others sample per cluster.
        if allocation["policy"] == "PostStratification":
            y_hat = estimate_y_mean_post(
                allocation["n"], allocation["cluster_label"], y
            )
        else:
            y_hat = estimate_y_mean(allocation["n"], allocation["cluster_label"], y)
        y_hats.append(
            {
                "policy": allocation["policy"],
                "y_hat": y_hat,
                "random_state": random_state,
            }
        )

[976 560 160 463  10]
[976 560 160 463  10]
[976 560 160 463  10]


In [19]:
y_hat_df = pd.DataFrame(y_hats)
y_hat_df["error"] = (
    y_hat_df["y_hat"] - y.mean()
)  # add the deviation from the true mean as the "error" column

# Error variance of RandomAllocation.
# NOTE: despite the "_std" suffix, both variables below hold variances (.var()).
random_allocation_std = y_hat_df[y_hat_df["policy"] == "RandomAllocation"][
    "error"
].var()
# Error variance of every policy other than RandomAllocation.
non_random_allocation_std = (
    y_hat_df[y_hat_df["policy"] != "RandomAllocation"].groupby("policy")["error"].var()
)

# Reduction rate (percent) relative to RandomAllocation.
reduction_rate = (1 - non_random_allocation_std / random_allocation_std) * 100

## Reorder the rows to follow the order of `policies`.
## RandomAllocation is not in the series, so reindexing gives it a NaN row.
reduction_rate = reduction_rate.reindex(
    [policy.__class__.__name__ for policy in policies]
)

# NOTE(review): "criteion" in the header below is a typo of "criterion"; the
# string is runtime output and is left unchanged here.
print(
    "[criteion :",
    CRITERION,
    ", clustering_method:",
    CLUSTERING_METHOD,
    "のときの各手法の誤差分散削減率]",
)
print(reduction_rate)

[criteion : ml , clustering_method: kmeans のときの各手法の誤差分散削減率]
policy
RandomAllocation                NaN
ProportionalAllocation    -3.551617
PostStratification         7.815010
OptimalAllocation         15.271639
Name: error, dtype: float64


In [20]:
# Header naming the configuration whose error variances are listed below.
print("[criterion:", CRITERION, ", clustering_method:", CLUSTERING_METHOD, " のときの各手法のvar]")

# Error variance of every policy, in the order of `policies`.
policy_name_list = [policy.__class__.__name__ for policy in policies]
for i, policy_name in enumerate(policy_name_list):
    is_policy = y_hat_df["policy"] == policy_name
    var = y_hat_df.loc[is_policy, "error"].var()
    print(policy_name, var)


[criterion: ml , clustering_method: kmeans  のときの各手法のvar]
RandomAllocation 0.015422291865375368
ProportionalAllocation 0.01597003253677609
PostStratification 0.014217038153814181
OptimalAllocation 0.013067055122034868
