Skip to content

Commit

Permalink
Add SASL handshake negotiation
Browse files Browse the repository at this point in the history
This patch adds the mandatory SASL handshake for SASL negotiation.
  • Loading branch information
guillaumebreton committed Nov 7, 2016
1 parent af0513c commit 610a3bd
Showing 1 changed file with 52 additions and 0 deletions.
52 changes: 52 additions & 0 deletions broker.go
Expand Up @@ -514,6 +514,53 @@ func (b *Broker) responseReceiver() {
close(b.done)
}

func (b *Broker) sendAndReceiveSASLPlainHandshake() error {
rb := &SaslHandshakeRequest{"PLAIN"}
req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
buf, err := encode(req)
if err != nil {
return err
}

err = b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
if err != nil {
return err
}

bytes, err := b.conn.Write(buf)
b.updateOutgoingCommunicationMetrics(bytes)
if err != nil {
Logger.Printf("Failed to send SASL handshake %s: %s\n", b.addr, err.Error())
return err
}
b.correlationID++
//wait for the response
header := make([]byte, 8) // response header
n, err := io.ReadFull(b.conn, header)
b.updateIncomingCommunicationMetrics(n)
length := binary.BigEndian.Uint32(header[:4])
payload := make([]byte, length-4)
n, err = io.ReadFull(b.conn, payload)
if err != nil {
Logger.Printf("Failed to read SASL handshake payload : %s\n", err.Error())
return err
}
b.updateIncomingCommunicationMetrics(n)
res := &SaslHandshakeResponse{}
err = versionedDecode(payload, res, 0)
if err != nil {
Logger.Printf("Failed to parse SASL handshake : %s\n", err.Error())
return err
}
if res.Err != ErrNoError {
Logger.Printf("Invalid SASL Mechanism : %s\n", err.Error())
return res.Err
}
Logger.Print("Successful SASL handshake")
return nil

}

// Kafka 0.10.0 plans to support SASL Plain and Kerberos as per PR #812 (KIP-43)/(JIRA KAFKA-3149)
// Some hosted kafka services such as IBM Message Hub already offer SASL/PLAIN auth with Kafka 0.9
//
Expand All @@ -533,6 +580,11 @@ func (b *Broker) responseReceiver() {
// When credentials are invalid, Kafka closes the connection. This does not seem to be the ideal way
// of responding to bad credentials but thats how its being done today.
func (b *Broker) sendAndReceiveSASLPlainAuth() error {
handshakeErr := b.sendAndReceiveSASLPlainHandshake()
if handshakeErr != nil {
Logger.Printf("Error while performing SASL handshake %s\n", b.addr)
return handshakeErr
}
length := 1 + len(b.conf.Net.SASL.User) + 1 + len(b.conf.Net.SASL.Password)
authBytes := make([]byte, length+4) //4 byte length header + auth data
binary.BigEndian.PutUint32(authBytes, uint32(length))
Expand Down

0 comments on commit 610a3bd

Please sign in to comment.