Skip to content
Permalink
Browse files
Merge pull request #54 from apache/feature/delet_wq
Rem: session.wQ
  • Loading branch information
AlexStocks committed Dec 13, 2020
2 parents ff08429 + 020cac6 commit a68aa365ef3e73613d8a7b47b3b16fbc55f0d5e4
Showing 20 changed files with 130 additions and 263 deletions.
@@ -24,7 +24,7 @@ jobs:
# DING_SIGN: SECbcc50d56d7315e57da8469d05da306d6cd825348a781861a42084e9579f1aebb
DING_TOKEN: ${{ secrets.DING_TOKEN }}
DING_SIGN: ${{ secrets.DING_SIGN }}

steps:

- name: Set up Go ${{ matrix.go_version }}
@@ -59,6 +59,12 @@ jobs:
go fmt ./... && [[ -z `git status -s` ]]
/tmp/tools/license/license-header-checker -v -a -r -i vendor /tmp/tools/license/license.txt . go && [[ -z `git status -s` ]]
- name: Install go ci lint
run: curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin v1.27.0

- name: Run Linter
run: golangci-lint run --timeout=10m -v --disable-all --enable=govet --enable=staticcheck --enable=ineffassign --enable=misspell

- name: Test
run: go mod vendor && go test $(go list ./... | grep -v vendor | grep -v demo) -coverprofile=coverage.txt -covermode=atomic

@@ -71,7 +77,7 @@ jobs:
uses: zcong1993/actions-ding@v3.0.1
# Whether job is successful or not, always () is always true.
if: |
always() &&
always() &&
github.event_name == 'push' &&
github.repository == 'apache/dubbo-getty'
with:
@@ -87,11 +93,11 @@ jobs:
"text": "## Github Actions \n - name: CI \n - repository: ${{ github.repository }} \n - trigger: ${{ github.actor }} \n - event: ${{ github.event_name }} \n - ref: ${{ github.ref }} \n - status: [${{ job.status }}](https://github.com/${{ github.repository }}/actions/runs/${{ github.run_id }}) \n - environment: ${{ runner.os }} \n - SHA: [${{ github.sha }}](${{ github.event.compare }})"
}
}
- name: DingTalk Message Notify only PR
uses: zcong1993/actions-ding@v3.0.1
if: |
always() &&
if: |
always() &&
github.event_name == 'pull_request' &&
github.repository == 'apache/dubbo-getty'
with:
@@ -126,7 +126,7 @@ func NewWSSClient(opts ...ClientOption) Client {
c := newClient(WSS_CLIENT, opts...)

if c.cert == "" {
panic(fmt.Sprintf("@certs:%s", c.cert))
panic(fmt.Sprintf("@cert:%s", c.cert))
}
if !strings.HasPrefix(c.addr, "wss://") {
panic(fmt.Sprintf("the prefix @serverAddr:%s is not wss://", c.addr))
@@ -135,11 +135,11 @@ func NewWSSClient(opts ...ClientOption) Client {
return c
}

func (c client) ID() EndPointID {
func (c *client) ID() EndPointID {
return c.endPointID
}

func (c client) EndPointType() EndPointType {
func (c *client) EndPointType() EndPointType {
return c.endPointType
}

@@ -154,7 +154,7 @@ func (c *client) dialTCP() Session {
return nil
}
if c.sslEnabled {
if sslConfig, err := c.tlsConfigBuilder.BuildTlsConfig(); err == nil && sslConfig != nil {
if sslConfig, buildTlsConfErr := c.tlsConfigBuilder.BuildTlsConfig(); buildTlsConfErr == nil && sslConfig != nil {
d := &net.Dialer{Timeout: connectTimeout}
conn, err = tls.DialWithDialer(d, "tcp", c.addr, sslConfig)
}
@@ -285,7 +285,7 @@ func (c *client) dialWSS() Session {
if c.cert != "" {
certPEMBlock, err := ioutil.ReadFile(c.cert)
if err != nil {
panic(fmt.Sprintf("ioutil.ReadFile(certs:%s) = error:%+v", c.cert, perrors.WithStack(err)))
panic(fmt.Sprintf("ioutil.ReadFile(cert:%s) = error:%+v", c.cert, perrors.WithStack(err)))
}

var cert tls.Certificate
@@ -307,7 +307,7 @@ func (c *client) dialWSS() Session {
for _, c := range config.Certificates {
roots, err = x509.ParseCertificates(c.Certificate[len(c.Certificate)-1])
if err != nil {
panic(fmt.Sprintf("error parsing server's root certs: %+v\n", perrors.WithStack(err)))
panic(fmt.Sprintf("error parsing server's root cert: %+v\n", perrors.WithStack(err)))
}
for _, root = range roots {
certPool.AddCert(root)
@@ -84,15 +84,13 @@ func (p *Package) Unmarshal(buf *bytes.Buffer) (int, error) { return 0, nil }
func newSessionCallback(session Session, handler *MessageHandler) error {
var pkgHandler PackageHandler
session.SetName("hello-client-session")
session.SetMaxMsgLen(1024)
session.SetMaxMsgLen(128 * 1024) // max message package length 128k
session.SetPkgHandler(&pkgHandler)
session.SetEventListener(handler)
session.SetWQLen(32)
session.SetReadTimeout(3e9)
session.SetWriteTimeout(3e9)
session.SetCronPeriod((int)(30e9 / 1e6))
session.SetWaitTime(3e9)
session.SetTaskPool(nil)

return nil
}
@@ -295,8 +293,10 @@ func TestNewWSClient(t *testing.T) {
assert.Equal(t, beforeWriteBytes+5, atomic.LoadUint32(&conn.writeBytes))
beforeWritePkgNum := atomic.LoadUint32(&conn.writePkgNum)
err = ss.WriteBytes([]byte("hello"))
assert.Nil(t, err)
assert.Equal(t, beforeWritePkgNum+1, atomic.LoadUint32(&conn.writePkgNum))
err = ss.WriteBytesArray([]byte("hello"), []byte("hello"))
assert.Nil(t, err)
assert.Equal(t, beforeWritePkgNum+3, atomic.LoadUint32(&conn.writePkgNum))
err = conn.writePing()
assert.Nil(t, err)
@@ -16,8 +16,6 @@ run server:
Or run server in task pool mode:
```bash
go run tcp/server/server.go -taskPool=true \
-task_queue_length=128 \
-task_queue_number=16 \
-task_pool_size=2000 \
-pprof_port=60000
```
@@ -31,8 +29,6 @@ go run tcp/client/client.go
Or run client in task pool mode:
```bash
go run tcp/client/client.go -taskPool=true \
-task_queue_length=100 \
-task_queue_number=4 \
-task_pool_size=50 \
-pprof_port=60001
```
@@ -27,13 +27,14 @@ var (

func ClientRequest() {
for _, session := range Sessions {
ss := session
go func() {
echoTimes := 10
for i := 0; i < echoTimes; i++ {
err := session.WritePkg("hello", WritePkgTimeout)
err := ss.WritePkg("hello", WritePkgTimeout)
if err != nil {
log.Infof("session.WritePkg(session{%s}, error{%v}", session.Stat(), err)
session.Close()
log.Infof("session.WritePkg(session{%s}, error{%v}", ss.Stat(), err)
ss.Close()
}
}
log.Infof("after loop %d times", echoTimes)
@@ -36,11 +36,9 @@ var (
ip = flag.String("ip", "127.0.0.1", "server IP")
connections = flag.Int("conn", 1, "number of tcp connections")

taskPoolMode = flag.Bool("taskPool", false, "task pool mode")
taskPoolQueueLength = flag.Int("task_queue_length", 100, "task queue length")
taskPoolQueueNumber = flag.Int("task_queue_number", 4, "task queue number")
taskPoolSize = flag.Int("task_pool_size", 2000, "task poll size")
pprofPort = flag.Int("pprof_port", 65431, "pprof http port")
taskPoolMode = flag.Bool("taskPool", false, "task pool mode")
taskPoolSize = flag.Int("task_pool_size", 2000, "task poll size")
pprofPort = flag.Int("pprof_port", 65431, "pprof http port")
)

var (
@@ -55,16 +53,13 @@ func main() {
util.Profiling(*pprofPort)

if *taskPoolMode {
taskPool = gxsync.NewTaskPool(
gxsync.WithTaskPoolTaskQueueLength(*taskPoolQueueLength),
gxsync.WithTaskPoolTaskQueueNumber(*taskPoolQueueNumber),
gxsync.WithTaskPoolTaskPoolSize(*taskPoolSize),
)
taskPool = gxsync.NewTaskPoolSimple(*taskPoolSize)
}

client := getty.NewTCPClient(
getty.WithServerAddress(*ip+":8090"),
getty.WithConnectionNumber(*connections),
getty.WithClientTaskPool(taskPool),
)

client.RunEventLoop(NewHelloClientSession)
@@ -62,8 +62,7 @@ func InitialSession(session getty.Session) (err error) {
}

session.SetName("hello")
session.SetMaxMsgLen(128)
session.SetWQLen(512)
session.SetMaxMsgLen(128 * 1024) // max message package length is 128k
session.SetReadTimeout(time.Second)
session.SetWriteTimeout(5 * time.Second)
session.SetCronPeriod(int(hello.CronPeriod / 1e6))
@@ -32,11 +32,9 @@ import (
)

var (
taskPoolMode = flag.Bool("taskPool", false, "task pool mode")
taskPoolQueueLength = flag.Int("task_queue_length", 100, "task queue length")
taskPoolQueueNumber = flag.Int("task_queue_number", 4, "task queue number")
taskPoolSize = flag.Int("task_pool_size", 2000, "task poll size")
pprofPort = flag.Int("pprof_port", 65432, "pprof http port")
taskPoolMode = flag.Bool("taskPool", false, "task pool mode")
taskPoolSize = flag.Int("task_pool_size", 2000, "task poll size")
pprofPort = flag.Int("pprof_port", 65432, "pprof http port")
)

var (
@@ -53,11 +51,8 @@ func main() {
options := []getty.ServerOption{getty.WithLocalAddress(":8090")}

if *taskPoolMode {
taskPool = gxsync.NewTaskPool(
gxsync.WithTaskPoolTaskQueueLength(*taskPoolQueueLength),
gxsync.WithTaskPoolTaskQueueNumber(*taskPoolQueueNumber),
gxsync.WithTaskPoolTaskPoolSize(*taskPoolSize),
)
taskPool = gxsync.NewTaskPoolSimple(*taskPoolSize)
options = append(options, getty.WithServerTaskPool(taskPool))
}

server := getty.NewTCPServer(options...)
@@ -37,15 +37,13 @@ var (
ip = flag.String("ip", "127.0.0.1", "server IP")
connections = flag.Int("conn", 1, "number of tcp connections")

taskPoolMode = flag.Bool("taskPool", false, "task pool mode")
taskPoolQueueLength = flag.Int("task_queue_length", 100, "task queue length")
taskPoolQueueNumber = flag.Int("task_queue_number", 4, "task queue number")
taskPoolSize = flag.Int("task_pool_size", 2000, "task poll size")
pprofPort = flag.Int("pprof_port", 65431, "pprof http port")
taskPoolMode = flag.Bool("taskPool", false, "task pool mode")
taskPoolSize = flag.Int("task_pool_size", 2000, "task poll size")
pprofPort = flag.Int("pprof_port", 65431, "pprof http port")
)

var (
taskPool *gxsync.TaskPool
taskPool gxsync.GenericTaskPool
)

func main() {
@@ -56,11 +54,7 @@ func main() {
util.Profiling(*pprofPort)

if *taskPoolMode {
taskPool = gxsync.NewTaskPool(
gxsync.WithTaskPoolTaskQueueLength(*taskPoolQueueLength),
gxsync.WithTaskPoolTaskQueueNumber(*taskPoolQueueNumber),
gxsync.WithTaskPoolTaskPoolSize(*taskPoolSize),
)
taskPool = gxsync.NewTaskPoolSimple(*taskPoolSize)
}
keyPath, _ := filepath.Abs("./demo/hello/tls/certs/ca.key")
caPemPath, _ := filepath.Abs("./demo/hello/tls/certs/ca.pem")
@@ -74,6 +68,7 @@ func main() {
getty.WithClientSslEnabled(true),
getty.WithClientTlsConfigBuilder(config),
getty.WithConnectionNumber(*connections),
getty.WithClientTaskPool(taskPool),
)

client.RunEventLoop(NewHelloClientSession)
@@ -42,8 +42,7 @@ func InitialSession(session getty.Session) (err error) {
_, ok := session.Conn().(*tls.Conn)
if ok {
session.SetName("hello")
session.SetMaxMsgLen(128)
session.SetWQLen(512)
session.SetMaxMsgLen(128 * 1024) // max message package length is 128k
session.SetReadTimeout(time.Second)
session.SetWriteTimeout(5 * time.Second)
session.SetCronPeriod(int(hello.CronPeriod / 1e6))
@@ -33,16 +33,14 @@ import (
)

var (
taskPoolMode = flag.Bool("taskPool", false, "task pool mode")
taskPoolQueueLength = flag.Int("task_queue_length", 100, "task queue length")
taskPoolQueueNumber = flag.Int("task_queue_number", 4, "task queue number")
taskPoolSize = flag.Int("task_pool_size", 2000, "task poll size")
pprofPort = flag.Int("pprof_port", 65432, "pprof http port")
Sessions []getty.Session
taskPoolMode = flag.Bool("taskPool", false, "task pool mode")
taskPoolSize = flag.Int("task_pool_size", 2000, "task poll size")
pprofPort = flag.Int("pprof_port", 65432, "pprof http port")
Sessions []getty.Session
)

var (
taskPool *gxsync.TaskPool
taskPool gxsync.GenericTaskPool
)

func main() {
@@ -61,17 +59,13 @@ func main() {
ServerTrustCertCollectionPath: caPemPath,
}

if *taskPoolMode {
taskPool = gxsync.NewTaskPoolSimple(*taskPoolSize)
}
options := []getty.ServerOption{getty.WithLocalAddress(":8090"),
getty.WithServerSslEnabled(true),
getty.WithServerTlsConfigBuilder(c),
}

if *taskPoolMode {
taskPool = gxsync.NewTaskPool(
gxsync.WithTaskPoolTaskQueueLength(*taskPoolQueueLength),
gxsync.WithTaskPoolTaskQueueNumber(*taskPoolQueueNumber),
gxsync.WithTaskPoolTaskPoolSize(*taskPoolSize),
)
getty.WithServerTaskPool(taskPool),
}

server := getty.NewTCPServer(options...)
@@ -86,7 +80,6 @@ func NewHelloServerSession(session getty.Session) (err error) {
if err != nil {
return
}
session.SetTaskPool(taskPool)

return
}
File renamed without changes.
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package util

func SetLimit() {
}
@@ -29,7 +29,7 @@ type Closer interface {

func WaitCloseSignals(closer Closer) {
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt, os.Kill, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
signal.Notify(signals, os.Interrupt, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
<-signals
closer.Close()
}
@@ -163,10 +163,7 @@ type Session interface {
SetWriter(Writer)
SetCronPeriod(int)

SetWQLen(int)
SetWaitTime(time.Duration)
// Deprecated: don't use SetTaskPool, move to endpoints layer.
SetTaskPool(*gxsync.TaskPool)

GetAttribute(interface{}) interface{}
SetAttribute(interface{}, interface{})
2 go.mod
@@ -3,7 +3,7 @@ module github.com/apache/dubbo-getty
go 1.14

require (
github.com/dubbogo/gost v1.9.8
github.com/dubbogo/gost v1.9.9
github.com/golang/snappy v0.0.1
github.com/gorilla/websocket v1.4.2
github.com/pkg/errors v0.9.1

0 comments on commit a68aa36

Please sign in to comment.