Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hardware Team: Created modules for Radio and LAN #259

Merged
merged 19 commits into from Apr 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
60 changes: 40 additions & 20 deletions hardware/CommunicationsPi/lan_client.py
@@ -1,24 +1,44 @@
import os
import time
import json
import requests
from requests.exceptions import HTTPError
from utils import get_logger

logging = get_logger("LAN_CLIENT_LOG_FILE")

url = os.environ["LAN_SERVER"]

logging.info("Pinging")
while True:
try:
payload = {"key1": "value1", "key2": "value2"}
logging.info("data: " + json.dumps(payload))
response = requests.post(url, data=payload)
response.raise_for_status()
except HTTPError as http_err:
logging.error("HTTP error occurred: {}".format(str(http_err)))
except Exception as err:
logging.error("error occurred: {}".format(str(err)))
else:
time.sleep(1)
from .utils import get_logger


class LANClient:
def __init__(self, log_file_name=None, lan_server_url=None):
if log_file_name is None:
self.logging = get_logger("LAN_CLIENT_LOG_FILE")
else:
self.logging = log_file_name

if lan_server_url is None:
self.url = self.get_server_url_from_env()
else:
self.url = lan_server_url

def get_server_url_from_env(self):
ip = os.environ["LAN_SERVER_IP"]
port = os.environ["LAN_PORT"]
return "http://{}:{}".format(ip, port)

# Function to ping the LAN server
# Accepts payload as a python dictionary
def ping_lan_server(self, payload):
self.logging.info("Pinging")

try:
# payload = {"key1": "value1", "key2": "value2"}
self.logging.info("data: " + json.dumps(payload))
response = requests.post(self.url, data=payload)
response.raise_for_status()
return response

except HTTPError as http_err:
self.logging.error("HTTP error occurred: {}".format(str(http_err)))
# re-raised so that it can be handled further up the call stack
raise

except Exception as err:
self.logging.error("error occurred: {}".format(str(err)))
raise
54 changes: 29 additions & 25 deletions hardware/CommunicationsPi/lan_server.py
@@ -1,60 +1,64 @@
#!/usr/bin/env python3

import os
from http.server import BaseHTTPRequestHandler, HTTPServer
import logging
from utils import get_logger
from .utils import get_logger

# Logs status in a file
log = get_logger("LAN_SERVER_LOG_FILE")


class S(BaseHTTPRequestHandler):
class Server(BaseHTTPRequestHandler):
def _set_response(self):
self.send_response(200)
self.send_header("Content-type", "text/html")
self.end_headers()

def do_GET(self):
logging.info(
"GET request,\nPath: %s\nHeaders:\n%s\n", str(self.path), str(self.headers)
global log

log.info(
"GET request,\nPath: %s\nHeaders:\n%s\n"
+ str(self.path)
+ str(self.headers)
)
self._set_response()
self.wfile.write("GET request for {}".format(self.path).encode("utf-8"))

def do_POST(self):
global log

content_length = int(
self.headers["Content-Length"]
) # <--- Gets the size of data
post_data = self.rfile.read(content_length) # <--- Gets the data itself
logging.info(
"POST request,\nPath: %s\nHeaders:\n%s\n\nBody:\n%s\n",
str(self.path),
str(self.headers),
post_data.decode("utf-8"),
log.info(
"POST request,\nPath: %s\nHeaders:\n%s\n\nBody:\n%s\n"
+ str(self.path)
+ str(self.headers)
+ post_data.decode("utf-8")
)
log.info("data: " + str(post_data))

self._set_response()
self.wfile.write("POST request for {}".format(self.path).encode("utf-8"))


def run(server_class=HTTPServer, handler_class=S, port=8080):
logging.basicConfig(level=logging.INFO)
def run(server_class=HTTPServer, handler_class=Server, log_file_name=None, port=None):
global log
log = (
get_logger("LAN_SERVER_LOG_FILE")
if log_file_name is None
else get_logger(log_file_name, log_file_name)
)

port = int(os.environ["LAN_PORT"]) if port is None else port

server_address = ("", port)
httpd = server_class(server_address, handler_class)
logging.info("Starting httpd...\n")
log.info("Starting server on port: " + str(port))
try:
httpd.serve_forever()
except KeyboardInterrupt:
pass
httpd.server_close()
logging.info("Stopping httpd...\n")


if __name__ == "__main__":
from sys import argv

if len(argv) == 2:
run(port=int(argv[1]))
else:
run()
httpd.server_close()
log.info("Stopping\n")
85 changes: 85 additions & 0 deletions hardware/CommunicationsPi/radio_transceiver.py
@@ -0,0 +1,85 @@
import os
import serial
import json
import serial.tools.list_ports

from .utils import get_logger, get_serial_stream


class Transceiver:
def __init__(self, log_file_name=None, port=None):
if log_file_name is None:
self.logging = get_logger("TRANSMITTER_LOG_FILE")
else:
self.logging = get_logger(log_file_name, log_file_name)

self.port = os.environ["RADIO_TRANSMITTER_PORT"] if port is None else port

if not self.port:
self.port_vid = None
self.port_pid = None
self.port_vendor = None
self.port_intf = None
self.port_serial_number = None
self.find_port()

baudrate = 9600
parity = serial.PARITY_NONE
stopbits = serial.STOPBITS_ONE
bytesize = serial.EIGHTBITS
timeout = 1

self.logging.info("Opening serial on: " + str(self.port))
self.serial = serial.Serial(
port=self.port,
baudrate=baudrate,
parity=parity,
stopbits=stopbits,
bytesize=bytesize,
timeout=timeout,
)

def find_port(self):
for port in serial.tools.list_ports.comports():
if self.is_usb_serial(port):
self.logging.info("Port device found: " + str(port.device))
self.port = port.device
return

return

def is_usb_serial(self, port):
if port.vid is None:
return False
if self.port_vid is not None:
if port.vid != self.port_vid:
return False
if self.port_pid is not None:
if port.pid != self.port_pid:
return False
if self.port_vendor is not None:
if not port.manufacturer.startswith(self.port_vendor):
return False
if self.port_serial_number is not None:
if not port.serial_number.startswith(self.port_serial_number):
return False
if self.port_intf is not None:
if port.interface is None or self.port_intf not in port.interface:
return False
return True

def send(self, payload):
self.logging.info("sending")
self.serial.write(get_serial_stream(payload))
self.logging.info(payload)

def listen(self):
payload = self.serial.readline().decode("utf-8")
message = "Error: Check logs"
if payload != "":
try:
message = json.loads(payload)
self.logging.info(message)
except json.JSONDecodeError:
self.logging.error(json.JSONDecodeError)
return message
27 changes: 0 additions & 27 deletions hardware/CommunicationsPi/serial_read.py

This file was deleted.

28 changes: 0 additions & 28 deletions hardware/CommunicationsPi/serial_write.py

This file was deleted.

6 changes: 4 additions & 2 deletions hardware/CommunicationsPi/utils.py
Expand Up @@ -7,6 +7,8 @@ def get_serial_stream(s):
return (json.dumps(s) + "\n").encode()


def get_logger(key):
logger = Logger(name=key, filename=os.environ[key])
def get_logger(key, file_name=None):
if file_name is None:
file_name = key
logger = Logger(name=key, filename=os.environ[file_name])
return logger
3 changes: 2 additions & 1 deletion hardware/env
@@ -1,6 +1,7 @@
RADIO_TRANSMITTER_PORT=/dev/ttyUSB0
RADIO_RECEIVER_PORT=/dev/ttyUSB0
LAN_SERVER=http://169.254.187.204:1234
LAN_SERVER_IP=192.168.8.193
LAN_PORT=3322
LOG_DIRECTORY=logs
TRANSMITTER_LOG_FILE=transmitter.log
RECEIVER_LOG_FILE=receiver.log
Expand Down