In [None]:

import torch
import numpy as np
import torch.nn as nn
from torch.optim import Adam
import matplotlib.pyplot as plt
from torch.utils.data import DataLoader
import pandas as pd
from torch.utils.data import Dataset, DataLoader
import ast
import os
import argparse
import sys


In [None]:

# Project root = parent of the current working directory. It is added to the
# module search path and is also used below as the prefix for the data and
# model file locations.
base_path = os.path.abspath(os.path.join(os.getcwd(), ".."))
# Keep the cell idempotent: re-running it must not append the same entry again.
if base_path not in sys.path:
    sys.path.append(base_path)

print("Base path added to sys.path:", base_path)

In [None]:

# Hyperparameters
dropout_prob = 0.2  # dropout probability used by every Dropout layer in VAE
test_batch_size = 1 # must be fixed to 1 (inference() handles exactly one sample per batch)
# Run on the GPU when CUDA is available, otherwise on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")


In [None]:

def get_args(custom_args=None):
    """
    Build the argument parser and return the parsed arguments.

    Parameters
    ----------
    custom_args : sequence of str, optional
        Explicit argument list to parse. When ``None`` (the default) the
        process command line (``sys.argv[1:]``) is parsed instead. An empty
        sequence is honoured as "no arguments", so every option takes its
        default value.

    Returns
    -------
    argparse.Namespace
        Namespace with the attributes ``filename`` and ``model_savefolder``.
    """

    parser = argparse.ArgumentParser(
        prog='VAE',
        description='using VAE for 768 dimensional data')

    parser.add_argument('--filename', type=str, default='prospectus_investment_objective.txt', help='source txt file')
    parser.add_argument('--model_savefolder', type=str, default='model/state', help='best model save folder')

    # parse_args(None) falls back to sys.argv[1:], so a single call covers both
    # cases. A truthiness test would wrongly treat an explicit empty list as
    # "not given" and read sys.argv, which inside a notebook kernel contains
    # kernel flags that make argparse exit.
    return parser.parse_args(custom_args)




In [None]:

class CustomDataset(Dataset):
    """
    Dataset over a pandas DataFrame with positional columns 0..4.

    Column usage (as read by this class):
        0 -> id, 1 -> name, 2 -> type/criteria number,
        3 -> sentence, 4 -> vector stored as a Python-literal string.

    Rows are addressed by position, so the frame's index labels do not matter.
    """

    def __init__(self, dataframe):
        """Keep a reference to each of the five columns of ``dataframe``."""
        self.vector = dataframe[4]
        self.id = dataframe[0]
        self.name = dataframe[1]
        self.type_no = dataframe[2]
        self.sentence = dataframe[3]

    def __getitem__(self, index):
        """
        Return the sample at position ``index``.

        Returns
        -------
        tuple
            ``(vector, id, name, criteria_no, sentence)`` where ``vector`` is a
            float32 numpy array parsed from the column-4 string and
            ``criteria_no`` is a Python ``int``.

        Raises
        ------
        IndexError
            If ``index`` is out of range.
        """
        # .iloc is positional: it works for frames whose index is not 0..n-1
        # (e.g. after filtering) and raises IndexError past the end, which is
        # what Python's sequence-iteration protocol needs in order to stop.
        vector_data = np.array(ast.literal_eval(self.vector.iloc[index])).astype(np.float32)
        criteria_no = int(self.type_no.iloc[index])
        return (
            vector_data,
            self.id.iloc[index],
            self.name.iloc[index],
            criteria_no,
            self.sentence.iloc[index],
        )

    def __len__(self):
        """Number of rows in the dataset."""
        return len(self.vector)



In [None]:
     
class VAE(nn.Module):
    """
    Fully connected variational autoencoder.

    Encoder:  input_dim -> hidden_dim[0] -> ... -> hidden_dim[-1] -> latent_dim,
              every Linear followed by LeakyReLU(0.2); all but the last one
              also followed by Dropout.
    Heads:    two Linear(latent_dim, 2) layers producing mean and log-variance.
    Decoder:  2 -> latent_dim -> hidden_dim[-1] -> ... -> hidden_dim[0] -> input_dim,
              ending in a Sigmoid.

    The dropout probability is read from the module-level ``dropout_prob``
    when the model is constructed.
    """

    def __init__(self, input_dim=768, hidden_dim=(600, 500, 400, 300, 200, 100, 50), latent_dim = 2, device=device):
        """
        Parameters
        ----------
        input_dim : int
            Width of the input (and reconstructed) vectors.
        hidden_dim : sequence of int
            Hidden layer widths from widest to narrowest. All entries are
            used; the decoder uses them in reverse order.
        latent_dim : int
            Width of the encoder output that feeds the mean/log-variance heads.
        device :
            Not used by the constructor; kept for interface compatibility.
        """
        super(VAE, self).__init__()

        hidden_dim = list(hidden_dim)

        # encoder (layers are created in the same order and at the same
        # Sequential indices as the explicit layer-by-layer definition)
        encoder_layers = self._dense_stack([input_dim] + hidden_dim)
        encoder_layers += [nn.Linear(hidden_dim[-1], latent_dim), nn.LeakyReLU(0.2)]
        self.encoder = nn.Sequential(*encoder_layers)

        # latent mean and log-variance heads (fixed 2-dimensional output)
        self.mean_layer = nn.Linear(latent_dim, 2)
        self.logvar_layer = nn.Linear(latent_dim, 2)

        # decoder: mirrors the encoder, starting from the 2-dimensional sample
        decoder_layers = self._dense_stack([2, latent_dim] + hidden_dim[::-1])
        decoder_layers += [nn.Linear(hidden_dim[0], input_dim), nn.Sigmoid()]
        self.decoder = nn.Sequential(*decoder_layers)

    @staticmethod
    def _dense_stack(sizes):
        """Return [Linear, LeakyReLU(0.2), Dropout] for each consecutive pair in ``sizes``."""
        layers = []
        for n_in, n_out in zip(sizes[:-1], sizes[1:]):
            layers += [nn.Linear(n_in, n_out), nn.LeakyReLU(0.2), nn.Dropout(dropout_prob)]
        return layers

    def encode(self, x):
        """
        Run the encoder and both heads.

        Returns
        -------
        tuple
            ``(mean, logvar, x)`` where ``x`` is the raw encoder output
            (width ``latent_dim``) that the two heads were applied to.
        """
        x = self.encoder(x)
        mean, logvar = self.mean_layer(x), self.logvar_layer(x)
        return mean, logvar, x

    def reparameterization(self, mean, var):
        """Return ``mean + var * epsilon`` with ``epsilon`` drawn from a standard normal."""
        # randn_like already creates the noise on var's device with var's
        # dtype; moving it to the module-level device could put it on a
        # different device than mean/var and make the arithmetic below fail.
        epsilon = torch.randn_like(var)
        z = mean + var*epsilon
        return z

    def decode(self, x):
        """Map a 2-dimensional latent sample back to ``input_dim`` values in (0, 1)."""
        return self.decoder(x)

    def forward(self, x):
        """
        Encode, sample and decode.

        Returns
        -------
        tuple
            ``(x_hat, mean, logvar)``.
        """
        mean, logvar, _ = self.encode(x)
        # NOTE(review): logvar is passed as the scale directly; a standard VAE
        # would use exp(0.5 * logvar). Left unchanged because saved weights
        # were presumably trained with this behaviour - confirm against the
        # training code before changing.
        z = self.reparameterization(mean, logvar)
        x_hat = self.decode(z)
        return x_hat, mean, logvar
        


In [None]:



def inference(args, device, test_loader, output_file, x_dim=768):
    """
    Encode every sample of ``test_loader`` and append one TSV row per sample
    to ``output_file``.

    Each row is ``id <TAB> name <TAB> criteria_no <TAB> encoded_vector <TAB> sentence``.

    Parameters
    ----------
    args :
        Object with a ``model_savefolder`` attribute; the model is loaded from
        ``<base_path>/<model_savefolder>/vae_model_best.pth``.
    device : torch.device
        Device the model is loaded onto and inputs are moved to.
    test_loader :
        Iterable yielding batches ``(vector, id, name, criteria_no, sentence)``.
        Each batch must hold exactly one sample.
    output_file : str
        Path of the file to append to (opened once, UTF-8). The file is
        created even when ``test_loader`` yields nothing.
    x_dim : int
        Width of each input vector.

    Returns
    -------
    torch.Tensor or None
        Encoder output of the last sample, or ``None`` if the loader was empty.
    """

    # loading model
    model_path = os.path.join(base_path, args.model_savefolder, 'vae_model_best.pth')
    print('loading model for inference: ', model_path)
    # map_location lets a checkpoint saved on a GPU be loaded on a CPU-only
    # host and guarantees the model ends up on the same device as the inputs.
    # NOTE(review): the checkpoint is a fully pickled model; unpickling executes
    # arbitrary code, so only load trusted files. Recent PyTorch releases
    # default to weights_only=True and may need weights_only=False here -
    # confirm against the installed version.
    model = torch.load(model_path, map_location=device)
    model.eval()
    inference_size = 1

    # Stays None when the loader yields no batches.
    output_vector = None

    # Gradients are not needed for inference; the output file is opened once
    # instead of once per row.
    with torch.no_grad(), open(output_file, 'a', encoding='utf-8') as file:
        for xf in test_loader:

            x = xf[0]
            x = x.view(inference_size, x_dim).to(device)

            _, _, output_vector = model.encode(x)

            # Same text format as str(ndarray), but never wrapped onto several
            # lines or summarised with "...", either of which would corrupt
            # the one-row-per-sample TSV layout for longer vectors.
            str_output = np.array2string(
                output_vector.detach().cpu().numpy().flatten(),
                max_line_width=sys.maxsize,
                threshold=sys.maxsize,
            )

            test_id = ' '.join(str(value) for value in xf[1])
            test_name = ' '.join(str(value) for value in xf[2])
            test_criteria_no = str(xf[3].item())
            test_sentence = ' '.join(str(value) for value in xf[4])

            write_str = test_id + '\t' + test_name + '\t' + test_criteria_no + '\t' + str_output + '\t' + test_sentence
            print(write_str)

            file.write(write_str + '\n')

    return output_vector


In [None]:

def main():
    """
    Run inference over the source file and write the encoded vectors to
    ``compressed_<source file name>`` in the same folder (under ``base_path``).
    """

    # args
    args = get_args([
        '--filename', 'data/prospectus_investment_objective.txt',
        '--model_savefolder', 'model/state',
    ])

    # Split the relative source path into its folder and file name.
    basedir, basename = os.path.split(args.filename)
    print('basedir: ', basedir)

    output_file = os.path.join(base_path, basedir, "compressed_" + basename)

    # inference() appends to the output file, so start from a clean slate.
    if os.path.exists(output_file):
        os.remove(output_file)

    # Tab-separated source without a header row -> integer column labels.
    source_path = os.path.join(base_path, args.filename)
    df = pd.read_csv(source_path, sep="\t", header=None)
    data_loader = DataLoader(
        dataset=CustomDataset(dataframe=df),
        batch_size=1,
        shuffle=False,
    )

    inference(args, device, data_loader, output_file)
    print('inference finished')


if __name__ == '__main__':
    main()
