Skip to content

Commit

Permalink
Merge pull request #2105 from yyforyongyu/fix-batch-mem-leak
Browse files Browse the repository at this point in the history
rpcclient: make sure batch requests are GCed
  • Loading branch information
Roasbeef committed Jan 23, 2024
2 parents 17fdc52 + 9cda0f7 commit 62e6af0
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 20 deletions.
58 changes: 38 additions & 20 deletions rpcclient/infrastructure.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,14 +270,21 @@ func (c *Client) removeRequest(id uint64) *jsonRequest {
c.requestLock.Lock()
defer c.requestLock.Unlock()

element := c.requestMap[id]
if element != nil {
delete(c.requestMap, id)
request := c.requestList.Remove(element).(*jsonRequest)
return request
element, ok := c.requestMap[id]
if !ok {
return nil
}

return nil
delete(c.requestMap, id)

var request *jsonRequest
if c.batch {
request = c.batchList.Remove(element).(*jsonRequest)
} else {
request = c.requestList.Remove(element).(*jsonRequest)
}

return request
}

// removeAllRequests removes all the jsonRequests which contain the response
Expand Down Expand Up @@ -1733,28 +1740,38 @@ func (c *Client) Send() error {
return nil
}

// clear batchlist in case of an error
defer func() {
batchResp, err := c.sendAsync().Receive()
if err != nil {
// Clear batchlist in case of an error.
//
// TODO(yy): need to double check to make sure there's no
// concurrent access to this batch list, otherwise we may miss
// some batched requests.
c.batchList = list.New()
}()

result, err := c.sendAsync().Receive()

if err != nil {
return err
}

for iter := c.batchList.Front(); iter != nil; iter = iter.Next() {
var requestError error
request := iter.Value.(*jsonRequest)
individualResult := result[request.id]
fullResult, err := json.Marshal(individualResult.Result)
// Iterate each response and send it to the corresponding request.
for id, resp := range batchResp {
// Perform a GC on batchList and requestMap before moving
// forward.
request := c.removeRequest(id)

// If there's an error, we log it and continue to the next
// request.
fullResult, err := json.Marshal(resp.Result)
if err != nil {
return err
log.Errorf("Unable to marshal result: %v for req=%v",
err, request.id)

continue
}

if individualResult.Error != nil {
requestError = individualResult.Error
// If there's a response error, we send it back the request.
var requestError error
if resp.Error != nil {
requestError = resp.Error
}

result := Response{
Expand All @@ -1763,5 +1780,6 @@ func (c *Client) Send() error {
}
request.responseChan <- &result
}

return nil
}
14 changes: 14 additions & 0 deletions wire/msgblock.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,20 @@ type MsgBlock struct {
Transactions []*MsgTx
}

// Copy creates a deep copy of MsgBlock.
func (msg *MsgBlock) Copy() *MsgBlock {
block := &MsgBlock{
Header: msg.Header,
Transactions: make([]*MsgTx, len(msg.Transactions)),
}

for i, tx := range msg.Transactions {
block.Transactions[i] = tx.Copy()
}

return block
}

// AddTransaction adds a transaction to the message.
func (msg *MsgBlock) AddTransaction(tx *MsgTx) error {
msg.Transactions = append(msg.Transactions, tx)
Expand Down

0 comments on commit 62e6af0

Please sign in to comment.