Skip to content

Commit

Permalink
Pilot agent changes to redirect Kubernetes liveness/readiness check. (i…
Browse files Browse the repository at this point in the history
…stio#9204)

* Complete redirection in pilot-agent.

* Protoype work by manual injecting.

* Wrap up the changes for flag definition.

* fix lint.

* use one cmd argument.

* Add server_test.go for unit test.

* add license in the server_test.go

* fix lint.

* Complete redirection in pilot-agent.

* Protoype work by manual injecting.

* Wrap up the changes for flag definition.

* fix lint.

* use one cmd argument.

* Add server_test.go for unit test.

* add license in the server_test.go

* fix lint.

* doc clarify and var rename.

* lint fixing.
  • Loading branch information
Jianfei Hu authored and istio-testing committed Oct 16, 2018
1 parent 26b4b02 commit b412d59
Show file tree
Hide file tree
Showing 3 changed files with 172 additions and 5 deletions.
13 changes: 11 additions & 2 deletions pilot/cmd/pilot-agent/main.go
Expand Up @@ -66,8 +66,9 @@ var (
concurrency int
templateFile string
disableInternalTelemetry bool

loggingOptions = log.DefaultOptions()
appReadinessProbeURL string
livenessProbeURL string
loggingOptions = log.DefaultOptions()

rootCmd = &cobra.Command{
Use: "pilot-agent",
Expand Down Expand Up @@ -262,6 +263,8 @@ var (
AdminPort: proxyAdminPort,
StatusPort: statusPort,
ApplicationPorts: parsedPorts,
AppReadinessURL: appReadinessProbeURL,
AppLivenessURL: livenessProbeURL,
})
go statusServer.Run(ctx)
}
Expand Down Expand Up @@ -361,6 +364,12 @@ func init() {
proxyCmd.PersistentFlags().BoolVar(&disableInternalTelemetry, "disableInternalTelemetry", false,
"Disable internal telemetry")

// Flags for Pilot agent to take over Kubernetes readiness and liveness check.
proxyCmd.PersistentFlags().StringVar(&livenessProbeURL, "appLiveUrl", "",
"The url, including path and port, for the application liveness check. Examples, \"/path\", \":8080/path\"")
proxyCmd.PersistentFlags().StringVar(&appReadinessProbeURL, "appReadyUrl", "",
"The url, including path and port for the app readiness check. Examples, \"/path\", \":8080/path\"")

// Attach the Istio logging options to the command.
loggingOptions.AttachCobraFlags(rootCmd)

Expand Down
81 changes: 78 additions & 3 deletions pilot/cmd/pilot-agent/status/server.go
Expand Up @@ -15,41 +15,64 @@
package status

import (
"context"
"fmt"
"net"
"net/http"

"context"
"os"
"strconv"
"strings"
"sync"
"time"

"istio.io/istio/pilot/cmd/pilot-agent/status/ready"
"istio.io/istio/pkg/log"
)

const (
// readyPath is for the pilot agent readiness itself.
readyPath = "/healthz/ready"
// appReadinessPath is the path handled by pilot agent for application's readiness probe.
appReadinessPath = "/app/ready"
// appLivenessPath is the path handled by pilot agent for application's liveness probe.
appLivenessPath = "/app/live"
)

// AppProbeInfo defines the information for Pilot agent to take over application probing.
type AppProbeInfo struct {
Path string
Port uint16
}

// Config for the status server.
type Config struct {
StatusPort uint16
AdminPort uint16
ApplicationPorts []uint16
// AppReadinessURL specifies the path, including the port to take over Kubernetes readiness probe.
// This allows Kubernetes probing to work even mTLS is turned on for the workload.
AppReadinessURL string
// AppLivenessURL specifies the path, including the port to take over Kubernetes liveness probe.
// This allows Kubernetes probing to work even mTLS is turned on for the workload.
AppLivenessURL string
}

// Server provides an endpoint for handling status probes.
type Server struct {
statusPort uint16
ready *ready.Probe
appLiveURL string
appReadyURL string
mutex sync.Mutex
lastProbeSuccessful bool
}

// NewServer creates a new status server.
func NewServer(config Config) *Server {
return &Server{
statusPort: config.StatusPort,
statusPort: config.StatusPort,
appLiveURL: config.AppLivenessURL,
appReadyURL: config.AppReadinessURL,
ready: &ready.Probe{
AdminPort: config.AdminPort,
ApplicationPorts: config.ApplicationPorts,
Expand All @@ -64,11 +87,28 @@ func (s *Server) Run(ctx context.Context) {
// Add the handler for ready probes.
http.HandleFunc(readyPath, s.handleReadyProbe)

// TODO: we require non empty url to take over the health check. Make sure this is consistent in injector.
if s.appReadyURL != "" {
log.Infof("Pilot agent takes over readiness probe, path %v", s.appReadyURL)
http.HandleFunc(appReadinessPath, s.handleAppReadinessProbe)
}

if s.appLiveURL != "" {
log.Infof("Pilot agent takes over liveness probe, path %v", s.appLiveURL)
http.HandleFunc(appLivenessPath, s.handleAppLivenessProbe)
}

l, err := net.Listen("tcp", fmt.Sprintf(":%d", s.statusPort))
if err != nil {
log.Errorf("Error listening on status port: %v", err.Error())
return
}
// for testing.
if s.statusPort == 0 {
addrs := strings.Split(l.Addr().String(), ":")
allocatedPort, _ := strconv.Atoi(addrs[len(addrs)-1])
s.statusPort = uint16(allocatedPort)
}
defer l.Close()

go func() {
Expand Down Expand Up @@ -101,3 +141,38 @@ func (s *Server) handleReadyProbe(w http.ResponseWriter, _ *http.Request) {
s.lastProbeSuccessful = true
}
}

func (s *Server) handleAppReadinessProbe(w http.ResponseWriter, req *http.Request) {
requestStatusCode(fmt.Sprintf("http://127.0.0.1%s", s.appReadyURL), w, req)
}

func (s *Server) handleAppLivenessProbe(w http.ResponseWriter, req *http.Request) {
requestStatusCode(fmt.Sprintf("http://127.0.0.1%s", s.appLiveURL), w, req)
}

func requestStatusCode(appURL string, w http.ResponseWriter, req *http.Request) {
httpClient := &http.Client{
// TODO: figure out the appropriate timeout?
Timeout: 10 * time.Second,
}

appReq, err := http.NewRequest(req.Method, appURL, req.Body)
for key, value := range req.Header {
appReq.Header[key] = value
}

if err != nil {
log.Errorf("Failed to copy request to probe app %v", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
response, err := httpClient.Do(appReq)
if err != nil {
log.Errorf("Request to probe app failed: %v", err)
w.WriteHeader(http.StatusInternalServerError)
return
}

// We only write the status code to the response.
w.WriteHeader(response.StatusCode)
}
83 changes: 83 additions & 0 deletions pilot/cmd/pilot-agent/status/server_test.go
@@ -0,0 +1,83 @@
// Copyright 2018 Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package status

import (
"context"
"fmt"
"net/http"
"testing"
"time"

"istio.io/istio/pilot/pkg/model"
"istio.io/istio/pkg/log"
"istio.io/istio/pkg/test/application"
"istio.io/istio/pkg/test/application/echo"
)

var (
appPort uint16
)

func init() {
appFactory := &echo.Factory{
Ports: model.PortList{{
Name: "http",
Protocol: model.ProtocolHTTP,
}},
Version: "version-foo",
}
app, err := appFactory.NewApplication(application.Dialer{HTTP: application.DefaultHTTPDoFunc})
if err != nil {
log.Fatalf("Failed to create application %v", err)
}
log.Fatalf("application created %v", app.GetPorts())
appPort = uint16(app.GetPorts()[0].Port)
}

func TestAppProbe(t *testing.T) {
server := NewServer(Config{
StatusPort: 0,
AppReadinessURL: fmt.Sprintf(":%v/", appPort),
})
go server.Run(context.Background())

// We wait a bit here to ensure server's statusPort is updated.
time.Sleep(time.Second * 3)

t.Logf("status server starts at port %v, app starts at port %v", server.statusPort, appPort)
testCases := []struct {
probePath string
statusCode int
err string
}{
{
probePath: fmt.Sprintf(":%v/app/ready", server.statusPort),
statusCode: 200,
},
{
probePath: fmt.Sprintf(":%v/app/live", server.statusPort),
// expect 404 because we didn't configure status server to take over app's liveness check.
statusCode: 404,
},
}
for _, tc := range testCases {
client := http.Client{}
resp, _ := client.Get(fmt.Sprintf("http://localhost%s", tc.probePath))
if resp.StatusCode != tc.statusCode {
t.Errorf("[%v] unexpected status code, want = %v, got = %v", tc.probePath, tc.statusCode, resp.StatusCode)
}
}
}

0 comments on commit b412d59

Please sign in to comment.