In [None]:
!pip install git+https://github.com/regtm/dload.git
!pip install imageio==2.4.1
!pip install moviepy
!pip install opencv-python

In [None]:
import os, subprocess
from pathlib import Path

import librosa
import dload
import cv2
import numpy as np
import moviepy.editor as mp

In [None]:
carrying_download_link = "https://mynbox.nus.edu.sg/fss/public/link/public/stream/read/carrying_videos_complete.zip?linkToken=_lK00jNStNLsUrT0&itemName=fcf428c1-5881-4ec9-8787-ec5bd7a5c695"
normal_download_link = "https://mynbox.nus.edu.sg/fss/public/link/public/stream/read/normal_videos_complete.zip?linkToken=_lK00jNStNLsUrT0&itemName=fcf428c1-5881-4ec9-8787-ec5bd7a5c695"
threat_download_link = "https://mynbox.nus.edu.sg/fss/public/link/public/stream/read/threat_videos_complete.zip?linkToken=_lK00jNStNLsUrT0&itemName=fcf428c1-5881-4ec9-8787-ec5bd7a5c695"


In [None]:
root_path = "/content/drive/MyDrive/NUS/CS4243/CS4243_mini_project"
video_path = "/content/sample_data/video_data"
audio_path = "/content/sample_data/audio_data"
spec_path = "/content/sample_data/spec_data"

Path(video_path).mkdir(exist_ok=True)
Path(audio_path).mkdir(exist_ok=True)
Path(spec_path).mkdir(exist_ok=True)

In [None]:
def download_video(link, label, video_path, audio_path):
    out_path = os.path.join(video_path, label)
    Path(out_path).mkdir(exist_ok=True)
    dload.save_unzip(link, out_path)
    return len([x for x in os.listdir(out_path) if os.path.isfile(os.path.join(out_path, x))])


def has_audio(file_path):
    result = subprocess.run(
        [
            "ffprobe", 
            "-v", 
            "error", 
            "-show_entries", 
            "format=nb_streams", 
            "-of", 
            "default=noprint_wrappers=1:nokey=1", 
            file_path,
        ],
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
    )
    try:
        return int(result.stdout.strip()) > 1
    except:
        return False


def extract_audio(label, video_path, audio_path):
    in_path = os.path.join(video_path, label)
    video_names = [x for x in os.listdir(in_path) if os.path.isfile(os.path.join(in_path, x))]
    out_path = os.path.join(audio_path, label)
    Path(out_path).mkdir(exist_ok=True)
    for name in video_names:
        video_file_path = os.path.join(in_path, name)
        if has_audio(video_file_path):
            try:
                video_data = mp.VideoFileClip(video_file_path)
                file_name, _, _ = name.rpartition(".")
                if video_data.audio:
                    video_data.audio.write_audiofile(os.path.join(out_path, f"{file_name}_audio.mp3"))
            except:
                ...
    return len([x for x in os.listdir(out_path) if os.path.isfile(os.path.join(out_path, x))])


def get_melspectrogram_db(file_path, sr = None, n_fft = 2048, hop_length = 512, n_mels = 128, fmin = 20, fmax = 8300, top_db = 80):
    wav, sr = librosa.load(file_path, sr=sr)
    if wav.shape[0] < 5 * sr:
        wav = np.pad(wav, int(np.ceil((5 * sr - wav.shape[0]) / 2)), mode="reflect")
    else:
        wav = wav[:5*sr]
    spec = librosa.feature.melspectrogram(
        wav,
        sr=sr, 
        n_fft=n_fft,
        hop_length=hop_length,
        n_mels=n_mels,
        fmin=fmin,
        fmax=fmax
    )
    spec_db = librosa.power_to_db(spec, top_db=top_db)
    return spec_db


def get_spectrogram_image(file_path):
    spec = get_melspectrogram_db(file_path)
    mean = spec.mean()
    std = spec.std()
    spec_norm = (spec - mean) / (std + 1e-6)
    spec_min, spec_max = spec_norm.min(), spec_norm.max()
    spec_scaled = 255 * (spec_norm - spec_min) / (spec_max - spec_min)
    spec_scaled = spec_scaled.astype(np.uint8)
    return spec_scaled


def convert_spectrograms(label, audio_path, spec_path):
    in_path = os.path.join(audio_path, label)
    audio_names = [x for x in os.listdir(in_path) if os.path.isfile(os.path.join(in_path, x))]
    out_path = os.path.join(spec_path, label)
    Path(out_path).mkdir(exist_ok=True)
    for name in audio_names:
        audio_file_path = os.path.join(in_path, name)
        spec_image = get_spectrogram_image(audio_file_path)
        file_name, _, _ = name.rpartition(".")
        cv2.imwrite(os.path.join(out_path, f"{file_name}_spec.jpg"), spec_image)


def get_spectrograms(download_link, label, video_path, audio_path, spec_path, process_video=True, process_audio=True):
    print("======")
    print("Processing", label)

    if process_video:
        print("Processing videos...")
        num_video = download_video(download_link, label, video_path, audio_path)
    else:
        video_data_path = os.path.join(video_path, label)
        num_video = len([x for x in os.listdir(video_data_path) if os.path.isfile(os.path.join(video_data_path, x))])

    if process_audio:
        print("Processing Audio...")
        num_audio = extract_audio(label, video_path, audio_path)
    else:
        audio_data_path = os.path.join(audio_path, label)
        num_audio = len([x for x in os.listdir(audio_data_path) if os.path.isfile(os.path.join(audio_data_path, x))])

    print(label, "-", num_video, "videos,", num_audio, "audio,", (num_video - num_audio), "deleted")

    print("Processing Spectrograms...")
    convert_spectrograms(label, audio_path, spec_path)

In [None]:
# get_spectrograms(carrying_download_link, "carrying", video_path, audio_path, spec_path, False, False)
# get_spectrograms(normal_download_link, "normal", video_path, audio_path, spec_path)
get_spectrograms(threat_download_link, "threat", video_path, audio_path, spec_path, False)

In [None]:
from google.colab import drive
drive.mount('/content/drive')
root_path = "/content/drive/MyDrive/NUS/CS4243/CS4243_mini_project"

Mounted at /content/drive


In [None]:
from distutils.dir_util import copy_tree
gdrive_spec_path = os.path.join(root_path, "spectrogram_data")
copy_tree(spec_path, gdrive_spec_path)