/
test_instigation.py
99 lines (84 loc) · 3.51 KB
/
test_instigation.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
from dagster_graphql.test.utils import (
execute_dagster_graphql,
infer_instigation_selector,
infer_repository_selector,
)
from dagster.core.test_utils import (
SingleThreadPoolExecutor,
create_test_daemon_workspace,
wait_for_futures,
)
from dagster.daemon import get_default_daemon_logger
from dagster.daemon.sensor import execute_sensor_iteration
from .graphql_context_test_suite import NonLaunchableGraphQLContextTestMatrix
# GraphQL document shared by the tests in this module.
#
# Takes a single required variable, ``$instigationSelector``, and asks for
# ``instigationStateOrError``. The result is a union, so ``__typename`` is always
# selected to let callers tell the branches apart:
#   * ``PythonError``      -> ``message`` and ``stack``
#   * ``InstigationState`` -> ``id`` and ``nextTick { timestamp }``
INSTIGATION_QUERY = """
query JobQuery($instigationSelector: InstigationSelector!) {
instigationStateOrError(instigationSelector: $instigationSelector) {
__typename
... on PythonError {
message
stack
}
... on InstigationState {
id
nextTick {
timestamp
}
}
}
}
"""
def _create_sensor_tick(graphql_context):
    """Run one sensor-daemon iteration for the instance behind ``graphql_context``.

    A test daemon workspace is opened from the context's workspace load target,
    ``execute_sensor_iteration`` is driven to completion on a
    ``SingleThreadPoolExecutor``, and the futures collected through
    ``debug_futures`` are awaited before the workspace is closed.

    Args:
        graphql_context: The GraphQL test context; its ``instance`` and
            ``process_context.workspace_load_target`` attributes are read.
    """
    load_target = graphql_context.process_context.workspace_load_target
    instance = graphql_context.instance

    with create_test_daemon_workspace(load_target, instance) as workspace:
        daemon_logger = get_default_daemon_logger("SensorDaemon")
        # Handed to the iteration as ``debug_futures`` so the submitted work can
        # be awaited below (presumably filled in by the iteration itself).
        pending_futures = {}

        iteration = execute_sensor_iteration(
            instance,
            daemon_logger,
            workspace,
            threadpool_executor=SingleThreadPoolExecutor(),
            debug_futures=pending_futures,
        )
        # Exhaust the returned iterable; the yielded values are not needed.
        for _ in iteration:
            pass

        wait_for_futures(pending_futures)
class TestNextTickRepository(NonLaunchableGraphQLContextTestMatrix):
    """Checks that ``INSTIGATION_QUERY`` reports a ``nextTick`` for started instigators."""

    def test_schedule_next_tick(self, graphql_context):
        """A started schedule resolves to an ``InstigationState`` with a next tick."""
        repo_selector = infer_repository_selector(graphql_context)
        location = graphql_context.get_repository_location(
            repo_selector["repositoryLocationName"]
        )
        repository = location.get_repository(repo_selector["repositoryName"])

        schedule_name = "no_config_pipeline_hourly_schedule"
        schedule = repository.get_external_schedule(schedule_name)
        instigation_selector = infer_instigation_selector(graphql_context, schedule_name)

        # A future tick is only generated while the schedule is running.
        graphql_context.instance.start_schedule(schedule)

        response = execute_dagster_graphql(
            graphql_context,
            INSTIGATION_QUERY,
            variables={"instigationSelector": instigation_selector},
        )

        assert response.data
        state = response.data["instigationStateOrError"]
        assert state["__typename"] == "InstigationState"
        assert state["nextTick"]

    def test_sensor_next_tick(self, graphql_context):
        """A started sensor with a fresh tick resolves to a state with a next tick."""
        repo_selector = infer_repository_selector(graphql_context)
        location = graphql_context.get_repository_location(
            repo_selector["repositoryLocationName"]
        )
        repository = location.get_repository(repo_selector["repositoryName"])

        sensor_name = "always_no_config_sensor"
        sensor = repository.get_external_sensor(sensor_name)
        instigation_selector = infer_instigation_selector(graphql_context, sensor_name)

        # A future tick is only generated when the sensor is running and has
        # produced a tick within the last 30 seconds, so start it and force one.
        graphql_context.instance.start_sensor(sensor)
        _create_sensor_tick(graphql_context)

        response = execute_dagster_graphql(
            graphql_context,
            INSTIGATION_QUERY,
            variables={"instigationSelector": instigation_selector},
        )

        assert response.data
        state = response.data["instigationStateOrError"]
        assert state["__typename"] == "InstigationState"
        assert state["nextTick"]