This notebook's PointNet implementation for ModelNet10 classification is adapted from https://github.com/keras-team/keras-io/blob/master/examples/vision/pointnet.py

In [None]:
import glob
import os
import shutil

import numpy as np
import tensorflow as tf
import tiledb
import trimesh
from matplotlib import pyplot as plt
from tensorflow import keras
from tensorflow.keras import layers

# Seed every RNG the notebook draws from. TensorFlow drives weight
# initialisation and the jitter augmentation; NumPy's global RNG drives the
# shuffles performed during ingestion, so it has to be seeded as well for a
# Restart & Run All to be repeatable.
SEED = 1234
tf.random.set_seed(SEED)
np.random.seed(SEED)

NUM_POINTS = 2048  # points sampled from every mesh
NUM_CLASSES = 10   # ModelNet10 has ten object categories
BATCH_SIZE = 32    # also used as the TileDB tile extent along the sample axis

## Load dataset
We use the ModelNet10 model dataset, the smaller 10 class version of the ModelNet40
dataset. First download the data:

In [None]:
# Download (or reuse the cached copy of) the ModelNet10 archive and unpack it.
archive_path = tf.keras.utils.get_file(
    fname="modelnet.zip",
    origin="http://3dvision.princeton.edu/projects/2014/3DShapeNets/ModelNet10.zip",
    extract=True,
)

# The extracted "ModelNet10" folder is expected next to the downloaded archive.
DATA_DIR = os.path.join(os.path.dirname(archive_path), "ModelNet10")

In [None]:
# Where all our data live.
DATA_PATH = "data/"

# Where our tileDB arrays live.
TILEDB_PATH = "data/tiledb"

# Where trained models live
MODEL_PATH = "data/trained_models"

# makedirs(exist_ok=True) keeps this cell safe to re-run.
for directory in (DATA_PATH, TILEDB_PATH, MODEL_PATH):
    os.makedirs(directory, exist_ok=True)

# Move the extracted dataset from the Keras cache into ./data/ModelNet10.
# DATA_DIR (computed by the download cell) is used instead of a hard-coded
# "~/.keras/datasets" path, and the move is skipped when the dataset is
# already in place so that re-running the notebook does not fail.
MODELNET_PATH = os.path.join(DATA_PATH, "ModelNet10")
if not os.path.exists(MODELNET_PATH):
    shutil.move(DATA_DIR, MODELNET_PATH)


Function for ingestion in TileDB


In [None]:
def _sample_meshes(mesh_files, num_points):
    """Load every mesh file with trimesh and sample `num_points` points from each."""
    return [trimesh.load(mesh_file).sample(num_points) for mesh_file in mesh_files]


def _shuffle_in_unison(points, labels):
    """Apply one random permutation to both arrays so points stay aligned with labels."""
    order = np.arange(points.shape[0])
    np.random.shuffle(order)
    return points[order], labels[order]


def _write_dense_array(uri, dims, attr_name, num_fields, data):
    """Create a dense TileDB array at `uri` (replacing a previous one) and write `data` to it."""
    # Each cell stores `num_fields` unnamed float32 values as one structured record.
    record_dtype = [("", np.float32)] * num_fields
    schema = tiledb.ArraySchema(
        domain=tiledb.Domain(*dims),
        sparse=False,
        attrs=[tiledb.Attr(name=attr_name, dtype=record_dtype)],
    )

    # Array.create refuses to overwrite, so drop the array left by an earlier run.
    if tiledb.object_type(uri) == "array":
        tiledb.remove(uri)
    tiledb.Array.create(uri, schema)

    with tiledb.open(uri, 'w') as array:
        array[:] = data.view(record_dtype)


def ingest_in_tiledb(num_points=2048):
    """
    Sample point clouds from the ModelNet10 meshes and store them in TileDB.

    Every sub-directory of ``<DATA_PATH>/ModelNet10`` is treated as one class.
    Its ``train/`` and ``test/`` meshes are loaded with trimesh, `num_points`
    points are sampled from each mesh, and points and labels are shuffled
    (with the same permutation) before being written to four dense arrays
    under ``TILEDB_PATH``: ``train_point_cloud_array``,
    ``validate_point_cloud_array``, ``train_label_array`` and
    ``validate_label_array``. The first dimension of every array is tiled by
    ``BATCH_SIZE``. Existing arrays at those locations are replaced.

    Args:
        num_points: number of points sampled from each mesh.

    Returns:
        dict mapping the integer class id to the class (folder) name. Ids are
        assigned in sorted folder order, so they are stable across runs.

    Raises:
        FileNotFoundError: if no class folders or no mesh files are found.
    """
    dataset_root = os.path.join(DATA_PATH, "ModelNet10")

    # Keep directories only (this skips README.txt) and sort them so class ids
    # do not depend on the order in which the filesystem lists entries.
    folders = sorted(
        path for path in glob.glob(os.path.join(dataset_root, "*")) if os.path.isdir(path)
    )
    if not folders:
        raise FileNotFoundError("no class folders found in {}".format(dataset_root))

    train_points, train_labels = [], []
    test_points, test_labels = [], []
    class_map = {}

    for i, folder in enumerate(folders):
        class_name = os.path.basename(folder)
        print("processing class: {}".format(class_name))
        # store folder name with ID so we can retrieve later
        class_map[i] = class_name

        class_train = _sample_meshes(glob.glob(os.path.join(folder, "train/*")), num_points)
        class_test = _sample_meshes(glob.glob(os.path.join(folder, "test/*")), num_points)

        train_points.extend(class_train)
        train_labels.extend([i] * len(class_train))
        test_points.extend(class_test)
        test_labels.extend([i] * len(class_test))

    if not train_points or not test_points:
        raise FileNotFoundError(
            "no train/test mesh files found under {}".format(dataset_root)
        )

    train_points = np.stack(train_points, axis=0).astype(np.float32)
    train_labels = np.array(train_labels).astype(np.float32)
    test_points = np.stack(test_points, axis=0).astype(np.float32)
    test_labels = np.array(test_labels).astype(np.float32)

    # Shuffle point and label data in the same manner
    train_points, train_labels = _shuffle_in_unison(train_points, train_labels)
    test_points, test_labels = _shuffle_in_unison(test_points, test_labels)

    # Point cloud arrays: (point_cloud_id, mesh_samples) cells holding 3 floats each.
    train_point_cloud_id = tiledb.Dim(name="point_cloud_id", domain=(0, train_points.shape[0] - 1),
                                      tile=BATCH_SIZE, dtype=np.int32)
    validate_point_cloud_id = tiledb.Dim(name="point_cloud_id", domain=(0, test_points.shape[0] - 1),
                                         tile=BATCH_SIZE, dtype=np.int32)
    # The sample dimension is common to both arrays; one tile spans a whole cloud.
    samples = tiledb.Dim(name="mesh_samples", domain=(0, train_points.shape[1] - 1),
                         tile=train_points.shape[1], dtype=np.int32)

    _write_dense_array(TILEDB_PATH + "/train_point_cloud_array",
                       (train_point_cloud_id, samples), "features", 3, train_points)
    _write_dense_array(TILEDB_PATH + "/validate_point_cloud_array",
                       (validate_point_cloud_id, samples), "features", 3, test_points)

    print("[STATUS] point cloud TileDB arrays are ready.")

    # Label arrays: one float32 label per label_id.
    train_label_id = tiledb.Dim(name="label_id", domain=(0, train_labels.shape[0] - 1),
                                tile=BATCH_SIZE, dtype=np.int32)
    validate_label_id = tiledb.Dim(name="label_id", domain=(0, test_labels.shape[0] - 1),
                                   tile=BATCH_SIZE, dtype=np.int32)

    _write_dense_array(TILEDB_PATH + "/train_label_array",
                       (train_label_id,), "label", 1, train_labels)
    _write_dense_array(TILEDB_PATH + "/validate_label_array",
                       (validate_label_id,), "label", 1, test_labels)

    print("[STATUS] labels TileDB arrays are ready.")

    return class_map

Run ingestion.

In [None]:
# Build the TileDB arrays and keep the class-id -> class-name lookup for plotting.
CLASS_MAP = ingest_in_tiledb(num_points=NUM_POINTS)


We will need a data generator that will feed training and validation data into the model while training.

In [None]:
def generator(tiledb_images_obj, tiledb_labels_obj, shape, batch_size=BATCH_SIZE, augment=True):
    """
    Yield (points, labels) batches forever by slicing two TileDB arrays.

    Args:
        tiledb_images_obj: opened TileDB array whose 'features' attribute holds
            the point clouds.
        tiledb_labels_obj: opened TileDB array whose 'label' attribute holds
            the labels.
        shape: total number of samples to iterate over in one pass.
        batch_size: number of samples per batch; the final batch of a pass is
            smaller when `shape` is not a multiple of `batch_size`.
        augment: when True (the default, matching the previous behaviour) add
            uniform jitter in [-0.005, 0.005) to the points. Pass False to
            feed the stored points unchanged, e.g. for validation data.

    Yields:
        Tuple ``(x, y)`` where ``x`` has shape (batch, NUM_POINTS, 3) and ``y``
        has shape (batch, 1).

    Raises:
        ValueError: if `shape` or `batch_size` is not positive (raised on the
            first ``next()`` call, as for any generator).
    """
    if shape <= 0:
        # An empty range would make the outer loop spin forever without yielding.
        raise ValueError("shape must be positive, got {}".format(shape))
    if batch_size <= 0:
        raise ValueError("batch_size must be positive, got {}".format(batch_size))

    while True:  # Loop forever so the generator never terminates

        # Get index to start each batch
        for offset in range(0, shape, batch_size):

            # Size of *this* batch only. `batch_size` itself must stay untouched:
            # overwriting it for the last partial batch would shrink every batch
            # of all following passes.
            current_size = min(batch_size, shape - offset)
            stop = offset + current_size

            # TileDB returns structured numpy arrays; view them as plain float32.
            x_train = tiledb_images_obj[offset:stop]['features'].view(np.float32).reshape(
                current_size, NUM_POINTS, 3)
            y_train = tiledb_labels_obj[offset:stop]['label'].view(np.float32).reshape(
                current_size, 1)

            if augment:
                # Augment points...jitter (noise is drawn as tf.float64, as before)
                x_train = x_train + tf.random.uniform(x_train.shape, -0.005, 0.005, dtype=tf.float64)

            # The generator-y part: yield the next training batch
            yield x_train, y_train

We will create generators for train and validation data.

In [None]:
# Open the point cloud and label arrays written by the ingestion step (read mode).
train_point_clouds_tiledb = tiledb.open("/".join((TILEDB_PATH, "train_point_cloud_array")))
train_labels_tiledb = tiledb.open("/".join((TILEDB_PATH, "train_label_array")))

validate_point_clouds_tiledb = tiledb.open("/".join((TILEDB_PATH, "validate_point_cloud_array")))
validate_labels_tiledb = tiledb.open("/".join((TILEDB_PATH, "validate_label_array")))

# One endless batch generator per split; the sample count is the extent of the
# arrays' first dimension.
train_generator = generator(
    train_point_clouds_tiledb,
    train_labels_tiledb,
    train_point_clouds_tiledb.domain.shape[0],
    BATCH_SIZE,
)

validate_generator = generator(
    validate_point_clouds_tiledb,
    validate_labels_tiledb,
    validate_point_clouds_tiledb.domain.shape[0],
    BATCH_SIZE,
)

Define the model. Each convolution and fully-connected layer (with the exception of the end layers) consists of
Convolution / Dense -> Batch Normalization -> ReLU Activation.

In [None]:
def conv_bn(x, filters):
    """Apply a pointwise (kernel size 1) Conv1D with `filters` outputs, then batch norm and ReLU."""
    convolved = layers.Conv1D(filters, kernel_size=1, padding="valid")(x)
    normalized = layers.BatchNormalization(momentum=0.0)(convolved)
    return layers.Activation("relu")(normalized)


def dense_bn(x, filters):
    """Apply a Dense layer with `filters` units, then batch norm and ReLU."""
    projected = layers.Dense(filters)(x)
    normalized = layers.BatchNormalization(momentum=0.0)(projected)
    return layers.Activation("relu")(normalized)

PointNet consists of two core components: the primary MLP network and the transformer
net (T-net). The T-net aims to learn an affine transformation matrix with its own mini
network. The T-net is used twice: first to transform the input features (n, 3)
into a canonical representation, and then as an affine transformation for alignment in
feature space (n, 32). As per the original paper, we constrain the transformation to be
close to an orthogonal matrix (i.e. $\|X X^T - I\| = 0$).

In [None]:
class OrthogonalRegularizer(keras.regularizers.Regularizer):
    """
    Penalise transformation matrices that are far from orthogonal.

    For every (num_features x num_features) matrix X in the batch the penalty
    is ``l2reg * sum((X @ X^T - I) ** 2)``; the penalties are summed over the
    batch.

    Args:
        num_features: side length of the square transformation matrix.
        l2reg: weight of the penalty.
    """

    def __init__(self, num_features, l2reg=0.001):
        self.num_features = num_features
        self.l2reg = l2reg
        self.eye = tf.eye(num_features)

    def __call__(self, x):
        """Return the scalar orthogonality penalty for the flattened matrices in `x`."""
        x = tf.reshape(x, (-1, self.num_features, self.num_features))
        # Batched X @ X^T: one Gram matrix per sample. tensordot over the last
        # axes would instead contract every sample with every other sample
        # (shape (B, n, B, n)), which only equals X @ X^T when the batch size is 1.
        xxt = tf.matmul(x, x, transpose_b=True)
        return tf.reduce_sum(self.l2reg * tf.square(xxt - self.eye))

    def get_config(self):
        """Return the constructor arguments so models using this regularizer can be serialized."""
        return {"num_features": self.num_features, "l2reg": self.l2reg}

We can then define a general function to build T-net layers.

In [None]:
def tnet(inputs, num_features):
    """
    Build a T-net on top of `inputs` and return the transformed features.

    A small conv/dense network predicts a (num_features x num_features)
    matrix per sample, which is then multiplied with `inputs`.

    Args:
        inputs: tensor whose last axis has `num_features` entries.
        num_features: size of the learned square transformation matrix.

    Returns:
        `inputs` multiplied by the predicted transformation matrix.
    """
    # Initialise the bias as the identity matrix; together with the zero kernel
    # below, the predicted transform starts out as the identity.
    identity_bias = keras.initializers.Constant(np.eye(num_features).flatten())
    orthogonality = OrthogonalRegularizer(num_features)

    features = inputs
    for width in (32, 64, 512):
        features = conv_bn(features, width)
    features = layers.GlobalMaxPooling1D()(features)
    for width in (256, 128):
        features = dense_bn(features, width)

    flat_transform = layers.Dense(
        num_features * num_features,
        kernel_initializer="zeros",
        bias_initializer=identity_bias,
        activity_regularizer=orthogonality,
    )(features)
    feat_T = layers.Reshape((num_features, num_features))(flat_transform)

    # Apply affine transformation to input features
    return layers.Dot(axes=(2, 1))([inputs, feat_T])

The main network can then be implemented in the same manner, where the T-net mini models
can be dropped in as layers in the graph. Here we replicate the network architecture
published in the original paper but with half the number of weights at each layer, as we
are using the smaller 10-class ModelNet dataset.

In [None]:
inputs = keras.Input(shape=(NUM_POINTS, 3))

# Input transform followed by the first shared MLP.
x = tnet(inputs, 3)
for width in (32, 32):
    x = conv_bn(x, width)

# Feature transform followed by the second shared MLP.
x = tnet(x, 32)
for width in (32, 64, 512):
    x = conv_bn(x, width)

# Collapse the per-point features into a single global descriptor.
x = layers.GlobalMaxPooling1D()(x)

# Classification head: two dense blocks, each followed by dropout.
for width in (256, 128):
    x = dense_bn(x, width)
    x = layers.Dropout(0.3)(x)

outputs = layers.Dense(NUM_CLASSES, activation="softmax")(x)

model = keras.Model(inputs=inputs, outputs=outputs, name="pointnet")
model.summary()

Train model
Once the model is defined it can be trained like any other standard classification model
using `.compile()` and `.fit()`.

In [None]:
model.compile(
    loss="sparse_categorical_crossentropy",
    optimizer=keras.optimizers.Adam(learning_rate=0.001),
    metrics=["sparse_categorical_accuracy"],
)

num_train = int(train_point_clouds_tiledb.domain.shape[0])
num_validate = int(validate_point_clouds_tiledb.domain.shape[0])

# Ceiling division: the generators also yield the trailing partial batch, so an
# epoch must include it. With floor division that batch would be left over and
# every following epoch would start one batch further out of step with the data.
train_steps = (num_train + BATCH_SIZE - 1) // BATCH_SIZE
validate_steps = (num_validate + BATCH_SIZE - 1) // BATCH_SIZE

model.fit(train_generator,
          steps_per_epoch=train_steps,
          epochs=1,
          validation_data=validate_generator,
          validation_steps=validate_steps)



Visualize some predictions


In [None]:
num_of_points = 8
points = validate_point_clouds_tiledb[:num_of_points]['features'].view(np.float32).reshape(num_of_points, NUM_POINTS, 3)
labels = validate_labels_tiledb[:num_of_points]['label'].view(np.float32).reshape(num_of_points,)

# run test data through model
preds = model.predict(points)
preds = tf.math.argmax(preds, -1)
pred_ids = preds.numpy()  # convert once instead of on every loop iteration

# Derive the grid from the number of clouds so changing `num_of_points`
# does not overflow a fixed 2x4 layout.
num_cols = 4
num_rows = (num_of_points + num_cols - 1) // num_cols

# plot points with predicted class and label
fig = plt.figure(figsize=(15, 10))
for i in range(num_of_points):
    ax = fig.add_subplot(num_rows, num_cols, i + 1, projection="3d")
    ax.scatter(points[i, :, 0], points[i, :, 1], points[i, :, 2])
    # Labels are stored as float32; cast to int to match CLASS_MAP's integer keys.
    ax.set_title(
        "pred: {:}, label: {:}".format(
            CLASS_MAP[int(pred_ids[i])], CLASS_MAP[int(labels[i])]
        )
    )
    ax.set_axis_off()
plt.show()
