Skip to content

Commit

Permalink
Merge pull request #673 from Shopify/versioned-responses
Browse files Browse the repository at this point in the history
Versioned responses
  • Loading branch information
eapache committed Jun 9, 2016
2 parents 50ae3cc + 37654da commit 2ff1d9e
Show file tree
Hide file tree
Showing 48 changed files with 229 additions and 77 deletions.
2 changes: 1 addition & 1 deletion api_versions_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ func (r *ApiVersionsRequest) encode(pe packetEncoder) error {
return nil
}

func (r *ApiVersionsRequest) decode(pd packetDecoder) (err error) {
func (r *ApiVersionsRequest) decode(pd packetDecoder, version int16) (err error) {
return nil
}

Expand Down
10 changes: 9 additions & 1 deletion api_versions_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (r *ApiVersionsResponse) encode(pe packetEncoder) error {
return nil
}

func (r *ApiVersionsResponse) decode(pd packetDecoder) error {
func (r *ApiVersionsResponse) decode(pd packetDecoder, version int16) error {
if kerr, err := pd.getInt16(); err != nil {
return err
} else {
Expand All @@ -72,3 +72,11 @@ func (r *ApiVersionsResponse) decode(pd packetDecoder) error {

return nil
}

func (r *ApiVersionsResponse) key() int16 {
return 18
}

func (r *ApiVersionsResponse) version() int16 {
return 0
}
2 changes: 1 addition & 1 deletion api_versions_response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func TestApiVersionsResponse(t *testing.T) {
var response *ApiVersionsResponse

response = new(ApiVersionsResponse)
testDecodable(t, "no error", response, apiVersionResponse)
testVersionDecodable(t, "no error", response, apiVersionResponse, 0)
if response.Err != ErrNoError {
t.Error("Decoding error failed: no error expected but found", response.Err)
}
Expand Down
6 changes: 3 additions & 3 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ func (b *Broker) DescribeGroups(request *DescribeGroupsRequest) (*DescribeGroups
return response, nil
}

func (b *Broker) send(rb requestBody, promiseResponse bool) (*responsePromise, error) {
func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise, error) {
b.lock.Lock()
defer b.lock.Unlock()

Expand Down Expand Up @@ -355,7 +355,7 @@ func (b *Broker) send(rb requestBody, promiseResponse bool) (*responsePromise, e
return &promise, nil
}

func (b *Broker) sendAndReceive(req requestBody, res decoder) error {
func (b *Broker) sendAndReceive(req protocolBody, res versionedDecoder) error {
promise, err := b.send(req, res != nil)

if err != nil {
Expand All @@ -368,7 +368,7 @@ func (b *Broker) sendAndReceive(req requestBody, res decoder) error {

select {
case buf := <-promise.packets:
return decode(buf, res)
return versionedDecode(buf, res, req.version())
case err = <-promise.errors:
return err
}
Expand Down
2 changes: 1 addition & 1 deletion consumer_metadata_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ func (r *ConsumerMetadataRequest) encode(pe packetEncoder) error {
return pe.putString(r.ConsumerGroup)
}

func (r *ConsumerMetadataRequest) decode(pd packetDecoder) (err error) {
func (r *ConsumerMetadataRequest) decode(pd packetDecoder, version int16) (err error) {
r.ConsumerGroup, err = pd.getString()
return err
}
Expand Down
10 changes: 9 additions & 1 deletion consumer_metadata_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type ConsumerMetadataResponse struct {
CoordinatorPort int32 // deprecated: use Coordinator.Addr()
}

func (r *ConsumerMetadataResponse) decode(pd packetDecoder) (err error) {
func (r *ConsumerMetadataResponse) decode(pd packetDecoder, version int16) (err error) {
tmp, err := pd.getInt16()
if err != nil {
return err
Expand Down Expand Up @@ -71,3 +71,11 @@ func (r *ConsumerMetadataResponse) encode(pe packetEncoder) error {
pe.putInt32(r.CoordinatorPort)
return nil
}

func (r *ConsumerMetadataResponse) key() int16 {
return 10
}

func (r *ConsumerMetadataResponse) version() int16 {
return 0
}
2 changes: 1 addition & 1 deletion describe_groups_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ func (r *DescribeGroupsRequest) encode(pe packetEncoder) error {
return pe.putStringArray(r.Groups)
}

func (r *DescribeGroupsRequest) decode(pd packetDecoder) (err error) {
func (r *DescribeGroupsRequest) decode(pd packetDecoder, version int16) (err error) {
r.Groups, err = pd.getStringArray()
return
}
Expand Down
10 changes: 9 additions & 1 deletion describe_groups_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func (r *DescribeGroupsResponse) encode(pe packetEncoder) error {
return nil
}

func (r *DescribeGroupsResponse) decode(pd packetDecoder) (err error) {
func (r *DescribeGroupsResponse) decode(pd packetDecoder, version int16) (err error) {
n, err := pd.getArrayLength()
if err != nil {
return err
Expand All @@ -35,6 +35,14 @@ func (r *DescribeGroupsResponse) decode(pd packetDecoder) (err error) {
return nil
}

func (r *DescribeGroupsResponse) key() int16 {
return 15
}

func (r *DescribeGroupsResponse) version() int16 {
return 0
}

type GroupDescription struct {
Err KError
GroupId string
Expand Down
4 changes: 2 additions & 2 deletions describe_groups_response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,13 @@ func TestDescribeGroupsResponse(t *testing.T) {
var response *DescribeGroupsResponse

response = new(DescribeGroupsResponse)
testDecodable(t, "empty", response, describeGroupsResponseEmpty)
testVersionDecodable(t, "empty", response, describeGroupsResponseEmpty, 0)
if len(response.Groups) != 0 {
t.Error("Expected no groups")
}

response = new(DescribeGroupsResponse)
testDecodable(t, "populated", response, describeGroupsResponsePopulated)
testVersionDecodable(t, "populated", response, describeGroupsResponsePopulated, 0)
if len(response.Groups) != 2 {
t.Error("Expected two groups")
}
Expand Down
22 changes: 22 additions & 0 deletions encoder_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ type decoder interface {
decode(pd packetDecoder) error
}

type versionedDecoder interface {
decode(pd packetDecoder, version int16) error
}

// Decode takes bytes and a Decoder and fills the fields of the decoder from the bytes,
// interpreted using Kafka's encoding rules.
func decode(buf []byte, in decoder) error {
Expand All @@ -60,3 +64,21 @@ func decode(buf []byte, in decoder) error {

return nil
}

func versionedDecode(buf []byte, in versionedDecoder, version int16) error {
if buf == nil {
return nil
}

helper := realDecoder{raw: buf}
err := in.decode(&helper, version)
if err != nil {
return err
}

if helper.off != len(buf) {
return PacketDecodingError{"invalid length"}
}

return nil
}
2 changes: 1 addition & 1 deletion fetch_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (f *FetchRequest) encode(pe packetEncoder) (err error) {
return nil
}

func (f *FetchRequest) decode(pd packetDecoder) (err error) {
func (f *FetchRequest) decode(pd packetDecoder, version int16) (err error) {
if _, err = pd.getInt32(); err != nil {
return err
}
Expand Down
10 changes: 9 additions & 1 deletion fetch_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (pr *FetchResponseBlock) encode(pe packetEncoder) (err error) {
return pe.pop()
}

func (fr *FetchResponse) decode(pd packetDecoder) (err error) {
func (fr *FetchResponse) decode(pd packetDecoder, version int16) (err error) {
numTopics, err := pd.getArrayLength()
if err != nil {
return err
Expand Down Expand Up @@ -116,6 +116,14 @@ func (fr *FetchResponse) encode(pe packetEncoder) (err error) {
return nil
}

func (r *FetchResponse) key() int16 {
return 1
}

func (r *FetchResponse) version() int16 {
return 0
}

func (fr *FetchResponse) GetBlock(topic string, partition int32) *FetchResponseBlock {
if fr.Blocks == nil {
return nil
Expand Down
4 changes: 2 additions & 2 deletions fetch_response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ var (

func TestEmptyFetchResponse(t *testing.T) {
response := FetchResponse{}
testDecodable(t, "empty", &response, emptyFetchResponse)
testVersionDecodable(t, "empty", &response, emptyFetchResponse, 0)

if len(response.Blocks) != 0 {
t.Error("Decoding produced topic blocks where there were none.")
Expand All @@ -40,7 +40,7 @@ func TestEmptyFetchResponse(t *testing.T) {

func TestOneMessageFetchResponse(t *testing.T) {
response := FetchResponse{}
testDecodable(t, "one message", &response, oneMessageFetchResponse)
testVersionDecodable(t, "one message", &response, oneMessageFetchResponse, 0)

if len(response.Blocks) != 1 {
t.Fatal("Decoding produced incorrect number of topic blocks.")
Expand Down
2 changes: 1 addition & 1 deletion heartbeat_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func (r *HeartbeatRequest) encode(pe packetEncoder) error {
return nil
}

func (r *HeartbeatRequest) decode(pd packetDecoder) (err error) {
func (r *HeartbeatRequest) decode(pd packetDecoder, version int16) (err error) {
if r.GroupId, err = pd.getString(); err != nil {
return
}
Expand Down
10 changes: 9 additions & 1 deletion heartbeat_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ func (r *HeartbeatResponse) encode(pe packetEncoder) error {
return nil
}

func (r *HeartbeatResponse) decode(pd packetDecoder) error {
func (r *HeartbeatResponse) decode(pd packetDecoder, version int16) error {
if kerr, err := pd.getInt16(); err != nil {
return err
} else {
Expand All @@ -18,3 +18,11 @@ func (r *HeartbeatResponse) decode(pd packetDecoder) error {

return nil
}

func (r *HeartbeatResponse) key() int16 {
return 12
}

func (r *HeartbeatResponse) version() int16 {
return 0
}
2 changes: 1 addition & 1 deletion heartbeat_response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ func TestHeartbeatResponse(t *testing.T) {
var response *HeartbeatResponse

response = new(HeartbeatResponse)
testDecodable(t, "no error", response, heartbeatResponseNoError)
testVersionDecodable(t, "no error", response, heartbeatResponseNoError, 0)
if response.Err != ErrNoError {
t.Error("Decoding error failed: no error expected but found", response.Err)
}
Expand Down
2 changes: 1 addition & 1 deletion join_group_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (r *JoinGroupRequest) encode(pe packetEncoder) error {
return nil
}

func (r *JoinGroupRequest) decode(pd packetDecoder) (err error) {
func (r *JoinGroupRequest) decode(pd packetDecoder, version int16) (err error) {
if r.GroupId, err = pd.getString(); err != nil {
return
}
Expand Down
10 changes: 9 additions & 1 deletion join_group_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (r *JoinGroupResponse) encode(pe packetEncoder) error {
return nil
}

func (r *JoinGroupResponse) decode(pd packetDecoder) (err error) {
func (r *JoinGroupResponse) decode(pd packetDecoder, version int16) (err error) {
if kerr, err := pd.getInt16(); err != nil {
return err
} else {
Expand Down Expand Up @@ -100,3 +100,11 @@ func (r *JoinGroupResponse) decode(pd packetDecoder) (err error) {

return nil
}

func (r *JoinGroupResponse) key() int16 {
return 11
}

func (r *JoinGroupResponse) version() int16 {
return 0
}
6 changes: 3 additions & 3 deletions join_group_response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestJoinGroupResponse(t *testing.T) {
var response *JoinGroupResponse

response = new(JoinGroupResponse)
testDecodable(t, "no error", response, joinGroupResponseNoError)
testVersionDecodable(t, "no error", response, joinGroupResponseNoError, 0)
if response.Err != ErrNoError {
t.Error("Decoding Err failed: no error expected but found", response.Err)
}
Expand All @@ -58,7 +58,7 @@ func TestJoinGroupResponse(t *testing.T) {
}

response = new(JoinGroupResponse)
testDecodable(t, "with error", response, joinGroupResponseWithError)
testVersionDecodable(t, "with error", response, joinGroupResponseWithError, 0)
if response.Err != ErrInconsistentGroupProtocol {
t.Error("Decoding Err failed: ErrInconsistentGroupProtocol expected but found", response.Err)
}
Expand All @@ -76,7 +76,7 @@ func TestJoinGroupResponse(t *testing.T) {
}

response = new(JoinGroupResponse)
testDecodable(t, "with error", response, joinGroupResponseLeader)
testVersionDecodable(t, "with error", response, joinGroupResponseLeader, 0)
if response.Err != ErrNoError {
t.Error("Decoding Err failed: ErrNoError expected but found", response.Err)
}
Expand Down
2 changes: 1 addition & 1 deletion leave_group_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func (r *LeaveGroupRequest) encode(pe packetEncoder) error {
return nil
}

func (r *LeaveGroupRequest) decode(pd packetDecoder) (err error) {
func (r *LeaveGroupRequest) decode(pd packetDecoder, version int16) (err error) {
if r.GroupId, err = pd.getString(); err != nil {
return
}
Expand Down
10 changes: 9 additions & 1 deletion leave_group_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ func (r *LeaveGroupResponse) encode(pe packetEncoder) error {
return nil
}

func (r *LeaveGroupResponse) decode(pd packetDecoder) (err error) {
func (r *LeaveGroupResponse) decode(pd packetDecoder, version int16) (err error) {
if kerr, err := pd.getInt16(); err != nil {
return err
} else {
Expand All @@ -18,3 +18,11 @@ func (r *LeaveGroupResponse) decode(pd packetDecoder) (err error) {

return nil
}

func (r *LeaveGroupResponse) key() int16 {
return 13
}

func (r *LeaveGroupResponse) version() int16 {
return 0
}
4 changes: 2 additions & 2 deletions leave_group_response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ func TestLeaveGroupResponse(t *testing.T) {
var response *LeaveGroupResponse

response = new(LeaveGroupResponse)
testDecodable(t, "no error", response, leaveGroupResponseNoError)
testVersionDecodable(t, "no error", response, leaveGroupResponseNoError, 0)
if response.Err != ErrNoError {
t.Error("Decoding error failed: no error expected but found", response.Err)
}

response = new(LeaveGroupResponse)
testDecodable(t, "with error", response, leaveGroupResponseWithError)
testVersionDecodable(t, "with error", response, leaveGroupResponseWithError, 0)
if response.Err != ErrUnknownMemberId {
t.Error("Decoding error failed: ErrUnknownMemberId expected but found", response.Err)
}
Expand Down
2 changes: 1 addition & 1 deletion list_groups_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ func (r *ListGroupsRequest) encode(pe packetEncoder) error {
return nil
}

func (r *ListGroupsRequest) decode(pd packetDecoder) (err error) {
func (r *ListGroupsRequest) decode(pd packetDecoder, version int16) (err error) {
return nil
}

Expand Down
Loading

0 comments on commit 2ff1d9e

Please sign in to comment.