In [1]:
import torch
import numpy
import torch.nn as nn
import random
import numpy as np
import matplotlib.pyplot as plt
import os
import imageio 

In [2]:
all_values = torch.linspace(-5 * np.pi, 5 * np.pi, 1000).unsqueeze(1)
all_next_values = torch.sin(all_values)

# Create pairs (x, y)
data = torch.stack((all_values, all_next_values), dim=1)

# Shuffle the data
shuffled_indices = torch.randperm(len(data))
shuffled_data = data[shuffled_indices]

# Determine split indices
split_index = int(0.8 * len(shuffled_data))  # 80% for training

# Split into train and test sets
train_data = shuffled_data[:split_index]
test_data = shuffled_data[split_index:]

In [3]:
class LQR(nn.Module):
    def __init__(self, enc_dim):
        super(LQR, self).__init__()
        self.A = torch.nn.Parameter(torch.randn(enc_dim, enc_dim))
       
        self.state_encoder = torch.nn.Sequential(
            torch.nn.Linear(1, enc_dim//2),
            torch.nn.ReLU(),
            torch.nn.Linear(enc_dim//2, enc_dim),
            torch.nn.ReLU(),
            torch.nn.Linear(enc_dim, enc_dim),
        )

        self.state_decoder = torch.nn.Sequential(
            torch.nn.Linear(enc_dim, enc_dim//2),
            torch.nn.ReLU(),
            torch.nn.Linear(enc_dim//2, 1)
        )
    def forward (self,x):
        xx = self.state_encoder(x)
        x_prime_prediction = self.A @ xx 
        return self.state_decoder(x_prime_prediction), x_prime_prediction, xx, #reward.unsqueeze(0)

In [4]:
# Generate the data
super_values = torch.linspace(-10 * np.pi, 10 * np.pi, 1000).unsqueeze(1)
all_next_super_values = torch.sin(super_values)
super_data = torch.cat((super_values, all_next_super_values), dim=1)

# Split data into ranges
range_1_mask = (super_values.squeeze() >= -5 * np.pi) & (super_values.squeeze() <= 5 * np.pi)
range_2_mask = (super_values.squeeze() > 5 * np.pi) | (super_values.squeeze() < -5 * np.pi)

range_1_values = super_values[range_1_mask]
range_1_actual = all_next_super_values[range_1_mask]
range_2_values = super_values[range_2_mask]
range_2_actual = all_next_super_values[range_2_mask]

In [5]:
def visualize_model_performance(model,epoch,folder_name):
    test_predictions = []
    with torch.no_grad():
        for x, y in super_data:
            lqr_x_prime, x_prime_expanded, xx = model(x.unsqueeze(0))
            test_predictions.append(lqr_x_prime)
    test_predictions = torch.tensor(test_predictions)

    # Scatter plot
    plt.scatter(
        range_1_values, 
        test_predictions[range_1_mask], 
        c='r', alpha=0.5, label="Predicted (Training Range)"
    )
    plt.scatter(
        range_1_values, 
        range_1_actual, 
        c='b', alpha=0.5, label="Actual (Training Range)"
    )

    plt.scatter(
        range_2_values, 
        test_predictions[range_2_mask], 
        c='orange', alpha=0.5, label="Predicted (Unseen Range)"
    )
    plt.scatter(
        range_2_values, 
        range_2_actual, 
        c='green', alpha=0.5, label="Actual (Unseen Range)"
    )
    #make x axis in terms of pi


    # Add legend and labels, legend should be outside the plot
    #plt.legend(loc='center left', bbox_to_anchor=(1, 0.5))
    plt.title(f'{folder_name}, Encoder Dimension: {64} Epoch: {epoch}')
    plt.xlabel('Neg 10 pi to 10 pi')
    plt.ylabel('Sin(Pi)')
    plt.ylim(-1.2, 2)
    plt.xlim(-10 * np.pi, 10 * np.pi)
    #plt.show()
    os.makedirs(folder_name, exist_ok=True)
    plt.savefig(f'{folder_name}/epoch_{epoch}.png')
    plt.close()

In [6]:
#train, test are lists of tuples of (x, u, y, r)
def train_model(model,optimizer,epochs=1):
    criterion = torch.nn.MSELoss()
    for i in range(epochs):
        total_state_loss = 0
        total_reward_loss = 0
        for x, y in train_data:
            optimizer.zero_grad()
            lqr_x_prime, x_prime_expanded, xx = model(x)
            #reward_loss = criterion(reward, r)
            lqr_pred_loss = criterion(lqr_x_prime, y)
            decoder_loss = criterion(model.state_decoder(xx), x)
            encoder_loss = criterion(model.state_encoder(y), x_prime_expanded) 
            state_loss = lqr_pred_loss  + decoder_loss + encoder_loss
            loss = state_loss #+ reward_loss
            loss.backward()
            optimizer.step()
            total_state_loss += state_loss.item()
            total_reward_loss += 0#reward_loss.item()
        # if i % 10 == 0:
        #     with torch.no_grad():
        #         total_test_state_loss = 0
        #         total_test_reward_loss = 0
        #         for x, y, in test_data:
        #             lqr_x_prime, x_prime_expanded, xx, = model(x)
        #             #reward_loss = criterion(reward, r)
        #             lqr_pred_loss = criterion(lqr_x_prime, y)
        #             decoder_loss = criterion(model.state_decoder(xx), x)
        #             encoder_loss = criterion(model.state_encoder(y), x_prime_expanded) 
        #             state_loss = lqr_pred_loss  + decoder_loss + encoder_loss
        #             total_test_state_loss += state_loss.item()
        #             total_test_reward_loss += 0#reward_loss.item()
        #         print(f"Epoch {i}, Train State Loss: {total_state_loss}, Train Reward Loss: {total_reward_loss}, Test State Loss: {total_test_state_loss}, Test Reward Loss: {total_test_reward_loss}")

In [14]:
def make_gif(folder_name):
    images = []
    for filename in sorted(os.listdir(folder_name)):
        images.append(imageio.imread(f'{folder_name}/{filename}'))
    #make dir, gifs
    os.makedirs('gifs', exist_ok=True)
    imageio.mimsave(f'gifs/{folder_name}.gif', images)

In [8]:
model = LQR(64)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
file_name = "Normal_Model"
for epoch in range(100):
    train_model(model,optimizer,epochs=1)
    visualize_model_performance(model,epoch,file_name)
make_gif(file_name)

  images.append(imageio.imread(f'{folder_name}/{filename}'))


In [16]:
model = LQR(64)
#freeze encoder and decoder
for param in model.state_encoder.parameters():
    param.requires_grad = False
for param in model.state_decoder.parameters():
    param.requires_grad = False
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
file_name = "Frozen Encoder and Decoder"
for epoch in range(100):
    train_model(model,optimizer,epochs=1)
    visualize_model_performance(model,epoch,file_name)
make_gif(file_name)

  images.append(imageio.imread(f'{folder_name}/{filename}'))


In [10]:
model = LQR(64)
#freeze A
model.A.requires_grad = False
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
file_name = "Frozen A"
for epoch in range(100):
    train_model(model,optimizer,epochs=1)
    visualize_model_performance(model,epoch,file_name)
make_gif(file_name)

  images.append(imageio.imread(f'{folder_name}/{filename}'))


In [None]:
model = LQR(64)
#freeze All
model.A.requires_grad = False
model.state_encoder.requires_grad = False
model.state_decoder.requires_grad = False
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
file_name = "Frozen All"
for epoch in range(100):
    train_model(model,optimizer,epochs=1)
    visualize_model_performance(model,epoch,file_name)
make_gif(file_name)

RuntimeError: element 0 of tensors does not require grad and does not have a grad_fn