In [22]:
# =============================================================================
# CELL 1: Configuration and Paths
# =============================================================================
import pandas as pd
import numpy as np
import os
import glob
import json
import gzip
import time
import gc
from datetime import datetime
from collections import defaultdict
from pathlib import Path

# Paths
BASE_PATH = r"c:\Users\jonat\Downloads\_unique_csv_master"
ECB_PATH = os.path.join(BASE_PATH, "ECB_Data", "ECB Data")
ESMA_PATH = os.path.join(BASE_PATH, "ESMA_UE_Collat_Merged")
TEMPLATE_PATH = os.path.join(BASE_PATH, "ESMA Template (2).xlsx")
POOL_MAPPING_PATH = os.path.join(BASE_PATH, "pool_mapping.json")

# OUTPUT on D: drive
OUTPUT_DIR = r"D:\ECB_ESMA_MERGED"
CHECKPOINT_FILE = os.path.join(OUTPUT_DIR, "_checkpoint.json")
LOG_FILE = os.path.join(OUTPUT_DIR, "_processing_log.txt")

# Create output directory
os.makedirs(OUTPUT_DIR, exist_ok=True)

# =============================================================================
# POOLS WITH TEMPORAL OVERLAP (verified via analysis)
# Only these 3 pools have ECB and ESMA data for the same months
# Deduplication is ONLY needed for these pools
# =============================================================================
POOLS_WITH_OVERLAP = {
    'RMBMBE000095100120084',  # 8 overlapping months: 2022-03 to 2023-12
    'RMBMFR000083100220149',  # 4 overlapping months: 2021-05 to 2021-08
    'RMBMNL000185100120109',  # 1 overlapping month: 2024-06
}

print(f"Input ECB: {ECB_PATH}")
print(f"Input ESMA: {ESMA_PATH}")
print(f"Output: {OUTPUT_DIR}")
print(f"Checkpoint: {CHECKPOINT_FILE}")
print(f"Pools requiring deduplication: {len(POOLS_WITH_OVERLAP)}")

Input ECB: c:\Users\jonat\Downloads\_unique_csv_master\ECB_Data\ECB Data
Input ESMA: c:\Users\jonat\Downloads\_unique_csv_master\ESMA_UE_Collat_Merged
Output: D:\ECB_ESMA_MERGED
Checkpoint: D:\ECB_ESMA_MERGED\_checkpoint.json
Pools requiring deduplication: 3


In [23]:
# =============================================================================
# CELL 2: Load Template Mapping (ECB <-> ESMA columns)
# =============================================================================
template_df = pd.read_excel(TEMPLATE_PATH, sheet_name='Sheet1')
template_df = template_df[['FIELD CODE', 'FIELD NAME', 'For info: existing ECB or EBA NPL template field code']].copy()
template_df.columns = ['ESMA_Code', 'ESMA_Name', 'ECB_Code']
template_df = template_df[template_df['ECB_Code'].notna()]

# Create mappings
esma_to_ecb = dict(zip(template_df['ESMA_Code'], template_df['ECB_Code']))

ecb_to_esma = {}
for _, row in template_df.iterrows():
    esma_code = row['ESMA_Code']
    ecb_code = row['ECB_Code']
    esma_name = row['ESMA_Name']
    if ecb_code not in ecb_to_esma:
        ecb_to_esma[ecb_code] = esma_code
    elif 'New' in str(esma_name):
        ecb_to_esma[ecb_code] = esma_code

print(f"ECB->ESMA mappings: {len(ecb_to_esma)}")
print(f"ESMA->ECB mappings: {len(esma_to_ecb)}")

ECB->ESMA mappings: 72
ESMA->ECB mappings: 80


In [24]:
# =============================================================================
# CELL 3: Build File Index (All Pools)
# =============================================================================
# Index ECB files by pool
all_ecb_files = glob.glob(os.path.join(ECB_PATH, "*.gz"))
ecb_pool_files = defaultdict(list)
for f in all_ecb_files:
    fname = os.path.basename(f)
    pool_id = fname.split('_')[0]
    ecb_pool_files[pool_id].append(fname)

# Index ESMA files by pool
all_esma_files = glob.glob(os.path.join(ESMA_PATH, "*.csv"))
esma_pool_files = defaultdict(list)
for f in all_esma_files:
    fname = os.path.basename(f)
    parts = fname.split('_')
    pool_id = parts[-3]  # Pool ID is 3rd from last
    esma_pool_files[pool_id].append(fname)

# Load pool mapping (ECB -> ESMA matches)
with open(POOL_MAPPING_PATH) as f:
    pool_mapping = json.load(f)

matched_pools = pool_mapping['pools']
matched_ecb_pools = set(matched_pools.keys())
matched_esma_pools = set(p['esma_pool'] for p in matched_pools.values())

# Identify ECB-only and ESMA-only pools
ecb_only_pools = set(ecb_pool_files.keys()) - matched_ecb_pools
esma_only_pools = set(esma_pool_files.keys()) - matched_esma_pools

print(f"ECB pools: {len(ecb_pool_files)} ({len(all_ecb_files)} files)")
print(f"ESMA pools: {len(esma_pool_files)} ({len(all_esma_files)} files)")
print(f"\nMatched ECB->ESMA pools: {len(matched_pools)}")
print(f"ECB-only pools: {len(ecb_only_pools)}")
print(f"ESMA-only pools: {len(esma_only_pools)}")

ECB pools: 58 (1941 files)
ESMA pools: 262 (2906 files)

Matched ECB->ESMA pools: 22
ECB-only pools: 36
ESMA-only pools: 246


In [25]:
# =============================================================================
# CELL 4: Checkpoint Management
# =============================================================================
def scan_completed_pools():
    """
    Scan output folders to determine which pools are already processed.
    This is more reliable than checkpoint.json since it checks actual files.
    """
    completed = {
        'matched': set(),
        'ecb_only': set(),
        'esma_only': set()
    }
    
    for pool_type in ['matched', 'ecb_only', 'esma_only']:
        folder = os.path.join(OUTPUT_DIR, pool_type)
        if os.path.exists(folder):
            for fname in os.listdir(folder):
                if fname.endswith('.csv') and not fname.endswith('.tmp'):
                    # Pool ID is the filename without .csv
                    pool_id = fname[:-4]
                    completed[pool_type].add(pool_id)
    
    return completed

def log_message(msg):
    """Append message to log file and print"""
    timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    log_entry = f"[{timestamp}] {msg}"
    print(log_entry)
    with open(LOG_FILE, 'a', encoding='utf-8') as f:
        f.write(log_entry + '\n')

# Scan output folders to find already-completed pools
completed_pools = scan_completed_pools()
print(f"Scanned output folders for completed pools:")
print(f"  Matched pools: {len(completed_pools['matched'])}")
print(f"  ECB-only pools: {len(completed_pools['ecb_only'])}")
print(f"  ESMA-only pools: {len(completed_pools['esma_only'])}")

Scanned output folders for completed pools:
  Matched pools: 22
  ECB-only pools: 0
  ESMA-only pools: 0


In [26]:
# =============================================================================
# CELL 5: Data Loading Functions (with retry and chunked loading)
# =============================================================================
import gc
import io
import zlib

# Threshold for "large" pool (compressed size in bytes)
# 100 MB compressed - more conservative to avoid memory issues
LARGE_POOL_THRESHOLD = 100 * 1024 * 1024  # 100 MB compressed

def load_ecb_file(filepath, max_retries=3):
    """
    Load a single ECB .gz file with retry logic.
    
    Uses zlib fallback for files that fail with standard gzip reader
    (some ECB files have gzip format issues that zlib handles correctly).
    """
    fname = os.path.basename(filepath)
    
    for attempt in range(max_retries):
        try:
            # Method 1: Try standard pandas read (works for most files)
            df = pd.read_csv(filepath, compression='gzip', low_memory=False)
            return df
        except Exception as e:
            error_msg = str(e)
            
            # Check if this is a gzip-related error
            if 'gzip' in error_msg.lower() or 'Not a gzipped file' in error_msg:
                # Method 2: Use zlib directly (handles problematic gzip files)
                try:
                    with open(filepath, 'rb') as f:
                        raw = f.read()
                    
                    # Decompress using zlib (skip 10-byte gzip header)
                    decomp = zlib.decompressobj(-zlib.MAX_WBITS)
                    result = decomp.decompress(raw[10:])
                    result += decomp.flush()
                    
                    df = pd.read_csv(io.BytesIO(result), low_memory=False)
                    log_message(f"  Loaded {fname} using zlib fallback ({len(df):,} rows)")
                    return df
                except Exception as zlib_error:
                    if attempt < max_retries - 1:
                        log_message(f"  Retry {attempt+1}/{max_retries} for {fname}: zlib also failed: {zlib_error}")
                        time.sleep(0.5)
                        gc.collect()
                    else:
                        log_message(f"ERROR loading ECB file {filepath}: {e} (zlib fallback: {zlib_error})")
                        return pd.DataFrame()
            else:
                if attempt < max_retries - 1:
                    log_message(f"  Retry {attempt+1}/{max_retries} for {fname}: {e}")
                    time.sleep(0.5)
                    gc.collect()
                else:
                    log_message(f"ERROR loading ECB file {filepath}: {e}")
                    return pd.DataFrame()
    return pd.DataFrame()

def load_esma_file(filepath, max_retries=3):
    """Load a single ESMA .csv file with retry logic"""
    for attempt in range(max_retries):
        try:
            df = pd.read_csv(filepath, low_memory=False)
            return df
        except Exception as e:
            if attempt < max_retries - 1:
                log_message(f"  Retry {attempt+1}/{max_retries} for {os.path.basename(filepath)}: {e}")
                time.sleep(0.5)
                gc.collect()
            else:
                log_message(f"ERROR loading ESMA file {filepath}: {e}")
                return pd.DataFrame()
    return pd.DataFrame()

def get_pool_compressed_size(pool_id, source='ecb'):
    """Get total compressed size of files for a pool"""
    if source == 'ecb':
        files = ecb_pool_files.get(pool_id, [])
        base_path = ECB_PATH
    else:
        files = esma_pool_files.get(pool_id, [])
        base_path = ESMA_PATH
    
    total = 0
    for fname in files:
        try:
            total += os.path.getsize(os.path.join(base_path, fname))
        except:
            pass
    return total

def is_large_pool(pool_id, source='ecb'):
    """Check if pool exceeds size threshold"""
    return get_pool_compressed_size(pool_id, source) > LARGE_POOL_THRESHOLD

def load_all_ecb_for_pool(pool_id):
    """Load and concatenate all ECB files for a pool"""
    files = ecb_pool_files.get(pool_id, [])
    if not files:
        return pd.DataFrame()
    
    dfs = []
    for fname in files:
        filepath = os.path.join(ECB_PATH, fname)
        df = load_ecb_file(filepath)
        if len(df) > 0:
            dfs.append(df)
    
    if dfs:
        return pd.concat(dfs, ignore_index=True)
    return pd.DataFrame()

def load_all_esma_for_pool(pool_id):
    """Load and concatenate all ESMA files for a pool"""
    files = esma_pool_files.get(pool_id, [])
    if not files:
        return pd.DataFrame()
    
    dfs = []
    for fname in files:
        filepath = os.path.join(ESMA_PATH, fname)
        df = load_esma_file(filepath)
        if len(df) > 0:
            dfs.append(df)
    
    if dfs:
        return pd.concat(dfs, ignore_index=True)
    return pd.DataFrame()

print("✓ Data loading functions defined (with zlib fallback for problematic gzip files)")
print(f"  Large pool threshold: {LARGE_POOL_THRESHOLD/1024/1024:.0f} MB compressed")

✓ Data loading functions defined (with zlib fallback for problematic gzip files)
  Large pool threshold: 100 MB compressed


In [27]:
# =============================================================================
# CELL 6: Data Preparation Functions
# =============================================================================
def prepare_ecb_data(ecb_df):
    """
    Prepare ECB data:
    1. Rename columns to ESMA equivalents
    2. Add source marker
    3. Normalize date to YYYY-MM
    """
    if len(ecb_df) == 0:
        return pd.DataFrame()
    
    df = ecb_df.copy()
    
    # Rename ECB columns to ESMA equivalents
    rename_map = {ecb: esma for ecb, esma in ecb_to_esma.items() if ecb in df.columns}
    df = df.rename(columns=rename_map)
    
    # Add source marker
    df['source'] = 'ECB'
    
    # Normalize date to YYYY-MM for deduplication
    if 'RREL6' in df.columns:
        df['date_ym'] = df['RREL6'].astype(str).str[:7]
    elif 'AR1' in df.columns:
        df['date_ym'] = df['AR1'].astype(str).str[:7]
    else:
        df['date_ym'] = ''
    
    return df

def prepare_esma_data(esma_df):
    """
    Prepare ESMA data:
    1. Add source marker
    2. Normalize date to YYYY-MM
    """
    if len(esma_df) == 0:
        return pd.DataFrame()
    
    df = esma_df.copy()
    df['source'] = 'ESMA'
    
    # Normalize date
    if 'RREL6' in df.columns:
        df['date_ym'] = df['RREL6'].astype(str).str[:7]
    else:
        df['date_ym'] = ''
    
    return df

def remove_duplicates_prefer_esma(df):
    """
    Remove duplicate loan+date rows, preferring ESMA over ECB.
    """
    if len(df) == 0 or 'RREL3' not in df.columns:
        return df
    
    # Find loan+dates that have ESMA data
    esma_rows = df[df['source'] == 'ESMA']
    esma_loan_dates = set(zip(
        esma_rows['RREL3'].astype(str),
        esma_rows['date_ym'].astype(str)
    ))
    
    # Keep ESMA rows, or ECB rows without ESMA equivalent
    def should_keep(row):
        if row['source'] == 'ESMA':
            return True
        loan_date = (str(row['RREL3']), str(row['date_ym']))
        return loan_date not in esma_loan_dates
    
    mask = df.apply(should_keep, axis=1)
    result = df[mask].copy()
    
    # Drop helper column
    if 'date_ym' in result.columns:
        result = result.drop(columns=['date_ym'])
    
    return result

def get_non_empty_columns(df):
    """Return list of columns that have at least one non-null value"""
    non_empty = []
    for col in df.columns:
        if df[col].notna().any():
            non_empty.append(col)
    return non_empty

print("✓ Data preparation functions defined")

✓ Data preparation functions defined


In [28]:
# =============================================================================
# CELL 7: Pool Processing Functions (with large pool handling)
# =============================================================================
def process_matched_pool(ecb_pool_id, esma_pool_id):
    """
    Process a matched ECB-ESMA pool pair.
    Returns merged DataFrame with source column.
    
    OPTIMIZATION: Only runs deduplication for pools with temporal overlap.
    For pools without overlap, simply concatenates ECB and ESMA data.
    """
    # Load data
    ecb_df = load_all_ecb_for_pool(ecb_pool_id)
    esma_df = load_all_esma_for_pool(esma_pool_id)
    
    log_message(f"  ECB rows: {len(ecb_df)}, ESMA rows: {len(esma_df)}")
    
    # Check if this pool needs deduplication
    needs_dedup = ecb_pool_id in POOLS_WITH_OVERLAP
    
    # Prepare data
    ecb_prepared = prepare_ecb_data(ecb_df)
    esma_prepared = prepare_esma_data(esma_df)
    
    # Combine
    if len(ecb_prepared) > 0 and len(esma_prepared) > 0:
        combined = pd.concat([ecb_prepared, esma_prepared], ignore_index=True)
    elif len(esma_prepared) > 0:
        combined = esma_prepared
    elif len(ecb_prepared) > 0:
        combined = ecb_prepared
    else:
        return pd.DataFrame()
    
    # Remove duplicates ONLY for pools with temporal overlap
    if needs_dedup:
        log_message(f"  Pool has temporal overlap - running deduplication...")
        merged = remove_duplicates_prefer_esma(combined)
        log_message(f"  After dedup: {len(merged)} rows (removed {len(combined) - len(merged)} duplicates)")
    else:
        # No overlap - skip dedup (significant speedup)
        merged = combined
        if 'date_ym' in merged.columns:
            merged = merged.drop(columns=['date_ym'])
        log_message(f"  No temporal overlap - skipping dedup: {len(merged)} rows")
    
    # Add pool identifiers
    merged['ecb_pool_id'] = ecb_pool_id
    merged['esma_pool_id'] = esma_pool_id
    
    return merged

def get_all_columns_for_ecb_pool(ecb_pool_id):
    """Scan all ECB files to determine ALL columns for consistent output."""
    all_columns = set()
    files = ecb_pool_files.get(ecb_pool_id, [])
    
    for fname in files:
        try:
            filepath = os.path.join(ECB_PATH, fname)
            df = load_ecb_file(filepath)
            if len(df) > 0:
                prepared = prepare_ecb_data(df)
                if 'date_ym' in prepared.columns:
                    prepared = prepared.drop(columns=['date_ym'])
                prepared['ecb_pool_id'] = ecb_pool_id
                prepared['esma_pool_id'] = None
                non_empty = get_non_empty_columns(prepared)
                all_columns.update(non_empty)
                del df, prepared
        except Exception as e:
            log_message(f"    Warning: Could not scan ECB file {fname}: {e}")
    
    gc.collect()
    return sorted(list(all_columns))

def get_all_columns_for_esma_pool(esma_pool_id):
    """Scan all ESMA files to determine ALL columns for consistent output."""
    all_columns = set()
    files = esma_pool_files.get(esma_pool_id, [])
    
    for fname in files:
        try:
            filepath = os.path.join(ESMA_PATH, fname)
            df = load_esma_file(filepath)
            if len(df) > 0:
                prepared = prepare_esma_data(df)
                if 'date_ym' in prepared.columns:
                    prepared = prepared.drop(columns=['date_ym'])
                prepared['ecb_pool_id'] = None
                prepared['esma_pool_id'] = esma_pool_id
                non_empty = get_non_empty_columns(prepared)
                all_columns.update(non_empty)
                del df, prepared
        except Exception as e:
            log_message(f"    Warning: Could not scan ESMA file {fname}: {e}")
    
    gc.collect()
    return sorted(list(all_columns))

def process_ecb_only_pool_chunked(ecb_pool_id, output_path):
    """
    Process a LARGE ECB-only pool file-by-file, appending to output.
    This avoids loading entire pool into memory.
    Returns total rows saved.
    """
    files = ecb_pool_files.get(ecb_pool_id, [])
    if not files:
        return 0
    
    # PHASE 1: Scan all files to determine consistent column schema
    log_message(f"  Scanning {len(files)} files for column schema...")
    all_columns = get_all_columns_for_ecb_pool(ecb_pool_id)
    log_message(f"  Found {len(all_columns)} columns")
    
    total_rows = 0
    first_file = True
    
    log_message(f"  Processing {len(files)} files in chunked mode...")
    
    for i, fname in enumerate(files):
        filepath = os.path.join(ECB_PATH, fname)
        df = load_ecb_file(filepath)
        
        if len(df) == 0:
            continue
        
        # Prepare (rename to ESMA schema)
        prepared = prepare_ecb_data(df)
        
        # Drop helper column
        if 'date_ym' in prepared.columns:
            prepared = prepared.drop(columns=['date_ym'])
        
        # Add pool identifiers
        prepared['ecb_pool_id'] = ecb_pool_id
        prepared['esma_pool_id'] = None
        
        # Use consistent columns - add missing as NaN
        for col in all_columns:
            if col not in prepared.columns:
                prepared[col] = np.nan
        
        df_to_save = prepared[all_columns]
        
        # Append to file (write header only for first chunk)
        df_to_save.to_csv(output_path, mode='a', index=False, header=first_file)
        first_file = False
        total_rows += len(df_to_save)
        
        # Free memory
        del df, prepared, df_to_save
        gc.collect()
        
        if (i + 1) % 20 == 0:
            log_message(f"    Processed {i+1}/{len(files)} files, {total_rows:,} rows so far")
    
    return total_rows

def process_ecb_only_pool(ecb_pool_id):
    """
    Process an ECB-only pool (no ESMA match).
    Rename columns to ESMA schema.
    """
    ecb_df = load_all_ecb_for_pool(ecb_pool_id)
    
    if len(ecb_df) == 0:
        return pd.DataFrame()
    
    log_message(f"  ECB rows: {len(ecb_df)}")
    
    # Prepare (rename to ESMA schema)
    prepared = prepare_ecb_data(ecb_df)
    
    # Drop helper column
    if 'date_ym' in prepared.columns:
        prepared = prepared.drop(columns=['date_ym'])
    
    # Add pool identifiers
    prepared['ecb_pool_id'] = ecb_pool_id
    prepared['esma_pool_id'] = None
    
    return prepared

def process_esma_only_pool_chunked(esma_pool_id, output_path):
    """
    Process a LARGE ESMA-only pool file-by-file, appending to output.
    Returns total rows saved.
    """
    files = esma_pool_files.get(esma_pool_id, [])
    if not files:
        return 0
    
    # PHASE 1: Scan all files to determine consistent column schema
    log_message(f"  Scanning {len(files)} files for column schema...")
    all_columns = get_all_columns_for_esma_pool(esma_pool_id)
    log_message(f"  Found {len(all_columns)} columns")
    
    total_rows = 0
    first_file = True
    
    log_message(f"  Processing {len(files)} files in chunked mode...")
    
    for i, fname in enumerate(files):
        filepath = os.path.join(ESMA_PATH, fname)
        df = load_esma_file(filepath)
        
        if len(df) == 0:
            continue
        
        # Prepare
        prepared = prepare_esma_data(df)
        
        # Drop helper column
        if 'date_ym' in prepared.columns:
            prepared = prepared.drop(columns=['date_ym'])
        
        # Add pool identifiers
        prepared['ecb_pool_id'] = None
        prepared['esma_pool_id'] = esma_pool_id
        
        # Use consistent columns - add missing as NaN
        for col in all_columns:
            if col not in prepared.columns:
                prepared[col] = np.nan
        
        df_to_save = prepared[all_columns]
        
        # Append to file
        df_to_save.to_csv(output_path, mode='a', index=False, header=first_file)
        first_file = False
        total_rows += len(df_to_save)
        
        # Free memory
        del df, prepared, df_to_save
        gc.collect()
        
        if (i + 1) % 20 == 0:
            log_message(f"    Processed {i+1}/{len(files)} files, {total_rows:,} rows so far")
    
    return total_rows

def process_esma_only_pool(esma_pool_id):
    """
    Process an ESMA-only pool (no ECB match).
    """
    esma_df = load_all_esma_for_pool(esma_pool_id)
    
    if len(esma_df) == 0:
        return pd.DataFrame()
    
    log_message(f"  ESMA rows: {len(esma_df)}")
    
    # Prepare
    prepared = prepare_esma_data(esma_df)
    
    # Drop helper column
    if 'date_ym' in prepared.columns:
        prepared = prepared.drop(columns=['date_ym'])
    
    # Add pool identifiers
    prepared['ecb_pool_id'] = None
    prepared['esma_pool_id'] = esma_pool_id
    
    return prepared

def get_all_columns_for_matched_pool(ecb_pool_id, esma_pool_id):
    """
    PHASE 1: Scan sample files from both ECB and ESMA to determine ALL columns.
    This ensures consistent column alignment when writing chunks.
    Returns sorted list of all non-empty columns across both sources.
    """
    all_columns = set()
    
    ecb_files = ecb_pool_files.get(ecb_pool_id, [])
    esma_files = esma_pool_files.get(esma_pool_id, [])
    
    # Scan ALL files to ensure we capture every possible column
    # This prevents column misalignment if later files have different columns
    
    # Scan ECB files
    for fname in ecb_files:
        try:
            filepath = os.path.join(ECB_PATH, fname)
            df = load_ecb_file(filepath)
            if len(df) > 0:
                prepared = prepare_ecb_data(df)
                if 'date_ym' in prepared.columns:
                    prepared = prepared.drop(columns=['date_ym'])
                prepared['ecb_pool_id'] = ecb_pool_id
                prepared['esma_pool_id'] = esma_pool_id
                non_empty = get_non_empty_columns(prepared)
                all_columns.update(non_empty)
                del df, prepared
        except Exception as e:
            log_message(f"    Warning: Could not scan ECB file {fname}: {e}")
    
    # Scan ESMA files
    for fname in esma_files:
        try:
            filepath = os.path.join(ESMA_PATH, fname)
            df = load_esma_file(filepath)
            if len(df) > 0:
                prepared = prepare_esma_data(df)
                if 'date_ym' in prepared.columns:
                    prepared = prepared.drop(columns=['date_ym'])
                prepared['ecb_pool_id'] = ecb_pool_id
                prepared['esma_pool_id'] = esma_pool_id
                non_empty = get_non_empty_columns(prepared)
                all_columns.update(non_empty)
                del df, prepared
        except Exception as e:
            log_message(f"    Warning: Could not scan ESMA file {fname}: {e}")
    
    gc.collect()
    
    # Return sorted list for consistent ordering
    return sorted(list(all_columns))

def process_matched_pool_chunked(ecb_pool_id, esma_pool_id, output_path):
    """
    Process a LARGE matched ECB-ESMA pool file-by-file.
    
    FIXED: Now determines ALL columns upfront to ensure consistent alignment.
    
    INTEGRITY LOGIC:
    - We prefer ESMA data over ECB when both have the same loan+date
    - Strategy depends on sizes:
      - If smaller side < 500MB: load smaller, chunk larger
      - If BOTH sides > 500MB: chunk both, skip dedup (rare overlap anyway)
    
    Returns total rows saved.
    """
    ecb_files = ecb_pool_files.get(ecb_pool_id, [])
    esma_files = esma_pool_files.get(esma_pool_id, [])
    
    if not ecb_files and not esma_files:
        return 0
    
    # Determine sizes
    ecb_size = get_pool_compressed_size(ecb_pool_id, 'ecb')
    esma_size = get_pool_compressed_size(esma_pool_id, 'esma')
    smaller_size = min(ecb_size, esma_size)
    
    # Threshold for "can fit in memory" - 500 MB compressed max
    MEMORY_SAFE_THRESHOLD = 500 * 1024 * 1024
    
    log_message(f"  ECB size: {ecb_size/1024/1024:.0f} MB, ESMA size: {esma_size/1024/1024:.0f} MB")
    
    # PHASE 1: Determine ALL columns upfront for consistent alignment
    log_message(f"  Phase 1: Scanning files to determine column schema...")
    all_columns = get_all_columns_for_matched_pool(ecb_pool_id, esma_pool_id)
    log_message(f"  Found {len(all_columns)} total columns across both sources")
    
    total_rows = 0
    first_file = True
    
    if smaller_size > MEMORY_SAFE_THRESHOLD:
        # CASE 0: BOTH sides are huge - chunk both, write ECB first, then ESMA (preferred at end)
        log_message(f"  Strategy: MEGA POOL - chunk both sides with consistent columns")
        
        # Process ALL ECB files first
        log_message(f"  Processing {len(ecb_files)} ECB files...")
        for i, fname in enumerate(ecb_files):
            filepath = os.path.join(ECB_PATH, fname)
            df = load_ecb_file(filepath)
            
            if len(df) == 0:
                continue
            
            prepared = prepare_ecb_data(df)
            if 'date_ym' in prepared.columns:
                prepared = prepared.drop(columns=['date_ym'])
            
            prepared['ecb_pool_id'] = ecb_pool_id
            prepared['esma_pool_id'] = esma_pool_id
            
            # Use consistent columns - add missing columns as NaN
            for col in all_columns:
                if col not in prepared.columns:
                    prepared[col] = np.nan
            
            df_to_save = prepared[all_columns]
            df_to_save.to_csv(output_path, mode='a', index=False, header=first_file)
            first_file = False
            total_rows += len(df_to_save)
            
            del df, prepared, df_to_save
            gc.collect()
            
            if (i + 1) % 20 == 0:
                log_message(f"    ECB: {i+1}/{len(ecb_files)} files, {total_rows:,} rows")
        
        ecb_rows = total_rows
        log_message(f"  ECB complete: {ecb_rows:,} rows")
        
        # Process ALL ESMA files (preferred source - at end)
        log_message(f"  Processing {len(esma_files)} ESMA files...")
        for i, fname in enumerate(esma_files):
            filepath = os.path.join(ESMA_PATH, fname)
            df = load_esma_file(filepath)
            
            if len(df) == 0:
                continue
            
            prepared = prepare_esma_data(df)
            if 'date_ym' in prepared.columns:
                prepared = prepared.drop(columns=['date_ym'])
            
            prepared['ecb_pool_id'] = ecb_pool_id
            prepared['esma_pool_id'] = esma_pool_id
            
            # Use consistent columns - add missing columns as NaN
            for col in all_columns:
                if col not in prepared.columns:
                    prepared[col] = np.nan
            
            df_to_save = prepared[all_columns]
            df_to_save.to_csv(output_path, mode='a', index=False, header=first_file)
            first_file = False
            total_rows += len(df_to_save)
            
            del df, prepared, df_to_save
            gc.collect()
            
            if (i + 1) % 5 == 0:
                log_message(f"    ESMA: {i+1}/{len(esma_files)} files, {total_rows - ecb_rows:,} ESMA rows")
        
        log_message(f"  ESMA complete: {total_rows - ecb_rows:,} rows")
        
        # POST-PROCESSING: Deduplicate the output file
        # Logic: Keep ALL ESMA rows. Remove ECB rows where same (loan, date) exists in ESMA.
        needs_dedup = ecb_pool_id in POOLS_WITH_OVERLAP
        
        if needs_dedup:
            log_message(f"  Post-processing: removing ECB duplicates where ESMA exists...")
            try:
                # PASS 1: Collect all ESMA (loan, date) keys
                esma_keys = set()
                for chunk in pd.read_csv(output_path, chunksize=100000, dtype=str, usecols=['RREL3', 'RREL6', 'source']):
                    esma_rows = chunk[chunk['source'] == 'ESMA']
                    if len(esma_rows) > 0 and 'RREL3' in esma_rows.columns:
                        keys = zip(esma_rows['RREL3'].fillna(''), esma_rows['RREL6'].str[:7].fillna(''))
                        esma_keys.update(keys)
                log_message(f"    Found {len(esma_keys):,} unique ESMA (loan, date) keys")
                
                # PASS 2: Write all ESMA rows + ECB rows not in esma_keys
                temp_path = output_path + ".dedup_temp"
                rows_before = total_rows
                total_rows = 0
                first_chunk = True
                
                for chunk in pd.read_csv(output_path, chunksize=100000, dtype=str):
                    if 'source' in chunk.columns and 'RREL3' in chunk.columns:
                        # Keep row if: ESMA, or ECB with no ESMA equivalent
                        is_esma = chunk['source'] == 'ESMA'
                        keys = list(zip(chunk['RREL3'].fillna(''), chunk['RREL6'].str[:7].fillna('')))
                        is_dup_ecb = [(not esma) and (k in esma_keys) for esma, k in zip(is_esma, keys)]
                        chunk_filtered = chunk.loc[~pd.Series(is_dup_ecb, index=chunk.index)]
                    else:
                        chunk_filtered = chunk
                    
                    chunk_filtered.to_csv(temp_path, mode='a', index=False, header=first_chunk)
                    first_chunk = False
                    total_rows += len(chunk_filtered)
                    
                    del chunk, chunk_filtered
                    gc.collect()
                
                os.replace(temp_path, output_path)
                log_message(f"  Deduplication complete: {rows_before - total_rows:,} ECB duplicates removed")
                
            except Exception as e:
                log_message(f"  WARNING: Deduplication failed ({e}), keeping original file")
                if os.path.exists(temp_path):
                    os.remove(temp_path)
        else:
            log_message(f"  Skipping deduplication - no temporal overlap")
        
    elif esma_size <= ecb_size:
        # CASE 1: ECB is larger - load ESMA first, chunk ECB
        log_message(f"  Strategy: Load ESMA (smaller), chunk ECB files")
        
        needs_dedup = ecb_pool_id in POOLS_WITH_OVERLAP
        if needs_dedup:
            log_message(f"  Pool has temporal overlap - will filter ECB duplicates")
        else:
            log_message(f"  No temporal overlap - skipping deduplication")
        
        # Load all ESMA data
        log_message(f"  Loading ESMA data ({len(esma_files)} files)...")
        esma_dfs = []
        for fname in esma_files:
            filepath = os.path.join(ESMA_PATH, fname)
            df = load_esma_file(filepath)
            if len(df) > 0:
                esma_dfs.append(df)
        
        if esma_dfs:
            esma_all = pd.concat(esma_dfs, ignore_index=True)
            esma_prepared = prepare_esma_data(esma_all)
            if needs_dedup:
                esma_loan_dates = set(zip(
                    esma_prepared['RREL3'].astype(str),
                    esma_prepared['date_ym'].astype(str)
                ))
                log_message(f"  ESMA: {len(esma_prepared)} rows, {len(esma_loan_dates)} unique loan-dates")
            else:
                esma_loan_dates = set()
                log_message(f"  ESMA: {len(esma_prepared)} rows")
            del esma_all
        else:
            esma_prepared = pd.DataFrame()
            esma_loan_dates = set()
        
        del esma_dfs
        gc.collect()
        
        # Process ECB files one by one
        log_message(f"  Processing {len(ecb_files)} ECB files...")
        for i, fname in enumerate(ecb_files):
            filepath = os.path.join(ECB_PATH, fname)
            df = load_ecb_file(filepath)
            
            if len(df) == 0:
                continue
            
            ecb_prepared = prepare_ecb_data(df)
            
            # Filter: keep ECB rows only if NOT in ESMA
            if needs_dedup and len(esma_loan_dates) > 0 and 'RREL3' in ecb_prepared.columns:
                ecb_prepared['_key'] = ecb_prepared['RREL3'].astype(str) + '|' + ecb_prepared['date_ym'].astype(str)
                mask = ~ecb_prepared['_key'].apply(lambda x: (x.split('|')[0], x.split('|')[1]) in esma_loan_dates)
                ecb_filtered = ecb_prepared[mask].drop(columns=['_key']).copy()
            else:
                ecb_filtered = ecb_prepared
            
            if 'date_ym' in ecb_filtered.columns:
                ecb_filtered = ecb_filtered.drop(columns=['date_ym'])
            
            ecb_filtered['ecb_pool_id'] = ecb_pool_id
            ecb_filtered['esma_pool_id'] = esma_pool_id
            
            if len(ecb_filtered) > 0:
                # Use consistent columns
                for col in all_columns:
                    if col not in ecb_filtered.columns:
                        ecb_filtered[col] = np.nan
                
                df_to_save = ecb_filtered[all_columns]
                df_to_save.to_csv(output_path, mode='a', index=False, header=first_file)
                first_file = False
                total_rows += len(df_to_save)
            
            del df, ecb_prepared, ecb_filtered
            gc.collect()
            
            if (i + 1) % 20 == 0:
                log_message(f"    Processed {i+1}/{len(ecb_files)} ECB files, {total_rows:,} ECB rows kept")
        
        # Append ALL ESMA rows (preferred source)
        if len(esma_prepared) > 0:
            if 'date_ym' in esma_prepared.columns:
                esma_prepared = esma_prepared.drop(columns=['date_ym'])
            esma_prepared['ecb_pool_id'] = ecb_pool_id
            esma_prepared['esma_pool_id'] = esma_pool_id
            
            # Use consistent columns
            for col in all_columns:
                if col not in esma_prepared.columns:
                    esma_prepared[col] = np.nan
            
            df_to_save = esma_prepared[all_columns]
            df_to_save.to_csv(output_path, mode='a', index=False, header=first_file)
            total_rows += len(df_to_save)
            log_message(f"  Added {len(df_to_save):,} ESMA rows (preferred)")
        
    else:
        # CASE 2: ESMA is larger - load ECB first, chunk ESMA
        log_message(f"  Strategy: Load ECB (smaller), chunk ESMA files")
        
        needs_dedup = ecb_pool_id in POOLS_WITH_OVERLAP
        if needs_dedup:
            log_message(f"  Pool has temporal overlap - will filter ECB duplicates")
        else:
            log_message(f"  No temporal overlap - skipping deduplication")
        
        # Load all ECB data
        log_message(f"  Loading ECB data ({len(ecb_files)} files)...")
        ecb_dfs = []
        for fname in ecb_files:
            filepath = os.path.join(ECB_PATH, fname)
            df = load_ecb_file(filepath)
            if len(df) > 0:
                ecb_dfs.append(df)
        
        if ecb_dfs:
            ecb_all = pd.concat(ecb_dfs, ignore_index=True)
            ecb_prepared = prepare_ecb_data(ecb_all)
            if needs_dedup:
                ecb_loan_dates = set(zip(
                    ecb_prepared['RREL3'].astype(str),
                    ecb_prepared['date_ym'].astype(str)
                ))
                log_message(f"  ECB: {len(ecb_prepared)} rows, {len(ecb_loan_dates)} unique loan-dates")
            else:
                ecb_loan_dates = set()
                log_message(f"  ECB: {len(ecb_prepared)} rows")
            del ecb_all
        else:
            ecb_prepared = pd.DataFrame()
            ecb_loan_dates = set()
        
        del ecb_dfs
        gc.collect()
        
        ecb_loan_dates_covered_by_esma = set()
        
        # Write ECB rows first (will be filtered at end if needed)
        if len(ecb_prepared) > 0:
            if 'date_ym' in ecb_prepared.columns:
                ecb_for_write = ecb_prepared.drop(columns=['date_ym']).copy()
            else:
                ecb_for_write = ecb_prepared.copy()
            
            ecb_for_write['ecb_pool_id'] = ecb_pool_id
            ecb_for_write['esma_pool_id'] = esma_pool_id
            
            # Use consistent columns
            for col in all_columns:
                if col not in ecb_for_write.columns:
                    ecb_for_write[col] = np.nan
            
            df_to_save = ecb_for_write[all_columns]
            df_to_save.to_csv(output_path, mode='a', index=False, header=first_file)
            first_file = False
            total_rows += len(df_to_save)
            log_message(f"  Wrote {len(df_to_save):,} ECB rows")
            del ecb_for_write
        
        # Process ESMA files one by one
        log_message(f"  Processing {len(esma_files)} ESMA files...")
        for i, fname in enumerate(esma_files):
            filepath = os.path.join(ESMA_PATH, fname)
            df = load_esma_file(filepath)
            
            if len(df) == 0:
                continue
            
            esma_chunk = prepare_esma_data(df)
            
            # Track covered loan-dates for dedup
            if needs_dedup and 'RREL3' in esma_chunk.columns:
                chunk_loan_dates = set(zip(
                    esma_chunk['RREL3'].astype(str),
                    esma_chunk['date_ym'].astype(str)
                ))
                ecb_loan_dates_covered_by_esma.update(chunk_loan_dates & ecb_loan_dates)
            
            if 'date_ym' in esma_chunk.columns:
                esma_chunk = esma_chunk.drop(columns=['date_ym'])
            
            esma_chunk['ecb_pool_id'] = ecb_pool_id
            esma_chunk['esma_pool_id'] = esma_pool_id
            
            # Use consistent columns
            for col in all_columns:
                if col not in esma_chunk.columns:
                    esma_chunk[col] = np.nan
            
            df_to_save = esma_chunk[all_columns]
            df_to_save.to_csv(output_path, mode='a', index=False, header=first_file)
            first_file = False
            total_rows += len(df_to_save)
            
            del df, esma_chunk
            gc.collect()
            
            if (i + 1) % 5 == 0:
                log_message(f"    Processed {i+1}/{len(esma_files)} ESMA files, {total_rows:,} total rows")
        
        # Post-processing dedup if needed
        if needs_dedup and len(ecb_loan_dates_covered_by_esma) > 0:
            log_message(f"  Post-processing: removing {len(ecb_loan_dates_covered_by_esma)} ECB duplicates...")
            try:
                temp_path = output_path + ".dedup_temp"
                rows_before = total_rows
                total_rows = 0
                first_chunk = True
                
                for chunk in pd.read_csv(output_path, chunksize=100000, dtype=str):
                    if 'source' in chunk.columns and 'RREL3' in chunk.columns and 'RREL6' in chunk.columns:
                        # Keep all ESMA rows, filter ECB rows
                        is_esma = chunk['source'] == 'ESMA'
                        keys = list(zip(chunk['RREL3'].fillna(''), chunk['RREL6'].str[:7].fillna('')))
                        is_dup_ecb = [(not esma) and (k in ecb_loan_dates_covered_by_esma) for esma, k in zip(is_esma, keys)]
                        chunk_filtered = chunk.loc[~pd.Series(is_dup_ecb, index=chunk.index)]
                    else:
                        chunk_filtered = chunk
                    
                    chunk_filtered.to_csv(temp_path, mode='a', index=False, header=first_chunk)
                    first_chunk = False
                    total_rows += len(chunk_filtered)
                    
                    del chunk, chunk_filtered
                    gc.collect()
                
                os.replace(temp_path, output_path)
                log_message(f"  Deduplication complete: {rows_before - total_rows:,} duplicates removed")
                
            except Exception as e:
                log_message(f"  WARNING: Deduplication failed ({e})")
                if os.path.exists(temp_path):
                    os.remove(temp_path)
    
    return total_rows

def save_pool_result(df, pool_type, pool_id):
    """
    Save pool result to disk using ATOMIC write (temp file + rename).
    This ensures partial files are never left behind if interrupted.
    Uses dynamic column selection - only non-empty columns.
    """
    if len(df) == 0:
        return 0
    
    # Dynamic column selection: only keep non-empty columns
    non_empty_cols = get_non_empty_columns(df)
    df_to_save = df[non_empty_cols]
    
    # Create subdirectory by pool type
    subdir = os.path.join(OUTPUT_DIR, pool_type)
    os.makedirs(subdir, exist_ok=True)
    
    # ATOMIC WRITE: Save to temp file first, then rename
    safe_pool_id = pool_id.replace('/', '_').replace('\\', '_')
    output_path = os.path.join(subdir, f"{safe_pool_id}.csv")
    temp_path = output_path + ".tmp"
    
    df_to_save.to_csv(temp_path, index=False)
    os.replace(temp_path, output_path)  # Atomic rename
    
    return len(df_to_save)

print("✓ Pool processing functions defined (with ATOMIC writes for resume safety)")

✓ Pool processing functions defined (with ATOMIC writes for resume safety)


In [None]:
# =============================================================================
# CELL 8: Main Processing Loop - MATCHED POOLS (with large pool handling)
# =============================================================================
log_message("="*70)
log_message("STARTING MATCHED POOLS PROCESSING")
log_message("="*70)

# Scan output folders to find already-completed pools (file-based tracking)
completed_pools = scan_completed_pools()
completed_matched = completed_pools['matched']

# Track total rows for this session
session_rows_processed = 0

# Get pools to process (skip already completed)
pools_to_process = [(ecb_id, info['esma_pool']) 
                    for ecb_id, info in matched_pools.items() 
                    if ecb_id not in completed_matched]

log_message(f"Matched pools: {len(matched_pools)} total, {len(pools_to_process)} remaining")

# Identify large pools - check BOTH ECB and ESMA sides (either can cause memory issues)
def is_large_matched_pool(ecb_id, esma_id):
    return is_large_pool(ecb_id, 'ecb') or is_large_pool(esma_id, 'esma')

large_matched = [(e, s) for e, s in pools_to_process if is_large_matched_pool(e, s)]
normal_matched = [(e, s) for e, s in pools_to_process if not is_large_matched_pool(e, s)]
log_message(f"  Normal pools: {len(normal_matched)}, Large pools (chunked): {len(large_matched)}")

matched_start = time.time()

# Process normal pools first
for i, (ecb_pool_id, esma_pool_id) in enumerate(normal_matched):
    pool_start = time.time()
    log_message(f"\n[{i+1}/{len(normal_matched)}] Processing matched pool: {ecb_pool_id}")
    
    try:
        # Process pool
        result_df = process_matched_pool(ecb_pool_id, esma_pool_id)
        
        # Save result
        rows_saved = save_pool_result(result_df, "matched", ecb_pool_id)
        
        # Free memory
        del result_df
        gc.collect()
        
        # Track session progress (file-based tracking means completion is automatic)
        session_rows_processed += rows_saved
        
        pool_elapsed = time.time() - pool_start
        log_message(f"  Saved {rows_saved:,} rows in {pool_elapsed:.1f}s")
        
    except Exception as e:
        error_msg = f"ERROR processing {ecb_pool_id}: {str(e)}"
        log_message(error_msg)
        gc.collect()

# Process large pools with chunked mode
for i, (ecb_pool_id, esma_pool_id) in enumerate(large_matched):
    pool_start = time.time()
    ecb_size_mb = get_pool_compressed_size(ecb_pool_id, 'ecb') / 1024 / 1024
    esma_size_mb = get_pool_compressed_size(esma_pool_id, 'esma') / 1024 / 1024
    log_message(f"\n[LARGE {i+1}/{len(large_matched)}] Processing matched pool: {ecb_pool_id}")
    log_message(f"  ECB: {ecb_size_mb:.0f} MB, ESMA: {esma_size_mb:.0f} MB")
    
    # Aggressive memory cleanup before large pool
    gc.collect()
    gc.collect()  # Run twice to catch cyclic references
    time.sleep(1)  # Brief pause to let OS reclaim memory
    
    try:
        # Setup output path with ATOMIC write pattern
        subdir = os.path.join(OUTPUT_DIR, "matched")
        os.makedirs(subdir, exist_ok=True)
        safe_pool_id = ecb_pool_id.replace('/', '_').replace('\\', '_')
        output_path = os.path.join(subdir, f"{safe_pool_id}.csv")
        temp_path = output_path + ".tmp"  # Write to temp file first
        
        # Remove existing files if any (fresh start)
        if os.path.exists(output_path):
            os.remove(output_path)
        if os.path.exists(temp_path):
            os.remove(temp_path)
        
        # Process with chunked mode - writes to TEMP file
        rows_saved = process_matched_pool_chunked(ecb_pool_id, esma_pool_id, temp_path)
        
        # ATOMIC: Rename temp to final ONLY after complete success
        os.replace(temp_path, output_path)
        
        # Track session progress (file-based tracking means completion is automatic)
        session_rows_processed += rows_saved
        
        pool_elapsed = time.time() - pool_start
        log_message(f"  Saved {rows_saved:,} rows in {pool_elapsed:.1f}s (chunked)")
        
    except Exception as e:
        error_msg = f"ERROR processing {ecb_pool_id}: {str(e)}"
        log_message(error_msg)
        # Clean up temp file if it exists
        if os.path.exists(temp_path):
            os.remove(temp_path)
        gc.collect()

matched_elapsed = time.time() - matched_start
log_message(f"\\nMatched pools complete: {matched_elapsed:.1f}s")
log_message(f"Session rows processed: {session_rows_processed:,}")

[2025-12-10 13:04:48] STARTING MATCHED POOLS PROCESSING
[2025-12-10 13:04:48] Matched pools: 22 total, 22 remaining
[2025-12-10 13:04:48]   Normal pools: 5, Large pools (chunked): 17
[2025-12-10 13:04:48] 
[1/5] Processing matched pool: RMBMES000140100120090
[2025-12-10 13:04:50]   Loaded RMBMES000140100120090_2023-09-01_Pool.csv.gz using zlib fallback (1,543 rows)
[2025-12-10 13:04:50]   Loaded RMBMES000140100120090_2023-09-01_Pool.csv.gz using zlib fallback (1,543 rows)
[2025-12-10 13:04:50]   ECB rows: 70976, ESMA rows: 4715
[2025-12-10 13:04:50]   ECB rows: 70976, ESMA rows: 4715
[2025-12-10 13:04:50]   No temporal overlap - skipping dedup: 75691 rows
[2025-12-10 13:04:50]   No temporal overlap - skipping dedup: 75691 rows


  merged['ecb_pool_id'] = ecb_pool_id
  merged['esma_pool_id'] = esma_pool_id


[2025-12-10 13:04:53]   Saved 75,691 rows in 4.2s
[2025-12-10 13:04:53] 
[2/5] Processing matched pool: RMBMES000140100220122
[2025-12-10 13:04:55]   ECB rows: 122314, ESMA rows: 9587
[2025-12-10 13:04:55]   ECB rows: 122314, ESMA rows: 9587
[2025-12-10 13:04:56]   No temporal overlap - skipping dedup: 131901 rows
[2025-12-10 13:04:56]   No temporal overlap - skipping dedup: 131901 rows


  merged['ecb_pool_id'] = ecb_pool_id
  merged['esma_pool_id'] = esma_pool_id


[2025-12-10 13:05:00]   Saved 131,901 rows in 7.2s
[2025-12-10 13:05:00] 
[3/5] Processing matched pool: RMBMNL001345100320187
[2025-12-10 13:05:00]   ECB rows: 2551, ESMA rows: 25631
[2025-12-10 13:05:00]   No temporal overlap - skipping dedup: 28182 rows
[2025-12-10 13:05:00]   ECB rows: 2551, ESMA rows: 25631
[2025-12-10 13:05:00]   No temporal overlap - skipping dedup: 28182 rows


  merged['ecb_pool_id'] = ecb_pool_id
  merged['esma_pool_id'] = esma_pool_id


[2025-12-10 13:05:02]   Saved 28,182 rows in 1.9s
[2025-12-10 13:05:02] 
[4/5] Processing matched pool: RMBSBE000087100320118
[2025-12-10 13:05:09]   ECB rows: 507938, ESMA rows: 7540
[2025-12-10 13:05:09]   ECB rows: 507938, ESMA rows: 7540
[2025-12-10 13:05:12]   No temporal overlap - skipping dedup: 515478 rows
[2025-12-10 13:05:12]   No temporal overlap - skipping dedup: 515478 rows


  merged['ecb_pool_id'] = ecb_pool_id
  merged['esma_pool_id'] = esma_pool_id


[2025-12-10 13:05:24]   Saved 515,478 rows in 22.3s
[2025-12-10 13:05:24] 
[5/5] Processing matched pool: RMBMNL001345100220171
[2025-12-10 13:05:25]   ECB rows: 40021, ESMA rows: 25631
[2025-12-10 13:05:25]   ECB rows: 40021, ESMA rows: 25631
[2025-12-10 13:05:26]   No temporal overlap - skipping dedup: 65652 rows
[2025-12-10 13:05:26]   No temporal overlap - skipping dedup: 65652 rows


  merged['ecb_pool_id'] = ecb_pool_id
  merged['esma_pool_id'] = esma_pool_id


[2025-12-10 13:05:27]   Saved 65,652 rows in 3.4s
[2025-12-10 13:05:27] 
[LARGE 1/17] Processing matched pool: RMBMNL000185100120109
[2025-12-10 13:05:27]   ECB: 356 MB, ESMA: 127 MB
[2025-12-10 13:05:29]   ECB size: 356 MB, ESMA size: 127 MB
[2025-12-10 13:05:29]   Phase 1: Scanning files to determine column schema...
[2025-12-10 13:05:29]   ECB size: 356 MB, ESMA size: 127 MB
[2025-12-10 13:05:29]   Phase 1: Scanning files to determine column schema...
[2025-12-10 13:06:39]   Found 216 total columns across both sources
[2025-12-10 13:06:39]   Strategy: Load ESMA (smaller), chunk ECB files
[2025-12-10 13:06:39]   Pool has temporal overlap - will filter ECB duplicates
[2025-12-10 13:06:39]   Loading ESMA data (5 files)...
[2025-12-10 13:06:39]   Found 216 total columns across both sources
[2025-12-10 13:06:39]   Strategy: Load ESMA (smaller), chunk ECB files
[2025-12-10 13:06:39]   Pool has temporal overlap - will filter ECB duplicates
[2025-12-10 13:06:39]   Loading ESMA data (5 files

In [29]:
# =============================================================================
# CELL 9: Main Processing Loop - ECB-ONLY POOLS (with large pool handling)
# =============================================================================
log_message("="*70)
log_message("STARTING ECB-ONLY POOLS PROCESSING")
log_message("="*70)

# Scan output folders to find already-completed pools (file-based tracking)
completed_pools = scan_completed_pools()
completed_ecb_only = completed_pools['ecb_only']

# Track total rows for this session
session_rows_processed = 0

# Get pools to process (skip already completed)
ecb_only_to_process = [p for p in ecb_only_pools if p not in completed_ecb_only]

log_message(f"ECB-only pools: {len(ecb_only_pools)} total, {len(ecb_only_to_process)} remaining")

# Identify large pools
large_pools = [p for p in ecb_only_to_process if is_large_pool(p, 'ecb')]
normal_pools = [p for p in ecb_only_to_process if not is_large_pool(p, 'ecb')]
log_message(f"  Normal pools: {len(normal_pools)}, Large pools (chunked): {len(large_pools)}")

ecb_only_start = time.time()

# Process normal pools first
for i, ecb_pool_id in enumerate(normal_pools):
    pool_start = time.time()
    log_message(f"\n[{i+1}/{len(normal_pools)}] Processing ECB-only pool: {ecb_pool_id}")
    
    try:
        # Process pool
        result_df = process_ecb_only_pool(ecb_pool_id)
        
        # Save result
        rows_saved = save_pool_result(result_df, "ecb_only", ecb_pool_id)
        
        # Free memory
        del result_df
        gc.collect()
        
        # Track session progress (file-based tracking means completion is automatic)
        session_rows_processed += rows_saved
        
        pool_elapsed = time.time() - pool_start
        log_message(f"  Saved {rows_saved:,} rows in {pool_elapsed:.1f}s")
        
    except Exception as e:
        error_msg = f"ERROR processing {ecb_pool_id}: {str(e)}"
        log_message(error_msg)
        gc.collect()

# Process large pools with chunked mode
for i, ecb_pool_id in enumerate(large_pools):
    pool_start = time.time()
    pool_size_mb = get_pool_compressed_size(ecb_pool_id, 'ecb') / 1024 / 1024
    log_message(f"\n[LARGE {i+1}/{len(large_pools)}] Processing ECB-only pool: {ecb_pool_id} ({pool_size_mb:.0f} MB compressed)")
    
    try:
        # Setup output path with ATOMIC write pattern
        subdir = os.path.join(OUTPUT_DIR, "ecb_only")
        os.makedirs(subdir, exist_ok=True)
        safe_pool_id = ecb_pool_id.replace('/', '_').replace('\\', '_')
        output_path = os.path.join(subdir, f"{safe_pool_id}.csv")
        temp_path = output_path + ".tmp"
        
        # Remove existing files if any (fresh start)
        if os.path.exists(output_path):
            os.remove(output_path)
        if os.path.exists(temp_path):
            os.remove(temp_path)
        
        # Process with chunked mode - writes to TEMP file
        rows_saved = process_ecb_only_pool_chunked(ecb_pool_id, temp_path)
        
        # ATOMIC: Rename temp to final ONLY after complete success
        os.replace(temp_path, output_path)
        
        # Track session progress (file-based tracking means completion is automatic)
        session_rows_processed += rows_saved
        
        pool_elapsed = time.time() - pool_start
        log_message(f"  Saved {rows_saved:,} rows in {pool_elapsed:.1f}s (chunked)")
        
    except Exception as e:
        error_msg = f"ERROR processing {ecb_pool_id}: {str(e)}"
        log_message(error_msg)
        # Clean up temp file if it exists
        if os.path.exists(temp_path):
            os.remove(temp_path)
        gc.collect()

ecb_only_elapsed = time.time() - ecb_only_start
log_message(f"\\nECB-only pools complete: {ecb_only_elapsed:.1f}s")
log_message(f"Session rows processed: {session_rows_processed:,}")

[2025-12-10 20:19:28] STARTING ECB-ONLY POOLS PROCESSING
[2025-12-10 20:19:28] ECB-only pools: 36 total, 36 remaining
[2025-12-10 20:19:28]   Normal pools: 24, Large pools (chunked): 12
[2025-12-10 20:19:28] 
[1/24] Processing ECB-only pool: RMBMNL000125100120063
[2025-12-10 20:19:32]   ECB rows: 300969
[2025-12-10 20:19:32]   ECB rows: 300969
[2025-12-10 20:19:43]   Saved 300,969 rows in 15.0s
[2025-12-10 20:19:43] 
[2/24] Processing ECB-only pool: RMBMDE000950100120151
[2025-12-10 20:19:43]   Saved 300,969 rows in 15.0s
[2025-12-10 20:19:43] 
[2/24] Processing ECB-only pool: RMBMDE000950100120151
[2025-12-10 20:19:45]   ECB rows: 106140
[2025-12-10 20:19:45]   ECB rows: 106140
[2025-12-10 20:19:47]   Saved 106,140 rows in 4.6s
[2025-12-10 20:19:47] 
[3/24] Processing ECB-only pool: RMBSES000045100120098
[2025-12-10 20:19:48]   ECB rows: 13182
[2025-12-10 20:19:47]   Saved 106,140 rows in 4.6s
[2025-12-10 20:19:47] 
[3/24] Processing ECB-only pool: RMBSES000045100120098
[2025-12-10 20

In [30]:
# =============================================================================
# CELL 10: Main Processing Loop - ESMA-ONLY POOLS (with large pool handling)
# =============================================================================
log_message("="*70)
log_message("STARTING ESMA-ONLY POOLS PROCESSING")
log_message("="*70)

# Scan output folders to find already-completed pools (file-based tracking)
completed_pools = scan_completed_pools()
completed_esma_only = completed_pools['esma_only']

# Track total rows for this session
session_rows_processed = 0

# Get pools to process (skip already completed)
esma_only_to_process = [p for p in esma_only_pools if p not in completed_esma_only]

log_message(f"ESMA-only pools: {len(esma_only_pools)} total, {len(esma_only_to_process)} remaining")

# Identify large pools
large_pools = [p for p in esma_only_to_process if is_large_pool(p, 'esma')]
normal_pools = [p for p in esma_only_to_process if not is_large_pool(p, 'esma')]
log_message(f"  Normal pools: {len(normal_pools)}, Large pools (chunked): {len(large_pools)}")

esma_only_start = time.time()

# Process normal pools first
for i, esma_pool_id in enumerate(normal_pools):
    pool_start = time.time()
    log_message(f"\n[{i+1}/{len(normal_pools)}] Processing ESMA-only pool: {esma_pool_id}")
    
    try:
        # Process pool
        result_df = process_esma_only_pool(esma_pool_id)
        
        # Save result
        rows_saved = save_pool_result(result_df, "esma_only", esma_pool_id)
        
        # Free memory
        del result_df
        gc.collect()
        
        # Track session progress (file-based tracking means completion is automatic)
        session_rows_processed += rows_saved
        
        pool_elapsed = time.time() - pool_start
        log_message(f"  Saved {rows_saved:,} rows in {pool_elapsed:.1f}s")
        
    except Exception as e:
        error_msg = f"ERROR processing {esma_pool_id}: {str(e)}"
        log_message(error_msg)
        gc.collect()

# Process large pools with chunked mode
for i, esma_pool_id in enumerate(large_pools):
    pool_start = time.time()
    pool_size_mb = get_pool_compressed_size(esma_pool_id, 'esma') / 1024 / 1024
    log_message(f"\n[LARGE {i+1}/{len(large_pools)}] Processing ESMA-only pool: {esma_pool_id} ({pool_size_mb:.0f} MB)")
    
    try:
        # Setup output path with ATOMIC write pattern
        subdir = os.path.join(OUTPUT_DIR, "esma_only")
        os.makedirs(subdir, exist_ok=True)
        safe_pool_id = esma_pool_id.replace('/', '_').replace('\\', '_')
        output_path = os.path.join(subdir, f"{safe_pool_id}.csv")
        temp_path = output_path + ".tmp"
        
        # Remove existing files if any (fresh start)
        if os.path.exists(output_path):
            os.remove(output_path)
        if os.path.exists(temp_path):
            os.remove(temp_path)
        
        # Process with chunked mode - writes to TEMP file
        rows_saved = process_esma_only_pool_chunked(esma_pool_id, temp_path)
        
        # ATOMIC: Rename temp to final ONLY after complete success
        os.replace(temp_path, output_path)
        
        # Track session progress (file-based tracking means completion is automatic)
        session_rows_processed += rows_saved
        
        pool_elapsed = time.time() - pool_start
        log_message(f"  Saved {rows_saved:,} rows in {pool_elapsed:.1f}s (chunked)")
        
    except Exception as e:
        error_msg = f"ERROR processing {esma_pool_id}: {str(e)}"
        log_message(error_msg)
        # Clean up temp file if it exists
        if os.path.exists(temp_path):
            os.remove(temp_path)
        gc.collect()

esma_only_elapsed = time.time() - esma_only_start
log_message(f"\\nESMA-only pools complete: {esma_only_elapsed:.1f}s")
log_message(f"Session rows processed: {session_rows_processed:,}")

[2025-12-10 21:54:19] STARTING ESMA-ONLY POOLS PROCESSING
[2025-12-10 21:54:19] ESMA-only pools: 246 total, 246 remaining
[2025-12-10 21:54:19]   Normal pools: 180, Large pools (chunked): 66
[2025-12-10 21:54:19] 
[1/180] Processing ESMA-only pool: 549300ELBHWFMLA2R125N200601
[2025-12-10 21:54:19]   Normal pools: 180, Large pools (chunked): 66
[2025-12-10 21:54:19] 
[1/180] Processing ESMA-only pool: 549300ELBHWFMLA2R125N200601
[2025-12-10 21:54:21]   ESMA rows: 112245
[2025-12-10 21:54:21]   ESMA rows: 112245
[2025-12-10 21:54:23]   Saved 112,245 rows in 3.4s
[2025-12-10 21:54:23] 
[2/180] Processing ESMA-only pool: 95980020140005209659N201001
[2025-12-10 21:54:23]   Saved 112,245 rows in 3.4s
[2025-12-10 21:54:23] 
[2/180] Processing ESMA-only pool: 95980020140005209659N201001
[2025-12-10 21:54:23]   ESMA rows: 67517
[2025-12-10 21:54:23]   ESMA rows: 67517
[2025-12-10 21:54:25]   Saved 67,517 rows in 1.8s
[2025-12-10 21:54:25] 
[3/180] Processing ESMA-only pool: 635400AJEHTFGD5HYS44

  prepared[col] = np.nan
  prepared[col] = np.nan
  prepared[col] = np.nan
  prepared[col] = np.nan
  prepared[col] = np.nan
  prepared[col] = np.nan
  prepared[col] = np.nan
  prepared[col] = np.nan
  prepared[col] = np.nan
  prepared[col] = np.nan
  prepared[col] = np.nan
  prepared[col] = np.nan
  prepared[col] = np.nan
  prepared[col] = np.nan
  prepared[col] = np.nan
  prepared[col] = np.nan
  prepared[col] = np.nan
  prepared[col] = np.nan
  prepared[col] = np.nan
  prepared[col] = np.nan
  prepared[col] = np.nan
  prepared[col] = np.nan
  prepared[col] = np.nan
  prepared[col] = np.nan
  prepared[col] = np.nan
  prepared[col] = np.nan
  prepared[col] = np.nan
  prepared[col] = np.nan
  prepared[col] = np.nan
  prepared[col] = np.nan
  prepared[col] = np.nan
  prepared[col] = np.nan
  prepared[col] = np.nan
  prepared[col] = np.nan
  prepared[col] = np.nan
  prepared[col] = np.nan
  prepared[col] = np.nan
  prepared[col] = np.nan
  prepared[col] = np.nan
  prepared[col] = np.nan


[2025-12-10 22:33:34]     Processed 20/26 files, 117,431 rows so far
[2025-12-10 22:33:37]   Saved 205,991 rows in 11.7s (chunked)
[2025-12-10 22:33:37] 
[LARGE 43/66] Processing ESMA-only pool: 549300WL1M55G8FIDZ68N202201 (654 MB)
[2025-12-10 22:33:37]   Scanning 40 files for column schema...
[2025-12-10 22:33:37]   Saved 205,991 rows in 11.7s (chunked)
[2025-12-10 22:33:37] 
[LARGE 43/66] Processing ESMA-only pool: 549300WL1M55G8FIDZ68N202201 (654 MB)
[2025-12-10 22:33:37]   Scanning 40 files for column schema...
[2025-12-10 22:33:48]   Found 121 columns
[2025-12-10 22:33:48]   Processing 40 files in chunked mode...
[2025-12-10 22:33:48]   Found 121 columns
[2025-12-10 22:33:48]   Processing 40 files in chunked mode...
[2025-12-10 22:33:59]     Processed 20/40 files, 357,975 rows so far
[2025-12-10 22:33:59]     Processed 20/40 files, 357,975 rows so far
[2025-12-10 22:34:11]     Processed 40/40 files, 716,226 rows so far
[2025-12-10 22:34:11]   Saved 716,226 rows in 33.7s (chunked)


In [None]:
# =============================================================================
# CELL 11: Final Summary
# =============================================================================
log_message("="*70)
log_message("PROCESSING COMPLETE")
log_message("="*70)

# Scan output folders for final stats (file-based tracking)
completed_pools = scan_completed_pools()

log_message(f"\\nCompleted pools:")
log_message(f"  Matched: {len(completed_pools['matched'])}/{len(matched_pools)}")
log_message(f"  ECB-only: {len(completed_pools['ecb_only'])}/{len(ecb_only_pools)}")
log_message(f"  ESMA-only: {len(completed_pools['esma_only'])}/{len(esma_only_pools)}")

# Check output size
total_size = 0
file_count = 0
for subdir in ['matched', 'ecb_only', 'esma_only']:
    subdir_path = os.path.join(OUTPUT_DIR, subdir)
    if os.path.exists(subdir_path):
        for f in os.listdir(subdir_path):
            if f.endswith('.csv'):
                total_size += os.path.getsize(os.path.join(subdir_path, f))
                file_count += 1

log_message(f"\\nOutput files: {file_count}")
log_message(f"Total output size: {total_size / (1024**3):.2f} GB")
log_message(f"Output directory: {OUTPUT_DIR}")

[2025-12-10 18:14:56] PROCESSING COMPLETE
[2025-12-10 18:14:56] 
Completed pools:
[2025-12-10 18:14:56]   Matched: 22/22
[2025-12-10 18:14:56]   ECB-only: 38/36
[2025-12-10 18:14:56]   ESMA-only: 246/246
[2025-12-10 18:14:56] 
Total rows processed: 527,355,838
[2025-12-10 18:14:56] 
Errors encountered: 3
[2025-12-10 18:14:56]   RMBMBE000095100120084: Unable to allocate 178. MiB for an array with shape (1, 23304387) and data type object
[2025-12-10 18:14:56]   RMBMFR000083100220149: Unable to allocate 2.13 GiB for an array with shape (6, 47749917) and data type object
[2025-12-10 18:14:56]   RMBMBE000095100120084: Unable to allocate 1.39 GiB for an array with shape (8, 23304387) and data type object
[2025-12-10 18:14:56] 
Output files: 22
[2025-12-10 18:14:56] Total output size: 167.38 GB
[2025-12-10 18:14:56] Output directory: D:\ECB_ESMA_MERGED
