In [8]:
import numpy as np
import matplotlib.pyplot as plt
import sys
import os
import copy
import itertools as it
import functools as ft

In [9]:
def bytes_to_int_arr(byts):
    return [b for b in byts]


def pop_byte(byte_arr):
    return byte_arr.pop(0)


def pop_bytes(byte_arr, length):
    return bytes([pop_byte(byte_arr) for _ in range(length)])


def pop_uint8(byte_arr):
    return pop_byte(byte_arr)


def pop_uint32(byte_arr, byteorder='little'):
    byts = pop_bytes(byte_arr, 4)
    return int.from_bytes(byts, byteorder)
    
    
def pop_string(byte_arr, length, encoding='utf-8'):
    byts = pop_bytes(byte_arr, length)
    return byts.decode(encoding)
    
        
def indent(string):
    indent_str = ' ' * 3
    return indent_str + (('\n' + indent_str).join(string.splitlines()))

In [10]:
slice_footer = bytes([7, ] + ([0, ] * 31))
slice_footer_len = len(slice_footer)

packet_header_one = bytes([1, 16, 192, 219])
packet_header_one_len = len(packet_header_one)

packet_header_two = bytes([1, 0, 0, 16])
packet_header_two_len = len(packet_header_two)

end_of_packets_signal = bytes([0, 0, 0, 0])
end_of_packets_signal_len = len(end_of_packets_signal)


packet_ender_one = bytes([0, 1, 1, 129])

class PAMS_Packet:
    def __init__(self, header=None, lens=None, data=None, footer=None):
        self.header = header
        self.lens = lens
        self.data = bytes(data) if data is not None else data
        self.footer= bytes(footer) if footer is not None else footer
        
    def __repr__(self):
        repr_str = '''{:s}(
   header={},
   lens={},
   data={},
   footer={})'''.format(
            self.__class__.__name__,
            [bytes_to_int_arr(x) for x in self.header],
            self.lens,
            bytes_to_int_arr(self.data),
            bytes_to_int_arr(self.footer))
        return repr_str
    

class PhotoactivationMaskSlice:
    
    def __init__(self, lens=None, footer=None, data=None):
        self.lens = lens
        self.footer = footer
        
        self._data = None
        self._raw_data = None
        
        if data is not None:
            self.data = data

    def __repr__(self):
        if self.data is None:
            data_str = None
        else:
            data_str = ('[\n' 
                        + indent(',\n'.join(indent(repr(x)) 
                                            for x in ([bytes_to_int_arr(self.data[0])] 
                                                      + [bytes_to_int_arr(self.data[1])]
                                                      + [bytes_to_int_arr(self.data[2])] 
                                                      + self.data[3:-1] 
                                                      + [(bytes_to_int_arr(self.data[-1]) if type(self.data[-1]) is bytes else self.data[-1]), ]))) 
                        + ']')
        
        repr_str = '''{:s}(
   lens={},
   footer={:s},
   data={:s})'''.format(
            self.__class__.__name__,
            self.lens,
            repr(self.footer),
            data_str)
        return repr_str

    @staticmethod
    def build_from_bytes(byts):
        pass
    
    @property
    def data(self):
        return self._data
    
    @data.setter
    def data(self, data):
        if data is None:
            self._data = None
            self._raw_data = None
        else:
            self._raw_data = bytes(data)
            self._data = self.parse_data(self._raw_data)
    
    @staticmethod
    def parse_data(byts):
        data_bytearray = bytearray(byts)
        data = []
        
        data.append(pop_bytes(data_bytearray, 4))
        
        first_header_one = pop_bytes(data_bytearray, 4)
        assert first_header_one == packet_header_one
        
        data.append(packet_header_one)
        
        next_bytes = bytearray()
        while len(data_bytearray) > 4 and not (data_bytearray[:4] == packet_header_two):
            next_bytes += pop_bytes(data_bytearray, 4)
        
        data.append(bytes(next_bytes))
        
        if len(data_bytearray) == 4:
            data.append(pop_bytes(data_bytearray, 4))
        else:   
            packet_arr = None
            
            while data_bytearray:
                at_packet_header_two = data_bytearray[:4] == packet_header_two[:4]
                
                if at_packet_header_two or len(data_bytearray) == 4:     
                    if packet_arr is not None:
                        # assert packet_arr[:4] == packet_header_one
                        packet_header.append(bytes(packet_arr[:4]))
                     
                        data.append(PAMS_Packet(header=packet_header,
                                                lens=packet_lens,                                                
                                                data=packet_arr[4:-4],
                                                footer=packet_arr[-4:]))

                    if at_packet_header_two:
                        packet_header = [pop_bytes(data_bytearray, 4)]
                        packet_arr = bytearray()
                        packet_header.append(bytes(data_bytearray[:4]))
                        packet_lens = [pop_uint32(data_bytearray)]
                        packet_arr += pop_bytes(data_bytearray, packet_lens[-1])
                    else:
                        end_of_bytes = pop_bytes(data_bytearray, 4)
                        data.append(end_of_bytes)
                else:
                    # packet_lens.append(pop_uint32(data_bytearray))
                    packet_arr += pop_bytes(data_bytearray, 1)
                
        return data
                
            
            

        
class PhotoactivationMask:
    
    def __init__(self, name=None, shape=None, slices=None,):
        self.shape = shape
        self.slices = list() if slices is None else slices
        self.name = str() if name is None else name

    def __repr__(self):
        repr_str = '''{:s}(
   name={:s},
   shape={},
   slices=[{:s}])'''.format(
            self.__class__.__name__,
            repr(self.name), 
            self.shape, 
            '' if not self.slices else ('\n' + 
            indent(',\n'.join(indent(repr(sl)) for sl in self.slices))))
        return repr_str
        
    @staticmethod
    def build_from_binary(byts):
        byts = bytearray(byts)
        pam = PhotoactivationMask()
        
        name_length = pop_uint8(byts)
        pam.name = pop_string(byts, name_length)
        
        pam.shape = tuple(pop_uint32(byts) for _ in range(3))
        
        slice_byts = None
        
        while byts:
            if slice_byts is None:
                slice_byts = bytearray()
                slice_lens = [pop_uint32(byts)]
                _other_len = pop_uint32(byts)
                assert slice_lens[0] == (_other_len + 8)
                slice_byts += pop_bytes(byts, slice_lens[0])
                
            elif byts[:slice_footer_len] == slice_footer:
                pop_bytes(byts, slice_footer_len)
                pam_slice = PhotoactivationMaskSlice(
                    lens=slice_lens,
                    footer='slice_footer',
                    data=slice_byts)
                pam.slices.append(pam_slice)
                
                slice_byts = None
                
            else:
                _next_len = int.from_bytes(byts[:4], 'little')
                slice_lens.append(_next_len)
                slice_byts += pop_bytes(byts, _next_len + 4)
        
        assert len(pam.slices) == pam.shape[2]
        
        return pam
    
    @staticmethod
    def build_from_file(path):
        with open(path, 'rb') as fle:
            byts = fle.read()
        return PhotoactivationMask.build_from_binary(byts)

In [11]:
data_dir = 'data'
file_name = 'test_z_mask_06.pam'

file_path = os.path.join(data_dir, file_name)

pam = PhotoactivationMask.build_from_file(file_path)
print(pam)

PhotoactivationMask(
   name='test_mask_06',
   shape=(128, 128, 10),
   slices=[
      PhotoactivationMaskSlice(
         lens=[20],
         footer='slice_footer',
         data=[
            [14, 129, 0, 108],
            [1, 16, 192, 219],
            [0, 0, 0, 0, 2, 0, 0, 16],
            [0, 0, 0, 0]]),
      PhotoactivationMaskSlice(
         lens=[20],
         footer='slice_footer',
         data=[
            [14, 129, 0, 108],
            [1, 16, 192, 219],
            [0, 0, 0, 0, 2, 0, 0, 16],
            [0, 0, 0, 0]]),
      PhotoactivationMaskSlice(
         lens=[36],
         footer='slice_footer',
         data=[
            [98, 250, 255, 121],
            [1, 16, 192, 219],
            [0, 0, 0, 0, 0, 0, 0, 16, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 67, 0, 0, 0, 67],
            [0, 0, 0, 0]]),
      PhotoactivationMaskSlice(
         lens=[36],
         footer='slice_footer',
         data=[
            [98, 250, 255, 121],
            [1, 16, 192, 219],
            [0,

In [244]:
n = 3
b = pam.slices[n - 1]._raw_data
print(int.from_bytes(b[:4], 'little'))
print(bytes_to_int_arr(b[:4]))
print(bytes_to_int_arr(b[4:]))

def byte_xor(x, y):
    return bytes([_x ^ _y for _x, _y in zip(x, y)])

chunked_b = [b[(i * 4):((i * 4) + 4)]
             for i in range(1, round(len(b) / 4))]

chunked_ints = [int.from_bytes(x, 'little') for x in chunked_b]
print([bytes_to_int_arr(x) for x in chunked_b])
print(chunked_ints)

int_sum = np.array(sum(chunked_ints))
dt = np.dtype('uint32')
print(int_sum.astype(dt))

y = ft.reduce(byte_xor, chunked_b)

print(bytes_to_int_arr(y))

3925301432
[184, 88, 247, 233]
[1, 16, 192, 219, 0, 0, 0, 0, 1, 0, 0, 16, 32, 0, 0, 0, 1, 16, 192, 219, 4, 0, 0, 0, 0, 64, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 1, 0, 0, 0, 1, 0, 0, 1, 1, 129, 0, 0, 0, 0]
[[1, 16, 192, 219], [0, 0, 0, 0], [1, 0, 0, 16], [32, 0, 0, 0], [1, 16, 192, 219], [4, 0, 0, 0], [0, 64, 0, 0], [0, 0, 0, 0], [1, 0, 0, 0], [1, 0, 1, 0], [0, 0, 1, 0], [0, 1, 1, 129], [0, 0, 0, 0]]
[3686797313, 0, 268435457, 32, 3686797313, 4, 16384, 0, 1, 65537, 65536, 2164326656, 0]
1216569641
[37, 65, 1, 145]


In [219]:
all_mask_data = bytes_to_int_arr(bytes().join(sl.data for sl in pam.slices))
mask_data_np = np.array(all_mask_data)
original_size = mask_data_np.size

vis_width = 20
new_shape = (np.ceil(mask_data_np.size / vis_width).astype(int), vis_width)
mask_data_vis = np.resize(mask_data_np.astype(float), new_shape)

num_extra_values = mask_data_vis.size - original_size
extra_value_idxs = mask_data_vis.size - (np.arange(num_extra_values) + 1)
extra_value_slice = np.unravel_index(extra_value_idxs, mask_data_vis.shape)
mask_data_vis[extra_value_slice] = np.NaN

plt.matshow(mask_data_vis, cmap='magma')

TypeError: sequence item 0: expected a bytes-like object, list found

In [357]:
_ = [print('header[0] = {:7d}, len(data) = {:7d}, len(data) - header[0] = {:7d}'.format(
        sl.header[0], 
        len(sl.data),
        len(sl.data) - sl.header[0]))
     for sl in pam.slices]

header[0] =      20, len(data) =      20, len(data) - header[0] =       0
header[0] =      20, len(data) =      20, len(data) - header[0] =       0
header[0] =   10120, len(data) =   17596, len(data) - header[0] =    7476
header[0] =   30520, len(data) =   45220, len(data) - header[0] =   14700
header[0] =   41560, len(data) =   97260, len(data) - header[0] =   55700
header[0] =   50296, len(data) =  149172, len(data) - header[0] =   98876
header[0] =   50816, len(data) =  157832, len(data) - header[0] =  107016
header[0] =  209388, len(data) =  319844, len(data) - header[0] =  110456
header[0] =  229088, len(data) =  757496, len(data) - header[0] =  528408
header[0] =  246380, len(data) =  986772, len(data) - header[0] =  740392


In [29]:
file_path_1 = os.path.join(data_dir, 'test_z_mask_07.pam')
file_path_2 = os.path.join(data_dir, 'test_z_mask_08.pam')

pam_1 = PhotoactivationMask.build_from_file(file_path_1)
pam_2 = PhotoactivationMask.build_from_file(file_path_2)

print(pam_1.slices[2].data[:4])
_ = [print('{}:{}'.format((i * 4), ((i * 4) + 4))) for i in range(1, round(len(pam_1.slices[2].data) / 4))]
x = [pam_1.slices[2].data[(i * 4):((i * 4) + 4)]
     for i in range(1, round(len(pam_1.slices[2].data) / 4))]
print(x)

def byte_xor(x, y):
    return bytes([_x ^ _y for _x, _y in zip(x, y)])

print(int.from_bytes(ft.reduce(byte_xor, x), 'little'))
print(int.from_bytes(pam_1.slices[3].data[:4], 'little') - int.from_bytes(pam_1.slices[0].data[:4], 'little'))


b'\xb8X\xf7\xe9'
4:8
8:12
12:16
16:20
20:24
24:28
28:32
32:36
36:40
40:44
44:48
48:52
52:56
[b'\x01\x10\xc0\xdb', b'\x00\x00\x00\x00', b'\x01\x00\x00\x10', b' \x00\x00\x00', b'\x01\x10\xc0\xdb', b'\x04\x00\x00\x00', b'\x00@\x00\x00', b'\x00\x00\x00\x00', b'\x01\x00\x00\x00', b'\x01\x00\x01\x00', b'\x00\x00\x01\x00', b'\x00\x01\x01\x81', b'\x00\x00\x00\x00']
2432778533
1268825516


In [252]:
point = np.array([2, 3])
bytes().join([int(x).to_bytes(2, 'little') for x in point])

b'\x02\x00\x03\x00'

In [6]:
bytes('hello', 'utf8')

b'hello'