In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import math
import numpy as np
from sklearn.metrics import recall_score

In [2]:
# ### ignore this block
# # test data block, generate data for testing usage
# # generate random values in [-1, 1] range. 30 values in one array and ttl i_size arrays
# i_size = 2000
# x = torch.rand(i_size, 150) * 2 - 1  
# y = torch.zeros(i_size)
# y[x.sum(dim=1) > 0] = 1  # set y to 1 where x sum is positive
# print("Input tensor x shape:", x.shape)
# print("Output tensor y shape:", y.shape)
# print("First 5 elements of x:")
# print(x[:5].sum(dim=1))
# print("First 5 elements of y:")
# print(y[:5])

In [3]:
# read in hit and non_hit csv files
hit_csv_str = './voyager-1-ml/hit.csv'
non_hit_csv_str = '.voyager-1-ml/non_hit.csv'

hit_arr = np.genfromtxt(hit_csv_str, delimiter=',')
non_hit_arr = np.genfromtxt(non_hit_csv_str, delimiter=',')

# Print the arrays
print('hit_arr:', hit_arr.shape)
print('non_hit_arr:', non_hit_arr.shape)

hit_arr: (122, 1600)
non_hit_arr: (244, 1600)


In [4]:
# # form test set and training set
# test_fraction = 0.1

# # Get the number of rows in the arrays
# hit_num_rows, non_num_rows = hit_arr.shape[0], non_hit_arr.shape[0]

# # Shuffle the indices of the rows in each array
# hit_idx = np.random.permutation(hit_num_rows)
# non_idx = np.random.permutation(non_num_rows)

# # Get the indices for the test set and the training set
# hit_num_test = int(hit_num_rows * test_fraction)
# non_num_test = int(non_num_rows * test_fraction)
# hit_test_idx = hit_idx[:hit_num_test]
# hit_train_idx = hit_idx[hit_num_test:]
# non_test_idx = non_idx[:non_num_test]
# non_train_idx = non_idx[non_num_test:]

# # Split the arrays into test and training sets
# hit_arr_test = hit_arr[hit_test_idx]
# hit_arr_test_y = np.ones(hit_arr_test.shape[0])
# hit_arr_train = hit_arr[hit_train_idx]
# hit_arr_train_y = np.ones(hit_arr_train.shape[0])

# non_hit_arr_test = non_hit_arr[non_test_idx]
# non_hit_arr_test_y = np.zeros(non_hit_arr_test.shape[0])
# non_hit_arr_train = non_hit_arr[non_train_idx]
# non_hit_arr_train_y = np.zeros(non_hit_arr_train.shape[0])

# print(f'hit_test shape: {hit_arr_test.shape}')
# print(f'hit_train shape: {hit_arr_train.shape}')
# print(f'non_hit_train shape: {non_hit_arr_train.shape}')
# print(f'non_hit_test shape: {non_hit_arr_test.shape}')

In [5]:
# form test set and training set
test_fraction = 0.05
train_fraction = 1-test_fraction

# Get the number the test and train size of each
hit_train_size = int(train_fraction * len(hit_arr))
hit_test_size = len(hit_arr) - hit_train_size
non_hit_train_size = int(train_fraction * len(non_hit_arr))
non_hit_test_size = len(non_hit_arr) - non_hit_train_size

# Split both sets
hit_train_dataset, hit_test_dataset = torch.utils.data.random_split(hit_arr, [hit_train_size,hit_test_size])
non_hit_train_dataset, non_hit_test_dataset = torch.utils.data.random_split(non_hit_arr, [non_hit_train_size, non_hit_test_size])


In [6]:
# combine the training set and test set
x_test_arr = np.concatenate((hit_test_dataset,non_hit_test_dataset), axis=0)
x_arr = np.concatenate((hit_train_dataset,non_hit_train_dataset), axis=0)
x_test = torch.Tensor(x_test_arr)
x = torch.Tensor(x_arr)

y_test_arr = np.concatenate((np.ones(hit_test_size),np.zeros(non_hit_test_size)))
y_arr = np.concatenate((np.ones(hit_train_size),np.zeros(non_hit_train_size)))
y_test = torch.Tensor(y_test_arr)
y = torch.Tensor(y_arr)

print('x_test shape = {}, x shape = {}, y_test shape = {}, y shape = {}'.format(x_test.shape, x.shape, y_test.shape, y.shape))
print(f'len(y_test) = {len(y_test)}, y_test = {y_test}')

x_test shape = torch.Size([20, 1600]), x shape = torch.Size([346, 1600]), y_test shape = torch.Size([20]), y shape = torch.Size([346])
len(y_test) = 20, y_test = tensor([1., 1., 1., 1., 1., 1., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0.])


In [7]:
# # combine the training set and test set
# x_test_arr = np.concatenate((hit_arr_test,non_hit_arr_test), axis=0)
# x_arr = np.concatenate((hit_arr_train,non_hit_arr_train), axis=0)
# x_test = torch.Tensor(x_test_arr)
# x = torch.Tensor(x_arr)

# y_test_arr = np.concatenate((hit_arr_test_y,non_hit_arr_test_y), axis=0)
# y_arr = np.concatenate((hit_arr_train_y,non_hit_arr_train_y), axis=0)
# y_test = torch.Tensor(y_test_arr)
# y = torch.Tensor(y_arr)

# print('x_test shape = {}, x shape = {}, y_test shape = {}, y shape = {}'.format(x_test.shape, x.shape, y_test.shape, y.shape))
# print(f'len(y_test) = {len(y_test)}, y_test = {y_test}')

In [8]:
# Define the neural network model
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(1600, 256)
        self.fc2 = nn.Linear(256, 128)
        self.fc3 = nn.Linear(128, 64)
        self.fc4 = nn.Linear(64, 1)
        self.activation = nn.Sigmoid()

    def forward(self, x):
        x = self.activation(self.fc1(x))
        x = self.activation(self.fc2(x))
        x = self.activation(self.fc3(x))
        x = self.activation(self.fc4(x))
        return x

# Create an instance of the model
model = Net()

# Print the model's architecture
print(model)

Net(
  (fc1): Linear(in_features=1600, out_features=256, bias=True)
  (fc2): Linear(in_features=256, out_features=128, bias=True)
  (fc3): Linear(in_features=128, out_features=64, bias=True)
  (fc4): Linear(in_features=64, out_features=1, bias=True)
  (activation): Sigmoid()
)


In [9]:
# Instantiate the neural network model
model = Net()

# Define the loss function and optimizer
criterion = nn.BCELoss()
optimizer = optim.SGD(model.parameters(), lr=0.01)

# Train the neural network
for epoch in range(25000):
    # Forward pass
    y_pred = model(x)

    # Compute the loss and recall
    loss = criterion(y_pred.squeeze(), y)
    y_pred_binary = torch.round(y_pred.detach()).squeeze().cpu().numpy()
    recall = recall_score(y.cpu().numpy(), y_pred_binary, average='binary')

    # Zero the gradients
    optimizer.zero_grad()

    # Backward pass
    loss.backward()

    # Update the model parameters
    optimizer.step()

    # Print the loss and recall every 1000 epochs
    if epoch % 1000 == 999:
        print("Epoch {}: Loss = {:.4f}, Recall = {:.4f}".format(epoch, loss.item(), recall))

Epoch 999: Loss = 0.6352, Recall = 0.0000
Epoch 1999: Loss = 0.6345, Recall = 0.0000
Epoch 2999: Loss = 0.6335, Recall = 0.0000
Epoch 3999: Loss = 0.6315, Recall = 0.0000
Epoch 4999: Loss = 0.6272, Recall = 0.0000
Epoch 5999: Loss = 0.6146, Recall = 0.0000
Epoch 6999: Loss = 0.5650, Recall = 0.0000
Epoch 7999: Loss = 0.3975, Recall = 0.5913
Epoch 8999: Loss = 0.1759, Recall = 0.8870
Epoch 9999: Loss = 0.0522, Recall = 0.9913
Epoch 10999: Loss = 0.0211, Recall = 1.0000
Epoch 11999: Loss = 0.0120, Recall = 1.0000
Epoch 12999: Loss = 0.0080, Recall = 1.0000
Epoch 13999: Loss = 0.0059, Recall = 1.0000
Epoch 14999: Loss = 0.0046, Recall = 1.0000
Epoch 15999: Loss = 0.0037, Recall = 1.0000
Epoch 16999: Loss = 0.0031, Recall = 1.0000
Epoch 17999: Loss = 0.0027, Recall = 1.0000
Epoch 18999: Loss = 0.0023, Recall = 1.0000
Epoch 19999: Loss = 0.0021, Recall = 1.0000
Epoch 20999: Loss = 0.0019, Recall = 1.0000
Epoch 21999: Loss = 0.0017, Recall = 1.0000
Epoch 22999: Loss = 0.0015, Recall = 1.0000

In [10]:
# ### ignore this block 
# # Test unit for test data block

# # Create input tensor x_test
# x_test = torch.rand(2000, 150) * 2 - 1  # generate random values in [-1, 1] range

# # Load the trained model
# model = model

# # Test the model on the example inputs
# with torch.no_grad():
#     y_test = model(x_test)
#     y_pred = torch.round(y_test).squeeze().tolist()

# # Print the predictions
# # print("Input tensor x_test:")
# # print(x_test.sum(dim=1))
# # print("Predicted output tensor y_pred:")
# # print(y_pred)

# # Calculate the accuracy of the predictions
# y_true = [1 if x.sum() > 0 else 0 for x in x_test]
# correct = sum([1 if y_pred[i] == y_true[i] else 0 for i in range(len(y_pred))])
# accuracy = correct / len(y_pred)
# print("Accuracy:", accuracy)

In [11]:
# Test unit

# load test dataset and the model
x_test = x_test
y_test = y_test
model = model

# Test the model on the example inputs
with torch.no_grad():
    y_pred = model(x_test)
    y_pred_binary = torch.round(y_pred).squeeze().tolist()

# Print the predictions
# print("Input tensor x_test:")
# print(x_test.sum(dim=1))
# print("Predicted output tensor y_pred:")
# print(y_pred)

# Calculate the accuracy of the predictions
y_pred_binary=torch.tensor(y_pred_binary)
# convert to NumPy array and set print options
np.set_printoptions(precision=4, suppress=True)
y_pred_np = y_pred.T.numpy()
print(y_pred_np)
print(y_pred_binary)
print(y_test)
print(f'length of test: {len(y_test)}')

accuracy = torch.sum(y_pred_binary == y_test).float() / len(y_test)
print('Test accuracy:', accuracy.item())

[[0.029  0.4071 0.9967 0.9966 0.0004 0.0008 0.0006 0.0425 0.0041 0.0008
  0.0136 0.3331 0.0006 0.0025 0.0002 0.0009 0.0008 0.0003 0.0031 0.0031]]
tensor([0., 0., 1., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0.])
tensor([1., 1., 1., 1., 1., 1., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0.])
length of test: 20
Test accuracy: 0.75


In [12]:
# save the model :D
model = model
path = 'voyager_dust_impact_4layer_model_v1.pt'
torch.save(model.state_dict(), path)
print('model saved!')

model saved!
