-
Notifications
You must be signed in to change notification settings - Fork 6
/
assets_reader.go
159 lines (145 loc) · 3.99 KB
/
assets_reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
/*
Copyright 2022 Adevinta
*/
package store
import (
"fmt"
"github.com/adevinta/errors"
"github.com/adevinta/vulcan-api/pkg/api"
"github.com/jinzhu/gorm"
)
// NewAssetReader creates a new [AssetsReader] that returns the assets in pages
// of at most pageSize elements. All the reads of the returned reader run
// inside a single transaction that is opened here and finished by
// [*AssetsReader.Close].
//
// If the lock param is set to true the following tables are locked for
// writing until the reader is closed: teams, assets and asset_annotations.
//
// It returns an error if pageSize is lower than 1, if the transaction can not
// be started or if the tables can not be locked. In the last case the
// transaction is rolled back before returning.
func (db vulcanitoStore) NewAssetReader(lock bool, pageSize int) (AssetsReader, error) {
	if pageSize < 1 {
		return AssetsReader{}, fmt.Errorf("invalid page size %d, it must be greater than 0", pageSize)
	}

	tx := db.Conn.Begin()
	if tx.Error != nil {
		return AssetsReader{}, db.logError(errors.Database(tx.Error))
	}

	// Even though, theoretically, between calls to the Read method it could
	// pass enough time for the transaction to be timed out by Postgres
	// depending on the configured max amount of time for a transaction to be
	// idle, it's in practice not very likely to happen, so by now, we are not
	// implementing a `NOP loop` to ensure the transaction is not closed for
	// this reason.
	if lock {
		// Lock the teams, assets, and asset_annotations tables for writing.
		if err := db.lockTablesUnchecked(tx, "teams", "assets", "asset_annotations"); err != nil {
			// The transaction is useless without the locks, so discard it.
			tx.Rollback()
			// Name every table involved: the failure may come from any of them.
			lockErr := fmt.Errorf("error locking tables teams, assets and asset_annotations: %w", err)
			return AssetsReader{}, db.logError(lockErr)
		}
	}

	return AssetsReader{
		pageSize: pageSize,
		tx:       tx,
		more:     true,
		lock:     lock,
	}, nil
}
// AssetsReader reads all the assets stored in Vulcan using pages with
// a configurable size. Pages are fetched with Read, retrieved with Assets,
// failures are reported by Err, and Close commits the underlying transaction.
type AssetsReader struct {
// next is the ID of the first asset of the following page. It is empty
// before the first read and once the last page has been read.
next string
// pageSize is the maximum number of assets returned per page.
pageSize int
// tx is the transaction every query of the reader runs in.
tx *gorm.DB
// more is false when there are no further pages to read or after a read
// has failed.
more bool
// lock stores the lock param the reader was created with.
lock bool
// assets holds the page produced by the last successful read.
assets []*api.Asset
// err holds the error produced by a failed read, if any.
err error
}
// Read fetches the next page of assets, with at most the page size configured
// in the [*AssetsReader]. It returns true when a page with at least one asset
// was read, in which case the page is available through
// [*AssetsReader.Assets]. It returns false when there is nothing else to read
// or when the query failed; in the latter case the transaction is rolled back
// and the error is available through [*AssetsReader.Err].
func (a *AssetsReader) Read() bool {
	switch {
	case !a.more:
		return false
	case a.next == "":
		// An empty cursor means no page has been read yet.
		return a.readFirst()
	}

	// Drop the previous page before loading the new one.
	a.assets = a.assets[:0]

	// One extra row is requested: its presence tells there is another page
	// and its ID becomes the cursor for the next call.
	fetch := a.pageSize + 1
	query := a.tx.
		Preload("Team").
		Preload("AssetType").
		Preload("AssetAnnotations").
		Where("id >= ?", a.next).
		Order("id", true).
		Limit(fetch)
	if queryErr := query.Find(&a.assets).Error; queryErr != nil {
		a.tx.Rollback()
		a.more = false
		a.err = fmt.Errorf("error reading assets: %w", queryErr)
		return false
	}

	got := len(a.assets)
	if got != fetch {
		// The extra row is missing, so this is the last page.
		a.next = ""
		a.more = false
		return got > 0
	}

	// The extra row is the start of the next page, keep it out of this one.
	last := got - 1
	a.next = a.assets[last].ID
	a.more = true
	a.assets = a.assets[:last]
	return true
}
// Close finishes the reader by committing its transaction and returns the
// error reported by the commit, if any.
func (a *AssetsReader) Close() error {
	// Committing the transaction is also what releases the tables that were
	// locked when the reader was created.
	committed := a.tx.Commit()
	return committed.Error
}
// Err reports the error recorded by a failed call to [*AssetsReader.Read]. It
// returns nil when no read has failed.
func (a *AssetsReader) Err() error {
	readErr := a.err
	return readErr
}
// Assets returns the page of assets loaded by the last call to
// [*AssetsReader.Read].
func (a *AssetsReader) Assets() []*api.Asset {
	page := a.assets
	return page
}
// readFirst loads the first page of assets, that is, the one read when there
// is no cursor yet. It returns true when the page contains at least one
// asset. When the query fails it rolls back the transaction, records the
// error and returns false.
func (a *AssetsReader) readFirst() bool {
	// One extra row is requested: its presence tells there is another page
	// and its ID becomes the cursor for the next read.
	fetch := a.pageSize + 1
	page := make([]*api.Asset, 0, a.pageSize)

	query := a.tx.
		Preload("Team").
		Preload("AssetType").
		Preload("AssetAnnotations").
		Order("id", true).
		Limit(fetch)
	if queryErr := query.Find(&page).Error; queryErr != nil {
		a.tx.Rollback()
		a.err = fmt.Errorf("error reading assets: %w", queryErr)
		a.more = false
		return false
	}

	if len(page) != fetch {
		// The extra row is missing, so the first page is also the last one.
		a.next = ""
		a.more = false
		a.assets = page
		return len(a.assets) > 0
	}

	// The extra row is the start of the next page, keep it out of this one.
	last := len(page) - 1
	a.next = page[last].ID
	a.more = true
	a.assets = page[:last]
	return true
}