-
Notifications
You must be signed in to change notification settings - Fork 4
/
wp.py
368 lines (303 loc) · 16.4 KB
/
wp.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
"""
Combined split/merge algorithm that:
1. Brings any changes from work packages to the master database
2. Regenerates work packages based on the master database
"""
import sqlite3
import os
import shutil
import pygeodiff
from pathlib import Path
import yaml
from .wp_utils import escape_double_quotes
from .remapping import remap_table_master_to_wp, remap_table_wp_to_master
# Layout of files (inside the data directory):
#
# base/                        -- geopackages as generated by previous run of this tool (must not be modified by user!)
#   master.gpkg                -- master DB containing all data
#   WP1.gpkg
#   WP2.gpkg
#   remap.db                   -- fid remapping DB (must exist if and only if base/ contains work package geopackages)
#
# input/                       -- geopackages like in base/ subdir, but they may have been modified by users
#   master.gpkg
#   WP1.gpkg
#   WP2.gpkg
#
# output/                      -- geopackages that have been merged (master.gpkg is the single source of truth) and then split again
#   master.gpkg
#   WP1.gpkg
#   WP2.gpkg
#   remap.db                   -- copy of base/remap.db where more entries may have been added
#   master-input-output.diff   -- difference between "input" and "output" (i.e. what are all the changes in WPs)
#   master-base-output.diff    -- difference between "base" and "output" (i.e. all master changes + all WP changes)
#   WP1-input-output.diff      -- difference between "input" and "output" (collated changes to be applied to the WP)
#
# tmp/                         -- temporary files (deleted and recreated on every run)
# Set to True when testing/debugging temporary diffs: after each changeset (.diff)
# is created, geodiff.list_changes() is also called on it with a matching .json path.
DEBUG_DIFFS = False
class WPTable:
    """Rule describing how rows of one database table are assigned to work packages"""

    # accepted values of "filter_method"
    FILTER_METHOD_COLUMN = "filter-column"
    FILTER_METHOD_GEOMETRY = "filter-geometry"

    def __init__(self, name, filter_method, filter_column_name=None):
        """
        :param name: Name of the database table
        :param filter_method: Filtering method (one of the FILTER_METHOD_* constants)
        :param filter_column_name: Name of the column used for filtering (defaults to None)
        """
        (self.name, self.filter_method, self.filter_column_name) = (
            name,
            filter_method,
            filter_column_name,
        )
class WPName:
    """Settings of one work package: its name, its filter value(s) and its Mergin project"""

    def __init__(self, name, value, mergin_project):
        """
        :param name: User-defined name of the work package, e.g. Team_A
        :param value: Value (or list of values) accepted by the filter for this work package
        :param mergin_project: Full name of the associated Mergin project,
                               e.g. lutraconsulting/survey-team-a
        """
        (self.name, self.value, self.mergin_project) = (name, value, mergin_project)
class WPConfig:
    """Complete set of inputs for the work packaging algorithm"""

    def __init__(self, master_gpkg, wp_names, wp_tables):
        """
        :param master_gpkg: Value of the "file" entry of the configuration
        :param wp_names: List of WPName objects (one per work package)
        :param wp_tables: List of WPTable objects (one per filtered table)
        """
        (self.master_gpkg, self.wp_names, self.wp_tables) = (master_gpkg, wp_names, wp_tables)
def load_config_from_yaml(config_yaml: str) -> WPConfig:
    """
    Reads configuration of work packages from YAML config file.

    The top level of the file must be a mapping with these keys:
      - "file": stored as WPConfig.master_gpkg
      - "work-packages": list of mappings with "name", "value" and "mergin-project"
      - "tables": list of mappings with "name", "method" and (optionally) "filter-column-name"

    :param config_yaml: Path to the YAML config file
    :return: WPConfig instance
    :raises ValueError: if the file is not valid YAML or its top level is not a mapping
    :raises KeyError: if any of the required keys is missing
    """
    with open(config_yaml, "r") as stream:
        try:
            root_yaml = yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            # chain the parser error so that its traceback is not lost
            raise ValueError("Unable to parse config YAML:\n" + str(exc)) from exc

    # an empty file is loaded as None (and a bare list/scalar is valid YAML too):
    # report that clearly instead of failing with TypeError on the lookups below
    if not isinstance(root_yaml, dict):
        raise ValueError(
            "Unable to parse config YAML:\n"
            "expected a mapping at the top level, got " + type(root_yaml).__name__
        )

    master_gpkg = root_yaml["file"]
    wp_names = [
        WPName(name_yaml["name"], name_yaml["value"], name_yaml["mergin-project"])
        for name_yaml in root_yaml["work-packages"]
    ]
    wp_tables = [
        WPTable(table_yaml["name"], table_yaml["method"], table_yaml.get("filter-column-name"))
        for table_yaml in root_yaml["tables"]
    ]
    return WPConfig(master_gpkg, wp_names, wp_tables)
def _reset_dir(path):
    """Delete directory *path* with everything in it (if it exists) and create it again, empty."""
    if os.path.exists(path):
        shutil.rmtree(path)
    os.makedirs(path)


def _find_existing_wp_names(base_dir):
    """
    Return names of work packages that have a geopackage in *base_dir*.

    Every "*.gpkg" file except "master.gpkg" counts as a work package, its name being
    the file name without the suffix. Returns an empty list if *base_dir* does not exist.
    """
    if not os.path.exists(base_dir):
        return []
    return [
        path.name[:-5]  # strip the ".gpkg" suffix
        for path in Path(base_dir).iterdir()
        if path.name.endswith(".gpkg") and path.name != "master.gpkg"  # master is not a work package
    ]


def _geodiff_logger_callback(level, text_bytes):
    """Print a log message received from geodiff (message comes as bytes, *level* is not used)."""
    print("GEODIFF: ", text_bytes.decode())


def _write_changeset(geodiff, base, modified, changeset, changeset_json):
    """
    Create changeset file *changeset* with the difference between *base* and *modified*.
    With DEBUG_DIFFS enabled, geodiff.list_changes() is also called with *changeset_json*.
    """
    geodiff.create_changeset(base, modified, changeset)
    if DEBUG_DIFFS:
        geodiff.list_changes(changeset, changeset_json)


def _next_master_fids(master_gpkg, wp_tables):
    """
    Return dict: table name -> max(fid) + 1 in *master_gpkg* (or 1 if the table is empty),
    so that we know where to start when remapping.
    """
    db = sqlite3.connect(master_gpkg)
    try:
        c = db.cursor()
        next_fids = {}
        for wp_table in wp_tables:
            wp_tab_name_esc = escape_double_quotes(wp_table.name)
            c.execute(f"""SELECT max(fid) FROM {wp_tab_name_esc};""")
            max_fid = c.fetchone()[0]  # None for an empty table
            next_fids[wp_table.name] = 1 if max_fid is None else max_fid + 1
        return next_fids
    finally:
        # explicitly close the connection (also on errors) before geodiff works with the file
        db.close()


def _remap_wp_gpkg_to_master(gpkg, remap_db, wp_name, wp_tables, new_master_fids):
    """
    Run remap_table_wp_to_master() on every configured table of *gpkg*, in a single
    transaction and with *remap_db* attached as "remap".
    """
    db = sqlite3.connect(gpkg)
    try:
        db.enable_load_extension(True)  # for spatialite
        c = db.cursor()
        c.execute("SELECT load_extension('mod_spatialite');")  # TODO: how to deal with it?
        c.execute("ATTACH ? AS remap", (remap_db,))
        c.execute("BEGIN")
        for wp_table in wp_tables:
            remap_table_wp_to_master(c, wp_table.name, wp_name, new_master_fids[wp_table.name])
        c.execute("COMMIT")
    finally:
        db.close()


def _merge_wp_into_master(
    geodiff,
    wp_name,
    wp_tables,
    *,
    base_dir,
    input_dir,
    tmp_dir,
    master_gpkg_base,
    master_gpkg_output,
    remap_db_output,
):
    """
    STAGE 1 for a single work package: bring its changes to the master DB in "output"
    (remap WP database + create changeset + rebase changeset + apply).
    """
    print("WP " + wp_name)

    # must be evaluated for every WP: applying changes of the previous WP may have added rows
    new_master_fids = _next_master_fids(master_gpkg_output, wp_tables)

    # TODO: check whether the changes in the DB are allowed (matching the deciding column)

    # work on copies in "tmp" so that the files in "base" and "input" stay untouched
    wp_gpkg_base = os.path.join(tmp_dir, wp_name + "-base.gpkg")  # should not have been modified by user
    wp_gpkg_input = os.path.join(tmp_dir, wp_name + "-input.gpkg")  # may have been modified by user
    shutil.copy(os.path.join(base_dir, wp_name + ".gpkg"), wp_gpkg_base)
    shutil.copy(os.path.join(input_dir, wp_name + ".gpkg"), wp_gpkg_input)

    # re-map local fids of the WP gpkg to master fids (based on previously created mapping DB)
    for gpkg in (wp_gpkg_base, wp_gpkg_input):
        _remap_wp_gpkg_to_master(gpkg, remap_db_output, wp_name, wp_tables, new_master_fids)

    # what has changed in the WP since the last run
    wp_changeset_base_input = os.path.join(tmp_dir, wp_name + "-base-input.diff")
    _write_changeset(
        geodiff,
        wp_gpkg_base,
        wp_gpkg_input,
        wp_changeset_base_input,
        os.path.join(tmp_dir, wp_name + "-base-input.json"),
    )
    if not geodiff.has_changes(wp_changeset_base_input):
        # there are no changes in the WP -> nothing to do
        print(" -- no changes")
        return

    # what has changed in master so far (user edits of master + already merged WPs)
    tmp_master_base_output = os.path.join(tmp_dir, "tmp-master-base-output.diff")
    _write_changeset(
        geodiff,
        master_gpkg_base,
        master_gpkg_output,
        tmp_master_base_output,
        os.path.join(tmp_dir, "tmp-master-base-output.json"),
    )
    if not geodiff.has_changes(tmp_master_base_output):
        # master gpkg has no difference between "base" and "output" yet,
        # so we can just apply wp_changeset_base_input without rebasing
        print(" -- apply (without rebase)")
        geodiff.apply_changeset(master_gpkg_output, wp_changeset_base_input)
        return

    # rebase changeset - to resolve conflicts, for example:
    # - WP1 deleted a row that WP2 also wants to delete
    # - WP1 updated a row that WP2 also updated
    # - WP1 updated a row that WP2 deleted
    # - WP1 deleted a row that WP2 updated
    # - WP1 inserted a row with FID that WP2 also wants to insert -- this should not happen
    #   because remapping should assign unique master FIDs
    wp_rebased_changeset = os.path.join(tmp_dir, wp_name + "-rebased.diff")
    geodiff.create_rebased_changeset_ex(
        "sqlite",
        "",
        master_gpkg_base,
        wp_changeset_base_input,
        tmp_master_base_output,
        wp_rebased_changeset,
        os.path.join(tmp_dir, wp_name + "-rebased-conflicts.json"),
    )
    if DEBUG_DIFFS:
        geodiff.list_changes(wp_rebased_changeset, os.path.join(tmp_dir, wp_name + "-rebased.json"))
    print(" -- apply (with rebase)")
    geodiff.apply_changeset(master_gpkg_output, wp_rebased_changeset)


def _delete_rows_outside_wp(c, wp_table, wp_name, wp_value):
    """
    Delete rows of the table that do not belong to the work package (using cursor *c*).

    - "filter-geometry": *wp_value* is handed over to ST_GeomFromText(); rows for which the
      intersection test with the "geometry" column evaluates to false are deleted
    - any other method: rows with NULL in the filter column are deleted, followed by rows
      with value different from *wp_value* (or not listed in *wp_value* if it is a list)

    :raises ValueError: if *wp_value* of a column filter is neither str/int/float nor a list
    """
    wp_tab_name_esc = escape_double_quotes(wp_table.name)

    if wp_table.filter_method == WPTable.FILTER_METHOD_GEOMETRY:
        # the geometry is bound as a parameter, so quotes in it cannot break the statement
        c.execute(
            f"""delete from {wp_tab_name_esc} """
            """where not ST_Intersects(GeomFromGPB(geometry), ST_GeomFromText(?))""",
            (wp_value,),
        )
        return

    wp_filter_column_escaped = escape_double_quotes(wp_table.filter_column_name)
    c.execute(f"""delete from {wp_tab_name_esc} where {wp_filter_column_escaped} IS NULL""")
    if isinstance(wp_value, (str, int, float)):
        c.execute(f"""delete from {wp_tab_name_esc} where {wp_filter_column_escaped} != ?""", (wp_value,))
    elif isinstance(wp_value, list):
        values_str = ",".join(["?"] * len(wp_value))
        c.execute(
            f"""delete from {wp_tab_name_esc} where {wp_filter_column_escaped} not in ({values_str})""",
            wp_value,
        )
    else:
        # we may want to support some custom SQL at some point too
        raise ValueError(
            f"Unsupported filter value of work package '{wp_name}': "
            f"expected a string, a number or a list, got {type(wp_value).__name__}"
        )


def _build_wp_gpkg(geodiff, wp, wp_tables, *, input_dir, output_dir, master_gpkg_output, remap_db_output):
    """
    STAGE 2 for a single work package: regenerate its geopackage in "output"
    (copy of master + filter database based on WP + remap DB).
    """
    wp_name, wp_value = wp.name, wp.value
    print("WP ", wp_name)

    wp_gpkg_input = os.path.join(input_dir, wp_name + ".gpkg")  # may have been modified by user
    wp_gpkg_output = os.path.join(output_dir, wp_name + ".gpkg")  # does not exist yet

    # start from a copy of the master
    shutil.copy(master_gpkg_output, wp_gpkg_output)

    # filter out data that does not belong to the WP
    # and remap fids in the DB from master to WP-local fids
    db = sqlite3.connect(wp_gpkg_output)
    try:
        db.enable_load_extension(True)  # for spatialite
        c = db.cursor()
        c.execute("SELECT load_extension('mod_spatialite');")  # TODO: how to deal with it?
        c.execute("ATTACH ? AS remap", (remap_db_output,))
        c.execute("BEGIN")
        for wp_table in wp_tables:
            _delete_rows_outside_wp(c, wp_table, wp_name, wp_value)
            remap_table_master_to_wp(c, wp_table.name, wp_name)

        # TODO: drop tables that are not listed at all (?)

        c.execute("COMMIT")

        # run VACUUM to purge anything that does not belong to the WP data
        c.execute("VACUUM")
    finally:
        # explicitly close the connection to avoid possible
        # "recovered N frames from WAL file" warnings from geodiff (due to two different sqlite3 libs)
        db.close()

    # get changeset between the one received from WP and newly created GPKG
    if os.path.exists(wp_gpkg_input):
        _write_changeset(
            geodiff,
            wp_gpkg_input,
            wp_gpkg_output,
            os.path.join(output_dir, wp_name + "-input-output.diff"),
            os.path.join(output_dir, wp_name + "-input-output.json"),
        )
    # else: first time this WP is created... TODO: what to do?


def make_work_packages(data_dir: str, wp_config: WPConfig) -> None:
    """
    This is the core part of the algorithm for merging and splitting data for work packages.
    It expects a data directory with layout of directories and files as described in the header
    of this file.

    The first stage collects changes from the master DB and the work package DBs and
    combines them together, resolving any conflicts. At the end of the first stage we have
    updated master database. The second stage then re-creates individual work package DBs.

    Directories "output" and "tmp" inside the data directory are deleted and recreated
    on every call.

    :param data_dir: Path to the data directory (with "base" and "input" subdirectories)
    :param wp_config: Configuration of work packages and of tables to be filtered
    :raises ValueError: if presence of base/remap.db does not match presence of work packages
                        in "base", or if a work package has a filter value of unsupported type
    """
    base_dir = os.path.join(data_dir, "base")  # where the non-modified GPKGs from the last run should be
    input_dir = os.path.join(data_dir, "input")  # where the existing GPKG for each existing WP should be
    output_dir = os.path.join(data_dir, "output")  # !!!! we are deleting this directory and recreating it every time!
    tmp_dir = os.path.join(data_dir, "tmp")  # for any temporary stuff (also deleted + recreated)

    _reset_dir(output_dir)
    _reset_dir(tmp_dir)

    # names of WPs that have been processed before (and we expect their GPKGs exist and may be modified)
    old_wp_names = _find_existing_wp_names(base_dir)
    print("existing WPs: " + str(old_wp_names))

    geodiff = pygeodiff.GeoDiff()
    # set up logging to get extra info from geodiff.
    # geodiff.LevelDebug may be useful for debugging but it's too much info in most cases
    geodiff.set_maximum_logger_level(geodiff.LevelInfo)
    geodiff.set_logger_callback(_geodiff_logger_callback)

    master_gpkg_base = os.path.join(base_dir, "master.gpkg")  # should not have been modified
    master_gpkg_input = os.path.join(input_dir, "master.gpkg")  # this could have been modified by users
    master_gpkg_output = os.path.join(output_dir, "master.gpkg")  # does not exist yet

    if os.path.exists(master_gpkg_base):
        # summarize changes that have happened in master (base master VS input master)
        # (this is not needed anywhere in the code, but may be useful for debugging)
        _write_changeset(
            geodiff,
            master_gpkg_base,
            master_gpkg_input,
            os.path.join(tmp_dir, "master-base-input.diff"),
            os.path.join(tmp_dir, "master-base-input.json"),
        )

    # create new master_gpkg in the output directory
    shutil.copy(master_gpkg_input, master_gpkg_output)

    # copy "base" remapping DB to "output" where we may be adding some more entries
    remap_db_base = os.path.join(base_dir, "remap.db")
    remap_db_output = os.path.join(output_dir, "remap.db")
    remap_db_exists = os.path.exists(remap_db_base)
    if old_wp_names and not remap_db_exists:
        raise ValueError("remap.db should exist!")
    if not old_wp_names and remap_db_exists:
        raise ValueError("remap.db should not exist yet!")
    if remap_db_exists:
        shutil.copy(remap_db_base, remap_db_output)

    print("STAGE 1 [WP -> MASTER]")

    # STAGE 1: Bring the changes from WPs to master
    for wp_name in old_wp_names:
        _merge_wp_into_master(
            geodiff,
            wp_name,
            wp_config.wp_tables,
            base_dir=base_dir,
            input_dir=input_dir,
            tmp_dir=tmp_dir,
            master_gpkg_base=master_gpkg_base,
            master_gpkg_output=master_gpkg_output,
            remap_db_output=remap_db_output,
        )

    # summarize changes that have happened in WPs (input master VS output master)
    # (this is not needed anywhere in the code, but may be useful for debugging)
    _write_changeset(
        geodiff,
        master_gpkg_input,
        master_gpkg_output,
        os.path.join(output_dir, "master-input-output.diff"),
        os.path.join(output_dir, "master-input-output.json"),
    )

    if os.path.exists(master_gpkg_base):
        # summarize all the changes that have happened since last run (collated master changes + wp changes)
        # (this is not needed anywhere in the code, but may be useful for debugging)
        _write_changeset(
            geodiff,
            master_gpkg_base,
            master_gpkg_output,
            os.path.join(output_dir, "master-base-output.diff"),
            os.path.join(output_dir, "master-base-output.json"),
        )

    print("STAGE 2 [MASTER -> WP]")

    # STAGE 2: Regenerate WP databases
    for wp in wp_config.wp_names:
        _build_wp_gpkg(
            geodiff,
            wp,
            wp_config.wp_tables,
            input_dir=input_dir,
            output_dir=output_dir,
            master_gpkg_output=master_gpkg_output,
            remap_db_output=remap_db_output,
        )