diff --git a/board/drivers/can.h b/board/drivers/can.h index 93db2565ca5342..ff60473f7dc83f 100644 --- a/board/drivers/can.h +++ b/board/drivers/can.h @@ -28,6 +28,7 @@ void can_set_forwarding(int from, int to); void can_init(uint8_t can_number); void can_init_all(void); +bool can_tx_check_min_slots_free(uint32_t min); void can_send(CAN_FIFOMailBox_TypeDef *to_push, uint8_t bus_number, bool skip_tx_hook); bool can_pop(can_ring *q, CAN_FIFOMailBox_TypeDef *elem); @@ -107,6 +108,20 @@ bool can_push(can_ring *q, CAN_FIFOMailBox_TypeDef *elem) { return ret; } +uint32_t can_slots_empty(can_ring *q) { + uint32_t ret = 0; + + ENTER_CRITICAL(); + if (q->w_ptr >= q->r_ptr) { + ret = q->fifo_size - 1U - q->w_ptr + q->r_ptr; + } else { + ret = q->r_ptr - q->w_ptr - 1U; + } + EXIT_CRITICAL(); + + return ret; +} + void can_clear(can_ring *q) { ENTER_CRITICAL(); q->w_ptr = 0; @@ -317,6 +332,10 @@ void process_can(uint8_t can_number) { CAN->sTxMailBox[0].TDHR = to_send.RDHR; CAN->sTxMailBox[0].TDTR = to_send.RDTR; CAN->sTxMailBox[0].TIR = to_send.RIR; + + if (can_tx_check_min_slots_free(MAX_CAN_MSGS_PER_BULK_TRANSFER)) { + usb_outep3_resume_if_paused(); + } } } @@ -405,6 +424,14 @@ void CAN3_TX_IRQ_Handler(void) { process_can(2); } void CAN3_RX0_IRQ_Handler(void) { can_rx(2); } void CAN3_SCE_IRQ_Handler(void) { can_sce(CAN3); } +bool can_tx_check_min_slots_free(uint32_t min) { + return + (can_slots_empty(&can_tx1_q) >= min) && + (can_slots_empty(&can_tx2_q) >= min) && + (can_slots_empty(&can_tx3_q) >= min) && + (can_slots_empty(&can_txgmlan_q) >= min); +} + void can_send(CAN_FIFOMailBox_TypeDef *to_push, uint8_t bus_number, bool skip_tx_hook) { if (skip_tx_hook || safety_tx_hook(to_push) != 0) { if (bus_number < BUS_MAX) { diff --git a/board/drivers/usb.h b/board/drivers/usb.h index a970194ffd284e..a62e87902f93d9 100644 --- a/board/drivers/usb.h +++ b/board/drivers/usb.h @@ -23,12 +23,16 @@ typedef union _USB_Setup { } USB_Setup_TypeDef; +#define MAX_CAN_MSGS_PER_BULK_TRANSFER 4U + void usb_init(void); int usb_cb_control_msg(USB_Setup_TypeDef *setup, uint8_t *resp, bool hardwired); int usb_cb_ep1_in(void *usbdata, int len, bool hardwired); void usb_cb_ep2_out(void *usbdata, int len, bool hardwired); void usb_cb_ep3_out(void *usbdata, int len, bool hardwired); +void usb_cb_ep3_out_complete(void); void usb_cb_enumeration_complete(void); +void usb_outep3_resume_if_paused(void); // **** supporting defines **** @@ -380,6 +384,7 @@ USB_Setup_TypeDef setup; uint8_t usbdata[0x100]; uint8_t* ep0_txdata = NULL; uint16_t ep0_txlen = 0; +bool outep3_processing = false; // Store the current interface alt setting. int current_int0_alt_setting = 0; @@ -744,6 +749,7 @@ void usb_irqhandler(void) { } if (endpoint == 3) { + outep3_processing = true; usb_cb_ep3_out(usbdata, len, 1); } } else if (status == STS_SETUP_UPDT) { @@ -816,15 +822,17 @@ void usb_irqhandler(void) { #ifdef DEBUG_USB puts(" OUT3 PACKET XFRC\n"); #endif - USBx_OUTEP(3)->DOEPTSIZ = (1U << 19) | 0x40U; - USBx_OUTEP(3)->DOEPCTL |= USB_OTG_DOEPCTL_EPENA | USB_OTG_DOEPCTL_CNAK; + // NAK cleared by process_can (if tx buffers have room) + outep3_processing = false; + usb_cb_ep3_out_complete(); } else if ((USBx_OUTEP(3)->DOEPINT & 0x2000) != 0) { #ifdef DEBUG_USB puts(" OUT3 PACKET WTF\n"); #endif // if NAK was set trigger this, unknown interrupt - USBx_OUTEP(3)->DOEPTSIZ = (1U << 19) | 0x40U; - USBx_OUTEP(3)->DOEPCTL |= USB_OTG_DOEPCTL_CNAK; + // TODO: why was this here? fires when TX buffers when we can't clear NAK + // USBx_OUTEP(3)->DOEPTSIZ = (1U << 19) | 0x40U; + // USBx_OUTEP(3)->DOEPCTL |= USB_OTG_DOEPCTL_CNAK; } else if ((USBx_OUTEP(3)->DOEPINT) != 0) { puts("OUTEP3 error "); puth(USBx_OUTEP(3)->DOEPINT); @@ -932,6 +940,13 @@ void usb_irqhandler(void) { //USBx->GINTMSK = 0xFFFFFFFF & ~(USB_OTG_GINTMSK_NPTXFEM | USB_OTG_GINTMSK_PTXFEM | USB_OTG_GINTSTS_SOF | USB_OTG_GINTSTS_EOPF); } +void usb_outep3_resume_if_paused() { + if (!outep3_processing && (USBx_OUTEP(3)->DOEPCTL & USB_OTG_DOEPCTL_NAKSTS) != 0) { + USBx_OUTEP(3)->DOEPTSIZ = (1U << 19) | 0x40U; + USBx_OUTEP(3)->DOEPCTL |= USB_OTG_DOEPCTL_EPENA | USB_OTG_DOEPCTL_CNAK; + } +} + void OTG_FS_IRQ_Handler(void) { NVIC_DisableIRQ(OTG_FS_IRQn); //__disable_irq(); diff --git a/board/main.c b/board/main.c index d11f003f47bdb8..4144d80fd6c940 100644 --- a/board/main.c +++ b/board/main.c @@ -235,6 +235,12 @@ void usb_cb_ep3_out(void *usbdata, int len, bool hardwired) { } } +void usb_cb_ep3_out_complete() { + if (can_tx_check_min_slots_free(MAX_CAN_MSGS_PER_BULK_TRANSFER)) { + usb_outep3_resume_if_paused(); + } +} + void usb_cb_enumeration_complete() { puts("USB enumeration complete\n"); is_enumerated = 1; diff --git a/board/pedal/main.c b/board/pedal/main.c index 3192f4b2bd2f77..093f4c9357936d 100644 --- a/board/pedal/main.c +++ b/board/pedal/main.c @@ -76,6 +76,7 @@ void usb_cb_ep3_out(void *usbdata, int len, bool hardwired) { UNUSED(len); UNUSED(hardwired); } +void usb_cb_ep3_out_complete(void) {} void usb_cb_enumeration_complete(void) {} int usb_cb_control_msg(USB_Setup_TypeDef *setup, uint8_t *resp, bool hardwired) { diff --git a/board/spi_flasher.h b/board/spi_flasher.h index 1c703516ac1731..b0511c14efd2ca 100644 --- a/board/spi_flasher.h +++ b/board/spi_flasher.h @@ -110,6 +110,7 @@ void usb_cb_ep3_out(void *usbdata, int len, bool hardwired) { UNUSED(len); UNUSED(hardwired); } +void usb_cb_ep3_out_complete(void) {} int is_enumerated = 0; void usb_cb_enumeration_complete(void) { diff --git a/python/__init__.py b/python/__init__.py index 2865a7021750c9..bda1fedd2df498 100644 --- a/python/__init__.py +++ b/python/__init__.py @@ -478,7 +478,12 @@ def set_uart_callback(self, uart, install): # ******************* can ******************* - def can_send_many(self, arr): + # The panda will NAK CAN writes when there is CAN congestion. + # libusb will try to send it again, with a max timeout. + # Timeout is in ms. If set to 0, the timeout is infinite. + CAN_SEND_TIMEOUT_MS = 10 + + def can_send_many(self, arr, timeout=CAN_SEND_TIMEOUT_MS): snds = [] transmit = 1 extended = 4 @@ -501,13 +506,13 @@ def can_send_many(self, arr): for s in snds: self._handle.bulkWrite(3, s) else: - self._handle.bulkWrite(3, b''.join(snds)) + self._handle.bulkWrite(3, b''.join(snds), timeout=timeout) break except (usb1.USBErrorIO, usb1.USBErrorOverflow): print("CAN: BAD SEND MANY, RETRYING") - def can_send(self, addr, dat, bus): - self.can_send_many([[addr, None, dat, bus]]) + def can_send(self, addr, dat, bus, timeout=CAN_SEND_TIMEOUT_MS): + self.can_send_many([[addr, None, dat, bus]], timeout=timeout) def can_recv(self): dat = bytearray() diff --git a/tests/automated/7_can_loopback.py b/tests/automated/7_can_loopback.py index cb9f5570b5551f..0b0df1bf11fcea 100644 --- a/tests/automated/7_can_loopback.py +++ b/tests/automated/7_can_loopback.py @@ -1,6 +1,7 @@ import os import time import random +import threading from panda import Panda from nose.tools import assert_equal, assert_less, assert_greater from .helpers import panda_jungle, start_heartbeat_thread, reset_pandas, time_many_sends, test_all_pandas, test_all_gen2_pandas, clear_can_buffers, panda_connect_and_init @@ -200,3 +201,44 @@ def test(p_send, p_recv): finally: # Set back to silent mode p.set_safety_mode(Panda.SAFETY_SILENT) + +@test_all_pandas +@panda_connect_and_init +def test_bulk_write(p): + # The TX buffers on pandas is 0x100 in length. + NUM_MESSAGES_PER_BUS = 10000 + + def flood_tx(panda): + print('Sending!') + msg = b"\xaa"*4 + packet = [[0xaa, None, msg, 0], [0xaa, None, msg, 1], [0xaa, None, msg, 2]] * NUM_MESSAGES_PER_BUS + + # Disable timeout + panda.can_send_many(packet, timeout=0) + print(f"Done sending {3*NUM_MESSAGES_PER_BUS} messages!") + + # Start heartbeat + start_heartbeat_thread(p) + + # Set safety mode and power saving + p.set_safety_mode(Panda.SAFETY_ALLOUTPUT) + p.set_power_save(False) + + # Start transmisson + threading.Thread(target=flood_tx, args=(p,)).start() + + # Receive as much as we can in a few second time period + rx = [] + old_len = 0 + start_time = time.time() + while time.time() - start_time < 2 or len(rx) > old_len: + old_len = len(rx) + rx.extend(panda_jungle.can_recv()) + print(f"Received {len(rx)} messages") + + # All messages should have been received + if len(rx) != 3*NUM_MESSAGES_PER_BUS: + Exception("Did not receive all messages!") + + # Set back to silent mode + p.set_safety_mode(Panda.SAFETY_SILENT) diff --git a/tests/bulk_write_test.py b/tests/bulk_write_test.py new file mode 100755 index 00000000000000..43a52bce57e22c --- /dev/null +++ b/tests/bulk_write_test.py @@ -0,0 +1,38 @@ +#!/usr/bin/env python3 +import time +import threading + +from panda import Panda + +# The TX buffers on pandas is 0x100 in length. +NUM_MESSAGES_PER_BUS = 10000 + +def flood_tx(panda): + print('Sending!') + msg = b"\xaa"*4 + packet = [[0xaa, None, msg, 0], [0xaa, None, msg, 1], [0xaa, None, msg, 2]] * NUM_MESSAGES_PER_BUS + panda.can_send_many(packet) + print(f"Done sending {3*NUM_MESSAGES_PER_BUS} messages!") + +if __name__ == "__main__": + serials = Panda.list() + if len(serials) != 2: + raise Exception("Connect two pandas to perform this test!") + + sender = Panda(serials[0]) + receiver = Panda(serials[1]) + + sender.set_safety_mode(Panda.SAFETY_ALLOUTPUT) + receiver.set_safety_mode(Panda.SAFETY_ALLOUTPUT) + + # Start transmisson + threading.Thread(target=flood_tx, args=(sender,)).start() + + # Receive as much as we can in a few second time period + rx = [] + old_len = 0 + start_time = time.time() + while time.time() - start_time < 2 or len(rx) > old_len: + old_len = len(rx) + rx.extend(receiver.can_recv()) + print(f"Received {len(rx)} messages")