Fix Blackhole implementation for e2e tests
Based on Fu Wei's idea discussed in the [issue](etcd-io#17737), we block peer traffic at L7 (the application layer) without using external tools.

A peer will
(a) receive traffic from its peers
(b) initiate connections to its peers (via stream and pipeline).

Thus, the current mechanism of only blocking peer traffic via the peer's existing proxy is insufficient, since only scenario (a) is handled, and scenario (b) is not blocked at all.

We introduce an HTTP forward proxy for each peer, which proxies all connections that the peer initiates to its peers.

The modified architecture will look something like this:
```
A -- A's HTTP proxy ----- B's (currently existing) proxy - B
     ^ newly introduced   ^ in the original codebase
```

By adding this HTTP proxy, we can block all inbound and outbound traffic on connections that a peer initiates to others, without resorting to external tools such as iptables. The blocking of traffic has been verified to be complete, compared to previous solutions [2][3].
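To illustrate what blocking traffic inside the proxy process (rather than with iptables) can mean, here is a minimal sketch. It assumes a hypothetical `blackholeWriter` type and `relay` helper that are not part of etcd's `pkg/proxy`; the real proxy is more involved.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"sync/atomic"
)

// blackholeWriter forwards writes to dst unless blackholing is enabled, in
// which case it reports success but silently drops the bytes.
// Hypothetical type for illustration only; not part of pkg/proxy.
type blackholeWriter struct {
	dst       io.Writer
	blackhole atomic.Bool
}

func (w *blackholeWriter) Write(p []byte) (int, error) {
	if w.blackhole.Load() {
		return len(p), nil // pretend the write succeeded, deliver nothing
	}
	return w.dst.Write(p)
}

// relay sends msgs through a blackholeWriter and returns what the destination
// actually received. Blackholing is switched on just before the message at
// index blackholeFrom; pass -1 to never blackhole.
func relay(msgs []string, blackholeFrom int) string {
	var delivered bytes.Buffer
	w := &blackholeWriter{dst: &delivered}
	for i, m := range msgs {
		if i == blackholeFrom {
			w.blackhole.Store(true)
		}
		io.WriteString(w, m)
	}
	return delivered.String()
}

func main() {
	fmt.Printf("delivered: %q\n", relay([]string{"a", "b", "c"}, 1))
}
```

Because the sender still sees successful writes, the partition looks like a silent network drop rather than a closed connection.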

The main subtasks are
- set up an environment variable `FORWARD_PROXY`
- implement HTTP proxy by extending the existing proxy server code
- implement enable/disable of the HTTP proxy in the e2e test

As a result, every peer has the following architecture:
```
A -- A's HTTP proxy (connections initiated from A will be forwarded from this proxy)
 |   ^ covers case (b)
 |
 --- A's (currently existing) proxy (advertised to other peers where the connection should come in from)
     ^ covers case (a)
```

- `make gofail-enable && make build && make gofail-disable && \
go test -timeout 60s -run ^TestBlackholeByMockingPartitionLeader$ go.etcd.io/etcd/tests/v3/e2e -v -count=1`
- `make gofail-enable && make build && make gofail-disable && \
go test -timeout 60s -run ^TestBlackholeByMockingPartitionFollower$ go.etcd.io/etcd/tests/v3/e2e -v -count=1`

- I sometimes run into `context deadline exceeded`:
```
    etcd_mix_versions_test.go:175:
                Error Trace:    /Users/henrybear327/go/src/etcd/tests/e2e/etcd_mix_versions_test.go:175
                                                        /Users/henrybear327/go/src/etcd/tests/e2e/blackhole_test.go:75
                                                        /Users/henrybear327/go/src/etcd/tests/e2e/blackhole_test.go:31
                Error:          Received unexpected error:
                                [/Users/henrybear327/go/src/etcd/bin/etcdctl --endpoints=http://localhost:20006 put key-0 value-0] match not found.  Set EXPECT_DEBUG for more info Errs: [unexpected exit code [1] after running [/Users/henrybear327/go/src/etcd/bin/etcdctl --endpoints=http://localhost:20006 put key-0 value-0]], last lines:
                                {"level":"warn","ts":"2024-05-05T23:02:36.809726+0800","logger":"etcd-client","caller":"v3@v3.6.0-alpha.0/retry_interceptor.go:65","msg":"retrying of unary invoker failed","target":"etcd-endpoints://0x140001ee960/localhost:20006","method":"/etcdserverpb.KV/Put","attempt":0,"error":"rpc error: code = DeadlineExceeded desc = context deadline exceeded"}
                                Error: context deadline exceeded
                                 (expected "OK", got []). Try EXPECT_DEBUG=TRUE
                Test:           TestBlackholeByMockingPartitionLeader
                Messages:       failed to put "key-0", error: [/Users/henrybear327/go/src/etcd/bin/etcdctl --endpoints=http://localhost:20006 put key-0 value-0] match not found.  Set EXPECT_DEBUG for more info Errs: [unexpected exit code [1] after running [/Users/henrybear327/go/src/etcd/bin/etcdctl --endpoints=http://localhost:20006 put key-0 value-0]], last lines:
                                {"level":"warn","ts":"2024-05-05T23:02:36.809726+0800","logger":"etcd-client","caller":"v3@v3.6.0-alpha.0/retry_interceptor.go:65","msg":"retrying of unary invoker failed","target":"etcd-endpoints://0x140001ee960/localhost:20006","method":"/etcdserverpb.KV/Put","attempt":0,"error":"rpc error: code = DeadlineExceeded desc = context deadline exceeded"}
                                Error: context deadline exceeded
                                 (expected "OK", got []). Try EXPECT_DEBUG=TRUE
```

[References]
[1] issue etcd-io#17737
[2] PR (V1) https://github.com/henrybear327/etcd/tree/fix/e2e_blackhole
[3] PR (V2) etcd-io#17891

Signed-off-by: Chun-Hung Tseng <henrybear327@gmail.com>
henrybear327 committed May 6, 2024
1 parent 137349b commit 0e88e7c
Showing 6 changed files with 148 additions and 32 deletions.
14 changes: 13 additions & 1 deletion client/pkg/transport/transport.go
@@ -18,6 +18,8 @@ import (
"context"
"net"
"net/http"
+"net/url"
+"os"
"strings"
"time"
)
@@ -31,7 +33,17 @@ func NewTransport(info TLSInfo, dialtimeoutd time.Duration) (*http.Transport, er
}

t := &http.Transport{
-Proxy: http.ProxyFromEnvironment,
+Proxy: func(req *http.Request) (*url.URL, error) {
+// According to the documentation of http.ProxyFromEnvironment, a nil
+// URL and nil error are returned when the request targets "localhost"
+// (with or without a port number), so such requests are never proxied.
+// All peers in the e2e tests run on localhost, so we work around this
+// by reading the proxy URL from a dedicated FORWARD_PROXY environment variable.
+if forwardProxy, exists := os.LookupEnv("FORWARD_PROXY"); exists {
+return url.Parse(forwardProxy)
+}
+return http.ProxyFromEnvironment(req)
+},
DialContext: (&net.Dialer{
Timeout: dialtimeoutd,
// value taken from http.DefaultTransport
90 changes: 74 additions & 16 deletions pkg/proxy/server.go
@@ -15,6 +15,8 @@
package proxy

import (
+"bufio"
+"bytes"
"context"
"fmt"
"io"
@@ -130,18 +132,21 @@ type Server interface {

// ServerConfig defines proxy server configuration.
type ServerConfig struct {
-Logger *zap.Logger
-From url.URL
-To url.URL
-TLSInfo transport.TLSInfo
-DialTimeout time.Duration
-BufferSize int
-RetryInterval time.Duration
+Logger *zap.Logger
+From url.URL
+To url.URL
+TLSInfo transport.TLSInfo
+DialTimeout time.Duration
+BufferSize int
+RetryInterval time.Duration
+IsForwardProxy bool
}

type server struct {
lg *zap.Logger

+isForwardProxy bool
+
from url.URL
fromPort int
to url.URL
@@ -194,6 +199,8 @@ func NewServer(cfg ServerConfig) Server {
s := &server{
lg: cfg.Logger,

+isForwardProxy: cfg.IsForwardProxy,
+
from: cfg.From,
to: cfg.To,

@@ -216,10 +223,12 @@ func NewServer(cfg ServerConfig) Server {
if err == nil {
s.fromPort, _ = strconv.Atoi(fromPort)
}
-var toPort string
-_, toPort, err = net.SplitHostPort(cfg.To.Host)
-if err == nil {
-s.toPort, _ = strconv.Atoi(toPort)
+if !s.isForwardProxy {
+var toPort string
+_, toPort, err = net.SplitHostPort(cfg.To.Host)
+if err == nil {
+s.toPort, _ = strconv.Atoi(toPort)
+}
}

if s.dialTimeout == 0 {
@@ -239,8 +248,10 @@ func NewServer(cfg ServerConfig) Server {
if strings.HasPrefix(s.from.Scheme, "http") {
s.from.Scheme = "tcp"
}
-if strings.HasPrefix(s.to.Scheme, "http") {
-s.to.Scheme = "tcp"
+if !s.isForwardProxy {
+if strings.HasPrefix(s.to.Scheme, "http") {
+s.to.Scheme = "tcp"
+}
}

addr := fmt.Sprintf(":%d", s.fromPort)
@@ -273,7 +284,10 @@ func (s *server) From() string {
}

func (s *server) To() string {
-return fmt.Sprintf("%s://%s", s.to.Scheme, s.to.Host)
+if !s.isForwardProxy {
+return fmt.Sprintf("%s://%s", s.to.Scheme, s.to.Host)
+}
+return ""
}

// TODO: implement packet reordering from multiple TCP connections
@@ -353,6 +367,40 @@ func (s *server) listenAndServe() {
continue
}

+parseHeaderForDestination := func() string {
+// the first request should always contain a CONNECT header field
+// since we set the transport to forward the traffic to the proxy
+buf := make([]byte, s.bufferSize)
+var data []byte
+if nr1, err := in.Read(buf); err != nil {
+if err == io.EOF {
+panic("No data available for forward proxy to work on")
+}
+} else {
+data = buf[:nr1]
+}
+
+// attempt to parse for the HOST from the CONNECT request
+var req *http.Request
+if req, err = http.ReadRequest(bufio.NewReader(bytes.NewReader(data))); err != nil {
+panic("Failed to parse header in forward proxy")
+}
+
+if req.Method == http.MethodConnect {
+// make sure a reply is sent back to the client
+connectResponse := &http.Response{
+StatusCode: 200,
+ProtoMajor: 1,
+ProtoMinor: 1,
+}
+connectResponse.Write(in)
+
+return req.URL.Host
+}
+
+panic("Wrong header type to start the connection")
+}
+
var out net.Conn
if !s.tlsInfo.Empty() {
var tp *http.Transport
@@ -370,9 +418,19 @@ func (s *server) listenAndServe() {
}
continue
}
-out, err = tp.DialContext(ctx, s.to.Scheme, s.to.Host)
+if s.isForwardProxy {
+dest := parseHeaderForDestination()
+out, err = tp.DialContext(ctx, "tcp", dest)
+} else {
+out, err = tp.DialContext(ctx, s.to.Scheme, s.to.Host)
+}
} else {
-out, err = net.Dial(s.to.Scheme, s.to.Host)
+if s.isForwardProxy {
+dest := parseHeaderForDestination()
+out, err = net.Dial("tcp", dest)
+} else {
+out, err = net.Dial(s.to.Scheme, s.to.Host)
+}
}
if err != nil {
select {
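The CONNECT handling added to `pkg/proxy/server.go` can also be written so that it returns errors instead of panicking. The sketch below is an assumption-laden alternative, not the code in this commit: `acceptConnect` and `handshake` are hypothetical names, and the input is an in-memory string rather than a live connection.

```go
package main

import (
	"bufio"
	"bytes"
	"errors"
	"fmt"
	"io"
	"net/http"
	"strings"
)

// acceptConnect reads one HTTP request from r, checks that it is a CONNECT,
// writes a 200 response to w, and returns the requested "host:port".
// Hypothetical helper; the commit does this inline and panics on failure.
func acceptConnect(r io.Reader, w io.Writer) (string, error) {
	req, err := http.ReadRequest(bufio.NewReader(r))
	if err != nil {
		return "", fmt.Errorf("parse proxy request: %w", err)
	}
	if req.Method != http.MethodConnect {
		return "", errors.New("expected CONNECT, got " + req.Method)
	}
	resp := &http.Response{StatusCode: http.StatusOK, ProtoMajor: 1, ProtoMinor: 1}
	if err := resp.Write(w); err != nil {
		return "", fmt.Errorf("write CONNECT response: %w", err)
	}
	return req.URL.Host, nil
}

// handshake runs acceptConnect on a raw request string and returns the dial
// target plus the first line of the reply sent back to the client.
func handshake(raw string) (target, statusLine string, err error) {
	var reply bytes.Buffer
	target, err = acceptConnect(strings.NewReader(raw), &reply)
	statusLine, _, _ = strings.Cut(reply.String(), "\r\n")
	return target, statusLine, err
}

func main() {
	target, status, err := handshake("CONNECT localhost:2380 HTTP/1.1\r\nHost: localhost:2380\r\n\r\n")
	fmt.Println(target, status, err)
}
```

On a real connection, the `bufio.Reader` may buffer bytes that follow the CONNECT request, so a caller would need to keep reading from that reader rather than from the raw connection.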
11 changes: 8 additions & 3 deletions tests/e2e/blackhole_test.go
@@ -51,17 +51,20 @@ func blackholeTestByMockingPartition(t *testing.T, clusterSize int, partitionLea
require.NoError(t, epc.Close(), "failed to close etcd cluster")
}()

-leaderId := epc.WaitLeader(t)
-mockPartitionNodeIndex := leaderId
+leaderID := epc.WaitLeader(t)
+mockPartitionNodeIndex := leaderID
if !partitionLeader {
-mockPartitionNodeIndex = (leaderId + 1) % (clusterSize)
+mockPartitionNodeIndex = (leaderID + 1) % (clusterSize)
}
partitionedMember := epc.Procs[mockPartitionNodeIndex]
// Mock partition
proxy := partitionedMember.PeerProxy()
+forwardProxy := partitionedMember.PeerForwardProxy()
t.Logf("Blackholing traffic from and to member %q", partitionedMember.Config().Name)
proxy.BlackholeTx()
proxy.BlackholeRx()
+forwardProxy.BlackholeTx()
+forwardProxy.BlackholeRx()

t.Logf("Wait 5s for any open connections to expire")
time.Sleep(5 * time.Second)
@@ -81,6 +84,8 @@ func blackholeTestByMockingPartition(t *testing.T, clusterSize int, partitionLea
t.Logf("Unblackholing traffic from and to member %q", partitionedMember.Config().Name)
proxy.UnblackholeTx()
proxy.UnblackholeRx()
+forwardProxy.UnblackholeTx()
+forwardProxy.UnblackholeRx()

leaderEPC = epc.Procs[epc.WaitLeader(t)]
time.Sleep(5 * time.Second)
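The choice of which member to partition in the test above reduces to a small piece of modular arithmetic. A minimal standalone sketch, where `partitionTarget` is a hypothetical helper name that does not exist in the test:

```go
package main

import "fmt"

// partitionTarget returns the index of the member to blackhole: the leader
// itself, or the next member (wrapping around) when a follower is wanted.
// Hypothetical helper that mirrors the index computation in the test.
func partitionTarget(leaderID, clusterSize int, partitionLeader bool) int {
	if partitionLeader {
		return leaderID
	}
	return (leaderID + 1) % clusterSize
}

func main() {
	fmt.Println(partitionTarget(2, 3, true), partitionTarget(2, 3, false))
}
```

The wrap-around guarantees a valid follower index for any cluster with more than one member.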
22 changes: 19 additions & 3 deletions tests/framework/e2e/cluster.go
@@ -481,12 +481,13 @@ func (cfg *EtcdProcessClusterConfig) SetInitialOrDiscovery(serverCfg *EtcdServer
func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i int) *EtcdServerProcessConfig {
var curls []string
var curl string
-port := cfg.BasePort + 5*i
+port := cfg.BasePort + 6*i
clientPort := port
-peerPort := port + 1
+peerPort := port + 1 // the port that the peer actually listens on
metricsPort := port + 2
-peer2Port := port + 3
+peer2Port := port + 3 // the port that the peer advertises
clientHTTPPort := port + 4
+forwardProxyPort := port + 5 // the port of the forward proxy

if cfg.Client.ConnectionType == ClientTLSAndNonTLS {
curl = clientURL(cfg.ClientScheme(), clientPort, ClientNonTLS)
@@ -499,6 +500,7 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
peerListenURL := url.URL{Scheme: cfg.PeerScheme(), Host: fmt.Sprintf("localhost:%d", peerPort)}
peerAdvertiseURL := url.URL{Scheme: cfg.PeerScheme(), Host: fmt.Sprintf("localhost:%d", peerPort)}
var proxyCfg *proxy.ServerConfig
+var forwardProxyCfg *proxy.ServerConfig
if cfg.PeerProxy {
if !cfg.IsPeerTLS {
panic("Can't use peer proxy without peer TLS as it can result in malformed packets")
@@ -509,6 +511,19 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
To: peerListenURL,
From: peerAdvertiseURL,
}
+
+// setup forward proxy
+forwardProxyURL := url.URL{Scheme: cfg.PeerScheme(), Host: fmt.Sprintf("localhost:%d", forwardProxyPort)}
+forwardProxyCfg = &proxy.ServerConfig{
+Logger: zap.NewNop(),
+From: forwardProxyURL,
+IsForwardProxy: true,
+}
+
+if cfg.EnvVars == nil {
+cfg.EnvVars = make(map[string]string)
+}
+cfg.EnvVars["FORWARD_PROXY"] = fmt.Sprintf("http://127.0.0.1:%d", forwardProxyPort)
}

name := fmt.Sprintf("%s-test-%d", testNameCleanRegex.ReplaceAllString(tb.Name(), ""), i)
@@ -631,6 +646,7 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
GoFailPort: gofailPort,
GoFailClientTimeout: cfg.GoFailClientTimeout,
Proxy: proxyCfg,
+ForwardProxy: forwardProxyCfg,
LazyFSEnabled: cfg.LazyFSEnabled,
}
}
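The port layout changed in `cluster.go` (stride raised from 5 to 6 ports per member, with the forward proxy taking the new slot) can be checked with a small standalone sketch. The `memberPorts` type, the `portsFor` helper, and the base port 20000 are assumptions made for this example.

```go
package main

import "fmt"

// memberPorts lists the ports assigned to one member of the test cluster.
// Hypothetical type; the framework keeps these as local variables.
type memberPorts struct {
	Client, PeerListen, Metrics, PeerAdvertise, ClientHTTP, ForwardProxy int
}

// portsFor mirrors the arithmetic in EtcdServerProcessConfig after this
// commit: each member owns six consecutive ports starting at basePort + 6*i.
func portsFor(basePort, i int) memberPorts {
	port := basePort + 6*i
	return memberPorts{
		Client:        port,
		PeerListen:    port + 1, // the port that the peer actually listens on
		Metrics:       port + 2,
		PeerAdvertise: port + 3, // the port that the peer advertises
		ClientHTTP:    port + 4,
		ForwardProxy:  port + 5, // the port of the forward proxy
	}
}

func main() {
	// 20000 is an assumed base port for illustration.
	fmt.Printf("%+v\n", portsFor(20000, 1))
}
```

With a stride of 5 the sixth port of one member would collide with the client port of the next, which is why the stride had to grow along with the new proxy.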
34 changes: 28 additions & 6 deletions tests/framework/e2e/etcd_process.go
@@ -56,6 +56,7 @@ type EtcdProcess interface {
Close() error
Config() *EtcdServerProcessConfig
PeerProxy() proxy.Server
+PeerForwardProxy() proxy.Server
Failpoints() *BinaryFailpoints
LazyFS() *LazyFS
Logs() LogsExpect
@@ -69,12 +70,13 @@ type LogsExpect interface {
}

type EtcdServerProcess struct {
-cfg *EtcdServerProcessConfig
-proc *expect.ExpectProcess
-proxy proxy.Server
-lazyfs *LazyFS
-failpoints *BinaryFailpoints
-donec chan struct{} // closed when Interact() terminates
+cfg *EtcdServerProcessConfig
+proc *expect.ExpectProcess
+proxy proxy.Server
+forwardProxy proxy.Server
+lazyfs *LazyFS
+failpoints *BinaryFailpoints
+donec chan struct{} // closed when Interact() terminates
}

type EtcdServerProcessConfig struct {
@@ -102,6 +104,7 @@ type EtcdServerProcessConfig struct {

LazyFSEnabled bool
Proxy *proxy.ServerConfig
+ForwardProxy *proxy.ServerConfig
}

func NewEtcdServerProcess(t testing.TB, cfg *EtcdServerProcessConfig) (*EtcdServerProcess, error) {
@@ -159,6 +162,14 @@ func (ep *EtcdServerProcess) Start(ctx context.Context) error {
case err := <-ep.proxy.Error():
return err
}
+
+ep.cfg.lg.Info("starting forward proxy...", zap.String("name", ep.cfg.Name), zap.String("from", ep.cfg.ForwardProxy.From.String()), zap.String("to", ep.cfg.ForwardProxy.To.String()))
+ep.forwardProxy = proxy.NewServer(*ep.cfg.ForwardProxy)
+select {
+case <-ep.forwardProxy.Ready():
+case err := <-ep.forwardProxy.Error():
+return err
+}
}
if ep.lazyfs != nil {
ep.cfg.lg.Info("starting lazyfs...", zap.String("name", ep.cfg.Name))
@@ -222,6 +233,13 @@ func (ep *EtcdServerProcess) Stop() (err error) {
}
ep.cfg.lg.Info("stopped server.", zap.String("name", ep.cfg.Name))
if ep.proxy != nil {
+ep.cfg.lg.Info("stopping forward proxy...", zap.String("name", ep.cfg.Name))
+err = ep.forwardProxy.Close()
+ep.forwardProxy = nil
+if err != nil {
+return err
+}
+
ep.cfg.lg.Info("stopping proxy...", zap.String("name", ep.cfg.Name))
err = ep.proxy.Close()
ep.proxy = nil
@@ -330,6 +348,10 @@ func (ep *EtcdServerProcess) PeerProxy() proxy.Server {
return ep.proxy
}

+func (ep *EtcdServerProcess) PeerForwardProxy() proxy.Server {
+return ep.forwardProxy
+}
+
func (ep *EtcdServerProcess) LazyFS() *LazyFS {
return ep.lazyfs
}
9 changes: 6 additions & 3 deletions tests/robustness/failpoint/network.go
@@ -63,18 +63,21 @@ func (tb triggerBlackhole) Available(config e2e.EtcdProcessClusterConfig, proces

func Blackhole(ctx context.Context, t *testing.T, member e2e.EtcdProcess, clus *e2e.EtcdProcessCluster, shouldWaitTillSnapshot bool) error {
proxy := member.PeerProxy()
+forwardProxy := member.PeerForwardProxy()

-// Blackholing will cause peers to not be able to use streamWriters registered with member
-// but peer traffic is still possible because member has 'pipeline' with peers
-// TODO: find a way to stop all traffic
t.Logf("Blackholing traffic from and to member %q", member.Config().Name)
proxy.BlackholeTx()
proxy.BlackholeRx()
+forwardProxy.BlackholeTx()
+forwardProxy.BlackholeRx()
defer func() {
t.Logf("Traffic restored from and to member %q", member.Config().Name)
proxy.UnblackholeTx()
proxy.UnblackholeRx()
+forwardProxy.UnblackholeTx()
+forwardProxy.UnblackholeRx()
}()
+
if shouldWaitTillSnapshot {
return waitTillSnapshot(ctx, t, clus, member)
}
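Both the e2e test and the robustness failpoint now repeat the same four calls on two proxies. One way to keep the block and restore steps paired is sketched below. The `blackholer` interface, `withBlackhole`, `fakeProxy`, and `runDemo` are hypothetical names for illustration; only the four method names come from the diff.

```go
package main

import "fmt"

// blackholer is the subset of proxy.Server methods used for partitioning.
// Hypothetical interface declared locally so the sketch is self-contained.
type blackholer interface {
	BlackholeTx()
	BlackholeRx()
	UnblackholeTx()
	UnblackholeRx()
}

// withBlackhole blocks traffic on every proxy, runs fn, and always restores
// traffic afterwards, even when fn returns an error.
func withBlackhole(fn func() error, proxies ...blackholer) error {
	for _, p := range proxies {
		p.BlackholeTx()
		p.BlackholeRx()
	}
	defer func() {
		for _, p := range proxies {
			p.UnblackholeTx()
			p.UnblackholeRx()
		}
	}()
	return fn()
}

// fakeProxy records its blackhole state so the sketch runs without etcd.
type fakeProxy struct{ tx, rx bool }

func (f *fakeProxy) BlackholeTx()   { f.tx = true }
func (f *fakeProxy) BlackholeRx()   { f.rx = true }
func (f *fakeProxy) UnblackholeTx() { f.tx = false }
func (f *fakeProxy) UnblackholeRx() { f.rx = false }
func (f *fakeProxy) blocked() bool  { return f.tx && f.rx }

// runDemo reports whether both fake proxies were blocked while fn ran and
// whether either one is still blocked after withBlackhole returned.
func runDemo() (blockedDuring, blockedAfter bool) {
	reverse, forward := &fakeProxy{}, &fakeProxy{}
	withBlackhole(func() error {
		blockedDuring = reverse.blocked() && forward.blocked()
		return nil
	}, reverse, forward)
	return blockedDuring, reverse.blocked() || forward.blocked()
}

func main() {
	during, after := runDemo()
	fmt.Println("blocked during:", during, "blocked after:", after)
}
```

Passing both the existing peer proxy and the forward proxy to one helper makes it harder to forget one direction when a new caller is added.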