/
server.go
104 lines (90 loc) · 2.62 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
// Copyright 2018 The go-fractal Authors
// This file is part of the go-fractal library.
// Package rpcserver contains implementations for net rpc server.
package rpcserver
import (
"context"
"fmt"
"github.com/fractal-platform/fractal/rpc"
"github.com/fractal-platform/fractal/utils/log"
"net/http"
"time"
"github.com/rs/cors"
)
// maxRequestContentLength is 128 KiB (131072). It is not referenced in the
// visible part of this file; presumably it caps the accepted request body size
// in bytes — confirm against the handlers that use it.
const maxRequestContentLength = 128 * 1024
// Server is an RPC server: it bundles the underlying HTTP server with the
// request dispatcher and the registry of services it exposes.
type Server struct {
	// httpServer is the net/http server that accepts connections.
	httpServer *http.Server

	// reqHandler is the callback that receives every incoming request.
	reqHandler *reqHandler

	// serviceHolder is the registry into which service functions are
	// registered (see RegisterApis).
	serviceHolder *serviceHolder
}
// newCorsHandler wraps srv with CORS handling for the given origins.
//
// When allowedOrigins is empty the user has not supplied a CORS configuration,
// so CORS support stays disabled and srv is returned unwrapped. Otherwise the
// returned handler allows POST and GET from the listed origins, accepts any
// request header, and sets MaxAge to 600.
func newCorsHandler(srv http.Handler, allowedOrigins []string) http.Handler {
	if len(allowedOrigins) > 0 {
		opts := cors.Options{
			AllowedOrigins: allowedOrigins,
			AllowedMethods: []string{http.MethodPost, http.MethodGet},
			MaxAge:         600,
			AllowedHeaders: []string{"*"},
		}
		return cors.New(opts).Handler(srv)
	}

	// No custom CORS configuration: hand back the bare handler.
	return srv
}
// NewServer creates the HTTP server that answers both rpc and websocket
// requests.
//
// cors lists the origins allowed for cross-origin requests; an empty list
// leaves CORS disabled. addr is the address the HTTP server will listen on.
// The returned Server is not listening yet; call ListenAndServe to start it.
func NewServer(cors []string, addr string) *Server {
	// One service registry is shared by the rpc and the websocket handlers.
	holder := newServiceHolder()
	dispatcher := &reqHandler{
		rpcHandler: newRpcHandler(holder),
		wsHandler:  newWsHandler(holder),
	}

	return &Server{
		httpServer: &http.Server{
			Addr:         addr,
			Handler:      newCorsHandler(dispatcher, cors),
			ReadTimeout:  30 * time.Second,
			WriteTimeout: 30 * time.Second,
			IdleTimeout:  120 * time.Second,
		},
		reqHandler:    dispatcher,
		serviceHolder: holder,
	}
}
// RegisterApis registers every api in apis with the server's service holder
// under the api's namespace.
//
// A failed registration is logged and skipped; the remaining apis are still
// registered and no error is returned to the caller.
func (srv *Server) RegisterApis(apis []RpcApi) {
	for i := range apis {
		namespace, service := apis[i].Namespace, apis[i].Service
		if err := srv.serviceHolder.register(namespace, service); err != nil {
			log.Error("Register Api failed", "namespace", namespace, "service", service, "err", err)
		}
	}
}
// ListenAndServe starts the request handler loop and blocks until the
// underlying HTTP server stops serving.
//
// The method has no return value, so a serve failure (for example the listen
// address being unavailable) is reported through the logger rather than being
// dropped silently.
func (srv *Server) ListenAndServe() {
	// http.Server.ListenAndServe always returns a non-nil error.
	// http.ErrServerClosed only signals that Shutdown or Close was called, so
	// it is the normal way out and is not logged.
	if err := srv.httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
		log.Error("Rpc server stopped serving", "addr", srv.httpServer.Addr, "err", err)
	}
}
// Shutdown gracefully stops the underlying HTTP server.
//
// It is called with a background context, so no deadline is imposed on the
// shutdown. Any error returned by the HTTP server is logged, because the
// method itself has no return value.
func (srv *Server) Shutdown() {
	if err := srv.httpServer.Shutdown(context.Background()); err != nil {
		log.Error("Rpc server shutdown failed", "err", err)
	}
}
// reqHandler is the single entry point for incoming requests; it wraps the
// handlers for both rpc and websocket traffic.
type reqHandler struct {
	// rpcHandler serves requests addressed to the rpc path.
	rpcHandler *rpcHandler

	// wsHandler serves requests addressed to the websocket path.
	wsHandler *wsHandler
}
// ServeHTTP is the callback invoked by the HTTP server for every request.
//
// Rpc and websocket requests are told apart by the request URI: an exact match
// with rpc.RPCPath goes to the rpc handler, an exact match with
// rpc.WebSocketPath goes to the websocket handler, and anything else is
// answered with 404.
//
// NOTE(review): the comparison is against r.RequestURI, the unmodified
// request-target, so a URI carrying a query string does not match — confirm
// this is intended.
func (h *reqHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	uri := r.RequestURI

	switch {
	case uri == rpc.RPCPath:
		h.rpcHandler.handleRpc(w, r)
	case uri == rpc.WebSocketPath:
		h.wsHandler.handleWs(w, r)
	default:
		http.Error(w, fmt.Sprintf("uri %s not found", uri), http.StatusNotFound)
	}
}