# Extract LSBs 

In this notebook, we will extract the least significant bits (LSBs) of stego images to find similarities in the payloads.

In [1]:
%pip install pillow tqdm ipywidgets notebook asyncstdlib
!jupyter nbextension enable --py widgetsnbextension
from PIL import Image, ImageChops
from pathlib import Path
import pandas as pd
import numpy as np
from tqdm.notebook import tqdm
from asyncstdlib import itertools as ait
from asyncstdlib import functools as afn

[31mERROR: Exception:
Traceback (most recent call last):
  File "/Users/fabianloewe/Data/master-thesis/venv/lib/python3.11/site-packages/pip/_internal/cli/base_command.py", line 180, in exc_logging_wrapper
    status = run_func(*args)
             ^^^^^^^^^^^^^^^
  File "/Users/fabianloewe/Data/master-thesis/venv/lib/python3.11/site-packages/pip/_internal/cli/req_command.py", line 245, in wrapper
    return func(self, options, args)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/fabianloewe/Data/master-thesis/venv/lib/python3.11/site-packages/pip/_internal/commands/install.py", line 324, in run
    session = self.get_default_session(options)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/fabianloewe/Data/master-thesis/venv/lib/python3.11/site-packages/pip/_internal/cli/req_command.py", line 95, in get_default_session
    self._session = self.enter_context(self._build_session(options))
                                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Pyarrow will become a required dependency of pandas in the next major release of pandas (pandas 3.0),
(to allow more performant data types, such as the Arrow string type, and better interoperability with other libraries)
but was not found to be installed on your system.
If this would cause problems for you,
please provide us feedback at https://github.com/pandas-dev/pandas/issues/54466
        
  import pandas as pd


## Define constants

In [3]:
# Root directory of the StegoAppDB export used throughout this notebook.
STEGOAPPDB_PATH = Path('../datasets/StegoAppDB_stegos_20240309-030352')
# CSV index of the export; it is loaded with pandas further down.
INFO_FILE = STEGOAPPDB_PATH.joinpath('StegoAppDB_stegos_20240309-030352_stego_directory.csv')
# Image sub-directories of the export.
COVERS_PATH = STEGOAPPDB_PATH.joinpath('covers')
STEGOS_PATH = STEGOAPPDB_PATH.joinpath('stegos')
# Names of the info-file columns this notebook reads.
METHOD_COLUMN, STEGO_COLUMN, COVER_COLUMN = (
    'embedding_method',
    'image_filename',
    'cover_image_filename',
)

## Collect stego images

We will read the info file to collect all embedding methods and the stego images that were generated using them.

### Gather embedding methods

The embedding methods can be found in the `METHOD_COLUMN` of the info file.

In [4]:
# Load the dataset index and list every distinct value of the method column.
info_file = pd.read_csv(INFO_FILE)
embedding_methods = pd.unique(info_file[METHOD_COLUMN])
print('Found the following embedding methods:', ", ".join(embedding_methods))

Found the following embedding methods: MobiStego, PixelKnot, PocketStego, Pictograph, SteganographyM, Passlok


### Collect stego images

Now, for each embedding method, we will collect the stego images that were generated with it.

In [5]:
def collect_stego_images(embedding_method: str):
    """Return the stego image filenames recorded for ``embedding_method``.

    Filters the module-level ``info_file`` frame to the rows whose
    ``METHOD_COLUMN`` equals ``embedding_method`` and returns their
    ``STEGO_COLUMN`` values as a pandas Series (row index preserved).
    """
    uses_method = info_file[METHOD_COLUMN] == embedding_method
    return info_file.loc[uses_method, STEGO_COLUMN]


# Map every embedding method to the Series of its stego image paths
# (``STEGOS_PATH`` joined with each filename of that method).
stego_images_by_method = dict(
    (method, STEGOS_PATH / collect_stego_images(method))
    for method in embedding_methods
)
# Display the number of stego images collected per embedding method.
dict(zip(stego_images_by_method, map(len, stego_images_by_method.values())))

{'MobiStego': 3060,
 'PixelKnot': 3060,
 'PocketStego': 3060,
 'Pictograph': 4800,
 'SteganographyM': 3060,
 'Passlok': 1530}

## Try to detect a signature

The next step is to try to detect a signature in the payloads corresponding to the embedding methods.
 
### Extract the payloads

First, we will extract the LSBs of the stego images.
Some embedders fill each payload byte starting from its most significant bit (MSB), others starting from its least significant bit (LSB).
This is why there are two families of implementations, with names ending in `msb` and `lsb` respectively.

Furthermore, if the embedder used a number of LSBs per value that is a power of two (1, 2, 4 or 8), every payload byte is built from a whole number of values,
so we can speed up the extraction by applying bitwise OR operations to the whole array at once. This is reflected in the implementations containing `opt` in their name.

Finally, we operate asynchronously to speed up the extraction process.

In [6]:
def _extract_bits_opt_lsb(data, bits: int):
    div = 8 // bits
    message = np.zeros(len(data) // div, dtype=np.uint8)
    mask = (1 << bits) - 1
    for i in range(div):
        shift = bits * i
        message |= (data[i::div] & mask) << shift
    return message


def _extract_bits_opt_msb(data, bits: int):
    div = 8 // bits
    message = np.zeros(len(data) // div, dtype=np.uint8)
    mask = (1 << bits) - 1
    for i in range(div):
        shift = 8 - bits - (bits * i)
        message |= (data[i::div] & mask) << shift
    return message


def _extract_bits_lsb(data, bits: int):
    msg_byte = 0
    shift = 0
    message = []
    mask = (1 << bits) - 1
    for byte in data:
        msg_byte |= (byte & mask) << shift
        shift += bits
        if shift >= 8:
            tmp = msg_byte >> 8
            message.append(msg_byte & 0xFF)
            msg_byte = tmp
            shift -= 8
    return np.array(message)


def _extract_bits_msb(data, bits: int):
    msg_byte = 0
    shift = 8 - bits
    message = []
    mask = (1 << bits) - 1
    for byte in data:
        msg_byte |= (byte & mask) << shift
        shift += bits
        if shift <= 0:
            tmp = msg_byte >> 8
            message.append(msg_byte & 0xFF)
            msg_byte = tmp
            shift += 8
    return np.array(message)


async def _load_image(img_path: Path, convert_mode='RGB', channels=None):
    """Load an image and return its pixel values as a flat array.

    The image is opened with Pillow, converted to ``convert_mode`` and turned
    into a NumPy array, which is flattened before it is returned. The function
    contains no ``await``, so it runs to completion once it is awaited.

    Args:
        img_path: Path of the image file.
        convert_mode: Pillow mode passed to ``Image.convert``.
        channels: Optional whitespace-separated string of channel names. Only
            the NUMBER of names is used: for 'RGB'/'RGBA' images, the first
            ``n`` channels are kept when fewer names than available channels
            are given. ``None`` or an empty string keeps all channels.
            NOTE(review): the names themselves are not inspected, so 'B'
            selects the first channel, not the blue one; confirm the intent.

    Returns:
        1-D array of the (selected) channel values.
    """
    with Image.open(img_path) as img:
        arr = np.array(img.convert(convert_mode))

    # ``channels`` defaults to None; count zero names in that case instead of
    # calling ``len(None)``, which used to raise a TypeError.
    n_selected = len(channels.split()) if channels else 0
    n_available = {'RGB': 3, 'RGBA': 4}.get(convert_mode, 0)
    if 0 < n_selected < n_available:
        arr = arr[..., :n_selected]
    return arr.reshape(-1)

async def _extract_message(img_path: Path, bits: int, direction='msb', convert_mode='RGB', channels=None):
    """Extract the embedded bit stream of one image as an array of bytes.

    Loads the image via ``_load_image`` and packs the ``bits`` lowest bits of
    every value into bytes. When ``bits`` divides 8 (1, 2, 4, 8), the
    vectorized ``_extract_bits_opt_*`` helpers are used, otherwise the generic
    ``_extract_bits_*`` helpers.

    Args:
        img_path: Path of the image file.
        bits: Number of low bits taken from each value; must be in 1..8.
        direction: 'msb' packs MSB-first; any other value packs LSB-first.
        convert_mode: Forwarded to ``_load_image``.
        channels: Forwarded to ``_load_image``.

    Returns:
        The array returned by the selected extraction helper.

    Raises:
        ValueError: If ``bits`` is outside 1..8.
    """
    # Reject unusable bit counts before touching the file: values above 8
    # cannot be packed by the helpers (16 would divide by zero) and values
    # below 1 carry no payload.
    if not 1 <= bits <= 8:
        raise ValueError(f'bits must be between 1 and 8, got {bits}')

    data = await _load_image(img_path, convert_mode, channels)
    msb_first = direction == 'msb'
    if 8 % bits == 0:
        extract = _extract_bits_opt_msb if msb_first else _extract_bits_opt_lsb
    else:
        extract = _extract_bits_msb if msb_first else _extract_bits_lsb
    return extract(data, bits)


async def extract_messages(images, bits: int = 1, direction='msb', embedding_method=None, convert_mode='RGB', channels=None):
    """Asynchronously yield ``(image, message)`` pairs for the given images.

    The images are first collected into a list (so the progress bar knows the
    total) and then processed one after another with ``_extract_message``.

    Args:
        images: Iterable or async iterable of image paths.
        bits: Number of low bits taken from each value.
        direction: 'msb' or 'lsb' packing order, forwarded to
            ``_extract_message``; also shown in the progress bar description.
        embedding_method: Optional label shown in the progress bar description.
        convert_mode: Forwarded to ``_extract_message``.
        channels: Forwarded to ``_extract_message``.

    Yields:
        Tuples of the image path and its extracted message array.

    Raises:
        ValueError: If ``images`` is neither an iterable nor an async
            iterable (raised when the generator is first advanced).
    """
    if hasattr(images, '__aiter__'):
        image_list = [img async for img in images]
    elif hasattr(images, '__iter__'):
        image_list = list(images)
    else:
        raise ValueError('images must be an iterable or an async iterable')

    desc = f'Extracting {bits}-LSBs' + (f' for {embedding_method}' if embedding_method else '') + (f' with {direction.upper()} direction' if direction else '')
    for img in tqdm(image_list, desc=desc):
        # Create each coroutine right before awaiting it, so that abandoning
        # the generator early leaves no never-awaited coroutines behind.
        yield img, await _extract_message(img, bits, direction, convert_mode, channels)