Skip to content
This repository has been archived by the owner on Mar 11, 2024. It is now read-only.

Commit

Permalink
Merge branch 'release/7.0.0'
Browse files Browse the repository at this point in the history
  • Loading branch information
ampledata committed Aug 18, 2017
2 parents cf79671 + 1b28ee7 commit 2f06fef
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 33 deletions.
17 changes: 9 additions & 8 deletions aprs/classes.py
Expand Up @@ -40,10 +40,11 @@ class Frame(object):

def __init__(self, source: bytes=b'', destination: bytes=b'',
path: list=[], info: bytes=b'') -> None:
self.source = aprs.Callsign(source)
self.destination = aprs.Callsign(destination)
self.source = aprs.parse_callsign(source)
self.destination = aprs.parse_callsign(destination)
# TODO: Add parse_path function
self.path = path
self.info = aprs.InformationField(info)
self.info = aprs.parse_info_field(info)

def __repr__(self) -> str:
"""
Expand Down Expand Up @@ -524,14 +525,14 @@ def __init__(self, source: bytes, destination: bytes, path: typing.List,
self.lat = lat
self.lng = lng
self.ambiguity = ambiguity
frame = self.create_frame()
super(PositionFrame, self).__init__(source, destination, path, frame)
info = self.create_info_field()
super(PositionFrame, self).__init__(source, destination, path, info)

def create_frame(self) -> bytes:
def create_info_field(self) -> bytes:
enc_lat = aprs.dec2dm_lat(self.lat)
enc_lat_amb = aprs.ambiguate(enc_lat, self.ambiguity)
enc_lat_amb = bytes(aprs.ambiguate(enc_lat, self.ambiguity), 'UTF-8')
enc_lng = aprs.dec2dm_lng(self.lng)
enc_lng_amb = aprs.ambiguate(enc_lng, self.ambiguity)
enc_lng_amb = bytes(aprs.ambiguate(enc_lng, self.ambiguity), 'UTF-8')
frame = [
b'=',
enc_lat_amb,
Expand Down
58 changes: 35 additions & 23 deletions aprs/functions.py
Expand Up @@ -65,18 +65,22 @@ def parse_frame_ax25(raw_frame: bytes) -> AprsFrame:
Parses and Extracts the components of an AX.25-Encoded Frame.
"""
parsed_frame = aprs.Frame()
kiss_call = False

_frame = raw_frame.strip(aprs.AX25_FLAG)
_frame = _frame.lstrip(aprs.KISS_DATA_FRAME)
_frame = _frame.rstrip(aprs.KISS_DATA_FRAME)
if (_frame.startswith(aprs.KISS_DATA_FRAME) or
_frame.endswith(aprs.KISS_DATA_FRAME)):
_frame = _frame.lstrip(aprs.KISS_DATA_FRAME)
_frame = _frame.rstrip(aprs.KISS_DATA_FRAME)
kiss_call = True

# Use these two fields as the address/information delimiter
frame_addressing, frame_information = _frame.split(aprs.ADDR_INFO_DELIM)

info_field = frame_information.rstrip(b'\xFF\x07')

destination = parse_callsign_ax25(frame_addressing)
source = parse_callsign_ax25(frame_addressing[7:])
destination = parse_callsign_ax25(frame_addressing, kiss_call)
source = parse_callsign_ax25(frame_addressing[7:], kiss_call)

paths = frame_addressing[7+7:]
n_paths = int(len(paths) / 7)
Expand Down Expand Up @@ -132,7 +136,7 @@ def parse_callsign_text(raw_callsign: bytes) -> AprsCallsign:
return parsed_callsign


def parse_callsign_ax25(raw_callsign: bytes) -> AprsCallsign:
def parse_callsign_ax25(raw_callsign: bytes, kiss_call: bool=False) -> AprsCallsign:
"""
Extracts a Callsign and SSID from a AX.25 Encoded APRS Frame.
Expand Down Expand Up @@ -162,9 +166,12 @@ def parse_callsign_ax25(raw_callsign: bytes) -> AprsCallsign:

# FIXME gba@20170809: This works for KISS frames, but not otherwise.
# Should consult: https://github.com/chrissnell/GoBalloon/blob/master/ax25/encoder.go
# if seven_chunk >> 1 & 0x80:
if seven_chunk & 0x80:
digi = True
if kiss_call:
if seven_chunk >> 1 & 0x80:
digi = True
else:
if seven_chunk & 0x80:
digi = True

parsed_callsign.set_callsign(_callsign)
parsed_callsign.set_ssid(ssid)
Expand All @@ -173,21 +180,26 @@ def parse_callsign_ax25(raw_callsign: bytes) -> AprsCallsign:
return parsed_callsign


def parse_info_field(data: bytes, handler=None) -> bytes:
data_type = b''
data_type_field = chr(data[0])
data_type = aprs.DATA_TYPE_MAP.get(data_type_field)

if data_type:
if handler:
handler_func = getattr(
handler,
"handle_data_type_%s" % data_type,
None
)
return handler_func(data, data_type)

return aprs.InformationField(data, data_type, safe=True)
def parse_info_field(raw_data: bytes, handler=None) -> bytes:
if not raw_data:
return bytes()
elif isinstance(raw_data, aprs.InformationField):
return raw_data
elif isinstance(raw_data, bytes) or isinstance(raw_data, bytearray):
data_type = b''
data_type_field = chr(raw_data[0])
data_type = aprs.DATA_TYPE_MAP.get(data_type_field)

if data_type:
if handler:
handler_func = getattr(
handler,
"handle_data_type_%s" % data_type,
None
)
return handler_func(raw_data, data_type)

return aprs.InformationField(raw_data, data_type, safe=True)


def default_data_handler(data: bytes, data_type: bytes) -> bytes:
Expand Down
4 changes: 2 additions & 2 deletions setup.py
Expand Up @@ -16,7 +16,7 @@
import setuptools

__title__ = 'aprs'
__version__ = '7.0.0rc2'
__version__ = '7.0.0'
__author__ = 'Greg Albrecht W2GMD <oss@undef.net>' # NOQA pylint: disable=R0801
__copyright__ = 'Copyright 2017 Greg Albrecht and Contributors' # NOQA pylint: disable=R0801
__license__ = 'Apache License, Version 2.0' # NOQA pylint: disable=R0801
Expand Down Expand Up @@ -47,7 +47,7 @@ def publish():
url='https://github.com/ampledata/aprs',
zip_safe=False,
include_package_data=True,
setup_requires=[
tests_require=[
'coverage >= 4.4.1',
'nose >= 1.3.7',
'httpretty >= 0.8.14'
Expand Down
2 changes: 2 additions & 0 deletions tests/test_aprsframe.py
Expand Up @@ -54,8 +54,10 @@ def test_decode_aprs_frame(self):
"%s>APOTC1,WIDE1-1,WIDE2-1:!3745.94N/12228.05W>118/010/"
"A=000269 http://w2gmd.org/ Twitter: @ampledata" %
self.real_callsign)
self._logger.info('frame=%s', frame)

aprs_frame = aprs.parse_frame(frame)
self._logger.info('aprs_frame=%s', aprs_frame)

self.assertEqual(str(aprs_frame), frame)
self.assertEqual(str(aprs_frame.source), self.real_callsign)
Expand Down

0 comments on commit 2f06fef

Please sign in to comment.