/
handler.go
387 lines (322 loc) · 11.6 KB
/
handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
// The database package handles all the database operations.
// Note that for now it uses Mysql as a hardcoded data
//
// The database is creating a new service with the inproc reply controller.
// For any database operation interact with the service.
package main
import (
"context"
"fmt"
"github.com/ahmetson/common-lib/data_type/key_value"
"github.com/ahmetson/service-lib/communication/command"
"github.com/ahmetson/service-lib/communication/message"
databaseExtension "github.com/ahmetson/service-lib/extension/database"
"github.com/ahmetson/service-lib/log"
"github.com/ahmetson/service-lib/remote"
"github.com/ipfs/go-cid"
"github.com/web3-storage/go-w3s-client"
"os"
)
var w3client w3s.Client
type Storage struct {
FileName string
Cid cid.Cid
}
// getStorageParameterAt returns the FileName and Cid from the query request.
// It could fail if the index of the parameter is out of range or the CID is invalid
func getStorageParameterAt(queryParameters databaseExtension.QueryRequest, index int) (Storage, error) {
storage := Storage{}
if len(queryParameters.Fields) < index ||
len(queryParameters.Tables) < index {
return storage, fmt.Errorf("the index is out of range")
}
storage.FileName = queryParameters.Fields[index]
cidString := queryParameters.Tables[index]
cidDecoded, err := cid.Decode(cidString)
if err != nil {
return storage, fmt.Errorf("the '%s' table is invalid cid: %w", cidString, err)
} else {
storage.Cid = cidDecoded
}
return storage, nil
}
// read function reads the data from the remote web3 storage.
// It won't verify the w3client to be initialized. Therefore, call this only after setting w3client.
func read(storage Storage) (key_value.KeyValue, error) {
kv := key_value.Empty()
res, err := w3client.Get(context.Background(), storage.Cid)
if err != nil {
return kv, fmt.Errorf("failed to get %s cid from ipfs: %w", storage.Cid.String(), err)
}
//
_, fileSystem, _ := res.Files()
// Open a file in a directory
openedFile, err := fileSystem.Open("/" + storage.FileName)
if err != nil {
return kv, fmt.Errorf("failed to open %s file in %s cid: %w", storage.FileName, storage.Cid.String(), err)
}
fileStat, _ := openedFile.Stat()
if fileStat.Size() == 0 {
kv.Set(storage.FileName, "")
return kv, nil
}
var fileContent = make([]byte, fileStat.Size())
_, err = openedFile.Read(fileContent)
if err != nil {
return kv, fmt.Errorf("failed to read content of file %s in %s cid: %w", storage.FileName, storage.Cid.String(), err)
}
kv.Set(storage.FileName, string(fileContent))
return kv, nil
}
// write function writes the data to the remote web3 storage.
// It won't verify the w3client to be initialized. Therefore, call this only after setting w3client.
//
// Upon success, it will return the generated CID.
//
// this function depends on the folder
func write(fileName string, content string) (string, error) {
newFile, err := os.Create(fileName)
if err != nil {
return "", fmt.Errorf("failed to create a temporary file '%s': %w", fileName, err)
}
_, err = newFile.Write([]byte(content))
if err != nil {
_ = newFile.Close()
return "", fmt.Errorf("failed to write the content into temporary %s: %w", fileName, err)
}
if err := newFile.Close(); err != nil {
return "", fmt.Errorf("failed to close the file")
}
file, err := os.Open(fileName)
if err != nil {
return "", fmt.Errorf("failed to open a temporary '%s' file: %w", fileName, err)
}
fileCid, err := w3client.Put(context.Background(), file)
if err != nil {
_ = file.Close()
return "", fmt.Errorf("failed to put the file %s into w3storage: %w", fileName, err)
}
_ = file.Close()
_ = os.Remove(fileName)
return fileCid.String(), nil
}
// cidMatchesFileNames validates the incoming parameters
// the fieldLen arguments is given to make sure that request has exact given amount of cids and file names.
//
// if there could be arbitrary amount of cids and filenames, then pass fieldLen as 0.
func cidMatchesFileNames(queryParameters databaseExtension.QueryRequest, fieldLen int) error {
if fieldLen > 0 {
if len(queryParameters.Fields) != fieldLen {
return fmt.Errorf("required %d field length, but query request has %d fields", fieldLen, len(queryParameters.Fields))
} else if len(queryParameters.Tables) != fieldLen {
return fmt.Errorf("required %d tables, but query request has %d tables", fieldLen, len(queryParameters.Tables))
} else {
return nil
}
} else {
fieldLen = len(queryParameters.Fields)
if fieldLen == 0 {
return fmt.Errorf("required at least 1 field, but query is empty")
} else if len(queryParameters.Tables) != fieldLen {
return fmt.Errorf("required at least %d tables (same as fields), but query request has %d tables", fieldLen, len(queryParameters.Tables))
} else {
return nil
}
}
}
// onSelectAll selects all rows from the database
//
// intended to be used once during the app launch for caching.
//
// Minimize the database queries by using this
var onSelectAll = func(request message.Request, _ *log.Logger, _ ...*remote.ClientSocket) message.Reply {
if w3client == nil {
return message.Fail("w3client is null")
}
//parameters []interface{}, outputs []interface{}
var queryParameters databaseExtension.QueryRequest
err := request.Parameters.Interface(&queryParameters)
if err != nil {
return message.Fail("parameter validation:" + err.Error())
}
if err := cidMatchesFileNames(queryParameters, 0); err != nil {
return message.Fail("cidMatchesFileNames: " + err.Error())
}
length := len(queryParameters.Fields)
rows := make([]key_value.KeyValue, length)
for i := 0; i < length; i++ {
storage, err := getStorageParameterAt(queryParameters, i)
if err != nil {
return message.Fail("getStorageParameter: " + err.Error())
}
kv, err := read(storage)
if err != nil {
return message.Fail("failed to read data of file " + storage.FileName + " in " + storage.Cid.String() + " cid. error: " + err.Error())
}
rows[i] = kv
}
reply := databaseExtension.SelectAllReply{
Rows: rows,
}
replyMessage, err := command.Reply(&reply)
if err != nil {
return message.Fail("command.Reply: " + err.Error())
}
return replyMessage
}
// checks whether there are any rows that matches to the query
var onExist = func(request message.Request, _ *log.Logger, _ ...*remote.ClientSocket) message.Reply {
if w3client == nil {
return message.Fail("w3client is null")
}
//parameters []interface{}, outputs []interface{}
var queryParameters databaseExtension.QueryRequest
err := request.Parameters.Interface(&queryParameters)
if err != nil {
return message.Fail("parameter validation:" + err.Error())
}
if err := cidMatchesFileNames(queryParameters, 1); err != nil {
return message.Fail("cidMatchesFileNames: " + err.Error())
}
storage, err := getStorageParameterAt(queryParameters, 0)
if err != nil {
return message.Fail("getStorageParameter: " + err.Error())
}
kv, err := read(storage)
if err != nil {
return message.Fail("failed to read data of file " + storage.FileName + " in " + storage.Cid.String() + " cid. error: " + err.Error())
}
content, err := kv.GetString(storage.FileName)
if err != nil {
return message.Fail("failed to get file content from storage: " + err.Error())
}
reply := databaseExtension.ExistReply{}
reply.Exist = false
if len(content) > 0 {
reply.Exist = true
}
replyMessage, err := command.Reply(&reply)
if err != nil {
return message.Fail("command.Reply: " + err.Error())
}
return replyMessage
}
// Read the row only once
// func on_read_one_row(db *sql.DB, query string, parameters []interface{}, outputs []interface{}) ([]interface{}, error) {
var onSelectRow = func(request message.Request, _ *log.Logger, clients ...*remote.ClientSocket) message.Reply {
if w3client == nil {
return message.Fail("w3client is null")
}
//parameters []interface{}, outputs []interface{}
var queryParameters databaseExtension.QueryRequest
err := request.Parameters.Interface(&queryParameters)
if err != nil {
return message.Fail("parameter validation:" + err.Error())
}
if err := cidMatchesFileNames(queryParameters, 1); err != nil {
return message.Fail("cidMatchesFileNames: " + err.Error())
}
storage, err := getStorageParameterAt(queryParameters, 0)
if err != nil {
return message.Fail("getStorageParameter: " + err.Error())
}
kv, err := read(storage)
if err != nil {
return message.Fail("failed to read data of file " + storage.FileName + " in " + storage.Cid.String() + " cid. error: " + err.Error())
}
reply := databaseExtension.SelectRowReply{
Outputs: kv,
}
replyMessage, err := command.Reply(&reply)
if err != nil {
return message.Fail("command.Reply: " + err.Error())
}
return replyMessage
}
// Execute the deletion
var onDelete = func(request message.Request, logger *log.Logger, _ ...*remote.ClientSocket) message.Reply {
// heavily relying on onExist for the validation.
// in case of the file change, then make sure that onDelete has two parameters
existReply := onExist(request, logger, nil)
if !existReply.IsOK() {
return message.Fail("onExist failed: " + existReply.Message)
}
//parameters []interface{}, outputs []interface{}
var queryParameters databaseExtension.QueryRequest
err := request.Parameters.Interface(&queryParameters)
if err != nil {
return message.Fail("parameter validation:" + err.Error())
}
storage, err := getStorageParameterAt(queryParameters, 0)
if err != nil {
return message.Fail("getStorageParameter: " + err.Error())
}
content := ""
fileCid, err := write(storage.FileName, content)
if err != nil {
return message.Fail("failed to read data of file " + storage.FileName + " in " + storage.Cid.String() + " cid. error: " + err.Error())
}
reply := databaseExtension.DeleteReply{
Id: fileCid,
}
replyMessage, err := command.Reply(&reply)
if err != nil {
return message.Fail("command.Reply: " + err.Error())
}
return replyMessage
}
// Execute the insert
var onInsert = func(request message.Request, logger *log.Logger, _ ...*remote.ClientSocket) message.Reply {
if w3client == nil {
return message.Fail("w3client is null")
}
//parameters []interface{}, outputs []interface{}
var queryParameters databaseExtension.QueryRequest
err := request.Parameters.Interface(&queryParameters)
if err != nil {
return message.Fail("parameter validation:" + err.Error())
}
if len(queryParameters.Fields) != 1 {
return message.Fail("missing the file name in fields or too many file names were given")
}
if len(queryParameters.Arguments) != 1 {
return message.Fail("missing the file content in the arguments or too many contents were given")
}
content, ok := queryParameters.Arguments[0].(string)
if !ok {
return message.Fail("the argument should be a string but it's not")
}
fileName := queryParameters.Fields[0]
fileCid, err := write(fileName, content)
logger.Info("inserting a new file", "file name", fileName, "content", content, "cid", fileCid)
if err != nil {
return message.Fail("failed to write on web3storage: " + err.Error())
}
reply := databaseExtension.InsertReply{
Id: fileCid,
}
replyMessage, err := command.Reply(&reply)
if err != nil {
return message.Fail("command.Reply: " + err.Error())
}
return replyMessage
}
// Simply calls onInsert(). Because 'update' is an alias of 'insert'
var onUpdate = func(request message.Request, logger *log.Logger, _ ...*remote.ClientSocket) message.Reply {
reply := onInsert(request, logger, nil)
if !reply.IsOK() {
return reply
}
id, err := reply.Parameters.GetString("id")
if err != nil {
return message.Fail("onInsert didn't return id: " + err.Error())
}
updateReply := databaseExtension.UpdateReply{
Id: id,
}
replyMessage, err := command.Reply(&updateReply)
if err != nil {
return message.Fail("command.Reply: " + err.Error())
}
return replyMessage
}