# Analysis of AudioSeal watermark resistance to basic audio processing

In [1]:
!pip install BytesIO

ERROR: Could not find a version that satisfies the requirement BytesIO (from versions: none)
ERROR: No matching distribution found for BytesIO


In [2]:
import numpy as np
import librosa
import scipy.signal
from pydub import AudioSegment
import matplotlib.pyplot as plt
import seaborn as sns
import typing as tp
import torch
import torchaudio
import pandas as pd
import glob
from tqdm import tqdm
import soundfile as sf
import io

## Defining audio manipulation functions

In [16]:

def apply_lossy_compression(audio, sample_rate):
    with io.BytesIO() as inmemoryfile:
        audio_segment = AudioSegment(
            audio.tobytes(), 
            frame_rate=sample_rate,
            sample_width=audio.dtype.itemsize, 
            channels=1
        )
        audio_segment.export(inmemoryfile, format="mp3")
        return np.array(AudioSegment.from_file_using_temporary_files(inmemoryfile)
                       .get_array_of_samples())

def apply_lossless_compression(audio, sample_rate):
    with io.BytesIO() as inmemoryfile:
        audio_segment = AudioSegment(
            audio.tobytes(), 
            frame_rate=sample_rate,
            sample_width=audio.dtype.itemsize, 
            channels=1
        )
        audio_segment.export(inmemoryfile, format="flac")
        return np.array(AudioSegment.from_file_using_temporary_files(inmemoryfile)
                       .get_array_of_samples())

def add_noise(audio, noise_type="white", noise_level=0.01):
    if noise_type == "white":
        noise = np.random.normal(0, noise_level, audio.shape)
    elif noise_type == "gaussian":
        noise = np.random.normal(0, noise_level * np.std(audio), audio.shape)
    else:
        raise ValueError("Unsupported noise type")
    return audio + noise

def apply_filter(audio, sample_rate, filter_type="lowpass", cutoff=3000):
    nyquist = 0.5 * sample_rate
    normal_cutoff = cutoff / nyquist
    if filter_type == "lowpass":
        b, a = scipy.signal.butter(5, normal_cutoff, btype='low', analog=False)
    elif filter_type == "highpass":
        b, a = scipy.signal.butter(5, normal_cutoff, btype='high', analog=False)
    elif filter_type == "bandpass":
        low, high = cutoff
        b, a = scipy.signal.butter(5, [low / nyquist, high / nyquist], btype='band')
    else:
        raise ValueError("Unsupported filter type")
    return scipy.signal.lfilter(b, a, audio)

def resample_audio(audio, original_sr, target_sr):
    return librosa.resample(audio, original_sr, target_sr)

def equalize_audio(audio, sample_rate):
    equalized_audio = librosa.effects.equalize(audio, sample_rate)
    return equalized_audio

def add_reverb(audio, sample_rate, reverberance=50):
    reverb_audio = librosa.effects.preemphasis(audio)
    return reverb_audio

def time_scale_modification(audio, rate):
    return librosa.effects.time_stretch(audio, rate)

def pitch_shift(audio, sample_rate, n_steps):
    return librosa.effects.pitch_shift(audio, sample_rate, n_steps)

def dynamic_range_compression(audio):
    return librosa.effects.percussive(audio)

def clip_audio(audio, threshold=0.8):
    return np.clip(audio, -threshold, threshold)

In [17]:
from audioseal import AudioSeal
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using device: {device}")

secret_message = torch.randint(0, 2, (1, 16), dtype=torch.int32)
secret_message = secret_message.to(device)
print(f"Secret message: {secret_message}")


model = AudioSeal.load_generator("audioseal_wm_16bits")
detector = AudioSeal.load_detector("audioseal_detector_16bits")


model = model.to(device)
detector = detector.to(device)

def generate_watermark_audio(
    tensor: torch.Tensor,
    sample_rate: int
) -> tp.Optional[torch.Tensor]:
    try:
        global model, device, secret_message
        audios = tensor.unsqueeze(0).to(device)
        watermarked_audio = model(audios, sample_rate=sample_rate, message=secret_message.to(device), alpha=1)
        return watermarked_audio

    
    except Exception as e:
        print(f"Error while watermarking audio: {e}")
        return None

# Function to get the confidence score that an audio tensor was watermarked by Audioseal
# provided by the Audioseal team
def detect_watermark_audio(
    tensor: torch.Tensor,
    sample_rate: int,
    message_threshold: float = 0.50
) -> tp.Optional[float]:
    try:
        global detector, device
        # In our analysis we are not concerned with the hidden/embedded message as of now
        result, _ = detector.detect_watermark(tensor, sample_rate=sample_rate, message_threshold=message_threshold)
        return float(result)
    except Exception as e:
        print(f"Error while detecting watermark: {e}")
        return None

Using device: cpu
Secret message: tensor([[0, 1, 1, 1, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 1, 0]], dtype=torch.int32)


## Loading the audio samples

In [18]:
# Load metadata and audio samples from CREMA-D dataset
def load_crema_d_metadata(metadata_path):
    df = pd.read_csv(metadata_path)
    return df

def load_audio_samples(actor_id, base_path):
    file_paths = []
    # find all audio files starting with the actor_id
    for file_path in glob.glob(f"{base_path}/{actor_id}*.wav"):
        file_paths.append(file_path)
    
    audio_samples = []

    for file_path in file_paths:
        audio, sr = load_audio_sample(file_path)
        audio_samples.append((audio, sr, file_path))
    
    return audio_samples
    



def load_audio_sample(
    file_path: str
) -> tp.Optional[tp.Tuple[torch.Tensor, int]]:
    try:
        wav, sample_rate = torchaudio.load(file_path)
        return wav, sample_rate
    except Exception as e:
        print(f"Error while loading audio: {e}")
        return None

In [19]:
def filter_samples(df, emotions, levels, genders, races, ethnicities):
    filtered_df = df[
        df['Emotion'].isin(emotions) &
        df['Emotion_Level'].isin(levels) &
        df['Gender'].isin(genders) &
        df['Race'].isin(races) &
        df['Ethnicity'].isin(ethnicities)
    ]
    return filtered_df

## Expertiment setup

In [20]:
# Define the list of manipulations
manipulations = [
    ('Lossy Compression', apply_lossy_compression),
    ('Lossless Compression', apply_lossless_compression),
    ('White Noise Addition', lambda audio, sr: add_noise(audio, "white")),
    ('Gaussian Noise Addition', lambda audio, sr: add_noise(audio, "gaussian")),
    ('Low-pass Filter', lambda audio, sr: apply_filter(audio, sr, "lowpass")),
    ('High-pass Filter', lambda audio, sr: apply_filter(audio, sr, "highpass")),
    ('Band-pass Filter', lambda audio, sr: apply_filter(audio, sr, "bandpass")),
    ('Downsampling', lambda audio, sr: resample_audio(audio, sr, sr // 2)),
    ('Upsampling', lambda audio, sr: resample_audio(audio, sr, sr * 2)),
    ('Equalization', equalize_audio),
    ('Reverberation', add_reverb),
    ('Time-Scale Modification', lambda audio, sr: time_scale_modification(audio, 1.5)),
    ('Pitch Shifting', lambda audio, sr: pitch_shift(audio, sr, 2)),
    ('Dynamic Range Compression', dynamic_range_compression),
    ('Clipping', clip_audio)
]

In [21]:
def perform_experiment(df, base_path):
    results = []
    for index, row in tqdm(df.iterrows()):
        audio_sr = load_audio_samples(row['ActorID'], base_path)
        for audio, sr, fp in tqdm(audio_sr):
            audio = generate_watermark_audio(audio, sr)
            # open audio with librosa
            original_score = detect_watermark_audio(audio, sr)

            audio_np = audio.detach().cpu().numpy().squeeze()
            if audio_np.ndim == 1:
                audio_np = np.expand_dims(audio_np, axis=0)  # Ensure 2D shape (channels, samples)
            audio_path = "temp_audio.wav"
            sf.write(audio_path, audio_np.T, sr)  # Transpose to match (samples, channels) format
            

            
            # Load the audio with librosa
            print(audio_path, sr)
            audio_librosa, sr_librosa = librosa.load(audio_path, sr=sr)

            for name, manipulation in tqdm(manipulations):
                manipulated_audio = manipulation(audio_librosa, sr_librosa)
                # convert numpy array to torch tensor
                manipulated_audio_tensor = torch.tensor(manipulated_audio).float()
                # manipulated_audio_tensor = manipulated_audio_tensor.unsqueeze(0)
                detection_score = detect_watermark_audio(manipulated_audio_tensor, sr)
                results.append({
                    'Emotion': fp.split('/')[-1].split('_')[2],
                    'Emotion_Level': fp.split('/')[-1].split('_')[3],
                    'Gender': row['Sex'],
                    'Race': row['Race'],
                    'Ethnicity': row['Ethnicity'],
                    'Manipulation': name,
                    'Detection Score': detection_score,
                    'Original Score': original_score
                })
    return results

def plot_results(results):
    df = pd.DataFrame(results)
    plt.figure(figsize=(14, 8))
    sns.boxplot(x='Manipulation', y='Detection Score', hue='Emotion', data=df)
    plt.xticks(rotation=90)
    plt.title('Watermark Detection Scores for Different Audio Manipulations by Emotion')
    plt.tight_layout()
    plt.show()
    
    plt.figure(figsize=(14, 8))
    sns.boxplot(x='Manipulation', y='Detection Score', hue='Emotion_Level', data=df)
    plt.xticks(rotation=90)
    plt.title('Watermark Detection Scores for Different Audio Manipulations by Emotion Level')
    plt.tight_layout()
    plt.show()
    
    plt.figure(figsize=(14, 8))
    sns.boxplot(x='Manipulation', y='Detection Score', hue='Gender', data=df)
    plt.xticks(rotation=90)
    plt.title('Watermark Detection Scores for Different Audio Manipulations by Gender')
    plt.tight_layout()
    plt.show()
    
    plt.figure(figsize=(14, 8))
    sns.boxplot(x='Manipulation', y='Detection Score', hue='Race', data=df)
    plt.xticks(rotation=90)
    plt.title('Watermark Detection Scores for Different Audio Manipulations by Race')
    plt.tight_layout()
    plt.show()
    
    plt.figure(figsize=(14, 8))
    sns.boxplot(x='Manipulation', y='Detection Score', hue='Ethnicity', data=df)
    plt.xticks(rotation=90)
    plt.title('Watermark Detection Scores for Different Audio Manipulations by Ethnicity')
    plt.tight_layout()
    plt.show()

In [22]:
# Main execution
metadata_path = '../../../../crema-d/VideoDemographics.csv'  # Placeholder path
base_audio_path = '../../../../crema-d'  # Placeholder path
crema_d_metadata = load_crema_d_metadata(metadata_path)

# Define filters
# emotions = ['Anger', 'Disgust', 'Fear', 'Happy', 'Neutral', 'Sad']
# levels = ['Low', 'Medium', 'High', 'Unspecified']
emotions = ['ANG', 'DIS', 'FEA', 'HAP', 'NEU', 'SAD']
levels = ['LO', 'MD', 'HI', 'XX']
genders = ['Male', 'Female']
races = ['White', 'Black', 'Asian', 'Other']
ethnicities = ['Hispanic', 'Non-Hispanic']

# filtered_metadata = filter_samples(crema_d_metadata, emotions, levels, genders, races, ethnicities)
filtered_metadata = crema_d_metadata
experiment_results = perform_experiment(filtered_metadata, base_audio_path)
plot_results(experiment_results)

0it [00:00, ?it/s]

temp_audio.wav 16000



[A
 40%|████      | 6/15 [00:00<00:00, 36.45it/s]
  0%|          | 0/80 [00:00<?, ?it/s]
0it [00:00, ?it/s]

Error while detecting watermark: not enough values to unpack (expected 3, got 1)
Error while detecting watermark: not enough values to unpack (expected 3, got 1)
Error while detecting watermark: not enough values to unpack (expected 3, got 1)
Error while detecting watermark: not enough values to unpack (expected 3, got 1)
Error while detecting watermark: not enough values to unpack (expected 3, got 1)
Error while detecting watermark: not enough values to unpack (expected 3, got 1)





TypeError: cannot unpack non-iterable int object