Skip to content

Commit

Permalink
Reenable cancellation as an option (#331)
Browse files Browse the repository at this point in the history
* Reenable cancellation as an option

* Move cancellation config flag and update execution bypass
  • Loading branch information
tardieu committed Jun 20, 2022
1 parent 1619423 commit e30ae37
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 1 deletion.
1 change: 1 addition & 0 deletions core/internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ Available commands:
flag.StringVar(&Hostname, "hostname", "localhost", "Hostname")
flag.DurationVar(&ActorBusyTimeout, "actor_busy_timeout", 2*time.Minute, "Time to wait on a busy actor before timing out (0 is infinite)")
flag.DurationVar(&MissingComponentTimeout, "missing_component_timeout", 2*time.Minute, "Time to wait on request to unknown service or actor type before timing out (0 is infinite)")
flag.BoolVar(&KafkaConfig.Cancellation, "cancel", false, "Cancel a pending call if the caller has failed")

case GetCmd:
usage = "kar get [OPTIONS]"
Expand Down
11 changes: 10 additions & 1 deletion core/pkg/rpc/requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ var (
handlersNode = map[string]NodeHandler{} // registered method handlers for node targets
sessionTable = sync.Map{} // session table: SessionKey -> *SessionInstance
deferredLocks = sync.Map{} // locks being defered by tail calls: deferredLockId -> chan
cancellation = false // whether to cancel a pending call if the caller has failed
sessionBusyTimeout time.Duration = 0
deactivateCallback = func(ctx context.Context, i *SessionInstance) error { i.Activated = false; return nil }
)
Expand Down Expand Up @@ -422,7 +423,14 @@ func handleSessionRequest(ctx context.Context, before chan struct{}, waitForChil
sendOrDie(ctx, Done{RequestID: m.requestID(), Deadline: m.deadline()})
}
} else {
dest, value, err := f(ctx, target, instance, m.requestID(), m.value()) // The call to the higher-level handler that does something useful....at last!!!
var dest *Destination
var value []byte
var err error
if cr, ok := m.(CallRequest); ok && cancellation && node2partition[cr.Caller] == 0 {
logger.Info("Cancelling call request %s from dead sidecar %s", m.requestID(), cr.Caller)
} else {
dest, value, err = f(ctx, target, instance, m.requestID(), m.value()) // The call to the higher-level handler that does something useful....at last!!!
}
if instance.Activated && target.Flow != "nonexclusive" {
instance.lastAccess = time.Now()
}
Expand Down Expand Up @@ -541,5 +549,6 @@ func registerNode(method string, handler NodeHandler) {
// Connect to Kafka
func connect(ctx context.Context, topic string, conf *Config, services ...string) (<-chan struct{}, error) {
sessionBusyTimeout = conf.SessionBusyTimeout
cancellation = conf.Cancellation
return Dial(ctx, topic, conf, services, func(msg Message) { accept(ctx, msg) })
}
1 change: 1 addition & 0 deletions core/pkg/rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type Config struct {
TLSSkipVerify bool
TopicConfig map[string]*string
SessionBusyTimeout time.Duration
Cancellation bool
}

// Target of an invocation
Expand Down

0 comments on commit e30ae37

Please sign in to comment.