Skip to content

Commit

Permalink
feature: add websocket tunnel to usermanager, to adapt the bcs-api-ga…
Browse files Browse the repository at this point in the history
…teway architecture. issue TencentBlueKing#521
  • Loading branch information
bryanhe-bupt committed Jul 9, 2020
1 parent 75f457c commit 9d7adba
Show file tree
Hide file tree
Showing 42 changed files with 1,939 additions and 32 deletions.
1 change: 0 additions & 1 deletion bcs-common/common/encrypt/des.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"crypto/cipher"
"crypto/des"
"encoding/base64"

"github.com/Tencent/bk-bcs/bcs-common/common/static"
)

Expand Down
2 changes: 1 addition & 1 deletion bcs-common/common/websocketDialer/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (s *Session) startPings(rootCtx context.Context) {
if err := s.conn.conn.WriteControl(websocket.PingMessage, []byte(""), time.Now().Add(time.Second)); err != nil {
blog.Errorf("Error writing ping: %s", err.Error())
}
blog.Debug("Wrote ping")
//blog.Debug("Wrote ping")
s.conn.Unlock()
}
}
Expand Down
2 changes: 1 addition & 1 deletion bcs-k8s/bcs-kube-agent/app/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func buildWebsocketToBke(cfg *rest.Config) error {

go func() {
for {
wsURL := fmt.Sprintf("wss://%s/bcsapi/v1/websocket/connect", bkeServerUrl.Host)
wsURL := fmt.Sprintf("wss://%s/bcsapi/v4/usermanager/v1/websocket/connect", bkeServerUrl.Host)
blog.Infof("Connecting to %s with token %s", wsURL, registerToken)

websocketDialer.ClientConnect(context.Background(), wsURL, headers, tlsConfig, nil, func(proto, address string) bool {
Expand Down
2 changes: 1 addition & 1 deletion bcs-mesos/bcs-mesos-driver/mesosdriver/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (m *MesosDriver) buildWebsocketToApi() error {

go func() {
for {
wsURL := fmt.Sprintf("wss://%s/bcsapi/v1/websocket/connect", bcsApiUrl.Host)
wsURL := fmt.Sprintf("wss://%s/bcsapi/v4/usermanager/v1/websocket/connect", bcsApiUrl.Host)
blog.Infof("Connecting to %s with token %s", wsURL, m.config.RegisterToken)

websocketDialer.ClientConnect(context.Background(), wsURL, headers, tlsConfig, nil, func(proto, address string) bool {
Expand Down
42 changes: 28 additions & 14 deletions bcs-services/bcs-user-manager/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@
package app

import (
"fmt"
"os"

"github.com/Tencent/bk-bcs/bcs-common/common"
"github.com/Tencent/bk-bcs/bcs-common/common/blog"
"github.com/Tencent/bk-bcs/bcs-common/common/encrypt"
"github.com/Tencent/bk-bcs/bcs-common/common/ssl"
"github.com/Tencent/bk-bcs/bcs-services/bcs-user-manager/app/metrics"
"github.com/Tencent/bk-bcs/bcs-services/bcs-user-manager/app/user-manager"
"github.com/Tencent/bk-bcs/bcs-services/bcs-user-manager/config"
Expand All @@ -27,11 +29,27 @@ import (

// Run run app
func Run(op *options.UserManagerOptions) {
conf := parseConfig(op)

conf, err := parseConfig(op)
if err != nil {
blog.Errorf("error parse config: %s", err.Error())
os.Exit(1)
}
if conf.ClientCert.IsSSL {
cliTls, err := ssl.ClientTslConfVerity(conf.ClientCert.CAFile, conf.ClientCert.CertFile, conf.ClientCert.KeyFile, conf.ClientCert.CertPasswd)
if err != nil {
blog.Errorf("set client tls config error %s", err.Error())
} else {
config.CliTls = cliTls
blog.Infof("set client tls config success")
}
}
userManager := user_manager.NewUserManager(conf)

//register to zk
userManager.RegDiscover()

//start userManager, and http service
err := userManager.Start()
err = userManager.Start()
if err != nil {
blog.Errorf("start processor error %s, and exit", err.Error())
os.Exit(1)
Expand All @@ -42,13 +60,11 @@ func Run(op *options.UserManagerOptions) {
blog.Error("fail to save pid: err:%s", err.Error())
}

//register to zk
userManager.RegDiscover()

metrics.RunMetric(conf)
}

func parseConfig(op *options.UserManagerOptions) *config.UserMgrConfig {
// parseConfig parse the option to config
func parseConfig(op *options.UserManagerOptions) (*config.UserMgrConfig, error) {
userMgrConfig := config.NewUserMgrConfig()

userMgrConfig.Address = op.Address
Expand All @@ -60,25 +76,23 @@ func parseConfig(op *options.UserManagerOptions) *config.UserMgrConfig {
userMgrConfig.MetricPort = op.MetricPort
userMgrConfig.BootStrapUsers = op.BootStrapUsers
userMgrConfig.TKE = op.TKE
userMgrConfig.PeerToken = op.PeerToken

config.Tke = op.TKE
secretId, err := encrypt.DesDecryptFromBase([]byte(config.Tke.SecretId))
if err != nil {
blog.Errorf("error decrypting tke secretId and exit: %s", err.Error())
os.Exit(1)
return nil, fmt.Errorf("error decrypting tke secretId and exit: %s", err.Error())
}
config.Tke.SecretId = string(secretId)
secretKey, err := encrypt.DesDecryptFromBase([]byte(config.Tke.SecretKey))
if err != nil {
blog.Errorf("error decrypting tke secretKey and exit: %s", err.Error())
os.Exit(1)
return nil, fmt.Errorf("error decrypting tke secretKey and exit: %s", err.Error())
}
config.Tke.SecretKey = string(secretKey)

dsn, err := encrypt.DesDecryptFromBase([]byte(op.DSN))
if err != nil {
blog.Errorf("error decrypting db config and exit: %s", err.Error())
os.Exit(1)
return nil, fmt.Errorf("error decrypting db config and exit: %s", err.Error())
}
userMgrConfig.DSN = string(dsn)

Expand All @@ -100,5 +114,5 @@ func parseConfig(op *options.UserManagerOptions) *config.UserMgrConfig {
userMgrConfig.ClientCert.IsSSL = true
}

return userMgrConfig
return userMgrConfig, nil
}
34 changes: 34 additions & 0 deletions bcs-services/bcs-user-manager/app/app_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Tencent is pleased to support the open source community by making Blueking Container Service available.
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
* Licensed under the MIT License (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
* http://opensource.org/licenses/MIT
* Unless required by applicable law or agreed to in writing, software distributed under
* the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package app

import (
"testing"

"github.com/Tencent/bk-bcs/bcs-services/bcs-user-manager/options"
)

// TestParseConfig test the parseConfig func
func TestParseConfig(t *testing.T) {
op := options.UserManagerOptions{
TKE: options.TKEOptions{
SecretId: "abcdefg",
},
}
_, err := parseConfig(&op)
if err == nil {
t.Error("empty EncryptionKey can't encrypt")
}

}
139 changes: 139 additions & 0 deletions bcs-services/bcs-user-manager/app/tunnel-handler/k8s/proxier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* Tencent is pleased to support the open source community by making Blueking Container Service available.
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
* Licensed under the MIT License (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
* http://opensource.org/licenses/MIT
* Unless required by applicable law or agreed to in writing, software distributed under
* the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package k8s

import (
"fmt"
"net/http"
"strings"
"time"

"github.com/Tencent/bk-bcs/bcs-common/common/blog"
"github.com/Tencent/bk-bcs/bcs-services/bcs-user-manager/app/metrics"
"github.com/Tencent/bk-bcs/bcs-services/bcs-user-manager/app/user-manager/storages/sqlstore"
"github.com/Tencent/bk-bcs/bcs-services/bcs-user-manager/app/utils"
"github.com/gorilla/mux"
"github.com/gorilla/websocket"
)

var DefaultTunnelProxyDispatcher = NewTunnelProxyDispatcher("cluster_id", "sub_path")

// TunnelProxyDispatcher is the handler which dispatch and proxy the incoming requests to external kube-apiserver with websocket tunnel
type TunnelProxyDispatcher struct {
// ClusterVarName is the path parameter name of cluster_id
ClusterVarName string
// SubPathVarName is the path parameter name of sub-path needs to be forwarded
SubPathVarName string
}

type ClusterHandlerInstance struct {
ServerAddress string
Handler http.Handler
}

// NewTunnelProxyDispatcher new a default TunnelProxyDispatcher
func NewTunnelProxyDispatcher(clusterVarName, subPathVarName string) *TunnelProxyDispatcher {
return &TunnelProxyDispatcher{
ClusterVarName: clusterVarName,
SubPathVarName: subPathVarName,
}
}

func (f *TunnelProxyDispatcher) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
start := time.Now()

// first authenticate the request, only admin user be allowed
auth := utils.Authenticate(req)
if !auth {
status := utils.NewUnauthorized("anonymous requests is forbidden")
utils.WriteKubeAPIError(rw, status)
return
}

vars := mux.Vars(req)
// Get cluster id
clusterId := vars[f.ClusterVarName]

var proxyHandler *ClusterHandlerInstance
// 先从websocket dialer缓存中查找websocket链
websocketHandler, found, err := lookupWsHandler(clusterId)
if err != nil {
blog.Errorf("error when lookup websocket conn: %s", err.Error())
err := fmt.Errorf("error when lookup websocket conn: %s", err.Error())
status := utils.NewInternalError(err)
status.ErrStatus.Reason = "CREATE_TUNNEL_ERROR"
utils.WriteKubeAPIError(rw, status)
return
}
// if found tunnel, use this tunnel to request to kube-apiserver
if found {
blog.Info("found websocket conn for k8s cluster %s", clusterId)
handlerServer := stripLeaveSlash(f.ExtractPathPrefix(req), websocketHandler)
proxyHandler = &ClusterHandlerInstance{
Handler: handlerServer,
}
credentials := sqlstore.GetWsCredentials(clusterId)
bearerToken := "Bearer " + credentials.UserToken
req.Header.Set("Authorization", bearerToken)

// set request scheme
req.URL.Scheme = "https"

// if webconsole long request, then set the latency before ServerHTTP
if websocket.IsWebSocketUpgrade(req) {
metrics.RequestCount.WithLabelValues("k8s_tunnel_request", "websocket").Inc()
metrics.RequestLatency.WithLabelValues("k8s_tunnel_request", "websocket").Observe(time.Since(start).Seconds())
}
proxyHandler.Handler.ServeHTTP(rw, req)
if !websocket.IsWebSocketUpgrade(req) {
metrics.RequestCount.WithLabelValues("k8s_tunnel_request", req.Method).Inc()
metrics.RequestLatency.WithLabelValues("k8s_tunnel_request", req.Method).Observe(time.Since(start).Seconds())
}

return
}

message := "no cluster can be found using given cluster id"
status := utils.NewNotFound(utils.ClusterResource, clusterId, message)
utils.WriteKubeAPIError(rw, status)
return
}

// like http.StripPrefix, but always leaves an initial slash. (so that our
// regexps will work.)
func stripLeaveSlash(prefix string, h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
blog.Debug(fmt.Sprintf("begin proxy for: %s", req.URL.Path))
p := strings.TrimPrefix(req.URL.Path, prefix)
if len(p) >= len(req.URL.Path) {
http.NotFound(w, req)
return
}
if len(p) > 0 && p[:1] != "/" {
p = "/" + p
}
req.URL.Path = p
h.ServeHTTP(w, req)
})
}

// ExtractPathPrefix extracts the path prefix which needs to be stripped when the request is forwarded to the reverse
// proxy handler.
func (f *TunnelProxyDispatcher) ExtractPathPrefix(req *http.Request) string {
subPath := mux.Vars(req)[f.SubPathVarName]
fullPath := req.URL.Path
// We need to strip the prefix string before the request can be forward to apiserver, so we will walk over the full
// request path backwards, everything before the `sub_path` will be the prefix we need to strip
return fullPath[:len(fullPath)-len(subPath)]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Tencent is pleased to support the open source community by making Blueking Container Service available.
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
* Licensed under the MIT License (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
* http://opensource.org/licenses/MIT
* Unless required by applicable law or agreed to in writing, software distributed under
* the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package k8s

import (
"net/http"
"net/http/httptest"
"strings"
"testing"

"github.com/gorilla/mux"
)

type TestHandler struct {
// ClusterVarName is the path parameter name of cluster_id
ClusterVarName string
// SubPathVarName is the path parameter name of sub-path needs to be forwarded
SubPathVarName string
}

type BackendHandler struct {
}

func (b *BackendHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
path := req.URL.Path
rw.Write([]byte(path))
}

var DefaultTestHandler = NewTestHandler("cluster_id", "sub_path")

// NewTestHandler new a default TestHandler
func NewTestHandler(clusterVarName, subPathVarName string) *TestHandler {
return &TestHandler{
ClusterVarName: clusterVarName,
SubPathVarName: subPathVarName,
}
}

func (t *TestHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
backend := &BackendHandler{}
subPath := mux.Vars(req)[t.SubPathVarName]
fullPath := req.URL.Path

handlerServer := stripLeaveSlash(fullPath[:len(fullPath)-len(subPath)], backend)
handlerServer.ServeHTTP(rw, req)
}

func TestStripLeaveSlash(t *testing.T) {
router := mux.NewRouter()
router.Handle("/tunnels/clusters/{cluster_id}/{sub_path:.*}", DefaultTestHandler)

urls := []string{"/tunnels/clusters/k8s-001/version", "/tunnels/clusters/k8s-001/apis/apps/v1/namespaces/default/deployments/test"}
for _, url := range urls {
req, err := http.NewRequest("GET", url, nil)
if err != nil {
t.Fatal(err)
}
rr := httptest.NewRecorder()
router.ServeHTTP(rr, req)
stripPath := strings.TrimPrefix(url, "/tunnels/clusters/k8s-001")
if rr.Body.String() != stripPath {
t.Error("stripLeaveSlash got an unexpected path")
}
}
}

func TestTunnelServeHTTP(t *testing.T) {
req, err := http.NewRequest("GET", "/tunnels/clusters/k8s-001/version", nil)
if err != nil {
t.Fatal(err)
}

rr := httptest.NewRecorder()
DefaultTunnelProxyDispatcher.ServeHTTP(rr, req)
if rr.Code != http.StatusUnauthorized {
t.Error("tunnel api should provide admin token")
}
}

0 comments on commit 9d7adba

Please sign in to comment.