-
Notifications
You must be signed in to change notification settings - Fork 1.5k
/
cloud_handler.py
42 lines (31 loc) · 1.45 KB
/
cloud_handler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# Licensed under LICENSE.md; also available at https://www.prefect.io/licenses/alpha-eula
from typing import Any, Optional
from prefect import config
from prefect.client import Client, FlowRuns, TaskRuns
from prefect.engine.state import State
class CloudHandler:
def __init__(self) -> None:
if config.get("prefect_cloud", None):
self.load_prefect_client()
def load_prefect_client(self) -> None:
client = Client()
client.login(email=config.email, password=config.password)
self.flow_runs_gql = FlowRuns(client=client)
self.task_runs_gql = TaskRuns(client=client)
self.flow_run_id = config.get("flow_run_id")
def setFlowRunState(self, version: int, state: State) -> None:
self.flow_runs_gql.set_state(
flow_run_id=self.flow_run_id, state=state, version=version
)
def getFlowRunVersion(self) -> int:
gql_output = self.flow_runs_gql.query(flow_run_id=self.flow_run_id)
return gql_output.flowRuns[0].version # type: ignore
def setTaskRunState(self, task_run_id: str, version: int, state: State) -> None:
self.task_runs_gql.set_state(
task_run_id=task_run_id, state=state, version=version
)
def getTaskRunIdAndVersion(self, task_id: Optional[str]) -> Any:
gql_output = self.task_runs_gql.query(
flow_run_id=self.flow_run_id, task_id=task_id
)
return gql_output.taskRuns[0] # type: ignore