In [10]:
import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import make_pipeline
from skrub import SelectCols

In [12]:
# Name of the DataFrame column that stores each row's provenance history:
# a list of (table, row_id, weight) tuples.
PROV_COL = "_provenance"

# CORE PROVENANCE TRANSFORMERS
class ProvenanceInit(BaseEstimator, TransformerMixin):
    """Seed a table with a fresh provenance column.

    Each row receives a one-entry history ``[(table_name, position, 1.0)]``
    under ``PROV_COL``, where ``position`` is the row's 0-based position in
    the input (not its index label) and ``1.0`` is the full initial weight.

    Parameters
    ----------
    table_name : str
        Label recorded as the source table in every provenance entry.
    """

    def __init__(self, table_name):
        self.table_name = table_name

    def fit(self, X, y=None):
        """Stateless transformer: nothing is learned, returns ``self``."""
        return self

    def transform(self, X):
        """Return a copy of ``X`` with the provenance column (re)written."""
        source = self.table_name
        tagged = X.copy()  # never mutate the caller's frame

        histories = []
        for position in range(len(tagged)):
            histories.append([(source, position, 1.0)])

        tagged[PROV_COL] = histories
        return tagged


class ProvenanceFilter(BaseEstimator, TransformerMixin):
    """Drop rows that fail a condition; surviving rows keep all their columns.

    Parameters
    ----------
    condition : str, callable, or mask
        * ``str``      - evaluated with ``DataFrame.query``.
        * ``callable`` - called with the frame; its result is used to index it.
        * otherwise    - used directly as the indexer (``X[condition]``).
    """

    def __init__(self, condition):
        self.condition = condition

    def fit(self, X, y=None):
        """Stateless transformer: nothing is learned, returns ``self``."""
        return self

    def transform(self, X):
        """Return a copy of the rows of ``X`` selected by ``condition``."""
        rule = self.condition

        if isinstance(rule, str):
            kept = X.query(rule)
        else:
            # A callable builds the indexer from the frame; anything else
            # already is the indexer.
            indexer = rule(X) if callable(rule) else rule
            kept = X[indexer]

        return kept.copy()


class ProvenanceSelectCols(BaseEstimator, TransformerMixin):
    """Select columns while always carrying the provenance column along.

    Parameters
    ----------
    cols : str or iterable of str
        Column name(s) to keep. A single name may be passed as a bare string.
        ``PROV_COL`` is appended automatically when the input has it and it
        was not requested explicitly.
    """

    def __init__(self, cols):
        self.cols = cols

    def fit(self, X, y=None):
        """Stateless transformer: nothing is learned, returns ``self``."""
        return self

    def transform(self, X):
        """Return ``X`` restricted to the requested columns plus provenance."""
        # A bare string is ONE column name; list("name") would explode it into
        # single characters and select the wrong (or nonexistent) columns.
        if isinstance(self.cols, str):
            cols_to_keep = [self.cols]
        else:
            cols_to_keep = list(self.cols)

        if PROV_COL in X.columns and PROV_COL not in cols_to_keep:
            cols_to_keep.append(PROV_COL)

        selector = SelectCols(cols_to_keep)
        return selector.fit_transform(X)


class ProvenanceJoiner(BaseEstimator, TransformerMixin):
    """Join the input with an auxiliary table and merge both provenances.

    Parameters
    ----------
    aux_table : pandas.DataFrame
        Table joined onto the input. If it has no ``PROV_COL`` column, one is
        created on a copy with ``ProvenanceInit("AUX")``.
    key : label or list of labels, optional
        Join column(s) named identically on both sides. Takes precedence over
        ``main_key`` / ``aux_key``.
    main_key, aux_key : label or list of labels, optional
        Join column(s) on the input and on ``aux_table`` respectively, used
        when ``key`` is not given.
    how : str, default 'inner'
        Join type, forwarded to ``DataFrame.merge``.

    Notes
    -----
    * When no key at all is given, the join uses every column shared by both
      tables *except* ``PROV_COL``. The provenance column holds lists and can
      never serve as a join key.
    * Non-key columns present on both sides keep the input's name; the
      auxiliary copy gets a ``_RIGHT`` suffix.
    * Weights: if both sides carry a history for a row, every entry is scaled
      by 0.5; if only one side does, its entries are kept unchanged.
    """

    def __init__(self, aux_table, key=None, main_key=None, aux_key=None, how='inner'):
        self.aux_table = aux_table
        self.key = key
        self.main_key = main_key
        self.aux_key = aux_key
        self.how = how

    def fit(self, X, y=None):
        """Stateless transformer: nothing is learned, returns ``self``."""
        return self

    def _resolve_keys(self, X, aux):
        """Return the ``(left_on, right_on)`` pair to hand to ``merge``."""
        if self.key is not None:
            return self.key, self.key
        if self.main_key is not None or self.aux_key is not None:
            # Partially specified keys are left to pandas, which raises its
            # own MergeError for an incomplete left_on/right_on pair.
            return self.main_key, self.aux_key

        # No key given: natural join, but never on the list-valued provenance
        # column (pandas would otherwise pick it up as a shared column).
        shared = [c for c in X.columns if c in aux.columns and c != PROV_COL]
        if not shared:
            raise pd.errors.MergeError(
                "No common columns to perform merge on (the provenance column "
                "is excluded); pass `key` or `main_key`/`aux_key`."
            )
        return shared, shared

    @staticmethod
    def _merge_histories(hist_l, hist_r):
        """Combine one row's left and right histories into a single list."""
        # Unmatched rows of left/right/outer joins hold NaN, not a list.
        if not isinstance(hist_l, list):
            hist_l = []
        if not isinstance(hist_r, list):
            hist_r = []

        # Credit is split evenly only when both sides actually contribute.
        scale = 0.5 if (hist_l and hist_r) else 1.0
        return [
            (table, row_id, weight * scale)
            for table, row_id, weight in hist_l + hist_r
        ]

    def transform(self, X):
        """Merge ``X`` with the auxiliary table and rewrite ``PROV_COL``."""
        # Ensure aux_table has provenance (on a copy, so the stored table is
        # never modified).
        aux = self.aux_table.copy()
        if PROV_COL not in aux.columns:
            aux = ProvenanceInit("AUX").transform(aux)

        left_on, right_on = self._resolve_keys(X, aux)

        merged = X.merge(
            aux,
            left_on=left_on,
            right_on=right_on,
            how=self.how,
            suffixes=('', '_RIGHT')
        )

        col_l = PROV_COL
        col_r = f"{PROV_COL}_RIGHT"
        n_rows = len(merged)

        # A side without a provenance column contributes "no history" per row.
        left_hist = merged[col_l] if col_l in merged.columns else [None] * n_rows
        right_hist = merged[col_r] if col_r in merged.columns else [None] * n_rows

        combined = [
            self._merge_histories(hist_l, hist_r)
            for hist_l, hist_r in zip(left_hist, right_hist)
        ]
        # Explicit object dtype keeps the column list-valued even when the
        # join produced zero rows.
        merged[PROV_COL] = pd.Series(combined, index=merged.index, dtype=object)

        if col_r in merged.columns:
            merged = merged.drop(columns=[col_r])

        return merged


# UTILITY FUNCTIONS
def get_provenance_summary(df):
    """Average per-row contribution of each source table.

    Sums the weights of all provenance entries per table and divides by the
    total number of rows in ``df``. Rows whose provenance cell is not a list
    add nothing to the sums but still count in the divisor.

    Returns
    -------
    dict
        ``{table: summed_weight / len(df)}``; empty when ``df`` has no
        ``PROV_COL`` column.
    """
    if PROV_COL not in df.columns:
        return {}

    totals = {}
    # Flatten every list-valued history into a single stream of entries.
    entries = (
        entry
        for history in df[PROV_COL]
        if isinstance(history, list)
        for entry in history
    )
    for table, _row_id, weight in entries:
        totals[table] = totals.get(table, 0) + weight

    row_count = len(df)
    return {table: total / row_count for table, total in totals.items()}


def show_row_provenance(df, row_idx=0):
    """Print the provenance entries recorded for one row.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame expected to carry a ``PROV_COL`` column.
    row_idx : int, default 0
        Positional row index; negative values count from the end, as with
        ``iloc``.

    Prints ``"No provenance available"`` and returns when the column is
    missing, when ``row_idx`` is out of range in either direction, or when the
    row's provenance cell is not a list (e.g. NaN).
    """
    n_rows = len(df)
    # Bound the index on both sides: iloc raises IndexError below -len(df).
    if PROV_COL not in df.columns or not -n_rows <= row_idx < n_rows:
        print("No provenance available")
        return

    prov = df[PROV_COL].iloc[row_idx]
    # Same rule as get_provenance_summary: only list cells are histories.
    if not isinstance(prov, list):
        print("No provenance available")
        return

    print(f"Row {row_idx}:")
    for table, row_id, weight in prov:
        print(f"  {table}[{row_id}]: {weight:.1%}")

In [13]:
# EXAMPLE: trace provenance through an orders -> customers -> products pipeline
# Sample data: three small tables linked by c_id (customers) and p_id (products)
customers = pd.DataFrame({
    'c_id': [1, 2, 3],
    'name': ['Alice', 'Bob', 'Charlie'],
    'country': ['USA', 'UK', 'USA']
})

orders = pd.DataFrame({
    'o_id': [101, 102, 103, 104],
    'c_id': [1, 1, 2, 3],
    'p_id': ['P1', 'P2', 'P1', 'P3'],
    'amount': [1200, 50, 300, 20]
})

products = pd.DataFrame({
    'p_id': ['P1', 'P2', 'P3'],
    'product_name': ['Laptop', 'Mouse', 'Keyboard'],
    'category': ['Electronics', 'Accessories', 'Accessories']
})

# Initialize auxiliary tables with their own table labels up front; a table
# passed to ProvenanceJoiner without provenance would be labelled "AUX".
customers_prov = ProvenanceInit("CUSTOMERS").transform(customers)
products_prov = ProvenanceInit("PRODUCTS").transform(products)

# Build pipeline: tag orders, keep amount > 100, join customers then products,
# and finally keep four display columns (the provenance column is retained
# automatically by ProvenanceSelectCols).
pipeline = make_pipeline(
    ProvenanceInit("ORDERS"),
    ProvenanceFilter("amount > 100"),
    ProvenanceJoiner(customers_prov, key='c_id', how='inner'),
    ProvenanceJoiner(products_prov, key='p_id', how='inner'),
    ProvenanceSelectCols(['name', 'product_name', 'amount', 'country'])
)

# Execute
result = pipeline.fit_transform(orders)

# Display the joined data without the provenance column
print("\nFinal Result:")
print(result.drop(columns=[PROV_COL]))

# Average contribution of each source table across all result rows
print("\n" + "=" * 70)
print("Contribution by Table:")
contributions = get_provenance_summary(result)
for table, weight in sorted(contributions.items()):
    print(f"  {table}: {weight:.1%}")

# Per-row breakdown of which source rows fed each result row
print("\n" + "=" * 70)
print("Row-Level Provenance:")
for i in range(len(result)):
    show_row_provenance(result, i)


Final Result:
    name product_name  amount country
0  Alice       Laptop    1200     USA
1    Bob       Laptop     300      UK

Contribution by Table:
  CUSTOMERS: 25.0%
  ORDERS: 25.0%
  PRODUCTS: 50.0%

Row-Level Provenance:
Row 0:
  ORDERS[0]: 25.0%
  CUSTOMERS[0]: 25.0%
  PRODUCTS[0]: 50.0%
Row 1:
  ORDERS[2]: 25.0%
  CUSTOMERS[1]: 25.0%
  PRODUCTS[0]: 50.0%
