# In [2]:
import numpy as np
import pandas as pd
from ForestDiffusion import ForestDiffusionModel
from sklearn.preprocessing import StandardScaler
from scipy.stats import norm
from sklearn.neighbors import KernelDensity
import warnings

# Suppress warnings about feature names mismatch in KDE
warnings.filterwarnings("ignore", category=UserWarning, message="X does not have valid feature names")

class CorrelationAdjustedForestDiffusion:
    """Forest Diffusion sampler that trains in a decorrelated (whitened) space.

    The training data is standardized, then whitened with the Cholesky factor
    of its correlation matrix, and a ``ForestDiffusionModel`` is fitted on the
    whitened data.  Generated samples are mapped back by re-applying the
    Cholesky factor (restoring the correlations) and undoing the
    standardization, then filtered by kernel density and clipped.

    Attributes:
        X: The training data exactly as passed in (DataFrame or array).
        n_t: Number of diffusion time steps passed to the model.
        n_features: Number of columns in ``X``.
        scaler: ``StandardScaler`` fitted on ``X``.
        correlation_matrix: Correlation matrix of the standardized data.
        L: Lower-triangular Cholesky factor of ``correlation_matrix``.
        forest_diffusion: The underlying ``ForestDiffusionModel``.
    """

    def __init__(self, X, n_t=50, n_estimators=100, max_depth=10, n_jobs=-1):
        """Fit the scaler, the whitening transform and the diffusion model.

        Args:
            X: 2-D training data (``pandas.DataFrame`` or array-like) with
                one row per sample and one column per feature.
            n_t: Number of time steps in the diffusion.
            n_estimators: Forwarded to ``ForestDiffusionModel``.
            max_depth: Forwarded to ``ForestDiffusionModel``.
            n_jobs: Forwarded to ``ForestDiffusionModel``.

        Raises:
            numpy.linalg.LinAlgError: If the correlation matrix is not
                positive definite, so no Cholesky factor exists.
        """
        self.X = X
        self.n_t = n_t  # Number of time steps in diffusion

        # Work on a plain numpy array from here on.
        X_array = X.to_numpy() if isinstance(X, pd.DataFrame) else np.asarray(X)
        self.n_features = X_array.shape[1]

        # Standardize data using StandardScaler
        self.scaler = StandardScaler().fit(X_array)
        X_scaled = self.scaler.transform(X_array)

        # Correlation structure of the data and its Cholesky factor
        # (correlation_matrix == L @ L.T).
        self.correlation_matrix = np.corrcoef(X_scaled, rowvar=False)
        self.L = np.linalg.cholesky(self.correlation_matrix)

        # Train in the whitened space so that revert_whitening() in
        # generate_samples() is the genuine inverse of what the model saw.
        # Whitening only the generated samples and immediately reverting
        # them would cancel out and leave the model untouched by it.
        X_whitened = self.apply_whitening(X_scaled)

        # Initialize the Forest Diffusion model
        self.forest_diffusion = ForestDiffusionModel(
            X=X_whitened,
            label_y=None,  # No labels for generation
            n_t=n_t,
            n_estimators=n_estimators,
            max_depth=max_depth,
            n_jobs=n_jobs,
            diffusion_type='flow',
            duplicate_K=100,
            bin_indexes=[],  # Assuming no binning needed
            cat_indexes=[],  # Assuming no categorical features
            int_indexes=[]   # Assuming no integer-specific processing
        )

    def apply_whitening(self, X):
        """Map standardized data into the whitened space.

        Computes ``X @ inv(L.T)`` by solving the linear system
        ``L @ Z.T = X.T`` rather than forming the inverse explicitly.

        Args:
            X: Array whose last axis has ``n_features`` entries.

        Returns:
            The whitened array, same shape as ``X``.
        """
        return np.linalg.solve(self.L, np.asarray(X).T).T

    def revert_whitening(self, X):
        """Undo :meth:`apply_whitening` by computing ``X @ L.T``."""
        return X @ self.L.T

    def kde_filter(self, generated_X, bandwidth=0.1, percentile=50):
        """Keep the generated samples that lie in high-density regions.

        A Gaussian kernel density estimate is fitted on the training data
        ``self.X`` and every generated sample is scored with it.  Samples
        whose log-density is at or above the given percentile of the scores
        are kept.

        Args:
            generated_X: Generated samples in the original feature scale.
            bandwidth: Kernel bandwidth passed to ``KernelDensity``.
            percentile: Percentile (0-100) of the log-density scores used as
                the cut-off; the default of 50 keeps the denser half.

        Returns:
            The rows of ``generated_X`` that pass the cut-off.  Empty input
            is returned unchanged.
        """
        if len(generated_X) == 0:
            # np.percentile has nothing to work with; nothing to filter.
            return generated_X

        # Fit on a plain array so that fitting (possibly a DataFrame) and
        # scoring (an ndarray) agree on the absence of feature names.
        # NOTE(review): the KDE runs on unscaled features with one isotropic
        # bandwidth, so large-magnitude columns dominate the density;
        # consider fitting in the standardized space.
        reference = np.asarray(self.X)
        kde = KernelDensity(kernel='gaussian', bandwidth=bandwidth).fit(reference)
        log_probs = kde.score_samples(generated_X)

        threshold = np.percentile(log_probs, percentile)
        return generated_X[log_probs >= threshold]

    def post_sampling_noise_reduction(self, generated_X):
        """Clip extreme values of the generated data to ``[-1e5, 1e5]``.

        The earlier per-column ``np.interp`` pass evaluated each column at
        its own sample points, which returns the column unchanged, and it
        raised ``ValueError`` for input with zero rows.  Only the clipping
        has an effect, so only the clipping is kept.

        Args:
            generated_X: Generated samples.

        Returns:
            A clipped copy of ``generated_X``.
        """
        return np.clip(generated_X, -1e5, 1e5)

    def generate_samples(self, num_samples):
        """Generate synthetic samples in the original feature scale.

        The model produces samples in the whitened space; these are mapped
        back through the Cholesky factor and the inverse standardization,
        filtered with :meth:`kde_filter` and clipped.

        Args:
            num_samples: Number of samples requested from the model.

        Returns:
            A 2-D array with ``n_features`` columns.  Because of the density
            filter it can hold fewer than ``num_samples`` rows.  If the
            underlying model exposes no ``generate`` method, a message is
            printed and an array with zero rows is returned.
        """
        generate = getattr(self.forest_diffusion, "generate", None)
        if generate is None:
            print("generate method not found. Please check the available methods.")
            # Zero rows rather than np.empty((num_samples, ...)): the latter
            # is uninitialized memory and would be mistaken for real samples.
            return np.empty((0, self.n_features))

        # Samples come out in the whitened space the model was trained on.
        whitened_samples = generate(batch_size=num_samples, n_t=self.n_t)

        # Re-introduce the correlations, then go back to the original scale.
        samples = self.revert_whitening(whitened_samples)
        samples = self.scaler.inverse_transform(samples)

        # Apply KDE filtering to retain high-density samples
        samples = self.kde_filter(samples)

        # Clip outliers
        return self.post_sampling_noise_reduction(samples)


# Load the Pima Indians Diabetes dataset.  The CSV is read with header=None,
# so the column names are supplied explicitly.
url = "https://raw.githubusercontent.com/jbrownlee/Datasets/master/pima-indians-diabetes.data.csv"
column_names = ['Pregnancies', 'Glucose', 'BloodPressure', 'SkinThickness', 'Insulin',
                'BMI', 'DiabetesPedigreeFunction', 'Age', 'Outcome']
data = pd.read_csv(url, header=None, names=column_names)

# Separate features and minority class (Outcome == 1)
X = data.drop('Outcome', axis=1)
y = data['Outcome']
X_minority = X[y == 1]

# Fit the correlation-adjusted diffusion model on the minority class only
correlation_diffusion_model = CorrelationAdjustedForestDiffusion(X_minority, n_t=50)

# Request as many synthetic samples as there are minority rows.  The sampler
# applies a density filter, so fewer rows than requested can come back; every
# count below is therefore taken from the returned array, not the request.
num_samples_to_generate = len(X_minority)
synthetic_samples = correlation_diffusion_model.generate_samples(num_samples_to_generate)

# Append the synthetic samples, labelled as class 1, to the full dataset.
# The labels are created with the dtype of the original Outcome column:
# a default float np.ones() would upcast every label to 0.0 / 1.0 on save.
X_final = np.vstack((X, synthetic_samples))
y_final = np.concatenate((y, np.ones(len(synthetic_samples), dtype=y.dtype)))

# Convert to DataFrame and save
X_final_df = pd.DataFrame(X_final, columns=X.columns)
y_final_df = pd.Series(y_final, name="Outcome")
final_data = pd.concat([X_final_df, y_final_df], axis=1)
final_data.to_csv('generated_pima_diabetes_data_with_whitened_diffusion.csv', index=False)

print("Generated data saved to 'generated_pima_diabetes_data_with_whitened_diffusion.csv'")

# Notebook cell output: KeyboardInterrupt