### Load PyTorch Model

In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.backends.cudnn as cudnn
from torch.utils.data import TensorDataset, DataLoader
import torch.nn.functional as F

# Make the local S4 checkout importable by appending its directory to sys.path.
# NOTE(review): PATH_TO_S4_REPO is an absolute, machine-specific path, so the
# two imports below presumably only resolve on a machine with this layout.
import sys
# PATH_TO_S4_REPO = "/Users/poomchan/Developer/s4"
PATH_TO_S4_REPO = "/Users/poomchan/Developer/light-har/code/s4"
sys.path.append(PATH_TO_S4_REPO)
# from models.s4.s4d import S4D
from s4 import S4Block
# from s4d import S4D
# s4d2 supplies the S4D layer that S4Model instantiates.
import s4d2

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import coremltools as ct

CUDA extension for structured kernels (Cauchy and Vandermonde multiplication) not found. Install by going to extensions/kernels/ and running `python setup.py install`, for improved speed and memory efficiency. Note that the kernel changed for state-spaces 4.0 and must be recompiled.
Falling back on slow Cauchy and Vandermonde kernel. Install at least one of pykeops or the CUDA extension for better speed and memory efficiency.


In [2]:
# Record the library versions this notebook was run with (one per line).
print(
    f"torch.__version__ = {torch.__version__}",
    f"coremltools.__version__ = {ct.__version__}",
    sep="\n",
)

torch.__version__ = 2.1.0.post100
coremltools.__version__ = 7.1


In [3]:
# Reload s4d2 so that edits made to the module on disk are picked up when this
# cell is re-run, without restarting the kernel.
import importlib
importlib.reload(s4d2)

class S4Model(nn.Module):
    """Sequence classifier: linear encoder, residual S4D blocks, mean pooling, linear decoder.

    Args:
        d_input: Size of the last input dimension (features per time step).
        d_output: Size of the vector produced for each sequence.
        d_model: Width of the hidden representation used by every block.
        n_layers: Number of residual S4D blocks to stack.
        dropout: Dropout probability, passed both to each S4D layer and to the
            dropout module applied to that layer's output.
        lr: Forwarded unchanged to every ``s4d2.S4D`` layer.
        dropout_fn: Callable that builds a dropout module from a probability.
        prenorm: If True, LayerNorm is applied to the block input before the
            S4D layer; otherwise it is applied after the residual addition.
    """

    def __init__(
        self,
        d_input,
        d_output,
        d_model=256,
        n_layers=4,
        dropout=0.2,
        lr=0.001,
        dropout_fn=nn.Dropout,
        prenorm=False,
    ):
        super().__init__()
        self.prenorm = prenorm

        # Project the raw features up to the model width.
        self.encoder = nn.Linear(d_input, d_model)

        # Per-block modules live in three parallel lists that forward() zips together.
        self.s4_layers = nn.ModuleList()
        self.norms = nn.ModuleList()
        self.dropouts = nn.ModuleList()
        for _ in range(n_layers):
            s4_block = s4d2.S4D(d_model, dropout=dropout, transposed=True, lr=lr)
            self.s4_layers.append(s4_block)
            self.norms.append(nn.LayerNorm(d_model))
            self.dropouts.append(dropout_fn(dropout))

        # Map the pooled features to the output size.
        self.decoder = nn.Linear(d_model, d_output)

    @staticmethod
    def _norm_over_channels(norm, tensor):
        """Apply ``norm`` across the channel axis of a (B, d_model, L) tensor."""
        # LayerNorm normalises the last axis, so swap channels last and back again.
        return norm(tensor.transpose(-1, -2)).transpose(-1, -2)

    def forward(self, x):
        """Map an input of shape (B, L, d_input) to an output of shape (B, d_output)."""
        # (B, L, d_input) -> (B, L, d_model) -> (B, d_model, L)
        hidden = self.encoder(x).transpose(-1, -2)

        # Every pass through this loop keeps the (B, d_model, L) layout.
        for s4_block, layer_norm, drop in zip(self.s4_layers, self.norms, self.dropouts):
            if self.prenorm:
                branch = self._norm_over_channels(layer_norm, hidden)
            else:
                branch = hidden

            # The layer returns a pair; the second element (state) is discarded.
            branch, _ = s4_block(branch)

            # Dropout on the block output, then the residual connection.
            hidden = drop(branch) + hidden

            if not self.prenorm:
                hidden = self._norm_over_channels(layer_norm, hidden)

        # Back to (B, L, d_model), then average over the sequence axis.
        pooled = hidden.transpose(-1, -2).mean(dim=1)

        # (B, d_model) -> (B, d_output)
        return self.decoder(pooled)

# Build the PyTorch model that later cells trace and convert.
model = S4Model(
    d_input=3, # number of input features per time step
    d_output=6, # 6 classes
    d_model=16,
    n_layers=4,
    dropout=0.0,
    lr=0.001,
    dropout_fn=nn.Dropout,
    prenorm=False,
)

# Switch to inference mode; as the last expression of the cell this also
# displays the module tree.
model.eval()
# NOTE(review): checkpoint loading is commented out, so `model` keeps its
# randomly initialised weights. Confirm that is intended before relying on any
# numeric output from it.
# model_path = "/Users/poomchan/Developer/light-har/code/s4/models/s4-d16.pt"
# model.load_state_dict(torch.load(model_path, map_location='cpu'))


S4Model(
  (encoder): Linear(in_features=3, out_features=16, bias=True)
  (s4_layers): ModuleList(
    (0-3): 4 x S4D(
      (activation): GELU(approximate='none')
      (dropout): Identity()
      (output_linear): Sequential(
        (0): Conv1d(16, 32, kernel_size=(1,), stride=(1,))
        (1): GLU(dim=-2)
      )
    )
  )
  (norms): ModuleList(
    (0-3): 4 x LayerNorm((16,), eps=1e-05, elementwise_affine=True)
  )
  (dropouts): ModuleList(
    (0-3): 4 x Dropout(p=0.0, inplace=False)
  )
  (decoder): Linear(in_features=16, out_features=6, bias=True)
)

### Convert to CoreML

In [4]:
import torch
import coremltools as ct

class MyModel(torch.nn.Module):
    """Toy module returning its input together with an all-zero tensor of the same shape.

    The two outputs stand in for the real and imaginary parts of a complex value.
    """

    def forward(self, x):
        # The imaginary component is taken to be zero.
        return x, torch.zeros_like(x)

# Trace the toy module (in eval mode) on a 3-element float vector.
x = torch.Tensor([1, 2, 3])
m = torch.jit.trace(MyModel().eval(), x)

# Convert the trace; as the last expression, the result is what the cell displays.
ct.convert(m, inputs=[ct.TensorType(shape=x.shape)])

When both 'convert_to' and 'minimum_deployment_target' not specified, 'convert_to' is set to "mlprogram" and 'minimum_deployment_targer' is set to ct.target.iOS15 (which is same as ct.target.macOS12). Note: the model will not run on systems older than iOS15/macOS12/watchOS8/tvOS15. In order to make your model run on older system, please set the 'minimum_deployment_target' to iOS14/iOS13. Details please see the link: https://coremltools.readme.io/docs/unified-conversion-api#target-conversion-formats
Tuple detected at graph output. This will be flattened in the converted model.
Converting PyTorch Frontend ==> MIL Ops:  71%|███████▏  | 5/7 [00:00<00:00, 1776.80 ops/s]
Running MIL frontend_pytorch pipeline: 100%|██████████| 5/5 [00:00<00:00, 7174.66 passes/s]
Running MIL default pipeline: 100%|██████████| 71/71 [00:00<00:00, 7030.28 passes/s]
Running MIL backend_mlprogram pipeline: 100%|██████████| 12/12 [00:00<00:00, 25191.02 passes/s]


input {
  name: "x"
  type {
    multiArrayType {
      shape: 3
      dataType: FLOAT32
    }
  }
}
output {
  name: "x"
  type {
    multiArrayType {
      shape: 3
      dataType: FLOAT32
    }
  }
}
output {
  name: "var_7"
  type {
    multiArrayType {
      shape: 3
      dataType: FLOAT32
    }
  }
}
metadata {
  userDefined {
    key: "com.github.apple.coremltools.source"
    value: "torch==2.1.0.post100"
  }
  userDefined {
    key: "com.github.apple.coremltools.source_dialect"
    value: "TorchScript"
  }
  userDefined {
    key: "com.github.apple.coremltools.version"
    value: "7.1"
  }
}

In [None]:
import torch
import coremltools as ct

class MyModel(torch.nn.Module):
    """Toy module whose output is a complex tensor with real and imaginary parts both equal to the input."""

    def forward(self, x):
        real = imag = x
        return torch.complex(real, imag)

# Trace the complex-output toy module on a 3-element float vector and hand the
# trace to the Core ML converter.
x = torch.Tensor([1, 2, 3])
m = torch.jit.trace(MyModel().eval(), x)
ct.convert(m, inputs=[ct.TensorType(shape=x.shape)])

In [4]:
import coremltools as ct

# Example input for tracing. S4Model.forward documents its input as
# (B, L, d_input), so this is one sequence of 16 time steps with 3 features.
example_input = torch.randn(1, 16, 3)

# Trace on CPU in inference mode.
model.to('cpu')
model.eval()
traced_model = torch.jit.trace(model, example_input)
# Run the traced module once on the example input (the result is not used).
traced_model(example_input)

# Convert to Core ML program using the Unified Conversion API.
# NOTE(review): the recorded run of this cell stops with a ValueError: a `mul`
# op receives the complex64 tensor "complex_rfft_1", while the converter only
# accepts fp16/fp32/int32 there. `coreml_model` is therefore not assigned by
# this cell in that run.
coreml_model = ct.convert(
    traced_model,
    convert_to="mlprogram",
    source="pytorch",
    inputs=[ct.TensorType(shape=example_input.shape)]
)

Converting PyTorch Frontend ==> MIL Ops:   0%|          | 0/245 [00:00<?, ? ops/s]Saving value type of int64 into a builtin type of int32, might lose precision!
Converting PyTorch Frontend ==> MIL Ops:  16%|█▋        | 40/245 [00:00<00:00, 1993.89 ops/s]


ValueError: Op "65" (op_type: mul) Input x="complex_rfft_1" expects tensor or scalar of dtype from type domain ['fp16', 'fp32', 'int32'] but got tensor[1,16,17,complex64]

In [7]:
from coremltools.converters.mil.frontend.torch.edgeir_utils import extract_inputs_from_edge_program
from coremltools.converters.mil.frontend.torch.torchscript_utils import _expand_and_optimize_ir

# Debugging aid: re-trace the model and walk the TorchScript graph returned by
# coremltools' helper, reading every attribute of every node.
# NOTE(review): `_expand_and_optimize_ir` is a private coremltools function and
# may change between releases.
example_input = torch.randn(1, 16, 3)

model.to('cpu')
model.eval()
traced_model = torch.jit.trace(model, example_input)
traced_model(example_input)

raw_graph, params_dict, buffer_dict = _expand_and_optimize_ir(traced_model)

for node in raw_graph.nodes():
    # Rebuilt for each node; nothing reads it after the loop.
    attr = {}
    for name in node.attributeNames():
        if node.kindOf(name) == "cs":
            # Attributes of kind "cs" are printed and then read through the
            # `c` accessor rather than an accessor named after their kind.
            # NOTE(review): presumably a probe for how complex-valued
            # attributes can be read; confirm.
            # attr[name] = node.c(name)
            print(node)
            attr[name] = getattr(node, "c")(name)
        else:
            # The accessor method is named after the attribute's kind string.
            attr[name] = getattr(node, node.kindOf(name))(name)
    # print(attr)


### Perform Quantization

In [None]:
import coremltools.optimize.coreml as cto
import coremltools as ct

# Load a Core ML package from disk and display it.
# NOTE(review): no live cell visible in this notebook writes "s4.mlpackage", so
# it has to exist already in the working directory. Confirm where it comes from.
coreml_model = ct.models.MLModel("s4.mlpackage")
coreml_model

In [None]:
# Symmetric linear weight quantization, applied with one global op config.
# weight_threshold is the element count above which a weight tensor is quantized.
op_config = cto.OpLinearQuantizerConfig(mode="linear_symmetric", weight_threshold=512)
config = cto.OptimizationConfig(global_config=op_config)

quantized_model = cto.linear_quantize_weights(coreml_model, config=config)

# Save the quantized package. `model_path` is read by the final evaluation cell
# to measure the on-disk size, so it has to be assigned by live code: leaving
# these two lines commented out makes that cell fail with NameError after a
# kernel restart, and leaves no package on disk to measure.
model_path = "quantized-s4.mlpackage"
quantized_model.save(model_path)

In [None]:
from torch._C import Node

### Test the model

In [None]:
from numpy import genfromtxt
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split

# Read the WISDM feature matrix and label column from their CSV files.
x = np.genfromtxt('../Data/WISDM_x.csv', delimiter=',')
y_df = pd.read_csv('../Data/WISDM_y.csv')
y = y_df.to_numpy().flatten()  # collapse to 1-D in case the frame is 2-D

# Map the raw label values to integer class ids.
label_encoder = LabelEncoder()
y_encoded = label_encoder.fit_transform(y)

# Function to create time series dataset
def create_series(x, y, timestep, overlap):
    """Cut a sample stream into fixed-length, possibly overlapping windows.

    Args:
        x: 2-D array of shape (n_samples, n_features).
        y: 1-D sequence holding one label per row of ``x``.
        timestep: Number of consecutive rows in each window.
        overlap: Fraction of a window shared with the next one, in ``[0, 1)``.

    Returns:
        ``(dataset, labels)``. ``dataset`` has shape
        ``(n_windows, timestep, n_features)``; ``labels[i]`` is
        ``y[slide_step * (i + 1) - 1]``, the label of the last row before
        window ``i + 1`` starts. Both are empty when ``x`` has fewer than
        ``timestep`` rows.

    Raises:
        ValueError: If ``overlap`` is outside ``[0, 1)`` or the resulting
            stride is shorter than one row.
    """
    if not 0 <= overlap < 1:
        raise ValueError(f"overlap must be in [0, 1), got {overlap}")

    # Truncate as before, but absorb float error first: 10 * (1 - 0.9) is
    # 0.9999999999999998 and would otherwise become a stride of 0.
    slide_step = int(timestep * (1 - overlap) + 1e-9)
    if slide_step < 1:
        raise ValueError(
            f"timestep={timestep} with overlap={overlap} gives a stride of 0 rows"
        )

    # Count only windows that fit entirely inside x. The earlier
    # len(x) / slide_step - 1 was correct only when the stride was exactly half
    # the window: smaller strides indexed past the end of x, larger strides
    # silently dropped the final window.
    n_rows = len(x)
    data_num = (n_rows - timestep) // slide_step + 1 if n_rows >= timestep else 0

    dataset = np.empty((data_num, timestep, x.shape[1]))
    labels = []
    for i in range(data_num):
        start = slide_step * i
        dataset[i] = x[start:start + timestep]
        labels.append(y[start + slide_step - 1])

    return dataset, np.array(labels)

# Window the stream into fixed-length series.
timestep = 16  # Replace with your value
overlap = 0.5  # Replace with your value
X_series, y_series = create_series(x, y_encoded, timestep, overlap)

# Hold out 20% of the windows for testing, with a fixed seed for the split.
X_train, X_test, y_train, y_test = train_test_split(
    X_series, y_series, test_size=0.2, random_state=42
)
print(f'X_train shape:{X_train.shape}, X_test shape:{X_test.shape}, y_train shape:{y_train.shape}, y_test shape:{y_test.shape}')

# Features become float32 tensors; labels become int64 tensors (class ids).
X_train, X_test = (torch.tensor(arr, dtype=torch.float32) for arr in (X_train, X_test))
y_train, y_test = (torch.tensor(arr, dtype=torch.long) for arr in (y_train, y_test))

# Wrap each split in a dataset and a loader; only the training loader shuffles.
batch_size = 64
train_dataset = TensorDataset(X_train, y_train)
test_dataset = TensorDataset(X_test, y_test)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [None]:
from pathlib import Path

def _path_size_bytes(path):
    """Return the size in bytes of a file, or the summed size of all files under a directory."""
    if path.is_dir():
        return sum(entry.stat().st_size for entry in path.rglob("*") if entry.is_file())
    return path.stat().st_size


def coreml_metrics(model_name, X_test, y_test, model_path,
                   input_name='x', output_name='var_75', input_shape=(1, 3, 16)):
    """Print a model's classification accuracy on a test set and its size on disk.

    Args:
        model_name: The loaded model object (despite the parameter name). It
            must expose ``predict(dict) -> dict``.
        X_test: Tensor of test samples, iterated along its first dimension.
        y_test: Tensor of integer class labels, one per sample.
        model_path: Location of the saved model. May be a single file or a
            directory; an ``.mlpackage`` is a directory, so its files are summed.
        input_name: Key under which each sample is passed to ``predict``.
        output_name: Key of the score array in the dict ``predict`` returns.
        input_shape: Shape each sample is viewed as before prediction.

    Returns:
        None. The accuracy and the size in KB are printed.
    """
    predictions = []
    for sample in X_test:
        # NOTE(review): `view` reinterprets the sample's memory; it does not
        # transpose axes. If samples are stored as (timestep, features) and the
        # model wants features first, `transpose` is probably what is needed.
        # Confirm against the model's input spec.
        model_input = sample.view(*input_shape).numpy()
        output_dict = model_name.predict({input_name: model_input})
        predictions.append(np.argmax(output_dict[output_name]))

    accuracy = np.sum(np.array(predictions) == y_test.numpy()) / len(predictions)
    print("Accuracy:", accuracy)

    # stat() on a directory reports the directory entry, not its contents, so
    # directories are measured by summing the files inside them.
    model_size_bytes = _path_size_bytes(Path(model_path))

    # Convert size to kilobytes
    model_size_kb = model_size_bytes / 1024
    print(f"Size of the model: {model_size_kb:.2f} KB")

In [None]:
# Report the quantized model's accuracy on the held-out windows, together with
# the on-disk size of whatever `model_path` points to.
coreml_metrics(quantized_model, X_test, y_test, model_path)