-
Notifications
You must be signed in to change notification settings - Fork 6
/
pipeline.go
68 lines (52 loc) · 1.66 KB
/
pipeline.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package pipeline
import (
"github.com/alibaba/pairec/v2/context"
"github.com/alibaba/pairec/v2/module"
"github.com/alibaba/pairec/v2/recconf"
"github.com/alibaba/pairec/v2/service/debug"
)
var pipelineService *PipelineService
func init() {
pipelineService = NewPipelineService()
}
// PipelineService holds the UserRecommendService instances registered for
// each recommendation scene.
type PipelineService struct {
// userRecommendSceneMap maps a scene name to the list of services built for
// that scene; LoadPipelineConfigs replaces it wholesale and Recommend looks
// scenes up in it.
userRecommendSceneMap map[string][]*UserRecommendService
}
// NewPipelineService returns a PipelineService whose scene table is already
// allocated and empty, so scenes can be added without a nil-map check.
func NewPipelineService() *PipelineService {
	return &PipelineService{
		userRecommendSceneMap: map[string][]*UserRecommendService{},
	}
}
// clear forgets every registered scene by swapping in a fresh, empty table.
// The previous map is left untouched for anyone still holding a reference.
func (p *PipelineService) clear() {
	emptyScenes := make(map[string][]*UserRecommendService)
	p.userRecommendSceneMap = emptyScenes
}
// LoadPipelineConfigs rebuilds the scene-to-services table from
// conf.PipelineConfs and installs it on the package-level pipelineService,
// replacing whatever table was there before.
//
// One UserRecommendService is created per pipeline configuration, appended in
// the order the configurations are listed under each scene. The new table is
// built completely before it is assigned, so the old table stays in place
// until the final assignment.
func LoadPipelineConfigs(conf *recconf.RecommendConfig) {
	sceneServices := make(map[string][]*UserRecommendService, len(conf.PipelineConfs))
	for sceneName, configs := range conf.PipelineConfs {
		for _, config := range configs {
			// Copy the element before taking its address. Before Go 1.22 the range
			// variable is a single variable reused by every iteration, so passing
			// &config directly would hand each service a pointer to the same
			// storage (ending up as the last configuration) if the constructor
			// retains it. The copy is harmless on newer toolchains.
			config := config
			service := NewUserRecommendService(&config)
			sceneServices[sceneName] = append(sceneServices[sceneName], service)
		}
	}
	pipelineService.userRecommendSceneMap = sceneServices
}
// Recommend runs every UserRecommendService registered for the request's
// scene concurrently and returns their items concatenated into one slice.
//
// The scene name is read from the "scene" parameter of context. If that
// parameter is absent or is not a string, or if no services are registered
// for the scene, Recommend returns nil.
//
// Items are concatenated in the order the services are registered for the
// scene, regardless of which service finishes first, so the result order is
// deterministic for a given configuration. Recommend waits for all services
// to finish before returning.
func Recommend(user *module.User, context *context.RecommendContext, debugService *debug.DebugService) (ret []*module.Item) {
	// Use the comma-ok form: a bare type assertion would panic on a missing or
	// non-string "scene" parameter.
	scene, ok := context.GetParameter("scene").(string)
	if !ok {
		return nil
	}
	services, ok := pipelineService.userRecommendSceneMap[scene]
	if !ok {
		return nil
	}

	// Each goroutine writes only its own slot, so no locking is needed; the
	// send on done publishes the write to the receiver below.
	results := make([][]*module.Item, len(services))
	done := make(chan struct{}, len(services))
	for i, service := range services {
		go func(slot int, userRecommendService *UserRecommendService) {
			results[slot] = userRecommendService.Recommend(user, context, debugService)
			done <- struct{}{}
		}(i, service)
	}

	// Wait for exactly one completion signal per started goroutine.
	for range services {
		<-done
	}

	for _, items := range results {
		ret = append(ret, items...)
	}
	return ret
}