Skip to content

Commit

Permalink
Added support for SCRAM authentication mechanism and use of TLS for t…
Browse files Browse the repository at this point in the history
…ransport with kafka output
  • Loading branch information
Mats Christensen committed Dec 2, 2022
1 parent c7bfa8c commit 739eaf7
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 0 deletions.
6 changes: 6 additions & 0 deletions conf/cb-event-forwarder.example.ini
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,12 @@ compress_http_payload=false
# Optional custom kafka topic
# topic = mytopic

# Optional usage of SCRAM, supports SCRAM-SHA-256 and SCRAM-SHA-512
# algorithm = SCRAM-SHA-256

# Optinal usage of TLS for transport (NOT client auth)
# use_tls_transport = true

[splunk]
# Uncomment ca_cert to specify a file containing PEM-encoded CA certificates for verifying the peer server
# ca_cert=/etc/cb/integrations/event-forwarder/ca-certs.pem
Expand Down
15 changes: 15 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,9 @@ type Configuration struct {
KafkaSSLCertificateLocation *string
KafkaSSLCALocation *string

KafkaAlgorithm string
KafkaUseTLSTransport bool

// Splunkd
SplunkToken *string

Expand Down Expand Up @@ -825,6 +828,18 @@ func (config *Configuration) parseKafkaSettings(input *ini.File) {
SSLKeyLocation := key.Value()
config.KafkaSSLKeyLocation = &SSLKeyLocation
}
if input.Section("kafka").HasKey("algorithm") {
key := input.Section("kafka").Key("algorithm")
kafkaAlgorithm := key.Value()
config.KafkaAlgorithm = kafkaAlgorithm
}
if input.Section("kafka").HasKey("use_tls_transport") {
key := input.Section("kafka").Key("use_tls_transport")
config.KafkaUseTLSTransport = false
if key.Value() == "true" {
config.KafkaUseTLSTransport = true
}
}
}

func (config *Configuration) parseSplunkSettings(input *ini.File) {
Expand Down
48 changes: 48 additions & 0 deletions pkg/outputs/kafka_output.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package outputs

import (
"crypto/sha256"
"crypto/sha512"
"crypto/tls"
"crypto/x509"
"encoding/json"
Expand All @@ -17,6 +19,7 @@ import (
. "github.com/carbonblack/cb-event-forwarder/pkg/config"
"github.com/rcrowley/go-metrics"
log "github.com/sirupsen/logrus"
"github.com/xdg-go/scram"
)

func NewTLSConfig(config *Configuration, clientCertFile, clientKeyFile, caCertFile string) (*tls.Config, error) {
Expand All @@ -43,6 +46,35 @@ func NewTLSConfig(config *Configuration, clientCertFile, clientKeyFile, caCertFi
return &tlsConfig, err
}

var (
SHA256 scram.HashGeneratorFcn = sha256.New
SHA512 scram.HashGeneratorFcn = sha512.New
)

type XDGSCRAMClient struct {
*scram.Client
*scram.ClientConversation
scram.HashGeneratorFcn
}

func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) {
x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID)
if err != nil {
return err
}
x.ClientConversation = x.Client.NewConversation()
return nil
}

func (x *XDGSCRAMClient) Step(challenge string) (response string, err error) {
response, err = x.ClientConversation.Step(challenge)
return
}

func (x *XDGSCRAMClient) Done() bool {
return x.ClientConversation.Done()
}

type KafkaOutput struct {
Config *Configuration
brokers []string
Expand Down Expand Up @@ -116,8 +148,24 @@ func (o *KafkaOutput) Initialize(unused string) (err error) {
kafkaConfig.Net.SASL.User = o.Config.KafkaUsername
kafkaConfig.Net.SASL.Password = o.Config.KafkaPassword
kafkaConfig.Net.SASL.Enable = true
if o.Config.KafkaAlgorithm == "SCRAM-SHA-512" {
kafkaConfig.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} }
kafkaConfig.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
}
if o.Config.KafkaAlgorithm == "SCRAM-SHA-256" {
kafkaConfig.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA256} }
kafkaConfig.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA256
}
}

if o.Config.KafkaUseTLSTransport {
kafkaConfig.Net.TLS.Enable = true
tlsConfig := &tls.Config {
ClientAuth: 0,
}
kafkaConfig.Net.TLS.Config = tlsConfig
}

producer, err := sarama.NewAsyncProducer(o.brokers, kafkaConfig)

o.producer = producer
Expand Down

0 comments on commit 739eaf7

Please sign in to comment.