-
Notifications
You must be signed in to change notification settings - Fork 908
/
messagestatscollector.go
120 lines (99 loc) · 2.69 KB
/
messagestatscollector.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package messagestatscollector
import (
"strconv"
"time"
"github.com/botlabs-gg/yagpdb/v2/common"
"github.com/botlabs-gg/yagpdb/v2/lib/discordgo"
"github.com/mediocregopher/radix/v3"
"github.com/sirupsen/logrus"
)
// Collector is a message stats collector which will preiodically update the serberstats messages table with stats
type Collector struct {
MsgEvtChan chan *discordgo.Message
interval time.Duration
channels map[int64]*entry
// buf []*discordgo.Message
// channels []int64
l *logrus.Entry
}
type entry struct {
GuildID int64
ChannelID int64
Count int64
}
// NewCollector creates a new Collector
func NewCollector(l *logrus.Entry, updateInterval time.Duration) *Collector {
col := &Collector{
MsgEvtChan: make(chan *discordgo.Message, 10000),
interval: updateInterval,
l: l,
channels: make(map[int64]*entry),
}
go col.run()
return col
}
func (c *Collector) run() {
ticker := time.NewTicker(c.interval)
defer ticker.Stop()
for {
select {
case msg := <-c.MsgEvtChan:
c.handleIncMessage(msg)
case <-ticker.C:
err := c.flush()
if err != nil {
c.l.Errorf("failed updating temp serverstats: %+v", err)
}
}
}
}
func (c *Collector) handleIncMessage(msg *discordgo.Message) {
if c, ok := c.channels[msg.ChannelID]; ok {
c.Count++
return
}
c.channels[msg.ChannelID] = &entry{
GuildID: msg.GuildID,
ChannelID: msg.ChannelID,
Count: 1,
}
}
func KeyMessageStats(guildID int64, year, day int) string {
return "serverstats_message_stats:" + strconv.FormatInt(guildID, 10) + ":" + strconv.Itoa(year) + ":" + strconv.Itoa(day)
}
func KeyActiveGuilds(year, day int) string {
return "serverstats_active_guilds:" + strconv.Itoa(year) + ":" + strconv.Itoa(day)
}
func (c *Collector) flush() error {
sleepBetweenCalls := time.Second
if len(c.channels) > 0 {
sleepBetweenCalls = c.interval / time.Duration(len(c.channels))
sleepBetweenCalls /= 2
}
c.l.Infof("message stats collector is flushing: lc: %d, sleep: %s", len(c.channels), sleepBetweenCalls.String())
if len(c.channels) < 1 {
return nil
}
ticker := time.NewTicker(sleepBetweenCalls)
defer ticker.Stop()
t := time.Now().UTC()
day := t.YearDay()
year := t.Year()
for k, v := range c.channels {
err := common.RedisPool.Do(radix.FlatCmd(nil, "ZINCRBY", KeyMessageStats(v.GuildID, year, day), v.Count, v.ChannelID))
if err != nil {
return err
}
err = common.RedisPool.Do(radix.FlatCmd(nil, "SADD", KeyActiveGuilds(year, day), v.GuildID))
if err != nil {
return err
}
delete(c.channels, k)
<-ticker.C
}
return nil
}
func RoundHour(t time.Time) time.Time {
t = t.UTC()
return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), 0, 0, 0, t.Location())
}