-
Notifications
You must be signed in to change notification settings - Fork 165
/
core.py
218 lines (174 loc) · 8.11 KB
/
core.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
import inspect
import warnings
from .._utils import deprecated
from ..hdl.cd import *
from ..hdl.ir import *
from ._base import BaseEngine
__all__ = ["Settle", "Delay", "Tick", "Passive", "Active", "Simulator"]
class Command:
pass
class Settle(Command):
def __repr__(self):
return "(settle)"
class Delay(Command):
def __init__(self, interval=None):
self.interval = None if interval is None else float(interval)
def __repr__(self):
if self.interval is None:
return "(delay ε)"
else:
return "(delay {:.3}us)".format(self.interval * 1e6)
class Tick(Command):
def __init__(self, domain="sync"):
if not isinstance(domain, (str, ClockDomain)):
raise TypeError("Domain must be a string or a ClockDomain instance, not {!r}"
.format(domain))
assert domain != "comb"
self.domain = domain
def __repr__(self):
return "(tick {})".format(self.domain)
class Passive(Command):
def __repr__(self):
return "(passive)"
class Active(Command):
def __repr__(self):
return "(active)"
class Simulator:
def __init__(self, fragment, *, engine="pysim"):
if isinstance(engine, type) and issubclass(engine, BaseEngine):
pass
elif engine == "pysim":
from .pysim import PySimEngine
engine = PySimEngine
else:
raise TypeError("Value '{!r}' is not a simulation engine class or "
"a simulation engine name"
.format(engine))
self._fragment = Fragment.get(fragment, platform=None).prepare()
self._engine = engine(self._fragment)
self._clocked = set()
def _check_process(self, process):
if not (inspect.isgeneratorfunction(process) or inspect.iscoroutinefunction(process)):
raise TypeError("Cannot add a process {!r} because it is not a generator function"
.format(process))
return process
def add_process(self, process):
process = self._check_process(process)
def wrapper():
# Only start a bench process after comb settling, so that the reset values are correct.
yield Settle()
yield from process()
self._engine.add_coroutine_process(wrapper, default_cmd=None)
def add_sync_process(self, process, *, domain="sync"):
process = self._check_process(process)
def wrapper():
# Only start a sync process after the first clock edge (or reset edge, if the domain
# uses an asynchronous reset). This matches the behavior of synchronous FFs.
yield Tick(domain)
yield from process()
self._engine.add_coroutine_process(wrapper, default_cmd=Tick(domain))
def add_clock(self, period, *, phase=None, domain="sync", if_exists=False):
"""Add a clock process.
Adds a process that drives the clock signal of ``domain`` at a 50% duty cycle.
Arguments
---------
period : float
Clock period. The process will toggle the ``domain`` clock signal every ``period / 2``
seconds.
phase : None or float
Clock phase. The process will wait ``phase`` seconds before the first clock transition.
If not specified, defaults to ``period / 2``.
domain : str or ClockDomain
Driven clock domain. If specified as a string, the domain with that name is looked up
in the root fragment of the simulation.
if_exists : bool
If ``False`` (the default), raise an error if the driven domain is specified as
a string and the root fragment does not have such a domain. If ``True``, do nothing
in this case.
"""
if isinstance(domain, ClockDomain):
if (domain.name in self._fragment.domains and
domain is not self._fragment.domains[domain.name]):
warnings.warn("Adding a clock process that drives a clock domain object "
"named {!r}, which is distinct from an identically named domain "
"in the simulated design"
.format(domain.name),
UserWarning, stacklevel=2)
elif domain in self._fragment.domains:
domain = self._fragment.domains[domain]
elif if_exists:
return
else:
raise ValueError("Domain {!r} is not present in simulation"
.format(domain))
if domain in self._clocked:
raise ValueError("Domain {!r} already has a clock driving it"
.format(domain.name))
# We represent times internally in 1 ps units, but users supply float quantities of seconds
period = int(period * 1e12)
if phase is None:
# By default, delay the first edge by half period. This causes any synchronous activity
# to happen at a non-zero time, distinguishing it from the reset values in the waveform
# viewer.
phase = period // 2
self._engine.add_clock_process(domain.clk, phase=phase, period=period)
self._clocked.add(domain)
def reset(self):
"""Reset the simulation.
Assign the reset value to every signal in the simulation, and restart every user process.
"""
self._engine.reset()
# TODO(amaranth-0.4): replace with _real_step
@deprecated("instead of `sim.step()`, use `sim.advance()`")
def step(self):
return self.advance()
def advance(self):
"""Advance the simulation.
Run every process and commit changes until a fixed point is reached, then advance time
to the closest deadline (if any). If there is an unstable combinatorial loop,
this function will never return.
Returns ``True`` if there are any active processes, ``False`` otherwise.
"""
return self._engine.advance()
def run(self):
"""Run the simulation while any processes are active.
Processes added with :meth:`add_process` and :meth:`add_sync_process` are initially active,
and may change their status using the ``yield Passive()`` and ``yield Active()`` commands.
Processes compiled from HDL and added with :meth:`add_clock` are always passive.
"""
while self.advance():
pass
def run_until(self, deadline, *, run_passive=False):
"""Run the simulation until it advances to ``deadline``.
If ``run_passive`` is ``False``, the simulation also stops when there are no active
processes, similar to :meth:`run`. Otherwise, the simulation will stop only after it
advances to or past ``deadline``.
If the simulation stops advancing, this function will never return.
"""
# Convert deadline in seconds into internal 1 ps units
deadline = deadline * 1e12
assert self._engine.now <= deadline
while (self.advance() or run_passive) and self._engine.now < deadline:
pass
def write_vcd(self, vcd_file, gtkw_file=None, *, traces=()):
"""Write waveforms to a Value Change Dump file, optionally populating a GTKWave save file.
This method returns a context manager. It can be used as: ::
sim = Simulator(frag)
sim.add_clock(1e-6)
with sim.write_vcd("dump.vcd", "dump.gtkw"):
sim.run_until(1e-3)
Arguments
---------
vcd_file : str or file-like object
Verilog Value Change Dump file or filename.
gtkw_file : str or file-like object
GTKWave save file or filename.
traces : iterable of Signal
Signals to display traces for.
"""
if self._engine.now != 0:
for file in (vcd_file, gtkw_file):
if hasattr(file, "close"):
file.close()
raise ValueError("Cannot start writing waveforms after advancing simulation time")
return self._engine.write_vcd(vcd_file=vcd_file, gtkw_file=gtkw_file, traces=traces)