/
common.py
199 lines (160 loc) · 6.45 KB
/
common.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
import logging
import time
from typing import Any
from typing import Set
from typing import Tuple
import pytest
from cardano_clusterlib import clusterlib
from cardano_node_tests.utils import cluster_management
from cardano_node_tests.utils import cluster_nodes
from cardano_node_tests.utils import clusterlib_utils
from cardano_node_tests.utils import configuration
from cardano_node_tests.utils import pytest_utils
from cardano_node_tests.utils.versions import VERSIONS
LOGGER = logging.getLogger(__name__)
# common `skipif`s
# `transaction build` is usable only with a Mary+ TX era that matches the cluster era
SKIPIF_BUILD_UNUSABLE = pytest.mark.skipif(
    not (
        VERSIONS.transaction_era >= VERSIONS.MARY
        and VERSIONS.transaction_era == VERSIONS.cluster_era
    ),
    reason=(
        f"cannot use `build` with cluster era '{VERSIONS.cluster_era_name}' "
        f"and TX era '{VERSIONS.transaction_era_name}'"
    ),
)
# skip tests that assume the default (or newer) cluster era with no cluster/TX era mismatch
SKIPIF_WRONG_ERA = pytest.mark.skipif(
    not (
        VERSIONS.cluster_era >= VERSIONS.DEFAULT_CLUSTER_ERA
        and VERSIONS.transaction_era == VERSIONS.cluster_era
    ),
    reason="meant to run with default era or higher, where cluster era == Tx era",
)
# native tokens require the Mary era or newer
SKIPIF_TOKENS_UNUSABLE = pytest.mark.skipif(
    VERSIONS.transaction_era < VERSIONS.MARY,
    reason="native tokens are available only in Mary+ eras",
)
# Plutus scripts require the Alonzo era or newer
SKIPIF_PLUTUS_UNUSABLE = pytest.mark.skipif(
    VERSIONS.transaction_era < VERSIONS.ALONZO,
    reason="Plutus is available only in Alonzo+ eras",
)
# PlutusV2 requires Babbage+ TX era and can also be disabled via configuration
SKIPIF_PLUTUSV2_UNUSABLE = pytest.mark.skipif(
    VERSIONS.transaction_era < VERSIONS.BABBAGE or configuration.SKIP_PLUTUSV2,
    reason="runs only with Babbage+ TX; needs PlutusV2 cost model",
)
# common parametrization
# run a test both with `transaction build-raw` and with `transaction build`;
# the `build` variant is skipped when the `build` command is unusable in the current era setup
PARAM_USE_BUILD_CMD = pytest.mark.parametrize(
    "use_build_cmd",
    (
        False,
        pytest.param(True, marks=SKIPIF_BUILD_UNUSABLE),
    ),
    ids=("build_raw", "build"),
)
# run a test with both PlutusV1 and PlutusV2 scripts;
# the "v2" variant is skipped when PlutusV2 is unusable in the current era setup
PARAM_PLUTUS_VERSION = pytest.mark.parametrize(
    "plutus_version",
    (
        "v1",
        pytest.param("v2", marks=SKIPIF_PLUTUSV2_UNUSABLE),
    ),
    ids=("plutus_v1", "plutus_v2"),
)
# intervals for `wait_for_epoch_interval` (negative values are counted from the end of an epoch)
if cluster_nodes.get_cluster_type().type == cluster_nodes.ClusterType.LOCAL:
    # time buffer at the end of an epoch, enough to do something that takes several transactions
    EPOCH_STOP_SEC_BUFFER = -40
    # time when all ledger state info is available for the current epoch
    EPOCH_START_SEC_LEDGER_STATE = -19
    # time buffer at the end of an epoch after getting ledger state info
    EPOCH_STOP_SEC_LEDGER_STATE = -15
else:
    # we can be more generous on testnets
    # NOTE(review): presumably testnet epochs are long enough that these larger
    # buffers are safe - confirm against the testnet epoch length
    EPOCH_STOP_SEC_BUFFER = -200
    EPOCH_START_SEC_LEDGER_STATE = -300
    EPOCH_STOP_SEC_LEDGER_STATE = -200
def hypothesis_settings(max_examples: int = 100) -> Any:
    """Return a `hypothesis.settings` object shared by property-based tests.

    The deadline is disabled and the health checks that commonly misfire in
    this test suite (slow data generation, function-scoped fixtures) are
    suppressed.

    Args:
        max_examples: number of examples hypothesis should try per test.
    """
    # deferred import so that modules which never run hypothesis tests
    # don't pay for it at import time
    # pylint: disable=import-outside-toplevel
    import hypothesis

    suppressed_checks = (
        hypothesis.HealthCheck.too_slow,
        hypothesis.HealthCheck.function_scoped_fixture,
    )
    return hypothesis.settings(
        max_examples=max_examples,
        deadline=None,
        suppress_health_check=suppressed_checks,
    )
def get_test_id(cluster_obj: clusterlib.ClusterLib) -> str:
    """Return unique test ID - function name + assigned cluster instance + random string.

    Log the test ID into cluster manager log file.
    """
    test_info = pytest_utils.get_current_test()
    suffix = clusterlib.get_rand_str(3)
    test_id = f"{test_info.test_function}_ci{cluster_obj.cluster_id}_{suffix}"

    # Record the ID in the cluster manager log file. Getting the test ID
    # happens early after the start of a test, so the log entry doubles as a
    # marker of the test's start time.
    manager: cluster_management.ClusterManager = cluster_obj._cluster_manager  # type: ignore
    manager._log(f"c{manager.cluster_instance_num}: got ID `{test_id}` for `{test_info.full}`")

    return test_id
def detect_fork(
    cluster_manager: cluster_management.ClusterManager,
    cluster_obj: clusterlib.ClusterLib,
    temp_template: str,
) -> Tuple[Set[str], Set[str]]:
    """Detect if one or more nodes have forked blockchain or is out of sync.

    Creates a fresh UTxO via a faucet transaction, then queries every known
    node (by repointing 'CARDANO_NODE_SOCKET_PATH' at each node's socket) to
    see whether the node is fully synced and knows about that UTxO.

    Args:
        cluster_manager: manager holding the faucet address data.
        cluster_obj: cluster instance used for all CLI queries.
        temp_template: name used for the temporary payment address/keys.

    Returns:
        Tuple of (forked node names, out-of-sync node names).
    """
    forked_nodes: Set[str] = set()
    unsynced_nodes: Set[str] = set()

    known_nodes = cluster_nodes.get_cluster_type().NODES
    # with a single node there is nothing to compare against
    if len(known_nodes) <= 1:
        LOGGER.warning("WARNING: Not enough nodes available to detect forks, skipping the check.")
        return forked_nodes, unsynced_nodes

    instance_num = cluster_nodes.get_instance_num()

    # create a UTxO
    payment_rec = cluster_obj.gen_payment_addr_and_keys(
        name=temp_template,
    )
    tx_raw_output = clusterlib_utils.fund_from_faucet(
        payment_rec,
        cluster_obj=cluster_obj,
        faucet_data=cluster_manager.cache.addrs_data["user1"],
        amount=2_000_000,
    )
    assert tx_raw_output
    utxos = cluster_obj.get_utxo(tx_raw_output=tx_raw_output)

    # check if all nodes know about the UTxO
    for node in known_nodes:
        # set 'CARDANO_NODE_SOCKET_PATH' to point to socket of the selected node
        cluster_nodes.set_cluster_env(instance_num=instance_num, socket_file_name=f"{node}.socket")

        # give the node up to ~5s to reach 100% sync; the for/else `else`
        # branch runs only when the loop finished without `break`
        for __ in range(5):
            if float(cluster_obj.get_tip()["syncProgress"]) == 100:
                break
            time.sleep(1)
        else:
            unsynced_nodes.add(node)
            continue

        # a synced node that doesn't know the UTxO is on a different chain
        if not cluster_obj.get_utxo(utxo=utxos):
            forked_nodes.add(node)

    # restore 'CARDANO_NODE_SOCKET_PATH' to original value
    cluster_nodes.set_cluster_env(instance_num=instance_num)

    # forked nodes are the ones that differ from the majority of nodes;
    # if more than half disagree, it is the remaining minority that forked
    # NOTE(review): the set subtraction assumes NODES is a set - confirm
    if forked_nodes and len(forked_nodes) > (len(known_nodes) // 2):
        forked_nodes = known_nodes - forked_nodes

    return forked_nodes, unsynced_nodes
def fail_on_fork(
    cluster_manager: cluster_management.ClusterManager,
    cluster_obj: clusterlib.ClusterLib,
    temp_template: str,
) -> None:
    """Fail if one or more nodes have forked blockchain or is out of sync."""
    forked_nodes, unsynced_nodes = detect_fork(
        cluster_manager=cluster_manager, cluster_obj=cluster_obj, temp_template=temp_template
    )

    problems = []
    if forked_nodes:
        problems.append(
            f"Following nodes appear to have forked blockchain: {sorted(forked_nodes)}"
        )
    if unsynced_nodes:
        problems.append(f"Following nodes appear to be out of sync: {sorted(unsynced_nodes)}")

    if not problems:
        return

    # a fork or an unsynced node makes the local cluster unusable,
    # so flag it for restart before failing the test
    cluster_manager.set_needs_restart()
    raise AssertionError("\n".join(problems))