Skip to content

Commit

Permalink
chore(proto): Optimize thriftproto code
Browse files Browse the repository at this point in the history
Change-Id: I58690c37fbb6c4b645bb4dd6bfee40cd51039769
  • Loading branch information
andeya committed Jun 25, 2019
1 parent 308f909 commit 8e88eb2
Showing 1 changed file with 37 additions and 22 deletions.
59 changes: 37 additions & 22 deletions proto/thriftproto/thriftproto.go
Expand Up @@ -28,18 +28,18 @@ func NewTProtoFunc(transFactory thrift.TTransportFactory, protoFactory thrift.TP
name: "thrift",
rwCounter: utils.NewReadWriteCounter(rw),
}
var tTransport thrift.TTransport = &BaseTTransport{
p.tTransport = &BaseTTransport{
ReadWriteCounter: p.rwCounter,
}
if transFactory != nil {
t, err := transFactory.GetTransport(tTransport)
trans, err := transFactory.GetTransport(p.tTransport)
if err != nil {
tp.Errorf("still using the base transport because it failed to wrap it: %s", err.Error())
} else {
tTransport = t
p.tTransport = trans
}
}
p.tProtocol = protoFactory.GetProtocol(tTransport)
p.tProtocol = protoFactory.GetProtocol(p.tTransport)
p.payloadPool = sync.Pool{
New: func() interface{} {
return NewPayload()
Expand All @@ -54,6 +54,7 @@ type thriftproto struct {
name string
rwCounter *utils.ReadWriteCounter
tProtocol thrift.TProtocol
tTransport thrift.TTransport
packLock sync.Mutex
unpackLock sync.Mutex
payloadPool sync.Pool
Expand Down Expand Up @@ -83,15 +84,6 @@ func (t *thriftproto) Pack(m tp.Message) error {
*pd = Payload{}
defer t.payloadPool.Put(pd)

var typeID thrift.TMessageType
switch m.Mtype() {
case tp.TypeCall:
typeID = thrift.CALL
case tp.TypeReply:
typeID = thrift.REPLY
case tp.TypePush:
typeID = thrift.ONEWAY
}
pd.Meta = m.Meta().QueryString()
pd.BodyCodec = int32(m.BodyCodec())
pd.XferPipe = m.XferPipe().IDs()
Expand All @@ -102,7 +94,7 @@ func (t *thriftproto) Pack(m tp.Message) error {

// pack
t.rwCounter.WriteCounter.Zero()
if err := t.tProtocol.WriteMessageBegin(m.ServiceMethod(), typeID, m.Seq()); err != nil {
if err := WriteMessageBegin(t.tProtocol, m); err != nil {
return err
}

Expand All @@ -113,7 +105,7 @@ func (t *thriftproto) Pack(m tp.Message) error {
if err = t.tProtocol.WriteMessageEnd(); err != nil {
return err
}
if err = t.tProtocol.Flush(nil); err != nil {
if err = t.tProtocol.Flush(m.Context()); err != nil {
return err
}

Expand Down Expand Up @@ -147,7 +139,8 @@ func (t *thriftproto) unpack(m tp.Message) (*Payload, error) {
t.unpackLock.Lock()
defer t.unpackLock.Unlock()
t.rwCounter.WriteCounter.Zero()
rMethod, rTypeID, rSeqID, err := t.tProtocol.ReadMessageBegin()

err := ReadMessageBegin(t.tProtocol, m)
if err != nil {
return nil, err
}
Expand All @@ -165,9 +158,35 @@ func (t *thriftproto) unpack(m tp.Message) (*Payload, error) {
return nil, err
}

if err = t.tProtocol.ReadMessageEnd(); err != nil {
t.payloadPool.Put(pd)
return nil, err
}
return pd, nil
}

// WriteMessageBegin write a message header to the wire.
func WriteMessageBegin(tProtocol thrift.TProtocol, m tp.Message) error {
var typeID thrift.TMessageType
switch m.Mtype() {
case tp.TypeCall:
typeID = thrift.CALL
case tp.TypeReply:
typeID = thrift.REPLY
case tp.TypePush:
typeID = thrift.ONEWAY
}
return tProtocol.WriteMessageBegin(m.ServiceMethod(), typeID, m.Seq())
}

// ReadMessageBegin read a message header.
func ReadMessageBegin(tProtocol thrift.TProtocol, m tp.Message) error {
rMethod, rTypeID, rSeqID, err := tProtocol.ReadMessageBegin()
if err != nil {
return err
}
m.SetServiceMethod(rMethod)
m.SetSeq(rSeqID)

switch rTypeID {
case thrift.CALL:
m.SetMtype(tp.TypeCall)
Expand All @@ -178,11 +197,7 @@ func (t *thriftproto) unpack(m tp.Message) (*Payload, error) {
default:
m.SetMtype(tp.TypePush)
}
if err = t.tProtocol.ReadMessageEnd(); err != nil {
t.payloadPool.Put(pd)
return nil, err
}
return pd, nil
return nil
}

// BaseTTransport the base thrift transport
Expand Down

0 comments on commit 8e88eb2

Please sign in to comment.