forked from ozonmp/omp-demo-api
/
repository_package_retranslator.go
153 lines (120 loc) · 3.57 KB
/
repository_package_retranslator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
package repo
import (
"context"
"time"
"github.com/hablof/logistic-package-api/internal/model"
"github.com/rs/zerolog/log"
sq "github.com/Masterminds/squirrel"
)
// packageEventScanStruct is the scan target for a single package_event row.
// The db tags name the columns listed in the RETURNING clause of Lock;
// decodeScanStruct converts a scanned value into a model.PackageEvent.
type packageEventScanStruct struct {
// ID is the event's own identifier (package_event_id column).
ID uint64 `db:"package_event_id"`
// PackageID is read from the package_id column.
PackageID uint64 `db:"package_id"`
// Type holds the raw event_type text; decodeScanStruct recognises
// "Created", "Updated" and "Removed".
Type string `db:"event_type"`
// Status holds the raw event_status text; decodeScanStruct recognises
// "Locked" and "Unlocked".
Status string `db:"event_status"`
// Created is read from the created_at column.
Created time.Time `db:"created_at"`
// Payload is the raw payload column, passed through to the model unchanged.
Payload []byte `db:"payload"`
}
// defaultTimeout bounds the context created for each database call made by
// the repository methods in this file.
const defaultTimeout = 5 * time.Second
// Lock implements consumer.RepoEventConsumer.
//
// It switches up to limit package_event rows from event_status "Unlocked" to
// "Locked" and returns the rows that were updated. The inner SELECT uses
// FOR UPDATE SKIP LOCKED, so rows currently row-locked by another transaction
// are skipped instead of waited on. The statement runs inside a single
// transaction whose context is bounded by defaultTimeout.
//
// On any error the transaction is rolled back and a nil slice is returned, so
// a caller never receives a partial batch while the remaining rows stay
// marked "Locked" in the database.
//
// NOTE(review): the original author asked whether r.batchsize should be used
// instead of the limit argument; that question is still open.
func (r *repository) Lock(limit uint64) ([]model.PackageEvent, error) {
	// log.Debug().Msgf("repository.Lock was called to lock %d entries", limit)
	query, args, err := r.initQuery.Update("package_event").
		Set("event_status", "Locked").
		Where("package_event_id IN (SELECT package_event_id FROM package_event WHERE event_status = ? LIMIT ? FOR UPDATE SKIP LOCKED)", "Unlocked", limit).
		Suffix("RETURNING package_event_id, package_id, event_type, event_status, payload, created_at").
		ToSql()
	if err != nil {
		return nil, err
	}

	ctx, cf := context.WithTimeout(context.Background(), defaultTimeout)
	defer cf()

	tx, err := r.db.BeginTxx(ctx, nil)
	if err != nil {
		return nil, err
	}
	// Safety net for every early return. After a successful Commit this only
	// reports that the transaction is already done, so the result is ignored.
	defer tx.Rollback()

	rows, err := tx.QueryxContext(ctx, query, args...)
	if err != nil {
		return nil, err
	}
	// Runs before the deferred Rollback (LIFO) and is harmless when the rows
	// were already closed by a completed iteration.
	defer rows.Close()

	output := make([]model.PackageEvent, 0, limit)
	for rows.Next() {
		// Fresh target per row so no field can leak from the previous one.
		var scanUnit packageEventScanStruct
		if err := rows.StructScan(&scanUnit); err != nil {
			return nil, err
		}
		output = append(output, r.decodeScanStruct(scanUnit))
	}
	// rows.Next also returns false when iteration fails part-way (timeout,
	// connection loss). Committing then would leave rows "Locked" that were
	// never handed to the caller, so surface the error and roll back instead.
	if err := rows.Err(); err != nil {
		return nil, err
	}

	if err := tx.Commit(); err != nil {
		return nil, err
	}

	if len(output) > 0 {
		returningIDs := make([]uint64, 0, len(output))
		for _, elem := range output {
			returningIDs = append(returningIDs, elem.ID)
		}
		log.Debug().Msgf("repository.Lock locked events with IDs: %v", returningIDs)
	}

	return output, nil
}
// decodeScanStruct converts a scanned row into a model.PackageEvent.
//
// ID, PackageID, Created and Payload are copied as-is. The textual status and
// type values (Postgres ENUMs, per the original note) are translated to the
// matching model constants; a value that is not recognised leaves the
// corresponding field at its zero value.
func (*repository) decodeScanStruct(scanUnit packageEventScanStruct) model.PackageEvent {
	var event model.PackageEvent

	event.ID = scanUnit.ID
	event.PackageID = scanUnit.PackageID
	event.Created = scanUnit.Created
	event.Payload = scanUnit.Payload

	if status := scanUnit.Status; status == "Locked" {
		event.Status = model.Locked
	} else if status == "Unlocked" {
		event.Status = model.Unlocked
	}

	if kind := scanUnit.Type; kind == "Created" {
		event.Type = model.Created
	} else if kind == "Updated" {
		event.Type = model.Updated
	} else if kind == "Removed" {
		event.Type = model.Removed
	}

	return event
}
// Remove implements cleaner.RepoEventCleaner.
//
// It deletes the package_event rows whose package_event_id is listed in
// eventIDs. The statement runs with a context bounded by defaultTimeout.
// An empty (or nil) eventIDs is a no-op that returns nil without touching the
// database. Any error from building or executing the statement is returned
// unchanged.
func (r *repository) Remove(eventIDs []uint64) error {
	log.Debug().Msgf("repository.Remove was called with arg: %v", eventIDs)

	// With no IDs the builder would emit an always-false predicate; skip the
	// pointless round trip.
	if len(eventIDs) == 0 {
		return nil
	}

	query, args, err := r.initQuery.Delete("package_event").
		Where(sq.Eq{"package_event_id": eventIDs}).
		ToSql()
	if err != nil {
		return err
	}

	ctx, cf := context.WithTimeout(context.Background(), defaultTimeout)
	defer cf()

	_, err = r.db.ExecContext(ctx, query, args...)
	return err
}
// Unlock implements cleaner.RepoEventCleaner.
//
// It sets event_status back to "Unlocked" for the package_event rows whose
// package_event_id is listed in eventIDs. The statement runs with a context
// bounded by defaultTimeout. An empty (or nil) eventIDs is a no-op that
// returns nil without touching the database. Any error from building or
// executing the statement is returned unchanged.
func (r *repository) Unlock(eventIDs []uint64) error {
	log.Debug().Msgf("repository.Unlock was called with arg: %v", eventIDs)

	// With no IDs the builder would emit an always-false predicate; skip the
	// pointless round trip.
	if len(eventIDs) == 0 {
		return nil
	}

	query, args, err := r.initQuery.Update("package_event").
		Set("event_status", "Unlocked").
		Where(sq.Eq{"package_event_id": eventIDs}).
		ToSql()
	if err != nil {
		return err
	}

	ctx, cf := context.WithTimeout(context.Background(), defaultTimeout)
	defer cf()

	_, err = r.db.ExecContext(ctx, query, args...)
	return err
}