-
Notifications
You must be signed in to change notification settings - Fork 466
/
executors.py
87 lines (65 loc) · 2.65 KB
/
executors.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
import random
import threading
from typing import Any, Optional
from materialize.cloudtest.application import MaterializeApplication
from materialize.mzcompose import Composition
# from materialize.cloudtest import
class Executor:
pass
def testdrive(self, input: str) -> Any:
assert False
def mzcompose_composition(self) -> Composition:
assert False
def cloudtest_application(self) -> MaterializeApplication:
assert False
def join(self, handle: Any) -> None:
pass
class MzcomposeExecutor(Executor):
def __init__(self, composition: Composition) -> None:
self.composition = composition
def mzcompose_composition(self) -> Composition:
return self.composition
def testdrive(self, input: str) -> None:
self.composition.testdrive(input)
class MzcomposeExecutorParallel(MzcomposeExecutor):
def __init__(self, composition: Composition) -> None:
self.composition = composition
self.exception: Optional[BaseException] = None
def testdrive(self, input: str) -> Any:
thread = threading.Thread(target=self._testdrive, args=[input])
thread.start()
return thread
def _testdrive(self, input: str) -> None:
try:
self.composition.testdrive(input)
except BaseException as e:
self.exception = e
def join(self, handle: Any) -> None:
assert type(handle) is threading.Thread
handle.join()
if self.exception:
raise self.exception
class CloudtestExecutor(Executor):
def __init__(self, application: MaterializeApplication) -> None:
self.application = application
self.seed = random.getrandbits(32)
def cloudtest_application(self) -> MaterializeApplication:
return self.application
def testdrive(self, input: str) -> None:
self.application.testdrive.run(input=input, no_reset=True, seed=self.seed)