In [3]:
pip install ffmpeg-python

Note: you may need to restart the kernel to use updated packages.


In [6]:
import librosa

# Video features (VideoMAE)

In [None]:
import pandas as pd
import numpy as np
import av
import torch
from transformers import AutoImageProcessor, VideoMAEModel
import os
import glob  

np.random.seed(0)

def read_video_pyav(container, indices):
    """
    Decode the requested frames of the first video stream into one RGB array.

    Parameters:
        container: Opened PyAV container. Only ``seek`` and ``decode(video=0)``
            are used here.
        indices: Frame indices to keep, expected in ascending order (the first
            and last entries bound the decoding window). A repeated index
            yields a repeated frame.

    Returns:
        numpy.ndarray: The selected frames converted with
        ``to_ndarray(format="rgb24")`` and stacked along a new leading axis,
        one entry per requested index that could be decoded.

    Raises:
        ValueError: If ``indices`` is empty or none of the requested frames
            could be decoded.
    """
    wanted = [int(i) for i in indices]
    if not wanted:
        raise ValueError("indices must contain at least one frame index")
    start_index, end_index = wanted[0], wanted[-1]
    # Set lookup instead of scanning the index array once per decoded frame.
    wanted_set = set(wanted)

    container.seek(0)
    decoded = {}
    for i, frame in enumerate(container.decode(video=0)):
        if i > end_index:
            break
        if i >= start_index and i in wanted_set:
            decoded[i] = frame.to_ndarray(format="rgb24")

    # Emit one frame per requested index so duplicated indices keep the clip
    # length intact; indices past the end of the stream are simply absent.
    frames = [decoded[i] for i in wanted if i in decoded]
    if not frames:
        raise ValueError(
            f"No frames decoded for indices in [{start_index}, {end_index}]"
        )
    return np.stack(frames)

def sample_frame_indices(clip_len, frame_sample_rate, seg_len):
    """
    Pick ``clip_len`` frame indices from a video that has ``seg_len`` frames.

    When the video is longer than the sampling window
    (``clip_len * frame_sample_rate`` frames), a window ending at a random
    position is drawn from the global NumPy RNG and ``clip_len`` indices are
    spread evenly inside it. Shorter videos cannot host such a window, so the
    indices are spread evenly over the whole video instead (frames may repeat).

    Parameters:
        clip_len (int): Number of indices to return.
        frame_sample_rate (int or float): Stride factor that sets the window size.
        seg_len (int): Total number of frames available.

    Returns:
        numpy.ndarray: ``clip_len`` int64 indices in ascending order, all
        within ``[0, seg_len - 1]``.

    Raises:
        ValueError: If ``seg_len`` is not positive (for example when the
            container does not report a frame count).
    """
    if seg_len <= 0:
        raise ValueError(f"seg_len must be positive, got {seg_len}")

    converted_len = int(clip_len * frame_sample_rate)
    if seg_len <= converted_len:
        # np.random.randint(converted_len, seg_len) would fail with low >= high.
        return np.linspace(0, seg_len - 1, num=clip_len).astype(np.int64)

    end_idx = np.random.randint(converted_len, seg_len)
    start_idx = end_idx - converted_len
    indices = np.linspace(start_idx, end_idx, num=clip_len)
    # end_idx itself is exclusive, so the last index is clipped to end_idx - 1.
    indices = np.clip(indices, start_idx, end_idx - 1).astype(np.int64)
    return indices

_VIDEOMAE_CHECKPOINT = "MCG-NJU/videomae-base"
# Maps str(device) -> (image_processor, model) so the checkpoint is loaded once
# per device instead of once per video file.
_videomae_cache = {}


def _load_videomae(device):
    """Return the (image_processor, model) pair for ``device``, loading it on first use."""
    key = str(device)
    if key not in _videomae_cache:
        image_processor = AutoImageProcessor.from_pretrained(_VIDEOMAE_CHECKPOINT)
        model = VideoMAEModel.from_pretrained(_VIDEOMAE_CHECKPOINT)
        model.to(device)
        _videomae_cache[key] = (image_processor, model)
    return _videomae_cache[key]


def extract_video_features(file_path, device):
    """
    Compute one VideoMAE feature vector for a video file.

    Sixteen frames are sampled from the first video stream, run through the
    ``MCG-NJU/videomae-base`` model, and the last hidden state is averaged
    over the token dimension.

    Parameters:
        file_path (str): Path to the video file.
        device (torch.device): Device the model and inputs are moved to.

    Returns:
        tuple: ``(file_path, features)`` where ``features`` is a NumPy array
        holding the mean-pooled last hidden state.
    """
    # The context manager closes the container even if sampling/decoding fails.
    with av.open(file_path) as container:
        indices = sample_frame_indices(
            clip_len=16,
            frame_sample_rate=1,
            seg_len=container.streams.video[0].frames,
        )
        video = read_video_pyav(container, indices)

    image_processor, model = _load_videomae(device)
    inputs = image_processor(list(video), return_tensors="pt").to(device)

    with torch.no_grad():
        outputs = model(**inputs)
    features = outputs.last_hidden_state.mean(dim=1).squeeze().cpu().numpy()

    return file_path, features

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
video_folder = "C:/Users/giris/Downloads/RESEARCH/Personality/personality_data/Video_Interview"  # Change this to your folder path
features_csv_path = 'C:/Users/giris/Downloads/RESEARCH/Personality/personality_data/Video_Interview_features.csv'

# glob returns files in arbitrary order, and frame sampling draws from the
# globally seeded NumPy RNG, so a fixed order is needed for repeatable output.
video_files = sorted(glob.glob(os.path.join(video_folder, "*.mp4")))

all_features = []
file_names = []
for file_path in video_files:
    file_name, features = extract_video_features(file_path, device)
    file_names.append(os.path.basename(file_name))
    all_features.append(features)

# One row per video: feature columns first, then the source file name.
features_df = pd.DataFrame(all_features)
features_df['file_name'] = file_names
features_df.to_csv(features_csv_path, index=False)
print(f"Features saved to {features_csv_path}")


# Extract audio from video

In [None]:
import subprocess
import os

def extract_audio_ffmpeg(video_file_path, audio_file_path):
    """
    Extract the audio of a video file into ``audio_file_path`` using ffmpeg.

    Failures are reported on stdout rather than raised, so a batch run can
    continue with the next file.

    Parameters:
        video_file_path (str): Path of the input video.
        audio_file_path (str): Path of the audio file to write (overwritten if present).

    Returns:
        bool: True if ffmpeg exited successfully, False otherwise.
    """
    # Include the -y flag to automatically overwrite existing files
    command = ['ffmpeg', '-y', '-i', video_file_path, '-q:a', '0', '-map', 'a', audio_file_path]

    try:
        subprocess.run(command, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    except FileNotFoundError as e:
        # Raised when the ffmpeg executable itself cannot be found.
        print(f"Error processing {video_file_path}: ffmpeg executable not found ({e})")
        return False
    except subprocess.CalledProcessError as e:
        print(f"Error processing {video_file_path}: {e}")
        # stderr was captured above; show its tail so the actual cause is visible.
        stderr_text = (e.stderr or b"").decode("utf-8", errors="replace").strip()
        if stderr_text:
            print("ffmpeg stderr (last lines):\n" + "\n".join(stderr_text.splitlines()[-5:]))
        return False

    print(f"Successfully extracted audio to {audio_file_path}")
    return True

def process_all_videos(video_folder_path, output_folder_path):
    """
    Extract an .mp3 audio track for every supported video in a folder.

    Files ending in .mp4, .mkv or .avi (matched case-insensitively) are passed
    to ``extract_audio_ffmpeg``; every other entry is reported and skipped.

    Parameters:
        video_folder_path (str): Folder containing the input videos.
        output_folder_path (str): Folder that receives the audio files; created if missing.
    """
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(output_folder_path, exist_ok=True)

    supported_extensions = (".mp4", ".mkv", ".avi")

    # Sorted so that the processing order and log output are repeatable.
    for filename in sorted(os.listdir(video_folder_path)):
        # Lower-cased comparison so that e.g. "CLIP.MP4" is not skipped.
        if not filename.lower().endswith(supported_extensions):
            print(f"Skipping unsupported file: {filename}")
            continue

        video_file_path = os.path.join(video_folder_path, filename)
        audio_filename = os.path.splitext(filename)[0] + ".mp3"
        audio_file_path = os.path.join(output_folder_path, audio_filename)

        extract_audio_ffmpeg(video_file_path, audio_file_path)

video_folder_path = 'C:/Users/giris/Downloads/RESEARCH/Personality/personality_data/Video_Interview'
output_folder_path = 'C:/Users/giris/Downloads/RESEARCH/Personality/personality_data/Extracted_Audio'
process_all_videos(video_folder_path, output_folder_path)


# XLSR features (wav2vec 2.0)

In [None]:
import os
import torchaudio
from transformers import Wav2Vec2FeatureExtractor, Wav2Vec2Model
import torch
import pandas as pd
from torchaudio.transforms import Resample

# Both the input feature extractor and the encoder come from the same checkpoint.
XLSR_CHECKPOINT = "facebook/wav2vec2-large-xlsr-53"
feature_extractor = Wav2Vec2FeatureExtractor.from_pretrained(XLSR_CHECKPOINT)
model = Wav2Vec2Model.from_pretrained(XLSR_CHECKPOINT)


In [None]:

input_folder = 'C:/Users/giris/Downloads/RESEARCH/Personality/personality_data/Extracted_Audio'
output_folder = 'C:/Users/giris/Downloads/RESEARCH/Personality/personality_data/Extracted_Audio_features'

# Suffix appended to the audio stem when naming the per-file feature CSV.
feature_csv_suffix = "_features.csv"

os.makedirs(output_folder, exist_ok=True)

# Recover the audio stems that already have a feature file. The whole
# "_features.csv" suffix must be stripped: removing only ".csv" leaves
# "<stem>_features", which never equals "<stem>" and defeats the skip check.
processed_files = {
    f[:-len(feature_csv_suffix)]
    for f in os.listdir(output_folder)
    if f.endswith(feature_csv_suffix)
}

for filename in os.listdir(input_folder):
    if not filename.endswith(".mp3"):
        continue

    stem = os.path.splitext(filename)[0]
    if stem in processed_files:
        print(f"Features for {filename} already extracted, skipping...")
        continue

    audio_file = os.path.join(input_folder, filename)
    waveform, sample_rate = torchaudio.load(audio_file)

    # Down-mix multi-channel audio to a single channel.
    if waveform.shape[0] > 1:
        waveform = waveform.mean(dim=0, keepdim=True)
    # The feature extractor is called with a 16 kHz sampling rate below.
    if sample_rate != 16000:
        resampler = Resample(orig_freq=sample_rate, new_freq=16000)
        waveform = resampler(waveform)
        sample_rate = 16000

    inputs = feature_extractor(waveform.squeeze(0), return_tensors="pt", padding="longest", sampling_rate=sample_rate)
    with torch.no_grad():
        features = model(**inputs).last_hidden_state

    # Drop only the batch axis so a single-frame output stays two-dimensional.
    features_np = features.squeeze(0).detach().numpy()

    df = pd.DataFrame(features_np)

    csv_filename = os.path.join(output_folder, f"{stem}{feature_csv_suffix}")
    df.to_csv(csv_filename, index=False)

    print(f"Features saved to {csv_filename}")

# VGGish features

In [1]:
import tensorflow as tf
from tensorflow_hub import load  # Ensure TensorFlow Hub is installed: pip install tensorflow-hub

# Fetch the VGGish model from TensorFlow Hub once; later cells call it directly.
VGGISH_HANDLE = 'https://tfhub.dev/google/vggish/1'
vggish_model = load(VGGISH_HANDLE)


In [None]:

def load_and_preprocess_audio(audio_path, target_sr=16000):
    """
    Read an audio file as a mono waveform resampled to ``target_sr``.

    Parameters:
        audio_path (str): Location of the audio file on disk.
        target_sr (int): Sampling rate requested from librosa (default: 16000).

    Returns:
        numpy.ndarray: The waveform returned by ``librosa.load``; the sampling
        rate it also returns is discarded.
    """
    waveform, _unused_sr = librosa.load(audio_path, mono=True, sr=target_sr)
    return waveform

def extract_vggish_features(audio_path):
    """
    Compute VGGish embeddings for one audio file.

    Parameters:
        audio_path (str): Location of the audio file on disk.

    Returns:
        numpy.ndarray: Output of ``vggish_model`` for the loaded waveform,
        converted to a NumPy array.
    """
    waveform = load_and_preprocess_audio(audio_path)
    return vggish_model(waveform).numpy()

# Try the extractor on one clip and check the embedding matrix dimensions.
sample_audio_path = 'C:/Users/giris/Downloads/RESEARCH/Personality/personality_data/Extracted_Audio/wzewmMk_Nzv.mp3'
features = extract_vggish_features(sample_audio_path)
print(features.shape)

(27, 128)


In [None]:
# Average over the first axis to get a single row vector for the clip.
clip_embedding = np.mean(features, axis=0)
clip_embedding = clip_embedding.reshape(1, -1)
print(clip_embedding.shape)