diff --git a/core/internal/config/config.go b/core/internal/config/config.go index 4efed03c..0b42289c 100644 --- a/core/internal/config/config.go +++ b/core/internal/config/config.go @@ -244,6 +244,7 @@ Available commands: flag.StringVar(&Hostname, "hostname", "localhost", "Hostname") flag.DurationVar(&ActorBusyTimeout, "actor_busy_timeout", 2*time.Minute, "Time to wait on a busy actor before timing out (0 is infinite)") flag.DurationVar(&MissingComponentTimeout, "missing_component_timeout", 2*time.Minute, "Time to wait on request to unknown service or actor type before timing out (0 is infinite)") + flag.BoolVar(&KafkaConfig.Cancellation, "cancel", false, "Cancel a pending call if the caller has failed") case GetCmd: usage = "kar get [OPTIONS]" diff --git a/core/pkg/rpc/requests.go b/core/pkg/rpc/requests.go index c31e7093..0783069c 100644 --- a/core/pkg/rpc/requests.go +++ b/core/pkg/rpc/requests.go @@ -44,6 +44,7 @@ var ( handlersNode = map[string]NodeHandler{} // registered method handlers for node targets sessionTable = sync.Map{} // session table: SessionKey -> *SessionInstance deferredLocks = sync.Map{} // locks being defered by tail calls: deferredLockId -> chan + cancellation = false // whether to cancel a pending call if the caller has failed sessionBusyTimeout time.Duration = 0 deactivateCallback = func(ctx context.Context, i *SessionInstance) error { i.Activated = false; return nil } ) @@ -422,7 +423,14 @@ func handleSessionRequest(ctx context.Context, before chan struct{}, waitForChil sendOrDie(ctx, Done{RequestID: m.requestID(), Deadline: m.deadline()}) } } else { - dest, value, err := f(ctx, target, instance, m.requestID(), m.value()) // The call to the higher-level handler that does something useful....at last!!! + var dest *Destination + var value []byte + var err error + if cr, ok := m.(CallRequest); ok && cancellation && node2partition[cr.Caller] == 0 { + logger.Info("Cancelling call request %s from dead sidecar %s", m.requestID(), cr.Caller) + } else { + dest, value, err = f(ctx, target, instance, m.requestID(), m.value()) // The call to the higher-level handler that does something useful....at last!!! + } if instance.Activated && target.Flow != "nonexclusive" { instance.lastAccess = time.Now() } @@ -541,5 +549,6 @@ func registerNode(method string, handler NodeHandler) { // Connect to Kafka func connect(ctx context.Context, topic string, conf *Config, services ...string) (<-chan struct{}, error) { sessionBusyTimeout = conf.SessionBusyTimeout + cancellation = conf.Cancellation return Dial(ctx, topic, conf, services, func(msg Message) { accept(ctx, msg) }) } diff --git a/core/pkg/rpc/rpc.go b/core/pkg/rpc/rpc.go index a29238b8..bbf9737f 100644 --- a/core/pkg/rpc/rpc.go +++ b/core/pkg/rpc/rpc.go @@ -32,6 +32,7 @@ type Config struct { TLSSkipVerify bool TopicConfig map[string]*string SessionBusyTimeout time.Duration + Cancellation bool } // Target of an invocation