forked from rlynic/filebeat-output-clickhouse
/
clickhouse_integration_test.go
157 lines (134 loc) · 3.2 KB
/
clickhouse_integration_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
package clickhouse_20200328
import (
"database/sql"
"fmt"
"github.com/ClickHouse/clickhouse-go"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/outputs"
"github.com/elastic/beats/v7/libbeat/outputs/outest"
"github.com/gofrs/uuid"
"math/rand"
"testing"
"time"
)
// Shared settings for the ClickHouse integration test.
var (
// clickHouseUrl is the DSN handed both to sql.Open in getConn and to the
// output under test through the "url" config key.
// NOTE(review): debug=true presumably enables driver debug logging — confirm.
clickHouseUrl = "tcp://127.0.0.1:9000?debug=true"
// columns is passed as the "columns" config value; the names match the
// columns of the ck_test table created by prepare.
columns = [3]string{"id", "name", "created_date"}
)
// TestPublish is an integration test that publishes generated events to the
// ClickHouse server addressed by clickHouseUrl.
//
// It creates the ck_test table with prepare, pushes events through the output
// under test, and drops the table again with clean.
func TestPublish(t *testing.T) {
	clickHouseConfig := map[string]interface{}{
		"url":     clickHouseUrl,
		"table":   "ck_test",
		"columns": columns,
	}

	if err := prepare(); err != nil {
		t.Fatalf("Error preparing test env: %v", err)
	}
	// Deferred so the table is dropped even when publishing aborts the test:
	// t.Fatalf stops the goroutine via runtime.Goexit, which still runs
	// deferred calls.
	defer func() {
		if err := clean(); err != nil {
			// Errorf is enough here: nothing runs after the cleanup, the
			// test only needs to be marked as failed.
			t.Errorf("Error cleaning test env: %v", err)
		}
	}()

	testPublishList(t, clickHouseConfig)
}
// testPublishList builds a ClickHouse output from cfg and publishes a fixed
// number of equally sized batches through it, failing the test on the first
// publish error.
func testPublishList(t *testing.T, cfg map[string]interface{}) {
	const (
		batches   = 100  // number of batches to publish
		batchSize = 1000 // events per batch
	)

	output := newClickHouseTestingOutput(t, cfg)
	if err := sendTestEvents(output, batches, batchSize); err != nil {
		// The failing step is publishing, not config parsing.
		t.Fatalf("Error publishing events: %v", err)
	}
}
// newClickHouseTestingOutput builds the output registered under the name
// "clickHouse" from cfg, connects its first client, and returns that client.
//
// Any failure (bad config, unregistered output, factory error, missing or
// non-network client, connection error) aborts the test through t.Fatal/Fatalf.
func newClickHouseTestingOutput(t *testing.T, cfg map[string]interface{}) outputs.Client {
	t.Helper()

	config, err := common.NewConfigFrom(cfg)
	if err != nil {
		t.Fatalf("Error reading config: %v", err)
	}

	plugin := outputs.FindFactory("clickHouse")
	if plugin == nil {
		t.Fatalf("clickhouse output module not registered")
	}

	out, err := plugin(beat.Info{Beat: "libbeat"}, outputs.NewNilObserver(), config)
	if err != nil {
		t.Fatalf("Failed to initialize clickhouse output: %v", err)
	}

	// Guard the index and the type assertion so a misbehaving factory yields
	// a readable test failure instead of a panic.
	if len(out.Clients) == 0 {
		t.Fatal("clickhouse output returned no clients")
	}
	client, ok := out.Clients[0].(outputs.NetworkClient)
	if !ok {
		t.Fatalf("clickhouse client of type %T is not an outputs.NetworkClient", out.Clients[0])
	}

	if err := client.Connect(); err != nil {
		t.Fatalf("Failed to connect to clickhouse host: %v", err)
	}
	return client
}
// sendTestEvents publishes `batches` batches of N generated events each
// through out and stops at the first publish error, which it returns
// unchanged. It returns nil when every batch was published.
func sendTestEvents(out outputs.Client, batches, N int) error {
	// seq is a running 1-based event counter handed to createEvent.
	seq := 1
	for b := 0; b < batches; b++ {
		events := make([]beat.Event, 0, N)
		for len(events) < N {
			events = append(events, createEvent(seq))
			seq++
		}

		if err := out.Publish(outest.NewBatch(events...)); err != nil {
			return err
		}
	}
	return nil
}
// createEvent builds one test event whose fields match the ck_test columns:
// a random UUID as "id", "ck-test" followed by a random number below 100000
// as "name", and the current time as "created_date".
//
// message is accepted for the caller's running counter but is currently not
// used in the generated event.
func createEvent(message int) beat.Event {
	// uuid.Must panics if no UUID can be generated; in this test helper that
	// is preferable to silently inserting the nil UUID for every row.
	id := uuid.Must(uuid.NewV4())

	// Read the clock once so Timestamp and created_date agree.
	now := time.Now()

	return beat.Event{
		Timestamp: now,
		Meta: common.MapStr{
			"ck-test": "ck-test-MetaValue",
		},
		Fields: common.MapStr{
			"id":           id.String(),
			"name":         fmt.Sprint("ck-test", rand.Intn(100000)),
			"created_date": now,
		},
	}
}
// prepare sets up the database for the test: it drops any leftover ck_test
// table and creates a fresh one with the id, name and created_date columns.
// It returns the first error encountered.
func prepare() error {
	connect, err := getConn()
	if err != nil {
		return err
	}
	// Release the handle once the table has been created.
	defer connect.Close()

	// Start from a clean slate; a failed drop would leave stale rows behind,
	// so report it instead of ignoring it.
	if err := clean(); err != nil {
		return err
	}

	_, err = connect.Exec(`
		CREATE TABLE IF NOT EXISTS ck_test (
			id FixedString(36),
			name FixedString(50),
			created_date DateTime
		) engine=Memory
	`)
	return err
}
// clean drops the ck_test table if it exists and returns any connection or
// execution error.
func clean() error {
	connect, err := getConn()
	if err != nil {
		return err
	}
	// Release the handle once the statement has run.
	defer connect.Close()

	_, err = connect.Exec(`
		DROP TABLE IF EXISTS ck_test
	`)
	return err
}
// getConn opens a database handle for clickHouseUrl using the "clickhouse"
// driver and verifies it with Ping.
//
// On success the caller owns the returned handle and must Close it. On
// failure the error is printed (with code, message and stack trace when it is
// a *clickhouse.Exception), the handle is closed, and nil is returned with
// the error.
func getConn() (*sql.DB, error) {
	connect, err := sql.Open("clickhouse", clickHouseUrl)
	if err != nil {
		return nil, err
	}

	if err := connect.Ping(); err != nil {
		if exception, ok := err.(*clickhouse.Exception); ok {
			fmt.Printf("[%d] %s \n%s\n", exception.Code, exception.Message, exception.StackTrace)
		} else {
			fmt.Println(err)
		}
		// The handle is unusable; close it here because callers discard it
		// on error. The Close error is ignored in favour of the Ping error.
		_ = connect.Close()
		return nil, err
	}
	return connect, nil
}