# ライブラリのインポート

In [None]:
# TODO(minplu): GitHubフォルダ全体を移動する
# 1

# TODO(minplu): 提出ファイルのフォルダのパスを変更する
# 2


In [None]:
from __future__ import annotations

from copy import deepcopy
from datetime import datetime, timedelta, timezone
from enum import Enum, auto
from logging import (
    DEBUG,
    FileHandler,
    Formatter,
    Logger,
    StreamHandler,
    getLogger,
)
from pathlib import Path

import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import GridSearchCV
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import MinMaxScaler


# 定数

In [None]:
# データパス
PATH_TRAIN = "data\\titanic\\train.csv"
PATH_TEST = "data\\titanic\\test.csv"

# ログファイルが保存されるフォルダのパス
PATH_LOG = "log"

# csvファイル保存用手法名
LOGISTIC_REGRESSION = "logreg"
RANDOM_FOREST = "rf"


# クラス

## Enum

In [None]:
class NanFillMethod(Enum):
    """欠損値の置換方法の識別用Enumクラス

    Args:
        Enum: 継承するクラス

    """

    MEAN = auto()
    MEDIAN = auto()
    MODE = auto()


## ユーティリティー

### 表示

In [None]:
class DisplayUtility:
    """データ表示用のユーティリティクラス"""

    @staticmethod
    def output_divider(title: str) -> None:
        """区切り線を出力する

        Args:
            title (str): 区切り線の中央に表示する文字列

        """
        print(f"-------------------- {title} --------------------")


### ファイル操作

In [None]:
class FolderUtility:
    """フォルダ・ファイル用ユーティリティークラス"""

    @staticmethod
    def make_folder_if_not_exist(save_folder_name: Path) -> None:
        """同名のフォルダがなければ作成する

        Args:
            save_folder_name (str): フォルダパス

        """
        if not save_folder_name.exists():
            save_folder_name.mkdir()


### ロガー

In [None]:
# TODO(minplu): ロガーの出力メッセージをカスタマイズする
# 4


class LogSetting:
    """ログ設定用のクラス"""

    @classmethod
    def set_logger(cls, log_folder_path: Path) -> Logger:
        """ロガーの出力設定を行う

        Args:
            log_folder_path (Path): ログファイルの保存先フォルダパス

        Returns:
            Logger: ログを出力するためのロガーオブジェクト

        """
        logger = getLogger(__name__)

        stream_handler = cls._generate_stream_handler()

        FolderUtility.make_folder_if_not_exist(log_folder_path)
        file_handler = cls._generate_file_handler()

        logger.addHandler(file_handler)
        logger.addHandler(stream_handler)

        return logger

    @classmethod
    def _generate_stream_handler(cls) -> StreamHandler:
        """ロガーのストリームハンドラーを生成する

        Returns:
            StreamHandler: ロガーのストリームハンドラー

        """
        stream_handler_log_format = (
            "%(asctime)s %(name)s [%(levelname)s] %(funcName)s: %(message)s"
        )

        stream_handler = StreamHandler()
        stream_handler.setLevel(DEBUG)
        stream_formatter = Formatter(stream_handler_log_format)

        stream_handler.setFormatter(stream_formatter)

        return stream_handler

    @classmethod
    def _generate_file_handler(cls) -> FileHandler:
        """ロガーのファイルハンドラーを生成する

        Returns:
            FileHandler: ロガーのファイルハンドラー

        """
        jst = timezone(timedelta(hours=+9), "JST")
        log_datetime = f"{datetime.now(jst):%Y%m%d%H%M%S}"

        file_handler = FileHandler(rf"{PATH_LOG}\log_{log_datetime}.log")
        file_handler.setLevel(DEBUG)
        file_handler_log_format = (
            "%(asctime)s %(name)s [%(levelname)s] %(funcName)s: %(message)s"
        )
        file_formatter = Formatter(file_handler_log_format)

        file_handler.setFormatter(file_formatter)

        return file_handler


### 分析

In [None]:
class AnalysisUtility:
    """データセット確認用ユーティリティークラス"""

    @classmethod
    def display_summary(cls, dataset: pd.DataFrame) -> None:
        """データフレームの概要を出力する

        出力内容
        - データセット本体
        - 統計量（describe()）
        - 欠損値かどうか（isnull()）
        - 各列の欠損値の合計（isnull().sum()）

        それぞれディバイダ―付きで表示する

        Args:
            dataset (pd.DataFrame): 出力対象のデータフレーム

        """
        cls._display_data(dataset)
        cls._display_statistics(dataset)
        cls._display_is_nan(dataset)
        cls._display_nan_sum(dataset)

    @classmethod
    def _display_data(cls, dataset: pd.DataFrame) -> None:
        """データフレームのデータを表示する

        Args:
            dataset (pd.DataFrame): 表示対象のデータフレーム

        """
        DisplayUtility.output_divider("Data")
        display(dataset)

    @classmethod
    def _display_statistics(cls, dataset: pd.DataFrame) -> None:
        """データフレームの各列の統計量を表示する

        Args:
            dataset (pd.DataFrame): 表示対象のデータフレーム

        """
        DisplayUtility.output_divider("Statistics")
        display(dataset.describe())

    @classmethod
    def _display_is_nan(cls, dataset: pd.DataFrame) -> None:
        """データフレームのデータのうち欠損値のみTrue、それ以外をFalseで表示する

        Args:
            dataset (pd.DataFrame): 表示対象のデータフレーム

        """
        DisplayUtility.output_divider("Is NaN")
        display(dataset.isna())

    @classmethod
    def _display_nan_sum(cls, dataset: pd.DataFrame) -> None:
        """データフレームの各列の欠損値の合計を表示する

        Args:
            dataset (pd.DataFrame): 表示対象のデータフレーム

        """
        DisplayUtility.output_divider("Sum of NaN")
        display(cls._calculate_nan_sum(dataset))

    @classmethod
    def _calculate_nan_sum(cls, dataset: pd.DataFrame) -> pd.Series:
        """データフレームの各列の欠損値の合計を計算する

        Args:
            dataset (pd.DataFrame): 計算対象のデータフレーム

        Returns:
            pd.Series[int]: 各列の欠損値の合計

        """
        return dataset.isna().sum()

    @classmethod
    def display_categorized_columns(cls, dataset: pd.DataFrame) -> None:
        """データフレームの各列のユニークなデータとその数をdisplayメソッドで表示する

        Args:
            dataset (pd.DataFrame): 表示対象のデータフレーム

        """
        columns = dataset.columns
        for column in columns:
            df_categorized = cls._categorize(dataset=dataset, column=column)
            display(df_categorized)

    @classmethod
    def _categorize(cls, dataset: pd.DataFrame, column: str) -> pd.Series:
        """データフレームの列をユニークなデータごとにグルーピングする

        Args:
            dataset (pd.DataFrame): グルーピング対象のデータフレーム
            column (str): グルーピング対象の列

        Returns:
            pd.Series: グルーピング後の一次元データ

        """
        df_groupby = dataset.groupby(column)
        df_categorized = df_groupby.size()

        return df_categorized


### データセットの前処理

In [None]:
class PreprocessUtility:
    """前処理用ユーティリティークラス"""

    @classmethod
    def preprocess_dataset(
        cls,
        dataset: pd.DataFrame,
        selected_columns: list[str],
        encode_columns: list[str],
        logger: Logger,
    ) -> pd.DataFrame:
        """データセットの前処理を実行するクラス

        1. 特徴量の抽出
        2. カテゴリ変数のエンコード
        3. 欠損値の置換

        Args:
            dataset (pd.DataFrame): 処理対象のデータフレーム
            selected_columns (List[str]): 抽出対象の列
            encode_columns (List[str]): エンコード対象の列
            logger (Logger): ロガー

        Returns:
            pd.DataFrame: 処理後のデータフレーム

        """
        # 1. 特徴量の抽出
        dataset_selected = cls.select_features(dataset, selected_columns)

        # 2. カテゴリ変数のエンコード
        dataset_encoded = cls.encode_by_one_hot(dataset_selected, encode_columns)

        # 3. 欠損値の置換（平均値）
        dataset_filled = cls._fill_nan(
            df=dataset_encoded,
            nan_fill_method=NanFillMethod.MEAN,
            round_figure=1,
            logger=logger,
        )

        return dataset_filled

    @classmethod
    def select_features(
        cls,
        dataset: pd.DataFrame,
        selected_columns: list[str],
    ) -> pd.DataFrame:
        """データセットから必要な列を抽出する

        Args:
            dataset (pd.DataFrame): 抽出対象のデータフレーム
            selected_columns (List[str]): 抽出する列

        Returns:
            pd.DataFrame: 抽出後のデータフレーム

        """
        dataset_selected = dataset.loc[:, selected_columns]
        return dataset_selected

    # ワンホットエンコーディング
    @classmethod
    def encode_by_one_hot(
        cls,
        df: pd.DataFrame,
        encoding_column_list: list[str],
    ) -> pd.DataFrame:
        """データフレームにワンホットエンコーディングを実行する

        Args:
            df (pd.DataFrame): エンコード対象のデータフレーム
            encoding_column_list (List[str]): エンコード対象の列

        Returns:
            pd.DataFrame: エンコード後のデータフレーム

        """
        df_encoded = deepcopy(df)
        for column in encoding_column_list:
            # エンコードしたデータフレームの取得
            df_dummy = pd.get_dummies(df[column], dtype=int, prefix=column)

            # 挿入位置の取得（エンコードする列の番号）
            insert_location = df_encoded.columns.get_loc(column)

            # 列の削除
            df_encoded = df_encoded.drop(column, axis=1)

            # データフレームの挿入
            df_encoded = cls.insert_dataframe(df_encoded, df_dummy, insert_location)

        return df_encoded

    @classmethod
    def insert_dataframe(
        cls,
        df_base: pd.DataFrame,
        df_insert: pd.DataFrame,
        insert_location: int | slice | np.ndarray,
    ) -> pd.DataFrame:
        """特定のデータフレームを別のデータフレームに挿入する

        Args:
            df_base (pd.DataFrame): 挿入先のデータフレーム
            df_insert (pd.DataFrame): 挿入対象のデータフレーム
            insert_location (Union[int, slice, np.ndarray]): 挿入位置

        Returns:
            pd.DataFrame: 挿入後のデータフレーム

        """
        # 同名の列があった場合はdf_baseを返す
        if not set(df_base.columns).isdisjoint(df_insert.columns):
            print("There is a same column between df_base and df_insert.")
            return df_base

        # 挿入位置から左のデータフレーム
        df_divided_left = df_base.iloc[:, :insert_location]

        # 挿入位置から右のデータフレーム
        df_divided_right = df_base.iloc[:, insert_location:]

        # 結合したデータフレーム
        df_merged = pd.concat([df_divided_left, df_insert, df_divided_right], axis=1)

        return df_merged

    @classmethod
    def _fill_nan(
        cls,
        df: pd.DataFrame,
        nan_fill_method: NanFillMethod,
        round_figure: int,
        logger: Logger,
    ) -> pd.DataFrame:
        """欠損値（NaN）を置換する

        Args:
            df (pd.DataFrame): 欠損値のあるデータフレーム
            nan_fill_method (NanFillMethod): 欠損値の置換方法（平均値のみ有効）
            round_figure (int): 置換に使用する値の有効桁数（小数点以下）
            logger (Logger): エラー出力用のロガー

        Returns:
            pd.DataFrame:
            置換後のデータフレーム
            nan_fill_methodが予期しない値の場合はdfをそのまま返す

        """
        if nan_fill_method == NanFillMethod.MEAN:
            fill_values = df.mean(numeric_only=True)
            fill_values_round = round(fill_values, round_figure)
            df_nan_filled = df.fillna(fill_values_round)
            return df_nan_filled

        logger.warning(
            msg="Incorrect method inputted: please choose from class NanFillMethod.",
        )
        return df


### CSV出力

In [None]:
class CsvUtility:
    """提出用CSVファイルのユーティリティークラス"""

    # csv出力
    @classmethod
    def output_csv(cls, df: pd.DataFrame, postfix_method_name: str) -> None:
        """データフレームをcsvファイルに出力する

        Args:
            df (pd.DataFrame): csvファイルにするデータフレーム
            postfix_method_name (str): csvファイルの接尾辞に使用する手法名

        """
        jst = timezone(timedelta(hours=+9), "JST")
        train_datetime = datetime.now(jst)

        # 保存先フォルダ名の接尾辞（日付）
        save_folder_name = cls._generate_save_folder_name(train_datetime)
        save_folder_path = Path(save_folder_name)

        # 保存先フォルダの作成
        FolderUtility.make_folder_if_not_exist(save_folder_path)

        # 保存ファイル名の接尾辞（日付と日時）
        save_file_name = cls._generate_save_file_name(
            postfix_method_name,
            train_datetime,
        )

        # 保存ファイルのパス（カレントディレクトリの直下に作成する）
        save_path = cls._generate_save_path(save_folder_name, save_file_name)

        df.to_csv(save_path, index=False)

    @classmethod
    def _generate_save_folder_name(cls, train_datetime: datetime) -> str:
        """ファイル保存先フォルダパスを生成する

        Args:
            train_datetime (datetime): フォルダの作成日時

        Returns:
            str: ファイル保存先フォルダパス

        """
        postfix_save_folder_name = train_datetime.strftime("%Y%m%d")
        # 保存先フォルダ名
        save_folder_name = f"submission_{postfix_save_folder_name}"
        return save_folder_name

    @classmethod
    def _generate_save_file_name(
        cls,
        postfix_method_name: str,
        train_datetime: datetime,
    ) -> str:
        """保存ファイル名を生成する

        Args:
            postfix_method_name (str): ファイル名の末尾につける学習手法の名前
            train_datetime (datetime): ファイルの作成日時

        Returns:
            str: 保存ファイル名

        """
        postfix_datetime = train_datetime.strftime("%Y%m%d%H%M%S")
        # 保存ファイル名
        save_file_name = f"submission_{postfix_method_name}_{postfix_datetime}.csv"

        return save_file_name

    @classmethod
    def _generate_save_path(cls, save_folder_name: str, save_file_name: str) -> str:
        """ファイル保存先パスを生成する

        Args:
            save_folder_name (str): ファイル保存先フォルダ名
            save_file_name (str): 保存ファイル名

        Returns:
            str: ファイル保存先パス

        """
        save_path = f"{save_folder_name}/{save_file_name}"
        return save_path


## 例外

In [None]:
class FalseComponentError(Exception):
    """二つの配列の行列数が異なる場合に呼び出す例外クラス

    Args:
        Exception: 継承する例外クラス

    """

    def __init__(self, msg: str) -> None:
        """コンストラクタ

        Args:
            msg (str): 例外発生時に出力する文字列

        """
        self.msg = msg

    def __str__(self) -> str:
        """文字列を出力する場合に呼び出される関数

        Args:
            msg (str): 出力する例外の内容

        Returns:
            str: 出力する例外の内容

        """
        return self.msg


# Logger生成

In [None]:
path_log = Path(PATH_LOG)
logger = LogSetting.set_logger(path_log)

logger.debug("Logger has been set.")


# データセット分析

## 事前準備
---
- pd.set_option

  pd.DataFrame型の表示行列数の変更

  - 第一引数

    変更する表示方向

    - display.max_rows

      表示行数

    - display.max_columns  

      表示列数

  - 第二引数

    表示する行数・列数
---

In [None]:
pd.set_option("display.max_columns", 1000)


## 読み込み
pandasのメソッドを使用する

In [None]:
path_train = Path(PATH_TRAIN)
path_test = Path(PATH_TEST)

# 訓練データ
train_data = pd.read_csv(PATH_TRAIN)

# テストデータ
test_data = pd.read_csv(PATH_TEST)


## データ内容確認

### 訓練データ

In [None]:
AnalysisUtility.display_summary(train_data)


### テストデータ

In [None]:
AnalysisUtility.display_summary(test_data)


## グループ化
- チケットクラス

  1 = 1st  
  2 = 2nd  
  3 = 3rd  

- 乗船地

  C = Cherbourg  
  Q = Queenstown  
  S = Southampton

### 訓練データ

In [None]:
AnalysisUtility.display_categorized_columns(train_data)


### テストデータ

In [None]:
AnalysisUtility.display_categorized_columns(test_data)


# データの前処理

1. 不要な列の削除
2. カテゴリ変数のエンコード
3. 欠損値の置換

`2と3については順番が不明（個人的には3を先にやった方がよさそう）`

## 特徴量リスト
データセット

In [None]:
# 訓練データの列名
train_data_columns = [
    "PassengerId",
    "Survived",
    "Pclass",
    "Name",
    "Sex",
    "Age",
    "SibSp",
    "Parch",
    "Ticket",
    "Fare",
    "Cabin",
    "Embarked",
]

# テストデータの列名
test_data_columns = [
    "PassengerId",
    "Pclass",
    "Name",
    "Sex",
    "Age",
    "SibSp",
    "Parch",
    "Ticket",
    "Fare",
    "Cabin",
    "Embarked",
]

# 抽出後の列名（共通）
selected_columns = [
    "Pclass",
    "Sex",
    "Age",
    "SibSp",
    "Parch",
    "Fare",
    "Embarked",
]

# 共通
encode_columns = ["Pclass", "Sex", "Embarked"]


## 前処理

In [None]:
# 訓練データ
train_data_preprocessed = PreprocessUtility.preprocess_dataset(
    train_data,
    selected_columns,
    encode_columns,
    logger,
)

# データ確認用
DisplayUtility.output_divider("前処理後の訓練データ")
display(train_data_preprocessed)


# テストデータ
test_data_preprocessed = PreprocessUtility.preprocess_dataset(
    test_data,
    selected_columns,
    encode_columns,
    logger,
)

# データ確認用
DisplayUtility.output_divider("前処理後のテストデータ")
display(test_data_preprocessed)


# 学習

## データ用意

In [None]:
# 訓練データ
x_train = train_data_preprocessed
y_train = train_data["Survived"]

# テストデータ
x_test = test_data_preprocessed


## 列名

In [None]:
if x_train.columns.to_numpy().all() and x_test.columns.to_numpy().all():
    columns_names = x_train.columns
else:
    msg = "NotMatchSizeError: either array has one or more false components."
    raise FalseComponentError(msg)


## スケーラー生成

In [None]:
# 正規化
scaler = MinMaxScaler()


## パラメータ

In [None]:
params_logreg = {"C": 10.0, "max_iter": 1000}

params_rf = {""}


## モデル

In [None]:
logreg = LogisticRegression(max_iter=1000, random_state=0)
rf = RandomForestClassifier(random_state=0)


## パイプライン

In [None]:
pipe = make_pipeline(scaler, logreg)


## ロジスティック回帰

In [None]:
search = GridSearchCV(pipe, params_logreg_grid, n_jobs=2)

search.fit(x_train, y_train)


In [None]:
result_search = search.cv_results_
result_search_df = pd.DataFrame(result_search).iloc[:, 4:]
result_search_df_rounded = result_search_df.round(3)

display(result_search_df_rounded)


In [None]:
best_score = search.best_score_
print("Grid search best score: ", best_score)


In [None]:
model: LogisticRegression = search.best_estimator_.named_steps["logisticregression"]

y_pred = model.predict(np.array(x_test))

y_pred_df = pd.DataFrame(y_pred, columns=["Survived"])
y_pred_df_submission = pd.concat([test_data["PassengerId"], y_pred_df], axis=1)

display(y_pred_df_submission)


In [None]:
# ハイパーパラメータ
params_logreg = {"C": 10.0, "max_iter": 1000}

# モデル生成
model_logreg = LogisticRegression(**params_logreg)

# 学習
model_logreg.fit(x_train, y_train)

# 予測
y_pred = model_logreg.predict(x_test)
y_proba = model_logreg.predict_proba(x_test)


In [None]:
y_pred_df = pd.DataFrame(y_pred, columns=["Survived"])

y_pred_df_submission = pd.concat([test_data["PassengerId"], y_pred_df], axis=1)

display(y_pred_df_submission)


In [None]:
# 提出用の形式（PassengerId, Survived）に変更する
y_pred_df_with_test_data = PreprocessUtility.insert_dataframe(
    df_base=test_data,
    df_insert=y_pred_df,
    insert_location=1,
)

display(y_pred_df_with_test_data)


In [None]:
# 各ラベルの予測確率
y_proba_df = pd.DataFrame(np.round(y_proba, 3), columns=["Survived_0", "Survived_1"])

display(y_proba_df)


In [None]:
# 係数
model_logreg_coef = pd.DataFrame(
    np.round(model_logreg.coef_, 3),
    columns=x_train.columns,
)

display(model_logreg_coef)


In [None]:
CsvUtility.output_csv(y_pred_df_submission, LOGISTIC_REGRESSION)


## ランダムフォレスト

In [None]:
# ハイパーパラメータ
params_logreg = {}

model_rf = RandomForestClassifier()

model_rf.fit(x_train, y_train)

# 予測
y_pred = model_rf.predict(x_test)
y_proba = model_rf.predict_proba(x_test)


In [None]:
display(y_pred)


In [None]:
display(model_rf.feature_importances_)


In [None]:
display(x_train.columns.values)


In [None]:
display(pd.DataFrame(np.round(model_rf.feature_importances_, 3)).T)


In [None]:
# 係数
model_rf_importances = pd.DataFrame(
    np.round(model_rf.feature_importances_, 3),
    index=x_train.columns,
).T

display(model_rf_importances)


In [None]:
y_pred_df = pd.DataFrame(y_pred, columns=["Survived"])

y_pred_df_submission = pd.concat([test_data["PassengerId"], y_pred_df], axis=1)

display(y_pred_df_submission)


In [None]:
CsvUtility.output_csv(y_pred_df_submission, RANDOM_FOREST)
