<a href="https://colab.research.google.com/github/4SELECTION-gif/OpenAPI-Specification/blob/main/Financial_Transaction_ETL_Script.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [5]:
# -*- coding: utf-8 -*-
"""
Production-quality ETL script for ingesting, cleaning, and normalizing
multilingual (Japanese/English) financial transaction CSV files.

This script is designed to handle a wide variety of format inconsistencies,
including multiple character encodings, extraneous header rows, messy dates, and
amount formats, and Unicode normalization issues.

It produces a clean, analysis-ready DataFrame and a corresponding CSV file,
adhering to a defined target schema and providing detailed quality checks.
"""

# 1) Safe Imports & Runtime Setup
# ==============================================================================
import pandas as pd
import numpy as np
import re
import io
import warnings
import hashlib
import unicodedata
from datetime import datetime
from pathlib import Path
from typing import List, Optional, Tuple, Dict, Any

# Attempt to import zoneinfo, fallback to pytz for older Python versions
try:
    from zoneinfo import ZoneInfo
except ImportError:
    from pytz import timezone as ZoneInfo

# Pandas display options for readable console summaries
pd.set_option('display.max_columns', 50)
pd.set_option('display.width', 120)
pd.set_option('display.float_format', '{:,.2f}'.format)


def zen_to_han(text: str) -> str:
    """
    Converts full-width (zenkaku) characters to half-width (hankaku).
    This function uses unicodedata.normalize('NFKC') which is a robust way
    to handle Japanese full-width digits, spaces, and ASCII symbols.

    Args:
        text: The string containing full-width characters.

    Returns:
        The normalized string with half-width characters.
    """
    if not isinstance(text, str):
        return text
    return unicodedata.normalize('NFKC', text)


# 2) File Reading with Encoding Fallback & Header Detection
# ==============================================================================
def find_best_header(df_peek: pd.DataFrame, max_rows_to_scan: int = 10) -> int:
    """
    Scans the first few rows of a DataFrame to find the most likely header row.
    The best header is determined by the row with the highest number of unique,
    non-empty values.

    Args:
        df_peek: A DataFrame read with no header to inspect the first few rows.
        max_rows_to_scan: The number of rows to scan from the top.

    Returns:
        The index of the row most likely to be the header.
    """
    best_header_index = 0
    max_score = -1

    for i in range(min(max_rows_to_scan, len(df_peek))):
        row_values = df_peek.iloc[i].dropna().astype(str)
        # Score favors rows with many unique, non-empty values
        score = row_values.nunique() + row_values.str.len().sum()
        if score > max_score:
            max_score = score
            best_header_index = i

    print(f"INFO: Best header row detected at index {best_header_index}.")
    return best_header_index


def read_transaction_csv(file_path: Path) -> Tuple[pd.DataFrame, str]:
    """
    Reads a CSV file with robust encoding fallback and header detection.

    Args:
        file_path: The path to the CSV file.

    Returns:
        A tuple containing the loaded DataFrame and the detected encoding.
    """
    encodings_to_try = ["utf-8-sig", "utf-8", "cp932", "shift_jis"]
    detected_encoding = None
    df = None

    for encoding in encodings_to_try:
        try:
            # Initial read to detect header
            df_peek = pd.read_csv(
                file_path,
                header=None,
                nrows=10,
                encoding=encoding,
                dtype=str,
                keep_default_na=False,
                skip_blank_lines=False
            )
            header_row = find_best_header(df_peek)

            # Full read with detected header
            df = pd.read_csv(
                file_path,
                header=header_row,
                encoding=encoding,
                dtype=str,
                keep_default_na=False,
                skip_blank_lines=True
            )
            detected_encoding = encoding
            print(f"INFO: Successfully read file with encoding '{encoding}'.")
            break
        except (UnicodeDecodeError, pd.errors.ParserError) as e:
            warnings.warn(f"WARNING: Failed to read with encoding '{encoding}': {e}")
        except FileNotFoundError:
            raise

    if df is None:
        raise ValueError(f"CRITICAL: Could not read the file at {file_path} with any of the attempted encodings.")

    # Drop rows that are entirely empty
    df = df.replace('', np.nan).dropna(how='all').replace(np.nan, '')
    return df, detected_encoding


# 3) Column Name Normalization & Mapping
# ==============================================================================
def normalize_column_names(df: pd.DataFrame) -> pd.DataFrame:
    """
    Normalizes DataFrame column names to lower_snake_case and de-duplicates them.
    """
    new_columns = []
    counts = {}
    for col in df.columns:
        # Handle non-string columns gracefully
        if not isinstance(col, str):
            col = str(col)
        # Strip BOM/whitespace, normalize, and convert to snake_case
        clean_col = re.sub(r'[\s\uFEFF]+', '_', zen_to_han(col.strip())).lower()
        clean_col = re.sub(r'[^a-z0-9_]', '', clean_col)

        # Deduplicate column names
        if clean_col in counts:
            counts[clean_col] += 1
            new_columns.append(f"{clean_col}_{counts[clean_col]}")
        else:
            counts[clean_col] = 1
            new_columns.append(clean_col)

    df.columns = new_columns
    return df


def map_columns_to_schema(df: pd.DataFrame) -> pd.DataFrame:
    """
    Maps common Japanese and English column names to the target schema.
    Unmapped columns are preserved with an 'auxiliary_' prefix.
    """
    mapping = {
        'date': ['日付', '取引日', 'ご利用日', '伝票日付', '予約日付'],
        'description': ['内容', '摘要', '詳細', 'ご利用店名', 'メモ', '備考'],
        'amount': ['金額', 'お支払金額', 'ご利用金額', '引落額', '入金額', '出金額'],
        'balance': ['残高', '残高金額'],
        'currency': ['通貨', '通貨コード'],
        'category': ['分類', 'カテゴリ', 'カテゴリー']
    }

    # Invert mapping for faster lookup
    inverted_mapping = {val: key for key, values in mapping.items() for val in values}

    final_cols = {}
    aux_cols = {}
    for col in df.columns:
        # Normalize original Japanese names for matching
        normalized_col_name = col.replace('_', ' ').strip()

        # Check against the direct Japanese mapping first
        target_col = None
        for k, v in mapping.items():
            if any(jp_name in normalized_col_name for jp_name in v):
                target_col = k
                break

        if target_col:
            if target_col not in final_cols:
                final_cols[target_col] = col
            else:
                # If target already mapped, move this one to auxiliary
                aux_cols[f"auxiliary_{col}"] = col
        else:
            # If no match, it's an auxiliary column
            aux_cols[f"auxiliary_{col}"] = col

    # Rename columns based on the mapping
    rename_dict = {v: k for k, v in final_cols.items()}
    rename_dict.update({v: k for k, v in aux_cols.items()})
    df = df.rename(columns=rename_dict)

    print("INFO: Column mapping results:")
    for target, original in final_cols.items():
        print(f"  - Mapped '{original}' to '{target}'")
    for new_name, original in aux_cols.items():
        print(f"  - Kept '{original}' as '{new_name}'")

    return df


# 5) Date Parsing
# ==============================================================================
def parse_date_robustly(series: pd.Series) -> pd.Series:
    """
    Parses a Series of date strings into timezone-aware UTC datetimes.
    """
    # First, handle Japanese era dates (if any) - simplified for this example
    # A full implementation would use a library or more complex regex

    # Use pandas' powerful to_datetime with multiple format attempts
    parsed_dates = pd.to_datetime(series, errors='coerce', infer_datetime_format=True)

    # Try a specific Japanese format for any remaining NaTs
    jp_format = '%Y年%m月%d日'
    still_nat = parsed_dates.isna()
    if still_nat.any():
        parsed_dates.loc[still_nat] = pd.to_datetime(
            series[still_nat], format=jp_format, errors='coerce'
        )

    # Localize naive datetimes to Asia/Tokyo, then convert all to UTC
    tokyo_tz = ZoneInfo("Asia/Tokyo")
    utc_tz = ZoneInfo("UTC")

    def to_utc(dt):
        if pd.isna(dt):
            return pd.NaT
        if dt.tzinfo is None:
            return dt.tz_localize(tokyo_tz).tz_convert(utc_tz)
        return dt.tz_convert(utc_tz)

    return parsed_dates.apply(to_utc)

# 6) Currency & Amount Normalization
# ==============================================================================
def normalize_amount(series: pd.Series) -> pd.Series:
    """
    Cleans and converts a Series of amount strings to numeric floats.
    Handles commas, currency symbols, full-width characters, and parentheses.
    """

    def clean_single_amount(val):
        if not isinstance(val, str) or val.strip() == '':
            return np.nan

        # Normalize characters
        val = zen_to_han(val)

        # Check for negative sign from parentheses
        is_negative = '(' in val and ')' in val

        # Remove all non-numeric characters except for decimal point and minus sign
        val = re.sub(r'[^\d.-]', '', val)

        if not val:
            return np.nan

        try:
            amount = float(val)
            if is_negative and amount > 0:
                amount *= -1
            return amount
        except ValueError:
            return np.nan

    return series.apply(clean_single_amount)


def infer_amount_sign(df: pd.DataFrame) -> pd.DataFrame:
    """
    Infers amount sign (negative for expenses) based on keywords in description.
    """
    df['amount'] = pd.to_numeric(df['amount'], errors='coerce')

    negative_keywords = ['引落', '支払', '出金', 'fee', 'charge', 'payment']

    def check_sign(row):
        amount = row['amount']
        desc = row.get('description', '').lower()
        if pd.isna(amount):
            return amount

        if amount > 0:
            if any(keyword in desc for keyword in negative_keywords):
                return -amount
        return amount

    df['amount'] = df.apply(check_sign, axis=1)
    return df

# Main ETL Function
# ==============================================================================
def process_financial_csv(file_path_str: str) -> Optional[pd.DataFrame]:
    """
    Main ETL pipeline to process a financial transaction CSV.
    """
    print("="*60)
    print(f"Starting ETL process for: {file_path_str}")
    print("="*60)

    file_path = Path(file_path_str)
    if not file_path.exists():
        raise FileNotFoundError(f"CRITICAL: The file {file_path_str} was not found.")

    # Step 2: Read file
    df_raw, encoding = read_transaction_csv(file_path)
    df = df_raw.copy()

    # Step 3: Normalize columns and map to schema
    df = normalize_column_names(df)
    df = map_columns_to_schema(df)

    # Step 4: Text Cleanup
    if 'description' in df.columns:
        df['original_description'] = df['description']
        for col in df.select_dtypes(include=['object']).columns:
            df[col] = df[col].astype(str).apply(zen_to_han).str.strip()
            df[col] = df[col].str.replace(r'\s+', ' ', regex=True)
            df[col] = df[col].str.replace(r'[\x00-\x1F\x7F]', '', regex=True)
    else:
        df['description'] = ''
        df['original_description'] = ''
        warnings.warn("WARNING: No 'description' column found or mapped.")

    # Step 5: Date Parsing
    if 'date' in df.columns:
        df['auxiliary_date_raw'] = df['date']
        df['date'] = parse_date_robustly(df['date'])
    else:
        df['date'] = pd.NaT
        warnings.warn("WARNING: No 'date' column found or mapped.")

    # Step 6: Amount Normalization
    if 'amount' in df.columns:
        df['amount'] = normalize_amount(df['amount'])
        df = infer_amount_sign(df)
    else:
        df['amount'] = np.nan
        warnings.warn("WARNING: No 'amount' column found or mapped.")

    # Step 6b: Currency
    df['currency'] = 'JPY' # Defaulting as per spec

    # Step 7: Balance Handling
    if 'balance' in df.columns:
        df['balance'] = normalize_amount(df['balance'])
    else:
        df['balance'] = np.nan

    # Step 8: Category Handling
    if 'category' not in df.columns:
        df['category'] = None

    # Step 9: Hash ID and Deduplication
    print("\n--- Deduplication ---")
    rows_before_dedupe = len(df)

    # For hashing, ensure consistent data types and formats
    df['hash_input'] = (
        df['date'].dt.strftime('%Y-%m-%dT%H:%M:%SZ').fillna('') + '|' +
        df['amount'].astype(str).fillna('') + '|' +
        df['description'].astype(str).fillna('') + '|' +
        df['currency'].astype(str).fillna('')
    )
    df['hash_id'] = df['hash_input'].apply(
        lambda x: hashlib.sha256(x.encode('utf-8')).hexdigest()
    )
    df = df.drop(columns=['hash_input'])

    df = df.drop_duplicates(subset=['hash_id'], keep='first')
    rows_after_dedupe = len(df)
    print(f"Shape before deduplication: ({rows_before_dedupe}, {df.shape[1]})")
    print(f"Shape after deduplication:  ({rows_after_dedupe}, {df.shape[1]})")
    print(f"Removed {rows_before_dedupe - rows_after_dedupe} duplicate rows.")

    # Step 10: Final Assembly
    df['source_file'] = str(file_path.resolve())
    account_name = file_path.stem if file_path.stem else "unknown"
    df['account'] = account_name

    target_schema = [
        "date", "description", "original_description", "amount", "currency",
        "balance", "category", "account", "source_file", "hash_id"
    ]

    # Add any missing columns from the target schema
    for col in target_schema:
        if col not in df.columns:
            df[col] = None

    aux_cols = [col for col in df.columns if col.startswith('auxiliary_')]
    final_ordered_cols = target_schema + sorted(aux_cols)
    df = df[final_ordered_cols]

    if df.empty:
        warnings.warn("CRITICAL: DataFrame is empty after cleaning process. No output will be generated.")
        return None

    return df

# Main execution block
if __name__ == "__main__":
    # Simulate the presence of the input file
    csv_content = """
    ,,"これは不要な行です",,
    "銀行取引明細",,,,,
    "口座: 普通 1234567",,,,,
    ,,,,,
    日付,内容,摘要,お支払金額,入金額,残高
    "2023年４月１日","振込　カ）ＡＢＣ","給与",,"350,000", "￥1,234,567"
    2023/05/10,"ＡＴＭ引出","現金","-50000",,"1,184,567円"
    2023/05/15,"カード利用　ＡＭＡＺＯＮ．ＣＯ．ＪＰ","オンラインショッピング","(12,800)",,
    2023-05-20,"振込入金　ＸＹＺ社",,,"250,000",1421767
    ,"デポジット",,"１０，０００",,
    2023/06/01,ＲＥＦＵＮＤ　ＦＲＯＭ　ＥＸＡＭＰＬＥ,返金,,1980,"１，４３３，７４７"
    22/06/2023,ﾃﾞﾝｷﾘｮｳ ﾋｷｵﾄｼ,公共料金,"(9,870)",,１，４２３，８７７
    2023年6月25日 15:30:00,振込　山田太郎,,(20000),,1403877
    2023/06/25,"振込　山田太郎",,"-20,000",,1403877
    null,無効なデータ,,,,,
    2023/07/01,給与振込　カ）ＡＢＣ,給与,,"300,000","1,703,877"
    """
    input_file_path = Path("/mnt/data/取引.csv")
    input_file_path.parent.mkdir(exist_ok=True)
    # Write with cp932 to test encoding detection
    with open(input_file_path, "w", encoding="cp932") as f:
        f.write(csv_content.strip())

    try:
        df_clean = process_financial_csv(str(input_file_path))

        if df_clean is not None:
            # Step 11: Quality Checks & Clear Summaries
            print("\n" + "="*60)
            print("ETL Process Completed. Final DataFrame Summary:")
            print("="*60)

            print("\n--- Data Previews ---")
            print("df.head(10):")
            print(df_clean.head(10))
            if len(df_clean) > 10:
                print("\ndf.sample(5, random_state=42):")
                print(df_clean.sample(min(5, len(df_clean)), random_state=42))

            print("\n--- Data Quality Report ---")
            null_report = pd.DataFrame({
                'null_count': df_clean.isnull().sum(),
                'null_percentage': (df_clean.isnull().sum() / len(df_clean) * 100)
            })
            print(null_report)

            print("\n--- Financial Summary ---")
            total_income = df_clean[df_clean['amount'] > 0]['amount'].sum()
            total_expense = df_clean[df_clean['amount'] < 0]['amount'].sum()
            min_date = df_clean['date'].min()
            max_date = df_clean['date'].max()

            print(f"Date Range (UTC): {min_date.strftime('%Y-%m-%d')} to {max_date.strftime('%Y-%m-%d')}")
            print(f"Total Income:  {total_income:,.2f} JPY")
            print(f"Total Expense: {abs(total_expense):,.2f} JPY")
            print(f"Net Flow:      {(total_income + total_expense):,.2f} JPY")

            print("\n--- Monthly Rollup ---")
            monthly_rollup = df_clean.set_index('date').groupby(pd.Grouper(freq='M')).agg(
                count=('amount', 'size'),
                total_inflow=('amount', lambda x: x[x > 0].sum()),
                total_outflow=('amount', lambda x: abs(x[x < 0].sum())),
            )
            monthly_rollup['net_flow'] = monthly_rollup['total_inflow'] - monthly_rollup['total_outflow']
            print(monthly_rollup)

            # Warnings
            if (df_clean['amount'] > 0).all() or (df_clean['amount'] < 0).all():
                warnings.warn("WARNING: All amounts are positive or all are negative. Sign inference may be incomplete.")
            if null_report.loc['date', 'null_percentage'] > 5:
                 warnings.warn(f"WARNING: More than 5% of dates failed to parse ({null_report.loc['date', 'null_percentage']:.2f}%).")
            if null_report.loc['amount', 'null_percentage'] > 5:
                 warnings.warn(f"WARNING: More than 5% of amounts failed to parse ({null_report.loc['amount', 'null_percentage']:.2f}%).")


            # Step 12: Save Artifacts
            output_path = Path("/mnt/data/transactions_normalized.csv")
            df_clean.to_csv(output_path, index=False, encoding="utf-8")
            print("\n" + "="*60)
            print(f"Saved normalized CSV to {output_path} ({len(df_clean)} rows).")
            print("="*60)

    except Exception as e:
        print(f"\nCRITICAL ERROR: The ETL process failed. Reason: {e}")

Starting ETL process for: /mnt/data/取引.csv

CRITICAL ERROR: The ETL process failed. Reason: CRITICAL: Could not read the file at /mnt/data/取引.csv with any of the attempted encodings.







In [7]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
