## Settings

In [5]:

import scapy.all as scp
import struct
import numpy as np
import matplotlib.pyplot as plt

from typing import DefaultDict

from IrisBackendv3.data_standards import DataStandards
from IrisBackendv3.data_standards.logging import logger as DsLogger

from IrisBackendv3.codec.magic import Magic, MAGIC_SIZE

from IrisBackendv3.utils.basic import print_bytearray_hex as printraw

In [6]:
DsLogger.setLevel('CRITICAL')
standards = DataStandards.build_standards()
standards.print_overview()

Data Standards Overview: [
[1m[40m[35m
	Module[256]::BlockDriver[0m
[47m[30m
		Commands:[0m
[47m[30m
		Telemetry:[0m
[31m
			0.	Channel[0]::BdCycles: uint32[0m
[47m[30m
		Events:[0m
[1m[40m[35m
	Module[512]::RateGroupDriver[0m
[47m[30m
		Commands:[0m
[47m[30m
		Telemetry:[0m
[47m[30m
		Events:[0m
[1m[40m[35m
	Module[768]::ActiveRateGroup-RateGroupLowFreq[0m
[47m[30m
		Commands:[0m
[47m[30m
		Telemetry:[0m
[31m
			0.	Channel[0]::RgMaxTime: uint32[0m
[31m
			1.	Channel[1]::RgCycleSlips: uint32[0m
[47m[30m
		Events:[0m
[34m
			0.	Event[0]::RateGroupStarted[][0m
[34m
			1.	Event[1]::RateGroupCycleSlip[cycle: uint32][0m
[1m[40m[35m
	Module[1536]::CubeRoverTime[0m
[47m[30m
		Commands:[0m
[47m[30m
		Telemetry:[0m
[47m[30m
		Events:[0m
[1m[40m[35m
	Module[1792]::TlmChan[0m
[47m[30m
		Commands:[0m
[47m[30m
		Telemetry:[0m
[47m[30m
		Events:[0m
[1m[40m[35m
	Module[2048]::CommandDispatcher[0m
[47m[30m
		Commands:[0m

In [9]:
from IrisBackendv3.codec.fsw_data_codec import encode
bo = '<'
seq_num = 0x00
def pack(name_m: str, name_c: str, data: bytes) -> bytes:
    global seq_num
    module = standards.modules[name_m]
    cmd = module.commands[name_m+'_'+name_c]

    magic = Magic.COMMAND
    magic_bytes = magic.encode(byte_order=bo)
    opcode = struct.pack(bo+'H', module.ID | cmd.ID)

    seq_num = seq_num + 1
    vlp_len = MAGIC_SIZE + 2 + len(data)
    chk = 0x00
    CPH = struct.pack(bo+'B H B', seq_num, vlp_len, chk)

    out = CPH + magic_bytes + opcode + data
    return out
    
data = struct.pack(bo+'B H', 0x01, 0x0005) # 77 for modes, 60 for prep/deploy
packet = pack('Camera', 'TakeImage', data) 
printraw(packet)
print(packet.hex())

01 09 00 00 55 da ba 00 01 11 01 05 00
0109000055daba000111010500


In [14]:
from IrisBackendv3.codec.payload import Payload, CommandPayload
from IrisBackendv3.codec.metadata import DataPathway, DataSource
from IrisBackendv3.codec.magic import Magic

Payload.use_standards(standards)

def pack_watchdog(command_name: str, **kwargs):
    module, command = standards.global_command_lookup(command_name)
    payload = CommandPayload(
        pathway = DataPathway.WIRED,
        source = DataSource.GENERATED,
        magic = Magic.WATCHDOG_COMMAND,
        module_id = module.ID,
        command_id = command.ID,
        args = kwargs
    )
    payload.encode()
packet = pack_hercules('Camera_TakeImage', camera_num=0x01, callback_id = 0x05)
printraw(packet)

NameError: name 'CommandPayload' is not defined

In [1]:
from __future__ import annotations
from typing import Optional, Dict, List
import struct
import pickle
from abc import abstractmethod

class Dummy:
    __slots__: List[str] = [
        '_raw',
        '_endianness_code'
    ]

    _raw: Optional[bytes]
    _endianness_code: str

    def __init__(self,
        raw: Optional[bytes] = None,
        endianness_code = '!'
    ) -> None:
        self._raw = raw
        self._endianness_code = endianness_code

    @abstractmethod
    def encode(self) -> bytes:
        raise NotImplementedError()

    @classmethod
    @abstractmethod
    def decode(cls, raw: bytes, endianness_code:str) -> Dummy:
        raise NotImplementedError()

    def __reduce__(self) -> Tuple[Callable, Tuple[bytes, str], Optional[str]]:
        print("Dummy Reduce")

        if self._raw is None:
            self._raw = self.encode()

        if hasattr(self, '__getstate__'):
            state = self.__getattribute__('__getstate__')()
        else:
            state = None

        # "Callable object" returned will be the decoding function:
        # If a subclassed object is reduced, it will call that subclass' `decode`
        # function (assuming it's been implemented).
        return (self.__class__.decode, (self._raw, self._endianness_code), state)

class DummyThicc(Dummy):
    __slots__: List[str] = [
        'other_thing',
        '_data'
    ]

    other_thing: str
    _data: int

    def __init__(self,
        data: int,
        other_thing: str = "",
        raw: Optional[bytes] = None,
        endianness_code = '!'
    ) -> None:
        self.other_thing = other_thing
        self._data = data
        super().__init__(raw, endianness_code)

    def encode(self) -> bytes:
        print("Encoding")
        return struct.pack(self._endianness_code+'L', self._data)

    @classmethod
    def decode(cls, raw: bytes, endianness_code:str) -> Dummy:
        print("Decoding")
        return cls(
            data = struct.unpack(endianness_code+'L', raw)[0],
            raw = raw,
            endianness_code = endianness_code
        )

    def __repr__(self) -> str:
        return f"{self.other_thing}[{self._data}] -> {self._endianness_code}{self._raw}"

    def __eq__(self, other) -> bool:
        return self.other_thing == other.other_thing and self._data == other._data

    def __getstate__(self) -> str:
        print("Getting State")
        return self.other_thing

    def __setstate__(self, state: str) -> None:
        print("Setting State")
        self.other_thing = state


A = DummyThicc(other_thing='Apple', data=5)
B = DummyThicc(other_thing='Dead', data=0xBEEF)
print((A, B))

print('--pickle--')
pA = pickle.dumps(A)
pB = pickle.dumps(B)
print('--unpickle--')
upA = pickle.loads(pA)
upB = pickle.loads(pB)

print((A, B))
print((upA, upB))
print((upA==A, upB==B))

from typing import NamedTuple

class X(NamedTuple):
    a: int
    b: str

x = X(
    a = 0,
    b = '5'
)
x['b']


(Apple[5] -> !None, Dead[48879] -> !None)
--pickle--
Dummy Reduce
Encoding
Getting State
Dummy Reduce
Encoding
Getting State
--unpickle--
Decoding
Setting State
Decoding
Setting State
(Apple[5] -> !b'\x00\x00\x00\x05', Dead[48879] -> !b'\x00\x00\xbe\xef')
(Apple[5] -> !b'\x00\x00\x00\x05', Dead[48879] -> !b'\x00\x00\xbe\xef')
(True, True)


TypeError: tuple indices must be integers or slices, not str

In [2]:
# Data Transport:
file = './test-data/Iris_FSWv1.0.0_210409_Telemetry.pcapng' # PCAP logs
protocol = scp.UDP # Protocol FSW is using to send data
port = 8080 # Port on the spacecraft FSW is sending data to

# Data Formatting Settings:
packetgap = 0 # number of packets to ignore at beginning of pcap
deadspace = 0 # number of bytes of deadspace at the beginning of the
endianness_code = "<" # < = little, > = big, ! = network

# Image Formatting Settings:
image_settings = {
    "width": 2592,
    "height": 1944
}

# Magics:
file_magic = bytearray([0xda,0xba,0xd0,0x00])
if endianness_code == '<':
    file_magic = bytearray([0x00,0xd0,0xba,0xda]) # Little Endian it.

NameError: name 'scp' is not defined

## Decode First Packet
### Grab Packet

In [13]:
pcap = scp.rdpcap(file)

In [14]:
packets = list(filter(lambda x: x.dport == port, pcap[protocol][packetgap:]))
i = 0

In [15]:
packet = packets[i]

In [16]:
content = scp.raw(packet.getlayer(scp.Raw))[deadspace:]
# printraw(content)
packet_bytes = content

### Decode Common Packet Header

In [17]:
CPH_SIZE = 4
CPH = packet_bytes[:CPH_SIZE]
printraw(CPH)
cph_head, checksum = CPH[:-1], CPH[-1:]
seq_num, vlp_len = struct.unpack(endianness_code+' B H', cph_head)
print(seq_num, vlp_len, checksum)
#! TODO: Perform checksum check. (not impl. in FSW atm)
assert vlp_len == len(packet_bytes) - CPH_SIZE

00 cc 03 90
0 972 b'\x90'


### Decode Variable Length Payload
#### Decode First Payload

In [18]:
# Extract the Variable Length Payload
VLP = packet_bytes[CPH_SIZE:]

# Parse VLP:

# Strip off the magic:
magic_bytes, VLP = VLP[:CPH_SIZE], VLP[CPH_SIZE:]
magic = Magic.decode(magic_bytes, byte_order='<') # treat as a byte array, so let it use network
magic

<Magic.TELEMETRY: 3221229823>

In [19]:
if magic == Magic.TELEMETRY:
    # Grab Telemetry Header:
    module_id, channel_id, data_size, VLP = VLP[:1], VLP[1:2], VLP[2:4], VLP[4:]
    print(module_id, channel_id, data_size)
    # Unpack + Format Telemetry Header:
    module_id = struct.unpack(endianness_code+'B', module_id)[0] << 8
    channel_id = struct.unpack(endianness_code+'B', channel_id)[0]
    #! TODO: Not impl. in fsw. Once is, perform check against expected size.
    # ... Rn, replace w/expected size
    data_size = struct.unpack(endianness_code+'H', data_size)[0]
    try:
        module = standards.modules[module_id]
    except KeyError:
        logger.
    exp_data_size = standards.modules[module_id] 

    print(hex(module_id), hex(channel_id), data_size)
    # data, timestamp, VLP = VLP[:data_size], VLP[data_size:data_size+4], VLP[data_size+4:]

SyntaxError: invalid syntax (<ipython-input-19-1472e03e6c7f>, line 14)

In [82]:
standards.modules['Imu'].telemetry

[((['XAcc'], 0), Channel[0]::XAcc: int16), ((['YAcc'], 1), Channel[1]::YAcc: int16), ((['ZAcc'], 2), Channel[2]::ZAcc: int16), ((['XAng'], 3), Channel[3]::XAng: uint16), ((['YAng'], 4), Channel[4]::YAng: uint16), ((['ZAng'], 5), Channel[5]::ZAng: uint16)]

In [81]:
[t.datatype for t in standards.modules['Imu'].telemetry.vals]

[<FswDataType.I16: 'int16'>,
 <FswDataType.I16: 'int16'>,
 <FswDataType.I16: 'int16'>,
 <FswDataType.U16: 'uint16'>,
 <FswDataType.U16: 'uint16'>,
 <FswDataType.U16: 'uint16'>]