# banksテーブルからCSV形式でエクスポートしたデータを元に銀行名と支店名を全角で取得し、銀行名と支店名の更新SQLファイルを作成

## 使用するライブラリをインストール

In [None]:
!pip install pandas jaconv

In [None]:
"""
概要: このスクリプトは全銀行コード（Zengin Code）を用いて、
日本の各銀行とその支店の詳細情報を取得し、CSVファイルに書き出します。
また、定義されていない銀行コードについては、テキストファイルに記録します。
注意：
実行には同階層に「banks.csv」ファイルが必要です。
"""

from zengin_code import Bank
import pandas as pd
import jaconv
from typing import List, Dict, Tuple


def generate_bank_counts_csv(input_file: str, output_file: str) -> pd.DataFrame:
    """
    この関数は、入力のCSVファイルからデータを読み込み、'bank_code'でグループ化し、
    各銀行のエントリ数を数え、データを'bank_code'でソートし、結果を出力のCSVファイルに書き込む。

    引数:
        input_file (str): 入力CSVファイルへのパス
        output_file (str): 出力CSVファイルへのパス

    戻り値:
        df_grouped (pd.DataFrame): グループ化とカウントが適用されたDataFrame
    """
    try:
        # CSVファイルを読み込む（'bank_code'を文字列として扱う）
        df = pd.read_csv(input_file, dtype={'bank_code': str})

        # 'deleted_at'列がnullの行のみを保持する
        df = df[df['deleted_at'].isnull()]

        # 'bank_code'でグループ化し、それぞれのグループの数を数える
        df_grouped = df.groupby(['bank_code', 'bank_name']).size().reset_index(name='count')

        # データを'bank_code'でソート
        df_grouped = df_grouped.sort_values('bank_code')

        # 結果を新しいCSVファイルに書き込む
        df_grouped.to_csv(output_file, index=False)

        print(f"'{output_file}'file generated")
        return df_grouped

    except FileNotFoundError:
        print("指定されたファイルが存在しません。")
    except pd.errors.EmptyDataError:
        print("指定されたファイルにデータがありません。")
    except Exception as e:
        print("予期しないエラーが発生しました:", e)


def to_half_width_kana(full_width_str) -> str:
    """全角カタカナを半角カタカナに変換に－を-にする"""
    half_width_str = jaconv.z2h(full_width_str, kana=True, ascii=True, digit=False)
    return half_width_str.replace("－", "-")


def fetch_bank_details(bank_codes: List[str], undefined_file: str) -> Tuple[List[Dict], int]:
    """指定された銀行コードに基づいて銀行詳細を取得する。

    Args:
        bank_codes (List[str]): 銀行コードのリスト。
        undefined_file (str): 未定義の銀行コードを記録するファイルの名前。

    Returns:
        Tuple[List[Dict], int]: 銀行および支店の情報を格納したリストと未定義の銀行数。
    """
    data = []  # 銀行および支店の情報を格納するためのリスト
    undefined_bank_count = 0  # 存在しない銀行コードをカウント

    with open(undefined_file, "w") as f:  # 新規書き込みモードで開く
        for bank_code in bank_codes:
            try:
                # 銀行コードを用いて銀行情報を取得
                bank = Bank[bank_code]
                # 銀行の各支店情報を取得
                branches = bank.branches
                for code in branches:
                    branch = branches[code]

                    # 全角カタカナを半角カタカナに変換
                    bank_name_half_width_kana = to_half_width_kana(bank.kana)
                    bank_br_name_half_width_kana = to_half_width_kana(branch.kana)

                    # 銀行および支店の情報をリストに追加
                    data.append(
                        {
                            "bank_code": bank_code,  # Add the current bank_code
                            "bank_br_code": code,
                            "bank_name_full_width": bank.kana,
                            "bank_br_name_full_width": branch.name,
                            "bank_name_half_width_kana": bank_name_half_width_kana,
                            "bank_br_name_half_width_kana": bank_br_name_half_width_kana,
                        }
                    )
            except KeyError:
                # 定義されていない銀行コードはテキストファイルに記録
                f.write(f"Bank code {bank_code} not found\n")
                undefined_bank_count += 1

    return data, undefined_bank_count


def write_to_file(filename: str, undefined_bank_count: int, total_banks: int) -> None:
    """結果をファイルとコンソールに出力する。

    Args:
        filename (str): 書き込むファイル名。
        undefined_bank_count (int): 未定義の銀行数。
        total_banks (int): 取得した銀行の総数。
    """
    message1 = f"存在しない銀行数: {undefined_bank_count}件"
    message2 = f"最大銀行更新数: {total_banks}件"

    print(message1)
    print(message2)

    with open(filename, "a") as f:  # 追記モードで開く
        f.write(message1 + "\n")
        f.write(message2 + "\n")

    print(f"'{filename}' file generated")


def main() -> None:
    raw_date_csv = "banks.csv"
    out_put_csv = "bank_code_count.csv"
    # 各銀行のエントリ数を数えたCSVファイル出力
    df = generate_bank_counts_csv(raw_date_csv, out_put_csv)

    # 'bank_code'カラムのデータをリスト形式で取得し、変数に代入
    bank_codes = df["bank_code"].tolist()
    
    undefined_file = "undefined_bank_code.txt"
    data, undefined_bank_count = fetch_bank_details(bank_codes, undefined_file)

    df = pd.DataFrame(data)

    output_file_name = "update_target_bank_combination_list.csv"
    df.to_csv(output_file_name, index=False)
    print(f"'{output_file_name}' file generated")

    write_to_file(undefined_file, undefined_bank_count, len(df))


if __name__ == "__main__":
    main()

## banksテーブルの全角の銀行名と全角の支店名を更新するSQLファイルを作成

In [None]:
"""
このスクリプトは、以下の操作を行う：
1. banks.csvとupdate_target_bank_combination_list.csvという2つのCSVファイルからデータを読み込む。
2. 読み込んだデータを元に、2つのDataFrameを作成し、それらを特定の列を基準に結合。
3. 結合されたDataFrameを元に、banksテーブルのbank_name_full_widthとbank_br_name_full_width列を更新するためのSQLクエリを生成し、それをupdate_banks.sqlというファイルに書き出す。
4. 最終的に、DataFrameのbank_name_full_widthとbank_br_name_full_width列の値の存在・非存在の数を出力する。

注意：
このスクリプトは例外処理を含んでおり、ファイルの読み込み、DataFrameの結合、SQLファイルの生成、値の数の出力において例外が発生した場合、
適切なエラーメッセージが出力され、その後の処理がスキップされる。
"""

import pandas as pd
from pandas import DataFrame


def load_dataframes() -> tuple[DataFrame, DataFrame]:
    """
    CSVファイルからデータを読み込み、pandasのDataFrameを生成
    """
    try:
        df_banks = pd.read_csv(
            "banks.csv", dtype={"bank_id": str, "bank_code": str, "bank_br_code": str}
        )
        df_updates = pd.read_csv(
            "update_target_bank_combination_list.csv",
            dtype={"bank_code": str, "bank_br_code": str},
        )
    except Exception as e:
        print(f"Error occurred while reading CSV files: {e}")
        return None, None

    return df_banks, df_updates


def merge_dataframes(df_banks: DataFrame, df_updates: DataFrame) -> DataFrame:
    """
    二つのDataFrameを特定の列に基づいて結合
    """
    try:
        df_merged = pd.merge(
            df_banks, df_updates, on=["bank_code", "bank_br_code"], how="left"
        )
        df_merged.drop_duplicates(inplace=True)  # 重複行を削除
    except Exception as e:
        print(f"Error occurred while merging DataFrames: {e}")
        return None

    return df_merged[
        [
            "bank_id",
            "bank_code",
            "bank_br_code",
            "bank_name",
            "bank_br_name",
            "bank_name_full_width",
            "bank_br_name_full_width",
        ]
    ]


def generate_sql_file(df: DataFrame) -> None:
    """
    DataFrameからSQLファイルを生成
    """
    try:
        sql_file = "update_banks.sql"
        df_sql_gen = df[
            df["bank_name_full_width"].notna() & df["bank_br_name_full_width"].notna()
        ]

        with open(sql_file, "w", encoding="utf8") as f:
            # もし列が存在しない場合、列を作成する指示を追加
            f.write(
                "-- bank_name_full_widthとbranch_br_name_full_widthという列がbanksテーブルに存在しない場合、以下のクエリを実行して追加します:\n"
            )
            f.write("-- ALTER TABLE banks\n")
            f.write(
                "-- ADD COLUMN bank_name_full_width VARCHAR(255) COMMENT '銀行名全角' AFTER bank_br_name,\n"
            )
            f.write(
                "-- ADD COLUMN bank_br_name_full_width VARCHAR(255) COMMENT '支店名全角' AFTER bank_name_full_width;\n\n"
            )

            for index, row in df_sql_gen.iterrows():
                sql = f"UPDATE banks SET bank_name_full_width = '{row['bank_name_full_width']}', bank_br_name_full_width = '{row['bank_br_name_full_width']}' WHERE bank_id = {row['bank_id']} AND bank_code = {row['bank_code']} AND bank_br_code = {row['bank_br_code']};\n"
                f.write(sql)

        print(f"'{sql_file}' File generated successfully.")
    except Exception as e:
        print(f"Error occurred while generating SQL file: {e}")


def print_value_counts(df: DataFrame) -> None:
    """
    指定された列の値の数を出力
    """
    try:
        value_counts_branch = df["bank_br_name_full_width"].notna().value_counts()

        print("Bank and bank branch name mapping information")
        print(
            "Number of data registrations in bank master: ",
            df["bank_br_name_full_width"].shape[0],
        )
        print("Number of existing values: ", value_counts_branch[True])
        print("Number of non-existing values: ", value_counts_branch[False])
    except Exception as e:
        print(f"Error occurred while printing value counts: {e}")


def main() -> None:
    df_banks, df_updates = load_dataframes()

    if df_banks is None or df_updates is None:
        print("Unable to continue, data loading failed.")
        return

    df_merged = merge_dataframes(df_banks, df_updates)

    if df_merged is None:
        print("Unable to continue, DataFrame merging failed.")
        return
    # CSV出力
    df_merged.to_csv("merge_bank_data.csv", index=False)

    print_value_counts(df_merged)
    # 銀行更新用SQLファイル出力
    generate_sql_file(df_merged)


if __name__ == "__main__":
    main()

## banksテーブルのbank_nameカラムとbank_br_nameカラムを最新の状態に更新するSQLファイルを作成

In [None]:
"""
スクリプト概要：
このスクリプトは、銀行名と支店名を更新するためのもの
2つのCSVファイル（'update_target_bank_combination_list.csv'と'banks.csv'）を読み込み、そのデータを比較・処理して、SQL更新文を出力する

注意点：
1. スクリプトを実行する前に、'update_target_bank_combination_list.csv'と'banks.csv'という2つのCSVファイルが必要
2. このスクリプトを動作させるためには、'pandas'と'jaconv'というPythonライブラリが必要となる。これらはpipを使ってインストールすること
3. pythonのバージョンは3.9以上であること
"""


import os
import pandas as pd
import jaconv
from typing import Tuple


def to_half_width_kana(full_width_str: str) -> str:
    """
    全角カタカナとハイフンを半角に変換
    """
    half_width_str = jaconv.z2h(full_width_str, kana=True, ascii=True, digit=False)
    return half_width_str.replace("－", "-")


def load_and_prepare_dataframes() -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    CSVファイルを読み込み、データフレームを準備する
    """
    try:
        df_update_target = pd.read_csv(
            "update_target_bank_combination_list.csv",
            dtype={"bank_code": str, "bank_br_code": str},
        )[
            [
                "bank_code",
                "bank_br_code",
                "bank_name_half_width_kana",
                "bank_br_name_half_width_kana",
            ]
        ]
        df_banks = pd.read_csv(
            "banks.csv", dtype={"bank_id": str, "bank_code": str, "bank_br_code": str}
        )[["bank_id", "bank_code", "bank_br_code", "bank_name", "bank_br_name"]]
    except FileNotFoundError:
        print("File not found. Please check the file path and try again.")
        exit(1)
    except pd.errors.EmptyDataError:
        print("No data found in the file. Please check the file content and try again.")
        exit(1)
    except pd.errors.ParserError:
        print(
            "Error while parsing the file. Please check the file content and try again."
        )
        exit(1)
    except Exception as e:
        print(f"An error occurred: {e}")
        exit(1)

    # 銀行名を全角カタカナから半角カタカナに変換
    df_banks["bank_name_half_width_kana"] = df_banks["bank_name"].apply(
        to_half_width_kana
    )
    df_banks["bank_br_name_half_width_kana"] = df_banks["bank_br_name"].apply(
        to_half_width_kana
    )

    return df_banks, df_update_target


def merge_and_compare_dataframes(
    df_banks: pd.DataFrame, df_update_target: pd.DataFrame
) -> pd.DataFrame:
    """
    データフレームをマージし、名前を比較する
    """
    # データフレームをマージ
    df = pd.merge(
        df_banks,
        df_update_target,
        on=["bank_code", "bank_br_code"],
        how="left",
        suffixes=("_banks", "_update"),
    )

    # 銀行名と支店名を比較
    df["bank_name_match"] = (
        df["bank_name_half_width_kana_banks"] == df["bank_name_half_width_kana_update"]
    )
    df["bank_br_name_match"] = (
        df["bank_br_name_half_width_kana_banks"]
        == df["bank_br_name_half_width_kana_update"]
    )

    # 結果を "〇" と "×" にマッピング
    df["bank_name_match"] = df["bank_name_match"].map({True: "〇", False: "×"})
    df["bank_br_name_match"] = df["bank_br_name_match"].map({True: "〇", False: "×"})

    # 不要なカラムを削除
    df = df.drop(
        columns=[
            "bank_name_half_width_kana_banks",
            "bank_br_name_half_width_kana_banks",
        ]
    )

    return df


def output_and_count(
    df: pd.DataFrame, file_name: str, condition: str = None
) -> pd.DataFrame:
    """
    DataFrameをCSVファイルとして出力し、行数をカウントする関数
    """
    output_dir = "output"
    os.makedirs(output_dir, exist_ok=True)  # ディレクトリが存在しない場合に新たに作成

    if condition:
        df = df.query(condition)

    if df.empty:
        print(f"No rows matching condition: {condition}")
        return pd.DataFrame()

    file_path = os.path.join(output_dir, file_name)
    df.to_csv(file_path, index=False)
    print(f"Count of rows for condition '{condition}' in '{file_name}': {len(df)}")
    return df


def generate_sql(
    df: pd.DataFrame, output_file_name: str = "update_banks_and_branches.sql"
) -> None:
    """
    SQL文を生成して出力する
    """
    with open(output_file_name, "w") as f:
        for i, row in df.iterrows():
            f.write(
                f"UPDATE banks SET bank_name = '{row['bank_name_half_width_kana_update']}', bank_br_name = '{row['bank_br_name_half_width_kana_update']}' WHERE bank_id = '{row['bank_id']}' AND bank_code = '{row['bank_code']}' AND bank_br_code = '{row['bank_br_code']}';\n"
            )

    print(
        f"Successfully generated update queries for {len(df)} banks, saved to {output_file_name}."
    )
    print(f"{len(df)} bank updates are necessary.")


def main() -> None:
    df_banks, df_update_target = load_and_prepare_dataframes()
    df = merge_and_compare_dataframes(df_banks, df_update_target)

    output_and_count(df, "output_all.csv")
    output_and_count(
        df,
        "output_both_match.csv",
        '(bank_name_match == "〇") & (bank_br_name_match == "〇")',
    )
    df_update_needed = output_and_count(
        df,
        "output_one_match.csv",
        '((bank_name_match == "〇") & (bank_br_name_match == "×")) | ((bank_name_match == "×") & (bank_br_name_match == "〇"))',
    )
    output_and_count(
        df,
        "output_both_empty.csv",
        "bank_name_half_width_kana_update.isna() & bank_br_name_half_width_kana_update.isna()",
    )

    generate_sql(df_update_needed)


if __name__ == "__main__":
    main()