In [None]:
import os
import glob
import random
import numpy as np
import matplotlib.pyplot as plt

import tensorflow as tf

In [None]:
!pip install scikit-video==1.1.11
import skvideo.io

In [None]:
!wget http://www.csc.kth.se/cvap/actions/walking.zip
!wget http://www.csc.kth.se/cvap/actions/jogging.zip
!wget http://www.csc.kth.se/cvap/actions/running.zip
!wget http://www.csc.kth.se/cvap/actions/boxing.zip
!wget http://www.csc.kth.se/cvap/actions/handwaving.zip
!wget http://www.csc.kth.se/cvap/actions/handclapping.zip

In [None]:
# Action classes. The position of a name in this list is used as its integer label.
classes = [
    'walking',
    'jogging',
    'running',
    'boxing',
    'handwaving',
    'handclapping',
]

# Collect (video path, class name) pairs from <data_root>/<class>/*.avi.
dataset_data = []
data_root = './'
for cls in classes:
    print('Processing class: {}'.format(cls))
    # glob returns matches in arbitrary, filesystem-dependent order. Sorting
    # makes the listing (and everything derived from it, such as the
    # train/test split) identical on every machine.
    for fpath in sorted(glob.glob(os.path.join(data_root, cls, '*.avi'))):
        dataset_data.append((fpath, cls))
print(f'Количество видео: {len(dataset_data)}')

In [None]:
# Preview the first listed video: one raw frame (left) next to the matching
# frame-to-frame difference (right).
fig = plt.figure(figsize=(16, 8))

# Left panel: frame 50 of the video, with pixel values scaled to [0, 1].
ax_1 = fig.add_subplot(1, 2, 1)
videodata = skvideo.io.vread(dataset_data[0][0]).astype(np.float32) / 255.
ax_1.imshow(videodata[50, ...])
ax_1.set_title(f'videodata shape: {videodata.shape}')

# Right panel: difference between consecutive frames, averaged over the
# channel axis (kept as a size-1 axis so the array stays 4-D).
ax_2 = fig.add_subplot(1, 2, 2)
frame_delta = videodata[1:, ...] - videodata[:-1, ...]
motion = frame_delta.mean(axis=3, keepdims=True)
ax_2.imshow(motion[50, ..., 0])
ax_2.set_title(f'motion shape: {motion.shape}')

plt.show()

In [None]:
# Split / training configuration.
NUM_TRAIN_ELEM = 500  # the first NUM_TRAIN_ELEM shuffled videos form the training set
# NOTE(review): path_to_motion has its own NUM_FRAMES=200 default and
# set_shapes expects 199 motion frames - keep them in sync with this value.
NUM_FRAMES = 200
NUM_EPOCHS = 5        # reassigned in the training cell before model.fit is called
BATCH_SIZE = 2
SPLIT_SEED = 42       # seed of the train/test shuffle below

# Shuffle in place with a dedicated, seeded generator: after a kernel restart
# the same initial listing always produces the same train/test split, and the
# global `random` state used elsewhere is left untouched.
random.Random(SPLIT_SEED).shuffle(dataset_data)

def path_to_motion(video_class, NUM_FRAMES=200):
    """Load one video and convert it to a frame-difference ("motion") clip.

    Args:
        video_class: eager tensor holding two byte strings, the video path and
            the class name; it is read through ``.numpy()``.
        NUM_FRAMES: value forwarded to ``skvideo.io.vread`` as ``num_frames``.

    Returns:
        Tuple ``(motion, label)``. ``motion`` is a float32 array of differences
        between consecutive frames, averaged over axis 3 (kept as a size-1
        axis), with at least ``NUM_FRAMES - 1`` entries along the first axis.
        ``label`` is the index of the class name in ``classes``.

    Raises:
        ValueError: if the class name is not present in ``classes``.
    """
    fields = video_class.numpy()  # convert once, then pick path and class name
    video_path = fields[0].decode('UTF-8')
    class_name = fields[1].decode('UTF-8')

    videodata = skvideo.io.vread(video_path, num_frames=NUM_FRAMES)
    videodata = videodata.astype(np.float32) / 255.
    motion = np.mean(videodata[1:, ...] - videodata[:-1, ...], axis=3, keepdims=True)

    # Downstream code declares a fixed number of motion frames per clip. If a
    # video yields fewer frames than requested, append all-zero ("no motion")
    # frames so every clip has the same length and can be batched.
    # NOTE(review): how vread behaves for videos shorter than num_frames is not
    # visible here - confirm; this padding is a no-op for full-length clips.
    missing = (NUM_FRAMES - 1) - motion.shape[0]
    if missing > 0:
        motion = np.pad(motion, ((0, missing), (0, 0), (0, 0), (0, 0)), mode='constant')

    return motion, classes.index(class_name)

def set_shapes(motion, cl_idx, motion_shape=(199, 120, 160, 1)):
    """Declare the static shapes of a (motion clip, label) pair.

    Args:
        motion: object exposing ``set_shape``; receives ``motion_shape``.
        cl_idx: object exposing ``set_shape``; declared as a scalar (``[]``).
        motion_shape: static shape to declare for ``motion``. The default,
            ``(199, 120, 160, 1)``, is the value that used to be hard-coded,
            so existing two-argument calls behave exactly as before.

    Returns:
        The same ``(motion, cl_idx)`` pair, after both ``set_shape`` calls.
    """
    motion.set_shape(motion_shape)
    cl_idx.set_shape([])
    return motion, cl_idx

def make_motion_dataset(samples, shuffle=False, batch_size=BATCH_SIZE):
    """Build a batched tf.data pipeline of (motion clip, label) pairs.

    Args:
        samples: list of (video path, class name) pairs.
        shuffle: when True, shuffle the pairs with a buffer covering the whole
            list before any video is decoded.
        batch_size: number of clips per batch; an incomplete final batch is
            dropped.

    Returns:
        A ``tf.data.Dataset`` yielding (motion batch, label batch) tuples.
    """
    ds = tf.data.Dataset.from_tensor_slices(samples)
    if shuffle:
        # Shuffle the cheap (path, class) pairs rather than decoded videos.
        ds = ds.shuffle(buffer_size=len(ds))
    # Decode each video in Python via path_to_motion.
    ds = ds.map(lambda video_class: tf.py_function(func=path_to_motion,
                                                   inp=[video_class],
                                                   Tout=[tf.float32, tf.uint8]))
    # Pin the static shapes of the py_function outputs.
    ds = ds.map(set_shapes)
    ds = ds.batch(batch_size, drop_remainder=True)
    # Prepare the next batch while the current one is being consumed. A buffer
    # of one batch keeps the extra memory small.
    return ds.prefetch(1)


train_ds = make_motion_dataset(dataset_data[:NUM_TRAIN_ELEM], shuffle=True)
print(f'Длина тренировочного датасета: {len(train_ds)}')

test_ds = make_motion_dataset(dataset_data[NUM_TRAIN_ELEM:])
print(f'Длина тестового датасета: {len(test_ds)}')

In [None]:
import tensorflow_datasets as tfds

# tf.data.Dataset -> Iterator[Tree[np.array]] (turn the dataset into an iterator)
ds_train_np = tfds.as_numpy(train_ds.take(6))

# Show the first motion frame of the first clip of six training batches,
# titled with the clip's class name.
fig = plt.figure(figsize=(16, 8))
for j, (motion, cl_idx) in enumerate(ds_train_np, start=1):
    ax = fig.add_subplot(2, 3, j)
    ax.imshow(motion[0, 0, ..., 0])  # batch item 0, frame 0, channel 0
    ax.set_xticks([])
    ax.set_yticks([])
    ax.set_title(classes[cl_idx[0]])
plt.show()

In [None]:
# 3D-convolutional classifier over motion clips.
# Strides and pool sizes of (1, 2, 2) leave the first convolved axis (frames)
# at full length and only shrink the other two; GlobalAveragePooling3D then
# collapses all three into one feature vector per clip.
model = tf.keras.Sequential()

model.add(tf.keras.layers.Conv3D(filters=32, kernel_size=(5, 5, 5), strides=(1, 2, 2),
                                 padding='same', activation='relu'))
model.add(tf.keras.layers.MaxPool3D(pool_size=(1, 2, 2), padding='same'))

model.add(tf.keras.layers.Conv3D(filters=64, kernel_size=(5, 5, 5), strides=(1, 2, 2),
                                 padding='same', activation='relu'))
model.add(tf.keras.layers.MaxPool3D(pool_size=(1, 2, 2), padding='same'))

model.add(tf.keras.layers.Conv3D(filters=64, kernel_size=(3, 3, 3), strides=(1, 2, 2),
                                 padding='same', activation='relu'))
model.add(tf.keras.layers.MaxPool3D(pool_size=(1, 2, 2), padding='same'))

# Last convolution is linear (no activation) and is followed by global pooling.
model.add(tf.keras.layers.Conv3D(filters=64, kernel_size=(3, 3, 3), strides=(1, 1, 1),
                                 padding='same', activation=None))
model.add(tf.keras.layers.GlobalAveragePooling3D())

model.add(tf.keras.layers.Dense(units=64, activation='relu'))
# Six outputs without activation: raw logits, one per entry of `classes`.
model.add(tf.keras.layers.Dense(units=6, activation=None))

In [None]:
LEARNING_RATE = 0.001

# The last Dense layer has no activation, so the loss is told to expect logits.
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=LEARNING_RATE),
    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    metrics=['accuracy'],
)

In [None]:
%%time
NUM_EPOCHS = 10

history = model.fit(
                     train_ds,
                     epochs=NUM_EPOCHS,
                     validation_data=test_ds
                    )

In [None]:
# Evaluate the trained model on the test batches; the cell output shows the
# returned loss and the compiled 'accuracy' metric.
model.evaluate(test_ds)

In [None]:
# Training and validation accuracy per epoch.
plt.plot(history.history['accuracy'], label='accuracy')
plt.plot(history.history['val_accuracy'], label='val_accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
# Show the full [0, 1] range. With six classes chance level is about 0.17, so
# a lower limit of 0.5 would hide any epoch that scores below it.
plt.ylim([0, 1])
plt.legend(loc='lower right')
plt.show()

In [None]:
# Pick six random videos from the test part of the listing and show one frame
# of each, titled with the true class and the class predicted by the model.
fpath_cls_true_list = random.sample(dataset_data[NUM_TRAIN_ELEM:], 6)

fig = plt.figure(figsize=(16, 8))
for i, (fpath, cls_true) in enumerate(fpath_cls_true_list):
    ax = fig.add_subplot(2, 3, i + 1)
    videodata = skvideo.io.vread(fpath).astype(np.float32) / 255.
    ax.imshow(videodata[30, ...])

    # Same frame-difference preprocessing as in training, applied to every
    # frame that vread returned for this video.
    frame_delta = videodata[1:, ...] - videodata[:-1, ...]
    motion = frame_delta.mean(axis=3, keepdims=True)

    # Add a leading batch axis, then take the logits of the single clip.
    out = model(motion[None, ...])[0]
    cls_pred = np.argmax(out.numpy())

    ax.set_title(f'True class: {cls_true} \n Predicted class: {classes[cls_pred]}')
    ax.set_xticks([])
    ax.set_yticks([])
plt.show()