-
Notifications
You must be signed in to change notification settings - Fork 0
/
db.go
347 lines (290 loc) · 13 KB
/
db.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
// Copyright (c) 2023 Migwi Ndung'u
// See LICENSE for details.
package storage
import (
"context"
"database/sql"
"fmt"
"time"
"github.com/dmigwi/dhamana-protocol/client/utils"
_ "github.com/lib/pq" // postgres
)
const (
// semVersion holds the current semantic version requires for the tables.
// The semantic version stored in the tables_version must match this value
// otherwise the system will not be able initiate the db instance until the
// user manually handles the data migration or creates a new
// database to use with the new tables.
semVersion = "v0.0.1"
// createVersionTable enables version tables preventing tables with
// incompatible schemas from being used.
createVersionTable = "CREATE TABLE IF NOT EXISTS tables_version (" +
"id SERIAL PRIMARY KEY," +
"sem_version VARCHAR(10) UNIQUE," +
"tables_created_on TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP)"
// createTableBond is a prepared statement creating a table identified with the
// name table_bond if it doesn't exists.
createTableBond = "CREATE TABLE IF NOT EXISTS table_bond (" +
"id SERIAL PRIMARY KEY," +
"bond_address VARCHAR(42) UNIQUE NOT NULL," +
"issuer_address VARCHAR(42) NOT NULL," +
"holder_address VARCHAR(42)," +
"created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP," +
"created_at_block INTEGER NOT NULL," +
"principal INTEGER," +
"coupon_rate SMALLINT CHECK (coupon_rate BETWEEN 0 AND 100)," +
"coupon_date SMALLINT CHECK (coupon_date BETWEEN 0 AND 50)," +
"maturity_date TIMESTAMPTZ," +
"currency SMALLINT CHECK (currency BETWEEN 0 AND 50)," +
"intro_msg TEXT," +
"last_status SMALLINT CHECK (last_status BETWEEN 0 AND 10)," +
"last_update TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP," +
"last_synced_block INTEGER NOT NULL)"
// createTableBondStatus is a prepared statement creating a table identified
// with the name table_status if it doesn't exists.
createTableBondStatus = "CREATE TABLE IF NOT EXISTS table_status (" +
"id SERIAL PRIMARY KEY," +
"sender VARCHAR(42) NOT NULL," +
"bond_address VARCHAR(42) NOT NULL," +
"bond_status SMALLINT NOT NULL CHECK(bond_status BETWEEN 0 AND 10)," +
"added_on TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP," +
"last_synced_block INTEGER NOT NULL)"
// createTableBondStatusSigned is a prepared statement creating a table
// identified with the name table_status if it doesn't exists.
createTableBondStatusSigned = "CREATE TABLE IF NOT EXISTS table_status_signed (" +
"id SERIAL PRIMARY KEY," +
"sender VARCHAR(42) NOT NULL," +
"bond_address VARCHAR(42) NOT NULL," +
"bond_status SMALLINT NOT NULL CHECK(bond_status BETWEEN 0 AND 10)," +
"signed_on TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP," +
"last_synced_block INTEGER NOT NULL)"
// createChatTable is a prepared statement creating a table identified with
// the name table_chat if it doesn't exists.
createChatTable = "CREATE TABLE IF NOT EXISTS table_chat (" +
"id SERIAL PRIMARY KEY," +
"sender VARCHAR(42) NOT NULL," +
"bond_address VARCHAR(42) NOT NULL," +
"chat_msg TEXT NOT NULL," +
"created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP," +
"last_synced_block INTEGER NOT NULL)"
// fetchBonds is a prepared statement that fetches all the bonds that owned
// by the bond party with the address or they are still in the negotiation stage.
fetchBonds = "SELECT bond_address,issuer_address,created_at,coupon_rate,currency,last_status " +
"FROM table_bond WHERE issuer_address = $1 OR last_status = 0 " +
"OR holder_address = $2 ORDER BY last_update DESC LIMIT $3 OFFSET $4"
// fetchBondByAddress is a prepared statement that returns a bond identified by
// the provided address if the sender is a party to the bond or the bond
// is still in the negotiation stage.
fetchBondByAddress = "SELECT bond_address,issuer_address,holder_address," +
"created_at,created_at_block,principal,coupon_rate,coupon_date," +
"maturity_date,currency,intro_msg,last_status,last_update,last_synced_block " +
"FROM table_bond WHERE bond_address = $1 AND " +
"(last_status = 0 OR issuer_address = $2 OR holder_address = $3)"
// fetchChats is a prepared statement that fetches the conversation within
// the bond identified by the provided address if the sender is a bond party
// or its still in the negotiation stage.
fetchChats = "SELECT c.sender, c.bond_address, c.chat_msg, c.created_at, " +
"c.last_synced_block FROM table_chat as c LEFT JOIN table_bond as b " +
"ON c.bond_address = b.bond_address WHERE b.bond_address = $1 AND " +
"(b.issuer_address = $2 OR b.last_status = 0 OR b.holder_address = $3) " +
"ORDER BY c.created_at DESC LIMIT $4 OFFSET $5"
// fetchTableVersion fetches the last set tables version.
fetchTableVersion = "SELECT sem_version,tables_created_on " +
"FROM tables_version ORDER BY id DESC LIMIT 1"
// fetchLastSyncBlock returns the last block to be synced on the table_bond.
fetchLastSyncBlock = "SELECT last_synced_block FROM table_bond ORDER BY" +
" last_synced_block DESC LIMIT 1"
// setBondBodyTerms updates the table_bond with data from the BondBodyTerms event.
setBondBodyTerms = "UPDATE table_bond SET principal = $1, coupon_rate = $2, " +
"coupon_date = $3, maturity_date = $4, currency = $5, last_update = $6, " +
"last_synced_block = $7 WHERE bond_address = $8"
// setBondMotivation update the table_bond with data from BondMotivation event.
setBondMotivation = "UPDATE table_bond SET intro_msg = $1, last_update = $2, " +
"last_synced_block = $3 WHERE bond_address = $4"
// setHolder updates table_bond with data from HolderUpdate event.
setHolder = "UPDATE table_bond SET holder_address = $1, last_update = $2," +
"last_synced_block = $3 WHERE bond_address = $4"
// setLastStatus updates table_bond with data from StatusChange event.
setLastStatus = "UPDATE table_bond SET last_status = $1, last_update = $2, " +
"last_synced_block = $3 WHERE bond_address = $4"
// addNewBondCreated inserts into table_bond new data from event NewBondCreated.
addNewBondCreated = "INSERT INTO table_bond (bond_address, issuer_address, " +
"created_at_block, last_synced_block) VALUES ($1, $2, $3, $4)"
// ddNewChatMessage inserts into table_chat new data from event NewChatMessage.
addNewChatMessage = "INSERT INTO table_chat (sender, bond_address, " +
"chat_msg, last_synced_block) VALUES ($1, $2, $3, $4)"
// addStatusChange inserts into table_status new data from event StatusChange.
addStatusChange = "INSERT INTO table_status (sender, bond_address, " +
"bond_status, last_synced_block) VALUES ($1, $2, $3, $4)"
// addStatusSigned inserts into table_status_signed new data from event StatusSigned.
addStatusSigned = "INSERT INTO table_status_signed (sender, bond_address, " +
"bond_status, last_synced_block) VALUES ($1, $2, $3, $4)"
// addTablesVersion inserts into tables_version the latest supported tables version.
addTablesVersion = "INSERT INTO tables_version (sem_version) VALUES ($1)"
dropTableBondRecords = "DELETE FROM table_bond WHERE last_synced_block = $1"
dropTableStatusRecords = "DELETE FROM table_status WHERE last_synced_block = $1"
dropTableStatusSignedRecords = "DELETE FROM table_status_signed WHERE last_synced_block = $1"
dropTableChatRecords = "DELETE FROM table_chat WHERE last_synced_block = $1"
)
// tablesToSQLStmt is an array of sql statements used to create the missing tables
// if they don't exist.
var tablesSQLStmt = []string{
createVersionTable,
createTableBond,
createTableBondStatus,
createTableBondStatusSigned,
createChatTable,
}
// This are clean up methods employed if corrupt or dirty writes are made at
// a certain last synced block.
var cleanUpStmt = []string{
dropTableBondRecords,
dropTableStatusRecords,
dropTableStatusSignedRecords,
dropTableChatRecords,
}
// reqToStmt matches the respective local type Methods supported to their sql queries.
var reqToStmt = map[utils.Method]string{
utils.GetBonds: fetchBonds,
utils.GetBondByAddress: fetchBondByAddress,
utils.GetChats: fetchChats,
// method needed locally. Results are not sent via the server
utils.GetLastSyncedBlock: fetchLastSyncBlock,
utils.UpdateBondBodyTerms: setBondBodyTerms,
utils.UpdateBondMotivation: setBondMotivation,
utils.UpdateHolder: setHolder,
utils.UpdateLastStatus: setLastStatus,
utils.InsertNewBondCreated: addNewBondCreated,
utils.InsertNewChatMessage: addNewChatMessage,
utils.InsertStatusChange: addStatusChange,
utils.InsertStatusSigned: addStatusSigned,
}
// DB defines the parameters needed to use a persistence db instance connect to.
type DB struct {
db *sql.DB
ctx context.Context
}
// Reader defines the method that reads the row fields into the require data interface.
// To read data, pass pointers to the expect field the parameter function.
type Reader interface {
Read(fn func(fields ...any) error) (interface{}, error)
}
// ConnectionString returns on the connection string format supported by postgres.
// https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING
func ConnectionString(port uint16, host, user, password, dbname string) string {
return fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=disable",
host, port, user, password, dbname)
}
// NewDB returns an opened db instance whose connection has been tested with
// ping request. It generates the required tables if they don't exist.
func NewDB(ctx context.Context, connInfo string) (*DB, error) {
db, err := sql.Open("postgres", connInfo)
if err != nil {
log.Errorf("unable to open to postgres db: err %v", err)
return nil, err
}
if err = db.PingContext(ctx); err != nil {
log.Errorf("connection to postgres db failed: err %v", err)
return nil, err
}
log.Info("Confirming that all the database tables exists")
for i, stmt := range tablesSQLStmt {
if _, err := db.ExecContext(ctx, stmt); err != nil {
log.Errorf("creating a table index (%d) failed Error: %v", i, err)
return nil, err
}
}
dbInstance := &DB{
db: db,
ctx: ctx,
}
// -- Confirm the semantic version matched the required one --
var tableversion string
var createdDate time.Time
err = db.QueryRowContext(ctx, fetchTableVersion).Scan(&tableversion, &createdDate)
if err != nil && err != sql.ErrNoRows {
log.Errorf("unable to fetch tables versions: %v", err)
return nil, err
}
switch tableversion {
case "":
log.Infof("Versioning the newly created tables with version=%s", semVersion)
if _, err = db.ExecContext(ctx, addTablesVersion, semVersion); err != nil {
log.Errorf("unable version the newly created tables : %v", err)
return nil, err
}
case semVersion:
// The correct tables version was found.
log.Infof("Confirmed all the %d versioned tables exists", len(tablesSQLStmt))
default:
// versions mismatch found. Exit till the issue is resolved.
err = fmt.Errorf("expected the tables version %s but found version %s created on = %v",
semVersion, tableversion, createdDate)
return nil, err
}
return dbInstance, nil
}
// QueryLocalData executes the sql statement associated with the provided local
// method and uses the reader interface provided to read the row data result set.
// It then returns an array of data for each row read successfully otherwise
// an error is returned.
func (d *DB) QueryLocalData(method utils.Method, r Reader, sender string,
params ...interface{},
) ([]interface{}, error) {
stmt, ok := reqToStmt[method]
if !ok {
return nil, fmt.Errorf("missing query for method %q", method)
}
switch method {
case utils.GetBondByAddress:
params = append(params, []interface{}{sender, sender}...)
case utils.GetBonds, utils.GetChats:
// The last two param values are always {limit, offset}.
// Append sender params before those two params.
n := len(params)
var data []interface{}
if len(params[:n-2]) > 0 {
data = append(data, params[:n-2]...)
}
data = append(data, []interface{}{sender, sender}...)
data = append(data, params[n-2:]...) // {limit, offset}.
params = data
}
rows, err := d.db.QueryContext(d.ctx, stmt, params...)
if err != nil {
return nil, fmt.Errorf("fetching query for method %q failed: %v", method, err)
}
defer rows.Close()
var data []interface{}
for rows.Next() {
row, err := r.Read(rows.Scan)
if err != nil {
return nil, err
}
data = append(data, row)
}
return data, nil
}
// SetLocalData inserts the provided data using the sql staements associated with
// method param provided.
func (d *DB) SetLocalData(method utils.Method, params ...interface{}) error {
stmt, ok := reqToStmt[method]
if !ok {
return fmt.Errorf("missing query for method %q", method)
}
if _, err := d.db.ExecContext(d.ctx, stmt, params...); err != nil {
err = fmt.Errorf("inserting data for method %q failed: %v", method, err)
return err
}
return nil
}
// CleanUpLocalData removes any dirty writes that may have been written on a certain
// last synced block.
func (d *DB) CleanUpLocalData(lastSyncedBlock uint64) {
for _, stmt := range cleanUpStmt {
// if an error in one query occurs, do no stop.
if _, err := d.db.ExecContext(d.ctx, stmt, lastSyncedBlock); err != nil {
log.Errorf("query %q failed: %v", stmt, err)
}
}
}