5 changes: 4 additions & 1 deletion .gitignore
@@ -93,4 +93,7 @@ ehthumbs.db
# Other
*.env
*.env.*
.envrc
.envrc
datasets/data/bar_pass_prediction.csv
example_notebooks/CART_LawSchoolAdmissionBar.ipynb
example_notebooks/helper_functions.py
7 changes: 4 additions & 3 deletions synthpop/method/GC.py
@@ -334,11 +334,12 @@ def _rebuild_gaussian_copula(self, model_parameters: Dict[str, Any], default_par
univariates.append(univariate)
model_parameters["univariates"] = univariates
model_parameters["columns"] = columns
correlation = model_parameters.get("correlation")
correlation = model_parameters.get('correlation')
if correlation:
model_parameters["correlation"] = self._rebuild_correlation_matrix(correlation)
model_parameters['correlation'] = (
self._rebuild_correlation_matrix(correlation))
else:
model_parameters["correlation"] = [[1.0]]
model_parameters['correlation'] = [[1.0]]
return model_parameters

@classmethod
4 changes: 2 additions & 2 deletions synthpop/metrics/diagnostic_report.py
@@ -85,7 +85,7 @@ def generate_report(self) -> pd.DataFrame:
col_report["range_coverage"] = range_coverage(real, synthetic)
col_report["boundary_adherence"] = boundary_adherence(real, synthetic)
col_report["ks_complement"] = ks_complement(real, synthetic)
col_report["tv_complement"] = tv_complement(real, synthetic)
col_report["tv_complement"] = "N/A"
col_report["statistic_similarity"] = statistic_similarity(real, synthetic)
col_report["category_coverage"] = "N/A"
col_report["category_adherence"] = "N/A"
@@ -95,7 +95,7 @@ def generate_report(self) -> pd.DataFrame:
col_report["range_coverage"] = "N/A"
col_report["boundary_adherence"] = "N/A"
col_report["ks_complement"] = "N/A"
col_report["tv_complement"] = "N/A"
col_report["tv_complement"] = tv_complement(real, synthetic)
col_report["statistic_similarity"] = "N/A"
col_report["category_coverage"] = category_coverage(real, synthetic)
col_report["category_adherence"] = category_adherence(real, synthetic)
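These two hunks move tv_complement from the numerical branch to the categorical branch of the per-column report, with ks_complement keeping the numerical side. A minimal sketch of that routing, assuming the metric functions are importable from synthpop.metrics.single_columns_metrics; the dtype check is a simplification, not the report's exact logic:

```python
import pandas as pd
from synthpop.metrics.single_columns_metrics import ks_complement, tv_complement

def column_scores(real: pd.Series, synthetic: pd.Series) -> dict:
    # Numerical columns get the KS-based score; categorical columns get the TV-based one.
    if pd.api.types.is_numeric_dtype(real):
        return {"ks_complement": ks_complement(real, synthetic), "tv_complement": "N/A"}
    return {"ks_complement": "N/A", "tv_complement": tv_complement(real, synthetic)}

print(column_scores(pd.Series(["a", "b", "b"]), pd.Series(["a", "a", "b"])))
```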
13 changes: 12 additions & 1 deletion synthpop/metrics/efficacy_metrics.py
@@ -75,7 +75,18 @@ def evaluate(self, real_df: pd.DataFrame, synthetic_df: pd.DataFrame) -> dict:
X_real = real_df.drop(columns=[self.target_column])
y_real = real_df[self.target_column]

# For the purposes of efficacy metrics, we train on synthetic data and test on real data.
# Handle categorical encoding only if it's a classification task
if self.task == 'classification':
categorical_cols = X_syn.select_dtypes(include=['object', 'category']).columns.tolist()

if categorical_cols:
X_syn = pd.get_dummies(X_syn, columns=categorical_cols, drop_first=True)
X_real = pd.get_dummies(X_real, columns=categorical_cols, drop_first=True)

# Align columns in case of different categorical levels between real and synthetic data
X_syn, X_real = X_syn.align(X_real, join='left', axis=1, fill_value=0)

# Model Training and Evaluation
if self.task == 'regression':
model = LinearRegression()
model.fit(X_syn, y_syn)
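A minimal sketch of the encode-and-align step these lines add for classification targets; the toy frames and column names are illustrative only:

```python
import pandas as pd

# Hypothetical feature frames whose categorical levels do not fully overlap.
X_syn = pd.DataFrame({"age": [25, 40], "city": ["A", "B"]})
X_real = pd.DataFrame({"age": [31, 52], "city": ["A", "C"]})

# One-hot encode each frame independently; the dummy columns may differ.
X_syn = pd.get_dummies(X_syn, columns=["city"], drop_first=True)
X_real = pd.get_dummies(X_real, columns=["city"], drop_first=True)

# Left-align on the synthetic (training) columns: levels the real frame lacks
# are filled with zeros, and levels only it has are dropped.
X_syn, X_real = X_syn.align(X_real, join="left", axis=1, fill_value=0)
print(list(X_real.columns))  # matches X_syn's columns
```

Left-aligning on the training frame keeps the model's feature space fixed while tolerating categories that appear only in the real (test) data.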
70 changes: 54 additions & 16 deletions synthpop/metrics/privacy_metrics.py
@@ -1,54 +1,91 @@
# privacy_metrics.py

import numpy as np
import pandas as pd
from sklearn.neighbors import NearestNeighbors
from sklearn.preprocessing import MinMaxScaler, OneHotEncoder

class DisclosureProtection:
"""
A class to compute the disclosure protection metric for synthetic data.

The metric is defined as 1 minus the proportion of synthetic records that are too similar
(i.e. within a risk threshold) to a record in the real dataset.
This metric measures the proportion of synthetic records that are too similar
(within a defined threshold) to real records, posing a disclosure risk.

Parameters
----------
real_data : pd.DataFrame
A DataFrame containing the real data. The data should be numeric or preprocessed.
A DataFrame containing the real data. Supports both numerical and categorical features.
synthetic_data : pd.DataFrame
A DataFrame containing the synthetic data (with the same columns as real_data).
A DataFrame containing the synthetic data (with the same structure as real_data).
threshold : float, optional
A distance threshold under which a synthetic record is considered a potential disclosure risk.
If not provided, it is computed as the 10th percentile of the nearest-neighbor distances among real records.
"""

def __init__(self, real_data: pd.DataFrame, synthetic_data: pd.DataFrame, threshold: float = None):
self.real_data = real_data.copy()
self.synthetic_data = synthetic_data.copy()
self.threshold = threshold

# Preprocess data for distance computation
self.real_data, self.synthetic_data = self._preprocess_data(self.real_data, self.synthetic_data)

# Compute distance threshold if not provided
self._compute_threshold()

def _preprocess_data(self, real_data: pd.DataFrame, synthetic_data: pd.DataFrame):
"""
Preprocess both real and synthetic datasets:
- Standardize numerical columns
- One-hot encode categorical columns
- Align columns to ensure consistency
"""

# Identify numerical and categorical columns
categorical_cols = real_data.select_dtypes(include=["object", "category"]).columns.tolist()
numerical_cols = real_data.select_dtypes(include=[np.number]).columns.tolist()

# One-Hot Encode Categorical Columns
if categorical_cols:
encoder = OneHotEncoder(sparse_output=True, drop="first", handle_unknown="ignore")
real_cats = encoder.fit_transform(real_data[categorical_cols])
synthetic_cats = encoder.transform(synthetic_data[categorical_cols])

# Convert to DataFrame
real_cat_df = pd.DataFrame(real_cats.toarray(), columns=encoder.get_feature_names_out(categorical_cols))
synthetic_cat_df = pd.DataFrame(synthetic_cats.toarray(), columns=encoder.get_feature_names_out(categorical_cols))

# Drop original categorical columns and replace with encoded versions
real_data = real_data.drop(columns=categorical_cols)
synthetic_data = synthetic_data.drop(columns=categorical_cols)
real_data = pd.concat([real_data, real_cat_df], axis=1)
synthetic_data = pd.concat([synthetic_data, synthetic_cat_df], axis=1)

# Standardize numerical features
if numerical_cols:
scaler = MinMaxScaler()
real_data[numerical_cols] = scaler.fit_transform(real_data[numerical_cols])
synthetic_data[numerical_cols] = scaler.transform(synthetic_data[numerical_cols])

# Align columns (in case some categories exist in one dataset but not the other)
real_data, synthetic_data = real_data.align(synthetic_data, join="left", axis=1, fill_value=0)

return real_data, synthetic_data

def _compute_threshold(self):
"""
Compute the threshold if not provided. Uses the 10th percentile of the nearest-neighbor
distances among real records (excluding self-distance).
"""
if self.threshold is None:
# Fit a nearest neighbor model on the real data.
# n_neighbors=2 because the closest neighbor of a record is itself.
nn = NearestNeighbors(n_neighbors=2)
nn.fit(self.real_data)
distances, _ = nn.kneighbors(self.real_data)
# distances[:, 1] are the distances to the closest distinct record.
self.threshold = np.percentile(distances[:, 1], 10)

self.threshold = np.percentile(distances[:, 1], 10) # Exclude self-distance

def score(self) -> float:
"""
Compute the disclosure protection score.

For each synthetic record, compute its distance to the nearest real record.
The risk rate is the proportion of synthetic records with distance below the threshold.
The disclosure protection score is 1 - risk_rate (higher is better).

Returns
-------
@@ -61,7 +98,7 @@ def score(self) -> float:
distances = distances.flatten()
risk_count = np.sum(distances < self.threshold)
risk_rate = risk_count / len(distances)
return 1 - risk_rate
return 1 - risk_rate # Higher score means better protection

def report(self) -> dict:
"""
@@ -79,6 +116,7 @@ def report(self) -> dict:
risk_count = np.sum(distances < self.threshold)
risk_rate = risk_count / len(distances)
score = 1 - risk_rate

return {
"threshold": self.threshold,
"risk_rate": risk_rate,
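A short usage sketch of the reworked class; the import path and the toy frames are assumptions for illustration, not taken from the repository's docs:

```python
import pandas as pd
from synthpop.metrics.privacy_metrics import DisclosureProtection

real = pd.DataFrame({"age": [23, 45, 31, 52], "city": ["A", "B", "A", "C"]})
synth = pd.DataFrame({"age": [24, 44, 33, 50], "city": ["A", "B", "B", "C"]})

# With no threshold given, it defaults to the 10th percentile of
# nearest-neighbour distances among the (scaled, encoded) real records.
dp = DisclosureProtection(real, synth)
print(dp.score())   # 1 - risk_rate; closer to 1 means better protection
print(dp.report())  # dict including the threshold and risk_rate
```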
72 changes: 33 additions & 39 deletions synthpop/metrics/single_columns_metrics.py
@@ -124,52 +124,46 @@ def ks_complement(real: pd.Series, synthetic: pd.Series) -> float:
return 1 - ks_stat


def tv_complement(real: pd.Series, synthetic: pd.Series, bins: int = 10) -> float:
def tv_complement(real_series: pd.Series, synthetic_series: pd.Series) -> float:
"""
Compute the complement of the Total Variation (TV) distance between the histograms
of the real and synthetic data. A value of 1 indicates identical distributions.
Computes the TVComplement score between a real and a synthetic categorical column.

If the data is datetime or timedelta, convert it to numeric values (in seconds).
TVD is defined as:
TVD = 1/2 * sum(|R_ω - S_ω|) for all categories ω in the union of both series.

The TVComplement score is:
score = 1 - TVD

Parameters
----------
real_series : pd.Series
Categorical data from the real dataset.
synthetic_series : pd.Series
Categorical data from the synthetic dataset.

Args:
real (pd.Series): Real numerical data.
synthetic (pd.Series): Synthetic numerical data.
bins (int, optional): Number of bins to use for the histograms. Defaults to 10.
Returns
-------
float
The TVComplement score (between 0 and 1).
"""
# Compute normalized frequency distributions (probabilities)
real_freq = real_series.value_counts(normalize=True)
synthetic_freq = synthetic_series.value_counts(normalize=True)

Returns:
float: 1 - TV distance, where TV is computed over the normalized histograms.
"""
real_clean = real.dropna()
synthetic_clean = synthetic.dropna()
# Get the union of categories present in both series
all_categories = real_freq.index.union(synthetic_freq.index)

if len(real_clean) == 0 or len(synthetic_clean) == 0:
return 0.0

# Convert datetime/timedelta to numeric values if necessary.
if np.issubdtype(real_clean.dtype, np.datetime64):
# Convert to seconds since epoch
real_clean = real_clean.astype('int64') / 1e9
synthetic_clean = synthetic_clean.astype('int64') / 1e9
elif np.issubdtype(real_clean.dtype, np.timedelta64):
# Convert to total seconds
if hasattr(real_clean, 'dt'):
real_clean = real_clean.dt.total_seconds()
synthetic_clean = synthetic_clean.dt.total_seconds()
else:
real_clean = real_clean.astype('int64') / 1e9
synthetic_clean = synthetic_clean.astype('int64') / 1e9

all_data = pd.concat([real_clean, synthetic_clean])
bin_edges = np.histogram_bin_edges(all_data, bins=bins)
real_hist, _ = np.histogram(real_clean, bins=bin_edges, density=True)
synth_hist, _ = np.histogram(synthetic_clean, bins=bin_edges, density=True)
# Reindex to ensure both distributions have the same categories, fill missing with 0
real_freq = real_freq.reindex(all_categories, fill_value=0)
synthetic_freq = synthetic_freq.reindex(all_categories, fill_value=0)

# Calculate Total Variation Distance (TVD)
tvd = 0.5 * np.abs(real_freq - synthetic_freq).sum()

# Normalize the histograms
real_hist = real_hist / np.sum(real_hist)
synth_hist = synth_hist / np.sum(synth_hist)
# Compute TVComplement: higher score means higher similarity
tv_complement_score = 1 - tvd

tv_distance = 0.5 * np.sum(np.abs(real_hist - synth_hist))
return 1 - tv_distance
return tv_complement_score


# ------------------------------------------------------------------------------
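A worked example of the score as defined above (TVD = ½·Σ|R_ω − S_ω|), assuming the function is importable from synthpop.metrics.single_columns_metrics:

```python
import pandas as pd
from synthpop.metrics.single_columns_metrics import tv_complement

real = pd.Series(["a", "a", "b", "b"])       # frequencies: a = 0.50, b = 0.50
synthetic = pd.Series(["a", "b", "b", "b"])  # frequencies: a = 0.25, b = 0.75

# TVD = 0.5 * (|0.50 - 0.25| + |0.50 - 0.75|) = 0.25, so the score is 0.75.
print(tv_complement(real, synthetic))  # 0.75
```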
2 changes: 1 addition & 1 deletion synthpop/processor/data_processor.py
@@ -74,7 +74,7 @@ def _preprocess(self, data: pd.DataFrame) -> pd.DataFrame:
elif dtype == "timedelta":
data[col] = pd.to_timedelta(data[col]).dt.total_seconds()

return data
return data[self.original_columns]

def postprocess(self, synthetic_data: pd.DataFrame) -> pd.DataFrame:
"""Transform numerical synthetic data back to its original format."""
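A small illustration of why the new return value matters: indexing with the saved column list restores the original ordering (and drops any helper columns) before the frame leaves _preprocess. The names below are illustrative.

```python
import pandas as pd

original_columns = ["age", "income", "signup_date"]
processed = pd.DataFrame({"income": [1.0], "signup_date": [2.2], "age": [3.0]})

# Selecting with the saved list guarantees downstream code sees the columns
# in the same order as the raw input.
processed = processed[original_columns]
print(list(processed.columns))  # ['age', 'income', 'signup_date']
```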
39 changes: 29 additions & 10 deletions synthpop/processor/missing_data_handler.py
@@ -5,6 +5,7 @@
from sklearn.impute import SimpleImputer, IterativeImputer
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import LabelEncoder
from .data_processor import DataProcessor
import warnings


@@ -204,6 +205,11 @@ def detect_missingness(self, dfc: pd.DataFrame) -> dict:
def apply_imputation(self, df: pd.DataFrame, missingness: dict) -> pd.DataFrame:
"""Automatically applies imputation based on missingness type and column data type."""
df = df.copy()
metadata = self.get_column_dtypes(df)
processor = DataProcessor(metadata)
processed_data = processor.preprocess(df)
imputer = IterativeImputer(random_state=42)
df_iterative = pd.DataFrame(imputer.fit_transform(processed_data), columns= df.columns)
for col, mtype in missingness.items():
if df[col].isna().sum() == 0:
continue
@@ -218,16 +224,30 @@ def apply_imputation(self, df: pd.DataFrame, missingness: dict) -> pd.DataFrame:
df[col].fillna(df[col].mode()[0], inplace=True)
elif mtype == "MAR":
# Use get_dummies encoding for categorical data
dummies = pd.get_dummies(df[col], prefix=col, dummy_na=True)
le = LabelEncoder()
non_missing = df[col].dropna()
le.fit(non_missing)
predictor_cols = [c for c in df.columns if c != col]
predictors = df_iterative[predictor_cols].copy()
df_copy = df.copy()
df_copy[f"{col}_encoded"] = df_copy[col].apply(lambda x: le.transform([x])[0] if pd.notna(x) else np.nan)

# Combine predictors and the encoded target.
combined = pd.concat([predictors, df_copy[[f"{col}_encoded"]]], axis=1)
# Impute missing values using IterativeImputer.
imputer = IterativeImputer(random_state=42)
imputed = imputer.fit_transform(dummies)
imputed_rounded = np.rint(imputed).astype(int)
imputed_df = pd.DataFrame(
imputed_rounded, columns=dummies.columns, index=df.index
)
# Convert back to a single categorical column by taking the column with the maximum value.
predicted_category = imputed_df.idxmax(axis=1)
df[col] = predicted_category.str.split(f"{col}_").str[-1]
imputed_array = imputer.fit_transform(combined)
imputed_df = pd.DataFrame(imputed_array, columns=combined.columns, index=df.index)

# Extract the imputed encoded target column.
imputed_encoded = imputed_df[f"{col}_encoded"]
imputed_encoded = imputed_encoded.round().astype(int)
min_code = 0
max_code = len(le.classes_) - 1
imputed_encoded = imputed_encoded.clip(lower=min_code, upper=max_code)
# Decode back to the original categorical labels.
imputed_categories = le.inverse_transform(imputed_encoded)
df[col] = imputed_categories
elif mtype == "MNAR":
df[col].fillna("Missing", inplace=True)

@@ -252,7 +272,6 @@ def apply_imputation(self, df: pd.DataFrame, missingness: dict) -> pd.DataFrame:

# --- Datetime Data ---
elif pd.api.types.is_datetime64_any_dtype(df[col]):
print("entering here")
numeric_series = df[col].apply(lambda x: x.timestamp() if pd.notnull(x) else np.nan)
if mtype == "MCAR":
imputer = SimpleImputer(strategy="median")
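A self-contained sketch of the MAR strategy the hunk above adopts for categorical columns: label-encode the observed values, impute the missing codes jointly with numeric predictors via IterativeImputer, then round, clip into the valid code range, and decode. The toy data and column names are illustrative, not from the package:

```python
import numpy as np
import pandas as pd
from sklearn.experimental import enable_iterative_imputer  # noqa: F401
from sklearn.impute import IterativeImputer
from sklearn.preprocessing import LabelEncoder

df = pd.DataFrame({
    "income": [30.0, 52.0, 47.0, 61.0, 39.0],
    "grade": ["low", "high", np.nan, "high", "low"],
})

# Encode the observed categories as integer codes.
le = LabelEncoder()
le.fit(df["grade"].dropna())
encoded = df["grade"].map(lambda x: le.transform([x])[0] if pd.notna(x) else np.nan)

# Impute the missing code jointly with the numeric predictor.
combined = pd.concat([df[["income"]], encoded.rename("grade_encoded")], axis=1)
imputed = IterativeImputer(random_state=42).fit_transform(combined)

# Round, clip into the valid code range, and decode back to labels.
codes = np.clip(np.rint(imputed[:, 1]).astype(int), 0, len(le.classes_) - 1)
df["grade"] = le.inverse_transform(codes)
print(df)
```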