From 6008c87633d697e86b2c3404b4662895fde80f17 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Wed, 28 Sep 2022 01:36:31 +0000 Subject: [PATCH 1/3] First attempt at unix sockets --- .../fs2/io/uring/net/UringSocketGroup.scala | 4 +- .../net/unixsocket/UringUnixSockets.scala | 130 ++++++++++++++++++ .../scala/fs2/io/uring/unsafe/sysun.scala | 43 ++++++ .../main/scala/fs2/io/uring/unsafe/util.scala | 3 + .../net/unixsocket/UnixSocketsSuite.scala | 36 +++++ 5 files changed, 214 insertions(+), 2 deletions(-) create mode 100644 uring/src/main/scala/fs2/io/uring/net/unixsocket/UringUnixSockets.scala create mode 100644 uring/src/main/scala/fs2/io/uring/unsafe/sysun.scala create mode 100644 uring/src/test/scala/fs2/io/uring/net/unixsocket/UnixSocketsSuite.scala diff --git a/uring/src/main/scala/fs2/io/uring/net/UringSocketGroup.scala b/uring/src/main/scala/fs2/io/uring/net/UringSocketGroup.scala index 92130b81..765eb719 100644 --- a/uring/src/main/scala/fs2/io/uring/net/UringSocketGroup.scala +++ b/uring/src/main/scala/fs2/io/uring/net/UringSocketGroup.scala @@ -82,7 +82,7 @@ private final class UringSocketGroup[F[_]](implicit F: Async[F], dns: Dns[F]) if (listen(fd, 65535) == 0) F.unit else - F.raiseError(new IOException(errno.toString)) + F.raiseError(IOExceptionHelper(errno)) }.flatten bindF *> listenF *> UringSocket.getLocalAddress(fd) @@ -126,6 +126,6 @@ private final class UringSocketGroup[F[_]](implicit F: Async[F], dns: Dns[F]) object UringSocketGroup { - def apply[F[_]](implicit F: Async[F]): SocketGroup[F] = new UringSocketGroup + def apply[F[_]: Async]: SocketGroup[F] = new UringSocketGroup } diff --git a/uring/src/main/scala/fs2/io/uring/net/unixsocket/UringUnixSockets.scala b/uring/src/main/scala/fs2/io/uring/net/unixsocket/UringUnixSockets.scala new file mode 100644 index 00000000..832ca3b2 --- /dev/null +++ b/uring/src/main/scala/fs2/io/uring/net/unixsocket/UringUnixSockets.scala @@ -0,0 +1,130 @@ +/* + * Copyright 2022 Arman Bilge + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package fs2 +package io.uring +package net +package unixsocket + +import cats.effect.kernel.Async +import cats.effect.kernel.Resource +import cats.syntax.all._ +import fs2.io.file.Files +import fs2.io.file.Path +import fs2.io.net.Socket +import fs2.io.net.unixsocket.UnixSocketAddress +import fs2.io.net.unixsocket.UnixSockets +import fs2.io.uring.unsafe.sysun._ +import fs2.io.uring.unsafe.sysunOps._ +import fs2.io.uring.unsafe.uring._ +import fs2.io.uring.unsafe.util._ + +import scala.scalanative.libc.errno._ +import scala.scalanative.posix.sys.socket._ +import scala.scalanative.unsafe._ +import scala.scalanative.unsigned._ + +private[net] final class UringUnixSockets[F[_]: Files](implicit F: Async[F]) + extends UnixSockets[F] { + + def client(address: UnixSocketAddress): Resource[F, Socket[F]] = + Resource.eval(Uring[F]).flatMap { implicit ring => + openSocket.flatMap { fd => + Resource.eval { + toSockaddrUn(address.path).use { addr => + ring.call(io_uring_prep_connect(_, fd, addr, sizeof[sockaddr_un].toUInt)) + } + } *> UringSocket(ring, fd, null) + } + } + + def server( + address: UnixSocketAddress, + deleteIfExists: Boolean, + deleteOnClose: Boolean + ): Stream[F, Socket[F]] = + Stream.eval(Uring[F]).flatMap { implicit ring => + for { + + _ <- Stream.bracket(Files[F].deleteIfExists(Path(address.path)).whenA(deleteIfExists)) { + _ => Files[F].deleteIfExists(Path(address.path)).whenA(deleteOnClose) + } + + fd <- Stream.resource(openSocket) + + localAddress <- Stream.eval { + val bindF = toSockaddrUn(address.path).use { addr => + F.delay { + if (bind(fd, addr, sizeof[sockaddr_un].toUInt) == 0) + F.unit + else + F.raiseError(IOExceptionHelper(errno)) + }.flatten + } + + val listenF = F.delay { + if (listen(fd, 65535) == 0) + F.unit + else + F.raiseError(IOExceptionHelper(errno)) + }.flatten + + bindF *> listenF *> UringSocket.getLocalAddress(fd) + } + + socket <- Stream + .resource { + val accept = ring.bracket(io_uring_prep_accept(_, fd, null, null, 0))(closeSocket(_)) + accept + .flatMap(UringSocket(ring, _, null)) + .attempt + .map(_.toOption) + } + .repeat + .unNone + + } yield socket + } + + private def toSockaddrUn(path: String): Resource[F, Ptr[sockaddr]] = + Resource.make(F.delay(Zone.open()))(z => F.delay(z.close())).evalMap { implicit z => + val pathBytes = path.getBytes + if (pathBytes.length > 107) + F.raiseError(new IllegalArgumentException(s"Path too long: $path")) + else + F.delay { + val addr = alloc[sockaddr_un]() + addr.sun_family = AF_UNIX.toUShort + toPtr(pathBytes, addr.sun_path.at(0)) + addr.asInstanceOf[Ptr[sockaddr]] + } + } + + private def openSocket(implicit ring: Uring[F]): Resource[F, Int] = + Resource.make[F, Int] { + F.delay(socket(AF_UNIX, SOCK_STREAM, 0)) + }(closeSocket(_)) + + private def closeSocket(fd: Int)(implicit ring: Uring[F]): F[Unit] = + ring.call(io_uring_prep_close(_, fd)).void + +} + +object UringUnixSockets { + + def apply[F[_]: Async: Files]: UringUnixSockets[F] = new UringUnixSockets + +} diff --git a/uring/src/main/scala/fs2/io/uring/unsafe/sysun.scala b/uring/src/main/scala/fs2/io/uring/unsafe/sysun.scala new file mode 100644 index 00000000..d2e982b0 --- /dev/null +++ b/uring/src/main/scala/fs2/io/uring/unsafe/sysun.scala @@ -0,0 +1,43 @@ +/* + * Copyright 2022 Arman Bilge + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package fs2.io.uring.unsafe + +import scala.scalanative.posix.sys.socket._ +import scala.scalanative.unsafe._ + +private[uring] object sysun { + import Nat._ + type _108 = Digit3[_1, _0, _8] + + type sockaddr_un = CStruct2[ + sa_family_t, + CArray[CChar, _108] + ] + +} + +private[uring] object sysunOps { + import sysun._ + + implicit final class sockaddr_unOps(sockaddr_un: Ptr[sockaddr_un]) extends AnyVal { + def sun_family: sa_family_t = sockaddr_un._1 + def sun_family_=(sun_family: sa_family_t): Unit = sockaddr_un._1 = sun_family + def sun_path: CArray[CChar, _108] = sockaddr_un._2 + def sun_path_=(sun_path: CArray[CChar, _108]): Unit = sockaddr_un._2 = sun_path + } + +} diff --git a/uring/src/main/scala/fs2/io/uring/unsafe/util.scala b/uring/src/main/scala/fs2/io/uring/unsafe/util.scala index acb982ae..24a44af8 100644 --- a/uring/src/main/scala/fs2/io/uring/unsafe/util.scala +++ b/uring/src/main/scala/fs2/io/uring/unsafe/util.scala @@ -26,6 +26,9 @@ private[uring] object util { def toPtr(bytes: Array[Byte]): Ptr[Byte] = bytes.asInstanceOf[ByteArray].at(0) + def toPtr(bytes: Array[Byte], ptr: Ptr[Byte]): Unit = + memcpy(ptr, toPtr(bytes), bytes.length.toUInt) + def toArray(ptr: Ptr[Byte], length: Int): Array[Byte] = { val bytes = new Array[Byte](length) memcpy(toPtr(bytes), ptr, length.toUInt) diff --git a/uring/src/test/scala/fs2/io/uring/net/unixsocket/UnixSocketsSuite.scala b/uring/src/test/scala/fs2/io/uring/net/unixsocket/UnixSocketsSuite.scala new file mode 100644 index 00000000..4ac81862 --- /dev/null +++ b/uring/src/test/scala/fs2/io/uring/net/unixsocket/UnixSocketsSuite.scala @@ -0,0 +1,36 @@ +package fs2 +package io.uring +package net +package unixsocket + +import cats.effect._ +import fs2.io.net.unixsocket.UnixSocketAddress +import scala.concurrent.duration._ + +class UnixSocketsSuite extends UringSuite { + + test("echoes") { + val address = UnixSocketAddress("fs2-unix-sockets-test.sock") + + val server = UringUnixSockets[IO] + .server(address) + .map { client => + client.reads.through(client.writes) + } + .parJoinUnbounded + + def client(msg: Chunk[Byte]) = UringUnixSockets[IO].client(address).use { server => + server.write(msg) *> server.endOfOutput *> server.reads.compile + .to(Chunk) + .map(read => assertEquals(read, msg)) + } + + val clients = (0 until 100).map(b => client(Chunk.singleton(b.toByte))) + + (Stream.sleep_[IO](1.second) ++ Stream.emits(clients).evalMap(identity)) + .concurrently(server) + .compile + .drain + } + +} From 6b7e6b11b11d9f0a28aba2fa879aa4fc49708909 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Wed, 28 Sep 2022 02:27:49 +0000 Subject: [PATCH 2/3] Fixes --- .../net/unixsocket/UringUnixSockets.scala | 7 ++--- .../scala/fs2/io/uring/unsafe/syssocket.scala | 26 +++++++++++++++++++ .../net/unixsocket/UnixSocketsSuite.scala | 16 ++++++++++++ 3 files changed, 46 insertions(+), 3 deletions(-) create mode 100644 uring/src/main/scala/fs2/io/uring/unsafe/syssocket.scala diff --git a/uring/src/main/scala/fs2/io/uring/net/unixsocket/UringUnixSockets.scala b/uring/src/main/scala/fs2/io/uring/net/unixsocket/UringUnixSockets.scala index 832ca3b2..3833731b 100644 --- a/uring/src/main/scala/fs2/io/uring/net/unixsocket/UringUnixSockets.scala +++ b/uring/src/main/scala/fs2/io/uring/net/unixsocket/UringUnixSockets.scala @@ -27,13 +27,14 @@ import fs2.io.file.Path import fs2.io.net.Socket import fs2.io.net.unixsocket.UnixSocketAddress import fs2.io.net.unixsocket.UnixSockets +import fs2.io.uring.unsafe.syssocket._ import fs2.io.uring.unsafe.sysun._ import fs2.io.uring.unsafe.sysunOps._ import fs2.io.uring.unsafe.uring._ import fs2.io.uring.unsafe.util._ import scala.scalanative.libc.errno._ -import scala.scalanative.posix.sys.socket._ +import scala.scalanative.posix.sys.socket.{bind => _, _} import scala.scalanative.unsafe._ import scala.scalanative.unsigned._ @@ -65,7 +66,7 @@ private[net] final class UringUnixSockets[F[_]: Files](implicit F: Async[F]) fd <- Stream.resource(openSocket) - localAddress <- Stream.eval { + _ <- Stream.eval { val bindF = toSockaddrUn(address.path).use { addr => F.delay { if (bind(fd, addr, sizeof[sockaddr_un].toUInt) == 0) @@ -82,7 +83,7 @@ private[net] final class UringUnixSockets[F[_]: Files](implicit F: Async[F]) F.raiseError(IOExceptionHelper(errno)) }.flatten - bindF *> listenF *> UringSocket.getLocalAddress(fd) + bindF *> listenF } socket <- Stream diff --git a/uring/src/main/scala/fs2/io/uring/unsafe/syssocket.scala b/uring/src/main/scala/fs2/io/uring/unsafe/syssocket.scala new file mode 100644 index 00000000..4211f0b1 --- /dev/null +++ b/uring/src/main/scala/fs2/io/uring/unsafe/syssocket.scala @@ -0,0 +1,26 @@ +/* + * Copyright 2022 Arman Bilge + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package fs2.io.uring.unsafe + +import scala.scalanative.posix.sys.socket._ +import scala.scalanative.unsafe._ + +@extern +private[uring] object syssocket { + def bind(sockfd: CInt, addr: Ptr[sockaddr], addrlen: socklen_t): CInt = + extern +} diff --git a/uring/src/test/scala/fs2/io/uring/net/unixsocket/UnixSocketsSuite.scala b/uring/src/test/scala/fs2/io/uring/net/unixsocket/UnixSocketsSuite.scala index 4ac81862..60ee078e 100644 --- a/uring/src/test/scala/fs2/io/uring/net/unixsocket/UnixSocketsSuite.scala +++ b/uring/src/test/scala/fs2/io/uring/net/unixsocket/UnixSocketsSuite.scala @@ -1,3 +1,19 @@ +/* + * Copyright 2022 Arman Bilge + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package fs2 package io.uring package net From b1b5038c5abb1dcba433f477cc5047618c264449 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Wed, 28 Sep 2022 02:30:18 +0000 Subject: [PATCH 3/3] More fixes --- .../net/unixsocket/UringUnixSockets.scala | 25 ++++++++++--------- .../scala/fs2/io/uring/unsafe/sysun.scala | 2 +- .../main/scala/fs2/io/uring/unsafe/util.scala | 4 ++- 3 files changed, 17 insertions(+), 14 deletions(-) diff --git a/uring/src/main/scala/fs2/io/uring/net/unixsocket/UringUnixSockets.scala b/uring/src/main/scala/fs2/io/uring/net/unixsocket/UringUnixSockets.scala index 3833731b..77e812f7 100644 --- a/uring/src/main/scala/fs2/io/uring/net/unixsocket/UringUnixSockets.scala +++ b/uring/src/main/scala/fs2/io/uring/net/unixsocket/UringUnixSockets.scala @@ -68,7 +68,7 @@ private[net] final class UringUnixSockets[F[_]: Files](implicit F: Async[F]) _ <- Stream.eval { val bindF = toSockaddrUn(address.path).use { addr => - F.delay { + F.delay[F[Unit]] { if (bind(fd, addr, sizeof[sockaddr_un].toUInt) == 0) F.unit else @@ -101,17 +101,18 @@ private[net] final class UringUnixSockets[F[_]: Files](implicit F: Async[F]) } private def toSockaddrUn(path: String): Resource[F, Ptr[sockaddr]] = - Resource.make(F.delay(Zone.open()))(z => F.delay(z.close())).evalMap { implicit z => - val pathBytes = path.getBytes - if (pathBytes.length > 107) - F.raiseError(new IllegalArgumentException(s"Path too long: $path")) - else - F.delay { - val addr = alloc[sockaddr_un]() - addr.sun_family = AF_UNIX.toUShort - toPtr(pathBytes, addr.sun_path.at(0)) - addr.asInstanceOf[Ptr[sockaddr]] - } + Resource.make(F.delay(Zone.open()))(z => F.delay(z.close())).evalMap[Ptr[sockaddr]] { + implicit z => + val pathBytes = path.getBytes + if (pathBytes.length > 107) + F.raiseError(new IllegalArgumentException(s"Path too long: $path")) + else + F.delay { + val addr = alloc[sockaddr_un]() + addr.sun_family = AF_UNIX.toUShort + toPtr(pathBytes, addr.sun_path.at(0)) + addr.asInstanceOf[Ptr[sockaddr]] + } } private def openSocket(implicit ring: Uring[F]): Resource[F, Int] = diff --git a/uring/src/main/scala/fs2/io/uring/unsafe/sysun.scala b/uring/src/main/scala/fs2/io/uring/unsafe/sysun.scala index d2e982b0..fceb1ab2 100644 --- a/uring/src/main/scala/fs2/io/uring/unsafe/sysun.scala +++ b/uring/src/main/scala/fs2/io/uring/unsafe/sysun.scala @@ -33,7 +33,7 @@ private[uring] object sysun { private[uring] object sysunOps { import sysun._ - implicit final class sockaddr_unOps(sockaddr_un: Ptr[sockaddr_un]) extends AnyVal { + implicit final class sockaddr_unOps(val sockaddr_un: Ptr[sockaddr_un]) extends AnyVal { def sun_family: sa_family_t = sockaddr_un._1 def sun_family_=(sun_family: sa_family_t): Unit = sockaddr_un._1 = sun_family def sun_path: CArray[CChar, _108] = sockaddr_un._2 diff --git a/uring/src/main/scala/fs2/io/uring/unsafe/util.scala b/uring/src/main/scala/fs2/io/uring/unsafe/util.scala index 24a44af8..3a4b9afd 100644 --- a/uring/src/main/scala/fs2/io/uring/unsafe/util.scala +++ b/uring/src/main/scala/fs2/io/uring/unsafe/util.scala @@ -26,8 +26,10 @@ private[uring] object util { def toPtr(bytes: Array[Byte]): Ptr[Byte] = bytes.asInstanceOf[ByteArray].at(0) - def toPtr(bytes: Array[Byte], ptr: Ptr[Byte]): Unit = + def toPtr(bytes: Array[Byte], ptr: Ptr[Byte]): Unit = { memcpy(ptr, toPtr(bytes), bytes.length.toUInt) + () + } def toArray(ptr: Ptr[Byte], length: Int): Array[Byte] = { val bytes = new Array[Byte](length)