@@ -77,7 +77,7 @@ type Reader struct {
7777 cancel context.CancelFunc
7878 stop context.CancelFunc
7979 done chan struct {}
80- commits chan [] Message
80+ commits chan commitRequest
8181 version int64 // version holds the generation of the spawned readers
8282 offset int64
8383 lag int64
@@ -96,10 +96,12 @@ type Reader struct {
9696 stats readerStats
9797}
9898
99- // useConsumerGroup indicates the Reader is part of a consumer group
100- func (r * Reader ) useConsumerGroup () bool {
101- return r .config .GroupID != ""
102- }
99+ // useConsumerGroup indicates whether the Reader is part of a consumer group.
100+ func (r * Reader ) useConsumerGroup () bool { return r .config .GroupID != "" }
101+
102+ // useSyncCommits indicates whether the Reader is configured to perform sync or
103+ // async commits.
104+ func (r * Reader ) useSyncCommits () bool { return r .config .CommitInterval == 0 }
103105
104106// membership returns the group generationID and memberID of the reader.
105107//
@@ -676,20 +678,16 @@ func (r *Reader) commitOffsetsWithRetry(conn offsetCommitter, offsetStash offset
676678type offsetStash map [string ]map [int ]int64
677679
678680// merge updates the offsetStash with the offsets from the provided messages
679- func (o offsetStash ) merge (msgs ... Message ) {
680- if o == nil {
681- return
682- }
683-
684- for _ , m := range msgs {
685- offsetsByPartition , ok := o [m .Topic ]
681+ func (o offsetStash ) merge (commits []commit ) {
682+ for _ , c := range commits {
683+ offsetsByPartition , ok := o [c .topic ]
686684 if ! ok {
687685 offsetsByPartition = map [int ]int64 {}
688- o [m . Topic ] = offsetsByPartition
686+ o [c . topic ] = offsetsByPartition
689687 }
690688
691- if offset , ok := offsetsByPartition [m . Partition ]; ! ok || m . Offset > offset {
692- offsetsByPartition [m . Partition ] = m . Offset
689+ if offset , ok := offsetsByPartition [c . partition ]; ! ok || c . offset > offset {
690+ offsetsByPartition [c . partition ] = c . offset
693691 }
694692 }
695693}
@@ -701,35 +699,19 @@ func (o offsetStash) reset() {
701699 }
702700}
703701
704- // isEmpty returns true if the offsetStash contains no entries
705- func (o offsetStash ) isEmpty () bool {
706- return len (o ) == 0
707- }
708-
709702// commitLoopImmediate handles each commit synchronously
710703func (r * Reader ) commitLoopImmediate (conn offsetCommitter , stop <- chan struct {}) {
704+ offsetsByTopicAndPartition := offsetStash {}
705+
711706 for {
712707 select {
713708 case <- stop :
714709 return
715710
716- case msgs , ok := <- r .commits :
717- if ! ok {
718- r .withErrorLogger (func (l * log.Logger ) {
719- l .Println ("reader commit channel unexpectedly closed" )
720- })
721- return
722- }
723-
724- offsetsByTopicAndPartition := offsetStash {}
725- offsetsByTopicAndPartition .merge (msgs ... )
726-
727- if err := r .commitOffsetsWithRetry (conn , offsetsByTopicAndPartition , defaultCommitRetries ); err != nil {
728- r .withErrorLogger (func (l * log.Logger ) {
729- l .Printf ("unable to commit offset: %v" , err )
730- })
731- return
732- }
711+ case req := <- r .commits :
712+ offsetsByTopicAndPartition .merge (req .commits )
713+ req .errch <- r .commitOffsetsWithRetry (conn , offsetsByTopicAndPartition , defaultCommitRetries )
714+ offsetsByTopicAndPartition .reset ()
733715 }
734716 }
735717}
@@ -740,40 +722,25 @@ func (r *Reader) commitLoopInterval(conn offsetCommitter, stop <-chan struct{})
740722 ticker := time .NewTicker (r .config .HeartbeatInterval )
741723 defer ticker .Stop ()
742724
743- defer func () {
744- // commits any outstanding offsets on close
745- if err := r .commitOffsetsWithRetry (conn , r .offsetStash , defaultCommitRetries ); err == nil {
725+ commit := func () {
726+ if err := r .commitOffsetsWithRetry (conn , r .offsetStash , defaultCommitRetries ); err != nil {
727+ r .withErrorLogger (func (l * log.Logger ) { l .Print (err ) })
728+ } else {
746729 r .offsetStash .reset ()
747730 }
748- }()
731+ }
749732
750733 for {
751734 select {
752735 case <- stop :
736+ commit ()
753737 return
754738
755739 case <- ticker .C :
756- if len (r .offsetStash ) == 0 {
757- continue
758- }
759-
760- if err := r .commitOffsetsWithRetry (conn , r .offsetStash , defaultCommitRetries ); err != nil {
761- r .withErrorLogger (func (l * log.Logger ) {
762- l .Printf ("unable to commit offset: %v" , err )
763- })
764- return
765- }
766- r .offsetStash .reset ()
767-
768- case msgs , ok := <- r .commits :
769- if ! ok {
770- r .withErrorLogger (func (l * log.Logger ) {
771- l .Println ("reader commit channel unexpectedly closed" )
772- })
773- return
774- }
740+ commit ()
775741
776- r .offsetStash .merge (msgs ... )
742+ case req := <- r .commits :
743+ r .offsetStash .merge (req .commits )
777744 }
778745 }
779746}
@@ -1106,7 +1073,7 @@ func NewReader(config ReaderConfig) *Reader {
11061073 msgs : make (chan readerMessage , config .QueueCapacity ),
11071074 cancel : func () {},
11081075 done : make (chan struct {}),
1109- commits : make (chan [] Message ),
1076+ commits : make (chan commitRequest ),
11101077 stop : stop ,
11111078 offset : firstOffset ,
11121079 stctx : stctx ,
@@ -1247,6 +1214,47 @@ func (r *Reader) FetchMessage(ctx context.Context) (Message, error) {
12471214 }
12481215}
12491216
1217+ // CommitMessages commits the list of messages passed as argument. The program
1218+ // may pass a context to asynchronously cancel the commit operation when it was
1219+ // configured to be blocking.
1220+ func (r * Reader ) CommitMessages (ctx context.Context , msgs ... Message ) error {
1221+ if ! r .useConsumerGroup () {
1222+ return errNotAvailable
1223+ }
1224+
1225+ var errch <- chan error
1226+ var sync = r .useSyncCommits ()
1227+ var creq = commitRequest {
1228+ commits : makeCommits (msgs ... ),
1229+ }
1230+
1231+ if sync {
1232+ ch := make (chan error , 1 )
1233+ errch , creq .errch = ch , ch
1234+ }
1235+
1236+ select {
1237+ case r .commits <- creq :
1238+ case <- ctx .Done ():
1239+ return ctx .Err ()
1240+ case <- r .stctx .Done ():
1241+ // This context is used to ensure we don't allow commits after the
1242+ // reader was closed.
1243+ return io .ErrClosedPipe
1244+ }
1245+
1246+ if ! sync {
1247+ return nil
1248+ }
1249+
1250+ select {
1251+ case <- ctx .Done ():
1252+ return ctx .Err ()
1253+ case err := <- errch :
1254+ return err
1255+ }
1256+ }
1257+
12501258// ReadLag returns the current lag of the reader by fetching the last offset of
12511259// the topic and partition and computing the difference between that value and
12521260// the offset of the last message returned by ReadMessage.
@@ -1501,25 +1509,6 @@ func (r *Reader) start(offsetsByPartition map[int]int64) {
15011509 }
15021510}
15031511
1504- func (r * Reader ) CommitMessages (ctx context.Context , msgs ... Message ) error {
1505- if len (msgs ) == 0 {
1506- return nil
1507- }
1508-
1509- if ! r .useConsumerGroup () {
1510- return errNotAvailable
1511- }
1512-
1513- select {
1514- case <- ctx .Done ():
1515- return ctx .Err ()
1516- case <- r .stctx .Done ():
1517- return r .stctx .Err ()
1518- case r .commits <- msgs :
1519- return nil
1520- }
1521- }
1522-
15231512// A reader reads messages from kafka and produces them on its channels, it's
15241513// used as an way to asynchronously fetch messages while the main program reads
15251514// them using the high level reader API.
0 commit comments