-
Notifications
You must be signed in to change notification settings - Fork 15
/
helper.py
314 lines (254 loc) · 10.8 KB
/
helper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
import datetime
import os
from urllib.parse import quote as urlquote

import attr
import backoff
import dateutil
import dateutil.parser
import requests
import simplejson as json
from dateutil.tz import tzoffset
from requests.auth import HTTPBasicAuth, HTTPDigestAuth

import singer
import singer.metrics as metrics
from singer import utils
# Browser-like User-Agent sent with outgoing requests; some endpoints
# reject clients that present no recognizable UA string.
USER_AGENT = ("Mozilla/5.0 (Macintosh; scitylana.singer.io) " +
              "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.102 " +
              "Safari/537.36 ")

# Module-level logger provided by the singer framework.
LOGGER = singer.get_logger()

# StitchData compatible timestamp meta data
# https://www.stitchdata.com/docs/data-structure/system-tables-and-columns
# The timestamp of the record extracted from the source
EXTRACT_TIMESTAMP = "_sdc_extracted_at"
# The timestamp of the record submit to the destination
# (kept null at extraction)
BATCH_TIMESTAMP = "_sdc_batched_at"
@attr.s
class Stream(object):
    """A stream to sync: its catalog name plus per-stream settings."""
    # Stream identifier as it appears in the catalog and state file.
    tap_stream_id = attr.ib()
    # Extra per-stream options; contents depend on the caller — TODO confirm.
    kwargs = attr.ib()
def get_abs_path(path):
    """Resolve *path* relative to the directory containing this module."""
    module_dir = os.path.dirname(os.path.realpath(__file__))
    return os.path.join(module_dir, path)
def parse_datetime_tz(datetime_str, default_tz_offset=0):
    """Parse a datetime string; attach a fixed offset when it is naive."""
    parsed = dateutil.parser.parse(datetime_str)
    if not parsed.tzinfo:
        # Naive datetime: pin it to the caller-supplied UTC offset (seconds).
        parsed = parsed.replace(tzinfo=tzoffset(None, default_tz_offset))
    return parsed
def human_readable(bookmark_type, t):
    """Render a timestamp bookmark with its datetime; pass others through."""
    if t is None or bookmark_type != "timestamp":
        return t
    # e.g. "1600000000 (2020-09-13 12:26:40)"
    return "%s (%s)" % (t, datetime.datetime.fromtimestamp(t))
def get_record(raw_item, record_level):
    """
    Dig into raw_item following the comma-separated key path in
    record_level until the target schema level is reached.
    """
    record = raw_item
    if record_level:
        for level in record_level.split(","):
            record = record[level.strip()]
    return record
def get_record_list(data, record_list_level):
    """
    Dig into the raw payload down to the level that holds the list of
    records, following the comma-separated key path.
    """
    current = data
    if record_list_level:
        for level in record_list_level.split(","):
            current = current[level.strip()]
    return current
def get_bookmark_type(config):
    """Return the bookmark style the config selects (first match wins)."""
    for config_key, bookmark_type in (("timestamp_key", "timestamp"),
                                      ("datetime_key", "datetime"),
                                      ("index_key", "index")):
        if config.get(config_key):
            return bookmark_type
    raise KeyError("You need to set timestamp_key, datetime_key, or index_key")
def get_streams_to_sync(streams, state):
    """Get the streams to sync.

    streams: dict mapping key -> Stream
    state:   singer state dict; its currently_syncing entry, when present,
             narrows the result to that single stream so a sync can resume.

    Raises Exception when currently_syncing names a stream that is not in
    `streams`.
    """
    current_stream = singer.get_currently_syncing(state)
    if not current_stream:
        return streams
    # Build a new dict instead of popping while iterating: mutating a dict
    # during iteration raises RuntimeError in Python 3, and popping would
    # also clobber the caller's mapping.
    result = {key: stream for key, stream in streams.items()
              if stream.tap_stream_id == current_stream}
    if not result:
        raise Exception("Unknown stream {} in state".format(current_stream))
    return result
def get_selected_streams(remaining_streams, annotated_schema):
    """Return the streams whose annotated schema is marked selected.

    remaining_streams: dict mapping key -> Stream
    annotated_schema:  object with a .streams list; each entry carries a
                       tap_stream_id and a .schema that may have a
                       `selected` attribute.

    Only an explicit `selected is True` opts a stream in.
    """
    selected_streams = []
    for stream in remaining_streams.values():
        for annotated_stream in annotated_schema.streams:
            if stream.tap_stream_id != annotated_stream.tap_stream_id:
                continue
            # getattr with a default collapses the original
            # hasattr + `is True` check; absent or non-True means skip.
            if getattr(annotated_stream.schema, "selected", None) is True:
                selected_streams.append(stream)
    return selected_streams
def get_start(config, state, tap_stream_id, bookmark_key):
    """
    Return the initial bookmark value for the stream.

    A bookmark stored in the state file (given by --state <state_file>)
    takes priority over the start value from config or args.
    For human convenience, start_datetime (more human readable) is also
    looked up when timestamp_key is set but start_timestamp is not set.
    """
    bookmark = singer.get_bookmark(state, tap_stream_id, bookmark_key)
    if bookmark is not None:
        return bookmark
    if config.get("timestamp_key"):
        if not (config.get("start_timestamp") or
                config.get("start_datetime")):
            raise KeyError("timestamp_key is set but neither " +
                           "start_timestamp or start_datetime is set")
        bookmark = config.get("start_timestamp")
        if bookmark is None:
            # Fall back to the human-readable form, converted to epoch.
            bookmark = dateutil.parser.parse(
                config["start_datetime"]).timestamp()
    elif config.get("datetime_key"):
        if not config.get("start_datetime"):
            raise KeyError(
                "datetime_key is set but start_datetime is not set")
        bookmark = config.get("start_datetime")
    elif config.get("index_key"):
        if config.get("start_index") is None:
            raise KeyError("index_key is set but start_index is not set")
        bookmark = config.get("start_index")
    return bookmark
def get_end(config):
    """
    Return the end bookmark value from config, defaulting to "now".

    For human convenience, end_datetime (more human readable) is also
    looked up when timestamp_key is set but end_timestamp is not set.

    Raises KeyError when no bookmark key is configured at all.
    """
    if config.get("timestamp_key"):
        end_from_config = config.get("end_timestamp")
        if end_from_config is None:
            # Bug fix: this previously re-checked end_timestamp (always
            # None here), so a configured end_datetime was silently
            # ignored and "now" was used instead.
            if config.get("end_datetime") is not None:
                end_from_config = dateutil.parser.parse(
                    config["end_datetime"]).timestamp()
            else:
                end_from_config = datetime.datetime.now().timestamp()
    elif config.get("datetime_key"):
        if config.get("end_datetime") is not None:
            end_from_config = config.get("end_datetime")
        else:
            end_from_config = datetime.datetime.now().isoformat()
    elif config.get("index_key"):
        end_from_config = config.get("end_index")
    else:
        # Previously fell through to an unbound local (NameError).
        raise KeyError(
            "Neither timestamp_key, datetime_key, or index_key is set")
    return end_from_config
def get_last_update(config, record, current):
    """
    Return the new bookmark value after seeing `record`.

    current is the bookmark so far; it only advances when the record's
    bookmark field is strictly greater.

    Raises KeyError when the configured bookmark field is missing from
    the record, or when no bookmark key is configured at all.
    """
    last_update = current
    if config.get("timestamp_key"):
        key = config["timestamp_key"]
        if key not in record:
            # Bug fix: this was a bare `KeyError(...)` expression that
            # constructed the exception but never raised it.
            raise KeyError("timestamp_key not found in the record")
        if record[key] > current:
            # Handle the data with sub-seconds converted to int
            ex_digits = len(str(int(record[key]))) - 10
            last_update = float(record[key]) / (pow(10, ex_digits))
    elif config.get("datetime_key"):
        key = config["datetime_key"]
        if key not in record:
            # Bug fix: previously a no-op `KeyError(...)` expression.
            raise KeyError("datetime_key not found in the record")
        record_datetime = parse_datetime_tz(record[key])
        current_datetime = parse_datetime_tz(current)
        if record_datetime > current_datetime:
            last_update = record_datetime.isoformat()
    elif config.get("index_key"):
        key = config["index_key"]
        if key not in record:
            # Bug fix: previously a no-op `KeyError(...)` expression.
            raise KeyError("index_key not found in the record")
        current_index = str(record.get(key))
        LOGGER.debug("Last update will be updated from %s to %s" %
                     (last_update, current_index))
        # When index is an integer, it's dangerous to compare 9 and 10 as
        # string for example.
        try:
            current_index = int(current_index)
        except ValueError:
            # When the index suddenly changes to str, fall back to string
            LOGGER.warning("Previously index was throught to be integer. Now" +
                           " it seems to be string type. %s %s" %
                           (last_update, current_index))
            last_update = str(last_update)
        if current_index and (not current or current_index > current):
            last_update = current_index
    else:
        raise KeyError(
            "Neither timestamp_key, datetime_key, or index_key is set")
    return last_update
def get_init_endpoint_params(config, state, tap_stream_id):
    """Build the initial URL-template parameters for the endpoint.

    Starts from a copy of config, then adds start/end bookmarks (in both
    timestamp and datetime forms where applicable), paging counters, and
    last_update.
    """
    # Bug fix: was `params = config`, which aliased and polluted the
    # caller's config dict with start/end/current_* keys on every call.
    params = dict(config)
    start = get_start(config, state, tap_stream_id, "last_update")
    end = get_end(config)
    if config.get("timestamp_key"):
        params.update({
            "start_timestamp": start,
            "end_timestamp": end,
            # Mirror the epoch bookmarks in human-readable form.
            "start_datetime":
                datetime.datetime.fromtimestamp(start).isoformat(),
            "end_datetime":
                datetime.datetime.fromtimestamp(end).isoformat(),
        })
    elif config.get("datetime_key"):
        params.update({
            "start_datetime": start,
            "end_datetime": end,
            # Mirror the datetime bookmarks as epoch timestamps.
            "start_timestamp": dateutil.parser.parse(start).timestamp(),
            "end_timestamp": dateutil.parser.parse(end).timestamp(),
        })
    elif config.get("index_key"):
        params.update({"start_index": start, "end_index": end})
    params.update({
        "current_page": config.get("page_start", 0),
        "current_offset": config.get("offset_start", 0),
        "last_update": start,
    })
    return params
def get_http_headers(config=None):
    """Return the HTTP headers to send: config's http_headers or defaults.

    http_headers may be a dict or a JSON-encoded string.
    """
    if not config or not config.get("http_headers"):
        return {"User-Agent": USER_AGENT,
                "Content-type": "application/json"}
    headers = config["http_headers"]
    # isinstance instead of `type(...) == str`: idiomatic and also
    # accepts str subclasses.
    if isinstance(headers, str):
        headers = json.loads(headers)
    LOGGER.debug(headers)
    return headers
def get_endpoint(url_format, tap_stream_id, data):
    """ Get the full url for the endpoint including query
    In addition to data passed from config values, it will create "resource"
    that is derived from tap_stream_id.
    The special characters in query are quoted with "%XX"
    URL can be something like:
    https://api.example.com/1/{resource}? \
        last_update_start={start_datetime}&last_update_end={end_datetime}& \
        items_per_page={items_per_page}&page={current_page}
    """
    quoted = {key: urlquote(str(value).encode("utf-8"))
              for key, value in data.items()}
    quoted["resource"] = urlquote(str(tap_stream_id).encode("utf-8"))
    return url_format.format(**quoted)
def _giveup(exc):
return exc.response is not None \
and 400 <= exc.response.status_code < 500 \
and exc.response.status_code != 429
@utils.backoff((backoff.expo, requests.exceptions.RequestException), _giveup)
@utils.ratelimit(20, 1)
def generate_request(stream_id, url, auth_method="no_auth", headers=None,
                     username=None, password=None):
    """
    GET `url` and return the decoded JSON response body.

    url: URL with pre-encoded query. See get_endpoint()
    auth_method: "no_auth" (default), "basic", or "digest"
    username/password: credentials used for basic/digest auth
    headers: optional header dict; defaults to get_http_headers()

    Retries are handled by the utils.backoff decorator (exponential
    backoff on RequestException unless _giveup reports a non-retryable
    4xx) and calls are throttled by utils.ratelimit(20, 1) — presumably
    20 calls per 1-second window; confirm against singer-python.

    Raises ValueError for an unknown auth_method, and an HTTP error from
    raise_for_status() on non-success responses.
    """
    if not auth_method or auth_method == "no_auth":
        auth = None
    elif auth_method == "basic":
        auth = HTTPBasicAuth(username, password)
    elif auth_method == "digest":
        auth = HTTPDigestAuth(username, password)
    else:
        raise ValueError("Unknown auth method: " + auth_method)
    LOGGER.info("Using %s authentication method." % auth_method)
    headers = headers or get_http_headers()
    # Time the request and tag the metric with the HTTP status code.
    with metrics.http_request_timer(stream_id) as timer:
        resp = requests.get(url,
                            headers=headers,
                            auth=auth)
        timer.tags[metrics.Tag.http_status_code] = resp.status_code
        resp.raise_for_status()
        return resp.json()