Memorator Log Uploading Script #86

Merged: 14 commits, May 23, 2024
23 changes: 8 additions & 15 deletions README.md
@@ -188,14 +188,11 @@ INFLUX_ADMIN_PASSWORD=""

INFLUX_ORG="UBC Solar"

# used to store random data for debugging purposes
INFLUX_DEBUG_BUCKET="Debug"

# used to store real data from the car
INFLUX_CAN_BUCKET="CAN"
# Needed to Initialize InfluxDB
INFLUX_INIT_BUCKET="Init_test"
INFLUX_DEBUG_BUCKET="CAN_test"

# Parser secret key

SECRET_KEY=""

# Access tokens
@@ -219,11 +216,9 @@ INFLUX_ADMIN_PASSWORD="new_password"

INFLUX_ORG="UBC Solar"

# used to store random data for debugging purposes
INFLUX_DEBUG_BUCKET="Debug"

# used to store real data from the car
INFLUX_CAN_BUCKET="CAN"
# Needed to Initialize InfluxDB
INFLUX_INIT_BUCKET="Init_test"
INFLUX_DEBUG_BUCKET="CAN_test"

# Secret key

@@ -241,7 +236,7 @@ For the `GRAFANA_ADMIN_USERNAME` and `GRAFANA_ADMIN_PASSWORD` fields, you may ch

The `SECRET_KEY` field must be generated.

> :warning: **WARNING: Make sure not to change the `INFLUX_ORG`, `INFLUX_DEBUG_BUCKET`, and `INFLUX_PROD_BUCKET` variables from their defaults since that might break the provisioned Grafana dashboards.**
> :warning: **WARNING: Make sure not to change the `INFLUX_ORG`, `INFLUX_INIT_BUCKET`, and `INFLUX_DEBUG_BUCKET` variables from their defaults since that might break the provisioned Grafana dashboards.**

#### Generating the secret key

@@ -349,8 +344,6 @@ If all your tokens are correctly set up, the parser should return the following:

- If your output looks like the above, then congratulations! You've finished setting up the telemetry cluster! :heavy_check_mark:

## Setting up PCAN drivers

## Telemetry link setup

The telemetry link must be set up on the host machine to which the radio receiver is connected. This links the radio module to the telemetry cluster and enables using radio as a data source.
@@ -479,7 +472,7 @@ Here are some example invocations:

## Running the Offline Log Uploader

To run the offline log uploader the `logfiles` folder should have a generated log file to read and request the parser to write to InfluxDB in the `_test` buckets (like in debug mode). To do this use the -u (--log-upload) flag as follows:
To run the offline log uploader, the `logfiles` folder should contain a generated log file to read; the parser is then requested to write it to InfluxDB in the specified buckets (`_test` or `_prod`, depending on whether the `--debug` or `--prod` option is passed). To do this, use the `-u` (`--log-upload`) flag as follows:

```bash
./link_telemetry.py -u
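# Assumed variants (not shown in this diff): the same flag combined with an
# explicit bucket choice, as described above.
./link_telemetry.py -u --debug   # upload parsed data into the _test buckets
./link_telemetry.py -u --prod    # upload parsed data into the _prod buckets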
4 changes: 2 additions & 2 deletions docker-compose.yaml
@@ -27,7 +27,7 @@ services:
- INFLUX_ADMIN_PASSWORD=${INFLUX_ADMIN_PASSWORD}
- INFLUX_TOKEN=${INFLUX_TOKEN}
- INFLUX_ORG=${INFLUX_ORG}
- INFLUX_BUCKET=${INFLUX_PROD_BUCKET}
- INFLUX_BUCKET=${INFLUX_INIT_BUCKET}

parser:
build: .
@@ -54,7 +54,7 @@ services:
- DOCKER_INFLUXDB_INIT_USERNAME=${INFLUX_ADMIN_USERNAME}
- DOCKER_INFLUXDB_INIT_PASSWORD=${INFLUX_ADMIN_PASSWORD}
- DOCKER_INFLUXDB_INIT_ORG=${INFLUX_ORG}
- DOCKER_INFLUXDB_INIT_BUCKET=${INFLUX_PROD_BUCKET}
- DOCKER_INFLUXDB_INIT_BUCKET=${INFLUX_INIT_BUCKET}
- INFLUX_DEBUG_BUCKET=${INFLUX_DEBUG_BUCKET}
restart: always

158 changes: 71 additions & 87 deletions link_telemetry.py
@@ -25,6 +25,8 @@
import warnings

import concurrent.futures
from tools.MemoratorUploader import memorator_upload_script


__PROGRAM__ = "link_telemetry"
__VERSION__ = "0.4"
@@ -65,6 +67,7 @@
# API endpoints
DEBUG_WRITE_ENDPOINT = f"{PARSER_URL}/api/v1/parse/write/debug"
PROD_WRITE_ENDPOINT = f"{PARSER_URL}/api/v1/parse/write/production"
LOG_WRITE_ENDPOINT = f"{PARSER_URL}/api/v1/parse/write/log"
NO_WRITE_ENDPOINT = f"{PARSER_URL}/api/v1/parse"
HEALTH_ENDPOINT = f"{PARSER_URL}/api/v1/health"

@@ -298,9 +301,9 @@ def process_response(future: concurrent.futures.Future, args):
print(f"{ANSI_BOLD}Config file location:{ANSI_ESCAPE} \"{TOML_CONFIG_FILE.absolute()}\"\n")
return

if response.status_code != 200:
print(f"{ANSI_BOLD}Response HTTP status code:{ANSI_ESCAPE} {ANSI_YELLOW}{response.status_code}{ANSI_ESCAPE}")
print(f"{ANSI_BOLD}Response HTTP status code:{ANSI_ESCAPE} {ANSI_GREEN}{response.status_code}{ANSI_ESCAPE}")
# if response.status_code != 200:
# print(f"{ANSI_BOLD}Response HTTP status code:{ANSI_ESCAPE} {ANSI_YELLOW}{response.status_code}{ANSI_ESCAPE}")
# print(f"{ANSI_BOLD}Response HTTP status code:{ANSI_ESCAPE} {ANSI_GREEN}{response.status_code}{ANSI_ESCAPE}")

try:
parse_response: dict = response.json()
@@ -310,36 +313,39 @@ def process_response(future: concurrent.futures.Future, args):
return

if parse_response["result"] == "OK":
# Create a table
table = BeautifulTable()

# Set the table title
table.set_style(BeautifulTable.STYLE_RST)
table.column_widths = [110]
table.width_exceed_policy = BeautifulTable.WEP_WRAP

# Title
table.rows.append([f"{ANSI_GREEN}{parse_response['type']}{ANSI_ESCAPE}"])
display_data = parse_response['message']

# Add columns as subtable
subtable = BeautifulTable()
subtable.set_style(BeautifulTable.STYLE_GRID)

cols = display_data["COL"]
subtable.rows.append(cols.keys())
for i in range(len(list(cols.values())[0])):
subtable.rows.append([val[i] for val in cols.values()])

table.rows.append([subtable])

# Add rows
rows = display_data["ROW"]
for row_head, row_data in rows.items():
table.rows.append([f"{ANSI_BOLD}{row_head}{ANSI_ESCAPE}"])
table.rows.append(row_data)

print(table)
table = None
if args.log is not None or args.table_on:
# Create a table
table = BeautifulTable()

# Set the table title
table.set_style(BeautifulTable.STYLE_RST)
table.column_widths = [110]
table.width_exceed_policy = BeautifulTable.WEP_WRAP

# Title
table.rows.append([f"{ANSI_GREEN}{parse_response['type']}{ANSI_ESCAPE}"])
display_data = parse_response['message']

# Add columns as subtable
subtable = BeautifulTable()
subtable.set_style(BeautifulTable.STYLE_GRID)

cols = display_data["COL"]
subtable.rows.append(cols.keys())
for i in range(len(list(cols.values())[0])):
subtable.rows.append([val[i] for val in cols.values()])

table.rows.append([subtable])

# Add rows
rows = display_data["ROW"]
for row_head, row_data in rows.items():
table.rows.append([f"{ANSI_BOLD}{row_head}{ANSI_ESCAPE}"])
table.rows.append(row_data)

if args.table_on:
print(table)

if parse_response["logMessage"]:
write_to_log_file(table, LOG_FILE_NAME, convert_to_hex=False)
@@ -361,7 +367,6 @@ def process_response(future: concurrent.futures.Future, args):
else:
print(f"Unexpected response: {parse_response['result']}")

print()

def read_lines_from_file(file_path):
"""
@@ -371,37 +376,34 @@ def read_lines_from_file(file_path):
for line in file:
yield line.strip()

def upload_logs(args, live_filters):
# Get a list of all .txt files in the logfiles directory
txt_files = [file for file in glob.glob(FAIL_DIRECTORY + '/*.txt') if not file[len(FAIL_DIRECTORY):].startswith('FAILED_UPLOADS')]
print(f"Found {len(txt_files)} .txt files in {FAIL_DIRECTORY}\n")

# Iterate over each .txt file
for file_path in txt_files:
print(f"Reading file {file_path}...")
message_generator = read_lines_from_file(file_path)
"""
Purpose: Sends data and filters to parser and registers a callback to process the response
Parameters:
message - raw byte data to be parsed on parser side
live_filters - filters for which messages to live stream to Grafana
log_filters - filters for which messages to log to file
args - the arguments passed to ./link_telemetry.py
parser_endpoint - the endpoint to send the data to
Returns: None
"""
def sendToParser(message: str, live_filters: list, log_filters: list, args: list, parser_endpoint: str):
payload = {
"message" : message,
"live_filters" : live_filters,
"log_filters" : log_filters
}

# submit to thread pool
future = executor.submit(parser_request, payload, parser_endpoint)

# register done callback with future (lambda function to pass in arguments)
future.add_done_callback(lambda future: process_response(future, args))

while True:
try:
# Converts a string of hex characters to a string of ASCII characters
# Preserves weird characters to be written and copied correctly
log_line = bytes.fromhex(next(message_generator)).decode('latin-1')
except StopIteration:
break

# Create payload
payload = {
"message" : log_line,
"live_filters" : live_filters
}

future = executor.submit(parser_request, payload, DEBUG_WRITE_ENDPOINT)

# register done callback with future (lambda function to pass in arguments)
future.add_done_callback(lambda future: process_response(future, args))

print(f"Done reading {file_path}")
print()
def upload_logs(args, live_filters, log_filters, endpoint):
# Call the memorator log uploader function
memorator_upload_script(sendToParser, live_filters, log_filters, args, endpoint)
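
Reviewer note: `tools/MemoratorUploader.py` is not shown in this diff, so the exact behaviour of `memorator_upload_script` is an assumption here. A minimal sketch of the interface that `upload_logs` now depends on, assuming the script walks plain-text log files and hands each raw line to the callback, might look like:

```python
# Hypothetical sketch only -- the real module likely pulls logs off a Kvaser
# Memorator via canlib; the file names and location below are assumptions.
from pathlib import Path

def memorator_upload_script(send_func, live_filters, log_filters, args, endpoint):
    """Read raw log lines and hand each one to the parser callback."""
    log_dir = Path("logfiles")                     # assumed log location
    for log_file in sorted(log_dir.glob("*.txt")):
        print(f"Uploading {log_file}...")
        with log_file.open() as f:
            for line in f:
                line = line.strip()
                if line:
                    # matches sendToParser(message, live_filters, log_filters, args, parser_endpoint)
                    send_func(line, live_filters, log_filters, args, endpoint)
```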


"""
Expand All @@ -417,6 +419,8 @@ def process_message(message: str, buffer: str = "") -> list:
# Remove 00 0a from the start if present
if message.startswith("000a"):
message = message[4:]
elif message.startswith("0a"):
message = message[2:]

# Add buffer to the start of the message
message = buffer + message
@@ -430,29 +434,6 @@
return [bytes.fromhex(part).decode('latin-1') for part in parts] , buffer


"""
Purpose: Sends data and filters to parser and registers a callback to process the response
Parameters:
message - raw byte data to be parsed on parser side
live_filters - filters for which messages to live stream to Grafana
log_filters - filters for which messages to log to file
args - the arguments passed to ./link_telemetry.py
parser_endpoint - the endpoint to send the data to
Returns: None
"""
def sendToParser(message: str, live_filters: list, log_filters: list, args: list, parser_endpoint: str):
payload = {
"message" : message,
"live_filters" : live_filters,
"log_filters" : log_filters
}

# submit to thread pool
future = executor.submit(parser_request, payload, parser_endpoint)

# register done callback with future (lambda function to pass in arguments)
future.add_done_callback(lambda future: process_response(future, args))


def main():
"""
@@ -484,6 +465,8 @@ def main():
help=("Requests parser to write parsed data to the debug InfluxDB bucket."))
write_group.add_argument("--prod", action="store_true",
help=("Requests parser to write parsed data to the production InfluxDB bucket."))
write_group.add_argument("--table-on", action="store_true",
help=("Display pretty tables for parsed messages. Off by default; only parse failures are printed otherwise."))
write_group.add_argument("--no-write", action="store_true",
help=(("Requests parser to skip writing to the InfluxDB bucket and streaming "
"to Grafana. Cannot be used with --debug and --prod options.")))
@@ -625,7 +608,7 @@ def main():
DEBUG_FILE_NAME = os.path.join(DEBUG_DIRECTORY, LOG_FILE)

if args.log_upload:
upload_logs(args, live_filters)
upload_logs(args, live_filters, log_filters, LOG_WRITE_ENDPOINT)
return 0

while True:
@@ -683,4 +666,5 @@
if __name__ == "__main__":
signal.signal(signal.SIGINT, sigint_handler)
main()



5 changes: 5 additions & 0 deletions parser/main.py
@@ -289,6 +289,11 @@ def parse_and_write_request():
def parse_and_write_request_to_prod():
return parse_and_write_request_bucket("_prod")

@app.post(f"{API_PREFIX}/parse/write/log")
@auth.login_required
def parse_and_write_request_to_log():
return parse_and_write_request_bucket("_log")
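
For manual testing, a hedged sketch of exercising the new endpoint directly is shown below; the parser address, credentials, and auth scheme are assumptions, while the payload keys mirror what `sendToParser` builds in `link_telemetry.py`:

```python
# Hypothetical manual test of the new /parse/write/log endpoint (not part of this PR).
import requests

PARSER_URL = "http://localhost:5000"   # assumed parser address
payload = {
    "message": "<raw log line>",       # same keys sendToParser sends
    "live_filters": ["CAN"],
    "log_filters": [],
}
response = requests.post(
    f"{PARSER_URL}/api/v1/parse/write/log",
    json=payload,
    auth=("<user>", "<password>"),     # @auth.login_required -- auth scheme assumed
)
print(response.status_code, response.text)
```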

"""
Parses incoming request, writes the parsed measurements to InfluxDB bucket (debug or production)
that is specific to the message type (CAN, GPS, IMU, for example).
6 changes: 5 additions & 1 deletion requirements.txt
@@ -1,9 +1,11 @@
annotated-types==0.7.0
argparse-addons==0.8.0
attrs==22.1.0
autopep8==2.0.1
beautifultable==1.1.0
bitstruct==8.15.1
blinker==1.6.2
canlib==1.25.393
cantools==37.2.0
certifi==2021.10.8
charset-normalizer==2.0.12
@@ -27,6 +29,8 @@ pluggy==1.0.0
prettytable==3.8.0
py==1.11.0
pycodestyle==2.10.0
pydantic==2.7.1
pydantic_core==2.18.2
pyparsing==3.0.9
pyserial==3.5
pytest==7.1.3
@@ -44,7 +48,7 @@ six==1.16.0
textparser==0.24.0
toml==0.10.2
tomli==2.0.1
typing_extensions==4.4.0
typing_extensions==4.11.0
urllib3==1.26.9
wcwidth==0.2.6
websockets==11.0.3
8 changes: 3 additions & 5 deletions templates/template_dotenv.env
@@ -12,11 +12,9 @@ INFLUX_ORG="UBC Solar"

MESSAGE_TYPES="CAN,GPS,IMU"

# used to store random data for debugging purposes
INFLUX_DEBUG_BUCKET="Debug"

# used to store real data from the car
INFLUX_CAN_BUCKET="CAN"
# Needed to Initialize InfluxDB
INFLUX_INIT_BUCKET="Init_test"
INFLUX_DEBUG_BUCKET="CAN_test"

DS_INFLUXDB="o2uhkwje8832ha"
