Skip to content

Commit

Permalink
add kafka.SeekDontCheck (segmentio#295)
Browse files Browse the repository at this point in the history
* add kafka.SeekDontCheck

* fix logic

* touch less code

* use SeekDontCheck in ReadBatch

* add test
  • Loading branch information
Achille committed Jun 21, 2019
1 parent 8174c4c commit 0b38267
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 1 deletion.
31 changes: 30 additions & 1 deletion conn.go
Expand Up @@ -595,6 +595,12 @@ const (
SeekAbsolute = 1 // Seek to an absolute offset.
SeekEnd = 2 // Seek relative to the last offset available in the partition.
SeekCurrent = 3 // Seek relative to the current offset.

// This flag may be combined to any of the SeekAbsolute and SeekCurrent
// constants to skip the bound check that the connection would do otherwise.
// Programs can use this flag to avoid making a metadata request to the kafka
// broker to read the current first and last offsets of the partition.
SeekDontCheck = 1 << 31
)

// Seek sets the offset for the next read or write operation according to whence, which
Expand All @@ -604,12 +610,32 @@ const (
// as in lseek(2) or os.Seek.
// The method returns the new absolute offset of the connection.
func (c *Conn) Seek(offset int64, whence int) (int64, error) {
seekDontCheck := (whence & SeekDontCheck) != 0
whence &= ^SeekDontCheck

switch whence {
case SeekStart, SeekAbsolute, SeekEnd, SeekCurrent:
default:
return 0, fmt.Errorf("whence must be one of 0, 1, 2, or 3. (whence = %d)", whence)
}

if seekDontCheck {
if whence == SeekAbsolute {
c.mutex.Lock()
c.offset = offset
c.mutex.Unlock()
return offset, nil
}

if whence == SeekCurrent {
c.mutex.Lock()
c.offset += offset
offset = c.offset
c.mutex.Unlock()
return offset, nil
}
}

if whence == SeekAbsolute {
c.mutex.Lock()
unchanged := offset == c.offset
Expand All @@ -618,6 +644,7 @@ func (c *Conn) Seek(offset int64, whence int) (int64, error) {
return offset, nil
}
}

if whence == SeekCurrent {
c.mutex.Lock()
offset = c.offset + offset
Expand Down Expand Up @@ -726,7 +753,9 @@ func (c *Conn) ReadBatchWith(cfg ReadBatchConfig) *Batch {
return &Batch{err: fmt.Errorf("kafka.(*Conn).ReadBatch: minBytes (%d) > maxBytes (%d)", cfg.MinBytes, cfg.MaxBytes)}
}

offset, err := c.Seek(c.Offset())
offset, whence := c.Offset()

offset, err := c.Seek(offset, whence|SeekDontCheck)
if err != nil {
return &Batch{err: dontExpectEOF(err)}
}
Expand Down
26 changes: 26 additions & 0 deletions conn_test.go
Expand Up @@ -152,6 +152,11 @@ func TestConn(t *testing.T) {
function: testConnSeekRandomOffset,
},

{
scenario: "unchecked seeks allow the connection to be positionned outside the boundaries of the partition",
function: testConnSeekDontCheck,
},

{
scenario: "writing and reading messages sequentially should preserve the order",
function: testConnWriteReadSequentially,
Expand Down Expand Up @@ -439,6 +444,27 @@ func testConnSeekRandomOffset(t *testing.T, conn *Conn) {
}
}

func testConnSeekDontCheck(t *testing.T, conn *Conn) {
for i := 0; i != 10; i++ {
if _, err := conn.Write([]byte(strconv.Itoa(i))); err != nil {
t.Fatal(err)
}
}

offset, err := conn.Seek(42, SeekAbsolute|SeekDontCheck)
if err != nil {
t.Error(err)
}

if offset != 42 {
t.Error("bad offset:", offset)
}

if _, err := conn.ReadMessage(1024); err != OffsetOutOfRange {
t.Error("unexpected error:", err)
}
}

func testConnWriteReadSequentially(t *testing.T, conn *Conn) {
for i := 0; i != 10; i++ {
if _, err := conn.Write([]byte(strconv.Itoa(i))); err != nil {
Expand Down
1 change: 1 addition & 0 deletions go.mod
Expand Up @@ -9,4 +9,5 @@ require (
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c
github.com/xdg/stringprep v1.0.0 // indirect
golang.org/x/crypto v0.0.0-20190506204251-e1dfcc566284 // indirect
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3
)
1 change: 1 addition & 0 deletions go.sum
Expand Up @@ -11,6 +11,7 @@ github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190506204251-e1dfcc566284 h1:rlLehGeYg6jfoyz/eDqDU1iRXLKfR42nnNh57ytKEWo=
golang.org/x/crypto v0.0.0-20190506204251-e1dfcc566284/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3 h1:0GoQqolDA55aaLxZyTzK/Y2ePZzZTUrRacwib7cNsYQ=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down

0 comments on commit 0b38267

Please sign in to comment.