Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update unix-domain-socket to Akka 2.6 #2586

Merged
merged 3 commits into from
Mar 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,8 @@ lazy val text = alpakkaProject("text", "text")

lazy val udp = alpakkaProject("udp", "udp")

lazy val unixdomainsocket = alpakkaProject("unix-domain-socket", "unixdomainsocket", Dependencies.UnixDomainSocket)
lazy val unixdomainsocket =
alpakkaProject("unix-domain-socket", "unixdomainsocket", Dependencies.UnixDomainSocket, fatalWarnings := true)

lazy val xml = alpakkaProject("xml", "xml", Dependencies.Xml, fatalWarnings := true)

Expand Down
4 changes: 2 additions & 2 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -491,8 +491,8 @@ object Dependencies {

val UnixDomainSocket = Seq(
libraryDependencies ++= Seq(
"com.github.jnr" % "jffi" % "1.2.23", // classifier "complete", // Is the classifier needed anymore?
"com.github.jnr" % "jnr-unixsocket" % "0.28" // BSD/ApacheV2/CPL/MIT as per https://github.com/akka/alpakka/issues/620#issuecomment-348727265
"com.github.jnr" % "jffi" % "1.3.1", // classifier "complete", // Is the classifier needed anymore?
"com.github.jnr" % "jnr-unixsocket" % "0.38.5" // BSD/ApacheV2/CPL/MIT as per https://github.com/akka/alpakka/issues/620#issuecomment-348727265
)
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,25 @@
package akka.stream.alpakka.unixdomainsocket
package impl

import java.io.IOException
import java.nio.ByteBuffer
import java.nio.channels.{SelectionKey, Selector}
import java.nio.file.{Files, Path, Paths}

import akka.actor.{Cancellable, CoordinatedShutdown, ExtendedActorSystem, Extension}
import akka.annotation.InternalApi
import akka.event.{Logging, LoggingAdapter}
import akka.stream._
import akka.stream.alpakka.unixdomainsocket.scaladsl
import akka.stream.alpakka.unixdomainsocket.scaladsl.UnixDomainSocket.{
IncomingConnection,
OutgoingConnection,
ServerBinding
}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source, SourceQueueWithComplete}
import akka.util.ByteString
import akka.{Done, NotUsed}
import jnr.enxio.channels.NativeSelectorProvider
import jnr.unixsocket.{UnixServerSocketChannel, UnixSocketChannel, UnixSocketAddress => JnrUnixSocketAddress}

import java.io.IOException
import java.nio.ByteBuffer
import java.nio.channels.{SelectionKey, Selector}
import java.nio.file.{Files, Path, Paths}
import scala.concurrent.duration.{Duration, FiniteDuration}
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.control.NonFatal
Expand All @@ -32,8 +35,6 @@ import scala.util.{Failure, Success, Try}
@InternalApi
private[unixdomainsocket] object UnixDomainSocketImpl {

import scaladsl.UnixDomainSocket._

private sealed abstract class ReceiveContext(
val queue: SourceQueueWithComplete[ByteString],
val buffer: ByteBuffer
Expand Down Expand Up @@ -244,7 +245,7 @@ private[unixdomainsocket] object UnixDomainSocketImpl {
halfClose: Boolean,
receiveBufferSize: Int,
sendBufferSize: Int
)(sel: Selector, key: SelectionKey)(implicit mat: ActorMaterializer, ec: ExecutionContext): Unit = {
)(sel: Selector, key: SelectionKey)(implicit mat: Materializer, ec: ExecutionContext): Unit = {

val acceptingChannel = key.channel().asInstanceOf[UnixServerSocketChannel]
val acceptedChannel = try {
Expand Down Expand Up @@ -289,7 +290,7 @@ private[unixdomainsocket] object UnixDomainSocketImpl {
}

private def sendReceiveStructures(sel: Selector, receiveBufferSize: Int, sendBufferSize: Int, halfClose: Boolean)(
implicit mat: ActorMaterializer,
implicit mat: Materializer,
ec: ExecutionContext
): (SendReceiveContext, Flow[ByteString, ByteString, NotUsed]) = {

Expand Down Expand Up @@ -354,7 +355,7 @@ private[unixdomainsocket] object UnixDomainSocketImpl {
.to(Sink.ignore)
)

(sendReceiveContext, Flow.fromSinkAndSource(sendSink, Source.fromFutureSource(receiveSource)))
(sendReceiveContext, Flow.fromSinkAndSource(sendSink, Source.futureSource(receiveSource)))
}
}

Expand All @@ -364,21 +365,17 @@ private[unixdomainsocket] object UnixDomainSocketImpl {
@InternalApi
private[unixdomainsocket] abstract class UnixDomainSocketImpl(system: ExtendedActorSystem) extends Extension {

import scaladsl.UnixDomainSocket._
import UnixDomainSocketImpl._

private implicit val materializer: ActorMaterializer = ActorMaterializer()(system)
private implicit val materializer: Materializer = Materializer(system)
import system.dispatcher

private val sel = NativeSelectorProvider.getInstance.openSelector

/** Override to customise reported log source */
protected def logSource: Class[_] = this.getClass

private val ioThread = new Thread(new Runnable {
override def run(): Unit =
nioEventLoop(sel, Logging(system, logSource))
}, "unix-domain-socket-io")
private val ioThread = new Thread(() => nioEventLoop(sel, Logging(system, logSource)), "unix-domain-socket-io")
ioThread.start()

CoordinatedShutdown(system).addTask(CoordinatedShutdown.PhaseServiceStop, "stopUnixDomainSocket") { () =>
Expand Down Expand Up @@ -455,12 +452,12 @@ private[unixdomainsocket] abstract class UnixDomainSocketImpl(system: ExtendedAc
}

Source
.fromFutureSource(incomingConnectionSource)
.futureSource(incomingConnectionSource)
.mapMaterializedValue(_ => serverBinding.future)

}

Source.lazily(bind).mapMaterializedValue(_.flatMap(identity))
Source.lazySource(bind).mapMaterializedValue(_.flatMap(identity))
}

protected def outgoingConnection(
Expand Down Expand Up @@ -494,7 +491,7 @@ private[unixdomainsocket] abstract class UnixDomainSocketImpl(system: ExtendedAc

Future.successful(
connectionFlow
.merge(Source.fromFuture(connectionFinished.future.map(_ => ByteString.empty)))
.merge(Source.future(connectionFinished.future.map(_ => ByteString.empty)))
.filter(_.nonEmpty) // We merge above so that we can get connection failures - we're not interested in the empty bytes though
.mapMaterializedValue { _ =>
connection match {
Expand All @@ -510,6 +507,6 @@ private[unixdomainsocket] abstract class UnixDomainSocketImpl(system: ExtendedAc
)
}

Flow.lazyInitAsync(connect).mapMaterializedValue(_.flatMap(_.get))
Flow.lazyFutureFlow(connect).mapMaterializedValue(_.flatten)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ object UnixDomainSocket extends ExtensionId[UnixDomainSocket] with ExtensionIdPr
*/
override def get(system: ClassicActorSystemProvider): UnixDomainSocket = super.apply(system.classicSystem)

def lookup(): ExtensionId[_ <: Extension] =
def lookup: ExtensionId[_ <: Extension] =
UnixDomainSocket

def createExtension(system: ExtendedActorSystem): UnixDomainSocket =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ object UnixDomainSocket extends ExtensionId[UnixDomainSocket] with ExtensionIdPr
override def createExtension(system: ExtendedActorSystem) =
new UnixDomainSocket(system)

override def lookup(): ExtensionId[_ <: Extension] =
override def lookup: ExtensionId[_ <: Extension] =
UnixDomainSocket

/**
Expand Down Expand Up @@ -73,7 +73,7 @@ final class UnixDomainSocket(system: ExtendedActorSystem) extends UnixDomainSock

import UnixDomainSocket._

private implicit val materializer: ActorMaterializer = ActorMaterializer()(system)
private implicit val materializer: Materializer = Materializer(system)

/**
* Creates a [[UnixDomainSocket.ServerBinding]] instance which represents a prospective Unix Domain Socket
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,29 +7,31 @@
import akka.Done;
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.*;
import akka.japi.Pair;
import akka.stream.Materializer;
import akka.stream.OverflowStrategy;
import akka.stream.alpakka.testkit.javadsl.LogCapturingJunit4;
import akka.stream.alpakka.unixdomainsocket.javadsl.UnixDomainSocket;
import akka.stream.javadsl.*;
import akka.stream.alpakka.unixdomainsocket.javadsl.UnixDomainSocket.IncomingConnection;
import akka.stream.alpakka.unixdomainsocket.javadsl.UnixDomainSocket.ServerBinding;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.testkit.javadsl.TestKit;
import akka.util.ByteString;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;

import akka.japi.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.file.Files;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;

import akka.stream.alpakka.unixdomainsocket.javadsl.UnixDomainSocket.IncomingConnection;
import akka.stream.alpakka.unixdomainsocket.javadsl.UnixDomainSocket.ServerBinding;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.junit.Assert.assertEquals;

public class UnixDomainSocketTest {
Expand All @@ -39,11 +41,11 @@ public class UnixDomainSocketTest {
private static final Logger log = LoggerFactory.getLogger(UnixDomainSocketTest.class);
private static ActorSystem system;
private static Materializer materializer;
private static int timeoutSeconds = 10;
private static final int timeoutSeconds = 10;

private static Pair<ActorSystem, Materializer> setupMaterializer() {
final ActorSystem system = ActorSystem.create();
final Materializer materializer = ActorMaterializer.create(system);
final Materializer materializer = Materializer.createMaterializer(system);
return Pair.create(system, materializer);
}

Expand Down Expand Up @@ -92,7 +94,7 @@ public void aUnixDomainSocketShouldReceiveWhatIsSent() throws Exception {
return connection.handleWith(echo, materializer);
})
.toMat(Sink.ignore(), Keep.left())
.run(materializer);
.run(system);

// #outgoingConnection

Expand All @@ -103,7 +105,7 @@ public void aUnixDomainSocketShouldReceiveWhatIsSent() throws Exception {
final CompletionStage<Done> sent =
Source.single(sendBytes)
.via(UnixDomainSocket.get(system).outgoingConnection(path))
.runWith(Sink.ignore(), materializer);
.runWith(Sink.ignore(), system);

sent.toCompletableFuture().get(timeoutSeconds, TimeUnit.SECONDS);
assertEquals(sendBytes, received.get(timeoutSeconds, TimeUnit.SECONDS));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,27 @@

package docs.scaladsl

import java.io.IOException
import java.nio.file.{Files, Paths}
import java.util.concurrent.TimeUnit

import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.{Failure, Success}

import akka.Done
import akka.actor.ActorSystem
import akka.stream.alpakka.testkit.scaladsl.LogCapturing
import akka.stream.alpakka.unixdomainsocket.UnixSocketAddress
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.alpakka.unixdomainsocket.scaladsl.UnixDomainSocket
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.stream.{Materializer, OverflowStrategy}
import akka.testkit._
import akka.util.ByteString
import org.scalatest._
import org.scalatest.concurrent.{IntegrationPatience, ScalaFutures}

import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike

import java.io.IOException
import java.nio.file.{Files, Paths}
import java.util.concurrent.TimeUnit
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.util.{Failure, Success}

class UnixDomainSocketSpec
extends TestKit(ActorSystem("UnixDomainSocketSpec"))
with AnyWordSpecLike
Expand All @@ -39,7 +37,7 @@ class UnixDomainSocketSpec
override def afterAll: Unit =
TestKit.shutdownActorSystem(system)

implicit val ma: ActorMaterializer = ActorMaterializer()
implicit val ma: Materializer = Materializer(system)
implicit val ec: ExecutionContext = system.dispatcher

private val dir = Files.createTempDirectory("UnixDomainSocketSpec")
Expand Down