-
Notifications
You must be signed in to change notification settings - Fork 47
/
client.go
163 lines (132 loc) · 6.3 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package kafka
import (
"context"
"fmt"
"strings"
"github.com/Shopify/sarama"
"github.com/apache/skywalking-satellite/internal/pkg/config"
"github.com/apache/skywalking-satellite/internal/pkg/log"
"github.com/apache/skywalking-satellite/plugins/client/api"
)
const (
Name = "kafka-client"
ShowName = "Kafka Client"
)
type Client struct {
config.CommonFields
Brokers string `mapstructure:"brokers"` // The Kafka broker addresses (default `localhost:9092`).
Version string `mapstructure:"version"` // The version should follow this pattern, which is `major.minor.veryMinor.patch`.
EnableTLS bool `mapstructure:"enable_TLS"` // The TLS switch (default false).
ClientPemPath string `mapstructure:"client_pem_path"` // The file path of client.pem. The config only works when opening the TLS switch.
ClientKeyPath string `mapstructure:"client_key_path"` // The file path of client.key. The config only works when opening the TLS switch.
CaPemPath string `mapstructure:"ca_pem_path"` // The file path oca.pem. The config only works when opening the TLS switch.
RequiredAcks int16 `mapstructure:"required_acks"` // 0 means NoResponse, 1 means WaitForLocal and -1 means WaitForAll (default 1).
ProducerMaxRetry int `mapstructure:"producer_max_retry"` // The producer max retry times (default 3).
MetaMaxRetry int `mapstructure:"meta_max_retry"` // The meta max retry times (default 3).
RetryBackoff int `mapstructure:"retry_backoff"` // How long to wait for the cluster to settle between retries (default 100ms).
MaxMessageBytes int `mapstructure:"max_message_bytes"` // The max message bytes.
IdempotentWrites bool `mapstructure:"idempotent_writes"` // Ensure that exactly one copy of each message is written when is true.
ClientID string `mapstructure:"client_id"` // A user-provided string sent with every request to the brokers.
CompressionCodec int `mapstructure:"compression_codec"` // Represents the various compression codecs recognized by Kafka in messages.
RefreshPeriod int `mapstructure:"refresh_period"` // How frequently to refresh the cluster metadata.
InsecureSkipVerify bool `mapstructure:"insecure_skip_verify"` // Controls whether a client verifies the server's certificate chain and host name.
// components
client sarama.Client // The kafka producer.
listeners []chan<- api.ClientStatus
status api.ClientStatus
ctx context.Context // Parent ctx
cancel context.CancelFunc // Parent ctx cancel function
}
func (c *Client) Name() string {
return Name
}
func (c *Client) ShowName() string {
return ShowName
}
func (c *Client) Description() string {
return "The Kafka client is a sharing plugin to keep connection with the Kafka brokers and delivery the data to it."
}
func (c *Client) DefaultConfig() string {
return `
# The Kafka broker addresses (default localhost:9092). Multiple values are separated by commas.
brokers: localhost:9092
# The Kafka version should follow this pattern, which is major_minor_veryMinor_patch (default 1.0.0.0).
version: 1.0.0.0
# The TLS switch (default false).
enable_TLS: false
# The file path of client.pem. The config only works when opening the TLS switch.
client_pem_path: ""
# The file path of client.key. The config only works when opening the TLS switch.
client_key_path: ""
# The file path oca.pem. The config only works when opening the TLS switch.
ca_pem_path: ""
# 0 means NoResponse, 1 means WaitForLocal and -1 means WaitForAll (default 1).
required_acks: 1
# The producer max retry times (default 3).
producer_max_retry: 3
# The meta max retry times (default 3).
meta_max_retry: 3
# How long to wait for the cluster to settle between retries (default 100ms). Time unit is ms.
retry_backoff: 100
# The max message bytes.
max_message_bytes: 1000000
# If enabled, the producer will ensure that exactly one copy of each message is written (default false).
idempotent_writes: false
# A user-provided string sent with every request to the brokers for logging, debugging, and auditing purposes (default Satellite).
client_id: Satellite
# Compression codec represents the various compression codecs recognized by Kafka in messages. 0 : None, 1 : Gzip, 2 : Snappy, 3 : LZ4, 4 : ZSTD
compression_codec: 0
# How frequently to refresh the cluster metadata in the background. Defaults to 10 minutes. The unit is minute.
refresh_period: 10
# InsecureSkipVerify controls whether a client verifies the server's certificate chain and host name.
insecure_skip_verify: true
`
}
func (c *Client) Prepare() error {
cfg, err := c.loadConfig()
if err != nil {
return fmt.Errorf("cannot init the kafka producer: %v", err)
}
sarama.Logger = log.Logger
client, err := sarama.NewClient(strings.Split(c.Brokers, ","), cfg)
if err != nil {
return fmt.Errorf("cannot init the kafka client: %v", err)
}
c.client = client
c.status = api.Connected
c.ctx, c.cancel = context.WithCancel(context.Background())
c.listeners = make([]chan<- api.ClientStatus, 0)
return nil
}
func (c *Client) Close() error {
c.cancel()
defer log.Logger.Info("kafka client is closed")
return c.client.Close()
}
func (c *Client) GetConnectedClient() interface{} {
return c.client
}
func (c *Client) RegisterListener(listener chan<- api.ClientStatus) {
c.listeners = append(c.listeners, listener)
}
func (c *Client) Start() error {
// start supported processes.
go c.snifferBrokerStatus()
return nil
}