Skip to content

Commit

Permalink
Merge pull request #607 from shenhui0509/master
Browse files Browse the repository at this point in the history
feat(produce) : reduce memory copy for on-wire protocol
  • Loading branch information
duhenglucky committed Mar 1, 2021
2 parents 7e36c75 + 9736ba8 commit 6cd3181
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 5 deletions.
38 changes: 38 additions & 0 deletions internal/remote/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"bytes"
"encoding/binary"
"fmt"
"io"
"sync/atomic"

jsoniter "github.com/json-iterator/go"
Expand Down Expand Up @@ -132,6 +133,43 @@ var (
// +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
// + len | 4bytes | 4bytes | (21 + r_len + e_len) bytes | remain bytes +
// +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
func (command *RemotingCommand) WriteTo(w io.Writer) error {
var (
header []byte
err error
)

switch codecType {
case JsonCodecs:
header, err = jsonSerializer.encodeHeader(command)
case RocketMQCodecs:
header, err = rocketMqSerializer.encodeHeader(command)
}

if err != nil {
return err
}

frameSize := 4 + len(header) + len(command.Body)
err = binary.Write(w, binary.BigEndian, int32(frameSize))
if err != nil {
return err
}

err = binary.Write(w, binary.BigEndian, markProtocolType(int32(len(header))))
if err != nil {
return err
}

_, err = w.Write(header)
if err != nil {
return err
}

_, err = w.Write(command.Body)
return err
}

func encode(command *RemotingCommand) ([]byte, error) {
var (
header []byte
Expand Down
8 changes: 3 additions & 5 deletions internal/remote/remote_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,11 +272,9 @@ func (c *remotingClient) sendRequest(conn *tcpConnWrapper, request *RemotingComm
}

func (c *remotingClient) doRequest(conn *tcpConnWrapper, request *RemotingCommand) error {
content, err := encode(request)
if err != nil {
return err
}
_, err = conn.Write(content)
conn.Lock()
defer conn.Unlock()
err := request.WriteTo(conn)
if err != nil {
c.closeConnection(conn)
return err
Expand Down
2 changes: 2 additions & 0 deletions internal/remote/tcp_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@ package remote
import (
"context"
"net"
"sync"

"go.uber.org/atomic"
)

// TODO: Adding TCP Connections Pool, https://github.com/apache/rocketmq-client-go/v2/issues/298
type tcpConnWrapper struct {
net.Conn
sync.Mutex
closed atomic.Bool
}

Expand Down

0 comments on commit 6cd3181

Please sign in to comment.