This repository has been archived by the owner on Sep 14, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
fetch.go
300 lines (259 loc) · 8.72 KB
/
fetch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
package shared
import (
"context"
"encoding/json"
"time"
"github.com/CSCfi/qvain-api/internal/psql"
"github.com/CSCfi/qvain-api/pkg/metax"
"github.com/rs/xid"
"github.com/rs/zerolog"
"github.com/wvh/uuid"
)
// DefaultRequestTimeout bounds a single Metax API request.
const DefaultRequestTimeout = 15 * time.Second

// Status codes returned by syncRecord for a single dataset.
const (
	SyncWritten = iota // dataset was created or updated in Qvain
	SyncDeleted        // dataset was deleted from Qvain
	SyncSkipped        // dataset was left untouched
	SyncFailed         // dataset could not be synced
)
// Fetch syncs the given user's datasets from Metax, continuing from the
// user's last recorded sync time; if no previous sync exists, everything
// is fetched.
func Fetch(api *metax.MetaxService, db *psql.DB, logger zerolog.Logger, uid uuid.UUID, extid string) error {
	since, err := db.GetLastSync(uid)
	if err != nil && err != psql.ErrNotFound {
		return err
	}
	// ErrNotFound leaves `since` as the zero time, which means a full sync.
	return fetch(api, db, logger, uid, extid, since)
}
// FetchSince syncs the given user's datasets from Metax, restricted to
// datasets changed after the given point in time.
func FetchSince(api *metax.MetaxService, db *psql.DB, logger zerolog.Logger, uid uuid.UUID, extid string, since time.Time) error {
	return fetch(api, db, logger, uid, extid, since)
}
// FetchAll syncs all of the given user's datasets from Metax, regardless
// of any previous sync (the zero time disables the Since filter in fetch).
func FetchAll(api *metax.MetaxService, db *psql.DB, logger zerolog.Logger, uid uuid.UUID, extid string) error {
	return fetch(api, db, logger, uid, extid, time.Time{})
}
// FetchDataset syncs a single dataset from Metax into the Qvain database
// and returns its Qvain identifier.
func FetchDataset(api *metax.MetaxService, db *psql.DB, logger zerolog.Logger, uid uuid.UUID, metaxIdentifier string) (*uuid.UUID, error) {
	payload, err := api.GetId(metaxIdentifier)
	if err != nil {
		return nil, err
	}

	// All writes go through a batch transaction; the deferred Rollback is a
	// no-op once the batch has been committed.
	batch, err := db.NewBatchForUser(uid)
	if err != nil {
		return nil, err
	}
	defer batch.Rollback()

	// sync record
	idByMetaxIdentifier, lastSyncById := getSyncInfo(db, logger, uid)
	record := metax.MetaxRawRecord{json.RawMessage(payload)}
	qvainId, _, err := syncRecord(api, db, logger, batch, idByMetaxIdentifier, lastSyncById, uid, &record)
	if err != nil {
		return nil, err
	}

	if err = batch.Commit(); err != nil {
		return nil, err
	}
	return qvainId, nil
}
// fetch retrieves the user's datasets from Metax — first the active ones,
// then the removed ones — and syncs each batch into the Qvain database.
// A zero `since` requests everything; otherwise only datasets modified
// after that instant are fetched.
func fetch(api *metax.MetaxService, db *psql.DB, logger zerolog.Logger, uid uuid.UUID, extid string, since time.Time) error {
	// Build query options: select either by external user identity or,
	// when none is given, by Qvain owner.
	var params []metax.DatasetOption
	if extid != "" {
		params = append(params, metax.WithUser(extid))
	} else {
		params = append(params, metax.WithOwner(uid.String()))
	}
	if !since.IsZero() {
		params = append(params, metax.Since(since))
	}

	// fetch user datasets from Metax
	logger.Info().Str("user", uid.String()).Str("identity", extid).Msg("starting sync")
	if err := syncBatch(api, db, logger, uid, params); err != nil {
		logger.Info().Err(err).Msg("fetch failed")
		return err
	}

	// fetch removed user datasets from Metax
	logger.Info().Str("user", uid.String()).Str("identity", extid).Msg("syncing removed")
	if err := syncBatch(api, db, logger, uid, append(params, metax.WithRemoved())); err != nil {
		logger.Info().Err(err).Msg("fetch failed")
		return err
	}
	return nil
}
// getSyncInfo collects per-user sync bookkeeping for Metax datasets:
//   - a map from Metax identifier to Qvain dataset id, used when a dataset
//     coming from Metax carries no Qvain id in its editor metadata;
//   - a map from Qvain dataset id to the timestamp of its last sync.
//
// A database error is only logged; the function then returns empty maps and
// the sync proceeds as if the user had no existing datasets.
func getSyncInfo(db *psql.DB, logger zerolog.Logger, uid uuid.UUID) (map[string]*uuid.UUID, map[uuid.UUID]time.Time) {
	metaxDatasetQvainId := make(map[string]*uuid.UUID)
	qvainDatasetSyncTime := make(map[uuid.UUID]time.Time)

	// get existing Qvain datasets for user
	userDatasets, err := db.GetAllForUid(uid)
	if err != nil {
		logger.Error().Err(err).Msg("failed to get user datasets")
	}

	// Map Metax identifier in Qvain dataset to the dataset id.
	// Used when a dataset from Metax does not have a Qvain id in its editor metadata.
	// Also get per-dataset timestamp of last sync.
	for _, ds := range userDatasets {
		if ds.Family() != metax.MetaxDatasetFamily {
			continue
		}
		qvainDatasetSyncTime[ds.Id] = ds.Synced

		metaxIdentifier := metax.GetIdentifier(ds.Blob())
		if metaxIdentifier == "" {
			continue
		}
		if _, exists := metaxDatasetQvainId[metaxIdentifier]; exists {
			logger.Warn().Str("identifier", metaxIdentifier).Msg("multiple datasets have the same Metax identifier")
			continue
		}
		// BUG FIX: the original stored &ds.Id, the address of the range loop
		// variable's field. With pre-Go 1.22 loop semantics every map entry
		// aliased the same variable, so after the loop all entries pointed at
		// the id of the LAST dataset iterated. Copy the id first.
		id := ds.Id
		metaxDatasetQvainId[metaxIdentifier] = &id
	}
	return metaxDatasetQvainId, qvainDatasetSyncTime
}
// syncBatch streams the datasets matching params from Metax and syncs each
// record into the Qvain database inside a single batch transaction. The
// batch is committed only when the whole stream has been consumed
// successfully; per-record sync failures are counted but do not abort the
// batch. Returns an error on API, timeout or commit failure.
func syncBatch(api *metax.MetaxService, db *psql.DB, logger zerolog.Logger, uid uuid.UUID, params []metax.DatasetOption) error {
	// setup DB batch transaction
	batch, err := db.NewBatchForUser(uid)
	if err != nil {
		return err
	}
	defer batch.Rollback()

	ctx, cancel := context.WithTimeout(context.Background(), DefaultRequestTimeout)
	defer cancel()

	// create sub-logger to correlate possibly multiple log entries
	syncLogger := logger.With().Str("sync-id", xid.New().String()).Logger()

	// make API request
	total, c, errc, err := api.ReadStreamChannel(ctx, params...)
	if err != nil {
		return err
	}

	read := 0
	written := 0
	deleted := 0
	skipped := 0
	failed := 0
	success := false

	// only look up existing sync state when there is something to process
	metaxDatasetQvainId := make(map[string]*uuid.UUID)
	qvainDatasetSyncTime := make(map[uuid.UUID]time.Time)
	if total > 0 {
		metaxDatasetQvainId, qvainDatasetSyncTime = getSyncInfo(db, syncLogger, uid)
	}

	// loop until all read, error or timeout
Done:
	for {
		select {
		case fdDataset, more := <-c:
			if !more {
				success = true
				break Done
			}
			read++
			// per-record errors are reflected in the status counters only
			_, status, _ := syncRecord(api, db, syncLogger, batch, metaxDatasetQvainId, qvainDatasetSyncTime, uid, fdDataset)
			switch status {
			case SyncWritten:
				written++
			case SyncDeleted:
				deleted++
			case SyncSkipped:
				skipped++
			case SyncFailed:
				failed++
			}
		case err := <-errc:
			// error while streaming
			logger.Info().Err(err).Msg("api error")
			return err
		case <-ctx.Done():
			// BUG FIX: the original did `return err` here, but `err` is
			// provably nil at this point (it was checked right after
			// ReadStreamChannel), so a timeout was silently reported as
			// success. Return the context error instead.
			logger.Info().Err(ctx.Err()).Msg("api timeout")
			return ctx.Err()
		}
	}

	if success {
		err = batch.Commit()
	}
	if err != nil {
		logger.Info().Err(err).Msg("batch error")
		return err
	}

	logger.Info().Int("total", total).Int("written", written).
		Int("skipped", skipped).Int("deleted", deleted).Int("failed", failed).Msg("successful sync")
	return nil
}
// syncRecord syncs a single Metax record into the Qvain database through the
// given batch transaction. It returns the Qvain id of the dataset (nil when
// the record was deleted, skipped before resolution, or failed), one of the
// Sync* status codes, and the error for the SyncFailed case.
//
// metaxDatasetQvainId maps Metax identifiers to existing Qvain dataset ids;
// qvainDatasetSyncTime maps Qvain dataset ids to their last sync timestamp.
func syncRecord(api *metax.MetaxService, db *psql.DB, logger zerolog.Logger, batch *psql.BatchManager,
	metaxDatasetQvainId map[string]*uuid.UUID, qvainDatasetSyncTime map[uuid.UUID]time.Time,
	uid uuid.UUID, record *metax.MetaxRawRecord) (*uuid.UUID, int, error) {
	// create dataset, use Qvain id from editor metadata if available
	dataset, isNew, err := record.ToQvain()
	if err != nil {
		logger.Debug().Err(err).Msg("error parsing dataset")
		return nil, SyncFailed, err
	}

	// was the Metax dataset not from Qvain?
	if isNew {
		// check if we already have a dataset with the same Metax identifier
		identifier := metax.GetIdentifier(record.RawMessage)
		if identifier != "" {
			newId, found := metaxDatasetQvainId[identifier]
			if found {
				// update the existing dataset blob instead of creating a new dataset
				isNew = false
				dataset.Id = *newId
			}
		}
	}

	// delete qvain dataset
	if dataset.Removed {
		// if the map doesn't contain a previous sync, assume dataset does not exist in qvain
		// (dropped the nil `.Err(err)` field the original logged here)
		if qvainDatasetSyncTime[dataset.Id].IsZero() {
			logger.Debug().Str("id", dataset.Id.String()).Msg("not in qvain, skipping deletion")
			return nil, SyncSkipped, nil
		}
		if err = batch.Delete(dataset.Id); err != nil {
			logger.Debug().Err(err).Str("id", dataset.Id.String()).Msg("can't delete dataset")
			return nil, SyncFailed, err
		}
		logger.Debug().Str("id", dataset.Id.String()).Msg("deleted dataset")
		return nil, SyncDeleted, nil
	}

	// create new qvain dataset
	if isNew {
		// create new id
		dataset.Id, err = uuid.NewUUID()
		if err != nil {
			logger.Debug().Err(err).Msg("error generating uuid")
			return nil, SyncFailed, err
		}

		// inject current user for datasets created externally
		dataset.Creator = uid
		dataset.Owner = uid

		// dataset comes from upstream, so consider it published and valid
		dataset.Published = true
		dataset.SetValid(true)

		if err = batch.CreateWithMetadata(dataset); err != nil {
			logger.Debug().Err(err).Str("id", dataset.Id.String()).Msg("can't store dataset")
			return nil, SyncFailed, err
		}
		// err is nil here; return explicit nil rather than the stale variable
		logger.Debug().Str("id", dataset.Id.String()).Msg("created new dataset")
		return &dataset.Id, SyncWritten, nil
	}

	// check if we have already synced the Qvain dataset based on modification dates
	modified := metax.GetModificationDate(dataset.Blob())
	if !modified.IsZero() && !modified.After(qvainDatasetSyncTime[dataset.Id]) {
		logger.Debug().Str("id", dataset.Id.String()).Msg("dataset not modified in Metax after last sync")
		// still bump the sync timestamp so future runs can skip this dataset
		if err = batch.UpdateSynced(dataset.Id); err != nil {
			logger.Debug().Err(err).Str("id", dataset.Id.String()).Msg("couldn't update sync timestamp")
			return nil, SyncFailed, err
		}
		return &dataset.Id, SyncSkipped, nil
	}

	// update qvain dataset
	if err = batch.Update(dataset.Id, dataset.Blob()); err != nil {
		logger.Debug().Err(err).Str("id", dataset.Id.String()).Msg("can't update dataset")
		return nil, SyncFailed, err
	}
	logger.Debug().Bool("new", isNew).Str("id", dataset.Id.String()).Msg("updated dataset")
	return &dataset.Id, SyncWritten, nil
}