
Feature Request: load from byte array for live parsing #83

Closed · scandey opened this issue Aug 1, 2023 · 8 comments

Comments

scandey commented Aug 1, 2023

It seems like CCSDSpy would be viable for live plotting of telemetry if I could pass in a byte string/array of chunks of mixed binary data and process a second's worth of data at a time.

Absolutely reasonable to say this is out of scope, but it feels close to being viable with the existing structure. I see that _load wants a numpy array of bytes, and I wish I could just dump that array in directly rather than saving to a file first. For my particular data stream, I can guarantee clean splits between packets, so hopefully it'd be relatively straightforward.

In the longer term, allowing such a thing might benefit from a little bit of overhead to handle the case of missing bytes or extra bytes (if a packet is split across a chunk boundary). Both cases would preferably (to me) still return the successful packets up to that point and then return the incomplete packets or extra bytes for handling at a higher level.
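As a rough sketch of that higher-level handling (not CCSDSpy API; a from-scratch illustration based only on the standard 6-byte CCSDS primary header, whose bytes 4-5 hold the packet data length minus one):

def split_complete_packets(buf: bytes):
    """Split a chunk into whole CCSDS packets plus any leftover bytes."""
    packets, pos = [], 0
    while pos + 6 <= len(buf):
        # total packet length = 6-byte header + (length field + 1) data bytes
        total_len = 6 + int.from_bytes(buf[pos + 4:pos + 6], "big") + 1
        if pos + total_len > len(buf):
            break  # last packet is split across the chunk boundary
        packets.append(buf[pos:pos + total_len])
        pos += total_len
    return packets, buf[pos:]

# The leftover bytes would then be prepended to the next chunk:
#   packets, leftover = split_complete_packets(leftover + chunk)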

ddasilva (Collaborator) commented Aug 3, 2023

Hi @scandey ,

This is an interesting idea, and I think it's simpler to do than we might think. We could make this work by modifying utils.iter_packet_bytes() to incrementally call .read(n) on a file-like object to read the next n bytes as needed, blocking for more input when it's not available. Then, we could do something like this to handle streaming telemetry over a socket (sketched below the list):

  1. Open a socket as a file-like object
  2. For each packet's bytes in utils.iter_packet_bytes(socket_file):
    • Get the APID with utils.read_primary_headers(packet_bytes)
    • Parse the packet bytes with the correct packet definition for the given APID
    • Do something with that parsed packet
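A minimal sketch of that loop, assuming the proposed modification to utils.iter_packet_bytes() (incremental reads from a file-like object, which the current implementation doesn't do). The endpoint, APID, field definition, and handle() are placeholders:

import io
import socket

from ccsdspy import FixedLength, PacketField, utils

# Placeholder packet definitions, keyed by APID
defs_by_apid = {
    0x08E: FixedLength([PacketField(name="VALUE", data_type="uint", bit_length=16)]),
}

sock = socket.create_connection(("telemetry-host", 5000))  # placeholder endpoint
sock_file = sock.makefile("rb")

for packet_bytes in utils.iter_packet_bytes(sock_file):
    # read_primary_headers() accepts a file-like object, so wrap the bytes
    header = utils.read_primary_headers(io.BytesIO(packet_bytes))
    apid = header["CCSDS_APID"][0]
    # parse with the definition matching this packet's APID
    parsed = defs_by_apid[apid].load(io.BytesIO(packet_bytes))
    handle(parsed)  # placeholder downstream processing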

Do you have a use case in mind for your project? It would be cool to actually prove this works end-to-end.

> In the longer term, allowing such a thing might benefit from a little bit of overhead to handle the case of missing bytes or extra bytes (if a packet is split across a chunk boundary). Both cases would preferably (to me) still return the successful packets up to that point and then return the incomplete packets or extra bytes for handling at a higher level.

Right now, a number of functions will issue a warning if the stream ends before the length stated in the last packet's header is reached (utils.iter_packet_bytes() does this). I have been thinking it actually makes more sense to report extra bytes instead of missing bytes: if you have an incomplete header at the end of the file, it's not possible to determine how many bytes are missing from the last packet.

scandey (Author) commented Aug 4, 2023

I'm working on internal calibration for the electric fields instrument on the TRACERS spacecraft (project/SOC at UIowa, instrument at UC Berkeley). I have lots of relatively simple CCSDS packets streaming in, which I currently chunk into files of a few minutes each to look at with CCSDSpy. Right now I split those files by APID into new files and read them back with CCSDSpy, but it would be nice to iterate over packets, grab the ones I want into byte arrays by APID using utils.iter_packet_bytes and split_by_apid, and only do the full loading process on a subset (without making a series of intermediate files). That would also serve nicely for pulling in live data from a socket (though I'd still chunk the data elsewhere for archive).

I agree that reporting the number of extra bytes is better than missing bytes, and it would be even better if there were an option to return the bytes themselves (so they can easily be tacked onto the front of the next array/file as desired). All of this still assumes that the packets are generally well formed and that well-formed packets are eventually guaranteed to arrive (so no state machines needed).

ddasilva (Collaborator) commented Aug 4, 2023

Thanks for explaining @scandey ! Does the solution I posted above with iter_packet_bytes() work for you? If not, do you have any ideas for what an API for this might look like?

scandey (Author) commented Aug 4, 2023

That's close to what I'd like to do in practice, but I already have a Python-based command and telemetry system that provides bytearrays of raw CCSDS packets, and it would be nice to drop those into utils.iter_packet_bytes() directly (and then split, process, and return as you described).
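A hedged aside: because utils.iter_packet_bytes() accepts file-like objects, an in-memory bytearray can likely be fed through io.BytesIO without ever touching disk. get_next_telemetry_chunk() is a placeholder for the command and telemetry system described above:

import io

from ccsdspy import utils

raw_telemetry = get_next_telemetry_chunk()  # placeholder: bytearray of packets

for packet_bytes in utils.iter_packet_bytes(io.BytesIO(raw_telemetry)):
    ...  # inspect, split, or parse each packet as described above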

ddasilva (Collaborator) commented Aug 9, 2023

Thanks @scandey. Can you be more specific about how these byte arrays are represented in your system and how they stream in? Does a file just grow on disk, or something else? I might have been premature in assuming the data was streamed over a socket.

scandey (Author) commented Aug 9, 2023

Sorry for confusing things! There are two very similar use cases I'm working with at the moment. One of them is indeed socket-based, for real-time analysis: I'm currently using an internal Python-based command/data-handling system that could easily dump well-formed CCSDS packets over a socket. The other use case is mostly an extension of the first: I have plain CCSDS packets (no frame/sync info) going into a file (all the data from that same socket concept, but saved to a fresh file every 10 minutes). Packets are guaranteed not to split across file boundaries, and I'm pretty confident that a half-saved file will also contain only well-formed CCSDS packets (assuming I'm handling the flush operations correctly).

For either/both of these cases, I'd like to harness the nice packet definition system of CCSDSpy to quickly inspect and collect packets one at a time, in the style of iter_packet_bytes. I'd like one byte array (as provided by iter_packet_bytes) to be passed directly into a packet's .load() (looking at just the primary header, for instance) and then, depending on the APID, pass the same byte array into a different packet's .load(). I realize this intentionally defeats the purpose of the efficient numpy array processing (by only operating on one packet at a time). It also might end up taking a lot of memory for larger packets, depending on implementation. I'm tentatively hopeful that it's doable, even if it isn't the most efficient option for long-term processing.

The big benefit to me is that I can go straight from a mixed-APID file to data analysis in one step, rather than having to save out a file for each APID and then load each file back in. Pseudo-Python logic for the chunked file as I imagine it:

import collections

# primary_header_packet / my_favorite_packet: packet definitions built elsewhere
byte_arrays = collections.defaultdict(list)  # accumulated packet bytes, per APID
# file is mixed APID, assumed to contain only well-formed packets
for byte_array in utils.iter_packet_bytes(file):
    # hypothetical: .load() accepting a numpy byte array directly
    apid = primary_header_packet.load(byte_array)['CCSDS_APID'][0]  # one packet, so one value
    if apid == my_favorite_packet_apid:
        packet_dict = my_favorite_packet.load(byte_array)
        # do something cool with the single packet (same dictionary-of-numpy-arrays format)
        cool_thing(packet_dict)
        # and save the byte array to a per-APID collection held in memory for later use
        byte_arrays[apid].append(byte_array)

# later in data analysis, once you've built up a long byte array for each APID
all_bytes = numpy.concatenate(byte_arrays[my_favorite_packet_apid])
all_my_favorite_packets_dict = my_favorite_packet.load(all_bytes)
# do something even cooler with many packets
cooler_thing(all_my_favorite_packets_dict)

EDIT: utils.split_by_apid can replace a lot of my logic above, so the simplified code would look like:

import numpy

# file is mixed APID, assumed to contain only well-formed packets
streams_by_apid = utils.split_by_apid(file)
favorite_packet_byte_array = numpy.frombuffer(
    streams_by_apid[my_favorite_packet_apid].getbuffer(), "u1")
all_my_favorite_packets_dict = my_favorite_packet.load(favorite_packet_byte_array)

I'm not sure whether the BytesIO returned by split_by_apid can be read directly with numpy.frombuffer, or whether reading it that way would prevent the BytesIO object from being mutated later (to add more data if desired).

EDIT2: I went ahead and forked and adjusted the load/_load parameters (a quick and messy change) to allow passing numpy byte arrays directly. It seems to work okay at first glance, though the BytesIO objects cannot be resized later (I'm not sure how to get rid of the view provided by bytesio_object.getbuffer()). Since split_by_apid doesn't currently take existing BytesIO streams, the buffers are fixed once returned by split_by_apid, so this problem is moot.

ddasilva (Collaborator) commented Aug 17, 2023

Hey Scott, just got back from vacation.

It should be easier to use utils.get_packet_apid() to determine the APID:
https://docs.ccsdspy.org/en/latest/api/ccsdspy.utils.get_packet_apid.html#ccsdspy.utils.get_packet_apid

I'm not sure why you are using numpy.frombuffer() on the BytesIO instance before passing it to .load(). The .load() methods, as well as utils.split_by_apid() and utils.iter_packet_bytes(), take file-like objects directly (BytesIO is a file-like object). Maybe this is easier than you thought?
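A minimal sketch of that simpler path, reusing file, my_favorite_packet, and my_favorite_packet_apid from the pseudocode above, and assuming (per the linked docs) that get_packet_apid() reads the APID from a file-like stream:

import io

from ccsdspy import utils

# Per-packet inspection: wrap one packet's bytes in a file-like BytesIO
for packet_bytes in utils.iter_packet_bytes(file):
    apid = utils.get_packet_apid(io.BytesIO(packet_bytes))

# Bulk loading: split_by_apid() returns a dict of APID -> BytesIO, and
# BytesIO is file-like, so it goes straight into .load()
file.seek(0)  # rewind, since iter_packet_bytes consumed the stream
streams_by_apid = utils.split_by_apid(file)
all_my_favorite_packets_dict = my_favorite_packet.load(
    streams_by_apid[my_favorite_packet_apid])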

Does this solve your issue?

scandey (Author) commented Aug 22, 2023

I guess I was making this massively overcomplicated; passing the BytesIO objects directly works. I'm not sure now how I got stuck on the idea of passing buffers around instead of BytesIO objects... maybe the issue that prompted that choice will turn up again, but for now I'll close this one. Thank you for taking the time to walk me through the logic!

(I'll split the network / live-feed idea into a separate issue, and the extra-bytes-instead-of-missing-bytes reporting into another, since this one has become quite... overloaded by my confusion from last week.)
