Skip to content

Commit

Permalink
fix bookmark
Browse files Browse the repository at this point in the history
  • Loading branch information
ToniRamirezM committed Apr 30, 2024
1 parent 575ba7a commit 90f8258
Show file tree
Hide file tree
Showing 5 changed files with 12 additions and 658 deletions.
File renamed without changes.
File renamed without changes
16 changes: 12 additions & 4 deletions sequencesender/sequencesender.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package sequencesender

import (
"context"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -159,8 +158,17 @@ func (s *SequenceSender) Start(ctx context.Context) {

// Set starting point of the streaming
s.fromStreamBatch = s.latestVirtualBatch
bookmark := []byte{state.BookMarkTypeBatch}
bookmark = binary.BigEndian.AppendUint64(bookmark, s.fromStreamBatch)

bookmark := &datastream.BookMark{
Type: datastream.BookmarkType_BOOKMARK_TYPE_BATCH,
Value: s.fromStreamBatch,
}

marshalledBookmark, err := proto.Marshal(bookmark)
if err != nil {
log.Fatalf("[SeqSender] failed to marshal bookmark, error: %v", err)
}

log.Infof("[SeqSender] stream client from bookmark %v", bookmark)

// Current batch to sequence
Expand All @@ -171,7 +179,7 @@ func (s *SequenceSender) Start(ctx context.Context) {
go s.sequenceSending(ctx)

// Start receiving the streaming
err = s.streamClient.ExecCommandStartBookmark(bookmark)
err = s.streamClient.ExecCommandStartBookmark(marshalledBookmark)
if err != nil {
log.Fatalf("[SeqSender] failed to connect to the streaming")
}
Expand Down
Loading

0 comments on commit 90f8258

Please sign in to comment.