Skip to content

Commit

Permalink
Introduce COCOTB_TRUST_INERTIAL_WRITES
Browse files Browse the repository at this point in the history
Behavior and performance optimization by trusting inertial writes in the
VPI/VHPI/FLI rather than grossing mimicing them by applying writes in
ReadWrite callback.
  • Loading branch information
ktbarrett committed May 8, 2024
1 parent 2db2ac0 commit e737666
Show file tree
Hide file tree
Showing 8 changed files with 188 additions and 78 deletions.
12 changes: 12 additions & 0 deletions docs/source/building.rst
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,18 @@ Cocotb
This is determined with ``cocotb-config --libpython`` in cocotb's makefiles.


.. envvar:: COCOTB_TRUST_INERTIAL_WRITES

Defining this variable enables a mode which allows cocotb to trust that VPI/VHPI/FLI inertial writes are applied properly.
This mode can lead to noticable performance improvements,
and also includes some behavioral difference that are considered by the cocotb maintainers to be "better".
Not all simulators handle inertial writes properly, so use with caution.

This is achieved by *not* scheduling writes to occur at the beginning of the ``ReadWrite`` mode,
but instead trusting that the simulator's inertial write mechanism is correct.
This allows cocotb to avoid a VPI callback into Python to apply writes.


Regression Manager
~~~~~~~~~~~~~~~~~~

Expand Down
1 change: 1 addition & 0 deletions docs/source/newsfragments/3873.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Introduced :envvar:`COCOTB_TRUST_INERTIAL_WRITES` to enable a mode where VPI/VHPI/FLI inertial writes are trusted to behave properly. Enabling this feature can lead to behavioral and noticable performance improvements. Some simulators do not handle writes properly, so use this option with caution.
6 changes: 6 additions & 0 deletions src/cocotb/_conf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Copyright cocotb contributors
# Licensed under the Revised BSD License, see LICENSE for details.
# SPDX-License-Identifier: BSD-3-Clause
import os

_trust_inertial = "COCOTB_TRUST_INERTIAL_WRITES" in os.environ
18 changes: 13 additions & 5 deletions src/cocotb/_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,11 @@
import warnings
from collections import OrderedDict
from collections.abc import Coroutine
from typing import Any, Callable, Union
from typing import Any, Callable, Dict, Sequence, Tuple, Union

import cocotb
from cocotb import _outcomes, _py_compat
from cocotb.handle import SimHandleBase
from cocotb.result import SimFailure, TestSuccess
from cocotb.task import Task
from cocotb.triggers import (
Expand Down Expand Up @@ -246,7 +247,9 @@ def __init__(self, test_complete_cb: Callable[[], None]) -> None:
# A dictionary of pending (write_func, args), keyed by handle.
# Writes are applied oldest to newest (least recently used).
# Only the last scheduled write to a particular handle in a timestep is performed.
self._write_calls = OrderedDict()
self._write_calls: Dict[
SimHandleBase, Tuple[Callable[..., None], Sequence[Any]]
] = OrderedDict()

self._pending_tasks = []
self._pending_triggers = []
Expand All @@ -273,7 +276,7 @@ async def _do_writes(self):
await self._read_write

while self._write_calls:
handle, (func, args) = self._write_calls.popitem(last=False)
_, (func, args) = self._write_calls.popitem(last=False)
func(*args)
self._writes_pending.clear()

Expand All @@ -298,7 +301,7 @@ def _check_termination(self):
self._timer1._prime(self._test_completed)
self._trigger2tasks = _py_compat.insertion_ordered_dict()
self._terminate = False
self._write_calls = OrderedDict()
self._write_calls.clear()
self._writes_pending.clear()
self._mode = Scheduler._MODE_TERM

Expand Down Expand Up @@ -518,7 +521,12 @@ def _unschedule(self, task):
)
self._abort_test(e)

def _schedule_write(self, handle, write_func, args):
def _schedule_write(
self,
handle: SimHandleBase,
write_func: Callable[..., None],
args: Sequence[Any],
) -> None:
"""Queue `write_func` to be called on the next ReadWrite trigger."""
if self._mode == Scheduler._MODE_READONLY:
raise Exception(
Expand Down
28 changes: 20 additions & 8 deletions src/cocotb/handle.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,31 @@
cast,
)

import cocotb
from cocotb import simulator
from cocotb._conf import _trust_inertial
from cocotb._deprecation import deprecated
from cocotb._py_compat import cached_property
from cocotb.types import Logic, LogicArray
from cocotb.types.range import Range


def _write_now(
_: "ValueObjectBase[Any, Any]", f: Callable[..., None], args: Any
) -> None:
f(*args)


if _trust_inertial:
_inertial_write = _write_now

Check warning on line 65 in src/cocotb/handle.py

View check run for this annotation

Codecov / codecov/patch

src/cocotb/handle.py#L65

Added line #L65 was not covered by tests
else:
import cocotb

def _inertial_write(
handle: "ValueObjectBase[Any, Any]", f: Callable[..., None], args: Any
) -> None:
cocotb._scheduler._schedule_write(handle, f, args)


class _Limits(enum.IntEnum):
SIGNED_NBIT = 1
UNSIGNED_NBIT = 2
Expand Down Expand Up @@ -665,7 +682,7 @@ def set(

value_, action = _map_action_obj_to_value_action_enum_pair(self, value)

self._set_value(value_, action, cocotb._scheduler._schedule_write)
self._set_value(value_, action, _inertial_write)

def setimmediatevalue(
self,
Expand All @@ -682,14 +699,9 @@ def setimmediatevalue(
if self.is_const:
raise TypeError(f"{self._path} is constant")

def _call_now(
handle: "ValueObjectBase[Any, Any]", f: Callable[..., None], args: Any
) -> None:
f(*args)

value_, action = _map_action_obj_to_value_action_enum_pair(self, value)

self._set_value(value_, action, _call_now)
self._set_value(value_, action, _write_now)

@cached_property
def is_const(self) -> bool:
Expand Down
65 changes: 0 additions & 65 deletions tests/test_cases/test_cocotb/test_timing_triggers.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
ReadWrite,
RisingEdge,
Timer,
with_timeout,
)
from cocotb.triggers import _TriggerException as TriggerException
from cocotb.utils import get_sim_time
Expand Down Expand Up @@ -323,67 +322,3 @@ async def test_timer_round_mode(_):
await cocotb.triggers.with_timeout(
Timer(1, "step"), timeout_time=2.5, timeout_unit="step", round_mode="round"
)


@cocotb.test
async def test_writes_on_timer_seen_on_edge(dut):
# steady state
dut.clk.value = 0
await Timer(10, "ns")

# inertial write on a signal
dut.clk.value = 1

# check we can register an edge trigger on the signal we just changed because it hasn't taken effect yet
await with_timeout(RisingEdge(dut.clk), 10, "ns")

# done here or will hang


@cocotb.test(expect_fail=True)
async def test_read_back_in_readwrite(dut):
# steady state
dut.clk.value = 0
await Timer(10, "ns")

# write in the "normal" phase
dut.clk.value = 1

# assert we can read back what we wrote in the ReadWrite phase
await ReadWrite()
assert dut.clk.value == 1


@cocotb.test
async def test_writes_dont_update_hdl_this_delta(dut):
cocotb.start_soon(Clock(dut.clk, 10, "ns").start())

# assert steady state
dut.stream_in_data.value = 0
await RisingEdge(dut.clk)
await ReadOnly()
assert dut.stream_out_data_registered.value == 0

# write on the clock edge
await RisingEdge(dut.clk)
dut.stream_in_data.value = 1

# ensure that the write data wasn't used on this clock cycle
await ReadOnly()
assert dut.stream_out_data_registered.value == 0

# ensure that the write data made it on the result of the next clock cycle
await RisingEdge(dut.clk)
await ReadOnly()
assert dut.stream_out_data_registered.value == 1


@cocotb.test(expect_fail=True)
async def test_write_in_timer_seen_before_readwrite(dut):
dut.clk.value = 0
await Timer(1, "ns")
dut.clk.value = 1
t1 = RisingEdge(dut.clk)
t2 = ReadWrite()
t_res = await First(t1, t2)
assert t_res is t1
12 changes: 12 additions & 0 deletions tests/test_cases/test_inertial_writes/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Copyright cocotb contributors
# Licensed under the Revised BSD License, see LICENSE for details.
# SPDX-License-Identifier: BSD-3-Clause

.PHONY: override_tests
override_tests:
$(MAKE) sim
$(MAKE) COCOTB_TRUST_INERTIAL_WRITES=1 sim

MODULE := inertial_writes_tests

include ../../designs/sample_module/Makefile
124 changes: 124 additions & 0 deletions tests/test_cases/test_inertial_writes/inertial_writes_tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
# Copyright cocotb contributors
# Licensed under the Revised BSD License, see LICENSE for details.
# SPDX-License-Identifier: BSD-3-Clause
import os

import cocotb
from cocotb._conf import _trust_inertial
from cocotb.clock import Clock
from cocotb.result import SimTimeoutError
from cocotb.triggers import First, ReadOnly, ReadWrite, RisingEdge, Timer, with_timeout

SIM_NAME = cocotb.SIM_NAME.lower()
vhdl = os.environ.get("TOPLEVEL_LANG", "verilog").lower() == "vhdl"
verilog = os.environ.get("TOPLEVEL_LANG", "verilog").lower() == "verilog"


# Riviera's VHPI is skipped in all tests when COCOTB_TRUST_INERTIAL_WRITES mode is enable
# because it behaves extremely erratically. Their scheduler has some serious issues.


# Verilator currently only does vpiNoDelay writes.
@cocotb.test(
expect_error=SimTimeoutError
if (SIM_NAME.startswith("verilator") and _trust_inertial)
else (),
skip=SIM_NAME.startswith("riviera") and vhdl and _trust_inertial,
)
async def test_writes_on_timer_seen_on_edge(dut):
# steady state
dut.clk.value = 0
await Timer(10, "ns")

# inertial write on a signal
dut.clk.value = 1

# check we can register an edge trigger on the signal we just changed because it hasn't taken effect yet
await with_timeout(RisingEdge(dut.clk), 10, "ns")


if _trust_inertial:
expect_fail = False
elif SIM_NAME.startswith(("riviera", "modelsim")) and vhdl:
expect_fail = False
elif SIM_NAME.startswith("verilator"):
expect_fail = False
else:
expect_fail = True


# Verilator currently only does vpiNoDelay writes, so this works.
# Riviera and Questa on VHDL designs seem to apply inertial writes in this state immediately,
# presumably because it's the NBA application region.
# This test will fail because the ReadWrite write applicator task does inertial writes of it's own.
@cocotb.test(
expect_fail=expect_fail,
skip=SIM_NAME.startswith("riviera") and vhdl and _trust_inertial,
)
async def test_read_back_in_readwrite(dut):
# steady state
dut.clk.value = 0
await Timer(10, "ns")

# write in the "normal" phase
dut.clk.value = 1

# assert we can read back what we wrote in the ReadWrite phase
await ReadWrite()
assert dut.clk.value == 1


if not _trust_inertial:
expect_fail = False
elif SIM_NAME.startswith(("icarus", "verilator")):
expect_fail = True
elif SIM_NAME.startswith("modelsim") and verilog:
expect_fail = True
elif SIM_NAME.startswith("xmsim") and vhdl:
expect_fail = True


# Verilator currently only does vpiNoDelay writes.
# Icarus, Questa VPI, and Xcelium VHPI inertial writes aren't actually inertial.
@cocotb.test(
expect_fail=expect_fail,
skip=SIM_NAME.startswith("riviera") and vhdl and _trust_inertial,
)
async def test_writes_dont_update_hdl_this_delta(dut):
cocotb.start_soon(Clock(dut.clk, 10, "ns").start())

# assert steady state
dut.stream_in_data.value = 0
await RisingEdge(dut.clk)
await ReadOnly()
assert dut.stream_out_data_registered.value == 0

# write on the clock edge
await RisingEdge(dut.clk)
dut.stream_in_data.value = 1

# ensure that the write data wasn't used on this clock cycle
await ReadOnly()
assert dut.stream_out_data_registered.value == 0

# ensure that the write data made it on the result of the next clock cycle
await RisingEdge(dut.clk)
await ReadOnly()
assert dut.stream_out_data_registered.value == 1


# Verilator currently only does vpiNoDelay writes.
# This test will fail because writes scheduled in Timer won't be applied until ReadWrite.
@cocotb.test(
expect_fail=not _trust_inertial
or (SIM_NAME.startswith("verilator") and _trust_inertial),
skip=SIM_NAME.startswith("riviera") and vhdl and _trust_inertial,
)
async def test_write_in_timer_seen_before_readwrite(dut):
dut.clk.value = 0
await Timer(1, "ns")
dut.clk.value = 1
t1 = RisingEdge(dut.clk)
t2 = ReadWrite()
t_res = await First(t1, t2)
assert t_res is t1

0 comments on commit e737666

Please sign in to comment.