Skip to content

Commit

Permalink
pylint
Browse files Browse the repository at this point in the history
  • Loading branch information
attdona committed Oct 4, 2017
1 parent 02784e5 commit 22b8288
Showing 1 changed file with 47 additions and 27 deletions.
74 changes: 47 additions & 27 deletions pynais/junction.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,16 @@
import asyncio
import signal
import socket
import websockets
import logging
import json
import sys
from contextlib import contextmanager
import traceback
import concurrent
import websockets

from hbmqtt.client import MQTTClient, ClientException
from hbmqtt.mqtt.constants import QOS_1, QOS_2
from hbmqtt.client import MQTTClient
from hbmqtt.mqtt.constants import QOS_1

from pynais import SYNC_START_B, SYNC_END_B, SLINE, DLINE
from pynais import SYNC_START_B, SYNC_END_B, DLINE
from pynais.nais import is_protobuf, entity_from, is_ack, marshall, unmarshall, ConnectionClosed
from pynais.packet import Packet

Expand Down Expand Up @@ -45,33 +43,33 @@
LOG.setLevel(logging.DEBUG)

def my_ip_address():
"""Return the public ip of the local host
"""return the public ip of the local host
"""
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.connect(("8.8.8.8", 80))
return sock.getsockname()[0]

class Channel:
"""Encapsulate the (reader, writer) stream pair
"""encapsulate the (reader, writer) stream pair
"""

def __init__(self, reader, writer):
self.reader = reader
self.writer = writer

def close(self):
"""Close the channel
"""close the channel
"""
self.writer.close()

def pretty_print_peers(peers):
"""Return the peers list formatted as a readable string
"""return the peers list formatted as a readable string
"""
return ["{}".format(peer) for peer in peers]


class MessageTape:
"""State machine for processing input bytes
"""state machine for processing input bytes
"""
def __init__(self):
self.expected_len = 65536
Expand Down Expand Up @@ -125,7 +123,7 @@ def add_char(self, channel):
# return self.msg

def receive_from(reader, expect_json=False):
"""Return a message from a channel
"""return a message from a channel
Detect the type of message, currently protobuf encoded or ascii
strings \n terminated.
Expand All @@ -142,7 +140,7 @@ def receive_from(reader, expect_json=False):

read_tape = True

while (read_tape):
while read_tape:

channel = reader(1)
if (channel == b''):
Expand All @@ -152,11 +150,13 @@ def receive_from(reader, expect_json=False):

return parser.msg


#pylint: disable=C0103
class create_connection:

"""wrapper class around a network socket
"""
def __init__(self, address):
self.address = address
self.sock = None

def __enter__(self, timeout=socket._GLOBAL_DEFAULT_TIMEOUT, source_address=None):
self.sock = socket.create_connection(
Expand All @@ -167,36 +167,55 @@ def __exit__(self, *args):
self.sock.close()

def proto_send(self, msg):
"""send a protobuf `msg`
"""
self.sock.send(marshall(msg))

def ascii_send(self, msg):
"""send an ascii `msg`
"""
if type(msg) == str:
msg = msg.encode('utf-8')

n = self.sock.send(msg)

def ascii_receive(self, expect_json=False):
"""block until an ascii message is received
An ascii message is \n terminated
Returns:
bytes: a \n terminated ascii message
"""
while True:
msg = receive_from(self.sock.recv, expect_json)

if not is_protobuf(msg):
return msg

def proto_receive(self):
"""block until a protobuf message is received
Returns:
object: a protobuf instance
"""
while True:
msg = receive_from(self.sock.recv)

if is_protobuf(msg):
return unmarshall(msg)

#pylint: enable=C0103



async def connect(host, port):
"""coroutine connect as soon as the server is available. return a Channel
"""
while True:
try:
reader, writer = await asyncio.open_connection(host, port)
await asyncio.sleep(0.1)
#await asyncio.sleep(0.1)
return Channel(reader, writer)
except ConnectionRefusedError:
pass
Expand All @@ -207,10 +226,9 @@ async def wsconnect(host, port):
"""
while True:
try:
fd = await websockets.client.connect('ws://{}:{}'.format(host, port))
await asyncio.sleep(0.1)
LOG.debug('connected to ws://{}:{}'.format(host, port))
return fd
sock = await websockets.client.connect('ws://{}:{}'.format(host, port))
LOG.debug('connected to ws://%s:%d', host, port)
return sock
except ConnectionRefusedError:
pass

Expand All @@ -227,29 +245,31 @@ async def proto_send(channel, message, wait_ack=False):
async def msg_send(channel, message, wait_ack=False):
"""coroutine send an array of bytes or a string ``message``
"""
if (type(message) == str):
if isinstance(message, str):
message = message.encode('utf-8')
channel.writer.write(message)
while wait_ack:
data = await msg_receive(channel)

LOG.debug("msg_send (waiting for ack): recv |%r|", data)
response = entity_from(data)
if (not is_ack(response, message)):
LOG.debug("ACK expected, ignoring msg: |%s|" % response)
if not is_ack(response, message):
LOG.debug("ACK expected, ignoring msg: |%s|", response)
else:
wait_ack = False


async def proto_receive(channel, expect_json=False):
"""coroutine block until a protobuf message is received
"""
while True:
msg = await msg_receive(channel, expect_json)
if is_protobuf(msg):
return unmarshall(msg)


async def msg_receive(channel, expect_json=False):
"""Return a message from a channel
"""coroutine block until a message string/json/protobuf is received
Detect the type of message, currently protobuf encoded or ascii
strings \n terminated.
Expand All @@ -268,7 +288,7 @@ async def msg_receive(channel, expect_json=False):
while read_tape:

chan = await channel.reader.read(1)
if (chan == b''):
if chan == b'':
raise ConnectionClosed("connection closed: {}".format(channel.reader))

read_tape = parser.add_char(chan)
Expand Down Expand Up @@ -301,13 +321,13 @@ def __init__(self, type):
self.msg_queue = asyncio.Queue()

def alloc_sid(self):
""" Return a unique line instance id
"""return a unique line instance id
"""
Line.sid += 1
return Line.sid

def free_sid(self, id):
"""Do nothing
"""do nothing
To be implemented if a previous line id may be reused by a new line
instance
Expand Down

0 comments on commit 22b8288

Please sign in to comment.