Skip to content

Commit

Permalink
improve streams and add more tests
Browse files Browse the repository at this point in the history
  • Loading branch information
M0r13n committed Nov 6, 2019
1 parent 2271135 commit 8b04a1a
Show file tree
Hide file tree
Showing 12 changed files with 112 additions and 101 deletions.
8 changes: 8 additions & 0 deletions examples/file_stream.py
@@ -0,0 +1,8 @@
from pyais.stream import FileReaderStream

filename = "sample.ais"

for msg in FileReaderStream(filename):
decoded_message = msg.decode()
ais_content = decoded_message.content
# Do something with the ais message
7 changes: 7 additions & 0 deletions examples/sample.ais
@@ -0,0 +1,7 @@
# taken from https://www.aishub.net/ais-dispatcher
!AIVDM,1,1,,A,13HOI:0P0000VOHLCnHQKwvL05Ip,0*23
!AIVDM,1,1,,A,133sVfPP00PD>hRMDH@jNOvN20S8,0*7F
!AIVDM,1,1,,B,100h00PP0@PHFV`Mg5gTH?vNPUIp,0*3B
!AIVDM,1,1,,B,13eaJF0P00Qd388Eew6aagvH85Ip,0*45
!AIVDM,1,1,,A,14eGrSPP00ncMJTO5C6aBwvP2D0?,0*7A
!AIVDM,1,1,,A,15MrVH0000KH<:V:NtBLoqFP2H9:,0*2F
9 changes: 9 additions & 0 deletions examples/single_message.py
@@ -0,0 +1,9 @@
from pyais.messages import NMEAMessage

message = NMEAMessage(b"!AIVDM,1,1,,B,15M67FC000G?ufbE`FepT@3n00Sa,0*5C")
print(message.decode())

# or

message = NMEAMessage.from_string("!AIVDM,1,1,,B,15M67FC000G?ufbE`FepT@3n00Sa,0*5C")
print(message.decode())
10 changes: 10 additions & 0 deletions examples/tcp_stream.py
@@ -0,0 +1,10 @@
from pyais.stream import TCPStream

url = 'ais.exploratorium.edu'
port = 80

for msg in TCPStream(url, port=80):
decoded_message = msg.decode()
ais_content = decoded_message.content
print(ais_content)
# Do something with the ais message
7 changes: 4 additions & 3 deletions pyais/constants.py
Expand Up @@ -30,10 +30,11 @@ class ManeuverIndicator(IntEnum):
NotAvailable = 0
NoSpecialManeuver = 1
SpecialManeuver = 2
UNDEFINED = 3

@classmethod
def _missing_(cls, value):
return ManeuverIndicator.NotAvailable
return ManeuverIndicator.UNDEFINED


class EpfdType(IntEnum):
Expand Down Expand Up @@ -85,7 +86,7 @@ class ShipType(IntEnum):
PortTender = 53
AntiPollutionEquipment = 54
LawEnforcement = 55
Spare_LocalVessel = 56
SPARE = 56
MedicalTransport = 58
NonCombatShip = 59
# 60's
Expand Down Expand Up @@ -130,7 +131,7 @@ def _missing_(cls, value):
return ShipType.HSC_Reserved

if 55 < value < 58:
return ShipType.Spare
return ShipType.SPARE

if 64 < value < 69:
return ShipType.Passenger_Reserved
Expand Down
4 changes: 4 additions & 0 deletions pyais/messages.py
Expand Up @@ -78,6 +78,10 @@ def __init__(self, raw: bytes):
def __str__(self):
return str(self.raw)

@classmethod
def from_string(cls, nmea_str: str):
return cls(str.encode(nmea_str))

@classmethod
def assemble_from_iterable(cls, messages: Sequence):
"""
Expand Down
31 changes: 16 additions & 15 deletions pyais/stream.py
Expand Up @@ -5,16 +5,22 @@

class Stream:

def __init__(self, fobj):
self._fobj = fobj

def __enter__(self):
# Enables use of with statement
return self

def __exit__(self, exc_type, exc_val, exc_tb):
pass
self._fobj.close()

def __iter__(self):
return self._assemble_messages()

def __next__(self):
return next(iter(self))

def _iter_messages(self) -> Iterable[bytes]:
raise NotImplementedError()

Expand Down Expand Up @@ -56,16 +62,13 @@ def __init__(self, filename, mode="rb"):
self.mode = mode
# Try to open file
try:
self.file = open(self.filename, mode=self.mode)
file = open(self.filename, mode=self.mode)
except Exception as e:
raise ValueError(f"Could not open file {self.filename}") from e

def __exit__(self, exc_type, exc_val, exc_tb):
self.file.close()
super().__init__(file)

def _iter_messages(self) -> Iterable[bytes]:
yield from self.file.readlines()
self.file.close()
yield from self._fobj.readlines()


class TCPStream(Stream):
Expand All @@ -76,21 +79,19 @@ class TCPStream(Stream):

BUF_SIZE = 4096

def __init__(self, host: str = 'ais.exploratorium.edu', port: int = 80):
self.sock = socket(AF_INET, SOCK_STREAM)
def __init__(self, host: str, port: int = 80):
sock = socket(AF_INET, SOCK_STREAM)
try:
self.sock.connect((host, port))
sock.connect((host, port))
except ConnectionRefusedError as e:
self.sock.close()
sock.close()
raise ValueError(f"Failed to connect to {host}:{port}") from e

def __exit__(self, exc_type, exc_val, exc_tb):
self.sock.close()
super().__init__(sock)

def _iter_messages(self) -> Iterable[bytes]:
partial = b''
while True:
body = self.sock.recv(self.BUF_SIZE)
body = self._fobj.recv(self.BUF_SIZE)

# Server closed connection
if not body:
Expand Down
29 changes: 29 additions & 0 deletions tests/test_constants.py
@@ -0,0 +1,29 @@
import unittest
from pyais.constants import NavigationStatus, ManeuverIndicator, ShipType, NavAid


class TestConstants(unittest.TestCase):
def test_nav_status(self):
self.assertEqual(NavigationStatus(3), NavigationStatus.RestrictedManoeuverability)
self.assertEqual(NavigationStatus(17), NavigationStatus.Undefined)

def test_maneuver_indication(self):
self.assertEqual(ManeuverIndicator(0), ManeuverIndicator.NotAvailable)
self.assertEqual(ManeuverIndicator(1), ManeuverIndicator.NoSpecialManeuver)
self.assertEqual(ManeuverIndicator(2), ManeuverIndicator.SpecialManeuver)
self.assertEqual(ManeuverIndicator(3), ManeuverIndicator.UNDEFINED)
self.assertEqual(ManeuverIndicator(4), ManeuverIndicator.UNDEFINED)

def test_ship_types(self):
self.assertEqual(ShipType(25), ShipType.WIG_Reserved)
self.assertEqual(ShipType(46), ShipType.HSC_Reserved)
self.assertEqual(ShipType(57), ShipType.SPARE)
self.assertEqual(ShipType(68), ShipType.Passenger_Reserved)
self.assertEqual(ShipType(78), ShipType.Cargo_Reserved)
self.assertEqual(ShipType(85), ShipType.Tanker_Reserved)
self.assertEqual(ShipType(96), ShipType.OtherType_Reserved)
self.assertEqual(ShipType(100), ShipType.NotAvailable)

def test_navaid(self):
self.assertEqual(NavAid(23), NavAid.CARDINAL_MARK_W)
self.assertEqual(NavAid(32), NavAid.DEFAULT)
11 changes: 9 additions & 2 deletions tests/test_file_stream.py
Expand Up @@ -4,16 +4,23 @@


class TestFileReaderStream(unittest.TestCase):
FILENAME = "tests/ais_test_messages"

def test_reader(self):
filename = "tests/ais_test_messages"
messages = [msg for msg in FileReaderStream(filename)]
messages = [msg for msg in FileReaderStream(self.FILENAME)]
assert len(messages) == 6
for msg in messages:
assert type(msg) == NMEAMessage
assert msg.is_valid
assert msg.decode().content is not None

def test_reader_with_open(self):
with FileReaderStream(self.FILENAME) as stream:
msg = next(stream)
assert type(msg) == NMEAMessage
assert msg.is_valid
assert msg.decode().content is not None

def test_invalid_filename(self):
with self.assertRaises(ValueError):
FileReaderStream("doesnotexist")
6 changes: 6 additions & 0 deletions tests/test_nmea.py
Expand Up @@ -31,6 +31,12 @@ def test_single(self):
assert NMEAMessage(single).is_single
assert not NMEAMessage(single).is_multi

def test_from_str(self):
old = NMEAMessage(b"!AIVDM,1,1,,B,15M67FC000G?ufbE`FepT@3n00Sa,0*5C").decode().content
new = NMEAMessage.from_string("!AIVDM,1,1,,B,15M67FC000G?ufbE`FepT@3n00Sa,0*5C").decode().content

assert old == new

def test_message_assembling(self):
multi = NMEAMessage.assemble_from_iterable(messages=[
NMEAMessage(b"!AIVDM,2,1,4,A,55O0W7`00001L@gCWGA2uItLth@DqtL5@F22220j1h742t0Ht0000000,0*08"),
Expand Down
87 changes: 8 additions & 79 deletions tests/test_tcp_stream.py
@@ -1,90 +1,19 @@
import unittest
from pyais.messages import NMEAMessage
from pyais.messages import NMEAMessage, AISMessage
from pyais.stream import TCPStream
import socket
import threading


class MockSocket:
"""
Mock socket that mimics a valid NMEA Stream Server
"""

def __init__(self, messages, host="127.0.0.1", port=12345):
self.messages = messages
self.host = host
self.port = port
self.sock = None

def start_sending(self):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.sock.bind((self.host, self.port))
# wait for connection
self.sock.listen()
conn, addr = self.sock.accept()
with conn:
# send messages
for msg in self.messages:
conn.sendall(msg + b"\r\n")
self.sock.close()


def start_mock_server(messages, port):
mock = MockSocket(messages, port=port)
mock.start_sending()


@unittest.skip("Broken on Github Actions")
class TestTCPStream(unittest.TestCase):
def setUp(self) -> None:
self.mock_server_thread = None

def tearDown(self) -> None:
"""
Await open threads
"""
if self.mock_server_thread:
self.mock_server_thread.join()

def test_default_buf_size(self):
with TCPStream() as stream:
assert stream.BUF_SIZE == 4096

def test_socket(self):
messages = [
b"!AIVDM,1,1,,B,15M67FC000G?ufbE`FepT@3n00Sa,0*5C",
b"!AIVDM,1,1,,B,15NG6V0P01G?cFhE`R2IU?wn28R>,0*05",
b"!AIVDM,1,1,,A,15NJQiPOl=G?m:bE`Gpt<aun00S8,0*56"
]
self.mock_server_thread = threading.Thread(target=start_mock_server, args=(messages, 12345))
self.mock_server_thread.start()

with TCPStream("127.0.0.1", 12345) as stream:
received = list(stream._iter_messages())

assert received == messages

def test_assemble_messages(self):
messages = [
b"!AIVDM,2,1,4,A,55O0W7`00001L@gCWGA2uItLth@DqtL5@F22220j1h742t0Ht0000000,0*08",
b"!AIVDM,2,2,4,A,000000000000000,2*20",
b"!AIVDM,2,1,9,B,53nFBv01SJ<thHp6220H4heHTf2222222222221?50:454o<`9QSlUDp,0*09",
b"!AIVDM,2,2,9,B,888888888888880,2*2E",
b"!AIVDM,2,1,6,B,56:fS:D0000000000008v0<QD4r0`T4v3400000t0`D147?ps1P00000,0*3D",
b"!AIVDM,2,2,6,B,000000000000008,2*29"
]

self.mock_server_thread = threading.Thread(target=start_mock_server, args=(messages, 12346))
self.mock_server_thread.start()
self.assertEqual(TCPStream.BUF_SIZE, 4096)

with TCPStream("127.0.0.1", 12346) as stream:
received = list(stream)
def test_socket_with_real_data(self):
for i, msg in enumerate(TCPStream('ais.exploratorium.edu')):
if i >= 10:
break

assert len(received) == 3
for msg in received:
assert isinstance(msg, NMEAMessage)
assert msg.is_multi
self.assertTrue(isinstance(msg, NMEAMessage))
self.assertTrue(isinstance(msg.decode(), AISMessage))

def test_invalid_endpoint(self):
with self.assertRaises(ValueError):
Expand Down
4 changes: 2 additions & 2 deletions tests/various.py
Expand Up @@ -44,11 +44,11 @@ def large_file_test():


def live_demo():
for msg in TCPStream():
for msg in TCPStream('ais.exploratorium.edu'):
print(msg.decode().content)


for msg in FileReaderStream("nmea-sample"):
cont = msg.decode().content
if cont and cont['type'] >= 22:
print(cont['type'], msg.raw)
print(cont['type'], msg.raw, cont)

0 comments on commit 8b04a1a

Please sign in to comment.