Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unix Sockets #12

Merged
merged 3 commits into from
Sep 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions uring/src/main/scala/fs2/io/uring/net/UringSocketGroup.scala
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ private final class UringSocketGroup[F[_]](implicit F: Async[F], dns: Dns[F])
if (listen(fd, 65535) == 0)
F.unit
else
F.raiseError(new IOException(errno.toString))
F.raiseError(IOExceptionHelper(errno))
}.flatten

bindF *> listenF *> UringSocket.getLocalAddress(fd)
Expand Down Expand Up @@ -126,6 +126,6 @@ private final class UringSocketGroup[F[_]](implicit F: Async[F], dns: Dns[F])

object UringSocketGroup {

def apply[F[_]](implicit F: Async[F]): SocketGroup[F] = new UringSocketGroup
def apply[F[_]: Async]: SocketGroup[F] = new UringSocketGroup

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Copyright 2022 Arman Bilge
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package fs2
package io.uring
package net
package unixsocket

import cats.effect.kernel.Async
import cats.effect.kernel.Resource
import cats.syntax.all._
import fs2.io.file.Files
import fs2.io.file.Path
import fs2.io.net.Socket
import fs2.io.net.unixsocket.UnixSocketAddress
import fs2.io.net.unixsocket.UnixSockets
import fs2.io.uring.unsafe.syssocket._
import fs2.io.uring.unsafe.sysun._
import fs2.io.uring.unsafe.sysunOps._
import fs2.io.uring.unsafe.uring._
import fs2.io.uring.unsafe.util._

import scala.scalanative.libc.errno._
import scala.scalanative.posix.sys.socket.{bind => _, _}
import scala.scalanative.unsafe._
import scala.scalanative.unsigned._

private[net] final class UringUnixSockets[F[_]: Files](implicit F: Async[F])
extends UnixSockets[F] {

def client(address: UnixSocketAddress): Resource[F, Socket[F]] =
Resource.eval(Uring[F]).flatMap { implicit ring =>
openSocket.flatMap { fd =>
Resource.eval {
toSockaddrUn(address.path).use { addr =>
ring.call(io_uring_prep_connect(_, fd, addr, sizeof[sockaddr_un].toUInt))
}
} *> UringSocket(ring, fd, null)
}
}

def server(
address: UnixSocketAddress,
deleteIfExists: Boolean,
deleteOnClose: Boolean
): Stream[F, Socket[F]] =
Stream.eval(Uring[F]).flatMap { implicit ring =>
for {

_ <- Stream.bracket(Files[F].deleteIfExists(Path(address.path)).whenA(deleteIfExists)) {
_ => Files[F].deleteIfExists(Path(address.path)).whenA(deleteOnClose)
}

fd <- Stream.resource(openSocket)

_ <- Stream.eval {
val bindF = toSockaddrUn(address.path).use { addr =>
F.delay[F[Unit]] {
if (bind(fd, addr, sizeof[sockaddr_un].toUInt) == 0)
F.unit
else
F.raiseError(IOExceptionHelper(errno))
}.flatten
}

val listenF = F.delay {
if (listen(fd, 65535) == 0)
F.unit
else
F.raiseError(IOExceptionHelper(errno))
}.flatten

bindF *> listenF
}

socket <- Stream
.resource {
val accept = ring.bracket(io_uring_prep_accept(_, fd, null, null, 0))(closeSocket(_))
accept
.flatMap(UringSocket(ring, _, null))
.attempt
.map(_.toOption)
}
.repeat
.unNone

} yield socket
}

private def toSockaddrUn(path: String): Resource[F, Ptr[sockaddr]] =
Resource.make(F.delay(Zone.open()))(z => F.delay(z.close())).evalMap[Ptr[sockaddr]] {
implicit z =>
val pathBytes = path.getBytes
if (pathBytes.length > 107)
F.raiseError(new IllegalArgumentException(s"Path too long: $path"))
else
F.delay {
val addr = alloc[sockaddr_un]()
addr.sun_family = AF_UNIX.toUShort
toPtr(pathBytes, addr.sun_path.at(0))
addr.asInstanceOf[Ptr[sockaddr]]
}
}

private def openSocket(implicit ring: Uring[F]): Resource[F, Int] =
Resource.make[F, Int] {
F.delay(socket(AF_UNIX, SOCK_STREAM, 0))
}(closeSocket(_))

private def closeSocket(fd: Int)(implicit ring: Uring[F]): F[Unit] =
ring.call(io_uring_prep_close(_, fd)).void

}

object UringUnixSockets {

def apply[F[_]: Async: Files]: UringUnixSockets[F] = new UringUnixSockets

}
26 changes: 26 additions & 0 deletions uring/src/main/scala/fs2/io/uring/unsafe/syssocket.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2022 Arman Bilge
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package fs2.io.uring.unsafe

import scala.scalanative.posix.sys.socket._
import scala.scalanative.unsafe._

@extern
private[uring] object syssocket {
def bind(sockfd: CInt, addr: Ptr[sockaddr], addrlen: socklen_t): CInt =
extern
}
43 changes: 43 additions & 0 deletions uring/src/main/scala/fs2/io/uring/unsafe/sysun.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright 2022 Arman Bilge
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package fs2.io.uring.unsafe

import scala.scalanative.posix.sys.socket._
import scala.scalanative.unsafe._

private[uring] object sysun {
import Nat._
type _108 = Digit3[_1, _0, _8]

type sockaddr_un = CStruct2[
sa_family_t,
CArray[CChar, _108]
]

}

private[uring] object sysunOps {
import sysun._

implicit final class sockaddr_unOps(val sockaddr_un: Ptr[sockaddr_un]) extends AnyVal {
def sun_family: sa_family_t = sockaddr_un._1
def sun_family_=(sun_family: sa_family_t): Unit = sockaddr_un._1 = sun_family
def sun_path: CArray[CChar, _108] = sockaddr_un._2
def sun_path_=(sun_path: CArray[CChar, _108]): Unit = sockaddr_un._2 = sun_path
}

}
5 changes: 5 additions & 0 deletions uring/src/main/scala/fs2/io/uring/unsafe/util.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ private[uring] object util {
def toPtr(bytes: Array[Byte]): Ptr[Byte] =
bytes.asInstanceOf[ByteArray].at(0)

def toPtr(bytes: Array[Byte], ptr: Ptr[Byte]): Unit = {
memcpy(ptr, toPtr(bytes), bytes.length.toUInt)
()
}

def toArray(ptr: Ptr[Byte], length: Int): Array[Byte] = {
val bytes = new Array[Byte](length)
memcpy(toPtr(bytes), ptr, length.toUInt)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2022 Arman Bilge
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package fs2
package io.uring
package net
package unixsocket

import cats.effect._
import fs2.io.net.unixsocket.UnixSocketAddress
import scala.concurrent.duration._

class UnixSocketsSuite extends UringSuite {

test("echoes") {
val address = UnixSocketAddress("fs2-unix-sockets-test.sock")

val server = UringUnixSockets[IO]
.server(address)
.map { client =>
client.reads.through(client.writes)
}
.parJoinUnbounded

def client(msg: Chunk[Byte]) = UringUnixSockets[IO].client(address).use { server =>
server.write(msg) *> server.endOfOutput *> server.reads.compile
.to(Chunk)
.map(read => assertEquals(read, msg))
}

val clients = (0 until 100).map(b => client(Chunk.singleton(b.toByte)))

(Stream.sleep_[IO](1.second) ++ Stream.emits(clients).evalMap(identity))
.concurrently(server)
.compile
.drain
}

}