In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import pandas as pd
import os
import numpy as np

  from .autonotebook import tqdm as notebook_tqdm


# 1. Define Neural Network

In [2]:
# Define the neural network model
class NeuralNetwork(nn.Module):
    def __init__(self, input_size=3, output_size=1):
        super(NeuralNetwork, self).__init__()
        self.fc1 = nn.Linear(input_size, 64)  # Input size: 3 (3D float input), Output size: 64
        self.fc2 = nn.Linear(64, 128)
        self.fc3 = nn.Linear(128, 64)
        self.fc4 = nn.Linear(64, 32)
        self.fc5 = nn.Linear(32, 32)
        self.fc6 = nn.Linear(32, output_size)  # Output size: 1 (1D float output)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = torch.relu(self.fc3(x))
        x = torch.relu(self.fc4(x))
        x = torch.relu(self.fc5(x))
        x = self.fc6(x)
        return x


# 2. Prepare Training Dataset

In [3]:
# Import dataset
file_path = 'state_action_data1.csv'

if os.path.isfile(file_path):
    df = pd.read_csv(file_path)
else:
    print(f"File '{file_path}' not found.")

original_State = np.array([np.array(state.strip("[]").split(), dtype=float) for state in df.iloc[:,0]])
original_Action = np.array([float(action.strip("[]")) for action in df.iloc[:,1]])
original_NextState = np.array([np.array(state.strip("[]").split(), dtype=float) for state in df.iloc[:,2]])
X = torch.tensor(original_State, dtype=torch.float32)
y = torch.tensor(original_Action, dtype=torch.float32)
df.head()

Unnamed: 0,State,Action,NextState
0,[-0.91937816 0.39337495 0.6090173 ],[2.],[-0.9413804 0.3373469 1.2040485]
1,[-0.9413804 0.3373469 1.2040485],[2.],[-0.9673487 0.25344923 1.7570587 ]
2,[-0.9673487 0.25344923 1.7570587 ],[2.],[-0.9896661 0.14339098 2.2471457 ]
3,[-0.9896661 0.14339098 2.2471457 ],[2.],[-0.99993783 0.01115229 2.6546888 ]
4,[-0.99993783 0.01115229 2.6546888 ],[2.],[-0.99063015 -0.13657197 2.963053 ]


# 3. Train and save Policy model 
action = model(state)

In [None]:
# Create an instance of the model
model = NeuralNetwork(3,1)
model.load_state_dict(torch.load('model_1_NN.pth'))
# Define the loss function and optimizer
criterion = nn.MSELoss()
optimizer = optim.SGD(model.parameters(), lr=0.01)

# Training loop 10000 times
for epoch in range(10):
    # Zero the gradients
    optimizer.zero_grad()

    # Forward pass
    y_ = model(X)

    # Calculate loss
    loss = criterion(y_.squeeze(), y)

    # Backward pass and update weights
    loss.backward()
    optimizer.step()

    # Print the loss at every 100 epochs
    if (epoch + 1) % 50 == 0:
        print(f'Epoch [{epoch+1}/10000], Loss: {loss.item():.4f}')

torch.save(model.state_dict(), 'model_1_NN.pth')
print("Model saved successfully!")

# 4. Reload and test model

In [4]:
# Load the saved model
loaded_model = NeuralNetwork()
loaded_model.load_state_dict(torch.load('model_1_NN.pth'))
criterion = nn.MSELoss()
print("Model loaded successfully!")

test_file_path = 'state_action_data2.csv'

if os.path.isfile(test_file_path):
    df = pd.read_csv(test_file_path)
else:
    print(f"File '{test_file_path}' not found.")
X_test = torch.tensor(np.array([np.array(state.strip("[]").split(), dtype=float) for state in df.iloc[:,0]]), dtype=torch.float32)
y_test = np.array([float(action.strip("[]")) for action in df.iloc[:,1]])

# Use the loaded model to make predictions
y_pred = loaded_model(X_test)
test_loss = criterion(y_pred.squeeze(), torch.tensor(y_test, dtype=torch.float32))
print(f"test loss = {test_loss.item():.4f}")
df_pred = pd.DataFrame(y_pred.detach().numpy(), columns=["Predicted"])
df_true = pd.DataFrame(y_test, columns=['True'])
pd.concat([df_pred, df_true], axis=1)

Model loaded successfully!
test loss = 0.1623


Unnamed: 0,Predicted,True
0,1.800475,1.932680
1,1.849026,1.959677
2,1.813180,1.939406
3,1.346297,1.673984
4,-0.508259,0.037763
...,...,...
3251,0.111452,0.564635
3252,-0.889309,-0.529998
3253,0.106900,0.559431
3254,-0.885603,-0.525331


# 5. Test with gym 

In [5]:
import gymnasium as gym
env = gym.make("Pendulum-v1", render_mode="human")

# Load the saved model
model = NeuralNetwork()
model.load_state_dict(torch.load('model_1_NN.pth'))
print("Model loaded successfully!")

for episode in range(5):
    obs = env.reset()[0]
    done = False
    count = 10
    steps = 0
    while not done:
        steps +=1
        action = model(torch.tensor(obs)).detach().numpy()
        env.render()
        obs, reward, done, info, _ = env.step(action)
        if 1 - abs(obs[0]) < 0.001 and abs(action[0]) < 0.6:
            count -=1
            if count <0:
                done = True
                print(episode+1, " steps: ", steps)
env.close()

Model loaded successfully!
1  steps:  12
2  steps:  58
3  steps:  73
4  steps:  75
5  steps:  64


# 6. Dynamic Model
next_state = dynamic_model(state, action)

train with which action, predicted or true label?

### Train with action from dataset
(state[0], state[1], state[2], action)

In [7]:
# Create an instance of the model
dynamic_model = NeuralNetwork(4,3)

# Define the loss function and optimizer
criterion = nn.MSELoss()
optimizer = optim.SGD(dynamic_model.parameters(), lr=0.01)

# Prepare dataset
S_A = torch.tensor(np.hstack((original_State, original_Action.reshape(-1, 1))), dtype=torch.float32)
NextS = torch.tensor(original_NextState, dtype=torch.float32)

dynamic_model.load_state_dict(torch.load('model_2_NN.pth'))
# Training loop 4000 times
for epoch in range(2000):
    # Zero the gradients
    optimizer.zero_grad()

    # Forward pass
    NextS_pred = dynamic_model(S_A)

    # Calculate loss
    loss = criterion(NextS_pred.squeeze(), NextS)

    # Backward pass and update weights
    loss.backward()
    optimizer.step()

    # Print the loss at every 100 epochs
    if (epoch + 1) % 100 == 0:
        print(f'Epoch [{epoch+1}/1000], Loss: {loss.item():.4f}')

torch.save(dynamic_model.state_dict(), 'model_2_NN.pth')
print("Model saved successfully!")

NameError: name 'original_State' is not defined

# 7. True Dynamic Test

In [8]:
test_file_path = 'state_action_data2.csv'

if os.path.isfile(test_file_path):
    df_test = pd.read_csv(test_file_path)
else:
    print(f"File '{test_file_path}' not found.")

test_State = np.array([np.array(state.strip("[]").split(), dtype=float) for state in df_test.iloc[:,0]])
test_Action = np.array([float(action.strip("[]")) for action in df_test.iloc[:,1]])
test_NextState = np.array([np.array(state.strip("[]").split(), dtype=float) for state in df_test.iloc[:,2]])

test_S_A = torch.tensor(np.hstack((test_State, test_Action.reshape(-1, 1))), dtype=torch.float32)
test_NextS = torch.tensor(test_NextState, dtype=torch.float32)

# Use the loaded model to make predictions
NextS_pred = dynamic_model(test_S_A)
test_loss = criterion(NextS_pred.squeeze(), test_NextS)
print(f"test loss = {test_loss.item():.4f}")
df_pred = pd.DataFrame(NextS_pred.detach().numpy(), columns=["Pred_theta", "Pred_angle", "Pred_velocity"])
df_true = pd.DataFrame(test_NextState, columns=['True_theta', 'True_angle', "True_velocity"])
pd.concat([df_pred[40:60], df_true[40:60]], axis=1)

test loss = 2.6328


Unnamed: 0,Pred_theta,Pred_angle,Pred_velocity,True_theta,True_angle,True_velocity
40,0.003614,0.052247,0.131329,0.999911,0.013311,-0.12625
41,0.002235,0.049481,0.130345,0.999938,0.011093,-0.044367
42,0.003129,0.052467,0.131693,0.999976,0.006876,-0.084346
43,0.002472,0.049791,0.13064,0.999983,0.005903,-0.019444
44,0.002941,0.052563,0.131894,0.999996,0.002874,-0.06059
45,0.002531,0.049926,0.130757,0.999996,0.002721,-0.003049
46,0.00287,0.052611,0.131978,1.0,0.000365,-0.047123
47,0.002534,0.049936,0.130778,1.0,0.000773,0.008146
48,0.00286,0.052629,0.131998,0.999999,-0.001217,-0.039786
49,0.0025,0.049882,0.130741,1.0,-0.000425,0.015844


# 8. True Dynamic Test with gym

In [4]:
import gymnasium as gym
env = gym.make("Pendulum-v1", render_mode="human")

# Load the saved model
model = NeuralNetwork(3,1)
model.load_state_dict(torch.load('model_1_NN.pth'))
dynamic_model = NeuralNetwork(4,3)
dynamic_model.load_state_dict(torch.load('model_2_NN.pth'))
print("Model loaded successfully!")


for episode in range(5):
    obs = env.reset()[0]
    done = False
    count = 10
    steps = 0
    while not done:
        steps +=1
        state = torch.tensor(obs)
        action = model(state)
        NextS_pred = dynamic_model(torch.cat((state, action)))
        env.render()
        obs, reward, done, info, _ = env.step(action.detach().numpy())
        print(NextS_pred.detach().numpy(), obs)
        if 1 - abs(obs[0]) < 0.001 and abs(action[0]) < 0.6:
            count -=1
            if count <0:
                done = True
                print(episode+1, " steps: ", steps)
env.close()


Model loaded successfully!
[ 0.6634182  -0.05021206  0.01851402] [ 0.85550857 -0.5177887   0.03002849]
[ 0.6556698  -0.05109791 -0.08001049] [ 0.85393333 -0.5203824  -0.0606916 ]
[ 0.6553317  -0.05332185 -0.18343106] [ 0.84984356 -0.527035   -0.15618332]
[ 0.65153205 -0.05724095 -0.29953697] [ 0.84291387 -0.53804857 -0.26024812]
[ 0.6457013  -0.05963484 -0.42109913] [ 0.83255714 -0.55393916 -0.37935892]
[ 0.6386738  -0.06070232 -0.56219566] [ 0.8178595 -0.575418  -0.5205374]
[ 0.6290842  -0.07386123 -0.7489653 ] [ 0.79751045 -0.6033051  -0.6904769 ]
[ 0.624372   -0.10254394 -0.9431283 ] [ 0.7696478 -0.6384687 -0.8973608]
[ 0.6403833  -0.13639754 -1.1413764 ] [ 0.731069   -0.68230355 -1.1680385 ]
[ 0.6244241  -0.15720615 -1.5060236 ] [ 0.6744123 -0.738355  -1.5943795]
[ 0.28145343 -0.10255445 -2.2176232 ] [ 0.58575433 -0.81048864 -2.2871592 ]
[ 0.07679141 -0.15368609 -3.0226555 ] [ 0.45650324 -0.88972175 -3.0349889 ]
[-0.01067263 -0.2375382  -3.7868721 ] [ 0.28342268 -0.9589951  -3.7339