-
Notifications
You must be signed in to change notification settings - Fork 232
/
api.go
186 lines (168 loc) · 6.15 KB
/
api.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
package ampq
import (
	"fmt"
	"net/url"
	"strings"
	"sync"
	"time"

	"github.com/RichardKnop/machinery/v2"
	amqpbackend "github.com/RichardKnop/machinery/v2/backends/amqp"
	amqpbroker "github.com/RichardKnop/machinery/v2/brokers/amqp"
	"github.com/RichardKnop/machinery/v2/config"
	eagerlock "github.com/RichardKnop/machinery/v2/locks/eager"
	"github.com/RichardKnop/machinery/v2/tasks"
	"github.com/hanc00l/nemo_go/pkg/conf"
)
// Task state names and message-queue topic names used by this package.
const (
// Task states. CREATED and REVOKED are defined here; the remaining states
// reuse the values exported by the machinery tasks package.
CREATED string = "CREATED" // task has been created but has not started executing yet
REVOKED string = "REVOKED" // task was cancelled
STARTED string = tasks.StateStarted // task is currently executing
SUCCESS string = tasks.StateSuccess // task finished with result SUCCESS
FAILURE string = tasks.StateFailure // task finished with result FAILURE
RECEIVED string = tasks.StateReceived // unused
PENDING string = tasks.StatePending // unused
RETRY string = tasks.StateRetry // unused
// Queue topic names. taskTopicDefineMap assigns one of these to each task
// name, and GetRoutingKeyByTopic prefixes a topic with TopicMQPrefix.
TopicActive = "active"
TopicFinger = "finger"
TopicPassive = "passive"
TopicPocscan = "pocscan"
TopicCustom = "custom"
// TopicMQPrefix is the first segment of every routing key built by
// GetRoutingKeyByTopic.
TopicMQPrefix = "nemo_mq"
)
// TaskResult carries a status string and a message; it serializes to JSON
// with the keys "status" and "msg".
type TaskResult struct {
Status string `json:"status"`
Msg string `json:"msg"`
}
// WorkerStatus describes a worker: its identity and task counters, its run
// state flags and resource usage, its run options, and daemon information.
// It embeds a sync.Mutex, which is excluded from the JSON encoding.
// NOTE(review): the mutex presumably guards the fields below — confirm
// against the code that updates them. Because of the embedded mutex the
// struct should be passed by pointer rather than copied.
type WorkerStatus struct {
sync.Mutex `json:"-"`
// worker's task status
WorkerName string `json:"worker_name"`
WorkerTopics string `json:"worker_topic"`
CreateTime time.Time `json:"create_time"`
UpdateTime time.Time `json:"update_time"`
TaskExecutedNumber int `json:"task_number"`
TaskStartedNumber int `json:"started_number"`
// worker's run status
ManualReloadFlag bool `json:"manual_reload_flag"`
ManualFileSyncFlag bool `json:"manual_file_sync_flag"`
ManualUpdateOptionFlag bool `json:"manual_update_daemon_option"`
CPULoad string `json:"cpu_load"`
MemUsed string `json:"mem_used"`
// worker's option
WorkerRunOption []byte `json:"worker_run_option"` // startup options the worker is currently running with
WorkerUpdateOption []byte `json:"worker_update_option"` // startup options the worker needs to be updated to
// daemon option
IsDaemonProcess bool `json:"is_daemon_process"`
WorkerDaemonUpdateTime time.Time `json:"worker_daemon_update_time"`
}
// WorkerRunTaskMode is an integer enumeration of worker task modes.
type WorkerRunTaskMode int
// Task modes, numbered from 0 (TaskModeDefault) upward via iota; the order of
// the entries therefore determines their numeric values.
// NOTE(review): the non-default modes appear to parallel the Topic* constants
// (active, finger, passive, pocscan, custom) — confirm against the callers.
const (
TaskModeDefault WorkerRunTaskMode = iota
TaskModeActive
TaskModeFinger
TaskModePassive
TaskModePocscan
TaskModeCustom
)
// taskTopicDefineMap maps each task name to the queue topic it is published
// on, so that tasks are dispatched to the workers that execute that kind of
// task. GetTopicByTaskName consults it for workspaces that are not registered
// in CustomTaskWorkspaceMap.
var taskTopicDefineMap = map[string]string{
"portscan": TopicActive,
"batchscan": TopicActive,
"domainscan": TopicActive,
"subfinder": TopicPassive,
"subdomainbrute": TopicPassive,
"subdomaincrawler": TopicActive,
"iplocation": TopicPassive,
"fofa": TopicPassive,
"quake": TopicPassive,
"hunter": TopicPassive,
"xray": TopicPocscan,
"nuclei": TopicPocscan,
"goby": TopicPocscan,
"icpquery": TopicPassive,
"whoisquery": TopicPassive,
"fingerprint": TopicFinger,
"xportscan": TopicActive,
"xonlineapi": TopicPassive,
"xfofa": TopicPassive,
"xquake": TopicPassive,
"xhunter": TopicPassive,
"xdomainscan": TopicPassive,
"xsubfinder": TopicPassive,
"xsubdomainbrute": TopicPassive,
// NOTE(review): this key is spelled "cralwer", unlike "subdomaincrawler"
// above. Confirm it matches the task name the producers actually use before
// changing it; a mismatch makes GetTopicByTaskName return "".
"xsubdomaincralwer": TopicActive,
"xfingerprint": TopicFinger,
"xxray": TopicPocscan,
"xnuclei": TopicPocscan,
"xgoby": TopicPocscan,
"xorgscan": TopicActive,
// test task
"test": TopicCustom,
}
// taskServerConn caches the machinery servers created by
// GetServerTaskAMPQServer and GetWorkerAMPQServer, keyed by topic name, so
// that the server for a topic is created once and then reused.
// The map is read and written without any locking in this file.
var taskServerConn = make(map[string]*machinery.Server)
// CustomTaskWorkspaceMap is the set of workspace GUIDs associated with custom
// tasks. GetTopicByTaskName routes every task of a workspace listed here to
// that workspace's own "custom.<GUID>" topic.
var CustomTaskWorkspaceMap = make(map[string]struct{})
// GetServerTaskAMPQServer returns the machinery server for topicName, using
// the RabbitMQ settings from the global server configuration.
//
// The server is created on the first request for a topic (with a prefetch
// count of 3) and stored in taskServerConn; later requests for the same topic
// return the stored instance. The cache is accessed without locking.
func GetServerTaskAMPQServer(topicName string) *machinery.Server {
	if server, cached := taskServerConn[topicName]; cached {
		return server
	}
	mq := conf.GlobalServerConfig().Rabbitmq
	server := startAMQPServer(mq.Username, mq.Password, mq.Host, mq.Port, topicName, 3)
	taskServerConn[topicName] = server
	return server
}
// GetWorkerAMPQServer returns the machinery server for topicName, using the
// RabbitMQ settings from the global worker configuration.
//
// The server is created on the first request for a topic and stored in
// taskServerConn; prefetchCount is only applied at that point. Later requests
// for the same topic return the stored instance regardless of prefetchCount.
// The cache is accessed without locking.
func GetWorkerAMPQServer(topicName string, prefetchCount int) *machinery.Server {
	if server, cached := taskServerConn[topicName]; cached {
		return server
	}
	mq := conf.GlobalWorkerConfig().Rabbitmq
	server := startAMQPServer(mq.Username, mq.Password, mq.Host, mq.Port, topicName, prefetchCount)
	taskServerConn[topicName] = server
	return server
}
// startAMQPServer creates a machinery server that uses the AMQP endpoint at
// host:port as both broker and result backend.
//
// username and password are the raw (not percent-encoded) credentials.
// topicName is turned into a routing key with GetRoutingKeyByTopic; that key
// is used as the default queue name and as the binding key on the
// "nemo_mq_exchange" topic exchange. prefetchCount is passed through to the
// AMQP configuration.
func startAMQPServer(username, password, host string, port int, topicName string, prefetchCount int) *machinery.Server {
	// Assemble the URI with net/url rather than string formatting so that
	// reserved characters in the credentials (such as '/', '?', '#' or '%')
	// are percent-encoded instead of corrupting the URI. For credentials
	// without such characters the result is identical to
	// "amqp://user:pass@host:port/".
	brokerURL := url.URL{
		Scheme: "amqp",
		User:   url.UserPassword(username, password),
		Host:   fmt.Sprintf("%s:%d", host, port),
		Path:   "/",
	}
	amqpURI := brokerURL.String()
	routingKey := GetRoutingKeyByTopic(topicName)

	cnf := &config.Config{
		Broker:          amqpURI,
		DefaultQueue:    routingKey,
		ResultBackend:   amqpURI,
		ResultsExpireIn: 300,
		AMQP: &config.AMQPConfig{
			Exchange:      "nemo_mq_exchange",
			ExchangeType:  "topic",
			BindingKey:    routingKey,
			PrefetchCount: prefetchCount,
		},
	}

	// Broker, result backend and lock all share the same configuration.
	return machinery.NewServer(cnf, amqpbroker.New(cnf), amqpbackend.New(cnf), eagerlock.New())
}
// GetTopicByTaskName returns the queue topic a task should be published on.
//
// A workspace registered in CustomTaskWorkspaceMap takes precedence: all of
// its tasks go to "custom.<workspaceGUID>", e.g.
// "custom.1a0ca919-7960-4067-9981-9abcb4eaa735". Otherwise the topic is looked
// up by task name in taskTopicDefineMap ("active", "finger", ...). An unknown
// task name yields the empty string.
func GetTopicByTaskName(taskName string, workspaceGUID string) string {
	_, isCustomWorkspace := CustomTaskWorkspaceMap[workspaceGUID]
	if isCustomWorkspace {
		return TopicCustom + "." + workspaceGUID
	}
	// A missing key yields the map's zero value, the empty string.
	return taskTopicDefineMap[taskName]
}
// GetTopicByMQRoutingKey extracts the topic from a dot-separated routing key
// by dropping its first segment.
//
//	"nemo_mq.active"                                      -> "active"
//	"nemo_mq.custom.1a0ca919-7960-4067-9981-9abcb4eaa735" -> "custom.1a0ca919-7960-4067-9981-9abcb4eaa735"
//
// Keys that do not consist of exactly two or three segments yield the empty
// string. The value of the first segment is not checked.
func GetTopicByMQRoutingKey(routingKey string) string {
	segments := strings.Split(routingKey, ".")
	switch len(segments) {
	case 2:
		return segments[1]
	case 3:
		return fmt.Sprintf("%s.%s", segments[1], segments[2])
	}
	return ""
}
func GetRoutingKeyByTopic(topicName string) string {
return fmt.Sprintf("%s.%s", TopicMQPrefix, topicName)
}