/
server_context.go
113 lines (89 loc) · 3.03 KB
/
server_context.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package rest
import (
"fmt"
"github.com/hyper-ml/hyperml/server/pkg/auth"
"github.com/hyper-ml/hyperml/server/pkg/base"
"github.com/hyper-ml/hyperml/server/pkg/flow"
"net/http"
"sync"
"time"
constants "github.com/hyper-ml/hyperml/server/pkg/config"
"github.com/hyper-ml/hyperml/server/pkg/db"
"github.com/hyper-ml/hyperml/server/pkg/pods"
qs_pkg "github.com/hyper-ml/hyperml/server/pkg/qs"
"github.com/hyper-ml/hyperml/server/pkg/requests"
"github.com/hyper-ml/hyperml/server/pkg/schedules"
"github.com/hyper-ml/hyperml/server/pkg/storage"
ws "github.com/hyper-ml/hyperml/server/pkg/workspace"
)
// ServerContext bundles the configuration and the service handles that
// NewServerContext wires together.
type ServerContext struct {
	// Configuration the context was built from, and the database context
	// opened from its DB settings.
	config    *constants.Config
	DBContext db.DatabaseContext

	// Not populated by NewServerContext; they keep their zero values there.
	lock        sync.RWMutex
	statsTicker *time.Ticker
	HTTPClient  *http.Client

	// Authentication server, created on top of the query server.
	authAPI auth.Server

	// Object, log, flow and workspace servers. NewServerContext leaves these
	// unset when the "DisableFlow" config flag is true.
	objAPI, logAPI storage.ObjectAPIServer
	flowAPI        *flow.FlowServer
	wsAPI          ws.ApiServer
	// vfsAPI *ws.VfsServer

	// Pod keeper, user request handler, query server and notebook scheduler.
	podkeeper *pods.Keeper
	usrReq    *requests.RequestHandler
	qs        *qs_pkg.QueryServer
	scheduler *schedules.NotebookScheduler

	// DisablePodRequest is set by NewServerContext when the pod keeper could
	// not be created while safe mode is on.
	DisablePodRequest bool
}
// NewServerContext builds a ServerContext from config.
//
// It opens the database context and creates the query and auth servers.
// Unless the "DisableFlow" config flag is set, it also creates the object,
// log, workspace and flow servers. Finally it creates the pod keeper, the
// notebook scheduler and the user request handler.
//
// A pod keeper failure is fatal unless safe mode is enabled. In safe mode the
// failure is logged, DisablePodRequest is set and initialization continues.
//
// On any failure the returned context is nil and the returned error wraps the
// underlying cause, so callers can inspect it with errors.Is / errors.As.
func NewServerContext(config *constants.Config) (*ServerContext, error) {
	// config fields are dereferenced below; fail cleanly instead of panicking.
	if config == nil {
		return nil, fmt.Errorf("failed to initiate server context: config is nil")
	}

	var err error
	sc := &ServerContext{
		config: config,
	}

	safemode := config.GetBool(constants.Safemode)
	if safemode {
		base.Out("Starting in safemode. k8s Errors will be ignored ")
	}

	sc.DBContext, err = db.NewDatabaseContext(config.DB)
	if err != nil {
		return nil, fmt.Errorf("failed to initiate db, err: %w", err)
	}

	// initiate query server
	sc.qs = qs_pkg.NewQueryServer(sc.DBContext)
	sc.authAPI = auth.NewAuthServer(sc.qs, config.NoAuth)

	if !config.GetBool("DisableFlow") {
		sc.objAPI, err = storage.NewObjectAPI(config.ObjStorage)
		if err != nil {
			return nil, fmt.Errorf("failed to initiate object server, err: %w", err)
		}

		sc.logAPI, err = storage.NewLogObjectAPI(config.ObjStorage)
		if err != nil {
			return nil, fmt.Errorf("failed to initiate log server, err: %w", err)
		}

		sc.wsAPI, err = ws.NewApiServer(sc.DBContext, sc.objAPI)
		if err != nil {
			return nil, fmt.Errorf("failed to initiate workspace server, err: %w", err)
		}

		sc.flowAPI, err = flow.NewFlowServer(config, sc.DBContext, sc.objAPI, sc.wsAPI, sc.logAPI)
		if err != nil {
			return nil, fmt.Errorf("failed to initiate flow server, err: %w", err)
		}
	}

	// initiate pod keeper
	sc.podkeeper, err = pods.NewKeeper(config, sc.qs)
	if err != nil {
		if !safemode {
			// Match the other failure paths: no half-built context, and an
			// error that says which step failed.
			return nil, fmt.Errorf("failed to initiate pod keeper, err: %w", err)
		}
		base.Error("Failed to Initiate Kubernetes Connection:", err)
		base.Error("As the safe mode is ON, the server will allow queries but new Requests will now be disabled")
		sc.DisablePodRequest = true
	}

	sc.scheduler = schedules.NewNotebookScheduler(sc.qs, sc.podkeeper, config)
	sc.usrReq = requests.NewRequestHandler(sc.qs, config, sc.podkeeper, sc.scheduler)

	return sc, nil
}
// ReadyForRequests reports whether pod requests are currently allowed, i.e.
// whether DisablePodRequest is unset.
func (sc *ServerContext) ReadyForRequests() bool {
	podRequestsDisabled := sc.DisablePodRequest
	return !podRequestsDisabled
}