# 現新比較ツール
## S3上の2つのPrefix間でCSVファイルを比較

### 1. ライブラリのインポート

In [None]:
import pandas as pd
import boto3
from datetime import datetime
import io
import gc
from botocore.exceptions import ClientError

### 2. 設定

In [2]:
# 処理日を自動取得
process_date = datetime.now().strftime('%Y%m%d')

# S3設定
S3_BUCKET = 'k-ishibashi-test'
S3_PREFIX_BEFORE = f'{process_date}/before/'
S3_PREFIX_AFTER = f'{process_date}/after/'
S3_OUTPUT_PREFIX = f'{process_date}/output/'

# デフォルト設定
DEFAULT_KEY_COLUMNS = ['vin']
DEFAULT_IGNORE_COLUMNS = []
DEFAULT_HAS_HEADER = True

# ファイル別設定
FILE_SETTINGS = [
    {
        'pattern': r'tccontract.*\.csv$',
        'has_header': True,
        'key_columns': ['vin'],
        'ignore_columns': []
    },
    {
        'pattern': r'run-.*-part-r-\d+$',
        'has_header': True,
        'key_columns': ['vin'],
        'ignore_columns': []
    }
]

print(f'処理日: {process_date}')
print(f'S3バケット: {S3_BUCKET}')
print(f'PrefixBefore: {S3_PREFIX_BEFORE}')
print(f'PrefixAfter: {S3_PREFIX_AFTER}')
print(f'OutputPrefix: {S3_OUTPUT_PREFIX}')

処理日: 20260106
S3バケット: k-ishibashi-test
PrefixBefore: 20260106/before/
PrefixAfter: 20260106/after/
OutputPrefix: 20260106/output/


### 3. S3操作関数

In [3]:
# S3クライアント作成
def get_s3_client():
    return boto3.client("s3")

# 指定Prefix内のCSVファイル一覧を取得
def list_s3_csv_files(s3_client, bucket, prefix):
    try:
        response = s3_client.list_objects_v2(Bucket=bucket, Prefix=prefix)
        if 'Contents' not in response:
            return []
        files = []
        for obj in response['Contents']:
            key = obj['Key']
            filename = key.split('/')[-1]
            # .csvファイルまたはrun-*-part-r-*パターンのファイルを対象
            if (filename.endswith('.csv') or filename.startswith('run-')) and key != prefix:
                files.append(filename)
        return files
    except ClientError as e:
        print(f"S3エラー: {e}")
        return []

# S3ファイルのサイズ（MB）を取得
def get_s3_file_size(s3_client, bucket, key):
    try:
        response = s3_client.head_object(Bucket=bucket, Key=key)
        return response['ContentLength'] / (1024 * 1024)
    except ClientError:
        return 0

# S3からCSVファイルを読み込んでDataFrameに変換
def read_csv_from_s3(s3_client, bucket, key, has_header=True):
    try:
        response = s3_client.get_object(Bucket=bucket, Key=key)
        if has_header:
            return pd.read_csv(io.BytesIO(response['Body'].read()), dtype=str, keep_default_na=False)
        else:
            return pd.read_csv(io.BytesIO(response['Body'].read()), header=None, dtype=str, keep_default_na=False)
    except ClientError as e:
        print(f"読み込みエラー ({key}): {e}")
        return None

# データをS3にアップロード
def upload_to_s3(s3_client, bucket, key, data):
    try:
        s3_client.put_object(Bucket=bucket, Key=key, Body=data)
        return True
    except ClientError as e:
        print(f"アップロードエラー ({key}): {e}")
        return False

s3_client = get_s3_client()
print('S3クライアント作成完了')

S3クライアント作成完了


### 4. バイナリチェック・比較処理関数

In [4]:
# MD5ハッシュでファイルの同一性をチェック
# 同一: True, 異なる: False, エラー: None
def binary_check_s3_files(s3_client, bucket, key_a, key_b):
    import hashlib
    try:
        response_a = s3_client.get_object(Bucket=bucket, Key=key_a)
        response_b = s3_client.get_object(Bucket=bucket, Key=key_b)
        
        content_a = response_a['Body'].read()
        content_b = response_b['Body'].read()
        
        # ハッシュ比較
        if hashlib.md5(content_a).hexdigest() == hashlib.md5(content_b).hexdigest():
            return True, None, None
        
        return False, None, None
    
    except Exception as e:
        print(f"バイナリチェックエラー: {e}")
        return None, None, None

# ファイル名からファイル別設定（キー列、ヘッダ有無など）を取得
def get_file_config(filename):
    import re
    for setting in FILE_SETTINGS:
        if re.match(setting['pattern'], filename):
            return {
                'has_header': setting.get('has_header', True),
                'key_columns': setting['key_columns'],
                'ignore_columns': setting['ignore_columns']
            }
    return {
        'has_header': DEFAULT_HAS_HEADER,
        'key_columns': DEFAULT_KEY_COLUMNS,
        'ignore_columns': DEFAULT_IGNORE_COLUMNS
    }

# 2つのDataFrameをキー列で突合し、差分を検出
# 削除・追加・変更の3種類の差分をDataFrameで返す
def compare_csv_normal(baseline_df, candidate_df, key_columns, ignore_columns):
    if len(key_columns) > 1:
        baseline_df = baseline_df.copy()
        candidate_df = candidate_df.copy()
        combined_key = '_'.join(map(str, key_columns))
        baseline_df[combined_key] = baseline_df[key_columns].astype(str).agg('_'.join, axis=1)
        candidate_df[combined_key] = candidate_df[key_columns].astype(str).agg('_'.join, axis=1)
        key_column = combined_key
    else:
        key_column = key_columns[0]
    
    compare_columns = [col for col in baseline_df.columns if col not in key_columns and col not in ignore_columns and col != key_column]
    diff_records = []
    
    # 削除・追加を検出
    baseline_keys = set(baseline_df[key_column])
    candidate_keys = set(candidate_df[key_column])
    
    for key in baseline_keys - candidate_keys:
        diff_records.append({'key': key, 'diff_type': 'DELETED', 'column': None, 'baseline_value': None, 'candidate_value': None})
    for key in candidate_keys - baseline_keys:
        diff_records.append({'key': key, 'diff_type': 'ADDED', 'column': None, 'baseline_value': None, 'candidate_value': None})
    
    # 変更を検出
    common_keys = baseline_keys & candidate_keys
    if common_keys:
        baseline_common = baseline_df[baseline_df[key_column].isin(common_keys)].set_index(key_column).sort_index()
        candidate_common = candidate_df[candidate_df[key_column].isin(common_keys)].set_index(key_column).sort_index()
        
        # 共通カラムのみを比較
        common_cols = [col for col in baseline_common.columns if col in candidate_common.columns]
        for col in common_cols:
            if col not in ignore_columns and col != key_column:
                diff_mask = baseline_common[col].values != candidate_common[col].values
                for idx, key in enumerate(baseline_common.index):
                    if diff_mask[idx]:
                        diff_records.append({
                            'key': key, 'diff_type': 'MODIFIED', 'column': col,
                            'baseline_value': baseline_common.iloc[idx][col],
                            'candidate_value': candidate_common.iloc[idx][col]
                        })
    
    return pd.DataFrame(diff_records)


# キーごとに差分を詳細表示（現在値と新規値を並べて表示）
def display_diff_by_id(diff_df, baseline_df, candidate_df, key_columns, ignore_columns):
    if len(diff_df) == 0:
        print('差分はありません')
        return
    if len(key_columns) > 1:
        baseline_df, candidate_df = baseline_df.copy(), candidate_df.copy()
        key_column = '_'.join(key_columns)
        baseline_df[key_column] = baseline_df[key_columns].astype(str).agg('_'.join, axis=1)
        candidate_df[key_column] = candidate_df[key_columns].astype(str).agg('_'.join, axis=1)
        key_display = '+'.join(key_columns)
    else:
        key_column, key_display = key_columns[0], key_columns[0]
    
    baseline_indexed = baseline_df.set_index(key_column)
    candidate_indexed = candidate_df.set_index(key_column)
    
    # 列名を取得（ignore_columnsとkey_columnsを除く）
    display_columns = [col for col in baseline_df.columns if col not in ignore_columns and col not in key_columns and col != key_column]
    
    for key in sorted(diff_df['key'].unique()):
        key_rows = diff_df[diff_df['key'] == key]
        if len(key_rows) == 0:
            continue
        diff_type = key_rows['diff_type'].iloc[0]
        print(f'\n{key_display}: {key}')
        print('=' * 40)
        print(f'列名: {", ".join(str(col) for col in display_columns)}')
        
        if diff_type == 'DELETED':
            vals = []
            for c in display_columns:
                v = baseline_indexed.loc[key, c]
                if isinstance(v, pd.Series):
                    v = v.iloc[0] if len(v) > 0 else ''
                vals.append(str(v))
            print(f'(現) {", ".join(vals)}')
            print('(新) ---')
        elif diff_type == 'ADDED':
            vals = []
            for c in display_columns:
                v = candidate_indexed.loc[key, c]
                if isinstance(v, pd.Series):
                    v = v.iloc[0] if len(v) > 0 else ''
                vals.append(str(v))
            print('(現) ---')
            print(f'(新) {", ".join(vals)}')
        elif diff_type == 'MODIFIED':
            bvals = []
            cvals = []
            for c in display_columns:
                bv = baseline_indexed.loc[key, c]
                cv = candidate_indexed.loc[key, c]
                if isinstance(bv, pd.Series):
                    bv = bv.iloc[0] if len(bv) > 0 else ''
                if isinstance(cv, pd.Series):
                    cv = cv.iloc[0] if len(cv) > 0 else ''
                bvals.append(str(bv))
                cvals.append(str(cv))
            print(f'(現) {", ".join(bvals)}')
            print(f'(新) {", ".join(cvals)}')
            print(f'変更された項目: {", ".join([str(int(float(c))) if str(c).replace(".","").isdigit() else str(c) for c in key_rows["column"].tolist() if c is not None])}')

# 比較結果のサマリーレポートを表示（ファイル情報、差分件数など）
def display_summary_report(filename, result, lines_a_count, lines_b_count):
    elapsed_time = result.get('elapsed_time', 0)
    minutes = int(elapsed_time // 60)
    seconds = int(elapsed_time % 60)
    print(f"\n{filename} 比較結果サマリー ({minutes:02d}:{seconds:02d})")
    print("=" * 60)
    
    # ファイル基本情報
    print(f"ファイル情報:")
    print(f"   Before: {lines_a_count:,}行")
    print(f"   After:  {lines_b_count:,}行")
    print(f"   行数差: {lines_b_count - lines_a_count:+,}行")
    
    if result.get('binary_identical', False):
        print(f"\n結果: ファイルは完全に同一です")
        return
    
    # データ内容の差分
    print(f"\nデータ内容の変化:")
    diff_count = result['diff_count']
    if diff_count == 0:
        print(f"   データ内容に変更はありません")
    else:
        print(f"   変更されたレコード: {diff_count:,}件")
        if 'summary' in result:
            for diff_type, count in result['summary'].items():
                type_name = {'ADDED': '追加', 'DELETED': '削除', 'MODIFIED': '変更'}.get(diff_type, diff_type)
                print(f"     {type_name}: {count:,}件")
    
    # 出力ファイル情報
    if 'output_key' in result:
        print(f"\n詳細レポート: s3://{S3_BUCKET}/{result['output_key']}")

# S3からファイルを読込、ソート後に比較を実行
# 差分DataFrame、baseline_df、candidate_df、行数を返す
def compare_csv_large(s3_client, bucket, key_a, key_b, key_columns, ignore_columns):
    size_a, size_b = get_s3_file_size(s3_client, bucket, key_a), get_s3_file_size(s3_client, bucket, key_b)
    print(f"ファイルサイズ: Before={size_a:.1f}MB, After={size_b:.1f}MB")
    
    file_config = get_file_config(key_a.split('/')[-1])
    has_header = file_config.get('has_header', True)
    baseline_df, candidate_df = read_csv_from_s3(s3_client, bucket, key_a, has_header), read_csv_from_s3(s3_client, bucket, key_b, has_header)
    if baseline_df is None or candidate_df is None:
        return None, None, None, 0, 0
    return compare_csv_normal(baseline_df, candidate_df, key_columns, ignore_columns), baseline_df, candidate_df, len(baseline_df), len(candidate_df)

print('バイナリチェック・比較関数定義完了')


バイナリチェック・比較関数定義完了


### 5. ファイル一覧取得

In [5]:
# ファイル一覧取得
prefix_before_files = set(list_s3_csv_files(s3_client, S3_BUCKET, S3_PREFIX_BEFORE))
prefix_after_files = set(list_s3_csv_files(s3_client, S3_BUCKET, S3_PREFIX_AFTER))
files_to_compare = list(prefix_before_files & prefix_after_files)

print(f'Before: {len(prefix_before_files)}ファイル')
print(f'After:  {len(prefix_after_files)}ファイル')
print(f'比較対象: {len(files_to_compare)}ファイル')
print(f'\n比較対象ファイル一覧:')
for i, filename in enumerate(sorted(files_to_compare), 1):
    print(f'  {i}. {filename}')

Before: 2ファイル
After:  3ファイル
比較対象: 2ファイル

比較対象ファイル一覧:
  1. run-1765205425280-part-r-00000
  2. tccontract_20251208.csv


### 6. 比較実行

In [6]:
# 各ファイルを比較
results = {}

for i, filename in enumerate(files_to_compare, 1):
    start_time = datetime.now()
    print(f'\n[{i}/{len(files_to_compare)}] {filename} を比較中...')
    print("=" * 80)
    
    key_a = f"{S3_PREFIX_BEFORE.rstrip('/')}/{filename}"
    key_b = f"{S3_PREFIX_AFTER.rstrip('/')}/{filename}"
    file_config = get_file_config(filename)
    
    # バイナリチェック実行
    is_identical, _, _ = binary_check_s3_files(s3_client, S3_BUCKET, key_a, key_b)
    
    if is_identical is None:
        print('バイナリチェック失敗')
        continue
    elif is_identical:
        print('差分なし')
        results[filename] = {'diff_count': 0, 'binary_identical': True}
        continue
    else:
        print('差分あり')
    
    diff_df, baseline_df, candidate_df, lines_a_count, lines_b_count = compare_csv_large(
        s3_client, S3_BUCKET, key_a, key_b, file_config['key_columns'], file_config['ignore_columns']
    )
    
    if diff_df is None:
        print('キー突合チェック失敗')
        continue
    
    # 結果を保存
    result = {
        'diff_count': len(diff_df),
        'binary_identical': False
    }
    
    if len(diff_df) > 0:
        summary = diff_df['diff_type'].value_counts()
        result['summary'] = summary.to_dict()
        
        # 差分ファイル出力
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        output_key = f"{S3_OUTPUT_PREFIX.rstrip('/')}/{filename.replace('.csv', '')}_diff_{timestamp}.csv"
        csv_buffer = io.StringIO()
        diff_df.to_csv(csv_buffer, index=False, encoding='utf-8-sig')
        
        if upload_to_s3(s3_client, S3_BUCKET, output_key, csv_buffer.getvalue().encode('utf-8-sig')):
            result['output_key'] = output_key
        
        # 詳細差分表示
        print(f'\n=== {filename} ID別詳細差分レポート ===')
        display_diff_by_id(diff_df, baseline_df, candidate_df, file_config['key_columns'], file_config['ignore_columns'])
    
    results[filename] = result
    
    # 処理時間を計算
    elapsed_time = (datetime.now() - start_time).total_seconds()
    result['elapsed_time'] = elapsed_time
    
    # わかりやすいサマリー表示
    display_summary_report(filename, result, lines_a_count, lines_b_count)
    
    gc.collect()

print(f'\n全{len(files_to_compare)}ファイルの比較が完了しました')


[1/2] run-1765205425280-part-r-00000 を比較中...
差分なし

[2/2] tccontract_20251208.csv を比較中...
差分あり
ファイルサイズ: Before=0.0MB, After=0.0MB

=== tccontract_20251208.csv ID別詳細差分レポート ===

vin: PHY099190000018
列名: createdatetime, startdatetime, enddatetime, contractreleasedatetime, contractstartdatetime, serviceplanid, navigationserial, acceptingdealercode, acceptingshopcode, autocontinuationtype, diffflag
(現) 2025-12-08, , , , , , , , , , 111111111
(新) 2025-12-08, , , , , , , , , , 111111000
変更された項目: diffflag

vin: PHY099190000099
列名: createdatetime, startdatetime, enddatetime, contractreleasedatetime, contractstartdatetime, serviceplanid, navigationserial, acceptingdealercode, acceptingshopcode, autocontinuationtype, diffflag
(現) ---
(新) 2025-12-08, , , , , , , , , , 111111111

tccontract_20251208.csv 比較結果サマリー (00:00)
ファイル情報:
   Before: 28行
   After:  29行
   行数差: +1行

データ内容の変化:
   変更されたレコード: 2件
     追加: 1件
     変更: 1件

詳細レポート: s3://k-ishibashi-test/20260106/output/tccontract_20251208_diff_2026010

### 7. 全体サマリー

In [7]:
# 全体比較結果サマリー
print('\n' + '='*60)
print('全体比較結果サマリー')
print('='*60)

for filename in sorted(results.keys()):
    result = results[filename]
    
    if result.get('binary_identical', False) and result['diff_count'] == 0:
        print(f'✅ {filename}')
    elif result['diff_count'] > 0:
        summary_parts = []
        if 'summary' in result:
            for diff_type, count in result['summary'].items():
                if diff_type == 'MODIFIED':
                    summary_parts.append(f'変更{count}件')
                elif diff_type == 'DELETED':
                    summary_parts.append(f'削除{count}件')
                elif diff_type == 'ADDED':
                    summary_parts.append(f'追加{count}件')
        summary_str = '、'.join(summary_parts)
        print(f'❌ {filename}: {result["diff_count"]}件の変更')
        if summary_str:
            print(f'   └ {summary_str}')
    else:
        print(f'❌ {filename} (バイナリ差分あり)')



全体比較結果サマリー
✅ run-1765205425280-part-r-00000
❌ tccontract_20251208.csv: 2件の変更
   └ 追加1件、変更1件
