# import

```python
!pip install remotezip tqdm opencv-python==4.5.2.52 opencv-python-headless==4.5.2.52 tf-models-official
!pip install remotezip
!pip install tf-models-official
```

In [10]:
import tqdm
import random
import pathlib
import itertools
import collections

import cv2
import numpy as np
import remotezip as rz
import seaborn as sns
import matplotlib.pyplot as plt

import keras
import tensorflow as tf
import tensorflow_hub as hub
from tensorflow.keras import layers
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import SparseCategoricalCrossentropy
from pathlib import PosixPath

# Import the MoViNet backbone and classifier from TensorFlow Models (tf-models-official)
from official.projects.movinet.modeling import movinet
from official.projects.movinet.modeling import movinet_model

In [11]:
%cd /content/drive/MyDrive/Colab Notebooks/big project

/content/drive/MyDrive/Colab Notebooks/big project


# 필요한 함수 정의

In [14]:
def format_frames(frame, output_size):
  """Convert a video frame to float32 and resize it with padding.

  Args:
    frame: Image taken from a video; handed to
      `tf.image.convert_image_dtype`.
    output_size: Sequence unpacked as the positional arguments that follow
      the image in `tf.image.resize_with_pad` (target height, then width).

  Returns:
    The float32 frame, resized with padding to `output_size`.
  """
  as_float = tf.image.convert_image_dtype(frame, tf.float32)
  return tf.image.resize_with_pad(as_float, *output_size)

In [15]:
def frames_from_video_file(video_path, n_frames, output_size = (224,224), frame_step = 15):
  """
    Sample `n_frames` frames, `frame_step` frames apart, from a video file.

    The clip starts at a random position chosen so that the whole span of
    `1 + (n_frames - 1) * frame_step` frames fits inside the video. When the
    video is shorter than that span, sampling starts at frame 0 and every
    frame that cannot be read is replaced by an all-zero frame.

    Args:
      video_path: File path to the video.
      n_frames: Number of frames to be created per video file.
      output_size: Pixel size of the output frame image.
      frame_step: Number of source frames advanced between two sampled frames.

    Return:
      An NumPy array of frames in the shape of (n_frames, height, width, channels),
      with the channel order reversed from OpenCV's BGR to RGB.

    Raises:
      ValueError: If not even the first frame can be read from `video_path`
        (missing, unreadable, or empty video).
  """
  src = cv2.VideoCapture(str(video_path))
  try:
    # OpenCV reports the frame count as a float; random.randint needs ints.
    video_length = int(src.get(cv2.CAP_PROP_FRAME_COUNT))

    need_length = 1 + (n_frames - 1) * frame_step

    if need_length > video_length:
      start = 0
    else:
      # random.randint includes BOTH endpoints, so max_start itself is the
      # last start index that still leaves need_length frames to read.
      max_start = video_length - need_length
      start = random.randint(0, max_start)

    src.set(cv2.CAP_PROP_POS_FRAMES, start)
    # ret is a boolean indicating whether read was successful, frame is the image itself
    ret, frame = src.read()
    if not ret:
      raise ValueError(f"Could not read a frame from video: {video_path}")
    result = [format_frames(frame, output_size)]

    for _ in range(n_frames - 1):
      # Advance frame_step frames; only the last one read is kept.
      for _ in range(frame_step):
        ret, frame = src.read()
      if ret:
        result.append(format_frames(frame, output_size))
      else:
        # Ran past the end of the video: pad with a black frame.
        result.append(np.zeros_like(result[0]))
  finally:
    # Always free the capture handle, even if decoding or formatting fails.
    src.release()

  # Reverse the channel axis (BGR -> RGB).
  return np.array(result)[..., [2, 1, 0]]

In [16]:
class FrameGenerator:
  """Callable that yields (frames, label) pairs for the videos under a directory."""

  def __init__(self, path, n_frames, training = False):
    """ Records the dataset location and derives the class-name-to-id mapping.

      Args:
        path: Directory whose sub-directories are the classes; each class
          directory holds that class's `.mp4` files.
        n_frames: Number of frames sampled from every video.
        training: When true, the videos are shuffled on every call.
    """
    self.path = path
    self.n_frames = n_frames
    self.training = training
    # Every sub-directory name is a class; ids follow the sorted name order.
    class_dirs = {entry.name for entry in self.path.iterdir() if entry.is_dir()}
    self.class_names = sorted(class_dirs)
    self.class_ids_for_name = {name: idx for idx, name in enumerate(self.class_names)}

  def get_files_and_class_names(self):
    """Return the `.mp4` paths one level below `path` and their parent directory names."""
    video_paths = list(self.path.glob('*/*.mp4'))
    classes = [video.parent.name for video in video_paths]
    return video_paths, classes

  def __call__(self):
    """Yield `(video_frames, label)` for each video, shuffled first when training."""
    video_paths, classes = self.get_files_and_class_names()
    samples = list(zip(video_paths, classes))

    if self.training:
      random.shuffle(samples)

    for video_path, class_name in samples:
      clip = frames_from_video_file(video_path, self.n_frames)
      # Encode the class name as its integer id.
      yield clip, self.class_ids_for_name[class_name]

In [17]:
# Locations of the train/test splits, relative to the current working directory.
# pathlib.Path selects the path flavour of the running OS, so this also works
# where PosixPath cannot be instantiated (e.g. Windows); on POSIX systems it
# yields the same PosixPath objects as before.
subset_paths = {
    'train': pathlib.Path('DATA(20231222)/train'),
    'test': pathlib.Path('DATA(20231222)/test'),
}

# 잡다

In [18]:
batch_size = 8
num_frames = 8

# Element spec of the generator: a rank-4 float32 clip whose last axis has
# 3 channels, paired with a scalar int16 label.
output_signature = (
    tf.TensorSpec(shape=(None, None, None, 3), dtype=tf.float32),
    tf.TensorSpec(shape=(), dtype=tf.int16),
)

# Training split: the generator is created with training=True, then batched.
train_ds = tf.data.Dataset.from_generator(
    FrameGenerator(subset_paths['train'], num_frames, training=True),
    output_signature=output_signature,
).batch(batch_size)

# Test split: generator with its default training=False, then batched.
test_ds = tf.data.Dataset.from_generator(
    FrameGenerator(subset_paths['test'], num_frames),
    output_signature=output_signature,
).batch(batch_size)

In [19]:
# Print the label tensor of the first 10 training batches. `frames` and
# `labels` stay bound to the last batch afterwards and are inspected in the
# next cell.
for frames, labels in train_ds.take(10):
  print(labels)

tf.Tensor([1 0 3 2 3 2 0 0], shape=(8,), dtype=int16)
tf.Tensor([3 0 2 4 0 0 1 1], shape=(8,), dtype=int16)
tf.Tensor([2 2 2 0 3 1 4 3], shape=(8,), dtype=int16)
tf.Tensor([1 3 4 0 0 3 4 2], shape=(8,), dtype=int16)
tf.Tensor([2 1 1 0 0 1 1 3], shape=(8,), dtype=int16)
tf.Tensor([2 1 2 3 2 4 0 1], shape=(8,), dtype=int16)
tf.Tensor([3 4 3 2 1 2 2 1], shape=(8,), dtype=int16)
tf.Tensor([2 3 4 3 4 4 3 3], shape=(8,), dtype=int16)
tf.Tensor([1 4 3 0 0 2 0 3], shape=(8,), dtype=int16)
tf.Tensor([1 3 3 0 1 2 4 3], shape=(8,), dtype=int16)


In [20]:
# Show the shapes of the last batch left over from the preceding loop.
print(f"Shape: {frames.shape}")
print(f"Label: {labels.shape}")

Shape: (8, 8, 224, 224, 3)
Label: (8,)


In [21]:
# Small GRU used to demonstrate carrying recurrent state between calls; it
# returns both the per-step outputs and the final state.
gru = layers.GRU(units=4, return_sequences=True, return_state=True)

inputs = tf.random.normal(shape=[1, 10, 8]) # (batch, sequence, channels)

result, state = gru(inputs) # Run it all at once

In [22]:
# Display the per-step GRU outputs for the full 10-step sequence.
result

<tf.Tensor: shape=(1, 10, 4), dtype=float32, numpy=
array([[[ 0.78977203, -0.7666972 , -0.18226078, -0.11051563],
        [ 0.7447158 , -0.5760373 , -0.3625499 , -0.32132182],
        [ 0.70870763, -0.8776552 , -0.40342405, -0.20605183],
        [ 0.8300309 , -0.7762664 , -0.66221005, -0.15138052],
        [ 0.7058173 , -0.601603  , -0.697772  ,  0.21956235],
        [ 0.2517114 , -0.8170609 , -0.6245837 ,  0.2573869 ],
        [-0.09677514, -0.753946  , -0.44089788, -0.1442756 ],
        [-0.13653909, -0.5563181 ,  0.04367654,  0.1682852 ],
        [ 0.16750489, -0.43640408, -0.30911005,  0.16984233],
        [ 0.50685465, -0.84352696, -0.295877  ,  0.06026528]]],
      dtype=float32)>

In [23]:
# Process the same sequence in two chunks of 5 steps, feeding the state from
# the first chunk into the second.
first_half, state = gru(inputs[:, :5, :])   # run the first half, and capture the state
second_half, _ = gru(inputs[:,5:, :], initial_state=state)  # Use the state to continue where you left off.

# Both chunked outputs should match the corresponding slices of the
# single-pass `result`.
print(np.allclose(result[:, :5,:], first_half))
print(np.allclose(result[:, 5:,:], second_half))

True
True


# 사전학습된 모델 로드

In [24]:
# MoViNet variant to load, and the frame resolution later passed to
# build_classifier.
model_id = 'a0'
resolution = 224

# Reset Keras global state before constructing the models below.
tf.keras.backend.clear_session()

# Build the backbone and mark it non-trainable so only layers added on top of
# it are updated during training.
backbone = movinet.Movinet(model_id=model_id)
backbone.trainable = False

# Set num_classes=600 to load the pre-trained weights from the original model
model = movinet_model.MovinetClassifier(backbone=backbone, num_classes=600)
# Every input dimension is left dynamic except the 3 channels.
model.build([None, None, None, None, 3])

# Load pre-trained weights
!wget https://storage.googleapis.com/tf_model_garden/vision/movinet/movinet_a0_base.tar.gz -O movinet_a0_base.tar.gz -q
!tar -xvf movinet_a0_base.tar.gz

# Directory created by extracting the downloaded movinet_a0_base.tar.gz.
checkpoint_dir = f'movinet_{model_id}_base'
checkpoint_path = tf.train.latest_checkpoint(checkpoint_dir)
# Restore into the 600-class model, which wraps `backbone`.
checkpoint = tf.train.Checkpoint(model=model)
status = checkpoint.restore(checkpoint_path)
# Verify that the objects built so far were matched by the checkpoint.
status.assert_existing_objects_matched()

movinet_a0_base/
movinet_a0_base/checkpoint
movinet_a0_base/ckpt-1.data-00000-of-00001
movinet_a0_base/ckpt-1.index


<tensorflow.python.checkpoint.checkpoint.CheckpointLoadStatus at 0x78f47a527970>

In [25]:
def build_classifier(batch_size, num_frames, resolution, backbone, num_classes):
  """Create and build a MovinetClassifier on top of the given backbone.

  Args:
    batch_size: First dimension of the input shape the model is built with.
    num_frames: Second dimension of the build shape (frames per clip).
    resolution: Used for both the third and fourth dimension of the build shape.
    backbone: Backbone model handed to `MovinetClassifier`.
    num_classes: Number of output classes of the classifier.

  Returns:
    The built `MovinetClassifier`.
  """
  input_shape = [batch_size, num_frames, resolution, resolution, 3]
  classifier = movinet_model.MovinetClassifier(backbone=backbone, num_classes=num_classes)
  classifier.build(input_shape)
  return classifier

In [26]:
# Rebind `model` to a 5-class classifier that reuses the same `backbone`
# object as the 600-class model built earlier.
model = build_classifier(batch_size, num_frames, resolution, backbone, 5)

## 모델 설계

In [27]:
num_epochs = 3

# from_logits=True: the loss is configured to receive raw logits from the model.
loss_obj = SparseCategoricalCrossentropy(from_logits=True)

optimizer = Adam(learning_rate=0.001)

model.compile(
    loss=loss_obj,
    optimizer=optimizer,
    metrics=['accuracy'],
)

## 모델 학습

In [35]:
# Train for `num_epochs` epochs, validating on the test split after every
# epoch, and keep only the `.history` attribute of the returned object.
history = model.fit(train_ds,
                    validation_data=test_ds,
                    epochs=num_epochs,
                    validation_freq=1,
                    verbose=1).history

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


In [37]:
import matplotlib.pyplot as plt

In [44]:
# Display the recorded training loss values.
history["loss"]

[0.21817944943904877,
 0.10981059074401855,
 0.08505159616470337,
 0.08084923028945923,
 0.05538821220397949,
 0.06905801594257355,
 0.05758459120988846,
 0.027318188920617104,
 0.022160962224006653,
 0.023646153509616852]

In [46]:
# Plot training and validation loss on one wide figure.
plt.figure(figsize=(12, 4))
for key, label, alpha in (("loss", "train_loss", 0.3), ("val_loss", "val_loss", 0.5)):
  plt.plot(history[key], "--.", label=label, alpha=alpha)
plt.legend()

<matplotlib.legend.Legend at 0x78f3dc26e530>

## 모델 평가

In [36]:
# Evaluate on the test split; return_dict=True returns the results as a dict.
model.evaluate(test_ds, return_dict=True)



{'loss': 2.515213966369629, 'accuracy': 0.4399999976158142}

`-` 모델 성능이 너무 안 나온다... (마지막 학습 손실은 약 0.02인데 테스트 손실은 약 2.52, 정확도는 약 0.44로 차이가 크다.)