From b6346ac8f082e27e2c41a61b7dc05d0c85541166 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-Fran=C3=A7ois=20Nguyen?= Date: Thu, 26 Mar 2020 22:00:13 +0100 Subject: [PATCH] serial: fix receiver bit order. Also, port to nmigen v0.1, add more tests and some docs. --- nmigen_stdio/serial.py | 127 +++++++++++++++++++--- nmigen_stdio/test/test_serial.py | 174 +++++++++++++++++++++++++++---- 2 files changed, 262 insertions(+), 39 deletions(-) diff --git a/nmigen_stdio/serial.py b/nmigen_stdio/serial.py index 4da264f..338710a 100644 --- a/nmigen_stdio/serial.py +++ b/nmigen_stdio/serial.py @@ -1,11 +1,17 @@ from nmigen import * -from nmigen.lib.cdc import MultiReg -from nmigen.tools import bits_for +from nmigen.lib.cdc import FFSynchronizer +from nmigen.utils import bits_for __all__ = ["AsyncSerialRX", "AsyncSerialTX", "AsyncSerial"] +def _check_divisor(divisor, bound): + if divisor < bound: + raise ValueError("Invalid divisor {!r}; must be greater than or equal to {}" + .format(divisor, bound)) + + def _check_parity(parity): choices = ("none", "mark", "space", "even", "odd") if parity not in choices: @@ -29,18 +35,55 @@ def _compute_parity_bit(data, parity): def _wire_layout(data_bits, parity="none"): return [ - ("stop", 1), - ("parity", 0 if parity == "none" else 1), - ("data", data_bits), ("start", 1), + ("data", data_bits), + ("parity", 0 if parity == "none" else 1), + ("stop", 1), ] class AsyncSerialRX(Elaboratable): + """Asynchronous serial receiver. + + Parameters + ---------- + divisor : int + Clock divisor reset value. Should be set to ``int(clk_frequency // baudrate)``. + divisor_bits : int + Optional. Clock divisor width. If omitted, ``bits_for(divisor)`` is used instead. + data_bits : int + Data width. + parity : ``"none"``, ``"mark"``, ``"space"``, ``"even"``, ``"odd"`` + Parity mode. + pins : :class:`nmigen.lib.io.Pin` + Optional. UART pins. See :class:`nmigen_boards.resources.UARTResource` for layout. + + Attributes + ---------- + divisor : Signal, in + Clock divisor. + data : Signal, out + Read data. Valid only when ``rdy`` is asserted. + err.overflow : Signal, out + Error flag. A new frame has been received, but the previous one was not acknowledged. + err.frame : Signal, out + Error flag. The received bits do not fit in a frame. + err.parity : Signal, out + Error flag. The parity check has failed. + rdy : Signal, out + Read strobe. + ack : Signal, in + Read acknowledge. Must be held asserted while data can be read out of the receiver. + i : Signal, in + Serial input. If ``pins`` has been specified, ``pins.rx.i`` drives it. + """ def __init__(self, *, divisor, divisor_bits=None, data_bits=8, parity="none", pins=None): _check_parity(parity) self._parity = parity + # The clock divisor must be at least 5 to keep the FSM synchronized with the serial input + # during a DONE->IDLE->BUSY transition. + _check_divisor(divisor, 5) self.divisor = Signal(divisor_bits or bits_for(divisor), reset=divisor) self.data = Signal(data_bits) @@ -61,10 +104,10 @@ def elaborate(self, platform): timer = Signal.like(self.divisor) shreg = Record(_wire_layout(len(self.data), self._parity)) - bitno = Signal.range(len(shreg)) + bitno = Signal(range(len(shreg))) if self._pins is not None: - m.d.submodules += MultiReg(self._pins.rx.i, self.i, reset=1) + m.submodules += FFSynchronizer(self._pins.rx.i, self.i, reset=1) with m.FSM() as fsm: with m.State("IDLE"): @@ -80,9 +123,9 @@ def elaborate(self, platform): m.d.sync += timer.eq(timer - 1) with m.Else(): m.d.sync += [ - shreg.eq(Cat(self.i, shreg)), + shreg.eq(Cat(shreg[1:], self.i)), bitno.eq(bitno - 1), - timer.eq(self.divisor), + timer.eq(self.divisor - 1), ] with m.If(bitno == 0): m.next = "DONE" @@ -105,17 +148,46 @@ def elaborate(self, platform): class AsyncSerialTX(Elaboratable): + """Asynchronous serial transmitter. + + Parameters + ---------- + divisor : int + Clock divisor reset value. Should be set to ``int(clk_frequency // baudrate)``. + divisor_bits : int + Optional. Clock divisor width. If omitted, ``bits_for(divisor)`` is used instead. + data_bits : int + Data width. + parity : ``"none"``, ``"mark"``, ``"space"``, ``"even"``, ``"odd"`` + Parity mode. + pins : :class:`nmigen.lib.io.Pin` + Optional. UART pins. See :class:`nmigen_boards.resources.UARTResource` for layout. + + Attributes + ---------- + divisor : Signal, in + Clock divisor. + data : Signal, in + Write data. Valid only when ``ack`` is asserted. + rdy : Signal, out + Write ready. Asserted when the transmitter is ready to transmit data. + ack : Signal, in + Write strobe. Data gets transmitted when both ``rdy`` and ``ack`` are asserted. + o : Signal, out + Serial output. If ``pins`` has been specified, it drives ``pins.tx.o``. + """ def __init__(self, *, divisor, divisor_bits=None, data_bits=8, parity="none", pins=None): _check_parity(parity) self._parity = parity + _check_divisor(divisor, 1) self.divisor = Signal(divisor_bits or bits_for(divisor), reset=divisor) self.data = Signal(data_bits) self.rdy = Signal() self.ack = Signal() - self.o = Signal() + self.o = Signal(reset=1) self._pins = pins @@ -124,7 +196,7 @@ def elaborate(self, platform): timer = Signal.like(self.divisor) shreg = Record(_wire_layout(len(self.data), self._parity)) - bitno = Signal.range(len(shreg)) + bitno = Signal(range(len(shreg))) if self._pins is not None: m.d.comb += self._pins.tx.o.eq(self.o) @@ -132,7 +204,6 @@ def elaborate(self, platform): with m.FSM(): with m.State("IDLE"): m.d.comb += self.rdy.eq(1) - m.d.sync += self.o.eq(shreg[0]) with m.If(self.ack): m.d.sync += [ shreg.start .eq(0), @@ -140,7 +211,7 @@ def elaborate(self, platform): shreg.parity.eq(_compute_parity_bit(self.data, self._parity)), shreg.stop .eq(1), bitno.eq(len(shreg) - 1), - timer.eq(self.divisor), + timer.eq(self.divisor - 1), ] m.next = "BUSY" @@ -151,7 +222,7 @@ def elaborate(self, platform): m.d.sync += [ Cat(self.o, shreg).eq(shreg), bitno.eq(bitno - 1), - timer.eq(self.divisor), + timer.eq(self.divisor - 1), ] with m.If(bitno == 0): m.next = "IDLE" @@ -160,11 +231,35 @@ def elaborate(self, platform): class AsyncSerial(Elaboratable): + """Asynchronous serial transceiver. + + Parameters + ---------- + divisor : int + Clock divisor reset value. Should be set to ``int(clk_frequency // baudrate)``. + divisor_bits : int + Optional. Clock divisor width. If omitted, ``bits_for(divisor)`` is used instead. + data_bits : int + Data width. + parity : ``"none"``, ``"mark"``, ``"space"``, ``"even"``, ``"odd"`` + Parity mode. + pins : :class:`nmigen.lib.io.Pin` + Optional. UART pins. See :class:`nmigen_boards.resources.UARTResource` for layout. + + Attributes + ---------- + divisor : Signal, in + Clock divisor. + rx : :class:`AsyncSerialRX` + See :class:`AsyncSerialRX`. + tx : :class:`AsyncSerialTX` + See :class:`AsyncSerialTX`. + """ def __init__(self, *, divisor, divisor_bits=None, **kwargs): self.divisor = Signal(divisor_bits or bits_for(divisor), reset=divisor) - self.rx = AsyncSerialRX(**kwargs) - self.tx = AsyncSerialTX(**kwargs) + self.rx = AsyncSerialRX(divisor=divisor, divisor_bits=divisor_bits, **kwargs) + self.tx = AsyncSerialTX(divisor=divisor, divisor_bits=divisor_bits, **kwargs) def elaborate(self, platform): m = Module() diff --git a/nmigen_stdio/test/test_serial.py b/nmigen_stdio/test/test_serial.py index 1b32c48..03f9750 100644 --- a/nmigen_stdio/test/test_serial.py +++ b/nmigen_stdio/test/test_serial.py @@ -1,6 +1,10 @@ +# nmigen: UnusedElaboratable=no + import unittest + from nmigen import * from nmigen.lib.fifo import SyncFIFO +from nmigen.lib.io import pin_layout from nmigen.back.pysim import * from ..serial import * @@ -15,7 +19,7 @@ def simulation_test(dut, process): class AsyncSerialRXTestCase(unittest.TestCase): def tx_period(self): - for _ in range((yield self.dut.divisor) + 1): + for _ in range((yield self.dut.divisor)): yield def tx_bits(self, bits): @@ -39,55 +43,66 @@ def process(): self.assertTrue((yield getattr(self.dut.err, error))) simulation_test(self.dut, process) + def test_invalid_divisor(self): + with self.assertRaisesRegex(ValueError, + r"Invalid divisor 1; must be greater than or equal to 5"): + self.dut = AsyncSerialRX(divisor=1) + + def test_invalid_parity(self): + with self.assertRaisesRegex(ValueError, + r"Invalid parity 'bad'; must be one of none, mark, space, even, odd"): + self.dut = AsyncSerialRX(divisor=5, parity="bad") + def test_8n1(self): - self.dut = AsyncSerialRX(divisor=7, data_bits=8, parity="none") - self.rx_test([0, 1,0,1,0,1,1,1,0, 1], data=0b10101110) + self.dut = AsyncSerialRX(divisor=5, data_bits=8, parity="none") + self.rx_test([0, 1,0,1,0,1,1,1,0, 1], data=0b01110101) def test_16n1(self): - self.dut = AsyncSerialRX(divisor=7, data_bits=16, parity="none") + self.dut = AsyncSerialRX(divisor=5, data_bits=16, parity="none") self.rx_test([0, 1,0,1,0,1,1,1,0,1,1,1,1,0,0,0,0, 1], - data=0b1010111011110000) + data=0b0000111101110101) def test_8m1(self): - self.dut = AsyncSerialRX(divisor=7, data_bits=8, parity="mark") - self.rx_test([0, 1,0,1,0,1,1,1,0, 1, 1], data=0b10101110) - self.rx_test([0, 1,0,1,0,1,1,0,0, 1, 1], data=0b10101100) + self.dut = AsyncSerialRX(divisor=5, data_bits=8, parity="mark") + self.rx_test([0, 1,0,1,0,1,1,1,0, 1, 1], data=0b01110101) + self.rx_test([0, 1,0,1,0,1,1,0,0, 1, 1], data=0b00110101) self.rx_test([0, 1,0,1,0,1,1,1,0, 0, 1], errors={"parity"}) def test_8s1(self): - self.dut = AsyncSerialRX(divisor=7, data_bits=8, parity="space") - self.rx_test([0, 1,0,1,0,1,1,1,0, 0, 1], data=0b10101110) - self.rx_test([0, 1,0,1,0,1,1,0,0, 0, 1], data=0b10101100) + self.dut = AsyncSerialRX(divisor=5, data_bits=8, parity="space") + self.rx_test([0, 1,0,1,0,1,1,1,0, 0, 1], data=0b01110101) + self.rx_test([0, 1,0,1,0,1,1,0,0, 0, 1], data=0b00110101) self.rx_test([0, 1,0,1,0,1,1,1,0, 1, 1], errors={"parity"}) def test_8e1(self): - self.dut = AsyncSerialRX(divisor=7, data_bits=8, parity="even") - self.rx_test([0, 1,0,1,0,1,1,1,0, 1, 1], data=0b10101110) - self.rx_test([0, 1,0,1,0,1,1,0,0, 0, 1], data=0b10101100) + self.dut = AsyncSerialRX(divisor=5, data_bits=8, parity="even") + self.rx_test([0, 1,0,1,0,1,1,1,0, 1, 1], data=0b01110101) + self.rx_test([0, 1,0,1,0,1,1,0,0, 0, 1], data=0b00110101) self.rx_test([0, 1,0,1,0,1,1,1,0, 0, 1], errors={"parity"}) def test_8o1(self): - self.dut = AsyncSerialRX(divisor=7, data_bits=8, parity="odd") - self.rx_test([0, 1,0,1,0,1,1,1,0, 0, 1], data=0b10101110) - self.rx_test([0, 1,0,1,0,1,1,0,0, 1, 1], data=0b10101100) + self.dut = AsyncSerialRX(divisor=5, data_bits=8, parity="odd") + self.rx_test([0, 1,0,1,0,1,1,1,0, 0, 1], data=0b01110101) + self.rx_test([0, 1,0,1,0,1,1,0,0, 1, 1], data=0b00110101) self.rx_test([0, 1,0,1,0,1,1,1,0, 1, 1], errors={"parity"}) def test_err_frame(self): - self.dut = AsyncSerialRX(divisor=7) + self.dut = AsyncSerialRX(divisor=5) self.rx_test([0, 0,0,0,0,0,0,0,0, 0], errors={"frame"}) def test_err_overflow(self): - self.dut = AsyncSerialRX(divisor=7) + self.dut = AsyncSerialRX(divisor=5) def process(): self.assertFalse((yield self.dut.rdy)) yield from self.tx_bits([0, 0,0,0,0,0,0,0,0, 1]) yield from self.tx_period() + yield self.assertFalse((yield self.dut.rdy)) self.assertTrue((yield self.dut.err.overflow)) simulation_test(self.dut, process) def test_fifo(self): - self.dut = AsyncSerialRX(divisor=7) + self.dut = AsyncSerialRX(divisor=5) self.fifo = SyncFIFO(width=8, depth=4) m = Module() m.submodules.rx = self.dut @@ -100,9 +115,122 @@ def test_fifo(self): def process(): yield from self.tx_bits([0, 1,0,1,0,1,0,1,0, 1, 0, 0,1,0,1,0,1,0,1, 1]) - yield from self.tx_period() - self.assertEqual((yield from self.fifo.read()), 0xAA) - self.assertEqual((yield from self.fifo.read()), 0x55) + self.assertTrue((yield self.fifo.r_rdy)) + self.assertEqual((yield self.fifo.r_data), 0x55) + yield self.fifo.r_en.eq(1) + yield + yield + while not (yield self.fifo.r_rdy): + yield + self.assertEqual((yield self.fifo.r_data), 0xAA) yield self.assertFalse((yield self.fifo.r_rdy)) simulation_test(m, process) + + +class AsyncSerialTXTestCase(unittest.TestCase): + def tx_period(self): + for _ in range((yield self.dut.divisor)): + yield + + def tx_test(self, data, *, bits): + def process(): + self.assertTrue((yield self.dut.rdy)) + yield self.dut.data.eq(data) + yield self.dut.ack.eq(1) + while (yield self.dut.rdy): + yield + for bit in bits: + yield from self.tx_period() + self.assertEqual((yield self.dut.o), bit) + simulation_test(self.dut, process) + + def test_invalid_divisor(self): + with self.assertRaisesRegex(ValueError, + r"Invalid divisor 0; must be greater than or equal to 1"): + self.dut = AsyncSerialTX(divisor=0) + + def test_invalid_parity(self): + with self.assertRaisesRegex(ValueError, + r"Invalid parity 'bad'; must be one of none, mark, space, even, odd"): + self.dut = AsyncSerialTX(divisor=1, parity="bad") + + def test_8n1(self): + self.dut = AsyncSerialTX(divisor=1, data_bits=8, parity="none") + self.tx_test(0b01110101, bits=[0, 1,0,1,0,1,1,1,0, 1]) + + def test_16n1(self): + self.dut = AsyncSerialTX(divisor=1, data_bits=16, parity="none") + self.tx_test(0b0000111101110101, bits=[0, 1,0,1,0,1,1,1,0,1,1,1,1,0,0,0,0, 1]) + + def test_8m1(self): + self.dut = AsyncSerialTX(divisor=1, data_bits=8, parity="mark") + self.tx_test(0b01110101, bits=[0, 1,0,1,0,1,1,1,0, 1, 1]) + self.tx_test(0b00110101, bits=[0, 1,0,1,0,1,1,0,0, 1, 1]) + + def test_8s1(self): + self.dut = AsyncSerialTX(divisor=1, data_bits=8, parity="space") + self.tx_test(0b01110101, bits=[0, 1,0,1,0,1,1,1,0, 0, 1]) + self.tx_test(0b00110101, bits=[0, 1,0,1,0,1,1,0,0, 0, 1]) + + def test_8e1(self): + self.dut = AsyncSerialTX(divisor=1, data_bits=8, parity="even") + self.tx_test(0b01110101, bits=[0, 1,0,1,0,1,1,1,0, 1, 1]) + self.tx_test(0b00110101, bits=[0, 1,0,1,0,1,1,0,0, 0, 1]) + + def test_8o1(self): + self.dut = AsyncSerialTX(divisor=1, data_bits=8, parity="odd") + self.tx_test(0b01110101, bits=[0, 1,0,1,0,1,1,1,0, 0, 1]) + self.tx_test(0b00110101, bits=[0, 1,0,1,0,1,1,0,0, 1, 1]) + + def test_fifo(self): + self.dut = AsyncSerialTX(divisor=1) + self.fifo = SyncFIFO(width=8, depth=4) + m = Module() + m.submodules.tx = self.dut + m.submodules.fifo = self.fifo + m.d.comb += [ + self.dut.ack.eq(self.fifo.r_rdy), + self.dut.data.eq(self.fifo.r_data), + self.fifo.r_en.eq(self.fifo.r_rdy & self.dut.rdy), + ] + def process(): + self.assertTrue((yield self.fifo.w_rdy)) + yield self.fifo.w_en.eq(1) + yield self.fifo.w_data.eq(0x55) + yield + self.assertTrue((yield self.fifo.w_rdy)) + yield self.fifo.w_data.eq(0xAA) + yield + yield self.fifo.w_en.eq(0) + yield + for bit in [0, 1,0,1,0,1,0,1,0, 1]: + yield from self.tx_period() + self.assertEqual((yield self.dut.o), bit) + yield + for bit in [0, 0,1,0,1,0,1,0,1, 1]: + yield from self.tx_period() + self.assertEqual((yield self.dut.o), bit) + simulation_test(m, process) + + +class AsyncSerialTestCase(unittest.TestCase): + def test_loopback(self): + pins = Record([("rx", pin_layout(1, dir="i")), + ("tx", pin_layout(1, dir="o"))]) + self.dut = AsyncSerial(divisor=5, pins=pins) + m = Module() + m.submodules.serial = self.dut + m.d.comb += pins.rx.i.eq(pins.tx.o) + def process(): + self.assertTrue((yield self.dut.tx.rdy)) + yield self.dut.tx.data.eq(0xAA) + yield self.dut.tx.ack.eq(1) + yield + yield self.dut.tx.ack.eq(0) + yield self.dut.rx.ack.eq(1) + yield + while not (yield self.dut.rx.rdy): + yield + self.assertEqual((yield self.dut.rx.data), 0xAA) + simulation_test(m, process)