diff --git a/internal/client.go b/internal/client.go index c3fafbc1..a6e1bc5b 100644 --- a/internal/client.go +++ b/internal/client.go @@ -43,7 +43,7 @@ const ( // Pulling topic information interval from the named server _PullNameServerInterval = 30 * time.Second - // Pulling topic information interval from the named server + // Sending heart beat interval to all broker _HeartbeatBrokerInterval = 30 * time.Second // Offset persistent interval for consumer @@ -54,7 +54,7 @@ const ( ) var ( - ErrServiceState = errors.New("service state is not running, please check") + ErrServiceState = errors.New("service close is not running, please check") _VIPChannelEnable = false ) @@ -129,6 +129,7 @@ type RMQClient struct { remoteClient *remote.RemotingClient hbMutex sync.Mutex + close bool } var clientMap sync.Map @@ -150,6 +151,9 @@ func GetOrNewRocketMQClient(option ClientOptions) *RMQClient { } func (c *RMQClient) Start() { + //ctx, cancel := context.WithCancel(context.Background()) + //c.cancel = cancel + c.close = false c.once.Do(func() { // TODO fetchNameServerAddr go func() {}() @@ -158,7 +162,7 @@ func (c *RMQClient) Start() { go func() { // delay time.Sleep(50 * time.Millisecond) - for { + for !c.close{ c.UpdateTopicRouteInfo() time.Sleep(_PullNameServerInterval) } @@ -166,7 +170,7 @@ func (c *RMQClient) Start() { // TODO cleanOfflineBroker & sendHeartbeatToAllBrokerWithLock go func() { - for { + for !c.close{ cleanOfflineBroker() c.SendHeartbeatToAllBrokerWithLock() time.Sleep(_HeartbeatBrokerInterval) @@ -176,7 +180,7 @@ func (c *RMQClient) Start() { // schedule persist offset go func() { //time.Sleep(10 * time.Second) - for { + for !c.close{ c.consumerMap.Range(func(key, value interface{}) bool { consumer := value.(InnerConsumer) consumer.PersistConsumerOffset() @@ -187,7 +191,7 @@ func (c *RMQClient) Start() { }() go func() { - for { + for !c.close{ c.RebalanceImmediately() time.Sleep(_RebalanceInterval) } @@ -196,7 +200,8 @@ func (c *RMQClient) Start() { } func (c *RMQClient) Shutdown() { - // TODO + c.remoteClient.ShutDown() + c.close = true } func (c *RMQClient) ClientID() string { @@ -209,18 +214,28 @@ func (c *RMQClient) ClientID() string { func (c *RMQClient) InvokeSync(addr string, request *remote.RemotingCommand, timeoutMillis time.Duration) (*remote.RemotingCommand, error) { + if c.close { + return nil, ErrServiceState + } return c.remoteClient.InvokeSync(addr, request, timeoutMillis) } func (c *RMQClient) InvokeAsync(addr string, request *remote.RemotingCommand, timeoutMillis time.Duration, f func(*remote.RemotingCommand, error)) error { + if c.close { + return ErrServiceState + } return c.remoteClient.InvokeAsync(addr, request, timeoutMillis, func(future *remote.ResponseFuture) { f(future.ResponseCommand, future.Err) }) + } func (c *RMQClient) InvokeOneWay(addr string, request *remote.RemotingCommand, timeoutMillis time.Duration) error { + if c.close { + return ErrServiceState + } return c.remoteClient.InvokeOneWay(addr, request, timeoutMillis) } diff --git a/internal/remote/codec.go b/internal/remote/codec.go index aebf75bb..434678e9 100644 --- a/internal/remote/codec.go +++ b/internal/remote/codec.go @@ -73,7 +73,7 @@ func NewRemotingCommand(code int16, header CustomHeader, body []byte) *RemotingC } func (command *RemotingCommand) String() string { - return fmt.Sprintf("Code: %d, Opaque: %d, Remark: %s, ExtFields: %v", + return fmt.Sprintf("Code: %d, opaque: %d, Remark: %s, ExtFields: %v", command.Code, command.Opaque, command.Remark, command.ExtFields) } diff --git a/internal/remote/codec_test.go b/internal/remote/codec_test.go index 2e836c71..a3159471 100644 --- a/internal/remote/codec_test.go +++ b/internal/remote/codec_test.go @@ -96,7 +96,7 @@ func Test_decode(t *testing.T) { t.Fatalf("wrong Version. want=%d, got=%d", rc.Version, decodedRc.Version) } if rc.Opaque != decodedRc.Opaque { - t.Fatalf("wrong Opaque. want=%d, got=%d", rc.Opaque, decodedRc.Opaque) + t.Fatalf("wrong opaque. want=%d, got=%d", rc.Opaque, decodedRc.Opaque) } if rc.Remark != decodedRc.Remark { t.Fatalf("wrong remark. want=%s, got=%s", rc.Remark, decodedRc.Remark) @@ -167,7 +167,7 @@ func Test_jsonCodec_decodeHeader(t *testing.T) { t.Fatalf("wrong Version. want=%d, got=%d", rc.Version, decodedRc.Version) } if rc.Opaque != decodedRc.Opaque { - t.Fatalf("wrong Opaque. want=%d, got=%d", rc.Opaque, decodedRc.Opaque) + t.Fatalf("wrong opaque. want=%d, got=%d", rc.Opaque, decodedRc.Opaque) } if rc.Remark != decodedRc.Remark { t.Fatalf("wrong remark. want=%s, got=%s", rc.Remark, decodedRc.Remark) @@ -237,7 +237,7 @@ func Test_rmqCodec_decodeHeader(t *testing.T) { t.Fatalf("wrong Version. want=%d, got=%d", rc.Version, decodedRc.Version) } if rc.Opaque != decodedRc.Opaque { - t.Fatalf("wrong Opaque. want=%d, got=%d", rc.Opaque, decodedRc.Opaque) + t.Fatalf("wrong opaque. want=%d, got=%d", rc.Opaque, decodedRc.Opaque) } if rc.Remark != decodedRc.Remark { t.Fatalf("wrong remark. want=%s, got=%s", rc.Remark, decodedRc.Remark) diff --git a/internal/remote/future.go b/internal/remote/future.go index 62a9640a..93990e51 100644 --- a/internal/remote/future.go +++ b/internal/remote/future.go @@ -28,21 +28,21 @@ type ResponseFuture struct { SendRequestOK bool Err error Opaque int32 - TimeoutMillis time.Duration + Timeout time.Duration callback func(*ResponseFuture) - BeginTimestamp int64 + BeginTimestamp time.Duration Done chan bool callbackOnce sync.Once } // NewResponseFuture create ResponseFuture with opaque, timeout and callback -func NewResponseFuture(opaque int32, timeoutMillis time.Duration, callback func(*ResponseFuture)) *ResponseFuture { +func NewResponseFuture(opaque int32, timeout time.Duration, callback func(*ResponseFuture)) *ResponseFuture { return &ResponseFuture{ Opaque: opaque, Done: make(chan bool), - TimeoutMillis: timeoutMillis, + Timeout: timeout, callback: callback, - BeginTimestamp: time.Now().Unix() * 1000, + BeginTimestamp: time.Duration(time.Now().Unix()) * time.Second, } } @@ -55,8 +55,8 @@ func (r *ResponseFuture) executeInvokeCallback() { } func (r *ResponseFuture) isTimeout() bool { - diff := time.Now().Unix()*1000 - r.BeginTimestamp - return diff > int64(r.TimeoutMillis) + elapse := time.Duration(time.Now().Unix())*time.Second - r.BeginTimestamp + return elapse > r.Timeout } func (r *ResponseFuture) waitResponse() (*RemotingCommand, error) { @@ -64,7 +64,7 @@ func (r *ResponseFuture) waitResponse() (*RemotingCommand, error) { cmd *RemotingCommand err error ) - timer := time.NewTimer(r.TimeoutMillis * time.Millisecond) + timer := time.NewTimer(r.Timeout) for { select { case <-r.Done: diff --git a/internal/remote/remote_client.go b/internal/remote/remote_client.go index fde17e46..1d0330a1 100644 --- a/internal/remote/remote_client.go +++ b/internal/remote/remote_client.go @@ -32,7 +32,6 @@ import ( var ( //ErrRequestTimeout for request timeout error ErrRequestTimeout = errors.New("request timeout") - connectionLocker sync.Mutex ) type ClientRequestFunc func(*RemotingCommand) *RemotingCommand @@ -42,10 +41,11 @@ type TcpOption struct { } type RemotingClient struct { - responseTable sync.Map - connectionTable sync.Map - option TcpOption - processors map[int16]ClientRequestFunc + responseTable sync.Map + connectionTable sync.Map + option TcpOption + processors map[int16]ClientRequestFunc + connectionLocker sync.Mutex } func NewRemotingClient() *RemotingClient { @@ -59,15 +59,15 @@ func (c *RemotingClient) RegisterRequestFunc(code int16, f ClientRequestFunc) { } // TODO: merge sync and async model. sync should run on async model by blocking on chan -func (c *RemotingClient) InvokeSync(addr string, request *RemotingCommand, timeoutMillis time.Duration) (*RemotingCommand, error) { +func (c *RemotingClient) InvokeSync(addr string, request *RemotingCommand, timeout time.Duration) (*RemotingCommand, error) { conn, err := c.connect(addr) if err != nil { return nil, err } - resp := NewResponseFuture(request.Opaque, timeoutMillis, nil) + resp := NewResponseFuture(request.Opaque, timeout, nil) c.responseTable.Store(resp.Opaque, resp) - err = c.sendRequest(conn, request) defer c.responseTable.Delete(request.Opaque) + err = c.sendRequest(conn, request) if err != nil { return nil, err } @@ -75,13 +75,13 @@ func (c *RemotingClient) InvokeSync(addr string, request *RemotingCommand, timeo return resp.waitResponse() } -// InvokeAsync send request witout blocking, just return immediately. -func (c *RemotingClient) InvokeAsync(addr string, request *RemotingCommand, timeoutMillis time.Duration, callback func(*ResponseFuture)) error { +// InvokeAsync send request without blocking, just return immediately. +func (c *RemotingClient) InvokeAsync(addr string, request *RemotingCommand, timeout time.Duration, callback func(*ResponseFuture)) error { conn, err := c.connect(addr) if err != nil { return err } - resp := NewResponseFuture(request.Opaque, timeoutMillis, callback) + resp := NewResponseFuture(request.Opaque, timeout, callback) c.responseTable.Store(resp.Opaque, resp) err = c.sendRequest(conn, request) if err != nil { @@ -107,27 +107,10 @@ func (c *RemotingClient) InvokeOneWay(addr string, request *RemotingCommand, tim return c.sendRequest(conn, request) } -func (c *RemotingClient) ScanResponseTable() { - rfs := make([]*ResponseFuture, 0) - c.responseTable.Range(func(key, value interface{}) bool { - if resp, ok := value.(*ResponseFuture); ok { - if (resp.BeginTimestamp + int64(resp.TimeoutMillis) + 1000) <= time.Now().Unix()*1000 { - rfs = append(rfs, resp) - c.responseTable.Delete(key) - } - } - return true - }) - for _, rf := range rfs { - rf.Err = ErrRequestTimeout - rf.executeInvokeCallback() - } -} - func (c *RemotingClient) connect(addr string) (net.Conn, error) { //it needs additional locker. - connectionLocker.Lock() - defer connectionLocker.Unlock() + c.connectionLocker.Lock() + defer c.connectionLocker.Unlock() conn, ok := c.connectionTable.Load(addr) if ok { return conn.(net.Conn), nil @@ -181,7 +164,7 @@ func (c *RemotingClient) receiveResponse(r net.Conn) { } } if scanner.Err() != nil { - rlog.Errorf("net: %s scanner exit, err: %s.", r.RemoteAddr().String(), scanner.Err()) + rlog.Errorf("net: %s scanner exit, Err: %s.", r.RemoteAddr().String(), scanner.Err()) } else { rlog.Infof("net: %s scanner exit.", r.RemoteAddr().String()) } @@ -237,3 +220,15 @@ func (c *RemotingClient) closeConnection(toCloseConn net.Conn) { } }) } + +func (c *RemotingClient) ShutDown() { + c.responseTable.Range(func(key, value interface{}) bool { + c.responseTable.Delete(key) + return true + }) + c.connectionTable.Range(func(key, value interface{}) bool { + conn := value.(net.Conn) + conn.Close() + return true + }) +} diff --git a/internal/remote/remote_client_test.go b/internal/remote/remote_client_test.go index 6cc63cf2..4efb9907 100644 --- a/internal/remote/remote_client_test.go +++ b/internal/remote/remote_client_test.go @@ -19,6 +19,7 @@ package remote import ( "bytes" "errors" + "math/rand" "net" "reflect" "sync" @@ -31,23 +32,23 @@ import ( func TestNewResponseFuture(t *testing.T) { future := NewResponseFuture(10, time.Duration(1000), nil) if future.Opaque != 10 { - t.Errorf("wrong ResponseFuture's Opaque. want=%d, got=%d", 10, future.Opaque) + t.Errorf("wrong ResponseFuture's opaque. want=%d, got=%d", 10, future.Opaque) } if future.SendRequestOK != false { - t.Errorf("wrong ResposneFutrue's SendRequestOK. want=%t, got=%t", false, future.SendRequestOK) + t.Errorf("wrong ResposneFutrue's sendRequestOK. want=%t, got=%t", false, future.SendRequestOK) } if future.Err != nil { t.Errorf("wrong RespnseFuture's Err. want=, got=%v", future.Err) } - if future.TimeoutMillis != time.Duration(1000) { + if future.Timeout != time.Duration(1000) { t.Errorf("wrong ResponseFuture's TimeoutMills. want=%d, got=%d", - future.TimeoutMillis, time.Duration(1000)) + future.Timeout, time.Duration(1000)) } if future.callback != nil { t.Errorf("wrong ResponseFuture's callback. want=, got!=") } if future.Done == nil { - t.Errorf("wrong ResponseFuture's Done. want=, got=") + t.Errorf("wrong ResponseFuture's done. want=, got=") } } @@ -80,11 +81,11 @@ func TestResponseFutureTimeout(t *testing.T) { } func TestResponseFutureIsTimeout(t *testing.T) { - future := NewResponseFuture(10, time.Duration(500), nil) + future := NewResponseFuture(10, 500 * time.Millisecond, nil) if future.isTimeout() != false { t.Errorf("wrong ResponseFuture's istimeout. want=%t, got=%t", false, future.isTimeout()) } - time.Sleep(time.Duration(2000) * time.Millisecond) + time.Sleep(time.Duration(700) * time.Millisecond) if future.isTimeout() != true { t.Errorf("wrong ResponseFuture's istimeout. want=%t, got=%t", true, future.isTimeout()) } @@ -92,12 +93,12 @@ func TestResponseFutureIsTimeout(t *testing.T) { } func TestResponseFutureWaitResponse(t *testing.T) { - future := NewResponseFuture(10, time.Duration(500), nil) + future := NewResponseFuture(10, 500 * time.Millisecond, nil) if _, err := future.waitResponse(); err != ErrRequestTimeout { t.Errorf("wrong ResponseFuture waitResponse. want=%v, got=%v", ErrRequestTimeout, err) } - future = NewResponseFuture(10, time.Duration(500), nil) + future = NewResponseFuture(10, 500 * time.Millisecond, nil) responseError := errors.New("response error") go func() { time.Sleep(100 * time.Millisecond) @@ -108,7 +109,7 @@ func TestResponseFutureWaitResponse(t *testing.T) { t.Errorf("wrong ResponseFuture waitResponse. want=%v. got=%v", responseError, err) } - future = NewResponseFuture(10, time.Duration(500), nil) + future = NewResponseFuture(10, 500 * time.Millisecond, nil) responseRemotingCommand := NewRemotingCommand(202, nil, nil) go func() { time.Sleep(100 * time.Millisecond) @@ -146,7 +147,7 @@ func TestCreateScanner(t *testing.T) { t.Fatalf("wrong Version. want=%d, got=%d", r.Version, rcr.Version) } if r.Opaque != rcr.Opaque { - t.Fatalf("wrong Opaque. want=%d, got=%d", r.Opaque, rcr.Opaque) + t.Fatalf("wrong opaque. want=%d, got=%d", r.Opaque, rcr.Opaque) } if r.Flag != rcr.Flag { t.Fatalf("wrong flag. want=%d, got=%d", r.Opaque, rcr.Opaque) @@ -167,7 +168,7 @@ func TestInvokeSync(t *testing.T) { client := NewRemotingClient() go func() { receiveCommand, err := client.InvokeSync(":3000", - clientSendRemtingCommand, time.Duration(1000)) + clientSendRemtingCommand, time.Second) if err != nil { t.Fatalf("failed to invoke synchronous. %s", err) } else { @@ -214,65 +215,58 @@ func TestInvokeSync(t *testing.T) { } func TestInvokeAsync(t *testing.T) { - clientSendRemtingCommand := NewRemotingCommand(10, nil, []byte("Hello RocketMQ")) - serverSendRemotingCommand := NewRemotingCommand(20, nil, []byte("Welcome native")) - serverSendRemotingCommand.Opaque = clientSendRemtingCommand.Opaque - serverSendRemotingCommand.Flag = ResponseType - var wg sync.WaitGroup - wg.Add(1) + cnt := 50 + wg.Add(cnt) client := NewRemotingClient() - go func() { - time.Sleep(1 * time.Second) - t.Logf("invoke async method") - err := client.InvokeAsync(":3000", clientSendRemtingCommand, - time.Duration(1000), func(r *ResponseFuture) { - t.Logf("invoke async callback") - if string(r.ResponseCommand.Body) != "Welcome native" { - t.Errorf("wrong responseCommand.Body. want=%s, got=%s", - "Welcome native", string(r.ResponseCommand.Body)) + for i:=0; i < cnt; i++ { + go func(index int) { + time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond) + t.Logf("[Send: %d] asychronous message", index) + sendRemotingCommand := randomNewRemotingCommand() + err := client.InvokeAsync(":3000", sendRemotingCommand, time.Second, func(r *ResponseFuture) { + t.Logf("[Receive: %d] asychronous message response", index) + if string(sendRemotingCommand.Body) != string(r.ResponseCommand.Body) { + t.Errorf("wrong response message. want=%s, got=%s", string(sendRemotingCommand.Body), + string(r.ResponseCommand.Body)) } wg.Done() }) - if err != nil { - t.Errorf("failed to invokeSync. %s", err) - } - - }() + if err != nil { + t.Errorf("failed to invokeAsync. %s", err) + } + }(i) + } l, err := net.Listen("tcp", ":3000") if err != nil { - t.Fatal(err) + t.Fatalf("failed to create tcp network. %s", err) } defer l.Close() + count := 0 for { conn, err := l.Accept() if err != nil { - return + t.Fatalf("failed to create connection. %s", err) } defer conn.Close() scanner := client.createScanner(conn) for scanner.Scan() { - t.Logf("receive request.") - receivedRemotingCommand, err := decode(scanner.Bytes()) + t.Log("receive request") + r, err := decode(scanner.Bytes()) if err != nil { - t.Errorf("failed to decode RemotingCommnad. %s", err) - } - if clientSendRemtingCommand.Code != receivedRemotingCommand.Code { - t.Errorf("wrong code. want=%d, got=%d", receivedRemotingCommand.Code, - clientSendRemtingCommand.Code) - } - t.Logf("encoding response") - body, err := encode(serverSendRemotingCommand) - if err != nil { - t.Fatalf("failed to encode RemotingCommand") + t.Errorf("failed to decode RemotingCommand %s", err) } + r.markResponseType() + body, _ := encode(r) _, err = conn.Write(body) - t.Logf("sent response") if err != nil { - t.Fatalf("failed to write body to conneciton.") + t.Fatalf("failed to send response %s", err) + } + count++ + if count >= cnt { + goto done } - goto done } } done: @@ -367,3 +361,5 @@ func TestInvokeOneWay(t *testing.T) { } wg.Done() } + + diff --git a/internal/validators.go b/internal/validators.go index 1f8f5f4a..8b6f1223 100644 --- a/internal/validators.go +++ b/internal/validators.go @@ -29,7 +29,7 @@ const ( ) var ( - _Pattern, _ = regexp.Compile("_ValidPattern") + _Pattern, _ = regexp.Compile(_ValidPattern) ) func ValidateGroup(group string) {