/
tasks.go
88 lines (71 loc) · 1.62 KB
/
tasks.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
package messages
import (
"context"
"sync"
"time"
"github.com/capcom6/sms-gateway/internal/sms-gateway/repositories"
"go.uber.org/fx"
"go.uber.org/zap"
"golang.org/x/exp/maps"
)
// HashingTaskConfig holds the settings of a HashingTask.
type HashingTaskConfig struct {
	// Interval is the period of the ticker that drives queue processing in
	// HashingTask.Run.
	Interval time.Duration
}
// HashingTaskParams is the fx parameter object (it embeds fx.In) that carries
// the dependencies consumed by NewHashingTask.
type HashingTaskParams struct {
	fx.In

	// Messages is the repository whose HashProcessed method the task calls.
	Messages *repositories.MessagesRepository
	// Config is copied into the created task as-is.
	Config HashingTaskConfig
	// Logger is used for the task's progress and error messages.
	Logger *zap.Logger
}
// HashingTask periodically passes queued IDs to Messages.HashProcessed.
//
// IDs are added with Enqeue and drained on every tick of the ticker started
// by Run. The struct contains a sync.Mutex, so it must not be copied after
// first use; handle it through a pointer.
type HashingTask struct {
	// Messages is the repository used to perform the hashing.
	Messages *repositories.MessagesRepository
	// Config supplies the processing interval.
	Config HashingTaskConfig
	// Logger receives informational, debug and error messages from the task.
	Logger *zap.Logger

	// queue is the set of IDs waiting to be processed; using a map as a set
	// collapses duplicate IDs enqueued between two ticks.
	queue map[uint64]struct{}
	// mux guards queue.
	mux sync.Mutex
}
// Run executes the hashing task until ctx is cancelled.
//
// It first calls Messages.HashProcessed with an empty ID list (the "initial
// hashing" pass; a failure is logged and does not stop the task), and then
// drains the queue of enqueued IDs on every tick of a ticker whose period is
// Config.Interval. Run blocks the calling goroutine and returns only once
// ctx is done.
//
// Note that time.NewTicker panics when given a non-positive duration, so
// Config.Interval must be greater than zero.
func (t *HashingTask) Run(ctx context.Context) {
	t.Logger.Info("Starting hashing task...")

	tick := time.NewTicker(t.Config.Interval)
	defer tick.Stop()

	t.Logger.Info("Initial hashing...")
	initErr := t.Messages.HashProcessed([]uint64{})
	if initErr != nil {
		t.Logger.Error("Can't hash messages", zap.Error(initErr))
	}
	t.Logger.Info("Initial hashing...Done")

	cancelled := ctx.Done()
	for {
		select {
		case <-tick.C:
			t.process()
		case <-cancelled:
			t.Logger.Info("Stopping hashing task...")
			return
		}
	}
}
// Enqeue schedules id to be handed to Messages.HashProcessed the next time
// the queue is processed.
//
// Pending IDs are kept as a set, so enqueueing the same id several times
// before the queue is drained has no additional effect. Access to the set is
// serialized by t.mux. The (misspelled) method name is kept unchanged for
// compatibility with existing callers.
func (t *HashingTask) Enqeue(id uint64) {
	t.mux.Lock()
	defer t.mux.Unlock()

	// A HashingTask created without NewHashingTask has a nil queue, and
	// assigning into a nil map panics; allocate the set on first use.
	if t.queue == nil {
		t.queue = make(map[uint64]struct{})
	}
	t.queue[id] = struct{}{}
}
// process drains the queue and passes the collected IDs to
// Messages.HashProcessed.
//
// The queue is snapshotted and cleared while holding t.mux; the repository
// call is made after the lock has been released. When nothing is queued the
// function returns without touching the repository. If HashProcessed fails,
// the error is logged and the drained IDs are not put back into the queue.
func (t *HashingTask) process() {
	pending := func() []uint64 {
		t.mux.Lock()
		defer t.mux.Unlock()

		snapshot := maps.Keys(t.queue)
		maps.Clear(t.queue)
		return snapshot
	}()

	if len(pending) == 0 {
		return
	}

	t.Logger.Debug("Hashing messages...")
	err := t.Messages.HashProcessed(pending)
	if err != nil {
		t.Logger.Error("Can't hash messages", zap.Error(err))
	}
}
// NewHashingTask builds a HashingTask from the supplied dependencies.
//
// The repository, configuration and logger are taken from params unchanged,
// and the internal queue is initialized to an empty set so the returned task
// can accept Enqeue calls immediately.
func NewHashingTask(params HashingTaskParams) *HashingTask {
	task := new(HashingTask)

	task.Messages = params.Messages
	task.Config = params.Config
	task.Logger = params.Logger
	task.queue = make(map[uint64]struct{})

	return task
}