-
Notifications
You must be signed in to change notification settings - Fork 21
/
catalogservice_cassandra.go
148 lines (120 loc) · 4.98 KB
/
catalogservice_cassandra.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package catalogsvc
import (
"encoding/json"
"net/http"
"github.com/golang/glog"
"golang.org/x/net/context"
"github.com/jazzl0ver/firecamp/api/catalog"
"github.com/jazzl0ver/firecamp/api/manage"
"github.com/jazzl0ver/firecamp/api/manage/error"
"github.com/jazzl0ver/firecamp/catalog/cassandra"
)
// createCasService handles a catalog request to create a Cassandra service.
//
// It decodes a catalog.CatalogCreateCassandraRequest from the request body,
// checks it with s.checkRequest and cascatalog.ValidateRequest, and creates
// the service through s.managecli. When Options.Replicas is not 1 the init
// task is queued via addCasInitTask; for a single node the service is marked
// initialized directly. On success the JMX remote user and password are
// written to w as JSON with status 200.
//
// Decode, validation and marshal failures are returned as clienterr errors
// built with the matching HTTP status; errors from s.checkRequest and
// s.managecli are returned unchanged.
func (s *CatalogHTTPServer) createCasService(ctx context.Context, w http.ResponseWriter, r *http.Request, requuid string) error {
	// parse the request
	req := &catalog.CatalogCreateCassandraRequest{}
	// req is already a pointer, so decode into it directly. Decoding into
	// &req lets a JSON "null" body reset req to nil, and the field accesses
	// below would then panic.
	// NOTE(review): nested fields may still be left at their zero values by a
	// body such as "{}" - confirm checkRequest/ValidateRequest reject that.
	err := json.NewDecoder(r.Body).Decode(req)
	if err != nil {
		glog.Errorln("CatalogCreateCassandraRequest decode request error", err, "requuid", requuid)
		return clienterr.New(http.StatusBadRequest, err.Error())
	}

	err = s.checkRequest(req.Service, req.Resource)
	if err != nil {
		glog.Errorln("CatalogCreateCassandraRequest invalid request, local cluster", s.cluster,
			"region", s.region, "requuid", requuid, req.Service, "error", err)
		return err
	}

	err = cascatalog.ValidateRequest(req)
	if err != nil {
		glog.Errorln("invalid request", err, "requuid", requuid, req.Service, req.Options)
		return clienterr.New(http.StatusBadRequest, err.Error())
	}

	// create the service in the control plane and the container platform
	crReq, jmxUser, jmxPasswd := cascatalog.GenDefaultCreateServiceRequest(s.platform, s.region, s.azs,
		s.cluster, req.Service.ServiceName, req.Options, req.Resource)

	err = s.managecli.CreateService(ctx, crReq)
	if err != nil {
		glog.Errorln("createCommonService error", err, "requuid", requuid, req.Service)
		return err
	}

	glog.Infoln("Cassandra is created, add the init task, requuid", requuid, req.Service)

	if req.Options.Replicas != 1 {
		// run the init task in the background
		s.addCasInitTask(ctx, crReq.Service, requuid)
	} else {
		glog.Infoln("single node Cassandra, skip the init task, requuid", requuid, req.Service, req.Options)

		err = s.managecli.SetServiceInitialized(ctx, req.Service)
		if err != nil {
			// log here like every other failure path, so the request can be traced by requuid
			glog.Errorln("SetServiceInitialized error", err, "requuid", requuid, req.Service)
			return err
		}
	}

	// send back the jmx remote user & passwd
	resp := &catalog.CatalogCreateCassandraResponse{
		JmxRemoteUser:   jmxUser,
		JmxRemotePasswd: jmxPasswd,
	}
	b, err := json.Marshal(resp)
	if err != nil {
		glog.Errorln("Marshal CatalogCreateCassandraResponse error", err, "requuid", requuid, req.Service, req.Options)
		return clienterr.New(http.StatusInternalServerError, err.Error())
	}

	w.WriteHeader(http.StatusOK)
	if _, err = w.Write(b); err != nil {
		// The status has already been sent, so a failed body write can only
		// be logged; the service itself was created successfully.
		glog.Errorln("write CatalogCreateCassandraResponse error", err, "requuid", requuid, req.Service)
	}

	return nil
}
// addCasInitTask builds the default Cassandra init task request for the given
// service (using s.serverurl), hands it to s.catalogSvcInit and logs the
// addition together with the request uuid.
func (s *CatalogHTTPServer) addCasInitTask(ctx context.Context, req *manage.ServiceCommonRequest, requuid string) {
	s.catalogSvcInit.addInitTask(ctx, cascatalog.GenDefaultInitTaskRequest(req, s.serverurl))

	glog.Infoln("add init task for service", req, "requuid", requuid)
}
// scaleCasService handles a catalog request to scale a Cassandra service to
// the requested number of replicas.
//
// It decodes a catalog.CatalogScaleCassandraRequest from the request body,
// checks it with s.checkCommonRequest and loads the current service attributes
// through s.managecli. If the service already has the requested replica count
// it returns nil without further work. Scaling down and scaling up from a
// single replica are rejected with a 400 clienterr. Otherwise replica configs
// are generated for all req.Replicas members and passed to
// s.managecli.ScaleService.
//
// Errors from s.checkCommonRequest and s.managecli are returned unchanged.
func (s *CatalogHTTPServer) scaleCasService(ctx context.Context, r *http.Request, requuid string) error {
	// parse the request
	req := &catalog.CatalogScaleCassandraRequest{}
	// req is already a pointer, so decode into it directly. Decoding into
	// &req lets a JSON "null" body reset req to nil, and the field accesses
	// below would then panic.
	err := json.NewDecoder(r.Body).Decode(req)
	if err != nil {
		glog.Errorln("CatalogScaleCassandraRequest decode request error", err, "requuid", requuid)
		return clienterr.New(http.StatusBadRequest, err.Error())
	}

	err = s.checkCommonRequest(req.Service)
	if err != nil {
		glog.Errorln("CatalogScaleCassandraRequest invalid request, local cluster", s.cluster,
			"region", s.region, "requuid", requuid, req.Service, "error", err)
		return err
	}

	attr, err := s.managecli.GetServiceAttr(ctx, req.Service)
	if err != nil {
		glog.Errorln("GetServiceAttr error", err, "requuid", requuid, req.Service)
		return err
	}

	// nothing to do when the service is already at the requested size
	if req.Replicas == attr.Spec.Replicas {
		glog.Infoln("service already has", req.Replicas, "replicas, requuid", requuid, attr)
		return nil
	}

	// not allow scaling down, as Cassandra nodes are peers. When one node goes down,
	// Cassandra will automatically recover the failed replica on another node.
	if req.Replicas < attr.Spec.Replicas {
		errmsg := "scale down Cassandra service is not supported"
		glog.Errorln(errmsg, "requuid", requuid, req.Service)
		return clienterr.New(http.StatusBadRequest, errmsg)
	}

	// TODO scaling from 1 node requires to add new seed nodes.
	if attr.Spec.Replicas == 1 {
		errmsg := "not support to scale from 1 node, please have at least 3 nodes"
		glog.Errorln(errmsg, "requuid", requuid, req.Service)
		return clienterr.New(http.StatusBadRequest, errmsg)
	}

	glog.Infoln("scale cassandra service from", attr.Spec.Replicas, "to", req.Replicas, "requuid", requuid, attr)

	// TODO for now, simply create the specs for all members, same for CheckAndCreateServiceMembers.
	// optimize to bypass the existing members in the future.
	replicaConfigs := cascatalog.GenReplicaConfigs(s.platform, s.cluster, req.Service.ServiceName, s.azs, req.Replicas)

	scaleReq := &manage.ScaleServiceRequest{
		Service:        req.Service,
		ReplicaConfigs: replicaConfigs,
	}

	err = s.managecli.ScaleService(ctx, scaleReq)
	if err != nil {
		glog.Errorln("create new service members error", err, "requuid", requuid, req.Service)
		return err
	}

	glog.Infoln("scale servie to", req.Replicas, "requuid", requuid, req.Service)

	return nil
}