/
userprofile_reader.go
71 lines (57 loc) · 1.74 KB
/
userprofile_reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
package userprofile
import (
"fmt"
fr "github.com/dailyhunt/feed/redis"
pb "github.com/dailyhunt/feed/user_profile/pb"
"github.com/go-redis/redis"
"github.com/gogo/protobuf/proto"
"sync"
)
type Options struct {
redisHosts string
prefRedisHosts string
}
type ProfileReader struct {
rawClient *redis.ClusterClient
prefClient *redis.ClusterClient
}
func (pr *ProfileReader) GetProfile(cid string, fields ...string) *RawUserProfile {
profile := &RawUserProfile{
raw: make(map[string]*pb.CFeaturewise),
}
var wg sync.WaitGroup
wg.Add(2)
go pr.getRawProfile(&wg, profile, "N4_"+cid, fields...)
go pr.getPrefProfile(&wg, profile, cid)
wg.Wait()
return profile
}
func (pr *ProfileReader) getRawProfile(wg *sync.WaitGroup, profile *RawUserProfile, key string, fields ...string) {
defer wg.Done()
fmt.Println("Getting profile for user ", key, " fields = ", fields)
values := pr.rawClient.HMGet(key, fields...).Val()
for i, field := range fields {
var fw pb.CFeaturewise
v := values[i].(string)
// todo : Error handling
if err := proto.Unmarshal([]byte(v), &fw); err != nil {
// TODO : Add LOG and increment counter
}
profile.raw[field] = &fw
fmt.Println("Read prof")
}
}
// TODO: LZ4 and message pack integration
func (pr *ProfileReader) getPrefProfile(wg *sync.WaitGroup, profile *RawUserProfile, cid string) {
defer wg.Done()
redisKey := fmt.Sprintf("NRT_V3~%s~pref", cid)
profile.pref = pr.prefClient.HGetAll(redisKey).Val()
}
func NewUserProfileReader(opts Options) *ProfileReader {
client := fr.NewRedisClusterClient(fr.NewRedisClusterConfig(opts.redisHosts))
prefClient := fr.NewRedisClusterClient(fr.NewRedisClusterConfig(opts.prefRedisHosts))
return &ProfileReader{
rawClient: client,
prefClient: prefClient,
}
}