-
Notifications
You must be signed in to change notification settings - Fork 0
/
datalog.go
88 lines (79 loc) · 1.93 KB
/
datalog.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
package dieci
import (
"bytes"
"io"
"github.com/bits-and-blooms/bloom/v3"
"github.com/golang/snappy"
)
// Datalog represents a datastore's datalog: it bundles a storage backend
// with the PKI used to encrypt/decrypt payloads and a bloom filter of the
// scores this instance has already read or written.
type Datalog struct {
// filter remembers scores that went through Read or Write on this
// instance; Write consults it before asking the backend whether a
// payload already exists.
filter *bloom.BloomFilter
// pki encrypts payloads in Write and decrypts them in Read.
pki *PKI
// backend is the underlying store that Read, Exists and Write are
// delegated to.
backend Backend
}
// NewDatalog builds a Datalog on top of the given backend. The returned value
// owns a fresh, empty bloom filter and a PKI created from the same backend.
func NewDatalog(backend Backend) *Datalog {
	// Arguments handed to bloom.New, in order.
	// NOTE(review): per the bloom package these are the filter size in bits
	// and the number of hash functions — confirm against the library docs.
	const (
		filterBits   = 20000
		filterHashes = 5
	)
	return &Datalog{
		filter:  bloom.New(filterBits, filterHashes),
		pki:     NewPKI(backend),
		backend: backend,
	}
}
// Read is a read callback. It fetches the payload stored under score from the
// backend, decrypts it with the PKI and decompresses the result.
//
// A successful backend read also records score in the bloom filter, so a
// later Write of the same content will check the backend before writing.
//
// It returns the decoded payload, or an empty slice and the first error
// reported by the backend, the decryption step or the decompression step.
func (dl *Datalog) Read(score Score) ([]byte, error) {
	raw, err := dl.backend.Read(score)
	if err != nil {
		// Report the backend failure right away; otherwise the Decrypt call
		// below would overwrite err and the failure would be lost.
		return []byte{}, err
	}
	if !dl.filter.Test(score) {
		dl.filter.Add(score)
	}
	plain, err := dl.pki.Decrypt(raw)
	if err != nil {
		return []byte{}, err
	}
	return decompress(plain)
}
// Write is a write callback. It derives the score of data, stores the
// compressed and encrypted payload in the backend under that score, and
// returns the score. On any failure it returns an empty Score and the error.
func (dl *Datalog) Write(data []byte) (Score, error) {
	score := NewScore(data)

	// A filter hit only means the payload may already be stored, so the
	// backend is asked to confirm before the write is skipped. A filter miss
	// goes straight to the write path.
	if dl.filter.Test(score) {
		found, err := dl.backend.Exists(score)
		if found {
			return score, nil
		}
		if err != nil {
			return Score{}, err
		}
	}

	packed, err := compress(data)
	if err != nil {
		return Score{}, err
	}
	sealed, err := dl.pki.Encrypt(packed)
	if err != nil {
		return Score{}, err
	}

	// After a fresh start the filter is empty, so this may rewrite a payload
	// the backend already holds under the same score.
	if err = dl.backend.Write(score, sealed); err != nil {
		return Score{}, err
	}

	dl.filter.Add(score)
	return score, nil
}
// compress returns data compressed with snappy's buffered stream writer.
// On failure it returns an empty slice and the error from the write or from
// closing the writer.
func compress(data []byte) ([]byte, error) {
	var out bytes.Buffer
	w := snappy.NewBufferedWriter(&out)
	if _, err := w.Write(data); err != nil {
		return []byte{}, err
	}
	// Close flushes whatever the writer still buffers into out; if it fails
	// the output is incomplete and must not be returned as a success.
	if err := w.Close(); err != nil {
		return []byte{}, err
	}
	return out.Bytes(), nil
}
// decompress reads data through a snappy reader and returns everything it
// yields. If the copy fails it returns an empty slice and the error.
func decompress(data []byte) ([]byte, error) {
	plain := new(bytes.Buffer)
	src := snappy.NewReader(bytes.NewReader(data))
	_, err := io.Copy(plain, src)
	if err != nil {
		return []byte{}, err
	}
	return plain.Bytes(), nil
}