-
Notifications
You must be signed in to change notification settings - Fork 17
/
job_id_validations.py
50 lines (36 loc) · 1.19 KB
/
job_id_validations.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
"""
An example job_id_validations file for Stolos.
In this file, you define the components of job_ids.
You can also produce side-effects like configure Stolos's logging
"""
import logging
import datetime as dt
# Example side-effect: Configure logging for Stolos
from stolos.api import configure_logging
log = configure_logging(add_handler=True, colorize=True)
log.setLevel(logging.INFO)
# Example job_id validations
# These validations help Stolos parse your job_ids into component parts and
# verify that they are of acceptable types
def is_valid_date(date):
year = int(str(date)[:4])
month = int(str(date)[4:6])
day = int(str(date)[6:])
return bool(dt.date(year, month, day)) and int(date)
def to_int(inpt):
return int(inpt)
def is_valid_collection_name(collection_name):
return (
any(collection_name.startswith(x) for x in set(
['client', 'profile', 'purchase', 'content'])) and
collection_name)
def to_str(inpt):
if isinstance(inpt, bytes):
inpt = inpt.decode('utf8')
return str(inpt)
JOB_ID_VALIDATIONS = {
'date': is_valid_date,
'client_id': to_int,
'collection_name': is_valid_collection_name,
'testID': to_str,
}