In [None]:
import torch.nn.functional as F
import pandas as pd
import numpy as np
import json
from collections import Counter
import itertools
import matplotlib.pyplot as plt
import seaborn as sns
import keras
import tensorflow as tf
from keras.preprocessing.sequence import pad_sequences
from sklearn.model_selection import train_test_split
from tensorflow.keras import layers
from sklearn.preprocessing import LabelEncoder
from keras.callbacks import EarlyStopping
from keras.models import load_model
from keras.callbacks import ModelCheckpoint

In [None]:
# Notebook-wide plot defaults: figure size in inches and the resolution used by savefig.
sns.set(
    rc={
        'figure.figsize': (11.7, 8.27),
        'savefig.dpi': 400,
    }
)

In [None]:
# Mount Google Drive at /content/drive.
# google.colab is the Colab runtime package, so this cell is Colab-specific.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Load the exported point-cloud records from the mounted Drive.
# JSON text is UTF-8 by specification, so decode it explicitly instead of
# relying on whatever default encoding the runtime's locale provides.
with open('/content/drive/MyDrive/data/pc_data.txt', encoding='utf-8') as f:
    pointcloud_data = json.load(f)

In [None]:
# Peek at the first two (key, record) pairs.
# islice stops after two items instead of copying every item into a list first.
dict_items = pointcloud_data.items()

first_two = list(itertools.islice(dict_items, 2))


In [None]:
# Split every JSON record into its point-cloud payload (X) and its category (y).
# Both lists follow the dictionary's iteration order, so X[i] and y[i] belong together.
X = [record['pointcloud'] for record in pointcloud_data.values()]
y = [record['category'] for record in pointcloud_data.values()]


In [None]:
# Sanity check after unpacking the JSON file: X and y should report the same length.
print(len(X), len(y))

In [None]:
# Flatten X by one nesting level (presumably: each record holds a list of clouds).
# NOTE: X is rebound here, so running this cell twice flattens a second level.

X = list(itertools.chain.from_iterable(X))

In [None]:

#find the longest and shortest point cloud vectors
def FindMinMaxLength(x,min_size,max_size,labels=None):
  """Print the longest and shortest entry of ``x`` together with their labels.

  Args:
    x: sequence of sized items, one per point cloud.
    min_size: starting value of the running maximum; only entries strictly
      longer than this can be reported as the maximum.
    max_size: starting value of the running minimum; only entries strictly
      shorter than this can be reported as the minimum.
    labels: sequence indexed in step with ``x``. Defaults to the notebook-level
      ``y``.
      NOTE(review): ``y`` stays nested per record while ``X`` is flattened
      before this is called -- confirm both are indexed the same way.

  Returns:
    None. The summary is printed.
  """
  if labels is None:
    labels = y
  max_length, min_length = min_size, max_size
  max_a = min_a = None
  for a, cloud in enumerate(x):
    size = len(cloud)
    # Two independent checks: an entry that raises the maximum must still be
    # considered for the minimum (an elif would skip it).
    if size > max_length:
      max_length, max_a = size, a
    if size < min_length:
      min_length, min_a = size, a
  # No entry beat the starting bound (or x is empty): there is no label to show.
  max_label = labels[max_a] if max_a is not None else "n/a"
  min_label = labels[min_a] if min_a is not None else "n/a"
  print ("Maximum Point Cloud Vector is " + str(max_length) +" and is labelled: "+
         str(max_label)+"\nMinimum Point Cloud Vector is " + str(min_length) +
         " and is labelled :" + str(min_label))

In [None]:
# Start the search with 0 as the running maximum and 1000 as the running minimum.
FindMinMaxLength(X, min_size=0, max_size=1000)

In [None]:
#Collect the size of every point cloud

def CalculateSize(x):
  """Return a list holding ``len()`` of each entry of ``x``, in order."""
  return [len(cloud) for cloud in x]
    


In [None]:
# One size per entry of X, in the same order as X.
VectorSizes = CalculateSize(X)

In [None]:
# Histogram of point-cloud sizes, saved next to the notebook.
size_ax = sns.histplot(VectorSizes)
size_ax.set(title="Distribution of Point Cloud Vector Sizes")
size_ax.set_xlabel("Number of Dots in Point Cloud")
size_ax.figure.savefig("Vector_Size.png")

In [None]:
# Class balance: flatten the per-record label lists, then count objects per category.
merged_labels = list(itertools.chain.from_iterable(y))

category_ax = sns.histplot(merged_labels)
category_ax.set(title="Number of Objects per Class in the Nuscenes Mini Dataset")
plt.xticks(rotation=90)  # category names are long; rotate so they stay readable
plt.xlabel("Category Label")
plt.tight_layout()
plt.savefig("Category_Count.png")

In [None]:
# Pair every label with a vector size, one row per pair.
# zip() stops at the shorter input, so a length mismatch is truncated silently.

ClassSize = pd.DataFrame(list(zip(merged_labels,VectorSizes)), columns =['Category', 'Size'])


In [None]:
# Size distribution split by category (one hue per label).
category_size_ax = sns.histplot(data=ClassSize, x="Size", hue="Category")
category_size_ax.set(title="Histogram of Point Cloud Vector Size per Category")
category_size_ax.figure.savefig("Category_Size.png")

In [None]:
#function to transpose every vector in place

def ReshapeVectors(x):
  """Replace each entry of ``x`` with its transpose as a NumPy array.

  The list is modified in place and nothing is returned. Calling the function
  twice restores the original axis order.

  Args:
    x: mutable sequence whose entries can be converted with ``np.asarray``.
  """
  for a in range(len(x)):
    x[a] = np.transpose(np.asarray(x[a]))


In [None]:
    #reshape vectors 
    ReshapeVectors(X)

In [None]:
#function to pad vectors to equal length

def PadClouds(x, length):
  """Pad or truncate every entry of ``x`` in place via ``pad_sequences``.

  Zeros are inserted at the front, and anything longer than ``length`` is cut
  from the front as well. Each entry is replaced by the float64 result.

  Args:
    x: mutable sequence of entries accepted by ``pad_sequences``.
    length: target length passed as ``maxlen``.
  """
  for idx, cloud in enumerate(x):
    x[idx] = pad_sequences(
        cloud,
        maxlen=length,
        dtype='float64',
        padding='pre',
        truncating='pre',
        value=0.0,
    )



In [None]:
# Pad every vector to 1024 entries: the author's chosen length, described as
# preserving most vectors and as the PointNet standard.

PadClouds(X, length=1024)

In [None]:
#function to check length of padded vectors and ensure same length

def FindMinMaxLengthDos(x, labels=None):
  """Print the longest and shortest first row found across the entries of ``x``.

  The length of an entry is taken as ``len(entry[0])``.

  Args:
    x: sequence of entries whose first element is sized.
    labels: sequence indexed in step with ``x``. Defaults to the notebook-level
      ``y``.

  Returns:
    None. The summary is printed.
  """
  if labels is None:
    labels = y
  if len(x) == 0:
    print ("No vectors to inspect")
    return
  max_length = min_length = None
  max_a = min_a = None
  for a, cloud in enumerate(x):
    size = len(cloud[0])
    # Independent checks, seeded from the data itself: the first entry counts
    # as both maximum and minimum, and no fixed bound can hide a real value.
    if max_length is None or size > max_length:
      max_length, max_a = size, a
    if min_length is None or size < min_length:
      min_length, min_a = size, a
  print ("Maximum Cloud Point Vector is " + str(max_length) +" and is labelled: "+
         str(labels[max_a])+"\nMinimum Cloud Point Vector is " + str(min_length) +
         " and is labelled :" + str(labels[min_a]))

In [None]:
# After padding, the reported maximum and minimum should be the same length.

FindMinMaxLengthDos(X)

In [None]:
#Function Normalize point cloud values

def NormalizePointClouds(x):
  """Scale every row of every entry of ``x`` to unit L2 norm, in place.

  Rows whose norm is exactly zero are left untouched; dividing them would
  turn the whole row into NaN.

  Args:
    x: sequence of 2-D float arrays (rows are normalised independently).
  """
  for a in range(len(x)):
    for b in range(len(x[a])):
      norm = np.linalg.norm(x[a][b])
      if norm == 0:
        continue
      x[a][b] = x[a][b]/norm

In [None]:
# Normalise the point-cloud values in place before modelling.
NormalizePointClouds(X)

In [None]:
# Transpose every cloud a second time, undoing the earlier transpose
# (presumably leaving one [x, y, z] row per point -- TODO confirm).

ReshapeVectors(X)

In [None]:
# Flatten the per-record label lists into one flat list of category labels.
y_list = list(itertools.chain.from_iterable(y))

In [None]:

# Encode the string labels as integers for modelling.
LE = LabelEncoder()
y_encoded = LE.fit_transform(y_list)

# label name -> integer code
le_name_mapping = {
    name: code for name, code in zip(LE.classes_, LE.transform(LE.classes_))
}

# integer code -> label name (the inverse lookup)
le_map = dict(zip(le_name_mapping.values(), le_name_mapping))

# show which code belongs to which label
print(le_map)


In [None]:
# Collapse the multi-class codes into a binary target: 1 for encoded class 10,
# 0 for everything else.
# NOTE(review): 10 is taken from the original cell, which describes it as the
# "vehicle_car" class -- confirm against the printed le_map.
CAR_CLASS_CODE = 10

# Rebuild from y_list on every run so the cell is idempotent; relabelling
# y_encoded in place would turn every label into 0 on a second execution.
codes = LE.transform(y_list)
y_encoded = (codes == CAR_CLASS_CODE).astype(codes.dtype)

In [None]:
# Balance of the binary target used for modelling.
binary_ax = sns.histplot(y_encoded, binwidth=0.5)
binary_ax.set(title="Binary Class Category for Modeling")
plt.savefig("Binary_Hist.png")


In [None]:
# Hold out 33% of the clouds for testing; the fixed random_state keeps the split repeatable.
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y_encoded,
    test_size=0.33,
    random_state=33,
)


In [None]:
# Training constants.

BATCH_SIZE = 32  # samples per batch for both tf.data pipelines
NUM_CLASSES = 1  # width of the model's final Dense layer
NUM_POINTS = 1024  # points per cloud expected by the model input

def augment(points, label):
    """Jitter and shuffle one point cloud; the label passes through unchanged.

    Args:
        points: float64 tensor holding one point cloud.
        label: label belonging to ``points``.

    Returns:
        Tuple ``(augmented_points, label)``.
    """
    # small uniform noise in [-0.005, 0.005) added to every coordinate
    jitter = tf.random.uniform(points.shape, -0.005, 0.005, dtype=tf.float64)
    # tf.random.shuffle permutes along the first axis
    return tf.random.shuffle(points + jitter), label


# Training pipeline: shuffle over the whole split, augment each cloud, then batch.
train_dataset = (
    tf.data.Dataset.from_tensor_slices((X_train, y_train))
    .shuffle(len(X_train))
    .map(augment)
    .batch(BATCH_SIZE)
)
# Test pipeline: shuffled and batched, but never augmented.
test_dataset = (
    tf.data.Dataset.from_tensor_slices((X_test, y_test))
    .shuffle(len(X_test))
    .batch(BATCH_SIZE)
)

In [None]:
# Fix TensorFlow's global random seed.
# NOTE(review): the tf.data pipelines (shuffle / augment) are built in an earlier
# cell, before this seed is set -- confirm whether they pick it up.
tf.random.set_seed(33)

In [None]:
#convolution block: Conv1D -> BatchNormalization -> ReLU
def conv_bn(x, filters):
    """Apply a kernel-size-1 convolution, batch normalisation and ReLU to ``x``.

    Args:
        x: input tensor.
        filters: number of convolution filters.

    Returns:
        The activated output tensor.
    """
    conv = layers.Conv1D(filters, kernel_size=1, padding="valid")
    norm = layers.BatchNormalization(momentum=0.0)
    relu = layers.Activation("relu")
    return relu(norm(conv(x)))

#dense block: Dense -> BatchNormalization -> ReLU
def dense_bn(x, filters):
    """Apply a fully connected layer, batch normalisation and ReLU to ``x``.

    Args:
        x: input tensor.
        filters: number of units in the Dense layer.

    Returns:
        The activated output tensor.
    """
    dense = layers.Dense(filters)
    norm = layers.BatchNormalization(momentum=0.0)
    relu = layers.Activation("relu")
    return relu(norm(dense(x)))

In [None]:
#create orthogonal class
class OrthogonalRegularizer(keras.regularizers.Regularizer):
    """Penalise feature-transform matrices that are not orthogonal.

    For every sample the flat input is reshaped to a square matrix ``A`` of
    side ``num_features`` and the penalty is
    ``l2reg * sum((A @ A^T - I) ** 2)``, summed over the batch.

    Args:
        num_features: side length of the square transform matrix.
        l2reg: weight of the penalty.
    """

    def __init__(self, num_features, l2reg=0.001):
        self.num_features = num_features
        self.l2reg = l2reg
        self.eye = tf.eye(num_features)

    def __call__(self, x):
        x = tf.reshape(x, (-1, self.num_features, self.num_features))
        # Batched product: one A @ A^T per sample. A tensordot over the last
        # axes would also multiply matrices of different samples with each other.
        xxt = tf.matmul(x, x, transpose_b=True)
        return tf.reduce_sum(self.l2reg * tf.square(xxt - self.eye))

    def get_config(self):
        """Return the constructor arguments so the regularizer can be serialized."""
        return {"num_features": self.num_features, "l2reg": self.l2reg}

In [None]:
#build a T-Net block that learns a transform and applies it to its input
def tnet(inputs, num_features):
    """Predict a ``num_features`` x ``num_features`` matrix and apply it to ``inputs``.

    Args:
        inputs: tensor whose last axis has ``num_features`` entries.
        num_features: side length of the predicted square matrix.

    Returns:
        ``inputs`` multiplied by the predicted matrix.
    """
    # Zero kernel plus an identity-matrix bias: the predicted transform starts
    # out as the identity.
    identity_bias = keras.initializers.Constant(np.eye(num_features).flatten())
    orthogonality = OrthogonalRegularizer(num_features)

    features = inputs
    for width in (32, 64, 512):
        features = conv_bn(features, width)
    features = layers.GlobalMaxPooling1D()(features)
    for width in (256, 128):
        features = dense_bn(features, width)
    flat_transform = layers.Dense(
        num_features * num_features,
        kernel_initializer="zeros",
        bias_initializer=identity_bias,
        activity_regularizer=orthogonality,
    )(features)
    feat_T = layers.Reshape((num_features, num_features))(flat_transform)
    # Apply the predicted transform to the input features
    return layers.Dot(axes=(2, 1))([inputs, feat_T])

In [None]:
# Assemble the network: input transform, feature transform, global max pooling,
# then a dense head ending in sigmoid unit(s).
inputs = keras.Input(shape=(NUM_POINTS, 3))

x = tnet(inputs, 3)
for width in (32, 32):
    x = conv_bn(x, width)
x = tnet(x, 32)
for width in (32, 64, 512):
    x = conv_bn(x, width)
x = layers.GlobalMaxPooling1D()(x)
# every dense block is followed by dropout
for width in (256, 128):
    x = dense_bn(x, width)
    x = layers.Dropout(0.3)(x)

outputs = layers.Dense(NUM_CLASSES, activation="sigmoid")(x)

model = keras.Model(inputs=inputs, outputs=outputs, name="pointnet")

# print the layer-by-layer summary
model.summary()

In [None]:
# Compile for binary classification; track accuracy and AUC.
model.compile(
    loss="binary_crossentropy",
    optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
    metrics=["accuracy", tf.keras.metrics.AUC()]
)

# Stop once val_loss has not improved for 15 epochs and roll the model back to
# the best epoch; otherwise the kept weights are those from 15 epochs after it.
es = EarlyStopping(monitor='val_loss', mode='min', verbose=1, patience=15,
                   restore_best_weights=True)

# Train the model. PointNet holds the returned History object, not the model.
# NOTE(review): test_dataset doubles as validation data, so early stopping is
# tuned on the test split.
PointNet = model.fit(train_dataset, epochs=200, validation_data=test_dataset, callbacks=[es])

In [None]:
# Display names for the two binary classes (0 and 1).
bin_map = {0: 'Noise', 1:'Car'}

In [None]:
# Visualise predicted and actual labels for up to eight test clouds.
data = test_dataset.take(1)

points, labels = list(data)[0]
points = points[:8, ...]
labels = labels[:8, ...]

# The model ends in a sigmoid, so predict() returns probabilities rather than
# class scores. Threshold them at 0.5; argmax over the single output column
# would always return 0.
probs = model.predict(points)
preds = (probs.reshape(-1) > 0.5).astype(int)

points = points.numpy()
labels = labels.numpy()

# plot points with predicted class and label
fig = plt.figure(figsize=(15, 10))
# the batch may hold fewer than eight clouds, so iterate over what is there
for i in range(len(points)):
    ax = fig.add_subplot(2, 4, i + 1, projection="3d")
    ax.scatter(points[i, :, 0], points[i, :, 1], points[i, :, 2])
    ax.set_title(
        "pred: {:} \n label: {:}".format(
            bin_map[int(preds[i])], bin_map[int(labels[i])]
        )
    )
    ax.set_axis_off()
plt.show()

In [None]:
# Training curves: loss on top, accuracy below, each panel with its own legend.
fig, (loss_ax, acc_ax) = plt.subplots(2, 1)

# loss during training
loss_ax.set_title('Loss')
loss_ax.plot(PointNet.history['loss'], label='train')
loss_ax.plot(PointNet.history['val_loss'], label='test')
loss_ax.legend()

# accuracy during training
acc_ax.set_title('Accuracy')
acc_ax.plot(PointNet.history['accuracy'], label='train')
acc_ax.plot(PointNet.history['val_accuracy'], label='test')
acc_ax.legend()

# keep the lower panel's title from overlapping the upper panel
fig.tight_layout()
fig.savefig("loss_accuracy.png")
plt.show()


In [None]:
The PointNet model in this notebook is adapted from the Keras point cloud classification example:

https://keras.io/examples/vision/pointnet/