In [361]:
import numpy as np
import cv2
import os
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score, classification_report, confusion_matrix
from tensorflow.keras.applications import MobileNetV2, EfficientNetB0, ResNet50, DenseNet121, InceptionV3
from tensorflow.keras.layers import Dropout, Dense, GlobalAveragePooling2D
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers.legacy import Adam
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
import matplotlib.pyplot as plt
from tensorflow.keras.regularizers import l2
from transformers import ViTForImageClassification, ViTFeatureExtractor, TFAutoModelForImageClassification
from transformers import AdamW


## Extracting frames from the video clips

In [249]:
def extract_frames(video_path, output_folder, player_name, num_frames=20):
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"Error: Cannot open video file {video_path}")
        return

    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    interval = total_frames // num_frames if total_frames > num_frames else 1

    video_basename = os.path.splitext(os.path.basename(video_path))[0]  # Get video file name without extension
    player_folder = os.path.join(output_folder, player_name)

    try:
        if not os.path.exists(player_folder):
            os.makedirs(player_folder)
    except OSError:
        print(f"Error: Creating directory {player_folder}")

    frame_ids = [int(interval * i) for i in range(num_frames)]
    frame_count = 0
    saved_frames = 0

    while saved_frames < num_frames:
        cap.set(cv2.CAP_PROP_POS_FRAMES, frame_ids[saved_frames])
        success, frame = cap.read()
        if not success:
            print(f"Failed to read frame at index {frame_ids[saved_frames]} from {video_path}")
            saved_frames += 1
            continue

        frame_path = os.path.join(player_folder, f"{video_basename}_frame_{frame_count}.jpg")
        cv2.imwrite(frame_path, frame)
        saved_frames += 1
        frame_count += 1

    cap.release()
    cv2.destroyAllWindows()


## Loading the extracted frames and preprocessing

In [356]:
def load_and_preprocess_data(root_folder):
    images = []
    labels = []
    valid_extensions = (".jpg", ".jpeg", ".png", ".bmp", ".tiff")

    # Iterate through each player's folder to load images
    for player_name in os.listdir(root_folder):
        player_folder = os.path.join(root_folder, player_name)
        if os.path.isdir(player_folder):
            for img_file in os.listdir(player_folder):
                if img_file.lower().endswith(valid_extensions):
                    img_path = os.path.join(player_folder, img_file)
                    img = cv2.imread(img_path)
                    if img is None:
                        print(f"Warning: Could not read image {img_path} - it may be corrupt or in an unsupported format.")
                        continue

                    # img = cv2.resize(img, (64, 64))
                    # img = cv2.resize(img, (128, 128))  # Resize for MobileNetV2
                    img = cv2.resize(img, (224, 224))  # Resize for ViT
                    images.append(img)
                    labels.append(player_name)  # Label is the player's name

    images = np.array(images)
    labels = np.array(labels)

    # Encode labels to integers
    encoder = LabelEncoder()
    encoded_labels = encoder.fit_transform(labels)
    categorical_labels = to_categorical(encoded_labels)  # One-hot encoding

    num_images = len(images)
    print(f"Number of loaded and preprocessed frames: {num_images}")
    return images, categorical_labels, encoder, num_images

## Data augmentation

In [342]:
def create_data_augmentation():
    return ImageDataGenerator(
        rotation_range=40,
        width_shift_range=0.2,
        height_shift_range=0.2,
        shear_range=0.2,
        zoom_range=0.2,
        horizontal_flip=True,
        fill_mode='nearest'
    )

## Model building and training

### CNN

In [None]:
# def build_model(num_classes):
#     model = Sequential([
#         Conv2D(32, (3, 3), activation='relu', input_shape=(64, 64, 3)),
#         MaxPooling2D(2, 2),
#         Conv2D(64, (3, 3), activation='relu'),
#         MaxPooling2D(2, 2),
#         Flatten(),
#         Dense(128, activation='relu'),
#         Dense(num_classes, activation='softmax')
#     ])
    
#     model.compile(optimizer=Adam(), loss='categorical_crossentropy', metrics=['accuracy'])
#     return model

### MobileNetV2

In [None]:
# def build_model(num_classes):
#     base_model = MobileNetV2(weights='imagenet', include_top=False, input_shape=(128, 128, 3))
#     x = base_model.output
#     x = GlobalAveragePooling2D()(x)
#     x = Dense(128, activation='relu')(x)
#     # x = Dropout(0.5)(x)  # Dropout layer added here
#     # x = Dense(128, activation='relu')(x)
#     # x = Dropout(0.5)(x)  # Another dropout layer for additional regularization
#     predictions = Dense(num_classes, activation='softmax')(x)
#     model = Model(inputs=base_model.input, outputs=predictions)
    
#     for layer in base_model.layers:
#         layer.trainable = False  # Freeze the layers of the base model
    
#     model.compile(optimizer=Adam(learning_rate=0.001), loss='categorical_crossentropy', metrics=['accuracy'])
#     return model

### EfficientNetB0

In [310]:
# def build_model(num_classes):
#     base_model = EfficientNetB0(include_top=False, weights='imagenet', input_shape=(128, 128, 3))
#     x = base_model.output
#     x = GlobalAveragePooling2D()(x)
#     x = Dense(128, activation='relu')(x)
#     x = Dropout(0.3)(x)  
#     predictions = Dense(num_classes, activation='softmax')(x)
#     model = Model(inputs=base_model.input, outputs=predictions)

#     # Optionally, fine-tune from this layer onwards
#     fine_tune_at = 100
#     for layer in base_model.layers[:fine_tune_at]:
#         layer.trainable = False
#     for layer in base_model.layers[fine_tune_at:]:
#         layer.trainable = True

#     model.compile(optimizer=Adam(learning_rate=0.0001), loss='categorical_crossentropy', metrics=['accuracy'])
#     return model



### ResNet

In [320]:
# def build_model(num_classes):
#     base_model = ResNet50(include_top=False, weights='imagenet', input_shape=(128, 128, 3))
#     x = base_model.output
#     x = GlobalAveragePooling2D()(x)
#     x = Dense(256, activation='relu')(x)
#     x = Dropout(0.5)(x)
#     predictions = Dense(num_classes, activation='softmax')(x)
#     model = Model(inputs=base_model.input, outputs=predictions)

#     # Freezing the base layers
#     for layer in base_model.layers:
#         layer.trainable = False

#     model.compile(optimizer=Adam(learning_rate=0.0001), loss='categorical_crossentropy', metrics=['accuracy'])
#     return model

### DenseNet

In [332]:
# def build_model(num_classes):
#     base_model = DenseNet121(include_top=False, weights='imagenet', input_shape=(128, 128, 3))
#     x = base_model.output
#     x = GlobalAveragePooling2D()(x)
#     x = Dense(256, activation='relu')(x)
#     x = Dropout(0.5)(x)
#     predictions = Dense(num_classes, activation='softmax')(x)
#     model = Model(inputs=base_model.input, outputs=predictions)

#     # Freezing the base layers
#     for layer in base_model.layers:
#         layer.trainable = False

#     model.compile(optimizer=Adam(learning_rate=0.0001), loss='categorical_crossentropy', metrics=['accuracy'])
#     return model

### InceptionV3

In [348]:
# def build_model(num_classes):
#     base_model = InceptionV3(include_top=False, weights='imagenet', input_shape=(128, 128, 3))
#     x = base_model.output
#     x = GlobalAveragePooling2D()(x)
#     x = Dense(256, activation='relu')(x)
#     x = Dropout(0.5)(x)
#     predictions = Dense(num_classes, activation='softmax')(x)
#     model = Model(inputs=base_model.input, outputs=predictions)

#     # Freezing the base layers
#     for layer in base_model.layers:
#         layer.trainable = False

#     model.compile(optimizer=Adam(learning_rate=0.0001), loss='categorical_crossentropy', metrics=['accuracy'])
#     return model

### ViT


In [359]:
def build_model(num_classes):
    # Load feature extractor
    feature_extractor = ViTFeatureExtractor.from_pretrained('google/vit-base-patch16-224-in21k')
    
    # Load the base ViT model
    model = ViTForImageClassification.from_pretrained('google/vit-base-patch16-224-in21k',
                                                      num_labels=num_classes,
                                                      id2label={str(i): f'label_{i}' for i in range(num_classes)},
                                                      label2id={f'label_{i}': i for i in range(num_classes)})
    
    # Set all layers to be trainable (or you can choose to freeze some layers)
    for layer in model.vit.parameters():
        layer.requires_grad = True
    
    # Compile the model with appropriate optimizer and loss function
    optimizer = AdamW(model.parameters(), lr=0.0001)  # AdamW optimizer is often used with Transformers
    model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])
    
    return model, feature_extractor



## Model evaluation

In [333]:
def evaluate_model(X_test, y_test, model, encoder):
    # Predict the probabilities for the test data
    y_probs = model.predict(X_test)
    y_pred = np.argmax(y_probs, axis=1)
    y_true = np.argmax(y_test, axis=1)

    # Compute the average precision score
    precision = dict()
    recall = dict()
    average_precision = dict()
    for i in range(len(encoder.classes_)):
        precision[i], recall[i], _ = precision_recall_curve(y_test[:, i], y_probs[:, i])
        average_precision[i] = average_precision_score(y_test[:, i], y_probs[:, i])
    
    mAP = np.mean(list(average_precision.values()))
    
    # Generate classification report
    print("Classification Report:")
    print(classification_report(y_true, y_pred, target_names=encoder.classes_))
    
    f1 = f1_score(y_true, y_pred, average='macro')  # You can change 'macro' to 'micro' or 'weighted' depending on your needs
    print("F1 Score (macro-average):", f1)
    print("Accuracy:", accuracy_score(y_true, y_pred))
    print("Mean Average Precision (mAP):", mAP)

In [334]:
def plot_history(history):
    plt.figure(figsize=(12, 6))
    
    plt.subplot(1, 2, 1)
    plt.plot(history.history['loss'], label='Training Loss')
    plt.plot(history.history['val_loss'], label='Validation Loss')
    plt.title('Training vs. Validation Loss')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()
    
    plt.subplot(1, 2, 2)
    plt.plot(history.history['accuracy'], label='Training Accuracy')
    plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
    plt.title('Training vs. Validation Accuracy')
    plt.xlabel('Epochs')
    plt.ylabel('Accuracy')
    plt.legend()
    
    plt.show()

In [360]:
def main():
    root_folder = '/Users/nadunsenarathne/Downloads/Documents/IIT/4th Year/FYP/CricXpert/Datasets/India'
    output_frame_folder = '/Users/nadunsenarathne/Downloads/Documents/IIT/4th Year/FYP/CricXpert/Hybrid_Spatio_Temporal_Model_For_Gait_Analysis/extracted_frames'
    video_extensions = ('.mp4', '.avi', '.mov', '.mpeg', '.mpg', '.mkv')

    # for player_name in os.listdir(root_folder):
    #     player_path = os.path.join(root_folder, player_name, 'gait_data')
    #     if os.path.isdir(player_path):
    #         print(f"Processing videos for player: {player_name}")
    #         for video_file in os.listdir(player_path):
    #             if video_file.endswith(video_extensions):
    #                 video_path = os.path.join(player_path, video_file)
    #                 print(f"Extracting frames from: {video_path}")
    #                 extract_frames(video_path, output_frame_folder, player_name, 20)
    #             else:
    #                 print(f"Skipped non-video file: {video_file}")


    # Load and preprocess data
    X, y, encoder, num_images = load_and_preprocess_data(output_frame_folder)

    # Split data into training and testing
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    # # Data augmentation
    # datagen = create_data_augmentation()

    # # Build and train model
    # model = build_model(len(encoder.classes_))    
    # history = model.fit(datagen.flow(X_train, y_train, batch_size=32),
    #                     steps_per_epoch=len(X_train) // 32,
    #                     epochs=10,
    #                     validation_data=(X_test, y_test))
 
    # Build and train model
    model = build_model(len(encoder.classes_))  # Ensure the model is built for the correct number of classes
    history = model.fit(X_train, y_train, epochs=10, batch_size=32, validation_data=(X_test, y_test))

    # Plot training history
    plot_history(history)

    # Evaluate the model
    evaluate_model(X_test, y_test, model, encoder)

    # Save the model and label encoder
    model.save('spatial_model.h5')
    np.save('label_encoder_classes.npy', encoder.classes_)


if __name__ == "__main__":
    main()

Number of loaded and preprocessed frames: 1749


Downloading (…)rocessor_config.json:   0%|          | 0.00/160 [00:00<?, ?B/s]



Downloading config.json:   0%|          | 0.00/502 [00:00<?, ?B/s]

Downloading model.safetensors:   0%|          | 0.00/346M [00:00<?, ?B/s]

Some weights of ViTForImageClassification were not initialized from the model checkpoint at google/vit-base-patch16-224-in21k and are newly initialized: ['classifier.weight', 'classifier.bias']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


TypeError: compile() got an unexpected keyword argument 'optimizer'

## Inference

In [355]:
import cv2
import numpy as np
import os
from tensorflow.keras.models import load_model
from sklearn.preprocessing import LabelEncoder

# Function to extract frames
def extract_frames_for_prediction(video_path, output_folder, num_frames=20):
    cap = cv2.VideoCapture(video_path)
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    interval = total_frames // num_frames if total_frames > num_frames else 1
    frame_ids = [int(interval * i) for i in range(num_frames)]
    frames = []

    try:
        if not os.path.exists(output_folder):
            os.makedirs(output_folder)
    except OSError:
        print(f"Error: Creating directory {output_folder}")

    frame_count = 0
    while frame_count < total_frames:
        ret, frame = cap.read()
        if not ret:
            break
        if frame_count in frame_ids:
            resized_frame = cv2.resize(frame, (128, 128))  # Resize frame as per training
            frames.append(resized_frame)
        frame_count += 1

    cap.release()
    return np.array(frames)


# Load the model
model = load_model('spatial_model.h5')

# Properly load the LabelEncoder
encoder = LabelEncoder()
encoder.classes_ = np.load('label_encoder_classes.npy', allow_pickle=True)

# Prediction function
def predict_player(video_path):
    frame_folder = 'temp_frames'
    frames = extract_frames_for_prediction(video_path, frame_folder)
    if len(frames) == 0:
        return "No frames to analyze."

    predictions = model.predict(frames)
    predicted_class = np.argmax(np.mean(predictions, axis=0))
    predicted_player = encoder.inverse_transform([predicted_class])[0]  # Translate label index back to player name

    return predicted_player

# Example usage
video_path = '/Users/nadunsenarathne/Downloads/Documents/IIT/4th Year/FYP/CricXpert/Datasets/untitled folder/3.mov'  # Path to your test video clip
result = predict_player(video_path)
print(f"The player in the video is: {result}")


The player in the video is: Hardik_Pandya
