In [25]:
import pandas as pd
import numpy as np
from sklearn.model_selection import StratifiedKFold

def stratified_downsample_cv(df, label_col='Label', n_splits=5, random_state=42, return_idx = False):
    """
    Build balanced cross-validation folds by k-folding the positives and
    downsampling the negatives.

    BALANCED TRAINING SET, BALANCED TEST SET

    Rows with label 1 are split into ``n_splits`` shuffled folds. For every
    fold, as many label-0 rows as there are training positives are sampled for
    the training set; the test set then gets as many label-0 rows as there are
    test positives, sampled from the negatives whose index labels were not
    picked for training in that fold. Rows whose label is neither 0 nor 1 are
    ignored.

    Negatives are re-drawn independently for every fold, so the same negative
    row can appear in several folds.

    Parameters:
        df (pd.DataFrame): The input dataset containing features and a binary label column.
        label_col (str): The name of the label column.
        n_splits (int): Number of folds.
        random_state (int): Random seed for reproducibility. Per-fold integer
            offsets are added to it, so it must be an int.
        return_idx (bool): If True, return index labels of ``df`` instead of
            DataFrames. The labels identify rows unambiguously only when
            ``df.index`` is unique.

    Returns:
        List with one tuple per fold:
            - ``return_idx=False``: ``(train_df, test_df)`` DataFrames, rows
              shuffled and index reset to 0..n-1.
            - ``return_idx=True``: ``(train_idx, test_idx)`` lists of index
              labels, positives first, then negatives (not shuffled).

    Raises:
        ValueError: If there are fewer negatives than positives, because each
            fold needs one distinct negative per positive.
    """
    # Boolean-mask selection already yields new frames; nothing below mutates them.
    df_positive = df[df[label_col] == 1]
    df_negative = df[df[label_col] == 0]

    # Train and test negatives of one fold are disjoint, so a fold consumes
    # exactly len(df_positive) negatives. Fail early with a readable message
    # instead of letting the sampling call fail mid-loop.
    if len(df_negative) < len(df_positive):
        raise ValueError(
            f"Need at least as many negatives as positives to build balanced folds "
            f"(got {len(df_negative)} negatives, {len(df_positive)} positives)."
        )

    skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=random_state)
    positive_splits = skf.split(df_positive, df_positive[label_col])

    fold_data = []

    for fold_idx, (train_pos_idx, test_pos_idx) in enumerate(positive_splits):
        train_pos = df_positive.iloc[train_pos_idx]
        test_pos = df_positive.iloc[test_pos_idx]

        # Downsample negatives; the seed offsets (+0, +10) keep the two draws
        # distinct while staying reproducible per fold.
        train_neg = df_negative.sample(n=len(train_pos), random_state=random_state + fold_idx)
        test_neg = df_negative.drop(train_neg.index, errors='ignore').sample(
            n=len(test_pos), random_state=random_state + 10 + fold_idx
        )

        if return_idx:
            # Only the labels are needed, so skip building the combined frames.
            train_idx = train_pos.index.to_list() + train_neg.index.to_list()
            test_idx = test_pos.index.to_list() + test_neg.index.to_list()
            fold_data.append((train_idx, test_idx))
        else:
            # Shuffle so positives and negatives are interleaved.
            train_df = pd.concat([train_pos, train_neg]).sample(frac=1.0, random_state=random_state + 20 + fold_idx)
            test_df = pd.concat([test_pos, test_neg]).sample(frac=1.0, random_state=random_state + 30 + fold_idx)
            fold_data.append((train_df.reset_index(drop=True), test_df.reset_index(drop=True)))

    return fold_data

def stratified_cv_full_neg_train(df, label_col='Label', n_splits=5, random_state=42, return_idx=False,
                                 exclude_test_neg_from_train=False):
    """
    Build cross-validation folds with an unbalanced training set and a
    balanced test set.

    UNBALANCED TRAINING SET, BALANCED TEST SET
    - Training set includes all negative samples and positive samples in the training split.
    - Test set is balanced: equal number of positives and downsampled negatives.

    WARNING: with the default settings the test negatives are sampled from the
    same negatives that are all placed in the training set, so every test
    negative has also been seen during training. Pass
    ``exclude_test_neg_from_train=True`` to remove each fold's test negatives
    from that fold's training set.

    Rows whose label is neither 0 nor 1 are ignored.

    Parameters:
        df (pd.DataFrame): The input dataset.
        label_col (str): The name of the label column.
        n_splits (int): Number of folds.
        random_state (int): Random seed for reproducibility. Per-fold integer
            offsets are added to it, so it must be an int.
        return_idx (bool): If True, return index labels of ``df`` instead of
            DataFrames. The labels identify rows unambiguously only when
            ``df.index`` is unique.
        exclude_test_neg_from_train (bool): If True, drop the fold's sampled
            test negatives (by index label) from the fold's training set.
            Defaults to False, which keeps all negatives in training.

    Returns:
        List with one tuple per fold:
            - ``return_idx=False``: ``(train_df, test_df)`` DataFrames, rows
              shuffled and index reset to 0..n-1.
            - ``return_idx=True``: ``(train_idx, test_idx)`` lists of index
              labels, positives first, then negatives (not shuffled).

    Raises:
        ValueError: If a fold has more test positives than there are negatives
            to sample from.
    """
    # Boolean-mask selection already yields new frames; nothing below mutates them.
    df_positive = df[df[label_col] == 1]
    df_negative = df[df[label_col] == 0]

    skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=random_state)
    positive_splits = skf.split(df_positive, df_positive[label_col])

    fold_data = []

    for fold_idx, (train_pos_idx, test_pos_idx) in enumerate(positive_splits):
        train_pos = df_positive.iloc[train_pos_idx]
        test_pos = df_positive.iloc[test_pos_idx]

        if len(test_pos) > len(df_negative):
            raise ValueError(
                f"Fold {fold_idx} needs {len(test_pos)} negatives for a balanced test set "
                f"but only {len(df_negative)} are available."
            )

        # Test: positive test split + equally many sampled negatives.
        test_neg = df_negative.sample(n=len(test_pos), random_state=random_state + 10 + fold_idx)

        # Train: positive train split + all negatives (optionally minus the test negatives).
        train_neg = df_negative.drop(test_neg.index) if exclude_test_neg_from_train else df_negative

        if return_idx:
            # Only the labels are needed, so skip concatenating the (large) frames.
            train_idx = train_pos.index.to_list() + train_neg.index.to_list()
            test_idx = test_pos.index.to_list() + test_neg.index.to_list()
            fold_data.append((train_idx, test_idx))
        else:
            train_df = pd.concat([train_pos, train_neg]).sample(frac=1.0, random_state=random_state + fold_idx)
            test_df = pd.concat([test_pos, test_neg]).sample(frac=1.0, random_state=random_state + 20 + fold_idx)
            fold_data.append((train_df.reset_index(drop=True), test_df.reset_index(drop=True)))

    return fold_data

def generate_test_data(n_pos=10000, n_neg=2000000, n_features=5, random_state=0):
    """
    Generate a shuffled synthetic binary-classification dataset.

    All features are standard-normal noise for both classes, so the label
    carries no signal; the frame is only meant for exercising the fold
    builders.

    Parameters:
        n_pos (int): Number of rows with ``Label == 1``.
        n_neg (int): Number of rows with ``Label == 0``.
        n_features (int): Number of feature columns (``feature_0`` .. ``feature_{n-1}``).
        random_state (int): Seed for the private random generator.

    Returns:
        pd.DataFrame: ``n_pos + n_neg`` rows in shuffled order with a fresh
        0..n-1 index, the feature columns, and an integer ``Label`` column.
    """
    # A private generator keeps the function deterministic without reseeding
    # NumPy's process-wide RNG as a side effect.
    rng = np.random.RandomState(random_state)
    feature_cols = [f"feature_{i}" for i in range(n_features)]

    # Create features for positives and negatives
    pos_data = pd.DataFrame(rng.randn(n_pos, n_features), columns=feature_cols)
    pos_data['Label'] = 1

    neg_data = pd.DataFrame(rng.randn(n_neg, n_features), columns=feature_cols)
    neg_data['Label'] = 0

    # Shuffle with the same generator so the row order is reproducible too.
    full_df = pd.concat([pos_data, neg_data]).sample(frac=1.0, random_state=rng).reset_index(drop=True)
    return full_df


In [26]:
# Generate the mock dataset with the function defaults
# (10,000 positives, 2,000,000 negatives, 5 feature columns)
df = generate_test_data()

In [27]:
# Display the generated frame: feature columns plus the binary `Label` column
df

Unnamed: 0,feature_0,feature_1,feature_2,feature_3,feature_4,Label
0,-1.488774,-1.292348,-0.551646,-0.497234,0.359679,0
1,-1.939402,0.104617,-1.493234,-1.669684,-1.122117,0
2,0.206874,0.263105,-0.949888,-0.671508,-1.588191,0
3,-0.769363,-0.138402,-0.318259,-0.213263,-1.623041,0
4,0.159199,0.537841,-0.374660,1.469538,-1.089280,0
...,...,...,...,...,...,...
2009995,0.413385,-0.817726,-0.599240,-0.730284,0.323353,0
2009996,0.357714,1.455041,-0.123469,1.336496,0.480696,0
2009997,-0.233000,0.383257,1.238289,-0.666856,0.209631,0
2009998,-0.015651,-0.158612,-2.260126,1.048985,-0.433367,0


In [28]:
# Perform stratified downsampled CV.
# With return_idx=True each fold is a (train_idx, test_idx) pair of index-label
# lists referring to rows of `df`, not a pair of DataFrames.
cv_folds = stratified_downsample_cv(df, return_idx=True)

In [29]:
# Each fold is a 2-tuple: (train part, test part)
len(cv_folds[0])

2

In [14]:
# Inspect the label balance in every fold.
# This needs DataFrame folds. `cv_folds` from the cell above holds index lists
# (return_idx=True), so build the DataFrame folds here rather than relying on
# whatever an earlier kernel session left in `cv_folds`.
fold_frames = stratified_downsample_cv(df)

for i, (train_df, test_df) in enumerate(fold_frames):
    print(f"Fold {i}")
    print("Train label distribution:\n", train_df['Label'].value_counts(normalize=True))
    print("Test label distribution:\n", test_df['Label'].value_counts(normalize=True))
    print("-" * 40)

Fold 0
Train label distribution:
 1    0.5
0    0.5
Name: Label, dtype: float64
Test label distribution:
 0    0.5
1    0.5
Name: Label, dtype: float64
----------------------------------------
Fold 1
Train label distribution:
 0    0.5
1    0.5
Name: Label, dtype: float64
Test label distribution:
 0    0.5
1    0.5
Name: Label, dtype: float64
----------------------------------------
Fold 2
Train label distribution:
 1    0.5
0    0.5
Name: Label, dtype: float64
Test label distribution:
 0    0.5
1    0.5
Name: Label, dtype: float64
----------------------------------------
Fold 3
Train label distribution:
 0    0.5
1    0.5
Name: Label, dtype: float64
Test label distribution:
 1    0.5
0    0.5
Name: Label, dtype: float64
----------------------------------------
Fold 4
Train label distribution:
 1    0.5
0    0.5
Name: Label, dtype: float64
Test label distribution:
 0    0.5
1    0.5
Name: Label, dtype: float64
----------------------------------------


In [17]:
df = generate_test_data()  # defaults: 10k pos, 2M neg

# DataFrame folds: training keeps all negatives, test set is balanced
cv_folds = stratified_cv_full_neg_train(df)

# Print the per-fold class shares of the train and test frames
for i, (train_df, test_df) in enumerate(cv_folds):
    print(f"Fold {i}")
    print("Train label distribution:\n", train_df['Label'].value_counts(normalize=True))
    print("Test label distribution:\n", test_df['Label'].value_counts(normalize=True))
    print("-" * 40)


Fold 0
Train label distribution:
 0    0.996016
1    0.003984
Name: Label, dtype: float64
Test label distribution:
 0    0.5
1    0.5
Name: Label, dtype: float64
----------------------------------------
Fold 1
Train label distribution:
 0    0.996016
1    0.003984
Name: Label, dtype: float64
Test label distribution:
 0    0.5
1    0.5
Name: Label, dtype: float64
----------------------------------------
Fold 2
Train label distribution:
 0    0.996016
1    0.003984
Name: Label, dtype: float64
Test label distribution:
 1    0.5
0    0.5
Name: Label, dtype: float64
----------------------------------------
Fold 3
Train label distribution:
 0    0.996016
1    0.003984
Name: Label, dtype: float64
Test label distribution:
 0    0.5
1    0.5
Name: Label, dtype: float64
----------------------------------------
Fold 4
Train label distribution:
 0    0.996016
1    0.003984
Name: Label, dtype: float64
Test label distribution:
 1    0.5
0    0.5
Name: Label, dtype: float64
--------------------------

In [18]:
# Peek at the first rows of the dataset
df.head()

Unnamed: 0,feature_0,feature_1,feature_2,feature_3,feature_4,Label
0,-1.488774,-1.292348,-0.551646,-0.497234,0.359679,0
1,-1.939402,0.104617,-1.493234,-1.669684,-1.122117,0
2,0.206874,0.263105,-0.949888,-0.671508,-1.588191,0
3,-0.769363,-0.138402,-0.318259,-0.213263,-1.623041,0
4,0.159199,0.537841,-0.37466,1.469538,-1.08928,0
