# Purpose of this file:

Brainstorm ideas and check that each of the preprocessing functions works as expected.

In [1]:
# Here's how it should work
# extract_frames -> crop_face -> extract_mfcc -> load_alignments -> align_frames_and_audio

In [2]:
import os
import cv2

def extract_frames(video_path, output_dir, frame_rate=10):
    """Save every ``frame_rate``-th frame of a video as a JPEG.

    Frames are written to ``<output_dir>/<video name>/frame_<index>.jpg``,
    where ``<index>`` is the zero-padded position of the frame in the source
    video (not the number of frames saved so far).

    Args:
        video_path: Path of the video file to read.
        output_dir: Directory under which a per-video subdirectory is created.
        frame_rate: Sampling stride in frames. Despite the name this is not a
            frames-per-second value: frame ``i`` is kept when
            ``i % frame_rate == 0``.

    Raises:
        ValueError: If ``frame_rate`` is not positive.
    """
    # Validate before touching the filesystem; a zero stride would otherwise
    # fail with ZeroDivisionError only after the output directory exists.
    if frame_rate <= 0:
        raise ValueError(f"frame_rate must be a positive stride, got {frame_rate!r}")

    # Get the video filename without extension to use as subdirectory
    video_name = os.path.splitext(os.path.basename(video_path))[0]

    # Create a subdirectory for each video
    video_output_dir = os.path.join(output_dir, video_name)
    os.makedirs(video_output_dir, exist_ok=True)

    print(f"Extracting frames from: {video_path}")
    print(f"Saving frames to: {video_output_dir}")

    cap = cv2.VideoCapture(video_path)
    try:
        # Without this check an unreadable file would be reported as completed
        # even though no frame was ever read.
        if not cap.isOpened():
            print(f"Could not open video: {video_path}")
            return

        frame_count = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            if frame_count % frame_rate == 0:
                frame_filename = os.path.join(video_output_dir, f"frame_{frame_count:04d}.jpg")
                # imwrite reports a failed write through its return value.
                if not cv2.imwrite(frame_filename, frame):
                    print(f"Failed to write frame: {frame_filename}")

            frame_count += 1
    finally:
        # Release the capture handle even if reading or writing raised.
        cap.release()

    print(f"Completed frame extraction for {video_name}")

# Run frame extraction over every entry in the speaker's raw video folder.
video_path = "../data/raw/speaker1/video"
output_dir = "test/frames"
os.makedirs(output_dir, exist_ok=True)
video_files = os.listdir(video_path)

for video in video_files:
    full_video_path = os.path.join(video_path, video)
    extract_frames(full_video_path, output_dir, frame_rate=10)

Extracting frames from: ../data/raw/speaker1/video\bbaf2n.mpg
Saving frames to: test/frames\bbaf2n
Completed frame extraction for bbaf2n
Extracting frames from: ../data/raw/speaker1/video\bbaf3s.mpg
Saving frames to: test/frames\bbaf3s
Completed frame extraction for bbaf3s
Extracting frames from: ../data/raw/speaker1/video\bbaf4p.mpg
Saving frames to: test/frames\bbaf4p
Completed frame extraction for bbaf4p
Extracting frames from: ../data/raw/speaker1/video\bbaf5a.mpg
Saving frames to: test/frames\bbaf5a
Completed frame extraction for bbaf5a
Extracting frames from: ../data/raw/speaker1/video\bbal6n.mpg
Saving frames to: test/frames\bbal6n
Completed frame extraction for bbal6n
Extracting frames from: ../data/raw/speaker1/video\bbal7s.mpg
Saving frames to: test/frames\bbal7s
Completed frame extraction for bbal7s
Extracting frames from: ../data/raw/speaker1/video\bbal8p.mpg
Saving frames to: test/frames\bbal8p
Completed frame extraction for bbal8p
Extracting frames from: ../data/raw/speak

In [3]:
import cv2
from mtcnn import MTCNN
from concurrent.futures import ThreadPoolExecutor, as_completed

def crop_face(image_path, output_path):
    """Detect the first face in an image and save the cropped region.

    Args:
        image_path: Path of the image to read.
        output_path: Path the cropped face is written to.

    Returns:
        True if a face was detected and the crop was written; False otherwise
        (unreadable image, no detection, empty crop, failed write, or any
        exception, which is printed rather than raised).
    """
    try:
        bgr_image = cv2.imread(image_path)
        # imread signals an unreadable file by returning None, not by raising.
        if bgr_image is None:
            print(f"Could not read image {image_path}")
            return False

        # A detector is built on every call, so concurrent callers never share one.
        detector = MTCNN()
        image = cv2.cvtColor(bgr_image, cv2.COLOR_BGR2RGB)
        result = detector.detect_faces(image)

        if not result:
            print(f"No face detected in {image_path}")
            return False

        x, y, width, height = result[0]["box"]
        # Clamp to the image origin: a negative coordinate would be interpreted
        # by slicing as an offset from the far edge and give a wrong or empty crop.
        left, top = max(0, x), max(0, y)
        right, bottom = max(0, x + width), max(0, y + height)
        face = image[top:bottom, left:right]

        if face.size == 0:
            print(f"Empty face region in {image_path}")
            return False

        # The crop is RGB; convert back to BGR, the channel order imwrite expects.
        return bool(cv2.imwrite(output_path, cv2.cvtColor(face, cv2.COLOR_RGB2BGR)))
    except Exception as e:
        # Best-effort batch step: report the failure and let the caller move on.
        print(f"Error processing {image_path}: {e}")
        return False

def process_video_frames(video_frames, data_dir, output_path, batch_size=10, max_workers=4):
    """Crop faces from every frame of one video, a batch at a time.

    Reads every entry of ``<data_dir>/<video_frames>`` and writes the crop for
    each one under ``<output_path>/<video_frames>`` using the same file name.
    A fresh thread pool is created for each batch, so one batch finishes
    before the next one starts. The outcome of every frame is printed.

    Args:
        video_frames: Name of the per-video frame subdirectory.
        data_dir: Directory holding the per-video frame subdirectories.
        output_path: Directory under which the cropped faces are written.
        batch_size: Number of frames submitted to each thread pool.
        max_workers: Number of worker threads per pool.
    """
    source_dir = os.path.join(data_dir, video_frames)
    target_dir = os.path.join(output_path, video_frames)
    os.makedirs(target_dir, exist_ok=True)

    # Every directory entry is treated as a frame image.
    sources = [os.path.join(source_dir, entry) for entry in os.listdir(source_dir)]

    for offset in range(0, len(sources), batch_size):
        jobs = [
            (src, os.path.join(target_dir, os.path.basename(src)))
            for src in sources[offset:offset + batch_size]
        ]

        with ThreadPoolExecutor(max_workers=max_workers) as pool:
            # Map each future back to its source path for reporting.
            pending = {pool.submit(crop_face, src, dst): src for src, dst in jobs}

            for finished in as_completed(pending):
                img_path = pending[finished]
                try:
                    if finished.result():
                        print(f"Successfully processed {img_path}")
                    else:
                        print(f"Failed to process {img_path}")
                except Exception as exc:
                    print(f"{img_path} generated an exception: {exc}")

# Crop faces for every per-video frame folder produced by the extraction step.
data_dir = "test/frames"
output_path = "test/cropped_faces"
os.makedirs(output_path, exist_ok=True)
frames_dir = os.listdir(data_dir)

for video_frames in frames_dir:
    process_video_frames(video_frames, data_dir, output_path)


Successfully processed test/frames\bbaf2n\frame_0000.jpg
Successfully processed test/frames\bbaf2n\frame_0020.jpg
Successfully processed test/frames\bbaf2n\frame_0010.jpg
Successfully processed test/frames\bbaf2n\frame_0030.jpg
Successfully processed test/frames\bbaf2n\frame_0060.jpg
Successfully processed test/frames\bbaf2n\frame_0070.jpg
Successfully processed test/frames\bbaf2n\frame_0050.jpg
Successfully processed test/frames\bbaf2n\frame_0040.jpg
Successfully processed test/frames\bbaf3s\frame_0010.jpg
Successfully processed test/frames\bbaf3s\frame_0000.jpg
Successfully processed test/frames\bbaf3s\frame_0020.jpg
Successfully processed test/frames\bbaf3s\frame_0030.jpg
Successfully processed test/frames\bbaf3s\frame_0070.jpg
Successfully processed test/frames\bbaf3s\frame_0040.jpg
Successfully processed test/frames\bbaf3s\frame_0060.jpg
Successfully processed test/frames\bbaf3s\frame_0050.jpg
Successfully processed test/frames\bbaf4p\frame_0010.jpg
Successfully processed test/fra

In [4]:
import librosa
import numpy as np

def extract_mfcc(audio_path, n_mfcc=13):
    """Load an audio file and return its MFCC features.

    Args:
        audio_path: Path of the audio file to load.
        n_mfcc: Number of MFCC coefficients requested from librosa.

    Returns:
        The array produced by ``librosa.feature.mfcc``.
    """
    # sr=None is passed through to librosa.load instead of a fixed target rate.
    samples, sample_rate = librosa.load(audio_path, sr=None)
    return librosa.feature.mfcc(y=samples, sr=sample_rate, n_mfcc=n_mfcc)

# Compute and store MFCC features for every file in the speaker's audio folder.
audio_file_path = "../data/raw/speaker1/audio"
output_path = "test/audio_features"
os.makedirs(output_path, exist_ok=True)

for file in os.listdir(audio_file_path):
    full_audio_path = os.path.join(audio_file_path, file)
    filename_without_ext = os.path.splitext(file)[0]
    output_file_path = os.path.join(output_path, f"{filename_without_ext}_mfcc.npy")
    mfcc = extract_mfcc(full_audio_path, n_mfcc=13)
    np.save(output_file_path, mfcc)

In [8]:
import os
import json

def load_alignments(align_path):
    """Parse an alignment file into ``(start, end, word)`` tuples.

    Every non-blank line must contain exactly three whitespace-separated
    fields: start, end and word. Blank or whitespace-only lines (such as a
    trailing empty line) are skipped. Each parsed line is also printed.

    Args:
        align_path: Path of the alignment file.

    Returns:
        A list of ``(float, float, str)`` tuples in file order.

    Raises:
        ValueError: If a non-blank line does not have exactly three fields or
            its first two fields are not numeric.
    """
    alignments = []
    with open(align_path, "r") as f:
        for line in f:
            fields = line.split()
            # Skip blank lines instead of failing on the three-way unpack below.
            if not fields:
                continue

            start_frame, end_frame, word = fields
            print(f"Start Frame: {start_frame}, End Frame: {end_frame}, Word: {word}")

            alignments.append((float(start_frame), float(end_frame), word))
    return alignments

def _frame_sort_key(frame_file):
    """Order frame files by their embedded frame number; unparsable names go last."""
    try:
        return (0, int(frame_file.split('_')[1].split('.')[0]), frame_file)
    except (IndexError, ValueError):
        return (1, 0, frame_file)


def align_frames_and_audio(frame_dir, alignments, output_file, frame_rate=10):
    """Label each extracted frame with the word spoken at that frame.

    The video id is taken from ``output_file`` (its base name without
    extension and without a trailing ``_aligned``), and the frames are read
    from ``<frame_dir>/<video id>``. A frame is matched to the first alignment
    whose ``[start, end)`` interval contains ``frame_number * 1000``; frames
    that match no interval are left out. The result is written to
    ``output_file`` as a JSON list ordered by frame number.

    Args:
        frame_dir: Directory holding the per-video frame subdirectories.
        alignments: Iterable of ``(start, end, word)`` tuples.
        output_file: Path of the JSON file to write; also determines the
            video id.
        frame_rate: Unused; kept so existing callers keep working. File names
            already carry the frame's index in the source video.
    """
    # NOTE(review): alignment values look like 1000 units per video frame;
    # confirm against the alignment file format.
    units_per_frame = 1000

    # Strip only a trailing "_aligned" so an id containing that text elsewhere
    # is not mangled.
    suffix = '_aligned'
    video_id = os.path.splitext(os.path.basename(output_file))[0]
    if video_id.endswith(suffix):
        video_id = video_id[:-len(suffix)]
    video_frame_dir = os.path.join(frame_dir, video_id)

    aligned_data = []
    # os.listdir returns entries in arbitrary order; sort so the JSON output
    # is reproducible and follows the video timeline.
    for frame_file in sorted(os.listdir(video_frame_dir), key=_frame_sort_key):
        try:
            frame_number = int(frame_file.split('_')[1].split('.')[0])
            print(f"Frame Number: {frame_number}")
            # Stored relative to frame_dir, not as a full path.
            frame_path = os.path.join(video_id, frame_file)

            frame_position = frame_number * units_per_frame
            for start_frame, end_frame, word in alignments:
                if start_frame <= frame_position < end_frame:
                    aligned_data.append({
                        "frame_path": frame_path,
                        "word": word,
                        "start_frame": start_frame,
                        "end_frame": end_frame,
                    })
                    break
        except (IndexError, ValueError) as e:
            print(f"Error processing frame {frame_file}: {e}")
            continue

    with open(output_file, "w") as f:
        json.dump(aligned_data, f, indent=4)


# Align frames with words for the first alignment file only (note the break).
path_to_alignmen_file = "../data/raw/speaker1/alignments"
frame_dir = "test/frames"
os.makedirs("test/aligned_data", exist_ok=True)

for alignment_file in os.listdir(path_to_alignmen_file):
    output_file = os.path.join("test/aligned_data", f"{os.path.splitext(alignment_file)[0]}_aligned.json")
    print(f"Output File: {output_file}")

    alignment = load_alignments(os.path.join(path_to_alignmen_file, alignment_file))
    print(alignment)

    align_frames_and_audio(frame_dir, alignment, output_file, frame_rate=10)
    break

Output File: test/aligned_data\bbaf2n_aligned.json
Start Frame: 0, End Frame: 23750, Word: sil
Start Frame: 23750, End Frame: 29500, Word: bin
Start Frame: 29500, End Frame: 34000, Word: blue
Start Frame: 34000, End Frame: 35500, Word: at
Start Frame: 35500, End Frame: 41000, Word: f
Start Frame: 41000, End Frame: 47250, Word: two
Start Frame: 47250, End Frame: 53000, Word: now
Start Frame: 53000, End Frame: 74500, Word: sil
[(0.0, 23750.0, 'sil'), (23750.0, 29500.0, 'bin'), (29500.0, 34000.0, 'blue'), (34000.0, 35500.0, 'at'), (35500.0, 41000.0, 'f'), (41000.0, 47250.0, 'two'), (47250.0, 53000.0, 'now'), (53000.0, 74500.0, 'sil')]
Frame Number: 0
Frame Number: 10
Frame Number: 20
Frame Number: 30
Frame Number: 40
Frame Number: 50
Frame Number: 60
Frame Number: 70
