diff --git a/kafka.go b/kafka.go index 7039ad6..20fb744 100644 --- a/kafka.go +++ b/kafka.go @@ -12,6 +12,7 @@ import ( trafficpb "github.com/akto-api-security/mirroring-api-logging/protobuf/traffic_payload" "github.com/segmentio/kafka-go" + "github.com/segmentio/kafka-go/sasl/plain" "google.golang.org/protobuf/proto" ) @@ -29,12 +30,21 @@ var useTLS = false var InsecureSkipVerify = true var tlsCACertPath = "./ca.crt" +var isAuthImplemented = false +var kafkaUsername = "" +var kafkaPassword = "" + func init() { InitVar("USE_TLS", &useTLS) InitVar("INSECURE_SKIP_VERIFY", &InsecureSkipVerify) InitVar("TLS_CA_CERT_PATH", &tlsCACertPath) + // Initialize SASL authentication variables + InitVar("IS_AUTH_IMPLEMENTED", &isAuthImplemented) + InitVar("KAFKA_USERNAME", &kafkaUsername) + InitVar("KAFKA_PASSWORD", &kafkaPassword) + } func Produce(kafkaWriter *kafka.Writer, ctx context.Context, value *trafficpb.HttpResponseParam) error { @@ -132,12 +142,24 @@ func GetKafkaWriter(kafkaURL, topic string, batchSize int, batchTimeout time.Dur Compression: kafka.Zstd, } + // Configure transport with TLS and/or SASL authentication + transport := &kafka.Transport{} + if useTLS { tlsConfig, _ := NewTLSConfig(tlsCACertPath) - kafkaWriter.Transport = &kafka.Transport{ - TLS: tlsConfig, + transport.TLS = tlsConfig + } + + // Add SASL authentication if enabled + if isAuthImplemented && kafkaUsername != "" && kafkaPassword != "" { + slog.Info("Configuring SASL plain authentication", "username", kafkaUsername) + transport.SASL = plain.Mechanism{ + Username: kafkaUsername, + Password: kafkaPassword, } } + + kafkaWriter.Transport = transport return &kafkaWriter } @@ -152,13 +174,25 @@ func GetCredential(kafkaURL string, groupID string, topic string) Credential { MaxBytes: 10e6, // 10MB } + // Configure dialer with TLS and/or SASL authentication + dialer := &kafka.Dialer{} + if useTLS { tlsConfig, _ := NewTLSConfig(tlsCACertPath) - config.Dialer = &kafka.Dialer{ - TLS: tlsConfig, + dialer.TLS = tlsConfig + } + + // Add SASL authentication if enabled + if isAuthImplemented && kafkaUsername != "" && kafkaPassword != "" { + slog.Info("Configuring SASL plain authentication for reader", "username", kafkaUsername) + dialer.SASLMechanism = plain.Mechanism{ + Username: kafkaUsername, + Password: kafkaPassword, } } + config.Dialer = dialer + r := kafka.NewReader(config) // Set up a context with a timeout