forked from protolambda/rumor
/
csv.go
80 lines (74 loc) · 1.68 KB
/
csv.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package dstee
import (
"encoding/csv"
"encoding/hex"
"fmt"
ds "github.com/ipfs/go-datastore"
"github.com/sirupsen/logrus"
"strconv"
"sync"
"time"
)
// CSVTee writes datastore put and delete events as CSV records.
//
// Each record written by its methods has four columns: the operation marker,
// a timestamp in milliseconds since the Unix epoch, the key, and the
// hex-encoded value (an empty string for deletes).
type CSVTee struct {
// Name identifies this tee; it is included in the result of String.
Name string
// CSV is the writer that receives one record per put or delete.
CSV *csv.Writer
// Log receives an error entry whenever writing a record fails.
Log logrus.FieldLogger
// Mutex is locked by OnPut, OnDelete and OnBatch for the duration of each
// call, so the records of one call are written and flushed together.
sync.Mutex
}
// String returns a human-readable label for the tee: the fixed prefix
// "CSV Tee: " followed by the tee's Name.
func (t *CSVTee) String() string {
	const layout = "CSV Tee: %s"
	label := fmt.Sprintf(layout, t.Name)
	return label
}
// OnPut writes a single put record for key and value to the CSV writer and
// flushes it.
//
// The record columns are: string(Put), the current time in milliseconds since
// the Unix epoch, key.String(), and value encoded as hex. Failures are logged
// to t.Log and not returned. The tee's mutex is held for the whole call.
func (t *CSVTee) OnPut(key ds.Key, value []byte) {
	t.Lock()
	defer t.Unlock()

	record := []string{
		string(Put),
		strconv.FormatInt(time.Now().UnixNano()/int64(time.Millisecond), 10),
		key.String(),
		hex.EncodeToString(value), // TODO: maybe format some special sub paths, e.g. IPs, utf-8 values, etc.
	}
	if err := t.CSV.Write(record); err != nil {
		t.Log.WithError(err).Error("Failed to write put")
		return
	}

	// csv.Writer buffers its output, so an I/O failure normally only shows up
	// during Flush. Flush has no return value; the error must be fetched with
	// Error, otherwise it is silently lost.
	t.CSV.Flush()
	if err := t.CSV.Error(); err != nil {
		t.Log.WithError(err).Error("Failed to flush put")
	}
}
// OnDelete writes a single delete record for key to the CSV writer and
// flushes it.
//
// The record columns are: string(Delete), the current time in milliseconds
// since the Unix epoch, key.String(), and an empty value column. Failures are
// logged to t.Log and not returned. The tee's mutex is held for the whole call.
func (t *CSVTee) OnDelete(key ds.Key) {
	t.Lock()
	defer t.Unlock()

	record := []string{
		string(Delete),
		strconv.FormatInt(time.Now().UnixNano()/int64(time.Millisecond), 10),
		key.String(),
		"", // deletes carry no value
	}
	if err := t.CSV.Write(record); err != nil {
		t.Log.WithError(err).Error("Failed to write delete")
		return
	}

	// csv.Writer buffers its output, so an I/O failure normally only shows up
	// during Flush. Flush has no return value; the error must be fetched with
	// Error, otherwise it is silently lost.
	t.CSV.Flush()
	if err := t.CSV.Error(); err != nil {
		t.Log.WithError(err).Error("Failed to flush delete")
	}
}
// OnBatch writes one record per batched put followed by one record per
// batched delete, then flushes the CSV writer once.
//
// All records of a batch share the same timestamp (milliseconds since the
// Unix epoch, taken once at the start of the call). A failure to write an
// individual entry is logged with its index "i" and does not stop the
// remaining entries from being written. Nothing is returned to the caller.
// The tee's mutex is held for the whole call.
func (t *CSVTee) OnBatch(puts []BatchItem, deletes []ds.Key) {
	t.Lock()
	defer t.Unlock()

	m := strconv.FormatInt(time.Now().UnixNano()/int64(time.Millisecond), 10)
	for i, p := range puts {
		if err := t.CSV.Write([]string{
			string(Put),
			m,
			p.Key.String(),
			hex.EncodeToString(p.Value),
		}); err != nil {
			t.Log.WithError(err).WithField("i", i).Error("Failed to write batch put entry")
		}
	}
	for i, d := range deletes {
		if err := t.CSV.Write([]string{
			string(Delete),
			m,
			d.String(),
			"", // deletes carry no value
		}); err != nil {
			t.Log.WithError(err).WithField("i", i).Error("Failed to write batch delete entry")
		}
	}

	// csv.Writer buffers its output, so an I/O failure normally only shows up
	// during Flush. Flush has no return value; the error must be fetched with
	// Error, otherwise it is silently lost.
	t.CSV.Flush()
	if err := t.CSV.Error(); err != nil {
		t.Log.WithError(err).Error("Failed to flush batch")
	}
}