In [334]:
import numpy as np
import pywt
from PIL import Image
import soundfile as sf

In [335]:
#File Paths
audio_path = "audio/blankspace.wav"
stego_path = "stego.wav"
secret_data_path = "images/secret.jpg"
extracted_data_path = "extracted.png"

#CONFIGURATIONS
DELTA = 0.001
DWT_LEVEL = 4
WAVELET = "db4"
SEED_LEFT = 122
SEED_RIGHT = 222
TWO_BIT_QIM = True
SYNC_BITS=["1","0","1","0","1","0","1","0","1","0","1","0","1","0","1","0"]
FRAME_SIZE = 2000000

In [336]:
#UTILITY FUNCTIONS
def group_bits(bits_array):
    bits_array= np.array(bits_array)
    grouped_bits = np.char.add(bits_array[::2].astype(str),bits_array[1::2].astype(str))
    return grouped_bits

#DATA TYPE: Image
def image_to_bits(path):
    img =Image.open(path).convert("RGB")
    img_array = np.array(img)
    img_shape = img_array.shape
    bit_array = np.unpackbits(img_array.astype(np.uint8))
    return bit_array, img_shape

def bits_to_image(bit_array,shape):
    bit_array = np.array(bit_array,dtype=np.uint8)
    byte_array = np.packbits(bit_array)
    img_array = byte_array.reshape(shape)
    return img_array

In [337]:
# EMBEDDING UTILITY FUNCTIONS
def modify_coefficent(coeff,embed_value,delta,two_bit_qim):
    dither = 0

    if(two_bit_qim):
        possible_dithers = [i*delta/16 for i in range(1,8,2)]

        if(embed_value == "00"): 
            dither = possible_dithers[0]
        elif (embed_value == "01"):
            dither = possible_dithers[1]
        elif (embed_value == "10"):
            dither = possible_dithers[2]
        elif (embed_value == "11"):
            dither = possible_dithers[3]
    
    else:
        possible_dithers = [i*delta/8 for i in range(1,4,2)]
        
        if(embed_value == 0):
            dither = possible_dithers[0]
        else:
            dither = possible_dithers[1]
    
    modified_coeff = delta * np.round((coeff-dither)/delta) + dither
    return modified_coeff

def apply_dwt(audio_path, wavelet, dwt_level, payload_size):
    audio_stereo, sr = sf.read(audio_path)
    audio_stereo = audio_stereo.T

    audio_left = audio_stereo[0]
    audio_right = audio_stereo[1]

    if len(audio_left) % 2 != 0:
        audio_left = audio_left[:-1]
        audio_right = audio_right[:-1]

    max_val = np.max(np.abs(audio_left))
    audio_left /= max_val
    audio_right /= max_val

    # Determine frame size
    frame_size = FRAME_SIZE

    # Split into frames
    num_frames = len(audio_left) // frame_size
    frames_left = [audio_left[i * frame_size: (i + 1) * frame_size] for i in range(num_frames)]
    frames_right = [audio_right[i * frame_size: (i + 1) * frame_size] for i in range(num_frames)]


    print(len(frames_left))
    # Apply DWT independently to each frame
    dwt_coeffs_left = []
    dwt_coeffs_right=[]

    frames_sync_values_left=[]
    for frame in frames_left:
        frames_sync_values_left.append(frame[:8])
        dwt_coeffs_left.append(pywt.wavedec(frame[8:], wavelet, level=dwt_level, mode="periodization"))


    frames_sync_values_right=[]
    for frame in frames_right:
        frames_sync_values_right.append(frame[:8])
        dwt_coeffs_right.append(pywt.wavedec(frame[8:], wavelet, level=dwt_level, mode="periodization"))

    return dwt_coeffs_left, dwt_coeffs_right, frames_sync_values_left, frames_sync_values_right,sr

In [338]:
def insert_sync_bits(sync_frame, sync_bits):
    sync_bits = group_bits(sync_bits)
    for i,sync_val in enumerate(sync_frame):
        sync_frame[i] = modify_coefficent(sync_val,sync_bits[i],delta=DELTA, two_bit_qim=TWO_BIT_QIM)
    
    return sync_frame

modified = insert_sync_bits([1,2,3,4],SYNC_BITS)

In [None]:
#HEADER UTILITIES
def prepare_header(version, files_list,two_bit_qim):
    """
    Header fields:
      - Version:        8 bits
      - Header Length:  8 bits -> computed as total header length in bits
      - File Count:     4 bits
    
    For each file:
      - File Type:      4 bytes (32 bits)
      - If an image (jpg/jpeg/png): 
                          Image metadata is stored in 26 bits, with:
                             * Height: 12 bits
                             * Width:  12 bits
                             * Channels: 2 bits
      - Payload Length: 32 bits
    
    Finally:
      - Decision Flag:  1 bit
    
    Returns:
      header_bits:  A list of bits (0/1) representing the header.
    """
    header_bits = []

    header_bits.extend(format(version,"08b"))
    header_bits.extend("00000000")  # placeholder for header length (8 bits)

    file_count = len(files_list) & 0x0F
    header_bits.extend(format(file_count,"04b"))

    for file in files_list:
        file_type = file["file_type"].ljust(4, " ")[:4].encode("ascii")
        for byte in file_type:
            header_bits.extend(format(byte, '08b'))

        # If file is an image, add image metadata
        if file["file_type"].strip().lower() in ["jpg", "jpeg", "png"]:
            height = file["height"]
            width = file["width"]
            channels = file["channels"]
            img_metadata = (height << 14) | (width << 2) | (channels & 0x3)
            header_bits.extend(format(img_metadata, '026b'))
        
        payload_length = file["payload_length"]
        header_bits.extend(format(payload_length, '032b'))
    
    header_bits.extend(format(two_bit_qim & 0x01, "01b"))

    computed_header_len = len(header_bits)
    if(computed_header_len % 2 !=0 ):
        #Add padding bit
        header_bits.append("0")
        computed_header_len +=1

    header_bits[8:16] = format(computed_header_len,"08b")

    return header_bits


def decode_header(header_bits):
    header = {}
    header['version'] = int("".join(header_bits[0:8]), 2)
    header['header_length'] = int("".join(header_bits[8:16]), 2)
    header['file_count'] = int("".join(header_bits[16:20]), 2)
    current_pos = 20
    files = []
    for _ in range(header['file_count']):
        file = {}
        file_type_bits = "".join(header_bits[current_pos:current_pos+32])
        file_type_bytes = bytearray(int(file_type_bits[i:i+8], 2) for i in range(0, 32, 8))
        file['file_type'] = file_type_bytes.decode("ascii").strip()
        current_pos += 32
        if file['file_type'].lower() in ["jpg", "jpeg", "png"]:
            img_metadata_bits = "".join(header_bits[current_pos:current_pos+26])
            img_metadata = int(img_metadata_bits, 2)
            file['height'] = (img_metadata >> 14) & 0xFFF
            file['width'] = (img_metadata >> 2) & 0xFFF
            file['channels'] = img_metadata & 0x3
            current_pos += 26
        payload_length_bits = "".join(header_bits[current_pos:current_pos+32])
        file['payload_length'] = int(payload_length_bits, 2)
        current_pos += 32
        files.append(file)
    header['two_bit_qim'] = int("".join(header_bits[current_pos:current_pos+1]), 2)
    header['files'] = files
    return header


In [340]:
#Considering data to embed is an image
secret_data, secret_shape = image_to_bits(path=secret_data_path)
files = [
    {
        "file_type":"jpg",
        "height":secret_shape[0],
        "width":secret_shape[1],
        "channels":secret_shape[2],
        "payload_length":len(secret_data)
    }
]

header_bits = prepare_header(version=1,files_list=files,two_bit_qim=1)

#Add Sync Bits(8 bit) at the start of the header
header_bits = header_bits
print(header_bits)
## CHECK WHETHER HEADER IS CREATED PROPERLY OR NOT
decoded = decode_header(header_bits)
print(decoded)
######################################

if(TWO_BIT_QIM):
    secret_data = group_bits(secret_data)
    header_bits = group_bits(header_bits)


['0', '0', '0', '0', '0', '0', '0', '1', '0', '1', '1', '1', '0', '0', '0', '0', '0', '0', '0', '1', '0', '1', '1', '0', '1', '0', '1', '0', '0', '1', '1', '1', '0', '0', '0', '0', '0', '1', '1', '0', '0', '1', '1', '1', '0', '0', '1', '0', '0', '0', '0', '0', '0', '0', '0', '1', '1', '0', '1', '0', '1', '0', '1', '1', '0', '0', '1', '0', '1', '0', '0', '0', '0', '0', '0', '0', '1', '1', '0', '0', '0', '0', '0', '0', '0', '0', '0', '1', '1', '0', '0', '1', '0', '0', '0', '0', '0', '1', '0', '1', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '1', '0']
{'version': 1, 'header_length': 112, 'file_count': 1, 'two_bit_qim': 1, 'files': [{'file_type': 'jpg', 'height': 427, 'width': 640, 'channels': 3, 'payload_length': 6558720}]}


In [341]:
def split_data(payload_array, payload_distribution):
    start_index = 0
    result = []

    for size in payload_distribution:
        end_index = start_index + size
        result.append(payload_array[start_index:end_index])
        start_index = end_index  # Update the start index for the next slice
    return result

print(split_data([1,2,3,4,5],[1,4]))

[[1], [2, 3, 4, 5]]


In [342]:
def generate_embedding_location(coeffs_array,payload_size,seed):
    embedding_location = []
    np.random.seed(seed)
    total_elements = sum(len(level_coeffs) for level_coeffs in coeffs_array)
    payload_distribution_len = [int(len(level_coeffs) / total_elements * payload_size) for level_coeffs in coeffs_array]
    payload_distribution_len[0] += payload_size - sum(payload_distribution_len)

    for i, level_coeffs in enumerate(coeffs_array):
        embedding_location.append(np.random.choice(
            len(level_coeffs),
            size=payload_distribution_len[i],
            replace=False
        ))
    
    return embedding_location,payload_distribution_len

In [343]:
frames_dwt_coeffs_left, frames_dwt_coeffs_right,frames_sync_values_left,frames_sync_values_right, sr = apply_dwt(audio_path=audio_path, wavelet=WAVELET, dwt_level=DWT_LEVEL,payload_size=len(secret_data) + len(header_bits))  # Arrays of coeffs for each frame

modified_frames_dwt_coeffs_left = []
modified_frames_dwt_coeffs_right = []

for frame_left, frame_right in zip(frames_dwt_coeffs_left, frames_dwt_coeffs_right):
    dwt_coeffs_left = frame_left
    dwt_coeffs_right = frame_right

    detail_coeffs_left = dwt_coeffs_left[1:]
    detail_coeffs_right = dwt_coeffs_right[1:]

    possible_locations = sum([level.shape[0] for level in detail_coeffs_left]) * 2
    payload_length = len(secret_data)

    print(f"Payload Capacity: {possible_locations} and Payload Length: {payload_length}")
    if payload_length > possible_locations:
        raise ValueError(f"Payload length ({payload_length}) exceeds available valid embedding locations ({possible_locations}).")

    # Inject the header directly into first-level detail coefficients
    for i, value in enumerate(header_bits):
        detail_coeffs_left[0][i] = modify_coefficent(
            coeff=detail_coeffs_left[0][i],
            embed_value=value,
            delta=DELTA,
            two_bit_qim=TWO_BIT_QIM
        )


    detail_coeffs_header_part = detail_coeffs_left[0][:len(header_bits)]

    mid = int(payload_length / 2)
    secret_data_left = secret_data[mid:]
    secret_data_right = secret_data[:mid]
    # Payload is injected in remaining bits
    detail_coeffs_left[0] = detail_coeffs_left[0][len(header_bits):]

    embed_location_left, payload_dist_left_len = generate_embedding_location(detail_coeffs_left, len(secret_data_left), SEED_LEFT)
    payload_dist_left = split_data(secret_data_left, payload_dist_left_len)

    embed_location_right, payload_dist_right_len = generate_embedding_location(detail_coeffs_right, len(secret_data_right), SEED_RIGHT)
    payload_dist_right = split_data(secret_data_right, payload_dist_right_len)

    for i, (level_location_left, level_location_right) in enumerate(zip(embed_location_left, embed_location_right)):
        for j, (loc_left, loc_right) in enumerate(zip(level_location_left, level_location_right)):
            detail_coeffs_left[i][loc_left] = modify_coefficent(
                coeff=detail_coeffs_left[i][loc_left],
                embed_value=payload_dist_left[i][j],
                delta=DELTA,
                two_bit_qim=TWO_BIT_QIM
            )
            detail_coeffs_right[i][loc_right] = modify_coefficent(
                coeff=detail_coeffs_right[i][loc_right],
                embed_value=payload_dist_right[i][j],
                delta=DELTA,
                two_bit_qim=TWO_BIT_QIM
            )

    detail_coeffs_left[0] = np.concatenate([detail_coeffs_header_part, detail_coeffs_left[0]])
    print("CHECK LEFT RIGHT AFTER AFTER--------------------")
    print(sum([level.shape[0] for level in detail_coeffs_left]))
    print(sum([level.shape[0] for level in detail_coeffs_right]))

    modified_frames_dwt_coeffs_left.append([dwt_coeffs_left[0]] + detail_coeffs_left)
    modified_frames_dwt_coeffs_right.append([dwt_coeffs_right[0]] + detail_coeffs_right)
    
# Function to reconstruct frames from their DWT coefficients
def reconstruct_frames(frames_dwt_coeffs, wavelet):
    reconstructed_frames = []
    for frame_coeffs in frames_dwt_coeffs:
        # Reconstruct the time-domain signal for this frame
        frame_signal = pywt.waverec(frame_coeffs, wavelet=wavelet, mode="periodization")
        reconstructed_frames.append(frame_signal)
    return reconstructed_frames

# Reconstruct each frame separately
reconstructed_frames_left = reconstruct_frames(modified_frames_dwt_coeffs_left, WAVELET)
reconstructed_frames_right = reconstruct_frames(modified_frames_dwt_coeffs_right, WAVELET)

# Now concatenate the frames in the time domain
stego_audio_left = np.concatenate(reconstructed_frames_left)
stego_audio_right = np.concatenate(reconstructed_frames_right)


6
Payload Capacity: 3749986 and Payload Length: 3279360
CHECK LEFT RIGHT AFTER AFTER--------------------
1874993
1874993
Payload Capacity: 3749986 and Payload Length: 3279360
CHECK LEFT RIGHT AFTER AFTER--------------------
1874993
1874993
Payload Capacity: 3749986 and Payload Length: 3279360
CHECK LEFT RIGHT AFTER AFTER--------------------
1874993
1874993
Payload Capacity: 3749986 and Payload Length: 3279360
CHECK LEFT RIGHT AFTER AFTER--------------------
1874993
1874993
Payload Capacity: 3749986 and Payload Length: 3279360
CHECK LEFT RIGHT AFTER AFTER--------------------
1874993
1874993
Payload Capacity: 3749986 and Payload Length: 3279360
CHECK LEFT RIGHT AFTER AFTER--------------------
1874993
1874993


In [344]:
def insert_array_at_index(arr, insert_arr, index):
    print(arr, insert_arr)
    # Ensure the index is within the range
    if index > len(arr):
        raise IndexError("Index is out of bounds")
    # Insert the array using list slicing
    return np.concatenate([arr[:index], insert_arr, arr[index:]])

In [345]:
frame_size = FRAME_SIZE

for i,sync_portion in enumerate(frames_sync_values_left):
    sync_portion = insert_sync_bits(sync_frame=sync_portion, sync_bits=SYNC_BITS)
    stego_audio_left = insert_array_at_index(stego_audio_left, sync_portion, i*frame_size)
    stego_audio_right = insert_array_at_index(stego_audio_right,frames_sync_values_right[i], i*frame_size)
    
# #Insert the Sync Bits
# stego_audio_left = insert_sync_bits(audio=stego_audio_left,interval=len(stego_audio_left)//4,sync_bits=SYNC_BITS)
# stego_audio_right = insert_sync_bits(audio=stego_audio_right,interval=len(stego_audio_right)//4,sync_bits=SYNC_BITS)

min_len = min(len(stego_audio_left), len(stego_audio_right))
reconstructed_stereo = np.column_stack((stego_audio_left[:min_len], stego_audio_right[:min_len]))

sf.write(stego_path, reconstructed_stereo, sr)

[ 3.59730016e-04 -9.56854953e-05  5.56700731e-04 ...  2.77345724e-02
  2.93322316e-02  3.04659117e-02] [0.0003125 0.0003125 0.0003125 0.0003125 0.0003125 0.0003125 0.0003125
 0.0003125]
[-0.00018167 -0.00012893  0.00028893 ...  0.07276029  0.07148552
  0.06994002] [0. 0. 0. 0. 0. 0. 0. 0.]
[0.0003125  0.0003125  0.0003125  ... 0.02773457 0.02933223 0.03046591] [-0.1996875 -0.2196875 -0.2386875 -0.2496875 -0.2506875 -0.2516875
 -0.2526875 -0.2516875]
[0.         0.         0.         ... 0.07276029 0.07148552 0.06994002] [-0.13122559 -0.13787842 -0.14852905 -0.16296387 -0.1763916  -0.18630981
 -0.18609619 -0.18395996]
[0.0003125  0.0003125  0.0003125  ... 0.02773457 0.02933223 0.03046591] [-0.4946875 -0.5016875 -0.4896875 -0.4786875 -0.4726875 -0.4576875
 -0.4616875 -0.4776875]
[0.         0.         0.         ... 0.07276029 0.07148552 0.06994002] [-0.12234497 -0.16638184 -0.1946106  -0.20812988 -0.20404053 -0.19338989
 -0.20587158 -0.22906494]
[0.0003125  0.0003125  0.0003125  ... 0.0

In [346]:
def extract_secret_value(stego_coeff, two_bit_qim, delta):
    dithers = []
    possible_values = []

    if(two_bit_qim):
        dithers = [i*delta/16 for i in range(1,8,2)]
        possible_values=["00","01","10","11"]
    else:
        possible_values=["0","1"]

    q = [(delta * np.round((stego_coeff - dither)/delta) + dither) for dither in dithers]
    error_value = [abs(stego_coeff - q_value) for q_value in q]
    min_index = error_value.index(min(error_value))

    return possible_values[min_index]

In [347]:
def get_synced_start(audio_path, wavelet, dwt_level):
    audio_stereo, sr = sf.read(audio_path)
    audio_stereo = audio_stereo.T

    audio_left = audio_stereo[0]
    audio_right = audio_stereo[1]

    if len(audio_left) % 2 != 0:
        audio_left = audio_left[:-1]
        audio_right = audio_right[:-1]

    max_val = np.max(np.abs(audio_left))
    audio_left /= max_val
    audio_right /= max_val

    bits_accumulated = []
    start_i = -1
    end_i = 0
    sync_bits = group_bits(SYNC_BITS)

    for i, coeff in enumerate(audio_left):
        bit = extract_secret_value(coeff, two_bit_qim=TWO_BIT_QIM, delta=DELTA)
        bits_accumulated.append(bit)

        if len(bits_accumulated) > len(sync_bits):
            bits_accumulated.pop(0)

        if np.array_equal(bits_accumulated, sync_bits):
            print("matched at index:", i)
            end_i = i + 1
            break

    if end_i == 0:
        raise ValueError("Sync bits not found in the audio.")

    frame_end = end_i + FRAME_SIZE - len(sync_bits)
    
    if frame_end > len(audio_left):
        raise ValueError("Frame size exceeds audio length after sync detection.")

    dwt_coeffs_left = pywt.wavedec(audio_left[end_i:frame_end], wavelet, level=dwt_level, mode="periodization")
    dwt_coeffs_right = pywt.wavedec(audio_right[end_i:frame_end], wavelet, level=dwt_level, mode="periodization")

    return dwt_coeffs_left, dwt_coeffs_right


In [349]:
stego_dwt_coeffs_left, stego_dwt_coeffs_right=get_synced_start(audio_path="stego.wav",wavelet=WAVELET,dwt_level=DWT_LEVEL)

stego_detail_coeffs_left = stego_dwt_coeffs_left[1:] #Gives an array of array of different level of detail coeffs
stego_detail_coeffs_right = stego_dwt_coeffs_right[1:]
#Stego Detail Coeffs left of level zero contains header
#In the index 8 to 15

header_len_coeffs = stego_detail_coeffs_left[0][4:8]
header_len_bits = []
for _,coeff in enumerate(header_len_coeffs):
    header_len_bits.append(extract_secret_value(coeff,two_bit_qim=TWO_BIT_QIM,delta=DELTA))

header_length = int("".join(map(str, header_len_bits)), 2)

header_coeffs = stego_detail_coeffs_left[0][:header_length//2]
header_bits=[]
for _,coeff in enumerate(header_coeffs):
    header_bits.append(extract_secret_value(coeff,two_bit_qim=TWO_BIT_QIM, delta=DELTA))

print(header_bits)
if(TWO_BIT_QIM):
    header_bits = [bit for pair in header_bits for bit in pair]

#Decode the Header
header = decode_header(header_bits)
print(header)
##############################################

#Remove the header coeff part and extract the payload
stego_detail_coeffs_left[0] = stego_detail_coeffs_left[0][header_length//2:]
#group_factor
if(TWO_BIT_QIM):
    group_factor = 2
else:
    group_factor = 1

stego_payload_len = int((header["files"][0]["payload_length"])/group_factor)

payload_left= int(stego_payload_len/2)
payload_right = stego_payload_len - payload_left

embed_location_left,payload_dist_left_len = generate_embedding_location(stego_detail_coeffs_left,payload_left,SEED_LEFT)

embed_location_right, payload_dist_right_len = generate_embedding_location(stego_detail_coeffs_right,payload_right, SEED_RIGHT)

extracted_data_left = []
extracted_data_right = []

# Extract payload from left channel detail coefficients using list comprehension
extracted_data_left = [
    extract_secret_value(stego_detail_coeffs_left[i][loc], two_bit_qim=TWO_BIT_QIM, delta=DELTA)
    for i in range(len(embed_location_left))
    for loc in embed_location_left[i]
]

# Extract payload from right channel detail coefficients similarly
extracted_data_right = [
    extract_secret_value(stego_detail_coeffs_right[i][loc], two_bit_qim=TWO_BIT_QIM, delta=DELTA)
    for i in range(len(embed_location_right))
    for loc in embed_location_right[i]
]


matched at index: 7
['00', '00', '00', '01', '01', '11', '00', '00', '00', '01', '01', '10', '10', '10', '01', '11', '00', '00', '01', '10', '01', '11', '00', '10', '00', '00', '00', '01', '10', '10', '10', '11', '00', '10', '10', '00', '00', '00', '11', '00', '00', '00', '00', '01', '10', '01', '00', '00', '01', '01', '00', '00', '00', '00', '00', '10']
{'version': 1, 'header_length': 112, 'file_count': 1, 'two_bit_qim': 1, 'files': [{'file_type': 'jpg', 'height': 427, 'width': 640, 'channels': 3, 'payload_length': 6558720}]}


In [350]:
extracted_data = extracted_data_right + extracted_data_left
print(len(extracted_data))
if(TWO_BIT_QIM):
    extracted_data = [bit for pair in extracted_data for bit in pair] #Ungroup the bits

img_metadata = header["files"][0]
extracted_image = bits_to_image(extracted_data, (img_metadata["height"],img_metadata["width"],img_metadata["channels"]))

img = Image.fromarray(extracted_image)
img.save(extracted_data_path)

3279360


In [None]:
#to check bit alteration
counter=0
original_data=secret_data
if(TWO_BIT_QIM):
    original_data = [bit for pair in original_data for bit in pair]
for i in range(len(original_data)):
    if(int(original_data[i])!=int(extracted_data[i])):
        print(f"Mismatch location: {i}")
        counter=counter+1
print(counter)


Mismatch location: 218538
Mismatch location: 218541
Mismatch location: 218542
Mismatch location: 218543
Mismatch location: 218545
Mismatch location: 218547
Mismatch location: 218549
Mismatch location: 218550
Mismatch location: 218553
Mismatch location: 218554
Mismatch location: 218556
Mismatch location: 218557
Mismatch location: 218559
Mismatch location: 218562
Mismatch location: 218564
Mismatch location: 218565
Mismatch location: 218568
Mismatch location: 218570
Mismatch location: 218572
Mismatch location: 218575
Mismatch location: 218576
Mismatch location: 218577
Mismatch location: 218578
Mismatch location: 218581
Mismatch location: 218582
Mismatch location: 218587
Mismatch location: 218588
Mismatch location: 218592
Mismatch location: 218593
Mismatch location: 218594
Mismatch location: 218595
Mismatch location: 218598
Mismatch location: 218600
Mismatch location: 218602
Mismatch location: 218604
Mismatch location: 218605
Mismatch location: 218606
Mismatch location: 218608
Mismatch loc

In [352]:
def calculate_mse(original, processed):
    return np.mean((original - processed) ** 2)

def calculate_snr(original, noise):
    signal_power = np.mean(original ** 2)
    noise_power = np.mean(noise ** 2)
    return 10 * np.log10(signal_power / noise_power) if noise_power > 0 else float('inf')

def calculate_psnr(original, processed):
    mse = calculate_mse(original, processed)
    max_val = np.max(original)
    return 10 * np.log10((max_val ** 2) / mse) if mse > 0 else float('inf')

def compute_audio_metrics(original_path, processed_path):
    # Load audio files
    original, sr1 = sf.read(original_path)
    processed, sr2 = sf.read(processed_path)

    # Ensure both audios have the same sample rate
    if sr1 != sr2:
        raise ValueError("Sample rates do not match!")

    # Ensure both signals are of the same length
    min_len = min(len(original), len(processed))
    original = original[:min_len]
    processed = processed[:min_len]

    # Compute metrics
    mse = calculate_mse(original, processed)
    snr = calculate_snr(original, original - processed)
    psnr = calculate_psnr(original, processed)

    return mse, snr, psnr

# Example usage
original_audio_path = "audio/blankspace.wav"
processed_audio_path = "stego.wav"

mse, snr, psnr = compute_audio_metrics(original_audio_path, processed_audio_path)

print(f"MSE: {mse}")
print(f"SNR: {snr} dB")
print(f"PSNR: {psnr} dB")

MSE: 6.829187910382947e-08
SNR: 59.01481393924139 dB
PSNR: 71.65604429532856 dB
