`typed.internal.TimerSchedulerImpl$TimerMsg` cannot be cast to behaviour's event #26556

Closed
huntc opened this Issue Mar 19, 2019 · 7 comments

4 participants
@huntc
Contributor

commented Mar 19, 2019

We observed a TimerMsg that could not be cast to a behaviour's event type. We believe this may happen when a behaviour does not declare that it has a timer, while another behaviour of the same actor has caused a timer event to fire (perhaps even while the actor is stopping?).

Here's the code causing this issue:

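import scala.concurrent.Promise
import scala.util.{Failure, Success}
import akka.actor.typed.{Behavior, PostStop}
import akka.actor.typed.scaladsl.Behaviors
// Start, Event, LocalPacketRouter and friends come from Alpakka's mqtt-streaming impl package

// This is the behaviour in which the TimerMsg cast error shows up; note that it has no timers
def prepareServerUnsubscribe(data: Start): Behavior[Event] = Behaviors.setup { context =>
  val reply = Promise[LocalPacketRouter.Registered]()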
  data.packetRouter ! LocalPacketRouter.Register(context.self, reply)
  import context.executionContext
  reply.future.onComplete {
    case Success(registered: LocalPacketRouter.Registered) => context.self ! AcquiredPacketId(registered.packetId)
    case Failure(_) => context.self ! UnobtainablePacketId
  }

  Behaviors.receiveMessagePartial[Event] {
    case AcquiredPacketId(packetId) =>
      data.remote.success(ForwardUnsubscribe(packetId))
      serverUnsubscribe(
        ServerUnsubscribe(packetId, data.unsubscribeData, data.packetRouter, data.settings)
      )
    case UnobtainablePacketId =>
      data.remote.failure(UnsubscribeFailed)
      throw UnsubscribeFailed
  }
}

// Here's the behaviour that sets up a timer
def serverUnsubscribe(data: ServerUnsubscribe): Behavior[Event] = Behaviors.withTimers { timer =>
  val ReceiveUnsubAck = "client-receive-unsubAck"
  timer.startSingleTimer(ReceiveUnsubAck, ReceiveUnsubAckTimeout, data.settings.receiveUnsubAckTimeout)

  Behaviors
    .receiveMessagePartial[Event] {
      case UnsubAckReceivedFromRemote(local) =>
        local.success(ForwardUnsubAck(data.unsubscribeData))
        Behaviors.stopped
      case ReceiveUnsubAckTimeout =>
        throw UnsubscribeFailed
    }
    .receiveSignal {
      case (_, PostStop) =>
        data.packetRouter ! LocalPacketRouter.Unregister(data.packetId)
        Behaviors.same
    }
}

It doesn't seem correct to us that Akka Typed should even attempt to coerce the TimerMsg.
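The suspicion above can be illustrated with a toy model in plain Scala, with no Akka dependency. Every name in it (TimerMsg, withTimerLayer, plainReceive, deliver) is a hypothetical stand-in rather than Akka's internal API. The only assumption is that a timer message travels in an envelope type that is not a subtype of the behaviour's message type, so something has to unwrap it before a typed receive casts it:

```scala
// Toy model only: these types are hypothetical stand-ins, not Akka's classes.
object TimerCastModel {
  sealed trait Event
  case object ReceiveUnsubAckTimeout extends Event

  // Stand-in for an internal timer envelope; deliberately NOT a subtype of Event.
  final case class TimerMsg(key: String, payload: Event)

  // A behaviour that only understands Event. The explicit cast mirrors the
  // cast that erased generic code performs before the pattern match runs.
  def plainReceive(msg: Any): String =
    msg.asInstanceOf[Event] match {
      case ReceiveUnsubAckTimeout => "timed out"
    }

  // A timer-aware layer, similar in spirit to withTimers: it unwraps the
  // envelope before the inner behaviour ever sees it.
  def withTimerLayer(inner: Any => String): Any => String = {
    case TimerMsg(_, payload) => inner(payload)
    case other                => inner(other)
  }

  // Returns the behaviour's result, or the simple name of a ClassCastException.
  def deliver(receive: Any => String, msg: Any): Either[String, String] =
    try Right(receive(msg))
    catch { case e: ClassCastException => Left(e.getClass.getSimpleName) }
}
```

With `val envelope = TimerCastModel.TimerMsg("client-receive-unsubAck", TimerCastModel.ReceiveUnsubAckTimeout)`, delivering through `withTimerLayer(plainReceive)` yields `Right("timed out")`, while handing the envelope straight to `plainReceive` yields `Left("ClassCastException")`, which has the same shape as the reported failure.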

Full stack trace:

2019-03-18T19:10:18.691Z [19:10:18.575UTC] 127.0.0.1 com.cisco.streambed.controlcenter ERROR ActorAdapter - akka.actor.typed.internal.TimerSchedulerImpl$TimerMsg cannot be cast to akka.stream.alpakka.mqtt.streaming.impl.Unsubscriber$Event
java.lang.ClassCastException: akka.actor.typed.internal.TimerSchedulerImpl$TimerMsg cannot be cast to akka.stream.alpakka.mqtt.streaming.impl.Unsubscriber$Event
	at akka.stream.alpakka.mqtt.streaming.impl.Unsubscriber$$anonfun$$nestedInanonfun$prepareServerUnsubscribe$1$1.applyOrElse(ClientState.scala:633)
	at akka.actor.typed.scaladsl.Behaviors$.$anonfun$receiveMessagePartial$1(Behaviors.scala:132)
	at akka.actor.typed.internal.BehaviorImpl$ReceiveBehavior.receive(BehaviorImpl.scala:34)
	at akka.actor.typed.Behavior$.interpret(Behavior.scala:421)
	at akka.actor.typed.Behavior$.interpretMessage(Behavior.scala:394)
	at akka.actor.typed.internal.adapter.ActorAdapter.akka$actor$typed$internal$adapter$ActorAdapter$$handleMessage(ActorAdapter.scala:82)
	at akka.actor.typed.internal.adapter.ActorAdapter$$anonfun$running$1.applyOrElse(ActorAdapter.scala:78)
	at akka.actor.Actor.aroundReceive(Actor.scala:517)
	at akka.actor.Actor.aroundReceive$(Actor.scala:515)
	at akka.actor.typed.internal.adapter.ActorAdapter.aroundReceive(ActorAdapter.scala:39)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

@johanandren johanandren self-assigned this Mar 19, 2019

@johanandren

Member

commented Mar 19, 2019

I can't reproduce this, and I can't think of how it could happen, especially since in the shared code the actor goes from prepareServerUnsubscribe to serverUnsubscribe and never back; from there it only stops.

Are you supervising it so that it restarts the original behavior when UnsubscribeFailed is thrown?

@huntc

Contributor Author

commented Mar 19, 2019

I checked for supervision and, no, we are not supervising it. Here’s where the actor in question is spawned: https://github.com/akka/alpakka/blob/master/mqtt-streaming/src/main/scala/akka/stream/alpakka/mqtt/streaming/impl/ClientState.scala#L359

@johanandren

Member

commented Mar 19, 2019

It would be great if you could boil this down to a minimal reproducer without the MQTT integration.

@johanandren johanandren added bug 1 - triaged and removed 0 - new labels Mar 19, 2019

@longshorej

Contributor

commented Mar 19, 2019

I tried and failed to put one together. This closely matches the MQTT code: https://github.com/longshorej/garbage/tree/master/akka-26556

It is strange, though: as you say, we never go back to that original state in the MQTT code. FWIW, this happened under high load on a system with an underpowered CPU.

If you uncomment L36, it will run repeatedly until a ClassCastException is thrown where expected. No such luck yet, but I'm leaving it running locally.

@patriknw patriknw added this to Ready for production backlog in Akka Typed Mar 25, 2019

@patriknw

Member

commented Mar 25, 2019

Unrelated: reply.future.onComplete can be replaced with context.pipeToSelf.

But sending a Promise in a message like that seems strange; why not use context.ask?

@patriknw

Member

commented Mar 31, 2019

I was able to reproduce the ClassCastException, but with code nowhere close to what was reported.

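import scala.concurrent.duration._
import akka.actor.typed.SupervisorStrategy
import akka.actor.typed.scaladsl.Behaviors

// Runs inside TimerSpec: Command and Tick are the spec's test protocol,
// and spawn comes from the actor test kit the spec uses.
val behv = Behaviors.receiveMessage[Command] {
  case Tick(-1) =>
    // start a timer, then fail while setting up the timed behaviour
    Behaviors.withTimers[Command] { timer =>
      timer.startSingleTimer("T0", Tick(0), 5.millis)
      throw new RuntimeException
    }
}

// the restart re-creates behv, which does not use withTimers
val ref = spawn(Behaviors.supervise(behv).onFailure(SupervisorStrategy.restart))
ref ! Tick(-1)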
[ERROR] [03/31/2019 20:48:02.854] [TimerSpec-akka.actor.default-dispatcher-3] [akka://TimerSpec/user/$a] Supervisor RestartSupervisor saw failure: null
java.lang.RuntimeException
	at akka.actor.typed.TimerSpec.$anonfun$new$4(TimerSpec.scala:101)
	at akka.actor.typed.internal.TimerSchedulerImpl$.wrapWithTimers(TimerSchedulerImpl.scala:35)
	at akka.actor.typed.internal.TimerSchedulerImpl$.$anonfun$withTimers$1(TimerSchedulerImpl.scala:28)
	at akka.actor.typed.Behavior$DeferredBehavior$$anon$1.apply(Behavior.scala:265)
	at akka.actor.typed.Behavior$.start(Behavior.scala:332)
	at akka.actor.typed.Behavior$.interpret(Behavior.scala:439)
	at akka.actor.typed.Behavior$.interpretMessage(Behavior.scala:394)
	at akka.actor.typed.internal.InterceptorImpl$$anon$2.apply(InterceptorImpl.scala:53)
	at akka.actor.typed.internal.RestartSupervisor.aroundReceive(Supervision.scala:242)
	at akka.actor.typed.internal.InterceptorImpl.receive(InterceptorImpl.scala:80)
	at akka.actor.typed.Behavior$.interpret(Behavior.scala:438)
	at akka.actor.typed.Behavior$.interpretMessage(Behavior.scala:394)
	at akka.actor.typed.internal.adapter.ActorAdapter.handleMessage(ActorAdapter.scala:105)
	at akka.actor.typed.internal.adapter.ActorAdapter.aroundReceive(ActorAdapter.scala:99)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:610)
	at akka.actor.ActorCell.invoke(ActorCell.scala:579)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:268)
	at akka.dispatch.Mailbox.run(Mailbox.scala:229)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:241)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

[ERROR] [03/31/2019 20:48:02.874] [TimerSpec-akka.actor.default-dispatcher-4] [akka://TimerSpec/user/$a] Supervisor RestartSupervisor saw failure: akka.actor.typed.internal.TimerSchedulerImpl$TimerMsg cannot be cast to akka.actor.typed.TimerSpec$Command
java.lang.ClassCastException: akka.actor.typed.internal.TimerSchedulerImpl$TimerMsg cannot be cast to akka.actor.typed.TimerSpec$Command
	at akka.actor.typed.internal.BehaviorImpl$ReceiveMessageBehavior.receive(BehaviorImpl.scala:53)
	at akka.actor.typed.Behavior$.interpret(Behavior.scala:438)
	at akka.actor.typed.Behavior$.interpretMessage(Behavior.scala:394)
	at akka.actor.typed.internal.InterceptorImpl$$anon$2.apply(InterceptorImpl.scala:53)
	at akka.actor.typed.internal.RestartSupervisor.aroundReceive(Supervision.scala:242)
	at akka.actor.typed.internal.InterceptorImpl.receive(InterceptorImpl.scala:80)
	at akka.actor.typed.Behavior$.interpret(Behavior.scala:438)
	at akka.actor.typed.Behavior$.interpretMessage(Behavior.scala:394)
	at akka.actor.typed.internal.adapter.ActorAdapter.handleMessage(ActorAdapter.scala:105)
	at akka.actor.typed.internal.adapter.ActorAdapter.aroundReceive(ActorAdapter.scala:99)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:610)
	at akka.actor.ActorCell.invoke(ActorCell.scala:579)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:268)
	at akka.dispatch.Mailbox.run(Mailbox.scala:229)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:241)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
@johanandren

Member

commented Apr 1, 2019

That's the scenario I could think of, but the reported one can't be the same as far as I can see.

patriknw added a commit that referenced this issue Apr 1, 2019

Keep TimerInterceptor when restarted, #26556
* To avoid ClassCastException of TimerMsg if TimerMsg is already enqueued
  in mailbox and there is a restart with initial behavior that is not using withTimers
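The scenario in this commit message can be illustrated with a toy model in plain Scala, with no Akka dependency. Everything in it is hypothetical: TimerMsg is a stand-in envelope, the flag timerLayerInstalled stands in for whether the current behaviour was built with withTimers, and a restart is modelled as resetting that flag while the mailbox keeps its contents. The sequence mirrors the reproducer above: the timer is started, setup throws, and the restarted initial behaviour then receives the envelope and fails the cast.

```scala
// Toy model of the restart race; all names are hypothetical, not Akka's API.
object RestartRaceModel {
  sealed trait Command
  final case class Tick(n: Int) extends Command
  // Stand-in for the internal timer envelope; deliberately not a Command.
  final case class TimerMsg(key: String, payload: Command)

  def run(): List[String] = {
    val mailbox = scala.collection.mutable.Queue[Any](Tick(-1))
    var timerLayerInstalled = false // the initial behaviour does not use timers
    val log = List.newBuilder[String]

    while (mailbox.nonEmpty) {
      // Only a behaviour built with a timer layer unwraps the envelope.
      val msg = mailbox.dequeue() match {
        case TimerMsg(_, payload) if timerLayerInstalled => payload
        case other                                       => other
      }
      try {
        msg.asInstanceOf[Command] match { // the cast a typed receive performs
          case Tick(-1) =>
            timerLayerInstalled = true               // like entering withTimers
            mailbox.enqueue(TimerMsg("T0", Tick(0))) // the timer is now pending
            throw new RuntimeException("boom")       // ...and setup fails
          case Tick(n) =>
            log += s"tick $n"
        }
      } catch {
        case e: RuntimeException => // ClassCastException is a RuntimeException too
          log += s"restart after ${e.getClass.getSimpleName}"
          timerLayerInstalled = false // restarted with the initial behaviour
      }
    }
    log.result()
  }
}
```

Calling `RestartRaceModel.run()` returns `List("restart after RuntimeException", "restart after ClassCastException")`: the second restart is caused purely by the pending envelope reaching a behaviour that has no timer layer.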

@patriknw patriknw self-assigned this Apr 1, 2019

@patriknw patriknw moved this from Ready for production backlog to Reviewing in Akka Typed Apr 1, 2019

patriknw added a commit that referenced this issue Apr 2, 2019

Always intercept TimerMsg, also when restarted, #26556 (#26650)
* To avoid ClassCastException of TimerMsg if TimerMsg is already enqueued
  in mailbox and there is a restart with initial behavior that is not using withTimers
* let ActorAdapter be responsible for intercepting TimerMsg
  * instead of trying to keep the TimerInterceptor when restarting
* more consistent cancellation of timers when exception/restart

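The approach in this final commit, where ActorAdapter rather than the current behaviour intercepts TimerMsg and timers are cancelled on restart, can be sketched as a toy model in plain Scala with no Akka dependency. The names Timers, intercept and run are hypothetical, and the generation counter used to recognise a message from a cancelled timer is an assumption made for illustration, not a description of Akka's internals.

```scala
// Toy model of adapter-level timer interception; all names are hypothetical.
object AdapterInterceptModel {
  sealed trait Command
  final case class Tick(n: Int) extends Command
  // Assumption: each envelope carries the generation of the timer that sent it.
  final case class TimerMsg(key: String, generation: Int, payload: Command)

  final class Timers {
    private val active = scala.collection.mutable.Map.empty[String, Int]
    private var nextGeneration = 0
    def start(key: String, payload: Command): TimerMsg = {
      nextGeneration += 1
      active(key) = nextGeneration
      TimerMsg(key, nextGeneration, payload)
    }
    def cancelAll(): Unit = active.clear()
    // Unwrap an envelope only if it belongs to a still-active timer.
    def intercept(t: TimerMsg): Option[Command] =
      if (active.get(t.key).contains(t.generation)) {
        active.remove(t.key) // single-shot timer: finished once delivered
        Some(t.payload)
      } else None
  }

  def run(): List[String] = {
    val timers = new Timers
    val mailbox = scala.collection.mutable.Queue[Any](Tick(-1))
    val log = List.newBuilder[String]
    while (mailbox.nonEmpty) {
      // The adapter handles TimerMsg itself, whatever the current behaviour is.
      val next: Option[Command] = mailbox.dequeue() match {
        case t: TimerMsg => timers.intercept(t)
        case c: Command  => Some(c)
        case other       => sys.error(s"unexpected $other")
      }
      next match {
        case None => log += "dropped stale TimerMsg"
        case Some(cmd) =>
          try {
            cmd match {
              case Tick(-1) =>
                mailbox.enqueue(timers.start("T0", Tick(0)))
                throw new RuntimeException("boom")
              case Tick(n) => log += s"tick $n"
            }
          } catch {
            case e: RuntimeException =>
              log += s"restart after ${e.getClass.getSimpleName}"
              timers.cancelAll() // timers are cancelled when restarting
          }
      }
    }
    log.result()
  }
}
```

Here `AdapterInterceptModel.run()` returns `List("restart after RuntimeException", "dropped stale TimerMsg")`: because the envelope is handled before any behaviour sees it, the leftover message from the cancelled timer is discarded instead of being cast.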
@patriknw patriknw added this to the 2.5.22 milestone Apr 2, 2019

@patriknw patriknw closed this Apr 2, 2019

Akka Typed automation moved this from Reviewing to Done Apr 2, 2019
