In [2]:
import numpy as np
import librosa
import os
import pickle
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
import hashlib
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.serialization import Encoding, PrivateFormat, PublicFormat, NoEncryption
import warnings
warnings.filterwarnings('ignore')

class AudioSeal:
    """Least-significant-bit (LSB) audio watermarker.

    A random bit pattern is drawn at construction time. ``embed`` writes it
    into the LSBs of the first ``watermark_bits`` 16-bit samples, and
    ``detect`` reads those LSBs back and compares them with the pattern.

    Float audio is quantized to 16-bit PCM by rounding to the nearest integer
    and clipping to the int16 range. Rounding (rather than truncating) is what
    makes ``detect(embed(x))`` reliable: a sample stored as the float32 value
    of ``k / 32767`` can scale back to slightly less than ``k``, and a
    truncating cast would turn that into ``k - 1`` and flip the embedded bit.
    """

    # Scale factor between float audio in [-1, 1] and 16-bit PCM.
    _PCM_SCALE = 32767.0
    # detect() reports a match only when the matching fraction exceeds this.
    _MATCH_THRESHOLD = 0.8

    def __init__(self, watermark_bits=32):
        """Create a sealer with a random watermark of ``watermark_bits`` bits."""
        self.watermark_bits = watermark_bits
        self.watermark = np.random.randint(0, 2, size=watermark_bits).astype(np.int8)

    @classmethod
    def _to_pcm16(cls, audio):
        """Quantize float audio to a new int16 array (round, then clip)."""
        scaled = np.rint(np.asarray(audio, dtype=np.float64) * cls._PCM_SCALE)
        limits = np.iinfo(np.int16)
        # Clipping prevents out-of-range samples from wrapping around in int16.
        return np.clip(scaled, limits.min, limits.max).astype(np.int16)

    def embed(self, audio):
        """Return a float32 copy of ``audio`` carrying the watermark.

        Args:
            audio: Array-like of float samples, nominally in [-1, 1]. Samples
                are indexed along the first axis.

        Returns:
            float32 array of the quantized samples divided by 32767. If the
            audio has fewer than ``watermark_bits`` samples it is quantized
            but left unmarked.
        """
        pcm = self._to_pcm16(audio)
        n_bits = self.watermark_bits

        if len(pcm) >= n_bits:
            bits = np.asarray(self.watermark, dtype=np.int16)
            # Trailing singleton axes let the bits broadcast over any extra
            # dimensions of the audio array.
            bits = bits.reshape((-1,) + (1,) * (pcm.ndim - 1))
            # Clear each LSB, then OR in the corresponding watermark bit.
            pcm[:n_bits] = (pcm[:n_bits] & ~np.int16(1)) | bits

        return pcm.astype(np.float32) / self._PCM_SCALE

    def detect(self, audio):
        """Report whether ``audio`` carries the current watermark.

        Args:
            audio: 1-D array-like of float samples on the scale produced by
                ``embed``.

        Returns:
            ``True`` when more than 80% of the extracted LSBs equal the
            watermark bits; ``False`` otherwise, including when the audio has
            fewer than ``watermark_bits`` samples.
        """
        pcm = self._to_pcm16(audio)
        n_bits = self.watermark_bits
        if len(pcm) < n_bits:
            return False

        extracted = pcm[:n_bits] & 1
        similarity = np.mean(extracted == np.asarray(self.watermark))
        return bool(similarity > self._MATCH_THRESHOLD)

    def get_watermark(self):
        """Return the current watermark (the stored object, not a copy)."""
        return self.watermark

    def set_watermark(self, watermark):
        """Replace the watermark.

        Raises:
            ValueError: If ``watermark`` does not have ``watermark_bits``
                entries.
        """
        if len(watermark) != self.watermark_bits:
            raise ValueError(f"Watermark must be {self.watermark_bits} bits")
        self.watermark = watermark


class DigitalSignature:
    """RSA-PSS signing and verification of quantized audio.

    Every instance owns its own key pair, created in ``__init__``.
    """

    def __init__(self):
        # Fresh 2048-bit RSA key pair with the usual public exponent 65537.
        key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
        self.private_key = key
        self.public_key = key.public_key()

    def _pss_padding(self):
        """Build the PSS padding object shared by ``sign`` and ``verify``."""
        return padding.PSS(
            mgf=padding.MGF1(hashes.SHA256()),
            salt_length=padding.PSS.MAX_LENGTH,
        )

    def generate_hash(self, audio):
        """Return the SHA-256 digest of ``audio`` scaled by 32767 and cast to int16."""
        scaled = audio * 32767
        pcm_bytes = np.array(scaled, dtype=np.int16).tobytes()
        hasher = hashlib.sha256()
        hasher.update(pcm_bytes)
        return hasher.digest()

    def sign(self, audio):
        """Return a signature over the audio digest, made with the private key."""
        digest = self.generate_hash(audio)
        # NOTE(review): the digest is handed over as the message together with
        # hashes.SHA256(); presumably the library hashes it again - confirm.
        return self.private_key.sign(digest, self._pss_padding(), hashes.SHA256())

    def verify(self, audio, signature):
        """Return True if ``signature`` matches ``audio`` under the public key.

        Any exception raised by the verification call is reported as False.
        """
        digest = self.generate_hash(audio)
        try:
            self.public_key.verify(
                signature, digest, self._pss_padding(), hashes.SHA256()
            )
        except Exception:
            return False
        return True

    def export_public_key(self):
        """Return the public key as PEM-encoded SubjectPublicKeyInfo bytes."""
        return self.public_key.public_bytes(
            encoding=Encoding.PEM,
            format=PublicFormat.SubjectPublicKeyInfo,
        )


class FeatureExtractor:
    """Turn an audio clip into a fixed-length feature vector.

    The vector is the concatenation of time-averaged MFCCs, a second
    time-averaged MFCC variant standing in for GTCCs, and per-bin spectrogram
    statistics.
    """

    def __init__(self, sr=16000, n_mfcc=20, n_mels=128, n_gtcc=20):
        """Store the sample rate and the coefficient counts."""
        self.sr, self.n_mfcc = sr, n_mfcc
        self.n_mels, self.n_gtcc = n_mels, n_gtcc

    def extract_mfcc(self, audio):
        """Return the MFCCs of ``audio`` averaged over time (axis 1)."""
        coeffs = librosa.feature.mfcc(y=audio, sr=self.sr, n_mfcc=self.n_mfcc)
        return np.mean(coeffs, axis=1)

    def extract_gtcc(self, audio):
        """Return a GTCC stand-in averaged over time (axis 1).

        This is not a gammatone computation: it is an MFCC call with
        ``htk=True`` and ``lifter=0.5`` used as an approximation.
        """
        approx = librosa.feature.mfcc(
            y=audio,
            sr=self.sr,
            n_mfcc=self.n_gtcc,
            htk=True,
            lifter=0.5,
        )
        return np.mean(approx, axis=1)

    def extract_spectrogram_features(self, audio):
        """Return mean, variance and max of the STFT magnitude.

        Each statistic is taken along axis 1 and only its first 20 entries are
        kept, to limit dimensionality; the three slices are concatenated in
        the order mean, variance, max.
        """
        magnitude = np.abs(librosa.stft(audio))
        statistics = (np.mean, np.var, np.max)
        return np.concatenate(
            [stat(magnitude, axis=1)[:20] for stat in statistics]
        )

    def extract_all_features(self, audio):
        """Return MFCC, GTCC-approximation and spectrogram features, concatenated."""
        parts = [
            self.extract_mfcc(audio),
            self.extract_gtcc(audio),
            self.extract_spectrogram_features(audio),
        ]
        return np.concatenate(parts)


class SpoofingDetectionSystem:
    """Audio spoofing detector combining three checks.

    Bonafide audio is watermarked and signed when it is registered; at test
    time a registered clip must pass the watermark and signature checks, and
    every clip is finally classified by a model trained on extracted features.
    """

    def __init__(self, model_type='rf'):
        """Build the helper components and the classifier.

        Args:
            model_type: ``'rf'`` for a random forest or ``'svm'`` for an RBF
                SVM (case-insensitive).

        Raises:
            ValueError: If ``model_type`` is neither of the two.
        """
        self.audio_seal = AudioSeal()
        self.digital_signature = DigitalSignature()
        self.feature_extractor = FeatureExtractor()

        kind = model_type.lower()
        if kind == 'rf':
            self.model = RandomForestClassifier(n_estimators=100, random_state=42)
        elif kind == 'svm':
            self.model = SVC(kernel='rbf', probability=True, random_state=42)
        else:
            raise ValueError("Model type must be 'rf' or 'svm'")

        # Maps audio_id -> {'watermark': ..., 'signature': ...}.
        self.security_db = {}

    @staticmethod
    def _normalize(audio):
        """Return ``audio`` as float32, scaled so its peak magnitude is 1.

        Empty or all-zero input is returned unscaled.
        """
        samples = np.asarray(audio, dtype=np.float32)
        peak = np.max(np.abs(samples)) if samples.size else 0.0
        return samples / peak if peak > 0 else samples

    def _security_failure(self, audio, audio_id):
        """Return a failure message for a registered clip, or None if it passes."""
        record = self.security_db[audio_id]
        # The seal must hold the watermark that was stored for this clip.
        self.audio_seal.set_watermark(record['watermark'])
        if not self.audio_seal.detect(audio):
            return f"Watermark verification failed for {audio_id}"
        if not self.digital_signature.verify(audio, record['signature']):
            return f"Digital signature verification failed for {audio_id}"
        return None

    def process_bonafide_audio(self, audio, audio_id):
        """Watermark and sign a bonafide clip, recording both under ``audio_id``.

        Returns:
            The watermarked audio; the stored signature covers exactly this
            array.
        """
        normalized = self._normalize(audio)

        # 1. Embed watermark
        watermarked_audio = self.audio_seal.embed(normalized)

        # 2. Sign the watermarked samples
        signature = self.digital_signature.sign(watermarked_audio)

        # 3. Store security data
        self.security_db[audio_id] = {
            'watermark': self.audio_seal.get_watermark(),
            'signature': signature
        }

        return watermarked_audio

    def train(self, bonafide_audios, spoofed_audios):
        """Train the classifier and print a report on a held-out 20% split.

        Bonafide clips are registered as ``bonafide_<index>`` and contribute
        features of their watermarked form (label 1); spoofed clips are only
        normalized (label 0).

        Returns:
            The fitted model.
        """
        features = []
        labels = []

        for index, clip in enumerate(bonafide_audios):
            watermarked_audio = self.process_bonafide_audio(clip, f"bonafide_{index}")
            features.append(self.feature_extractor.extract_all_features(watermarked_audio))
            labels.append(1)  # 1 for bonafide

        for clip in spoofed_audios:
            normalized = self._normalize(clip)
            features.append(self.feature_extractor.extract_all_features(normalized))
            labels.append(0)  # 0 for spoofed

        X = np.array(features)
        y = np.array(labels)

        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

        self.model.fit(X_train, y_train)

        y_pred = self.model.predict(X_test)
        print("Model Evaluation:")
        print(classification_report(y_test, y_pred))

        return self.model

    def test(self, audio, audio_id=None):
        """Classify ``audio`` as ``"REAL"`` or ``"SPOOFED"``.

        When ``audio_id`` is registered in the security database, the
        watermark and signature are checked first. The check runs on the audio
        exactly as supplied, because the signature covers the samples returned
        by ``process_bonafide_audio`` and peak re-normalization rescales them
        whenever the peak is not exactly 1.0. The peak-normalized form is
        accepted as a fallback so audio that verified previously still does.

        Args:
            audio: Array-like of float samples.
            audio_id: Optional key into the security database; unknown ids
                skip the security checks.

        Returns:
            ``"SPOOFED"`` if a security check fails or the model predicts
            class 0, otherwise ``"REAL"``.
        """
        raw = np.asarray(audio, dtype=np.float32)
        normalized = self._normalize(raw)

        # 1. Security checks for registered clips. `is not None` keeps falsy
        #    ids such as 0 or "" from silently skipping verification.
        if audio_id is not None and audio_id in self.security_db:
            failure = self._security_failure(raw, audio_id)
            if failure is not None:
                failure = self._security_failure(normalized, audio_id)
            if failure is not None:
                print(failure)
                return "SPOOFED"

        # 2. Extract features
        sample = [self.feature_extractor.extract_all_features(normalized)]

        # 3. Run ML classification
        prediction = self.model.predict(sample)[0]
        probability = self.model.predict_proba(sample)[0]

        if prediction == 1:
            print(f"ML classification: REAL (confidence: {probability[1]:.2f})")
            return "REAL"

        print(f"ML classification: SPOOFED (confidence: {probability[0]:.2f})")
        return "SPOOFED"

    def save_model(self, filename="spoofing_detection_model.pkl"):
        """Pickle the model, security database, watermark size and public key."""
        payload = {
            'model': self.model,
            'security_db': self.security_db,
            'watermark_bits': self.audio_seal.watermark_bits,
            'public_key': self.digital_signature.export_public_key()
        }
        with open(filename, 'wb') as f:
            pickle.dump(payload, f)
        print(f"Model saved to {filename}")

    def load_model(self, filename="spoofing_detection_model.pkl"):
        """Restore the model and security database from ``filename``.

        Only ``'model'`` and ``'security_db'`` are restored.
        """
        # SECURITY: pickle.load can execute arbitrary code embedded in the
        # file; only load files from a trusted source.
        with open(filename, 'rb') as f:
            data = pickle.load(f)
        self.model = data['model']
        self.security_db = data['security_db']
        # NOTE(review): the saved 'public_key' and 'watermark_bits' entries are
        # not restored, and this instance generated its own key pair in
        # __init__. Signatures in a security_db saved by a different instance
        # are therefore not expected to verify here; a real system needs
        # proper key management.
        print(f"Model loaded from {filename}")


# Example usage
def demo():
    """Run the full pipeline on synthetic signals and print the results.

    Trains on 50 bonafide and 50 spoofed one-second clips, saves the model,
    then tests a registered bonafide clip, an unregistered spoofed clip and a
    tampered copy of the bonafide clip.
    """
    np.random.seed(42)

    n_samples = 16000
    # The time axis and the 440 Hz carrier are the same for every clip.
    t = np.linspace(0, 1, n_samples)
    carrier = np.sin(2 * np.pi * 440 * t)
    slow_envelope = np.sin(2 * np.pi * 10 * t)

    bonafide_audios, spoofed_audios = [], []
    # One loop, bonafide first: this keeps the order of random draws fixed.
    for _ in range(50):
        # Bonafide: the plain tone with light noise
        bonafide_audios.append(carrier + 0.1 * np.random.randn(n_samples))
        # Spoofed: amplitude-modulated tone with heavier noise
        spoofed_audios.append(carrier * slow_envelope + 0.3 * np.random.randn(n_samples))

    # Build, train and persist the system
    system = SpoofingDetectionSystem(model_type='rf')
    system.train(bonafide_audios, spoofed_audios)
    system.save_model()

    print("\nTesting with new samples:")

    # Case 1: a bonafide clip registered under its own id
    bonafide_id = "test_bonafide"
    test_bonafide = system.process_bonafide_audio(bonafide_audios[0], bonafide_id)
    outcome = system.test(test_bonafide, bonafide_id)
    print(f"Test bonafide result: {outcome}")

    # Case 2: an unregistered spoofed clip
    test_spoofed = carrier * np.cos(2 * np.pi * 20 * t) + 0.4 * np.random.randn(n_samples)
    outcome = system.test(test_spoofed)
    print(f"Test spoofed result: {outcome}")

    # Case 3: the registered clip with a stretch of samples zeroed out
    tampered = test_bonafide.copy()
    tampered[1000:2000] = 0
    outcome = system.test(tampered, bonafide_id)
    print(f"Test tampered result: {outcome}")


# Function to load and process real audio files (for when you have actual data)
def process_real_data(bonafide_dir, spoofed_dir, test_file=None):
    """Train on audio files from two directories and optionally test one file.

    Args:
        bonafide_dir: Directory of bonafide ``.wav``/``.mp3`` files.
        spoofed_dir: Directory of spoofed ``.wav``/``.mp3`` files.
        test_file: Optional path of a file to classify after training; it is
            skipped when missing.
    """
    bonafide_audios = _load_audio_dir(bonafide_dir)
    spoofed_audios = _load_audio_dir(spoofed_dir)

    # Create and train the system
    system = SpoofingDetectionSystem(model_type='rf')
    system.train(bonafide_audios, spoofed_audios)
    system.save_model()

    # Test with a file if provided
    if test_file and os.path.exists(test_file):
        test_audio, _ = librosa.load(test_file, sr=16000, mono=True)
        result = system.test(test_audio)
        print(f"Test result for {test_file}: {result}")


def _load_audio_dir(directory, sr=16000, extensions=('.wav', '.mp3')):
    """Load every audio file in ``directory`` as mono audio at ``sr`` Hz.

    Files are visited in sorted name order, because ``os.listdir`` returns
    entries in arbitrary order and the seeded train/test split depends on
    sample order. Extensions are matched case-insensitively.
    """
    clips = []
    for filename in sorted(os.listdir(directory)):
        if filename.lower().endswith(extensions):
            clip, _ = librosa.load(os.path.join(directory, filename), sr=sr, mono=True)
            clips.append(clip)
    return clips


# Script entry point: only the synthetic-data demo runs.
if __name__ == "__main__":
    # Run the demo with synthetic data
    print("Running demo with synthetic data...")
    demo()
    
    # The triple-quoted block below is a bare string expression, not executed
    # code: it is evaluated and discarded. To run with real data, remove the
    # surrounding quotes and set the three paths.
    """
    print("\nProcessing real data...")
    bonafide_dir = "path/to/bonafide_audios"
    spoofed_dir = "path/to/spoofed_audios"
    test_file = "path/to/test_file.wav"  # Optional
    
    if os.path.isdir(bonafide_dir) and os.path.isdir(spoofed_dir):
        process_real_data(bonafide_dir, spoofed_dir, test_file)
    else:
        print("Directories not found. Skipping real data processing.")
    """

Running demo with synthetic data...
Model Evaluation:
              precision    recall  f1-score   support

           0       1.00      1.00      1.00         8
           1       1.00      1.00      1.00        12

    accuracy                           1.00        20
   macro avg       1.00      1.00      1.00        20
weighted avg       1.00      1.00      1.00        20

Model saved to spoofing_detection_model.pkl

Testing with new samples:
ML classification: REAL (confidence: 0.93)
Test bonafide result: REAL
ML classification: SPOOFED (confidence: 0.92)
Test spoofed result: SPOOFED
Digital signature verification failed for test_bonafide
Test tampered result: SPOOFED
