diff --git a/bcs-common/common/encrypt/des.go b/bcs-common/common/encrypt/des.go index 66905646bb..dbd9b53f75 100644 --- a/bcs-common/common/encrypt/des.go +++ b/bcs-common/common/encrypt/des.go @@ -18,7 +18,6 @@ import ( "crypto/cipher" "crypto/des" "encoding/base64" - "github.com/Tencent/bk-bcs/bcs-common/common/static" ) diff --git a/bcs-common/common/websocketDialer/session.go b/bcs-common/common/websocketDialer/session.go index 411231cf79..287872fb2a 100644 --- a/bcs-common/common/websocketDialer/session.go +++ b/bcs-common/common/websocketDialer/session.go @@ -96,7 +96,7 @@ func (s *Session) startPings(rootCtx context.Context) { if err := s.conn.conn.WriteControl(websocket.PingMessage, []byte(""), time.Now().Add(time.Second)); err != nil { blog.Errorf("Error writing ping: %s", err.Error()) } - blog.Debug("Wrote ping") + //blog.Debug("Wrote ping") s.conn.Unlock() } } diff --git a/bcs-k8s/bcs-kube-agent/app/websocket.go b/bcs-k8s/bcs-kube-agent/app/websocket.go index 56c2b48845..f4966cfe8b 100644 --- a/bcs-k8s/bcs-kube-agent/app/websocket.go +++ b/bcs-k8s/bcs-kube-agent/app/websocket.go @@ -102,7 +102,7 @@ func buildWebsocketToBke(cfg *rest.Config) error { go func() { for { - wsURL := fmt.Sprintf("wss://%s/bcsapi/v1/websocket/connect", bkeServerUrl.Host) + wsURL := fmt.Sprintf("wss://%s/bcsapi/v4/usermanager/v1/websocket/connect", bkeServerUrl.Host) blog.Infof("Connecting to %s with token %s", wsURL, registerToken) websocketDialer.ClientConnect(context.Background(), wsURL, headers, tlsConfig, nil, func(proto, address string) bool { diff --git a/bcs-mesos/bcs-mesos-driver/mesosdriver/websocket.go b/bcs-mesos/bcs-mesos-driver/mesosdriver/websocket.go index d1612fabe8..d12a7d4e6f 100644 --- a/bcs-mesos/bcs-mesos-driver/mesosdriver/websocket.go +++ b/bcs-mesos/bcs-mesos-driver/mesosdriver/websocket.go @@ -92,7 +92,7 @@ func (m *MesosDriver) buildWebsocketToApi() error { go func() { for { - wsURL := fmt.Sprintf("wss://%s/bcsapi/v1/websocket/connect", bcsApiUrl.Host) + wsURL := fmt.Sprintf("wss://%s/bcsapi/v4/usermanager/v1/websocket/connect", bcsApiUrl.Host) blog.Infof("Connecting to %s with token %s", wsURL, m.config.RegisterToken) websocketDialer.ClientConnect(context.Background(), wsURL, headers, tlsConfig, nil, func(proto, address string) bool { diff --git a/bcs-services/bcs-user-manager/app/app.go b/bcs-services/bcs-user-manager/app/app.go index b90bf9339c..cc6b0c2343 100644 --- a/bcs-services/bcs-user-manager/app/app.go +++ b/bcs-services/bcs-user-manager/app/app.go @@ -14,11 +14,13 @@ package app import ( + "fmt" "os" "github.com/Tencent/bk-bcs/bcs-common/common" "github.com/Tencent/bk-bcs/bcs-common/common/blog" "github.com/Tencent/bk-bcs/bcs-common/common/encrypt" + "github.com/Tencent/bk-bcs/bcs-common/common/ssl" "github.com/Tencent/bk-bcs/bcs-services/bcs-user-manager/app/metrics" "github.com/Tencent/bk-bcs/bcs-services/bcs-user-manager/app/user-manager" "github.com/Tencent/bk-bcs/bcs-services/bcs-user-manager/config" @@ -27,11 +29,27 @@ import ( // Run run app func Run(op *options.UserManagerOptions) { - conf := parseConfig(op) - + conf, err := parseConfig(op) + if err != nil { + blog.Errorf("error parse config: %s", err.Error()) + os.Exit(1) + } + if conf.ClientCert.IsSSL { + cliTls, err := ssl.ClientTslConfVerity(conf.ClientCert.CAFile, conf.ClientCert.CertFile, conf.ClientCert.KeyFile, conf.ClientCert.CertPasswd) + if err != nil { + blog.Errorf("set client tls config error %s", err.Error()) + } else { + config.CliTls = cliTls + blog.Infof("set client tls config success") + } + } userManager := user_manager.NewUserManager(conf) + + //register to zk + userManager.RegDiscover() + //start userManager, and http service - err := userManager.Start() + err = userManager.Start() if err != nil { blog.Errorf("start processor error %s, and exit", err.Error()) os.Exit(1) @@ -42,13 +60,11 @@ func Run(op *options.UserManagerOptions) { blog.Error("fail to save pid: err:%s", err.Error()) } - //register to zk - userManager.RegDiscover() - metrics.RunMetric(conf) } -func parseConfig(op *options.UserManagerOptions) *config.UserMgrConfig { +// parseConfig parse the option to config +func parseConfig(op *options.UserManagerOptions) (*config.UserMgrConfig, error) { userMgrConfig := config.NewUserMgrConfig() userMgrConfig.Address = op.Address @@ -60,25 +76,23 @@ func parseConfig(op *options.UserManagerOptions) *config.UserMgrConfig { userMgrConfig.MetricPort = op.MetricPort userMgrConfig.BootStrapUsers = op.BootStrapUsers userMgrConfig.TKE = op.TKE + userMgrConfig.PeerToken = op.PeerToken config.Tke = op.TKE secretId, err := encrypt.DesDecryptFromBase([]byte(config.Tke.SecretId)) if err != nil { - blog.Errorf("error decrypting tke secretId and exit: %s", err.Error()) - os.Exit(1) + return nil, fmt.Errorf("error decrypting tke secretId and exit: %s", err.Error()) } config.Tke.SecretId = string(secretId) secretKey, err := encrypt.DesDecryptFromBase([]byte(config.Tke.SecretKey)) if err != nil { - blog.Errorf("error decrypting tke secretKey and exit: %s", err.Error()) - os.Exit(1) + return nil, fmt.Errorf("error decrypting tke secretKey and exit: %s", err.Error()) } config.Tke.SecretKey = string(secretKey) dsn, err := encrypt.DesDecryptFromBase([]byte(op.DSN)) if err != nil { - blog.Errorf("error decrypting db config and exit: %s", err.Error()) - os.Exit(1) + return nil, fmt.Errorf("error decrypting db config and exit: %s", err.Error()) } userMgrConfig.DSN = string(dsn) @@ -100,5 +114,5 @@ func parseConfig(op *options.UserManagerOptions) *config.UserMgrConfig { userMgrConfig.ClientCert.IsSSL = true } - return userMgrConfig + return userMgrConfig, nil } diff --git a/bcs-services/bcs-user-manager/app/app_test.go b/bcs-services/bcs-user-manager/app/app_test.go new file mode 100644 index 0000000000..60be16c2fe --- /dev/null +++ b/bcs-services/bcs-user-manager/app/app_test.go @@ -0,0 +1,34 @@ +/* + * Tencent is pleased to support the open source community by making Blueking Container Service available. + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * Licensed under the MIT License (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * http://opensource.org/licenses/MIT + * Unless required by applicable law or agreed to in writing, software distributed under + * the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package app + +import ( + "testing" + + "github.com/Tencent/bk-bcs/bcs-services/bcs-user-manager/options" +) + +// TestParseConfig test the parseConfig func +func TestParseConfig(t *testing.T) { + op := options.UserManagerOptions{ + TKE: options.TKEOptions{ + SecretId: "abcdefg", + }, + } + _, err := parseConfig(&op) + if err == nil { + t.Error("empty EncryptionKey can't encrypt") + } + +} diff --git a/bcs-services/bcs-user-manager/app/tunnel-handler/k8s/proxier.go b/bcs-services/bcs-user-manager/app/tunnel-handler/k8s/proxier.go new file mode 100644 index 0000000000..62d652b032 --- /dev/null +++ b/bcs-services/bcs-user-manager/app/tunnel-handler/k8s/proxier.go @@ -0,0 +1,139 @@ +/* + * Tencent is pleased to support the open source community by making Blueking Container Service available. + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * Licensed under the MIT License (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * http://opensource.org/licenses/MIT + * Unless required by applicable law or agreed to in writing, software distributed under + * the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package k8s + +import ( + "fmt" + "net/http" + "strings" + "time" + + "github.com/Tencent/bk-bcs/bcs-common/common/blog" + "github.com/Tencent/bk-bcs/bcs-services/bcs-user-manager/app/metrics" + "github.com/Tencent/bk-bcs/bcs-services/bcs-user-manager/app/user-manager/storages/sqlstore" + "github.com/Tencent/bk-bcs/bcs-services/bcs-user-manager/app/utils" + "github.com/gorilla/mux" + "github.com/gorilla/websocket" +) + +var DefaultTunnelProxyDispatcher = NewTunnelProxyDispatcher("cluster_id", "sub_path") + +// TunnelProxyDispatcher is the handler which dispatch and proxy the incoming requests to external kube-apiserver with websocket tunnel +type TunnelProxyDispatcher struct { + // ClusterVarName is the path parameter name of cluster_id + ClusterVarName string + // SubPathVarName is the path parameter name of sub-path needs to be forwarded + SubPathVarName string +} + +type ClusterHandlerInstance struct { + ServerAddress string + Handler http.Handler +} + +// NewTunnelProxyDispatcher new a default TunnelProxyDispatcher +func NewTunnelProxyDispatcher(clusterVarName, subPathVarName string) *TunnelProxyDispatcher { + return &TunnelProxyDispatcher{ + ClusterVarName: clusterVarName, + SubPathVarName: subPathVarName, + } +} + +func (f *TunnelProxyDispatcher) ServeHTTP(rw http.ResponseWriter, req *http.Request) { + start := time.Now() + + // first authenticate the request, only admin user be allowed + auth := utils.Authenticate(req) + if !auth { + status := utils.NewUnauthorized("anonymous requests is forbidden") + utils.WriteKubeAPIError(rw, status) + return + } + + vars := mux.Vars(req) + // Get cluster id + clusterId := vars[f.ClusterVarName] + + var proxyHandler *ClusterHandlerInstance + // 先从websocket dialer缓存中查找websocket链 + websocketHandler, found, err := lookupWsHandler(clusterId) + if err != nil { + blog.Errorf("error when lookup websocket conn: %s", err.Error()) + err := fmt.Errorf("error when lookup websocket conn: %s", err.Error()) + status := utils.NewInternalError(err) + status.ErrStatus.Reason = "CREATE_TUNNEL_ERROR" + utils.WriteKubeAPIError(rw, status) + return + } + // if found tunnel, use this tunnel to request to kube-apiserver + if found { + blog.Info("found websocket conn for k8s cluster %s", clusterId) + handlerServer := stripLeaveSlash(f.ExtractPathPrefix(req), websocketHandler) + proxyHandler = &ClusterHandlerInstance{ + Handler: handlerServer, + } + credentials := sqlstore.GetWsCredentials(clusterId) + bearerToken := "Bearer " + credentials.UserToken + req.Header.Set("Authorization", bearerToken) + + // set request scheme + req.URL.Scheme = "https" + + // if webconsole long request, then set the latency before ServerHTTP + if websocket.IsWebSocketUpgrade(req) { + metrics.RequestCount.WithLabelValues("k8s_tunnel_request", "websocket").Inc() + metrics.RequestLatency.WithLabelValues("k8s_tunnel_request", "websocket").Observe(time.Since(start).Seconds()) + } + proxyHandler.Handler.ServeHTTP(rw, req) + if !websocket.IsWebSocketUpgrade(req) { + metrics.RequestCount.WithLabelValues("k8s_tunnel_request", req.Method).Inc() + metrics.RequestLatency.WithLabelValues("k8s_tunnel_request", req.Method).Observe(time.Since(start).Seconds()) + } + + return + } + + message := "no cluster can be found using given cluster id" + status := utils.NewNotFound(utils.ClusterResource, clusterId, message) + utils.WriteKubeAPIError(rw, status) + return +} + +// like http.StripPrefix, but always leaves an initial slash. (so that our +// regexps will work.) +func stripLeaveSlash(prefix string, h http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + blog.Debug(fmt.Sprintf("begin proxy for: %s", req.URL.Path)) + p := strings.TrimPrefix(req.URL.Path, prefix) + if len(p) >= len(req.URL.Path) { + http.NotFound(w, req) + return + } + if len(p) > 0 && p[:1] != "/" { + p = "/" + p + } + req.URL.Path = p + h.ServeHTTP(w, req) + }) +} + +// ExtractPathPrefix extracts the path prefix which needs to be stripped when the request is forwarded to the reverse +// proxy handler. +func (f *TunnelProxyDispatcher) ExtractPathPrefix(req *http.Request) string { + subPath := mux.Vars(req)[f.SubPathVarName] + fullPath := req.URL.Path + // We need to strip the prefix string before the request can be forward to apiserver, so we will walk over the full + // request path backwards, everything before the `sub_path` will be the prefix we need to strip + return fullPath[:len(fullPath)-len(subPath)] +} diff --git a/bcs-services/bcs-user-manager/app/tunnel-handler/k8s/proxier_test.go b/bcs-services/bcs-user-manager/app/tunnel-handler/k8s/proxier_test.go new file mode 100644 index 0000000000..bf63728980 --- /dev/null +++ b/bcs-services/bcs-user-manager/app/tunnel-handler/k8s/proxier_test.go @@ -0,0 +1,89 @@ +/* + * Tencent is pleased to support the open source community by making Blueking Container Service available. + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * Licensed under the MIT License (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * http://opensource.org/licenses/MIT + * Unless required by applicable law or agreed to in writing, software distributed under + * the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package k8s + +import ( + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/gorilla/mux" +) + +type TestHandler struct { + // ClusterVarName is the path parameter name of cluster_id + ClusterVarName string + // SubPathVarName is the path parameter name of sub-path needs to be forwarded + SubPathVarName string +} + +type BackendHandler struct { +} + +func (b *BackendHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request) { + path := req.URL.Path + rw.Write([]byte(path)) +} + +var DefaultTestHandler = NewTestHandler("cluster_id", "sub_path") + +// NewTestHandler new a default TestHandler +func NewTestHandler(clusterVarName, subPathVarName string) *TestHandler { + return &TestHandler{ + ClusterVarName: clusterVarName, + SubPathVarName: subPathVarName, + } +} + +func (t *TestHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request) { + backend := &BackendHandler{} + subPath := mux.Vars(req)[t.SubPathVarName] + fullPath := req.URL.Path + + handlerServer := stripLeaveSlash(fullPath[:len(fullPath)-len(subPath)], backend) + handlerServer.ServeHTTP(rw, req) +} + +func TestStripLeaveSlash(t *testing.T) { + router := mux.NewRouter() + router.Handle("/tunnels/clusters/{cluster_id}/{sub_path:.*}", DefaultTestHandler) + + urls := []string{"/tunnels/clusters/k8s-001/version", "/tunnels/clusters/k8s-001/apis/apps/v1/namespaces/default/deployments/test"} + for _, url := range urls { + req, err := http.NewRequest("GET", url, nil) + if err != nil { + t.Fatal(err) + } + rr := httptest.NewRecorder() + router.ServeHTTP(rr, req) + stripPath := strings.TrimPrefix(url, "/tunnels/clusters/k8s-001") + if rr.Body.String() != stripPath { + t.Error("stripLeaveSlash got an unexpected path") + } + } +} + +func TestTunnelServeHTTP(t *testing.T) { + req, err := http.NewRequest("GET", "/tunnels/clusters/k8s-001/version", nil) + if err != nil { + t.Fatal(err) + } + + rr := httptest.NewRecorder() + DefaultTunnelProxyDispatcher.ServeHTTP(rr, req) + if rr.Code != http.StatusUnauthorized { + t.Error("tunnel api should provide admin token") + } +} diff --git a/bcs-services/bcs-user-manager/app/tunnel-handler/k8s/tunnel-dialer.go b/bcs-services/bcs-user-manager/app/tunnel-handler/k8s/tunnel-dialer.go new file mode 100644 index 0000000000..a017f1a444 --- /dev/null +++ b/bcs-services/bcs-user-manager/app/tunnel-handler/k8s/tunnel-dialer.go @@ -0,0 +1,91 @@ +/* + * Tencent is pleased to support the open source community by making Blueking Container Service available. + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * Licensed under the MIT License (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * http://opensource.org/licenses/MIT + * Unless required by applicable law or agreed to in writing, software distributed under + * the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package k8s + +import ( + "crypto/tls" + "crypto/x509" + "net/http" + "net/url" + "strings" + "time" + + "github.com/Tencent/bk-bcs/bcs-common/common/blog" + "github.com/Tencent/bk-bcs/bcs-services/bcs-user-manager/app/tunnel" + "github.com/Tencent/bk-bcs/bcs-services/bcs-user-manager/app/user-manager/models" + "github.com/Tencent/bk-bcs/bcs-services/bcs-user-manager/app/user-manager/storages/sqlstore" + "k8s.io/apimachinery/pkg/util/proxy" + "k8s.io/client-go/transport" +) + +type responder struct{} + +func (r *responder) Error(w http.ResponseWriter, req *http.Request, err error) { + http.Error(w, err.Error(), http.StatusInternalServerError) +} + +// lookupWsHandler will lookup websocket dialer in cache +func lookupWsHandler(clusterId string) (*proxy.UpgradeAwareHandler, bool, error) { + credentials := sqlstore.GetWsCredentials(clusterId) + if credentials == nil { + return nil, false, nil + } + + serverAddress := credentials.ServerAddress + if !strings.HasSuffix(serverAddress, "/") { + serverAddress = serverAddress + "/" + } + u, err := url.Parse(serverAddress) + if err != nil { + return nil, false, err + } + + transport := getTransport(clusterId, credentials) + if transport == nil { + return nil, false, nil + } + + responder := &responder{} + proxyHandler := proxy.NewUpgradeAwareHandler(u, transport, true, false, responder) + proxyHandler.UseRequestLocation = true + + return proxyHandler, true, nil +} + +// getTransport generate transport with dialer from tunnel +func getTransport(clusterId string, credentials *models.BcsWsClusterCredentials) http.RoundTripper { + tp := &http.Transport{} + if credentials.CaCertData != "" { + certs := x509.NewCertPool() + caCrt := []byte(credentials.CaCertData) + certs.AppendCertsFromPEM(caCrt) + tp.TLSClientConfig = &tls.Config{ + RootCAs: certs, + } + } + + tunnelServer := tunnel.DefaultTunnelServer + if tunnelServer.HasSession(clusterId) { + blog.Infof("found sesseion for k8s: %s", clusterId) + // get dialer from tunnel sessions + cd := tunnelServer.Dialer(clusterId, 15*time.Second) + tp.Dial = cd + bearerToken := credentials.UserToken + bearerAuthRoundTripper := transport.NewBearerAuthRoundTripper(bearerToken, tp) + + return bearerAuthRoundTripper + } + + return nil +} diff --git a/bcs-services/bcs-user-manager/app/tunnel-handler/k8s/tunnel-dialer_test.go b/bcs-services/bcs-user-manager/app/tunnel-handler/k8s/tunnel-dialer_test.go new file mode 100644 index 0000000000..3bab0566ff --- /dev/null +++ b/bcs-services/bcs-user-manager/app/tunnel-handler/k8s/tunnel-dialer_test.go @@ -0,0 +1,45 @@ +/* + * Tencent is pleased to support the open source community by making Blueking Container Service available. + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * Licensed under the MIT License (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * http://opensource.org/licenses/MIT + * Unless required by applicable law or agreed to in writing, software distributed under + * the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package k8s + +import ( + "net/http" + "testing" + + "github.com/Tencent/bk-bcs/bcs-common/common/websocketDialer" + "github.com/Tencent/bk-bcs/bcs-services/bcs-user-manager/app/tunnel" + "github.com/Tencent/bk-bcs/bcs-services/bcs-user-manager/app/user-manager/models" +) + +func testAuthorize(req *http.Request) (string, bool, error) { + return "", true, nil +} + +func TestGetTransport(t *testing.T) { + clusterId := "k8s-001" + wsCred := models.BcsWsClusterCredentials{ + ServerKey: "k8s-001", + ServerAddress: "https://127.0.0.1:443", + } + + tunnel.DefaultTunnelServer = websocketDialer.New(testAuthorize, websocketDialer.DefaultErrorWriter, testCleanCredentials) + tp := getTransport(clusterId, &wsCred) + if tp != nil { + t.Error("should have no tunnel session and return nil transport") + } +} + +func testCleanCredentials(serverKey string) { + return +} diff --git a/bcs-services/bcs-user-manager/app/tunnel-handler/mesos/actions.go b/bcs-services/bcs-user-manager/app/tunnel-handler/mesos/actions.go new file mode 100644 index 0000000000..9a60a41f00 --- /dev/null +++ b/bcs-services/bcs-user-manager/app/tunnel-handler/mesos/actions.go @@ -0,0 +1,32 @@ +/* + * Tencent is pleased to support the open source community by making Blueking Container Service available. + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * Licensed under the MIT License (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * http://opensource.org/licenses/MIT + * Unless required by applicable law or agreed to in writing, software distributed under + * the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package mesos + +import ( + "github.com/Tencent/bk-bcs/bcs-common/common/http/httpserver" +) + +//Action restful action struct +type Action httpserver.Action + +var apiActions = []*httpserver.Action{} + +//RegisterAction register action to actions +func RegisterAction(action Action) { + apiActions = append(apiActions, httpserver.NewAction(action.Verb, action.Path, action.Params, action.Handler)) +} + +func GetApiAction() []*httpserver.Action { + return apiActions +} diff --git a/bcs-services/bcs-user-manager/app/tunnel-handler/mesos/mesos-action.go b/bcs-services/bcs-user-manager/app/tunnel-handler/mesos/mesos-action.go new file mode 100644 index 0000000000..cb83cbf0ee --- /dev/null +++ b/bcs-services/bcs-user-manager/app/tunnel-handler/mesos/mesos-action.go @@ -0,0 +1,171 @@ +/* + * Tencent is pleased to support the open source community by making Blueking Container Service available. + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * Licensed under the MIT License (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * http://opensource.org/licenses/MIT + * Unless required by applicable law or agreed to in writing, software distributed under + * the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package mesos + +import ( + "fmt" + "io/ioutil" + "strings" + "time" + + "github.com/Tencent/bk-bcs/bcs-common/common" + "github.com/Tencent/bk-bcs/bcs-common/common/blog" + bhttp "github.com/Tencent/bk-bcs/bcs-common/common/http" + "github.com/Tencent/bk-bcs/bcs-common/common/http/httpclient" + "github.com/Tencent/bk-bcs/bcs-services/bcs-user-manager/app/metrics" + "github.com/Tencent/bk-bcs/bcs-services/bcs-user-manager/config" + "github.com/emicklei/go-restful" + "github.com/ghodss/yaml" +) + +const ( + //mediaHeader key for http media content type + medieTypeHeader = "Content-Type" + //mediaTypeApplicationJSON json payload for http body + mediaTypeApplicationJSON = "application/json" + //mediaTypeApplicationYaml yaml payload for http body + mediaTypeApplicationYaml = "application/x-yaml" +) + +func init() { + RegisterAction(Action{Verb: "POST", Path: "/{uri:*}", Params: nil, Handler: handlerPostActions}) + RegisterAction(Action{Verb: "PUT", Path: "/{uri:*}", Params: nil, Handler: handlerPutActions}) + RegisterAction(Action{Verb: "GET", Path: "/{uri:*}", Params: nil, Handler: handlerGetActions}) + RegisterAction(Action{Verb: "DELETE", Path: "/{uri:*}", Params: nil, Handler: handlerDeleteActions}) +} + +func request2mesosapi(req *restful.Request, uri, method string) (string, error) { + start := time.Now() + + data, err := ioutil.ReadAll(req.Request.Body) + if err != nil { + metrics.RequestErrorCount.WithLabelValues("mesos_tunnel_request", method).Inc() + metrics.RequestErrorLatency.WithLabelValues("mesos_tunnel_request", method).Observe(time.Since(start).Seconds()) + blog.Error("handler url %s read request body failed, error: %s", uri, err.Error()) + err1 := bhttp.InternalError(common.BcsErrCommHttpReadBodyFail, common.BcsErrCommHttpReadBodyFailStr) + return err1.Error(), nil + } + //check application media type + if mediaTypeApplicationYaml == req.Request.Header.Get(medieTypeHeader) { + data, err = yamlTOJSON(data) + if err != nil { + blog.Errorf("bcs-user-manager handle url %s yaml to json failed, %s", uri, err.Error()) + mediaErr := bhttp.InternalError(common.BcsErrApiMediaTypeError, common.BcsErrApiMediaTypeErrorStr) + return mediaErr.Error(), nil + } else { + blog.V(3).Infof("bcs-user-manager handle url %s converting yaml to json successfully", uri) + } + } + + cluster := req.Request.Header.Get("BCS-ClusterID") + if cluster == "" { + metrics.RequestErrorCount.WithLabelValues("mesos_tunnel_request", method).Inc() + metrics.RequestErrorLatency.WithLabelValues("mesos_tunnel_request", method).Observe(time.Since(start).Seconds()) + blog.Error("handler url %s read header BCS-ClusterID is empty", uri) + err1 := bhttp.InternalError(common.BcsErrCommHttpParametersFailed, "http header BCS-ClusterID can't be empty") + return err1.Error(), nil + } + + httpcli := httpclient.NewHttpClient() + httpcli.SetHeader(medieTypeHeader, "application/json") + httpcli.SetHeader("Accept", "application/json") + + // 先从websocket dialer缓存中查找websocket链 + serverAddr, tp, found := LookupWsTransport(cluster) + if found { + url := fmt.Sprintf("%s%s", serverAddr, uri) + if strings.HasPrefix(serverAddr, "https") { + if config.CliTls == nil { + blog.Errorf("client tls is empty") + } + tp.TLSClientConfig = config.CliTls + } + httpcli.SetTransPort(tp) + + blog.Info(url) + reply, err := httpcli.Request(url, method, req.Request.Header, data) + if err != nil { + metrics.RequestErrorCount.WithLabelValues("mesos_tunnel_request", method).Inc() + metrics.RequestErrorLatency.WithLabelValues("mesos_tunnel_request", method).Observe(time.Since(start).Seconds()) + blog.Error("request url %s error %s", url, err.Error()) + err1 := bhttp.InternalError(common.BcsErrApiRequestMesosApiFail, common.BcsErrApiRequestMesosApiFailStr) + return err1.Error(), nil + } + + metrics.RequestCount.WithLabelValues("mesos_tunnel_request", method).Inc() + metrics.RequestLatency.WithLabelValues("mesos_tunnel_request", method).Observe(time.Since(start).Seconds()) + return string(reply), err + } + + err1 := bhttp.InternalError(common.BcsErrApiGetMesosApiFail, fmt.Sprintf("mesos cluster %s not found", cluster)) + return err1.Error(), nil +} + +func handlerPostActions(req *restful.Request, resp *restful.Response) { + blog.V(3).Infof("client %s request %s", req.Request.RemoteAddr, req.Request.URL.Path) + + url := req.Request.URL.Path + + if req.Request.URL.RawQuery != "" { + url = fmt.Sprintf("%s?%s", url, req.Request.URL.RawQuery) + } + + data, _ := request2mesosapi(req, url, "POST") + resp.Write([]byte(data)) +} + +func handlerGetActions(req *restful.Request, resp *restful.Response) { + blog.V(3).Infof("client %s request %s", req.Request.RemoteAddr, req.Request.URL.Path) + url := req.Request.URL.Path + + if req.Request.URL.RawQuery != "" { + url = fmt.Sprintf("%s?%s", url, req.Request.URL.RawQuery) + } + + data, _ := request2mesosapi(req, url, "GET") + resp.Write([]byte(data)) +} + +func handlerDeleteActions(req *restful.Request, resp *restful.Response) { + blog.V(3).Infof("client %s request %s", req.Request.RemoteAddr, req.Request.URL.Path) + url := req.Request.URL.Path + + if req.Request.URL.RawQuery != "" { + url = fmt.Sprintf("%s?%s", url, req.Request.URL.RawQuery) + } + + data, _ := request2mesosapi(req, url, "DELETE") + resp.Write([]byte(data)) +} + +func handlerPutActions(req *restful.Request, resp *restful.Response) { + blog.V(3).Infof("client %s request %s", req.Request.RemoteAddr, req.Request.URL.Path) + url := req.Request.URL.Path + + if req.Request.URL.RawQuery != "" { + url = fmt.Sprintf("%s?%s", url, req.Request.URL.RawQuery) + } + + data, _ := request2mesosapi(req, url, "PUT") + resp.Write([]byte(data)) +} + +//yamlTOJSON check if mesos request body is yaml, +// then convert yaml to json +func yamlTOJSON(rawData []byte) ([]byte, error) { + if len(rawData) == 0 { + return nil, nil + } + return yaml.YAMLToJSON(rawData) +} diff --git a/bcs-services/bcs-user-manager/app/tunnel-handler/mesos/mesos-action_test.go b/bcs-services/bcs-user-manager/app/tunnel-handler/mesos/mesos-action_test.go new file mode 100644 index 0000000000..4d6453222e --- /dev/null +++ b/bcs-services/bcs-user-manager/app/tunnel-handler/mesos/mesos-action_test.go @@ -0,0 +1,50 @@ +/* + * Tencent is pleased to support the open source community by making Blueking Container Service available. + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * Licensed under the MIT License (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * http://opensource.org/licenses/MIT + * Unless required by applicable law or agreed to in writing, software distributed under + * the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package mesos + +import ( + "bytes" + "encoding/json" + "io/ioutil" + "net/http" + "net/http/httptest" + "testing" + + "github.com/Tencent/bk-bcs/bcs-common/common" + bhttp "github.com/Tencent/bk-bcs/bcs-common/common/http" + "github.com/emicklei/go-restful" +) + +func TestRequest2mesosapi(t *testing.T) { + req, err := http.NewRequest("GET", "/mesosdriver/v4/{sub_path:.*}", bytes.NewReader(nil)) + if err != nil { + t.Fatal(err) + } + request := restful.NewRequest(req) + rr := httptest.NewRecorder() + response := restful.NewResponse(rr) + handlerGetActions(request, response) + body, readErr := ioutil.ReadAll(rr.Body) + if readErr != nil { + t.Fatal("error read response body") + } + respData := bhttp.APIRespone{} + err = json.Unmarshal(body, &respData) + if err != nil { + t.Fatal("error when unmarshall response body") + } + if respData.Code != common.BcsErrCommHttpParametersFailed { + t.Error("header BCS-ClusterID empty should return a parameters failed code") + } +} diff --git a/bcs-services/bcs-user-manager/app/tunnel-handler/mesos/tunnel-dialer.go b/bcs-services/bcs-user-manager/app/tunnel-handler/mesos/tunnel-dialer.go new file mode 100644 index 0000000000..cab1b9ed33 --- /dev/null +++ b/bcs-services/bcs-user-manager/app/tunnel-handler/mesos/tunnel-dialer.go @@ -0,0 +1,75 @@ +/* + * Tencent is pleased to support the open source community by making Blueking Container Service available. + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * Licensed under the MIT License (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * http://opensource.org/licenses/MIT + * Unless required by applicable law or agreed to in writing, software distributed under + * the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package mesos + +import ( + "github.com/Tencent/bk-bcs/bcs-common/common/websocketDialer" + "math/rand" + "net/http" + "time" + + "github.com/Tencent/bk-bcs/bcs-common/common/blog" + "github.com/Tencent/bk-bcs/bcs-services/bcs-user-manager/app/tunnel" + "github.com/Tencent/bk-bcs/bcs-services/bcs-user-manager/app/user-manager/storages/sqlstore" +) + +// LookupWsTransport will lookup websocket dialer in cache and generate transport +func LookupWsTransport(clusterId string) (string, *http.Transport, bool) { + credentials := sqlstore.GetWsCredentialsByClusterId(clusterId) + if len(credentials) == 0 { + return "", nil, false + } + + rand.Shuffle(len(credentials), func(i, j int) { + credentials[i], credentials[j] = credentials[j], credentials[i] + }) + + tunnelServer := tunnel.DefaultTunnelServer + for _, credential := range credentials { + clientKey := credential.ServerKey + serverAddress := credential.ServerAddress + if tunnelServer.HasSession(clientKey) { + blog.Infof("found sesseion for mesos: %s", clientKey) + tp := &http.Transport{} + cd := tunnelServer.Dialer(clientKey, 15*time.Second) + tp.Dial = cd + return serverAddress, tp, true + } + } + return "", nil, false +} + +// LookupWsDialer will lookup websocket dialer in cache +func LookupWsDialer(clusterId string) (string, websocketDialer.Dialer, bool) { + credentials := sqlstore.GetWsCredentialsByClusterId(clusterId) + if len(credentials) == 0 { + return "", nil, false + } + + rand.Shuffle(len(credentials), func(i, j int) { + credentials[i], credentials[j] = credentials[j], credentials[i] + }) + + tunnelServer := tunnel.DefaultTunnelServer + for _, credential := range credentials { + clientKey := credential.ServerKey + serverAddress := credential.ServerAddress + if tunnelServer.HasSession(clientKey) { + blog.Infof("found sesseion: %s", clientKey) + clusterDialer := tunnelServer.Dialer(clientKey, 15*time.Second) + return serverAddress, clusterDialer, true + } + } + return "", nil, false +} diff --git a/bcs-services/bcs-user-manager/app/tunnel-handler/mesos/webconsole/http-proxy.go b/bcs-services/bcs-user-manager/app/tunnel-handler/mesos/webconsole/http-proxy.go new file mode 100644 index 0000000000..860a12b897 --- /dev/null +++ b/bcs-services/bcs-user-manager/app/tunnel-handler/mesos/webconsole/http-proxy.go @@ -0,0 +1,45 @@ +/* + * Tencent is pleased to support the open source community by making Blueking Container Service available. + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * Licensed under the MIT License (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * http://opensource.org/licenses/MIT + * Unless required by applicable law or agreed to in writing, software distributed under + * the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package webconsole + +import ( + "net/http" + "net/http/httputil" + "net/url" + + "github.com/Tencent/bk-bcs/bcs-common/common/websocketDialer" + "github.com/Tencent/bk-bcs/bcs-services/bcs-user-manager/config" +) + +// NewHttpReverseProxy create a http reverse proxy +func NewHttpReverseProxy(target *url.URL, clusterDialer websocketDialer.Dialer) *httputil.ReverseProxy { + director := func(req *http.Request) { + req.URL.Scheme = target.Scheme + req.URL.Host = target.Host + req.URL.Path = target.Path + req.URL.RawQuery = target.RawQuery + } + reverseProxy := &httputil.ReverseProxy{Director: director} + // use the cluster tunnel dialer as transport Dial + tp := &http.Transport{ + //Proxy: http.ProxyFromEnvironment, + Dial: clusterDialer, + } + if target.Scheme == "https" { + tp.TLSClientConfig = config.CliTls + } + reverseProxy.Transport = tp + + return reverseProxy +} diff --git a/bcs-services/bcs-user-manager/app/tunnel-handler/mesos/webconsole/webconsole.go b/bcs-services/bcs-user-manager/app/tunnel-handler/mesos/webconsole/webconsole.go new file mode 100644 index 0000000000..b1826525a8 --- /dev/null +++ b/bcs-services/bcs-user-manager/app/tunnel-handler/mesos/webconsole/webconsole.go @@ -0,0 +1,94 @@ +/* + * Tencent is pleased to support the open source community by making Blueking Container Service available. + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * Licensed under the MIT License (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * http://opensource.org/licenses/MIT + * Unless required by applicable law or agreed to in writing, software distributed under + * the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package webconsole + +import ( + "fmt" + "net/http" + "net/url" + "time" + + "github.com/Tencent/bk-bcs/bcs-common/common" + "github.com/Tencent/bk-bcs/bcs-common/common/blog" + bhttp "github.com/Tencent/bk-bcs/bcs-common/common/http" + "github.com/Tencent/bk-bcs/bcs-common/common/websocketDialer" + "github.com/Tencent/bk-bcs/bcs-services/bcs-user-manager/app/metrics" + "github.com/Tencent/bk-bcs/bcs-services/bcs-user-manager/app/tunnel-handler/mesos" + "github.com/Tencent/bk-bcs/bcs-services/bcs-user-manager/app/utils" + "github.com/gorilla/websocket" +) + +type WebconsoleProxy struct { + // Backend returns the backend URL which the proxy uses to reverse proxy + Backend func(*http.Request) (*url.URL, websocketDialer.Dialer, error) +} + +// NewWebconsoleProxy create a webconsole proxy +func NewWebconsoleProxy() *WebconsoleProxy { + backend := func(req *http.Request) (*url.URL, websocketDialer.Dialer, error) { + cluster := req.Header.Get("BCS-ClusterID") + if cluster == "" { + blog.Error("handler url read header BCS-ClusterID is empty") + err1 := bhttp.InternalError(common.BcsErrCommHttpParametersFailed, "http header BCS-ClusterID can't be empty") + return nil, nil, err1 + } + + authed := utils.Authenticate(req) + if !authed { + return nil, nil, fmt.Errorf("must provide admin token to request with websocket tunnel") + } + + // find whether exist a cluster tunnel dialer in sessions + serverAddr, clusterDialer, found := mesos.LookupWsDialer(cluster) + if found { + tunnelUrl, err := url.Parse(serverAddr) + if err != nil { + return nil, nil, fmt.Errorf("error when parse server address: %s", err.Error()) + } + originUrl := req.URL + originUrl.Host = tunnelUrl.Host + originUrl.Scheme = tunnelUrl.Scheme + return originUrl, clusterDialer, nil + } + return nil, nil, fmt.Errorf("no tunnel could be found for cluster %s", cluster) + } + + return &WebconsoleProxy{ + Backend: backend, + } +} + +// ServeHTTP handle webconsole request +func (w *WebconsoleProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) { + start := time.Now() + + backendURL, clusterDialer, err := w.Backend(req) + if err != nil { + http.Error(rw, err.Error(), http.StatusBadRequest) + return + } + // if websocket request, handle it with websocket proxy + if websocket.IsWebSocketUpgrade(req) { + websocketProxy := NewWebsocketProxy(backendURL, clusterDialer) + websocketProxy.ServeHTTP(rw, req) + return + } + + // if ordinary request, handle it with http proxy + httpProxy := NewHttpReverseProxy(backendURL, clusterDialer) + httpProxy.ServeHTTP(rw, req) + metrics.RequestCount.WithLabelValues("mesos_webconsole", req.Method).Inc() + metrics.RequestLatency.WithLabelValues("mesos_webconsole", req.Method).Observe(time.Since(start).Seconds()) + return +} diff --git a/bcs-services/bcs-user-manager/app/tunnel-handler/mesos/webconsole/webconsole_test.go b/bcs-services/bcs-user-manager/app/tunnel-handler/mesos/webconsole/webconsole_test.go new file mode 100644 index 0000000000..cdc534242d --- /dev/null +++ b/bcs-services/bcs-user-manager/app/tunnel-handler/mesos/webconsole/webconsole_test.go @@ -0,0 +1,53 @@ +/* + * Tencent is pleased to support the open source community by making Blueking Container Service available. + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * Licensed under the MIT License (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * http://opensource.org/licenses/MIT + * Unless required by applicable law or agreed to in writing, software distributed under + * the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package webconsole + +import ( + "bytes" + "encoding/json" + "io/ioutil" + "net/http" + "net/http/httptest" + "testing" + + "github.com/Tencent/bk-bcs/bcs-common/common" + bhttp "github.com/Tencent/bk-bcs/bcs-common/common/http" +) + +func TestServeHTTP(t *testing.T) { + req, err := http.NewRequest("GET", "/mesosdriver/v4/{sub_path:.*}", bytes.NewReader(nil)) + if err != nil { + t.Fatal(err) + } + rr := httptest.NewRecorder() + + wp := NewWebconsoleProxy() + wp.ServeHTTP(rr, req) + body, readErr := ioutil.ReadAll(rr.Body) + if readErr != nil { + t.Fatal("error read response body") + } + respData := bhttp.APIRespone{} + err = json.Unmarshal(body, &respData) + if err != nil { + t.Fatal("error when unmarshall response body") + } + if respData.Code != common.BcsErrCommHttpParametersFailed { + t.Error("header BCS-ClusterID empty should return a parameters failed code") + } + + //req.Header.Set("BCS-ClusterID", "k8s-001") + //rr = httptest.NewRecorder() + //wp.ServeHTTP(rr, req) +} diff --git a/bcs-services/bcs-user-manager/app/tunnel-handler/mesos/webconsole/websocket-proxy.go b/bcs-services/bcs-user-manager/app/tunnel-handler/mesos/webconsole/websocket-proxy.go new file mode 100644 index 0000000000..aaf8577ccc --- /dev/null +++ b/bcs-services/bcs-user-manager/app/tunnel-handler/mesos/webconsole/websocket-proxy.go @@ -0,0 +1,220 @@ +/* + * Tencent is pleased to support the open source community by making Blueking Container Service available. + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * Licensed under the MIT License (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * http://opensource.org/licenses/MIT + * Unless required by applicable law or agreed to in writing, software distributed under + * the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package webconsole + +import ( + "fmt" + "io" + "net" + "net/http" + "net/url" + "strings" + + "github.com/Tencent/bk-bcs/bcs-common/common" + "github.com/Tencent/bk-bcs/bcs-common/common/blog" + "github.com/Tencent/bk-bcs/bcs-common/common/websocketDialer" + "github.com/Tencent/bk-bcs/bcs-services/bcs-user-manager/config" + "github.com/gorilla/websocket" +) + +var ( + // DefaultUpgrader specifies the parameters for upgrading an HTTP + // connection to a WebSocket connection. + DefaultUpgrader = &websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, + } +) + +// WebsocketProxy is an HTTP Handler that takes an incoming WebSocket +// connection and proxies it to another server. +type WebsocketProxy struct { + // Director, if non-nil, is a function that may copy additional request + // headers from the incoming WebSocket connection into the output headers + // which will be forwarded to another server. + Director func(incoming *http.Request, out http.Header) + + BackendUrl *url.URL + + // Upgrader specifies the parameters for upgrading a incoming HTTP + // connection to a WebSocket connection. If nil, DefaultUpgrader is used. + Upgrader *websocket.Upgrader + + // Dialer contains options for connecting to the backend WebSocket server. + // If nil, DefaultDialer is used. + Dialer *websocket.Dialer +} + +// NewWebsocketProxy returns a new Websocket reverse proxy that rewrites the +// URL's to the scheme, host and base path provider in target. +func NewWebsocketProxy(backendUrl *url.URL, clusterDialer websocketDialer.Dialer) *WebsocketProxy { + + // DefaultDialer is a dialer with all fields set to the default zero values. + defaultDialer := websocket.DefaultDialer + if backendUrl.Scheme == "https" { + defaultDialer.TLSClientConfig = config.CliTls + backendUrl.Scheme = "wss" + } else { + backendUrl.Scheme = "ws" + } + defaultDialer.NetDial = clusterDialer + + return &WebsocketProxy{ + BackendUrl: backendUrl, + Dialer: defaultDialer, + Upgrader: DefaultUpgrader, + } +} + +// ServeHTTP implements the http.Handler that proxies WebSocket connections. +func (w *WebsocketProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) { + backendURL := w.BackendUrl + requestHeader := w.setRequestHeader(req) + if w.Director != nil { + w.Director(req, requestHeader) + } + connBackend, resp, err := w.Dialer.Dial(backendURL.String(), requestHeader) + if err != nil { + blog.Errorf("websocketproxy: couldn't dial to remote backend url %s", err) + message := fmt.Sprintf("errcode: %d, couldn't dial to remote backend url %s", common.BcsErrApiWebConsoleFailedCode, err.Error()) + if resp != nil { + if err := copyResponse(rw, resp); err != nil { + blog.Errorf("websocketproxy: couldn't write response after failed remote backend handshake: %s", err) + } + } else { + http.Error(rw, message, http.StatusServiceUnavailable) + } + return + } + defer connBackend.Close() + + // Only pass those headers to the upgrader. + upgradeHeader := http.Header{} + if hdr := resp.Header.Get("Sec-Websocket-Protocol"); hdr != "" { + upgradeHeader.Set("Sec-Websocket-Protocol", hdr) + } + if hdr := resp.Header.Get("Set-Cookie"); hdr != "" { + upgradeHeader.Set("Set-Cookie", hdr) + } + + w.Upgrader.CheckOrigin = func(r *http.Request) bool { + return true + } + + // Now upgrade the existing incoming request to a WebSocket connection. + // Also pass the header that we gathered from the Dial handshake. + connPub, err := w.Upgrader.Upgrade(rw, req, upgradeHeader) + if err != nil { + blog.Errorf("websocketproxy: couldn't upgrade %s", err) + return + } + defer connPub.Close() + + // begin to replicate websocket conn data + errClient := make(chan error, 1) + errBackend := make(chan error, 1) + go replicateWebsocketConn(connPub, connBackend, errClient) + go replicateWebsocketConn(connBackend, connPub, errBackend) + + var message string + select { + case err = <-errClient: + message = "websocketproxy: Error when copying from backend to client: %v" + case err = <-errBackend: + message = "websocketproxy: Error when copying from client to backend: %v" + + } + if e, ok := err.(*websocket.CloseError); !ok || e.Code == websocket.CloseAbnormalClosure { + blog.Errorf(message, err) + } +} + +// setRequestHeader pass the origin header to backend request +func (w *WebsocketProxy) setRequestHeader(req *http.Request) http.Header { + // Pass headers from the incoming request to the dialer to forward them to + // the final destinations. + requestHeader := http.Header{} + if origin := req.Header.Get("Origin"); origin != "" { + requestHeader.Add("Origin", origin) + } + for _, prot := range req.Header[http.CanonicalHeaderKey("Sec-WebSocket-Protocol")] { + requestHeader.Add("Sec-WebSocket-Protocol", prot) + } + for _, cookie := range req.Header[http.CanonicalHeaderKey("Cookie")] { + requestHeader.Add("Cookie", cookie) + } + // should always add the BCS-ClusterID to request to mesos-driver + for _, cluster := range req.Header[http.CanonicalHeaderKey("BCS-ClusterID")] { + requestHeader.Add("BCS-ClusterID", cluster) + } + if req.Host != "" { + requestHeader.Set("Host", req.Host) + } + + if clientIP, _, err := net.SplitHostPort(req.RemoteAddr); err == nil { + if prior, ok := req.Header["X-Forwarded-For"]; ok { + clientIP = strings.Join(prior, ", ") + ", " + clientIP + } + requestHeader.Set("X-Forwarded-For", clientIP) + } + + requestHeader.Set("X-Forwarded-Proto", "http") + if req.TLS != nil { + requestHeader.Set("X-Forwarded-Proto", "https") + } + + return requestHeader +} + +// replicateWebsocketConn keep replicating data from source websocket conn to destination websocket conn +func replicateWebsocketConn(dst, src *websocket.Conn, errc chan error) { + for { + msgType, msg, err1 := src.ReadMessage() + if err1 != nil { + m := websocket.FormatCloseMessage(websocket.CloseNormalClosure, fmt.Sprintf("%v", err1)) + if e, ok := err1.(*websocket.CloseError); ok { + if e.Code != websocket.CloseNoStatusReceived { + m = websocket.FormatCloseMessage(e.Code, e.Text) + } + } + errc <- err1 + dst.WriteMessage(websocket.CloseMessage, m) + break + } + err := dst.WriteMessage(msgType, msg) + if err != nil { + errc <- err + break + } + } +} + +// copyHeader copy all header from source to destination +func copyHeader(dst, src http.Header) { + for k, vv := range src { + for _, v := range vv { + dst.Add(k, v) + } + } +} + +// copyResponse copy body from source ResponseWriter to destination Response +func copyResponse(rw http.ResponseWriter, resp *http.Response) error { + copyHeader(rw.Header(), resp.Header) + rw.WriteHeader(resp.StatusCode) + defer resp.Body.Close() + + _, err := io.Copy(rw, resp.Body) + return err +} diff --git a/bcs-services/bcs-user-manager/app/tunnel-handler/mesos/webconsole/websocket-proxy_test.go b/bcs-services/bcs-user-manager/app/tunnel-handler/mesos/webconsole/websocket-proxy_test.go new file mode 100644 index 0000000000..74f6e3a013 --- /dev/null +++ b/bcs-services/bcs-user-manager/app/tunnel-handler/mesos/webconsole/websocket-proxy_test.go @@ -0,0 +1,63 @@ +/* + * Tencent is pleased to support the open source community by making Blueking Container Service available. + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * Licensed under the MIT License (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * http://opensource.org/licenses/MIT + * Unless required by applicable law or agreed to in writing, software distributed under + * the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package webconsole + +import ( + "bytes" + "net/http" + "net/http/httptest" + "net/url" + "testing" +) + +func TestWebsocketProxyServeHTTP(t *testing.T) { + req, err := http.NewRequest("GET", "/mesosdriver/v4/webconsole/{sub_path:.*}", bytes.NewReader(nil)) + if err != nil { + t.Fatal(err) + } + rr := httptest.NewRecorder() + + u := "https://127.0.0.1:8087" + + backendUrl, err := url.Parse(u) + if err != nil { + t.Fatal("error parse url") + } + wp := NewWebsocketProxy(backendUrl, nil) + wp.ServeHTTP(rr, req) + if rr.Code != http.StatusServiceUnavailable { + t.Error("should return 505 http code") + } +} + +func TestSetRequestHeader(t *testing.T) { + req, err := http.NewRequest("GET", "/mesosdriver/v4/webconsole/{sub_path:.*}", bytes.NewReader(nil)) + if err != nil { + t.Fatal(err) + } + req.Header.Add("BCS-ClusterID", "k8s-001") + req.Header.Add("Sec-WebSocket-Protocol", "websocket") + + backendUrl := url.URL{} + wp := NewWebsocketProxy(&backendUrl, nil) + requestHeader := wp.setRequestHeader(req) + + if requestHeader.Get("BCS-ClusterID") != "k8s-001" { + t.Error("error set upgrade request header") + } + + if requestHeader.Get("Sec-WebSocket-Protocol") != "websocket" { + t.Error("error set upgrade request header") + } +} diff --git a/bcs-services/bcs-user-manager/app/tunnel/peermanager.go b/bcs-services/bcs-user-manager/app/tunnel/peermanager.go new file mode 100644 index 0000000000..21d0d63ac0 --- /dev/null +++ b/bcs-services/bcs-user-manager/app/tunnel/peermanager.go @@ -0,0 +1,180 @@ +/* + * Tencent is pleased to support the open source community by making Blueking Container Service available. + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * Licensed under the MIT License (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * http://opensource.org/licenses/MIT + * Unless required by applicable law or agreed to in writing, software distributed under + * the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package tunnel + +import ( + "encoding/json" + "errors" + "fmt" + "os" + "sync" + "time" + + "github.com/Tencent/bk-bcs/bcs-common/common/RegisterDiscover" + "github.com/Tencent/bk-bcs/bcs-common/common/blog" + "github.com/Tencent/bk-bcs/bcs-common/common/types" + "github.com/Tencent/bk-bcs/bcs-common/common/websocketDialer" + "github.com/Tencent/bk-bcs/bcs-services/bcs-user-manager/config" + "golang.org/x/net/context" +) + +const ( + defaultPeerToken = "Mx9vWfTZea4MEzc7SlvB8aFl0NhmYQvZzEomOYypDMKkev34Q9kIyh32RjXXCIcn" +) + +type peerManager struct { + sync.Mutex + ready bool + token string + urlFormat string + server *websocketDialer.Server + peers map[string]bool +} + +type PeerRDiscover struct { + rd *RegisterDiscover.RegDiscover + rootCxt context.Context + cancel context.CancelFunc +} + +// StartPeerManager manage mutual tunnel connection +func StartPeerManager(conf *config.UserMgrConfig, dialerServer *websocketDialer.Server) error { + // self peerID is ip:port + dialerServer.PeerID = fmt.Sprintf("%s:%d", conf.LocalIp, conf.Port) + dialerServer.PeerToken = conf.PeerToken + // if not specified peertoken to authorize each other, use the default peertoken + if dialerServer.PeerToken == "" { + dialerServer.PeerToken = defaultPeerToken + blog.Info("use default peer token: [%s]", dialerServer.PeerToken) + } + pm := &peerManager{ + token: dialerServer.PeerToken, + urlFormat: "wss://%s/usermanager/v1/websocket/connect", + server: dialerServer, + peers: map[string]bool{}, + } + + // init register and discovery object + peerRd := &PeerRDiscover{ + rd: RegisterDiscover.NewRegDiscoverEx(conf.RegDiscvSrv, 10*time.Second), + } + peerRd.rootCxt, peerRd.cancel = context.WithCancel(context.Background()) + + if err := peerRd.rd.Start(); err != nil { + blog.Error("fail to start register and discover bcs-user-manager peers. err:%s", err.Error()) + return err + } + + go peerRd.discoveryAndWatchPeer(pm) + + return nil +} + +// discoveryAndWatchPeer discover and watch peers each other +func (p *PeerRDiscover) discoveryAndWatchPeer(pm *peerManager) { + key := fmt.Sprintf("%s/%s", types.BCS_SERV_BASEPATH, types.BCS_MODULE_USERMGR) + blog.Infof("start discover service key %s", key) + event, err := p.rd.DiscoverService(key) + if err != nil { + blog.Error("fail to register discover for api. err:%s", err.Error()) + p.cancel() + os.Exit(1) + } + + for { + select { + case eve := <-event: + var peerServs []string + // discovery a new peer event + for _, serv := range eve.Server { + userMgrServ := new(types.BcsUserMgrServInfo) + if err := json.Unmarshal([]byte(serv), userMgrServ); err != nil { + blog.Warn("fail to do json unmarshal(%s), err:%s", serv, err.Error()) + continue + } + peerServ := fmt.Sprintf("%s:%d", userMgrServ.IP, userMgrServ.Port) + peerServs = append(peerServs, peerServ) + } + + blog.Infof("%v", peerServs) + err := pm.syncPeers(peerServs) + if err != nil { + blog.Errorf("failed to discovery and watch peers: %s", err.Error()) + } + case <-p.rootCxt.Done(): + blog.Warn("zk register path %s and discover done", key) + return + } + } +} + +// syncPeers sync peers status, add tunnels to new peers, remove tunnels from deleted peers +func (p *peerManager) syncPeers(servs []string) error { + if len(servs) == 0 { + return errors.New("syncPeers even can't discovery self") + } + + p.addRemovePeers(servs) + + return nil +} + +// addRemovePeers add tunnels with new peers each other, remove tunnels from deleted peers +func (p *peerManager) addRemovePeers(servs []string) { + p.Lock() + defer p.Unlock() + + newSet := map[string]bool{} + ready := false + + for _, serv := range servs { + if serv == p.server.PeerID { + ready = true + } else { + newSet[serv] = true + } + } + + toCreate, toDelete, _ := diff(newSet, p.peers) + + // add new peers + for _, peerServ := range toCreate { + p.server.AddPeer(fmt.Sprintf(p.urlFormat, peerServ), peerServ, p.token) + } + // remove deleted peers + for _, ip := range toDelete { + p.server.RemovePeer(ip) + } + + p.peers = newSet + p.ready = ready +} + +// diff just compare and diff two map +func diff(desired, actual map[string]bool) ([]string, []string, []string) { + var same, toCreate, toDelete []string + for key := range desired { + if actual[key] { + same = append(same, key) + } else { + toCreate = append(toCreate, key) + } + } + for key := range actual { + if !desired[key] { + toDelete = append(toDelete, key) + } + } + return toCreate, toDelete, same +} diff --git a/bcs-services/bcs-user-manager/app/tunnel/peermanager_test.go b/bcs-services/bcs-user-manager/app/tunnel/peermanager_test.go new file mode 100644 index 0000000000..f9e24662e5 --- /dev/null +++ b/bcs-services/bcs-user-manager/app/tunnel/peermanager_test.go @@ -0,0 +1,61 @@ +/* + * Tencent is pleased to support the open source community by making Blueking Container Service available. + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * Licensed under the MIT License (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * http://opensource.org/licenses/MIT + * Unless required by applicable law or agreed to in writing, software distributed under + * the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package tunnel + +import ( + "reflect" + "testing" + + "github.com/Tencent/bk-bcs/bcs-services/bcs-api/tunnel" +) + +// TestDiff test the diff func +func TestDiff(t *testing.T) { + desired := map[string]bool{ + "node1": true, + "node2": true, + "node3": true, + } + actual := map[string]bool{ + "node2": true, + "node3": true, + "node4": true, + } + + toCreate, toDelete, same := diff(desired, actual) + if !reflect.DeepEqual(toCreate, []string{"node1"}) { + t.Error("get an unexpected toCreate map") + } + if !reflect.DeepEqual(toDelete, []string{"node4"}) { + t.Error("get an unexpected toDelete map") + } + if !reflect.DeepEqual(same, []string{"node2", "node3"}) { + t.Error("get an unexpected same map") + } +} + +// TestSyncPeers test the syncPeers func +func TestSyncPeers(t *testing.T) { + tunnelServer := tunnel.NewTunnelServer() + pm := &peerManager{ + token: tunnelServer.PeerToken, + urlFormat: "wss://%s/usermanager/v1/websocket/connect", + server: tunnelServer, + peers: map[string]bool{}, + } + err := pm.syncPeers([]string{}) + if err == nil { + t.Error("register and discovery should always can discovery self") + } +} diff --git a/bcs-services/bcs-user-manager/app/tunnel/tunnel-server.go b/bcs-services/bcs-user-manager/app/tunnel/tunnel-server.go new file mode 100644 index 0000000000..20989487c1 --- /dev/null +++ b/bcs-services/bcs-user-manager/app/tunnel/tunnel-server.go @@ -0,0 +1,147 @@ +/* + * Tencent is pleased to support the open source community by making Blueking Container Service available. + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * Licensed under the MIT License (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * http://opensource.org/licenses/MIT + * Unless required by applicable law or agreed to in writing, software distributed under + * the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package tunnel + +import ( + "encoding/base64" + "encoding/json" + "errors" + "net/http" + "net/url" + + "github.com/Tencent/bk-bcs/bcs-common/common/blog" + "github.com/Tencent/bk-bcs/bcs-common/common/websocketDialer" + "github.com/Tencent/bk-bcs/bcs-services/bcs-user-manager/app/user-manager/storages/sqlstore" +) + +const ( + Module = "BCS-API-Tunnel-Module" + RegisterToken = "BCS-API-Tunnel-Token" + Params = "BCS-API-Tunnel-Params" + Cluster = "BCS-API-Tunnel-ClusterId" + + KubeAgentModule = "kube-agent" + K8sDriverModule = "k8s-driver" + MesosDriverModule = "mesos-driver" +) + +var ( + DefaultTunnelServer *websocketDialer.Server + errFailedAuth = errors.New("failed authentication") +) + +type RegisterCluster struct { + Address string `json:"address"` + UserToken string `json:"userToken"` + CACert string `json:"caCert"` +} + +// authorizeTunnel authorize an client +func authorizeTunnel(req *http.Request) (string, bool, error) { + // module name, register_token, cluster is necessary + moduleName := req.Header.Get(Module) + if moduleName == "" { + return "", false, errors.New("module empty") + } + + registerToken := req.Header.Get(RegisterToken) + if registerToken == "" { + return "", false, errors.New("registerToken empty") + } + + clusterId := req.Header.Get(Cluster) + if clusterId == "" { + return "", false, errors.New("clusterId empty") + } + + var registerCluster RegisterCluster + params := req.Header.Get(Params) + bytes, err := base64.StdEncoding.DecodeString(params) + if err != nil { + blog.Errorf("error when decode cluster params registered by websocket: %s", err.Error()) + return "", false, err + } + if err := json.Unmarshal(bytes, ®isterCluster); err != nil { + blog.Errorf("error when unmarshal cluster params registered by websocket: %s", err.Error()) + return "", false, err + } + + if registerCluster.Address == "" { + return "", false, errors.New("client dialer address is empty") + } + + // bcs-kube-agent must report ca and usertoken + if moduleName == KubeAgentModule { + if registerCluster.CACert == "" || registerCluster.UserToken == "" { + return "", false, errors.New("address or cacert or token empty") + } + } + + var caCert string + if registerCluster.CACert != "" { + certBytes, err := base64.StdEncoding.DecodeString(registerCluster.CACert) + if err != nil { + blog.Errorf("error when decode cluster [%s] cacert registered by websocket: %s", clusterId, err.Error()) + return "", false, err + } + caCert = string(certBytes) + } + + // validate if the registerToken is correct + token := sqlstore.GetRegisterToken(clusterId) + if token == nil { + blog.Info("haha") + return "", false, nil + } + if token.Token != registerToken { + return "", false, nil + } + + if moduleName == KubeAgentModule { + // for k8s, the registerCluster.Address is kubernetes service url, just save to db + err = sqlstore.SaveWsCredentials(clusterId, moduleName, registerCluster.Address, caCert, registerCluster.UserToken) + if err != nil { + blog.Errorf("error when save websocket credentials: %s", err.Error()) + return "", false, err + } + return clusterId, true, nil + } else if moduleName == MesosDriverModule || moduleName == K8sDriverModule { + // for mesos, the registerCluster.Address is mesos-driver url. one mesos cluster may have 3 or more mesos-driver, + // so we should distinguish them, so use {clusterId}-{ip:port} as serverKey + url, err := url.Parse(registerCluster.Address) + if err != nil { + return "", false, nil + } + serverKey := clusterId + "-" + url.Host + err = sqlstore.SaveWsCredentials(serverKey, moduleName, registerCluster.Address, caCert, registerCluster.UserToken) + if err != nil { + blog.Errorf("error when save websocket credentials: %s", err.Error()) + return "", false, err + } + return serverKey, true, nil + } + + return "", false, errors.New("unknown client module") +} + +// NewTunnelServer create websocket tunnel server +func NewTunnelServer() *websocketDialer.Server { + DefaultTunnelServer = websocketDialer.New(authorizeTunnel, websocketDialer.DefaultErrorWriter, cleanCredentials) + return DefaultTunnelServer +} + +// cleanCredentials clean client credentials in db +func cleanCredentials(serverKey string) { + sqlstore.DelWsCredentials(serverKey) +} diff --git a/bcs-services/bcs-user-manager/app/tunnel/tunnel-server_test.go b/bcs-services/bcs-user-manager/app/tunnel/tunnel-server_test.go new file mode 100644 index 0000000000..fe5c1b68ea --- /dev/null +++ b/bcs-services/bcs-user-manager/app/tunnel/tunnel-server_test.go @@ -0,0 +1,61 @@ +/* + * Tencent is pleased to support the open source community by making Blueking Container Service available. + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * Licensed under the MIT License (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * http://opensource.org/licenses/MIT + * Unless required by applicable law or agreed to in writing, software distributed under + * the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package tunnel + +import ( + "encoding/base64" + "encoding/json" + "net/http" + "strings" + "testing" +) + +// TestAuthorizeTunnel test the authorizeTunnel func +func TestAuthorizeTunnel(t *testing.T) { + req := &http.Request{ + Header: make(http.Header), + } + _, _, err := authorizeTunnel(req) + if err == nil || !strings.Contains(err.Error(), "module empty") { + t.Error("failed to handle request whit empty module") + } + + req.Header.Set(Module, "kube-agent") + _, _, err = authorizeTunnel(req) + if err == nil || !strings.Contains(err.Error(), "registerToken empty") { + t.Error("failed to handle request whit empty registerToken") + } + + req.Header.Set(RegisterToken, "abcdefg") + _, _, err = authorizeTunnel(req) + if err == nil || !strings.Contains(err.Error(), "clusterId empty") { + t.Error("failed to handle request whit empty cluster") + } + + req.Header.Set(Cluster, "k8s-001") + _, _, err = authorizeTunnel(req) + if err == nil { + t.Error("failed to handle request whit empty Params") + } + + params := map[string]interface{}{ + "address": "http:127.0.0.1:80", + } + bytes, err := json.Marshal(params) + req.Header.Set(Params, base64.StdEncoding.EncodeToString(bytes)) + _, _, err = authorizeTunnel(req) + if err == nil || !strings.Contains(err.Error(), "address or cacert or token empty") { + t.Error("failed to handle request whit empty cacert or usertoken when module is bcs-kube-agent") + } +} diff --git a/bcs-services/bcs-user-manager/app/user-manager/models/cluster.go b/bcs-services/bcs-user-manager/app/user-manager/models/cluster.go index 5ddb3fa903..d5eac7218d 100644 --- a/bcs-services/bcs-user-manager/app/user-manager/models/cluster.go +++ b/bcs-services/bcs-user-manager/app/user-manager/models/cluster.go @@ -42,3 +42,14 @@ type BcsClusterCredential struct { CreatedAt time.Time `json:"created_at"` UpdatedAt time.Time `json:"updated_at"` } + +type BcsWsClusterCredentials struct { + ID uint `gorm:"primary_key"` + ServerKey string `gorm:"unique;not null"` + ClientModule string `gorm:"not null"` + ServerAddress string `gorm:"size:2048"` + CaCertData string `gorm:"size:4096"` + UserToken string `gorm:"size:2048"` + CreatedAt time.Time + UpdatedAt time.Time +} diff --git a/bcs-services/bcs-user-manager/app/user-manager/storages/sqlstore/credentials.go b/bcs-services/bcs-user-manager/app/user-manager/storages/sqlstore/credentials.go index 96828b1c9d..38faba4329 100644 --- a/bcs-services/bcs-user-manager/app/user-manager/storages/sqlstore/credentials.go +++ b/bcs-services/bcs-user-manager/app/user-manager/storages/sqlstore/credentials.go @@ -48,3 +48,40 @@ func ListCredentials() []models.BcsClusterCredential { return credentials } + +// SaveWsCredentials saves the credentials of cluster registered by websocket +func SaveWsCredentials(serverKey, clientModule, serverAddress, caCertData, userToken string) error { + var credentials models.BcsWsClusterCredentials + // Create or update, source: https://github.com/jinzhu/gorm/issues/1307 + dbScoped := GCoreDB.Where(models.BcsWsClusterCredentials{ServerKey: serverKey}).Assign( + models.BcsWsClusterCredentials{ + ClientModule: clientModule, + ServerAddress: serverAddress, + CaCertData: caCertData, + UserToken: userToken, + }, + ).FirstOrCreate(&credentials) + return dbScoped.Error +} + +// GetWsCredentials query for clusterCredentials of cluster registered by websocket +func GetWsCredentials(serverKey string) *models.BcsWsClusterCredentials { + credentials := models.BcsWsClusterCredentials{} + GCoreDB.Where(&models.BcsWsClusterCredentials{ServerKey: serverKey}).First(&credentials) + if credentials.ID != 0 { + return &credentials + } + return nil +} + +func DelWsCredentials(serverKey string) { + credentials := models.BcsWsClusterCredentials{} + GCoreDB.Where(&models.BcsWsClusterCredentials{ServerKey: serverKey}).Delete(&credentials) +} + +func GetWsCredentialsByClusterId(clusterId string) []*models.BcsWsClusterCredentials { + var credentials []*models.BcsWsClusterCredentials + query := clusterId + "-%" + GCoreDB.Where("server_key LIKE ?", query).Find(&credentials) + return credentials +} diff --git a/bcs-services/bcs-user-manager/app/user-manager/storages/sqlstore/register_token.go b/bcs-services/bcs-user-manager/app/user-manager/storages/sqlstore/register-token.go similarity index 100% rename from bcs-services/bcs-user-manager/app/user-manager/storages/sqlstore/register_token.go rename to bcs-services/bcs-user-manager/app/user-manager/storages/sqlstore/register-token.go diff --git a/bcs-services/bcs-user-manager/app/user-manager/store.go b/bcs-services/bcs-user-manager/app/user-manager/store.go index 91fadc1ac0..e31e5ed53f 100644 --- a/bcs-services/bcs-user-manager/app/user-manager/store.go +++ b/bcs-services/bcs-user-manager/app/user-manager/store.go @@ -41,6 +41,7 @@ func SetupStore(conf *config.UserMgrConfig) error { &models.BcsRole{}, &models.BcsUserResourceRole{}, &models.TkeCidr{}, + &models.BcsWsClusterCredentials{}, //compatible with bcs-api to sync old data &m.ClusterCredentials{}, diff --git a/bcs-services/bcs-user-manager/app/user-manager/user-manager.go b/bcs-services/bcs-user-manager/app/user-manager/user-manager.go index 1d062139bf..febb1ca0ba 100644 --- a/bcs-services/bcs-user-manager/app/user-manager/user-manager.go +++ b/bcs-services/bcs-user-manager/app/user-manager/user-manager.go @@ -16,15 +16,23 @@ package user_manager import ( "encoding/json" "fmt" + "net/http" "os" "time" + "github.com/Tencent/bk-bcs/bcs-common/common" "github.com/Tencent/bk-bcs/bcs-common/common/RegisterDiscover" "github.com/Tencent/bk-bcs/bcs-common/common/blog" + bcshttp "github.com/Tencent/bk-bcs/bcs-common/common/http" "github.com/Tencent/bk-bcs/bcs-common/common/http/httpserver" "github.com/Tencent/bk-bcs/bcs-common/common/types" "github.com/Tencent/bk-bcs/bcs-common/common/version" + "github.com/Tencent/bk-bcs/bcs-services/bcs-user-manager/app/tunnel" + "github.com/Tencent/bk-bcs/bcs-services/bcs-user-manager/app/tunnel-handler/k8s" + "github.com/Tencent/bk-bcs/bcs-services/bcs-user-manager/app/tunnel-handler/mesos" + "github.com/Tencent/bk-bcs/bcs-services/bcs-user-manager/app/tunnel-handler/mesos/webconsole" "github.com/Tencent/bk-bcs/bcs-services/bcs-user-manager/app/user-manager/v1http" + "github.com/Tencent/bk-bcs/bcs-services/bcs-user-manager/app/utils" "github.com/Tencent/bk-bcs/bcs-services/bcs-user-manager/config" "github.com/emicklei/go-restful" ) @@ -56,12 +64,39 @@ func (u *UserManager) Start() error { return err } + // start and manage websocket tunnel + tunnelServer := tunnel.NewTunnelServer() + err = tunnel.StartPeerManager(u.config, tunnelServer) + if err != nil { + blog.Errorf("failed to start peermanager: %s", err.Error()) + return err + } + + // usermanager api ws := u.httpServ.NewWebService("/usermanager", nil) u.initRouters(ws) + // mesos api to use websocket tunnel + u.httpServ.RegisterWebServer("/mesosdriver/v4", Filter, mesos.GetApiAction()) + router := u.httpServ.GetRouter() webContainer := u.httpServ.GetWebContainer() + + // handle websocket tunnel register + router.Handle("/usermanager/v1/websocket/connect", tunnelServer) + + // handle user and cluster manager request router.Handle("/usermanager/{sub_path:.*}", webContainer) + + // handle k8s request with websocket tunnel + router.Handle("/tunnels/clusters/{cluster_id}/{sub_path:.*}", k8s.DefaultTunnelProxyDispatcher) + + //handle mesos webconsole request with websocket tunnel + router.Handle("/mesosdriver/v4/webconsole/{sub_path:.*}", webconsole.NewWebconsoleProxy()) + + //handle mesos request with websocket tunnel + router.Handle("/mesosdriver/v4/{sub_path:.*}", webContainer) + if err := u.httpServ.ListenAndServeMux(u.config.VerifyClientTLS); err != nil { return fmt.Errorf("http ListenAndServe error %s", err.Error()) } @@ -69,6 +104,24 @@ func (u *UserManager) Start() error { return nil } +// Filter authenticate the request +func Filter(req *restful.Request, resp *restful.Response, chain *restful.FilterChain) { + // first authenticate the request, only admin user be allowed + auth := utils.Authenticate(req.Request) + if !auth { + resp.WriteHeaderAndEntity(http.StatusUnauthorized, bcshttp.APIRespone{ + Result: false, + Code: common.BcsErrApiUnauthorized, + Message: "must provide admin token to request with websocket tunnel", + Data: nil, + }) + return + } + + chain.ProcessFilter(req, resp) +} + +// initRouters init usermanager http router func (u *UserManager) initRouters(ws *restful.WebService) { v1http.InitV1Routers(ws) } diff --git a/bcs-services/bcs-user-manager/app/user-manager/v1http/auth.go b/bcs-services/bcs-user-manager/app/user-manager/v1http/auth.go index 041221f58f..45ed6eb873 100644 --- a/bcs-services/bcs-user-manager/app/user-manager/v1http/auth.go +++ b/bcs-services/bcs-user-manager/app/user-manager/v1http/auth.go @@ -22,7 +22,7 @@ import ( "github.com/Tencent/bk-bcs/bcs-common/common/blog" "github.com/Tencent/bk-bcs/bcs-services/bcs-user-manager/app/user-manager/models" "github.com/Tencent/bk-bcs/bcs-services/bcs-user-manager/app/user-manager/storages/sqlstore" - "github.com/Tencent/bk-bcs/bcs-services/bcs-user-manager/app/user-manager/utils" + "github.com/Tencent/bk-bcs/bcs-services/bcs-user-manager/app/utils" "github.com/emicklei/go-restful" ) @@ -50,13 +50,11 @@ func (ta *TokenAuthenticater) GetUser() (*models.BcsUser, bool) { user, hasExpired := ta.GetUserFromToken(tokenString) if user == nil { - blog.Warnf("No user can be found by token:%s", tokenString) return user, hasExpired } else if hasExpired { blog.Warnf("usertoken has been expired: %s", tokenString) return user, hasExpired } else { - blog.Debug(fmt.Sprintf("User:%s found by token:%s", user.Name, tokenString)) return user, false } } diff --git a/bcs-services/bcs-user-manager/app/user-manager/v1http/cluster.go b/bcs-services/bcs-user-manager/app/user-manager/v1http/cluster.go index 5d0c0a5b64..57939816a6 100644 --- a/bcs-services/bcs-user-manager/app/user-manager/v1http/cluster.go +++ b/bcs-services/bcs-user-manager/app/user-manager/v1http/cluster.go @@ -22,7 +22,7 @@ import ( "github.com/Tencent/bk-bcs/bcs-services/bcs-user-manager/app/metrics" "github.com/Tencent/bk-bcs/bcs-services/bcs-user-manager/app/user-manager/models" "github.com/Tencent/bk-bcs/bcs-services/bcs-user-manager/app/user-manager/storages/sqlstore" - "github.com/Tencent/bk-bcs/bcs-services/bcs-user-manager/app/user-manager/utils" + "github.com/Tencent/bk-bcs/bcs-services/bcs-user-manager/app/utils" "github.com/emicklei/go-restful" ) diff --git a/bcs-services/bcs-user-manager/app/user-manager/v1http/credentials.go b/bcs-services/bcs-user-manager/app/user-manager/v1http/credentials.go index da94073eba..eac22bbe80 100644 --- a/bcs-services/bcs-user-manager/app/user-manager/v1http/credentials.go +++ b/bcs-services/bcs-user-manager/app/user-manager/v1http/credentials.go @@ -22,7 +22,7 @@ import ( "github.com/Tencent/bk-bcs/bcs-common/common/blog" "github.com/Tencent/bk-bcs/bcs-services/bcs-user-manager/app/metrics" "github.com/Tencent/bk-bcs/bcs-services/bcs-user-manager/app/user-manager/storages/sqlstore" - "github.com/Tencent/bk-bcs/bcs-services/bcs-user-manager/app/user-manager/utils" + "github.com/Tencent/bk-bcs/bcs-services/bcs-user-manager/app/utils" "github.com/emicklei/go-restful" ) diff --git a/bcs-services/bcs-user-manager/app/user-manager/v1http/permission.go b/bcs-services/bcs-user-manager/app/user-manager/v1http/permission.go index 122d6f2112..52843f9a70 100644 --- a/bcs-services/bcs-user-manager/app/user-manager/v1http/permission.go +++ b/bcs-services/bcs-user-manager/app/user-manager/v1http/permission.go @@ -25,7 +25,7 @@ import ( "github.com/Tencent/bk-bcs/bcs-services/bcs-user-manager/app/metrics" "github.com/Tencent/bk-bcs/bcs-services/bcs-user-manager/app/user-manager/models" "github.com/Tencent/bk-bcs/bcs-services/bcs-user-manager/app/user-manager/storages/sqlstore" - "github.com/Tencent/bk-bcs/bcs-services/bcs-user-manager/app/user-manager/utils" + "github.com/Tencent/bk-bcs/bcs-services/bcs-user-manager/app/utils" "github.com/emicklei/go-restful" ) diff --git a/bcs-services/bcs-user-manager/app/user-manager/v1http/register_token.go b/bcs-services/bcs-user-manager/app/user-manager/v1http/register-token.go similarity index 83% rename from bcs-services/bcs-user-manager/app/user-manager/v1http/register_token.go rename to bcs-services/bcs-user-manager/app/user-manager/v1http/register-token.go index 21b509a6db..4b43bc0561 100644 --- a/bcs-services/bcs-user-manager/app/user-manager/v1http/register_token.go +++ b/bcs-services/bcs-user-manager/app/user-manager/v1http/register-token.go @@ -21,7 +21,7 @@ import ( "github.com/Tencent/bk-bcs/bcs-common/common/blog" "github.com/Tencent/bk-bcs/bcs-services/bcs-user-manager/app/metrics" "github.com/Tencent/bk-bcs/bcs-services/bcs-user-manager/app/user-manager/storages/sqlstore" - "github.com/Tencent/bk-bcs/bcs-services/bcs-user-manager/app/user-manager/utils" + "github.com/Tencent/bk-bcs/bcs-services/bcs-user-manager/app/utils" "github.com/emicklei/go-restful" ) @@ -30,6 +30,16 @@ func CreateRegisterToken(request *restful.Request, response *restful.Response) { start := time.Now() clusterId := request.PathParameter("cluster_id") + clusterInDb := sqlstore.GetCluster(clusterId) + if clusterInDb == nil { + metrics.RequestErrorCount.WithLabelValues("register-token", request.Request.Method).Inc() + metrics.RequestErrorLatency.WithLabelValues("register-token", request.Request.Method).Observe(time.Since(start).Seconds()) + blog.Warnf("create register_token failed, cluster [%s] not exist", clusterId) + message := fmt.Sprintf("errcode: %d, create register_token failed, cluster [%s] not exist", common.BcsErrApiBadRequest, clusterId) + utils.WriteClientError(response, common.BcsErrApiBadRequest, message) + return + } + err := sqlstore.CreateRegisterToken(clusterId) if err != nil { metrics.RequestErrorCount.WithLabelValues("register-token", request.Request.Method).Inc() diff --git a/bcs-services/bcs-user-manager/app/user-manager/v1http/tke.go b/bcs-services/bcs-user-manager/app/user-manager/v1http/tke.go index 90614331ca..ca0ed706a7 100644 --- a/bcs-services/bcs-user-manager/app/user-manager/v1http/tke.go +++ b/bcs-services/bcs-user-manager/app/user-manager/v1http/tke.go @@ -23,7 +23,7 @@ import ( "github.com/Tencent/bk-bcs/bcs-services/bcs-user-manager/app/user-manager/external-cluster/tke" "github.com/Tencent/bk-bcs/bcs-services/bcs-user-manager/app/user-manager/models" "github.com/Tencent/bk-bcs/bcs-services/bcs-user-manager/app/user-manager/storages/sqlstore" - "github.com/Tencent/bk-bcs/bcs-services/bcs-user-manager/app/user-manager/utils" + "github.com/Tencent/bk-bcs/bcs-services/bcs-user-manager/app/utils" "github.com/emicklei/go-restful" ) diff --git a/bcs-services/bcs-user-manager/app/user-manager/v1http/user.go b/bcs-services/bcs-user-manager/app/user-manager/v1http/user.go index af7d080637..802e404282 100644 --- a/bcs-services/bcs-user-manager/app/user-manager/v1http/user.go +++ b/bcs-services/bcs-user-manager/app/user-manager/v1http/user.go @@ -23,7 +23,7 @@ import ( "github.com/Tencent/bk-bcs/bcs-services/bcs-user-manager/app/metrics" "github.com/Tencent/bk-bcs/bcs-services/bcs-user-manager/app/user-manager/models" "github.com/Tencent/bk-bcs/bcs-services/bcs-user-manager/app/user-manager/storages/sqlstore" - "github.com/Tencent/bk-bcs/bcs-services/bcs-user-manager/app/user-manager/utils" + "github.com/Tencent/bk-bcs/bcs-services/bcs-user-manager/app/utils" "github.com/dchest/uniuri" "github.com/emicklei/go-restful" diff --git a/bcs-services/bcs-user-manager/app/utils/auth.go b/bcs-services/bcs-user-manager/app/utils/auth.go new file mode 100644 index 0000000000..90e8ee3406 --- /dev/null +++ b/bcs-services/bcs-user-manager/app/utils/auth.go @@ -0,0 +1,52 @@ +/* + * Tencent is pleased to support the open source community by making Blueking Container Service available. + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * Licensed under the MIT License (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * http://opensource.org/licenses/MIT + * Unless required by applicable law or agreed to in writing, software distributed under + * the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package utils + +import ( + "net/http" + "strings" + + "github.com/Tencent/bk-bcs/bcs-services/bcs-user-manager/app/user-manager/models" + "github.com/Tencent/bk-bcs/bcs-services/bcs-user-manager/app/user-manager/storages/sqlstore" +) + +// Authenticate only authenticate admin user token +func Authenticate(req *http.Request) bool { + var token string + authHeaderList := req.Header["Authorization"] + if len(authHeaderList) > 0 { + authHeader := strings.Split(authHeaderList[0], " ") + if len(authHeader) == 2 && authHeader[0] == "Bearer" { + token = strings.TrimSpace(authHeader[1]) + } + } + // if not specified token, authenticate failed + if token == "" { + return false + } + + u := models.BcsUser{ + UserToken: token, + } + user := sqlstore.GetUserByCondition(&u) + if user == nil { + return false + } + + // only authenticate admin user + if user.UserType == sqlstore.AdminUser && !user.HasExpired() { + return true + } + return false +} diff --git a/bcs-services/bcs-user-manager/app/utils/kube-api.go b/bcs-services/bcs-user-manager/app/utils/kube-api.go new file mode 100644 index 0000000000..62949b983f --- /dev/null +++ b/bcs-services/bcs-user-manager/app/utils/kube-api.go @@ -0,0 +1,69 @@ +/* + * Tencent is pleased to support the open source community by making Blueking Container Service available. + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * Licensed under the MIT License (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * http://opensource.org/licenses/MIT + * Unless required by applicable law or agreed to in writing, software distributed under + * the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package utils + +import ( + "encoding/json" + "net/http" + + "github.com/Tencent/bk-bcs/bcs-services/bcs-api/pkg/apis/blueking.io/v1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/validation/field" +) + +var ClusterGroupKind = v1.Kind("Cluster") +var ClusterResource = v1.Resource("Clusters") + +// WriteKubeAPIError writes a standard error response +func WriteKubeAPIError(rw http.ResponseWriter, err *errors.StatusError) { + payload, _ := json.Marshal(err.ErrStatus) + + rw.Header().Set("Content-Type", "application/json") + rw.WriteHeader(int(err.ErrStatus.Code)) + rw.Write(payload) +} + +// NewNotFound returns a new error which indicates that the resource of the kind and the name was not found. +func NewNotFound(qualifiedResource schema.GroupResource, name string, message string) *errors.StatusError { + status := errors.NewNotFound(qualifiedResource, name) + status.ErrStatus.Message = message + return status +} + +func NewForbidden(qualifiedResource schema.GroupResource, name string, err error) *errors.StatusError { + status := errors.NewForbidden(qualifiedResource, name, err) + return status +} + +func NewInternalError(err error) *errors.StatusError { + status := errors.NewInternalError(err) + return status +} + +// NewInvalid returns an error indicating the item is invalid and cannot be processed. +func NewInvalid(qualifiedKind schema.GroupKind, name string, fieldName string, err error) *errors.StatusError { + fieldError := field.Error{ + Field: fieldName, + Detail: err.Error(), + } + errs := field.ErrorList{&fieldError} + status := errors.NewInvalid(qualifiedKind, name, errs) + return status +} + +// NewUnauthorized returns an error indicating the client is not authorized to perform the requested action. +func NewUnauthorized(reason string) *errors.StatusError { + return errors.NewUnauthorized(reason) +} diff --git a/bcs-services/bcs-user-manager/app/user-manager/utils/shortcut.go b/bcs-services/bcs-user-manager/app/utils/shortcut.go similarity index 100% rename from bcs-services/bcs-user-manager/app/user-manager/utils/shortcut.go rename to bcs-services/bcs-user-manager/app/utils/shortcut.go diff --git a/bcs-services/bcs-user-manager/app/user-manager/utils/utils.go b/bcs-services/bcs-user-manager/app/utils/utils.go similarity index 96% rename from bcs-services/bcs-user-manager/app/user-manager/utils/utils.go rename to bcs-services/bcs-user-manager/app/utils/utils.go index 743a76852a..177cf9e024 100644 --- a/bcs-services/bcs-user-manager/app/user-manager/utils/utils.go +++ b/bcs-services/bcs-user-manager/app/utils/utils.go @@ -20,7 +20,6 @@ import ( "strings" "github.com/Tencent/bk-bcs/bcs-common/common" - "github.com/Tencent/bk-bcs/bcs-common/common/blog" bhttp "github.com/Tencent/bk-bcs/bcs-common/common/http" "github.com/asaskevich/govalidator" "gopkg.in/go-playground/validator.v9" @@ -86,7 +85,7 @@ func CreateResponeData(err error, msg string, data interface{}) string { rpyErr = errors.New(bhttp.GetRespone(common.BcsSuccess, common.BcsSuccessStr, data)) } - blog.V(3).Infof("createRespone: %s", rpyErr.Error()) + //blog.V(3).Infof("createRespone: %s", rpyErr.Error()) return rpyErr.Error() } diff --git a/bcs-services/bcs-user-manager/config/config.go b/bcs-services/bcs-user-manager/config/config.go index beeb9815b2..66d8e363d0 100644 --- a/bcs-services/bcs-user-manager/config/config.go +++ b/bcs-services/bcs-user-manager/config/config.go @@ -14,6 +14,8 @@ package config import ( + "crypto/tls" + "github.com/Tencent/bk-bcs/bcs-common/common/static" "github.com/Tencent/bk-bcs/bcs-services/bcs-user-manager/options" ) @@ -44,10 +46,12 @@ type UserMgrConfig struct { DSN string BootStrapUsers []options.BootStrapUser TKE options.TKEOptions + PeerToken string } var ( - Tke options.TKEOptions + Tke options.TKEOptions + CliTls *tls.Config ) //NewUserMgrConfig create a config object diff --git a/bcs-services/bcs-user-manager/main.go b/bcs-services/bcs-user-manager/main.go index 1b1ba2cbff..2b6c95c1dc 100644 --- a/bcs-services/bcs-user-manager/main.go +++ b/bcs-services/bcs-user-manager/main.go @@ -14,7 +14,10 @@ package main import ( + "os" + "os/signal" "runtime" + "syscall" "github.com/Tencent/bk-bcs/bcs-common/common/blog" "github.com/Tencent/bk-bcs/bcs-common/common/conf" @@ -33,6 +36,12 @@ func main() { app.Run(op) - ch := make(chan int) - <-ch + // listening OS shutdown singal + signalChan := make(chan os.Signal, 1) + signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM) + <-signalChan + + blog.Infof("Got OS shutdown signal, shutting down bcs-user-manager server gracefully...") + + return } diff --git a/bcs-services/bcs-user-manager/options/options.go b/bcs-services/bcs-user-manager/options/options.go index e062585e1b..748d5d7b5b 100644 --- a/bcs-services/bcs-user-manager/options/options.go +++ b/bcs-services/bcs-user-manager/options/options.go @@ -31,6 +31,7 @@ type UserManagerOptions struct { DSN string `json:"mysql_dsn" value:"" usage:"dsn for connect to mysql"` BootStrapUsers []BootStrapUser `json:"bootstrap_users"` TKE TKEOptions `json:"tke"` + PeerToken string `json:"peer_token" value:"" usage:"peer token to authorize with each other, only used to websocket peer"` } // tke api option