In [15]:
import time
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, rand
from pyspark.sql import DataFrame

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("OptimizedStratifiedSplitting") \
    .getOrCreate()

# Parameters for file paths
DATA_PATH = "creditcard.csv"
TRAIN_PATH = "creditcard_train.csv"
VAL_PATH = "creditcard_validation.csv"
TEST_PATH = "creditcard_test.csv"

# Load and shuffle the dataset
df = spark.read.csv(DATA_PATH, header=True, inferSchema=True).sample(fraction=1.0, seed=42)

# Check class distribution
class_distribution = df.groupBy("Class").count()
class_distribution.show()

def stratified_split(df: DataFrame, strat_col: str, test_frac: float, val_frac: float, seed: int):
    # Get class counts once
    class_counts = {row[strat_col]: row["count"] for row in df.groupBy(strat_col).count().collect()}
    
    # Define fractions for test split based on class distribution
    fractions = {key: test_frac for key in class_counts.keys()}
    
    # Sample test set using sampleBy
    test_set = df.sampleBy(strat_col, fractions, seed)
    
    # Remove test set from original data
    train_val_set = df.subtract(test_set)
    
    # Define fractions for train and validation split
    train_frac = 1 - val_frac
    train_val_fractions = {key: train_frac for key in class_counts.keys()}
    
    # Sample train set from remaining data using sampleBy
    train_set = train_val_set.sampleBy(strat_col, train_val_fractions, seed)
    
    # Remaining rows in train_val_set become validation set
    val_set = train_val_set.subtract(train_set)
    
    return train_set, val_set, test_set


# Apply stratified split
train_set, val_set, test_set = stratified_split(df, strat_col="Class", test_frac=0.2, val_frac=0.2, seed=42)

# Verify splits
##train_set.groupBy("Class").count().show()
##val_set.groupBy("Class").count().show()
##test_set.groupBy("Class").count().show()

# Write splits to CSV files
##train_set.coalesce(1).write.csv(TRAIN_PATH, header=True, mode="overwrite")
##val_set.coalesce(1).write.csv(VAL_PATH, header=True, mode="overwrite")
##test_set.coalesce(1).write.csv(TEST_PATH, header=True, mode="overwrite")

##print("Data splits saved to CSV files:")
##print(f"Train set: {TRAIN_PATH}")
##print(f"Validation set: {VAL_PATH}")
##print(f"Test set: {TEST_PATH}")

+-----+------+
|Class| count|
+-----+------+
|    1|   492|
|    0|284315|
+-----+------+

Stratified split execution time: 0.18 seconds
+-----+------+
|Class| count|
+-----+------+
|    1|   296|
|    0|181249|
+-----+------+

+-----+-----+
|Class|count|
+-----+-----+
|    1|   78|
|    0|44882|
+-----+-----+

+-----+-----+
|Class|count|
+-----+-----+
|    1|  100|
|    0|57183|
+-----+-----+

Data splits saved to CSV files:
Train set: creditcard_train.csv
Validation set: creditcard_validation.csv
Test set: creditcard_test.csv


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, log1p, mean, stddev
from pyspark.sql import DataFrame

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("PreprocessingPipeline") \
    .getOrCreate()

# Parameters for file paths
DATA_PATH = "creditcard.csv"
PREPROCESSED_PATH = "creditcard_preprocessed.csv"

# Load the dataset
df = spark.read.csv(DATA_PATH, header=True, inferSchema=True)

# Function to drop duplicates and null values
def clean_data(df: DataFrame) -> DataFrame:
    return df.dropDuplicates().dropna()

# Function to apply log normalization to the 'Amount' column
def log_normalize(df: DataFrame, column: str) -> DataFrame:
    return df.withColumn(column, log1p(col(column)))

# Function to standardize PCA-transformed features
def standardize_features(df: DataFrame, feature_columns: list) -> DataFrame:
    for col_name in feature_columns:
        stats = df.select(mean(col(col_name)).alias("mean"), stddev(col(col_name)).alias("stddev")).collect()[0]
        mean_val = stats["mean"]
        stddev_val = stats["stddev"]

        if stddev_val != 0:  # Avoid division by zero
            df = df.withColumn(col_name, (col(col_name) - mean_val) / stddev_val)

    return df

# Preprocessing pipeline
def preprocessing_pipeline(df: DataFrame) -> DataFrame:
    # Step 1: Clean data (drop duplicates and null values)
    df = clean_data(df)

    # Step 2: Apply log normalization to the 'Amount' column
    df = log_normalize(df, "Amount")

    # Step 3: Standardize PCA-transformed features
    pca_features = [f"V{i}" for i in range(1, 29)]  # V1 to V28
    df = standardize_features(df, pca_features)

    return df

# Apply the preprocessing pipeline
df_preprocessed = preprocessing_pipeline(df)

# Write the preprocessed data to a CSV file
df_preprocessed.coalesce(1).write.csv(PREPROCESSED_PATH, header=True, mode="overwrite")

print(f"Preprocessed data saved to: {PREPROCESSED_PATH}")

In [16]:
import time
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, rand, log1p, mean, stddev
from pyspark.sql import DataFrame

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("OptimizedStratifiedSplitting") \
    .getOrCreate()

# Parameters for file paths
DATA_PATH = "creditcard.csv"
TRAIN_PATH = "creditcard_train"
VAL_PATH = "creditcard_validation"
TEST_PATH = "creditcard_test"

# Load and shuffle the dataset
df = spark.read.csv(DATA_PATH, header=True, inferSchema=True).sample(fraction=1.0, seed=42)

# Check class distribution
class_distribution = df.groupBy("Class").count()
class_distribution.show()

def stratified_split(df: DataFrame, strat_col: str, test_frac: float, val_frac: float, seed: int):
    # Get class counts once
    class_counts = {row[strat_col]: row["count"] for row in df.groupBy(strat_col).count().collect()}
    
    # Define fractions for test split based on class distribution
    fractions = {key: test_frac for key in class_counts.keys()}
    
    # Sample test set using sampleBy
    test_set = df.sampleBy(strat_col, fractions, seed)
    
    # Remove test set from original data
    train_val_set = df.subtract(test_set)
    
    # Define fractions for train and validation split
    train_frac = 1 - val_frac
    train_val_fractions = {key: train_frac for key in class_counts.keys()}
    
    # Sample train set from remaining data using sampleBy
    train_set = train_val_set.sampleBy(strat_col, train_val_fractions, seed)
    
    # Remaining rows in train_val_set become validation set
    val_set = train_val_set.subtract(train_set)
    
    return train_set, val_set, test_set

# Preprocessing pipeline functions
def clean_data(df: DataFrame) -> DataFrame:
    return df.dropDuplicates().dropna()

def log_normalize(df: DataFrame, column: str) -> DataFrame:
    return df.withColumn(column, log1p(col(column)))

def standardize_features(df: DataFrame, feature_columns: list) -> DataFrame:
    for col_name in feature_columns:
        stats = df.select(mean(col(col_name)).alias("mean"), stddev(col(col_name)).alias("stddev")).collect()[0]
        mean_val = stats["mean"]
        stddev_val = stats["stddev"]

        if stddev_val != 0:  # Avoid division by zero
            df = df.withColumn(col_name, (col(col_name) - mean_val) / stddev_val)

    return df

def preprocessing_pipeline(df: DataFrame) -> DataFrame:
    df = clean_data(df)
    df = log_normalize(df, "Amount")
    pca_features = [f"V{i}" for i in range(1, 29)]  # V1 to V28
    df = standardize_features(df, pca_features)
    return df

# Apply stratified split
train_set, val_set, test_set = stratified_split(df, strat_col="Class", test_frac=0.2, val_frac=0.2, seed=42)

# Apply preprocessing pipeline and save outputs
preprocessed_train_set = preprocessing_pipeline(train_set)
preprocessed_val_set = preprocessing_pipeline(val_set)
preprocessed_test_set = preprocessing_pipeline(test_set)

# Save preprocessed data
preprocessed_train_set.coalesce(1).write.csv(TRAIN_PATH, header=True, mode="overwrite")
preprocessed_val_set.coalesce(1).write.csv(VAL_PATH, header=True, mode="overwrite")
preprocessed_test_set.coalesce(1).write.csv(TEST_PATH, header=True, mode="overwrite")

print("Preprocessed data splits saved to CSV files:")
print(f"Train set: {TRAIN_PATH}")
print(f"Validation set: {VAL_PATH}")
print(f"Test set: {TEST_PATH}")

+-----+------+
|Class| count|
+-----+------+
|    1|   492|
|    0|284315|
+-----+------+

Preprocessed data splits saved to CSV files:


NameError: name 'PREPROCESSED_TRAIN_PATH' is not defined

In [19]:
import time
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, log1p
from pyspark.sql import DataFrame

# ML imports
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.functions import vector_to_array

# --------------------------------------------------------------------------------
# 1. Initialize Spark
# --------------------------------------------------------------------------------
spark = SparkSession.builder \
    .appName("OptimizedStratifiedSplittingWithPreprocessing") \
    .getOrCreate()

# --------------------------------------------------------------------------------
# 2. Load and Shuffle the Dataset
# --------------------------------------------------------------------------------
DATA_PATH = "creditcard.csv"

df = spark.read.csv(DATA_PATH, header=True, inferSchema=True).sample(fraction=1.0, seed=42)

# --------------------------------------------------------------------------------
# 3. Split Data into Train, Validation, and Test
# --------------------------------------------------------------------------------
train_fraction = 0.7
val_fraction   = 0.15
test_fraction  = 0.15

splits = df.randomSplit([train_fraction, val_fraction, test_fraction], seed=42)
train_set, val_set, test_set = splits[0], splits[1], splits[2]

# --------------------------------------------------------------------------------
# 4. Utility Functions
# --------------------------------------------------------------------------------

def clean_data(df: DataFrame) -> DataFrame:
    """Drop duplicates and rows with nulls."""
    return df.dropDuplicates().dropna()

def log_normalize(df: DataFrame, column: str) -> DataFrame:
    """Apply log1p(x) to a given column (e.g. Amount)."""
    return df.withColumn(column, log1p(col(column)))

def build_preprocessing_pipeline(feature_cols):
    """
    Build a Spark ML Pipeline consisting of:
     1) VectorAssembler  -> to combine features into a single 'features' column
     2) StandardScaler   -> to scale/standardize the 'features'
    """
    assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
    scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withMean=True, withStd=True)
    pipeline = Pipeline(stages=[assembler, scaler])
    return pipeline

def explode_scaled_features(df: DataFrame, feature_cols) -> DataFrame:
    """
    Convert 'scaled_features' (Vector) into a Spark array, then
    create columns named exactly the same as feature_cols (e.g. "V1", "V2", ...).
    
    IMPORTANT: We first drop the old columns "V1..V28" so that we don't
    get a 'COLUMN_ALREADY_EXISTS' error when we create new columns
    with those same names.
    """
    # 1) Drop the original unscaled feature columns.
    #    Otherwise, Spark complains that column X already exists.
    for c in feature_cols:
        df = df.drop(c)

    # 2) Convert ML Vector to Spark Array
    df = df.withColumn("scaled_array", vector_to_array(col("scaled_features")))

    # 3) Build the list of columns to SELECT
    #    (keep everything except the columns we no longer need)
    select_exprs = [
        col(c) for c in df.columns 
        if c not in {"scaled_features", "scaled_array", "features"}
    ]

    # 4) Add each scaled feature as a new column with the old name
    select_exprs += [
        col("scaled_array")[i].alias(feature_cols[i]) 
        for i in range(len(feature_cols))
    ]
    
    # 5) Final select
    df = df.select(*select_exprs)
    return df

# --------------------------------------------------------------------------------
# 5. Main Preprocessing / Fitting Flow
# --------------------------------------------------------------------------------

def fit_and_transform_data(train_df: DataFrame, val_df: DataFrame, test_df: DataFrame) -> (DataFrame, DataFrame, DataFrame):
    """
    1) Clean & log-normalize the training set, then fit the pipeline (assembler + scaler).
    2) Transform train, validation, and test sets with the fitted pipeline.
    3) Explode 'scaled_features' back into columns (replacing V1..V28).
    """
    pca_features = [f"V{i}" for i in range(1, 29)]

    # -----------------------------------------------
    # Step A: Pre-clean/normalize the TRAIN set
    # -----------------------------------------------
    train_cleaned = clean_data(train_df)
    train_cleaned = log_normalize(train_cleaned, "Amount")

    # Build and fit the pipeline on train
    pipeline = build_preprocessing_pipeline(pca_features)
    pipeline_model = pipeline.fit(train_cleaned)

    # Transform train set
    train_transformed = pipeline_model.transform(train_cleaned)
    train_transformed = explode_scaled_features(train_transformed, pca_features)

    # -----------------------------------------------
    # Step B: Transform VALIDATION set
    # -----------------------------------------------
    val_cleaned = clean_data(val_df)
    val_cleaned = log_normalize(val_cleaned, "Amount")
    val_transformed = pipeline_model.transform(val_cleaned)
    val_transformed = explode_scaled_features(val_transformed, pca_features)

    # -----------------------------------------------
    # Step C: Transform TEST set
    # -----------------------------------------------
    test_cleaned = clean_data(test_df)
    test_cleaned = log_normalize(test_cleaned, "Amount")
    test_transformed = pipeline_model.transform(test_cleaned)
    test_transformed = explode_scaled_features(test_transformed, pca_features)

    return train_transformed, val_transformed, test_transformed

# --------------------------------------------------------------------------------
# 6. Save to CSV
# --------------------------------------------------------------------------------
def save_to_csv(df: DataFrame, output_path: str):
    """
    Write Spark DataFrame to CSV (single file).
    Using coalesce(1) for a single CSV output. 
    """
    df.coalesce(1).write.csv(output_path, header=True, mode="overwrite")

# --------------------------------------------------------------------------------
# 7. Putting It All Together
# --------------------------------------------------------------------------------
TRAIN_PATH = "preprocessed_creditcard_train.csv"
VAL_PATH   = "preprocessed_creditcard_validation.csv"
TEST_PATH  = "preprocessed_creditcard_test.csv"

start_time = time.time()

# 7.1 Fit the pipeline on the train set, transform all splits
train_preprocessed, val_preprocessed, test_preprocessed = fit_and_transform_data(
    train_set, val_set, test_set
)

# 7.2 (Optional) Save the results
save_to_csv(train_preprocessed, TRAIN_PATH)
save_to_csv(val_preprocessed, VAL_PATH)
save_to_csv(test_preprocessed, TEST_PATH)

end_time = time.time()
print(f"Preprocessing and save completed in {end_time - start_time:.2f} seconds")

print("Preprocessed data splits saved to CSV files:")
print(f"  Train set: {TRAIN_PATH}")
print(f"  Validation set: {VAL_PATH}")
print(f"  Test set: {TEST_PATH}")

Preprocessing and save completed in 7.51 seconds
Preprocessed data splits saved to CSV files:
  Train set: preprocessed_creditcard_train.csv
  Validation set: preprocessed_creditcard_validation.csv
  Test set: preprocessed_creditcard_test.csv


In [24]:
############################################# cleaner version of Above ####################################

import time
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, log1p
from pyspark.sql import DataFrame

# ML imports
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.functions import vector_to_array

# --------------------------------------------------------------------------------
# 1. Initialize Spark
# --------------------------------------------------------------------------------

spark = SparkSession.builder \
    .appName("OptimizedStratifiedSplittingWithPreprocessing") \
    .getOrCreate()

# --------------------------------------------------------------------------------
# 2. Load and Shuffle the Dataset
# --------------------------------------------------------------------------------

DATA_PATH = "creditcard.csv"

df = spark.read.csv(DATA_PATH, header=True, inferSchema=True).sample(fraction=1.0, seed=42)

# --------------------------------------------------------------------------------
# 3. Split Data into Train, Validation, and Test
# --------------------------------------------------------------------------------

train_fraction = 0.7
val_fraction   = 0.15
test_fraction  = 0.15

splits = df.randomSplit([train_fraction, val_fraction, test_fraction], seed=42)
train_set, val_set, test_set = splits[0], splits[1], splits[2]

# --------------------------------------------------------------------------------
# 4. Utility Functions
# --------------------------------------------------------------------------------

def clean_data(df: DataFrame) -> DataFrame:
    """Drop duplicates and rows with nulls."""
    return df.dropDuplicates().dropna()

def log_normalize(df: DataFrame, column: str) -> DataFrame:
    """Apply log1p(x) to a given column (e.g. Amount)."""
    return df.withColumn(column, log1p(col(column)))

def build_preprocessing_pipeline(feature_cols):
    """
    Build a Spark ML Pipeline consisting of:
     1. VectorAssembler  -> to combine features into a single 'features' column
     2. StandardScaler   -> to scale/standardize the 'features'
    """
    assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
    scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withMean=True, withStd=True)
    pipeline = Pipeline(stages=[assembler, scaler])
    return pipeline

def explode_scaled_features(df: DataFrame, feature_cols) -> DataFrame:
    """
    Convert 'scaled_features' (Vector) into a Spark array, then
    create columns named exactly the same as feature_cols (e.g. "V1", "V2", ...).
    
    IMPORTANT: We first drop the old columns "V1..V28" so that we don't
    get a 'COLUMN_ALREADY_EXISTS' error when we create new columns
    with those same names.
    """
    # 1. Drop the original unscaled feature columns.
    #    Otherwise, Spark complains that column X already exists.
    for c in feature_cols:
        df = df.drop(c)

    # 2. Convert ML Vector to Spark Array
    df = df.withColumn("scaled_array", vector_to_array(col("scaled_features")))

    # 3. Build the list of columns to SELECT
    #    (keep everything except the columns we no longer need)
    select_exprs = [
        col(c) for c in df.columns 
        if c not in {"scaled_features", "scaled_array", "features"}
    ]

    # 4. Add each scaled feature as a new column with the old name
    select_exprs += [
        col("scaled_array")[i].alias(feature_cols[i]) 
        for i in range(len(feature_cols))
    ]
    
    # 5. Final select
    df = df.select(*select_exprs)
    return df

# --------------------------------------------------------------------------------
# 5. Main Preprocessing / Fitting Flow
# --------------------------------------------------------------------------------

def preprocess_dataset(df: DataFrame, pipeline_model, feature_cols, name: str) -> DataFrame:
    """
    Generalized function to preprocess a dataset.
    Performs cleaning, log-normalization, pipeline transformation, 
    and exploding scaled features into separate columns.

    Args:
        df (DataFrame): Input Spark DataFrame (e.g., train, val, or test).
        pipeline_model: Fitted pipeline model for transforming the dataset.
        feature_cols (list): List of feature column names (e.g., V1 to V28).
        name (str): Name of the dataset (train/val/test) for logging.

    Returns:
        DataFrame: Preprocessed Spark DataFrame.
    """
    print(f"Preprocessing {name} dataset...")
    cleaned = clean_data(df)
    normalized = log_normalize(cleaned, "Amount")
    transformed = pipeline_model.transform(normalized)
    final_df = explode_scaled_features(transformed, feature_cols)
    return final_df

def preprocess_multiple_datasets(datasets: dict, pipeline_model, feature_cols) -> dict:
    """
    Preprocess multiple datasets by calling preprocess_dataset for each.

    Args:
        datasets (dict): A dictionary where keys are dataset names (e.g., "train", "val", "test")
                         and values are the corresponding DataFrames.
        pipeline_model: Fitted pipeline model for transforming the datasets.
        feature_cols (list): List of feature column names (e.g., V1 to V28).

    Returns:
        dict: A dictionary where keys are dataset names and values are preprocessed DataFrames.
    """
    preprocessed_datasets = {}
    for name, df in datasets.items():
        preprocessed_datasets[name] = preprocess_dataset(df, pipeline_model, feature_cols, name)
    return preprocessed_datasets


def fit_and_transform_data(train_df: DataFrame, val_df: DataFrame, test_df: DataFrame) -> (DataFrame, DataFrame, DataFrame):
    """
    1. Clean & log-normalize the training set, then fit the pipeline (assembler + scaler).
    2. Use a general preprocessing function for train, validation, and test sets.
    """
    pca_features = [f"V{i}" for i in range(1, 29)]

    # Preprocess the training set
    train_cleaned = clean_data(train_df)
    train_cleaned = log_normalize(train_cleaned, "Amount")

    # Build and fit the pipeline on train
    pipeline = build_preprocessing_pipeline(pca_features)
    pipeline_model = pipeline.fit(train_cleaned)

    # Define datasets
    datasets = {
        "train": train_df,
        "val": val_df,
        "test": test_df
    }

    # Preprocess all datasets
    preprocessed = preprocess_multiple_datasets(datasets, pipeline_model, pca_features)

    return preprocessed["train"], preprocessed["val"], preprocessed["test"]

# --------------------------------------------------------------------------------
# 6. Save to CSV
# --------------------------------------------------------------------------------

def save_to_csv(df: DataFrame, output_path: str):
    """
    Write Spark DataFrame to CSV (single file).
    Using coalesce(1) for a single CSV output. 
    """
    df.coalesce(1).write.csv(output_path, header=True, mode="overwrite")

# --------------------------------------------------------------------------------
# 7. Putting It All Together
# --------------------------------------------------------------------------------

TRAIN_PATH = "preprocessed_creditcard_train.csv"
VAL_PATH   = "preprocessed_creditcard_validation.csv"
TEST_PATH  = "preprocessed_creditcard_test.csv"

start_time = time.time()

# 7.1 Fit the pipeline on the train set, transform all splits
train_preprocessed, val_preprocessed, test_preprocessed = fit_and_transform_data(
    train_set, val_set, test_set
)

# 7.2 (Optional) Save the results
save_to_csv(train_preprocessed, TRAIN_PATH)
save_to_csv(val_preprocessed, VAL_PATH)
save_to_csv(test_preprocessed, TEST_PATH)

end_time = time.time()
print(f"Preprocessing and save completed in {end_time - start_time:.2f} seconds")

print("Preprocessed data splits saved to CSV files:")
print(f"  Train set: {TRAIN_PATH}")
print(f"  Validation set: {VAL_PATH}")
print(f"  Test set: {TEST_PATH}")

Preprocessing train dataset...
Preprocessing val dataset...
Preprocessing test dataset...
Preprocessing and save completed in 5.51 seconds
Preprocessed data splits saved to CSV files:
  Train set: preprocessed_creditcard_train.csv
  Validation set: preprocessed_creditcard_validation.csv
  Test set: preprocessed_creditcard_test.csv


In [3]:
############################ this version separates out the labels for each set: train, val, and test ############

import time
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, log1p
from pyspark.sql import DataFrame

# ML imports
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.functions import vector_to_array

# --------------------------------------------------------------------------------
# 1. Initialize Spark
# --------------------------------------------------------------------------------

spark = SparkSession.builder \
    .appName("OptimizedStratifiedSplittingWithPreprocessing") \
    .getOrCreate()

# --------------------------------------------------------------------------------
# 2. Load and Shuffle the Dataset
# --------------------------------------------------------------------------------

DATA_PATH = "creditcard.csv"

df = spark.read.csv(DATA_PATH, header=True, inferSchema=True).sample(fraction=1.0, seed=42)

# --------------------------------------------------------------------------------
# 3. Split Data into Train, Validation, and Test
# --------------------------------------------------------------------------------

train_fraction = 0.8
val_fraction   = 0.1
test_fraction  = 0.1

splits = df.randomSplit([train_fraction, val_fraction, test_fraction], seed=42)
train_set, val_set, test_set = splits[0], splits[1], splits[2]

# --------------------------------------------------------------------------------
# 4. Utility Functions
# --------------------------------------------------------------------------------

def clean_data(df: DataFrame) -> DataFrame:
    """Drop duplicates and rows with nulls."""
    return df.dropDuplicates().dropna()

def log_normalize(df: DataFrame, column: str) -> DataFrame:
    """Apply log1p(x) to a given column (e.g., 'Amount')."""
    return df.withColumn(column, log1p(col(column)))

def build_preprocessing_pipeline(feature_cols):
    """
    Build a Spark ML Pipeline consisting of:
     1. VectorAssembler  -> to combine features into a single 'features' column
     2. StandardScaler   -> to scale/standardize the 'features'
    """
    assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
    scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withMean=True, withStd=True)
    pipeline = Pipeline(stages=[assembler, scaler])
    return pipeline

def explode_scaled_features(df: DataFrame, feature_cols) -> DataFrame:
    """
    Convert 'scaled_features' (Vector) into a Spark array, then
    create columns named exactly the same as feature_cols (e.g. "V1", "V2", ...).

    IMPORTANT: We first drop the old columns "V1..V28" so that we don't
    get a 'COLUMN_ALREADY_EXISTS' error when we create new columns
    with those same names.
    """
    # 1. Drop the original unscaled feature columns.
    for c in feature_cols:
        df = df.drop(c)

    # 2. Convert ML Vector to Spark Array
    df = df.withColumn("scaled_array", vector_to_array(col("scaled_features")))

    # 3. Build the list of columns to SELECT
    select_exprs = [
        col(c) for c in df.columns
        if c not in {"scaled_features", "scaled_array", "features"}
    ]
    # 4. Add each scaled feature as a new column with the old name
    select_exprs += [
        col("scaled_array")[i].alias(feature_cols[i])
        for i in range(len(feature_cols))
    ]

    # 5. Final select
    df = df.select(*select_exprs)
    return df

# --------------------------------------------------------------------------------
# 5. Main Preprocessing / Fitting Flow
# --------------------------------------------------------------------------------

def preprocess_dataset(df: DataFrame, pipeline_model, feature_cols, name: str) -> DataFrame:
    """
    Preprocess a dataset:
      - Clean (drop duplicates, na)
      - Log-normalize 'Amount'
      - Transform via fitted pipeline
      - Explode scaled features
    """
    print(f"Preprocessing {name} dataset...")
    cleaned = clean_data(df)
    normalized = log_normalize(cleaned, "Amount")
    transformed = pipeline_model.transform(normalized)
    final_df = explode_scaled_features(transformed, feature_cols)
    return final_df

def preprocess_multiple_datasets(datasets: dict, pipeline_model, feature_cols) -> dict:
    """
    Apply 'preprocess_dataset' to each dataset in the dictionary.
    """
    preprocessed_datasets = {}
    for name, df in datasets.items():
        preprocessed_datasets[name] = preprocess_dataset(df, pipeline_model, feature_cols, name)
    return preprocessed_datasets

def fit_and_transform_data(train_df: DataFrame, val_df: DataFrame, test_df: DataFrame):
    """
    1. Clean & log-normalize the training set, then fit the pipeline.
    2. Transform train, validation, and test sets.
    3. Return the preprocessed DataFrames.
    """
    pca_features = [f"V{i}" for i in range(1, 29)]

    # -- Fit the pipeline on the TRAIN set --
    train_cleaned = clean_data(train_df)
    train_cleaned = log_normalize(train_cleaned, "Amount")

    pipeline = build_preprocessing_pipeline(pca_features)
    pipeline_model = pipeline.fit(train_cleaned)

    # -- Transform all splits with the fitted model --
    datasets = {
        "train": train_df,
        "val": val_df,
        "test": test_df
    }
    preprocessed = preprocess_multiple_datasets(datasets, pipeline_model, pca_features)

    return preprocessed["train"], preprocessed["val"], preprocessed["test"]

# --------------------------------------------------------------------------------
# 6. Splitting Out Labels (Class) and Saving to CSV
# --------------------------------------------------------------------------------

def save_features_and_labels(df: DataFrame, label_col: str, features_path: str, labels_path: str):
    """
    Given a DataFrame that still contains the label column:
      1. Split into features_df (everything except label_col)
      2. Split into labels_df (only label_col)
      3. Save each to CSV
    """
    # 1) Labels only
    labels_df = df.select(label_col)
    # 2) Features only
    features_df = df.drop(label_col)

    # Save to CSV
    features_df.coalesce(1).write.csv(features_path, header=True, mode="overwrite")
    labels_df.coalesce(1).write.csv(labels_path, header=True, mode="overwrite")

def save_all_splits(
    train_df: DataFrame, val_df: DataFrame, test_df: DataFrame, label_col: str
):
    """
    Save train, val, and test splits as separate features+labels CSV files.
    """
    # You can of course rename the paths as you wish
    TRAIN_FEATURES_PATH = "preprocessed_creditcard_train_features"
    TRAIN_LABELS_PATH   = "preprocessed_creditcard_train_labels"

    VAL_FEATURES_PATH   = "preprocessed_creditcard_validation_features"
    VAL_LABELS_PATH     = "preprocessed_creditcard_validation_labels"

    TEST_FEATURES_PATH  = "preprocessed_creditcard_test_features"
    TEST_LABELS_PATH    = "preprocessed_creditcard_test_labels"

    # -- Train --
    save_features_and_labels(train_df, label_col, TRAIN_FEATURES_PATH, TRAIN_LABELS_PATH)
    # -- Validation --
    save_features_and_labels(val_df, label_col, VAL_FEATURES_PATH, VAL_LABELS_PATH)
    # -- Test --
    save_features_and_labels(test_df, label_col, TEST_FEATURES_PATH, TEST_LABELS_PATH)

    print("Preprocessed data splits (features + labels) saved to CSV:")
    print(f"  Train features: {TRAIN_FEATURES_PATH}, Train labels: {TRAIN_LABELS_PATH}")
    print(f"  Val features:   {VAL_FEATURES_PATH}, Val labels:   {VAL_LABELS_PATH}")
    print(f"  Test features:  {TEST_FEATURES_PATH}, Test labels:  {TEST_LABELS_PATH}")

# --------------------------------------------------------------------------------
# 7. Putting It All Together
# --------------------------------------------------------------------------------

start_time = time.time()

# 7.1 Fit the pipeline on the train set, transform all splits
train_preprocessed, val_preprocessed, test_preprocessed = fit_and_transform_data(
    train_set, val_set, test_set
)

# 7.2 Save Features and Labels Separately
#     (We assume the label is in a column named "Class")
save_all_splits(train_preprocessed, val_preprocessed, test_preprocessed, label_col="Class")

end_time = time.time()
print(f"Preprocessing + saving completed in {end_time - start_time:.2f} seconds")

Preprocessing train dataset...
Preprocessing val dataset...
Preprocessing test dataset...
Preprocessed data splits (features + labels) saved to CSV:
  Train features: preprocessed_creditcard_train_features, Train labels: preprocessed_creditcard_train_labels
  Val features:   preprocessed_creditcard_validation_features, Val labels:   preprocessed_creditcard_validation_labels
  Test features:  preprocessed_creditcard_test_features, Test labels:  preprocessed_creditcard_test_labels
Preprocessing + saving completed in 10.06 seconds


In [4]:
import os

def rename_csv_files(parent_directory):
    """
    Navigate through subdirectories in the parent directory, find the CSV file
    starting with 'part', and rename it to match the name of the subdirectory.

    Args:
        parent_directory (str): Path to the parent directory containing subdirectories.

    Returns:
        None
    """
    # Iterate through all items in the parent directory
    for subdirectory in os.listdir(parent_directory):
        # Construct full path for each subdirectory
        subdirectory_path = os.path.join(parent_directory, subdirectory)
        
        # Check if the path is a directory
        if os.path.isdir(subdirectory_path):
            # Look for files in the subdirectory
            for file_name in os.listdir(subdirectory_path):
                if file_name.startswith("part") and file_name.endswith(".csv"):
                    # Construct old and new file paths
                    old_file_path = os.path.join(subdirectory_path, file_name)
                    new_file_name = f"{subdirectory}.csv"
                    new_file_path = os.path.join(subdirectory_path, new_file_name)
                    
                    # Rename the file
                    os.rename(old_file_path, new_file_path)
                    print(f"Renamed '{old_file_path}' to '{new_file_path}'")
                    break  # Only one file should exist; no need to check further

# Example Usage
parent_dir = "."  # Replace with your actual parent directory path
rename_csv_files(parent_dir)

Renamed './preprocessed_creditcard_validation_features/part-00000-e5780052-74ea-47be-99c3-0d55bd573bda-c000.csv' to './preprocessed_creditcard_validation_features/preprocessed_creditcard_validation_features.csv'
Renamed './preprocessed_creditcard_test_labels/part-00000-440d9179-4251-4d05-8aff-0b6a65dd3e7b-c000.csv' to './preprocessed_creditcard_test_labels/preprocessed_creditcard_test_labels.csv'
Renamed './preprocessed_creditcard_test_features/part-00000-4d58d280-c55e-4171-a1fe-e096bd30bc6f-c000.csv' to './preprocessed_creditcard_test_features/preprocessed_creditcard_test_features.csv'
Renamed './preprocessed_creditcard_train_labels/part-00000-7e88f01c-73f0-4dd3-af50-2a273afef9fc-c000.csv' to './preprocessed_creditcard_train_labels/preprocessed_creditcard_train_labels.csv'
Renamed './preprocessed_creditcard_train_features/part-00000-7056bdb4-10c9-40c5-b912-dddd19f66708-c000.csv' to './preprocessed_creditcard_train_features/preprocessed_creditcard_train_features.csv'
Renamed './preproc

In [34]:
import pandas as pd

train_features = pd.read_csv("./preprocessed_creditcard_train_features/preprocessed_creditcard_train_features.csv")