# Primitive Segmentation using ConvLSTM

**Author:** Aditya Jain <br>
**Date started:** 27th July, 2020<br>
**Description:** Predict primitive actions in a human demonstration using a ConvLSTM model.

### Setup


In [4]:
from google.colab import drive

# Make the Google Drive contents available to this Colab runtime.
# force_remount=False leaves an already-mounted drive untouched.
MOUNT_POINT = '/content/drive'
drive.mount(MOUNT_POINT, force_remount=False)

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&response_type=code&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly

Enter your authorization code:
··········
Mounted at /content/drive


In [5]:
import os
import re
from glob import glob

import cv2
import numpy as np
import pylab as plt
import tensorflow as tf
from google.colab.patches import cv2_imshow
from sklearn.model_selection import train_test_split
from tensorflow import keras
from tensorflow.keras import layers


In [11]:
# Global configuration shared by the dataset-building and model cells.
N_CLASSES  = 2          # no. of primitives in the TADL; reassigned later from the dataset folders found on disk
PX         = 128        # no. of rows in training/test images
PY         = 128        # no. of columns in training/test images
CHANNELS   = 3          # no. of channels in the image
N_FRAMES   = 15         # no. of frames in each training/test video
# NOTE(review): the training cell passes batch_size=2 explicitly, so BATCH_SIZE is not used there
BATCH_SIZE = 32         # size of the batches
# parent directory of the dataset; it is scanned for one sub-directory per primitive
DATA_DIR   = "/content/drive/My Drive/TCS FullTime Work/LfD/Liquid_Pouring/TADL/"
TEST_SPLIT = 0.2        # fraction of the samples held out as the test set

## Building the Dataset

Reads the datafiles and builds the dataset

In [12]:
def video_path(dataset_dir):
  '''
  Collect the path of every video in the dataset and index the primitives.

  Every visible sub-directory of `dataset_dir` is treated as one primitive
  action, and every visible entry inside it as one video of that primitive.
  Hidden entries (names starting with ".", e.g. ".ipynb_checkpoints") are
  skipped so that they are not mistaken for primitives or videos.

  Args:
    dataset_dir: parent directory holding one sub-directory per primitive.

  Returns:
    (video_path_list, prim_dict): the list of paths to all videos, grouped by
    primitive, and a dict mapping each primitive name to its integer label.
  '''
  # Sorted because os.listdir order is arbitrary; this keeps the
  # primitive -> label mapping identical from one run to the next.
  prim_actions = sorted(
      entry for entry in os.listdir(dataset_dir)
      if not entry.startswith(".")
      and os.path.isdir(os.path.join(dataset_dir, entry))
  )

  # dictionary of primitive actions used for label generation
  prim_dict = {action: index for index, action in enumerate(prim_actions)}

  video_path_list = []
  for action in prim_actions:
    # resolve against the argument, not the global DATA_DIR
    prim_path = os.path.join(dataset_dir, action)

    for video in sorted(os.listdir(prim_path)):
      if video.startswith("."):
        continue
      video_path_list.append(os.path.join(prim_path, video))

  return video_path_list, prim_dict


# Scan the dataset directory once: all video paths plus the primitive -> label mapping
video_list, primitives_dict = video_path(DATA_DIR)
# Replace the configured class count with the number of primitives actually found
N_CLASSES                   = len(primitives_dict)


def _natural_key(name):
    '''Sort key ordering embedded numbers numerically, so "frame2" < "frame10".'''
    # re.split with a capturing group alternates text / digit-run tokens,
    # so odd positions are always the numeric parts.
    return [
        int(token) if position % 2 else token.lower()
        for position, token in enumerate(re.split(r"(\d+)", name))
    ]


def build_dataset(vid_list, primit_dict):
    '''
    Build the image and label arrays from the video frame folders.

    Each path in `vid_list` is a folder of frame images whose parent folder
    name is the primitive label. Frames are read in natural filename order,
    resized to PX rows x PY columns and rescaled to the 0-1 range.

    Args:
        vid_list: paths of the video folders.
        primit_dict: dict mapping primitive name -> integer label.

    Returns:
        (images, labels): `images` is an array with one entry per video, each
        holding that video's frames; `labels` is the one-hot encoding of the
        video labels with `len(primit_dict)` columns.

    Raises:
        ValueError: if a frame cannot be decoded, or if the videos do not all
            contain the same number of frames.
    '''
    image_data = []
    label_data = []

    for video_path in vid_list:
        # the parent folder of a video is named after its primitive
        label_name = os.path.basename(os.path.dirname(os.path.normpath(video_path)))
        label = primit_dict[label_name]

        frames = []
        # os.listdir order is arbitrary; a sequence model needs the frames in temporal order
        for image in sorted(os.listdir(video_path), key=_natural_key):
            image_path = os.path.join(video_path, image)

            img = cv2.imread(image_path, cv2.IMREAD_COLOR)
            if img is None:
                # imread signals failure by returning None rather than raising
                raise ValueError("Could not read image: " + image_path)

            # cv2.resize expects (width, height), i.e. (columns, rows)
            img = cv2.resize(img, (PY, PX))
            frames.append(img.astype("float32") / 255)   # rescale the image to 0-1

        # a ragged list cannot be stacked into a single dense array
        if image_data and len(frames) != len(image_data[0]):
            raise ValueError(
                "Video %s has %d frames, expected %d"
                % (video_path, len(frames), len(image_data[0]))
            )

        image_data.append(frames)
        label_data.append(label)

    labels = np.asarray(label_data, dtype="int32")
    return np.asarray(image_data), np.asarray(tf.one_hot(labels, len(primit_dict)))

# Load every video into memory as (frames, one-hot label) arrays
data, label = build_dataset(video_list, primitives_dict)

# Hold out a TEST_SPLIT fraction of the videos for evaluation.
# The fixed random_state keeps the split identical across runs.
train_data, test_data, train_label, test_label = train_test_split(
    data, label, test_size=TEST_SPLIT, shuffle=True, random_state=0)


## Build a model

We create a model which takes as input movies of shape
`(n_frames, rows, columns, channels)` and returns, for each movie,
a probability distribution over the primitive classes (shape `(n_classes,)`).


In [13]:
# Layers in forward order: one ConvLSTM over the frame sequence (keeping only
# its final output), spatial pooling, then a softmax classifier.
classifier_layers = [
    keras.Input(shape=(N_FRAMES, PX, PY, CHANNELS)),
    layers.ConvLSTM2D(
        filters=64,
        kernel_size=(3, 3),
        padding="same",
        return_sequences=False,
    ),
    layers.MaxPooling2D(pool_size=(2, 2)),
    layers.Flatten(),
    layers.Dropout(0.5),
    layers.Dense(N_CLASSES, activation='softmax'),
]
model = keras.Sequential(classifier_layers)

# show the layer-by-layer architecture
model.summary()

# one-hot labels -> categorical cross-entropy
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=["accuracy"])


Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
conv_lst_m2d (ConvLSTM2D)    (None, 128, 128, 64)      154624    
_________________________________________________________________
max_pooling2d (MaxPooling2D) (None, 64, 64, 64)        0         
_________________________________________________________________
flatten (Flatten)            (None, 262144)            0         
_________________________________________________________________
dropout (Dropout)            (None, 262144)            0         
_________________________________________________________________
dense (Dense)                (None, 2)                 524290    
Total params: 678,914
Trainable params: 678,914
Non-trainable params: 0
_________________________________________________________________


## Train the model


In [None]:
# number of full passes over the training set
epochs = 20

# verbose=2 prints one summary line per epoch
model.fit(
    x=train_data,
    y=train_label,
    epochs=epochs,
    batch_size=2,
    verbose=2,
)


## Model Evaluation on Test Set

In [17]:
# evaluate() returns the loss followed by the compiled metrics (accuracy here)
score = model.evaluate(test_data, test_label, verbose=0)
for caption, value in zip(("Test loss:", "Test accuracy:"), score):
    print(caption, value)


Test loss: 99.90675354003906
Test accuracy: 0.0


## Miscellaneous - Not to be run

In [None]:
# Alternative architecture: ConvLSTM followed by a hidden dense layer.
# NOTE: running this cell rebinds `model`, discarding any model trained above.
# Input channels and output units come from the shared configuration so that
# the network always matches the image data and the one-hot label width.
model = keras.Sequential(
    [
        keras.Input(shape=(N_FRAMES, PX, PY, CHANNELS)),
        layers.ConvLSTM2D(
            filters=64,
            kernel_size=(3, 3),
            return_sequences=False,
            data_format="channels_last",
        ),
        layers.Dropout(0.2),
        layers.Flatten(),
        layers.Dense(256, activation="relu"),
        layers.Dropout(0.3),
        layers.Dense(N_CLASSES, activation="softmax"),
    ]
)

model.summary()
model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=["accuracy"])

In [None]:
# Alternative architecture: three stacked ConvLSTM layers.
# NOTE: running this cell rebinds `model`, discarding any model trained above.
# A ConvLSTM2D layer consumes a sequence (5D input), so every layer that feeds
# another ConvLSTM2D must return the full sequence. Only the last one collapses
# the time axis, which also gives Flatten/Dense a fully defined feature size
# even though the number of frames is left unspecified (None).
model = keras.Sequential(
    [
        keras.Input(shape = (None, PX, PY, CHANNELS)),
        layers.ConvLSTM2D(
            filters=64, kernel_size=(3, 3), padding="same", return_sequences=True
        ),
        layers.BatchNormalization(),
        layers.ConvLSTM2D(
            filters=64, kernel_size=(3, 3), padding="same", return_sequences=True
        ),
        layers.BatchNormalization(),
        layers.ConvLSTM2D(
            filters=64, kernel_size=(3, 3), padding="same", return_sequences=False
        ),
        layers.BatchNormalization(),
        layers.Flatten(),
        layers.Dropout(0.5),
        layers.Dense(N_CLASSES, activation='softmax'),
    ]
)

model.summary()

# model compilation
model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=["accuracy"])

In [76]:
# NOTE(review): this rebinds the held-out test split to the training split, so
# any evaluation run after this cell measures training performance - confirm
# this is intended (e.g. an overfitting sanity check) before running it.
test_data, test_label = np.asarray(train_data), np.asarray(train_label)

# inspect the training arrays: data shape, label shape, then the labels themselves
for item in (train_data.shape, train_label.shape, train_label):
    print(item)

(2, 15, 128, 128, 3)
(2, 2)
[[1. 0.]
 [0. 1.]]
