Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions modules/data_encoding/message_encoding_decoding.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,23 @@


def encode_position_global(
worker_name: str, global_position: position_global.PositionGlobal
worker_id: worker_enum.WorkerEnum, global_position: position_global.PositionGlobal
) -> "tuple[True, bytes] | tuple[False, None]":
"""
Encode PositionGlobal object into Bytes. Worker_ID to be encoded as the first byte of the message

Parameters:
worker_name: name of the worker defined by the name of its own file (worker_name = pathlib.Path(__file__).stem)
worker_id: ID of the worker defined by its constant in WorkerEnum
global_position: PositionGlobal object

Returns:
packed_coordinates (bytes): Encoded latitude, longitude, altitude of PositionGlobal object as bytes.
First byte dependant on which worker is calling the funciton, value depends on its corresponding enum value.
"""
try:
worker_id = worker_enum.WorkerEnum[worker_name.upper()]
if not worker_id: # If worker ID is not in the Enum Class
if not isinstance(
worker_id, worker_enum.WorkerEnum
): # If worker ID is not in the Enum Class
return False, None

# Encode message using PositionGlobal's latitude, longitude, altitude, with the worker ID in the front
Expand Down
9 changes: 5 additions & 4 deletions modules/data_encoding/metadata_encoding_decoding.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,23 @@


def encode_metadata(
worker_name: str, number_of_messages: int
worker_id: worker_enum.WorkerEnum, number_of_messages: int
) -> "tuple[True, bytes] | tuple[False, None]":
"""
Encode PositionGlobal object into Bytes. Worker_ID to be encoded as the first byte of the message

Parameters:
worker_name: name of the worker defined by the name of its own file (worker_name = pathlib.Path(__file__).stem)
worker_id: ID of the worker defined by its constant in WorkerEnum
number_of_messages: number of messages intended to be sent

Returns:
packed_coordinates (bytes): Encoded int corresponding to number of messages as bytes.
First byte dependant on which worker is calling the funciton, value depends on its corresponding enum value (see worker_enum.py)
"""
try:
worker_id = worker_enum.WorkerEnum[worker_name.upper()]
if not worker_id: # If worker ID is not in the Enum Class
if not isinstance(
worker_id, worker_enum.WorkerEnum
): # If worker ID is not in the Enum Class
return False, None

# Encode message using PositionGlobal's latitude, longitude, altitude, with the worker ID in the front
Expand Down
2 changes: 1 addition & 1 deletion modules/data_encoding/worker_enum.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,6 @@ class WorkerEnum(Enum):
COMMUNICATIONS_WORKER = 2
DATA_MERGE_WORKER = 3
DETECT_TARGET_WORKER = 4
FLIGHT_INTERFERENCE_WORKER = 5
FLIGHT_INTERFACE_WORKER = 5
GEOLOCATION_WORKER = 6
VIDEO_INPUT_WORKER = 7
8 changes: 4 additions & 4 deletions tests/unit/data_encoding/test_message_encoding_decoding.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,16 @@ def test_encoding_decoding() -> None:
"""
Function to test encoding
"""
# Step 1: Create a worker_name and PositionGlobal object
worker_name = "communications_worker" # =3 in Worker_Enum
# Step 1: Create a worker_id and PositionGlobal object
worker_id = worker_enum.WorkerEnum.COMMUNICATIONS_WORKER # =3 in Worker_Enum
success, original_position = position_global.PositionGlobal.create(
latitude=34.24902422, longitude=84.6233434, altitude=27.4343424
)
assert success

# Step 2: Encode the PositionGlobal object
result, encoded_bytes = message_encoding_decoding.encode_position_global(
worker_name, original_position
worker_id, original_position
)
assert result

Expand All @@ -31,7 +31,7 @@ def test_encoding_decoding() -> None:
assert result

# Step 4: Validate that the original and decoded objects match
assert worker_enum.WorkerEnum[worker_name.upper()] == worker # Checks if Enum type Matches
assert worker_id == worker # Checks if Enum type Matches

assert original_position.latitude == decoded_position.latitude
assert original_position.longitude == decoded_position.longitude
Expand Down
10 changes: 4 additions & 6 deletions tests/unit/data_encoding/test_metadata_encoding_decoding.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ def test_encoding_metadata() -> None:
"""
Function to test encoding
"""
# Step 1: Create a worker_name and PositionGlobal object
worker_name = "communications_worker" # =3 in worker_enum.py
# Step 1: Create a worker_id and PositionGlobal object
worker_id = worker_enum.WorkerEnum.COMMUNICATIONS_WORKER # =3 in worker_enum.py
number_of_messages = 5

# Step 2: Encode the WorkerEnum ID and number of messages
result, encoded_bytes = metadata_encoding_decoding.encode_metadata(
worker_name, number_of_messages
worker_id, number_of_messages
)
assert result

Expand All @@ -27,8 +27,6 @@ def test_encoding_metadata() -> None:
assert result

# Step 4: Validate that the original and decoded objects match
assert (
worker_enum.WorkerEnum[worker_name.upper()] == worker_class
) # Checks if Enum type Matches
assert worker_id == worker_class # Checks if Enum type Matches

assert number_of_messages == decoded_number_of_messages