In [2]:
import glob
import math
import os
import shutil
import pickle
import threading
import subprocess
from pathlib import Path
from typing import Any, List, Optional, Tuple, Union
from uuid import uuid4

import cv2
import librosa
import numpy as np
import torch
import whisper
from moviepy import editor
from PIL import Image
from transformers import pipeline
from pyAudioAnalysis import MidTermFeatures as mtf
from pyAudioAnalysis import audioTrainTest as at
from sklearn.feature_selection import VarianceThreshold
from sklearn.metrics.pairwise import cosine_distances
from sklearn.neighbors import LocalOutlierFactor
from torchvision import models, transforms
from yt_dlp import YoutubeDL

# Seed NumPy's global RNG so NumPy-based randomness is repeatable across runs.
# (A stray, indented `@numba.jit` decorator with no function beneath it used to
# follow this line; it could not be parsed and `numba` is never imported, so it
# has been dropped.)
np.random.seed(0)


In [3]:
# Where things live on disk.
save_videos_to = "data"             # downloaded source videos
summary_output = "videos_summary"   # rendered summary videos

# Summarisation knobs.
window = 10          # segment length, in seconds
num_highlights = 10  # number of top-scoring segments kept per summary

## **Helper Functions**

In [11]:
class ImageHighlightsFinder:
    """Score video segments by how visually atypical their sampled frame is.

    One frame is written to disk per ``window`` seconds, embedded with a
    pretrained ResNet-18 whose last child module has been removed, and scored
    with its median cosine distance to every other sampled frame.
    """

    def __init__(self, batch_size: int = 32) -> None:
        """Load the feature extractor and put it in eval mode.

        Args:
            batch_size: Number of frames embedded per forward pass.
        """
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.batch_size = batch_size
        model = models.resnet18(pretrained=True)
        # Keep every child module except the last one, so the network outputs
        # features instead of class scores.
        self.model = torch.nn.Sequential(*(list(model.children())[:-1])).to(self.device)
        self.model.eval()

    def _get_transformations(self, will_be_saved: bool) -> List[Any]:
        """Return the preprocessing steps applied to every frame.

        Args:
            will_be_saved: When True, a final ``ToPILImage`` step is appended so
                the result is an image again rather than a tensor.
        """
        transformations = [
            transforms.Resize(256),
            transforms.CenterCrop(224),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ]
        if will_be_saved:
            transformations.append(transforms.ToPILImage())
        return transformations

    def _preprocess_image(
        self, image: Image.Image, will_be_saved: bool = False
    ) -> Union[torch.Tensor, Image.Image]:
        """Apply the preprocessing pipeline to one image.

        Returns:
            A PIL image when ``will_be_saved`` is True; otherwise a tensor with
            a leading batch dimension of size 1.
        """
        transform = transforms.Compose(
            self._get_transformations(will_be_saved=will_be_saved)
        )
        image = transform(image)
        if will_be_saved:
            return image
        return image.unsqueeze(0)

    def _chunks(self, lst, n):
        """Yield successive n-sized chunks from lst."""
        for i in range(0, len(lst), n):
            yield lst[i : i + n]

    def _create_feature_vectors(self, file_paths: List[str]) -> np.ndarray:
        """Embed the given image files, preserving their order.

        Args:
            file_paths: Paths of the images to embed.

        Returns:
            A NumPy array with one row per input file.

        Raises:
            ValueError: If ``file_paths`` is empty (previously this surfaced as
                an ``AttributeError`` on ``None``).
        """
        if not file_paths:
            raise ValueError("No frames to embed: `file_paths` is empty.")

        batches = []
        for file_paths_chunk in self._chunks(file_paths, n=self.batch_size):
            imgs = []
            for img_path in file_paths_chunk:
                # `with` closes the file handle as soon as the pixels are read.
                with Image.open(img_path) as img:
                    imgs.append(self._preprocess_image(img.convert("RGB")))
            imgs = torch.cat(imgs, dim=0).to(self.device)

            # Inference only: no gradients needed.
            with torch.no_grad():
                batches.append(self.model(imgs))

        # Concatenate once instead of re-allocating after every batch.
        features = torch.cat(batches, dim=0)
        # Drop the two trailing singleton dimensions left by the model.
        features = features.squeeze(dim=-1).squeeze(dim=-1)
        return features.cpu().detach().numpy()

    def _extract_frames(
        self,
        video_path: str,
        images_folder: str,
        start_at_sec: int = 5,
        window: int = 10,
    ):
        """Save one frame every ``window`` seconds as ``sec_<t>.jpg``.

        Sampling starts at ``start_at_sec`` and stops at the video duration or
        at the first frame that cannot be read.

        Args:
            video_path: Video to sample.
            images_folder: Destination directory (created if missing).
            start_at_sec: Position of the first sampled frame, in seconds.
            window: Spacing between sampled frames, in seconds.
        """
        os.makedirs(images_folder, exist_ok=True)

        # The clip is opened only to read the duration; close it right away so
        # its reader does not stay alive.
        clip = editor.VideoFileClip(video_path)
        try:
            duration = clip.duration
        finally:
            clip.close()

        vidcap = cv2.VideoCapture(video_path)
        try:
            while start_at_sec <= duration:
                vidcap.set(cv2.CAP_PROP_POS_MSEC, start_at_sec * 1000)
                success, image = vidcap.read()
                if not success:
                    break
                cv2.imwrite(
                    os.path.join(images_folder, f"sec_{start_at_sec}.jpg"), image
                )
                start_at_sec += window
        finally:
            # Always free the capture handle, even if reading/writing raised.
            vidcap.release()

    def _get_ground_percentage(self, file_paths: List[str]) -> np.ndarray:
        """Estimate, per image, the fraction of pixels inside the green HSV range.

        Each image is masked to the HSV range [40, 40, 40]..[70, 255, 255],
        converted to grayscale, thresholded (inverted, Otsu) and morphologically
        closed; the returned value is the share of zero pixels in that result.

        Returns:
            Array with one fraction per path, in the order of ``file_paths``.
        """
        # Loop-invariant constants.
        lower_green = np.array([40, 40, 40])
        upper_green = np.array([70, 255, 255])
        kernel = np.ones((5, 5), np.uint8)

        perc = []
        for img_path in file_paths:
            img = cv2.imread(img_path)
            hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)

            # Keep only the pixels that fall inside the green range.
            mask = cv2.inRange(hsv, lower_green, upper_green)
            res = cv2.bitwise_and(img, img, mask=mask)
            res_gray = cv2.cvtColor(res, cv2.COLOR_BGR2GRAY)

            thresh = cv2.threshold(
                res_gray, 127, 255, cv2.THRESH_BINARY_INV | cv2.THRESH_OTSU
            )[1]
            # Closing fills small holes to get a cleaner mask.
            thresh = cv2.morphologyEx(thresh, cv2.MORPH_CLOSE, kernel)

            ground_percentage = (thresh.size - np.count_nonzero(thresh)) / thresh.size
            perc.append(ground_percentage)

        return np.array(perc)

    def _segment_label(self, frame_path: str, start_at_sec: int, window: int) -> str:
        """Return the ``"<start>_<end>"`` label of the window a frame belongs to.

        Frames are saved as ``sec_<t>.jpg`` with ``t = start_at_sec + k * window``,
        so the window start is ``t - start_at_sec``.
        """
        frame_sec = int(Path(frame_path).stem.split("_")[-1])
        segment_start = frame_sec - start_at_sec
        return f"{segment_start}_{segment_start + window}"

    def get_distances(
        self,
        video_path: str,
        temp_path: str,
        window: int = 10,
    ) -> np.ndarray:
        """Return one visual-novelty score per sampled frame.

        Frames are extracted into ``<temp_path>/image`` unless that folder
        already exists, and features are cached in
        ``<temp_path>/image_features.pkl``. Frames whose green fraction is
        below 1% get a score of 0.

        The scores are ordered by the string order of each frame's
        ``"<start>_<end>"`` window label. ``HighlightsFinder.create_video_summary``
        maps index ``i`` to the i-th entry of a string-sorted list of those
        labels, so plain ``glob`` order (OS-dependent, and based on the
        ``sec_<t>`` frame names) would attach scores to the wrong segments.

        Args:
            video_path: Video to analyse.
            temp_path: Working directory for frames and the feature cache.
            window: Segment length in seconds.

        Returns:
            1-D array of scores, one per frame.
        """
        temp_file_path = os.path.join(temp_path, "image")
        start_at_sec = int(window * 0.5)
        if not os.path.exists(temp_file_path):
            self._extract_frames(
                video_path=video_path,
                images_folder=temp_file_path,
                start_at_sec=start_at_sec,
                window=window,
            )
        file_paths = sorted(
            glob.glob(os.path.join(temp_file_path, "*.jpg")),
            key=lambda path: self._segment_label(path, start_at_sec, window),
        )

        image_features_file = os.path.join(temp_path, "image_features.pkl")
        if os.path.exists(image_features_file):
            # NOTE: pickle must only be used on cache files written by this
            # notebook. A cache written before frames were ordered as above has
            # rows in a different order; delete it to force a recompute.
            with open(image_features_file, "rb") as f:
                features = pickle.load(f)
        else:
            features = self._create_feature_vectors(file_paths=file_paths)
            with open(image_features_file, "wb") as f:
                pickle.dump(features, f)

        distances = cosine_distances(features, features)
        del features
        median_distances = np.median(distances, axis=1)
        del distances
        assert median_distances.shape[0] == len(file_paths), (
            "cached image features do not match the frames on disk; "
            f"delete {image_features_file} and retry"
        )

        ground_perc = self._get_ground_percentage(file_paths=file_paths)
        # Frames with almost no green are zeroed so they are never selected.
        median_distances = np.where(ground_perc < 0.01, 0, median_distances)
        return median_distances

In [12]:
class AudioHighlightsFinder:
    """Score fixed-length audio segments of a video by acoustic novelty.

    The audio track is cut into ``window``-second WAV files, described with
    pyAudioAnalysis mid-term features, and each segment is scored with its
    median cosine distance to all other segments. Quiet segments score 0.

    Every per-segment list or array produced here follows the lexicographic
    order of the segment file paths.
    """

    def __init__(self) -> None:
        pass

    def _audio_seg(self, video_path: str, output_path: str, window: int = 10):
        """Cut the video's audio into ``sec_<start>_<end>.wav`` segments.

        Args:
            video_path: Input video.
            output_path: Directory that receives the segments (created if missing).
            window: Segment length in seconds.

        Raises:
            RuntimeError: If ffmpeg exits with a non-zero status.
        """
        os.makedirs(output_path, exist_ok=True)
        output_name = "output_%04d.wav"
        return_code = subprocess.call(
            [
                "ffmpeg",
                "-y",
                "-i",
                video_path,
                "-f",
                "segment",
                "-segment_time",
                str(window),
                os.path.join(output_path, output_name),
            ]
        )
        # A failed ffmpeg run would otherwise leave an empty or partial folder
        # that later steps silently treat as valid.
        if return_code != 0:
            raise RuntimeError(
                f"ffmpeg exited with status {return_code} while segmenting {video_path}"
            )

        self._rename_segments(output_path=output_path, window=window)

    def _rename_segments(self, output_path: str, window: int) -> None:
        """Rename ``output_<idx>.wav`` files to ``sec_<start>_<end>.wav``.

        ``start`` is ``idx * window`` and ``end`` is ``(idx + 1) * window``.
        Only files still carrying the ``output_`` prefix are touched, so files
        that were already renamed are never re-interpreted.
        """
        for seg_path in glob.glob(os.path.join(output_path, "output_*.wav")):
            audio_idx = int(Path(seg_path).stem.split("_")[-1])
            start_ = audio_idx * window
            end_ = (audio_idx + 1) * window
            # Build the new path from the directory instead of string-replacing
            # inside the full path, which could also hit a directory name.
            new_path = os.path.join(
                os.path.dirname(seg_path), f"sec_{int(start_)}_{int(end_)}.wav"
            )
            os.rename(seg_path, new_path)

    def mean_amplitude(self, seg_path: str):
        """Return the mean of the per-chunk mean absolute amplitudes.

        The signal is split into consecutive chunks of ``sr`` samples (the
        sampling rate returned by ``librosa.load``).
        """
        y, sr = librosa.load(seg_path)
        second = []
        for s in range(0, len(y), sr):
            second.append(np.abs(y[s : s + sr]).mean())
        return np.mean(second)

    def _get_silent(
        self, segments_path: str, thres_q: float = 30
    ) -> Tuple[List[int], List[str]]:
        """Flag the quietest segments of a directory.

        Args:
            segments_path: Directory containing ``*.wav`` segments.
            thres_q: Percentile of the per-segment mean amplitude below which a
                segment counts as silent.

        Returns:
            ``(is_silence, file_names)``: parallel lists in sorted path order,
            where ``is_silence[i]`` is 1 for silent segments and 0 otherwise.
            Both are empty when the directory holds no WAV files.
        """
        # glob returns files in arbitrary order; sort so the mask lines up with
        # feature matrices that are built from the sorted file list.
        file_names = sorted(glob.glob(os.path.join(segments_path, "*.wav")))
        if not file_names:
            return [], []

        segs_amp = [self.mean_amplitude(seg_name) for seg_name in file_names]
        threshold = np.percentile(segs_amp, q=thres_q)
        is_silence = [1 if seg_amp < threshold else 0 for seg_amp in segs_amp]
        return is_silence, file_names

    def _feature_extraction(self, directory: str) -> Tuple[np.ndarray, List[str]]:
        """Extract and standardise pyAudioAnalysis features for a directory.

        ``directory_feature_extraction`` is called with mid-term window/step of
        1 and short-term window/step of 0.1.

        Returns:
            ``(X, feature_names)`` where each column of ``X`` has been centred
            and divided by its standard deviation.
        """
        f1, _, feature_names = mtf.directory_feature_extraction(
            directory, 1, 1, 0.1, 0.1
        )
        # Convert the list of feature matrices to a single matrix.
        x, _ = at.features_to_matrix([f1])
        m = x.mean(axis=0)
        s = np.std(x, axis=0)
        # A constant feature has zero std; divide by 1 instead so it becomes a
        # column of zeros rather than NaN.
        s = np.where(s == 0, 1, s)
        X = (x - m) / s
        return X, feature_names

    def _feature_selection(
        self, X: np.ndarray, feature_names: List[str]
    ) -> Tuple[np.ndarray, List[str]]:
        """Keep only the features accepted by a variance threshold of 1.

        NOTE(review): ``get_distances`` passes in a matrix that was already
        standardised, so every non-constant column has variance of about 1.
        Which columns pass a threshold of exactly 1 then appears to hinge on
        floating-point rounding; confirm this is intended.

        Returns:
            ``(X_selected, selected_feature_names)``.
        """
        threshold = 1
        selector = VarianceThreshold(threshold=threshold)
        X_selected = selector.fit_transform(X)
        selected_feature_indices = selector.get_support(indices=True)
        selected_feature_names = [feature_names[i] for i in selected_feature_indices]
        return X_selected, selected_feature_names

    def _outlier_detection(self, X: np.ndarray, num_high: int):
        """Return the indices of the ``num_high`` rows with the lowest
        ``negative_outlier_factor_`` from a cosine Local Outlier Factor fit."""
        clf = LocalOutlierFactor(n_neighbors=20, metric="cosine")
        clf.fit(X)
        outlier_scores = clf.negative_outlier_factor_
        sorted_indices = np.argsort(outlier_scores)
        return sorted_indices[:num_high]

    def get_distances(
        self,
        video_path: str,
        temp_path: str,
        window: int = 10,
    ) -> np.ndarray:
        """Return one acoustic-novelty score per audio segment.

        Segments are written to ``<temp_path>/audio`` unless that folder
        already exists. Features and their names are cached as pickles in
        ``temp_path``.

        Args:
            video_path: Video to analyse.
            temp_path: Working directory for segments and caches.
            window: Segment length in seconds.

        Returns:
            1-D array with one score per segment; silent segments are 0.

        Raises:
            ValueError: If the number of feature rows differs from the number
                of segments on disk (for example a stale cache).
        """
        temp_file_path = os.path.join(temp_path, "audio")
        # Cut the file into smaller chunks.
        if not os.path.exists(temp_file_path):
            self._audio_seg(
                video_path=video_path,
                output_path=temp_file_path,
                window=window,
            )

        is_silence, _ = self._get_silent(segments_path=temp_file_path)

        audio_features_file = os.path.join(temp_path, "audio_features.pkl")
        audio_feature_names_file = os.path.join(temp_path, "audio_feature_names.pkl")
        if os.path.exists(audio_features_file):
            # NOTE: pickle must only be used on cache files written by this
            # notebook, never on files from an untrusted source.
            with open(audio_features_file, "rb") as f:
                X = pickle.load(f)
            with open(audio_feature_names_file, "rb") as f:
                feature_names = pickle.load(f)
        else:
            X, feature_names = self._feature_extraction(directory=temp_file_path)
            with open(audio_features_file, "wb") as f:
                pickle.dump(X, f)
            with open(audio_feature_names_file, "wb") as f:
                pickle.dump(feature_names, f)

        X, feature_names = self._feature_selection(X=X, feature_names=feature_names)
        if X.shape[0] != len(is_silence):
            raise ValueError(
                f"{X.shape[0]} feature rows but {len(is_silence)} audio segments "
                f"in {temp_file_path}; delete the cached features and retry"
            )

        distances = cosine_distances(X, X)
        del X
        median_distances = np.median(distances, axis=1)
        # Make distance equal to 0 for the silences to be ignored.
        median_distances = np.where(np.array(is_silence) == 1, 0, median_distances)
        return median_distances

In [13]:
class TextHighlightsFinder:
    """Score audio segments by the emotion and length of their transcript.

    Non-silent segments are transcribed with Whisper and silent ones get an
    empty transcript. Every transcript is then run through an emotion
    classifier and scored.
    """

    def __init__(self) -> None:
        # Reused for its silence detection on the audio segments.
        self.ahf = AudioHighlightsFinder()

    def _transcript_path(self, audio_path: str, output_path: str) -> str:
        """Return ``<output_path>/<audio file stem>.txt``.

        ``os.path`` is used so the result is correct with any path separator
        and only the extension is replaced.
        """
        stem = os.path.splitext(os.path.basename(audio_path))[0]
        return os.path.join(output_path, f"{stem}.txt")

    def _split_into_chunks(self, items: List[str], max_chunks: int) -> List[List[str]]:
        """Split ``items`` into consecutive chunks.

        With fewer items than ``max_chunks`` a single chunk is returned;
        otherwise at most ``max_chunks`` chunks of near-equal size.
        """
        if not items:
            return []
        max_chunks = max(1, max_chunks)
        if len(items) >= max_chunks:
            # Ceiling division caps the number of chunks at `max_chunks`;
            # floor division could produce many more (15 items -> 15 chunks).
            batch_size = math.ceil(len(items) / max_chunks)
        else:
            batch_size = len(items)
        return [items[i : i + batch_size] for i in range(0, len(items), batch_size)]

    def _transcript_thread_callback(self, filepath_list, model, output_path):
        """Transcribe each file and write the text next to the others.

        Transcription is best effort: if it fails for a file, the failure is
        printed and an empty transcript is written, so every segment still gets
        a text file.
        """
        for file in filepath_list:
            try:
                text = model.transcribe(file)["text"]
            except Exception as exc:
                print(f"Transcription failed for {file}: {exc!r}")
                text = ""
            with open(self._transcript_path(file, output_path), "w", encoding="utf-8") as f:
                f.write(text)

    def transcript_clips(
        self, non_silent_files: List[str], output_path: str, max_threads: int = 8
    ) -> None:
        """Transcribe the given audio files using several threads.

        Each thread loads its own ``small.en`` Whisper model and handles one
        chunk of files.

        Args:
            non_silent_files: Audio files to transcribe.
            output_path: Directory that receives one ``.txt`` per audio file.
            max_threads: Upper bound on the number of threads (and models).
        """
        # Nothing to do; this also avoids a zero step in the chunking range.
        if not non_silent_files:
            return

        threads = []
        for chunk in self._split_into_chunks(non_silent_files, max_threads):
            sm_model_whisper = whisper.load_model("small.en")
            thread = threading.Thread(
                target=self._transcript_thread_callback,
                args=(chunk, sm_model_whisper, output_path),
            )
            thread.start()
            threads.append(thread)

        for thread in threads:
            thread.join()

    def _silent_files_filler(self, silent_files, output_path):
        """Write an empty transcript for every silent audio file."""
        for file in silent_files:
            with open(self._transcript_path(file, output_path), "w", encoding="utf-8") as f:
                f.write("")

    def _feature_extraction(self, directory: str) -> Tuple[np.ndarray, List[str]]:
        """Build one feature row per transcript in ``directory``.

        Transcripts are read in sorted file-name order so that rows line up
        with the sorted audio segments. Each row holds the classifier scores,
        ordered by label name, followed by the transcript's word count.

        Returns:
            ``(scores, feature_names)`` where ``feature_names`` are the sorted
            labels plus ``"speech_rate"``.
        """
        texts = []
        # os.listdir order is arbitrary; sort, and skip anything that is not a
        # transcript.
        for text_file in sorted(f for f in os.listdir(directory) if f.endswith(".txt")):
            with open(
                os.path.join(directory, text_file), "r", encoding="utf-8", errors="replace"
            ) as f:
                texts.append(f.read())

        classifier = pipeline("sentiment-analysis", model="michellejieli/emotion_text_classifier", top_k=None)
        results = classifier(texts)
        # Sort each result by label so every row has the same column order.
        results = [sorted([tuple(i.values()) for i in j]) for j in results]

        scores = [[j[1] for j in i] for i in results]
        feature_names = [j[0] for j in results[0]]

        # str.split() with no argument yields 0 words for an empty transcript
        # (split(' ') would count it as 1) and ignores repeated spaces.
        speech_rate = [len(t.split()) for t in texts]
        scores = np.array([row + [rate] for row, rate in zip(scores, speech_rate)])

        feature_names.append('speech_rate')

        return scores, feature_names

    def _calculate_score(self, x):
        """Combine novelty and two feature columns into one score per row.

        The features are standardised per column; the score is
        ``0.6 * median cosine distance + 0.2 * X[:, 3] + 0.2 * X[:, 6]``.

        NOTE(review): columns 3 and 6 are presumably the 4th and 7th labels in
        alphabetical order (see ``_feature_extraction``); confirm which
        emotions those are for the model in use.
        """
        m = x.mean(axis=0)
        s = np.std(x, axis=0)
        # Constant columns have zero std; divide by 1 so they become zeros
        # instead of NaN (which cosine_distances rejects).
        s = np.where(s == 0, 1, s)
        X = (x - m) / s

        distances = cosine_distances(X, X)
        median_distances = np.median(distances, axis=1)

        w_score = 0.6*median_distances + 0.2*X[:,3] + 0.2*X[:,6]

        return w_score

    def get_distances(
        self,
        temp_path: str,
    ) -> np.ndarray:
        """Return one text-based score per audio segment.

        Transcripts are created in ``<temp_path>/text`` unless that folder
        already exists. They are derived from the audio segments in
        ``<temp_path>/audio``, which must already be present.

        Args:
            temp_path: Working directory shared with the audio step.

        Returns:
            1-D array of scores, one per transcript.
        """
        temp_file_path = os.path.join(temp_path, "text")
        if not os.path.exists(temp_file_path):
            os.makedirs(temp_file_path, exist_ok=True)
            temp_file_audio_path = os.path.join(temp_path, "audio")
            is_silent, files = self.ahf._get_silent(temp_file_audio_path, thres_q=80)
            non_silent_files = [path for flag, path in zip(is_silent, files) if flag == 0]
            self.transcript_clips(non_silent_files, temp_file_path)
            silent_files = [path for flag, path in zip(is_silent, files) if flag == 1]
            self._silent_files_filler(silent_files, temp_file_path)

        x, _ = self._feature_extraction(temp_file_path)

        return self._calculate_score(x)

In [23]:
class HighlightsFinder:
    """Combine image, audio and text scores into a highlights video.

    Each modality returns one score per ``window``-second segment. Those
    arrays are interpreted in the string-sorted order of the segments'
    ``"<start>_<end>"`` labels, which is the order of the sorted segment file
    names on disk.
    """

    def __init__(self, batch_size: int = 32) -> None:
        """Create the three per-modality finders.

        Args:
            batch_size: Forwarded to ``ImageHighlightsFinder``.
        """
        self.ahf = AudioHighlightsFinder()
        self.thf = TextHighlightsFinder()
        self.ihf = ImageHighlightsFinder(batch_size=batch_size)

    def _str_to_int_tuple(self, s: str) -> Tuple[int, int]:
        """Parse ``"<start>_<end>"`` into ``(start, end)`` integers."""
        start, end = s.split("_")
        return int(start), int(end)

    def _merge_timestamps(self, timestamps: List[Tuple]) -> List[Tuple]:
        """Merge back-to-back ``(start, end)`` pairs into longer ones.

        Two pairs are merged only when one starts exactly where the previous
        one (in start-time order) ends. The input list is left untouched.
        """
        merged_timestamps = []
        for timestamp in sorted(timestamps, key=lambda x: x[0]):
            if merged_timestamps and timestamp[0] == merged_timestamps[-1][1]:
                # Extend the previous timestamp.
                merged_timestamps[-1] = (merged_timestamps[-1][0], timestamp[1])
            else:
                merged_timestamps.append(timestamp)
        return merged_timestamps

    def _convert_str_to_timestamps(self, highlights: List[str]) -> List[Tuple]:
        """Turn ``"<start>_<end>"`` labels into merged ``(start, end)`` pairs."""
        timestamps = [self._str_to_int_tuple(s=timestamp) for timestamp in highlights]
        return self._merge_timestamps(timestamps=timestamps)

    def _min_max_normalize(self, values: np.ndarray) -> np.ndarray:
        """Rescale ``values`` to [0, 1]; constant input maps to all zeros
        instead of dividing by zero."""
        values = np.asarray(values, dtype=float)
        low = np.min(values)
        span = np.max(values) - low
        if span == 0:
            return np.zeros_like(values)
        return (values - low) / span

    def _scores_by_label(self, distances: np.ndarray, window: int) -> dict:
        """Map each segment label to its score.

        ``distances`` is assumed to cover segments ``0 .. len(distances) - 1``
        in the string-sorted order of their ``"<start>_<end>"`` labels.
        """
        labels = sorted(
            f"{k * window}_{(k + 1) * window}" for k in range(len(distances))
        )
        return dict(zip(labels, distances))

    def _select_highlights(
        self,
        image_distances: np.ndarray,
        audio_distances: np.ndarray,
        text_distances: np.ndarray,
        window: int,
        num_full_windows: int,
        num_highlights: int,
    ) -> List[str]:
        """Return the labels of the ``num_highlights`` best full segments.

        Only segments that lie fully inside the video and have a score in all
        three modalities are considered. This drops a trailing partial audio
        segment by label rather than by array position. Each modality is
        min-max normalised and the three are combined as
        ``0.2 * image + 0.4 * audio + 0.4 * text``.

        Returns:
            Segment labels ordered from lowest to highest combined score.
        """
        if num_highlights <= 0:
            return []

        per_modality = [
            self._scores_by_label(np.asarray(distances, dtype=float), window)
            for distances in (image_distances, audio_distances, text_distances)
        ]
        labels = [
            label
            for label in (
                f"{k * window}_{(k + 1) * window}" for k in range(num_full_windows)
            )
            if all(label in scores for scores in per_modality)
        ]
        if not labels:
            return []

        image, audio, text = (
            self._min_max_normalize(np.array([scores[label] for label in labels]))
            for scores in per_modality
        )
        # Plain arithmetic on purpose: np.add(a, b, c) treats its third
        # positional argument as the output buffer and would ignore the text.
        distances = 0.2 * image + 0.4 * audio + 0.4 * text

        # Stable sort keeps ties in chronological order, so runs are repeatable.
        idx = np.argsort(distances, kind="stable")[-num_highlights:]
        return [labels[i] for i in idx]

    def create_video_summary(
        self, video_path: str, summary_output: str, num_highlights: int, window: int
    ):
        """Write ``<summary_output>/<video name>_summary.mp4``.

        Intermediate files are kept under ``temp/<video name>``.

        Args:
            video_path: Source video.
            summary_output: Directory for the summary (created if missing).
            num_highlights: Number of segments to keep before merging
                adjacent ones.
            window: Segment length in seconds.

        Raises:
            ValueError: If no segment can be selected.
        """
        # Get video's name.
        video_name = Path(os.path.basename(video_path)).stem

        temp_file_dir = "temp"
        temp_file_path = os.path.join(temp_file_dir, video_name)

        # Get the distances from each modality.
        image_distances = self.ihf.get_distances(
            video_path=video_path, temp_path=temp_file_path, window=window
        )
        audio_distances = self.ahf.get_distances(
            video_path=video_path, temp_path=temp_file_path, window=window
        )
        text_distances = self.thf.get_distances(temp_path=temp_file_path)

        video = editor.VideoFileClip(video_path)
        try:
            # Only windows that fit entirely inside the video can be cut out.
            num_full_windows = math.floor(video.duration / window)
            highlights = self._select_highlights(
                image_distances,
                audio_distances,
                text_distances,
                window=window,
                num_full_windows=num_full_windows,
                num_highlights=num_highlights,
            )
            if not highlights:
                raise ValueError(
                    f"No segment could be selected for {video_path} "
                    f"(window={window}, num_highlights={num_highlights})."
                )

            timestamps = self._convert_str_to_timestamps(highlights=highlights)

            # Create the summary video.
            clips = [
                video.subclip(start_time, end_time).fadeout(0.5)
                for start_time, end_time in timestamps
            ]
            final = editor.concatenate_videoclips(clips)

            os.makedirs(summary_output, exist_ok=True)
            final.write_videofile(
                os.path.join(summary_output, f"{video_name}_summary.mp4")
            )
        finally:
            # Release the readers held by the source clip.
            video.close()

## **Download Videos**

In [28]:
# Videos to download and summarise.
links = [
    "https://www.youtube.com/watch?v=SvV6aUki6LU&list=PLCGIzmTE4d0iCqSmha1X7F-_AqB3jjo26&index=6&ab_channel=FIFA",
    "https://www.youtube.com/watch?v=oZEVgDXJwCc&list=PLCGIzmTE4d0iCqSmha1X7F-_AqB3jjo26&index=7&ab_channel=FIFA",
    "https://www.youtube.com/watch?v=FopM2tiNJO4&list=PLCGIzmTE4d0iCqSmha1X7F-_AqB3jjo26&index=10&ab_channel=FIFA",
    "https://www.youtube.com/watch?v=WlNAln9mcg8&list=PLCGIzmTE4d0iCqSmha1X7F-_AqB3jjo26&index=11&ab_channel=FIFA",
    "https://www.youtube.com/watch?v=xPfs2JL_4ws&list=PLCGIzmTE4d0iCqSmha1X7F-_AqB3jjo26&index=12&ab_channel=FIFA",
    "https://www.youtube.com/watch?v=Cbij3MKhdOY&list=PLCGIzmTE4d0iCqSmha1X7F-_AqB3jjo26&index=14&ab_channel=FIFA",
    "https://www.youtube.com/watch?v=L6sbfskaTDQ&list=PLCGIzmTE4d0iCqSmha1X7F-_AqB3jjo26&index=16&ab_channel=FIFA",
    "https://www.youtube.com/watch?v=i6DaUHROjTg&list=PLCGIzmTE4d0iCqSmha1X7F-_AqB3jjo26&index=3&ab_channel=FIFA",
    "https://www.youtube.com/watch?v=b-HZviMbqxc&list=PLCGIzmTE4d0iCqSmha1X7F-_AqB3jjo26&index=2&ab_channel=FIFA",
    "https://www.youtube.com/watch?v=SirRnkDOrlU&list=PLCGIzmTE4d0iCqSmha1X7F-_AqB3jjo26&index=5&ab_channel=FIFA",
]

# Downloader settings: the URLs carry a `list=` parameter, so `noplaylist`
# is set; output files are named after the video id inside `save_videos_to`.
ydl_opts = {
    "noplaylist": True,
    "outtmpl": os.path.join(save_videos_to, "%(id)s"),
    "format": "bv*[ext=mp4]+ba[ext=m4a]/b[ext=mp4] / bv*+ba/b",
}

with YoutubeDL(ydl_opts) as ydl:
    ydl.download(links)

## **Create Video Summary**

In [29]:
# Build the finder once and reuse it for every video; `batch_size` is the
# number of frames embedded per forward pass by the image finder.
hf = HighlightsFinder(batch_size=32)

In [30]:
# Downloaded videos, excluding generated summaries and notebook checkpoints.
# glob returns paths in arbitrary order, so sort to make any subset taken from
# this list (e.g. a `[:1]` slice) the same on every run and platform.
videos_output_path = sorted(
    f
    for f in glob.glob(os.path.join(save_videos_to, "*.mp4"))
    if "_summary" not in f and "ipynb_checkpoints" not in f
)

In [34]:
# Only the first video is summarised here; remove the slice to process all.
for video_path in videos_output_path[:1]:
    hf.create_video_summary(
        video_path=video_path,
        summary_output=summary_output,
        num_highlights=num_highlights,
        window=window,
    )

Analyzing file 1 of 590: temp\oZEVgDXJwCc\audio\sec_0_10.wav
Analyzing file 2 of 590: temp\oZEVgDXJwCc\audio\sec_1000_1010.wav
Analyzing file 3 of 590: temp\oZEVgDXJwCc\audio\sec_100_110.wav
Analyzing file 4 of 590: temp\oZEVgDXJwCc\audio\sec_1010_1020.wav
Analyzing file 5 of 590: temp\oZEVgDXJwCc\audio\sec_1020_1030.wav
Analyzing file 6 of 590: temp\oZEVgDXJwCc\audio\sec_1030_1040.wav
Analyzing file 7 of 590: temp\oZEVgDXJwCc\audio\sec_1040_1050.wav
Analyzing file 8 of 590: temp\oZEVgDXJwCc\audio\sec_1050_1060.wav
Analyzing file 9 of 590: temp\oZEVgDXJwCc\audio\sec_1060_1070.wav
Analyzing file 10 of 590: temp\oZEVgDXJwCc\audio\sec_1070_1080.wav
Analyzing file 11 of 590: temp\oZEVgDXJwCc\audio\sec_1080_1090.wav
Analyzing file 12 of 590: temp\oZEVgDXJwCc\audio\sec_1090_1100.wav
Analyzing file 13 of 590: temp\oZEVgDXJwCc\audio\sec_10_20.wav
Analyzing file 14 of 590: temp\oZEVgDXJwCc\audio\sec_1100_1110.wav
Analyzing file 15 of 590: temp\oZEVgDXJwCc\audio\sec_110_120.wav
Analyzing file 16

                                                                     

MoviePy - Done.
Moviepy - Writing video videos_summary\oZEVgDXJwCc_summary.mp4



                                                                

Moviepy - Done !
Moviepy - video ready videos_summary\oZEVgDXJwCc_summary.mp4


#### Libraries & Pre-trained Models References
He, K., Zhang, X., Ren, S., & Sun, J. (2016). Deep residual learning for image recognition. In Proceedings of the IEEE conference on computer vision and pattern recognition (pp. 770-778).

Jochen Hartmann, "Emotion English DistilRoBERTa-base". https://huggingface.co/j-hartmann/emotion-english-distilroberta-base/, 2022.

Ashritha R Murthy and K M Anil Kumar 2021 IOP Conf. Ser.: Mater. Sci. Eng. 1110 012009

Radford, A., Kim, J. W., Xu, T., Brockman, G., McLeavey, C., & Sutskever, I. (2022). Robust speech recognition via large-scale weak supervision. arXiv preprint arXiv:2212.04356.

Giannakopoulos, T. (2015). pyaudioanalysis: An open-source python library for audio signal analysis. PloS one, 10(12), e0144610.