forked from CodisLabs/codis
-
Notifications
You must be signed in to change notification settings - Fork 0
/
migrate_task.go
211 lines (182 loc) · 4.71 KB
/
migrate_task.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
// Copyright 2014 Wandoujia Inc. All Rights Reserved.
// Licensed under the MIT (MIT-LICENSE.txt) license.
package main
import (
"fmt"
"sync"
"time"
"container/list"
"github.com/wandoulabs/codis/pkg/models"
"github.com/wandoulabs/codis/pkg/utils"
"github.com/juju/errors"
log "github.com/ngaut/logging"
)
var (
	// pendingMigrateTask holds *MigrateTask values; migrateTaskWorker consumes
	// them from the front and removes each one after processing it.
	pendingMigrateTask = list.New()

	// curMigrateTask is the task migrateTaskWorker is currently handling, or nil
	// when the worker is idle.
	curMigrateTask *MigrateTask

	// lck is taken by migrateTaskWorker around its accesses to
	// pendingMigrateTask and curMigrateTask.
	lck sync.RWMutex
)
// Lifecycle states stored in MigrateTaskForm.Status.
const (
	// MIGRATE_TASK_PENDING: queued, not started; only tasks in this state can
	// be removed by removePendingMigrateTask.
	MIGRATE_TASK_PENDING string = "pending"
	// MIGRATE_TASK_MIGRATING: set by RunMigrateTask when it starts.
	MIGRATE_TASK_MIGRATING string = "migrating"
	// MIGRATE_TASK_FINISHED: set by RunMigrateTask after the slot loop ends.
	MIGRATE_TASK_FINISHED string = "finished"
	// MIGRATE_TASK_ERR: set by RunMigrateTask when a slot fails.
	MIGRATE_TASK_ERR string = "error"
)
// MigrateTaskForm is the JSON-serializable description of a slot migration
// task: which slots move, to which group, and how far the task has progressed.
type MigrateTaskForm struct {
	// FromSlot is the first slot to migrate (inclusive).
	FromSlot int `json:"from"`
	// ToSlot is the last slot to migrate (inclusive).
	ToSlot int `json:"to"`
	// NewGroupId is the id of the group the slots are moved to.
	NewGroupId int `json:"new_group"`
	// Delay is passed through to MigrateSingleSlot; its unit is not visible here.
	Delay int `json:"delay"`
	// CreateAt is the creation time as a string; format is set by the creator.
	CreateAt string `json:"create_at"`
	// Percent is progress in 0-100, refreshed by RunMigrateTask after each slot.
	Percent int `json:"percent"`
	// Status is one of the MIGRATE_TASK_* constants.
	Status string `json:"status"`
	// Id identifies the task in the pending queue.
	Id string `json:"id"`
}
// MigrateTask is a MigrateTaskForm together with the runtime state needed to
// execute it.
type MigrateTask struct {
	MigrateTaskForm

	// stopChan is handed to MigrateSingleSlot by RunMigrateTask. Presumably it
	// is used to request a user stop (ErrStopMigrateByUser) — confirm in
	// MigrateSingleSlot. Being unexported, it never appears in JSON.
	stopChan chan struct{} `json:"-"`
}
// findPendingMigrateTask returns the first task in pendingMigrateTask whose Id
// equals id, or nil if none matches. Status is not consulted, so a task that is
// currently migrating (still queued) can be returned.
// It takes no lock itself; presumably callers hold lck — verify at call sites.
func findPendingMigrateTask(id string) *MigrateTask {
	elem := pendingMigrateTask.Front()
	for elem != nil {
		if task := elem.Value.(*MigrateTask); task.Id == id {
			return task
		}
		elem = elem.Next()
	}
	return nil
}
// removePendingMigrateTask removes the first task in pendingMigrateTask that
// has the given id and is still in the MIGRATE_TASK_PENDING state, and reports
// whether such a task was removed. Tasks that have already started stay queued.
// It takes no lock itself; presumably callers hold lck — verify at call sites.
func removePendingMigrateTask(id string) bool {
	for e := pendingMigrateTask.Front(); e != nil; e = e.Next() {
		t := e.Value.(*MigrateTask)
		// Compare against the shared constant so this check cannot drift from
		// the status values the rest of the file assigns.
		if t.Id == id && t.Status == MIGRATE_TASK_PENDING {
			// Removing mid-iteration is safe because we return immediately.
			pendingMigrateTask.Remove(e)
			return true
		}
	}
	return false
}
// RunMigrateTask migrates slots task.FromSlot through task.ToSlot (inclusive)
// to group task.NewGroupId, one slot at a time, and returns the error that
// aborted the run, if any.
//
// For each slot it:
//   - takes the lock returned by utils.GetZkLock for productName and releases
//     it when the slot is done; if the lock cannot be taken the run aborts;
//   - skips the slot if it is neither online nor already migrating, or if its
//     source group equals the target group (for a slot already migrating the
//     source is its recorded MigrateStatus.From, so an interrupted migration
//     is resumed);
//   - records the migrate status, calls MigrateSingleSlot, then marks the slot
//     online again with both MigrateStatus ends reset to models.INVALID_ID.
//
// task.Status becomes MIGRATE_TASK_MIGRATING, then MIGRATE_TASK_FINISHED, or
// MIGRATE_TASK_ERR when a slot fails. task.Percent is refreshed after every
// slot, skipped slots included. ErrStopMigrateByUser from a slot ends the loop
// early without returning an error.
func RunMigrateTask(task *MigrateTask) error {
	conn := CreateZkConn()
	defer conn.Close()
	lock := utils.GetZkLock(conn, productName)

	to := task.NewGroupId
	task.Status = MIGRATE_TASK_MIGRATING
	for slotId := task.FromSlot; slotId <= task.ToSlot; slotId++ {
		err := func() error {
			log.Info("start migrate slot:", slotId)
			// Never touch the slot without holding the lock; bail out instead of
			// migrating unlocked and then releasing a lock we never acquired.
			if err := lock.Lock(fmt.Sprintf("migrate %d", slotId)); err != nil {
				log.Error(err)
				return err
			}
			defer func() {
				if err := lock.Unlock(); err != nil {
					log.Info(err)
				}
			}()

			s, err := models.GetSlot(conn, productName, slotId)
			if err != nil {
				log.Error(err)
				return err
			}
			if s.State.Status != models.SLOT_STATUS_ONLINE && s.State.Status != models.SLOT_STATUS_MIGRATE {
				log.Warning("status is not online && migrate", s)
				return nil
			}

			from := s.GroupId
			if s.State.Status == models.SLOT_STATUS_MIGRATE {
				// resume an interrupted migration from its recorded source group
				from = s.State.MigrateStatus.From
			}
			if from == to {
				log.Warning("from == to, ignore", s)
				return nil
			}

			// modify slot status
			if err := s.SetMigrateStatus(conn, from, to); err != nil {
				log.Error(err)
				return err
			}

			// do real migrate
			if err := MigrateSingleSlot(conn, slotId, from, to, task.Delay, task.stopChan); err != nil {
				log.Error(err)
				return err
			}

			// migrate done, change slot status back
			s.State.Status = models.SLOT_STATUS_ONLINE
			s.State.MigrateStatus.From = models.INVALID_ID
			s.State.MigrateStatus.To = models.INVALID_ID
			// Persist through conn: the same connection that read the slot and
			// acquired the lock, not the package-level zkConn.
			if err := s.Update(conn); err != nil {
				log.Error(err)
				return err
			}
			return nil
		}()

		if err == ErrStopMigrateByUser {
			log.Info("stop migration job by user")
			// NOTE(review): a user-stopped task is still marked
			// MIGRATE_TASK_FINISHED below; there is no separate "stopped" status.
			break
		} else if err != nil {
			task.Status = MIGRATE_TASK_ERR
			return err
		}

		task.Percent = (slotId - task.FromSlot + 1) * 100 / (task.ToSlot - task.FromSlot + 1)
		log.Info("total percent:", task.Percent)
	}
	task.Status = MIGRATE_TASK_FINISHED
	log.Info("migration finished")
	return nil
}
// preMigrateCheck reports whether task t may start now, based on the slots
// models.GetMigratingSlots reports for productName.
//
//   - No slot migrating: t may start.
//   - Exactly one slot migrating: t may start only if it is that same
//     single-slot migration (FromSlot == ToSlot == the slot's Id, and the same
//     target group), i.e. t resumes it; otherwise an error names the slot.
//   - More than one slot migrating: an error is returned.
//
// A false result is always accompanied by a non-nil error.
func preMigrateCheck(t *MigrateTask) (bool, error) {
	conn := CreateZkConn()
	defer conn.Close()

	migrating, err := models.GetMigratingSlots(conn, productName)
	if err != nil {
		return false, err
	}

	switch len(migrating) {
	case 0:
		return true, nil
	case 1:
		cur := migrating[0]
		resumes := t.FromSlot == cur.Id && t.ToSlot == cur.Id && t.NewGroupId == cur.State.MigrateStatus.To
		if !resumes {
			return false, errors.Errorf("there is a migrating slot %+v, finish it first", cur)
		}
		return true, nil
	default:
		return false, errors.New("more than one slots are migrating, unknown error")
	}
}
// migrateTaskWorker processes pendingMigrateTask one task at a time and never
// returns.
//
// On every one-second tick it peeks at the front of the queue. If a task is
// there, it is published as curMigrateTask. It is then run with RunMigrateTask
// when preMigrateCheck allows it. Finally it is removed from the queue whether
// it ran, failed, or was rejected. The task therefore stays in
// pendingMigrateTask for as long as it is running.
func migrateTaskWorker() {
	// One ticker for the worker's lifetime instead of a new timer per iteration.
	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()

	for {
		<-ticker.C

		// Peek under a single read lock; a nil front means the queue is empty.
		lck.RLock()
		e := pendingMigrateTask.Front()
		lck.RUnlock()
		if e == nil {
			continue
		}
		log.Info("new migrate task arrive")
		task := e.Value.(*MigrateTask)

		lck.Lock()
		curMigrateTask = task
		lck.Unlock()

		if ok, err := preMigrateCheck(task); ok {
			// RunMigrateTask logs per-slot failures and records them in
			// task.Status; surface the task id here so the failure is traceable.
			if err := RunMigrateTask(task); err != nil {
				log.Warning("migrate task", task.Id, "failed:", err)
			}
		} else {
			log.Warning(err)
		}

		lck.Lock()
		curMigrateTask = nil
		// List.Remove is a no-op if e was already taken off the list elsewhere.
		pendingMigrateTask.Remove(e)
		lck.Unlock()
		log.Info("migrate task", task.Id, "done")
	}
}