In [107]:
import torch
import torch.nn as nn
import numpy as np
import onnx
from onnxruntime.training import artifacts
import onnxruntime
import torch.nn.functional as F
import coremltools as ct


In [108]:
# Record the versions of the libraries this notebook was run with
tuple(lib.__version__ for lib in (torch, onnx, ct, onnxruntime))

('2.1.0', '1.14.1', '7.1', '1.16.3')

In [109]:
class LSTMNumberPredictor(nn.Module):
    """LSTM classifier over sequences of scalar values.

    The sequence is fed through a (possibly stacked) LSTM and the hidden
    output of the last time step is projected onto `num_classes` logits.

    Args:
        num_classes: Number of output logits produced per sequence.
        hidden_dim: Size of the LSTM hidden state.
        num_layers: Number of stacked LSTM layers.
    """

    def __init__(self, num_classes, hidden_dim, num_layers):
        super().__init__()
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers

        # LSTM layer: one feature per time step, batch dimension first
        self.lstm = nn.LSTM(input_size=1, hidden_size=hidden_dim, num_layers=num_layers, batch_first=True)

        # Fully connected layer mapping the last hidden output to class logits
        self.fc = nn.Linear(hidden_dim, num_classes)

    def forward(self, x):
        """Compute class logits for a batch of sequences.

        Args:
            x: Tensor of shape (batch_size, seq_length, 1).

        Returns:
            Tensor of shape (batch_size, num_classes) holding unnormalised logits.
        """
        batch_size = x.size(0)

        # Zero initial hidden and cell states, allocated with the input's dtype and
        # device so the module also works after .double() or .to(device).
        h0 = torch.zeros(self.num_layers, batch_size, self.hidden_dim, dtype=x.dtype, device=x.device)
        c0 = torch.zeros(self.num_layers, batch_size, self.hidden_dim, dtype=x.dtype, device=x.device)

        # Forward propagate LSTM
        out, _ = self.lstm(x, (h0, c0))  # out: tensor of shape (batch_size, seq_length, hidden_dim)

        # Decode the hidden state of the last time step
        return self.fc(out[:, -1, :])

# Seed PyTorch so the randomly initialised weights are the same on every run
torch.manual_seed(0)

# The network that is trained and exported in the cells below:
# 3 output classes, 50 hidden units, a single LSTM layer
model = LSTMNumberPredictor(num_classes=3, hidden_dim=50, num_layers=1)

In [110]:
import numpy as np

def generate_training_data(data_size, seed=None):
    """Generate a synthetic classification data set.

    Each sample is a row of 6 floats drawn uniformly from [0, 10). Its label is
    the row sum divided by 20 and truncated to an integer, so labels fall in
    {0, 1, 2}.

    Args:
        data_size: Number of samples (rows) to generate.
        seed: Optional seed for a dedicated NumPy generator. When None, NumPy's
            global random state is used, as before.

    Returns:
        Tuple (X, y): X is a float32 tensor of shape (data_size, 6) and y is an
        int64 tensor of shape (data_size,).
    """
    # A dedicated generator keeps seeded calls reproducible without touching global state
    rng = np.random if seed is None else np.random.default_rng(seed)

    # Generate random floats for input data X
    X = rng.uniform(0.0, 10.0, (data_size, 6))

    # Compute y as the sum of each row in X divided by 20, truncated to an integer class
    y = (np.sum(X, axis=1) / 20).astype(int)

    return torch.tensor(X, dtype=torch.float32), torch.tensor(y, dtype=torch.long)

In [111]:
import torch.optim as optim

# Parameters
data_size = 500   # Total number of generated samples
train_size = 400  # Leading samples used for training; the remainder is held out for testing

# Seed NumPy's global generator so the same data set is produced on every run
np.random.seed(0)

# Generate data
X, y = generate_training_data(data_size)

# Train / test split
X_train, y_train = X[:train_size], y[:train_size]
X_test, y_test = X[train_size:], y[train_size:]


In [112]:
# Peek at the first four generated samples and their labels
print(X[:4], y[:4])

tensor([[8.2320, 4.9128, 6.4885, 3.6173, 1.6645, 6.0213],
        [4.7715, 4.0212, 8.0391, 4.3866, 7.1720, 2.0178],
        [6.7516, 2.3123, 0.4681, 9.3191, 8.5382, 0.5726],
        [5.9035, 8.2438, 3.2320, 0.3564, 4.8108, 8.9278]]) tensor([1, 1, 1, 1])


In [113]:
# Optimisation hyperparameters
num_epochs = 300        # Number of passes over the training set
learning_rate = 0.001   # Step size handed to Adam

# Cross-entropy objective over the class logits, and the optimizer
# that updates every parameter of the model
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=model.parameters(), lr=learning_rate)

In [114]:
# Reshape input suitable for LSTM (batch_size, seq_length, input_size).
# Guarded so that re-running this cell does not keep appending trailing axes.
if X_train.dim() == 2:
    X_train = X_train.unsqueeze(-1)
if X_test.dim() == 2:
    X_test = X_test.unsqueeze(-1)


# Training loop (full batch: every epoch uses the whole training set at once)
for epoch in range(num_epochs):
    model.train()
    optimizer.zero_grad()

    # Forward pass
    outputs = model(X_train)
    loss_train = criterion(outputs, y_train)

    # The held-out loss is only monitored, never back-propagated, so it is computed
    # without autograd tracking. It is taken before the optimizer step so both
    # losses describe the same parameters.
    with torch.no_grad():
        outputs_test = model(X_test)
        loss_test = criterion(outputs_test, y_test)

    # Backward and optimize
    loss_train.backward()
    optimizer.step()

    if (epoch+1) % 100 == 0:
        print(f'Epoch [{epoch+1}/{num_epochs}], Training Loss: {loss_train.item():.4f}, Testing Loss: {loss_test.item():.4f}')


Epoch [100/300], Training Loss: 0.4448, Testing Loss: 0.4735
Epoch [200/300], Training Loss: 0.1395, Testing Loss: 0.1596
Epoch [300/300], Training Loss: 0.0622, Testing Loss: 0.0946


In [100]:
# Put the network in evaluation mode before tracing it
model.eval()

# Trace the model using the first training sequence (a batch of one) as the example input.
example_input_for_trace = X_train[0:1]

traced_model = torch.jit.trace(model, example_inputs=(example_input_for_trace,))

In [101]:
example_input_test_trace = X_train[2:3]
out = traced_model(example_input_test_trace)
# The largest logit should sit at the index given by the sample's label,
# so the predicted class is printed next to the expected one.
print(example_input_test_trace, out)
print("predicted class:", out.argmax(dim=1).item(), "expected class:", y_train[2].item())

tensor([[[1.5627],
         [3.9146],
         [6.3159],
         [7.3567],
         [6.9889],
         [9.6894]]]) tensor([[-3.8410,  4.3895, -1.7654]], grad_fn=<AddmmBackward0>)


In [102]:
import coremltools as ct

# Convert the traced model to a Core ML ML program using the Unified Conversion API.
# The target format is spelled out because the model is later saved as an .mlpackage,
# and the tensor input is named "x" explicitly because the prediction cell feeds the
# model through that key.
model_ct = ct.convert(
    traced_model,
    convert_to="mlprogram",
    inputs=[ct.TensorType(name="x", shape=example_input_for_trace.shape)],
)

# check names of input and output
print(model_ct.input_description, model_ct.output_description)


2023-12-22 17:16:58,093 coremltools [INFO] - Converting graph.
2023-12-22 17:16:58,094 coremltools [INFO] - Adding op 'lstm.bias_hh_l0' of type const
2023-12-22 17:16:58,094 coremltools [INFO] - Adding op 'lstm.bias_ih_l0' of type const
2023-12-22 17:16:58,095 coremltools [INFO] - Adding op 'lstm.weight_hh_l0' of type const
2023-12-22 17:16:58,095 coremltools [INFO] - Adding op 'lstm.weight_ih_l0' of type const
2023-12-22 17:16:58,096 coremltools [INFO] - Adding op 'fc.bias' of type const
2023-12-22 17:16:58,097 coremltools [INFO] - Adding op 'fc.weight' of type const
Converting PyTorch Frontend ==> MIL Ops:   0%|          | 0/45 [00:00<?, ? ops/s]2023-12-22 17:16:58,098 coremltools [INFO] - Converting op 4 : constant
2023-12-22 17:16:58,098 coremltools [INFO] - Adding op '4' of type const
2023-12-22 17:16:58,099 coremltools [INFO] - Converting op 5 : size
2023-12-22 17:16:58,099 coremltools [INFO] - Adding op '5_shape' of type shape
2023-12-22 17:16:58,100 coremltools [INFO] - Adding 

Features(x) Features(linear_0)


In [103]:
# Shape of the single-sample batch that is fed to the converted model
tuple(example_input_test_trace.shape)

(1, 6, 1)

In [104]:

# test inference using converted coreml model
# The model has a single output, so it is read by position instead of through
# its auto-generated feature name.
coreml_outputs = model_ct.predict({"x": example_input_test_trace.numpy()})
coreml_pred = next(iter(coreml_outputs.values()))

# save the converted coreml model
model_ct.save("lstm_model.mlpackage")

# Predicted class index; kept as the last expression of the cell so it is actually displayed
np.argmax(coreml_pred, axis=1)


array([0])

In [105]:

# Define input / output names
input_names = ["seq_input"]
output_names = ["my_output"]

# Convert the PyTorch model to ONNX
torch.onnx.export(model,
                  (example_input_for_trace,),
                  "lstm_model.onnx",
                  verbose=False,
                  input_names=input_names,
                  output_names=output_names,
                  dynamic_axes={'seq_input' : {0: 'batch'},    # variable length axes
                                'my_output' : {0: 'batch'}}
                                )



In [106]:
import onnx
import onnxruntime as ort

# Load the ONNX model and validate it.
# It is bound to `onnx_model` so that the PyTorch `model` used by the earlier
# cells is not overwritten, which would break re-running them.
onnx_model = onnx.load("lstm_model.onnx")
onnx.checker.check_model(onnx_model)

# The execution provider is named explicitly; onnxruntime builds that ship more
# than the CPU provider expect the list to be given.
ort_session = ort.InferenceSession("lstm_model.onnx", providers=["CPUExecutionProvider"])

# example_input_for_trace is already a batch of one sample
seq = example_input_for_trace.numpy()

onnx_pred = ort_session.run(
    ["my_output"],
    {"seq_input": seq},
)

# run() returns one array per requested output; show the predicted class index
np.argmax(onnx_pred[0], axis=1)