In [1]:

import os
os.environ['PYTORCH_ENABLE_MPS_FALLBACK'] = '1'


import numpy as np
import soundfile
import torch
import wavmark
from IPython.display import Audio, display




def text_to_binary(text):
    return [format(ord(char), '08b') for char in text]

def binary_to_text(binary_list):
    try:
        text = ''.join(chr(int(b, 2)) for b in binary_list)
        return text
    except ValueError:
        raise ValueError("Input should be a space-separated string of 8-bit binary numbers.")


embedded_text = "Tchaikovsky"

In [2]:
# 1.load model
# device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
device = torch.device("mps")
model = wavmark.load_model("wavmark_params/step59000_snr39.99_pesq4.35_BERP_none0.30_mean1.81_std1.81.model.pkl").to(device)

In [3]:
# 2.create 16-bit payload
# if embedded_text is odd, add a space to the end
if len(embedded_text) % 2 == 1:
    embedded_text += " "
binary_list = text_to_binary(embedded_text)

payload_list = []
for i in np.arange(0,len(embedded_text),2):
    payload_list.append(np.concatenate((
        np.array(list(binary_list[i]),dtype=np.int64),
        np.array(list(binary_list[i+1]),dtype=np.int64)
    )))
print("Information:", binary_to_text(binary_list))


Information: Tchaikovsky 


In [4]:
# 3.read host audio
# the audio should be a single-channel 16kHz wav, you can read it using soundfile:
signal, sampling_rate = soundfile.read("data/tchaikovsky_symphony5_movement4_theme.wav")
# Otherwise, you can use the following function to convert the host audio to single-channel 16kHz format:
# from wavmark.utils import file_reader
# signal = file_reader.read_as_single_channel("example.wav", aim_sr=16000)
signal_left = signal[:, 0]
signal_right = signal[:, 1]
display(Audio(data=signal_left, rate=sampling_rate, autoplay=True))



In [5]:
# 3. segmentation and encode
audio_length_left = len(signal_left)/sampling_rate
audio_length_left_int = np.int64(audio_length_left//1)
print("Audio length left:", audio_length_left, "s")

audio_length_right = len(signal_right)/sampling_rate
audio_length_right_int = np.int64(audio_length_right//1)
print("Audio length right:", audio_length_right, "s")

# segment the audio to 1s clips
signal_clips_left = [signal_left[i*sampling_rate:(i+1)*sampling_rate] for i in range(audio_length_left_int)]
signal_clips_right = [signal_right[i*sampling_rate:(i+1)*sampling_rate] for i in range(audio_length_right_int)]
print("Number of clips left:", len(signal_clips_left))
print("Number of clips right:", len(signal_clips_right))
print("Maximum number of encoded ASCII characters:", len(signal_clips_left)*2+len(signal_clips_right)*2)

Audio length left: 15.002993197278911 s
Audio length right: 15.002993197278911 s
Number of clips left: 15
Number of clips right: 15
Maximum number of encoded ASCII characters: 60


In [18]:
# 4. encode the payload
# the encoding rule:
# 1. left channel 0~1s (2 bytes) -> right channel 0~1s (2 bytes) -> left channel 1~2s (2 bytes) -> right channel 1~2s (2 bytes) -> ...

# get payload lists separately for left and right channels
payload_list_left = []
payload_list_right = []
for i in range(len(payload_list)):
    if i % 2 == 0:
        payload_list_left.append(payload_list[i])
    else:
        payload_list_right.append(payload_list[i])
print("Payload length left:", len(payload_list_left))
print("Payload length right:", len(payload_list_right))

# encode left channel
encoded_signal_clips_left = []
for i in range(len(payload_list_left)):
    if i < len(payload_list_left):
        print(i)
        watermarked_signal, _ = wavmark.encode_watermark(model, signal_clips_left[i], payload_list_left[i], show_progress=True)
        encoded_signal_clips_left.append(watermarked_signal)
    else:
        encoded_signal_clips_left.append(signal_clips_left[i])
encoded_signal_left = np.concatenate(encoded_signal_clips_left, axis=0)

# encode right channel
encoded_signal_clips_right = []
for i in range(len(payload_list_right)):
    if i < len(payload_list_right):
        watermarked_signal, _ = wavmark.encode_watermark(model, signal_clips_right[i], payload_list_right[i], show_progress=True)
        encoded_signal_clips_right.append(watermarked_signal)
    else:
        encoded_signal_clips_right.append(signal_clips_right[i])
encoded_signal_right = np.concatenate(encoded_signal_clips_right, axis=0)


soundfile.write("data/output_left.wav", encoded_signal_left, sampling_rate)
soundfile.write("data/output_right.wav", encoded_signal_right, sampling_rate)
soundfile.write("data/output.wav", np.stack((encoded_signal_left, encoded_signal_right), axis=1), sampling_rate)
display(Audio(data=np.stack((encoded_signal_left, encoded_signal_right)), rate=sampling_rate, autoplay=True))


Payload length left: 3
Payload length right: 3
0


Processing: 100%|██████████| 2/2 [00:00<00:00, 12.97it/s]


1


Processing: 100%|██████████| 2/2 [00:00<00:00, 18.70it/s]


2


Processing: 100%|██████████| 2/2 [00:00<00:00, 19.76it/s]
Processing: 100%|██████████| 2/2 [00:00<00:00, 19.68it/s]
Processing: 100%|██████████| 2/2 [00:00<00:00, 19.90it/s]
Processing: 100%|██████████| 2/2 [00:00<00:00, 19.88it/s]


In [23]:
# 5.decode watermark
payload_decoded, _ = wavmark.decode_watermark(model, encoded_signal_left, show_progress=True)
BER = ((payload_list[0]) != payload_decoded).mean() * 100

print("Decode BER:%.1f" % BER)

100%|██████████| 15/15 [00:07<00:00,  2.05it/s]

Decode BER:25.0





In [8]:
payload_decoded

array([0, 1, 0, 1, 0, 1, 0, 0, 0, 1, 1, 0, 0, 0, 1, 1])