In [3]:
import cv2
from PIL import Image
from tqdm.notebook import tqdm
import numpy as np
import os
from time import time

from scipy.signal import argrelextrema
from skimage.filters.rank import entropy
from skimage.morphology import disk
import hdbscan

from typing import Iterator, Tuple, List, Optional

In [4]:
path = "/home/kirb/Downloads/Volkswagen.mp4"

In [3]:
def get_images(path: str):
    cap = cv2.VideoCapture(path)
    while(cap.isOpened()):
        ret, frame = cap.read()
        if ret:
            yield frame

In [4]:
tic = time()

images = get_images(path)
res = []
change_images = []
img_prev = None
for n, img in enumerate(tqdm(images, total=2571)):    
    if img_prev is not None:
        diff = (img - img_prev)
        diff = np.abs(diff)
        mean_diff = diff.mean() 
        res.append(mean_diff)
        if mean_diff > 100:
            img = img[:, :, ::-1]
            change_images.append(Image.fromarray(img))
    img_prev = img
    if n >= 2571:
        break
        
dir_path = "/tmp/dataset/prev_res/"
os.makedirs(dir_path, exist_ok=True)

for i, img in enumerate(tqdm(change_images)):
    img.save("{}/{:06d}.jpg".format(dir_path, i))
    
toc = time()
print(f'time: {toc - tic}s')

  0%|          | 0/2571 [00:00<?, ?it/s]

  0%|          | 0/531 [00:00<?, ?it/s]

time: 11.868627786636353s


In [59]:
class Frame:
    def __init__(self, frame: np.ndarray, frame_number: int, sum_abs_diff: np.ndarray):
        self.frame = frame
        self.frame_number = frame_number
        self.sum_abs_diff = sum_abs_diff


class FrameExtractor(object):
    def __init__(self, use_local_maxima: bool, len_window: int, max_frames_in_chunk: int, window_type: str):
        self.use_local_maxima = use_local_maxima
        self.len_window = len_window
        self.max_frames_in_chunk = max_frames_in_chunk
        self.window_type = window_type

    @staticmethod
    def __calculate_frame_difference(frame: np.ndarray, curr_frame: np.ndarray, prev_frame: np.ndarray, frame_number: int) -> Optional[Frame]:
        if curr_frame is None or prev_frame is None:
            return None
        diff = cv2.absdiff(curr_frame, prev_frame)
        abs_diff = np.sum(diff)
        frame = Frame(frame, frame_number, abs_diff)
        return frame

    def __process_frame(self, frame: np.ndarray, prev_frame: np.ndarray, frame_diffs: list, frames: list, frame_number: int) -> Tuple[np.ndarray, np.ndarray]:
        curr_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
        frame = self.__calculate_frame_difference(frame, curr_frame, prev_frame, frame_number)
        if frame is not None:
            frame_diffs.append(frame.sum_abs_diff)
            frames.append(frame)
        prev_frame = curr_frame
        return prev_frame, curr_frame

    def __extract_all_frames_from_video(self, cap: cv2.VideoCapture) -> Iterator[Tuple[List[np.ndarray], List[int]]]:
        curr_frame = None
        prev_frame = None
        frame_number = 0
        while cap.isOpened():
            frame_diffs = []
            frames = []
            for _ in range(self.max_frames_in_chunk):
                ret, frame = cap.read()
                if not ret:
                    cap.release()
                    break
                prev_frame, curr_frame = self.__process_frame(frame, prev_frame, frame_diffs, frames, frame_number)
                frame_number += 1
            yield frames, frame_diffs

    def __get_frames_in_local_maxima(self, frames: list, frame_diffs: list) -> Tuple[List[np.ndarray], List[int]]:
        extracted_key_frames = []
        extracted_key_numbers = []
        diff_array = np.array(frame_diffs)
        sm_diff_array = self.__smooth(diff_array, self.len_window, self.window_type)
        frame_indexes = np.asarray(argrelextrema(sm_diff_array, np.greater))[0]

        for frame_index in frame_indexes:
            extracted_key_frames.append(frames[frame_index - 1].frame)
            extracted_key_numbers.append(frames[frame_index - 1].frame_number)
        return extracted_key_frames, extracted_key_numbers

    @staticmethod
    def __smooth(x: np.ndarray, window_len: int, window: str) -> np.ndarray:
        if x.ndim != 1:
            raise (ValueError, "smooth only accepts 1 dimension arrays.")

        if x.size < window_len:
            raise (ValueError, "Input vector needs to be bigger than window size.")

        if window_len < 3:
            return x

        if window not in ["flat", "hanning", "hamming", "bartlett", "blackman"]:
            raise (
                ValueError,
                "Smoothing Window is on of 'flat', 'hanning', 'hamming', 'bartlett', 'blackman'",
            )
        s = np.r_[2 * x[0] - x[window_len:1:-1], x, 2 * x[-1] - x[-1:-window_len:-1]]

        if window == "flat":
            w = np.ones(window_len, "d")
        else:
            w = getattr(np, window)(window_len)
        y = np.convolve(w / w.sum(), s, mode="same")
        return y[window_len - 1: -window_len + 1]

    def extract_candidate_frames(self, cap: cv2.VideoCapture) -> Tuple[np.ndarray, np.ndarray]:
        extracted_candidate_key_frames = []
        extracted_candidate_key_numbers = []

        frame_extractor_from_video_generator = self.__extract_all_frames_from_video(cap)

        for frames, frame_diffs in frame_extractor_from_video_generator:
            extracted_candidate_key_frames_chunk = []
            extracted_candidate_key_numbers_chunk = []
            if self.use_local_maxima:
                extracted_candidate_key_frames_chunk, extracted_candidate_key_numbers_chunk = self.__get_frames_in_local_maxima(frames, frame_diffs)
                extracted_candidate_key_frames.extend(extracted_candidate_key_frames_chunk)
                extracted_candidate_key_numbers.extend(extracted_candidate_key_numbers_chunk)

        extracted_candidate_key_frames = np.array(extracted_candidate_key_frames)
        extracted_candidate_key_numbers = np.array(extracted_candidate_key_numbers)
        return extracted_candidate_key_frames, extracted_candidate_key_numbers

In [60]:
class ImageSelector(object):
    def __init__(self, min_brightness_value: float, max_brightness_value: float, min_contrast_value: float, max_contrast_value: float):
        self.min_brightness_value = min_brightness_value
        self.max_brightness_value = max_brightness_value

        self.min_contrast_value = min_contrast_value
        self.max_contrast_value = max_contrast_value

    @staticmethod
    def __get_brightness_and_contrast_score(images: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        gray = np.average(images, axis=-1, weights=[0.114, 0.587, 0.299])
        brightness_score = np.average(gray, axis=(1, 2)) * 100 / 255
        contrast_score = np.std(gray, axis=(1, 2)) * 100 / 255

        return brightness_score, contrast_score

    def __filter_optimum_brightness_and_contrast_images(self, input_img_files: np.ndarray, input_img_numbers: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        brightness_score, contrast_score = self.__get_brightness_and_contrast_score(input_img_files)
        brightness_contrast_ok = (brightness_score > self.min_brightness_value) & (brightness_score < self.max_brightness_value) &\
                                 (contrast_score > self.min_contrast_value) & (contrast_score < self.max_contrast_value)

        return input_img_files[brightness_contrast_ok], input_img_numbers[brightness_contrast_ok]

    @staticmethod
    def __prepare_cluster_sets__hdbscan(files: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        all_dst = []
        for img_file in files:
            img = cv2.cvtColor(img_file, cv2.COLOR_BGR2GRAY)
            img = cv2.resize(img, (256, 256), img)
            imf = np.float32(img) / 255.0
            dst = cv2.dct(imf)
            dst = dst[:16, :16]
            dst = dst.reshape(256)
            all_dst.append(dst)

        Hdbscan = hdbscan.HDBSCAN(min_cluster_size=2, metric='manhattan').fit(all_dst)
        labels = np.add(Hdbscan.labels_, 1)
        nb_clusters = len(np.unique(Hdbscan.labels_))

        files_clusters_index_array = []
        files_clusters_index_array_of_only_one_image = [np.where(labels == 0)]

        for i in np.arange(1, nb_clusters):
            files_clusters_index_array.append(np.where(labels == i))

        files_clusters_index_array = np.array(files_clusters_index_array)
        files_clusters_index_array_of_only_one_image = np.array(files_clusters_index_array_of_only_one_image)
        return files_clusters_index_array, files_clusters_index_array_of_only_one_image

    @staticmethod
    def __get_laplacian_scores(files: np.ndarray, n_images: np.ndarray) -> List[float]:
        variance_laplacians = []
        for image_i in n_images:
            img_file = files[n_images[image_i]]
            img = cv2.cvtColor(img_file, cv2.COLOR_BGR2GRAY)

            variance_laplacian = cv2.Laplacian(img, cv2.CV_64F).var()
            variance_laplacians.append(variance_laplacian)

        return variance_laplacians

    def __get_best_images_index_from_each_cluster(self, files: np.ndarray, files_clusters_index_array: np.ndarray) -> List[int]:
        filtered_items = []

        clusters = np.arange(len(files_clusters_index_array))
        for cluster_i in clusters:
            curr_row = files_clusters_index_array[cluster_i][0]
            n_images = np.arange(len(curr_row))
            variance_laplacians = self.__get_laplacian_scores(files, n_images)
            selected_frame_of_current_cluster = curr_row[np.argmax(variance_laplacians)]
            filtered_items.append(selected_frame_of_current_cluster)

        return filtered_items

    def select_best_frames(self, input_key_frames: np.ndarray, input_key_numbers: np.ndarray) -> Tuple[List[np.ndarray], List[int]]:
        filtered_images_list = []
        filtered_numbers_list = []
        input_key_frames, input_key_numbers = self.__filter_optimum_brightness_and_contrast_images(input_key_frames, input_key_numbers)

        if len(input_key_frames) >= 1:
            files_clusters_index_array, files_clusters_index_array_of_only_one_image = self.__prepare_cluster_sets__hdbscan(input_key_frames)
            selected_images_index = self.__get_best_images_index_from_each_cluster(input_key_frames, files_clusters_index_array)
            files_clusters_index_array_of_only_one_image = [item for t in files_clusters_index_array_of_only_one_image for item in t]
            files_clusters_index_array_of_only_one_image = files_clusters_index_array_of_only_one_image[0].tolist()
            selected_images_index.extend(files_clusters_index_array_of_only_one_image)
            for index in selected_images_index:
                img = input_key_frames[index]
                number = input_key_numbers[index]
                filtered_images_list.append(img)
                filtered_numbers_list.append(number)

        return filtered_images_list, filtered_numbers_list

In [8]:
tic = time()

vd = FrameExtractor()
imgs = vd.extract_candidate_frames(path)

dir_new_path = "/tmp/dataset/no_smooth_no_brightness_res/"
os.makedirs(dir_new_path, exist_ok=True)

config = Configs()
pool_obj = Pool(processes=cpu_count())

final_images = ImageSelector(pool_obj)
imgs_final = final_images.select_best_frames(imgs, dir_new_path)

for counter, i in enumerate(imgs_final):
    vd.save_frame_to_disk(
        i,
        file_path=os.path.join(dir_new_path),
        file_name="test_" + str(counter),
        file_ext=".jpeg",
    )
    
toc = time()
print(f'time: {toc - tic}s')

  files_clusters_index_array = np.array(files_clusters_index_array)


time: 16.397907495498657s


In [10]:
tic = time()

vd = FrameExtractor()
imgs = vd.extract_candidate_frames(path)

dir_new_path = "/tmp/dataset/no_smooth_res/"
os.makedirs(dir_new_path, exist_ok=True)

config = Configs()
pool_obj = Pool(processes=cpu_count())

final_images = ImageSelector(pool_obj)
imgs_final = final_images.select_best_frames(imgs, dir_new_path)

for counter, i in enumerate(imgs_final):
    vd.save_frame_to_disk(
        i,
        file_path=os.path.join(dir_new_path),
        file_name="test_" + str(counter),
        file_ext=".jpeg",
    )
    
toc = time()
print(f'time: {toc - tic}s')

  files_clusters_index_array = np.array(files_clusters_index_array)


time: 267.0932261943817s


In [14]:
tic = time()

vd = FrameExtractor()
imgs = vd.extract_candidate_frames(path)

dir_new_path = "/tmp/dataset/all_in_res/"
os.makedirs(dir_new_path, exist_ok=True)

config = Configs()
pool_obj = Pool(processes=cpu_count())

final_images = ImageSelector(pool_obj)
imgs_final = final_images.select_best_frames(imgs, dir_new_path)

for counter, i in enumerate(imgs_final):
    vd.save_frame_to_disk(
        i,
        file_path=os.path.join(dir_new_path),
        file_name="test_" + str(counter),
        file_ext=".jpeg",
    )
    
toc = time()
print(f'time: {toc - tic}s')

  files_clusters_index_array = np.array(files_clusters_index_array)


time: 75.28714036941528s


In [18]:
tic = time()

vd = FrameExtractor()
imgs = vd.extract_candidate_frames(path)

dir_new_path = "/tmp/dataset/no_brightness_res/"
os.makedirs(dir_new_path, exist_ok=True)

config = Configs()
pool_obj = Pool(processes=cpu_count())

final_images = ImageSelector(pool_obj)
imgs_final = final_images.select_best_frames(imgs, dir_new_path)

for counter, i in enumerate(imgs_final):
    vd.save_frame_to_disk(
        i,
        file_path=os.path.join(dir_new_path),
        file_name="test_" + str(counter),
        file_ext=".jpeg",
    )
    
toc = time()
print(f'time: {toc - tic}s')

  files_clusters_index_array = np.array(files_clusters_index_array)


time: 14.274819135665894s


In [21]:
tic = time()

vd = FrameExtractor()
imgs = vd.extract_candidate_frames(path)

#dir_new_path = "/tmp/dataset/all_in_res/"
#os.makedirs(dir_new_path, exist_ok=True)

config = Configs()
pool_obj = Pool(processes=cpu_count())

final_images = ImageSelector(pool_obj)
imgs_final = final_images.select_best_frames(imgs, dir_new_path)

'''for counter, i in enumerate(imgs_final):
    vd.save_frame_to_disk(
        i,
        file_path=os.path.join(dir_new_path),
        file_name="test_" + str(counter),
        file_ext=".jpeg",
    )'''
    
toc = time()
print(f'time: {toc - tic}s')

  files_clusters_index_array = np.array(files_clusters_index_array)


time: 140.90563869476318s


In [6]:
tic = time()

vd = FrameExtractor()
imgs = vd.extract_candidate_frames(path)

dir_new_path = "/tmp/dataset/all_new_in_res/"
os.makedirs(dir_new_path, exist_ok=True)

config = Configs()
pool_obj = Pool(processes=cpu_count())

final_images = ImageSelector(pool_obj)
imgs_final = final_images.select_best_frames(imgs, dir_new_path)

for counter, i in enumerate(imgs_final):
    vd.save_frame_to_disk(
        i,
        file_path=os.path.join(dir_new_path),
        file_name="test_" + str(counter),
        file_ext=".jpeg",
    )
    
toc = time()
print(f'time: {toc - tic}s')

  files_clusters_index_array = np.array(files_clusters_index_array)


time: 17.108604192733765s


In [64]:
vd = FrameExtractor(True, 10, 2500, "hanning")
imgs, numbers = vd.extract_candidate_frames(cap)

In [65]:
ims = ImageSelector(15.0, 85.0, 10.0, 90.0)
imgs_final, numbers_final = ims.select_best_frames(imgs, numbers)

  files_clusters_index_array = np.array(files_clusters_index_array)


In [67]:
dir_new_path = "/tmp/dataset/all_new_in_res/"
os.makedirs(dir_new_path, exist_ok=True)

In [69]:
for counter, i in enumerate(imgs_final):
    vd.save_frame_to_disk(
        i,
        file_path=os.path.join(dir_new_path),
        file_name="test_" + str(counter),
        file_ext=".jpeg",
    )

AttributeError: 'FrameExtractor' object has no attribute 'save_frame_to_disk'