In [1]:
import sys
import glob

sys.path.insert(0, '/home/eharper/github_projects/ericharper/dataloaders/')

In [2]:
# from batch_dataloader import BatchDataLoader
# from batch_dataset import BatchDataset
# from batch_dataset import TensorBatchDataset

from pytorch.batch_dataloader.parquet_batch_dataset import ParquetBatchDataset
from pytorch.batch_dataloader.parquet_batch_dataloader import ParquetBatchDataLoader

import pandas as pd
import numpy as np
import pyarrow.parquet as pq
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

import time

In [3]:
TRAIN_PARQUET = '/raid/data/handset_data/train_parquet_swapped'
TEST_PARQUET = '/raid/data/handset_data/test_parquet_swapped'

NUMERIC_COLS = ['main_Device_ScreenResolution_Width', 'main_Device_ScreenResolution_Height', 'main_Device_Memory',
                'main_Device_Storage', 'main_QOS_Location_Latitude', 'main_QOS_Location_Longitude', 
                'main_QOS_Location_Altitude', 'main_QOS_Location_HorizontalAccuracy', 'main_QOS_Location_VerticalAccuracy',
                'main_QOS_Jitter_Average', 'main_QOS_PacketLoss_LostPercentage', 'main_QOS_Velocity_Speed', 
                'main_QOS_Velocity_Bearing', 'main_QOS_DeviceFreeMemory', 'main_QOS_DeviceCPU', 
                'main_QOS_DeviceBatteryLevel', 'main_QOS_DeviceFreeStorage', 'main_QOS_DeltaTransmittedBytes',
                'main_QOS_DeltaReceivedBytes', 'main_QOS_SystemUptime', 'main_QOS_DeviceUsedStorage', 'main_QOS_DeviceUsedMemory']

MAX_ORDS = {"main_ConnectionType": 3, "main_ConnectionTechnology": 13, "main_ServiceProvider": 979, "main_Device_Manufacturer": 465, "main_Device_OS": 28, "main_Device_DeviceLanguage": 54, "main_Region": 53, "conn_Generation_Category": 3, "sp_ServiceProviderBrandName": 76, "main_QOS_DeviceBatteryState": 4}

In [4]:
cat_card_dict = {k:v+1 for (k,v) in MAX_ORDS.items()}

In [5]:
def compute_embedding_dims(cat_card_dict):
    cat_embedding_dict = {k:max(int(np.ceil(v / 100.0)), 2) for (k,v) in cat_card_dict.items()} 
    return cat_embedding_dict 

In [6]:
cat_embedding_dict = compute_embedding_dims(cat_card_dict)

In [7]:
LATENT_DIM = 10
#NUM_FILES_PER_DATASET = 200
#NUM_FILES_PER_DATASET = 20
NUM_FILES_PER_DATASET = 500

In [8]:
train_parquet_filepaths = glob.glob(TRAIN_PARQUET + '/*.parquet')
test_parquet_filepaths = glob.glob(TEST_PARQUET + '/*.parquet')

In [9]:
class Autoencoder(nn.Module):
    def __init__(self, cont_name_list, cat_card_dict, cat_embedding_dict, latent_dim):
        super(Autoencoder, self).__init__()
        
        self.cont_name_list = cont_name_list
        self.cat_card_dict = cat_card_dict
        self.cat_embedding_dict = cat_embedding_dict
        self.latent_dim = latent_dim
        
        
        self.embedding_layers = nn.ModuleList(
            [nn.Embedding(self.cat_card_dict[cat], self.cat_embedding_dict[cat])
             for cat in self.cat_card_dict.keys()]
        )
        
        for idx, layer in enumerate(self.embedding_layers):
            print((idx, layer))
        
        self.encoder_input_dim = len(self.cont_name_list) + sum(self.cat_embedding_dict.values())
            
        self.decoder_out_dim = len(self.cont_name_list) + sum(self.cat_card_dict.values()) + len(cat_card_dict)
        
        self.encoder = nn.Sequential(
            nn.Linear(self.encoder_input_dim, 96),
            nn.Tanh(),
            nn.Dropout(.1),
            nn.Linear(96, 64),
            nn.Tanh(),
            nn.Dropout(.1),
            nn.Linear(64, 48),
            nn.Tanh(),
            nn.Dropout(.1),
            nn.Linear(48, 16),
            nn.Tanh(),
            nn.Dropout(.1),
            nn.Linear(16, self.latent_dim),
            nn.ReLU()
        )
        
        self.decoder = nn.Sequential(
            nn.Linear(self.latent_dim, 16),
            nn.Tanh(),
            nn.Dropout(.1),
            nn.Linear(16, 48),
            nn.Tanh(),
            nn.Dropout(.1),
            nn.Linear(48, 64),
            nn.Tanh(),
            nn.Dropout(.1),
            nn.Linear(64, 96),
            nn.Tanh(),
            nn.Dropout(.1),
            nn.Linear(96, self.decoder_out_dim),
            #nn.ReLU()
            #nn.Sigmoid()
        )
        
    def forward(self, cat_array, cont_array):
        #print(cat_array.shape)
        embedded = [embedding_layer(cat_array[:, idx])
                    for idx, embedding_layer in enumerate(self.embedding_layers)] 
            
        embedded = torch.cat(embedded, 1)
        
        encoder_input = torch.cat([embedded, cont_array], 1)
        encoded = self.encoder(encoder_input)
        decoded = self.decoder(encoded)
        
        return decoded, encoded

In [10]:
class MCE_MSE_Loss(torch.nn.Module):
    def __init__(self, cont_name_list, cat_card_dict, cat_embedding_dict):
        super().__init__()
        self.cont_name_list = cont_name_list
        self.cat_card_dict = cat_card_dict
        
        self.decoder_cat_indices = [0]
        for idx, cat_card in enumerate(self.cat_card_dict.values()):
            self.decoder_cat_indices += [self.decoder_cat_indices[idx] + cat_card]
        
        self.decoder_cont_idx = -len(cont_name_list)
        
    def forward(self, decoder_out, cat_array, cont_array):
        #cat_loss = sum([F.cross_entropy(decoder_out[:, decoder_cat_idx:self.decoder_cat_indices[cat_idx + 1]], cat_array[:, cat_idx])
        #                for cat_idx, decoder_cat_idx in enumerate(self.decoder_cat_indices[0:-1])])
        cat_loss = 0
        for cat_idx, decoder_cat_idx in enumerate(self.decoder_cat_indices[0:-1]):
            cat_loss += F.cross_entropy(decoder_out[:, decoder_cat_idx:self.decoder_cat_indices[cat_idx + 1]], cat_array[:, cat_idx])
        
        cont_loss = F.mse_loss(decoder_out[:, self.decoder_cont_idx:], cont_array)
        
        w = sum(self.cat_card_dict.values())
        
        loss = (cat_loss / w) + cont_loss
        
        return loss

In [11]:
device = torch.device("cuda:3")
autoencoder = Autoencoder(NUMERIC_COLS, cat_card_dict, cat_embedding_dict, LATENT_DIM)
autoencoder = autoencoder.to(device)

(0, Embedding(4, 2))
(1, Embedding(14, 2))
(2, Embedding(980, 10))
(3, Embedding(466, 5))
(4, Embedding(29, 2))
(5, Embedding(55, 2))
(6, Embedding(54, 2))
(7, Embedding(4, 2))
(8, Embedding(77, 2))
(9, Embedding(5, 2))


In [12]:
criterion = MCE_MSE_Loss(NUMERIC_COLS, cat_card_dict, cat_embedding_dict)
optimizer = optim.Adam(autoencoder.parameters(), lr=0.0001)

In [13]:
%%time
train_dataset = ParquetBatchDataset(train_parquet_filepaths,
                                    num_files_per_chunk=NUM_FILES_PER_DATASET,
                                    batch_size=300000)
total_train_samples = sum(train_dataset.compute_samples_per_chunk())
print('Total Samples:', total_train_samples)
train_data_loader = ParquetBatchDataLoader(train_dataset, total_train_samples,
                                           shuffle=True, drop_last=False)

print('Total batches:', len(train_data_loader))

Total Samples: 272978846
Total batches: 910
CPU times: user 1min 34s, sys: 24.9 s, total: 1min 59s
Wall time: 9.33 s


In [14]:
#for batch_idx, batch in enumerate(train_data_loader):
#    print(batch[0][0:10])
    #print('Batch Idx: %d | Batch Shape: (%d, %d)' % (batch_idx, batch[0].shape[0], batch[0].shape[1]))

In [15]:
autoencoder.train()

num_epochs = 1

training_start_time = time.time()

total_batch_time = 0

for epoch in range(num_epochs):
    
    for batch_idx, batch in enumerate(train_data_loader):
        batch_train_start = time.time()
        
        batch = batch[0]
        
        num_batch_rows = batch.shape[0]

        cat_array = batch[:, 0:len(cat_card_dict)].long()
        cont_array = batch[:, -len(NUMERIC_COLS):].float()
        cat_array = cat_array.to(device)
        cont_array = cont_array.to(device)

        decoder_out, encoder_out = autoencoder(cat_array, cont_array)

        loss = criterion(decoder_out, cat_array, cont_array)
        loss.backward()
        optimizer.step()

        # print statistics
        batch_train_duration = time.time() - batch_train_start
        total_batch_time += batch_train_duration
        rows_per_second = num_batch_rows / batch_train_duration
        if batch_idx % 100 == 0:
            print('Rows per second:', rows_per_second)
            print('Epoch %d | Batch %d | Loss %.4f' %
                  (epoch, batch_idx, loss.item())
                 )

total_training_time = time.time() - training_start_time
print('Total Training Duration:', total_training_time) 
print('Total Batch Duration:', total_batch_time)
print('Data loading proportion:', (total_training_time - total_batch_time) / total_training_time)
    
torch.save(autoencoder.state_dict(), "./autoencoder.pt")

Loading chunk 0 from disk.
Rows per second: 220918.80251093893
Epoch 0 | Batch 0 | Loss 0.1359
Rows per second: 333599.4795143619
Epoch 0 | Batch 100 | Loss 0.0495
Rows per second: 332132.0397083088
Epoch 0 | Batch 200 | Loss 0.0533
Rows per second: 334225.15559255326
Epoch 0 | Batch 300 | Loss 0.0454
Rows per second: 331593.7562011207
Epoch 0 | Batch 400 | Loss 0.0466
Loading chunk 1 from disk.
Rows per second: 332038.4368250433
Epoch 0 | Batch 500 | Loss 0.0492
Rows per second: 333757.0731238931
Epoch 0 | Batch 600 | Loss 0.0455
Rows per second: 332505.9251897395
Epoch 0 | Batch 700 | Loss 0.0423
Rows per second: 333523.87682405015
Epoch 0 | Batch 800 | Loss 0.0404
Rows per second: 331089.96800642874
Epoch 0 | Batch 900 | Loss 0.0414
Total Training Duration: 868.1286525726318
Total Batch Duration: 818.3920609951019
Data loading proportion: 0.05729172908893098


In [16]:
%%time
test_dataset = ParquetBatchDataset(test_parquet_filepaths,
                                    num_files_per_chunk=NUM_FILES_PER_DATASET,
                                    batch_size=300000)
total_test_samples = sum(test_dataset.compute_samples_per_chunk())
print('Total Samples:', total_test_samples)
test_data_loader = ParquetBatchDataLoader(test_dataset, total_test_samples,
                                           shuffle=False, drop_last=False)

print('Total batches:', len(test_data_loader))

Total Samples: 68239502
Total batches: 228
CPU times: user 24 s, sys: 5.92 s, total: 29.9 s
Wall time: 2.28 s


In [17]:
# testing
device = torch.device("cuda:3")
autoencoder = Autoencoder(NUMERIC_COLS, cat_card_dict, cat_embedding_dict, LATENT_DIM)
autoencoder.load_state_dict(torch.load("./autoencoder.pt"))
autoencoder = autoencoder.to(device)
autoencoder.eval()
criterion = MCE_MSE_Loss(NUMERIC_COLS, cat_card_dict, cat_embedding_dict)

test_start = time.time()

for batch_idx, batch in enumerate(test_data_loader):
    batch_start = time.time()
    batch = batch[0]
    num_rows = batch.shape[0]

    cat_array = batch[:, 0:len(cat_card_dict)].long()
    cont_array = batch[:, -len(NUMERIC_COLS):].float()
    
    # TODO: fix preprocessing - ordinal encoder may encode -1's on test data
    cat_array = cat_array.abs()
    
    cat_array = cat_array.to(device)
    cont_array = cont_array.to(device)

    decoder_out, encoder_out = autoencoder(cat_array, cont_array)

    loss = criterion(decoder_out, cat_array, cont_array)

    # print statistics
    batch_duration = time.time() - batch_start
    inferences_per_second = num_rows / batch_duration
    if batch_idx % 100 == 0:
        print('Inferences per second:', inferences_per_second)
        print('Batch %d | Loss %.4f' % (batch_idx, loss.item())
             )

print("Testing duration:", time.time() - test_start)

(0, Embedding(4, 2))
(1, Embedding(14, 2))
(2, Embedding(980, 10))
(3, Embedding(466, 5))
(4, Embedding(29, 2))
(5, Embedding(55, 2))
(6, Embedding(54, 2))
(7, Embedding(4, 2))
(8, Embedding(77, 2))
(9, Embedding(5, 2))
Loading chunk 0 from disk.
Inferences per second: 2538115.774228555
Batch 0 | Loss 0.0364
Inferences per second: 2805911.119138887
Batch 100 | Loss 0.0362
Inferences per second: 2828758.5343317874
Batch 200 | Loss 0.0362
Testing duration: 28.2793128490448
