<a href="https://colab.research.google.com/github/coldsober-irene/ASSIGNMENTS/blob/main/FullHAR.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Combined code

In [30]:
# Mount Google Drive inside the Colab runtime; the dataset, map file and feature
# paths used later in this notebook all live under /content/drive.
from google.colab import drive
drive.mount('/content/drive')

import os
import cv2
import random
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, models
from sklearn.svm import SVC
from matplotlib import pyplot as plt
from tensorflow.keras.applications import ResNet50
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report
from tensorflow.keras.applications.resnet50 import preprocess_input
from tensorflow.keras.callbacks import EarlyStopping

# Module-level sampling settings.
# NOTE(review): neither name is referenced anywhere in the visible code (the
# samplers use their own `sample_rate` / `num_samples` defaults) — confirm
# whether these are still needed or should be wired into the samplers.
samples = 20
rate = 7

class Sampling:
  """Sample one frame per video, extract features for it and save them as ``.npy`` files.

  Instantiating the class runs the whole pipeline: parse the label map, walk the
  videos under ``video_path``, push one sampled frame per video through
  ``Feature_extract`` and finally write the stacked features and the labels into
  ``save_feature_dir``.
  """

  # Incremented once per processed video and shared by every instance; its value
  # at save time becomes the numeric suffix of the saved feature/label files.
  count = 0

  def __init__(self, video_path, map_file, sampling_type = 'uniform', val_sampling = False,
               ref_mean=[0.07, 0.07, 0.07], ref_std=[0.1, 0.09, 0.08], enhance_img = False, **kwargs):
    """Run frame sampling, feature extraction and saving.

    Args:
      video_path: root folder of the videos. In training mode it contains one
        sub-folder per activity; in validation mode it contains the videos directly.
      map_file: whitespace-separated text file with the label mapping. In
        validation mode the last token of a line is the key and the second token
        the integer label; in training mode the second token is the key and the
        first token the integer label.
      sampling_type: ``'uniform'`` or ``'random'``.
      val_sampling: ``True`` to use the validation layout, ``False`` for training.
      ref_mean, ref_std: per-channel reference statistics forwarded to
        ``Feature_extract.features``. The default lists are never mutated here.
      enhance_img: when true, the sampled frame is gamma-corrected first.
      **kwargs: must contain ``save_feature_dir``, the folder that receives the
        extracted features and labels.

    Raises:
      KeyError: if ``save_feature_dir`` is missing.
      ValueError: if ``sampling_type`` is unknown, a video yields no readable
        frame, or no features were extracted at all.
    """
    # Fail fast: these used to surface only after the (expensive) extraction ran.
    if 'save_feature_dir' not in kwargs:
      raise KeyError('save_feature_dir')
    if sampling_type not in ('uniform', 'random'):
      raise ValueError(f"unknown sampling_type {sampling_type!r}; expected 'uniform' or 'random'")

    self.data_path = video_path
    self.map_file = map_file
    self.sampling_type = sampling_type
    self.is_valSampling = val_sampling
    self.mean = ref_mean
    self.std = ref_std
    self.enhance_img = enhance_img
    # ALL FEATURES OBTAINED FROM ENTIRE DATASETS (one entry per video)
    self.obtained_features = []
    self.labels = []
    # Feature extractor, built lazily on the first video and then reused.
    self._extractor = None

    # GETTING MAPPING
    self.maps = {}
    with open(self.map_file, 'r') as f:
      for line in f:
        parts = line.split()
        if not parts:
          continue  # tolerate blank lines (e.g. a trailing newline)
        if self.is_valSampling:
          self.maps[parts[-1]] = int(parts[1])
        else:
          self.maps[parts[1]] = int(parts[0])
    print(f'mapping: {self.maps}')

    # SAMPLERS (both share `self.labels`, so labels end up on this instance)
    self.TrainD_sampler = self.Sampling_Training(data_path = self.data_path,
                                                 mapping = self.maps,
                                                 labels_list = self.labels)
    self.ValidationD_sampler = self.Sampling_Validation(data_path = self.data_path,
                                                        mapping = self.maps,
                                                        labels_list = self.labels)

    # FRAME SAMPLING
    if val_sampling:
      self.ValidationD_sampler.Sample(sampling_processor = self.sample)
    else:
      self.TrainD_sampler.Sample(sampling_processor = self.sample)

    # SAVE EXTRACTED FEATURE INTO THE FILE FOR FUTURE USE
    self.saveFeatures(destination_dir = kwargs['save_feature_dir'])
    print("EXTRACTED FEATURE ARE SAVED SUCCESSFULLY!")

  def UniformSampling(self, cap, sample_rate, frameCount):
    """Return the first readable frame among indices ``0, sample_rate, 2*sample_rate, ...``.

    Only ONE frame is returned per video (the first successful read); the frame
    is gamma-corrected when ``self.enhance_img`` is set. Returns ``None`` when no
    candidate index can be read.
    """
    for i in range(0, frameCount, sample_rate):
      cap.set(cv2.CAP_PROP_POS_FRAMES, i)
      ret, frame = cap.read()
      if ret:
        if self.enhance_img:
          return self.enhance_image(input_frame = frame)
        return frame
    return None

  def RandomSampling(self, cap,num_samples, frameCount):
    """Return the first readable frame among up to ``num_samples`` random indices.

    Only ONE frame is returned per video. ``num_samples`` is clamped to
    ``frameCount`` so short videos no longer make ``random.sample`` raise.
    Returns ``None`` when the video reports no frames or none can be read.
    """
    if frameCount <= 0:
      return None
    sampled_indices = random.sample(range(frameCount), min(num_samples, frameCount))

    for i in sampled_indices:
      cap.set(cv2.CAP_PROP_POS_FRAMES, i)
      ret, frame = cap.read()
      if ret:
        if self.enhance_img:
          return self.enhance_image(input_frame = frame)
        return frame
    return None

  def sample(self, cap, **kwargs):
    """Sample one frame from ``cap`` and store the features extracted from it.

    Expected keyword arguments: ``frame_count``, ``sample_rate``, ``num_samples``
    and ``frames_sampled`` (a list that receives the sampled frame).

    Raises:
      ValueError: for an unknown ``sampling_type`` or when no frame can be read.
    """
    # Build the extractor once and reuse it: creating it reloads ResNet50.
    extractor = getattr(self, '_extractor', None)
    if extractor is None:
      extractor = self._extractor = Feature_extract(sampled_type = self.sampling_type)
    Sampling.count += 1

    if self.sampling_type == 'uniform':
      sample_frame = self.UniformSampling(cap = cap, frameCount=kwargs['frame_count'], sample_rate = kwargs['sample_rate'])
    elif self.sampling_type == 'random':
      sample_frame = self.RandomSampling(cap = cap, frameCount=kwargs['frame_count'], num_samples=kwargs['num_samples'])
    else:
      raise ValueError(f"unknown sampling_type {self.sampling_type!r}; expected 'uniform' or 'random'")

    # A missing frame used to be appended as None and crash later with an
    # unrelated-looking error inside the normalisation step.
    if sample_frame is None:
      raise ValueError('no frame could be read from the video')
    kwargs['frames_sampled'].append(sample_frame)

    # EXTRACT FEATURE FROM THE FRAMES OF EACH VIDEO
    features_obtained = extractor.features(frames = kwargs['frames_sampled'], ref_mean = self.mean, ref_std = self.std)
    self.obtained_features.append(features_obtained)
    print('Constructor created!')

  def enhance_image(self, input_frame, gamma=0.35, kernel_size=3):
    """Gamma-correct ``input_frame`` and return it as ``uint8``.

    ``kernel_size`` is accepted for interface compatibility but is not used.
    """
    # Scale by 1/255, apply the gamma curve, scale back.
    gamma_corrected = np.power(input_frame / 255.0, gamma) * 255.0
    return gamma_corrected.astype(np.uint8)

  def saveFeatures(self, destination_dir):
    """Stack all extracted features and save them, with the labels, as ``.npy`` files.

    Files are written to ``destination_dir`` as ``features<count>.npy`` and
    ``labels<count>.npy`` where ``<count>`` is the current ``Sampling.count``.

    Raises:
      ValueError: if no features were extracted.
    """
    if not self.obtained_features:
      raise ValueError('no features were extracted; nothing to save')

    # CREATE VSTACK ARRAY OF ALL FEATURES EXTRACTED
    all_features = np.vstack(self.obtained_features)
    labels = np.array(self.labels)

    # SAVE THE EXTRACTED FEATURES AND THEIR CORRESPONDING LABELS FOR FUTURE USE
    os.makedirs(destination_dir, exist_ok = True)
    np.save(os.path.join(destination_dir,f'features{Sampling.count}.npy'), all_features)
    np.save(os.path.join(destination_dir,f'labels{Sampling.count}.npy'), labels)


  class Sampling_Training:
    """Walk ``data_path/<activity>/*.mp4`` and hand every video to a processor."""

    def __init__(self, data_path, mapping:dict, labels_list:list = None):
      self.data_path = data_path
      self.maps = mapping
      # Entries of the root folder; sub-folders are treated as activities.
      self.activities = os.listdir(self.data_path)
      # EXTRACTED FEATURES FROM ALL THE VIDEOS
      self.obtained_features = []
      # LABELS OF THE EXTRACTED FEATURES (shared list, may be None)
      self.labels_list = labels_list

    def Sample(self,sampling_processor, sample_rate = 5, num_samples = 10):
      """Call ``sampling_processor`` for each video and record its activity label.

      ``sampling_processor`` is called with the keyword arguments ``cap``,
      ``frame_count``, ``sample_rate``, ``frames_sampled`` and ``num_samples``.
      """
      for activity in self.activities:
        activity_folder = os.path.join(self.data_path, activity)
        # Skip stray files sitting next to the activity folders.
        if not os.path.isdir(activity_folder):
          continue

        for video_file in os.listdir(activity_folder):
          if '.mp4' not in video_file:
            continue
          frames_sampled = []
          cap = cv2.VideoCapture(os.path.join(activity_folder, video_file))
          try:
            frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            # `cap` was previously not forwarded, which made training mode
            # fail with a TypeError in the processor.
            sampling_processor(cap = cap, frame_count = frame_count,
                               sample_rate = sample_rate, frames_sampled = frames_sampled,
                               num_samples = num_samples)
          finally:
            cap.release()
          # `is not None` (not truthiness): the shared list starts out empty.
          if self.labels_list is not None:
            self.labels_list.append(self.maps[activity])

  class Sampling_Validation:
    """Walk ``data_path/*.mp4`` and hand every video to a processor."""

    def __init__(self, data_path,mapping:dict, labels_list:list = None):
      self.data_path = data_path
      self.maps = mapping
      self.labels_list = labels_list

    def Sample(self, sampling_processor, sample_rate = 5, num_samples = 10):
      """Call ``sampling_processor`` for each video and record the label mapped to its file name.

      ``sampling_processor`` is called with the keyword arguments ``cap``,
      ``frame_count``, ``sample_rate``, ``frames_sampled`` and ``num_samples``.
      """
      for video_file in os.listdir(self.data_path):
        if '.mp4' not in video_file:
          continue
        frames_sampled = []
        cap = cv2.VideoCapture(os.path.join(self.data_path, video_file))
        try:
          frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
          sampling_processor(cap = cap, frame_count = frame_count,
                             sample_rate = sample_rate, frames_sampled = frames_sampled,
                             num_samples = num_samples)
        finally:
          cap.release()
        # POPULATE THE LABEL CORRESPONDING TO THE CURRENT VIDEO
        if self.labels_list is not None:
          self.labels_list.append(self.maps[video_file])

class Feature_extract:
  """Turn video frames into ResNet50 feature maps."""

  def __init__(self, sampled_type = 'uniform'):
    """Load ResNet50 with ImageNet weights and without its top (classification) layers.

    ``sampled_type`` is only stored on the instance; nothing in this class reads it.
    """
    self.model = ResNet50(weights='imagenet', include_top=False)
    self.sampled_type = sampled_type

  def normalize_frame(self, frame, ref_mean, ref_std):
      """Shift and scale each channel of ``frame`` to the reference mean/std.

      The statistics are taken over the first two axes of ``frame``. A channel
      whose standard deviation is zero (a constant channel, e.g. an all-black
      frame) is divided by 1 instead, so it maps to ``ref_mean`` and does not
      produce NaN/inf.
      """
      actual_mean = np.mean(frame, axis=(0, 1), keepdims=True)
      actual_std = np.std(frame, axis=(0, 1))
      # Guard against division by zero on constant channels.
      safe_std = np.where(actual_std == 0, 1.0, actual_std)
      return (frame - actual_mean) / safe_std * ref_std + ref_mean

  def features(self,frames, ref_mean, ref_std):
      """Normalise ``frames``, apply the ResNet50 ``preprocess_input`` and run the model.

      Returns whatever ``self.model.predict`` returns for the stacked frames.

      Raises:
        ValueError: if ``frames`` is empty.
      """
      if len(frames) == 0:
        raise ValueError('frames must contain at least one frame')
      # NOTE(review): preprocess_input is applied to frames that were already
      # rescaled to ref_mean/ref_std — confirm this double normalisation is intended.
      processed_frames = [
          preprocess_input(self.normalize_frame(frame, ref_mean, ref_std))
          for frame in frames
      ]
      return self.model.predict(np.array(processed_frames))

class Train_model:
  """Train and evaluate a small CNN classifier on previously saved features."""

  def __init__(self, valid_features_dir, train_features_dir, feaures_base_names = [], labels_base_name = [], **kwargs):
    """Load the saved features/labels and build the train/validation split.

    Args:
      valid_features_dir: folder holding the validation ``.npy`` files.
      train_features_dir: folder holding the training ``.npy`` files.
      feaures_base_names: ``[train_features_file, validation_features_file]``.
      labels_base_name: ``[train_labels_file, validation_labels_file]``.
      **kwargs: required keys —
        ``val_size`` (fraction used for validation, e.g. 0.2),
        ``num_classes`` (number of output classes),
        ``epoch`` (number of training epochs),
        ``patience`` (early-stopping patience),
        ``model_storage_dir`` (folder that receives the trained model).

    Raises:
      KeyError: if a required keyword argument is missing.
      IndexError: if either base-name list has fewer than two entries.
    """
    # Check the configuration before loading the (large) arrays.
    required = ('val_size', 'num_classes', 'epoch', 'patience', 'model_storage_dir')
    missing = [key for key in required if key not in kwargs]
    if missing:
      raise KeyError(f"missing required keyword argument(s): {', '.join(missing)}")

    self.trainFeatures = os.path.join(train_features_dir, feaures_base_names[0])
    self.valFeatures = os.path.join(valid_features_dir, feaures_base_names[1])
    self.trainLabels = os.path.join(train_features_dir, labels_base_name[0])
    self.valLabels = os.path.join(valid_features_dir, labels_base_name[1])

    # SECURITY NOTE: allow_pickle=True can execute arbitrary code when loading
    # an untrusted file; only point this at files produced by this notebook.
    self.X_train = np.load(self.trainFeatures, allow_pickle = True)
    self.y_train = np.load(self.trainLabels, allow_pickle = True)
    self.X_val = np.load(self.valFeatures, allow_pickle = True)
    self.y_val = np.load(self.valLabels, allow_pickle = True)

    self.kwargs = kwargs
    self.val_size = self.kwargs['val_size']
    self.shuffle = True
    self.num_classes = self.kwargs['num_classes']
    self.epoch = self.kwargs['epoch']
    self.patience = self.kwargs['patience']
    # NOTE(review): this callback is created but never passed to `fit` in
    # `trainModel` — confirm whether early stopping should be enabled.
    self.early_stopping = EarlyStopping(monitor='val_loss', patience= self.patience)

    # The loaded train and validation sets are merged and then re-split, so the
    # final split is defined by `val_size`, not by the files on disk.
    features = np.vstack([self.X_train, self.X_val])
    labels = np.hstack([self.y_train, self.y_val])
    self.X_train, self.X_val, self.y_train, self.y_val = train_test_split(features, labels, test_size=self.val_size,
                                                                          shuffle = self.shuffle, random_state=42)
    # DIR FOR MODEL
    os.makedirs(kwargs['model_storage_dir'], exist_ok = True)

  def trainModel(self, model_savename = 'uniform_model'):
    """Train the classifier, save it as ``<model_savename>.h5`` and plot the history."""
    # Small 2D-convolutional classifier on top of the extracted feature maps.
    model = keras.Sequential([
        layers.Input(shape=self.X_train.shape[1:]),  # Input shape matches the feature shape
        layers.Conv2D(32, kernel_size=(3, 3), activation='relu'),
        layers.MaxPooling2D(pool_size=(2, 2)),
        layers.Flatten(),
        layers.Dense(128, activation='relu'),
        layers.Dense(self.num_classes, activation='softmax')  # One output per class
    ])

    model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
    modelHistory = model.fit(self.X_train, self.y_train, epochs=self.epoch, validation_data=(self.X_val, self.y_val))

    # Save before plotting so a plotting problem cannot lose the trained model.
    model.save(os.path.join(self.kwargs['model_storage_dir'],f'{model_savename}.h5'))

    # PLOTTING (the former extra call to the non-existent `self.plotting` is removed)
    plotting(modelHistory)

  def model_evaluation(self, val_features, val_labels, model_savename = 'uniform_model'):
    """Evaluate a saved model on the given feature/label files.

    Args:
      val_features: path of the ``.npy`` file with the evaluation features.
      val_labels: path of the ``.npy`` file with the evaluation labels.
      model_savename: base name (without ``.h5``) of the model stored in
        ``model_storage_dir``; defaults to the default name used by ``trainModel``.

    Returns:
      The results table rendered as a LaTeX string.
    """
    results = {}
    # SECURITY NOTE: see __init__ — allow_pickle=True is only safe for trusted files.
    X_val = np.load(val_features, allow_pickle = True)
    y_val = np.load(val_labels, allow_pickle = True)
    BASE_DIR = self.kwargs['model_storage_dir']
    # Load the saved model (the name used to be built from an undefined variable).
    model = keras.models.load_model(os.path.join(BASE_DIR, f'{model_savename}.h5'))
    # Evaluate the model on the test data
    test_loss, test_accuracy = model.evaluate(X_val, y_val)

    # Print the evaluation results
    print("enhanced images sample")
    print("-"*100)
    print(f'Test Loss: {test_loss}')
    print(f'Test Accuracy: {test_accuracy}')
    results['test'] = (test_loss, test_accuracy)
    # RESULTS DATAFRAME: one column named 'test' holding (loss, accuracy)
    result_df = pd.DataFrame(results)
    print(result_df)
    return result_df.to_latex(index=False)

def plotting(model):
  """Draw the training/validation accuracy and loss curves on a single figure.

  ``model`` is an object exposing a ``history`` mapping with the keys
  ``'accuracy'``, ``'val_accuracy'``, ``'loss'`` and ``'val_loss'``.
  """
  recorded = model.history
  # Resolve every series up front so a missing key fails before a figure is opened.
  curves = [
      (recorded['accuracy'], 'Training Accuracy'),
      (recorded['val_accuracy'], 'Validation Accuracy'),
      (recorded['loss'], 'Training loss'),
      (recorded['val_loss'], 'Validation loss'),
  ]

  plt.figure(figsize=(10, 6))
  for series, legend_name in curves:
    plt.plot(series, label=legend_name)

  # Axis decorations
  plt.xlabel('Epochs')
  plt.ylabel('Accuracy')
  plt.title('Training and Validation')
  plt.legend()
  plt.grid(True)
  plt.show()



Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


# Object construction

In [33]:
# Build the validation feature set. Constructing `Sampling` runs the whole
# pipeline (parse the map file, sample a frame per video, extract features,
# save the .npy files), so this cell is the expensive one.
# With val_sampling=True the map file is keyed by video file name.
map_file = '/content/drive/MyDrive/machine vision assignment 2/EE6222 train and validate 2023/validate.txt'
# NOTE: the extracted features are written back into the same folder the
# videos are read from (`video_path` == `save_feature_dir`).
samp = Sampling(video_path = '/content/drive/MyDrive/dummy',
                map_file = map_file,
                sampling_type='random',
                val_sampling = True,
                ref_mean=[0.07, 0.07, 0.07],
                ref_std=[0.1, 0.09, 0.08],
                save_feature_dir = '/content/drive/MyDrive/dummy')



mapping: {'24.mp4': 0, '25.mp4': 0, '26.mp4': 0, '27.mp4': 0, '28.mp4': 0, '29.mp4': 0, '30.mp4': 0, '31.mp4': 0, '32.mp4': 0, '33.mp4': 0, '34.mp4': 0, '35.mp4': 0, '36.mp4': 0, '37.mp4': 0, '38.mp4': 0, '39.mp4': 0, '40.mp4': 0, '176.mp4': 1, '177.mp4': 1, '178.mp4': 1, '179.mp4': 1, '180.mp4': 1, '181.mp4': 1, '182.mp4': 1, '183.mp4': 1, '184.mp4': 1, '185.mp4': 1, '186.mp4': 1, '187.mp4': 1, '188.mp4': 1, '189.mp4': 1, '190.mp4': 1, '202.mp4': 2, '203.mp4': 2, '204.mp4': 2, '205.mp4': 2, '206.mp4': 2, '207.mp4': 2, '208.mp4': 2, '209.mp4': 2, '210.mp4': 2, '211.mp4': 2, '212.mp4': 2, '213.mp4': 2, '214.mp4': 2, '215.mp4': 2, '216.mp4': 2, '235.mp4': 3, '236.mp4': 3, '237.mp4': 3, '238.mp4': 3, '239.mp4': 3, '240.mp4': 3, '241.mp4': 3, '242.mp4': 3, '243.mp4': 3, '244.mp4': 3, '245.mp4': 3, '246.mp4': 3, '247.mp4': 3, '248.mp4': 3, '249.mp4': 3, '250.mp4': 3, '267.mp4': 4, '268.mp4': 4, '269.mp4': 4, '270.mp4': 4, '271.mp4': 4, '272.mp4': 4, '273.mp4': 4, '274.mp4': 4, '275.mp4': 4,



Constructor created!
EXTRACTED FEATURE ARE SAVED SUCCESSFULLY!
