In [None]:
#%pip install pandas==2.2.0

In [None]:
import sys
import os
import numpy as np
import pandas as pd
# Switch to the directory two levels above the working directory the kernel
# started in, and make it importable. The guard keeps the cell idempotent:
# without it, every re-run would climb two more directories and append another
# entry to sys.path.
if "PROJECT_ROOT" not in globals():
    PROJECT_ROOT = os.path.abspath('../..')
    sys.path.append(PROJECT_ROOT)
    os.chdir(PROJECT_ROOT)

print("New working directory:", os.getcwd())


In [None]:
import torch
import torch.nn as nn
from torch.optim.adam import Adam
from torch.utils.data import Dataset, DataLoader, random_split
# Pick the compute device name: use the GPU when CUDA is usable, else the CPU
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'

# Bare expression so the notebook echoes the chosen device
device


In [3]:
# Path to the input CSV, written relative to the current working directory
csv_file = "./data/tmp/btcusd_1-min_data.csv"
class BitcoinDataset(Dataset):
    """Dataset of z-score-normalized High/Low/Open/Close rows read from a CSV file.

    Attributes:
        dataframe: The four selected columns with null rows dropped.
        features: NumPy array of the normalized rows, one row per sample,
            columns in the order High, Low, Open, Close.
        mean: Per-column mean used for normalization.
        std: Per-column standard deviation used for normalization; a column
            with zero spread uses 1.0 so that it normalizes to 0 instead of NaN.
    """

    def __init__(self, csv_file):
        """
        Args:
            csv_file (str): Path to the CSV file.

        Raises:
            ValueError: If no rows remain after dropping null values.
        """
        # Load data from CSV, select columns and drop null values
        self.dataframe = pd.read_csv(csv_file)[['High','Low','Open','Close']].dropna()

        # Extract the features for easier manipulation
        raw_values = self.dataframe.to_numpy()
        if len(raw_values) == 0:
            raise ValueError("BitcoinDataset: no rows left after dropping null values")

        # Calculate mean and std for normalization
        self.mean = raw_values.mean(axis=0)
        column_std = raw_values.std(axis=0)

        # A constant column has std == 0; dividing by it would yield NaN/inf,
        # so fall back to 1.0 (the centered column is all zeros anyway)
        self.std = np.where(column_std == 0, 1.0, column_std)

        # Apply normalization to features
        self.features = (raw_values - self.mean) / self.std

    def __len__(self):
        """Return the number of samples (rows of the normalized feature array)."""
        return len(self.features)

    def __getitem__(self, idx):
        """Return the normalized row at `idx` as a float32 tensor on the notebook's `device`."""
        # Return the item at index idx in the form of tensor
        features = torch.tensor(self.features[idx], dtype=torch.float32).to(device)

        return features


# Create an instance of BitcoinDataset and store in variable
# (the constructor reads the CSV and normalizes the selected columns right away)
dataset = BitcoinDataset(csv_file)


In [4]:
# Define batchsize and input dimensions
batch_size = 64
input_dim = dataset.features.shape[1]

# Seed a dedicated generator for the split so the same rows land in the
# train/test subsets on every run (the global RNG is left untouched)
SPLIT_SEED = 42
split_generator = torch.Generator().manual_seed(SPLIT_SEED)

# Split dataset into train and test in the ratio of 80:20
train_dataset, test_dataset = random_split(dataset, [0.8, 0.2], generator=split_generator)

# Use DataLoader for batching; only the training batches are shuffled, so
# evaluation visits the test subset in a stable order
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)



In [5]:
class VAE(nn.Module):
    """Fully connected variational autoencoder whose sampled code is 1-dimensional.

    The encoder maps `input_dim -> hidden_dim -> latent_dim`; two linear heads
    then map `latent_dim -> 1` to produce the mean and the log-variance of the
    code. The decoder maps `1 -> latent_dim -> hidden_dim -> input_dim`.

    NOTE: VAE is defined again in a later cell of this notebook; when the cells
    run top to bottom, that later definition is the one in effect.
    """

    def __init__(self, input_dim=4, hidden_dim=40, latent_dim=3, device=device):
        """Build the encoder, the mean/log-variance heads and the decoder.

        Args:
            input_dim: Width of each input (and reconstructed) sample.
            hidden_dim: Width of the hidden layer in encoder and decoder.
            latent_dim: Width of the layer feeding the mean/log-variance heads.
            device: Accepted for compatibility; not used by the constructor.
        """
        super(VAE, self).__init__()

        # Encoder
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.LeakyReLU(0.2),
            nn.Linear(hidden_dim, latent_dim),
            nn.LeakyReLU(0.2)
            )

        # Latent mean and log-variance heads (each produces one value per sample)
        self.mean_layer = nn.Linear(latent_dim, 1)
        self.logvar_layer = nn.Linear(latent_dim, 1)

        # Decoder
        self.decoder = nn.Sequential(
            nn.Linear(1, latent_dim),
            nn.LeakyReLU(0.2),
            nn.Linear(latent_dim, hidden_dim),
            nn.LeakyReLU(0.2),
            nn.Linear(hidden_dim, input_dim)
            )

    # Encode function
    def encode(self, x):
        """Return `(mean, log_var)` of the code distribution for the batch `x`."""
        x = self.encoder(x)
        mean, log_var = self.mean_layer(x), self.logvar_layer(x)
        return mean, log_var

    # Add Reparameterization
    def reparameterization(self, mean, var):
        """Return `mean + var * epsilon` with `epsilon` drawn from a standard normal.

        `var` multiplies the noise directly, i.e. it acts as the scale
        (standard deviation) of the sample; the parameter name is kept for
        backward compatibility.
        """
        # randn_like creates the noise with the device and dtype of `var`,
        # so no explicit device transfer is needed
        epsilon = torch.randn_like(var)
        z = mean + var*epsilon
        return z

    # Decode function
    def decode(self, x):
        """Map a code tensor of width 1 back to the input space."""
        return self.decoder(x)

    # Forward Function
    def forward(self, x):
        """Encode `x`, sample a code and decode it.

        Returns:
            Tuple `(x_hat, mean, log_var)`.
        """
        mean, log_var = self.encode(x)
        # The loss interprets the second head as log-variance, so the noise
        # scale is std = exp(0.5 * log_var), not log_var itself
        std = torch.exp(0.5 * log_var)
        z = self.reparameterization(mean, std)
        x_hat = self.decode(z)
        return x_hat, mean, log_var

    # Reconstruct input from compressed form
    def reconstruction(self, mean, log_var):
        """Sample a code from `(mean, log_var)` and decode it into `x_hat`."""
        std = torch.exp(0.5 * log_var)
        z = self.reparameterization(mean, std)
        x_hat = self.decode(z)
        return x_hat

In [6]:
# NOTE: this cell defines VAE a second time; it overrides the definition from
# the earlier cell when the notebook runs top to bottom.
class VAE(nn.Module):
    """Fully connected variational autoencoder whose sampled code is 1-dimensional.

    The encoder maps `input_dim -> hidden_dim -> latent_dim`; two linear heads
    then map `latent_dim -> 1` to produce the mean and the log-variance of the
    code. The decoder maps `1 -> latent_dim -> hidden_dim -> input_dim`.
    """

    def __init__(self, input_dim=4, hidden_dim=40, latent_dim=3, device=device):
        """Build the encoder, the mean/log-variance heads and the decoder.

        Args:
            input_dim: Width of each input (and reconstructed) sample.
            hidden_dim: Width of the hidden layer in encoder and decoder.
            latent_dim: Width of the layer feeding the mean/log-variance heads.
            device: Accepted for compatibility; not used by the constructor.
        """
        super(VAE, self).__init__()

        # Encoder
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.LeakyReLU(0.2),
            nn.Linear(hidden_dim, latent_dim),
            nn.LeakyReLU(0.2)
            )

        # Latent mean and log-variance heads (each produces one value per sample)
        self.mean_layer = nn.Linear(latent_dim, 1)
        self.logvar_layer = nn.Linear(latent_dim, 1)

        # Decoder
        self.decoder = nn.Sequential(
            nn.Linear(1, latent_dim),
            nn.LeakyReLU(0.2),
            nn.Linear(latent_dim, hidden_dim),
            nn.LeakyReLU(0.2),
            nn.Linear(hidden_dim, input_dim)
            )

    # Encode function
    def encode(self, x):
        """Return `(mean, log_var)` of the code distribution for the batch `x`."""
        x = self.encoder(x)
        mean, log_var = self.mean_layer(x), self.logvar_layer(x)
        return mean, log_var

    # Add Reparameterization
    def reparameterization(self, mean, var):
        """Return `mean + var * epsilon` with `epsilon` drawn from a standard normal.

        `var` multiplies the noise directly, i.e. it acts as the scale
        (standard deviation) of the sample; the parameter name is kept for
        backward compatibility.
        """
        # randn_like creates the noise with the device and dtype of `var`,
        # so no explicit device transfer is needed
        epsilon = torch.randn_like(var)
        z = mean + var*epsilon
        return z

    # Decode function
    def decode(self, x):
        """Map a code tensor of width 1 back to the input space."""
        return self.decoder(x)

    # Forward Function
    def forward(self, x):
        """Encode `x`, sample a code and decode it.

        Returns:
            Tuple `(x_hat, mean, log_var)`.
        """
        mean, log_var = self.encode(x)
        # The loss interprets the second head as log-variance, so the noise
        # scale is std = exp(0.5 * log_var), not log_var itself
        std = torch.exp(0.5 * log_var)
        z = self.reparameterization(mean, std)
        x_hat = self.decode(z)
        return x_hat, mean, log_var

    # Reconstruct input from compressed form
    def reconstruction(self, mean, log_var):
        """Sample a code from `(mean, log_var)` and decode it into `x_hat`."""
        std = torch.exp(0.5 * log_var)
        z = self.reparameterization(mean, std)
        x_hat = self.decode(z)
        return x_hat

In [7]:
def loss_function(x, x_hat, mean, log_var):
    """Return the VAE loss: summed squared reconstruction error plus KL divergence.

    Args:
        x: Original batch.
        x_hat: Reconstruction of `x`, same shape.
        mean: Mean output of the encoder.
        log_var: Log-variance output of the encoder.

    Returns:
        Scalar tensor, summed over every element of the batch.
    """
    # Reproduction Loss
    # Summed (not averaged) so it is on the same per-batch scale as the KL
    # term below, which is also a sum over the batch
    reproduction_loss = nn.functional.mse_loss(x_hat, x, reduction='sum')

    # KL Divergence Loss between N(mean, exp(log_var)) and the standard normal
    KLD = - 0.5 * torch.sum(1+ log_var - mean.pow(2) - log_var.exp())
    return reproduction_loss + KLD

# Instantiate the VAE with its default layer sizes, then move it onto `device`
model = VAE()
model = model.to(device)

# Adam over every model parameter with a learning rate of 1e-3
optimizer = Adam(params=model.parameters(), lr=1e-3)

In [None]:
def train(model, optimizer, epochs, device, dataloader=None, loss_fn=None):
    """Train `model` for `epochs` passes and print the average loss per sample.

    Args:
        model: Module whose forward pass returns `(x_hat, mean, log_var)`.
        optimizer: Optimizer bound to the parameters of `model`.
        epochs: Number of passes over the data.
        device: Device every batch is moved to before the forward pass.
        dataloader: Iterable of batches; defaults to the notebook-level
            `train_dataloader`.
        loss_fn: Callable `(x, x_hat, mean, log_var) -> scalar tensor`;
            defaults to the notebook-level `loss_function`.

    Returns:
        Sum of the batch losses of the last epoch (0 when `epochs` is 0).
    """
    if dataloader is None:
        dataloader = train_dataloader
    if loss_fn is None:
        loss_fn = loss_function

    # Set model to training mode
    model.train()

    # Defined up front so that epochs == 0 still returns a value
    overall_loss = 0

    # Loop for each epoch
    for epoch in range(epochs):
        overall_loss = 0
        samples_seen = 0

        # Iterate over the batches formed by the data loader
        for x in dataloader:
            x = x.to(device)

            # Reset Gradient
            optimizer.zero_grad()
            x_hat, mean, log_var = model(x)

            # Calculate batch loss and then overall loss
            loss = loss_fn(x, x_hat, mean, log_var)
            overall_loss += loss.item()

            # Count real samples: the last batch may be smaller than the rest
            samples_seen += len(x)

            # Backpropagate the loss and update the parameters
            loss.backward()
            optimizer.step()

        # max(..., 1) avoids a division by zero when the loader yields nothing
        print("\tEpoch", epoch + 1, "\tAverage Loss: ", overall_loss/max(samples_seen, 1))
    return overall_loss

# Train the model for 5 epochs; train() prints the average loss after every
# epoch and returns the summed batch losses of the final epoch
train(model, optimizer, epochs=5, device=device)

In [None]:

import matplotlib.pyplot as plt

def predict(model, dataloader=None, loss_fn=None):
    """Run `model` over the evaluation data and report loss and size statistics.

    Args:
        model: Module whose forward pass returns `(x_hat, mean, log_var)`.
        dataloader: Iterable of batches; defaults to the notebook-level
            `test_dataloader`.
        loss_fn: Callable `(x, x_hat, mean, log_var) -> scalar tensor`;
            defaults to the notebook-level `loss_function`.

    Returns:
        Tuple `(all_mean, all_log_var)`: the mean and log-variance outputs of
        every batch concatenated along the first dimension.

    Raises:
        ValueError: If the data loader yields no batches.
    """
    if dataloader is None:
        dataloader = test_dataloader
    if loss_fn is None:
        loss_fn = loss_function

    # Set model to evaluation mode
    model.eval()

    # Batches are moved to wherever the model's parameters live
    first_param = next(model.parameters(), None)

    overall_loss = 0
    dataset_size = 0
    samples_seen = 0
    batch_means = []
    batch_log_vars = []

    # Disable gradient tracking for the whole evaluation pass
    with torch.no_grad():

        # Iterate over batches of test dataset
        for x in dataloader:
            if first_param is not None:
                x = x.to(first_param.device)

            # Get reconstructed value, mean and log_var
            x_hat, mean, log_var = model(x)

            # Add loss
            overall_loss += loss_fn(x, x_hat, mean, log_var).item()

            # Collect per-batch outputs; they are concatenated once at the end
            batch_means.append(mean)
            batch_log_vars.append(log_var)

            # Size of this batch in bytes, and number of samples in it
            dataset_size += x.element_size() * x.numel()
            samples_seen += len(x)

    if not batch_means:
        raise ValueError("predict() received a data loader that yields no batches")

    all_mean = torch.cat(batch_means)
    all_log_var = torch.cat(batch_log_vars)

    # Calculate the size after compression of the dataset (bytes of both outputs)
    compressed_size = (all_mean.element_size() * all_mean.numel()) + (all_log_var.element_size() * all_log_var.numel())

    print("\tAverage Loss: ", overall_loss/samples_seen)
    print(f"\tDataset Size: {dataset_size} \n\tCompressed Size: {compressed_size}")
    print("\tCompression Ratio: ", compressed_size/dataset_size)

    return all_mean, all_log_var
    
    
# Predict the values for test dataset and calculate compression ratio
# NOTE: the second value returned by predict() is the model's log-variance
# output, even though it is bound to the name `var` here
mean,var = predict(model)