/
airbyte.py
133 lines (117 loc) · 4.74 KB
/
airbyte.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import time
from typing import Any, Optional
from airflow.exceptions import AirflowException
from airflow.providers.http.hooks.http import HttpHook
class AirbyteHook(HttpHook):
    """
    Hook for Airbyte API

    :param airbyte_conn_id: Required. The name of the Airflow connection to get
        connection information for Airbyte.
    :type airbyte_conn_id: str
    :param api_version: Optional. Airbyte API version.
    :type api_version: str
    """

    conn_name_attr = 'airbyte_conn_id'
    default_conn_name = 'airbyte_default'
    conn_type = 'airbyte'
    hook_name = 'Airbyte'

    # Job status values compared against ``response.json()["job"]["status"]``
    # in ``wait_for_job``.
    RUNNING = "running"
    SUCCEEDED = "succeeded"
    CANCELLED = "cancelled"
    PENDING = "pending"
    FAILED = "failed"
    ERROR = "error"
    # Reported while a job is between attempts; treated as "still in progress".
    INCOMPLETE = "incomplete"

    def __init__(self, airbyte_conn_id: str = "airbyte_default", api_version: Optional[str] = "v1") -> None:
        super().__init__(http_conn_id=airbyte_conn_id)
        # Interpolated into every endpoint path as ``api/<api_version>/...``.
        self.api_version: Optional[str] = api_version

    def wait_for_job(
        self, job_id: str, wait_seconds: Optional[float] = 3, timeout: Optional[float] = 3600
    ) -> None:
        """
        Helper method which polls a job until it reaches a terminal state.

        Returns normally only when the job status is ``succeeded``.

        :param job_id: Required. Id of the Airbyte job. Passed unchanged to ``get_job``.
        :type job_id: str
        :param wait_seconds: Optional. Number of seconds between checks.
        :type wait_seconds: float
        :param timeout: Optional. How many seconds to wait for the job to finish.
            A falsy value (``None`` or ``0``) disables the timeout.
        :type timeout: float
        :raises AirflowException: if the timeout elapses, or the job ends in the
            ``error``, ``failed`` or ``cancelled`` state.
        :raises Exception: if the job reports a status this hook does not know.
        """
        state = None
        start = time.monotonic()
        while True:
            if timeout and start + timeout < time.monotonic():
                raise AirflowException(f"Timeout: Airbyte job {job_id} is not ready after {timeout}s")
            time.sleep(wait_seconds)
            try:
                job = self.get_job(job_id=job_id)
                state = job.json()["job"]["status"]
            except AirflowException as err:
                # An API error while polling is treated as transient: log it and
                # poll again; the timeout check above still bounds the loop.
                self.log.info("Retrying. Airbyte API returned server error when waiting for job: %s", err)
                continue

            if state in (self.RUNNING, self.PENDING, self.INCOMPLETE):
                continue
            if state == self.SUCCEEDED:
                break
            # ``failed`` is handled together with ``error`` so that a failed job
            # surfaces as an AirflowException instead of an "unexpected state".
            if state in (self.ERROR, self.FAILED):
                raise AirflowException(f"Job failed:\n{job}")
            if state == self.CANCELLED:
                raise AirflowException(f"Job was cancelled:\n{job}")
            raise Exception(f"Encountered unexpected state `{state}` for job_id `{job_id}`")

    def submit_sync_connection(self, connection_id: str) -> Any:
        """
        Submits a sync job to an Airbyte server.

        :param connection_id: Required. The ConnectionId of the Airbyte Connection.
        :type connection_id: str
        :return: whatever ``self.run`` returns for the ``connections/sync`` request.
        """
        return self.run(
            endpoint=f"api/{self.api_version}/connections/sync",
            json={"connectionId": connection_id},
            headers={"accept": "application/json"},
        )

    def get_job(self, job_id: int) -> Any:
        """
        Gets the resource representation for a job in Airbyte.

        :param job_id: Required. Id of the Airbyte job
        :type job_id: int
        :return: whatever ``self.run`` returns for the ``jobs/get`` request;
            ``wait_for_job`` reads the status from its ``.json()["job"]["status"]``.
        """
        return self.run(
            endpoint=f"api/{self.api_version}/jobs/get",
            json={"id": job_id},
            headers={"accept": "application/json"},
        )

    def test_connection(self):
        """
        Tests the Airbyte connection by hitting the health API.

        :return: tuple ``(ok, message)``; ``ok`` is True only for an HTTP 200
            response. Any exception is caught and reported as ``(False, str(e))``.
        """
        # The health endpoint is queried with GET; remember the current method so
        # it can be put back afterwards instead of assuming it was 'POST'.
        previous_method = self.method
        self.method = 'GET'
        try:
            res = self.run(
                endpoint=f"api/{self.api_version}/health",
                headers={"accept": "application/json"},
                # The status code is inspected manually below.
                extra_options={'check_response': False},
            )

            if res.status_code == 200:
                return True, 'Connection successfully tested'
            else:
                return False, res.text
        except Exception as e:  # noqa pylint: disable=broad-except
            return False, str(e)
        finally:
            self.method = previous_method