-
Notifications
You must be signed in to change notification settings - Fork 88
/
tasks.go
149 lines (126 loc) · 3.83 KB
/
tasks.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package v2scar
import (
"context"
"log"
"net/http"
"time"
"google.golang.org/grpc"
v2proxyman "v2ray.com/core/app/proxyman/command"
v2stats "v2ray.com/core/app/stats/command"
)
var API_ENDPOINT string
var GRPC_ENDPOINT string
type UserConfig struct {
UserId int `json:"user_id"`
Email string `json:"email"`
UUID string `json:"uuid"`
AlterId uint32 `json:"alter_id"`
Level uint32 `json:"level"`
Enable bool `json:"enable"`
}
type UserTraffic struct {
UserId int `json:"user_id"`
DownloadTraffic int64 `json:"dt"`
UploadTraffic int64 `json:"ut"`
}
type syncReq struct {
UserTraffics []*UserTraffic `json:"user_traffics"`
}
type syncResp struct {
Configs []*UserConfig
Tag string `json:"tag"`
}
func SyncTask(up *UserPool) {
// Connect to v2ray rpc
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
conn, err := grpc.DialContext(ctx, GRPC_ENDPOINT, grpc.WithInsecure(), grpc.WithBlock())
if err != nil {
log.Printf("[WARNING]: GRPC连接失败,请检查V2ray是否运行并开放对应grpc端口 当前GRPC地址: %v 错误信息: %v", GRPC_ENDPOINT, err.Error())
return
} else {
defer conn.Close()
}
// Init Client
proxymanClient := v2proxyman.NewHandlerServiceClient(conn)
statClient := v2stats.NewStatsServiceClient(conn)
httpClient := &http.Client{Timeout: 3 * time.Second}
resp := syncResp{}
err = getJson(httpClient, API_ENDPOINT, &resp)
if err != nil {
log.Printf("[WARNING]: API连接失败,请检查API地址 当前地址: %v 错误信息:%v", API_ENDPOINT, err.Error())
return
}
// init or update user config
initOrUpdateUser(up, proxymanClient, &resp)
// sync user traffic
syncUserTrafficToServer(up, statClient, httpClient)
}
func initOrUpdateUser(up *UserPool, c v2proxyman.HandlerServiceClient, sr *syncResp) {
// Enable line numbers in logging
log.SetFlags(log.LstdFlags | log.Lshortfile)
log.Println("[INFO] Call initOrUpdateUser")
syncUserMap := make(map[string]bool)
for _, cfg := range sr.Configs {
syncUserMap[cfg.Email] = true
user, _ := up.GetUserByEmail(cfg.Email)
if user == nil {
// New User
newUser, err := up.CreateUser(cfg.UserId, cfg.Email, cfg.UUID, cfg.Level, cfg.AlterId, cfg.Enable)
if err != nil {
log.Fatalln(err)
}
if newUser.Enable {
AddInboundUser(c, sr.Tag, newUser)
}
} else {
// Old User
if user.Enable != cfg.Enable {
// update enable status
user.setEnable(cfg.Enable)
}
// change user uuid
if user.UUID != cfg.UUID {
log.Printf("[INFO] user: %s 更换了uuid old: %s new: %s", user.Email, user.UUID, cfg.UUID)
RemoveInboundUser(c, sr.Tag, user)
user.setUUID(cfg.UUID)
AddInboundUser(c, sr.Tag, user)
}
// remove not enable user
if !user.Enable && user.running {
// Close Not Enable user
RemoveInboundUser(c, sr.Tag, user)
}
// start not runing user
if user.Enable && !user.running {
// Start Not Running user
AddInboundUser(c, sr.Tag, user)
}
}
}
// remote user not in server
for _, user := range up.GetAllUsers() {
if _, ok := syncUserMap[user.Email]; !ok {
RemoveInboundUser(c, sr.Tag, user)
up.RemoveUserByEmail(user.Email)
}
}
}
func syncUserTrafficToServer(up *UserPool, c v2stats.StatsServiceClient, hc *http.Client) {
GetAndResetUserTraffic(c, up)
tfs := make([]*UserTraffic, 0, up.GetUsersNum())
for _, user := range up.GetAllUsers() {
tf := user.DownloadTraffic + user.UploadTraffic
if tf > 0 {
log.Printf("[INFO] User: %v Now Used Total Traffic: %v", user.Email, tf)
tfs = append(tfs, &UserTraffic{
UserId: user.UserId,
DownloadTraffic: user.DownloadTraffic,
UploadTraffic: user.UploadTraffic,
})
user.resetTraffic()
}
}
postJson(hc, API_ENDPOINT, &syncReq{UserTraffics: tfs})
log.Printf("[INFO] Call syncUserTrafficToServer ONLINE USER COUNT: %d", len(tfs))
}