-
Notifications
You must be signed in to change notification settings - Fork 1
/
produce_random_logs.go
133 lines (118 loc) · 2.85 KB
/
produce_random_logs.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package utils
import (
"encoding/json"
"log"
"math/rand"
"os"
"strings"
"time"
"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/google/uuid"
randomstring "github.com/xyproto/randomstring"
"github.com/hyperbolicresearch/hlog/config"
"github.com/hyperbolicresearch/hlog/internal/core"
)
// GenerateRandomLogs produces a randomly generated log every 5 seconds
// to one of the configured Kafka topics, simulating how many processes
// would produce logs in a real-life scenario. It blocks until a signal
// is received on stop, then shuts the producer down cleanly.
func GenerateRandomLogs(cfg *config.Config, stop chan os.Signal) {
	kafkaConfigs := kafka.ConfigMap{
		"bootstrap.servers": cfg.Simulator.KafkaConfigs.Server,
	}
	producer, err := kafka.NewProducer(&kafkaConfigs)
	if err != nil {
		panic(err)
	}
	// Closing the producer flushes internal state and closes the
	// Events() channel, which terminates the delivery-report goroutine
	// below (previously the producer was never closed — a leak).
	defer producer.Close()

	// Delivery-report handler: logs the outcome of each produced message.
	go func() {
		for e := range producer.Events() {
			switch ev := e.(type) {
			case *kafka.Message:
				if ev.TopicPartition.Error != nil {
					// Log the per-message delivery error. The original
					// code logged the outer `err`, which is always nil
					// here (a non-nil err panics above).
					log.Printf("Error producing: %v", ev.TopicPartition.Error)
				} else {
					log.Printf("Produced to topic=%-9v partition=%v",
						*ev.TopicPartition.Topic,
						ev.TopicPartition.Partition)
				}
			}
		}
	}()

	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-stop:
			return
		case <-ticker.C:
			go Generate(producer, cfg)
		}
	}
}
// Generate builds a single log entry filled with random content and
// produces it to a randomly chosen topic from cfg.Simulator.KafkaTopics
// via the provided Kafka producer. It panics if the log cannot be
// marshaled to JSON, and logs (without retrying) if the produce call
// fails to enqueue the message.
func Generate(kafkaProducer *kafka.Producer, cfg *config.Config) {
	rnd := rand.New(rand.NewSource(time.Now().UnixNano()))

	// Pick a random destination topic.
	topics := cfg.Simulator.KafkaTopics
	channel := topics[rnd.Intn(len(topics))]

	id := uuid.New().String()

	senderIds := []string{
		"client-0001",
		"client-0002",
		"client-0003",
		"client-0004",
		"client-0005",
	}
	senderId := senderIds[rnd.Intn(len(senderIds))]

	timestamp := time.Now().Unix()

	levels := []string{
		"debug",
		"info",
		"warn",
		"error",
		"fatal",
	}
	// index is reused below as the "count" field, matching the
	// original behavior (count == the chosen level's index).
	index := rnd.Intn(len(levels))
	level := levels[index]

	// Build the random payload strings with strings.Builder to avoid
	// quadratic `+=` concatenation in the loop.
	var msgB, fooB, barB strings.Builder
	for i := 0; i < cfg.Simulator.MessageLength; i++ {
		msgB.WriteString(randomstring.HumanFriendlyEnglishString(7) + " ")
		fooB.WriteString(randomstring.HumanFriendlyEnglishString(7) + " ")
		barB.WriteString(randomstring.HumanFriendlyEnglishString(7) + " ")
	}
	message := strings.TrimSpace(msgB.String())

	data := map[string]interface{}{
		"foo":       fooB.String(),
		"bar":       barB.String(),
		"count":     index,
		"firstname": "John",
		"lastname":  "Doe",
		"company":   "Acme Inc.",
	}
	sendableLog := core.Log{
		Channel:   channel,
		LogId:     id,
		SenderId:  senderId,
		Timestamp: timestamp,
		Level:     level,
		Message:   message,
		Data:      data,
	}
	value, err := json.Marshal(sendableLog)
	if err != nil {
		panic(err)
	}
	// The Produce error was previously ignored; surface enqueue
	// failures (e.g. a full local queue) instead of dropping them
	// silently. value is already []byte, so no conversion is needed.
	if err := kafkaProducer.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{
			Topic:     &channel,
			Partition: kafka.PartitionAny,
		},
		Value: value,
	}, nil); err != nil {
		log.Printf("Error enqueuing message: %v", err)
	}
}