-
Notifications
You must be signed in to change notification settings - Fork 652
/
offset.go
147 lines (124 loc) · 3.93 KB
/
offset.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
/**
* Copyright 2017 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka
import (
"fmt"
"strconv"
)
/*
#include <stdlib.h>
#include "select_rdkafka.h"
static int64_t _c_rdkafka_offset_tail(int64_t rel) {
return RD_KAFKA_OFFSET_TAIL(rel);
}
*/
import "C"
// Offset is an int64 partition offset. Besides absolute values it has
// canonical names for the logical offsets declared below.
type Offset int64

// Logical offsets, each defined from the librdkafka macro of the same name.
const (
	// OffsetBeginning represents the earliest offset (logical)
	OffsetBeginning = Offset(C.RD_KAFKA_OFFSET_BEGINNING)

	// OffsetEnd represents the latest offset (logical)
	OffsetEnd = Offset(C.RD_KAFKA_OFFSET_END)

	// OffsetInvalid represents an invalid/unspecified offset
	OffsetInvalid = Offset(C.RD_KAFKA_OFFSET_INVALID)

	// OffsetStored represents a stored offset
	OffsetStored = Offset(C.RD_KAFKA_OFFSET_STORED)
)
// String returns the canonical name of a logical offset ("beginning", "end",
// "unset" or "stored"). Any other offset is rendered as its decimal value.
func (o Offset) String() string {
	if o == OffsetBeginning {
		return "beginning"
	}
	if o == OffsetEnd {
		return "end"
	}
	if o == OffsetInvalid {
		return "unset"
	}
	if o == OffsetStored {
		return "stored"
	}
	// Convert to int64 first so formatting does not recurse into String().
	return fmt.Sprint(int64(o))
}
// Set parses offset with NewOffset() and, on success, stores the result
// in o. On failure o is left untouched and the parse error is returned.
func (o *Offset) Set(offset interface{}) error {
	parsed, err := NewOffset(offset)
	if err != nil {
		return err
	}
	*o = parsed
	return nil
}
// NewOffset creates a new Offset using the provided logical string, an
// absolute int or int64 offset value, or a concrete Offset type.
// Logical offsets: "beginning", "earliest", "end", "latest", "unset", "invalid", "stored"
//
// Any other string is parsed as a base-10 64-bit integer. If parsing fails,
// OffsetInvalid is returned together with the strconv error. An argument of
// any other type yields OffsetInvalid and an ErrInvalidArg error naming the
// offending type.
func NewOffset(offset interface{}) (Offset, error) {
	switch v := offset.(type) {
	case string:
		switch v {
		case "beginning", "earliest":
			return OffsetBeginning, nil
		case "end", "latest":
			return OffsetEnd, nil
		case "unset", "invalid":
			return OffsetInvalid, nil
		case "stored":
			return OffsetStored, nil
		default:
			// Parse with an explicit 64-bit size: Atoi parses into the
			// platform-sized int, which rejects offsets above 2^31-1 on
			// 32-bit builds.
			off, err := strconv.ParseInt(v, 10, 64)
			if err != nil {
				// Never hand back 0 (a valid absolute offset) on failure.
				return OffsetInvalid, err
			}
			return Offset(off), nil
		}
	case int:
		return Offset(v), nil
	case int64:
		return Offset(v), nil
	case Offset:
		return v, nil
	default:
		// %T prints the dynamic type; %t is the bool verb and would
		// render as "%!t(...)" for every non-bool value.
		return OffsetInvalid, newErrorFromString(ErrInvalidArg,
			fmt.Sprintf("Invalid offset type: %T", v))
	}
}
// OffsetTail returns the logical offset that lies relativeOffset from the
// current end of the partition. The value is computed by the cgo helper
// wrapping librdkafka's RD_KAFKA_OFFSET_TAIL macro.
func OffsetTail(relativeOffset Offset) Offset {
	rel := C.int64_t(relativeOffset)
	return Offset(C._c_rdkafka_offset_tail(rel))
}
// offsetsForTimes resolves timestamps to offsets for the given partitions.
//
// Each entry in `times` carries the timestamp to query in its `.Offset`
// field; the returned `offsets` list carries the looked-up offset in the
// same field. For every partition the result is the earliest offset whose
// timestamp is greater than or equal to the requested timestamp. If the
// requested timestamp is beyond that of the last message in the partition,
// -1 is returned.
//
// The call blocks for at most timeoutMs milliseconds.
//
// Duplicate Topic+Partitions are not supported.
// Per-partition errors may be returned in the `.Error` field; a failure of
// the request as a whole is returned as err with a nil offsets list.
func offsetsForTimes(H Handle, times []TopicPartition, timeoutMs int) (offsets []TopicPartition, err error) {
	// The C list is used for both the request and the response, and is
	// released once the results have been copied back into Go values.
	cparts := newCPartsFromTopicPartitions(times)
	defer C.rd_kafka_topic_partition_list_destroy(cparts)

	rk := H.gethandle().rk
	if cerr := C.rd_kafka_offsets_for_times(rk, cparts, C.int(timeoutMs)); cerr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
		return nil, newError(cerr)
	}

	return newTopicPartitionsFromCparts(cparts), nil
}