In [1]:
# import itertools
from typing import Literal, Tuple
import numpy as np

# from tqdm import tqdm

def ndarr_to_bytes_arr(a: np.ndarray):
    """Split every element of ``a`` into its individual bytes.

    The result has shape ``a.shape + (a.dtype.itemsize,)`` and dtype uint8.
    The element bytes are taken in the host's native byte order and then
    reversed along the last axis, so on a little-endian host index 0 of the
    last axis is the most significant byte and index -1 the least significant.

    Parameters
    ----------
    a : np.ndarray
        Array of any fixed-size dtype. Arrays whose dtype carries a non-native
        byte order are first converted (value-preserving) to native order so
        that the byte layout does not depend on how the input was stored.

    Returns
    -------
    np.ndarray
        Read-only uint8 view (it is backed by an immutable ``bytes`` buffer).
    """
    assert isinstance(a.dtype.itemsize, int) and a.dtype.itemsize>=1

    # tobytes() emits the bytes exactly as stored; without this normalisation a
    # '>u2' array and a '<u2' array holding the same values would produce
    # different byte layouts and would not survive bytes_arr_to_ndarr, which
    # always interprets the buffer in native order.
    if not a.dtype.isnative:
        a = a.astype(a.dtype.newbyteorder('='))

    newshape = a.shape + (a.dtype.itemsize,)

    # uint8 is a single byte, so it has no byte order of its own.
    a_decon = np.frombuffer(a.tobytes(order='C'), dtype=np.uint8).reshape(newshape)
    return np.flip(a_decon, axis=-1)

def bytes_arr_to_ndarr(a: np.ndarray, dtype=np.uint8, shape=None):
    """Rebuild an array of ``dtype`` from a per-element byte array.

    This is the inverse of ``ndarr_to_bytes_arr``: the last axis of ``a`` is
    reversed and every group of ``itemsize`` bytes is reinterpreted as one
    element of ``dtype`` in native byte order.

    Parameters
    ----------
    a : np.ndarray
        Byte array whose last axis has length ``np.dtype(dtype).itemsize``.
        Integer arrays of another width (for example an int64 result of a
        bitwise operation) are accepted as long as every value fits in a byte.
    dtype : dtype-like
        Target element type; its byte order is forced to native.
    shape : tuple, optional
        Shape of the result. Defaults to ``a.shape[0:-1]``.

    Raises
    ------
    TypeError
        If ``a`` is not an integer array.
    ValueError
        If ``a`` has no trailing byte axis, holds values outside 0..255, or
        its last axis does not match the item size of ``dtype``.
    """
    a = np.asarray(a)
    if a.ndim < 1:
        raise ValueError("a must have a trailing byte axis")

    if a.dtype != np.uint8:
        # tobytes() on a wider dtype would emit several bytes per entry and
        # silently corrupt the reconstruction, so narrow it explicitly.
        if not np.issubdtype(a.dtype, np.integer):
            raise TypeError(f"a must hold integer byte values, got dtype {a.dtype}")
        if a.size and (a.min() < 0 or a.max() > 255):
            raise ValueError("a holds values outside the byte range 0..255")
        a = a.astype(np.uint8)

    dtype_new = np.dtype(dtype).newbyteorder('=')
    if a.shape[-1] != dtype_new.itemsize:
        raise ValueError(
            f"last axis of a has length {a.shape[-1]} but dtype {dtype_new} "
            f"needs {dtype_new.itemsize} byte(s) per element"
        )

    if shape is None:
        shape = a.shape[0:-1]

    return np.frombuffer(np.flip(a, axis=-1).tobytes(order='C'), dtype=dtype_new).reshape(shape)

def test_ndarr_to_bytes_arr():
    """Round-trip self-test for ndarr_to_bytes_arr / bytes_arr_to_ndarr.

    For edge lengths 1..4, ranks 1..9 and a set of integer and float dtypes,
    an array is split into bytes and rebuilt, and the result must equal the
    input. A small float16 array with negative values is checked at the end.
    """
    def test_recon(a: np.ndarray):
        # Split into bytes, rebuild, and require an exact match.
        a_bytes = ndarr_to_bytes_arr(a)
        a_recover = bytes_arr_to_ndarr(a_bytes, dtype=a.dtype, shape=a.shape)
        assert np.array_equal(a, a_recover)

    def get_dtype_limits(dtype):
        # Returns a half-open [low, high) range for random sampling.
        if np.issubdtype(dtype, np.integer):
            info = np.iinfo(dtype)
            # randint's upper bound is exclusive; +1 lets the dtype's maximum
            # (an all-ones / 0x7F.. byte pattern) actually be drawn.
            return info.min, info.max + 1
        if np.issubdtype(dtype, np.floating):
            return 0, 1
        raise TypeError("Unsupported dtype")

    def test(test_type:Literal['arrange', 'random_intarr', 'random_floatarr'],shape:Tuple[int, ...], dtype=np.uint8, low=None, high=None):
        if low is None or high is None:
            low, high = get_dtype_limits(dtype)

        n = np.prod(shape)

        if test_type == 'arrange':
            a = np.arange(n, dtype=dtype).reshape(shape)
        elif test_type == 'random_intarr':
            assert np.issubdtype(dtype, np.integer), "dtype should be integer"
            a = np.random.randint(low, high, size=n, dtype=dtype).reshape(shape)
        elif test_type == 'random_floatarr':
            assert np.issubdtype(dtype, np.floating), "dtype should be floating"
            a = np.random.uniform(low, high, size=n).astype(dtype).reshape(shape)
        else:
            # Previously an unknown kind fell through to a NameError on `a`.
            raise ValueError(f"Unknown test_type: {test_type!r}")

        test_recon(a)

    # tqdm's product behaves like itertools.product but shows a progress bar.
    from tqdm.contrib.itertools import product
    for m, ndim, dtype in product(range(1, 5), range(1, 10), [np.uint8, np.int8, np.int16, np.int32, np.int64, np.float32, np.float64]):
        shape = (m,)*ndim
        test('arrange', shape, dtype)
        if np.issubdtype(dtype, np.integer):
            test('random_intarr', shape, dtype)
        if np.issubdtype(dtype, np.floating):
            test('random_floatarr', shape, dtype)

    # float16 is not part of the sweep above; cover it with signed values.
    a = np.array([1.9991, -1.9991, 1.999, -1.999], dtype=np.float16)
    test_recon(a)
        

# Run the round-trip self-test as soon as this cell executes; it raises
# AssertionError if any reconstructed array differs from its input.
test_ndarr_to_bytes_arr()

  from .autonotebook import tqdm as notebook_tqdm
100%|██████████| 252/252 [00:00<00:00, 8048.65it/s]


In [2]:
# Demo: split three float16 values into per-element bytes and rebuild them.
dtype = np.float16
a = ndarr_to_bytes_arr(np.array([0.123, 0.5, 0.77], dtype=dtype))
# `a` gains a trailing axis of length dtype.itemsize (2 for float16).
print(a.shape, a.dtype, a)

# No shape is passed, so bytes_arr_to_ndarr falls back to a.shape[0:-1].
a_recon = bytes_arr_to_ndarr(a, dtype=dtype)
print(a_recon.shape, a_recon)

(3, 2) uint8 [[ 47 223]
 [ 56   0]
 [ 58  41]]
(3,) [0.123 0.5   0.77 ]


In [3]:
# Demo: byte round trip on a random 3-D float16 array.
# NOTE(review): no random seed is set, so the printed values change per run.
dtype = np.float16

a = np.random.uniform(low=0, high=1, size=30).astype(dtype).reshape((3,5,2))
print(a.shape, a.dtype)
print(a[0,0,0])

# Deconstruct: one extra trailing axis holding the bytes of each element.
a_decon = ndarr_to_bytes_arr(a)
print(a_decon.shape, a_decon.dtype)
print(a_decon[0,0,0])

# Reconstruct: the shape defaults to a_decon.shape[0:-1].
a_recon = bytes_arr_to_ndarr(a_decon, dtype=a.dtype)
print(a_recon.shape, a_recon.dtype)
print(a_recon[0,0,0])

(3, 5, 2) float16
0.4731
(3, 5, 2, 2) uint8
[ 55 146]
(3, 5, 2) float16
0.4731


In [1]:
from typing import Literal
import numpy as np

def ret_binstr_arr(a: np.ndarray):
    """Return the 8-character binary representation of each item of ``a``.

    ``a`` is iterated once, so it is expected to be a flat sequence of
    integers; the result is a plain Python list of strings.
    """
    rendered = []
    for value in a:
        rendered.append(np.binary_repr(value, width=8))
    return rendered

def zero_x_lsbs(a: np.ndarray, X: int=2, verbose: bool=True):
    """Clear the ``X`` least significant bits of every multi-byte element.

    The last axis of ``a`` holds the bytes of one element with the least
    significant byte at index -1. Bits are cleared starting from that byte
    and moving towards index 0.

    Parameters
    ----------
    a : np.ndarray
        uint8 array with at least two dimensions.
    X : int
        Number of low bits to clear, ``0 <= X <= 8 * a.shape[-1]``.
    verbose : bool
        When true (the default) the per-byte masks are printed as binary
        strings, first byte of the last axis first.

    Returns
    -------
    np.ndarray
        New uint8 array with the same shape as ``a``.
    """
    assert a.ndim >= 2
    assert 0 <= X <= 8*a.shape[-1]
    assert a.dtype==np.uint8

    n_bytes = a.shape[-1]

    # For each byte (in last-axis order) how many whole bytes sit below it.
    bit_offset = 8 * np.arange(n_bytes - 1, -1, -1)
    # Number of bits to clear in each byte: 8 for fully covered bytes, the
    # remainder for the boundary byte, 0 above it. Computing it per byte keeps
    # the mask count equal to n_bytes even when X == 8 * n_bytes.
    bits_to_clear = np.clip(X - bit_offset, 0, 8)
    # Built as uint8 so the AND below does not promote the result to int64.
    masks = ((0xFF << bits_to_clear) & 0xFF).astype(np.uint8)

    if verbose:
        print([np.binary_repr(int(m), 8) for m in masks])
    return np.bitwise_and(a, masks)

def check_str(s: str, rule: Literal['zeros', 'ones', 'bitstr']):
    """Check that ``s`` is a string made only of the characters ``rule`` allows.

    Parameters
    ----------
    s : str
        Candidate string. Non-string input yields ``False``; the empty string
        satisfies every rule because it contains no disallowed character.
    rule : {'zeros', 'ones', 'bitstr'}
        'zeros' allows only '0', 'ones' only '1', 'bitstr' both.

    Raises
    ------
    ValueError
        If ``rule`` is not one of the three supported names. The rule is
        validated before ``s`` is inspected.
    """
    alphabets = {
        'zeros': frozenset('0'),
        'ones': frozenset('1'),
        'bitstr': frozenset('01'),
    }
    allowed = alphabets.get(rule) if isinstance(rule, str) else None
    if allowed is None:
        raise ValueError(f"Unknown rule {rule!r}; expected one of {sorted(alphabets)}")

    return isinstance(s, str) and all(c in allowed for c in s)

def test_check_str():
    """Exercise check_str with all-zero, all-one, mixed and invalid strings.

    For repeat counts 1..9 each sample string is checked against the three
    rules in the order ('bitstr', 'zeros', 'ones') and compared with the
    expected triple.
    """
    rule_order = ('bitstr', 'zeros', 'ones')

    for i in range(1, 10):
        samples = (
            ("0"*i, (True, True, False)),
            ("1"*i, (True, False, True)),
            ("01"*i, (True, False, False)),
            ("0"*i + "2" + "a", (False, False, False)),
        )
        for s, vals in samples:
            checks = tuple(check_str(s, rule) for rule in rule_order)
            assert vals == checks, f'check failed: s: {s}, true: {vals}, got: {checks}'

from time import time

def test_zero_x_lsbs(test_speed=False, n=None):
    """Check (or time) zero_x_lsbs on random single-byte elements.

    Parameters
    ----------
    test_speed : bool
        When true, only time the calls for X = 0..7 and print the elapsed
        seconds; no correctness checks are run.
    n : int, optional
        Number of random bytes. Defaults to 1e8 for the speed run and 1e4
        for the correctness run, which builds a Python string per element.
    """
    if n is None:
        n = int(1e8) if test_speed else int(1e4)

    # zero_x_lsbs requires ndim >= 2, so each byte is its own one-byte element.
    # The upper bound of randint is exclusive: 256 lets 0xFF be drawn.
    a = np.random.randint(0, 256, size=(n, 1), dtype=np.uint8)

    if test_speed:
        start = time()
        for X in range(0, 8):
            zero_x_lsbs(a, X)
        print(f"Time: {time()-start}")
        return

    a_bin = ret_binstr_arr(a[:, 0])

    for X in range(0, 8):
        a_zeroed = zero_x_lsbs(a, X)
        assert a_zeroed.shape == a.shape

        a_zeroed_bin = ret_binstr_arr(a_zeroed[:, 0])

        # Index of the first cleared bit. A positive index is used because a
        # slice ending at -X is empty for X == 0, which made the check vacuous.
        keep = 8 - X
        for x, y in zip(a_bin, a_zeroed_bin):
            assert x[:keep] == y[:keep], f'X={X}: upper bits changed: {x} -> {y}'
            assert check_str(y[keep:], rule='zeros'), f'X={X}: low bits not cleared: {x} -> {y}'

# test_check_str()
# test_zero_x_lsbs(test_speed=True)

def emplace(host: np.ndarray, s_bits=None, X: int=2):
    """Return the bytes of ``host`` with the ``X`` lowest bits of each element cleared.

    ``host`` must be one-dimensional and ``X`` must lie in 0..8. ``s_bits`` is
    accepted but not used by the current implementation.
    """
    assert 0 <= X <= 8
    assert host.ndim == 1

    # Split into per-element bytes, then mask out the low bits in one pass.
    return zero_x_lsbs(ndarr_to_bytes_arr(host), X)

# host = np.arange(10, dtype=np.uint8)
# emplace(host, X=1)

In [21]:
# Demo: byte round trip on a random float16 array, then clear the 3 lowest bits.
# NOTE(review): no random seed is set, so the printed values change per run.
dtype = np.float16

a = np.random.uniform(low=0, high=1, size=30).astype(dtype).reshape((3,5,2))
print(a.shape, a.dtype)
print(a[0,0,0])

# Deconstruct: one extra trailing axis holding the bytes of each element.
a_decon = ndarr_to_bytes_arr(a)
print(a_decon.shape, a_decon.dtype)
print(a_decon[0,0,0])

# Reconstruct: the shape defaults to a_decon.shape[0:-1].
a_recon = bytes_arr_to_ndarr(a_decon, dtype=a.dtype)
print(a_recon.shape, a_recon.dtype)
print(a_recon[0,0,0])

# zero_x_lsbs also prints the masks it applies (see the list in the output).
b = zero_x_lsbs(a_decon, 3)
print(b[0,0,0])

(3, 5, 2) float16
0.466
(3, 5, 2, 2) uint8
[ 55 117]
(3, 5, 2) float16
0.466
['11111111', '11111000']
[ 55 112]


In [41]:
import numpy as np
import bitstring

def get_file_bytes(filepath: str):
    """Read the file at ``filepath`` in binary mode and return its full contents."""
    with open(filepath, mode='rb') as handle:
        return handle.read()

def get_malware_bytes(filepath: str, X: int=2, fill=True, n_w:int=100):
    """Cut a file's bit stream into ``X``-bit chunks stored as byte rows.

    Each chunk is cut into 8-bit pieces from its start (the final piece holds
    the remaining ``X % 8`` bits, right-aligned), and the pieces are then
    reversed so the short piece comes first in the row.

    Parameters
    ----------
    filepath : str
        File to read.
    X : int
        Bits per chunk, at least 1.
    fill : bool
        When true, the bit stream is repeated and truncated to exactly
        ``n_w * X`` bits, giving ``n_w`` full chunks.
    n_w : int
        Number of chunks to produce when ``fill`` is true.

    Returns
    -------
    np.ndarray
        uint8 array of shape ``(n_chunks, ceil(X / 8))``.

    Raises
    ------
    ValueError
        If ``X < 1``, or if ``fill`` is true and the file is empty.
    """
    if X < 1:
        raise ValueError(f"X must be at least 1, got {X}")

    data = get_file_bytes(filepath)

    bs = bitstring.Bits(data)

    if fill:
        if len(bs) == 0:
            # Repeating an empty stream can never reach the target length.
            raise ValueError(f"cannot fill from empty file: {filepath}")
        n = n_w*X
        bs*=(n//len(bs))+1

        bs = bs[0:n]

    width = -(-X // 8)  # bytes per row: ceil(X / 8)
    # len(bs) is the bit count; going through bs.bin would build a string
    # with one character per bit just to measure it.
    n_full = len(bs)//X

    chunks = [tuple(piece.uint for piece in x_bits.cut(8)) for x_bits in bs.cut(X, count=n_full)]

    tail = bs[n_full*X:]
    if len(tail) > 0:
        # Cut the leftover bits like a regular chunk so no piece exceeds one
        # byte, then zero-pad to the common row width so the rows stay
        # rectangular (also when X is a multiple of 8).
        tail_bytes = tuple(piece.uint for piece in tail.cut(8))
        chunks.append(tail_bytes + (0,)*(width - len(tail_bytes)))

    if not chunks:
        return np.empty((0, width), dtype=np.uint8)

    return np.flip(np.array(chunks, dtype=np.uint8), axis=-1)

In [42]:
# Build 10 chunks of 20 bits each from the sample file; with fill=True the
# bit stream is repeated/truncated to exactly n_w*X bits before cutting.
chunks = get_malware_bytes('malware/050ef', X=20, fill=True, n_w=10)
chunks

array([[  7, 117,  37],
       [ 14,  70, 148],
       [  4, 121, 119],
       [  4,   5, 183],
       [  4, 106,  87],
       [  3, 165, 100],
       [  6,  81, 117],
       [ 10, 119, 100],
       [  4,  78,  82],
       [  3, 165,  39]], dtype=uint8)

In [37]:
import numpy as np

# Exploration: step through how the first 16 bits of the sample file are cut
# into X-bit chunks and then into 8-bit pieces.
data = get_file_bytes('malware/050ef')
# a = np.frombuffer(data, dtype=np.uint8)

import bitstring
bs = bitstring.Bits(data)[0:16]
print(bs.bin)
print(len(bs))
print(len(bs.bin))

n_b = 16
# ones_mask = int(f'0b{"1"*8}', 2)


X=9
# chunks = []
# Without a count, cut() also yields the shorter trailing chunk (see "chunk 1"
# in the output, which has a single 7-bit piece).
for i, x_bits in enumerate(bs.cut(X)):
    print(f'chunk {i}')
    # curr_chunk = []
    for j, byte_chunks in enumerate(x_bits.cut(8)):
        print(f'byte {j}: {byte_chunks.uint}', end=' ')
        # curr_chunk.append(byte_chunks.uint)
    # chunks.append(curr_chunk)
    print()

# Only the full X-bit chunks; count limits cut() to len(bs)//X items.
chunks = list(tuple(byte_chunks.uint for byte_chunks in x_bits.cut(8)) for x_bits in bs.cut(X, count=len(bs.bin)//X))
# Leftover bits after the last full chunk, as one integer padded with zeros.
chunk_last_byte = bs[(len(bs)//X)*X:].uint
chunk_last = (chunk_last_byte,) + tuple(0 for _ in range(X//8))
print('chunk_last:', chunk_last)

# NOTE(review): appended unconditionally here, whereas get_malware_bytes only
# appends the tail when len(bs) % X != 0.
chunks.append(chunk_last)

# Reverse each row so the first 8-bit piece ends up in the last column.
chunks = np.flip(np.array(chunks, dtype=np.uint8), axis=-1)
chunks

0010010101110101
16
16
chunk 0
byte 0: 37 byte 1: 0 
chunk 1
byte 0: 117 
chunk_last: (117, 0)


array([[  0,  37],
       [  0, 117]], dtype=uint8)

In [3]:
# NOTE(review): `a` is not defined in this cell and depends on earlier kernel
# state; presumably the uint8 view of the file bytes from the commented-out
# np.frombuffer line — confirm before relying on a fresh Run All.
len(a)

15291390

In [9]:
import numpy as np

# Sanity check: bytes per element for float16 (this rebinds the global `a`).
a = np.array([1,2,3,4,5,6,7,8,9,10], dtype=np.float16)
a.dtype.itemsize

2