Skip to content

Commit

Permalink
edit teamwork
Browse files Browse the repository at this point in the history
  • Loading branch information
zhaowc-mac committed Jan 17, 2024
1 parent 599849a commit 9e6791a
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 47 deletions.
42 changes: 24 additions & 18 deletions core/utils/go_panic.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package utils

import (
"errors"
"fmt"
"log"
"runtime/debug"
"sync"
Expand Down Expand Up @@ -39,26 +40,31 @@ func GoPanic(handlers ...func() error) (err error) {
return
}

// GoPanicShutdown 并发调用服务,只要一个 handler panic 程序 shutdown
func GoPanicShutdown(handlers ...func() error) error {
// GoPanicToError 并发调用服务,只要一个 handler panic 程序 shutdown
func GoPanicToError(handlers func() error) error {
stopChan := make(chan string)
// 假设我们要调用handlers这么多个服务
for _, f := range handlers {
// 每个函数启动一个协程
go func(handler func() error) {
defer func() {
// 每个协程内部使用recover捕获可能在调用逻辑中发生的panic
if e := recover(); e != nil {
log.Println(e)
//打印错误堆栈信息
debug.PrintStack()
}
stopChan <- "panic shutdown"
}()
// 取第一个报错的handler调用逻辑,并最终向外返回
handler()
}(f)
}
// for _, f := range handlers {
// 每个函数启动一个协程
go func(handler func() error) {
defer func() {
// 每个协程内部使用recover捕获可能在调用逻辑中发生的panic
e := recover()
if e != nil {
log.Println(e)
//打印错误堆栈信息
debug.PrintStack()
stopChan <- fmt.Sprintf("%s", e)
}

}()
// 取第一个报错的handler调用逻辑,并最终向外返回
err := handler()
if err != nil {
stopChan <- err.Error()
}
}(handlers)
// }

t := <-stopChan
return errors.New(t)
Expand Down
8 changes: 4 additions & 4 deletions microdule.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,14 @@ func (s *service) Close() {

func (s *service) Start() error {
if s.opts.Http != nil {
s.opts.Teamwork.Reginster("http", func() {
s.opts.Http.Run()
s.opts.Teamwork.Reginster("http", func() error {
return s.opts.Http.Run()
})
}

if s.opts.Rpc != nil {
s.opts.Teamwork.Reginster("rpc ", func() {
s.opts.Rpc.Run()
s.opts.Teamwork.Reginster("rpc ", func() error {
return s.opts.Rpc.Run()
})
}

Expand Down
4 changes: 0 additions & 4 deletions rpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,10 @@ func (g *Grpc) Run() error {

lis, err := net.Listen("tcp", address)

defer g.EtcdRegister.Close()

if err != nil {
panic("grpc net listen error :" + err.Error())
}

go g.EtcdRegister.ListenLeaseRespChan()

err = g.RpcSrv.Serve(lis)
if err != nil {
return err
Expand Down
8 changes: 5 additions & 3 deletions rpc/grpc/etcd_grpc_srv_reg.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package grpc

import (
"context"
"errors"
"log"
"strconv"

Expand Down Expand Up @@ -65,18 +66,19 @@ func (s *ServiceRegister) putKeyWithLease(lease int64) error {
}

//ListenLeaseRespChan 监听 续租情况
func (s *ServiceRegister) ListenLeaseRespChan() {
func (s *ServiceRegister) ListenLeaseRespChan() error {
for leaseKeepResp := range s.keepAliveChan {
_ = leaseKeepResp
// log.Println("续约成功", leaseKeepResp)
}

select {
case <-s.Ctx.Done():
// log.Println("grpc listen close :", s.leaseID)
return nil
default:
panic("lease panic close")
return errors.New("lease close")
}

}

// Close 注销服务
Expand Down
23 changes: 14 additions & 9 deletions teamwork/work.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ package teamwork
import (
"context"
"errors"
"fmt"
"log"
"runtime/debug"

"golang.org/x/exp/maps"
)

type Teamwork interface {
Reginster(name string, habdle func()) TeamworkClose
Reginster(name string, habdle func() error) TeamworkClose
WorkNum() int
Start() error
Close()
Expand All @@ -26,20 +27,20 @@ type TeamworkCloseStruct struct {

type TeamworkStruct struct {
Ctx context.Context
Handles map[string]func()
Handles map[string]func() error
HandlesClose []func()
}

func NewTeamwork() Teamwork {
return &TeamworkStruct{
Ctx: context.Background(),
Handles: make(map[string]func()),
Handles: make(map[string]func() error),
HandlesClose: make([]func(), 0),
}
}

func (t *TeamworkStruct) Reginster(name string, handle func()) TeamworkClose {
maps.Copy(t.Handles, map[string]func(){name: handle})
func (t *TeamworkStruct) Reginster(name string, handle func() error) TeamworkClose {
maps.Copy(t.Handles, map[string]func() error{name: handle})
return &TeamworkCloseStruct{
T: t,
}
Expand All @@ -58,19 +59,23 @@ func (t *TeamworkStruct) Start() error {
// 假设我们要调用handlers这么多个服务
for k, f := range t.Handles {
// 每个函数启动一个协程
go func(handler func(), k string) {
go func(handler func() error, k string) {
defer func() {
// 每个协程内部使用recover捕获可能在调用逻辑中发生的panic
if e := recover(); e != nil {
e := recover()
if e != nil {
log.Println(e)
//打印错误堆栈信息
debug.PrintStack()
stopChan <- fmt.Sprintf("%s", e)
}
stopChan <- "panic shutdown"
}()
// 取第一个报错的handler调用逻辑,并最终向外返回
log.Println(k, "start: Success")
handler()
err := handler()
if err != nil {
stopChan <- err.Error()
}
}(f, k)
}

Expand Down
16 changes: 7 additions & 9 deletions teamwork/work_test.go
Original file line number Diff line number Diff line change
@@ -1,23 +1,21 @@
package teamwork_test

import (
"log"
"testing"
"time"

"github.com/hihibug/microdule/teamwork"
)

func TestTeamWork(t *testing.T) {
tw := teamwork.NewTeamwork()

tw.Reginster("test", func() {
log.Println("test")
time.Sleep(10 * time.Second)
panic("err")
}).HandleClose(func() {
log.Println("test close")
})
// tw.Reginster("test", func() {
// log.Println("test")
// time.Sleep(10 * time.Second)
// panic("err")
// }).HandleClose(func() {
// log.Println("test close")
// })

if err := tw.Start(); err != nil {
tw.Close()
Expand Down

0 comments on commit 9e6791a

Please sign in to comment.