Skip to content

Commit

Permalink
TX message guaranteed delivery (#421)
Browse files Browse the repository at this point in the history
* wait for tx slots before clearing nak

* fix bootstub

* Fixed misra

* Cleanup

* Added bulk write test to test USB NAK on bulk CAN messages

* Added automated bulk tx test

* Fixed linter

* Fixed latency test influence

* Added timeout to python API

* Disabled can write timeout in bulk write test

Co-authored-by: Robbe <robbe.derks@gmail.com>
  • Loading branch information
gregjhogan and robbederks committed Apr 6, 2020
1 parent d8f6184 commit da8e00f
Show file tree
Hide file tree
Showing 8 changed files with 143 additions and 8 deletions.
27 changes: 27 additions & 0 deletions board/drivers/can.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ void can_set_forwarding(int from, int to);

void can_init(uint8_t can_number);
void can_init_all(void);
bool can_tx_check_min_slots_free(uint32_t min);
void can_send(CAN_FIFOMailBox_TypeDef *to_push, uint8_t bus_number, bool skip_tx_hook);
bool can_pop(can_ring *q, CAN_FIFOMailBox_TypeDef *elem);

Expand Down Expand Up @@ -107,6 +108,20 @@ bool can_push(can_ring *q, CAN_FIFOMailBox_TypeDef *elem) {
return ret;
}

uint32_t can_slots_empty(can_ring *q) {
uint32_t ret = 0;

ENTER_CRITICAL();
if (q->w_ptr >= q->r_ptr) {
ret = q->fifo_size - 1U - q->w_ptr + q->r_ptr;
} else {
ret = q->r_ptr - q->w_ptr - 1U;
}
EXIT_CRITICAL();

return ret;
}

void can_clear(can_ring *q) {
ENTER_CRITICAL();
q->w_ptr = 0;
Expand Down Expand Up @@ -317,6 +332,10 @@ void process_can(uint8_t can_number) {
CAN->sTxMailBox[0].TDHR = to_send.RDHR;
CAN->sTxMailBox[0].TDTR = to_send.RDTR;
CAN->sTxMailBox[0].TIR = to_send.RIR;

if (can_tx_check_min_slots_free(MAX_CAN_MSGS_PER_BULK_TRANSFER)) {
usb_outep3_resume_if_paused();
}
}
}

Expand Down Expand Up @@ -405,6 +424,14 @@ void CAN3_TX_IRQ_Handler(void) { process_can(2); }
void CAN3_RX0_IRQ_Handler(void) { can_rx(2); }
void CAN3_SCE_IRQ_Handler(void) { can_sce(CAN3); }

bool can_tx_check_min_slots_free(uint32_t min) {
return
(can_slots_empty(&can_tx1_q) >= min) &&
(can_slots_empty(&can_tx2_q) >= min) &&
(can_slots_empty(&can_tx3_q) >= min) &&
(can_slots_empty(&can_txgmlan_q) >= min);
}

void can_send(CAN_FIFOMailBox_TypeDef *to_push, uint8_t bus_number, bool skip_tx_hook) {
if (skip_tx_hook || safety_tx_hook(to_push) != 0) {
if (bus_number < BUS_MAX) {
Expand Down
23 changes: 19 additions & 4 deletions board/drivers/usb.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,16 @@ typedef union _USB_Setup {
}
USB_Setup_TypeDef;

#define MAX_CAN_MSGS_PER_BULK_TRANSFER 4U

void usb_init(void);
int usb_cb_control_msg(USB_Setup_TypeDef *setup, uint8_t *resp, bool hardwired);
int usb_cb_ep1_in(void *usbdata, int len, bool hardwired);
void usb_cb_ep2_out(void *usbdata, int len, bool hardwired);
void usb_cb_ep3_out(void *usbdata, int len, bool hardwired);
void usb_cb_ep3_out_complete(void);
void usb_cb_enumeration_complete(void);
void usb_outep3_resume_if_paused(void);

// **** supporting defines ****

Expand Down Expand Up @@ -380,6 +384,7 @@ USB_Setup_TypeDef setup;
uint8_t usbdata[0x100];
uint8_t* ep0_txdata = NULL;
uint16_t ep0_txlen = 0;
bool outep3_processing = false;

// Store the current interface alt setting.
int current_int0_alt_setting = 0;
Expand Down Expand Up @@ -744,6 +749,7 @@ void usb_irqhandler(void) {
}

if (endpoint == 3) {
outep3_processing = true;
usb_cb_ep3_out(usbdata, len, 1);
}
} else if (status == STS_SETUP_UPDT) {
Expand Down Expand Up @@ -816,15 +822,17 @@ void usb_irqhandler(void) {
#ifdef DEBUG_USB
puts(" OUT3 PACKET XFRC\n");
#endif
USBx_OUTEP(3)->DOEPTSIZ = (1U << 19) | 0x40U;
USBx_OUTEP(3)->DOEPCTL |= USB_OTG_DOEPCTL_EPENA | USB_OTG_DOEPCTL_CNAK;
// NAK cleared by process_can (if tx buffers have room)
outep3_processing = false;
usb_cb_ep3_out_complete();
} else if ((USBx_OUTEP(3)->DOEPINT & 0x2000) != 0) {
#ifdef DEBUG_USB
puts(" OUT3 PACKET WTF\n");
#endif
// if NAK was set trigger this, unknown interrupt
USBx_OUTEP(3)->DOEPTSIZ = (1U << 19) | 0x40U;
USBx_OUTEP(3)->DOEPCTL |= USB_OTG_DOEPCTL_CNAK;
// TODO: why was this here? fires when TX buffers when we can't clear NAK
// USBx_OUTEP(3)->DOEPTSIZ = (1U << 19) | 0x40U;
// USBx_OUTEP(3)->DOEPCTL |= USB_OTG_DOEPCTL_CNAK;
} else if ((USBx_OUTEP(3)->DOEPINT) != 0) {
puts("OUTEP3 error ");
puth(USBx_OUTEP(3)->DOEPINT);
Expand Down Expand Up @@ -932,6 +940,13 @@ void usb_irqhandler(void) {
//USBx->GINTMSK = 0xFFFFFFFF & ~(USB_OTG_GINTMSK_NPTXFEM | USB_OTG_GINTMSK_PTXFEM | USB_OTG_GINTSTS_SOF | USB_OTG_GINTSTS_EOPF);
}

void usb_outep3_resume_if_paused() {
if (!outep3_processing && (USBx_OUTEP(3)->DOEPCTL & USB_OTG_DOEPCTL_NAKSTS) != 0) {
USBx_OUTEP(3)->DOEPTSIZ = (1U << 19) | 0x40U;
USBx_OUTEP(3)->DOEPCTL |= USB_OTG_DOEPCTL_EPENA | USB_OTG_DOEPCTL_CNAK;
}
}

void OTG_FS_IRQ_Handler(void) {
NVIC_DisableIRQ(OTG_FS_IRQn);
//__disable_irq();
Expand Down
6 changes: 6 additions & 0 deletions board/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,12 @@ void usb_cb_ep3_out(void *usbdata, int len, bool hardwired) {
}
}

void usb_cb_ep3_out_complete() {
if (can_tx_check_min_slots_free(MAX_CAN_MSGS_PER_BULK_TRANSFER)) {
usb_outep3_resume_if_paused();
}
}

void usb_cb_enumeration_complete() {
puts("USB enumeration complete\n");
is_enumerated = 1;
Expand Down
1 change: 1 addition & 0 deletions board/pedal/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ void usb_cb_ep3_out(void *usbdata, int len, bool hardwired) {
UNUSED(len);
UNUSED(hardwired);
}
void usb_cb_ep3_out_complete(void) {}
void usb_cb_enumeration_complete(void) {}

int usb_cb_control_msg(USB_Setup_TypeDef *setup, uint8_t *resp, bool hardwired) {
Expand Down
1 change: 1 addition & 0 deletions board/spi_flasher.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ void usb_cb_ep3_out(void *usbdata, int len, bool hardwired) {
UNUSED(len);
UNUSED(hardwired);
}
void usb_cb_ep3_out_complete(void) {}

int is_enumerated = 0;
void usb_cb_enumeration_complete(void) {
Expand Down
13 changes: 9 additions & 4 deletions python/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,12 @@ def set_uart_callback(self, uart, install):

# ******************* can *******************

def can_send_many(self, arr):
# The panda will NAK CAN writes when there is CAN congestion.
# libusb will try to send it again, with a max timeout.
# Timeout is in ms. If set to 0, the timeout is infinite.
CAN_SEND_TIMEOUT_MS = 10

def can_send_many(self, arr, timeout=CAN_SEND_TIMEOUT_MS):
snds = []
transmit = 1
extended = 4
Expand All @@ -501,13 +506,13 @@ def can_send_many(self, arr):
for s in snds:
self._handle.bulkWrite(3, s)
else:
self._handle.bulkWrite(3, b''.join(snds))
self._handle.bulkWrite(3, b''.join(snds), timeout=timeout)
break
except (usb1.USBErrorIO, usb1.USBErrorOverflow):
print("CAN: BAD SEND MANY, RETRYING")

def can_send(self, addr, dat, bus):
self.can_send_many([[addr, None, dat, bus]])
def can_send(self, addr, dat, bus, timeout=CAN_SEND_TIMEOUT_MS):
self.can_send_many([[addr, None, dat, bus]], timeout=timeout)

def can_recv(self):
dat = bytearray()
Expand Down
42 changes: 42 additions & 0 deletions tests/automated/7_can_loopback.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
import time
import random
import threading
from panda import Panda
from nose.tools import assert_equal, assert_less, assert_greater
from .helpers import panda_jungle, start_heartbeat_thread, reset_pandas, time_many_sends, test_all_pandas, test_all_gen2_pandas, clear_can_buffers, panda_connect_and_init
Expand Down Expand Up @@ -200,3 +201,44 @@ def test(p_send, p_recv):
finally:
# Set back to silent mode
p.set_safety_mode(Panda.SAFETY_SILENT)

@test_all_pandas
@panda_connect_and_init
def test_bulk_write(p):
# The TX buffers on pandas is 0x100 in length.
NUM_MESSAGES_PER_BUS = 10000

def flood_tx(panda):
print('Sending!')
msg = b"\xaa"*4
packet = [[0xaa, None, msg, 0], [0xaa, None, msg, 1], [0xaa, None, msg, 2]] * NUM_MESSAGES_PER_BUS

# Disable timeout
panda.can_send_many(packet, timeout=0)
print(f"Done sending {3*NUM_MESSAGES_PER_BUS} messages!")

# Start heartbeat
start_heartbeat_thread(p)

# Set safety mode and power saving
p.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
p.set_power_save(False)

# Start transmisson
threading.Thread(target=flood_tx, args=(p,)).start()

# Receive as much as we can in a few second time period
rx = []
old_len = 0
start_time = time.time()
while time.time() - start_time < 2 or len(rx) > old_len:
old_len = len(rx)
rx.extend(panda_jungle.can_recv())
print(f"Received {len(rx)} messages")

# All messages should have been received
if len(rx) != 3*NUM_MESSAGES_PER_BUS:
Exception("Did not receive all messages!")

# Set back to silent mode
p.set_safety_mode(Panda.SAFETY_SILENT)
38 changes: 38 additions & 0 deletions tests/bulk_write_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
#!/usr/bin/env python3
import time
import threading

from panda import Panda

# The TX buffers on pandas is 0x100 in length.
NUM_MESSAGES_PER_BUS = 10000

def flood_tx(panda):
print('Sending!')
msg = b"\xaa"*4
packet = [[0xaa, None, msg, 0], [0xaa, None, msg, 1], [0xaa, None, msg, 2]] * NUM_MESSAGES_PER_BUS
panda.can_send_many(packet)
print(f"Done sending {3*NUM_MESSAGES_PER_BUS} messages!")

if __name__ == "__main__":
serials = Panda.list()
if len(serials) != 2:
raise Exception("Connect two pandas to perform this test!")

sender = Panda(serials[0])
receiver = Panda(serials[1])

sender.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
receiver.set_safety_mode(Panda.SAFETY_ALLOUTPUT)

# Start transmisson
threading.Thread(target=flood_tx, args=(sender,)).start()

# Receive as much as we can in a few second time period
rx = []
old_len = 0
start_time = time.time()
while time.time() - start_time < 2 or len(rx) > old_len:
old_len = len(rx)
rx.extend(receiver.can_recv())
print(f"Received {len(rx)} messages")

0 comments on commit da8e00f

Please sign in to comment.