Skip to content

Commit

Permalink
democluster,serverutils/regionlatency,rpc: extract code for simulatin…
Browse files Browse the repository at this point in the history
…g latency

We'll want to leverage these helpers in some tests to measure behavior under
simulated latency.

Release note: None
  • Loading branch information
ajwerner committed Nov 22, 2022
1 parent b12438a commit 2d79db8
Show file tree
Hide file tree
Showing 12 changed files with 273 additions and 104 deletions.
2 changes: 2 additions & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -1898,6 +1898,7 @@ GO_TARGETS = [
"//pkg/testutils/metrictestutils:metrictestutils",
"//pkg/testutils/pgtest:pgtest",
"//pkg/testutils/physicalplanutils:physicalplanutils",
"//pkg/testutils/serverutils/regionlatency:regionlatency",
"//pkg/testutils/serverutils:serverutils",
"//pkg/testutils/skip:skip",
"//pkg/testutils/sqlutils:sqlutils",
Expand Down Expand Up @@ -2953,6 +2954,7 @@ GET_X_DATA_TARGETS = [
"//pkg/testutils/pgtest:get_x_data",
"//pkg/testutils/physicalplanutils:get_x_data",
"//pkg/testutils/serverutils:get_x_data",
"//pkg/testutils/serverutils/regionlatency:get_x_data",
"//pkg/testutils/skip:get_x_data",
"//pkg/testutils/sqlutils:get_x_data",
"//pkg/testutils/storageutils:get_x_data",
Expand Down
3 changes: 2 additions & 1 deletion pkg/cli/democluster/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ go_library(
"demo_cluster.go",
"demo_locality_list.go",
"doc.go",
"region_latencies.go",
"socket_unix.go",
"socket_windows.go",
],
Expand Down Expand Up @@ -37,6 +36,7 @@ go_library(
"//pkg/sql/distsql",
"//pkg/sql/sem/catconstants",
"//pkg/testutils/serverutils",
"//pkg/testutils/serverutils/regionlatency",
"//pkg/util/humanizeutil",
"//pkg/util/log/logcrash",
"//pkg/util/log/logpb",
Expand Down Expand Up @@ -68,6 +68,7 @@ go_test(
"//pkg/security/securityassets",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/testutils/serverutils/regionlatency",
"//pkg/testutils/skip",
"//pkg/util/leaktest",
"//pkg/util/log",
Expand Down
38 changes: 13 additions & 25 deletions pkg/cli/democluster/demo_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,28 +319,7 @@ func (c *transientCluster) Start(ctx context.Context) (err error) {
// Now, all servers have been started enough to know their own RPC serving
// addresses, but nothing else. Assemble the artificial latency map.
c.infoLog(ctx, "initializing latency map")
for i, serv := range c.servers {
latencyMap := serv.Cfg.TestingKnobs.Server.(*server.TestingKnobs).ContextTestingKnobs.ArtificialLatencyMap
srcLocality, ok := serv.Cfg.Locality.Find("region")
if !ok {
continue
}
srcLocalityMap, ok := regionToRegionToLatency[srcLocality]
if !ok {
continue
}
for j, dst := range c.servers {
if i == j {
continue
}
dstLocality, ok := dst.Cfg.Locality.Find("region")
if !ok {
continue
}
latency := srcLocalityMap[dstLocality]
latencyMap[dst.ServingRPCAddr()] = latency
}
}
localityLatencies.Apply(c)
}
return nil
}(phaseCtx); err != nil {
Expand Down Expand Up @@ -389,7 +368,8 @@ func (c *transientCluster) Start(ctx context.Context) (err error) {

c.tenantServers = make([]serverutils.TestTenantInterface, c.demoCtx.NumNodes)
for i := 0; i < c.demoCtx.NumNodes; i++ {
latencyMap := c.servers[i].Cfg.TestingKnobs.Server.(*server.TestingKnobs).ContextTestingKnobs.ArtificialLatencyMap
latencyMap := c.servers[i].Cfg.TestingKnobs.Server.(*server.TestingKnobs).
ContextTestingKnobs.InjectedLatencyOracle
c.infoLog(ctx, "starting tenant node %d", i)
tenantStopper := stop.NewStopper()
tenID := uint64(i + 2)
Expand All @@ -409,7 +389,7 @@ func (c *transientCluster) Start(ctx context.Context) (err error) {
TestingKnobs: base.TestingKnobs{
Server: &server.TestingKnobs{
ContextTestingKnobs: rpc.ContextTestingKnobs{
ArtificialLatencyMap: latencyMap,
InjectedLatencyOracle: latencyMap,
},
},
},
Expand Down Expand Up @@ -551,7 +531,7 @@ func (c *transientCluster) createAndAddNode(
// started listening on RPC, and before they proceed with their
// startup routine.
serverKnobs.ContextTestingKnobs = rpc.ContextTestingKnobs{
ArtificialLatencyMap: make(map[string]int),
InjectedLatencyOracle: make(rpc.InjectedLatencyMap),
}
}

Expand Down Expand Up @@ -1418,6 +1398,14 @@ func (c *transientCluster) NumNodes() int {
return len(c.servers)
}

// NumServers returns the length of the cluster's servers slice.
// NOTE(review): presumably exists so the cluster can be passed to
// localityLatencies.Apply alongside Server — confirm against regionlatency.
func (c *transientCluster) NumServers() int {
	count := len(c.servers)
	return count
}

// Server returns the i'th entry of the cluster's servers slice as a
// serverutils.TestServerInterface. The index is zero-based and is not
// range-checked; an out-of-range i panics like any slice access.
func (c *transientCluster) Server(i int) serverutils.TestServerInterface {
	srv := c.servers[i]
	return srv
}

// GetLocality returns the string form of the locality configured for the
// given node. Node IDs are treated as one-based: node n maps to entry n-1 of
// demoCtx.Localities. The lookup is not range-checked.
func (c *transientCluster) GetLocality(nodeID int32) string {
	idx := nodeID - 1
	return c.demoCtx.Localities[idx].String()
}
Expand Down
16 changes: 11 additions & 5 deletions pkg/cli/democluster/demo_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/security/securityassets"
"github.com/cockroachdb/cockroach/pkg/security/securitytest"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils/regionlatency"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -146,6 +147,7 @@ func TestTransientClusterSimulateLatencies(t *testing.T) {
// Set up an empty 9-node cluster with simulated latencies.
demoCtx.SimulateLatency = true
demoCtx.NumNodes = 9
demoCtx.Localities = defaultLocalities

certsDir := t.TempDir()

Expand Down Expand Up @@ -179,6 +181,8 @@ func TestTransientClusterSimulateLatencies(t *testing.T) {

require.NoError(t, c.Start(ctx))

c.SetSimulatedLatency(true)

for _, tc := range []struct {
desc string
nodeIdx int
Expand Down Expand Up @@ -212,12 +216,14 @@ func TestTransientClusterSimulateLatencies(t *testing.T) {
}
}()
// Find the maximum latency in the cluster from the current node.
var maxLatency time.Duration
for _, latencyMS := range regionToRegionToLatency[tc.region] {
if d := time.Duration(latencyMS) * time.Millisecond; d > maxLatency {
maxLatency = d
var maxLatency regionlatency.RoundTripLatency
localityLatencies.ForEachLatencyFrom(tc.region, func(
_ regionlatency.Region, l regionlatency.OneWayLatency,
) {
if rtt := l * 2; rtt > maxLatency {
maxLatency = rtt
}
}
})

// Attempt to make a query that talks to every node.
// This should take at least maxLatency.
Expand Down
9 changes: 9 additions & 0 deletions pkg/cli/democluster/demo_locality_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ package democluster

import (
"strings"
"time"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils/regionlatency"
)

// DemoLocalityList represents a list of localities for the cockroach
Expand Down Expand Up @@ -60,3 +62,10 @@ var defaultLocalities = DemoLocalityList{
{Tiers: []roachpb.Tier{{Key: "region", Value: "europe-west1"}, {Key: "az", Value: "c"}}},
{Tiers: []roachpb.Tier{{Key: "region", Value: "europe-west1"}, {Key: "az", Value: "d"}}},
}

// localityLatencies holds the simulated latencies between the regions used by
// the demo cluster. The literal below lists round-trip latencies collected
// from http://cloudping.co on 2019-09-11, one entry per unordered region pair;
// ToLatencyMap converts that pair table into the form queried elsewhere in
// this package (Apply, ForEachLatencyFrom).
// NOTE(review): ForEachLatencyFrom hands out regionlatency.OneWayLatency
// values, so ToLatencyMap presumably halves each round-trip figure and fills
// in both directions — confirm against the regionlatency package.
var localityLatencies = regionlatency.RoundTripPairs{
{A: "us-east1", B: "us-west1"}: 66 * time.Millisecond,
{A: "us-east1", B: "europe-west1"}: 64 * time.Millisecond,
{A: "us-west1", B: "europe-west1"}: 146 * time.Millisecond,
}.ToLatencyMap()
52 changes: 0 additions & 52 deletions pkg/cli/democluster/region_latencies.go

This file was deleted.

11 changes: 11 additions & 0 deletions pkg/rpc/clock_offset.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,17 @@ type RemoteClockMonitor struct {
metrics RemoteClockMetrics
}

// TestingResetLatencyInfos removes every entry from the monitor's
// latencyInfos map while holding the monitor's mutex. The map itself is kept
// and emptied in place rather than replaced. It is meant for tests that
// enable or disable injected latency.
func (r *RemoteClockMonitor) TestingResetLatencyInfos() {
	r.mu.Lock()
	defer r.mu.Unlock()

	// Maps are reference types, so deleting through the local alias empties
	// the monitor's own map.
	infos := r.mu.latencyInfos
	for addr := range infos {
		delete(infos, addr)
	}
}

// newRemoteClockMonitor returns a monitor with the given server clock.
func newRemoteClockMonitor(
clock hlc.WallClock,
Expand Down
50 changes: 34 additions & 16 deletions pkg/rpc/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -1446,18 +1446,19 @@ func (rpcCtx *Context) grpcDialOptions(
streamInterceptors = append(append([]grpc.StreamClientInterceptor(nil), streamInterceptors...), testingStreamInterceptor)
}
}
if rpcCtx.Knobs.ArtificialLatencyMap != nil {
if rpcCtx.Knobs.InjectedLatencyOracle != nil {
dialerFunc := func(ctx context.Context, target string) (net.Conn, error) {
dialer := net.Dialer{
LocalAddr: sourceAddr,
}
return dialer.DialContext(ctx, "tcp", target)
}
latency := rpcCtx.Knobs.ArtificialLatencyMap[target]
latency := rpcCtx.Knobs.InjectedLatencyOracle.GetLatency(target)
log.VEventf(rpcCtx.MasterCtx, 1, "connecting to node %s with simulated latency %dms", target, latency)
dialer := artificialLatencyDialer{
dialerFunc: dialerFunc,
latencyMS: latency,
latency: latency,
enabled: rpcCtx.Knobs.InjectedLatencyEnabled,
}
dialerFunc = dialer.dial
dialOpts = append(dialOpts, grpc.WithContextDialer(dialerFunc))
Expand Down Expand Up @@ -1562,7 +1563,8 @@ type dialerFunc func(context.Context, string) (net.Conn, error)

type artificialLatencyDialer struct {
dialerFunc dialerFunc
latencyMS int
latency time.Duration
enabled func() bool
}

func (ald *artificialLatencyDialer) dial(ctx context.Context, addr string) (net.Conn, error) {
Expand All @@ -1572,21 +1574,23 @@ func (ald *artificialLatencyDialer) dial(ctx context.Context, addr string) (net.
}
return &delayingConn{
Conn: conn,
latency: time.Duration(ald.latencyMS) * time.Millisecond,
latency: ald.latency,
enabled: ald.enabled,
readBuf: new(bytes.Buffer),
}, nil
}

// delayingListener wraps a net.Listener so that the connections it accepts
// are returned as delayingConn values (see Accept).
type delayingListener struct {
// Listener is the wrapped listener; Accept delegates to it.
net.Listener
// enabled is copied into every accepted delayingConn, where it decides
// whether delays are applied. It is supplied by NewDelayingListener.
enabled func() bool
}

// NewDelayingListener creates a net.Listener that introduces a set delay on its connections.
func NewDelayingListener(l net.Listener) net.Listener {
return delayingListener{Listener: l}
func NewDelayingListener(l net.Listener, enabled func() bool) net.Listener {
return &delayingListener{Listener: l, enabled: enabled}
}

func (d delayingListener) Accept() (net.Conn, error) {
func (d *delayingListener) Accept() (net.Conn, error) {
c, err := d.Listener.Accept()
if err != nil {
return nil, err
Expand All @@ -1597,6 +1601,7 @@ func (d delayingListener) Accept() (net.Conn, error) {
// as packets are exchanged across the delayingConnections.
latency: time.Duration(0) * time.Millisecond,
readBuf: new(bytes.Buffer),
enabled: d.enabled,
}, nil
}

Expand All @@ -1612,12 +1617,24 @@ func (d delayingListener) Accept() (net.Conn, error) {
// on both ends with x/2 milliseconds of latency.
type delayingConn struct {
// Conn is the underlying connection that framed packets are written to and
// read from.
net.Conn
// enabled reports whether delays should currently be applied. A nil func is
// treated as always enabled (see isEnabled).
enabled func() bool
// latency is added to the send time to compute each outgoing packet's
// ReadTime. When it is zero, Read adopts the delay advertised in the peer's
// packet header (DelayMS), if that is non-zero.
latency time.Duration
// lastSendEnd is the base time for the next outgoing packet's ReadTime;
// Write advances it to the current time whenever it lies in the past.
lastSendEnd time.Time
// readBuf receives packet payloads copied off the underlying Conn by Read.
readBuf *bytes.Buffer
}

func (d delayingConn) Write(b []byte) (n int, err error) {
// getLatencyMS returns the connection's latency truncated to whole
// milliseconds, as placed in the DelayMS field of outgoing packet headers.
// It returns 0 while delay injection is disabled.
func (d *delayingConn) getLatencyMS() int32 {
	if d.isEnabled() {
		return int32(d.latency / time.Millisecond)
	}
	return 0
}

// isEnabled reports whether this connection should currently inject delay.
// A connection without an enabled callback is always considered enabled;
// otherwise the callback is consulted on every call.
func (d *delayingConn) isEnabled() bool {
	if d.enabled == nil {
		return true
	}
	return d.enabled()
}

func (d *delayingConn) Write(b []byte) (n int, err error) {
tNow := timeutil.Now()
if d.lastSendEnd.Before(tNow) {
d.lastSendEnd = tNow
Expand All @@ -1626,7 +1643,7 @@ func (d delayingConn) Write(b []byte) (n int, err error) {
Magic: magic,
ReadTime: d.lastSendEnd.Add(d.latency).UnixNano(),
Sz: int32(len(b)),
DelayMS: int32(d.latency / time.Millisecond),
DelayMS: d.getLatencyMS(),
}
if err := binary.Write(d.Conn, binary.BigEndian, hdr); err != nil {
return n, err
Expand Down Expand Up @@ -1665,9 +1682,9 @@ func (d *delayingConn) Read(b []byte) (n int, err error) {
if d.latency == 0 && hdr.DelayMS != 0 {
d.latency = time.Duration(hdr.DelayMS) * time.Millisecond
}
defer func() {
time.Sleep(timeutil.Until(timeutil.Unix(0, hdr.ReadTime)))
}()
if d.isEnabled() {
defer time.Sleep(timeutil.Until(timeutil.Unix(0, hdr.ReadTime)))
}
if _, err := io.CopyN(d.readBuf, d.Conn, int64(hdr.Sz)); err != nil {
return 0, err
}
Expand Down Expand Up @@ -1756,13 +1773,14 @@ func (rpcCtx *Context) grpcDialRaw(

dialer := onlyOnceDialer{}
dialerFunc := dialer.dial
if rpcCtx.Knobs.ArtificialLatencyMap != nil {
latency := rpcCtx.Knobs.ArtificialLatencyMap[target]
if rpcCtx.Knobs.InjectedLatencyOracle != nil {
latency := rpcCtx.Knobs.InjectedLatencyOracle.GetLatency(target)
log.VEventf(ctx, 1, "connecting with simulated latency %dms",
latency)
dialer := artificialLatencyDialer{
dialerFunc: dialerFunc,
latencyMS: latency,
latency: latency,
enabled: rpcCtx.Knobs.InjectedLatencyEnabled,
}
dialerFunc = dialer.dial
}
Expand Down
Loading

0 comments on commit 2d79db8

Please sign in to comment.