-
Notifications
You must be signed in to change notification settings - Fork 11
/
main.go
182 lines (169 loc) · 5.71 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
package main
import (
"context"
"encoding/json"
"fmt"
"net/http"
"os"
"sort"
"strconv"
"time"
"github.com/cloud-barista/cb-dragonfly/pkg/modules/monitoring/push/mcis/collector"
"github.com/cloud-barista/cb-dragonfly/pkg/types"
"github.com/cloud-barista/cb-dragonfly/pkg/util"
"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/google/go-cmp/cmp"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)
// MetricCollector consumes the monitoring topics assigned to one collector
// deployment from Kafka and aggregates their metrics.
type MetricCollector struct {
	// CreateOrder is this collector's index within the collector group; it is
	// the key used to look up this collector's topics in the ConfigMap topic map.
	CreateOrder int
	// ConsumerKafkaConn is the Kafka consumer used to subscribe to the
	// allocated topics.
	ConsumerKafkaConn *kafka.Consumer
	// Aggregator folds the consumed metric messages (e.g. AVG) before they
	// are written out.
	Aggregator collector.Aggregator
}
var KafkaConfig *kafka.ConfigMap
// PrintPanicError logs err to stdout and then panics when err is non-nil.
// A nil error is a no-op.
func PrintPanicError(err error) {
	if err == nil {
		return
	}
	fmt.Println(err)
	panic(err)
}
// DeleteDeployment asks Kubernetes to remove this collector's Deployment
// (named from types.DeploymentName, createOrder and collectorUUID) using
// foreground propagation. Failures are logged to stdout, not returned.
func DeleteDeployment(clientSet *kubernetes.Clientset, createOrder int, collectorUUID string, namespace string) {
	fmt.Println("Deleting deployment...")
	name := fmt.Sprintf("%s%d-%s", types.DeploymentName, createOrder, collectorUUID)
	policy := metav1.DeletePropagationForeground
	opts := metav1.DeleteOptions{
		PropagationPolicy: &policy,
	}
	err := clientSet.AppsV1().Deployments(namespace).Delete(context.TODO(), name, opts)
	if err != nil {
		fmt.Println("Fail to delete deployment.")
		fmt.Println(err)
	}
}
// A collector deployed as a Kubernetes Deployment:
// - polls the ConfigMap on a fixed interval,
// - parses the ConfigMap data (topicMap) and subscribes to the topics
//   assigned to its own collector index (CreateOrder),
// - and, when its own collector UUID is no longer present in the ConfigMap,
//   requests deletion of its own Deployment.
// main runs the per-deployment collector loop:
//  1. read configuration from environment variables,
//  2. connect to Kafka and the in-cluster Kubernetes API,
//  3. every collectInterval seconds, fetch the ConfigMap, subscribe to the
//     topics assigned to this collector's CreateOrder, aggregate their
//     metrics, and report topics that stopped producing data to Dragonfly.
//
// The collector deletes its own Deployment when its UUID disappears from the
// ConfigMap or when the ConfigMap cannot be read several times in a row.
func main() {
	/** Get Env Val Start */
	kafkaEndpointUrl := os.Getenv("kafka_endpoint_url")
	createOrderString := os.Getenv("create_order")
	if createOrderString == "" {
		fmt.Println("Get Env Error")
		return
	}
	createOrder, err := strconv.Atoi(createOrderString)
	if err != nil {
		// BUG FIX: the conversion error was silently ignored, leaving
		// createOrder at 0 and stealing collector 0's topics.
		fmt.Println("Get Env Error")
		fmt.Println(err)
		return
	}
	aggregateType := types.AVG
	namespace := os.Getenv("namespace")
	dfAddr := os.Getenv("df_addr")
	collectInterval, _ := strconv.Atoi(os.Getenv("mcis_collector_interval"))
	collectorUUID := os.Getenv("collect_uuid")
	if kafkaEndpointUrl == "" || namespace == "" || dfAddr == "" {
		fmt.Println("Get Env Error")
		return
	}
	/** Get Env Val End */

	/** Set Kafka, ConfigMap Conn Start */
	KafkaConfig = &kafka.ConfigMap{
		"bootstrap.servers":  kafkaEndpointUrl,
		"group.id":           fmt.Sprintf("mck8s-%d", createOrder),
		"enable.auto.commit": true,
		"auto.offset.reset":  "earliest",
	}
	consumerKafkaConn, err := kafka.NewConsumer(KafkaConfig)
	PrintPanicError(err)
	config, err := rest.InClusterConfig()
	PrintPanicError(err)
	clientSet, err := kubernetes.NewForConfig(config)
	PrintPanicError(err)
	/** Set Kafka, ConfigMap Conn End */

	/** Operate Collector Start */
	mc := MetricCollector{
		ConsumerKafkaConn: consumerKafkaConn,
		CreateOrder:       createOrder,
		Aggregator: collector.Aggregator{
			AggregateType: aggregateType,
		},
	}
	fmt.Printf("#### Group_%d collector Create ####\n", createOrder)

	deadOrAliveCnt := map[string]int{} // consecutive silent rounds per suspected-dead topic
	configMapFailCnt := 0              // consecutive ConfigMap read failures
	for {
		time.Sleep(time.Duration(collectInterval) * time.Second)
		fmt.Printf("#### Group_%d collector ####\n", createOrder)
		fmt.Println("Get ConfigMap")

		/** Get ConfigMap<Data: Collector UUID Map, BinaryData: Collector Topics> Start */
		configMap, err := clientSet.CoreV1().ConfigMaps(namespace).Get(context.TODO(), types.ConfigMapName, metav1.GetOptions{})
		if err != nil {
			// After repeated failures the collector assumes it has been
			// orphaned and removes its own Deployment.
			if configMapFailCnt == 5 {
				DeleteDeployment(clientSet, createOrder, collectorUUID, namespace)
			}
			configMapFailCnt++
			fmt.Println("Fail to Get ConfigMap")
			fmt.Println(err)
			continue
		}
		// BUG FIX: reset on success so the threshold counts *consecutive*
		// failures rather than failures over the process lifetime.
		configMapFailCnt = 0
		/** Get ConfigMap<Data: Collector UUID Map, BinaryData: Collector Topics> End */

		/** Check My Collector UUID Start */
		// If this collector's UUID is gone from the ConfigMap, the manager no
		// longer tracks it: request deletion of our own Deployment. Deletion
		// propagates asynchronously, so the loop keeps running until the pod
		// is actually terminated (matches original behavior).
		if _, alive := configMap.Data[collectorUUID]; !alive {
			DeleteDeployment(clientSet, createOrder, collectorUUID, namespace)
		}
		/** Check My Collector UUID End */

		/** Get My Allocated Topics Start */
		topicMap := map[int][]string{}
		if err := json.Unmarshal(configMap.BinaryData["topicMap"], &topicMap); err != nil {
			fmt.Println("Fail to unMarshal ConfigMap Object Data")
		}
		deliveredTopicList, ok := topicMap[mc.CreateOrder]
		if !ok {
			fmt.Println("No topic on this Collector")
			continue
		}
		fmt.Printf("Group_%d collector Delivered : %s\n", mc.CreateOrder, deliveredTopicList)
		if err = mc.ConsumerKafkaConn.SubscribeTopics(deliveredTopicList, nil); err != nil {
			fmt.Println(err)
		}
		/** Get My Allocated Topics End */

		/** Processing Topics to TSDB & Transmit Dead Topics To DF Start */
		start := time.Now()
		aliveTopics, _ := mc.Aggregator.AggregateMetric(mc.ConsumerKafkaConn, deliveredTopicList)
		elapsed := time.Since(start)
		sort.Strings(aliveTopics)
		fmt.Println("Aggregate Time: ", elapsed)

		// Topics that produced data this round are no longer suspected dead.
		for _, aliveTopic := range aliveTopics {
			delete(deadOrAliveCnt, aliveTopic)
		}

		if !cmp.Equal(deliveredTopicList, aliveTopics) {
			_ = mc.ConsumerKafkaConn.Unsubscribe()
			deadTopics := util.ReturnDiffTopicList(deliveredTopicList, aliveTopics)
			for _, deadTopic := range deadTopics {
				cnt, seen := deadOrAliveCnt[deadTopic]
				if !seen {
					// First silent round: start tracking, don't count yet.
					deadOrAliveCnt[deadTopic] = 0
					continue
				}
				if cnt == 3 {
					// Silent for enough consecutive rounds: ask DF to delete
					// the topic and stop tracking it.
					getUrl := fmt.Sprintf("http://%s/dragonfly/topic/delete/%s", dfAddr, deadTopic)
					resp, err := http.Get(getUrl)
					if err != nil {
						fmt.Println(err)
					} else {
						// BUG FIX: the response body was never closed,
						// leaking the underlying connection.
						resp.Body.Close()
						// BUG FIX: the success message was printed only when
						// err != nil (inverted condition).
						fmt.Println("Sending Delete Topics to DF is Success")
					}
					delete(deadOrAliveCnt, deadTopic)
					// BUG FIX: the original incremented the counter after
					// deleting the key, re-adding the topic at 1 and
					// re-reporting it every few rounds.
					continue
				}
				deadOrAliveCnt[deadTopic]++
			}
		}
		/** Processing Topics to TSDB & Transmit Dead Topics To DF End */
	}
	/** Operate Collector End */
}