-
Notifications
You must be signed in to change notification settings - Fork 1
/
records_to_knack.py
executable file
·189 lines (156 loc) · 6.24 KB
/
records_to_knack.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
#!/usr/bin/env python
""" Fetch Knack records from Postgres(t) and upload to another Knack app """
import os
import arrow
import knackpy
from config.knack import CONFIG
from config.field_maps import FIELD_MAPS
import utils
def format_filter_date(date_from_args):
return "1970-01-01" if not date_from_args else arrow.get(date_from_args).isoformat()
def get_pks(field_map, app_name_dest):
"""return the src and destination field name of the primary key"""
pk_field = [f for f in field_map if f.get("primary_key")]
try:
assert len(pk_field) == 1
except AssertionError:
raise ValueError(
"One (and only one) primary key is required. There's an error in the field map configuration." # noqa E501
)
return pk_field[0]["src"], pk_field[0][app_name_dest]
def create_mapped_record(record, field_map, app_name_dest):
"""Map the data from the source Knack app to the destination app schema"""
mapped_record = {}
for field in field_map:
field_src = field["src"]
if field_src:
"""Note that a default value in the field map *never* overrides a value in
the src data unless the src field ID is None"""
val = record.get(field_src)
else:
try:
val = field["default"]
except KeyError:
raise ValueError(
"A default default is required when source field is None"
)
field_dest = field[app_name_dest]
handler_func = field.get("handler")
mapped_record[field_dest] = val if not handler_func else handler_func(val)
return mapped_record
def is_equal(rec_src, rec_dest, keys):
tests = [rec_src[key] == rec_dest[key] for key in keys]
return all(tests)
def remove_raw_tags(records):
"""
Removes "_raw" from field names so special compound datatypes such as Persons or Emails can be
left in their original format and then passed on to the destination Knack app.
"""
fields_to_rename = [f for f in list(records[0].keys()) if "_raw" in f]
if fields_to_rename:
for rec in records:
for f in fields_to_rename:
rec[f.replace("_raw", "")] = rec.pop(f)
return records
def handle_records(data_src, data_dest, field_map, app_name_dest):
"""
data_src: list of records from source knack app
data_dest: list of records in destination knack app
field_map: list of objects containing source app fields mapped to destination app fields
app_name_dest: name of destination knack app
"""
pk_src, pk_dest = get_pks(field_map, app_name_dest)
# make list of fields in records to compare for differences
compare_keys = [
field[app_name_dest] for field in field_map if not field.get("ignore_diff")
]
todos = []
for rec_src in data_src:
matched = False
# mapped record is record from source app with fields that match the destination app
mapped_record = create_mapped_record(rec_src, field_map, app_name_dest)
id_src = mapped_record[pk_dest]
for rec_dest in data_dest:
id_dest = rec_dest[pk_dest]
if id_src == id_dest:
matched = True
if not is_equal(mapped_record, rec_dest, compare_keys):
mapped_record["id"] = rec_dest["id"]
todos.append(mapped_record)
break
if not matched:
todos.append(mapped_record)
todos = remove_raw_tags(todos)
return todos
def main():
APP_ID_SRC = os.getenv("KNACK_APP_ID_SRC")
APP_ID_DEST = os.getenv("KNACK_APP_ID_DEST")
API_KEY_DEST = os.getenv("KNACK_API_KEY_DEST")
PGREST_JWT = os.getenv("PGREST_JWT")
PGREST_ENDPOINT = os.getenv("PGREST_ENDPOINT")
args = utils.args.cli_args(["app-name", "container", "date", "app-name-dest"])
logger.info(args)
app_name_src = args.app_name
app_name_dest = args.app_name_dest
container_src = args.container
config = CONFIG.get(app_name_src).get(container_src)
container_dest = config.get("dest_apps").get(app_name_dest).get("container")
object_dest = config.get("dest_apps").get(app_name_dest).get("object")
client_postgrest = utils.postgrest.Postgrest(PGREST_ENDPOINT, token=PGREST_JWT)
filter_iso_date_str = format_filter_date(args.date)
logger.info(
f"Downloading records from app {APP_ID_SRC} ({app_name_src}), container {container_src}."
)
data_src = client_postgrest.select(
"knack",
params={
"select": "record",
"app_id": f"eq.{APP_ID_SRC}",
"container_id": f"eq.{container_src}",
"updated_at": f"gte.{filter_iso_date_str}",
},
order_by="id",
)
logger.info(f"{len(data_src)} records to process")
if not data_src:
return
logger.info(
f"Updating/creating records in app {APP_ID_DEST} ({app_name_dest}), container {container_dest}."
)
# existing data in destination knack app
data_dest = client_postgrest.select(
"knack",
params={
"select": "record",
"app_id": f"eq.{APP_ID_DEST}",
"container_id": f"eq.{container_dest}",
},
order_by="id",
)
data_src = [r["record"] for r in data_src]
data_dest = [r["record"] for r in data_dest]
field_map = FIELD_MAPS.get(app_name_src).get(container_src)
# identify new/changed records and map to destination Knack app schema
todos = handle_records(data_src, data_dest, field_map, app_name_dest)
updates = sum([bool(rec.get("id")) for rec in todos])
creates = len(todos) - updates
logger.info(f"Updating {updates} records in the destination app.")
logger.info(f"Creating {creates} records in the destination app.")
if not todos:
return
count = 0
for record in todos:
if count % 10 == 0:
logger.info(f"Uploading record {count} of {len(todos)}")
method = "create" if not record.get("id") else "update"
knackpy.api.record(
app_id=APP_ID_DEST,
api_key=API_KEY_DEST,
obj=object_dest,
method=method,
data=record,
)
count += 1
if __name__ == "__main__":
logger = utils.logging.getLogger(__file__)
main()