## Python script to perform audio watermark embedding/detection using the direct-sequence spread spectrum (DSSS) method.


### Preliminary steps
- Import dependencies
- Create the `files` directory
- Preprocess the `.mp3` file (convert it to a mono `.wav` file)

In [2]:
#  Required packages
import os
import numpy as np
from scipy.io import wavfile
from pydub import AudioSegment

# Create the output directory used by every later step.
# exist_ok=True keeps the call safe if the directory appears between the
# existence check and the creation call.
output_directory = "./files"
if not os.path.exists(output_directory):
    os.makedirs(output_directory, exist_ok=True)
    print(f"Directory '{output_directory}' created.")

# Preprocess the .mp3 file: decode it, downmix to a single channel and
# write the result as a WAV file for the watermarking steps.
audio = AudioSegment.from_mp3("raw.mp3")
audio = audio.set_channels(1)
# export() hands back the open output file object; close it explicitly so
# the handle is not left dangling for the rest of the session.
audio.export("./files/preprocessed.wav", format="wav").close()

<_io.BufferedRandom name='./files/preprocessed.wav'>

### Watermarking a single audio file using a plaintext watermark

In [5]:
# File paths for the host audio and various watermark-related data

# Host audio (WAV) that embed() watermarks and detect() uses as the reference signal
HOST_SIGNAL_FILE = "./files/preprocessed.wav"
# Watermark text; embed() turns each character into 8 bits, then truncates or
# left-pads the bit string with zeros to match the number of embeddable bits
WATERMARK_TEXT='This is your watermark'
# Output WAV file written by embed() and read back by the detectors
WATERMARK_SIGNAL_FILE = "./files/wmarked_file.wav"
# Text file holding the pseudo-random sequence, one value per line
PSEUDO_RAND_FILE = './files/pseudo_rand.dat'
# Text file holding the original watermark bits, one bit per line (used for the BER)
WATERMARK_ORIGINAL_FILE = './files/watermark_binary.dat'
# Text file holding the watermark bits after repetition coding, one value per line
WATERMARK_EXTENDED_FILE = './files/watermark_extended.dat'

# Configuration settings

# Whether to use a repetition code: each watermark bit is embedded NUM_REPS
# times and recovered by majority vote
REP_CODE = True
# Frame length in samples; also the length of the pseudo-random sequence.
# One (possibly repeated) watermark bit is embedded per frame
FRAME_LENGTH = 1024
# Embedding strength: the pseudo-random sequence is scaled by
# CONTROL_STRENGTH * max(|frame|) before being added to / subtracted from a frame
CONTROL_STRENGTH = 0.03
# Overlap between adjacent frames as a fraction of FRAME_LENGTH
# (0.0 means no overlap); the hop size is FRAME_LENGTH * (1 - OVERLAP)
OVERLAP = 0.0
# Number of repetitions per watermark bit if REP_CODE is True
NUM_REPS = 3

def fix(xs):
    """
    Emulation of MATLAB's 'fix' function, which rounds towards zero.

    Accepts a real scalar or any array-like of real numbers; arrays are
    rounded element-wise. Returns a NumPy float scalar for scalar input and
    an ndarray for array-like input.
    """
    # np.trunc drops the fractional part, i.e. floor for non-negative values
    # and ceil for negative ones, and (unlike an `if xs >= 0` branch) it also
    # works element-wise on arrays.
    return np.trunc(xs)

def embed():
    """
    Embed the watermark into the host audio signal.

    Reads the host signal from HOST_SIGNAL_FILE, draws a pseudo-random
    sequence (PRS) of FRAME_LENGTH samples and, for every frame, adds the
    scaled PRS when the watermark bit is 1 or subtracts it when the bit is 0.
    The watermark bits come from WATERMARK_TEXT (8 bits per character),
    truncated or left-padded with zeros to the number of bits that fit.

    Side effects: writes PSEUDO_RAND_FILE, WATERMARK_ORIGINAL_FILE,
    WATERMARK_EXTENDED_FILE and the watermarked audio WATERMARK_SIGNAL_FILE
    (always stored as 16-bit integer samples).

    NOTE(review): the frame arithmetic expects a single-channel (1-D) host
    signal with 16-bit samples - confirm for other inputs.
    """
    # Generate a pseudo-random sequence (PRS) with values in [-0.5, 0.5)
    prs = np.random.rand(FRAME_LENGTH) - 0.5

    # Save the PRS so that the detectors can correlate against the same sequence
    with open(PSEUDO_RAND_FILE, 'w') as f:
        f.writelines("%f\n" % d for d in prs)

    # Load the host audio signal (where the watermark will be embedded)
    sr, host_signal = wavfile.read(HOST_SIGNAL_FILE)
    signal_len = len(host_signal)

    # Work in float64: avoids integer overflow in abs() (e.g. abs(-32768) for
    # 16-bit samples) and in the sums below
    host_float = host_signal.astype(np.float64)

    # Hop length (shift between consecutive frames) and overlap, in samples
    frame_shift = int(FRAME_LENGTH * (1 - OVERLAP))
    overlap_length = int(FRAME_LENGTH * OVERLAP)

    # Total number of frames (= bits) that fit in the signal; a signal that is
    # shorter than the overlap simply carries no bits
    embed_nbit = max(0, int(fix((signal_len - overlap_length) / frame_shift)))

    if REP_CODE:
        # Only whole groups of NUM_REPS frames carry one payload bit each
        effective_nbit = embed_nbit // NUM_REPS
        embed_nbit = effective_nbit * NUM_REPS
    else:
        effective_nbit = embed_nbit

    # Build the payload bits from WATERMARK_TEXT, 8 bits per character
    # (characters with code points above 255 would produce more than 8 bits)
    binary_str = ''.join(format(ord(char), '08b') for char in WATERMARK_TEXT)
    if len(binary_str) > effective_nbit:
        # Payload does not fit: keep only the leading bits
        binary_str = binary_str[:effective_nbit]
    else:
        # Payload is shorter than the capacity: pad with zeros on the left
        binary_str = binary_str.rjust(effective_nbit, '0')
    wmark_original = np.array([int(digit) for digit in binary_str], dtype=int)

    # Save the original watermark to a file (one bit per line)
    with open(WATERMARK_ORIGINAL_FILE, 'w') as f:
        f.writelines("%d\n" % d for d in wmark_original)

    # Extend the watermark by repeating bits if repetition coding is enabled
    if REP_CODE:
        wmark_extended = np.repeat(wmark_original, NUM_REPS)
    else:
        wmark_extended = wmark_original

    # Save the extended watermark to a file (one value per line)
    with open(WATERMARK_EXTENDED_FILE, 'w') as f:
        f.writelines("%f\n" % d for d in wmark_extended)

    # Embed the watermark into the host signal frame by frame
    wmed_signal = np.zeros(frame_shift * embed_nbit)
    for i in range(embed_nbit):
        start = frame_shift * i
        frame = host_float[start: start + FRAME_LENGTH]

        # Embedding strength is relative to the frame's peak amplitude
        alpha = CONTROL_STRENGTH * np.max(np.abs(frame))

        # Add the PRS for a 1 bit, subtract it for a 0 bit
        sign = 1.0 if wmark_extended[i] == 1 else -1.0
        marked = frame + sign * alpha * prs

        # Only the first frame_shift samples of each frame are kept
        wmed_signal[start: start + frame_shift] = marked[:frame_shift]

    # Append any remaining part of the host signal that wasn't watermarked
    wmed_signal = np.concatenate((wmed_signal, host_float[len(wmed_signal): signal_len]))

    # Clip before converting: samples pushed past the int16 range would
    # otherwise wrap around and produce loud clicks
    int16_range = np.iinfo(np.int16)
    wmed_signal = np.clip(wmed_signal, int16_range.min, int16_range.max).astype(np.int16)
    wavfile.write(WATERMARK_SIGNAL_FILE, sr, wmed_signal)

def detect():
    """
    Detect the watermark from the watermarked audio signal.

    Informed detection: the host signal (HOST_SIGNAL_FILE) is subtracted from
    the watermarked signal (WATERMARK_SIGNAL_FILE) and each residual frame is
    correlated with the pseudo-random sequence stored in PSEUDO_RAND_FILE.
    The sign of the strongest correlation peak decides the bit. With REP_CODE
    enabled, every group of NUM_REPS bits is reduced by majority vote.

    Prints the original and the recovered watermark text, the bit error rate
    against WATERMARK_ORIGINAL_FILE and the signal-to-noise ratio of the
    watermarked signal relative to the host. Returns nothing.
    """
    # Load the original (host) audio signal
    _, host_signal = wavfile.read(HOST_SIGNAL_FILE)

    # Load the watermarked audio signal
    _, eval_signal = wavfile.read(WATERMARK_SIGNAL_FILE)
    signal_len = len(eval_signal)

    # Frame layout; computed with the same integer truncation as embed() so
    # both sides agree on the number of bits for any OVERLAP value
    frame_shift = int(FRAME_LENGTH * (1 - OVERLAP))
    overlap_length = int(FRAME_LENGTH * OVERLAP)
    embed_nbit = max(0, int(fix((signal_len - overlap_length) / frame_shift)))

    if REP_CODE:
        # Only whole groups of NUM_REPS frames carry one payload bit each
        effective_nbit = embed_nbit // NUM_REPS
        embed_nbit = effective_nbit * NUM_REPS
    else:
        effective_nbit = embed_nbit

    # Load the original watermark signal from the file
    with open(WATERMARK_ORIGINAL_FILE, 'r') as f:
        wmark_original = np.array([int(w.rstrip()) for w in f.readlines()])

    # Load the pseudo-random sequence used during embedding
    with open(PSEUDO_RAND_FILE, 'r') as f:
        prs = np.array([float(x.rstrip()) for x in f.readlines()])

    # Subtract in float64 so the difference cannot overflow the WAV integer type
    residual = eval_signal.astype(np.float64) - host_signal.astype(np.float64)

    # Detect the watermark by correlating each residual frame with the PRS
    detected_bit = np.zeros(embed_nbit)
    for i in range(embed_nbit):
        start = frame_shift * i
        frame = residual[start: start + FRAME_LENGTH]

        # Full cross-correlation; the bit is the sign of the strongest peak
        comp = np.correlate(frame, prs, "full")
        maxp = np.argmax(np.abs(comp))
        detected_bit[i] = 1 if comp[maxp] >= 0 else 0

    # Recover the watermark by majority vote over the repetitions
    if REP_CODE:
        votes = detected_bit.reshape(effective_nbit, NUM_REPS).sum(axis=1) / NUM_REPS
        wmark_recovered = np.where(votes >= 0.5, 1.0, 0.0)
    else:
        wmark_recovered = detected_bit

    print('ORIGINAL WATERMARK: ',WATERMARK_TEXT)

    # Decode the recovered bits: left-pad to a multiple of 8, split into
    # bytes and skip all-zero bytes (the padding added during embedding)
    recovered_binary = ''.join(str(digit) for digit in wmark_recovered.astype(int))
    padding_needed = (8 - len(recovered_binary) % 8) % 8
    recovered_binary = '0' * padding_needed + recovered_binary
    chars = [recovered_binary[i:i + 8] for i in range(0, len(recovered_binary), 8)]
    original_text = ''.join(chr(int(char, 2)) for char in chars if char != '00000000')
    print('RECOVERED WATERMARK:',original_text)

    # Calculate and display the Bit Error Rate (BER); undefined without bits
    if effective_nbit > 0:
        BER = np.sum(np.abs(wmark_recovered - wmark_original)) / effective_nbit * 100
    else:
        BER = float('nan')
    print(f'bit error rate = {BER} %')

    # Calculate and display the Signal-to-Noise Ratio (SNR)
    SNR = 10 * np.log10(
        np.sum(np.square(host_signal.astype(np.float32))) /
        np.sum(np.square(host_signal.astype(np.float32) - eval_signal.astype(np.float32))))
    print(f'SNR = {SNR}dB')

def main():
    """ Main function to embed and then detect the watermark. """
    embed()  # Perform watermark embedding
    detect() # Perform watermark detection

# Entry point of the script.
# Compare with `==`: `in` would be a substring test against '__main__' and
# would also run main() when this file is imported under a name like 'main'.
if __name__ == '__main__':
    main()


ORIGINAL WATERMARK:  This is your watermark
RECOVERED WATERMARK: This is your watermark
bit error rate = 0.0 %
SNR = 32.734291076660156dB


### Extracting the watermark without the original (host) audio file

In [7]:
def detect_without_original():
    """
    Detect the watermark from the watermarked audio signal without the original signal.

    Blind detection: each frame of WATERMARK_SIGNAL_FILE is correlated
    directly with the pseudo-random sequence stored in PSEUDO_RAND_FILE and
    the sign of the strongest correlation peak decides the bit. Because the
    host signal is not subtracted, the host audio itself contributes to the
    correlation. With REP_CODE enabled, every group of NUM_REPS bits is
    reduced by majority vote.

    Prints the original and the recovered watermark text. Returns nothing.
    """
    # Load the watermarked audio signal
    _, eval_signal = wavfile.read(WATERMARK_SIGNAL_FILE)
    signal_len = len(eval_signal)

    # Frame layout as integers: these values are used as array sizes, loop
    # bounds and slice indices, none of which accept floats. The truncation
    # mirrors embed() so both sides agree on the number of bits.
    frame_shift = int(FRAME_LENGTH * (1 - OVERLAP))
    overlap_length = int(FRAME_LENGTH * OVERLAP)
    embed_nbit = max(0, int(fix((signal_len - overlap_length) / frame_shift)))

    if REP_CODE:
        # Only whole groups of NUM_REPS frames carry one payload bit each
        effective_nbit = embed_nbit // NUM_REPS
        embed_nbit = effective_nbit * NUM_REPS
    else:
        effective_nbit = embed_nbit

    # Load the pseudo-random sequence used during embedding
    with open(PSEUDO_RAND_FILE, 'r') as f:
        prs = np.array([float(x.rstrip()) for x in f.readlines()])

    eval_float = eval_signal.astype(np.float64)

    # Detect the watermark by correlating the watermarked signal with the PRS directly
    detected_bit = np.zeros(embed_nbit)
    for i in range(embed_nbit):
        start = frame_shift * i
        frame = eval_float[start: start + FRAME_LENGTH]

        # Full cross-correlation; the bit is the sign of the strongest peak
        comp = np.correlate(frame, prs, "full")
        maxp = np.argmax(np.abs(comp))
        detected_bit[i] = 1 if comp[maxp] >= 0 else 0

    # Recover the watermark by majority vote over the repetitions
    if REP_CODE:
        votes = detected_bit.reshape(effective_nbit, NUM_REPS).sum(axis=1) / NUM_REPS
        wmark_recovered = np.where(votes >= 0.5, 1.0, 0.0)
    else:
        wmark_recovered = detected_bit

    print('ORIGINAL WATERMARK: ',WATERMARK_TEXT)

    # Decode the recovered bits: left-pad to a multiple of 8, split into
    # bytes and skip all-zero bytes (the padding added during embedding)
    recovered_binary = ''.join(str(digit) for digit in wmark_recovered.astype(int))
    padding_needed = (8 - len(recovered_binary) % 8) % 8
    recovered_binary = '0' * padding_needed + recovered_binary
    chars = [recovered_binary[i:i + 8] for i in range(0, len(recovered_binary), 8)]
    original_text = ''.join(chr(int(char, 2)) for char in chars if char != '00000000')
    print('RECOVERED WATERMARK:',original_text)

def main():
    """ Main function to detect the watermark without the original signal. """
    detect_without_original() # Perform watermark detection

# Entry point of the script.
# Compare with `==`: `in` would be a substring test against '__main__' and
# would also run main() when this file is imported under a name like 'main'.
if __name__ == '__main__':
    main()
