|
|
@@ -196,10 +196,9 @@ private[remote] class AkkaProtocolHandle( |
|
|
|
|
|
override def write(payload: ByteString): Boolean = wrappedHandle.write(codec.constructPayload(payload))
|
|
|
|
|
|
- override def disassociate(): Unit = stateActor ! DisassociateUnderlying(Unknown)
|
|
|
+ override def disassociate(): Unit = disassociate(Unknown)
|
|
|
|
|
|
def disassociate(info: DisassociateInfo): Unit = stateActor ! DisassociateUnderlying(info)
|
|
|
-
|
|
|
}
|
|
|
|
|
|
private[transport] object ProtocolStateActor {
|
|
|
@@ -394,8 +393,12 @@ private[transport] class ProtocolStateActor( |
|
|
// After receiving Disassociate we MUST NOT send back a Disassociate (loop)
|
|
|
stop(FSM.Failure(info))
|
|
|
|
|
|
- case _ ⇒
|
|
|
+ case msg ⇒
|
|
|
// Expected handshake to be finished, dropping connection
|
|
|
+ if (log.isDebugEnabled)
|
|
|
+ log.debug(
|
|
|
+ "Sending disassociate to [{}] because unexpected message of type [{}] was received during handshake",
|
|
|
+ wrappedHandle, msg.getClass.getName)
|
|
|
sendDisassociate(wrappedHandle, Unknown)
|
|
|
stop()
|
|
|
|
|
|
@@ -431,18 +434,32 @@ private[transport] class ProtocolStateActor( |
|
|
}
|
|
|
|
|
|
// Got a stray message -- explicitly reset the association (force remote endpoint to reassociate)
|
|
|
- case _ ⇒
|
|
|
+ case msg ⇒
|
|
|
+ if (log.isDebugEnabled)
|
|
|
+ log.debug(
|
|
|
+ "Sending disassociate to [{}] because unexpected message of type [{}] was received while unassociated",
|
|
|
+ wrappedHandle, msg.getClass.getName)
|
|
|
sendDisassociate(wrappedHandle, Unknown)
|
|
|
stop()
|
|
|
|
|
|
}
|
|
|
|
|
|
case Event(HandshakeTimer, OutboundUnderlyingAssociated(_, wrappedHandle)) ⇒
|
|
|
+ if (log.isDebugEnabled)
|
|
|
+ log.debug(
|
|
|
+ "Sending disassociate to [{}] because handshake timed out for outbound association after [{}] ms.",
|
|
|
+ wrappedHandle, settings.HandshakeTimeout.toMillis)
|
|
|
+
|
|
|
sendDisassociate(wrappedHandle, Unknown)
|
|
|
stop(FSM.Failure(TimeoutReason("No response from remote for outbound association. Handshake timed out after " +
|
|
|
s"[${settings.HandshakeTimeout.toMillis} ms].")))
|
|
|
|
|
|
case Event(HandshakeTimer, InboundUnassociated(_, wrappedHandle)) ⇒
|
|
|
+ if (log.isDebugEnabled)
|
|
|
+ log.debug(
|
|
|
+ "Sending disassociate to [{}] because handshake timed out for inbound association after [{}] ms.",
|
|
|
+ wrappedHandle, settings.HandshakeTimeout.toMillis)
|
|
|
+
|
|
|
sendDisassociate(wrappedHandle, Unknown)
|
|
|
stop(FSM.Failure(TimeoutReason("No response from remote for inbound association. Handshake timed out after " +
|
|
|
s"[${settings.HandshakeTimeout.toMillis} ms].")))
|
|
|
@@ -489,6 +506,9 @@ private[transport] class ProtocolStateActor( |
|
|
case msg ⇒
|
|
|
throw new AkkaProtocolException(s"unhandled message in state Open(DisassociateUnderlying) with type [${safeClassName(msg)}]")
|
|
|
}
|
|
|
+ // No debug logging here as sending DisassociateUnderlying(Unknown) should have been logged from where
|
|
|
+ // it was sent
|
|
|
+
|
|
|
sendDisassociate(handle, info)
|
|
|
stop()
|
|
|
|
|
|
@@ -510,6 +530,11 @@ private[transport] class ProtocolStateActor( |
|
|
sendHeartbeat(wrappedHandle)
|
|
|
stay()
|
|
|
} else {
|
|
|
+ if (log.isDebugEnabled)
|
|
|
+ log.debug(
|
|
|
+ "Sending disassociate to [{}] because failure detector triggered in state [{}]",
|
|
|
+ wrappedHandle, stateName)
|
|
|
+
|
|
|
// send disassociate just to be sure
|
|
|
sendDisassociate(wrappedHandle, Unknown)
|
|
|
stop(FSM.Failure(TimeoutReason(s"No response from remote. " +
|
|
|
@@ -545,7 +570,7 @@ private[transport] class ProtocolStateActor( |
|
|
case _ ⇒
|
|
|
new AkkaProtocolException("Transport disassociated before handshake finished")
|
|
|
})
|
|
|
- wrappedHandle.disassociate()
|
|
|
+ wrappedHandle.disassociate(disassociationReason(reason), log)
|
|
|
|
|
|
case StopEvent(reason, _, AssociatedWaitHandler(handlerFuture, wrappedHandle, queue)) ⇒
|
|
|
// Invalidate exposed but still unfinished promise. The underlying association disappeared, so after
|
|
|
@@ -555,18 +580,18 @@ private[transport] class ProtocolStateActor( |
|
|
case _ ⇒ Disassociated(Unknown)
|
|
|
}
|
|
|
handlerFuture foreach { _ notify disassociateNotification }
|
|
|
- wrappedHandle.disassociate()
|
|
|
+ wrappedHandle.disassociate(disassociationReason(reason), log)
|
|
|
|
|
|
case StopEvent(reason, _, ListenerReady(handler, wrappedHandle)) ⇒
|
|
|
val disassociateNotification = reason match {
|
|
|
case FSM.Failure(info: DisassociateInfo) ⇒ Disassociated(info)
|
|
|
case _ ⇒ Disassociated(Unknown)
|
|
|
}
|
|
|
handler notify disassociateNotification
|
|
|
- wrappedHandle.disassociate()
|
|
|
+ wrappedHandle.disassociate(disassociationReason(reason), log)
|
|
|
|
|
|
- case StopEvent(_, _, InboundUnassociated(_, wrappedHandle)) ⇒
|
|
|
- wrappedHandle.disassociate()
|
|
|
+ case StopEvent(reason, _, InboundUnassociated(_, wrappedHandle)) ⇒
|
|
|
+ wrappedHandle.disassociate(disassociationReason(reason), log)
|
|
|
|
|
|
}
|
|
|
|
|
|
@@ -650,4 +675,10 @@ private[transport] class ProtocolStateActor( |
|
|
} catch {
|
|
|
case NonFatal(e) ⇒ throw new AkkaProtocolException("Error writing ASSOCIATE to transport", e)
|
|
|
}
|
|
|
+
|
|
|
+ private def disassociationReason(reason: FSM.Reason): String = reason match {
|
|
|
+ case FSM.Normal ⇒ "the ProtocolStateActor was stopped normally"
|
|
|
+ case FSM.Shutdown ⇒ "the ProtocolStateActor was shutdown"
|
|
|
+ case FSM.Failure(ex) ⇒ s"the ProtocolStateActor failed: $ex"
|
|
|
+ }
|
|
|
}
|