In [None]:
import dask.dataframe as dd
from dask.distributed import Client
import numpy as np
import pandas as pd
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import KBinsDiscretizer
import torch
import torch.nn as nn
from google.colab import drive
import gc

# Start a local Dask cluster; the script closes this client when it finishes.
client = Client()

# Expose Google Drive under /content/drive, remounting even if already mounted.
drive.mount('/content/drive', force_remount=True)

input_path = '/content/drive/My Drive/BB_Normalized_Monthly_Final/combined_data_minus_183.parquet'

# Lazy Dask DataFrame over the Parquet data; rows are materialized later by compute().
data = dd.read_parquet(input_path)

# Every numeric column is a feature that gets imputed, discretized and embedded below.
numeric_cols = data.select_dtypes(include=[np.number]).columns.to_list()

#apply mean imputation within a single partition
def apply_imputation(df):
    """Mean-impute missing values in one partition of numeric columns.

    Args:
        df: pandas DataFrame whose columns are all numeric.

    Returns:
        A float64 DataFrame with the SAME index and columns as ``df``, where each
        missing value is replaced by its column's mean over this partition.
        Columns with no observed values are filled with 0.0 rather than dropped.
        An empty partition is returned (cast to float64) unchanged.
    """
    # Cast first so integer / nullable columns can hold fractional fill values;
    # float64 is also the dtype the previous SimpleImputer version produced.
    numeric = df.astype('float64')
    # Mean is NaN for an all-missing column; use 0.0 so the column is kept
    # (SimpleImputer silently dropped such columns, breaking the column count).
    fill_values = numeric.mean().fillna(0.0)
    # fillna keeps the original index (the old version reset it to a RangeIndex),
    # so rows realign correctly when the result is assigned back into Dask.
    return numeric.fillna(fill_values)

# Impute missing numeric values with each column's GLOBAL mean.
# Filling partition-by-partition used each partition's local mean, and the
# helper rebuilt frames on a fresh RangeIndex, misaligning rows on assignment.
column_means = data[numeric_cols].mean().compute()
# An all-missing column has a NaN mean; fall back to 0.0 so the discretizer
# below (which rejects NaN input) still receives every column fully populated.
fill_values = column_means.fillna(0.0).to_dict()
data[numeric_cols] = data[numeric_cols].astype('float64').fillna(fill_values)

# Materialize the full dataset as an in-memory pandas DataFrame.
data = data.compute()

# Discretize each numeric column into 10 equal-width ordinal bins (codes 0..9).
# NOTE: n_bins must not exceed the embedding vocabulary size used downstream (10).
discretizer = KBinsDiscretizer(n_bins=10, encode='ordinal', strategy='uniform')
data[numeric_cols] = discretizer.fit_transform(data[numeric_cols])

# Integer bin codes, shape (n_rows, len(numeric_cols)), used as embedding indices.
data_tensor = torch.tensor(data[numeric_cols].to_numpy(), dtype=torch.long)

#embedding layer
class DataEmbedder(nn.Module):
    """Embed each integer-coded feature column with its own lookup table.

    Args:
        num_features: number of feature columns; one ``nn.Embedding`` is built per column.
        embedding_dim: length of each per-feature embedding vector.
        num_embeddings: vocabulary size of every table, i.e. the number of distinct
            integer codes per feature (default 10, matching the 10-bin discretization).
    """

    def __init__(self, num_features, embedding_dim, num_embeddings=10):
        super().__init__()
        self.embeddings = nn.ModuleList(
            nn.Embedding(num_embeddings, embedding_dim) for _ in range(num_features)
        )

    def forward(self, x):
        """Embed a batch of integer codes.

        Args:
            x: long tensor of shape (batch, num_features); column i is looked up in table i.

        Returns:
            Float tensor of shape (batch, num_features * embedding_dim): the per-feature
            embeddings concatenated feature-major along dim 1.

        Raises:
            ValueError: if ``x`` is not 2-D or its column count differs from num_features.
        """
        if x.dim() != 2 or x.size(1) != len(self.embeddings):
            raise ValueError(
                f"expected a 2-D tensor with {len(self.embeddings)} columns, "
                f"got shape {tuple(x.shape)}"
            )
        if len(self.embeddings) == 0:
            # torch.cat rejects an empty list; return a (batch, 0) result instead.
            return torch.empty(x.size(0), 0, device=x.device)
        return torch.cat(
            [table(x[:, i]) for i, table in enumerate(self.embeddings)], dim=1
        )

#embedder model initialization
num_features = len(numeric_cols)
embedding_dim = 10
# The embedding tables are randomly initialized and not trained in this script,
# so the saved vectors are a random projection of the bin codes; seed the RNG so
# repeated runs write the same vectors.
torch.manual_seed(0)
embedder = DataEmbedder(num_features, embedding_dim)
embedder.eval()

# Inference only: no_grad avoids recording an autograd graph over the whole
# dataset, and embedding in row batches bounds intermediate memory.
batch_size = 65536
with torch.no_grad():
    embedded_data = torch.cat(
        [embedder(batch) for batch in torch.split(data_tensor, batch_size)],
        dim=0,
    )

# Save the final vectorized data back to Google Drive in Parquet format.
# (Previously the discretized `data` was written and `embedded_data` was discarded.)
# Column names follow the feature-major order in which DataEmbedder concatenates.
embedding_cols = [f"{col}_emb{j}" for col in numeric_cols for j in range(embedding_dim)]
embedded_df = pd.DataFrame(embedded_data.numpy(), index=data.index, columns=embedding_cols)
# Keep any non-numeric columns (they were not embedded) alongside the vectors.
output_df = pd.concat([data.drop(columns=numeric_cols), embedded_df], axis=1)
output_path = '/content/drive/My Drive/BB_Normalized_Monthly_Final/vectorized_combined_data_minus_183.parquet'
output_df.to_parquet(output_path)

print("Final data has been successfully saved to:", output_path)

#clean up resources to free memory
client.close()
gc.collect()


Perhaps you already have a cluster running?
Hosting the HTTP server on port 44599 instead
INFO:distributed.scheduler:State start
INFO:distributed.scheduler:  Scheduler at:     tcp://127.0.0.1:42025
INFO:distributed.scheduler:  dashboard at:  http://127.0.0.1:44599/status
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:41471'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:41839'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:34039'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:46201'
INFO:distributed.scheduler:Register worker <WorkerState 'tcp://127.0.0.1:38207', name: 1, status: init, memory: 0, processing: 0>
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:38207
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:50308
INFO:distributed.scheduler:Register worker <WorkerState 'tcp://127.0.0.1:41215', name: 0, status: init, memory: 0, processing: 0>
INFO:distributed.schedul

Mounted at /content/drive


INFO:distributed.core:Event loop was unresponsive in Nanny for 3.35s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
INFO:distributed.core:Event loop was unresponsive in Nanny for 3.19s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
INFO:distributed.core:Event loop was unresponsive in Scheduler for 3.19s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
INFO:distributed.core:Event loop was unresponsive in Nanny for 3.37s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
INFO:distributed.core:Event loop was unresponsive in Nanny for 3.26s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeout