/
analyzer.go
118 lines (101 loc) · 2.79 KB
/
analyzer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package container
import (
	"context"
	"errors"
	"io"
	"log"
	"strconv"
	"strings"

	"github.com/aglide100/dak-keyword/pkg/db"
	"github.com/docker/docker/api/types"
	"github.com/docker/docker/api/types/swarm"
)
// const nodeRole = "node.role == worker"
// CreateAnalyzerService creates a swarm service named "analyzer_<workerId>"
// that runs the analyzer image with the command "python3 Analyzer.py".
//
// keyword, workerId, jobId and the connection settings in dbConfig are passed
// to the container as environment variables.
//
// The boolean result is true only when the request was deferred: if the number
// of running containers has reached the limit reported by the queue, the spec
// is handed to EnqueueFromQueue and (nil, true) is returned without creating a
// service. In every other case the boolean is false:
//   - a nil dbConfig yields a non-nil error;
//   - a name conflict with an existing service is logged and treated as
//     success, without registering the spec as running;
//   - any other ServiceCreate failure is returned as the error;
//   - on success the spec is handed to EnqueueFromRunning.
//
// NOTE(review): the (error, bool) result order is unconventional for Go; it is
// kept so that existing callers continue to compile.
func (c *Controller) CreateAnalyzerService(workerId, jobId, keyword string, dbConfig *db.DBConfig) (error, bool) {
	const image = "ghcr.io/aglide100/dak-keyword--analyzer:latest"

	ctx := context.Background()
	maxAttempts := uint64(1)
	spec := &ContainerSpec{
		WorkerId: workerId,
		Keyword:  keyword,
		Token:    "",
		Type:     "Analyzer",
	}

	if c.cQueue.LenRunning() >= c.cQueue.GetLimitContainerCount() {
		log.Println("Too many container to create analyzer container")
		c.cQueue.EnqueueFromQueue(spec)
		return nil, true
	}

	// Checked after the queue branch because that branch never reads dbConfig.
	if dbConfig == nil {
		return errors.New("create analyzer service: nil db config"), false
	}

	// The pull is best-effort, so failures are logged instead of returned.
	// The progress stream must be drained and closed: leaving it unread leaks
	// the response body and can keep the pull from completing.
	if progress, pullErr := c.cli.ImagePull(ctx, image, types.ImagePullOptions{}); pullErr != nil {
		log.Printf("pulling analyzer image %q: %v", image, pullErr)
	} else {
		if _, copyErr := io.Copy(io.Discard, progress); copyErr != nil {
			log.Printf("reading pull progress for %q: %v", image, copyErr)
		}
		if closeErr := progress.Close(); closeErr != nil {
			log.Printf("closing pull progress for %q: %v", image, closeErr)
		}
	}

	_, err := c.cli.ServiceCreate(ctx, swarm.ServiceSpec{
		Annotations: swarm.Annotations{
			Name: "analyzer_" + workerId,
		},
		TaskTemplate: swarm.TaskSpec{
			ContainerSpec: &swarm.ContainerSpec{
				Image: image,
				// Labels: map[string]string{
				// 	"com.docker.stack.namespace" : "keyword_analyzer",
				// },
				Command: []string{"python3", "Analyzer.py"},
				// Mounts: []mount.Mount{
				// 	{
				// 		Type: "volume",
				// 		Source: "keys",
				// 		Target: "/keys/",
				// 		VolumeOptions: &mount.VolumeOptions{},
				// 	},
				// },
				Env: []string{
					"Keyword=" + keyword,
					"DB_ADDR=" + dbConfig.Host,
					"DB_PORT=" + strconv.Itoa(dbConfig.Port),
					"DB_USER=" + dbConfig.User,
					"DB_PASSWORD=" + dbConfig.Password,
					"DB_NAME=" + dbConfig.Dbname,
					"WORKER_ID=" + workerId,
					"JOB_ID=" + jobId,
				},
			},
			Networks: []swarm.NetworkAttachmentConfig{
				{Target: "keyword-network"},
			},
			RestartPolicy: &swarm.RestartPolicy{
				MaxAttempts: &maxAttempts,
				Condition:   swarm.RestartPolicyConditionOnFailure,
			},
			// Placement: &swarm.Placement{
			// 	// MaxReplicas: 1,
			// 	// Constraints: []string{nodeRole},
			// },
		},
	}, types.ServiceCreateOptions{})
	if err != nil {
		// The conflict is detected by matching the error text returned by
		// ServiceCreate; the existing service is left in place.
		if strings.Contains(err.Error(), "name conflicts with an existing object") {
			log.Println("An analyzer service with the same name already exists.")
			return nil, false
		}
		return err, false
	}

	c.cQueue.EnqueueFromRunning(spec)
	return nil, false
}
// RemoveAnalyzer removes the swarm service named "analyzer_<id>".
//
// If the removal fails, the error is returned and the running queue is left
// untouched. Otherwise DequeueFromRunning is called on the queue and nil is
// returned.
func (c *Controller) RemoveAnalyzer(id string) error {
	serviceName := "analyzer_" + id

	if removeErr := c.cli.ServiceRemove(context.Background(), serviceName); removeErr != nil {
		return removeErr
	}

	c.cQueue.DequeueFromRunning()
	return nil
}