In [1]:
from torch.utils.data import DataLoader
from tqdm import tqdm
import torch
import copy
import math
import numpy as np
import math
from data_prep import SystemIdentDataset, ControllerDataset, SystemIdentDatasetNormed
import pickle

def evaluate(model, loss_function, val_loader):
    c_error = 0.0
    cos_sim = torch.nn.CosineSimilarity(dim=1)
    running_loss = 0.0
    batch_count = 0
    for _, example in enumerate(tqdm(val_loader), 0):
        inputs,label = example

        with torch.no_grad():
            outputs = model(inputs)
        
        loss = loss_function(outputs, label)
        
        c_error += torch.sum(cos_sim(outputs,label)).item()/val_dloader.batch_size
        running_loss += loss.item()
        batch_count += 1
    
    c_error /= batch_count
    
    return running_loss, c_error

def train(model, num_epochs, loss_function, optimizer, train_loader, val_loader):
    best_loss = []
    best_cerror = []
    val_loss, c_error = evaluate(model, loss_function, val_loader)
    print(f"Initial validation loss: {val_loss}, cosine error: {c_error}")
    for epoch in range(num_epochs):
        print(f"Epoch {epoch+1}:")
        for _, example in enumerate(tqdm(train_loader), 0):
            inputs,label = example
            
            outputs = model(inputs)

            loss = loss_function(outputs, label)

            optimizer.zero_grad()

            loss.backward()

            optimizer.step()
        val_loss, c_error = evaluate(model, loss_function, val_loader)
        print(f"validation loss: {val_loss}, cosine error: {c_error}")

        if epoch < 5:
            best_loss.append((copy.deepcopy(model), val_loss, c_error))
            best_cerror.append((copy.deepcopy(model), val_loss,c_error))
        else:
            for i, entry in enumerate(best_loss):
                m,l,c = entry
                if val_loss < l:
                    best_loss[i] = (copy.deepcopy(model), val_loss, c_error)
                    break
            for i, entry in enumerate(best_cerror):
                m,l,c = entry
                if c_error > c:
                    best_cerror[i] = (copy.deepcopy(model), val_loss, c_error)
                    break
    return best_loss + best_cerror


In [None]:
train_dataset = SystemIdentDatasetNormed(num_examples=1000000)
val_dataset = SystemIdentDatasetNormed(num_examples=100000)
train_dloader = DataLoader(train_dataset, batch_size=256,shuffle=True)
val_dloader = DataLoader(val_dataset, batch_size=256, shuffle=True)



In [None]:


# Define the model
model = torch.nn.Sequential(
    torch.nn.Linear(4, 32, dtype=torch.float64),
    torch.nn.Tanh(),
    torch.nn.Linear(32, 64, dtype=torch.float64),
    torch.nn.Tanh(),
    torch.nn.Linear(64, 64, dtype=torch.float64),
    torch.nn.Tanh(),
    torch.nn.Linear(64, 64, dtype=torch.float64),
    torch.nn.Tanh(),
    torch.nn.Linear(64, 32, dtype=torch.float64),
    torch.nn.Tanh(),
    torch.nn.Linear(32, 4, dtype=torch.float64),
)

# Define the loss function
#loss_fn = torch.nn.MSELoss()
csim = torch.nn.CosineSimilarity(dim=1)
loss_fn = lambda x,y: torch.sum(-1*csim(x,y))
# Define the optimizer
optimizer = torch.optim.Adam(model.parameters())
best_models = train(model=model, num_epochs=20, loss_function=loss_fn, optimizer=optimizer, train_loader=train_dloader, val_loader=val_dloader)
for i, model in enumerate(best_models):
    weights, loss, c_error = model
    print(f"Model {i}: loss: {loss}, cosine error: {c_error}")


In [None]:
with open('./emulator_random_inputs3.pkl', 'wb') as f:
    pickle.dump(best_models[0][0], f)

In [4]:
# Now learn to drive emulated plant from state Zo to Zd in K steps where K is a hyperparameter

train_dataset = ControllerDataset(num_examples=1000000)
val_dataset = ControllerDataset(num_examples=100000)
train_dloader = DataLoader(train_dataset, batch_size=256,shuffle=True)
val_dloader = DataLoader(val_dataset, batch_size=256, shuffle=True)

class ControllerTrainedEnclosure(torch.nn.Module):
    def __init__(self, emulator_network, K):
        super(self.__class__, self).__init__()
        emulator_network.requires_grad=False 
        self.system_emulator = emulator_network
        self.K = K
        self.network = torch.nn.Sequential(
                torch.nn.Linear(4, 32, dtype=torch.float64),
                torch.nn.Tanh(),
                torch.nn.Linear(32, 64, dtype=torch.float64),
                torch.nn.Tanh(),
                torch.nn.Linear(64, 64, dtype=torch.float64),
                torch.nn.Tanh(),
                torch.nn.Linear(64, 32, dtype=torch.float64),
                torch.nn.Tanh(),
                torch.nn.Linear(32, 1, dtype=torch.float64),
                )

    def forward(self, x):
        for _ in range(self.K):
            xnew = torch.zeros_like(x)
            u = self.network(x)
            xnew[:,0] = u.squeeze()
            xnew[:,1:] = x[:,1:]

            dx = self.system_emulator(xnew)
            x = x + dx
            #torch.nn.utils.clip_grad_norm_(self.parameters(), max_norm=1)
        
        return x



In [5]:

with open('./emulator_random_inputs3.pkl', 'rb') as f:
    emulator = pickle.load(f)
model = ControllerTrainedEnclosure(emulator, K=10)

# Define the loss function
#loss_fn = torch.nn.MSELoss()
csim = torch.nn.CosineSimilarity(dim=1)
loss_fn = lambda x,y: torch.sum(-1*csim(x,y))
# Define the optimizer
optimizer = torch.optim.Adam(model.parameters())
best_models = train(model=model, num_epochs=5, loss_function=loss_fn, optimizer=optimizer, train_loader=train_dloader, val_loader=val_dloader)
for i, model in enumerate(best_models):
    weights, loss, c_error = model
    print(f"Model {i}: loss: {loss}, cosine error: {c_error}")

100%|██████████| 391/391 [00:05<00:00, 70.15it/s]


Initial validation loss: 2543.8819104938343, cosine error: -0.025414421260528237
Epoch 1:


100%|██████████| 3907/3907 [01:59<00:00, 32.67it/s]
100%|██████████| 391/391 [00:05<00:00, 70.51it/s]


validation loss: -97012.41474860419, cosine error: 0.969193721513389
Epoch 2:


100%|██████████| 3907/3907 [01:58<00:00, 33.00it/s]
100%|██████████| 391/391 [00:05<00:00, 70.94it/s]


validation loss: -99821.27241184482, cosine error: 0.9972553589738333
Epoch 3:


100%|██████████| 3907/3907 [01:59<00:00, 32.62it/s]
100%|██████████| 391/391 [00:05<00:00, 70.16it/s]


validation loss: -99988.0483277563, cosine error: 0.9989215186196881
Epoch 4:


100%|██████████| 3907/3907 [01:59<00:00, 32.75it/s]
100%|██████████| 391/391 [00:05<00:00, 70.85it/s]


validation loss: -99999.11622243717, cosine error: 0.9990320914166118
Epoch 5:


100%|██████████| 3907/3907 [01:58<00:00, 32.99it/s]
100%|██████████| 391/391 [00:05<00:00, 71.62it/s]

validation loss: -99999.98034742105, cosine error: 0.9990407243788069
Model 0: loss: -97012.41474860419, cosine error: 0.969193721513389
Model 1: loss: -99821.27241184482, cosine error: 0.9972553589738333
Model 2: loss: -99988.0483277563, cosine error: 0.9989215186196881
Model 3: loss: -99999.11622243717, cosine error: 0.9990320914166118
Model 4: loss: -99999.98034742105, cosine error: 0.9990407243788069
Model 5: loss: -97012.41474860419, cosine error: 0.969193721513389
Model 6: loss: -99821.27241184482, cosine error: 0.9972553589738333
Model 7: loss: -99988.0483277563, cosine error: 0.9989215186196881
Model 8: loss: -99999.11622243717, cosine error: 0.9990320914166118
Model 9: loss: -99999.98034742105, cosine error: 0.9990407243788069





In [7]:
with open('./controller_model2.pkl', 'wb') as f:
    pickle.dump(best_models[4][0].network, f)


Sequential(
  (0): Linear(in_features=4, out_features=32, bias=True)
  (1): Tanh()
  (2): Linear(in_features=32, out_features=64, bias=True)
  (3): Tanh()
  (4): Linear(in_features=64, out_features=64, bias=True)
  (5): Tanh()
  (6): Linear(in_features=64, out_features=32, bias=True)
  (7): Tanh()
  (8): Linear(in_features=32, out_features=1, bias=True)
)

In [None]:


a = torch.zeros((1,4),dtype=torch.float64)
#a[0][2] = np.pi + np.pi/20
em = best_models[8][0].system_emulator
nn = best_models[8][0].network

em(a)


In [None]:
model

In [None]:

train_dataset = SystemIdentDatasetEuler(num_examples=2000000)
val_dataset = SystemIdentDatasetEuler(num_examples=100000)
train_dloader = DataLoader(train_dataset, batch_size=256, shuffle=True)
val_dloader = DataLoader(val_dataset, batch_size=256, shuffle=True)

model = torch.nn.Sequential(
    torch.nn.Linear(4, 32, dtype=torch.float64),
    torch.nn.Tanh(),
    torch.nn.Linear(32, 64, dtype=torch.float64),
    torch.nn.Tanh(),
    torch.nn.Linear(64, 64, dtype=torch.float64),
    torch.nn.Tanh(),
    torch.nn.Linear(64, 64, dtype=torch.float64),
    torch.nn.Tanh(),
    torch.nn.Linear(64, 32, dtype=torch.float64),
    torch.nn.Tanh(),
    torch.nn.Linear(32, 4, dtype=torch.float64),
)

# Define the loss function
loss_fn = torch.nn.MSELoss()

# Define the optimizer
optimizer = torch.optim.Adam(model.parameters())
best_models = train(model=model, num_epochs=25, loss_function=loss_fn, optimizer=optimizer, train_loader=train_dloader, val_loader=val_dloader)
for i, model in enumerate(best_models):
    weights, loss, c_error = model
    print(f"Model {i}: loss: {loss}, cosine error: {c_error}")