diff --git a/consumer_metadata_response.go b/consumer_metadata_response.go index 457ab97a8..7e56dfb8f 100644 --- a/consumer_metadata_response.go +++ b/consumer_metadata_response.go @@ -1,10 +1,16 @@ package sarama +import ( + "net" + "strconv" +) + type ConsumerMetadataResponse struct { Err KError - CoordinatorID int32 - CoordinatorHost string - CoordinatorPort int32 + Coordinator *Broker + CoordinatorID int32 // deprecated: use Coordinator.ID() + CoordinatorHost string // deprecated: use Coordinator.Addr() + CoordinatorPort int32 // deprecated: use Coordinator.Addr() } func (r *ConsumerMetadataResponse) decode(pd packetDecoder) (err error) { @@ -14,20 +20,24 @@ func (r *ConsumerMetadataResponse) decode(pd packetDecoder) (err error) { } r.Err = KError(tmp) - r.CoordinatorID, err = pd.getInt32() - if err != nil { + r.Coordinator = new(Broker) + if err := r.Coordinator.decode(pd); err != nil { return err } - r.CoordinatorHost, err = pd.getString() + // this can all go away in 2.0, but we have to fill in deprecated fields to maintain + // backwards compatibility + host, portstr, err := net.SplitHostPort(r.Coordinator.Addr()) if err != nil { return err } - - r.CoordinatorPort, err = pd.getInt32() + port, err := strconv.ParseInt(portstr, 10, 32) if err != nil { return err } + r.CoordinatorID = r.Coordinator.ID() + r.CoordinatorHost = host + r.CoordinatorPort = int32(port) return nil } diff --git a/consumer_metadata_response_test.go b/consumer_metadata_response_test.go index b5a8cc443..1d852fd77 100644 --- a/consumer_metadata_response_test.go +++ b/consumer_metadata_response_test.go @@ -58,4 +58,12 @@ func TestConsumerMetadataResponseSuccess(t *testing.T) { if response.CoordinatorPort != 0xCCDD { t.Error("Decoding produced incorrect coordinator port.") } + + if response.Coordinator.ID() != 0xAB { + t.Error("Decoding produced incorrect coordinator ID.") + } + + if response.Coordinator.Addr() != "foo:52445" { + t.Error("Decoding produced incorrect coordinator address.") + } }