<a href="https://colab.research.google.com/github/KaifAhmad1/deepfake/blob/main/audio_deepfake_detection_enahced.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

####  **Audio Deepfake Detection, Fake Calls, Spoofing, Fraud Calls and Voice Cloning Analysis for Defensive Forensics**
This script provides a comprehensive forensic analysis pipeline for audio files, focusing on detecting signs of deepfakes, spoofing, and manipulation. It integrates various analysis techniques including signal processing, feature extraction, traditional ML/DSP-based detection methods, SpeechBrain models (stubbed for demonstration), and state-of-the-art multimodal LLMs via vLLM and Groq.


In [1]:
!pip install -q numpy librosa soundfile matplotlib IPython webrtcvad pydub noisereduce pyAudioAnalysis speechbrain langchain openai langgraph transformers vllm requests ipywidgets audiomentations hmmlearn eyed3 langchain_community praat-parselmouth webrtcvad groq

In [6]:
import os
import json
import asyncio
import time
import sys
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor
import numpy as np
import torch
import librosa
import librosa.display
import matplotlib.pyplot as plt
import nest_asyncio
import ipywidgets as widgets
import webrtcvad
import noisereduce as nr
import parselmouth
from pydub import AudioSegment
from moviepy.editor import VideoFileClip
from transformers import AutoTokenizer
from vllm import LLM, EngineArgs, SamplingParams
import IPython.display as ipd
from IPython.display import display, clear_output, HTML, Image, Markdown

# --- Optional Dependency Handling & Imports ---
try:
    import soundfile as sf
    HAS_SOUNDFILE = True
except ImportError:
    print("[WARN] soundfile library not found (`pip install soundfile`). Some operations might be slower or fail.")
    HAS_SOUNDFILE = False

try:
    import pyloudnorm as pyln
    HAS_PYLOUDNORM = True
except ImportError:
    print("[WARN] pyloudnorm library not found (`pip install pyloudnorm`). Loudness normalization disabled.")
    HAS_PYLOUDNORM = False

try:
    from scipy import signal
    HAS_SCIPY = True
except ImportError:
    print("[WARN] scipy library not found (`pip install scipy`). De-humming feature disabled.")
    HAS_SCIPY = False

try:
    import seaborn as sns
    HAS_SEABORN = True
except ImportError:
    print("[WARN] seaborn library not found (`pip install seaborn`). Enhanced plots disabled.")
    HAS_SEABORN = False

# --- SpeechBrain & LLM Integrations ---
from speechbrain.inference.speaker import SpeakerRecognition
try:
    from speechbrain.augment import AddNoise
except ImportError:
    AddNoise = None
try:
    from speechbrain.pretrained import EncoderClassifier, LanguageIdentification
except ImportError:
    print("[WARN] SpeechBrain pretrained models not fully available. Some features might be limited.")
    EncoderClassifier, LanguageIdentification = None, None

try:
    from groq import Groq, AsyncGroq
    HAS_GROQ = True
except ImportError:
    print("[WARN] Groq library not installed (`pip install groq`). Groq report generation disabled.")
    HAS_GROQ = False
    AsyncGroq = None

# --- UI/Display ---
nest_asyncio.apply()

[WARN] pyloudnorm library not found (`pip install pyloudnorm`). Loudness normalization disabled.
[WARN] SpeechBrain pretrained models not fully available. Some features might be limited.


In [7]:
# --- Configuration & Constants ---
# Central settings dictionary read by the pipeline functions in this file.
GENERAL_PIPELINE_SETTINGS = {
    # Default target sample rate (Hz) for preprocess_audio.
    "TARGET_SR": 16000,
    # Passed to webrtcvad.Vad() during feature extraction.
    "VAD_AGGRESSIVENESS": 2,
    # Worker count for the shared thread pool below; falls back to 4 when
    # os.cpu_count() returns None.
    "MAX_CONCURRENT_TASKS": os.cpu_count() or 4,
    # Minimum level emitted by print_message (DEBUG < INFO < WARN < ERROR).
    "PRINT_LEVEL": "INFO",
    # Default loudness target used by normalize_loudness.
    "LOUDNESS_TARGET_LUFS": -23.0,
    # Loudness normalization needs both pyloudnorm and soundfile to be importable.
    "ENABLE_LOUDNESS_NORMALIZATION": HAS_PYLOUDNORM and HAS_SOUNDFILE,
    "ENABLE_NOISE_REDUCTION": True,
    # De-humming relies on scipy.signal, so it follows the scipy import check.
    "ENABLE_DEHUMMING": HAS_SCIPY,
    # NOTE(review): the vLLM/Groq settings below are not read anywhere in this
    # part of the file; presumably consumed by later LLM cells - confirm.
    "MAX_VLLM_TOKENS": 350,
    "VLLM_TEMPERATURE": 0.1,
    "GROQ_MODEL": "llama3-70b-8192",
    "GROQ_TEMPERATURE": 0.1,
    "VLLM_MODELS_TO_RUN": ["qwen2_audio", "ultravox"],
}

# NOTE(review): these look like SpeechBrain model source identifiers; they are
# not used in this part of the file - verify against the model-loading code.
MODEL_PATHS = {
    "SPKREC_MODEL_SOURCE": "speechbrain/spkrec-ecapa-voxceleb",
    "ANTISPOOF_MODEL_SOURCE": "speechbrain/anti-spoofing-ecapa-voxceleb",
    "LANGID_MODEL_SOURCE": "speechbrain/lang-id-commonlanguage_ecapa",
    "EMOTION_MODEL_SOURCE": "speechbrain/emotion-recognition-wav2vec2-IEMOCAP",
}

# --- Resource Management ---
# Shared thread pool used by run_sync_in_executor to run blocking calls.
executor = ThreadPoolExecutor(max_workers=GENERAL_PIPELINE_SETTINGS["MAX_CONCURRENT_TASKS"], thread_name_prefix='ForensicWorker')
# Starts empty; presumably filled with loaded vLLM engines elsewhere - TODO confirm.
vllm_engines = {}

# --- Utility Functions ---
def print_message(level, message):
    """Print a timestamped log line when `level` reaches the configured threshold.

    Known levels are DEBUG, INFO, WARN and ERROR (lowest to highest). An
    unknown `level`, or an unknown configured "PRINT_LEVEL", is ranked as INFO.
    """
    severity = {"DEBUG": 0, "INFO": 1, "WARN": 2, "ERROR": 3}
    message_rank = severity.get(level, 1)
    threshold_rank = severity.get(GENERAL_PIPELINE_SETTINGS["PRINT_LEVEL"], 1)
    if message_rank < threshold_rank:
        return
    now_text = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(f"{now_text} [{level:<5}] {message}")

def get_file_extension(file_path):
    """Return the lower-cased extension of `file_path`, including the leading dot.

    Returns an empty string when the path has no extension.
    """
    _root, extension = os.path.splitext(file_path)
    return extension.lower()

def is_video_file(ext):
    """Return True when `ext` is one of the recognised video extensions.

    The comparison is exact (case-sensitive, leading dot included), so pass
    the value produced by get_file_extension.
    """
    video_extensions = (".mp4", ".avi", ".mov", ".mkv", ".webm")
    return ext in video_extensions

async def run_sync_in_executor(func, *args, **kwargs):
    """Run a blocking callable on the shared thread pool and await its result.

    Args:
        func: The synchronous callable to execute.
        *args: Positional arguments forwarded to `func`.
        **kwargs: Keyword arguments forwarded to `func`.

    Returns:
        Whatever `func` returns. Exceptions raised by `func` propagate to the
        awaiting caller.
    """
    loop = asyncio.get_running_loop()
    if kwargs:
        # loop.run_in_executor only forwards positional arguments, so keyword
        # arguments have to be bound to the callable beforehand.
        from functools import partial
        return await loop.run_in_executor(executor, partial(func, *args, **kwargs))
    return await loop.run_in_executor(executor, func, *args)

def set_device_for_engine():
    """Return "cuda" when torch reports an available CUDA device, else "cpu"."""
    if torch.cuda.is_available():
        return "cuda"
    return "cpu"

# --- Data Model ---
class ForensicReport:
    """Attribute container for one analysis report, built from keyword arguments.

    Every field is optional; a missing keyword falls back to the default shown
    in ``__init__``. ``json()`` renders all attributes as a JSON string after
    converting NumPy values to plain Python types.
    """

    def __init__(self, **kwargs):
        self.file_path = kwargs.get("file_path")
        self.verdict = kwargs.get("verdict", "Error: Report not generated")
        self.mean_risk_score = kwargs.get("mean_risk_score", -1.0)
        self.confidence = kwargs.get("confidence", 0.0)
        self.all_model_scores = kwargs.get("all_model_scores", {})
        self.all_anomalies = kwargs.get("all_anomalies", [])
        self.groq_summary = kwargs.get("groq_summary", "N/A")
        self.vllm_outputs = kwargs.get("vllm_outputs", {})
        self.features = kwargs.get("features", {})
        self.metrics = kwargs.get("metrics", {})
        self.speaker_info = kwargs.get("speaker_info", {})
        self.quality_info = kwargs.get("quality_info", {})
        self.loudness_info = kwargs.get("loudness_info", {})
        self.compression_info = kwargs.get("compression_info", {})
        self.reverb_info = kwargs.get("reverb_info", {})
        self.edit_detection_info = kwargs.get("edit_detection_info", {})
        self.plots = kwargs.get("plots", {})
        self.processing_times = kwargs.get("processing_times", {})
        # Naive UTC time in ISO-8601 form when no timestamp is supplied.
        self.timestamp = kwargs.get("timestamp", datetime.utcnow().isoformat())

    def json(self, indent=2):
        """Return the report's attributes serialized as a JSON string."""
        serializable_data = self._make_serializable(self.__dict__)
        return json.dumps(serializable_data, indent=indent)

    def _make_serializable(self, data):
        """Recursively convert `data` into types that ``json.dumps`` accepts.

        - dicts, lists and tuples are converted element by element (tuples
          become lists);
        - NumPy arrays become nested lists whose elements are converted too;
        - NumPy integer / floating / bool scalars become Python scalars;
        - non-finite floats (NaN, +/-inf) become ``None`` so the output is
          valid JSON;
        - complex numbers become ``{'real': ..., 'imag': ...}``;
        - ``np.void`` values become ``None``.

        Anything else is returned unchanged.
        """
        if isinstance(data, dict):
            return {k: self._make_serializable(v) for k, v in data.items()}
        if isinstance(data, (list, tuple)):
            return [self._make_serializable(item) for item in data]
        if isinstance(data, np.ndarray):
            # Recurse so NaN/inf or complex entries inside the array are sanitized.
            return self._make_serializable(data.tolist())
        if isinstance(data, np.bool_):
            return bool(data)
        # The abstract scalar base classes cover every width and keep working
        # on NumPy 2.x, where the np.float_ / np.complex_ aliases were removed.
        if isinstance(data, np.integer):
            return int(data)
        if isinstance(data, (float, np.floating)):
            value = float(data)
            return value if np.isfinite(value) else None
        if isinstance(data, (complex, np.complexfloating)):
            # Convert the parts explicitly: a complex64's parts are float32
            # scalars, which json.dumps cannot encode.
            return {'real': float(data.real), 'imag': float(data.imag)}
        if isinstance(data, np.void):
            return None
        return data

In [8]:
# --- Enhanced Preprocessing Steps ---
async def normalize_loudness(audio_path: str, sr: int, target_lufs: float = GENERAL_PIPELINE_SETTINGS["LOUDNESS_TARGET_LUFS"]) -> tuple[np.ndarray | None, dict]:
    """Normalize the loudness of the file at `audio_path` and overwrite it in place.

    Args:
        audio_path: Path of the audio file to read; on success the normalized
            audio is written back to this same path.
        sr: Expected sample rate. Audio read at a different rate is resampled.
        target_lufs: Loudness target in LUFS.

    Returns:
        ``(audio, loudness_info)``. ``audio`` is the normalized samples, the
        unmodified samples when normalization is disabled, skipped or failed
        after the file was read, or ``None`` when the file could not be read.
        ``loudness_info`` holds ``status``, ``original_lufs`` and ``target_lufs``.
    """
    loudness_info = {"status": "Skipped", "original_lufs": None, "target_lufs": target_lufs}
    if not GENERAL_PIPELINE_SETTINGS["ENABLE_LOUDNESS_NORMALIZATION"]:
        print_message("INFO", "Loudness normalization disabled.")
        try:
            audio_data, _ = await run_sync_in_executor(sf.read, audio_path)
            return audio_data, loudness_info
        except Exception as e:
            print_message("ERROR", f"Failed to read audio file {audio_path} even without normalization: {e}")
            return None, loudness_info

    print_message("INFO", f"Normalizing loudness for {audio_path} to {target_lufs} LUFS...")
    audio_data = None
    try:
        audio_data, current_sr = await run_sync_in_executor(sf.read, audio_path)
        if current_sr != sr:
            print_message("WARN", f"Sample rate mismatch in normalize_loudness ({current_sr} != {sr}). This shouldn't happen if preprocess_audio worked correctly.")
            # soundfile returns (frames, channels); librosa resamples along the
            # last axis, hence the transposes. The keyword arguments are bound
            # in a closure so the executor helper only receives a bare callable.
            channels_first = audio_data.T
            resampled = await run_sync_in_executor(
                lambda: librosa.resample(channels_first, orig_sr=current_sr, target_sr=sr)
            )
            audio_data = resampled.T

        # An empty array would make np.max raise, so treat it like silence.
        if audio_data.size == 0 or np.max(np.abs(audio_data)) < 1e-6:
            print_message("WARN", "Audio is silent, skipping loudness normalization.")
            loudness_info["status"] = "Skipped (Silent Audio)"
            return audio_data, loudness_info

        meter = pyln.Meter(sr)
        if audio_data.ndim > 1:
            print_message("DEBUG", "Audio has multiple channels, converting to mono for LUFS calculation.")
            mono_audio = np.mean(audio_data, axis=1)
        else:
            mono_audio = audio_data

        original_loudness = await run_sync_in_executor(meter.integrated_loudness, mono_audio)

        # A non-finite measurement (e.g. -inf for extremely quiet audio) would
        # turn into an infinite gain and write NaN/inf samples to disk.
        if not np.isfinite(original_loudness):
            print_message("WARN", f"Measured loudness is not finite ({original_loudness}); skipping loudness normalization.")
            loudness_info["status"] = "Skipped (Unmeasurable Loudness)"
            return audio_data, loudness_info
        loudness_info["original_lufs"] = original_loudness

        gain_db = target_lufs - original_loudness
        gain_linear = 10.0**(gain_db / 20.0)
        normalized_audio = audio_data * gain_linear

        # Scale back down if the gain pushed the peak to (or past) full scale.
        max_peak = np.max(np.abs(normalized_audio))
        if max_peak > 0.99:
            print_message("WARN", f"Potential clipping detected after LUFS normalization (Peak: {max_peak:.2f}). Scaling down.")
            normalized_audio = normalized_audio / (max_peak / 0.99)
            loudness_info["status"] = f"Normalized (Peak Limited from {max_peak:.2f})"
        else:
            loudness_info["status"] = "Normalized"

        await run_sync_in_executor(sf.write, audio_path, normalized_audio, sr)
        print_message("INFO", f"Loudness normalized. Original: {original_loudness:.2f} LUFS -> Target: {target_lufs} LUFS.")
        return normalized_audio, loudness_info

    except Exception as e:
        print_message("ERROR", f"Loudness normalization failed: {e}")
        loudness_info["status"] = f"Failed ({e})"
        # Fall back to the raw samples so the caller can still proceed.
        if audio_data is None:
            try:
                audio_data, _ = await run_sync_in_executor(sf.read, audio_path)
            except Exception as read_e:
                print_message("ERROR", f"Failed to read audio file {audio_path} after normalization error: {read_e}")
                return None, loudness_info
        return audio_data, loudness_info

def apply_dehumming(audio_data: np.ndarray, sr: int, freqs_to_remove: list | None = None) -> np.ndarray:
    """Remove mains-hum style tones with zero-phase IIR notch filters.

    Args:
        audio_data: 1-D samples, or a 2-D array filtered column by column.
        sr: Sample rate in Hz.
        freqs_to_remove: Frequencies (Hz) to notch out. Defaults to
            ``[60, 120, 180, 50, 100, 150]``. Frequencies that are not strictly
            between 0 and the Nyquist frequency are skipped.

    Returns:
        A filtered copy of `audio_data`, or `audio_data` itself when de-humming
        is disabled in the settings or filtering raises.
    """
    if not GENERAL_PIPELINE_SETTINGS["ENABLE_DEHUMMING"]:
        return audio_data
    if freqs_to_remove is None:
        # Built per call rather than as a shared mutable default argument.
        freqs_to_remove = [60, 120, 180, 50, 100, 150]
    print_message("INFO", "Applying de-humming notch filters...")
    try:
        processed_audio = audio_data.copy()
        nyquist = sr / 2
        quality_factor = 30.0
        applied_freqs = []
        for freq in freqs_to_remove:
            # A notch cannot be designed at or beyond Nyquist, nor at <= 0 Hz;
            # skip those instead of letting one bad value abort every filter.
            if not 0 < freq < nyquist:
                continue
            b, a = signal.iirnotch(freq, quality_factor, sr)
            if processed_audio.ndim > 1:
                for channel in range(processed_audio.shape[1]):
                    processed_audio[:, channel] = signal.filtfilt(b, a, processed_audio[:, channel])
            else:
                processed_audio = signal.filtfilt(b, a, processed_audio)
            applied_freqs.append(freq)
        # Report only the notches that were actually applied.
        print_message("INFO", f"Applied notch filters for frequencies: {applied_freqs}")
        return processed_audio
    except Exception as e:
        print_message("ERROR", f"De-humming failed: {e}")
        return audio_data

async def preprocess_audio(input_path: str, target_sr: int = GENERAL_PIPELINE_SETTINGS["TARGET_SR"]) -> tuple[np.ndarray | None, int, str, dict]:
    """Convert an input media file to a mono WAV at `target_sr` and clean it up.

    Steps: (1) write a standardized WAV next to the working directory (audio
    extracted from video, converted from another audio format, or re-saved
    from WAV); (2) optional loudness normalization; (3) optional de-humming;
    (4) peak normalization to 0.98; (5) optional noise reduction. Steps 3-5
    only affect the returned samples, not the file on disk.

    Args:
        input_path: Path of the source audio or video file.
        target_sr: Sample rate of the standardized audio.

    Returns:
        ``(audio_data, target_sr, processed_path, loudness_info)``.
        ``audio_data`` is ``None`` on failure; ``processed_path`` is ``""``
        when the standardized WAV could not be produced.
    """
    start_time = time.time()
    print_message("INFO", f"Starting audio preprocessing for: {input_path}")
    base, _ = os.path.splitext(os.path.basename(input_path))
    processed_wav_path = f"processed_{base}_{int(time.time())}.wav"
    print_message("DEBUG", f"Processed audio will be saved to: {processed_wav_path}")

    audio_data = None
    loudness_info = {"status": "Not Attempted"}

    def extract_audio_sync():
        # Pull the audio track out of a video container into the working WAV.
        clip = None
        try:
            clip = VideoFileClip(input_path)
            if clip.audio is None:
                print_message("ERROR", "MoviePy audio extraction failed: video has no audio track")
                return None
            clip.audio.write_audiofile(processed_wav_path, fps=target_sr, codec='pcm_s16le', logger=None)
            # The extracted track keeps the source channel layout; down-mix so
            # this branch yields mono like the other two branches do.
            extracted, extracted_sr = sf.read(processed_wav_path)
            if extracted.ndim > 1:
                sf.write(processed_wav_path, np.mean(extracted, axis=1), extracted_sr)
            print_message("INFO", f"Audio extracted successfully to {processed_wav_path}")
            return processed_wav_path
        except Exception as e:
            print_message("ERROR", f"MoviePy audio extraction failed: {e}")
            return None
        finally:
            # Release the clip on failure as well as on success.
            if clip is not None:
                try:
                    clip.close()
                except Exception as close_e:
                    print_message("WARN", f"Failed to close video clip: {close_e}")

    def convert_audio_sync():
        # Convert a non-WAV audio file to mono WAV at the target rate.
        try:
            audio = AudioSegment.from_file(input_path)
            audio = audio.set_channels(1).set_frame_rate(target_sr)
            audio.export(processed_wav_path, format="wav")
            print_message("INFO", f"Audio converted successfully to {processed_wav_path}")
            return processed_wav_path
        except Exception as e:
            print_message("ERROR", f"Pydub audio conversion failed: {e}")
            return None

    def resave_wav():
        # Re-save a WAV input as mono at the target rate.
        try:
            audio, sr_orig = sf.read(input_path)
            if audio.ndim > 1:
                audio = np.mean(audio, axis=1)
            if sr_orig != target_sr:
                print_message("DEBUG", f"Resampling WAV from {sr_orig} Hz to {target_sr} Hz")
                audio = librosa.resample(audio, orig_sr=sr_orig, target_sr=target_sr)
            sf.write(processed_wav_path, audio, target_sr)
            print_message("INFO", f"WAV standardized to {processed_wav_path} (SR={target_sr}, mono=True)")
            return processed_wav_path
        except Exception as e:
            print_message("ERROR", f"Failed to standardize WAV: {e}")
            return None

    try:
        ext = get_file_extension(input_path)

        if is_video_file(ext):
            print_message("INFO", "Video file detected. Extracting audio using MoviePy...")
            standardize = extract_audio_sync
        elif ext != ".wav":
            print_message("INFO", f"Non-WAV audio file ({ext}) detected. Converting using pydub...")
            standardize = convert_audio_sync
        else:
            print_message("INFO", "Input is WAV. Ensuring target SR and mono...")
            standardize = resave_wav

        processed_path = await run_sync_in_executor(standardize)
        if not processed_path:
            return None, target_sr, "", loudness_info
        current_input = processed_path

        if GENERAL_PIPELINE_SETTINGS["ENABLE_LOUDNESS_NORMALIZATION"]:
            audio_data, loudness_info = await normalize_loudness(current_input, target_sr, GENERAL_PIPELINE_SETTINGS["LOUDNESS_TARGET_LUFS"])
            if audio_data is None:
                print_message("WARN", "Proceeding without successfully normalized audio data due to error.")
                try:
                    audio_data, _ = await run_sync_in_executor(sf.read, current_input)
                except Exception as read_e:
                    print_message("ERROR", f"Failed to load audio {current_input} after normalization error: {read_e}")
                    return None, target_sr, current_input, loudness_info
        else:
            try:
                audio_data, _ = await run_sync_in_executor(sf.read, current_input)
                loudness_info["status"] = "Skipped (Disabled)"
            except Exception as e:
                print_message("ERROR", f"Failed to read audio file {current_input}: {e}")
                return None, target_sr, current_input, loudness_info

        if audio_data is None:
            print_message("ERROR", "Audio data is None after loading/normalization attempts.")
            return None, target_sr, current_input, loudness_info

        if GENERAL_PIPELINE_SETTINGS["ENABLE_DEHUMMING"]:
            audio_data = await run_sync_in_executor(apply_dehumming, audio_data, target_sr)

        # Peak-normalize to just under full scale unless the signal is ~silent.
        peak_val = np.max(np.abs(audio_data))
        if peak_val > 1e-6:
            audio_data = audio_data / peak_val * 0.98
        else:
            print_message("WARN", "Audio signal is near silent after processing steps.")

        if GENERAL_PIPELINE_SETTINGS["ENABLE_NOISE_REDUCTION"] and audio_data is not None and len(audio_data) > 0:
            print_message("INFO", "Applying noise reduction...")
            def reduce_noise_sync():
                # Best effort: fall back to the un-denoised samples on failure.
                try:
                    reduced_audio = nr.reduce_noise(y=audio_data, sr=target_sr, prop_decrease=0.8, stationary=False)
                    print_message("INFO", "Noise reduction applied.")
                    return reduced_audio
                except Exception as e:
                    print_message("WARN", f"Noise reduction failed: {e}")
                    return audio_data
            audio_data = await run_sync_in_executor(reduce_noise_sync)

        processing_time = time.time() - start_time
        print_message("INFO", f"Preprocessing complete. Time: {processing_time:.2f}s")
        return audio_data, target_sr, current_input, loudness_info

    except Exception as e:
        print_message("ERROR", f"Critical error during preprocessing: {e}")
        processing_time = time.time() - start_time
        print_message("ERROR", f"Preprocessing failed after {processing_time:.2f}s")
        return None, target_sr, processed_wav_path, {"status": f"Failed ({e})"}

In [9]:
# --- Feature Extraction (Includes VAD/Silence) ---
async def extract_comprehensive_features(audio_data: np.ndarray, sr: int) -> dict:
    """Extract voice-activity, spectral, cepstral and pitch features.

    Args:
        audio_data: Audio samples. NOTE(review): the VAD conversion scales by
            32767 and reads 2 bytes per sample, so this presumably expects mono
            floats in [-1, 1] - confirm against the caller.
        sr: Sample rate in Hz.

    Returns:
        A dict of features. It always contains the VAD keys (``vad_ratio``,
        ``silence_ratio``, ``speech_segments_ms``, ``speech_segments_s``) and,
        when the core extraction succeeds, the librosa / Praat features too.
        An empty dict is returned for empty input.
    """
    if audio_data is None or len(audio_data) == 0:
        print_message("WARN", "Cannot extract features from empty audio data.")
        return {}
    start_time = time.time()
    print_message("INFO", "Extracting comprehensive audio features...")

    features = {}

    def compute_vad_sync():
        # Returns (speech ratio, silence ratio, [[start_ms, end_ms], ...]).
        segments = []
        try:
            vad = webrtcvad.Vad(GENERAL_PIPELINE_SETTINGS["VAD_AGGRESSIVENESS"])
            frame_duration_ms = 20
            frame_length = int(sr * frame_duration_ms / 1000)

            # Checked before the division below, which would otherwise raise.
            if frame_length == 0:
                print_message("WARN", "Frame length is zero, cannot perform VAD.")
                return 0.0, 1.0, []

            num_frames = len(audio_data) // frame_length
            if num_frames == 0:
                print_message("WARN", "No frames processed for VAD analysis.")
                return 0.0, 1.0, []

            # Clip before scaling so out-of-range samples saturate instead of
            # wrapping around when cast to 16-bit PCM.
            int16_audio = (np.clip(audio_data, -1.0, 1.0) * 32767).astype(np.int16)
            audio_bytes = int16_audio.tobytes()
            bytes_per_frame = frame_length * 2

            speech_frames_count = 0
            is_speaking = False
            segment_start_ms = 0

            # num_frames only counts whole frames, so every slice is complete.
            for i in range(num_frames):
                start_byte = i * bytes_per_frame
                frame_bytes = audio_bytes[start_byte:start_byte + bytes_per_frame]

                if vad.is_speech(frame_bytes, sr):
                    speech_frames_count += 1
                    if not is_speaking:
                        segment_start_ms = i * frame_duration_ms
                        is_speaking = True
                elif is_speaking:
                    segments.append([segment_start_ms, i * frame_duration_ms])
                    is_speaking = False

            # Close a segment that is still open at the end of the audio.
            if is_speaking:
                segments.append([segment_start_ms, num_frames * frame_duration_ms])

            vad_ratio = speech_frames_count / num_frames
            return vad_ratio, 1.0 - vad_ratio, segments

        except Exception as e:
            print_message("ERROR", f"WebRTC VAD failed: {e}")
            return 0.0, 1.0, []

    vad_ratio, silence_ratio, speech_segments_ms = await run_sync_in_executor(compute_vad_sync)
    features['vad_ratio'] = vad_ratio
    features['silence_ratio'] = silence_ratio
    features['speech_segments_ms'] = speech_segments_ms
    features['speech_segments_s'] = [[s / 1000.0, e / 1000.0] for s, e in speech_segments_ms]

    def compute_other_features_sync():
        # Returns the feature dict, or None when any step raises.
        other_feats = {}
        try:
            other_feats['duration_s'] = len(audio_data) / sr
            other_feats['energy_rms'] = np.sqrt(np.mean(audio_data ** 2))
            other_feats['zero_crossing_rate_mean'] = np.mean(librosa.feature.zero_crossing_rate(y=audio_data))
            other_feats['rmse_mean'] = np.mean(librosa.feature.rms(y=audio_data))

            other_feats['spectral_centroid_mean'] = np.mean(librosa.feature.spectral_centroid(y=audio_data, sr=sr))
            other_feats['spectral_bandwidth_mean'] = np.mean(librosa.feature.spectral_bandwidth(y=audio_data, sr=sr))
            other_feats['spectral_rolloff_mean'] = np.mean(librosa.feature.spectral_rolloff(y=audio_data, sr=sr))
            other_feats['spectral_flatness_mean'] = np.mean(librosa.feature.spectral_flatness(y=audio_data))
            spectral_contrast = librosa.feature.spectral_contrast(y=audio_data, sr=sr, n_bands=6)
            other_feats['spectral_contrast_mean'] = np.mean(spectral_contrast)
            other_feats['spectral_contrast_std'] = np.std(spectral_contrast)

            mfccs = librosa.feature.mfcc(y=audio_data, sr=sr, n_mfcc=24)
            other_feats['mfcc_mean'] = np.mean(mfccs)
            other_feats['mfcc_std'] = np.std(mfccs)
            # Delta features are only computed when there are more than 3 frames.
            if mfccs.shape[1] > 3:
                other_feats['mfcc_delta_mean'] = np.mean(librosa.feature.delta(mfccs))
                other_feats['mfcc_delta2_mean'] = np.mean(librosa.feature.delta(mfccs, order=2))
            else:
                other_feats['mfcc_delta_mean'] = 0
                other_feats['mfcc_delta2_mean'] = 0

            chroma = librosa.feature.chroma_stft(y=audio_data, sr=sr)
            other_feats['chroma_mean'] = np.mean(chroma)
            other_feats['chroma_std'] = np.std(chroma)

            snd = parselmouth.Sound(audio_data, sr)
            pitch = snd.to_pitch_ac(time_step=0.01, pitch_floor=75, pitch_ceiling=500)
            pitch_values = pitch.selected_array['frequency']
            # Keep only frames with a positive pitch estimate.
            pitch_values = pitch_values[pitch_values > 0]

            if len(pitch_values) > 0:
                other_feats['pitch_mean_hz'] = np.mean(pitch_values)
                other_feats['pitch_std_hz'] = np.std(pitch_values)
                other_feats['pitch_min_hz'] = np.min(pitch_values)
                other_feats['pitch_max_hz'] = np.max(pitch_values)

                point_process = parselmouth.praat.call(pitch, "To PointProcess")
                jitter_local = parselmouth.praat.call(point_process, "Get jitter (local)", 0.0, 0.0, 0.0001, 0.02, 1.3)
                shimmer_local = parselmouth.praat.call([snd, point_process], "Get shimmer (local)", 0.0, 0.0, 0.0001, 0.02, 1.3, 1.6)
                other_feats['pitch_jitter_local_rel'] = jitter_local if not np.isnan(jitter_local) else 0
                # NOTE(review): the value comes from "Get shimmer (local)"; the
                # "_db" suffix of the key may not match its unit - confirm.
                other_feats['intensity_shimmer_local_db'] = shimmer_local if not np.isnan(shimmer_local) else 0

                harmonicity = parselmouth.praat.call(snd, "To Harmonicity (cc)", 0.01, 75, 0.1, 1.0)
                hnr = parselmouth.praat.call(harmonicity, "Get mean", 0, 0)
                other_feats['hnr_mean_db'] = hnr if not np.isnan(hnr) else 0
            else:
                for key in ['pitch_mean_hz', 'pitch_std_hz', 'pitch_min_hz', 'pitch_max_hz',
                            'pitch_jitter_local_rel', 'intensity_shimmer_local_db', 'hnr_mean_db']:
                    other_feats[key] = 0

        except Exception as e:
            print_message("ERROR", f"Core feature extraction failed: {e}")
            return None

        return other_feats

    other_features = await run_sync_in_executor(compute_other_features_sync)
    processing_time = time.time() - start_time
    if other_features:
        features.update(other_features)
        print_message("INFO", f"Comprehensive feature extraction complete. Time: {processing_time:.2f}s")
    else:
        # The VAD features are still returned; only the core features are missing.
        print_message("ERROR", f"Feature extraction process encountered an error after {processing_time:.2f}s")
    return features

In [10]:
# --- New Vertical Analysis Agents ---
async def analyze_compression_artifacts(audio_data: np.ndarray, sr: int) -> dict:
    """Flag a missing high-frequency band as a possible compression cutoff.

    Compares the mean spectral power above 16 kHz with the mean power below
    8 kHz. A ratio under 0.005 sets ``anomaly`` and a score of 0.7.

    Args:
        audio_data: Audio samples passed to ``librosa.stft``.
        sr: Sample rate in Hz. The comparison needs frequency bins above
            16 kHz, i.e. a sample rate above 32 kHz.

    Returns:
        A dict with ``score``, ``anomaly``, ``reason`` and, when the analysis
        ran, ``hf_energy_ratio`` (``None`` when the bands were unavailable).
        The computation runs inline; nothing in this coroutine awaits.
    """
    start_time = time.time()
    print_message("INFO", "Analyzing for compression artifacts...")
    results = {"score": 0.0, "anomaly": False, "reason": "No significant artifacts detected."}
    if audio_data is None or len(audio_data) == 0:
        results["reason"] = "Skipped (Empty Audio)"
        return results
    try:
        stft_result = np.abs(librosa.stft(audio_data))
        freqs = librosa.fft_frequencies(sr=sr)
        low_band_mask = freqs < 8000
        high_band_mask = freqs > 16000

        if np.any(low_band_mask) and np.any(high_band_mask):
            low_energy = np.mean(stft_result[low_band_mask, :]**2)
            high_energy = np.mean(stft_result[high_band_mask, :]**2)
            if low_energy > 1e-8:
                hf_ratio = high_energy / low_energy
                results['hf_energy_ratio'] = hf_ratio
                if hf_ratio < 0.005:
                    results['score'] = 0.7
                    results['anomaly'] = True
                    results['reason'] = f"Very low high-frequency energy ratio ({hf_ratio:.4f}), suggests potential compression cutoff."
            else:
                results['hf_energy_ratio'] = 0
        else:
            # No bins exist in one of the bands (e.g. sr <= 32 kHz), so nothing
            # was compared; say so instead of reporting a clean result.
            results['hf_energy_ratio'] = None
            results['reason'] = f"Skipped (sample rate {sr} Hz does not cover the frequency bands needed for high-frequency analysis)"

        processing_time = time.time() - start_time
        print_message("INFO", f"Compression analysis complete. Score: {results['score']:.2f}. Time: {processing_time:.2f}s")
        return results
    except Exception as e:
        print_message("ERROR", f"Compression artifact analysis failed: {e}")
        results["reason"] = f"Error during analysis: {e}"
        results["score"] = 0.1
        return results

async def estimate_reverb(audio_data: np.ndarray, sr: int) -> dict:
    """Give a coarse reverberation score from the kurtosis of the RMS envelope.

    Pearson kurtosis below 2.5 scores 0.6, above 5.0 scores 0.1, anything in
    between scores 0.3. Audio shorter than one second is skipped.

    Args:
        audio_data: Audio samples passed to ``librosa.feature.rms``.
        sr: Sample rate in Hz; used only for the minimum-length check.

    Returns:
        A dict with ``score``, ``description``, ``rt60_approx_s`` (always
        ``None``; no RT60 estimate is computed) and, when the envelope was long
        enough, ``rms_envelope_kurtosis``.
    """
    start_time = time.time()
    print_message("INFO", "Estimating reverberation (basic)...")
    results = {"score": 0.0, "description": "Reverb estimation inconclusive.", "rt60_approx_s": None}
    if audio_data is None or len(audio_data) < sr:
        results["description"] = "Skipped (Audio too short or empty)"
        return results

    try:
        rms = librosa.feature.rms(y=audio_data)[0]
        if len(rms) > 10:
            from scipy.stats import kurtosis
            rms_kurtosis = kurtosis(rms, fisher=False)
            if not np.isfinite(rms_kurtosis):
                # A constant (e.g. silent) envelope has undefined kurtosis; NaN
                # fails both threshold tests and would be labelled "moderate".
                results['rms_envelope_kurtosis'] = None
                results['description'] = "RMS envelope kurtosis is undefined (constant or silent envelope). Reverb estimation inconclusive."
            else:
                results['rms_envelope_kurtosis'] = rms_kurtosis
                if rms_kurtosis < 2.5:
                    results['score'] = 0.6
                    results['description'] = f"Low RMS kurtosis ({rms_kurtosis:.2f}) suggests possible significant reverberation."
                elif rms_kurtosis > 5.0:
                    results['score'] = 0.1
                    results['description'] = f"High RMS kurtosis ({rms_kurtosis:.2f}) suggests relatively dry signal."
                else:
                    results['score'] = 0.3
                    results['description'] = f"Moderate RMS kurtosis ({rms_kurtosis:.2f}). Reverb likely moderate."
        else:
            results['description'] = "RMS envelope too short for kurtosis calculation."

    except Exception as e:
        print_message("ERROR", f"Reverb estimation failed: {e}")
        results["description"] = f"Error during analysis: {e}"
        results["score"] = 0.1

    processing_time = time.time() - start_time
    print_message("INFO", f"Reverb estimation complete. Score: {results['score']:.2f}. Time: {processing_time:.2f}s")
    return results

async def detect_potential_edits(audio_data: np.ndarray, sr: int) -> dict:
    """Look for feature inconsistencies between non-silent segments.

    The audio is split into non-silent intervals; for each interval of at
    least 0.1 s the mean zero-crossing rate, RMS, spectral centroid and
    spectral flatness are computed. If the largest relative standard deviation
    of any feature across segments exceeds 0.5, the result is flagged as a
    potential edit (score 0.8); otherwise the score is 0.1.

    Args:
        audio_data: Audio samples; audio shorter than one second is skipped.
        sr: Sample rate in Hz.

    Returns:
        A dict with ``score``, ``anomaly``, ``reason`` and
        ``segment_feature_std`` (per-feature std and relative std).
    """
    started_at = time.time()
    print_message("INFO", "Detecting potential edit points (basic)...")
    results = {"score": 0.0, "anomaly": False, "reason": "No significant inconsistencies detected.", "segment_feature_std": {}}
    if audio_data is None or len(audio_data) < sr:
        results["reason"] = "Skipped (Audio too short or empty)"
        return results

    try:
        intervals = librosa.effects.split(audio_data, top_db=45, frame_length=2048, hop_length=512)

        if len(intervals) <= 1:
            results["reason"] = "Skipped (Audio contains only one non-silent segment)"
            return results

        print_message("DEBUG", f"Found {len(intervals)} non-silent segments for edit detection.")

        # One extractor per feature, evaluated in this order for every segment.
        extractors = {
            'zcr': lambda chunk: librosa.feature.zero_crossing_rate(y=chunk),
            'rms': lambda chunk: librosa.feature.rms(y=chunk),
            'centroid': lambda chunk: librosa.feature.spectral_centroid(y=chunk, sr=sr),
            'flatness': lambda chunk: librosa.feature.spectral_flatness(y=chunk),
        }
        per_segment = {name: [] for name in extractors}
        shortest_allowed = int(0.1 * sr)

        for start, end in intervals:
            chunk = audio_data[start:end]
            # Segments under 0.1 s are ignored.
            if len(chunk) >= shortest_allowed:
                for name, extract in extractors.items():
                    per_segment[name].append(np.mean(extract(chunk)))

        if len(per_segment['zcr']) <= 1:
            results["reason"] = "Skipped (Not enough valid non-silent segments for comparison)"
            return results

        worst_relative = 0.0
        spread = {}

        for name, samples in per_segment.items():
            if not samples:
                continue
            centre = np.mean(samples)
            deviation = np.std(samples)
            spread[name] = deviation
            relative_key = f"{name}_relative"
            # Relative spread is only meaningful when the mean is not ~zero.
            if abs(centre) > 1e-6:
                relative = deviation / abs(centre)
                spread[relative_key] = relative
                worst_relative = max(worst_relative, relative)
            else:
                spread[relative_key] = 0

        results["segment_feature_std"] = spread

        consistency_threshold = 0.5
        inconsistent = worst_relative > consistency_threshold

        if inconsistent:
            results['score'] = 0.8
            results['anomaly'] = True
            results['reason'] = f"High inconsistency detected between audio segments (Max Rel Std Dev: {worst_relative:.3f}). Potential edit point(s)."
        else:
            results['score'] = 0.1
            results['reason'] = f"Segment features appear relatively consistent (Max Rel Std Dev: {worst_relative:.3f})."

    except Exception as e:
        print_message("ERROR", f"Edit detection failed: {e}")
        results["reason"] = f"Error during analysis: {e}"
        results["score"] = 0.1

    elapsed = time.time() - started_at
    print_message("INFO", f"Edit detection complete. Score: {results['score']:.2f}. Time: {elapsed:.2f}s")
    return results