Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added port searching functionality #257

Merged
merged 10 commits into from
Apr 1, 2020
2 changes: 1 addition & 1 deletion hardware/CommunicationsPi/lan_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def run(server_class=HTTPServer, handler_class=S, port=8080):
logging.basicConfig(level=logging.INFO)
server_address = ("", port)
httpd = server_class(server_address, handler_class)
logging.info("Starting httpd...\n")
log.info("Starting httpd...\n")
try:
httpd.serve_forever()
except KeyboardInterrupt:
Expand Down
85 changes: 85 additions & 0 deletions hardware/CommunicationsPi/radio_transceiver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import os
import serial
import json
import serial.tools.list_ports

from .utils import get_logger, get_serial_stream


class Transceiver:
def __init__(self, log_file_name=None, port=None):
if log_file_name is None:
self.logging = get_logger("TRANSMITTER_LOG_FILE")
else:
self.logging = get_logger(log_file_name, log_file_name)

self.port = os.environ["RADIO_TRANSMITTER_PORT"] if port is None else port

if not self.port:
self.port_vid = None
self.port_pid = None
self.port_vendor = None
self.port_intf = None
self.port_serial_number = None
self.find_port()

baudrate = 9600
parity = serial.PARITY_NONE
stopbits = serial.STOPBITS_ONE
bytesize = serial.EIGHTBITS
timeout = 1

self.logging.info("Opening serial on: " + str(self.port))
self.serial = serial.Serial(
port=self.port,
baudrate=baudrate,
parity=parity,
stopbits=stopbits,
bytesize=bytesize,
timeout=timeout,
)

def find_port(self):
for port in serial.tools.list_ports.comports():
if self.is_usb_serial(port):
self.logging.info("Port device found: " + str(port.device))
self.port = port.device
return

return

def is_usb_serial(self, port):
if port.vid is None:
return False
if self.port_vid is not None:
if port.vid != self.port_vid:
return False
if self.port_pid is not None:
if port.pid != self.port_pid:
return False
if self.port_vendor is not None:
if not port.manufacturer.startswith(self.port_vendor):
return False
if self.port_serial_number is not None:
if not port.serial_number.startswith(self.port_serial_number):
return False
if self.port_intf is not None:
if port.interface is None or self.port_intf not in port.interface:
return False
return True

def send(self, payload):
self.logging.info("sending")
self.serial.write(get_serial_stream(payload))
self.logging.info(payload)

def listen(self):
payload = self.serial.readline().decode("utf-8")
message = "Error: Check logs"
if payload != "":
try:
message = json.loads(payload)
self.logging.info(message)
except json.JSONDecodeError:
self.logging.error(json.JSONDecodeError)
return message
27 changes: 0 additions & 27 deletions hardware/CommunicationsPi/serial_read.py

This file was deleted.

28 changes: 0 additions & 28 deletions hardware/CommunicationsPi/serial_write.py

This file was deleted.

6 changes: 4 additions & 2 deletions hardware/CommunicationsPi/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ def get_serial_stream(s):
return (json.dumps(s) + "\n").encode()


def get_logger(key):
logger = Logger(name=key, filename=os.environ[key])
def get_logger(key, file_name=None):
if file_name is None:
file_name = key
logger = Logger(name=key, filename=os.environ[file_name])
return logger