-
Notifications
You must be signed in to change notification settings - Fork 2
/
video_collect_impl.go
182 lines (166 loc) · 5.8 KB
/
video_collect_impl.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
package internal
import (
"context"
"encoding/json"
"time"
"github.com/golang/protobuf/proto"
"github.com/pkg/errors"
"github.com/Shonminh/bilibee/apps/collect"
"github.com/Shonminh/bilibee/apps/collect/internal/repository/api"
"github.com/Shonminh/bilibee/apps/collect/internal/repository/model"
"github.com/Shonminh/bilibee/pkg/db"
"github.com/Shonminh/bilibee/pkg/logger"
time2 "github.com/Shonminh/bilibee/pkg/time"
collect2 "github.com/Shonminh/bilibee/third_party/bilibili/collect"
)
// VideoCollectServiceImpl collects bilibili video metadata for the users (mids)
// registered as cron tasks, storing task progress and per-video rows through
// its repositories.
type VideoCollectServiceImpl struct {
// CronTaskRepo stores cron tasks: used here to create, list undone, update
// progress of, and flush the status of tasks.
CronTaskRepo api.CronTaskRepo
// VideoInfoRepo stores per-video rows: used here to batch-insert aids, page
// through undone rows, update a row with fetched details, and count done rows.
VideoInfoRepo api.VideoInfoRepo
// BiliClient talks to bilibili: lists all aids of a mid and fetches the
// details of a single aid.
BiliClient collect2.BilibiliClient
}
// CreateCronTask registers a collection cron task for the bilibili user mid.
//
// The insert runs inside db.Transaction. A MySQL duplicate-key error (the task
// row already exists) is treated as success; any other insert error is
// wrapped with "CreateCronTask" and returned.
func (impl *VideoCollectServiceImpl) CreateCronTask(ctx context.Context, mid int64) (err error) {
	insertTask := func(txCtx context.Context) error {
		createErr := impl.CronTaskRepo.CreateCronTask(txCtx, model.NewCronTaskTab(mid))
		if createErr == nil || db.IsMysqlDuplicateErr(createErr) {
			// A duplicate key means the task is already registered: report success.
			return nil
		}
		return errors.Wrap(createErr, "CreateCronTask")
	}
	return db.Transaction(ctx, insertTask)
}
const (
	// defaultSize caps how many undone cron tasks CollectVideoInfo loads per run.
	defaultSize int = 100
	// hour is one hour expressed in seconds (60 * 60).
	hour = 3600
	// quarter is a quarter of an hour expressed in seconds (15 * 60).
	quarter = 900
)
// CollectVideoInfo runs one collection round. It loads up to defaultSize cron
// tasks that are not done yet and processes them in order with doSingleTask.
// It stops at, and returns, the first task that fails.
//
// On exit, whatever the outcome, it calls FlushUndoStatusTask(ctx, hour)
// during one minute of every quarter hour. That call is intended to flip done
// tasks whose last update is older than an hour back to undo, so they are
// collected again.
func (impl *VideoCollectServiceImpl) CollectVideoInfo(ctx context.Context) (err error) {
	defer func() {
		// quarter is expressed in seconds, so it is converted to minutes before
		// taking the modulo against the current minute. Comparing minutes against
		// 900 directly would only fire once every 15 hours.
		// NOTE(review): assumes time2.NowInt() returns Unix seconds — confirm.
		if (time2.NowInt()/60)%(quarter/60) == 0 {
			if e := impl.CronTaskRepo.FlushUndoStatusTask(ctx, hour); e != nil {
				logger.LogErrorf("FlushUndoStatusTask failed, err=%+v", e.Error())
				time.Sleep(time.Second * 3)
			}
		}
	}()

	cronTaskList, err := impl.CronTaskRepo.QueryUndoCronTaskList(ctx, defaultSize)
	if err != nil {
		return errors.Wrap(err, "QueryUndoCronTaskList")
	}
	for index := range cronTaskList {
		task := cronTaskList[index]
		if err = impl.doSingleTask(ctx, task); err != nil {
			return errors.Wrapf(err, "doSingleTask failed, task=%+v", task)
		}
	}
	return nil
}
// doSingleTask performs one collection pass for a single cron task:
//  1. lists every aid of the task's mid from bilibili,
//  2. inserts those aids into video_info_tab via BatchCreateVideoInfos,
//  3. fetches details for the mid's rows still in OpStatusUndo (batchUpdateVideoInfo).
//
// A deferred step then records progress on the task:
//   - offset_num is the number of the mid's OpStatusDone videos;
//   - total_num is the upstream aid count;
//   - the task becomes TaskStatusDone once the two are equal.
//
// Progress is written only when the upstream count was actually obtained.
// Otherwise total_num would be reset to 0, and a task with no finished videos
// would be marked done. Whenever the pass ends with an error it first pauses
// for 3 seconds.
func (impl *VideoCollectServiceImpl) doSingleTask(ctx context.Context, task model.CronTaskTab) (err error) {
	mid := task.GetMid()
	totalCount := 0
	// totalFetched is set once QueryMidTotalAidList succeeds, i.e. once
	// totalCount holds a real upstream value that may be persisted.
	totalFetched := false
	defer func() {
		if err != nil {
			time.Sleep(time.Second * 3) // back off before the caller moves on
		}
		if !totalFetched {
			return
		}
		// Refresh the task's progress from the number of finished videos.
		count, e := impl.VideoInfoRepo.CountVideoInfo(ctx, mid, proto.Uint32(collect.OpStatusDone.Uint32()))
		if e != nil {
			logger.LogErrorf("CountVideoInfo failed, err=%+v", e.Error())
			return
		}
		updateArgs := map[string]interface{}{"offset_num": count, "total_num": totalCount}
		if count == int64(totalCount) { // every known video has been collected
			updateArgs["task_status"] = collect.TaskStatusDone.Uint32()
		}
		if e = impl.CronTaskRepo.UpdateCronTaskInfo(ctx, task.TaskId, updateArgs); e != nil {
			logger.LogErrorf("UpdateCronTaskInfo failed, err=%+v", e.Error())
		}
	}()

	// A task that is already done needs no work.
	if task.TaskStatus == collect.TaskStatusDone.Uint32() {
		logger.LogInfof("task=%+v is done, no need to process...", task)
		return nil
	}

	// List every aid of mid on bilibili, together with the total count.
	aidList, totalCount, err := impl.BiliClient.QueryMidTotalAidList(ctx, mid, nil)
	if err != nil {
		return errors.Wrap(err, "QueryMidTotalAidList")
	}
	totalFetched = true

	// Store the aid list in video_info_tab first.
	if err = impl.VideoInfoRepo.BatchCreateVideoInfos(ctx, genVideInfoTab(aidList, mid)); err != nil {
		return errors.Wrap(err, "BatchCreateVideoInfos")
	}
	// Then fetch and store the details of rows that are still undone.
	if err = impl.batchUpdateVideoInfo(ctx, mid); err != nil {
		return errors.Wrap(err, "batchUpdateVideoInfo")
	}
	return nil
}
// genVideInfoTab builds one video_info_tab row per aid in aidList, in the same
// order. Only Mid and Aid are set; all other columns keep their zero values.
// The result is always a non-nil slice, empty when aidList is empty.
// NOTE(review): uint32(mid) truncates mids larger than math.MaxUint32.
func genVideInfoTab(aidList []int64, mid int64) (rows []model.VideoInfoTab) {
	ownerMid := uint32(mid)
	rows = make([]model.VideoInfoTab, 0, len(aidList))
	for _, aid := range aidList {
		rows = append(rows, model.VideoInfoTab{Mid: ownerMid, Aid: uint64(aid)})
	}
	return rows
}
// batchSize is the page size used when reading undone video_info_tab rows.
const batchSize = 100

// batchUpdateVideoInfo fills in bilibili details for every video of mid whose
// row is still OpStatusUndo.
//
// Rows are read in pages of batchSize. The query always asks for the first
// undo rows, since successfully processed rows leave the filter. For each aid,
// the details are fetched from bilibili and the row is rewritten via
// transformVideoInfo. Per-aid failures are logged and skipped.
//
// The loop ends in one of these ways:
//   - a short or empty page: returns nil;
//   - a full page where no row could be updated: returns an error, instead of
//     re-reading the same page forever;
//   - ctx is done: returns the context error.
func (impl *VideoCollectServiceImpl) batchUpdateVideoInfo(ctx context.Context, mid int64) (err error) {
	limit := batchSize
	for {
		if ctxErr := ctx.Err(); ctxErr != nil {
			return errors.Wrap(ctxErr, "context done")
		}
		videoInfoList, queryErr := impl.VideoInfoRepo.QueryVideoInfoList(ctx, mid, nil, &limit, proto.Uint32(collect.OpStatusUndo.Uint32()))
		if queryErr != nil {
			return errors.Wrap(queryErr, "QueryVideoInfoList")
		}
		if len(videoInfoList) == 0 { // nothing left to process
			return nil
		}

		updated := 0
		for index := range videoInfoList {
			aid := videoInfoList[index].Aid
			info, e := impl.BiliClient.QueryVideoInfoByAid(ctx, int64(aid))
			if e != nil {
				logger.LogErrorf("QueryVideoInfoByAid failed, aid=%d, err=%+v", aid, e.Error())
				continue
			}
			// Convert the bilibili record into the database model and persist it.
			infoTab := impl.transformVideoInfo(info, mid, aid)
			if e = impl.VideoInfoRepo.UpdateVideoInfo(ctx, infoTab); e != nil {
				logger.LogErrorf("UpdateVideoInfo failed, aid=%d, need_update_info=%+v, err=%+v", aid, infoTab, e.Error())
				continue
			}
			updated++
		}

		if len(videoInfoList) < limit { // short page: this was the last one
			return nil
		}
		if updated == 0 {
			// Failed rows stay undo, so the next query would return this very page again.
			return errors.Errorf("no video info updated in a full batch, mid=%d, batch=%d", mid, len(videoInfoList))
		}
	}
}
// videoUri is the prefix of a bilibili video page URL; a BV id is appended to it.
const videoUri = "https://www.bilibili.com/video/"

// transformVideoInfo maps a video fetched from bilibili onto the
// video_info_tab row identified by mid and aid, and marks the row as
// OpStatusDone.
//
// DescV2 and the whole upstream record are stored as JSON strings. Their
// json.Marshal errors are ignored, in which case the column becomes "".
// Url is left empty when the record carries no BV id.
func (impl *VideoCollectServiceImpl) transformVideoInfo(info *collect2.VideoInfo, mid int64, aid uint64) model.VideoInfoTab {
	descJSON, _ := json.Marshal(info.DescV2)
	rawJSON, _ := json.Marshal(info)

	row := model.VideoInfoTab{
		Mid:             uint32(mid),
		Aid:             aid,
		Bvid:            info.BVID,
		Title:           info.Title,
		DescV2:          string(descJSON),
		Pubdate:         uint64(info.Pubdate),
		UserCtime:       uint64(info.Ctime),
		SubtitleContent: info.SubtitleContent,
		RawStr:          string(rawJSON),
		OpStatus:        collect.OpStatusDone.Uint32(),
	}
	if info.BVID != "" {
		row.Url = videoUri + info.BVID
	}
	return row
}