In [1]:
!pip install dask_cuda



In [1]:
import os
import sys
# Make modules located in the current working directory importable.
cwd = os.getcwd()
# Guard the append so that re-running this cell does not keep adding duplicate
# entries to sys.path.
if cwd not in sys.path:
    sys.path.append(cwd)

In [2]:
import processor
import numpy as np
import tensorflow as tf
import random
import time

import synthetic_data_generator
import evaluation

from dask_cuda import LocalCUDACluster
from dask.distributed import Client

In [3]:
# Start a dask_cuda LocalCUDACluster with its default settings and connect a
# dask.distributed Client to it.
# NOTE(review): presumably this makes later Dask computations run on this
# cluster — confirm. Nothing visible in this notebook closes `client` or
# `cluster`, so re-running the cell likely starts another cluster; verify and
# shut them down explicitly when finished.
cluster = LocalCUDACluster()
client = Client(cluster)

INFO:distributed.http.proxy:To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
INFO:distributed.scheduler:State start
INFO:distributed.scheduler:  Scheduler at:     tcp://127.0.0.1:37101
INFO:distributed.scheduler:  dashboard at:  http://127.0.0.1:8787/status
INFO:distributed.scheduler:Registering Worker plugin shuffle
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:37365'
INFO:distributed.scheduler:Register worker <WorkerState 'tcp://127.0.0.1:34523', name: 0, status: init, memory: 0, processing: 0>
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:34523
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:42708
INFO:distributed.scheduler:Receive client connection: Client-a6c075b5-aef9-11ef-a27c-0242ac1c000c
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:42714


In [4]:
# Create the three data folders if they are missing; exist_ok=True makes the
# cell safe to run again when they already exist.
for _folder_name in ("actual_data", "transformed_data", "result_data"):
    os.makedirs(_folder_name, exist_ok=True)

In [5]:
def set_global_seed(seed):
    """Seed NumPy, TensorFlow and Python's `random` module with one value.

    Args:
        seed: Value passed unchanged to `np.random.seed`,
            `tf.random.set_seed` and `random.seed`, in that order.
    """
    # NOTE(review): this seeds the generators of the current process only;
    # whether the Dask workers are seeded is not visible here — confirm.
    for seeder in (np.random.seed, tf.random.set_seed, random.seed):
        seeder(seed)


set_global_seed(42)

In [6]:
# Generate the synthetic dataset once: the generator is only invoked when the
# output directory has no entries, so later runs reuse the existing files.
start_time = time.time()
existing_data_path = 'Credit.csv'
column_name = 'Amount'
output_dir = 'synthetic_data_1B'
target_count = 1_000_000_000

# exist_ok=True creates the directory only when it is missing, without a
# separate existence check (same pattern as the other makedirs calls in this
# notebook).
os.makedirs(output_dir, exist_ok=True)

# any() stops at the first entry, so the scandir iterator is not exhausted when
# the directory has content; the with-block closes it explicitly instead of
# leaving an open directory handle behind.
with os.scandir(output_dir) as dir_entries:
    output_dir_is_empty = not any(dir_entries)

if output_dir_is_empty:
    print("Output directory is empty. Generating synthetic data...")
    # NOTE(review): presumably writes `target_count` synthetic values modelled
    # on `column_name` of `existing_data_path` into `output_dir` — confirm in
    # synthetic_data_generator.
    synthetic_data_generator.generate_synthetic_data(existing_data_path,
                                                 column_name, output_dir,target_count)
    print(f"Synthetic data generated and stored in: {output_dir}")
else:
    print(f"Output directory '{output_dir}' is not empty. Skipping synthetic data generation.")
elapsed_time = (time.time() - start_time) / 60  # seconds -> minutes
print(f"Time taken: {elapsed_time:.2f} minutes")

Output directory is empty. Generating synthetic data...
Synthetic data generated and stored in: synthetic_data_1B
Time taken: 1.41 minutes


In [7]:
# Read the generated files from `output_dir` through the project helper.
# NOTE(review): presumably returns a Dask DataFrame limited to `column_name`
# (the next cell calls .to_delayed() on it) — confirm in
# processor.read_input_data.
ddf = processor.read_input_data(output_dir,column_name)

In [8]:
# Time the per-partition pass: every delayed partition is materialised with
# .compute() and handed, one at a time, to processor.process_partition along
# with its position in the partition list. The compute() call stays inline so
# no extra reference to a materialised partition is kept between iterations.
start_time = time.time()
for part_idx, delayed_part in enumerate(ddf.to_delayed()):
    processor.process_partition(delayed_part.compute(), part_idx)
elapsed_seconds = time.time() - start_time
elapsed_time = elapsed_seconds / 60
print(f"Time taken: {elapsed_time:.2f} minutes")

Time taken: 6.51 minutes


In [9]:
# Time the evaluation step: compute the error values between the two folders
# with the "rmse" metric and report their mean.
start_time = time.time()
actual_folder, result_folder = "actual_data", "result_data"
errors = evaluation.calculate_errors(actual_folder, result_folder, metric="rmse")
mean_error = np.mean(errors)
print("Mean Error : ", mean_error)
elapsed_seconds = time.time() - start_time
elapsed_time = elapsed_seconds / 60
print(f"Time taken: {elapsed_time:.2f} minutes")

Mean Error :  123.27544623745658
Time taken: 0.80 minutes


**Time taken for 1B records**:


*   Generating the data: ~1 minute 25 seconds (1.41 minutes; one-time activity)
*   Data transformation and inverse transformation (GMM): ~6 minutes 31 seconds (6.51 minutes)
*   Computing the error metrics: ~48 seconds (0.80 minutes)


**Total execution time for 1B records**:

*   First time: ~8 minutes 43 seconds (8.72 minutes)
*   Thereafter: ~7 minutes 19 seconds (7.31 minutes)







