Skip to content

Commit

Permalink
Remove message id map from source
Browse files Browse the repository at this point in the history
  • Loading branch information
Guillem committed Jan 26, 2024
1 parent d256b18 commit 21de28d
Showing 1 changed file with 2 additions and 21 deletions.
23 changes: 2 additions & 21 deletions source.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"context"
"encoding/json"
"fmt"
"sync"

"github.com/apache/pulsar-client-go/pulsar"
"github.com/apache/pulsar-client-go/pulsar/log"
Expand All @@ -32,16 +31,11 @@ type Source struct {
sdk.UnimplementedSource

consumer pulsar.Consumer
received map[string]pulsar.Message
mx *sync.Mutex
config SourceConfig
}

func NewSource() sdk.Source {
source := &Source{
mx: &sync.Mutex{},
received: make(map[string]pulsar.Message),
}
source := &Source{}

return sdk.SourceWithMiddleware(source, sdk.DefaultSourceMiddleware()...)
}
Expand Down Expand Up @@ -130,11 +124,6 @@ func (s *Source) Read(ctx context.Context) (sdk.Record, error) {
}

msgID := msg.ID()
msgIDStr := msgID.String()

s.mx.Lock()
s.received[msgIDStr] = msg
s.mx.Unlock()

position := Position{
MessageID: msgID.Serialize(),
Expand Down Expand Up @@ -166,15 +155,7 @@ func (s *Source) Ack(ctx context.Context, position sdk.Position) error {
return fmt.Errorf("failed to deserialize message ID: %w", err)
}

s.mx.Lock()
defer s.mx.Unlock()
msg, ok := s.received[msgID.String()]
if ok {
delete(s.received, msgID.String())
return s.consumer.Ack(msg)
}

return fmt.Errorf("message not found for position: %s", string(position))
return s.consumer.AckID(msgID)
}

func (s *Source) Teardown(_ context.Context) error {
Expand Down

0 comments on commit 21de28d

Please sign in to comment.