Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions pkg/consensus/mimicry/p2p/reqresp/v1/chunked_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,22 @@ func NewChunkedHandler[TReq, TResp any](
func (h *ChunkedHandler[TReq, TResp]) HandleStream(ctx context.Context, stream network.Stream) {
defer stream.Close()

// Set deadline if configured
// Recover from panics
defer func() {
if r := recover(); r != nil {
h.log.WithField("panic", r).Error("Chunked handler panicked")
_ = h.writeErrorResponse(stream, StatusServerError)
}
}()

// Create context with timeout if configured
handlerCtx := ctx

var cancel context.CancelFunc
if h.config.RequestTimeout > 0 {
handlerCtx, cancel = context.WithTimeout(ctx, h.config.RequestTimeout)
defer cancel()

deadline := time.Now().Add(h.config.RequestTimeout)
if err := stream.SetDeadline(deadline); err != nil {
h.log.WithError(err).Debug("Failed to set stream deadline")
Expand Down Expand Up @@ -168,7 +182,7 @@ func (h *ChunkedHandler[TReq, TResp]) HandleStream(ctx context.Context, stream n
}

// Process request with chunked writer
err = h.handler(ctx, req, peerID, writer)
err = h.handler(handlerCtx, req, peerID, writer)
if err != nil {
h.log.WithError(err).WithField("peer", peerID).Debug("Chunked handler returned error")
// Try to send error status if writer hasn't written anything yet
Expand All @@ -194,6 +208,10 @@ func (h *ChunkedHandler[TReq, TResp]) readRequest(stream network.Stream) (TReq,
}

size := binary.BigEndian.Uint32(sizeBytes[:])
if size == 0 {
return req, fmt.Errorf("empty request")
}

if uint64(size) > h.protocol.MaxRequestSize() {
return req, fmt.Errorf("request size %d exceeds max %d", size, h.protocol.MaxRequestSize())
}
Expand Down
Loading
Loading