-
Notifications
You must be signed in to change notification settings - Fork 0
/
Orchestra.go
170 lines (143 loc) · 4.23 KB
/
Orchestra.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
package main
import (
"flag"
"fmt"
"github.com/IBM/sarama"
"github.com/ankush-003/distributed-load-testing/kafka"
"github.com/ankush-003/distributed-load-testing/orchestrator"
"github.com/dgraph-io/badger/v3"
"github.com/gin-gonic/gin"
"log"
"os"
"os/signal"
"sync"
"time"
)
// main wires up and runs the load-testing orchestrator.
//
// It parses the command-line flags, opens the BadgerDB store at ./dato and
// clears the "register" key, creates the Kafka consumers (metrics, heartbeat,
// register) and producers (trigger, test config), and builds the
// orchestrator from them. It then blocks until RunRegisterConsumer releases
// the wait group, starts the heartbeat and metrics consumers, and serves the
// HTTP handlers on :8081.
//
// main runs until an interrupt signal is received and then returns normally,
// so every deferred cleanup (Kafka clients, BadgerDB) is executed.
func main() {
	// Default configuration; each value can be overridden by a flag.
	const (
		defaultBroker           = "localhost:9092"
		defaultHeartbeatTimeout = 5 * time.Minute
		defaultNumDrivers       = 2

		logFlags = log.Ldate | log.Ltime | log.Lshortfile
	)

	// Parse flags before touching any external resource, so that -h or an
	// invalid flag exits without opening the database.
	broker := flag.String("broker", defaultBroker, "Address of the Kafka broker")
	heartbeatTimeout := flag.Duration("heartbeat-timeout", defaultHeartbeatTimeout, "Timeout for heartbeat messages")
	numDrivers := flag.Int("num-drivers", defaultNumDrivers, "Number of drivers to wait for before starting the test")
	flag.Parse()

	brokers := []string{*broker}

	db, err := badger.Open(badger.DefaultOptions("./dato"))
	if err != nil {
		log.Fatal(err)
	}
	// Deferred closers only log: log.Fatal would call os.Exit and skip the
	// cleanups that are still pending.
	defer func() {
		if err := db.Close(); err != nil {
			log.Println("closing BadgerDB:", err)
		}
	}()

	// Clear the register item left over from a previous run. Deleting a key
	// that does not exist is not an error in Badger, so any error returned
	// here is a genuine failure.
	err = db.Update(func(txn *badger.Txn) error {
		return txn.Delete([]byte("register"))
	})
	if err != nil {
		log.Fatalf("clearing register key: %v", err)
	}

	// Consumer for metrics.
	metricsConfig := sarama.NewConfig()
	metricsConfig.Consumer.Offsets.Initial = sarama.OffsetNewest
	metricsConsumer, err := kafka.NewConsumer(brokers, metricsConfig, log.New(os.Stdout, "KafkaConsumer: ", logFlags))
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		if err := metricsConsumer.Consumer.Close(); err != nil {
			log.Println("closing metrics consumer:", err)
		}
	}()

	// Consumer for heartbeats.
	heartbeatConfig := sarama.NewConfig()
	heartbeatConfig.Consumer.Offsets.Initial = sarama.OffsetNewest
	heartbeatConsumer, err := kafka.NewConsumer(brokers, heartbeatConfig, log.New(os.Stdout, "KafkaConsumer: ", logFlags))
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		if err := heartbeatConsumer.Consumer.Close(); err != nil {
			log.Println("closing heartbeat consumer:", err)
		}
	}()

	// Consumer for driver registration.
	registerConfig := sarama.NewConfig()
	registerConfig.Consumer.Offsets.Initial = sarama.OffsetNewest
	registerConsumer, err := kafka.NewConsumer(brokers, registerConfig, log.New(os.Stdout, "KafkaConsumer: ", logFlags))
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		if err := registerConsumer.Consumer.Close(); err != nil {
			log.Println("closing register consumer:", err)
		}
	}()

	// Producer for the trigger message.
	producer, err := kafka.NewProducer(brokers, sarama.NewConfig(), log.New(os.Stdout, "KafkaProducer: ", logFlags))
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		if err := producer.Producer.Close(); err != nil {
			log.Println("closing trigger producer:", err)
		}
	}()

	// Producer for the test config message.
	testConfigProducer, err := kafka.NewProducer(brokers, sarama.NewConfig(), log.New(os.Stdout, "KafkaProducer: ", logFlags))
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		if err := testConfigProducer.Producer.Close(); err != nil {
			log.Println("closing test config producer:", err)
		}
	}()

	// Named orch so the local does not shadow the orchestrator package.
	orch := orchestrator.NewOrchestrator(
		heartbeatConsumer,
		metricsConsumer,
		registerConsumer,
		producer,
		testConfigProducer,
		*heartbeatTimeout,
		db,
	)

	// Block until the register consumer releases the wait group; it is given
	// the number of drivers to wait for.
	var wg sync.WaitGroup
	wg.Add(1)
	go orch.RunRegisterConsumer(&wg, *numDrivers)
	wg.Wait()

	// Start the heartbeat and metrics consumers in the background.
	go orch.RunHeartbeatConsumer()
	go orch.RunMetricsConsumer()

	// Create the gin router and register the orchestrator's HTTP handlers.
	router := gin.Default()
	orch.SetupHTTPHandlers(router)

	// Listen for the interrupt signal.
	signalChan := make(chan os.Signal, 1)
	signal.Notify(signalChan, os.Interrupt)

	// Serve HTTP in the background; a server failure terminates the process.
	go func() {
		if err := router.Run(":8081"); err != nil {
			log.Fatal(err)
		}
	}()

	// Wait for the interrupt signal.
	<-signalChan
	// Restore default signal handling so a second interrupt force-quits if a
	// cleanup hangs.
	signal.Stop(signalChan)
	fmt.Println("Received interrupt signal. Shutting down...")
	// Returning (instead of os.Exit) lets the deferred closers run, which
	// closes the Kafka clients and the BadgerDB store exactly once.
}