# Bundesliga Emirhan CNN-LSTM Video Processing

**Author:** [Emirhan BULUT](https://www.linkedin.com/in/artificialintelligencebulut/)<br>
**Date created:** 2022/08/22<br>
**Last modified:** 2022/08/22<br>
**Description:** This project was prepared for the Kaggle DFL Bundesliga Data Shootout competition.

## Setup

Import libraries

In [1]:
from tensorflow import keras

import matplotlib.pyplot as plt
import tensorflow as tf
import pandas as pd
import numpy as np
import imageio
import cv2
import os
import numpy

## Define some hyperparameters

In [2]:
# Side length (pixels) that every frame is resized to before feature extraction.
IMG_SIZE = 224
BATCH_SIZE = 32
# Number of training epochs for the sequence model.
EPOCHS = 5

# Number of frames per video that are fed to the sequence model.
MAX_SEQ_LENGTH = 10
# Width of one per-frame feature vector. ResNet50 with include_top=False and
# global pooling emits 2048 values per image, so the feature buffers must be
# 2048 wide; with 512 the per-frame assignment fails with a shape mismatch.
NUM_FEATURES = 2048

## Data preparation

In [3]:
# Load the training annotations and the sample submission file; the sample
# submission is what the rest of this notebook treats as the "test" set.
train_df = pd.read_csv("../input/dfl-bundesliga-data-shootout/train.csv")
test_df = pd.read_csv("../input/dfl-bundesliga-data-shootout/sample_submission.csv")

print(f"Total videos for training: {len(train_df)}")
print(f"Total videos for testing: {len(test_df)}")

# Show ten random rows of the training annotations.
train_df.sample(10)

FileNotFoundError: [Errno 2] No such file or directory: '../input/dfl-bundesliga-data-shootout/train.csv'

In [None]:
def crop_center_square(frame):
    """Return the largest centered square region of ``frame``.

    The first two axes of ``frame`` are treated as height and width; any
    further axes (e.g. color channels) are left untouched.
    """
    height, width = frame.shape[:2]
    side = min(height, width)
    top = height // 2 - side // 2
    left = width // 2 - side // 2
    rows = slice(top, top + side)
    cols = slice(left, left + side)
    return frame[rows, cols]


def load_video(path, max_frames=0, resize=(IMG_SIZE, IMG_SIZE)):
    """Decode a video file into an array of square, resized frames.

    Args:
        path: Location of the video file handed to ``cv2.VideoCapture``.
        max_frames: Stop after this many frames. The default of 0 never
            matches a frame count, so the whole video is read.
        resize: ``(width, height)`` passed to ``cv2.resize``.

    Returns:
        ``np.array`` of the collected frames; each frame is center-cropped,
        resized, and has its channel axis reversed.
    """
    capture = cv2.VideoCapture(path)
    collected = []
    try:
        ok, image = capture.read()
        while ok:
            square = crop_center_square(image)
            scaled = cv2.resize(square, resize)
            # Reverse the channel order (index 2, 1, 0) of the decoded frame.
            collected.append(scaled[:, :, [2, 1, 0]])
            if len(collected) == max_frames:
                break
            ok, image = capture.read()
    finally:
        # Always free the capture handle, even if a frame failed to process.
        capture.release()
    return np.array(collected)


In [None]:
def build_feature_extractor():
    """Build the per-frame feature extractor model.

    Chains ResNet50's ``preprocess_input`` with an ImageNet-pretrained
    ResNet50 (no classification head, global max pooling), so the returned
    model accepts ``(IMG_SIZE, IMG_SIZE, 3)`` images directly.
    """
    frame_shape = (IMG_SIZE, IMG_SIZE, 3)
    backbone = keras.applications.ResNet50(
        include_top=False,
        weights="imagenet",
        pooling="max",
        input_shape=frame_shape,
    )

    raw_frames = keras.Input(frame_shape)
    normalized = keras.applications.resnet50.preprocess_input(raw_frames)
    return keras.Model(raw_frames, backbone(normalized), name="feature_extractor")


# Build the extractor once; the video-preparation functions read this global.
feature_extractor = build_feature_extractor()

In [None]:
# Map event names to integer class indices. The vocabulary is the sorted set of
# unique values in the training "event" column; num_oov_indices=0 reserves no
# out-of-vocabulary slot.
label_processor = keras.layers.StringLookup(
    num_oov_indices=0, vocabulary=np.unique(train_df["event"])
)
print(label_processor.get_vocabulary())

The data processing step: extract per-frame CNN features and a frame mask for every video.

In [None]:
def prepare_all_videos(df, root_dir):
    """Extract CNN features, frame masks and labels for every video in ``df``.

    Args:
        df: DataFrame with a ``video_id`` column (video path relative to
            ``root_dir``) and an ``event`` column (class name).
        root_dir: Directory that each ``video_id`` is joined onto.

    Returns:
        ``((frame_features, frame_masks), labels)`` where ``frame_features``
        is a float32 array of shape ``(len(df), MAX_SEQ_LENGTH, NUM_FEATURES)``,
        ``frame_masks`` is a bool array of shape ``(len(df), MAX_SEQ_LENGTH)``
        that is True for timesteps holding a real frame and False for
        padding, and ``labels`` is the ``label_processor`` output for the
        ``event`` column.
    """
    num_samples = len(df)
    video_paths = df["video_id"].values.tolist()
    labels = df["event"].values
    labels = label_processor(labels[..., None]).numpy()

    frame_masks = np.zeros(shape=(num_samples, MAX_SEQ_LENGTH), dtype="bool")
    frame_features = np.zeros(
        shape=(num_samples, MAX_SEQ_LENGTH, NUM_FEATURES), dtype="float32"
    )

    for idx, path in enumerate(video_paths):
        # Only the first MAX_SEQ_LENGTH frames are ever used, so stop decoding
        # there instead of reading the whole video into memory.
        frames = load_video(os.path.join(root_dir, path), max_frames=MAX_SEQ_LENGTH)
        length = min(MAX_SEQ_LENGTH, frames.shape[0])
        if length == 0:
            # No decodable frames: leave this row zero-filled and fully masked.
            continue

        # One batched forward pass over the kept frames.
        frame_features[idx, :length] = feature_extractor.predict(frames[:length])
        # Mark real frames as valid (True). The mask starts out all False, so
        # writing 0 here would leave every timestep masked out.
        frame_masks[idx, :length] = True

    return (frame_features, frame_masks), labels


# Pre-compute features, masks and labels for both splits; videos are looked up
# under the relative "train" and "test" directories.
train_data, train_labels = prepare_all_videos(train_df, "train")
test_data, test_labels = prepare_all_videos(test_df, "test")

print(f"Frame features in train set: {train_data[0].shape}")
print(f"Frame masks in train set: {train_data[1].shape}")

## The LSTM model

Now, we can feed this data to a sequence model consisting of recurrent layers like `LSTM`.

In [None]:
def get_sequence_model():
    """Build and compile the LSTM classifier that consumes frame features.

    The model takes two inputs, a ``(MAX_SEQ_LENGTH, NUM_FEATURES)`` feature
    sequence and a boolean ``(MAX_SEQ_LENGTH,)`` mask, and outputs a softmax
    over the classes in ``label_processor``'s vocabulary.
    """
    num_classes = len(label_processor.get_vocabulary())

    features_in = keras.Input((MAX_SEQ_LENGTH, NUM_FEATURES))
    mask_in = keras.Input((MAX_SEQ_LENGTH,), dtype="bool")

    # The mask is applied to the first recurrent layer only.
    first_lstm = keras.layers.LSTM(4, return_sequences=True)
    hidden = first_lstm(features_in, mask=mask_in)
    hidden = keras.layers.LSTM(8)(hidden)
    hidden = keras.layers.Dropout(0.2)(hidden)
    class_probs = keras.layers.Dense(num_classes, activation="softmax")(hidden)

    model = keras.Model([features_in, mask_in], class_probs)
    model.compile(
        optimizer="adam",
        loss="sparse_categorical_crossentropy",
        metrics=["accuracy"],
    )
    return model


# Utility for running experiments.
def run_experiment():
    """Train a fresh sequence model and print its accuracy on the test data.

    Returns:
        ``(history, model)``: the object returned by ``fit`` and the trained
        sequence model.
    """
    model = get_sequence_model()

    train_features, train_masks = train_data
    training_history = model.fit(
        [train_features, train_masks],
        train_labels,
        epochs=EPOCHS,
    )

    test_features, test_masks = test_data
    _loss, accuracy = model.evaluate([test_features, test_masks], test_labels)
    print(f"Test accuracy: {round(accuracy * 100, 2)}%")

    return training_history, model


# Train once and keep the fitted model; the training history is discarded.
_, sequence_model = run_experiment()

## Prediction

In [None]:
def prepare_single_video(frames):
    """Turn one video's frames into model-ready features and a frame mask.

    Args:
        frames: Array of frames with the frame index on the first axis, such
            as the array returned by ``load_video``.

    Returns:
        ``(frame_features, frame_mask)``: a float32 array of shape
        ``(1, MAX_SEQ_LENGTH, NUM_FEATURES)`` and a bool array of shape
        ``(1, MAX_SEQ_LENGTH)`` that is True for timesteps holding a real
        frame and False for padding.
    """
    frame_mask = np.zeros(shape=(1, MAX_SEQ_LENGTH), dtype="bool")
    frame_features = np.zeros(shape=(1, MAX_SEQ_LENGTH, NUM_FEATURES), dtype="float32")

    length = min(MAX_SEQ_LENGTH, frames.shape[0])
    if length > 0:
        # One batched forward pass over the frames that fit in the sequence.
        frame_features[0, :length] = feature_extractor.predict(frames[:length])
        # Mark real frames as valid (True). The mask starts out all False, so
        # writing 0 here would leave every timestep masked out.
        frame_mask[0, :length] = True

    return frame_features, frame_mask

# Predicted event label per test video, in the order the videos were processed.
test_frames = []


def sequence_prediction(path):
    """Predict the event class of one test video and record it.

    Args:
        path: Video path relative to the ``test`` directory.

    Returns:
        The predicted class name, which is also appended to ``test_frames``.
    """
    class_vocab = label_processor.get_vocabulary()
    # Only the first MAX_SEQ_LENGTH frames are used downstream, so stop
    # decoding there instead of reading the whole video.
    frames = load_video(os.path.join("test", path), max_frames=MAX_SEQ_LENGTH)
    frame_features, frame_mask = prepare_single_video(frames)
    probabilities = sequence_model.predict([frame_features, frame_mask])[0]

    # Look the label up in the model's own vocabulary rather than a hand-typed
    # table, so the emitted name always matches a training label exactly
    # (the old table spelled one label "challange").
    predicted_label = class_vocab[int(np.argmax(probabilities))]
    test_frames.append(predicted_label)
    return predicted_label

# Classify every video listed in the test dataframe; sequence_prediction
# records its result in the global test_frames list.
test_video = test_df["video_id"].values.tolist()
print(f"Test video path: {test_video}")
for i in test_video:
  sequence_prediction(i)

## Write My Submission CSV File

In [None]:
import csv

# Write a header row followed by one (video_id, event) row per test video.
# newline="" hands line-ending control to the csv module, as its documentation
# requires; without it, extra blank rows appear on platforms that translate
# "\n" on write.
with open("submission.csv", "w", newline="") as f:
    writer = csv.writer(f)
    writer.writerow(["video_id", "event"])
    writer.writerows(zip(test_video, test_frames))