Closed
Description
This "minimal" example is kind of long, but you (whitequark) said you think you know what bug we're hitting.
I'll keep this example as is for now and try to trim it later.
"""Simple example of a FSM-based ALU
This demonstrates a design that follows the valid/ready protocol of the
ALU, but with a FSM implementation, instead of a pipeline. It is also
intended to comply with both the CompALU API and the nmutil Pipeline API
(Liskov Substitution Principle)
The basic rules are:
1) p.ready_o is asserted on the initial ("Idle") state, otherwise it keeps low.
2) n.valid_o is asserted on the final ("Done") state, otherwise it keeps low.
3) The FSM stays in the Idle state while p.valid_i is low, otherwise
it accepts the input data and moves on.
4) The FSM stays in the Done state while n.ready_i is low, otherwise
it releases the output data and goes back to the Idle state.
"""
from nmigen import Elaboratable, Signal, Module, Cat
# Simulator backend selection: True imports the C++ backend
# (nmigen.sim.cxxsim), False imports the Python one (nmigen.back.pysim).
# Both provide the Simulator and Settle names used below.
cxxsim = True

if cxxsim:
    from nmigen.sim.cxxsim import Simulator, Settle
else:
    from nmigen.back.pysim import Simulator, Settle
from nmigen.cli import rtlil
from math import log2
class Dummy:
    """Empty attribute container.

    Instances start with no attributes; Shifter uses them as its ``p`` and
    ``n`` port objects and attaches the handshake and data signals to them.
    """
class Shifter(Elaboratable):
    """Simple sequential shifter with valid/ready handshakes.

    The input value is loaded into a register while the FSM is in IDLE,
    then shifted left by one bit per clock in SHIFT while a counter
    (loaded with the shift amount) counts down to zero. The result is
    presented in DONE until it is accepted.

    Prev port data:
    * p.data_i.data: value to be shifted
    * p.data_i.shift: shift amount
      * When zero, no shift occurs.
      * On POWER, range is 0 to 63 for 32-bit,
      * and 0 to 127 for 64-bit.
      * Other values wrap around.

    Next port data:
    * n.data_o.data: shifted value
    """

    class PrevData:
        """Input payload: the operand and the shift amount."""

        def __init__(self, width):
            self.data = Signal(width, name="p_data_i")
            self.shift = Signal(width, name="p_shift_i")

        def _get_data(self):
            """Return the payload signals as a list: [data, shift]."""
            return [self.data, self.shift]

    class NextData:
        """Output payload: the shifted value."""

        def __init__(self, width):
            self.data = Signal(width, name="n_data_o")

        def _get_data(self):
            """Return the payload signals as a one-element list."""
            return [self.data]

    def __init__(self, width):
        """Create the port objects and their signals.

        width: bit width of the operand, the shift-amount input and the
               result.
        """
        self.width = width
        # "p" is the previous-stage (input) side, "n" the next-stage
        # (output) side; both are plain attribute holders
        self.p = Dummy()
        self.n = Dummy()
        # handshake signals
        self.p.valid_i = Signal(name="p_valid_i")
        self.p.ready_o = Signal(name="p_ready_o")
        self.n.ready_i = Signal(name="n_ready_i")
        self.n.valid_o = Signal(name="n_valid_o")
        # payloads
        self.p.data_i = Shifter.PrevData(width)
        self.n.data_o = Shifter.NextData(width)

    def elaborate(self, platform):
        """Build and return the Module: a data path plus a controlling FSM."""
        m = Module()

        # NOTE: the Signals below are created without an explicit name, so
        # nmigen names them after the Python variable they are assigned to.
        # Renaming these locals renames the signals in the RTLIL/VCD output.

        # Data path
        # ---------
        # One register that can either be loaded from the input or be
        # replaced by its own value shifted left by one bit. (Only the
        # left shift is wired up here.)

        # control signals, driven by the FSM further down
        load = Signal()
        shift = Signal()

        # combinatorial data flow
        shift_in = Signal(self.width)
        shift_left_by_1 = Signal(self.width)
        next_shift = Signal(self.width)

        # the register itself
        shift_reg = Signal(self.width, reset_less=True)

        # the input feeds the load path, the register drives the output
        m.d.comb += shift_in.eq(self.p.data_i.data)
        m.d.comb += self.n.data_o.data.eq(shift_reg)
        # left-shifted view: constant 0 in bit 0, register top bit dropped
        m.d.comb += shift_left_by_1.eq(Cat(0, shift_reg[:-1]))

        # Next register value. The unconditional assignment comes first so
        # that it is the default (hold); the If/Elif below overrides it,
        # with load taking priority over shift.
        m.d.comb += next_shift.eq(shift_reg)
        with m.If(load):
            m.d.comb += next_shift.eq(shift_in)
        with m.Elif(shift):
            m.d.comb += next_shift.eq(shift_left_by_1)

        # clock the chosen value into the register
        m.d.sync += shift_reg.eq(next_shift)

        # Control path
        # ------------
        # IDLE loads the register and the counter, SHIFT shifts once per
        # cycle while the counter decrements, and DONE holds the result
        # until it is taken.

        # shift counter and its next-value signal
        count_width = int(log2(self.width)) + 1
        next_count = Signal(count_width)
        count = Signal(count_width, reset_less=True)
        m.d.sync += count.eq(next_count)

        #m.d.comb += self.p.ready_o.eq(1)

        with m.FSM():
            with m.State("IDLE"):
                # p.ready_o is active for as long as we are in IDLE
                m.d.comb += self.p.ready_o.eq(1)
                # keep loading the shift register and the shift count
                m.d.comb += load.eq(1)
                m.d.comb += next_count.eq(self.p.data_i.shift)
                with m.If(self.p.valid_i):
                    # data has arrived: leave IDLE
                    with m.If(next_count == 0):
                        # zero shift amount: nothing to do, skip SHIFT
                        m.next = "DONE"
                    with m.Else():
                        m.next = "SHIFT"

            with m.State("SHIFT"):
                # shift every cycle spent in this state
                m.d.comb += shift.eq(1)
                # and count down
                m.d.comb += next_count.eq(count - 1)
                with m.If(next_count == 0):
                    # the counter is about to reach zero: finished
                    m.next = "DONE"

            with m.State("DONE"):
                # n.valid_o stays active until the data is accepted
                m.d.comb += self.n.valid_o.eq(1)
                with m.If(self.n.ready_i):
                    # accepted: return to IDLE
                    m.next = "IDLE"

        return m

    def __iter__(self):
        """Iterate over the port signals, inputs first, result last."""
        yield from (
            self.p.data_i.data,
            self.p.data_i.shift,
            self.p.valid_i,
            self.p.ready_o,
            self.n.ready_i,
            self.n.valid_o,
            self.n.data_o.data,
        )

    def ports(self):
        """Return the port signals as a list (same order as iteration)."""
        return [*self]
def test_shifter():
    """Simulate an 8-bit Shifter computing 3 << 4 and check the result.

    Side effects: prints the port names and progress messages, writes the
    design's RTLIL to "test_shifter.il" and the simulation trace to
    "test_shifter.vcd".

    The exact number and position of the bare ``yield`` statements in the
    two processes is part of the test case; do not "tidy" them.
    """
    m = Module()
    dut = Shifter(8)
    m.submodules.shf = dut

    print("Shifter port names:")
    for port in dut:
        print("-", port.name)

    # generate RTLIL
    # try "proc; show" in yosys to check the data path
    rtlil_text = rtlil.convert(dut, ports=dut.ports())
    with open("test_shifter.il", "w") as il_file:
        il_file.write(rtlil_text)

    sim = Simulator(m)
    sim.add_clock(1e-6)

    def send(data, shift):
        """Producer side: present one (data, shift) pair to the p port."""
        # idle for seven clock cycles first
        for _ in range(7):
            yield
        # present input data and assert valid_i
        yield dut.p.data_i.data.eq(data)
        yield dut.p.data_i.shift.eq(shift)
        yield dut.p.valid_i.eq(1)
        yield
        print("set up signals")
        # wait for p.ready_o to be asserted
        p_ready = yield dut.p.ready_o
        print("ready_o", p_ready)
        while not (yield dut.p.ready_o):
            p_ready = yield dut.p.ready_o
            print("ready_o", p_ready)
            yield
        print("done ready check")
        # clear input data and negate p.valid_i
        yield dut.p.valid_i.eq(0)
        yield dut.p.data_i.data.eq(0)
        yield dut.p.data_i.shift.eq(0)
        print("done send")

    def receive(expected):
        """Consumer side: take one result from the n port and check it."""
        yield
        # signal readiness to receive data
        yield dut.n.ready_i.eq(1)
        yield
        # wait for n.valid_o to be asserted
        n_valid = yield dut.n.valid_o
        print(" valid_o", n_valid)
        while not (yield dut.n.valid_o):
            n_valid = yield dut.n.valid_o
            print(" valid_o", n_valid)
            yield
        # read result
        shifted = yield dut.n.data_o.data
        # "FIX" the problem with this line:
        #yield # <---- remove this - pysim "works" but cxxsim does not
        # negate n.ready_i
        yield dut.n.ready_i.eq(0)
        # check result
        assert shifted == expected
        print(" done receive")

    def producer():
        print("start of producer")
        yield from send(3, 4)
        print("end of producer")

    def consumer():
        yield
        # the consumer is not in step with the producer, but the
        # order of the results is preserved
        # 3 << 4 = 48
        print(" start of receiver")
        yield from receive(48)
        print(" end of receiver")

    sim.add_sync_process(producer)
    sim.add_sync_process(consumer)

    # run to completion while recording a VCD trace
    with sim.write_vcd("test_shifter.vcd"):
        sim.run()
# Run the shifter simulation when this file is executed as a script.
if __name__ == "__main__":
    test_shifter()