## Используемые библиотеки

In [None]:
import pywt
from enum import Enum
import cv2
from PIL import Image, ImageChops
import numpy as np
import matplotlib.pyplot as plt
from typing import Optional, Tuple, Dict, Any

## Получение выбранного канала изображения

In [None]:
class ColorSpaceType(Enum):
    RGB = 'RGB'
    YCrCb = 'YCrCb'

class SubbandType(Enum):
    LL = "LL"
    LH = "LH"
    HL = "HL"
    HH = "HH"

def get_image_channels(image_path: str, color_space: ColorSpaceType):
    if color_space is ColorSpaceType.RGB:
        img = Image.open(image_path).convert('RGB')
        img_array = np.array(img)

        return {
            'color_space': color_space,
            'channels': img_array
        }
    
    elif color_space is ColorSpaceType.YCrCb:
        img = Image.open(image_path).convert('RGB')
        img_array = np.array(img)
        img_ycbcr = cv2.cvtColor(img_array, cv2.COLOR_RGB2YCrCb)

        return {
            'color_space': color_space,
            'channels': img_ycbcr
        }
    
    else:
        raise ValueError('Invalid color space.')
        
def save_image_channels(img_data: np.ndarray, out_path: str, color_space: ColorSpaceType):
    if color_space is ColorSpaceType.RGB:
        Image.fromarray(img_data).save(out_path)
        return
    
    elif color_space is ColorSpaceType.YCrCb:
        cbcr_image_data = cv2.cvtColor(img_data, cv2.COLOR_YCrCb2RGB)
        Image.fromarray(cbcr_image_data).save(out_path)
        return
    
    else:
        raise ValueError('Invalid color space.')

## Заполнение контейнера и получение из него строки

In [None]:
def text_to_bits(text: str) -> str:
    # кодируем строку в UTF-8
    data = text.encode("utf-8")
    # каждый байт превращаем в 8 бит
    return ''.join(f"{byte:08b}" for byte in data)

def bits_to_text(bits: str) -> str:
    # берём только полные байты
    bits = bits[: (len(bits) // 8) * 8]

    # переводим 8-битные блоки в байты
    bytes_arr = bytearray(
        int(bits[i:i+8], 2)
        for i in range(0, len(bits), 8)
    )

    # декодируем как UTF-8
    return bytes_arr.decode("utf-8", errors="replace")

def fill_container(container: np.ndarray[np.float64], text: str, Q: int):
    bits = text_to_bits(text)
    
    if len(bits) > len(container):
        raise ValueError(f"Контейнер слишком мал для встраивания текста: необходимо {len(bits)}, есть {len(container)}")
    
    modified_container = container.copy()

    # Квантование: используем округление, чтобы симметрично работать с отриц. значениями
    for idx, bit in enumerate(bits):
        c = container[idx]
        q = int(np.round(c / Q))
        if bit == '0':
            if q % 2 == 1:
                q -= 1
        else:
            if q % 2 == 0:
                q += 1
        modified_container[idx] = q * Q

    return modified_container, len(bits)

def extract_container(container: np.ndarray[np.float64], text_length: int, Q: int) -> str:
    data_bits = ''
    for i in range(0, text_length):
        c = container[i]
        q = int(np.round(c / Q))
        data_bits += '1' if (q % 2 == 1) else '0'

    return bits_to_text(data_bits)

### Метод 1 (Выбор наиболее больших коэффициентов по убыванию)

In [None]:
def fill_container_topk(container: np.ndarray[np.float64], text: str, Q: int):
    bits = text_to_bits(text)
    N = len(bits)

    if N > len(container):
        raise ValueError("Не хватает коэффициентов для Top-K")

    modified = container.copy()

    # Индексы отсортированы по |c|
    sorted_indices = np.argsort(np.abs(container))[::-1]
    embed_indices = sorted_indices[:N]

    for bit, idx in zip(bits, embed_indices):
        c = container[idx]
        q = int(np.round(c / Q))
        if bit == '0':
            if q % 2 == 1: q -= 1
        else:
            if q % 2 == 0: q += 1
        modified[idx] = q * Q

    return modified, N

def extract_container_topk(container: np.ndarray[np.float64], text_length: int, Q: int):
    sorted_indices = np.argsort(np.abs(container))[::-1]
    extract_indices = sorted_indices[:text_length]

    bits = ''
    for idx in extract_indices:
        c = container[idx]
        q = int(np.round(c / Q))
        bits += '1' if (q % 2 == 1) else '0'

    return bits_to_text(bits)

### Метод 2 (Выбор коэффициентов по порогу в ручную)

In [None]:
def fill_container_threshold(container: np.ndarray[np.float64], text: str, Q: int, T: float):
    bits = text_to_bits(text)

    # выбираем коэффициенты выше порога
    candidate_indices = np.where(np.abs(container) >= T)[0]

    print(f'max: {np.max(np.abs(container))}')

    print(f"Коэффициентов ≥ T: {len(candidate_indices)}, необходимо для встраивания: {len(bits)}")

    if len(candidate_indices) < len(bits):
        raise ValueError("Недостаточно коэффициентов ≥ T")

    modified = container.copy()

    for bit, idx in zip(bits, candidate_indices):
        c = container[idx]
        q = int(np.round(c / Q))
        if bit == '0':
            if q % 2 == 1: q -= 1
        else:
            if q % 2 == 0: q += 1
        modified[idx] = q * Q

    return modified, len(bits)

def extract_container_threshold(container: np.ndarray[np.float64], text_length: int, Q: int, T: float):
    candidate_indices = np.where(np.abs(container) >= T)[0]

    print(f'max: {np.max(np.abs(container))}')

    print(f"Коэффициентов ≥ T: {len(candidate_indices)}, необходимо для извлечения: {text_length}")

    if len(candidate_indices) < text_length:
        raise ValueError("Недостаточно коэффициентов ≥ T для извлечения")

    bits = ''
    for idx in candidate_indices[:text_length]:
        c = container[idx]
        q = int(np.round(c / Q))
        bits += '1' if (q % 2 == 1) else '0'

    return bits_to_text(bits)

## Анализ изображений

### Выделение отличий

In [None]:
def image_diff(img1: np.ndarray, img2: np.ndarray, amplify: int = 10) -> np.ndarray:
    """
    Создаёт diff двух изображений с усилением различий.

    amplify — множитель, чтобы лучше выделять слабые различия.
    """
    pil1 = Image.fromarray(img1)
    pil2 = Image.fromarray(img2)
    diff = ImageChops.difference(pil1, pil2)

    # Усиливаем различия
    diff = diff.point(lambda p: min(255, p * amplify))

    return np.array(diff)


def image_mask(img1: np.ndarray, img2: np.ndarray) -> np.ndarray:
    """
    Создаёт бинарную маску различий (красные пиксели — есть разница).
    """
    mask = np.any(img1 != img2, axis=-1)

    heatmap = np.zeros_like(img1)
    heatmap[mask] = [255, 0, 0]

    return heatmap


### Вычисление PSNR

In [None]:
def calculate_psnr(img1: np.ndarray, img2: np.ndarray) -> float:
    """
    Простой PSNR для RGB или grayscale.
    """
    if img1.shape != img2.shape:
        raise ValueError(f"Размеры не совпадают: {img1.shape} != {img2.shape}")

    mse = np.mean((img1.astype(float) - img2.astype(float)) ** 2)
    if mse == 0:
        return float('inf')

    return 10 * np.log10((255.0 ** 2) / mse)


def calculate_psnr_detailed(img1: np.ndarray, img2: np.ndarray) -> Dict[str, Any]:
    """
    PSNR по каналам + средний PSNR.
    """
    if img1.shape != img2.shape:
        raise ValueError("Размеры изображений не совпадают")

    if img1.ndim == 2:
        # 1 канал
        mse = np.mean((img1.astype(float) - img2.astype(float)) ** 2)
        psnr = float('inf') if mse == 0 else 10 * np.log10((255.0 ** 2) / mse)
        return {"psnr": psnr, "mse": mse}

    results = {}
    names = ["R", "G", "B"]

    psnrs = []
    mses = []

    for i, ch in enumerate(names):
        c1 = img1[:, :, i].astype(float)
        c2 = img2[:, :, i].astype(float)

        mse = np.mean((c1 - c2) ** 2)
        psnr = float('inf') if mse == 0 else 10 * np.log10((255.0 ** 2) / mse)

        results[f"psnr_{ch}"] = psnr
        results[f"mse_{ch}"] = mse

        psnrs.append(psnr)
        mses.append(mse)

    results["psnr_avg"] = np.mean(psnrs)
    results["mse_avg"] = np.mean(mses)

    return results

### Полный анализ изображений и визуализация

In [None]:
def visualize_comparison(
        original: np.ndarray,
        modified: np.ndarray,
        diff: np.ndarray,
        mask: np.ndarray,
        figsize: Tuple[int, int] = (16, 8)
):
    """
    Рисует 4 изображения: исходное, модифицированное, diff и маску различий.
    """
    plt.figure(figsize=figsize)

    plt.subplot(1, 4, 1)
    plt.title("Original")
    plt.imshow(original)
    plt.axis("off")

    plt.subplot(1, 4, 2)
    plt.title("Modified")
    plt.imshow(modified)
    plt.axis("off")

    plt.subplot(1, 4, 3)
    plt.title("Diff (amplified)")
    plt.imshow(diff)
    plt.axis("off")

    plt.subplot(1, 4, 4)
    plt.title("Difference Mask")
    plt.imshow(mask)
    plt.axis("off")

    plt.tight_layout()
    plt.show()

def visualize_comparison_v2(
        original: np.ndarray,
        modified: np.ndarray,
        diff: np.ndarray,
        mask: np.ndarray,
        figsize: Tuple[int, int] = (12, 8)
):
    plt.figure(figsize=figsize)

    plt.subplot(1, 2, 1)
    plt.title("Original")
    plt.imshow(original)
    plt.axis("off")

    plt.subplot(1, 2, 2)
    plt.title("Modified")
    plt.imshow(modified)
    plt.axis("off")

    plt.tight_layout()
    plt.show()

    plt.subplot(1, 2, 1)
    plt.title("Diff (amplified)")
    plt.imshow(diff)
    plt.axis("off")

    plt.subplot(1, 2, 2)
    plt.title("Difference Mask")
    plt.imshow(mask)
    plt.axis("off")

    plt.tight_layout()
    plt.show()
    plt.savefig("comparison.png")


def analyze_images(path1: str, path2: str, amplify: int = 10) -> Dict[str, Any]:
    """
    Полный анализ двух изображений:
    - diff
    - маска различий
    - PSNR
    - PSNR по каналам

    Возвращает словарь с результатами.
    """
    img1 = Image.open(path1).convert("RGB")
    img2 = Image.open(path2).convert("RGB")
    img_data1 = np.array(img1)
    img_data2 = np.array(img2)

    diff = image_diff(img_data1, img_data2, amplify)
    mask = image_mask(img_data1, img_data2)

    psnr = calculate_psnr(img_data1, img_data2)
    psnr_detailed = calculate_psnr_detailed(img_data1, img_data2)

    visualize_comparison_v2(img_data1, img_data2, diff, mask)

    Image.fromarray(diff).save("diff.png")
    Image.fromarray(mask).save("mask.png")

    return {
        "psnr": psnr,
        "psnr_detailed": psnr_detailed,
        "diff": diff,
        "mask": mask
    }

## Анализ встроенной информации

In [None]:
def loss_percentage_chars(original: str, extracted: str) -> float:
    """
    Возвращает процент символов, которые отличаются между original и extracted
    """
    # берём длину меньшей строки
    length = max(len(original), len(extracted))
    if length == 0:
        return 0.0
    
    # считаем несовпадающие позиции
    mismatches = 0
    for i in range(length):
        c1 = original[i] if i < len(original) else None
        c2 = extracted[i] if i < len(extracted) else None
        if c1 != c2:
            mismatches += 1
    
    return 100.0 * mismatches / length

def compare_strings(str1, str2):
    """Сравнивает две строки и показывает различия"""
    if len(str1) != len(str2):
        print(f"Длины разные: {len(str1)} vs {len(str2)}")
    
    min_len = min(len(str1), len(str2))
    differences = []
    
    for i in range(min_len):
        if str1[i] != str2[i]:
            differences.append((i, str1[i], str2[i]))
    
    if differences:
        print("Найдены различия:")
        for pos, char1, char2 in differences:
            print(f"  Позиция {pos}: '{char1}' vs '{char2}'")
    else:
        print("Строки идентичны (в пределах общей длины)")

## Функции встраивания текста в изображение и получение теста из изображения

In [None]:
def embed_text(image_path: str, out_path: str, embedded_text: str, Q: int, color_space: ColorSpaceType = ColorSpaceType.RGB, target_subband: str = 'LL', channel: int = 0) -> dict:
    # 1. Получаем выбранный канал изображения
    img_data = get_image_channels(image_path, color_space)
    channel_data = img_data['channels'][:, :, channel].astype(np.float64)

    # 2. Применяем к каналу DWT (используем symmetric для более стабильного поведения)
    wt_type = 'haar'
    wt_mode = 'symmetric'
    
    LL, (LH, HL, HH) = pywt.dwt2(channel_data, wt_type, mode=wt_mode)
    coeffs_dict = {
        'LL': LL,
        'LH': LH,
        'HL': HL,
        'HH': HH
    }

    # 3. Берем выбранную подполосу и приводим ее к одномерному массиву дл удобства
    selected_subband: np.ndarray[np.float64] = coeffs_dict[target_subband]
    container = selected_subband.flatten()

    # 4. Модифициурем контейнер (одномерный массив)
    modified_container, embedded_count = fill_container(container, embedded_text, Q)
    # modified_container, embedded_count = fill_container_topk(container, embedded_text, Q)
    # modified_container, embedded_count = fill_container_threshold(container, embedded_text, Q, T = 400.0)

    # 5. Выполняем обратные преобразования для получения модифицированного канала
    modied_subband = modified_container.reshape(selected_subband.shape)
    coeffs_dict[target_subband] = modied_subband
    modified_coeffs = (coeffs_dict['LL'], 
                           (coeffs_dict['LH'], 
                            coeffs_dict['HL'], 
                            coeffs_dict['HH']))
    modified_channel_data = pywt.idwt2(modified_coeffs, wt_type, mode=wt_mode)

    # 6. Сохраняем модифицированный канал в изображении
    img_data['channels'][:, :, channel] = modified_channel_data
    save_image_channels(img_data['channels'], out_path, color_space)

    return {
        'image_data': img_data,
        'selected_channel': channel,
        'channel_data': channel_data,
        'target_subband': target_subband,
        'embedded_count': embedded_count,
        'Q': Q
    }

In [None]:
def extract_text(image_path: str, embedded_count: int, Q: int, color_space: ColorSpaceType = ColorSpaceType.RGB, target_subband: str = 'LL', channel: int = 0) -> dict:
    # 1. Получаем выбранный канал изображения
    img_data = get_image_channels(image_path, color_space)
    channel_data = img_data['channels'][:, :, channel].astype(np.float64)

    # 2. Применяем к каналу DWT (используем symmetric для более стабильного поведения)
    wt_type = 'haar'
    wt_mode = 'symmetric'
    
    LL, (LH, HL, HH) = pywt.dwt2(channel_data, wt_type, mode=wt_mode)
    coeffs_dict = {
        'LL': LL,
        'LH': LH,
        'HL': HL,
        'HH': HH
    }

    # 3. Берем выбранную подполосу и приводим ее к одномерному массиву дл удобства
    selected_subband: np.ndarray[np.float64] = coeffs_dict[target_subband]
    container = selected_subband.flatten()

    # 4. Извлекаем текст из контейнера    
    embedded_text = extract_container(container, embedded_count, Q)
    # embedded_text = extract_container_topk(container, embedded_count, Q)
    # embedded_text = extract_container_threshold(container, embedded_count, Q, T = 400.0)


    return {
        'image_data': img_data,
        'selected_channel': channel,
        'channel_data': channel_data,
        'target_subband': target_subband,
        'extracted_text': embedded_text
    }

## Демонстрация работы

### Подготовка данных

In [None]:
used_img_path = 'img/marsik-960x1280-cleared.png'
embedded_img_path = 'embedded/marsik-960x1280-cleared-embedded.png'

# used_img_path = 'img/marsik-960x1280.png'
# embedded_img_path = 'embedded/marsik-960x1280-embedded.png'

# used_img_path = 'img/marsik-960x1280.jpg'
# embedded_img_path = 'embedded/marsik-960x1280-embedded.jpg'

# used_img_path = 'img/marsik-600x800.png'
# embedded_img_path = 'embedded/marsik-600x800-embedded.png'

# used_img_path = 'img/marsik-300x400.png'
# embedded_img_path = 'embedded/marsik-300x400-embedded.png'

# used_img_path = 'img/marsik-150x200.png'
# embedded_img_path = 'embedded/marsik-150x200-embedded.png'

# used_img_path = 'img/cat.jpg'
# embedded_img_path = 'embedded/cat-embedded.jpg'

# used_img_path = 'img/cat-cleared.jpg'
# embedded_img_path = 'embedded/cat-cleared-embedded.jpg'

# used_img_path = 'img/forest.jpg'
# embedded_img_path = 'embedded/forest-embedded.jpg'

secret_string = "Silence in an era of noise: How to find yourself in a world that never stops Our world is immersed in a continuous, intrusive hum. This is not just the physical noise of megacities, but a fundamental information and social backdrop that has become our new habitat. We wake up to the vibration of our smartphones, scroll through our news feeds at breakfast, and are immersed in a whirlwind of notifications, messaging apps, and endless online meetings at work. In the evening, as we try to relax, we unconsciously scroll through social media, consuming the carefully curated lives of others. This permanent digital noise creates the illusion of hyper-connection, but paradoxically leads to deep loneliness and distraction. We know more about the world, but less about ourselves. We have thousands of \"friends,\" but sometimes we don't have anyone to share our true sadness or joy with."

### Встраивание и получения встроенного изображения

In [None]:
embedded_result = embed_text(
    image_path=used_img_path,
    out_path=embedded_img_path,
    Q=7,
    embedded_text=secret_string,
    color_space=ColorSpaceType.YCrCb,
    target_subband='LL',
    channel=0
)

print(f'Встроено битов: {embedded_result["embedded_count"]}')

extract_result = extract_text(
    image_path=embedded_img_path,
    embedded_count=embedded_result['embedded_count'],
    Q=embedded_result['Q'],
    color_space=embedded_result['image_data']['color_space'],
    target_subband=embedded_result['target_subband'],
    channel=embedded_result['selected_channel']
)

print(f'Извлечённый текст: {extract_result["extracted_text"]}')
print(f'Извлечено битов: {len(extract_result["extracted_text"]) * 8}')

### Анализ результатов встраивания

In [None]:
result = analyze_images(
    used_img_path,
    embedded_img_path,
    amplify=10
)

print("PSNR:", result["psnr"])
# print(result["psnr_detailed"])

loss_percent = loss_percentage_chars(secret_string, extract_result["extracted_text"])
print(f"Процент потерянных символов: {loss_percent:.2f}%")
compare_strings(secret_string, extract_result["extracted_text"])