// File: server.go

package server
import (
"context"
"fmt"
"github.com/Haze-Lan/haze-go/discovery"
"github.com/Haze-Lan/haze-go/event"
"github.com/Haze-Lan/haze-go/logger"
"github.com/Haze-Lan/haze-go/option"
"google.golang.org/grpc"
gservice "google.golang.org/grpc/channelz/service"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/reflection"
"net"
"strconv"
"sync"
)
// log is the package-level logger, a grpclog component named "application".
var log = grpclog.Component("application")
// Server bundles a gRPC server with a service-discovery client and the
// options it runs with. Instances are created by NewServer.
type Server struct {
// discovery is used to register this instance and to watch other services.
discovery discovery.Discovery
// rpc is the underlying gRPC server.
rpc *grpc.Server
// opt holds the server options; Name, Host and Port are read in this file.
opt *option.ServerOptions
// status is not read or written in the visible code.
// NOTE(review): confirm whether it is used elsewhere in the package.
status int
// lock is not used in the visible code.
// NOTE(review): presumably meant to guard status — confirm.
lock sync.RWMutex
// quit carries the stop signal; Start blocks receiving from it. NewServer
// creates it unbuffered.
quit chan int
}
// NewServer initialises the logger and returns a Server wrapping a fresh gRPC
// server that has the channelz and reflection services registered, together
// with a new discovery registry, the shared server options instance and an
// unbuffered quit channel.
func NewServer() *Server {
	logger.NewLogger()

	grpcServer := grpc.NewServer()
	gservice.RegisterChannelzServiceToServer(grpcServer)
	// Named "registry" so the local does not shadow the discovery package.
	registry := discovery.NewRegistry()
	reflection.Register(grpcServer)

	return &Server{
		rpc:       grpcServer,
		quit:      make(chan int),
		opt:       option.ServerOptionsInstance,
		discovery: registry,
	}
}
// Start launches the server's background goroutines (gRPC serving, signal
// handling, the health hook and service registration) and then blocks until a
// value arrives on s.quit. It always returns nil.
//
// Nothing here waits for the goroutines to finish their work, so the startup
// log line is written as soon as they have been launched.
func (s *Server) Start() error {
// Deferred: Shutdown runs after the quit signal below has been received.
defer s.Shutdown()
// Start the gRPC server.
go startGrpc(s)
// handleSignals is defined outside this view.
// NOTE(review): presumably it reacts to OS signals — confirm.
go s.handleSignals()
go healthLis(s)
// Register this instance with service discovery.
go registerThisService(s)
log.Infof("应用[%s]启动在本机[%d]端口完成", s.opt.Name, s.opt.Port)
// Block until some goroutine sends on quit.
<-s.quit
return nil
}
// RegisterService registers the service description sd and its implementation
// ins on the underlying gRPC server. sd is received by value, so the pointer
// handed to gRPC refers to this call's own copy.
func (s *Server) RegisterService(sd grpc.ServiceDesc, ins interface{}) {
	desc := &sd
	s.rpc.RegisterService(desc, ins)
}
// Shutdown stops the server: it signals any goroutine waiting on s.quit,
// stops the gRPC server, and publishes the server-quit event on the global
// event bus.
//
// The quit signal is sent without blocking. Start defers Shutdown, so when
// that deferred call runs Start has already left its receive on s.quit; a
// blocking send on the unbuffered channel would then hang forever and the
// gRPC server would never be stopped.
func (s *Server) Shutdown() {
	log.Info("System starts to exit")
	select {
	case s.quit <- 1:
		// A goroutine (normally Start) was waiting on quit and is now released.
	default:
		// Nobody is waiting; continue with teardown instead of blocking.
	}
	s.rpc.Stop()
	event.GlobalEventBus.Publish(event.EVENT_TOPIC_SERVER_QUIT, nil)
}
// startGrpc listens on the configured TCP port (all interfaces) and serves
// gRPC requests on it until Serve returns.
//
// A listen or serve error is logged at fatal level; grpclog documents that
// fatal logs terminate the process, so the deferred quit signal is only
// expected to run after Serve returns without error.
func startGrpc(s *Server) {
	// Signal quit on the way out without blocking: if Start has already
	// consumed a quit signal nobody is receiving any more, and a blocking send
	// on the unbuffered channel would leak this goroutine.
	defer func() {
		select {
		case s.quit <- 1:
		default:
		}
	}()

	lis, err := net.Listen("tcp", ":"+strconv.FormatUint(s.opt.Port, 10))
	if err != nil {
		log.Fatal(err.Error())
	}
	if err := s.rpc.Serve(lis); err != nil {
		// Include the underlying error; the fixed message alone hides the cause.
		log.Fatalf("The GRPC component failed to start: %v", err)
	}
	log.Info("The GRPC component is closed")
}
// GetService starts a background watch on serviceName through the discovery
// client and returns a client connection dialled to "etcd:///<serviceName>".
// The dial is insecure and blocking; a dial error is logged at fatal level.
func (s *Server) GetService(serviceName string) *grpc.ClientConn {
	go s.discovery.WatchServices(context.TODO(), serviceName)

	target := "etcd" + ":///" + serviceName
	dialOpts := []grpc.DialOption{grpc.WithInsecure(), grpc.WithBlock()}

	conn, dialErr := grpc.Dial(target, dialOpts...)
	if dialErr != nil {
		log.Fatalf("did not connect: %v", dialErr)
	}
	return conn
}
// registerThisService registers this server instance with the discovery
// client, using "<host>:<port>" from the server options as its address.
func registerThisService(s *Server) {
	addr := fmt.Sprintf("%s:%d", s.opt.Host, s.opt.Port)
	instance := discovery.NewInstance(addr)
	s.discovery.RegisterService(context.TODO(), instance)
}
// healthLis is currently a no-op: its body consists only of the commented-out
// code below, which would create a health server, mark it as serving and
// register it on s.rpc.
// NOTE(review): the health/healthpb packages it refers to are not imported in
// the visible file — confirm before re-enabling.
func healthLis(s *Server) {
/*hsrv := health.NewServer()
hsrv.SetServingStatus("", healthpb.HealthCheckResponse_SERVING)
healthpb.RegisterHealthServer(s.rpc, hsrv)*/
}