# Chapter-12 ONNX model from scratch and Custom Operators

#### In this notebook, we will develop an ONNX model from scratch using the building blocks provided by ONNX. Then we will learn how to add custom operators to an ONNX model.

### Step-1 Developing ONNX model from scratch using ONNX's building blocks

##### For this example we will develop a simple 2-layer MLP (input -> hidden -> output) with ReLU activation.

In [1]:
# Install prerequisites (every package is version-pinned).
# The second command installs the PyTorch packages from the CPU wheel index.
!pip install onnx==1.18.0 onnxruntime==1.22.0 onnxscript==0.2.4 netron==8.4.3
!pip install torch==2.6.0 torchvision==0.21.0 torchaudio==2.6.0 \
    --index-url https://download.pytorch.org/whl/cpu

Collecting onnxscript==0.2.4
  Downloading onnxscript-0.2.4-py3-none-any.whl (705 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m705.4/705.4 KB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
Installing collected packages: onnxscript
Successfully installed onnxscript-0.2.4
Looking in indexes: https://download.pytorch.org/whl/cpu


In [2]:
# import required modules

import onnx
from onnx import helper
from onnx import TensorProto
import numpy as np
import IPython
import netron
import onnxruntime as ort

In [3]:
# Model specification: layer widths of the MLP built in Step 1.
input_size = 4     # features per input sample
hidden_size = 10   # width of the hidden layer
output_size = 2    # values produced per sample

# Versions stamped on the saved model.
onnx_opset = 18    # version of the default ("") operator set the model imports
ir_version = 10    # ONNX IR (file-format) version; this is not the opset version
onnx_model_path = "mlp_model.onnx"

# Weights and test inputs in this notebook are drawn from NumPy's global random
# generator. Seed it once here so "Restart Kernel -> Run All" gives the same
# NumPy-generated values on every run.
random_seed = 42
np.random.seed(random_seed)

##### Define input and output tensors of the graph

In [4]:
# Graph input: float tensor of shape [batch, input_size]. The symbolic "batch"
# dimension leaves the batch size dynamic.
X = helper.make_tensor_value_info("input", TensorProto.FLOAT, ["batch", input_size])

# Graph output: float tensor of shape [batch, output_size].
Y = helper.make_tensor_value_info("output", TensorProto.FLOAT, ["batch", output_size])

##### Create weights and biases as initializers of the model

In [5]:
# First layer (input -> hidden): random float32 weight matrix and bias vector,
# handed to make_tensor as flat Python lists.
W1 = helper.make_tensor(
    "W1",
    TensorProto.FLOAT,
    [input_size, hidden_size],
    np.random.randn(input_size, hidden_size).astype(np.float32).ravel().tolist(),
)

B1 = helper.make_tensor(
    "B1",
    TensorProto.FLOAT,
    [hidden_size],
    np.random.randn(hidden_size).astype(np.float32).tolist(),
)

# Second layer (hidden -> output): same construction with the output width.
W2 = helper.make_tensor(
    "W2",
    TensorProto.FLOAT,
    [hidden_size, output_size],
    np.random.randn(hidden_size, output_size).astype(np.float32).ravel().tolist(),
)

B2 = helper.make_tensor(
    "B2",
    TensorProto.FLOAT,
    [output_size],
    np.random.randn(output_size).astype(np.float32).tolist(),
)

##### Create nodes of the model
##### Input -> MatMul -> Add -> Relu -> MatMul -> Add -> Output

In [6]:
# Layer 1: matmul1_out = input @ W1, then add1_out = matmul1_out + B1
node_matmul1 = helper.make_node("MatMul", ["input", "W1"], ["matmul1_out"], name="matmul1")
node_add1 = helper.make_node("Add", ["matmul1_out", "B1"], ["add1_out"], name="add1")

# Activation: relu_out = Relu(add1_out)
node_relu = helper.make_node("Relu", ["add1_out"], ["relu_out"], name="relu")

# Layer 2: matmul2_out = relu_out @ W2, then output = matmul2_out + B2.
# The last node writes straight into the graph output named "output".
node_matmul2 = helper.make_node("MatMul", ["relu_out", "W2"], ["matmul2_out"], name="matmul2")
node_add2 = helper.make_node("Add", ["matmul2_out", "B2"], ["output"], name="add2")

##### Assemble the graph

In [7]:
# Tie nodes, graph input/output and the weight initializers into one GraphProto.
# Nodes are listed in execution order: each consumes tensors produced before it.
graph = helper.make_graph(
    [node_matmul1, node_add1, node_relu, node_matmul2, node_add2],
    "2_layer_mlp",
    [X],
    [Y],
    initializer=[W1, B1, W2, B2],
)

##### Create a model from the graph and save the ONNX model to disk

In [8]:
# Wrap the graph in a ModelProto, recording which operator set and IR version it targets.
model = helper.make_model(
    graph,
    producer_name="onnx-mlp-example",
    opset_imports=[helper.make_opsetid("", onnx_opset)],  # default-domain opset version
    ir_version=ir_version,  # ONNX IR (file-format) version, not the opset version
)

# A hand-assembled graph is not validated while it is being built. Run the ONNX
# checker so a malformed model fails here rather than later inside the runtime.
onnx.checker.check_model(model)

# Save the model
onnx.save(model, onnx_model_path)

##### Visualize the ONNX model

In [9]:
# Serve the saved model with Netron on a local port (browse=False) and embed
# the page below through an IFrame.
port = 6006
netron.start(onnx_model_path, port, browse=False)
IPython.display.IFrame(src=f"http://localhost:{port}", width=1000, height=500)

Serving 'mlp_model.onnx' at http://localhost:6006


##### Model execution using ONNX Runtime

In [10]:
# helper function to run model

def generate_random_input(model_path):
    """Build a random feed dict for the first input of an ONNX model.

    Args:
        model_path: Path of the ONNX model file to inspect.

    Returns:
        A one-entry dict mapping the model's first input name to a float32
        array of uniform random values in [0, 1). Every dynamic dimension of
        that input is given size 1.
    """
    # Load the model only to read the name and declared shape of its first input.
    sess = ort.InferenceSession(model_path)
    first_input = sess.get_inputs()[0]

    # A dynamic dimension is reported either as its symbolic name (a str) or as
    # None when it has no name. Only ints are concrete sizes, so anything else
    # is replaced with 1.
    concrete_shape = [dim if isinstance(dim, int) else 1 for dim in first_input.shape]

    # Generate random data
    return {first_input.name: np.random.rand(*concrete_shape).astype(np.float32)}


def run_model(model_path):
    """Run an ONNX model once on a random input and print a short summary.

    Args:
        model_path: Path of the ONNX model file to execute.

    Returns:
        The list of output arrays returned by the inference session.
    """
    session = ort.InferenceSession(model_path)

    # Random feed for the model's first input; its single key is the input name.
    feed = generate_random_input(model_path)
    feed_name = next(iter(feed))
    sample = feed[feed_name]

    # Passing None as the output list requests every model output.
    results = session.run(None, feed)

    # Report shapes plus up to five leading values of the first row.
    print(f"\nInput shape: {sample.shape}")
    print(f"Output shape: {results[0].shape}")
    print("\nSample input values:")
    print(sample[0, :5])
    print("\nSample output values:")
    print(results[0][0, :5])

    return results

In [11]:
# Run the hand-built MLP once on a randomly generated input; run_model prints
# the input/output shapes and a few sample values.
outputs = run_model(onnx_model_path)


Input shape: (1, 4)
Output shape: (1, 2)

Sample input values:
[0.6841185  0.77549195 0.66654027 0.2499431 ]

Sample output values:
[1.6343086 1.7021872]


### Step-2 Developing ONNX model from scratch using ONNX Script

##### We will try to build the same model via ONNX Script

In [12]:
import onnxscript
from onnxscript import opset18 as op
from onnxscript import FLOAT
import numpy as np

# Destination file for the ONNX Script version of the MLP.
onnx_model_path_onnxscript = "mlp_model_onnxscript.onnx"

# Define model
# The weights and biases are created with op.Constant inside the body, so the
# function's only parameter (and the model's only declared input) is `input`.
# NOTE(review): np.random.randn is called inside the scripted body. If an eager
# call re-runs this Python code, it would draw fresh weights that differ from
# the ones stored in the exported model. Confirm before comparing eager results
# with ONNX Runtime results.
@onnxscript.script()
def two_layer_nn(input: FLOAT["batch", 4]) -> FLOAT["batch", 2]:
    # Layer 1: Linear (X * W1 + B1)

    # Define weights and biases as constants
    W1 = op.Constant(value=np.random.randn(4, 10).astype(np.float32))
    B1 = op.Constant(value=np.random.randn(10).astype(np.float32))
    # Define MatMul layer followed by Add
    hidden = op.Add(op.MatMul(input, W1), B1)
    
    # ReLU activation
    hidden = op.Relu(hidden)
    
    # Layer 2: Linear (hidden * W2 + B2)
    # Define weights and biases as constants
    W2 = op.Constant(value=np.random.randn(10, 2).astype(np.float32))
    B2 = op.Constant(value=np.random.randn(2).astype(np.float32))
    # Define MatMul layer followed by Add
    output = op.Add(op.MatMul(hidden, W2), B2)
    
    return output

# Convert the scripted function to an ONNX ModelProto
model = two_layer_nn.to_model_proto()

# Save the model to an .onnx file
onnx.save(model, onnx_model_path_onnxscript)

  param_schemas = callee.param_schemas()


In [13]:
# Serve the ONNX Script model with Netron on its own port and embed the viewer.
port = 6007
netron.start(onnx_model_path_onnxscript, port, browse=False)
IPython.display.IFrame(src=f"http://localhost:{port}", width=1000, height=500)

Serving 'mlp_model_onnxscript.onnx' at http://localhost:6007


##### Model execution by calling ONNX Script model

In [14]:
# Call the scripted function directly on a NumPy array of shape (1, 4).
dummy_input = np.random.randn(1, 4).astype(np.float32)
dummy_output = two_layer_nn(dummy_input)

# Each label is followed by its values on the next line.
print("Sample input values:", dummy_input, sep="\n")
print("Sample output values:", dummy_output, sep="\n")

Sample input values:
[[-0.8897543 -1.3789165  1.4738988  1.2751241]]
Sample output values:
[[ 1.8809249 -1.6911085]]


  param_schemas = function.param_schemas()


### Step-3 Custom Operator in ONNX

#### 3.1 Using ONNX Building Blocks

In [15]:
import onnx
from onnx import helper, TensorProto, numpy_helper


# Build SwiGLU ONNX Function Proto
def get_swiglu_function_proto(op_name, input_names, output_names, domain, opset):
    """Describe SwiGLU as an ONNX function built from standard operators.

    The body computes ``Mul(Mul(xW, Sigmoid(xW)), xV)`` where
    ``xW = MatMul(x, W)`` and ``xV = MatMul(x, V)``.

    Args:
        op_name: Name under which the function is registered.
        input_names: Exactly three names, in the order (x, W, V).
        output_names: Output names; the body writes only the first one.
        domain: Operator domain the function belongs to.
        opset: Version of the default ("") opset imported by the function body.

    Returns:
        The function proto created by ``helper.make_function``.
    """
    x_name, w_name, v_name = input_names
    result_name = output_names[0]

    # Intermediate tensor names ("xW", "sigmoid_xW", ...) are fixed strings
    # used inside the function body.
    body_nodes = [
        # Gate branch: swish(x @ W) = (x @ W) * sigmoid(x @ W)
        helper.make_node("MatMul", [x_name, w_name], ["xW"], name="matmul_xW"),
        helper.make_node("Sigmoid", ["xW"], ["sigmoid_xW"], name="sigmoid"),
        helper.make_node("Mul", ["xW", "sigmoid_xW"], ["swish_xW"], name="swish"),
        # Value branch: x @ V
        helper.make_node("MatMul", [x_name, v_name], ["xV"], name="matmul_xV"),
        # Element-wise product of the two branches
        helper.make_node("Mul", ["swish_xW", "xV"], [result_name], name="mul_swish"),
    ]

    return helper.make_function(
        domain=domain,
        fname=op_name,
        inputs=input_names,
        outputs=output_names,
        nodes=body_nodes,
        opset_imports=[helper.make_opsetid("", opset)],
    )


# Build the Model Graph
def construct_graph(custom_op_domain="my.custom_op"):
    """Assemble the graph of the Linear -> SwiGLU -> Linear model.

    Args:
        custom_op_domain: Operator domain written on the SwiGLU node. It has to
            be the domain under which the SwiGLU function is attached to the
            model. The default equals the value this notebook assigns to its
            ``custom_op_domain`` variable, so ``construct_graph()`` without
            arguments keeps working.

    Returns:
        The graph with input ``input`` of shape [batch, 4], output ``output``
        of shape [batch, 2] and randomly initialised float32 weights.
    """
    # Define Model Weights (Initializers)
    # Layer 1: 4x10 weight + 10 bias
    W1 = numpy_helper.from_array(np.random.randn(4, 10).astype(np.float32), name="W1")
    B1 = numpy_helper.from_array(np.random.randn(10).astype(np.float32), name="B1")

    # SwiGLU: Two 10x10 weights (W and V)
    W_swiglu = numpy_helper.from_array(
        np.random.randn(10, 10).astype(np.float32), name="W_swiglu"
    )
    V_swiglu = numpy_helper.from_array(
        np.random.randn(10, 10).astype(np.float32), name="V_swiglu"
    )

    # Layer 2: 10x2 weight + 2 bias
    W2 = numpy_helper.from_array(np.random.randn(10, 2).astype(np.float32), name="W2")
    B2 = numpy_helper.from_array(np.random.randn(2).astype(np.float32), name="B2")

    graph = helper.make_graph(
        name="SwiGLU_Linear_Model",
        inputs=[
            helper.make_tensor_value_info("input", TensorProto.FLOAT, ["batch", 4]),
        ],
        outputs=[
            helper.make_tensor_value_info("output", TensorProto.FLOAT, ["batch", 2]),
        ],
        nodes=[
            # Layer 1: MatMul -> Add
            helper.make_node(
                "MatMul", ["input", "W1"], ["matmul1_out"], name="matmul1"
            ),
            helper.make_node("Add", ["matmul1_out", "B1"], ["add1_out"], name="add1"),
            # SwiGLU: the domain comes from the argument, not from a notebook
            # global, so it always matches what the caller asked for.
            helper.make_node(
                "SwiGLU",
                inputs=["add1_out", "W_swiglu", "V_swiglu"],
                outputs=["swiglu_out"],
                domain=custom_op_domain,
                name="swiglu",
            ),
            # Layer 2: MatMul -> Add
            helper.make_node(
                "MatMul", ["swiglu_out", "W2"], ["matmul2_out"], name="matmul2"
            ),
            helper.make_node("Add", ["matmul2_out", "B2"], ["output"], name="add2"),
        ],
        initializer=[W1, B1, W_swiglu, V_swiglu, W2, B2],
    )
    return graph


# Create the Model
def construct_model_proto(custom_op_domain, custom_op_opset, model_opset, ir_version):
    """Build the model that uses SwiGLU as a model-local function.

    Args:
        custom_op_domain: Domain used for the SwiGLU function, the SwiGLU node
            in the graph, and the matching opset import.
        custom_op_opset: Opset version declared for ``custom_op_domain``.
        model_opset: Version of the default ("") opset, used by both the main
            graph and the SwiGLU function body.
        ir_version: ONNX IR version recorded on the model.

    Returns:
        The model created by ``helper.make_model``.
    """
    swiglu_fn = get_swiglu_function_proto(
        "SwiGLU", ["x", "W", "V"], ["output"], custom_op_domain, model_opset
    )

    # Pass the same domain to the graph so the SwiGLU node resolves to swiglu_fn.
    graph = construct_graph(custom_op_domain)

    model = helper.make_model(
        graph,
        functions=[swiglu_fn],
        opset_imports=[
            helper.make_opsetid("", model_opset),  # ONNX opset
            helper.make_opsetid(custom_op_domain, custom_op_opset),  # Custom domain
        ],
        ir_version=ir_version,
    )
    return model


# Settings for the SwiGLU model assembled with the ONNX helper API.
custom_op_domain = "my.custom_op"
custom_op_opset = 1
model_opset = 18
ir_version = 10

# Build the model, then write it to disk.
model = construct_model_proto(
    custom_op_domain=custom_op_domain,
    custom_op_opset=custom_op_opset,
    model_opset=model_opset,
    ir_version=ir_version,
)

swiglu_model_onnx_apis = "swiglu_model_onnx_apis.onnx"
onnx.save(model, swiglu_model_onnx_apis)
print("Saved ONNX model with SwiGLU!")


Saved ONNX model with SwiGLU!


In [16]:
# Serve the helper-API SwiGLU model with Netron on its own port and embed the viewer.
port = 6008
netron.start(swiglu_model_onnx_apis, port, browse=False)
IPython.display.IFrame(src=f"http://localhost:{port}", width=1000, height=500)

Serving 'swiglu_model_onnx_apis.onnx' at http://localhost:6008


In [17]:
# Run the helper-API SwiGLU model once on a randomly generated input;
# run_model prints the shapes and a few sample values.
outputs = run_model(swiglu_model_onnx_apis)


Input shape: (1, 4)
Output shape: (1, 2)

Sample input values:
[0.21669818 0.2543661  0.14980361 0.985594  ]

Sample output values:
[26.734856 37.842915]


#### 3.2 Using ONNX Script tool

In [18]:
import numpy as np
import onnxscript
from onnxscript import opset18 as op
from onnxscript import FLOAT, script

# Same domain and opset values as in section 3.1, assigned again in this cell.
custom_op_domain = "my.custom_op"
custom_op_opset = 1
# NOTE(review): model_opset and ir_version are assigned here but are not
# referenced by the remaining code of this cell. Confirm whether they are needed.
model_opset = 18
ir_version = 10
# Opset object for the custom domain; it is handed to the @onnxscript.script
# decorator of SwiGLU below.
custom_opset = onnxscript.values.Opset(custom_op_domain, custom_op_opset)

# Define the custom SwiGLU function using ONNX Script
# Computes (x @ W) * sigmoid(x @ W) * (x @ V); the decorator receives the
# custom-domain opset object created above.
@onnxscript.script(opset=custom_opset)
def SwiGLU(x: FLOAT[...], W: FLOAT[...], V: FLOAT[...]) -> FLOAT[...]:
    # Gate branch: swish(xW) = xW * sigmoid(xW)
    xW = op.MatMul(x, W)
    sigmoid_xW = op.Sigmoid(xW)
    swish_xW = op.Mul(xW, sigmoid_xW)
    # Value branch
    xV = op.MatMul(x, V)
    # Element-wise product of the gate and value branches
    output = op.Mul(swish_xW, xV)
    return output

# Define the main model
# Linear (4 -> 10) -> SwiGLU (10 -> 10) -> Linear (10 -> 2). All weights are
# created with op.Constant inside the body, so the function's only parameter
# is `input`.
@onnxscript.script()
def SwiGLU_Model(input: FLOAT["batch", 4]) -> FLOAT["batch", 2]:
    # Layer 1 weights
    W1 = op.Constant(value=np.random.randn(4, 10).astype(np.float32))
    B1 = op.Constant(value=np.random.randn(10).astype(np.float32))
    
    # SwiGLU weights
    W_swiglu = op.Constant(value=np.random.randn(10, 10).astype(np.float32))
    V_swiglu = op.Constant(value=np.random.randn(10, 10).astype(np.float32))
    
    # Layer 2 weights
    W2 = op.Constant(value=np.random.randn(10, 2).astype(np.float32))
    B2 = op.Constant(value=np.random.randn(2).astype(np.float32))
    
    # Layer 1: MatMul -> Add
    matmul1_out = op.MatMul(input, W1)
    add1_out = op.Add(matmul1_out, B1)
    
    # SwiGLU: call the scripted SwiGLU function defined above
    swiglu_out = SwiGLU(add1_out, W_swiglu, V_swiglu)
    
    # Layer 2: MatMul -> Add
    matmul2_out = op.MatMul(swiglu_out, W2)
    output = op.Add(matmul2_out, B2)
    
    return output

# Export the scripted model to a ModelProto and write it to disk.
swiglu_model_onnx_script = "swiglu_model_onnx_script.onnx"
model = SwiGLU_Model.to_model_proto()
onnx.save(model, swiglu_model_onnx_script)
print("Saved ONNX model with SwiGLU using ONNX Script!")

Saved ONNX model with SwiGLU using ONNX Script!


In [19]:
# Serve the ONNX Script SwiGLU model with Netron on its own port and embed the viewer.
port = 6009
netron.start(swiglu_model_onnx_script, port, browse=False)
IPython.display.IFrame(src=f"http://localhost:{port}", width=1000, height=500)

Serving 'swiglu_model_onnx_script.onnx' at http://localhost:6009


In [20]:
# Run the ONNX Script SwiGLU model once on a randomly generated input;
# run_model prints the shapes and a few sample values.
outputs = run_model(swiglu_model_onnx_script)


Input shape: (1, 4)
Output shape: (1, 2)

Sample input values:
[0.6029944  0.81271684 0.7339709  0.5977562 ]

Sample output values:
[ -1.5772581 -17.073538 ]


#### 3.3 Using PyTorch export

In [21]:
import torch
import torch.nn as nn
import onnxscript

# Domain and opset settings for the PyTorch export path.
# NOTE(review): the domain here is "my_custom_op" (underscore), unlike the
# "my.custom_op" used in sections 3.1 and 3.2. This is presumably needed to
# satisfy the naming rules of torch.onnx.register_custom_op_symbolic; confirm.
custom_op_domain = "my_custom_op"
custom_op_opset = 1
model_opset = 18
swiglu_model_pytorch_export = "swiglu_pytorch_exported.onnx"
# Opset object for this cell's domain, used by the SwiGLU definition below.
custom_opset = onnxscript.values.Opset(custom_op_domain, custom_op_opset)


# Define the custom SwiGLU function using ONNX Script
# Same body as the SwiGLU of section 3.2, declared again so that it is bound to
# the custom_opset created in this cell. `script`, `FLOAT` and `op` are the
# names imported in the section 3.2 cell.
@script(opset=custom_opset)
def SwiGLU(x: FLOAT[...], W: FLOAT[...], V: FLOAT[...]) -> FLOAT[...]:
    # Gate branch: swish(xW) = xW * sigmoid(xW)
    xW = op.MatMul(x, W)
    sigmoid_xW = op.Sigmoid(xW)
    swish_xW = op.Mul(xW, sigmoid_xW)
    # Value branch
    xV = op.MatMul(x, V)
    # Element-wise product of the gate and value branches
    output = op.Mul(swish_xW, xV)
    return output


# 1. Define the SwiGLU custom PyTorch operation
class SwiGLUFunction(torch.autograd.Function):
    """SwiGLU as a custom autograd function with an ONNX export rule.

    ``forward`` computes ``swish(x @ W) * (x @ V)`` with
    ``swish(z) = z * sigmoid(z)``. ``backward`` supplies the matching
    gradients, so a model using this function can also be trained.
    ``symbolic`` tells the ONNX exporter to emit the ONNX Script ``SwiGLU``
    function for this operation.

    ``W`` and ``V`` are expected to be 2-D matrices; ``symbolic`` reads the
    second dimension of ``V`` to describe the output type.
    """

    @staticmethod
    def forward(ctx, x, W, V):
        """Return ``(x @ W) * sigmoid(x @ W) * (x @ V)``."""
        # Keep the inputs; backward recomputes the intermediates from them.
        ctx.save_for_backward(x, W, V)
        xW = torch.matmul(x, W)
        sigmoid = torch.sigmoid(xW)
        swish = xW * sigmoid
        xV = torch.matmul(x, V)
        return swish * xV

    @staticmethod
    def backward(ctx, grad_output):
        """Return the gradients of the output with respect to ``x``, ``W`` and ``V``."""
        x, W, V = ctx.saved_tensors
        xW = torch.matmul(x, W)
        sigmoid = torch.sigmoid(xW)
        swish = xW * sigmoid
        xV = torch.matmul(x, V)

        # out = swish * xV, so each factor receives the other one as its gradient.
        grad_xV = grad_output * swish
        # d/dz [z * sigmoid(z)] = sigmoid(z) + z * sigmoid(z) * (1 - sigmoid(z))
        grad_xW = grad_output * xV * (sigmoid + swish * (1 - sigmoid))

        grad_x = torch.matmul(grad_xW, W.t()) + torch.matmul(grad_xV, V.t())

        # Collapse any leading batch dimensions so the weight gradients sum
        # over all rows of x.
        x_rows = x.reshape(-1, x.shape[-1])
        grad_W = torch.matmul(x_rows.t(), grad_xW.reshape(-1, grad_xW.shape[-1]))
        grad_V = torch.matmul(x_rows.t(), grad_xV.reshape(-1, grad_xV.shape[-1]))
        return grad_x, grad_W, grad_V

    @staticmethod
    def symbolic(g, x, W, V):
        """Emit the ONNX Script ``SwiGLU`` op and annotate its output type."""
        # Output type: same element type as x, with sizes
        # [unknown, second dimension of V].
        return g.onnxscript_op(SwiGLU, x, W, V).setType(
            x.type().with_sizes([None, V.type().sizes()[1]])
        )


# 2. Create PyTorch model
class SwiGLUModel(nn.Module):
    """Linear -> SwiGLU -> Linear network built around ``SwiGLUFunction``.

    Args:
        in_dim: Size of each input sample.
        out_dim: Size of each output sample.
        hidden_dim: Width of the hidden representation; both SwiGLU weight
            matrices are ``hidden_dim x hidden_dim``.
    """

    def __init__(self, in_dim, out_dim, hidden_dim):
        super().__init__()
        swiglu_shape = (hidden_dim, hidden_dim)
        self.linear1 = nn.Linear(in_dim, hidden_dim, bias=True)
        # Gate (W) and value (V) projections of SwiGLU, initialised from N(0, 1).
        self.W_swiglu = nn.Parameter(torch.randn(*swiglu_shape))
        self.V_swiglu = nn.Parameter(torch.randn(*swiglu_shape))
        self.linear2 = nn.Linear(hidden_dim, out_dim, bias=True)

    def forward(self, x):
        """Apply linear1, the custom SwiGLU function, then linear2."""
        hidden = self.linear1(x)
        gated = SwiGLUFunction.apply(hidden, self.W_swiglu, self.V_swiglu)
        return self.linear2(gated)


# 3. Register the custom symbolic function
# The symbolic is registered under "<custom_op_domain>::SwiGLU" for the opset
# version that the export below uses.
# NOTE(review): SwiGLUFunction already exposes a static `symbolic` method.
# Confirm whether the exporter also needs this explicit registration.
torch.onnx.register_custom_op_symbolic(
    f"{custom_op_domain}::SwiGLU",
    SwiGLUFunction.symbolic,
    opset_version=model_opset,
)

# 4. Create and export the model
# Build the network (4 inputs, 2 outputs, hidden width 10) and a single example input.
model = SwiGLUModel(in_dim=4, out_dim=2, hidden_dim=10)
dummy_input = torch.randn(1, 4)  # Single example

# Export to ONNX. Axis 0 of both "input" and "output" is declared as the
# dynamic "batch" axis, and the custom domain is listed with its opset version.
torch.onnx.export(
    model,
    dummy_input,  # Pass the tensor directly
    swiglu_model_pytorch_export,
    opset_version=model_opset,
    custom_opsets={custom_op_domain: custom_op_opset},  # Must match registration domain
    input_names=["input"],
    output_names=["output"],
    dynamic_axes={"input": {0: "batch"}, "output": {0: "batch"}},
)

print("Successfully exported PyTorch model with custom SwiGLU op!")


Successfully exported PyTorch model with custom SwiGLU op!


In [22]:
# Serve the PyTorch-exported SwiGLU model with Netron on its own port and embed the viewer.
port = 6010
netron.start(swiglu_model_pytorch_export, port, browse=False)
IPython.display.IFrame(src=f"http://localhost:{port}", width=1000, height=500)

Serving 'swiglu_pytorch_exported.onnx' at http://localhost:6010


In [23]:
# Run the PyTorch-exported SwiGLU model once on a randomly generated input;
# run_model prints the shapes and a few sample values.
outputs = run_model(swiglu_model_pytorch_export)


Input shape: (1, 4)
Output shape: (1, 2)

Sample input values:
[0.66730917 0.4363972  0.39824814 0.6516445 ]

Sample output values:
[-0.68956697  0.33889443]
