Skip to content

Commit

Permalink
Merge branch 'master' into cancel-panic
Browse files Browse the repository at this point in the history
  • Loading branch information
cretz committed Dec 2, 2021
2 parents 4f07e2d + f9d61f2 commit cba04e4
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 9 deletions.
2 changes: 1 addition & 1 deletion internal/activity.go
Expand Up @@ -193,7 +193,7 @@ func GetWorkerStopChannel(ctx context.Context) <-chan struct{} {
// TODO: we don't have a way to distinguish between the two cases when context is canceled because
// context doesn't support overriding value of ctx.Error.
// TODO: Implement automatic heartbeating with cancellation through ctx.
// details - the details that you provided here can be seen in the worflow when it receives TimeoutError, you
// details - the details that you provided here can be seen in the workflow when it receives TimeoutError, you
// can check error TimeoutType()/Details().
func RecordActivityHeartbeat(ctx context.Context, details ...interface{}) {
getActivityOutboundInterceptor(ctx).RecordHeartbeat(ctx, details...)
Expand Down
9 changes: 3 additions & 6 deletions internal/internal_activity.go
Expand Up @@ -394,19 +394,16 @@ func (a *activityEnvironmentInterceptor) RecordHeartbeat(ctx context.Context, de
}
var data *commonpb.Payloads
var err error
// We would like to be a able to pass in "nil" as part of details(that is no progress to report to)
// We would like to be able to pass in "nil" as part of details(that is no progress to report to)
if len(details) > 1 || (len(details) == 1 && details[0] != nil) {
data, err = encodeArgs(getDataConverterFromActivityCtx(ctx), details)
if err != nil {
panic(err)
}
}

err = a.env.serviceInvoker.Heartbeat(ctx, data, false)
if err != nil {
log := GetActivityLogger(ctx)
log.Debug("RecordActivityHeartbeat with error", tagError, err)
}
// Heartbeat error is logged inside ServiceInvoker.internalHeartBeat
_ = a.env.serviceInvoker.Heartbeat(ctx, data, false)
}

func (a *activityEnvironmentInterceptor) HasHeartbeatDetails(ctx context.Context) bool {
Expand Down
5 changes: 5 additions & 0 deletions internal/internal_task_handlers.go
Expand Up @@ -1694,6 +1694,11 @@ func (i *temporalInvoker) internalHeartBeat(ctx context.Context, details *common
}
}

if err != nil {
logger := GetActivityLogger(ctx)
logger.Debug("RecordActivityHeartbeat with error", tagError, err)
}

// This error won't be returned to user check RecordActivityHeartbeat().
return isActivityCanceled, err
}
Expand Down
10 changes: 8 additions & 2 deletions internal/internal_task_handlers_test.go
Expand Up @@ -1398,7 +1398,10 @@ func (t *TaskHandlersTestSuite) TestHeartBeat_NilResponseWithError() {
nil, "Test_Temporal_Invoker", mockService, tally.NoopScope, func() {}, 0,
make(chan struct{}), t.namespace)

heartbeatErr := temporalInvoker.Heartbeat(context.Background(), nil, false)
ctx, err := newActivityContext(context.Background(), nil, &activityEnvironment{serviceInvoker: temporalInvoker, logger: t.logger})
t.NoError(err)

heartbeatErr := temporalInvoker.Heartbeat(ctx, nil, false)
t.NotNil(heartbeatErr)
t.IsType(&serviceerror.NotFound{}, heartbeatErr, "heartbeatErr must be of type NotFound.")
}
Expand All @@ -1416,7 +1419,10 @@ func (t *TaskHandlersTestSuite) TestHeartBeat_NilResponseWithNamespaceNotActiveE
nil, "Test_Temporal_Invoker", mockService, tally.NoopScope, cancelHandler,
0, make(chan struct{}), t.namespace)

heartbeatErr := temporalInvoker.Heartbeat(context.Background(), nil, false)
ctx, err := newActivityContext(context.Background(), nil, &activityEnvironment{serviceInvoker: temporalInvoker, logger: t.logger})
t.NoError(err)

heartbeatErr := temporalInvoker.Heartbeat(ctx, nil, false)
t.NotNil(heartbeatErr)
t.IsType(&serviceerror.NamespaceNotActive{}, heartbeatErr, "heartbeatErr must be of type NamespaceNotActive.")
t.True(called)
Expand Down

0 comments on commit cba04e4

Please sign in to comment.