Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

!test Migrate multi node testkit to Netty 4. #539

Merged
merged 1 commit into from
Aug 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions cluster/src/multi-jvm/resources/logback-test.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>

<root level="info">
<appender-ref ref="STDOUT" />
</root>
<logger name="io.netty.util.Recycler" level="ERROR" />
mdedetrich marked this conversation as resolved.
Show resolved Hide resolved
<logger name="io.netty.buffer.PoolThreadCache" level="ERROR" />
</configuration>
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# Migrate to netty 4
ProblemFilters.exclude[IncompatibleTemplateDefProblem]("org.apache.pekko.remote.testconductor.RemoteConnection")
ProblemFilters.exclude[MissingTypesProblem]("org.apache.pekko.remote.testconductor.PlayerHandler")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.testconductor.PlayerHandler.channelOpen")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.testconductor.PlayerHandler.channelClosed")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.testconductor.PlayerHandler.channelBound")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.testconductor.PlayerHandler.channelUnbound")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.testconductor.PlayerHandler.writeComplete")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.testconductor.PlayerHandler.exceptionCaught")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.testconductor.PlayerHandler.channelConnected")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.testconductor.PlayerHandler.channelDisconnected")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.testconductor.PlayerHandler.messageReceived")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.remote.testconductor.ServerFSM.channel")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.testconductor.ServerFSM.this")
ProblemFilters.exclude[MissingTypesProblem]("org.apache.pekko.remote.testconductor.ConductorHandler")
ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.remote.testconductor.ConductorHandler.clients")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.testconductor.ConductorHandler.channelConnected")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.testconductor.ConductorHandler.channelDisconnected")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.testconductor.ConductorHandler.messageReceived")
ProblemFilters.exclude[MissingTypesProblem]("org.apache.pekko.remote.testconductor.ProtobufDecoder")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.testconductor.ProtobufDecoder.decode")
ProblemFilters.exclude[MissingTypesProblem]("org.apache.pekko.remote.testconductor.ProtobufEncoder")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.testconductor.ProtobufEncoder.encode")
ProblemFilters.exclude[MissingTypesProblem]("org.apache.pekko.remote.testconductor.MsgEncoder")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.testconductor.MsgEncoder.encode")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.testconductor.RemoteConnection.apply")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.testconductor.RemoteConnection.getAddrString")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.testconductor.RemoteConnection.shutdown")
ProblemFilters.exclude[MissingTypesProblem]("org.apache.pekko.remote.testconductor.TestConductorPipelineFactory")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.testconductor.TestConductorPipelineFactory.getPipeline")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.testconductor.TestConductorPipelineFactory.this")
ProblemFilters.exclude[MissingTypesProblem]("org.apache.pekko.remote.testconductor.MsgDecoder")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.testconductor.MsgDecoder.decode")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.remote.testconductor.Controller.connection")
ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.remote.testconductor.ClientFSM#Data.apply")
ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.remote.testconductor.ClientFSM#Data.unapply")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.testconductor.Controller#CreateServerFSM.apply")
ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.remote.testconductor.Controller#CreateServerFSM.unapply")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.testconductor.ClientFSM#Connected.apply")
ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.remote.testconductor.ClientFSM#Connected.unapply")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.remote.testconductor.Controller#CreateServerFSM.channel")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.testconductor.Controller#CreateServerFSM.copy")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.testconductor.Controller#CreateServerFSM.this")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.remote.testconductor.ClientFSM#Connected.channel")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.testconductor.ClientFSM#Connected.copy")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.remote.testconductor.ClientFSM#Connected.copy$default$1")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.pekko.remote.testconductor.ClientFSM#Connected.this")
ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.remote.testconductor.ClientFSM#Data.channel")
ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.remote.testconductor.ClientFSM#Data.copy")
ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.remote.testconductor.ClientFSM#Data.copy$default$1")
ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.remote.testconductor.ClientFSM#Data.this")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.remote.testconductor.Controller#CreateServerFSM.copy$default$1")

# For Scala 3 these are also needed
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.remote.testconductor.ClientFSM#Connected._1")
ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.remote.testconductor.ClientFSM#Data._1")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.remote.testconductor.Controller#CreateServerFSM._1")
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,21 @@ package org.apache.pekko.remote.testconductor
import java.net.InetSocketAddress
import java.util.concurrent.ConcurrentHashMap

import scala.annotation.nowarn
import scala.concurrent.Await
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.reflect.classTag
import scala.util.control.NoStackTrace

import RemoteConnection.getAddrString
import io.netty.channel.{ Channel, ChannelHandlerContext, ChannelInboundHandlerAdapter }
import io.netty.channel.ChannelHandler.Sharable
import language.postfixOps
import org.jboss.netty.channel.{
Channel,
ChannelHandlerContext,
ChannelStateEvent,
MessageEvent,
SimpleChannelUpstreamHandler
}

import org.apache.pekko
import pekko.PekkoException
import pekko.ConfigurationException
import pekko.PekkoException
import pekko.actor.{
Actor,
ActorRef,
Expand Down Expand Up @@ -286,32 +282,33 @@ trait Conductor { this: TestConductorExt =>
*
* INTERNAL API.
*/
@Sharable
private[pekko] class ConductorHandler(_createTimeout: Timeout, controller: ActorRef, log: LoggingAdapter)
extends SimpleChannelUpstreamHandler {
extends ChannelInboundHandlerAdapter {

implicit val createTimeout: Timeout = _createTimeout
val clients = new ConcurrentHashMap[Channel, ActorRef]()

override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
val channel = event.getChannel
override def channelActive(ctx: ChannelHandlerContext): Unit = {
val channel = ctx.channel()
log.debug("connection from {}", getAddrString(channel))
val fsm: ActorRef =
Await.result((controller ? Controller.CreateServerFSM(channel)).mapTo(classTag[ActorRef]), Duration.Inf)
clients.put(channel, fsm)
}

override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
val channel = event.getChannel
override def channelInactive(ctx: ChannelHandlerContext): Unit = {
val channel = ctx.channel()
log.debug("disconnect from {}", getAddrString(channel))
val fsm = clients.get(channel)
fsm ! Controller.ClientDisconnected
clients.remove(channel)
}

override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = {
val channel = event.getChannel
log.debug("message from {}: {}", getAddrString(channel), event.getMessage)
event.getMessage match {
override def channelRead(ctx: ChannelHandlerContext, msg: AnyRef): Unit = {
val channel = ctx.channel()
log.debug("message from {}: {}", getAddrString(channel), msg)
msg match {
case msg: NetworkOp =>
clients.get(channel) ! msg
case msg =>
Expand All @@ -320,6 +317,11 @@ private[pekko] class ConductorHandler(_createTimeout: Timeout, controller: Actor
}
}

@nowarn("msg=deprecated")
override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = {
log.error("channel {} exception {}", ctx.channel(), cause)
ctx.close()
}
}

/**
Expand Down Expand Up @@ -398,10 +400,10 @@ private[pekko] class ServerFSM(val controller: ActorRef, val channel: Channel)
log.warning("client {} sent unsupported message {}", getAddrString(channel), msg)
stop()
case Event(ToClient(msg: UnconfirmedClientOp), _) =>
channel.write(msg)
channel.writeAndFlush(msg)
stay()
case Event(ToClient(msg), None) =>
channel.write(msg)
channel.writeAndFlush(msg)
stay().using(Some(sender()))
case Event(ToClient(msg), _) =>
log.warning("cannot send {} while waiting for previous ACK", msg)
Expand Down Expand Up @@ -436,7 +438,7 @@ private[pekko] class Controller(private var initialParticipants: Int, controller
import Controller._

val settings = TestConductor().Settings
val connection = RemoteConnection(
val connection: RemoteConnection = RemoteConnection(
Server,
controllerPort,
settings.ServerSocketWorkerPoolSize,
Expand Down Expand Up @@ -472,7 +474,7 @@ private[pekko] class Controller(private var initialParticipants: Int, controller

override def receive = LoggingReceive {
case CreateServerFSM(channel) =>
val (ip, port) = channel.getRemoteAddress match {
val (ip, port) = channel.remoteAddress() match {
case s: InetSocketAddress => (s.getAddress.getHostAddress, s.getPort)
case _ => throw new RuntimeException() // compiler exhaustiveness check pleaser
}
Expand Down Expand Up @@ -525,12 +527,13 @@ private[pekko] class Controller(private var initialParticipants: Int, controller
case Remove(node) =>
barrier ! BarrierCoordinator.RemoveClient(node)
}
case GetNodes => sender() ! nodes.keys
case GetSockAddr => sender() ! connection.getLocalAddress
case GetNodes => sender() ! nodes.keys
case GetSockAddr =>
sender() ! connection.channelFuture.sync().channel().localAddress()
}

override def postStop(): Unit = {
RemoteConnection.shutdown(connection)
connection.shutdown()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,9 @@ package org.apache.pekko.remote.testconductor

import scala.concurrent.duration._

import io.netty.channel.ChannelHandlerContext
import io.netty.handler.codec.{ MessageToMessageDecoder, MessageToMessageEncoder }
import language.implicitConversions
import org.jboss.netty.channel.Channel
import org.jboss.netty.channel.ChannelHandlerContext
import org.jboss.netty.handler.codec.oneone.OneToOneDecoder
import org.jboss.netty.handler.codec.oneone.OneToOneEncoder

import org.apache.pekko
import pekko.actor.Address
import pekko.remote.testconductor.{ TestConductorProtocol => TCP }
Expand Down Expand Up @@ -74,7 +71,7 @@ private[pekko] case object Done extends Done {

private[pekko] final case class Remove(node: RoleName) extends CommandOp

private[pekko] class MsgEncoder extends OneToOneEncoder {
private[pekko] class MsgEncoder extends MessageToMessageEncoder[AnyRef] {

implicit def address2proto(addr: Address): TCP.Address =
TCP.Address.newBuilder
Expand All @@ -90,7 +87,11 @@ private[pekko] class MsgEncoder extends OneToOneEncoder {
case Direction.Both => TCP.Direction.Both
}

def encode(ctx: ChannelHandlerContext, ch: Channel, msg: AnyRef): AnyRef = msg match {
override def encode(ctx: ChannelHandlerContext, msg: AnyRef, out: java.util.List[AnyRef]): Unit = {
out.add(encode0(msg))
}

private def encode0(msg: AnyRef): AnyRef = msg match {
case x: NetworkOp =>
val w = TCP.Wrapper.newBuilder
x match {
Expand Down Expand Up @@ -136,7 +137,7 @@ private[pekko] class MsgEncoder extends OneToOneEncoder {
}
}

private[pekko] class MsgDecoder extends OneToOneDecoder {
private[pekko] class MsgDecoder extends MessageToMessageDecoder[AnyRef] {

implicit def address2scala(addr: TCP.Address): Address =
Address(addr.getProtocol, addr.getSystem, addr.getHost, addr.getPort)
Expand All @@ -147,7 +148,11 @@ private[pekko] class MsgDecoder extends OneToOneDecoder {
case TCP.Direction.Both => Direction.Both
}

def decode(ctx: ChannelHandlerContext, ch: Channel, msg: AnyRef): AnyRef = msg match {
override def decode(ctx: ChannelHandlerContext, msg: AnyRef, out: java.util.List[AnyRef]): Unit = {
out.add(decode0(msg))
}

private def decode0(msg: AnyRef): AnyRef = msg match {
case w: TCP.Wrapper if w.getAllFields.size == 1 =>
if (w.hasHello) {
val h = w.getHello
Expand Down
Loading