In [10]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from backend.data_processing_service import DataProcessingService

# Initialize DataProcessingService
data_processor = DataProcessingService(seq_length=10, pred_window=5, batch_size=10)
data = np.loadtxt('data/normalized_apple_prices.csv')
# data = data_processor.get_sample_data(length=50)

# Normalize the data
data_normalized, scaler = data_processor.normalize_data(data)

# Create sequences
x_data, y_data = data_processor.create_sequences(data_normalized)

# Split data into training and test sets
x_train, x_test, y_train, y_test = data_processor.split_data(x_data, y_data, train_ratio=0.8)

# Convert data to PyTorch tensors
x_train = torch.tensor(x_train, dtype=torch.float32)
y_train = torch.tensor(y_train, dtype=torch.float32)
x_test = torch.tensor(x_test, dtype=torch.float32)
y_test = torch.tensor(y_test, dtype=torch.float32)


In [11]:
print(x_train.shape,y_train.shape,x_test.shape,y_test.shape)

torch.Size([99, 10, 1]) torch.Size([99, 5, 1]) torch.Size([25, 10, 1]) torch.Size([25, 5, 1])


In [12]:
from backend.model_service import ModelService

In [13]:
import torch
import torch.nn as nn

class TimeSeriesGenerator(ModelService):
    def __init__(self, input_dim, output_dim, hidden_dim):
        super(TimeSeriesGenerator, self).__init__()
        self.model = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, output_dim),
            # nn.Tanh()
        )

    def forward(self, x):
        return self.model(x)
    

class TimeSeriesDiscriminator(ModelService):
    def __init__(self, input_dim, hidden_dim):
        super(TimeSeriesDiscriminator, self).__init__()
        self.model = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, 1),
            nn.Sigmoid() # 0 fake, 1 real
        )

    def forward(self, x):
        return self.model(x)



In [14]:
import torch
import torch.nn as nn
import torch.optim as optim

class GAN(ModelService):
    def __init__(self, generator, discriminator, gen_input_dim, data_dim, lr=0.0002, betas=(0.5, 0.999)):
        super(GAN, self).__init__()
        self.generator = generator
        self.discriminator = discriminator
        self.gen_input_dim = gen_input_dim
        self.data_dim = data_dim
        
        self.criterion = nn.MSELoss()
        self.optimizer_g = optim.Adam(self.generator.parameters(), lr=lr, betas=betas)
        self.optimizer_d = optim.Adam(self.discriminator.parameters(), lr=lr, betas=betas)

    def train(self, x_train, y_train, epochs):
        for epoch in range(epochs):
            for i in range(len(x_train)):
                real_data = x_train[i]
                real_labels = y_train[i].unsqueeze(1)

                ### Train Generator
                self.optimizer_g.zero_grad()
                
                noise = real_data.t()
                g_prediction = self.generator(noise)

                # Loss for Generator (compared to real labels)
                g_real_loss = self.criterion(g_prediction, real_labels)

                # Combine real data and generated data for the Discriminator
                g_prediction_combined = torch.cat((real_data, g_prediction.t()))[:self.gen_input_dim].t()

                ### Train Discriminator
                self.optimizer_d.zero_grad()

                # Real data loss for Discriminator
                real_output = self.discriminator(real_data.t())
                real_loss = self.criterion(real_output, torch.ones_like(real_output))
                
                # Fake data loss for Discriminator
                fake_output = self.discriminator(g_prediction_combined.detach())  # detach to avoid backprop through G
                fake_loss = self.criterion(fake_output, torch.zeros_like(fake_output))
                
                # Total Discriminator loss
                d_loss = real_loss + fake_loss
                d_loss.backward()
                self.optimizer_d.step()

                # Optimize Generator again against Discriminator's output
                fake_output_for_g = self.discriminator(g_prediction_combined)
                g_loss_against_discriminator = self.criterion(fake_output_for_g, torch.ones_like(fake_output_for_g))
                
                # Total Generator loss
                g_loss = g_real_loss * 0.5 + g_loss_against_discriminator * 0.5
                g_loss.backward(retain_graph=True)
                self.optimizer_g.step()

            print(f'Epoch [{epoch+1}/{epochs}]  Loss D: {d_loss.item()}, Loss G: {g_loss.item()}')

# Define model parameters
# data_processor = DataProcessingService(seq_length=10, pred_window=1, batch_size=10) (1st cell)
gen_input_dim = data_processor.seq_length
gen_output_dim = data_processor.pred_window
gen_hidden_dim = 32
dis_input_dim = 10
dis_hidden_dim = 16

# Instantiate models
generator = TimeSeriesGenerator(input_dim=gen_input_dim, output_dim=gen_output_dim, hidden_dim=gen_hidden_dim)
discriminator = TimeSeriesDiscriminator(input_dim=dis_input_dim, hidden_dim=dis_hidden_dim)
gan = GAN(generator, discriminator, gen_input_dim, gen_output_dim)

# Train the GAN model
epochs = 10
gan.train(x_train, y_train, epochs)


  return F.mse_loss(input, target, reduction=self.reduction)


Epoch [1/10]  Loss D: 0.5007985830307007, Loss G: 0.14317576587200165
Epoch [2/10]  Loss D: 0.5003405213356018, Loss G: 0.1382562518119812
Epoch [3/10]  Loss D: 0.5002017021179199, Loss G: 0.1364590972661972
Epoch [4/10]  Loss D: 0.5001502633094788, Loss G: 0.13654480874538422
Epoch [5/10]  Loss D: 0.5001226663589478, Loss G: 0.13730831444263458
Epoch [6/10]  Loss D: 0.5001023411750793, Loss G: 0.13827991485595703
Epoch [7/10]  Loss D: 0.5000864267349243, Loss G: 0.13883176445960999
Epoch [8/10]  Loss D: 0.5000739693641663, Loss G: 0.1391364336013794
Epoch [9/10]  Loss D: 0.5000638365745544, Loss G: 0.13898268342018127
Epoch [10/10]  Loss D: 0.5000554323196411, Loss G: 0.13865412771701813


In [15]:
model = generator
criterion = nn.MSELoss()
x_test_rs = x_test.permute(0, 2, 1)
model.evaluation(model, x_test_rs, y_test, criterion)

Test Loss: 0.0297


  return F.mse_loss(input, target, reduction=self.reduction)


0.02972451189532876

In [16]:
a = 3
x_test[-a], y_test[-a]

(tensor([[-0.0302],
         [ 0.0372],
         [ 0.0924],
         [-0.1962],
         [-0.0986],
         [ 0.0676],
         [-0.0713],
         [-0.0696],
         [-0.0396],
         [-0.0436]]),
 tensor([[ 0.0216],
         [ 0.0265],
         [-0.0417],
         [-0.0789],
         [-0.0580]]))

In [21]:
# 3. Evaluate generated data using discriminator (optional)
gan.generator(x_test[-a].t())

tensor([[-0.1145, -0.1221, -0.0591, -0.1408, -0.0359]],
       grad_fn=<AddmmBackward0>)