In [1]:
import tensorflow as tf
import numpy as np
import csv
import cv2
import os
from tensorflow.keras import datasets, layers, models
%matplotlib inline 
import matplotlib.pyplot as plt
import pickle
from sys import getsizeof
from sklearn.model_selection import train_test_split
import random

video_path = "videos"
main_dir = "C:\\Users\\kseni\\JUPYTER\\DM-STAVROPOL"
model_dir = "C:\\Users\\kseni\\JUPYTER\\DM-STAVROPOL\\models"
os.chdir(main_dir)


def getClasses():
    os.chdir(main_dir)
    classes = []
    with open("classes.csv") as fp:
        reader = csv.reader(fp, delimiter=",", quotechar='"')
        # next(reader, None)  # skip the headers
        data_read = [row for row in reader]
        classes = [x[1] for x in data_read[1:]]
    return classes
    

def createModel(classes):
    
    input_shape = (30, 30, 3)
    
    model = models.Sequential()
    model.add(layers.TimeDistributed(layers.Conv2D(16, (3, 3), padding='same',activation = 'relu')))
    model.add(layers.TimeDistributed(layers.MaxPooling2D((4, 4)))) 
    model.add(layers.TimeDistributed(layers.Dropout(0.25)))
    model.add(layers.TimeDistributed(layers.Conv2D(32, (3, 3), padding='same',activation = 'relu')))
    model.add(layers.TimeDistributed(layers.MaxPooling2D((4, 4))))
    model.add(layers.TimeDistributed(layers.Dropout(0.25)))
    model.add(layers.TimeDistributed(layers.Conv2D(64, (3, 3), padding='same',activation = 'relu')))
    model.add(layers.TimeDistributed(layers.MaxPooling2D((2, 2))))
    model.add(layers.TimeDistributed(layers.Dropout(0.25)))
    
    model.add(layers.TimeDistributed(layers.Conv2D(64, (3, 3), padding='same',activation = 'relu')))
    model.add(layers.TimeDistributed(layers.MaxPooling2D((2, 2))))
    #model.add(TimeDistributed(Dropout(0.25)))
                                      
    model.add(layers.TimeDistributed(layers.Flatten()))
                      
    model.add(layers.LSTM(32))
                                      
    model.add(layers.Dense(len(classes), activation = 'softmax'))
    
    #model.summary()
    
    return model


def getLabeledVideo(directory):
    os.chdir(f"{main_dir}\\{directory}")
    categories = os.listdir()
    x_data = []
    y_data = []
    for category in range(len(categories)):
        #print(f"Категория: {categories[category]}")
        path = f"{main_dir}\\{directory}\\{categories[category]}"
        os.chdir(path)
        videos = os.listdir()
        for video in videos:
            video_path = os.getcwd()+f"/{video}"
            x_data.append(video_path)
            y_data.append(category)
    return x_data, np.array(y_data)


def getVideo(path_video):
    vid_capture = cv2.VideoCapture(path_video)
    video_numerized = []
    if (vid_capture.isOpened() == False):
        print("Ошибка открытия видеофайла")
    else:
        file_count = 0
        while(vid_capture.isOpened()):
            ret, frame = vid_capture.read()
            if ret == True:
                file_count += 1
                frame = frame/255.0
                video_numerized.append(frame)
            else:
                break
    return np.array(video_numerized)



def batch_generator(x_data, y_data, batch_size):
    for i in range(0, len(x_data), batch_size): 
        yield np.array(list(map(getVideo, x_data[i:i+batch_size])), dtype="object"), y_data[i:i+batch_size]
        
def format_frames(frame, output_size):
    frame = tf.image.convert_image_dtype(frame, tf.float32)
    frame = tf.image.resize_with_pad(frame, *output_size)
    return frame



def frames_from_video_file(video_path, n_frames, output_size = (30,30), frame_step = 15):
    # Read each video frame by frame
    result = []
    src = cv2.VideoCapture(str(video_path))  

    video_length = src.get(cv2.CAP_PROP_FRAME_COUNT)

    need_length = 1 + (n_frames - 1) * frame_step

    if need_length > video_length:
        start = 0
    else:
        max_start = video_length - need_length
        start = random.randint(0, max_start + 1)

    src.set(cv2.CAP_PROP_POS_FRAMES, start)
    # ret is a boolean indicating whether read was successful, frame is the image itself
    ret, frame = src.read()
    result.append(format_frames(frame, output_size))

    for _ in range(n_frames - 1):
        for _ in range(frame_step):
            ret, frame = src.read()
            if ret:
                frame = format_frames(frame, output_size)
                result.append(frame)
            else:
                result.append(np.zeros_like(result[0]))
    src.release()
    result = np.array(result)[..., [2, 1, 0]]

    return result




class FrameGenerator:
    def __init__(self, video_paths, classes, n_frames, training = False):
        """ Returns a set of frames with their associated label. 

          Args:
            path: Video file paths.
            n_frames: Number of frames. 
            training: Boolean to determine if training dataset is being created.
        """
        self.video_paths = video_paths
        self.classes = classes
        self.n_frames = n_frames
        self.training = training
        #self.class_names = sorted(set(p.name for p in self.path.iterdir() if p.is_dir()))
        #self.class_ids_for_name = dict((name, idx) for idx, name in enumerate(self.class_names))

    def get_files_and_class_names(self):
        video_paths = list(self.path.glob('*/*.avi'))
        classes = [p.parent.name for p in video_paths] 
        return video_paths, classes

    def __call__(self):
        #video_paths, classes = self.get_files_and_class_names()

        pairs = list(zip(self.video_paths, self.classes))

        #if self.training:
            #random.shuffle(pairs)

        for path, name in pairs:
            video_frames = frames_from_video_file(path, self.n_frames) 
            label = name#self.class_ids_for_name[name] # Encode labels
            yield video_frames, label

In [None]:
classes = getClasses()
model = createModel(classes)

x_data, y_data = getLabeledVideo(video_path)

x_train, x_test, y_train, y_test = train_test_split(x_data, y_data, test_size=0.33, random_state=42)
#print(getVideo(x_train[0]).shape, y_train[0])


model.compile(optimizer='adam',loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),metrics=['accuracy'])
#model.summary()
fg = FrameGenerator(x_train, y_train, 10, training=True)
#next(fg()).shape

output_signature = (tf.TensorSpec(shape = (None, None, None, 3), dtype = tf.float32),
                    tf.TensorSpec(shape = (), dtype = tf.int16))
train_dataset = tf.data.Dataset.from_generator(FrameGenerator(x_train, y_train, 10, training=True),
                                          output_signature = output_signature)
val_dataset = tf.data.Dataset.from_generator(FrameGenerator(x_test, y_test, 10),
                                        output_signature = output_signature)

# ПОВЫШЕНИЕ ПРОИЗВОДИТЕЛЬНОСТИ
AUTOTUNE = tf.data.AUTOTUNE

train_dataset = train_dataset.cache().shuffle(1000).prefetch(buffer_size = AUTOTUNE)
val_dataset = val_dataset.cache().shuffle(1000).prefetch(buffer_size = AUTOTUNE)

train_dataset.batch(15)
val_dataset.batch(15)
train_frames, train_labels = next(iter(train_dataset))
print(f'Shape of training set of frames: {train_frames.shape}')

history = model.fit(train_gen, batch_size=32, epochs=40, validation_data=test_gen)



In [None]:
model.fit(train_dataset, 
          epochs = 30,
          validation_data = val_dataset,
          callbacks = tf.keras.callbacks.EarlyStopping(patience = 2, monitor = 'val_loss'))

os.chdir(model_dir)
model.save("Model_CNN_LSTM_ver1.h5")

In [None]:
plt.plot(history.history['accuracy'], label='accuracy')
plt.plot(history.history['val_accuracy'], label = 'val_accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.ylim([0.5, 1])
plt.legend(loc='lower right')

test_loss, test_acc = model.evaluate(test_images,  test_labels, verbose=2)