In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

In [8]:
import pandas as pd
import numpy as np
from typing import Dict, Tuple, Optional, List
from dataclasses import dataclass

@dataclass
class ColumnConfig:
    col1: str
    col2: str
    col_type: str  # 'J' (join), 'N' (numerical), 'C' (categorical)
    variance: Optional[float] = None  # For numerical: 0.05 = 5%


class TableReconciliation:

    def __init__(self, df1: pd.DataFrame, df2: pd.DataFrame, config: List[ColumnConfig]):
        self.df1 = df1.copy()
        self.df2 = df2.copy()
        self.config = config
        self.merged = None
        self.results = {}

        # Extract join key
        join_cols = [c.col1 for c in config if c.col_type == 'J']
        if not join_cols:
            raise ValueError("At least one column marked as 'J' (join key) required")
        self.join_key = join_cols[0]

        # Auto-run full reconciliation
        self._run_all()

    def _run_all(self):
        """Auto-execute: join + compare all columns."""
        self.join()

        # Compare all non-join columns
        for cfg in self.config:
            if cfg.col_type == 'J':
                continue

            if cfg.col_type == 'N':
                self.compare_numerical(cfg.col1, cfg.col2, cfg.variance)
            elif cfg.col_type == 'C':
                self.compare_categorical(cfg.col1, cfg.col2)

    # ========================================================================
    # 1. JOIN & VALIDATE
    # ========================================================================

    def join(self) -> Tuple[Optional[pd.DataFrame], Dict]:
        """Join tables on join_key. Validates 1:1 mapping and no duplicates."""

        report = {
            't1_count': len(self.df1),
            't2_count': len(self.df2),
            'merged_count': None,
            'status': 'FAILED',
            'errors': []
        }

        # Get join column names for each table
        join_cfg = next(c for c in self.config if c.col_type == 'J')
        join_col_t1 = join_cfg.col1
        join_col_t2 = join_cfg.col2

        # Check duplicates
        dups_t1 = self.df1[self.df1.duplicated(subset=[join_col_t1], keep=False)]
        if len(dups_t1) > 0:
            report['errors'].append(f"T1: {len(dups_t1)//2} duplicate {join_col_t1} keys")

        dups_t2 = self.df2[self.df2.duplicated(subset=[join_col_t2], keep=False)]
        if len(dups_t2) > 0:
            report['errors'].append(f"T2: {len(dups_t2)//2} duplicate {join_col_t2} keys")

        if report['errors']:
            return None, report

        # Rename table2 join key to match table1
        df2_renamed = self.df2.copy()
        if join_col_t2 != join_col_t1:
            df2_renamed = df2_renamed.rename(columns={join_col_t2: join_col_t1})

        # Merge
        try:
            self.merged = pd.merge(self.df1, df2_renamed, on=self.join_key, how='outer',
                                   suffixes=('_t1', '_t2'), indicator=True)

            unmatched_t1 = (self.merged['_merge'] == 'left_only').sum()
            unmatched_t2 = (self.merged['_merge'] == 'right_only').sum()

            if unmatched_t1 > 0 or unmatched_t2 > 0:
                report['errors'].append(f"Unmatched: T1={unmatched_t1}, T2={unmatched_t2}")

            self.merged = self.merged.drop(columns=['_merge'])
            report['merged_count'] = len(self.merged)
            report['status'] = 'SUCCESS' if len(self.df1) == len(self.df2) == len(self.merged) else 'PARTIAL'

        except Exception as e:
            report['errors'].append(str(e))
            return None, report

        return self.merged, report

    # ========================================================================
    # 2. COMPARE NUMERICAL (using np.isclose)
    # ========================================================================

    def compare_numerical(self, col1: str, col2: str, variance: Optional[float] = None) -> Dict:
        """Compare numerical columns using np.isclose. Treats bad/NaN data as mismatch."""

        if self.merged is None:
            return {'error': 'Run join() first'}

        c1 = col1 if col1 in self.merged.columns else f"{col1}_t1"
        c2 = col2 if col2 in self.merged.columns else f"{col2}_t2"

        v1 = pd.to_numeric(self.merged[c1], errors='coerce')
        v2 = pd.to_numeric(self.merged[c2], errors='coerce')

        diff = v1 - v2

        # Initialize matches as False (mismatch) - safer default
        matches = pd.Series([False] * len(self.merged), index=self.merged.index)

        # Try np.isclose, catch any issues
        try:
            if variance is not None:
                # Use np.isclose with relative tolerance
                is_close = np.isclose(v1, v2, rtol=variance, atol=0, equal_nan=True)
                matches = is_close | (v1.isna() & v2.isna())
            else:
                # Exact match using np.isclose
                is_close = np.isclose(v1, v2, rtol=0, atol=0, equal_nan=True)
                matches = is_close | (v1.isna() & v2.isna())
        except Exception:
            # If np.isclose fails, all remain as mismatch (False)
            pass

        matching = matches.sum()
        total = len(self.merged)

        result = {
            'col1': col1, 'col2': col2, 'type': 'NUMERICAL',
            'total': total, 'matching': matching, 'mismatching': total - matching,
            'match_pct': (matching / total * 100) if total > 0 else 0,
            'variance': variance,
            'stats': {
                'mean_diff': diff.mean(),
                'max_diff': diff.abs().max(),
                'std_dev': diff.std()
            },
            'matches': matches,
            'diff': diff
        }

        self.results[col1] = result
        return result

    # ========================================================================
    # 3. COMPARE CATEGORICAL (strip whitespace only, case-sensitive)
    # ========================================================================

    def compare_categorical(self, col1: str, col2: str) -> Dict:
        """Compare categorical columns (strip whitespace only, case-sensitive)."""

        if self.merged is None:
            return {'error': 'Run join() first'}

        c1 = col1 if col1 in self.merged.columns else f"{col1}_t1"
        c2 = col2 if col2 in self.merged.columns else f"{col2}_t2"

        v1 = self.merged[c1].fillna('').astype(str).str.strip()
        v2 = self.merged[c2].fillna('').astype(str).str.strip()

        matches = v1 == v2
        matching = matches.sum()
        total = len(self.merged)

        result = {
            'col1': col1, 'col2': col2, 'type': 'CATEGORICAL',
            'total': total, 'matching': matching, 'mismatching': total - matching,
            'match_pct': (matching / total * 100) if total > 0 else 0,
            'matches': matches
        }

        self.results[col1] = result
        return result

    # ========================================================================
    # 4. DETAILED ANALYSIS
    # ========================================================================

    def analyze(self, col1: str) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """
        Detailed analysis: returns (matched_df, mismatched_df)

        Columns: [join_key, col1, col2, diff] sorted by highest diff
        """

        if self.merged is None:
            return pd.DataFrame(), pd.DataFrame()

        # Find config
        cfg = next((c for c in self.config if c.col1 == col1), None)
        if not cfg:
            return pd.DataFrame(), pd.DataFrame()

        col2 = cfg.col2
        col_type = cfg.col_type

        # Get comparison results
        if col1 not in self.results:
            if col_type == 'N':
                self.compare_numerical(col1, col2, cfg.variance)
            else:
                self.compare_categorical(col1, col2)

        matches = self.results[col1]['matches']

        # Get actual column names
        c1 = col1 if col1 in self.merged.columns else f"{col1}_t1"
        c2 = col2 if col2 in self.merged.columns else f"{col2}_t2"

        # Matched records
        matched = self.merged[matches][[self.join_key, c1, c2]].copy()
        matched.columns = [self.join_key, col1, col2]

        # Mismatched records
        mismatched = self.merged[~matches][[self.join_key, c1, c2]].copy()
        mismatched.columns = [self.join_key, col1, col2]

        if col_type == 'N':
            v1 = pd.to_numeric(self.merged.loc[~matches, c1], errors='coerce')
            v2 = pd.to_numeric(self.merged.loc[~matches, c2], errors='coerce')
            diff = v1.values - v2.values
            mismatched['diff'] = diff
            mismatched = mismatched.sort_values('diff', key=abs, ascending=False)
        else:
            mismatched['diff'] = mismatched[col1].astype(str) + ' → ' + mismatched[col2].astype(str)

        return matched, mismatched

    # ========================================================================
    # SUMMARY
    # ========================================================================

    def summary(self) -> pd.DataFrame:
        """Summary of all comparisons."""
        data = []
        for col1, result in self.results.items():
            data.append({
                'Column (T1)': result['col1'],
                'Column (T2)': result['col2'],
                'Type': result['type'],
                'Total': result['total'],
                'Match': result['matching'],
                'Mismatch': result['mismatching'],
                'Match %': f"{result['match_pct']:.2f}%"
            })
        return pd.DataFrame(data)

In [10]:
df1 = pd.DataFrame({
    'id': [1, 2, 3, 4, 5],
    'amount': [100.50, 250.75, 150.00, 300.25, 75.99],
    'status': ['Active', 'Pending', 'Active', 'Closed', 'Active']
})

# DataFrame 2
df2 = pd.DataFrame({
    'id': [1, 2, 3, 4, 6],
    'amount': [100.50, 250.75, 155.00, 300.25, 500.00],
    'category': ['A', 'B', 'A', 'C', 'B']
})


config = [
    ColumnConfig('id', 'id', 'J'),
    ColumnConfig('amount', 'amount', 'N', variance=0.01),
    ColumnConfig('status', 'category', 'C'),
]


recon = TableReconciliation(df1, df2, config)

# View summary
print("SUMMARY:")
print(recon.summary())

# Analyze mismatches
print("\n" + "="*60)
print("AMOUNT MISMATCHES (bad data treated as mismatch)")
print("="*60)
matched, mismatched = recon.analyze('status')

SUMMARY:
  Column (T1) Column (T2)         Type  Total  Match  Mismatch Match %
0      amount      amount    NUMERICAL      6      3         3  50.00%
1      status    category  CATEGORICAL      6      0         6   0.00%

AMOUNT MISMATCHES (bad data treated as mismatch)


In [12]:
matched, mismatched = recon.analyze('amount')

In [13]:
mismatched

Unnamed: 0,id,amount,amount.1,diff
2,3,150.0,155.0,-5.0
4,5,75.99,,
5,6,,500.0,


In [14]:
import pandas as pd
import numpy as np
from typing import Dict, Tuple, Optional, List
from dataclasses import dataclass

@dataclass
class ColumnConfig:
    col1: str
    col2: str
    col_type: str  # 'J' (join), 'N' (numerical), 'C' (categorical)
    variance: Optional[float] = None  # For numerical: 0.05 = 5%


class TableReconciliation:

    def __init__(self, df1: pd.DataFrame, df2: pd.DataFrame, config: List[ColumnConfig], join_type: str = 'outer'):
        self.df1 = df1.copy()
        self.df2 = df2.copy()
        self.config = config
        self.merged = None
        self.results = {}
        self.join_type = join_type.lower()

        if self.join_type not in ['inner', 'outer']:
            raise ValueError("join_type must be 'inner' or 'outer'")

        # Extract join key
        join_cols = [c.col1 for c in config if c.col_type == 'J']
        if not join_cols:
            raise ValueError("At least one column marked as 'J' (join key) required")
        self.join_key = join_cols[0]

        # Auto-run full reconciliation
        self._run_all()

    def _run_all(self):
        """Auto-execute: join + compare all columns."""
        self.join()

        # Compare all non-join columns
        for cfg in self.config:
            if cfg.col_type == 'J':
                continue

            if cfg.col_type == 'N':
                self.compare_numerical(cfg.col1, cfg.col2, cfg.variance)
            elif cfg.col_type == 'C':
                self.compare_categorical(cfg.col1, cfg.col2)

    # ========================================================================
    # 1. JOIN & VALIDATE
    # ========================================================================

    def join(self) -> Tuple[Optional[pd.DataFrame], Dict]:
        """Join tables on join_key. Validates 1:1 mapping and no duplicates."""

        report = {
            't1_count': len(self.df1),
            't2_count': len(self.df2),
            'merged_count': None,
            'join_type': self.join_type,
            'status': 'FAILED',
            'errors': []
        }

        # Get join column names for each table
        join_cfg = next(c for c in self.config if c.col_type == 'J')
        join_col_t1 = join_cfg.col1
        join_col_t2 = join_cfg.col2

        # Check duplicates
        dups_t1 = self.df1[self.df1.duplicated(subset=[join_col_t1], keep=False)]
        if len(dups_t1) > 0:
            report['errors'].append(f"T1: {len(dups_t1)//2} duplicate {join_col_t1} keys")

        dups_t2 = self.df2[self.df2.duplicated(subset=[join_col_t2], keep=False)]
        if len(dups_t2) > 0:
            report['errors'].append(f"T2: {len(dups_t2)//2} duplicate {join_col_t2} keys")

        if report['errors']:
            return None, report

        # Rename table2 join key to match table1
        df2_renamed = self.df2.copy()
        if join_col_t2 != join_col_t1:
            df2_renamed = df2_renamed.rename(columns={join_col_t2: join_col_t1})

        # Merge
        try:
            self.merged = pd.merge(self.df1, df2_renamed, on=self.join_key, how=self.join_type,
                                   suffixes=('_t1', '_t2'), indicator=True)

            unmatched_t1 = (self.merged['_merge'] == 'left_only').sum()
            unmatched_t2 = (self.merged['_merge'] == 'right_only').sum()

            if unmatched_t1 > 0 or unmatched_t2 > 0:
                report['errors'].append(f"Unmatched: T1={unmatched_t1}, T2={unmatched_t2}")

            self.merged = self.merged.drop(columns=['_merge'])
            report['merged_count'] = len(self.merged)
            report['status'] = 'SUCCESS' if len(self.df1) == len(self.df2) == len(self.merged) else 'PARTIAL'

        except Exception as e:
            report['errors'].append(str(e))
            return None, report

        return self.merged, report

    # ========================================================================
    # 2. COMPARE NUMERICAL (using np.isclose)
    # ========================================================================

    def compare_numerical(self, col1: str, col2: str, variance: Optional[float] = None) -> Dict:
        """Compare numerical columns using np.isclose. Treats bad/NaN data as mismatch."""

        if self.merged is None:
            return {'error': 'Run join() first'}

        c1 = col1 if col1 in self.merged.columns else f"{col1}_t1"
        c2 = col2 if col2 in self.merged.columns else f"{col2}_t2"

        v1 = pd.to_numeric(self.merged[c1], errors='coerce')
        v2 = pd.to_numeric(self.merged[c2], errors='coerce')

        diff = v1 - v2

        # Initialize matches as False (mismatch) - safer default
        matches = pd.Series([False] * len(self.merged), index=self.merged.index)

        # Try np.isclose, catch any issues
        try:
            if variance is not None:
                # Use np.isclose with relative tolerance
                is_close = np.isclose(v1, v2, rtol=variance, atol=0, equal_nan=True)
                matches = is_close | (v1.isna() & v2.isna())
            else:
                # Exact match using np.isclose
                is_close = np.isclose(v1, v2, rtol=0, atol=0, equal_nan=True)
                matches = is_close | (v1.isna() & v2.isna())
        except Exception:
            # If np.isclose fails, all remain as mismatch (False)
            pass

        matching = matches.sum()
        total = len(self.merged)

        result = {
            'col1': col1, 'col2': col2, 'type': 'NUMERICAL',
            'total': total, 'matching': matching, 'mismatching': total - matching,
            'match_pct': (matching / total * 100) if total > 0 else 0,
            'variance': variance,
            'stats': {
                'mean_diff': diff.mean(),
                'max_diff': diff.abs().max(),
                'std_dev': diff.std()
            },
            'matches': matches,
            'diff': diff
        }

        self.results[col1] = result
        return result

    # ========================================================================
    # 3. COMPARE CATEGORICAL (strip whitespace only, case-sensitive)
    # ========================================================================

    def compare_categorical(self, col1: str, col2: str) -> Dict:
        """Compare categorical columns (strip whitespace only, case-sensitive)."""

        if self.merged is None:
            return {'error': 'Run join() first'}

        c1 = col1 if col1 in self.merged.columns else f"{col1}_t1"
        c2 = col2 if col2 in self.merged.columns else f"{col2}_t2"

        v1 = self.merged[c1].fillna('').astype(str).str.strip()
        v2 = self.merged[c2].fillna('').astype(str).str.strip()

        matches = v1 == v2
        matching = matches.sum()
        total = len(self.merged)

        result = {
            'col1': col1, 'col2': col2, 'type': 'CATEGORICAL',
            'total': total, 'matching': matching, 'mismatching': total - matching,
            'match_pct': (matching / total * 100) if total > 0 else 0,
            'matches': matches
        }

        self.results[col1] = result
        return result

    # ========================================================================
    # 4. DETAILED ANALYSIS
    # ========================================================================

    def analyze(self, col1: str) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """
        Detailed analysis: returns (matched_df, mismatched_df)

        Columns: [join_key, col1, col2, diff] sorted by highest diff
        """

        if self.merged is None:
            return pd.DataFrame(), pd.DataFrame()

        # Find config
        cfg = next((c for c in self.config if c.col1 == col1), None)
        if not cfg:
            return pd.DataFrame(), pd.DataFrame()

        col2 = cfg.col2
        col_type = cfg.col_type

        # Get comparison results
        if col1 not in self.results:
            if col_type == 'N':
                self.compare_numerical(col1, col2, cfg.variance)
            else:
                self.compare_categorical(col1, col2)

        matches = self.results[col1]['matches']

        # Get actual column names
        c1 = col1 if col1 in self.merged.columns else f"{col1}_t1"
        c2 = col2 if col2 in self.merged.columns else f"{col2}_t2"

        # Matched records
        matched = self.merged[matches][[self.join_key, c1, c2]].copy()
        matched.columns = [self.join_key, col1, col2]

        # Mismatched records
        mismatched = self.merged[~matches][[self.join_key, c1, c2]].copy()
        mismatched.columns = [self.join_key, col1, col2]

        if col_type == 'N':
            v1 = pd.to_numeric(self.merged.loc[~matches, c1], errors='coerce')
            v2 = pd.to_numeric(self.merged.loc[~matches, c2], errors='coerce')
            diff = v1.values - v2.values
            mismatched['diff'] = diff
            mismatched = mismatched.sort_values('diff', key=abs, ascending=False)
        else:
            mismatched['diff'] = mismatched[col1].astype(str) + ' → ' + mismatched[col2].astype(str)

        return matched, mismatched

    # ========================================================================
    # SUMMARY
    # ========================================================================

    def summary(self) -> pd.DataFrame:
        """Summary of all comparisons."""
        data = []
        for col1, result in self.results.items():
            data.append({
                'Column (T1)': result['col1'],
                'Column (T2)': result['col2'],
                'Type': result['type'],
                'Total': result['total'],
                'Match': result['matching'],
                'Mismatch': result['mismatching'],
                'Match %': f"{result['match_pct']:.2f}%"
            })
        return pd.DataFrame(data)

In [19]:
df1 = pd.DataFrame({
    'id': [1, 2, 3, 4, 5],
    'amount': [100.50, 250.75, 150.00, 300.25, 75.99],
    'status': ['Active', 'Pending', 'Active', 'Closed', 'Active']
})

# DataFrame 2
df2 = pd.DataFrame({
    'id': [1, 2, 3, 4, 6],
    'amount': [100.50, 250.75, 155.00, 300.25, 500.00],
    'category': ['A', 'B', 'A', 'C', 'B']
})


config = [
    ColumnConfig('id', 'id', 'J'),
    ColumnConfig('amount', 'amount', 'N', variance=0.01),
    ColumnConfig('status', 'category', 'C'),
]


recon = TableReconciliation(df1, df2, config,join_type='inner')

# View summary
print("SUMMARY:")
print(recon.summary())

# Analyze mismatches
print("\n" + "="*60)
print("AMOUNT MISMATCHES (bad data treated as mismatch)")
print("="*60)
matched, mismatched = recon.analyze('status')

SUMMARY:
  Column (T1) Column (T2)         Type  Total  Match  Mismatch Match %
0      amount      amount    NUMERICAL      4      3         1  75.00%
1      status    category  CATEGORICAL      4      0         4   0.00%

AMOUNT MISMATCHES (bad data treated as mismatch)


In [20]:
mismatched

Unnamed: 0,id,status,category,diff
0,1,Active,A,Active → A
1,2,Pending,B,Pending → B
2,3,Active,A,Active → A
3,4,Closed,C,Closed → C
