In [1]:
# Github link: https://github.com/sreksoz/Intro-to-Machine-Learning/tree/HW-5--Lorenz-Equation-and-Forecasting-Data
import numpy as np
import matplotlib.pyplot as plt
from matplotlib import rcParams
from scipy import integrate
from mpl_toolkits.mplot3d import Axes3D
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split

In [2]:
# Create a neural network architecture
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(3, 15)
        self.fc2 = nn.Linear(15, 6)
        self.fc3 = nn.Linear(6, 3)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = self.fc3(x)
        return x

In [3]:
# Create model instance
model = Net()

# Define loss function and optimizer
criterion = nn.MSELoss()
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9)

# Define hyperparameters
dt = 0.01
T = 8
t = np.arange(0,T+dt,dt)
beta = 8/3
sigma = 10
# Given rho values to train and test
rho_values_train = [10, 28, 40]
rho_values_test = [17, 35]


# Define the NN input and output
nn_input = np.zeros((100*(len(t)-1),3))
nn_output = np.zeros_like(nn_input)


In [4]:
# Create a list for input and output for the neural network (after data preparation)
nn_input_final = []
nn_output_final = []

# Data preparation
for rho in rho_values_train:
    
    # Define the Lorenz equation
    def lorenz_deriv(x_y_z, t0, sigma=sigma, beta=beta, rho=rho):
        x, y, z = x_y_z
        return [sigma * (y - x), x * (rho - z) - y, x * y - beta * z]

    np.random.seed(123)
    x0 = -15 + 30 * np.random.random((100, 3))

    x_t = np.asarray([integrate.odeint(lorenz_deriv, x0_j, t)
                      for x0_j in x0])
    
    for j in range(100):
        nn_input[j*(len(t)-1):(j+1)*(len(t)-1),:] = x_t[j,:-1,:]
        nn_output[j*(len(t)-1):(j+1)*(len(t)-1),:] = x_t[j,1:,:]
        
    # Convert numpy arrays to PyTorch tensors
    nn_input_tensor = torch.from_numpy(nn_input).float()
    nn_output_tensor = torch.from_numpy(nn_output).float()
    
    # Appending the tensors to a list
    nn_input_final.append(nn_input_tensor)
    nn_output_final.append(nn_output_tensor)
    

# Concatenate the neural network input and outputs from each rho values
nn_in = torch.cat(nn_input_final)
nn_out = torch.cat(nn_output_final)

# Train the model
for epoch in range(30):
    optimizer.zero_grad()
    outputs = model(nn_in)
    loss = criterion(outputs, nn_out)
    loss.backward()
    optimizer.step()
    print(f"Epoch {epoch+1}, loss={loss.item():.4f}")


Epoch 1, loss=274.7623
Epoch 2, loss=199.3614
Epoch 3, loss=265.0704
Epoch 4, loss=617.9752
Epoch 5, loss=280.3966
Epoch 6, loss=276.1551
Epoch 7, loss=272.1067
Epoch 8, loss=267.6044
Epoch 9, loss=260.7444
Epoch 10, loss=251.4229
Epoch 11, loss=239.3160
Epoch 12, loss=228.1230
Epoch 13, loss=217.1938
Epoch 14, loss=202.7013
Epoch 15, loss=168.8647
Epoch 16, loss=293.1321
Epoch 17, loss=182.2799
Epoch 18, loss=175.5872
Epoch 19, loss=167.6380
Epoch 20, loss=160.9444
Epoch 21, loss=154.7194
Epoch 22, loss=148.9911
Epoch 23, loss=143.7739
Epoch 24, loss=139.0699
Epoch 25, loss=134.8714
Epoch 26, loss=131.1620
Epoch 27, loss=127.9189
Epoch 28, loss=125.1142
Epoch 29, loss=122.7162
Epoch 30, loss=120.6909


In [5]:
# Create a list for input and output for the neural network (for testing)
nn_input_test = []
nn_output_test = []

# Test the network with given test rho values
for rho in rho_values_test:
    
    # Define the Lorenz equation
    def lorenz_deriv(x_y_z, t0, sigma=sigma, beta=beta, rho=rho):
        x, y, z = x_y_z
        return [sigma * (y - x), x * (rho - z) - y, x * y - beta * z]

    np.random.seed(123)
    x0 = -15 + 30 * np.random.random((100, 3))

    x_t = np.asarray([integrate.odeint(lorenz_deriv, x0_j, t)
                      for x0_j in x0])
    
    for j in range(100):
        nn_input[j*(len(t)-1):(j+1)*(len(t)-1),:] = x_t[j,:-1,:]
        nn_output[j*(len(t)-1):(j+1)*(len(t)-1),:] = x_t[j,1:,:]
        
    # Convert numpy arrays to PyTorch tensors
    nn_input_tensor = torch.from_numpy(nn_input).float()
    nn_output_tensor = torch.from_numpy(nn_output).float()
    
     # Appending the tensors to a list
    nn_input_test.append(nn_input_tensor)
    nn_output_test.append(nn_output_tensor)

# Concatenate the neural network input and outputs from each rho values
nn_in = torch.cat(nn_input_test)
nn_out = torch.cat(nn_output_test)
    
# Test the network
with torch.no_grad():
    outputs = model(nn_in)
    compute_mse = criterion(outputs, nn_out)

    print('Least squares error of test data: {}'.format(compute_mse.item()))

Least squares error of test data: 95.89716339111328


In [11]:
# Create an LSTM neural network architecture
class LSTMNet(nn.Module):
    def __init__(self, input_size=3, hidden_size=15, num_layers=1, output_size=3):
        super(LSTMNet, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        # Initialize hidden and cell states
        h0 = torch.zeros(self.num_layers, self.hidden_size)
        c0 = torch.zeros(self.num_layers, self.hidden_size)
        
        # LSTM layer
        out, _ = self.lstm(x, (h0, c0))
        
        # Fully connected layer
        out = self.fc(out[:, :])
        
        return out

    

In [14]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Define the model used, which is LSTM
model = LSTMNet().to(device)

# Define the loss function and optimizer
criterion = nn.MSELoss()
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9)

# Create a list for input and output for the neural network (after data preparation)
nn_input_final = []
nn_output_final = []

# Define the NN input and output
nn_input = np.zeros((100*(len(t)-1),3))
nn_output = np.zeros_like(nn_input)

# Data preparation
for rho in rho_values_train:
    
    # Define the Lorenz equation
    def lorenz_deriv(x_y_z, t0, sigma=sigma, beta=beta, rho=rho):
        x, y, z = x_y_z
        return [sigma * (y - x), x * (rho - z) - y, x * y - beta * z]

    np.random.seed(123)
    x0 = -15 + 30 * np.random.random((100, 3))

    x_t = np.asarray([integrate.odeint(lorenz_deriv, x0_j, t)
                      for x0_j in x0])
    
    for j in range(100):
        nn_input[j*(len(t)-1):(j+1)*(len(t)-1),:] = x_t[j,:-1,:]
        nn_output[j*(len(t)-1):(j+1)*(len(t)-1),:] = x_t[j,1:,:]
        
    # Convert numpy arrays to PyTorch tensors
    nn_input_tensor = torch.from_numpy(nn_input).float()
    nn_output_tensor = torch.from_numpy(nn_output).float()
    
    # Appending the tensors to a list
    nn_input_final.append(nn_input_tensor)
    nn_output_final.append(nn_output_tensor)
    

# Concatenate the neural network input and outputs from each rho values
nn_in = torch.cat(nn_input_final)
nn_out = torch.cat(nn_output_final)

# Train the model
for epoch in range(30):
    optimizer.zero_grad()
    outputs = model(nn_in)
    loss = criterion(outputs, nn_out)
    loss.backward()
    optimizer.step()
    print(f"Epoch {epoch+1}, loss={loss.item():.4f}")


Epoch 1, loss=295.5326
Epoch 2, loss=288.6763
Epoch 3, loss=282.2330
Epoch 4, loss=272.1898
Epoch 5, loss=254.3024
Epoch 6, loss=235.2149
Epoch 7, loss=213.6164
Epoch 8, loss=190.4366
Epoch 9, loss=167.3828
Epoch 10, loss=145.9868
Epoch 11, loss=128.4299
Epoch 12, loss=115.1093
Epoch 13, loss=106.6399
Epoch 14, loss=102.1929
Epoch 15, loss=101.4685
Epoch 16, loss=94.1753
Epoch 17, loss=108.5016
Epoch 18, loss=108.4572
Epoch 19, loss=104.8777
Epoch 20, loss=110.0657
Epoch 21, loss=86.2611
Epoch 22, loss=84.1831
Epoch 23, loss=82.5908
Epoch 24, loss=82.7229
Epoch 25, loss=75.6549
Epoch 26, loss=75.8235
Epoch 27, loss=70.1476
Epoch 28, loss=68.6971
Epoch 29, loss=66.0172
Epoch 30, loss=62.9020


In [15]:
# Create a list for input and output for the neural network (for testing)
nn_input_test = []
nn_output_test = []

# Test the network with given test rho values
for rho in rho_values_test:
    
    # Define the Lorenz equation
    def lorenz_deriv(x_y_z, t0, sigma=sigma, beta=beta, rho=rho):
        x, y, z = x_y_z
        return [sigma * (y - x), x * (rho - z) - y, x * y - beta * z]

    np.random.seed(123)
    x0 = -15 + 30 * np.random.random((100, 3))

    x_t = np.asarray([integrate.odeint(lorenz_deriv, x0_j, t)
                      for x0_j in x0])
    
    for j in range(100):
        nn_input[j*(len(t)-1):(j+1)*(len(t)-1),:] = x_t[j,:-1,:]
        nn_output[j*(len(t)-1):(j+1)*(len(t)-1),:] = x_t[j,1:,:]
        
    # Convert numpy arrays to PyTorch tensors
    nn_input_tensor = torch.from_numpy(nn_input).float()
    nn_output_tensor = torch.from_numpy(nn_output).float()
    
     # Appending the tensors to a list
    nn_input_test.append(nn_input_tensor)
    nn_output_test.append(nn_output_tensor)

# Concatenate the neural network input and outputs from each rho values
nn_in = torch.cat(nn_input_test)
nn_out = torch.cat(nn_output_test)
    
# Test the network
with torch.no_grad():
    outputs = model(nn_in)
    compute_mse = criterion(outputs, nn_out)

    print('Least squares error of test data: {}'.format(compute_mse.item()))

Least squares error of test data: 50.31068801879883


In [16]:
# Create an RNN neural network architecture
class RNNNet(nn.Module):
    def __init__(self, input_size=3, hidden_size=15, num_layers=1, output_size=3):
        super(RNNNet, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        
        self.rnn = nn.RNN(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        # Initialize hidden state
        h0 = torch.zeros(self.num_layers, self.hidden_size)
        
        # RNN layer
        out, _ = self.rnn(x, h0)
        
        # Fully connected layer
        out = self.fc(out[:, :])  # Use the last output timestep
        
        return out

In [17]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Define the model used, which is LSTM
model = RNNNet().to(device)

# Define the loss function and optimizer
criterion = nn.MSELoss()
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9)

# Create a list for input and output for the neural network (after data preparation)
nn_input_final = []
nn_output_final = []

# Define the NN input and output
nn_input = np.zeros((100*(len(t)-1),3))
nn_output = np.zeros_like(nn_input)

# Data preparation
for rho in rho_values_train:
    
    # Define the Lorenz equation
    def lorenz_deriv(x_y_z, t0, sigma=sigma, beta=beta, rho=rho):
        x, y, z = x_y_z
        return [sigma * (y - x), x * (rho - z) - y, x * y - beta * z]

    np.random.seed(123)
    x0 = -15 + 30 * np.random.random((100, 3))

    x_t = np.asarray([integrate.odeint(lorenz_deriv, x0_j, t)
                      for x0_j in x0])
    
    for j in range(100):
        nn_input[j*(len(t)-1):(j+1)*(len(t)-1),:] = x_t[j,:-1,:]
        nn_output[j*(len(t)-1):(j+1)*(len(t)-1),:] = x_t[j,1:,:]
        
    # Convert numpy arrays to PyTorch tensors
    nn_input_tensor = torch.from_numpy(nn_input).float()
    nn_output_tensor = torch.from_numpy(nn_output).float()
    
    # Appending the tensors to a list
    nn_input_final.append(nn_input_tensor)
    nn_output_final.append(nn_output_tensor)
    

# Concatenate the neural network input and outputs from each rho values
nn_in = torch.cat(nn_input_final)
nn_out = torch.cat(nn_output_final)

# Train the model
for epoch in range(30):
    optimizer.zero_grad()
    outputs = model(nn_in)
    loss = criterion(outputs, nn_out)
    loss.backward()
    optimizer.step()
    print(f"Epoch {epoch+1}, loss={loss.item():.4f}")

Epoch 1, loss=292.3287
Epoch 2, loss=255.3023
Epoch 3, loss=210.5301
Epoch 4, loss=160.9973
Epoch 5, loss=122.4227
Epoch 6, loss=105.1868
Epoch 7, loss=107.8777
Epoch 8, loss=107.1276
Epoch 9, loss=118.7423
Epoch 10, loss=103.8654
Epoch 11, loss=106.2003
Epoch 12, loss=98.0878
Epoch 13, loss=96.2881
Epoch 14, loss=94.4343
Epoch 15, loss=89.5607
Epoch 16, loss=93.0507
Epoch 17, loss=93.5552
Epoch 18, loss=91.8467
Epoch 19, loss=89.3242
Epoch 20, loss=86.3346
Epoch 21, loss=82.3205
Epoch 22, loss=77.5896
Epoch 23, loss=74.4349
Epoch 24, loss=73.6426
Epoch 25, loss=73.7057
Epoch 26, loss=73.3200
Epoch 27, loss=70.8493
Epoch 28, loss=68.0691
Epoch 29, loss=64.1548
Epoch 30, loss=65.5668


In [18]:
# Create a list for input and output for the neural network (for testing)
nn_input_test = []
nn_output_test = []

# Test the network with given test rho values
for rho in rho_values_test:
    
    # Define the Lorenz equation
    def lorenz_deriv(x_y_z, t0, sigma=sigma, beta=beta, rho=rho):
        x, y, z = x_y_z
        return [sigma * (y - x), x * (rho - z) - y, x * y - beta * z]

    np.random.seed(123)
    x0 = -15 + 30 * np.random.random((100, 3))

    x_t = np.asarray([integrate.odeint(lorenz_deriv, x0_j, t)
                      for x0_j in x0])
    
    for j in range(100):
        nn_input[j*(len(t)-1):(j+1)*(len(t)-1),:] = x_t[j,:-1,:]
        nn_output[j*(len(t)-1):(j+1)*(len(t)-1),:] = x_t[j,1:,:]
        
    # Convert numpy arrays to PyTorch tensors
    nn_input_tensor = torch.from_numpy(nn_input).float()
    nn_output_tensor = torch.from_numpy(nn_output).float()
    
     # Appending the tensors to a list
    nn_input_test.append(nn_input_tensor)
    nn_output_test.append(nn_output_tensor)

# Concatenate the neural network input and outputs from each rho values
nn_in = torch.cat(nn_input_test)
nn_out = torch.cat(nn_output_test)
    
# Test the network
with torch.no_grad():
    outputs = model(nn_in)
    compute_mse = criterion(outputs, nn_out)

    print('Least squares error of test data: {}'.format(compute_mse.item()))

Least squares error of test data: 50.16377258300781


In [19]:
# Define the Echo States Network architecture
class ESN(nn.Module):
    def __init__(self, input_size=3, reservoir_size=100, output_size=3):
        super(ESN, self).__init__()
        self.input_size = input_size
        self.reservoir_size = reservoir_size
        self.output_size = output_size
        
        # Reservoir layer
        self.reservoir = nn.Linear(input_size + reservoir_size, reservoir_size)
        
        # Output layer
        self.output = nn.Linear(reservoir_size, output_size)

    def forward(self, x, reservoir_state):
        # Concatenate input with reservoir state
        combined_input = torch.cat([x, reservoir_state], dim=1)
        
        # Reservoir layer
        reservoir_output = torch.tanh(self.reservoir(combined_input))
        
        # Output layer
        output = self.output(reservoir_output)
        
        return output, reservoir_output

In [22]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Define the model used, which is LSTM
model = ESN(input_size=3, reservoir_size=100, output_size=3).to(device)

# Define the loss function and optimizer
criterion = nn.MSELoss()
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9)

# Create a list for input and output for the neural network (after data preparation)
nn_input_final = []
nn_output_final = []

# Define the NN input and output
nn_input = np.zeros((100*(len(t)-1),3))
nn_output = np.zeros_like(nn_input)

# Data preparation
for rho in rho_values_train:
    
    # Define the Lorenz equation
    def lorenz_deriv(x_y_z, t0, sigma=sigma, beta=beta, rho=rho):
        x, y, z = x_y_z
        return [sigma * (y - x), x * (rho - z) - y, x * y - beta * z]

    np.random.seed(123)
    x0 = -15 + 30 * np.random.random((100, 3))

    x_t = np.asarray([integrate.odeint(lorenz_deriv, x0_j, t)
                      for x0_j in x0])
    
    for j in range(100):
        nn_input[j*(len(t)-1):(j+1)*(len(t)-1),:] = x_t[j,:-1,:]
        nn_output[j*(len(t)-1):(j+1)*(len(t)-1),:] = x_t[j,1:,:]
        
    # Convert numpy arrays to PyTorch tensors
    nn_input_tensor = torch.from_numpy(nn_input).float()
    nn_output_tensor = torch.from_numpy(nn_output).float()
    
    # Appending the tensors to a list
    nn_input_final.append(nn_input_tensor)
    nn_output_final.append(nn_output_tensor)
    

# Concatenate the neural network input and outputs from each rho values
nn_in = torch.cat(nn_input_final)
nn_out = torch.cat(nn_output_final)

# Train the model
for epoch in range(30):
    optimizer.zero_grad()
    reservoir_state = torch.zeros(nn_in.size(0), model.reservoir_size)
    outputs, _ = model(nn_in, reservoir_state)
    loss = criterion(outputs, nn_out)
    loss.backward()
    optimizer.step()
    print(f"Epoch {epoch+1}, loss={loss.item():.4f}")

Epoch 1, loss=300.5471
Epoch 2, loss=180.8709
Epoch 3, loss=91.5154
Epoch 4, loss=123.4013
Epoch 5, loss=155.3483
Epoch 6, loss=77.4563
Epoch 7, loss=70.3367
Epoch 8, loss=62.7786
Epoch 9, loss=62.4004
Epoch 10, loss=64.0828
Epoch 11, loss=62.2120
Epoch 12, loss=62.3209
Epoch 13, loss=59.2354
Epoch 14, loss=58.2525
Epoch 15, loss=55.6917
Epoch 16, loss=53.9776
Epoch 17, loss=52.9975
Epoch 18, loss=51.8251
Epoch 19, loss=52.0794
Epoch 20, loss=50.7887
Epoch 21, loss=49.1469
Epoch 22, loss=48.3943
Epoch 23, loss=48.1675
Epoch 24, loss=47.9274
Epoch 25, loss=47.4216
Epoch 26, loss=46.8212
Epoch 27, loss=46.4049
Epoch 28, loss=45.8008
Epoch 29, loss=45.0182
Epoch 30, loss=44.3235


In [23]:
nn_input_test_final = []
nn_output_test_final = []

# Test the network with given test rho values
for rho in rho_values_test:
    
    # Define the Lorenz equation
    def lorenz_deriv(x_y_z, t0, sigma=sigma, beta=beta, rho=rho):
        x, y, z = x_y_z
        return [sigma * (y - x), x * (rho - z) - y, x * y - beta * z]

    np.random.seed(123)
    x0 = -15 + 30 * np.random.random((100, 3))

    x_t = np.asarray([integrate.odeint(lorenz_deriv, x0_j, t)
                      for x0_j in x0])
    
    for j in range(100):
        nn_input[j*(len(t)-1):(j+1)*(len(t)-1),:] = x_t[j,:-1,:]
        nn_output[j*(len(t)-1):(j+1)*(len(t)-1),:] = x_t[j,1:,:]
        
    # Convert numpy arrays to PyTorch tensors
    nn_in_test_tensor = torch.from_numpy(nn_input).float()
    nn_out_test_tensor = torch.from_numpy(nn_output).float()
    
    # Appending the tensors to a list
    nn_input_test_final.append(nn_in_test_tensor)
    nn_output_test_final.append(nn_out_test_tensor)
    
# Concatenate the neural network input and outputs from each rho values
nn_in_test = torch.cat(nn_input_test_final)
nn_out_test = torch.cat(nn_output_test_final)
    
# Test the network
reservoir_state = torch.zeros(nn_in_test.size(0), model.reservoir_size)
with torch.no_grad():
    outputs, _ = model(nn_in_test, reservoir_state)
    compute_mse = criterion(outputs, nn_out_test)

    print('Least squares error of test data: {}'.format(compute_mse.item()))

Least squares error of test data: 43.1268310546875
