Skip to content

Commit

Permalink
add deadline to DirectRPC interface
Browse files Browse the repository at this point in the history
  • Loading branch information
wjordan committed Nov 5, 2021
1 parent 4f54651 commit 244eedb
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 19 deletions.
2 changes: 1 addition & 1 deletion agent/auto-config/auto_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ func (ac *AutoConfig) getInitialConfigurationOnce(ctx context.Context, csr strin
}

ac.logger.Debug("making AutoConfig.InitialConfiguration RPC", "addr", addr.String())
if err = ac.acConfig.DirectRPC.RPC(ac.config.Datacenter, ac.config.NodeName, &addr, "AutoConfig.InitialConfiguration", &request, &resp); err != nil {
if err = ac.acConfig.DirectRPC.RPC(ac.config.Datacenter, ac.config.NodeName, &addr, "AutoConfig.InitialConfiguration", &request, &resp, time.Time{}); err != nil {
ac.logger.Error("AutoConfig.InitialConfiguration RPC failed", "addr", addr.String(), "error", err)
continue
}
Expand Down
37 changes: 27 additions & 10 deletions agent/auto-config/auto_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,15 @@ func TestInitialConfiguration_cancelled(t *testing.T) {
JWT: "blarg",
}

mcfg.directRPC.On("RPC", "dc1", "autoconf", &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 8300}, "AutoConfig.InitialConfiguration", &expectedRequest, mock.Anything).Return(fmt.Errorf("injected error")).Times(0).Maybe()
mcfg.directRPC.On(
"RPC",
"dc1",
"autoconf",
&net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 8300},
"AutoConfig.InitialConfiguration",
&expectedRequest,
mock.Anything,
time.Time{}).Return(fmt.Errorf("injected error")).Times(0).Maybe()
mcfg.serverProvider.On("FindLANServer").Return(nil).Times(0).Maybe()

ac, err := New(mcfg.Config)
Expand Down Expand Up @@ -386,7 +394,8 @@ func TestInitialConfiguration_success(t *testing.T) {
&net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 8300},
"AutoConfig.InitialConfiguration",
&expectedRequest,
&pbautoconf.AutoConfigResponse{}).Return(nil).Run(populateResponse)
&pbautoconf.AutoConfigResponse{},
time.Time{}).Return(nil).Run(populateResponse)

ac, err := New(mcfg.Config)
require.NoError(t, err)
Expand Down Expand Up @@ -471,39 +480,44 @@ func TestInitialConfiguration_retries(t *testing.T) {
&net.TCPAddr{IP: net.IPv4(198, 18, 0, 1), Port: 8300},
"AutoConfig.InitialConfiguration",
&expectedRequest,
&pbautoconf.AutoConfigResponse{}).Return(fmt.Errorf("injected failure")).Times(0)
&pbautoconf.AutoConfigResponse{},
time.Time{}).Return(fmt.Errorf("injected failure")).Times(0)
mcfg.directRPC.On(
"RPC",
"dc1",
"autoconf",
&net.TCPAddr{IP: net.IPv4(198, 18, 0, 2), Port: 8398},
"AutoConfig.InitialConfiguration",
&expectedRequest,
&pbautoconf.AutoConfigResponse{}).Return(fmt.Errorf("injected failure")).Times(0)
&pbautoconf.AutoConfigResponse{},
time.Time{}).Return(fmt.Errorf("injected failure")).Times(0)
mcfg.directRPC.On(
"RPC",
"dc1",
"autoconf",
&net.TCPAddr{IP: net.IPv4(198, 18, 0, 3), Port: 8399},
"AutoConfig.InitialConfiguration",
&expectedRequest,
&pbautoconf.AutoConfigResponse{}).Return(fmt.Errorf("injected failure")).Times(0)
&pbautoconf.AutoConfigResponse{},
time.Time{}).Return(fmt.Errorf("injected failure")).Times(0)
mcfg.directRPC.On(
"RPC",
"dc1",
"autoconf",
&net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 1234},
"AutoConfig.InitialConfiguration",
&expectedRequest,
&pbautoconf.AutoConfigResponse{}).Return(fmt.Errorf("injected failure")).Once()
&pbautoconf.AutoConfigResponse{},
time.Time{}).Return(fmt.Errorf("injected failure")).Once()
mcfg.directRPC.On(
"RPC",
"dc1",
"autoconf",
&net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 1234},
"AutoConfig.InitialConfiguration",
&expectedRequest,
&pbautoconf.AutoConfigResponse{}).Return(nil).Run(populateResponse).Once()
&pbautoconf.AutoConfigResponse{},
time.Time{}).Return(nil).Run(populateResponse).Once()

ac, err := New(mcfg.Config)
require.NoError(t, err)
Expand Down Expand Up @@ -789,7 +803,8 @@ func startedAutoConfig(t *testing.T, autoEncrypt bool) testAutoConfig {
&net.TCPAddr{IP: net.IPv4(198, 18, 0, 1), Port: 8300},
"AutoConfig.InitialConfiguration",
&expectedRequest,
&pbautoconf.AutoConfigResponse{}).Return(nil).Run(populateResponse).Once()
&pbautoconf.AutoConfigResponse{},
time.Time{}).Return(nil).Run(populateResponse).Once()
} else {
expectedRequest := structs.CASignRequest{
WriteRequest: structs.WriteRequest{Token: originalToken},
Expand All @@ -809,7 +824,8 @@ func startedAutoConfig(t *testing.T, autoEncrypt bool) testAutoConfig {
&net.TCPAddr{IP: net.IPv4(198, 18, 0, 1), Port: 8300},
"AutoEncrypt.Sign",
&expectedRequest,
&structs.SignedResponse{}).Return(nil).Run(populateResponse)
&structs.SignedResponse{},
time.Time{}).Return(nil).Run(populateResponse)
}

ac, err := New(mcfg.Config)
Expand Down Expand Up @@ -1083,7 +1099,8 @@ func TestFallback(t *testing.T) {
&net.TCPAddr{IP: net.IPv4(198, 18, 23, 2), Port: 8300},
"AutoConfig.InitialConfiguration",
&expectedRequest,
&pbautoconf.AutoConfigResponse{}).Return(nil).Run(populateResponse).Once()
&pbautoconf.AutoConfigResponse{},
time.Time{}).Return(nil).Run(populateResponse).Once()

// this gets called when InitialConfiguration is invoked to record the token from the
// auto-config response which is how the Fallback for auto-config works
Expand Down
3 changes: 2 additions & 1 deletion agent/auto-config/auto_encrypt.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"net"
"strings"
"time"

"github.com/hashicorp/consul/agent/structs"
)
Expand Down Expand Up @@ -56,7 +57,7 @@ func (ac *AutoConfig) autoEncryptInitialCertsOnce(ctx context.Context, csr, key
}

ac.logger.Debug("making AutoEncrypt.Sign RPC", "addr", addr.String())
err = ac.acConfig.DirectRPC.RPC(ac.config.Datacenter, ac.config.NodeName, &addr, "AutoEncrypt.Sign", &request, &resp)
err = ac.acConfig.DirectRPC.RPC(ac.config.Datacenter, ac.config.NodeName, &addr, "AutoEncrypt.Sign", &request, &resp, time.Time{})
if err != nil {
ac.logger.Error("AutoEncrypt.Sign RPC failed", "addr", addr.String(), "error", err)
continue
Expand Down
9 changes: 7 additions & 2 deletions agent/auto-config/auto_encrypt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ func TestAutoEncrypt_InitialCerts(t *testing.T) {
"AutoEncrypt.Sign",
&request,
&structs.SignedResponse{},
time.Time{},
).Once().Return(fmt.Errorf("injected error"))
// second failure
mcfg.directRPC.On("RPC",
Expand All @@ -212,6 +213,7 @@ func TestAutoEncrypt_InitialCerts(t *testing.T) {
"AutoEncrypt.Sign",
&request,
&structs.SignedResponse{},
time.Time{},
).Once().Return(fmt.Errorf("injected error"))
// third times is successfuly (second attempt to first server)
mcfg.directRPC.On("RPC",
Expand All @@ -221,6 +223,7 @@ func TestAutoEncrypt_InitialCerts(t *testing.T) {
"AutoEncrypt.Sign",
&request,
&structs.SignedResponse{},
time.Time{},
).Once().Return(nil).Run(func(args mock.Arguments) {
resp, ok := args.Get(5).(*structs.SignedResponse)
require.True(t, ok)
Expand Down Expand Up @@ -308,7 +311,8 @@ func TestAutoEncrypt_InitialConfiguration(t *testing.T) {
&net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 8300},
"AutoEncrypt.Sign",
&expectedRequest,
&structs.SignedResponse{}).Return(nil).Run(populateResponse)
&structs.SignedResponse{},
time.Time{}).Return(nil).Run(populateResponse)

ac, err := New(mcfg.Config)
require.NoError(t, err)
Expand Down Expand Up @@ -517,7 +521,8 @@ func TestAutoEncrypt_Fallback(t *testing.T) {
&net.TCPAddr{IP: net.IPv4(198, 18, 23, 2), Port: 8300},
"AutoEncrypt.Sign",
&expectedRequest,
&structs.SignedResponse{}).Return(nil).Run(populateResponse).Once()
&structs.SignedResponse{},
time.Time{}).Return(nil).Run(populateResponse).Once()

testAC.mcfg.expectInitialTLS(t, "autoconf", "dc1", testAC.originalToken, secondCA, &secondRoots, thirdCert, testAC.extraCerts)

Expand Down
2 changes: 1 addition & 1 deletion agent/auto-config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
// the agent gets configured, it can go through the normal RPC means of selecting a available
// server automatically.
type DirectRPC interface {
RPC(dc string, node string, addr net.Addr, method string, args interface{}, reply interface{}) error
RPC(dc string, node string, addr net.Addr, method string, args interface{}, reply interface{}, deadline time.Time) error
}

// Cache is an interface to represent the methods of the
Expand Down
9 changes: 5 additions & 4 deletions agent/auto-config/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"net"
"sync"
"testing"
"time"

"github.com/stretchr/testify/mock"

Expand All @@ -29,22 +30,22 @@ func newMockDirectRPC(t *testing.T) *mockDirectRPC {
return &m
}

func (m *mockDirectRPC) RPC(dc string, node string, addr net.Addr, method string, args interface{}, reply interface{}) error {
func (m *mockDirectRPC) RPC(dc string, node string, addr net.Addr, method string, args interface{}, reply interface{}, deadline time.Time) error {
var retValues mock.Arguments
if method == "AutoConfig.InitialConfiguration" {
req := args.(*pbautoconf.AutoConfigRequest)
csr := req.CSR
req.CSR = ""
retValues = m.Called(dc, node, addr, method, args, reply)
retValues = m.Called(dc, node, addr, method, args, reply, deadline)
req.CSR = csr
} else if method == "AutoEncrypt.Sign" {
req := args.(*structs.CASignRequest)
csr := req.CSR
req.CSR = ""
retValues = m.Called(dc, node, addr, method, args, reply)
retValues = m.Called(dc, node, addr, method, args, reply, deadline)
req.CSR = csr
} else {
retValues = m.Called(dc, node, addr, method, args, reply)
retValues = m.Called(dc, node, addr, method, args, reply, deadline)
}

return retValues.Error(0)
Expand Down

0 comments on commit 244eedb

Please sign in to comment.