Skip to content

Commit

Permalink
Separate into eager and lazy clients
Browse files Browse the repository at this point in the history
  • Loading branch information
cretz committed Apr 28, 2022
1 parent 68a3c41 commit f7e78eb
Show file tree
Hide file tree
Showing 10 changed files with 260 additions and 55 deletions.
15 changes: 15 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,9 +431,24 @@ type MetricsTimer = metrics.Timer
// MetricsNopHandler is a noop handler that does nothing with the metrics.
var MetricsNopHandler = metrics.NopHandler

// Dial creates an instance of a workflow client. This will attempt to connect
// to the server eagerly and will return an error if the server is not
// available.
func Dial(options Options) (Client, error) {
return internal.DialClient(options)
}

// NewLazyClient creates an instance of a workflow client. Unlike Dial, this
// will not eagerly connect to the server.
func NewLazyClient(options Options) (Client, error) {
return internal.NewLazyClient(options)
}

// NewClient creates an instance of a workflow client. This will attempt to
// connect to the server eagerly and will return an error if the server is not
// available.
//
// Deprecated: Use Dial or NewLazyClient instead.
func NewClient(options Options) (Client, error) {
return internal.NewClient(options)
}
Expand Down
1 change: 1 addition & 0 deletions evictiontest/workflow_cache_eviction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ type (
func (s *CacheEvictionSuite) SetupTest() {
s.mockCtrl = gomock.NewController(s.T())
s.service = workflowservicemock.NewMockWorkflowServiceClient(s.mockCtrl)
s.service.EXPECT().GetSystemInfo(gomock.Any(), gomock.Any(), gomock.Any()).Return(&workflowservice.GetSystemInfoResponse{}, nil).AnyTimes()
}

func (s *CacheEvictionSuite) TearDownTest() {
Expand Down
83 changes: 44 additions & 39 deletions internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,11 @@ package internal
import (
"context"
"crypto/tls"
"fmt"
"io"
"time"

commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"
"go.temporal.io/api/workflowservice/v1"
uberatomic "go.uber.org/atomic"
"google.golang.org/grpc"
Expand All @@ -55,8 +53,6 @@ const (
// QueryTypeOpenSessions is the build in query type for Client.QueryWorkflow() call. Use this query type to get all open
// sessions in the workflow. The result will be a list of SessionInfo encoded in the EncodedValue.
QueryTypeOpenSessions string = "__open_sessions"

getSystemInfoTimeout = 5 * time.Second
)

type (
Expand Down Expand Up @@ -467,6 +463,12 @@ type (
// grpc.WithUnaryInterceptor option can be added since grpc.WithUnaryInterceptor is prepended to chains set with
// grpc.WithChainUnaryInterceptor.
DialOptions []grpc.DialOption

// Hidden for use by client overloads.
disableEagerConnection bool

// If not present, created during service client creation.
excludeInternalFromRetry *uberatomic.Bool
}

// StartWorkflowOptions configuration parameters for starting a workflow execution.
Expand Down Expand Up @@ -609,7 +611,21 @@ type (
}
)

// DialClient creates a client and attempts to connect to the server.
func DialClient(options ClientOptions) (Client, error) {
options.ConnectionOptions.disableEagerConnection = false
return NewClient(options)
}

// NewLazyClient creates a client and does not attempt to connect to the server.
func NewLazyClient(options ClientOptions) (Client, error) {
options.ConnectionOptions.disableEagerConnection = true
return NewClient(options)
}

// NewClient creates an instance of a workflow client
//
// Deprecated: Use DialClient or NewLazyClient instead.
func NewClient(options ClientOptions) (Client, error) {
if options.Namespace == "" {
options.Namespace = DefaultNamespace
Expand All @@ -630,22 +646,22 @@ func NewClient(options ClientOptions) (Client, error) {
options.Logger.Info("No logger configured for temporal client. Created default one.")
}

var excludeInternalFromRetry uberatomic.Bool
connection, err := dial(newDialParameters(&options, &excludeInternalFromRetry))
options.ConnectionOptions.excludeInternalFromRetry = uberatomic.NewBool(false)
connection, err := dial(newDialParameters(&options, options.ConnectionOptions.excludeInternalFromRetry))
if err != nil {
return nil, err
}

client := NewServiceClient(workflowservice.NewWorkflowServiceClient(connection), connection, options)

// Get server capabilities eagerly. This has replaced health checking and we
// have accepted that this forces an eager connection with the server.
client.capabilities, err = getServerCapabilities(client.workflowService)
if err != nil {
client.Close()
return nil, err
// Load server capabilities eagerly if not disabled
if !options.ConnectionOptions.disableEagerConnection {
if _, err := client.loadCapabilities(); err != nil {
client.Close()
return nil, err
}
}
excludeInternalFromRetry.Store(client.capabilities.InternalErrorDifferentiation)

return client, nil
}

Expand Down Expand Up @@ -682,6 +698,10 @@ func NewServiceClient(workflowServiceClient workflowservice.WorkflowServiceClien
options.MetricsHandler = metrics.NopHandler
}

if options.ConnectionOptions.excludeInternalFromRetry == nil {
options.ConnectionOptions.excludeInternalFromRetry = uberatomic.NewBool(false)
}

// Collect set of applicable worker interceptors
var workerInterceptors []WorkerInterceptor
for _, interceptor := range options.Interceptors {
Expand All @@ -691,16 +711,17 @@ func NewServiceClient(workflowServiceClient workflowservice.WorkflowServiceClien
}

client := &WorkflowClient{
workflowService: workflowServiceClient,
connectionCloser: connectionCloser,
namespace: options.Namespace,
registry: newRegistry(),
metricsHandler: options.MetricsHandler,
logger: options.Logger,
identity: options.Identity,
dataConverter: options.DataConverter,
contextPropagators: options.ContextPropagators,
workerInterceptors: workerInterceptors,
workflowService: workflowServiceClient,
connectionCloser: connectionCloser,
namespace: options.Namespace,
registry: newRegistry(),
metricsHandler: options.MetricsHandler,
logger: options.Logger,
identity: options.Identity,
dataConverter: options.DataConverter,
contextPropagators: options.ContextPropagators,
workerInterceptors: workerInterceptors,
excludeInternalFromRetry: options.ConnectionOptions.excludeInternalFromRetry,
}

// Create outbound interceptor by wrapping backwards through chain
Expand Down Expand Up @@ -766,19 +787,3 @@ func NewValue(data *commonpb.Payloads) converter.EncodedValue {
func NewValues(data *commonpb.Payloads) converter.EncodedValues {
return newEncodedValues(data, nil)
}

func getServerCapabilities(
client workflowservice.WorkflowServiceClient,
) (cap workflowservice.GetSystemInfoResponse_Capabilities, err error) {
ctx, cancel := context.WithTimeout(context.Background(), getSystemInfoTimeout)
defer cancel()
resp, err := client.GetSystemInfo(ctx, &workflowservice.GetSystemInfoRequest{})
// We ignore unimplemented
if _, isUnimplemented := err.(*serviceerror.Unimplemented); err != nil && !isUnimplemented {
return cap, fmt.Errorf("get system info failed: %w - %T", err, err)
}
if resp != nil && resp.Capabilities != nil {
cap = *resp.Capabilities
}
return cap, nil
}
39 changes: 37 additions & 2 deletions internal/grpc_dialer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func TestMissingGetServerInfo(t *testing.T) {
client, err := NewClient(ClientOptions{HostPort: l.Addr().String()})
require.NoError(t, err)
workflowClient := client.(*WorkflowClient)
require.True(t, proto.Equal(&workflowservice.GetSystemInfoResponse_Capabilities{}, &workflowClient.capabilities))
require.True(t, proto.Equal(&workflowservice.GetSystemInfoResponse_Capabilities{}, workflowClient.capabilities))
}

func TestInternalErrorRetry(t *testing.T) {
Expand Down Expand Up @@ -217,6 +217,40 @@ func TestInternalErrorRetry(t *testing.T) {
require.Equal(t, 1, srv.signalWorkflowInvokeCount())
}

func TestEagerAndLazyClient(t *testing.T) {
// Start a server that always returns an error on get system info
srv, err := startTestGRPCServer()
require.NoError(t, err)
defer srv.Stop()
srv.getSystemInfoResponseError = fmt.Errorf("some server failure")

// Confirm eager dial fails
_, err = DialClient(ClientOptions{HostPort: srv.addr})
require.EqualError(t, err, "failed reaching server: some server failure")

// Confirm lazy dial succeeds but fails signal workflow
c, err := NewLazyClient(ClientOptions{HostPort: srv.addr})
require.NoError(t, err)
defer c.Close()
err = c.SignalWorkflow(context.Background(), "workflow1", "", "my-signal", nil)
require.EqualError(t, err, "failed reaching server: some server failure")

// But if we call again without a sys info response error, it will succeed
srv.getSystemInfoResponseError = nil
err = c.SignalWorkflow(context.Background(), "workflow1", "", "my-signal", nil)
require.NoError(t, err)

// Now that there's no sys info response error, eager should succeed
c, err = DialClient(ClientOptions{HostPort: srv.addr})
require.NoError(t, err)
defer c.Close()

// And even if it starts erroring, the success was memoized so calls succeed
srv.getSystemInfoResponseError = fmt.Errorf("some server failure")
err = c.SignalWorkflow(context.Background(), "workflow1", "", "my-signal", nil)
require.NoError(t, err)
}

func TestDialOptions(t *testing.T) {
// Start an unimplemented gRPC server
srv, err := startTestGRPCServer()
Expand Down Expand Up @@ -337,6 +371,7 @@ type testGRPCServer struct {
addr string
sigWfCount int32
getSystemInfoResponse workflowservice.GetSystemInfoResponse
getSystemInfoResponseError error
signalWorkflowExecutionResponse workflowservice.SignalWorkflowExecutionResponse
signalWorkflowExecutionResponseError error
}
Expand Down Expand Up @@ -386,7 +421,7 @@ func (t *testGRPCServer) GetSystemInfo(
ctx context.Context,
req *workflowservice.GetSystemInfoRequest,
) (*workflowservice.GetSystemInfoResponse, error) {
return &t.getSystemInfoResponse, nil
return &t.getSystemInfoResponse, t.getSystemInfoResponseError
}

func (t *testGRPCServer) SignalWorkflowExecution(
Expand Down
4 changes: 4 additions & 0 deletions internal/internal_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -842,6 +842,7 @@ func getActivityEnvironmentFromCtx(ctx context.Context) *activityEnvironment {

// AggregatedWorker combines management of both workflowWorker and activityWorker worker lifecycle.
type AggregatedWorker struct {
client *WorkflowClient
workflowWorker *workflowWorker
activityWorker *activityWorker
sessionWorker *sessionWorker
Expand Down Expand Up @@ -881,6 +882,8 @@ func (aw *AggregatedWorker) Start() error {
aw.assertNotStopped()
if err := initBinaryChecksum(); err != nil {
return fmt.Errorf("failed to get executable checksum: %v", err)
} else if err = aw.client.ensureInitialized(); err != nil {
return err
}

if !util.IsInterfaceNil(aw.workflowWorker) {
Expand Down Expand Up @@ -1369,6 +1372,7 @@ func NewAggregatedWorker(client *WorkflowClient, taskQueue string, options Worke
}

return &AggregatedWorker{
client: client,
workflowWorker: workflowWorker,
activityWorker: activityWorker,
sessionWorker: sessionWorker,
Expand Down
1 change: 1 addition & 0 deletions internal/internal_worker_interfaces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ func TestInterfacesTestSuite(t *testing.T) {
func (s *InterfacesTestSuite) SetupTest() {
s.mockCtrl = gomock.NewController(s.T())
s.service = workflowservicemock.NewMockWorkflowServiceClient(s.mockCtrl)
s.service.EXPECT().GetSystemInfo(gomock.Any(), gomock.Any(), gomock.Any()).Return(&workflowservice.GetSystemInfoResponse{}, nil).AnyTimes()
}

func (s *InterfacesTestSuite) TearDownTest() {
Expand Down
2 changes: 2 additions & 0 deletions internal/internal_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ func TestInternalWorkerTestSuite(t *testing.T) {
func (s *internalWorkerTestSuite) SetupTest() {
s.mockCtrl = gomock.NewController(s.T())
s.service = workflowservicemock.NewMockWorkflowServiceClient(s.mockCtrl)
s.service.EXPECT().GetSystemInfo(gomock.Any(), gomock.Any(), gomock.Any()).Return(&workflowservice.GetSystemInfoResponse{}, nil).AnyTimes()
}

func (s *internalWorkerTestSuite) TearDownTest() {
Expand Down Expand Up @@ -1632,6 +1633,7 @@ func (s *internalWorkerTestSuite) TestCreateWorkerRun() {
// Create service endpoint
mockCtrl := gomock.NewController(s.T())
service := workflowservicemock.NewMockWorkflowServiceClient(mockCtrl)
service.EXPECT().GetSystemInfo(gomock.Any(), gomock.Any(), gomock.Any()).Return(&workflowservice.GetSystemInfoResponse{}, nil).AnyTimes()

worker := createWorker(service)
worker.RegisterActivity(testActivityNoResult)
Expand Down
1 change: 1 addition & 0 deletions internal/internal_workers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ type (
func (s *WorkersTestSuite) SetupTest() {
s.mockCtrl = gomock.NewController(s.T())
s.service = workflowservicemock.NewMockWorkflowServiceClient(s.mockCtrl)
s.service.EXPECT().GetSystemInfo(gomock.Any(), gomock.Any(), gomock.Any()).Return(&workflowservice.GetSystemInfoResponse{}, nil).AnyTimes()
s.dataConverter = converter.GetDefaultDataConverter()
}

Expand Down
Loading

0 comments on commit f7e78eb

Please sign in to comment.