-
Notifications
You must be signed in to change notification settings - Fork 112
/
handler.go
138 lines (122 loc) · 3.19 KB
/
handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
// Copyright (c) 2023, donnie <donnie4w@gmail.com>
// All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//
// github.com/donnie4w/tim
//
package level1
import (
"context"
"sync/atomic"
"time"
. "github.com/donnie4w/gofer/util"
"github.com/donnie4w/simplelog/logging"
. "github.com/donnie4w/tim/stub"
"github.com/donnie4w/tim/util"
)
// tnetServer pairs a handler set (servermux) with the extra payload that
// Serve hands to the listening side.
type tnetServer struct {
// servermux is created by handle on its first call and is dereferenced by
// both Serve and Connect.
servermux *serverMux
// ok is forwarded unchanged as the last argument of tsfclientserver.server
// in Serve. NOTE(review): its meaning is not visible in this file — confirm.
ok []byte
}
// handle installs the processor and the two callbacks on this server.
//
// Only the first call has any effect: once servermux exists, later calls
// return without touching the registered values.
func (s *tnetServer) handle(processor Itnet, handler func(tc *tlContext), cliError func(tc *tlContext)) {
	if s.servermux != nil {
		return
	}
	s.servermux = new(serverMux)
	s.servermux.Handle(processor, handler, cliError)
}
// Serve normalises _addr with util.ParseAddr and then runs
// tsfclientserver.server with the registered processor, callbacks and the
// server's ok payload.
//
// It returns the ParseAddr error if the address is rejected, otherwise
// whatever tsfclientserver.server returns. handle must have been called
// beforehand: servermux is dereferenced without a nil check.
func (s *tnetServer) Serve(_addr string) (err error) {
	addr, err := util.ParseAddr(_addr)
	if err != nil {
		return err
	}
	mux := s.servermux
	return tsfclientserver.server(addr, mux.processor, mux.handler, mux.cliError, s.ok)
}
// Connect normalises _addr with util.ParseAddr and then runs
// tsfserverclient.server against it, passing the registered processor and
// callbacks together with the async flag.
//
// It returns the ParseAddr error if the address is rejected, otherwise
// whatever tsfserverclient.server returns. handle must have been called
// beforehand: servermux is dereferenced without a nil check.
func (s *tnetServer) Connect(_addr string, async bool) (err error) {
	addr, err := util.ParseAddr(_addr)
	if err != nil {
		return err
	}
	mux := s.servermux
	return tsfserverclient.server(addr, mux.processor, mux.handler, mux.cliError, async)
}
// serverMux stores the processor and the two per-connection callbacks that
// tnetServer passes on when it serves or connects.
type serverMux struct {
	// processor is the Itnet implementation supplied to Handle.
	processor Itnet
	// handler and cliError are the callbacks supplied to Handle; each
	// receives the *tlContext of the connection concerned.
	handler, cliError func(*tlContext)
}
// Handle records the processor and both callbacks, overwriting any values
// stored by a previous call.
func (m *serverMux) Handle(processor Itnet, handler func(tc *tlContext), cliError func(tc *tlContext)) {
	m.processor, m.handler, m.cliError = processor, handler, cliError
}
// myServer2ClientHandler is a deliberate no-op: it accepts the connection
// context and does nothing with it.
// NOTE(review): presumably the connection callback for the listening side —
// confirm at the registration site.
func myServer2ClientHandler(tc *tlContext) {
}
// myClient2ServerHandler sends a CHAP message carrying this context's id
// over tc.iface, unless the context is already closed.
//
// Both failure modes are logged and otherwise ignored: a failure to encode
// the bean (previously dropped silently, leaving no trace of why the
// handshake never went out) and a failure of the Chap call itself. Panics
// are absorbed by util.Recover.
func myClient2ServerHandler(tc *tlContext) {
	defer util.Recover()
	if tc.isClose {
		return
	}
	ab := newchapBean()
	ab.IDcard = tc.id
	bs, encErr := encodeChapBean(ab)
	if encErr != nil {
		// Without an encoded bean there is nothing to send.
		logging.Error(encErr)
		return
	}
	if err := tc.iface.Chap(context.Background(), bs); err != nil {
		logging.Error(err)
	}
}
// mySecvErrorHandler starts reconn(tc) on its own goroutine and returns
// immediately.
// NOTE(review): presumably the error callback for server-side connections —
// confirm at the registration site.
func mySecvErrorHandler(tc *tlContext) {
go reconn(tc)
}
// myCliErrorHandler removes tc.remoteAddr from clientLinkCache and then
// starts reconn(tc) on its own goroutine.
func myCliErrorHandler(tc *tlContext) {
// Drop the cache entry first: reconn only dials when the address is absent
// from clientLinkCache.
clientLinkCache.Del(tc.remoteAddr)
go reconn(tc)
}
// reconn removes tc from nodeWare and, for non-server contexts, keeps trying
// to re-establish the link to tc.remoteAddr until nodeWare reports the
// remote UUID again.
//
// Contexts that are nil or lack a remote address or UUID are ignored. Only
// the first call per context enters the retry loop; tc._do_reconn is set
// here and never cleared. Panics are absorbed by util.Recover.
//
// Retry behaviour:
//   - while clientLinkCache already holds the address, wait Rand(6) seconds
//     and re-check;
//   - otherwise call tnetservice.Connect: a non-nil first error ends the
//     loop, a non-nil second error triggers a wait of Rand(6+step) seconds,
//     where step grows by one per failure up to 100.
//
// The loop never gives up on the second error. The original contained an
// extra "give up once the counter exceeds 1<<13" branch, but the counter is
// capped at 100, so that branch could never run and has been removed.
func reconn(tc *tlContext) {
	if tc == nil || tc.remoteAddr == "" || tc.remoteUuid == 0 {
		return
	}
	defer util.Recover()
	logging.Info(">>>[", tc.remoteUuid, "][", tc.remoteAddr, "]")
	nodeWare.del(tc)
	if tc.isServer || tc._do_reconn {
		return
	}
	// NOTE(review): plain check-then-set on a bool; concurrent callers for the
	// same context are not excluded here — confirm whether that matters.
	tc._do_reconn = true

	const maxBackoffStep = 100
	step := 0
	for !nodeWare.hasUUID(tc.remoteUuid) {
		if clientLinkCache.Has(tc.remoteAddr) {
			time.Sleep(time.Duration(Rand(6)) * time.Second)
			continue
		}
		err1, err2 := tnetservice.Connect(tc.remoteAddr, false)
		if err1 != nil {
			return
		}
		if err2 != nil {
			if step < maxBackoffStep {
				step++
			}
			time.Sleep(time.Duration(Rand(6+step)) * time.Second)
		}
	}
}
// heardbeat runs forever: on every 5-second tick it waits a further
// Rand(5) seconds and then pings every context currently returned by
// nodeWare.GetAllTlContext. It never returns, so callers are expected to
// run it on its own goroutine.
func heardbeat() {
	tick := time.NewTicker(5 * time.Second)
	for range tick.C {
		time.Sleep(time.Duration(Rand(5)) * time.Second)
		_heardbeat(nodeWare.GetAllTlContext())
	}
}
// _heardbeat pings every context in tcs once and bumps its pingNum counter.
//
// When the counter exceeds 8, the failure is logged and reconn is started
// for that context on a new goroutine. Each context is handled inside its
// own recovered closure, so a panic on one context does not stop the rest.
// NOTE(review): pingNum is only incremented here; presumably it is reset
// elsewhere when a reply arrives — confirm.
func _heardbeat(tcs []*tlContext) {
	defer util.Recover()
	for _, tc := range tcs {
		func(tc *tlContext) {
			defer util.Recover()
			tc.iface.Ping(context.TODO(), piBs(tc))
			// Use the value returned by the atomic add for both the test and
			// the log line; reading tc.pingNum directly would be an
			// unsynchronised access to an atomically updated field.
			if n := atomic.AddInt64(&tc.pingNum, 1); n > 8 {
				logging.Error("ping failed:[", tc.remoteUuid, "][", tc.remoteAddr, "] ping number:", n)
				go reconn(tc)
			}
		}(tc)
	}
}