-
Notifications
You must be signed in to change notification settings - Fork 0
/
intranet.go
84 lines (65 loc) · 1.91 KB
/
intranet.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package intranet
import (
"sync"
"github.com/getblank/blank-router/taskq"
"github.com/getblank/blank-sr/config"
)
var onEventHandler = func(string, interface{}, []string) {}
func Init() {
runServer()
}
// OnEvent sets intranet event handler
func OnEvent(fn func(string, interface{}, []string)) {
onEventHandler = fn
}
func srEventHandler(uri string, subscribers []string, event interface{}) {
if len(subscribers) == 0 {
return
}
onEventHandler(uri, event, subscribers)
}
var storeMigrationVersion = map[string]map[int]struct{}{}
var storeMigrationLocker sync.Mutex
func isStoreVersionProcessed(storeName string, version int) bool {
storeMigrationLocker.Lock()
defer storeMigrationLocker.Unlock()
if storeMigrationVersion[storeName] == nil {
return false
}
_, ok := storeMigrationVersion[storeName][version]
return ok
}
func setProcessedStoreVersion(storeName string, version int) {
storeMigrationLocker.Lock()
defer storeMigrationLocker.Unlock()
if storeMigrationVersion[storeName] == nil {
storeMigrationVersion[storeName] = map[int]struct{}{}
}
storeMigrationVersion[storeName][version] = struct{}{}
}
func runMigrationScripts(conf map[string]config.Store) {
for storeName, storeDesc := range conf {
if storeDesc.StoreLifeCycle.Migration == nil {
continue
}
if isStoreVersionProcessed(storeName, storeDesc.Version) {
continue
}
log.Infof("Will run migration scripts for store %s if needed", storeName)
t := &taskq.Task{
Type: taskq.StoreLifeCycle,
UserID: "system",
Store: storeName,
Arguments: map[string]interface{}{
"event": "migration",
},
}
res, err := taskq.PushAndGetResult(t, 0)
if err != nil {
log.Errorf("Migration scripts for store %s completed with error: %v", storeName, err)
continue
}
setProcessedStoreVersion(storeName, storeDesc.Version)
log.Infof("Migration scripts for store %s completed with result: %v", storeName, res)
}
}