Skip to content

Commit

Permalink
chore(refactor): slightly adjust naming of ACKs
Browse files Browse the repository at this point in the history
  • Loading branch information
ernado committed Dec 18, 2020
1 parent 3685060 commit 5d18027
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 21 deletions.
36 changes: 20 additions & 16 deletions telegram/ack.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,46 +13,50 @@ func (c *Client) ackLoop(ctx context.Context) {
c.wg.Add(1)
defer c.wg.Done()

log := c.log.Named("ack")

const (
ackMaxBatchSize = 20
ackForcedSendTimeout = time.Second * 15
maxBatchSize = 20
forcedSendTimeout = time.Second * 15
)

var (
buff = make([]int64, 0, ackMaxBatchSize)
timer = time.NewTimer(ackForcedSendTimeout)
buf = make([]int64, 0, maxBatchSize)

// TODO(ernado): remove side-effect.
timer = time.NewTimer(forcedSendTimeout)
)

sendAcks := func(ctx context.Context) {
defer func() { buff = buff[:0] }()
send := func() {
defer func() { buf = buf[:0] }()

if err := c.writeServiceMessage(ctx, &mt.MsgsAck{MsgIds: buff}); err != nil {
c.log.Error("send acks", zap.Error(err))
if err := c.writeServiceMessage(ctx, &mt.MsgsAck{MsgIds: buf}); err != nil {
c.log.Error("Failed to ACK", zap.Error(err))
return
}

c.log.Info("sent acks", zap.Int64s("message-ids", buff))
log.Info("ACK", zap.Int64s("message_ids", buf))
}

for {
select {
case <-ctx.Done():
return
case <-timer.C:
if len(buff) > 0 {
sendAcks(ctx)
if len(buf) > 0 {
send()
}
case msgID := <-c.ackSendChan:
buff = append(buff, msgID)
if len(buff) == ackMaxBatchSize {
sendAcks(ctx)
timer.Reset(ackForcedSendTimeout)
buf = append(buf, msgID)
if len(buf) == maxBatchSize {
send()
timer.Reset(forcedSendTimeout)
}
}
}
}

func (c *Client) ackOutcomingRPC(ctx context.Context, req request) {
func (c *Client) rpcRetryUntilAck(ctx context.Context, req request) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

Expand Down
10 changes: 7 additions & 3 deletions telegram/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,17 +80,21 @@ type Client struct {
updateHandler UpdateHandler // immutable
sessionStorage SessionStorage // immutable

// callbacks for RPC requests, protected by rpcMux
// callbacks for RPC requests, protected by rpcMux.
// Key is request message id.
rpc map[int64]func(b *bin.Buffer, rpcErr error) error
rpcMux sync.Mutex

// callbacks for ack protected by ackMux
ack map[int64]func()
ackMux sync.Mutex

ackSendChan chan int64 // channel for outcoming acks
// ackSendChan is queue for outgoing message id's that require waiting for
// ACK from server.
ackSendChan chan int64

// callbacks for ping results protected by pingMux
// callbacks for ping results protected by pingMux.
// Key is ping id.
ping map[int64]func()
pingMux sync.Mutex

Expand Down
4 changes: 2 additions & 2 deletions telegram/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ func (c *Client) rpcDoRequest(ctx context.Context, req request) error {
ackCtx, ackClose := context.WithCancel(c.ctx)
defer ackClose()

// resend request until we receive ack or response for it
go c.ackOutcomingRPC(ackCtx, req)
// Start retrying.
go c.rpcRetryUntilAck(ackCtx, req)

select {
case <-ctx.Done():
Expand Down

0 comments on commit 5d18027

Please sign in to comment.