This repository has been archived by the owner on Jul 21, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 8
/
aggregate.go
103 lines (86 loc) · 2.52 KB
/
aggregate.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package centiment
import (
"context"
"github.com/go-kit/kit/log"
"github.com/pkg/errors"
)
// Aggregator aggregates results from an analysis run.
//
// Run groups incoming results by search-term topic, keeps a rolling aggregate
// per topic, and saves one Sentiment per topic to db once the results channel
// has been drained. Construct it with NewAggregator.
type Aggregator struct {
	// logger receives the "saved" records and save-failure errors from Run.
	logger log.Logger
	// db persists the finalized per-topic Sentiments via SaveSentiment.
	db DB
}
// NewAggregator returns an Aggregator that reports through logger and persists
// its per-topic aggregates to db. Call Run on the result to consume analysis
// results and save them.
//
// The error return is reserved for future validation; it is currently always
// nil.
func NewAggregator(logger log.Logger, db DB) (*Aggregator, error) {
	return &Aggregator{logger: logger, db: db}, nil
}
// Run an aggregatation on the provided results.
func (ag *Aggregator) Run(ctx context.Context, results <-chan *AnalyzerResult) error {
var sentiments = make(map[string]*Sentiment)
// TODO(matt): Handle cancellation. Use for-select here with two cases.
for res := range results {
topic := res.SearchTerm.Topic
if sentiments[topic] == nil {
sentiments[topic] = &Sentiment{}
}
// Update the rolling aggregate for each topic.
sentiments[topic] = ag.updateAggregate(
res.Score,
res.Magnitude,
res.TweetID,
sentiments[topic],
)
sentiments[topic].populateWithSearch(res.SearchTerm)
}
for topic, sentiment := range sentiments {
sentiment.finalize()
id, err := ag.db.SaveSentiment(ctx, *sentiment)
if err != nil {
// TODO(matt): Implement retry logic w/ back-off.
ag.logger.Log(
"err", errors.Wrap(err, "failed to save topic"),
"topic", topic,
)
continue
}
ag.logger.Log(
"state", "saved",
"topic", sentiment.Topic,
"slug", sentiment.Slug,
"id", id,
"score", sentiment.Score,
"count", sentiment.Count,
"stddev", sentiment.StdDev,
"variance", sentiment.Variance,
)
}
return nil
}
// updateAggregate folds one result into sentiment, modifying it in place, and
// returns the same pointer.
//
// It makes the following updates:
//   - increments Count;
//   - moves the running mean Score toward score;
//   - accumulates the running Variance term via updateVariance;
//   - raises LastSeenID to tweetID when tweetID is larger.
//
// magnitude is accepted but not currently used.
func (ag *Aggregator) updateAggregate(score float32, magnitude float32, tweetID int64, sentiment *Sentiment) *Sentiment {
	// Keep the largest (newest) Tweet ID seen for this topic; it is the
	// checkpoint for future searches.
	if sentiment.LastSeenID < tweetID {
		sentiment.LastSeenID = tweetID
	}

	prevMean := sentiment.Score
	sentiment.Count++
	nextMean := updateAverage(score, prevMean, sentiment.Count)
	// The variance update needs both the mean before and after this sample.
	sentiment.Variance = updateVariance(score, sentiment.Variance, prevMean, nextMean, sentiment.Count)
	sentiment.Score = nextMean

	return sentiment
}
// updateAverage returns the running mean after folding value into a mean of
// the count-1 earlier samples. It uses the incremental form
//
//	mean_n = mean_{n-1} + (x_n - mean_{n-1}) / n
//
// which needs no running sum.
//
// count is the number of samples including value. A count of zero or less
// leaves no meaningful mean to update, so currentAverage is returned unchanged.
// This avoids dividing by zero, which would yield ±Inf or NaN and poison every
// later update.
func updateAverage(value float32, currentAverage float64, count int64) float64 {
	if count <= 0 {
		return currentAverage
	}
	return currentAverage + (float64(value)-currentAverage)/float64(count)
}
// updateVariance returns the running sum of squared deviations after folding in
// value. It applies Welford's update
//
//	M2_n = M2_{n-1} + (x - mean_{n-1}) * (x - mean_n)
//
// where oldAverage and newAverage are the means before and after value was
// included.
//
// Despite the name, the result is not yet a variance: it must still be divided
// by the sample count (or count-1). NOTE(review): presumably that happens in
// Sentiment.finalize — confirm.
//
// count is accepted but not used.
func updateVariance(value float32, variance float64, oldAverage float64, newAverage float64, count int64) float64 {
	x := float64(value)
	deltaBefore := x - oldAverage
	deltaAfter := x - newAverage
	return variance + deltaBefore*deltaAfter
}