In [21]:

# STEP 1: Install Required Packages
import os
def install_requirements():
    """Install all required packages"""
    packages = [
        "yt-dlp",
        "librosa",
        "soundfile", 
        "openai-whisper",
        "transformers",
        "torch",
        "numpy",
        "pandas",
        "requests",
        "pydub"
    ]
    
    for package in packages:
        try:
            __import__(package.replace('-', '_'))
            print(f"✅ {package} already installed")
        except ImportError:
            print(f"📦 Installing {package}...")
            os.system(f"pip install {package}")

# Uncomment the next line if you need to install packages
install_requirements()


✅ yt-dlp already installed
✅ librosa already installed
✅ soundfile already installed
📦 Installing openai-whisper...
✅ transformers already installed
✅ torch already installed
✅ numpy already installed
✅ pandas already installed
✅ requests already installed
✅ pydub already installed


In [22]:
import sys
import warnings
warnings.filterwarnings('ignore')

# Core libraries
import numpy as np
import pandas as pd
import yt_dlp as ytdlp
import librosa
import soundfile as sf
from pathlib import Path
import tempfile
import requests
from urllib.parse import urlparse
import re

# ML and Audio Processing
import whisper
from transformers import pipeline
import torch

# Utilities
import json
import time
from datetime import datetime

print("🎯 English Accent Detection System")
print("=" * 50)


🎯 English Accent Detection System


In [23]:

# STEP 2: Audio Extraction Module

class AudioExtractor:
    """Handle video URL processing and audio extraction"""
    
    def __init__(self):
        self.temp_dir = tempfile.mkdtemp()
        self.supported_formats = ['mp4', 'mp3', 'wav', 'avi', 'mov', 'webm']
        
    def is_valid_url(self, url):
        """Validate if URL is properly formatted"""
        try:
            result = urlparse(url)
            return all([result.scheme, result.netloc])
        except:
            return False
    
    def detect_url_type(self, url):
        """Detect the type of URL (direct file, YouTube, Loom, etc.)"""
        url_lower = url.lower()
        
        if 'youtube.com' in url_lower or 'youtu.be' in url_lower:
            return 'youtube'
        elif 'loom.com' in url_lower:
            return 'loom'
        elif any(ext in url_lower for ext in self.supported_formats):
            return 'direct'
        else:
            return 'unknown'
    
    def download_audio(self, url):
        """Download and extract audio from URL"""
        print(f"🔄 Processing URL: {url[:50]}...")
        
        if not self.is_valid_url(url):
            raise ValueError("Invalid URL format")
        
        url_type = self.detect_url_type(url)
        audio_path = None
        
        try:
            if url_type in ['youtube', 'loom', 'unknown']:
                # Use yt-dlp for video platforms
                audio_path = self._extract_with_ytdlp(url)
            elif url_type == 'direct':
                # Direct download for MP4/MP3 files
                audio_path = self._download_direct(url)
            else:
                raise ValueError(f"Unsupported URL type: {url_type}")
                
            print(f"✅ Audio extracted successfully")
            return audio_path
            
        except Exception as e:
            print(f"❌ Error extracting audio: {str(e)}")
            raise
    
    def _extract_with_ytdlp(self, url):
        """Extract audio using yt-dlp"""
        import yt_dlp
        
        output_path = os.path.join(self.temp_dir, 'extracted_audio.wav')
        
        ydl_opts = {
            'format': 'bestaudio/best',
            'outtmpl': os.path.join(self.temp_dir, 'temp_video.%(ext)s'),
            'postprocessors': [{
                'key': 'FFmpegExtractAudio',
                'preferredcodec': 'wav',
                'preferredquality': '192',
            }],
            'postprocessor_args': ['-ar', '16000'],  # 16kHz for speech recognition
        }
        
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.download([url])
        
        # Find the extracted audio file
        for file in os.listdir(self.temp_dir):
            if file.endswith('.wav'):
                return os.path.join(self.temp_dir, file)
        
        raise Exception("Audio extraction failed")
    
    def _download_direct(self, url):
        """Download direct audio/video files"""
        response = requests.get(url, stream=True)
        response.raise_for_status()
        
        # Determine file extension
        content_type = response.headers.get('content-type', '')
        if 'audio' in content_type:
            ext = '.mp3'
        elif 'video' in content_type:
            ext = '.mp4'
        else:
            ext = '.mp4'  # default
        
        temp_file = os.path.join(self.temp_dir, f'downloaded{ext}')
        
        with open(temp_file, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
        
        # Convert to WAV if needed
        if ext != '.wav':
            return self._convert_to_wav(temp_file)
        return temp_file
    
    def _convert_to_wav(self, input_path):
        """Convert audio file to WAV format"""
        from pydub import AudioSegment
        
        audio = AudioSegment.from_file(input_path)
        wav_path = os.path.join(self.temp_dir, 'converted.wav')
        audio.export(wav_path, format='wav', parameters=['-ar', '16000'])
        return wav_path


In [24]:
# STEP 3: Speech Analysis Engine


class SpeechAnalyzer:
    """Analyze speech patterns for accent detection"""
    
    def __init__(self):
        self.whisper_model = None
        self.load_models()
        
        # Accent-specific phonetic patterns
        self.accent_patterns = {
            'american': {
                'r_colored': True,
                'vowel_patterns': ['æ', 'ɑ', 'ɔ'],
                'keywords': ['schedule', 'tomato', 'dance'],
                'rhotic': True
            },
            'british': {
                'r_colored': False,
                'vowel_patterns': ['ɑː', 'ɒ', 'ɜː'],
                'keywords': ['schedule', 'tomato', 'dance'],
                'rhotic': False
            },
            'australian': {
                'r_colored': False,
                'vowel_patterns': ['æɪ', 'əʉ', 'oɪ'],
                'keywords': ['today', 'mate', 'about'],
                'rhotic': False
            },
            'canadian': {
                'r_colored': True,
                'vowel_patterns': ['aʊ', 'aɪ'],
                'keywords': ['about', 'house', 'out'],
                'canadian_raising': True
            },
            'irish': {
                'r_colored': True,
                'vowel_patterns': ['ɪə', 'eɪ', 'oʊ'],
                'keywords': ['three', 'thirty', 'girl'],
                'rhotic': True
            },
            'south_african': {
                'r_colored': False,
                'vowel_patterns': ['ɪə', 'eə', 'ʊə'],
                'keywords': ['here', 'there', 'sure'],
                'kit_split': True
            }
        }
    
    def load_models(self):
        """Load speech recognition models"""
        print("🤖 Loading speech analysis models...")
        try:
            # Load Whisper for transcription
            self.whisper_model = whisper.load_model("base")
            print("✅ Whisper model loaded")
        except Exception as e:
            print(f"⚠️ Error loading models: {e}")
    
    def transcribe_audio(self, audio_path):
        """Transcribe audio to text with phonetic information"""
        print("🎤 Transcribing audio...")
        
        try:
            # Transcribe with Whisper
            result = self.whisper_model.transcribe(audio_path)
            text = result['text']
            
            # Extract additional features
            audio_features = self._extract_audio_features(audio_path)
            
            return {
                'text': text,
                'segments': result.get('segments', []),
                'audio_features': audio_features
            }
        except Exception as e:
            print(f"❌ Transcription error: {e}")
            raise
    
    def _extract_audio_features(self, audio_path):
        """Extract acoustic features for accent analysis"""
        try:
            # Load audio
            y, sr = librosa.load(audio_path, sr=16000)
            
            # Extract features
            features = {
                'mfcc': librosa.feature.mfcc(y=y, sr=sr, n_mfcc=13).mean(axis=1),
                'spectral_centroid': librosa.feature.spectral_centroid(y=y, sr=sr).mean(),
                'zero_crossing_rate': librosa.feature.zero_crossing_rate(y).mean(),
                'tempo': librosa.beat.tempo(y=y, sr=sr)[0] if len(librosa.beat.tempo(y=y, sr=sr)) > 0 else 120,
                'pitch_range': self._get_pitch_range(y, sr),
                'formants': self._estimate_formants(y, sr)
            }
            
            return features
        except Exception as e:
            print(f"⚠️ Feature extraction warning: {e}")
            return {}
    
    def _get_pitch_range(self, y, sr):
        """Calculate pitch range"""
        try:
            pitches, magnitudes = librosa.piptrack(y=y, sr=sr)
            pitches = pitches[magnitudes > np.median(magnitudes)]
            pitches = pitches[pitches > 0]
            
            if len(pitches) > 0:
                return {'min': float(np.min(pitches)), 'max': float(np.max(pitches))}
            return {'min': 0, 'max': 0}
        except:
            return {'min': 0, 'max': 0}
    
    def _estimate_formants(self, y, sr):
        """Estimate formant frequencies"""
        try:
            # Simple formant estimation using spectral peaks
            fft = np.fft.fft(y)
            freqs = np.fft.fftfreq(len(fft), 1/sr)
            magnitude = np.abs(fft)
            
            # Find peaks (simplified)
            peaks = []
            for i in range(1, len(magnitude)-1):
                if magnitude[i] > magnitude[i-1] and magnitude[i] > magnitude[i+1]:
                    if freqs[i] > 0 and freqs[i] < 4000:  # Focus on speech range
                        peaks.append(freqs[i])
            
            peaks.sort()
            return peaks[:3] if len(peaks) >= 3 else peaks
        except:
            return []


In [25]:
# STEP 4: Accent Classification Engine

class AccentClassifier:
    """Classify English accents based on speech analysis"""
    
    def __init__(self):
        self.accent_features = {
            'american': {
                'rhotic': 1.0,
                'vowel_shift': 0.8,
                'intonation': 'flat',
                'tempo': 'medium',
                'key_words': {
                    'dance': 'dæns',
                    'bath': 'bæθ',
                    'car': 'kɑr',
                    'park': 'pɑrk'
                }
            },
            'british': {
                'rhotic': 0.0,
                'vowel_shift': 0.3,
                'intonation': 'rising',
                'tempo': 'medium',
                'key_words': {
                    'dance': 'dɑːns',
                    'bath': 'bɑːθ',
                    'car': 'kɑː',
                    'park': 'pɑːk'
                }
            },
            'australian': {
                'rhotic': 0.0,
                'vowel_shift': 0.9,
                'intonation': 'rising',
                'tempo': 'fast',
                'key_words': {
                    'today': 'təˈdæɪ',
                    'mate': 'mæɪt',
                    'about': 'əˈbæɪt'
                }
            },
            'canadian': {
                'rhotic': 0.8,
                'vowel_shift': 0.6,
                'intonation': 'rising',
                'tempo': 'medium',
                'canadian_raising': True,
                'key_words': {
                    'about': 'əˈbʌʊt',
                    'house': 'hʌʊs',
                    'out': 'ʌʊt'
                }
            },
            'irish': {
                'rhotic': 0.7,
                'vowel_shift': 0.4,
                'intonation': 'musical',
                'tempo': 'variable',
                'key_words': {
                    'three': 'triː',
                    'thirty': 'tɜrti',
                    'girl': 'gɜrl'
                }
            },
            'south_african': {
                'rhotic': 0.2,
                'vowel_shift': 0.7,
                'intonation': 'flat',
                'tempo': 'medium',
                'key_words': {
                    'here': 'hiər',
                    'there': 'ðeər',
                    'sure': 'ʃuər'
                }
            }
        }
    
    def classify_accent(self, transcription_data):
        """Main accent classification function"""
        print("🎯 Classifying accent...")
        
        text = transcription_data['text']
        audio_features = transcription_data.get('audio_features', {})
        
        # Calculate scores for each accent
        accent_scores = {}
        
        for accent_name, accent_features in self.accent_features.items():
            score = self._calculate_accent_score(text, audio_features, accent_features)
            accent_scores[accent_name] = score
        
        # Find best match
        best_accent = max(accent_scores, key=accent_scores.get)
        confidence = accent_scores[best_accent]
        
        # Normalize confidence to 0-100%
        confidence_percentage = min(100, max(0, confidence * 100))
        
        result = {
            'accent': best_accent,
            'confidence': confidence_percentage,
            'scores': accent_scores,
            'explanation': self._generate_explanation(best_accent, confidence_percentage, text)
        }
        
        print(f"✅ Classification complete: {best_accent.title()} ({confidence_percentage:.1f}%)")
        return result
    
    def _calculate_accent_score(self, text, audio_features, accent_features):
        """Calculate similarity score between speech and accent pattern"""
        score = 0.0
        weight_sum = 0.0
        
        # Text-based analysis
        text_score = self._analyze_text_patterns(text, accent_features)
        score += text_score * 0.4
        weight_sum += 0.4
        
        # Audio feature analysis
        if audio_features:
            audio_score = self._analyze_audio_patterns(audio_features, accent_features)
            score += audio_score * 0.6
            weight_sum += 0.6
        
        return score / weight_sum if weight_sum > 0 else 0.0
    
    def _analyze_text_patterns(self, text, accent_features):
        """Analyze text for accent-specific patterns"""
        score = 0.0
        
        # Convert to lowercase for analysis
        text_lower = text.lower()
        
        # Check for key words
        key_words = accent_features.get('key_words', {})
        found_keywords = 0
        for word in key_words:
            if word in text_lower:
                found_keywords += 1
        
        if key_words:
            score += (found_keywords / len(key_words)) * 0.5
        
        # Check for rhotic features (presence of 'r' sounds)
        rhotic_expected = accent_features.get('rhotic', 0.5)
        r_count = text_lower.count('r')
        total_chars = len(text_lower.replace(' ', ''))
        r_ratio = r_count / total_chars if total_chars > 0 else 0
        
        # Score based on expected rhoticity
        if rhotic_expected > 0.5:
            score += min(r_ratio * 2, 0.3)  # Reward rhotic accents
        else:
            score += max(0.3 - r_ratio * 2, 0)  # Reward non-rhotic accents
        
        # Length and complexity (some accents are more verbose)
        word_count = len(text.split())
        if word_count > 10:  # Minimum for reliable analysis
            score += 0.2
        
        return min(score, 1.0)
    
    def _analyze_audio_patterns(self, audio_features, accent_features):
        """Analyze audio features for accent classification"""
        score = 0.0
        
        # Tempo analysis
        tempo = audio_features.get('tempo', 120)
        expected_tempo = accent_features.get('tempo', 'medium')
        
        if expected_tempo == 'fast' and tempo > 130:
            score += 0.2
        elif expected_tempo == 'medium' and 110 <= tempo <= 130:
            score += 0.2
        elif expected_tempo == 'slow' and tempo < 110:
            score += 0.2
        
        # Pitch range analysis
        pitch_range = audio_features.get('pitch_range', {})
        if pitch_range:
            pitch_variance = pitch_range.get('max', 0) - pitch_range.get('min', 0)
            intonation = accent_features.get('intonation', 'flat')
            
            if intonation == 'rising' and pitch_variance > 100:
                score += 0.2
            elif intonation == 'flat' and pitch_variance < 100:
                score += 0.2
            elif intonation == 'musical' and pitch_variance > 150:
                score += 0.3
        
        # MFCC-based analysis (simplified)
        mfcc = audio_features.get('mfcc', [])
        if len(mfcc) >= 13:
            # Compare with typical patterns (this is simplified)
            vowel_shift = accent_features.get('vowel_shift', 0.5)
            mfcc_variance = np.var(mfcc)
            
            if vowel_shift > 0.7 and mfcc_variance > 0.5:
                score += 0.3
            elif vowel_shift < 0.4 and mfcc_variance < 0.3:
                score += 0.3
        
        return min(score, 1.0)
    
    def _generate_explanation(self, accent, confidence, text):
        """Generate explanation for the classification"""
        explanations = {
            'american': f"Detected American English features including rhotic 'r' sounds and typical vowel patterns. Confidence: {confidence:.1f}%",
            'british': f"Identified British English characteristics such as non-rhotic pronunciation and distinct vowel sounds. Confidence: {confidence:.1f}%",
            'australian': f"Found Australian English markers including vowel shifts and distinctive intonation patterns. Confidence: {confidence:.1f}%",
            'canadian': f"Detected Canadian English features including potential Canadian raising and rhotic patterns. Confidence: {confidence:.1f}%",
            'irish': f"Identified Irish English characteristics including musical intonation and specific vowel patterns. Confidence: {confidence:.1f}%",
            'south_african': f"Found South African English markers including specific vowel changes and intonation. Confidence: {confidence:.1f}%"
        }
        
        base_explanation = explanations.get(accent, f"Classified as {accent} accent with {confidence:.1f}% confidence")
        
        # Add context about text length
        word_count = len(text.split())
        if word_count < 10:
            base_explanation += " (Note: Short audio sample may limit accuracy)"
        elif word_count > 50:
            base_explanation += " (Good sample length for reliable analysis)"
        
        return base_explanation


In [26]:

# STEP 5: Main Processing Pipeline

class AccentDetectionSystem:
    """Complete accent detection system"""
    
    def __init__(self):
        self.audio_extractor = AudioExtractor()
        self.speech_analyzer = SpeechAnalyzer()
        self.accent_classifier = AccentClassifier()
        
    def process_video_url(self, url):
        """Main processing pipeline"""
        print("\n" + "="*50)
        print("🎬 ENGLISH ACCENT DETECTION SYSTEM")
        print("="*50)
        
        start_time = time.time()
        
        try:
            # Step 1: Extract audio
            print("\n1️⃣ AUDIO EXTRACTION")
            audio_path = self.audio_extractor.download_audio(url)
            
            # Step 2: Analyze speech
            print("\n2️⃣ SPEECH ANALYSIS")
            transcription_data = self.speech_analyzer.transcribe_audio(audio_path)
            
            # Step 3: Classify accent
            print("\n3️⃣ ACCENT CLASSIFICATION")
            classification_result = self.accent_classifier.classify_accent(transcription_data)
            
            # Step 4: Compile results
            processing_time = time.time() - start_time
            
            final_result = {
                'url': url,
                'transcript': transcription_data['text'],
                'accent': classification_result['accent'],
                'confidence': classification_result['confidence'],
                'explanation': classification_result['explanation'],
                'all_scores': classification_result['scores'],
                'processing_time': round(processing_time, 2),
                'timestamp': datetime.now().isoformat()
            }
            
            # Display results
            self._display_results(final_result)
            
            return final_result
            
        except Exception as e:
            error_result = {
                'error': str(e),
                'url': url,
                'timestamp': datetime.now().isoformat()
            }
            print(f"\n❌ ERROR: {str(e)}")
            return error_result
    
    def _display_results(self, result):
        """Display formatted results"""
        print("\n" + "="*50)
        print("📊 RESULTS")
        print("="*50)
        
        print(f"🎯 Detected Accent: {result['accent'].title()}")
        print(f"📈 Confidence Score: {result['confidence']:.1f}%")
        print(f"⏱️ Processing Time: {result['processing_time']}s")
        
        print(f"\n📝 Transcript Preview:")
        transcript = result['transcript'][:200] + "..." if len(result['transcript']) > 200 else result['transcript']
        print(f"   {transcript}")
        
        print(f"\n💡 Explanation:")
        print(f"   {result['explanation']}")
        
        print(f"\n📊 All Accent Scores:")
        for accent, score in result['all_scores'].items():
            bar = "█" * int(score * 20) + "░" * (20 - int(score * 20))
            print(f"   {accent.title():12} [{bar}] {score:.3f}")


In [27]:

# STEP 6: Testing and Examples

def run_tests():
    """Run system tests with sample URLs"""
    system = AccentDetectionSystem()
    
    # Test URLs (you'll need to replace these with actual working URLs)
    test_urls = [
        # Add your test URLs here
        "https://sample-videos.com/zip/10/mp4/SampleVideo_1280x720_1mb.mp4",  # Replace with actual
    ]
    
    print("🧪 Running system tests...")
    
    for i, url in enumerate(test_urls, 1):
        print(f"\n{'='*20} TEST {i} {'='*20}")
        try:
            result = system.process_video_url(url)
            print("✅ Test passed")
        except Exception as e:
            print(f"❌ Test failed: {e}")


In [28]:

# STEP 7: Interactive Usage
def interactive_mode():
    """Interactive mode for testing"""
    system = AccentDetectionSystem()
    
    print("\n🎤 Welcome to the English Accent Detection System!")
    print("Enter video URLs to analyze accents (or 'quit' to exit)")
    
    while True:
        print("\n" + "-"*50)
        url = input("Enter video URL: ").strip()
        
        if url.lower() in ['quit', 'exit', 'q']:
            print("👋 Goodbye!")
            break
        
        if not url:
            print("Please enter a valid URL")
            continue
        
        try:
            result = system.process_video_url(url)
            
            # Save result to file
            with open(f'accent_result_{int(time.time())}.json', 'w') as f:
                json.dump(result, f, indent=2)
            
        except Exception as e:
            print(f"Error processing URL: {e}")


In [29]:

# STEP 8: Execute the System

print("🚀 INITIALIZING ACCENT DETECTION SYSTEM")
print("="*60)

# Initialize the system
try:
    system = AccentDetectionSystem()
    print("✅ System initialized successfully!")
except Exception as e:
    print(f"❌ System initialization failed: {e}")


🚀 INITIALIZING ACCENT DETECTION SYSTEM
🤖 Loading speech analysis models...
✅ Whisper model loaded
✅ System initialized successfully!


In [30]:

# STEP 9: Quick Test Function

def quick_test(url):
    """Quick test function for immediate results"""
    print(f"\n🎯 QUICK TEST")
    print(f"URL: {url}")
    print("-" * 50)
    
    try:
        system = AccentDetectionSystem()
        result = system.process_video_url(url)
        return result
    except Exception as e:
        print(f"❌ Error: {e}")
        return None


In [31]:
# STEP 10: Example Usage

print("\n" + "="*60)
print("💡 EXAMPLE USAGE")
print("="*60)



💡 EXAMPLE USAGE


In [32]:

# STEP 11: Sample Test

sample_url = "https://www.youtube.com/watch?v=0Okxsszt624"  # Replace this!

print("\\n🧪 Running sample test...")
print("Note: Replace the sample_url with a real video URL containing English speech")
result = quick_test(sample_url)



\n🧪 Running sample test...
Note: Replace the sample_url with a real video URL containing English speech

🎯 QUICK TEST
URL: https://www.youtube.com/watch?v=0Okxsszt624
--------------------------------------------------
🤖 Loading speech analysis models...
✅ Whisper model loaded

🎬 ENGLISH ACCENT DETECTION SYSTEM

1️⃣ AUDIO EXTRACTION
🔄 Processing URL: https://www.youtube.com/watch?v=0Okxsszt624...
[youtube] Extracting URL: https://www.youtube.com/watch?v=0Okxsszt624
[youtube] 0Okxsszt624: Downloading webpage
[youtube] 0Okxsszt624: Downloading tv client config
[youtube] 0Okxsszt624: Downloading tv player API JSON
[youtube] 0Okxsszt624: Downloading ios player API JSON
[youtube] 0Okxsszt624: Downloading m3u8 information
[info] 0Okxsszt624: Downloading 1 format(s): 251
[download] Destination: C:\Users\SAHIL~1.RAN\AppData\Local\Temp\tmpvubdgvkf\temp_video.webm
[download] 100% of   13.79MiB in 00:00:01 at 8.67MiB/s   
[ExtractAudio] Destination: C:\Users\SAHIL~1.RAN\AppData\Local\Temp\tmpvubdg