-
Notifications
You must be signed in to change notification settings - Fork 0
/
hashes.go
56 lines (44 loc) · 1.03 KB
/
hashes.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package destination
import (
"context"
"sync/atomic"
"github.com/auho/go-toolkit/flow/storage"
"github.com/auho/go-toolkit/flow/storage/redis"
"github.com/auho/go-toolkit/redis/client"
)
var _ keyer[storage.MapEntry] = (*hashes)(nil)
type hashes struct {
redis.Hashes
amount int64
}
func (h *hashes) stateAmount() int64 {
return h.amount
}
func NewHashes(config Config) (*key[storage.MapEntry], error) {
return newKey[storage.MapEntry](config, &hashes{})
}
func (h *hashes) accept(itemsChan <-chan []storage.MapEntry, c *client.Redis, key string, pageSize int64) {
ctx := context.Background()
pipe := c.Pipeline()
for items := range itemsChan {
l := len(items)
for i := 0; i < l; i += int(pageSize) {
end := i + int(pageSize)
if end > l {
end = l
}
entries := items[i:end]
for _, entry := range entries {
for k, v := range entry {
pipe.HMSet(ctx, key, k, v)
}
}
_, err := pipe.Exec(ctx)
if err != nil {
panic(err)
}
}
atomic.AddInt64(&h.amount, int64(l))
}
_ = pipe.Close()
}