<a href="https://colab.research.google.com/github/H605-oss/Basic_Calculator/blob/main/Notebooks/Cricket_Shot_Classification_EfNetB0.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install deap



In [2]:
import matplotlib.pyplot as plt
import numpy as np
import os
import PIL
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras import models
from tensorflow.keras.layers import Conv2D,MaxPooling2D,Dropout,Dense,Flatten
from tensorflow.keras.models import Sequential
from tensorflow.keras.optimizers import Adam
from tensorflow.keras import regularizers
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.models import Model
from sklearn.metrics import confusion_matrix
import random
from deap import base, creator, tools, algorithms
from tensorflow.keras.callbacks import ModelCheckpoint, ReduceLROnPlateau
import random
import pathlib
import cv2
import numpy as np
import matplotlib.pyplot as plt
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import SparseCategoricalCrossentropy
from google.colab import drive


In [3]:
def format_frames(frame, output_size):
  """
    Pad and resize an image from a video.

    Args:
      frame: Image that needs to resized and padded.
      output_size: Pixel size of the output frame image.

    Return:
      Formatted frame with padding of specified output size.
  """
  frame = tf.image.convert_image_dtype(frame, tf.uint8)
  frame = tf.image.resize_with_pad(frame, *output_size)
  return frame.numpy()


def frames_from_video_file(video_path, n_frames, output_size=(224, 224), frame_step=1):
    """
    Extracts frames sequentially from the start of the video file, with a specified step between frames.

    Args:
      video_path: File path to the video.
      n_frames: Number of frames to be created per video file.
      output_size: Pixel size of the output frame image (height, width).
      frame_step: Number of frames to skip between extracted frames.

    Returns:
      A NumPy array of frames in the shape of (n_frames, height, width, channels).
    """
    result = []
    src = cv2.VideoCapture(str(video_path))

    src.set(cv2.CAP_PROP_POS_FRAMES, 0)  # Start from the first frame

    # Attempt to read the first frame
    ret, frame = src.read()
    if ret:
        frame = format_frames(frame, output_size)
        result.append(frame)
    else:
        # If the first frame can't be read, append a zero frame and exit
        result.append(np.zeros((output_size[0], output_size[1], 3), dtype=np.uint8))

    # Read subsequent frames with the specified frame_step
    for _ in range(n_frames - 1):
        for _ in range(frame_step):
            ret, frame = src.read()
        if ret:
            frame = format_frames(frame, output_size)
            result.append(frame)
        else:
            # Append a zero-like frame if no more frames can be read
            result.append(np.zeros_like(result[0]))

    src.release()

    # Convert the list of frames to a NumPy array and adjust color channels from BGR to RGB
    result = np.array(result)[..., [2, 1, 0]]

    return result

class FrameGenerator:
  def __init__(self, path, n_frames, training = False):
    """ Returns a set of frames with their associated label.

      Args:
        path: Video file paths.
        n_frames: Number of frames.
        training: Boolean to determine if training dataset is being created.
    """
    self.path = path
    self.n_frames = n_frames
    self.training = training
    self.class_names = sorted(set(p.name for p in self.path.iterdir() if p.is_dir()))
    self.class_ids_for_name = dict((name, idx) for idx, name in enumerate(self.class_names))

  def get_files_and_class_names(self):
    video_paths = list(self.path.glob('*/*.mp4'))
    classes = [p.parent.name for p in video_paths]
    return video_paths, classes

  def __call__(self):
    video_paths, classes = self.get_files_and_class_names()

    pairs = list(zip(video_paths, classes))

    if self.training:
      random.shuffle(pairs)

    for path, name in pairs:
      video_frames = frames_from_video_file(path, self.n_frames)
      label = self.class_ids_for_name[name] # Encode labels
      yield video_frames, label

In [4]:
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [5]:
batch_size = 16
num_frames = 30

# Define paths using pathlib
train_path = pathlib.Path('/content/drive/MyDrive/dataset/train/')
test_path = pathlib.Path('/content/drive/MyDrive/dataset/test/')
val_path = pathlib.Path('/content/drive/MyDrive/dataset/val/')

# Update output_signature to match the expected shapes:
# - The first element of the tuple describes the input tensor to the model:
#   This should be (num_frames, 224, 224, 3) for each video in the batch.
# - The second element describes the labels, which should be a scalar value per video (batch).
output_signature = (
    tf.TensorSpec(shape=(num_frames, 224, 224, 3), dtype=tf.uint8),  # Updated input shape per video
    tf.TensorSpec(shape=(), dtype=tf.uint8)  # Scalar integer for the label
)

# Generator for training data
train_ds = tf.data.Dataset.from_generator(
    lambda: FrameGenerator(train_path, num_frames, training=True)(),
    output_signature=output_signature
)
train_ds = train_ds.batch(batch_size)

# Generator for test data
test_ds = tf.data.Dataset.from_generator(
    lambda: FrameGenerator(test_path, num_frames)(),
    output_signature=output_signature
)
test_ds = test_ds.batch(batch_size)

# Generator for validation data
val_ds = tf.data.Dataset.from_generator(
    lambda: FrameGenerator(val_path, num_frames)(),
    output_signature=output_signature
)
val_ds = val_ds.batch(batch_size)

In [6]:
for frames, labels in train_ds.take(2):
  print(labels)
  print(f"Shape: {frames.shape}")
  print(f"Label: {labels.shape}")

In [8]:
filepath = "models/GA.h5"

In [9]:
train_ds = train_ds.prefetch(tf.data.experimental.AUTOTUNE)
val_ds = val_ds.prefetch(tf.data.experimental.AUTOTUNE)
test_ds = test_ds.prefetch(tf.data.experimental.AUTOTUNE)

In [10]:
train_ds = train_ds.cache()
val_ds = val_ds.cache()
test_ds = test_ds.cache()

In [12]:
model_save = tf.keras.callbacks.ModelCheckpoint(filepath, monitor="val_accuracy", verbose=0, save_best_only=True, save_weights_only=True, mode="max", save_freq="epoch")

reduce_lr = tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss',
                                   factor=0.2,
                                   patience=4,
                                   verbose=1,
                                   min_delta=1e-3,min_lr = 5*1e-12,
                                   )

callbacks = [model_save,  reduce_lr]

ValueError: When using `save_weights_only=True` in `ModelCheckpoint`, the filepath provided must end in `.weights.h5` (Keras weights format). Received: filepath=models/GA.h5

In [None]:
from tensorflow.keras import models, layers
from tensorflow.keras.applications import EfficientNetB0
def build_model(learning_rate):
    """
    Builds a TensorFlow/Keras model with a specified learning rate.

    Args:
    learning_rate: The learning rate to use for the Adam optimizer.

    Returns:
    A compiled Keras model.
    """
    base_model = EfficientNetB0(include_top=False, weights='imagenet', input_shape=(224, 224, 3))
    base_model.trainable = False

    model = models.Sequential([
        layers.TimeDistributed(base_model, input_shape=(None, 224, 224, 3)),
        layers.TimeDistributed(layers.GlobalAveragePooling2D()),
        layers.GRU(256, return_sequences=True),
        layers.GRU(128),
        layers.Dense(1024, activation='relu'),
        layers.Dropout(0.5),
        layers.Dense(10, activation='softmax')
    ])

    optimizer = Adam(learning_rate=learning_rate)
    model.compile(optimizer=optimizer, loss="sparse_categorical_crossentropy", metrics=['accuracy'])

    return model

In [28]:
import pandas as pd
import random
from deap import base, creator, tools, algorithms
from tensorflow.keras.optimizers import Adam
from tensorflow.keras import models, layers
from tensorflow.keras.applications import EfficientNetB0

def build_model(learning_rate):
    """
    Builds a TensorFlow/Keras model with a specified learning rate.

    Args:
    learning_rate: The learning rate to use for the Adam optimizer.

    Returns:
    A compiled Keras model.
    """
    base_model = EfficientNetB0(include_top=False, weights='imagenet', input_shape=(224, 224, 3))
    base_model.trainable = False

    model = models.Sequential([
        layers.TimeDistributed(base_model, input_shape=(None, 224, 224, 3)),
        layers.TimeDistributed(layers.GlobalAveragePooling2D()),
        layers.GRU(256, return_sequences=True),
        layers.GRU(128),
        layers.Dense(1024, activation='relu'),
        layers.Dropout(0.5),
        layers.Dense(10, activation='softmax')
    ])

    optimizer = Adam(learning_rate=learning_rate)
    model.compile(optimizer=optimizer, loss="sparse_categorical_crossentropy", metrics=['accuracy'])

    return model

def log_individual(log_df, gen, ind, fit, stage, p1=None, p2=None, mut=None):
    new_row = pd.DataFrame([{
        "Generation": gen,
        "Individual": str(ind),
        "Learning Rate": ind[0],
        "Epochs": ind[1],
        "Fitness": fit[0] if fit else None,
        "Stage": stage,
        "Parent 1": str(p1) if p1 else "",
        "Parent 2": str(p2) if p2 else "",
        "Mutation": str(mut) if mut else ""
    }])
    return pd.concat([log_df, new_row], ignore_index=True)

# Initialize your DataFrame before the loop
log_df = pd.DataFrame(columns=['Generation', 'Individual', 'Learning Rate', 'Epochs', 'Fitness', 'Stage', 'Parent 1', 'Parent 2', 'Mutation'])

# Define the evaluation function
def evaluate_model(individual, callbacks):
    learning_rate, num_epochs = float(individual[0]), int(individual[1])
    print(f"Evaluating with learning rate: {learning_rate} and epochs: {num_epochs}")
    model = build_model(learning_rate)
    history = model.fit(train_ds, validation_data=val_ds, epochs=num_epochs, callbacks=callbacks, verbose=1)
    max_val_acc = max(history.history['val_accuracy'])
    print(f"Validation accuracy: {max_val_acc}")
    return (max_val_acc,)

# Initialize DEAP for GA
creator.create("FitnessMax", base.Fitness, weights=(1.0,))
creator.create("Individual", list, fitness=creator.FitnessMax)
toolbox = base.Toolbox()
toolbox.register("attr_float", random.uniform, 0.0001, 0.02)
toolbox.register("attr_int", random.randint, 1, 20)
toolbox.register("individual", tools.initCycle, creator.Individual, (toolbox.attr_float, toolbox.attr_int), n=1)
toolbox.register("population", tools.initRepeat, list, toolbox.individual)
toolbox.register("evaluate", evaluate_model, callbacks=callbacks)
toolbox.register("mate", tools.cxTwoPoint)
toolbox.register("mutate", tools.mutGaussian, mu=0, sigma=1, indpb=0.2)
toolbox.register("select", tools.selTournament, tournsize=3)

# Enhanced logging setup
log_df = pd.DataFrame(columns=['Generation', 'Individual', 'Learning Rate', 'Epochs', 'Fitness', 'Stage', 'Parent 1', 'Parent 2', 'Mutation'])

# Genetic Algorithm Loop with enhanced logging
stagnation_limit = 10  # Allowable number of generations without improvement
best_fitness = float('-inf')
generations_without_improvement = 0
population = toolbox.population(n=10)
ngen = 50  # Maximum number of generations

for gen in range(ngen):
    print(f"Generation {gen}")
    # Evaluate all individuals with invalid fitness
    invalid_ind = [ind for ind in population if not ind.fitness.valid]
    fitnesses = list(map(toolbox.evaluate, invalid_ind))
    for ind, fit in zip(invalid_ind, fitnesses):
        ind.fitness.values = fit
        log_df = log_individual(log_df, gen, ind, fit, "Evaluation")


    # Select the next generation individuals
    offspring = toolbox.select(population, len(population))
    # Clone the selected individuals
    offspring = list(map(toolbox.clone, offspring))

    # Apply crossover and mutation on the offspring
    for child1, child2 in zip(offspring[::2], offspring[1::2]):
        if random.random() < 0.5:
            toolbox.mate(child1, child2)
            del child1.fitness.values
            del child2.fitness.values
            log_df = log_individual(log_df, gen, child1, None, "Crossover", p1=child1, p2=child2)
            log_df = log_individual(log_df, gen, child2, None, "Crossover", p1=child1, p2=child2)


    for mutant in offspring:
        if random.random() < 0.2:
            toolbox.mutate(mutant)
            del mutant.fitness.values
            log_df = log_individual(log_df, gen, mutant, None, "Mutation", mut=mutant)


    # The population is entirely replaced by the offspring
    population[:] = offspring

    # Gather all the fitnesses in one list and print the stats
    fits = [ind.fitness.values[0] for ind in population if ind.fitness.valid]

    if len(fits) > 0:
        length = len(population)
        mean = sum(fits) / length
        sum2 = sum(x*x for x in fits)
        std = abs(sum2 / length - mean**2)**0.5

        print(f"  Min {min(fits)}")
        print(f"  Max {max(fits)}")
        print(f"  Avg {mean}")
        print(f"  Std {std}")

        # Checking for improvement
        current_best = max(fits)
        if current_best > best_fitness:
            best_fitness = current_best
            generations_without_improvement = 0
        else:
            generations_without_improvement += 1
    else:
        generations_without_improvement += 1


    if generations_without_improvement >= stagnation_limit:
        print(f"Stopping early due to no improvement after {stagnation_limit} generations.")
        break

# Final logging and output
best_ind = tools.selBest(population, 1)[0]
print(f"Best individual is: {best_ind}, with accuracy: {best_ind.fitness.values[0]}")
log_df.to_csv('GA_run_log.csv')



Generation 0
Evaluating with learning rate: 0.001172762097285506 and epochs: 19


  super().__init__(**kwargs)


NameError: name 'callbacks' is not defined

In [None]:
log_df.to_csv('ga_process_log.csv')

# Optionally print the DataFrame to check the outputs immediately
print(log_df)

In [None]:
model.compile(optimizer = adam, loss = "sparse_categorical_crossentropy", metrics=['accuracy'])

In [None]:
history =  model.fit(train_ds, validation_data=val_ds, epochs = 20, callbacks = callbacks)

In [None]:
# summarize history for accuracy
plt.plot(history.history['accuracy'])
plt.plot(history.history['val_accuracy'])
plt.title('Cricket Shot Classification Model Accuracy')
plt.ylabel('Accuracy')
plt.xlabel('Epochs')
plt.legend(['train', 'val'], loc='upper left')
plt.savefig('custom_1_acc.png', bbox_inches='tight')
plt.show()

In [None]:
# summarize history for loss
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('Cricket Shot Classification Model Model Loss')
plt.ylabel('Loss')
plt.xlabel('Epochs')
plt.legend(['train', 'val'], loc='upper left')
plt.savefig('custom_1_loss.png', bbox_inches='tight')
plt.show()

In [None]:
#Prediction Function
array = model.predict(test_ds, batch_size=1, verbose=1)
y_pred = np.argmax(array, axis=1)

In [None]:
y_true = y = np.concatenate([y for x, y in test_ds], axis=0)

In [None]:
conf_mat = confusion_matrix(y_true, y_pred)

In [None]:
conf_mat

In [None]:
classes = {'cover': 0, 'defense': 1, 'flick': 2, 'hook': 3, 'late_cut': 4, 'lofted': 5, 'pull': 6, 'square_cut': 7, 'straight': 8, 'sweep': 9}

In [None]:
from sklearn import metrics
report = metrics.classification_report(y_true, y_pred, target_names=classes)
print(report)

In [None]:
import numpy as np
import matplotlib.pyplot as plt

def plot_confusion_matrix(cm,
                          target_names,
                          title='Confusion matrix',
                          cmap=None,
                          normalize=True):
    import matplotlib.pyplot as plt
    import numpy as np
    import itertools

    accuracy = np.trace(cm) / float(np.sum(cm))
    misclass = 1 - accuracy

    if cmap is None:
        cmap = plt.get_cmap('Blues')

    plt.figure(figsize=(8, 6))
    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()

    if target_names is not None:
        tick_marks = np.arange(len(target_names))
        plt.xticks(tick_marks, target_names, rotation=45)
        plt.yticks(tick_marks, target_names)

    if normalize:
        cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]


    thresh = cm.max() / 1.5 if normalize else cm.max() / 2
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        if normalize:
            plt.text(j, i, "{:0.4f}".format(cm[i, j]),
                     horizontalalignment="center",
                     color="white" if cm[i, j] > thresh else "black")
        else:
            plt.text(j, i, "{:,}".format(cm[i, j]),
                     horizontalalignment="center",
                     color="white" if cm[i, j] > thresh else "black")


    plt.tight_layout()
    plt.ylabel('True label')
    #plt.xlabel('Predicted label\naccuracy={:0.4f}; misclass={:0.4f}'.format(accuracy, misclass))
    plt.xlabel('Predicted label')
    plt.savefig('confmats/Efnetb0_10/{}1.jpg'.format(title))
    plt.show()

In [None]:
plot_confusion_matrix(conf_mat,target_names=classes,title='Circket Shot Confusion Matrix',normalize=False)

In [None]:
plot_confusion_matrix(conf_mat,target_names=classes,title='Circket Shot Confusion Matrix',normalize=True)

In [None]:
import numpy as np

def classify_video(video_path, model, frame_count, class_labels):
    # Process the video file to get the frames
    frames = frames_from_video_file(video_path, frame_count)

    # Add batch dimension if the model expects it
    frames = np.expand_dims(frames, axis=0)

    # Use the model to predict the class probabilities
    predictions = model.predict(frames)
    print("Raw predictions:", predictions)

    # Convert predictions to class labels
    predicted_class_idx = np.argmax(predictions, axis=1)[0]  # Get the index of the max class score
    print("Predicted class index:", predicted_class_idx)

    # Get the class name using the predicted index
    predicted_class_name = list(class_labels.keys())[list(class_labels.values()).index(predicted_class_idx)]

    # Calculate the confidence percentage of the predicted class
    confidence = predictions[0][predicted_class_idx] * 100  # Assuming softmax output, multiply by 100 for percentage
    print("Confidence (%): {:.2f}%".format(confidence))

    return predicted_class_name, confidence


In [None]:
# Example usage:
video_path = '/content/drive/MyDrive/dataset/test/cover/26.mp4'
predicted_class, confidence = classify_video(video_path, model, frame_count=30, class_labels=classes)
print(f"The predicted class for the video is: {predicted_class}", f"Confidence: {confidence:.2f}%")

In [None]:
# Example usage:
video_path = '/content/drive/MyDrive/dataset/test/defense/9.mp4'
predicted_class, confidence = classify_video(video_path, model, frame_count=8, class_labels=classes)
print(f"The predicted class for the video is: {predicted_class}", f"Confidence: {confidence:.2f}%")

In [None]:
# Example usage:
video_path = '/content/drive/MyDrive/dataset/test/flick/26.mp4'
predicted_class, confidence = classify_video(video_path, model, frame_count=8, class_labels=classes)
print(f"The predicted class for the video is: {predicted_class}", f"Confidence: {confidence:.2f}%")

In [None]:
# Example usage:
video_path = '/content/drive/MyDrive/dataset/test/hook/4.mp4'
predicted_class, confidence = classify_video(video_path, model, frame_count=8, class_labels=classes)
print(f"The predicted class for the video is: {predicted_class}", f"Confidence: {confidence:.2f}%")

In [None]:
# Example usage:
video_path = '/content/drive/MyDrive/dataset/test/late_cut/15.mp4'
predicted_class, confidence = classify_video(video_path, model, frame_count=16, class_labels=classes)
print(f"The predicted class for the video is: {predicted_class}", f"Confidence: {confidence:.2f}%")

In [None]:
# Example usage:
video_path = '/content/drive/MyDrive/dataset/test/lofted/7.mp4'
predicted_class, confidence = classify_video(video_path, model, frame_count=16, class_labels=classes)
print(f"The predicted class for the video is: {predicted_class}", f"Confidence: {confidence:.2f}%")

In [None]:
# Example usage:
video_path = '/content/drive/MyDrive/dataset/test/square_cut/242.mp4'
predicted_class, confidence = classify_video(video_path, model, frame_count=16, class_labels=classes)
print(f"The predicted class for the video is: {predicted_class}", f"Confidence: {confidence:.2f}%")

In [None]:
# Example usage:
video_path = '/content/drive/MyDrive/dataset/test/straight/295.mp4'
predicted_class, confidence = classify_video(video_path, model, frame_count=16, class_labels=classes)
print(f"The predicted class for the video is: {predicted_class}", f"Confidence: {confidence:.2f}%")

In [None]:
# Example usage:
video_path = '/content/drive/MyDrive/dataset/test/sweep/267.mp4'
predicted_class, confidence = classify_video(video_path, model, frame_count=16, class_labels=classes)
print(f"The predicted class for the video is: {predicted_class}", f"Confidence: {confidence:.2f}%")

In [None]:
# Example usage:
video_path = '/content/drive/MyDrive/dataset/test/pull/178.mp4'
predicted_class, confidence = classify_video(video_path, model, frame_count=16, class_labels=classes)
print(f"The predicted class for the video is: {predicted_class}", f"Confidence: {confidence:.2f}%")

In [None]:
from tensorflow.keras import models, layers
from tensorflow.keras.applications import EfficientNetB0

# Load pre-trained EfficientNetB0 without the top layer to use as a feature extractor
base_model = EfficientNetB0(include_top=False, weights='imagenet', input_shape=(224, 224, 3))

# Set the base model as non-trainable
base_model.trainable = False

# Define the full model using a Sequential model
model = models.Sequential([
    # Apply EfficientNetB0 to each frame of the video
    layers.TimeDistributed(base_model, input_shape=(None, 224, 224, 3)),
    layers.TimeDistributed(layers.GlobalAveragePooling2D()),

    # Use GRU layers to capture temporal relationships
    layers.GRU(256, return_sequences=True),
    layers.GRU(128),

    # Dense layers for classification
    layers.Dense(1024, activation='relu'),
    layers.Dropout(0.5),
    layers.Dense(10, activation='softmax')
])


In [None]:
# Path to the saved weights
weights_path = '/content/models/Custom3_20_weights.h5'

# Load the weights into the model
model.load_weights(weights_path)
