# Demo: Transform and Inverse Transform Pipeline
This notebook transforms a dataset and then inverts that transformation, using the `DataLoaderTransformer` and `ScalableVGMMNormalizer` classes from the `src` folder.

We will:
1. Load the configuration file.
2. Load and preprocess the dataset.
3. Apply the transformation and save the transformed data.
4. Apply the inverse transformation and save the inverse-transformed data.
5. Measure and display the time taken for each step.

In [1]:
import sys
import os
import time
import yaml
import pandas as pd

# Make the project root (the folder that contains src) importable.
# A notebook has no __file__, so resolve the parent of the current
# working directory directly.
sys.path.insert(0, os.path.abspath(".."))

In [2]:
from src.data_loader import DataLoaderTransformer  # Importing from src folder
from src.vgmm_normalizer import ScalableVGMMNormalizer  # Importing from src folder

In [3]:
# Define the function to load configuration
def load_config(config_path):
    """
    Load the configuration file from a YAML path.

    Parameters:
        config_path (str): Path to the YAML configuration file.

    Returns:
        dict: Parsed configuration dictionary.
    """
    with open(config_path, "r") as file:
        config = yaml.safe_load(file)
    return config

# Specify the path to your configuration file
config_path = "../config/demo.yaml"  # Adjust this path to your actual config file location

# Load the configuration
config = load_config(config_path)
data_loader_config = config["DataLoaderTransformer"]

# Display the loaded configuration
print("Loaded Configuration:")
print(config)

Loaded Configuration:
{'DataLoaderTransformer': {'input_path': '../data/input_50K.csv', 'output_parquet_path': '../data/input_converted_50K.parquet', 'n_workers': 1, 'threads_per_worker': 2, 'memory_limit': '6GB', 'spill_dir': './tmp/dask-spill', 'target_partitions': 10}, 'ScalableVGMMNormalizer': {'n_clusters': 10, 'eps': 0.005, 'weight_concentration_prior_type': 'dirichlet_process', 'weight_concentration_prior': 0.001, 'max_iter': 100, 'n_init': 1, 'random_state': 42}, 'continuous_columns': ['Amount']}
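
Indexing `config["DataLoaderTransformer"]` raises a bare `KeyError` if a section is missing from the YAML file. A minimal sketch of an up-front check follows, assuming the three top-level keys seen in the printed configuration are all required. The helper name `check_config` and the required-key list are illustrative and are not part of the `src` package.

```python
# Hypothetical helper: the required keys are inferred from the printed
# configuration above, not taken from the src package itself.
REQUIRED_KEYS = ("DataLoaderTransformer", "ScalableVGMMNormalizer", "continuous_columns")


def check_config(config):
    """Raise a descriptive error if a top-level key is missing or empty."""
    missing = [key for key in REQUIRED_KEYS if not config.get(key)]
    if missing:
        raise ValueError(f"Config is missing required keys: {', '.join(missing)}")
    return config


# Example with a trimmed-down configuration dictionary.
example_config = {
    "DataLoaderTransformer": {"input_path": "../data/input_50K.csv"},
    "ScalableVGMMNormalizer": {"n_clusters": 10},
    "continuous_columns": ["Amount"],
}
check_config(example_config)  # passes silently
```

Calling such a check right after `load_config` would surface a misnamed section before any Dask worker is started.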


## Initialize DataLoaderTransformer
The `DataLoaderTransformer` is responsible for loading, preprocessing, and managing data transformations. In this step, we initialize it with the configuration settings.

In [4]:
# Initialize the DataLoaderTransformer
data_loader = DataLoaderTransformer(config=data_loader_config)

if data_loader.client:
    print(f"Dask Dashboard is running at: {data_loader.client.dashboard_link}")

Dask client initialized with 1 workers, 2 threads per worker, memory limit 6GB, and spill-to-disk at ./tmp/dask-spill.
Dask Dashboard is running at: http://127.0.0.1:8787/status


## Measure Time and Perform the Pipeline Steps
We will now perform the following steps:
1. Load and preprocess the dataset.
2. Apply the transformation and save the transformed data.
3. Apply the inverse transformation and save the inverse-transformed data.

We will measure the time taken for each step and display it at the end.
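
The cells below repeat the same `start_time = time.time()` pattern for every step. As an alternative, here is a minimal sketch of a reusable timer. The `timed` context manager is a hypothetical helper, not something the notebook or `src` provides. It uses `time.perf_counter`, which is a monotonic clock intended for measuring intervals.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed(label, timing):
    """Record the wall-clock duration of the enclosed block in timing[label]."""
    start = time.perf_counter()
    try:
        yield
    finally:
        # Runs even if the block raises, so failed steps are still timed.
        timing[label] = time.perf_counter() - start


# Stand-in workload; in the notebook this would be a pipeline step.
timing_demo = {}
with timed("Sum of squares", timing_demo):
    total = sum(i * i for i in range(100_000))

print(timing_demo)
```

With this helper, each step would become a single `with timed("Transformation", timing):` block around the existing call.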

In [5]:
# Dictionary to store timing for each step
timing = {}

# Step 1: Load and preprocess data
start_time = time.time()
df = data_loader.load_and_convert_data()
df = data_loader.apply_transformations(df)
timing["Load and Preprocess"] = time.time() - start_time

CSV converted to Parquet and saved at ../data/input_converted_50K.parquet
Initial transformations applied to the DataFrame.


In [6]:
# Step 2: Apply VGMM Transformation
start_time = time.time()
continuous_columns = config.get("continuous_columns")
transformed_df, normalizers = data_loader.apply_vgmm_transformation(df, continuous_columns)
timing["Transformation"] = time.time() - start_time
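
The transformed output shown later in this notebook has one `Amount_transformed` column plus one `Amount_prob_k` column per mixture component. The notebook does not show how `ScalableVGMMNormalizer` computes them. The sketch below assumes a CTGAN-style mode-specific normalization, in which each value is assigned to its most likely Gaussian component and scaled by that component's mean and standard deviation. The component parameters, the scaling factor of 4, and the function names are all illustrative assumptions, and the real class may work differently.

```python
import math

# Hypothetical mixture parameters (weight, mean, std); a fitted VGMM would supply these.
COMPONENTS = [(0.7, 20.0, 30.0), (0.3, 200.0, 50.0)]
SCALE = 4.0  # assumed scaling factor, as in CTGAN-style normalization


def responsibilities(x):
    """Posterior probability of each component given the value x."""
    densities = [
        w * math.exp(-0.5 * ((x - mu) / sd) ** 2) / (sd * math.sqrt(2 * math.pi))
        for w, mu, sd in COMPONENTS
    ]
    total = sum(densities)
    return [d / total for d in densities]


def transform(x):
    """Return (scaled value, one-hot indicator of the chosen component)."""
    probs = responsibilities(x)
    k = probs.index(max(probs))
    _, mu, sd = COMPONENTS[k]
    one_hot = [1.0 if i == k else 0.0 for i in range(len(COMPONENTS))]
    return (x - mu) / (SCALE * sd), one_hot


def inverse_transform(scaled, one_hot):
    """Undo transform() using the component recorded in the one-hot vector."""
    k = one_hot.index(max(one_hot))
    _, mu, sd = COMPONENTS[k]
    return scaled * SCALE * sd + mu


for amount in (14.61, 1.0, 197.04):
    scaled, one_hot = transform(amount)
    recovered = inverse_transform(scaled, one_hot)
    print(amount, round(scaled, 6), one_hot, round(recovered, 2))
```

Under these assumptions the inverse needs only the scaled value and the component indicator, which is why both are kept in the transformed output.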

In [7]:
# Utility function to save output files
def save_output(df, output_dir, filename_base):
    """
    Save a Dask DataFrame in both CSV and Parquet formats in a specified directory.

    Parameters:
        df (dd.DataFrame): Dask DataFrame to save.
        output_dir (str): Directory where to save the output files.
        filename_base (str): Base name of the output files (without extension).
    """
    # Ensure the output directory exists
    os.makedirs(output_dir, exist_ok=True)

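    # Save to CSV. compute() materializes the whole frame as one in-memory
    # pandas DataFrame, which is acceptable for a small demo dataset.
    csv_path = os.path.join(output_dir, f"{filename_base}.csv")
    df.compute().to_csv(csv_path, index=False)
    print(f"Saved CSV to {csv_path}")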

    # Save to Parquet
    parquet_path = os.path.join(output_dir, f"{filename_base}.parquet")
    df.to_parquet(parquet_path, engine="pyarrow", write_index=False)
    print(f"Saved Parquet to {parquet_path}")

# Step 3: Save transformed data
start_time = time.time()
input_dir = os.path.dirname(data_loader_config["input_path"])
output_dir = os.path.join(input_dir, "output")
save_output(transformed_df, output_dir, "transformed_data")
timing["Save Transformed Data"] = time.time() - start_time

Saved CSV to ../data/output/transformed_data.csv
Saved Parquet to ../data/output/transformed_data.parquet


In [8]:
# Step 4: Apply Inverse Transformation
start_time = time.time()
inverse_transformed_df = data_loader.apply_inverse_transformation(
    transformed_df, continuous_columns, normalizers
)
inverse_columns = [f"{col}_inverse_transformed" for col in continuous_columns]
inverse_transformed_only = inverse_transformed_df[inverse_columns]
timing["Inverse Transformation"] = time.time() - start_time

In [9]:
# Step 5: Save inverse-transformed data
start_time = time.time()
save_output(inverse_transformed_only, output_dir, "inverse_transformed_data")
timing["Save Inverse Transformed Data"] = time.time() - start_time

Saved CSV to ../data/output/inverse_transformed_data.csv
Saved Parquet to ../data/output/inverse_transformed_data.parquet


## Display Timing Results
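Finally, we display the time taken for each step to show where the pipeline spends its time. Note that Dask evaluates lazily, so a step that only builds a task graph can appear almost free, while its computation may be charged to the later save step that triggers it.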

In [10]:
# Display timing results
timing_df = pd.DataFrame.from_dict(timing, orient="index", columns=["Time (s)"])
print("\nTiming Results:")
print(timing_df)


Timing Results:
                               Time (s)
Load and Preprocess            0.348949
Transformation                 0.098540
Save Transformed Data          1.706664
Inverse Transformation         0.003953
Save Inverse Transformed Data  1.407375
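
A quick way to see where the time goes is to express each step as a share of the total. The sketch below hard-codes the figures from the run above so that it is self-contained. In the notebook, the `timing` dictionary could be used directly instead of the `timing_s` copy, which is a name introduced here for illustration.

```python
# Timings copied from the results table above (seconds).
timing_s = {
    "Load and Preprocess": 0.348949,
    "Transformation": 0.098540,
    "Save Transformed Data": 1.706664,
    "Inverse Transformation": 0.003953,
    "Save Inverse Transformed Data": 1.407375,
}

total_s = sum(timing_s.values())
save_s = sum(t for step, t in timing_s.items() if step.startswith("Save"))

# Print each step as a percentage of the total run time.
for step, t in timing_s.items():
    print(f"{step:<30} {t / total_s:6.1%}")
print(f"Save steps combined: {save_s / total_s:.1%} of {total_s:.2f} s")
```

In this run the two save steps together account for roughly 87% of the measured time.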


In [11]:
pd.read_csv("../data/output/transformed_data.csv").head()


   Amount  Amount_transformed  Amount_prob_0  Amount_prob_1  Amount_prob_2
0   14.61           -0.062408            1.0            0.0            0.0
1     1.0           -0.169768            1.0            0.0            0.0
2  197.04            0.037085            0.0            1.0            0.0
3     1.0           -0.169768            1.0            0.0            0.0
4   23.25            0.005746            1.0            0.0            0.0


In [12]:
pd.read_csv("../data/output/inverse_transformed_data.csv").head()

   Amount_inverse_transformed
0                       14.61
1                         1.0
2                      197.04
3                         1.0
4                       23.25


In [13]:
pd.read_csv(config["DataLoaderTransformer"]["input_path"]).head()

   Amount
0   14.61
1     1.0
2  197.04
3     1.0
4   23.25
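
Comparing the first five rows by eye suggests the round trip is lossless, but a numeric check is more reliable. Here is a minimal sketch, using only the five values displayed above. The helper `max_abs_error` and the tolerance of `1e-6` are assumptions chosen for illustration.

```python
import math


def max_abs_error(original, recovered):
    """Largest absolute difference between two equal-length sequences."""
    if len(original) != len(recovered):
        raise ValueError("Sequences must have the same length")
    return max((abs(a - b) for a, b in zip(original, recovered)), default=0.0)


# First five rows of the input and of the inverse-transformed output.
original = [14.61, 1.0, 197.04, 1.0, 23.25]
recovered = [14.61, 1.0, 197.04, 1.0, 23.25]

error = max_abs_error(original, recovered)
assert math.isclose(error, 0.0, abs_tol=1e-6), f"Round trip drifted by {error}"
print(f"Max absolute round-trip error: {error}")
```

To cover the full dataset, the two lists could be replaced with the `Amount` and `Amount_inverse_transformed` columns read from the CSV files, for example via `.tolist()` on each pandas column.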
