/
worker.go
100 lines (92 loc) · 2.06 KB
/
worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package gotodoit
import (
"database/sql"
"log"
"os"
"os/signal"
"sync"
"syscall"
"github.com/achiku/qg"
"github.com/jackc/pgx"
"github.com/jackc/pgx/stdlib"
"github.com/rs/xlog"
)
// Queue names assigned to the worker pools that StartWorker creates.
const (
// QueNameHighPriority is the queue name set on the high-priority worker pool.
QueNameHighPriority = "high"
// QueNameLowPriority is the queue name set on the low-priority worker pool.
// NOTE(review): the empty string presumably selects the queue library's
// default (unnamed) queue; confirm against the qg documentation.
QueNameLowPriority = ""
)
// NewQueClient wraps the given database handle in a qg client.
//
// The returned error is always nil in the current implementation.
func NewQueClient(db *sql.DB) (*qg.Client, error) {
	return qg.NewClient(db), nil
}
// NewWorkerDB opens a database/sql handle for the worker through the pgx
// driver.
//
// Host, user, password, database name and port are read from config, and
// qg.PrepareStatements is installed as the driver's AfterConnect hook. The
// handle is limited to 20 open and 10 idle connections. An error from
// sql.Open is returned unchanged together with a nil handle.
func NewWorkerDB(config *Config) (*sql.DB, error) {
	const (
		maxOpenConns = 20
		maxIdleConns = 10
	)

	connCfg := pgx.ConnConfig{
		Host:     config.DBHost,
		User:     config.DBUser,
		Password: config.DBPass,
		Database: config.DBName,
		Port:     config.DBPort,
	}
	driverCfg := &stdlib.DriverConfig{
		ConnConfig:   connCfg,
		AfterConnect: qg.PrepareStatements,
	}

	// Register the driver config first, then open through its connection string.
	stdlib.RegisterDriverConfig(driverCfg)
	pool, err := sql.Open("pgx", driverCfg.ConnectionString(""))
	if err != nil {
		return nil, err
	}

	pool.SetMaxOpenConns(maxOpenConns)
	pool.SetMaxIdleConns(maxIdleConns)
	// pool.SetConnMaxLifetime(time.Second * 10)
	return pool, nil
}
// StartWorker loads the configuration at confPath, opens the worker database
// and starts one worker pool per queue (QueNameLowPriority and
// QueNameHighPriority). Each pool is created with appCfg.NumWorkers workers
// and handles the "updateUserJob" job.
//
// It blocks until the process receives SIGTERM, then shuts both pools down,
// closes the database handle and returns nil. Errors from loading the
// configuration, opening the database or creating the que client are returned
// unchanged.
func StartWorker(confPath string) error {
	appCfg, err := NewConfig(confPath)
	if err != nil {
		return err
	}
	db, err := NewWorkerDB(appCfg)
	if err != nil {
		return err
	}
	// Release the connection pool on every exit path, including the
	// NewQueClient failure below and the normal return after shutdown.
	defer func() {
		if cerr := db.Close(); cerr != nil {
			log.Printf("closing worker DB: %v", cerr)
		}
	}()

	qc, err := NewQueClient(db)
	if err != nil {
		return err
	}

	jobs := JobApp{
		BaseApp: BaseApp{
			Config: appCfg,
		},
	}
	wm := qg.WorkMap{
		"updateUserJob": jobs.UpdateUserInfo,
	}

	wPoolLow := qg.NewWorkerPool(qc, wm, appCfg.NumWorkers)
	wPoolLow.Queue = QueNameLowPriority
	wPoolHigh := qg.NewWorkerPool(qc, wm, appCfg.NumWorkers)
	wPoolHigh.Queue = QueNameHighPriority

	signalChan := make(chan os.Signal, 1)
	signal.Notify(signalChan, syscall.SIGTERM)
	// Stop relaying SIGTERM to signalChan once this function returns.
	defer signal.Stop(signalChan)

	// The WaitGroup tracks the shutdown goroutine: StartWorker must not return
	// before both pools have been asked to shut down and Shutdown has returned.
	// Previously the counter was incremented but never decremented, so Wait
	// blocked forever and the process hung after SIGTERM.
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		// Only SIGTERM is registered on this channel, so any value received
		// here is the shutdown request.
		<-signalChan
		log.Printf("received SIGTERM. shutting down worker. (PID=%d)", os.Getpid())
		wPoolLow.Shutdown()
		wPoolHigh.Shutdown()
		log.Printf("shutting down worker. (PID=%d)", os.Getpid())
	}()

	xlog.Infof("starting workers. num workers: %d, (PID=%d)", appCfg.NumWorkers, os.Getpid())
	// NOTE(review): Start is launched in goroutines exactly as before; if
	// WorkerPool.Start is non-blocking these could be plain calls — confirm
	// against the qg implementation.
	go wPoolLow.Start()
	go wPoolHigh.Start()

	wg.Wait()
	return nil
}