Skip to content

Commit

Permalink
Merge pull request #676 from Shopify/version-config
Browse files Browse the repository at this point in the history
Support configuring target kafka version
  • Loading branch information
eapache committed Jun 9, 2016
2 parents 2ff1d9e + 5e8b6ff commit 1ab4225
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 2 deletions.
11 changes: 9 additions & 2 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,13 @@ type Config struct {
// in the background while user code is working, greatly improving throughput.
// Defaults to 256.
ChannelBufferSize int
// The version of Kafka that Sarama will assume it is running against.
// Defaults to the oldest supported stable version. Since Kafka provides
// backwards-compatibility, setting it to a version older than you have
// will not break anything, although it may prevent you from using the
// latest features. Setting it to a version greater than you are actually
// running may lead to random breakage.
Version KafkaVersion
}

// NewConfig returns a new configuration instance with sane defaults.
Expand Down Expand Up @@ -258,9 +265,9 @@ func NewConfig() *Config {
c.Consumer.Offsets.CommitInterval = 1 * time.Second
c.Consumer.Offsets.Initial = OffsetNewest

c.ChannelBufferSize = 256

c.ClientID = defaultClientID
c.ChannelBufferSize = 256
c.Version = V0_8_2_0

return c
}
Expand Down
38 changes: 38 additions & 0 deletions utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,41 @@ func newBufConn(conn net.Conn) *bufConn {
func (bc *bufConn) Read(b []byte) (n int, err error) {
return bc.buf.Read(b)
}

// KafkaVersion instances represent versions of the upstream Kafka broker.
type KafkaVersion struct {
// it's a struct rather than just typing the array directly to make it opaque and stop people
// generating their own arbitrary versions
version [4]uint
}

func newKafkaVersion(major, minor, veryMinor, patch uint) KafkaVersion {
return KafkaVersion{
version: [4]uint{major, minor, veryMinor, patch},
}
}

// IsAtLeast return true if and only if the version it is called on is
// greater than or equal to the version passed in:
// V1.IsAtLeast(V2) // false
// V2.IsAtLeast(V1) // true
func (v KafkaVersion) IsAtLeast(other KafkaVersion) bool {
for i := range v.version {
if v.version[i] > other.version[i] {
return true
} else if v.version[i] < other.version[i] {
return false
}
}
return true
}

// Effective constants defining the supported kafka versions.
var (
V0_8_2_0 = newKafkaVersion(0, 8, 2, 0)
V0_8_2_1 = newKafkaVersion(0, 8, 2, 1)
V0_8_2_2 = newKafkaVersion(0, 8, 2, 2)
V0_9_0_0 = newKafkaVersion(0, 9, 0, 0)
V0_9_0_1 = newKafkaVersion(0, 9, 0, 1)
V0_10_0_0 = newKafkaVersion(0, 10, 0, 0)
)
21 changes: 21 additions & 0 deletions utils_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package sarama

import "testing"

func TestVersionCompare(t *testing.T) {
if V0_8_2_0.IsAtLeast(V0_8_2_1) {
t.Error("0.8.2.0 >= 0.8.2.1")
}
if !V0_8_2_1.IsAtLeast(V0_8_2_0) {
t.Error("! 0.8.2.1 >= 0.8.2.0")
}
if !V0_8_2_0.IsAtLeast(V0_8_2_0) {
t.Error("! 0.8.2.0 >= 0.8.2.0")
}
if !V0_9_0_0.IsAtLeast(V0_8_2_1) {
t.Error("! 0.9.0.0 >= 0.8.2.1")
}
if V0_8_2_1.IsAtLeast(V0_10_0_0) {
t.Error("0.8.2.1 >= 0.10.0.0")
}
}

0 comments on commit 1ab4225

Please sign in to comment.