/
framing.py
301 lines (260 loc) · 12.6 KB
/
framing.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
"""
framing.py:
This module defines all the framing constructs used when adapting the F prime ground system for use with communications
or elsewhere in the system. There is an abstract base class called FramerDeframe, which defines the standard pattern of
'frame', 'deframe', and the helper 'deframe_all'. This allows users to frame and deframe packets using base classes
that implement this pattern. The current list of implementation classes are:
1. FpFramerDeframer: a class used to write the now-standard F prime ground packet format
2. TcpServerFramerDeframer: a non-symmetric framer/deframer for use interacting with the Tcp Server packet format
@author lestarch
"""
import abc
import copy
import struct
import sys
from typing import Type
from .checksum import calculate_checksum, CHECKSUM_MAPPING
from fprime_gds.plugin.definitions import gds_plugin_implementation, gds_plugin_specification
class FramerDeframer(abc.ABC):
"""
Abstract base class of the Framer/Deframer variety. Framers and Deframers have to define two methods, one for
framing a set of bytes and one for deframing a set of bytes into packets.
"""
@abc.abstractmethod
def frame(self, data):
"""
Frames outgoing data in the specified format. Expects incoming raw bytes to frame, and adds on the needed header
and footer bytes. This new array of bytes is returned from the method.
:param data: bytes to frame
:return: array of raw bytes representing a framed packet. Should be ready for uplink.
"""
@abc.abstractmethod
def deframe(self, data, no_copy=False):
"""
Deframes the incoming data from the specified format. Produces exactly one packet, and leftover bytes. Users
wanting all packets to be deframed should call "deframe_all". If no full packet is available, this method
returns None. Expects incoming raw bytes to deframe, and returns a deframed packet or None, the leftover
bytes that were unused, and any bytes discarded from the existing data stream. Will search and discard data up
until a start token is found. Note: data will be consumed up to the first start token found.
:param data: framed data bytes
:param no_copy: (optional) will prevent extra copy if True, but "data" input will be destroyed.
:return: (packet as array of bytes or None, leftover bytes, any discarded data)
"""
def deframe_all(self, data, no_copy):
"""
Deframes all available packets found in a single set of bytes by calling deframe until a None packet is
retrieved. This list of packets, and the remaining bytes are returned
:param data: framed data bytes
:param no_copy: (optional) will prevent extra copy if True, but "data" input will be destroyed.
:return: list of packets, remaining data, discarded/unframed/garbage data
"""
packets = []
if not no_copy:
data = copy.copy(data)
discarded_aggregate = b""
while True:
# Deframe and return only on None
(packet, data, discarded) = self.deframe(data, no_copy=True)
discarded_aggregate += discarded
if packet is None:
return packets, data, discarded_aggregate
packets.append(packet)
@classmethod
@gds_plugin_specification
def register_framing_plugin(cls) -> Type["FramerDeframer"]:
"""Register a plugin to provide framing capabilities
Plugin hook for registering a plugin that supplies a FramerDeframer implementation. Implementors of this hook must
return a non-abstract subclass of FramerDeframer. This class will be provided as a framing implementation option
that users may select via command line arguments.
Note: users should return the class, not an instance of the class. Needed arguments for instantiation are
determined from class methods, solicited via the command line, and provided at construction time to the chosen
instantiation.
Returns:
FramerDeframer subclass
"""
raise NotImplementedError()
class FpFramerDeframer(FramerDeframer):
"""
Framing object used to read/write in the standard F prime format. This format is compatible with the standard ground
system. It contains the following format for data:
| token: start word |
| token: data length |
| bytes: F prime packet bytes (Fw::Comm, Fw::FilePacket, etc.) |
| token: checksum |
Where a token is a big-endian integer of TOKEN_SIZE bytes in length (see below)
"""
# Size of an F prime framing token, and the type based on that size
TOKEN_SIZE = 4
# Total size of header data based on token size
HEADER_SIZE = TOKEN_SIZE * 2
# Size of checksum value, and the hardcoded value before CRC32 is available
CHECKSUM_SIZE = 4
MAXIMUM_DATA_SIZE = 4096
# Filled by set_constants()
TOKEN_TYPE = None
HEADER_FORMAT = None
START_TOKEN = None
def __init__(self, checksum_type):
"""Sets constants on construction."""
# Setup the constants as soon as possible.
FpFramerDeframer.set_constants()
self.checksum = checksum_type
@classmethod
def set_constants(cls):
"""
Setup the constants for the various token sizes. This will ensure that the system can read the tokens properly.
This can be changed to make the framing more efficient.
"""
if FpFramerDeframer.TOKEN_SIZE == 4:
FpFramerDeframer.TOKEN_TYPE = "I"
FpFramerDeframer.START_TOKEN = 0xDEADBEEF
elif FpFramerDeframer.TOKEN_SIZE == 2:
FpFramerDeframer.TOKEN_TYPE = "H"
FpFramerDeframer.START_TOKEN = 0xBEEF
elif FpFramerDeframer.TOKEN_SIZE == 1:
FpFramerDeframer.TOKEN_TYPE = "B"
FpFramerDeframer.START_TOKEN = 0xEF
else:
msg = f"Invalid TOKEN_SIZE of {FpFramerDeframer.TOKEN_SIZE}"
raise ValueError(msg)
FpFramerDeframer.HEADER_FORMAT = ">" + (FpFramerDeframer.TOKEN_TYPE * 2)
def frame(self, data):
"""
Frames outgoing data in the F prime standard format. Expects incoming raw bytes to frame, and adds on the
needed framing tokens to the front and end of the bytes.
:param data: bytes to frame
:return: array of raw bytes representing a framed packet. Should be ready for uplink.
"""
framed = struct.pack(
FpFramerDeframer.HEADER_FORMAT, FpFramerDeframer.START_TOKEN, len(data)
)
framed += data
framed += struct.pack(">I", calculate_checksum(framed, self.checksum))
return framed
def deframe(self, data, no_copy=False):
"""
Deframes the incoming data from the F prime standard format. Produces exactly one packet, and leftover bytes.
Users wanting all packets to be deframed should call "deframe_all". If no full packet is available, this method
returns None. Expects incoming raw bytes to deframe, and returns a deframed packet or None, and the leftover
bytes that were unused. Will search and discard data up until a start token is found. Note: data will be
consumed up to the first start token found.
:param data: framed data bytes
:param no_copy: (optional) will prevent extra copy if True, but "data" input will be destroyed.
:return: (packet as array of bytes or None, leftover bytes)
"""
discarded = b""
if not no_copy:
data = copy.copy(data)
# Continue until there is not enough data for the header, or until a packet is found (return)
while len(data) >= FpFramerDeframer.HEADER_SIZE:
# Read header information including start token and size and check if we have enough for the total size
start, data_size = struct.unpack_from(FpFramerDeframer.HEADER_FORMAT, data)
total_size = (
FpFramerDeframer.HEADER_SIZE
+ data_size
+ FpFramerDeframer.CHECKSUM_SIZE
)
# Invalid frame, rotate away a Byte and keep processing
if (
start != FpFramerDeframer.START_TOKEN
or data_size >= FpFramerDeframer.MAXIMUM_DATA_SIZE
):
discarded += data[0:1]
data = data[1:]
continue
# If the pool is large enough to read the whole frame, then read it
if len(data) >= total_size:
deframed, check = struct.unpack_from(
f">{data_size}sI", data, FpFramerDeframer.HEADER_SIZE
)
# If the checksum is valid, return the packet. Otherwise continue to rotate
if check == calculate_checksum(
data[: data_size + FpFramerDeframer.HEADER_SIZE],
self.checksum
):
data = data[total_size:]
return deframed, data, discarded
print(
"[WARNING] Checksum validation failed. Have you correctly set '--comm-checksum-type'",
file=sys.stderr,
)
# Bad checksum, rotate 1 and keep looking for non-garbage
discarded += data[0:1]
data = data[1:]
continue
# Case of not enough data for a full packet, return hoping for more later
return None, data, discarded
return None, data, discarded
@classmethod
def get_name(cls):
""" Get the name of this plugin """
return "fprime"
@classmethod
def get_arguments(cls):
""" Get arguments for the framer/deframer """
return {("--comm-checksum-type",): {
"dest": "checksum_type",
"action": "store",
"type": str,
"help": "Setup the checksum algorithm. [default: %(default)s]",
"choices": [
item
for item in CHECKSUM_MAPPING.keys()
if item != "default"
],
"default": "crc32",
}}
@classmethod
@gds_plugin_implementation
def register_framing_plugin(cls):
""" Register a bad plugin """
return cls
class TcpServerFramerDeframer(FramerDeframer):
"""
Framing object used to read/write in the Tcp Server format. This format is compatible with the older ground
system. It contains the following formats for data:
Uplink (command) data:
| string: 'ZZZZ' |
| bytes: F prime packet bytes (Fw::Comm, Fw::FilePacket, etc.) |
Downlink (ground-system) data:
| string: 'A5A5 GUI ' |
| U32: length of data |
| bytes: F prime packet bytes (Fw::Comm, Fw::FilePacket, etc.) |
WARNING: this is a non-symmetric framer/deframer as the Tcp Server's protocol isn't symmetric. Therefore, it should
never be used to deframe its own framed data.
"""
START_STRING = "ZZZZ"
def frame(self, data):
"""
Frames outgoing data in the Tcp server outgoing format. Expects incoming raw bytes to frame, and adds on the
needed framing tokens to the front and end of the bytes.
:param data: bytes to frame
:return: array of raw bytes representing a framed packet. Should be ready for uplink.
"""
return b"A5A5 GUI %s%s" % (struct.pack(">I", len(data)), data)
def deframe(self, data, no_copy=False):
"""
Deframes the incoming data from the F prime standard format. Produces exactly one packet, and leftover bytes.
Users wanting all packets to be deframed should call "deframe_all". If no full packet is available, this method
returns None. Expects incoming raw bytes to deframe, and returns a deframed packet or None, and the leftover
bytes that were unused. Will search and discard data up until a start token is found. Note: data will be
consumed up to the first start token found.
:param data: framed data bytes
:param no_copy: (optional) will prevent extra copy if True, but "data" input will be destroyed.
:return: (packet as array of bytes or None, leftover bytes)
"""
if not no_copy:
data = copy.copy(data)
# Shift over to ZZZZ
while len(data) > 4 and data[:4] != b"ZZZZ":
data = data[1:]
# Break out of data when not enough
if len(data) < 8:
return None, data, b""
# Read the length and break if not enough data
(data_len,) = struct.unpack_from(">I", data, 4)
if len(data) < data_len + 8:
return None, data, b""
packet = data[8 : data_len + 8]
data = data[data_len + 8 :]
return packet, data, b""