In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
import os
import cv2
import numpy as np

from tensorflow.keras.applications import EfficientNetB7

import tensorflow.keras
from tensorflow.keras import applications
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras import optimizers
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import *

In [None]:
validation_video_path = '/content/drive/MyDrive/4levelClassification/validate/'
test_video_path = '/content/drive/MyDrive/4levelClassification/Test/'
validate_nparray_path = '/content/drive/MyDrive/4levelClassification/validate_nparray/'
test_nparray_path = '/content/drive/MyDrive/4levelClassification/test_nparray/'
saved_nparray_validate_path = '/content/drive/MyDrive/4levelClassification/validate_nparray/data/'
saved_nparray_test_path = '/content/drive/MyDrive/4levelClassification/test_nparray/data/'

img_height , img_width = 600, 600  # dimension of each frame in video
seq_len = 60 #the number of images we pass as one sequence (only 70 frames per video) -more frames/ better results/ computationally expensive

classes = [0, 1, 2, 3]

In [None]:
def conv_base():
    feature_extractor =  tensorflow.keras.applications.EfficientNetB7(input_shape=(img_height,img_width,3),
                              include_top=False, weights='imagenet',pooling="avg")

    preprocess_input = tensorflow.keras.applications.efficientnet.preprocess_input
    inputs = tensorflow.keras.Input((img_height, img_width, 3))
    preprocessed = preprocess_input(inputs)

    outputs = feature_extractor(preprocessed)
    return tensorflow.keras.Model(inputs, outputs, name="conv_base")

conv_base = conv_base()
conv_base.summary()

Model: "conv_base"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_4 (InputLayer)        [(None, 600, 600, 3)]     0         
                                                                 
 efficientnetb7 (Functional)  (None, 2560)             64097687  
                                                                 
Total params: 64,097,687
Trainable params: 63,786,960
Non-trainable params: 310,727
_________________________________________________________________


In [None]:
part1=0
part2=60
part3=120
part4=180
part5=240
currentPart = -1
def frames_extraction(video_path):

    frames_list = []

    #creating an object by capturing the given video
    #This object can be used to read frames from the captured video
    vidObj = cv2.VideoCapture(video_path)

    #Counter variable - will be incrementing upto the seq_len
    seqCount = 1
    #skipCount = 2
    defect = 0
    successCount = 0
    start = 0

    #The count variable ensures that the number of extracted frames should be equal to the seq_len
    while seqCount <= 60:

        #Reading one frame at a time from the video, it returns status (success) and the actual frame (image)
        success, image = vidObj.read()
        #print(success)
        #If it is not able to read the frame properly then success = 0, and code will jump to else part
        #if success and skipCount == 0:
        if success and start>currentPart:
            image = cv2.resize(image, (img_height, img_width))
            frames_list.append(image)
            seqCount += 1
            #skipCount = 2
            successCount += 1


        elif success and start<=currentPart:
            start +=1
        #elif skipCount > 0:
        #  skipCount -= 1

        elif (defect + seqCount) >=300:
          break

        else:
            '''Defect frame will not be added to list of frames and if number of extracted frames is less than the seq_len, video will be rejected, in next section'''
            defect += 1
            print("Defected frame at ", defect, " ", video_path)

    print(start)
    print("Defected", defect)
    print("successCount", successCount)
    frames = np.asarray(frames_list)
    vidObj.release()
    return frames


In [None]:
def create_Data(batch_dir):
    X = []
    Y = []

    classes_list = os.listdir(batch_dir)
    print(classes_list)

    for c in classes_list:
        print(c)
        # creating the list of videos for the current class
        files_list = os.listdir(os.path.join(batch_dir, c))
        # we’ll pass videos one by one for frames extraction
        for f in files_list:
            #extracting the frames from the given video
            video_path = (os.path.join(os.path.join(batch_dir, c), f))
            #print(video_path)
            frames = frames_extraction(video_path)
            # Check whether the number of frames is equal to the seq_len, if yes -> process, otherwise reject the current video
            if len(frames) == seq_len:
                print(np.asarray(frames).shape)
                #appending the sequence of frames to the input list X
                X.append(conv_base.predict(frames))
                #X.append(frames)
                #we are creating one hot encoding for target variable
                #create the list y of length number of classes, where each element is 0
                y = [0]*len(classes)
                #We find the index of current class c in the classes list and making corresponding element 1 in y
                y[classes.index(int(c))] = 1
                #Now we append the list y into Y
                Y.append(y)

    X = np.asarray(X)
    Y = np.asarray(Y)
    return X, Y

In [None]:
#validate all parts
parts = ["part1", "part2", "part3", "part4", "part5"]
parts_value = [part1, part2, part3, part4, part5]
batches_list = os.listdir(validation_video_path)
print("batches_list", batches_list)
saved_batches = os.listdir(saved_nparray_validate_path) #no need for this if we have good reources
print("saved_batches", saved_batches)
counter = 0
for i in range(5):
        if parts[i]+".npy" in saved_batches: #no need for the if we have no resources
          skippedBatch = np.load(os.path.join(validate_nparray_path, "data" , parts[i]+".npy"))
          print(parts[i], "skipped", skippedBatch.shape)
          continue
        currentPart = parts_value[i]
        print("currentPart ", parts[i])
        x_batch, y_batch = create_Data(validation_video_path)
        x_batch = np.asarray(x_batch)
        y_batch = np.asarray(y_batch)
        np.save(os.path.join(validate_nparray_path, "data", parts[i]), x_batch)
        np.save(os.path.join(validate_nparray_path, "label", parts[i]), y_batch)

        counter += 1
        print(counter)

batches_list ['2', '3', '1', '0']
saved_batches ['part1.npy', 'part2.npy', 'part3.npy', 'part4.npy', '.ipynb_checkpoints']
part1 skipped (1429, 60, 2560)
part2 skipped (1429, 60, 2560)
part3 skipped (1429, 60, 2560)
part4 skipped (1429, 60, 2560)
currentPart  part5
['2', '3', '1', '0']
2
Defected frame at  1   /content/drive/MyDrive/4levelClassification/validate/2/5564630143.avi
Defected frame at  2   /content/drive/MyDrive/4levelClassification/validate/2/5564630143.avi
Defected frame at  3   /content/drive/MyDrive/4levelClassification/validate/2/5564630143.avi
Defected frame at  4   /content/drive/MyDrive/4levelClassification/validate/2/5564630143.avi
Defected frame at  5   /content/drive/MyDrive/4levelClassification/validate/2/5564630143.avi
Defected frame at  6   /content/drive/MyDrive/4levelClassification/validate/2/5564630143.avi
Defected frame at  7   /content/drive/MyDrive/4levelClassification/validate/2/5564630143.avi
Defected frame at  8   /content/drive/MyDrive/4levelClassifi

KeyboardInterrupt: ignored

In [None]:
'''
#test all parts
parts = ["part1", "part2", "part3", "part4", "part5"]
parts_value = [part1, part2, part3, part4, part5]
batches_list = os.listdir(test_video_path)
print("batches_list", batches_list)
saved_batches = os.listdir(saved_nparray_test_path) #no need for this if we have good reources
print("saved_batches", saved_batches)
counter = 0
for i in range(5):
        if parts[i]+".npy" in saved_batches: #no need for the if we have no resources
          skippedBatch = np.load(os.path.join(test_nparray_path, "data" , parts[i]+".npy"))
          print(parts[i], "skipped", skippedBatch.shape)
          continue
        currentPart = parts_value[i]
        print("currentPart ", parts[i])
        x_batch, y_batch = create_Data(test_video_path)
        x_batch = np.asarray(x_batch)
        y_batch = np.asarray(y_batch)
        np.save(os.path.join(test_nparray_path, "data", parts[i]), x_batch)
        np.save(os.path.join(test_nparray_path, "label", parts[i]), y_batch)

        counter += 1
        print(counter)
'''

'\n#test all parts\nparts = ["part1", "part2", "part3", "part4", "part5"]\nparts_value = [part1, part2, part3, part4, part5]\nbatches_list = os.listdir(validation_video_path)\nprint("batches_list", batches_list)\nsaved_batches = os.listdir(saved_nparray_test_path) #no need for this if we have good reources\nprint("saved_batches", saved_batches)\ncounter = 0\nfor i in range(5):\n        if parts[i]+".npy" in saved_batches: #no need for the if we have no resources\n          skippedBatch = np.load(os.path.join(test_nparray_path, "data" , parts[i]+".npy"))\n          print(parts[i], "skipped", skippedBatch.shape)\n          continue\n        currentPart = parts_value[i]    \n        print("currentPart ", parts[i])\n        x_batch, y_batch = create_Data(test_video_path)\n        x_batch = np.asarray(x_batch)\n        y_batch = np.asarray(y_batch)\n        np.save(os.path.join(test_nparray_path, "data", parts[i]), x_batch)\n        np.save(os.path.join(test_nparray_path, "label", parts[i])

In [None]:

validation_data = np.load("/content/drive/MyDrive/4levelClassification/validate_nparray/data/part4.npy")
validation_label = np.load("/content/drive/MyDrive/4levelClassification/validate_nparray/label/part4.npy")
print(validation_data.shape)
print(validation_label.shape)
'''
test_data = np.load(test_nparray_path + "data" +"part1.npy")
test_label = np.load(test_nparray_path + "label" +"part1.npy")
print(test_data.shape)
print(test_label.shape)
'''

(1429, 60, 2560)
(1429, 4)


'\ntest_data = np.load(test_nparray_path + "data" +"part1.npy")\ntest_label = np.load(test_nparray_path + "label" +"part1.npy")\nprint(test_data.shape)\nprint(test_label.shape)\n'