Skip to content
This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

Commit

Permalink
去掉打印日志,tcp框架性能优化
Browse files Browse the repository at this point in the history
  • Loading branch information
cr-mao committed Aug 2, 2023
1 parent ec69382 commit 86ecebd
Show file tree
Hide file tree
Showing 7 changed files with 40 additions and 38 deletions.
5 changes: 2 additions & 3 deletions cluster/gate/gate.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package gate

import (
"context"
"fmt"
"github.com/cr-mao/lorig/cluster"

"github.com/cr-mao/lorig/component"
Expand Down Expand Up @@ -93,14 +92,14 @@ func (g *Gate) stopNetworkServer() {
// 处理连接打开
func (g *Gate) handleConnect(conn network.Conn) {

fmt.Println("add conn to session", conn.ID())
//fmt.Println("add conn to session", conn.ID())
g.session.AddConn(conn)
// 无需通知node连接 ,相信大部分场景 是不用让node知道的
}

// 处理断开连接
func (g *Gate) handleDisconnect(conn network.Conn) {
fmt.Println("disconnect")
//fmt.Println("disconnect")
g.session.RemConn(conn)
// 断链推送 给 业务服务器....
ctx, cancel := context.WithTimeout(g.ctx, g.opts.timeout)
Expand Down
5 changes: 0 additions & 5 deletions cluster/gate/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@ package gate

import (
"context"
"fmt"
"github.com/cr-mao/lorig/errors"
"github.com/cr-mao/lorig/log"
)

var (
Expand All @@ -23,12 +21,9 @@ func newProvider(gate *Gate) *provider {

// Bind 绑定用户与网关间的关系
func (p *provider) Bind(ctx context.Context, cid, uid int64) error {
fmt.Println("绑定网关")
fmt.Println(cid, uid)
if cid <= 0 || uid <= 0 {
return ErrInvalidArgument
}
log.Infof("连接:%d ,绑定用户id:%d", cid, uid)
err := p.gate.session.Bind(cid, uid)
if err != nil {
return err
Expand Down
12 changes: 6 additions & 6 deletions cluster/gate/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ func (np *proxy) GetNodeServerConn() (network.Conn, error) {
tcpClient.OnReceive(func(conn network.Conn, data []byte) {
innerMsg := &cluster.InternalServerMsg{}
innerMsg.UnPack(data)
log.Info("tcp client 收到 connid :", innerMsg.ConnId)
log.Info("tcp client 收到 userid:", innerMsg.UserId)
//log.Info("tcp client 收到 connid :", innerMsg.ConnId)
//log.Info("tcp client 收到 userid:", innerMsg.UserId)
gateConn, err := np.gate.session.Conn(session.Conn, innerMsg.ConnId)
if err != nil {
log.Errorf("get conn by connid err:%+v", err)
Expand All @@ -70,10 +70,10 @@ func (np *proxy) GetNodeServerConn() (network.Conn, error) {
defer cancel()
// 第一次绑定用户
if gateConn.UID() <= 0 && innerMsg.UserId > 0 {
log.Infof("userId:%d, 绑定session", innerMsg.UserId)
//log.Infof("userId:%d, 绑定session", innerMsg.UserId)
err = np.gate.provider.Bind(ctx, innerMsg.ConnId, innerMsg.UserId)
if err != nil {
log.Errorf("")
log.Error(err)
return
}
}
Expand Down Expand Up @@ -123,9 +123,9 @@ func (np *proxy) unbindGate(ctx context.Context, connId int64, userId int64) err
return err
}

// 绑定用户与网关间的关系
// 绑定用户与网关间的关系 todo 确定下放在哪在哪里 合适。
func (p *proxy) bindGate(ctx context.Context, cid, uid int64) error {
log.Infof("location gate :%d 绑定:%d", p.gate.opts.id, uid)
// 这里有性能问题
err := p.gate.opts.location.Set(ctx, uid, cluster.Gate, xconv.Int32ToString(p.gate.opts.id))
if err != nil {
return err
Expand Down
10 changes: 5 additions & 5 deletions network/tcp/client_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func (c *clientConn) forceClose() (err error) {
c.rw.Unlock()

if c.client.disconnectHandler != nil {
c.client.disconnectHandler(c)
go c.client.disconnectHandler(c)
}

return
Expand Down Expand Up @@ -263,20 +263,20 @@ func (c *clientConn) write() {
return
}

c.rw.RLock()
//c.rw.RLock()
if r.typ == closeSig {
c.done <- struct{}{}
c.rw.RUnlock()
//c.rw.RUnlock()
return
}

if atomic.LoadInt32(&c.state) == int32(network.ConnClosed) {
c.rw.RUnlock()
//c.rw.RUnlock()
return
}

err := write(c.conn, r.msg)
c.rw.RUnlock()
//c.rw.RUnlock()

if err != nil {
log.Errorf("write message error: %v", err)
Expand Down
22 changes: 18 additions & 4 deletions network/tcp/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,13 @@ func TestNewClient_Dial(t *testing.T) {

func Test_Benchmark(t *testing.T) {
// 并发数
concurrency := 1000
concurrency := 6000
// 消息量
total := 500000
total := 12000
// 总共发送的消息条数
totalSent := int64(0)
// 总共接收的消息条数
totalRecv := int64(0)

// 准备消息
msg, err := packet.Pack(&packet.Message{
Seq: 1,
Expand All @@ -100,6 +99,20 @@ func Test_Benchmark(t *testing.T) {
client.OnReceive(func(conn network.Conn, msg []byte) {
atomic.AddInt64(&totalRecv, 1)

message, err := packet.Unpack(msg)
if err != nil {
fmt.Println(err)
}
if message.Seq != 1 {
fmt.Println("seq error")
}
if message.Route != 1 {
fmt.Println("Route error")
}
if string(message.Buffer) != "login ok" {
fmt.Println("date error")
}

wg.Done()
})

Expand All @@ -118,7 +131,7 @@ func Test_Benchmark(t *testing.T) {
}

conns[i] = conn
time.Sleep(time.Millisecond)
time.Sleep(time.Millisecond * 2)
}

// 发送消息
Expand All @@ -142,6 +155,7 @@ func Test_Benchmark(t *testing.T) {
}
}
}(conn)

}

startTime := time.Now().UnixNano()
Expand Down
13 changes: 5 additions & 8 deletions network/tcp/server_conn.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package tcp

import (
"fmt"
"github.com/cr-mao/lorig/utils/xcall"
"net"
"sync"
Expand Down Expand Up @@ -53,7 +52,7 @@ func (c *serverConn) Unbind() {
atomic.StoreInt64(&c.uid, 0)
}

// Send 发送消息(同步)
// Send 发送消息(同步),废弃
func (c *serverConn) Send(msg []byte) error {
c.rw.RLock()
defer c.rw.RUnlock()
Expand Down Expand Up @@ -289,7 +288,6 @@ func (c *serverConn) write() {
var ticker *time.Ticker

if c.connMgr.server.opts.heartbeatInterval > 0 {
fmt.Println("心跳时间", c.connMgr.server.opts.heartbeatInterval)
ticker = time.NewTicker(c.connMgr.server.opts.heartbeatInterval)
defer ticker.Stop()
} else {
Expand All @@ -303,26 +301,25 @@ func (c *serverConn) write() {
return
}

c.rw.RLock()
//c.rw.RLock()
if r.typ == closeSig {
c.done <- struct{}{}
c.rw.RUnlock()
//c.rw.RUnlock()
return
}

if atomic.LoadInt32(&c.state) == int32(network.ConnClosed) {
c.rw.RUnlock()
//c.rw.RUnlock()
return
}

err := write(c.conn, r.msg)
c.rw.RUnlock()
//c.rw.RUnlock()

if err != nil {
log.Errorf("write message error: %v", err)
}
case <-ticker.C:
fmt.Println("心跳。。。。")
deadline := xtime.Now().Add(-2 * c.connMgr.server.opts.heartbeatInterval).Unix()
if atomic.LoadInt64(&c.lastHeartbeatTime) < deadline {
log.Debugf("server connection heartbeat timeout")
Expand Down
11 changes: 4 additions & 7 deletions session/session.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package session

import (
"fmt"
"github.com/cr-mao/lorig/log"
"sync"

Expand Down Expand Up @@ -42,8 +41,6 @@ func (s *Session) AddConn(conn network.Conn) {
cid, uid := conn.ID(), conn.UID()

s.conns[cid] = conn
fmt.Println("add connbbb", cid)
fmt.Printf("%v", s.conns)

if uid != 0 {
s.users[uid] = conn
Expand Down Expand Up @@ -314,17 +311,17 @@ func (s *Session) Conn(kind Kind, target int64) (network.Conn, error) {
switch kind {
case Conn:
conn, ok := s.conns[target]
log.Info("现在session中到连接 ", s.conns)
log.Info("现在session中到用户 ", s.users)
//log.Info("现在session中到连接 ", s.conns)
//log.Info("现在session中到用户 ", s.users)

if !ok {
return nil, ErrNotFoundSession
}
return conn, nil
case User:
conn, ok := s.users[target]
log.Info("现在session中到连接 ", s.conns)
log.Info("现在session中到用户 ", s.users)
//log.Info("现在session中到连接 ", s.conns)
//log.Info("现在session中到用户 ", s.users)
if !ok {
return nil, ErrNotFoundSession
}
Expand Down

0 comments on commit 86ecebd

Please sign in to comment.