diff --git a/README.md b/README.md
index ced6eeef..595e6235 100644
--- a/README.md
+++ b/README.md
@@ -188,14 +188,11 @@ INFLUX_ADMIN_PASSWORD=""
 
 INFLUX_ORG="UBC Solar"
 
-# used to store random data for debugging purposes
-INFLUX_DEBUG_BUCKET="Debug"
-
-# used to store real data from the car
-INFLUX_CAN_BUCKET="CAN"
+# Needed to Initialize InfluxDB
+INFLUX_INIT_BUCKET="Init_test"
+INFLUX_DEBUG_BUCKET="CAN_test"
 
 # Parser secret key
-
 SECRET_KEY=""
 
 # Access tokens
@@ -219,11 +216,9 @@ INFLUX_ADMIN_PASSWORD="new_password"
 
 INFLUX_ORG="UBC Solar"
 
-# used to store random data for debugging purposes
-INFLUX_DEBUG_BUCKET="Debug"
-
-# used to store real data from the car
-INFLUX_CAN_BUCKET="CAN"
+# Needed to Initialize InfluxDB
+INFLUX_INIT_BUCKET="Init_test"
+INFLUX_DEBUG_BUCKET="CAN_test"
 
 # Secret key
 
@@ -241,7 +236,7 @@ For the `GRAFANA_ADMIN_USERNAME` and `GRAFANA_ADMIN_PASSWORD` fields, you may ch
 
 The `SECRET_KEY` field must be generated.
 
-> :warning: **WARNING: Make sure not to change the `INFLUX_ORG`, `INFLUX_DEBUG_BUCKET`, and `INFLUX_PROD_BUCKET` variables from their defaults since that might break the provisioned Grafana dashboards.**
+> :warning: **WARNING: Make sure not to change the `INFLUX_ORG`, `INFLUX_INIT_BUCKET`, and `INFLUX_DEBUG_BUCKET` variables from their defaults since that might break the provisioned Grafana dashboards.**
 
 #### Generating the secret key
 
@@ -349,8 +344,6 @@ If all your tokens are correctly set up, the parser should return the following:
 
 - If your output looks like the above, then congratulations! You've finished setting up the telemetry cluster! :heavy_check_mark:
 
-## Seting up PCAN drivers
-
 ## Telemetry link setup
 
 The telemetry link must be set up on the host machine on which the radio receiver is connected. This links the radio module to the telemetry cluster and enables using radio as a data source.
@@ -479,7 +472,7 @@ Here are some example invocations:
 
 ## Running the Offline Log Uploader
 
-To run the offline log uploader the `logfiles` folder should have a generated log file to read and request the parser to write to InfluxDB in the `_test` buckets (like in debug mode). To do this use the -u (--log-upload) flag as follows:
+To run the offline log uploader, the `logfiles` folder should have a generated log file to read; the parser is then asked to write to InfluxDB in the specified buckets (`_test` or `_prod`, depending on whether the --debug or --prod option is used). To do this, use the -u (--log-upload) flag as follows:
 
 ```bash
 ./link_telemetry.py -u
diff --git a/docker-compose.yaml b/docker-compose.yaml
index 2de1085d..d3476055 100644
--- a/docker-compose.yaml
+++ b/docker-compose.yaml
@@ -27,7 +27,7 @@ services:
       - INFLUX_ADMIN_PASSWORD=${INFLUX_ADMIN_PASSWORD}
      - INFLUX_TOKEN=${INFLUX_TOKEN}
      - INFLUX_ORG=${INFLUX_ORG}
-      - INFLUX_BUCKET=${INFLUX_PROD_BUCKET}
+      - INFLUX_BUCKET=${INFLUX_INIT_BUCKET}
 
   parser:
     build: .
@@ -54,7 +54,7 @@ services:
       - DOCKER_INFLUXDB_INIT_USERNAME=${INFLUX_ADMIN_USERNAME}
       - DOCKER_INFLUXDB_INIT_PASSWORD=${INFLUX_ADMIN_PASSWORD}
       - DOCKER_INFLUXDB_INIT_ORG=${INFLUX_ORG}
-      - DOCKER_INFLUXDB_INIT_BUCKET=${INFLUX_PROD_BUCKET}
+      - DOCKER_INFLUXDB_INIT_BUCKET=${INFLUX_INIT_BUCKET}
       - INFLUX_DEBUG_BUCKET=${INFLUX_DEBUG_BUCKET}
 
     restart: always
diff --git a/link_telemetry.py b/link_telemetry.py
index 5c742a72..63dc3944 100755
--- a/link_telemetry.py
+++ b/link_telemetry.py
@@ -25,6 +25,8 @@ import warnings
 
 import concurrent.futures
 
+from tools.MemoratorUploader import memorator_upload_script
+
 __PROGRAM__ = "link_telemetry"
 __VERSION__ = "0.4"
 
@@ -65,6 +67,7 @@
 # API endpoints
 DEBUG_WRITE_ENDPOINT = f"{PARSER_URL}/api/v1/parse/write/debug"
 PROD_WRITE_ENDPOINT = f"{PARSER_URL}/api/v1/parse/write/production"
+LOG_WRITE_ENDPOINT = f"{PARSER_URL}/api/v1/parse/write/log"
 NO_WRITE_ENDPOINT = f"{PARSER_URL}/api/v1/parse"
 HEALTH_ENDPOINT = f"{PARSER_URL}/api/v1/health"
 
@@ -298,9 +301,9 @@ def process_response(future: concurrent.futures.Future, args):
         print(f"{ANSI_BOLD}Config file location:{ANSI_ESCAPE} \"{TOML_CONFIG_FILE.absolute()}\"\n")
         return
 
-    if response.status_code != 200:
-        print(f"{ANSI_BOLD}Response HTTP status code:{ANSI_ESCAPE} {ANSI_YELLOW}{response.status_code}{ANSI_ESCAPE}")
-    print(f"{ANSI_BOLD}Response HTTP status code:{ANSI_ESCAPE} {ANSI_GREEN}{response.status_code}{ANSI_ESCAPE}")
+    # if response.status_code != 200:
+    #     print(f"{ANSI_BOLD}Response HTTP status code:{ANSI_ESCAPE} {ANSI_YELLOW}{response.status_code}{ANSI_ESCAPE}")
+    # print(f"{ANSI_BOLD}Response HTTP status code:{ANSI_ESCAPE} {ANSI_GREEN}{response.status_code}{ANSI_ESCAPE}")
 
     try:
         parse_response: dict = response.json()
@@ -310,36 +313,39 @@ def process_response(future: concurrent.futures.Future, args):
         return
 
     if parse_response["result"] == "OK":
-        # Create a table
-        table = BeautifulTable()
-
-        # Set the table title
-        table.set_style(BeautifulTable.STYLE_RST)
-        table.column_widths = [110]
-        table.width_exceed_policy = BeautifulTable.WEP_WRAP
-
-        # Title
-        table.rows.append([f"{ANSI_GREEN}{parse_response['type']}{ANSI_ESCAPE}"])
-        display_data = parse_response['message']
-
-        # Add columns as subtable
-        subtable = BeautifulTable()
-        subtable.set_style(BeautifulTable.STYLE_GRID)
-
-        cols = display_data["COL"]
-        subtable.rows.append(cols.keys())
-        for i in range(len(list(cols.values())[0])):
-            subtable.rows.append([val[i] for val in cols.values()])
-
-        table.rows.append([subtable])
-
-        # Add rows
-        rows = display_data["ROW"]
-        for row_head, row_data in rows.items():
-            table.rows.append([f"{ANSI_BOLD}{row_head}{ANSI_ESCAPE}"])
-            table.rows.append(row_data)
-
-        print(table)
+        table = None
+        if args.log is not None or args.table_on:
+            # Create a table
+            table = BeautifulTable()
+
+            # Set the table title
+            table.set_style(BeautifulTable.STYLE_RST)
+            table.column_widths = [110]
+            table.width_exceed_policy = BeautifulTable.WEP_WRAP
+
+            # Title
+            table.rows.append([f"{ANSI_GREEN}{parse_response['type']}{ANSI_ESCAPE}"])
+            display_data = parse_response['message']
+
+            # Add columns as subtable
+            subtable = BeautifulTable()
+            subtable.set_style(BeautifulTable.STYLE_GRID)
+
+            cols = display_data["COL"]
+            subtable.rows.append(cols.keys())
+            for i in range(len(list(cols.values())[0])):
+                subtable.rows.append([val[i] for val in cols.values()])
+
+            table.rows.append([subtable])
+
+            # Add rows
+            rows = display_data["ROW"]
+            for row_head, row_data in rows.items():
+                table.rows.append([f"{ANSI_BOLD}{row_head}{ANSI_ESCAPE}"])
+                table.rows.append(row_data)
+
+        if args.table_on:
+            print(table)
 
         if parse_response["logMessage"]:
             write_to_log_file(table, LOG_FILE_NAME, convert_to_hex=False)
@@ -361,7 +367,6 @@ def process_response(future: concurrent.futures.Future, args):
     else:
         print(f"Unexpected response: {parse_response['result']}")
-    print()
 
 
 def read_lines_from_file(file_path):
     """
@@ -371,37 +376,34 @@
 
     for line in file:
         yield line.strip()
 
 
-def upload_logs(args, live_filters):
-    # Get a list of all .txt files in the logfiles directory
-    txt_files = [file for file in glob.glob(FAIL_DIRECTORY + '/*.txt') if not file[len(FAIL_DIRECTORY):].startswith('FAILED_UPLOADS')]
-    print(f"Found {len(txt_files)} .txt files in {FAIL_DIRECTORY}\n")
-    # Iterate over each .txt file
-    for file_path in txt_files:
-        print(f"Reading file {file_path}...")
-        message_generator = read_lines_from_file(file_path)
+"""
+Purpose: Sends data and filters to parser and registers a callback to process the response
+Parameters:
+    message - raw byte data to be parsed on parser side
+    live_filters - filters for which messages to live stream to Grafana
+    log_filters - filters for which messages to log to file
+    args - the arguments passed to ./link_telemetry.py
+    parser_endpoint - the endpoint to send the data to
+Returns: None
+"""
+def sendToParser(message: str, live_filters: list, log_filters: list, args: list, parser_endpoint: str):
+    payload = {
+        "message" : message,
+        "live_filters" : live_filters,
+        "log_filters" : log_filters
+    }
+
+    # submit to thread pool
+    future = executor.submit(parser_request, payload, parser_endpoint)
+
+    # register done callback with future (lambda function to pass in arguments)
+    future.add_done_callback(lambda future: process_response(future, args))
 
-        while True:
-            try:
-                # Converts a string of hex characters to a string of ASCII characters
-                # Preserves weird characters to be written and copied correctly
-                log_line = bytes.fromhex(next(message_generator)).decode('latin-1')
-            except StopIteration:
-                break
-
-            # Create payload
-            payload = {
-                "message" : log_line,
-                "live_filters" : live_filters
-            }
-
-            future = executor.submit(parser_request, payload, DEBUG_WRITE_ENDPOINT)
-
-            # register done callback with future (lambda function to pass in arguments)
-            future.add_done_callback(lambda future: process_response(future, args))
-        print(f"Done reading {file_path}")
-        print()
+def upload_logs(args, live_filters, log_filters, endpoint):
+    # Call the memorator log uploader function
+    memorator_upload_script(sendToParser, live_filters, log_filters, args, endpoint)
 
 
 """
@@ -417,6 +419,8 @@ def process_message(message: str, buffer: str = "") -> list:
     # Remove 00 0a from the start if present
     if message.startswith("000a"):
         message = message[4:]
+    elif message.startswith("0a"):
+        message = message[2:]
 
     # Add buffer to the start of the message
     message = buffer + message
@@ -430,29 +434,6 @@
     return [bytes.fromhex(part).decode('latin-1') for part in parts] , buffer
 
 
-"""
-Purpose: Sends data and filters to parser and registers a callback to process the response
-Parameters:
-    message - raw byte data to be parsed on parser side
-    live_filters - filters for which messages to live stream to Grafana
-    log_filters - filters for which messages to log to file
-    args - the arguments passed to ./link_telemetry.py
-    parser_endpoint - the endpoint to send the data to
-Returns: None
-"""
-def sendToParser(message: str, live_filters: list, log_filters: list, args: list, parser_endpoint: str):
-    payload = {
-        "message" : message,
-        "live_filters" : live_filters,
-        "log_filters" : log_filters
-    }
-
-    # submit to thread pool
-    future = executor.submit(parser_request, payload, parser_endpoint)
-
-    # register done callback with future (lambda function to pass in arguments)
-    future.add_done_callback(lambda future: process_response(future, args))
-
 
 def main():
     """
@@ -484,6 +465,8 @@ def main():
                              help=("Requests parser to write parsed data to the debug InfluxDB bucket."))
     write_group.add_argument("--prod", action="store_true",
                              help=("Requests parser to write parsed data to the production InfluxDB bucket."))
+    write_group.add_argument("--table-on", action="store_true",
+                             help=("Display pretty tables for parsed messages. Off by default; only parse failures are shown."))
     write_group.add_argument("--no-write", action="store_true",
                              help=(("Requests parser to skip writing to the InfluxDB bucket and streaming"
                                     "to Grafana. Cannot be used with --debug and --prod options.")))
@@ -625,7 +608,7 @@ def main():
     DEBUG_FILE_NAME = os.path.join(DEBUG_DIRECTORY, LOG_FILE)
 
     if args.log_upload:
-        upload_logs(args, live_filters)
+        upload_logs(args, live_filters, log_filters, LOG_WRITE_ENDPOINT)
         return 0
 
     while True:
@@ -683,4 +666,5 @@ def main():
 if __name__ == "__main__":
     signal.signal(signal.SIGINT, sigint_handler)
     main()
-    
\ No newline at end of file
+
+
diff --git a/parser/main.py b/parser/main.py
index 1751b5dc..6fbcd3d0 100644
--- a/parser/main.py
+++ b/parser/main.py
@@ -289,6 +289,11 @@ def parse_and_write_request():
 def parse_and_write_request_to_prod():
     return parse_and_write_request_bucket("_prod")
 
+@app.post(f"{API_PREFIX}/parse/write/log")
+@auth.login_required
+def parse_and_write_request_to_log():
+    return parse_and_write_request_bucket("_log")
+
 """
 Parses incoming request, writes the parsed measurements to InfluxDB bucket (debug or production)
 that is specifc to the message type (CAN, GPS, IMU, for example).
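Review note: for anyone exercising the new `/api/v1/parse/write/log` route by hand, the sketch below mirrors the request that `sendToParser` builds. The local parser URL, port, and bearer-style auth header are illustrative assumptions (the real client sends whatever access token `link_telemetry.py` is configured with); the payload keys match the ones assembled in `sendToParser`.

```python
import requests

# Assumptions for illustration: parser reachable locally on port 5000,
# and the parser access token accepted as a bearer token.
PARSER_URL = "http://localhost:5000"
LOG_WRITE_ENDPOINT = f"{PARSER_URL}/api/v1/parse/write/log"

# Same payload shape that sendToParser() submits to the thread pool.
payload = {
    "message": "<raw frame bytes encoded as a latin-1 string>",
    "live_filters": ["CAN"],   # hypothetical filter values
    "log_filters": ["CAN"],
}

response = requests.post(
    LOG_WRITE_ENDPOINT,
    json=payload,
    headers={"Authorization": "Bearer <PARSER_ACCESS_TOKEN>"},
    timeout=10,
)
print(response.status_code, response.json().get("result"))
```

A 200 response with `"result": "OK"` indicates the request was parsed and handed to the `_log` bucket handling added in `parse_and_write_request_to_log()`.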
diff --git a/requirements.txt b/requirements.txt
index 97f187c9..7ec8260d 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,9 +1,11 @@
+annotated-types==0.7.0
 argparse-addons==0.8.0
 attrs==22.1.0
 autopep8==2.0.1
 beautifultable==1.1.0
 bitstruct==8.15.1
 blinker==1.6.2
+canlib==1.25.393
 cantools==37.2.0
 certifi==2021.10.8
 charset-normalizer==2.0.12
@@ -27,6 +29,8 @@ pluggy==1.0.0
 prettytable==3.8.0
 py==1.11.0
 pycodestyle==2.10.0
+pydantic==2.7.1
+pydantic_core==2.18.2
 pyparsing==3.0.9
 pyserial==3.5
 pytest==7.1.3
@@ -44,7 +48,7 @@ six==1.16.0
 textparser==0.24.0
 toml==0.10.2
 tomli==2.0.1
-typing_extensions==4.4.0
+typing_extensions==4.11.0
 urllib3==1.26.9
 wcwidth==0.2.6
 websockets==11.0.3
diff --git a/templates/template_dotenv.env b/templates/template_dotenv.env
index df02a703..9a13c813 100644
--- a/templates/template_dotenv.env
+++ b/templates/template_dotenv.env
@@ -12,11 +12,9 @@ INFLUX_ORG="UBC Solar"
 
 MESSAGE_TYPES="CAN,GPS,IMU"
 
-# used to store random data for debugging purposes
-INFLUX_DEBUG_BUCKET="Debug"
-
-# used to store real data from the car
-INFLUX_CAN_BUCKET="CAN"
+# Needed to Initialize InfluxDB
+INFLUX_INIT_BUCKET="Init_test"
+INFLUX_DEBUG_BUCKET="CAN_test"
 
 DS_INFLUXDB="o2uhkwje8832ha"
 
diff --git a/tools/MemoratorUploader.py b/tools/MemoratorUploader.py
new file mode 100644
index 00000000..091c627d
--- /dev/null
+++ b/tools/MemoratorUploader.py
@@ -0,0 +1,118 @@
+import canlib.kvmlib as kvmlib
+import re
+import datetime
+import struct
+
+# Script Constants
+PATH = "D:\\LOG000{:02d}.KMF"
+NUM_LOGS = 15
+MB_TO_KB = 1024
+EPOCH_START = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)
+
+# Formatting Constants
+ANSI_GREEN = "\033[92m"
+ANSI_BOLD = "\033[1m"
+ANSI_RED = "\033[91m"
+ANSI_RESET = "\033[0m"
+
+# Regex Patterns for logfile parsing
+PATTERN_DATETIME = re.compile(r't:\s+(.*?)\s+DateTime:\s+(.*)')
+PATTERN_TRIGGER = re.compile(r't:\s+(.*?)\s+Log Trigger Event.*')
+PATTERN_EVENT = re.compile(r't:\s+(.*?)\s+ch:0 f:\s+(.*?) id:\s+(.*?) dlc:\s+(.*?) d:(.*)')
+
+
+def upload(log_file: kvmlib.LogFile, parserCallFunc: callable, live_filters: list, log_filters: list, args: list, endpoint: str):
+    start_time = None
+    for event in log_file:
+        str_event = str(event)
+        if PATTERN_DATETIME.search(str_event):
+            match = PATTERN_DATETIME.search(str_event)
+            date_time_str = match.group(2)
+            print(f"Matched DateTime: {date_time_str}")
+            date_time_obj = datetime.datetime.strptime(date_time_str, '%Y-%m-%d %H:%M:%S')
+            date_time_obj = date_time_obj.replace(tzinfo=datetime.timezone.utc)
+            start_time = (date_time_obj - EPOCH_START).total_seconds()
+        elif PATTERN_TRIGGER.search(str_event):
+            continue
+        elif PATTERN_EVENT.search(str_event):
+            match = PATTERN_EVENT.search(str_event)
+            timestamp = start_time + float(match.group(1))
+            timestamp_str = struct.pack('>d', timestamp).decode('latin-1')
+
+            id = int(match.group(3), 16)
+            id_str = id.to_bytes(4, 'big').decode('latin-1')
+
+            dlc_str = match.group(4)
+
+            data = bytes.fromhex(match.group(5).replace(' ', ''))
+            data_str = data.ljust(8, b'\0').decode('latin-1')
+
+            can_str = timestamp_str + "#" + id_str + data_str + dlc_str
+
+            parserCallFunc(can_str, live_filters, log_filters, args, endpoint)
+
+
+def memorator_upload_script(parserCallFunc: callable, live_filters: list, log_filters: list, args: list, endpoint: str):
+    # Open each KMF file
+    for i in range(NUM_LOGS):
+        kmf_file = kvmlib.openKmf(PATH.format(i))
+        print(f"{ANSI_GREEN}Opening file: {PATH.format(i)}{ANSI_RESET}")  # Green stdout
+
+        # Access the log attribute of the KMF object
+        log = kmf_file.log
+
+        # First calculate total number of events
+        total_events = 0
+        for log_file in log:
+            total_events += log_file.event_count_estimation()
+
+        # Display the number of logs
+        num_logs = len(log)
+        print(f"{ANSI_BOLD}Found {num_logs} logs with {total_events} events total{ANSI_RESET}")
+
+        # Iterate over all log files
+        for j, log_file in enumerate(log):
+            # Calculate and display the approximate size
+            num_events = log_file.event_count_estimation()
+            kmf_file_size = kmf_file.disk_usage[0]
+            kb_size = kmf_file_size * (num_events / total_events) * MB_TO_KB
+
+            # Display information about each log
+            start_time = log_file.start_time.isoformat(' ')
+            end_time = log_file.end_time.isoformat(' ')
+            print(f"{ANSI_BOLD}\nLog Idx = {j}, Approximate size = {kb_size:.2f} KB:{ANSI_RESET}")
+            print(f"{ANSI_BOLD}\tEstimated events : {num_events}{ANSI_RESET}")
+            print(f"{ANSI_BOLD}\tStart time : {start_time}{ANSI_RESET}")
+            print(f"{ANSI_BOLD}\tEnd time : {end_time}{ANSI_RESET}")
+
+        # Close the KMF file
+        kmf_file.close()
+
+    upload_input = input(f"{ANSI_GREEN}Do you want to upload all logs now (y/n)?: {ANSI_RESET} ")
+    if upload_input.lower() == 'y' or upload_input.lower() == '\n':
+        for i in range(NUM_LOGS):
+            kmf_file = kvmlib.openKmf(PATH.format(i))
+            print(f"{ANSI_GREEN}Opening file: {PATH.format(i)}{ANSI_RESET}")  # Green stdout
+
+            # Access the log attribute of the KMF object
+            log = kmf_file.log
+
+            # Iterate over all log files
+            for j, log_file in enumerate(log):
+                upload(log[j], parserCallFunc, live_filters, log_filters, args, endpoint)
+
+            # Clear the log files
+            log.delete_all()
+
+            # Close the KMF file
+            kmf_file.close()
+
+
+# TESTING PURPOSES
+def main():
+    memorator_upload_script()
+
+
+if __name__ == "__main__":
+    main()
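Review note: `upload()` packs each Kvaser event into a fixed-width latin-1 string before handing it to `sendToParser`. The packing helper below mirrors that code; the unpacking half is an assumption about how the receiving side (not shown in this diff) would recover the fields, relying on the 8-byte big-endian timestamp, the `#` separator, the 4-byte ID, and the 8-byte zero-padded payload.

```python
import struct

def pack_can_frame(timestamp: float, can_id: int, data: bytes, dlc: str) -> str:
    """Mirrors the packing done in MemoratorUploader.upload()."""
    timestamp_str = struct.pack('>d', timestamp).decode('latin-1')  # 8 bytes, big-endian double
    id_str = can_id.to_bytes(4, 'big').decode('latin-1')            # 4 bytes, big-endian CAN ID
    data_str = data.ljust(8, b'\0').decode('latin-1')               # payload zero-padded to 8 bytes
    return timestamp_str + "#" + id_str + data_str + dlc

def unpack_can_frame(frame: str) -> tuple:
    """Assumed inverse, relying on the fixed field widths above."""
    raw = frame.encode('latin-1')
    timestamp = struct.unpack('>d', raw[:8])[0]
    assert raw[8:9] == b'#', "separator missing"
    can_id = int.from_bytes(raw[9:13], 'big')
    data = raw[13:21]
    dlc = raw[21:].decode('latin-1')
    return timestamp, can_id, data, dlc

# Round trip with a hypothetical frame
frame = pack_can_frame(1715000000.0, 0x401, bytes.fromhex('DEADBEEF'), '4')
print(unpack_can_frame(frame))
```

A layout like this only round-trips cleanly because latin-1 maps every byte value 0-255 to a single character, so no byte of the packed frame is lost when it is carried as a string in the request payload.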