diff --git a/broker.go b/broker.go index a42257150..adf8f3a47 100644 --- a/broker.go +++ b/broker.go @@ -514,6 +514,53 @@ func (b *Broker) responseReceiver() { close(b.done) } +func (b *Broker) sendAndReceiveSASLPlainHandshake() error { + rb := &SaslHandshakeRequest{"PLAIN"} + req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb} + buf, err := encode(req) + if err != nil { + return err + } + + err = b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout)) + if err != nil { + return err + } + + bytes, err := b.conn.Write(buf) + b.updateOutgoingCommunicationMetrics(bytes) + if err != nil { + Logger.Printf("Failed to send SASL handshake %s: %s\n", b.addr, err.Error()) + return err + } + b.correlationID++ + //wait for the response + header := make([]byte, 8) // response header + n, err := io.ReadFull(b.conn, header) + b.updateIncomingCommunicationMetrics(n) + length := binary.BigEndian.Uint32(header[:4]) + payload := make([]byte, length-4) + n, err = io.ReadFull(b.conn, payload) + if err != nil { + Logger.Printf("Failed to read SASL handshake payload : %s\n", err.Error()) + return err + } + b.updateIncomingCommunicationMetrics(n) + res := &SaslHandshakeResponse{} + err = versionedDecode(payload, res, 0) + if err != nil { + Logger.Printf("Failed to parse SASL handshake : %s\n", err.Error()) + return err + } + if res.Err != ErrNoError { + Logger.Printf("Invalid SASL Mechanism : %s\n", err.Error()) + return res.Err + } + Logger.Print("Successful SASL handshake") + return nil + +} + // Kafka 0.10.0 plans to support SASL Plain and Kerberos as per PR #812 (KIP-43)/(JIRA KAFKA-3149) // Some hosted kafka services such as IBM Message Hub already offer SASL/PLAIN auth with Kafka 0.9 // @@ -533,6 +580,11 @@ func (b *Broker) responseReceiver() { // When credentials are invalid, Kafka closes the connection. This does not seem to be the ideal way // of responding to bad credentials but thats how its being done today. func (b *Broker) sendAndReceiveSASLPlainAuth() error { + handshakeErr := b.sendAndReceiveSASLPlainHandshake() + if handshakeErr != nil { + Logger.Printf("Error while performing SASL handshake %s\n", b.addr) + return handshakeErr + } length := 1 + len(b.conf.Net.SASL.User) + 1 + len(b.conf.Net.SASL.Password) authBytes := make([]byte, length+4) //4 byte length header + auth data binary.BigEndian.PutUint32(authBytes, uint32(length))