In [1]:
#requires python 3.10 for compatability with python-obd library

In [2]:
import obd
#pyserial is a dependency of obd but does not need to be imported
from obd import OBDStatus
import logging
import serial.tools.list_ports
import time
import sys
from PyQt5.QtWidgets import QApplication, QMainWindow, QTableWidget, QTableWidgetItem, QVBoxLayout, QWidget, QCheckBox, QPushButton, QHBoxLayout
from PyQt5.QtCore import Qt
import os
import json

# List USB Ports

In [3]:
#if it needs to be set manually the top usb 3.0 port 
#Bus 001 Device 007: ID 0403:6015 Future Technology Devices International, Ltd Bridge(I2C/SPI/UART/FIFO)


# Create Interface

In [4]:
class PIDSelectionPage(QMainWindow):
    def __init__(self):
        super().__init__()
        self.setWindowTitle("PID Selection")
        self.showFullScreen()  # Launch in full-screen mode

        # Main layout
        self.layout = QVBoxLayout()

        # Top bar layout with Quit button
        self.top_bar_layout = QHBoxLayout()
        self.quit_button = QPushButton("Quit")
        self.quit_button.clicked.connect(self.close_application)
        self.top_bar_layout.addWidget(self.quit_button)
        self.top_bar_layout.addStretch()  # Pushes the quit button to the right
        self.layout.addLayout(self.top_bar_layout)

        # OBD Connect and Check Supported PIDs buttons
        self.connect_button = QPushButton("Connect to OBD")
        self.connect_button.clicked.connect(self.connect_to_obd)
        self.layout.addWidget(self.connect_button)

        self.check_pids_button = QPushButton("Check Vehicle Supported PIDs")
        self.check_pids_button.clicked.connect(self.check_supported_pids)
        self.layout.addWidget(self.check_pids_button)

        # Table widget for displaying PIDs
        self.table_widget = QTableWidget()
        self.layout.addWidget(self.table_widget)
        self.central_widget = QWidget()
        self.central_widget.setLayout(self.layout)
        self.setCentralWidget(self.central_widget)

        self.connection = None
        self.initUI()

    def initUI(self):
        # Populate the table with PIDs from module 1
        all_pids = {name: getattr(obd.commands, name) for name in dir(obd.commands) if not name.startswith("__")}
        module_one_pids = {name: cmd for name, cmd in all_pids.items() if isinstance(cmd, obd.OBDCommand) and cmd.mode == 1}

        self.table_widget.setColumnCount(5)  # PID, Name, Description, Response Value, Active
        self.table_widget.setHorizontalHeaderLabels(["PID", "Name", "Description", "Response Value", "Active"])

        self.table_widget.setRowCount(len(module_one_pids))
        for row, (pid_name, pid_cmd) in enumerate(sorted(module_one_pids.items(), key=lambda x: x[1].pid)):
            self.table_widget.setItem(row, 0, QTableWidgetItem(str(pid_cmd.pid)))  # PID
            self.table_widget.setItem(row, 1, QTableWidgetItem(pid_cmd.name))  # Name
            self.table_widget.setItem(row, 2, QTableWidgetItem(pid_cmd.desc))  # Description
            self.table_widget.setItem(row, 3, QTableWidgetItem(''))  # Response Value

            # Add a checkbox in the 'Active' column
            checkbox = QCheckBox()
            checkbox.setCheckState(Qt.Unchecked)
            self.table_widget.setCellWidget(row, 4, checkbox)

        self.table_widget.resizeColumnsToContents()

    def close_application(self):
        if self.connection:
            self.connection.close()  # Close the OBD connection
        self.close()  # Close the application
    def connect_to_obd(self):
        self.connection = obd.OBD()  # Try to connect to OBD-II adapter

    def check_supported_pids(self):
        cache_file = "supported_pids.json"
        if self.connection and self.connection.is_connected():
            supported_pids = self.get_supported_pids()
            # Cache the supported PIDs
            with open(cache_file, "w") as file:
                json.dump(supported_pids, file)
            self.update_table_with_supported_pids(supported_pids)
        elif os.path.exists(cache_file):
            # Load supported PIDs from cache if available
            with open(cache_file, "r") as file:
                supported_pids = json.load(file)
            self.update_table_with_supported_pids(supported_pids)

    def get_supported_pids(self):
        supported_pids = []
        for cmd in obd.commands:
            if self.connection.supports(cmd):
                supported_pids.append(cmd.name)
        return supported_pids

    def update_table_with_supported_pids(self, supported_pids):
        for row in range(self.table_widget.rowCount()):
            pid_name = self.table_widget.item(row, 1).text()
            if pid_name not in supported_pids:
                for col in range(self.table_widget.columnCount()):
                    item = self.table_widget.item(row, col)
                    if item:
                        item.setFlags(item.flags() & ~Qt.ItemIsEnabled)

def main():
    app = QApplication(sys.argv)
    ex = PIDSelectionPage()
    ex.show()
    sys.exit(app.exec_())

if __name__ == '__main__':
    main()


SystemExit: 0

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)


## Establish Asynch Connection:

Async is a subclass of OBD, and therefore inherits all of the standard methods. However, Async adds a few in order to control a threaded update loop. This loop will keep the values of your commands up to date with the vehicle. This way, when the user querys the car, the latest response is returned immediately.

documentation:
https://python-obd.readthedocs.io/en/latest/Async%20Connections/



In [None]:
#set to auto if vehicle protocol unknown
manual_protocol_set = "J1850PWM" #1997 ford f360 specific

In [None]:

# Setting up logging for better debugging
logging.basicConfig(level=logging.INFO)

def on_connect(connection):
    if connection.is_connected():
        logging.info("Successfully connected to OBDLink EX.")
        # Introduce a 10 second delay before starting async commands
        time.sleep(10)
        start_monitoring()
    else:
        logging.error("Failed to connect to OBDLink EX. Please check the connection.")

def on_disconnect():
    logging.info("Disconnected from OBDLink EX.")

def start_monitoring():
    # Function to handle callback for each command
    def data_callback(response, cmd_name):
        if response.value is not None:
            logging.info(f"{cmd_name}: {response.value}")
        else:
            logging.warning(f"{cmd_name}: No data received")

    # Setting up monitoring for various OBD commands with delays
    commands = {
        "OIL_TEMP": obd.commands.OIL_TEMP,
        "FUEL_LEVEL": obd.commands.FUEL_LEVEL,
        "RUN_TIME": obd.commands.RUN_TIME,
        "SPEED": obd.commands.SPEED,
        "MAF": obd.commands.MAF,
        "FUEL_PRESSURE": obd.commands.FUEL_PRESSURE,
        "COOLANT_TEMP": obd.commands.COOLANT_TEMP,
        "ENGINE_LOAD": obd.commands.ENGINE_LOAD,
        "CONTROL_MODULE_VOLTAGE": obd.commands.CONTROL_MODULE_VOLTAGE # battery
          }

    for cmd_name, cmd in commands.items():
        connection.watch(cmd, callback=lambda r, cmd_name=cmd_name: data_callback(r, cmd_name))
        time.sleep(0.5)  # Introduce a 500ms delay between each command setup

# Configure the OBD connection
manual_protocol_set = "AUTO"  # Replace with specific protocol if needed
connection = obd.Async(port=None,  # manually set USB port if not working with None
                       fast=False,
                       baudrate=115200,
                       protocol=manual_protocol_set,
                       check_voltage=True)

# Setting up connection and disconnection handlers
connection.connect(on_connect_callback=on_connect, on_disconnect_callback=on_disconnect)

# Keep the script running to maintain connection and handle callbacks
try:
    while True:
        pass
except KeyboardInterrupt:
    # Disconnect before closing the script
    connection.close()
    logging.info("Program terminated.")
