-
Notifications
You must be signed in to change notification settings - Fork 0
/
sync.go
180 lines (164 loc) · 5.15 KB
/
sync.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
package comic
import (
"errors"
"fmt"
"github.com/aimeelaplant/comiccruncher/internal/log"
"github.com/go-redis/redis"
"go.uber.org/zap"
)
// Syncer is the interface for syncing yearly appearances from persistence to a cache.
type Syncer interface {
// Sync syncs the appearances of the character identified by slug from Postgres to Redis.
// It returns the number of issues synced and an error, if any.
Sync(slug CharacterSlug) (int, error)
}
// AppearancesByYearsWriter sets the appearances by years for a character.
type AppearancesByYearsWriter interface {
// Set stores the given appearances by years and returns an error if the write fails.
Set(apps AppearancesByYears) error
}
// AppearancesSyncer syncs yearly appearances from Postgres to Redis.
type AppearancesSyncer struct {
// reader is the source the character's appearances are listed from.
reader AppearancesByYearsRepository
// writer is the destination the listed appearances are written to.
writer AppearancesByYearsWriter
}
// Sync gets all the character's appearances from the database and syncs them to Redis.
//
// It returns the total number of issues found for the character and an error, if any.
// If listing the appearances fails, it returns 0 and the error. If the character
// has no aggregates, nothing is written and it returns 0 with a nil error. If the
// write fails, the total is still returned alongside the write error.
func (s *AppearancesSyncer) Sync(slug CharacterSlug) (int, error) {
	apps, err := s.reader.List(slug)
	if err != nil {
		return 0, err
	}
	total := apps.Total()
	log.COMIC().Info("appearances to send to redis",
		zap.String("character", slug.Value()),
		zap.Int("total", total),
		zap.Int("main", apps.MainTotal()),
		zap.Int("alternate", apps.AlternateTotal()))
	// Nothing to write when there are no aggregates.
	if apps.Aggregates == nil {
		return 0, nil
	}
	if err := s.writer.Set(apps); err != nil {
		return total, err
	}
	// Only report success once the write has actually succeeded.
	log.COMIC().Info("successfully sent appearances to redis", zap.String("character", slug.Value()))
	return total, nil
}
// CharacterStatsSyncer is the interface for syncing characters.
type CharacterStatsSyncer interface {
// Sync syncs the character identified by slug and returns an error, if any.
Sync(slug CharacterSlug) error
// SyncAll syncs the given characters and returns a receive-only channel
// that delivers a CharacterSyncResult per character.
SyncAll(characters []*Character) <- chan CharacterSyncResult
}
// RedisHmSetter is a redis client for setting the fields of a hash.
type RedisHmSetter interface {
// HMSet sets the given fields on the hash stored at key and returns the command's status.
HMSet(key string, fields map[string]interface{}) *redis.StatusCmd
}
// RedisCharacterStatsSyncer is for syncing characters to redis.
type RedisCharacterStatsSyncer struct {
// r is the redis client the stats hash is written with.
r RedisClient
// cr looks up the character by its slug.
cr CharacterRepository
// pr provides the ranked (all-time and per-publisher) stats for a character.
pr PopularRepository
}
// Sync syncs the character's ranking stats to Redis.
//
// The character is looked up by slug; a lookup error is returned as-is and a
// missing character yields an error. For "marvel" and "dc" characters the
// publisher-specific ranking is fetched first, then the all-time ranking, and
// both are written to Redis. Any other publisher is a no-op that returns nil.
func (s *RedisCharacterStatsSyncer) Sync(slug CharacterSlug) error {
	character, err := s.cr.FindBySlug(slug, false)
	if err != nil {
		return err
	}
	if character == nil {
		return errors.New("character doesn't exist or is disabled")
	}

	// Pick the publisher-specific ranking; it is always fetched before the
	// all-time ranking.
	var publisherRank *RankedCharacter
	switch character.Publisher.Slug {
	case "marvel":
		publisherRank, err = s.pr.FindOneByMarvel(character.ID)
	case "dc":
		publisherRank, err = s.pr.FindOneByDC(character.ID)
	default:
		return nil
	}
	if err != nil {
		return err
	}

	allTimeRank, err := s.pr.FindOneByAll(character.ID)
	if err != nil {
		return err
	}
	return s.set(character, allTimeRank, publisherRank)
}
// CharacterSyncResult is the result set for a synced character to redis and an error if any.
type CharacterSyncResult struct {
// Slug identifies the character the sync was attempted for.
Slug CharacterSlug
// Error is the error returned by the sync, or nil on success.
Error error
}
// syncConcurrent is a worker loop: it receives slugs until the slugs channel
// is closed and drained, syncs each one, and sends the slug together with the
// sync error (nil on success) to results.
func (s *RedisCharacterStatsSyncer) syncConcurrent(slugs <-chan CharacterSlug, results chan<- CharacterSyncResult) {
	for {
		slug, open := <-slugs
		if !open {
			return
		}
		syncErr := s.Sync(slug)
		results <- CharacterSyncResult{Slug: slug, Error: syncErr}
	}
}
// SyncAll syncs multiple characters to redis in goroutines.
//
// At most 50 worker goroutines are started (fewer when there are fewer
// characters). The returned channel delivers exactly one CharacterSyncResult
// per character, in no particular order, and is closed once every worker has
// finished, so callers may either receive len(characters) results or simply
// range over the channel.
func (s *RedisCharacterStatsSyncer) SyncAll(characters []*Character) <-chan CharacterSyncResult {
	slugLen := len(characters)
	// Both channels are buffered to hold every character, so neither queuing
	// the work nor publishing a result ever blocks.
	slugCh := make(chan CharacterSlug, slugLen)
	resultCh := make(chan CharacterSyncResult, slugLen)
	jobLimit := 50
	// make sure we aren't firing off goroutines greater than the job limit.
	if slugLen < jobLimit {
		jobLimit = slugLen
	}
	// done receives one signal per worker that has drained its share of work.
	done := make(chan struct{}, jobLimit)
	for i := 0; i < jobLimit; i++ {
		go func() {
			s.syncConcurrent(slugCh, resultCh)
			done <- struct{}{}
		}()
	}
	// send work over
	for _, chrctr := range characters {
		slugCh <- chrctr.Slug
	}
	// No more work: lets the workers' receive loops terminate.
	close(slugCh)
	// Close the results once all workers are done so ranging callers terminate.
	go func() {
		for i := 0; i < jobLimit; i++ {
			<-done
		}
		close(resultCh)
	}()
	// Return the results so caller can collect them.
	return resultCh
}
// set writes the character's all-time and main (publisher-specific) ranking
// stats as fields of the redis hash keyed "<slug>:stats" and returns the
// error reported by the redis command, if any.
func (s *RedisCharacterStatsSyncer) set(c *Character, allTime *RankedCharacter, main *RankedCharacter) error {
	allTimeStats := allTime.Stats
	mainStats := main.Stats
	fields := map[string]interface{}{
		"all_time_issue_count_rank":      allTimeStats.IssueCountRank,
		"all_time_issue_count":           allTimeStats.IssueCount,
		"all_time_average_per_year":      allTimeStats.Average,
		"all_time_average_per_year_rank": allTimeStats.AverageRank,
		"main_issue_count_rank":          mainStats.IssueCountRank,
		"main_issue_count":               mainStats.IssueCount,
		"main_average_per_year":          mainStats.Average,
		"main_average_per_year_rank":     mainStats.AverageRank,
	}
	statsKey := fmt.Sprintf("%s:stats", c.Slug)
	cmd := s.r.HMSet(statsKey, fields)
	return cmd.Err()
}
// NewAppearancesSyncer returns a new appearances syncer that reads from the
// container's appearances-by-years repository and writes to w.
func NewAppearancesSyncer(r *PGRepositoryContainer, w *RedisAppearancesByYearsRepository) *AppearancesSyncer {
	syncer := new(AppearancesSyncer)
	syncer.reader = r.AppearancesByYearsRepository()
	syncer.writer = w
	return syncer
}
// NewAppearancesSyncerRW returns a new appearances syncer that uses r as the
// reader and w as the writer for the cache.
func NewAppearancesSyncerRW(r AppearancesByYearsRepository, w AppearancesByYearsWriter) *AppearancesSyncer {
	syncer := new(AppearancesSyncer)
	syncer.reader = r
	syncer.writer = w
	return syncer
}
// NewCharacterStatsSyncer returns a new character stats syncer wired with the
// given redis client, character repository and popular repository.
func NewCharacterStatsSyncer(r RedisClient, cr CharacterRepository, pr PopularRepository) *RedisCharacterStatsSyncer {
	syncer := new(RedisCharacterStatsSyncer)
	syncer.r = r
	syncer.cr = cr
	syncer.pr = pr
	return syncer
}