Skip to content

Commit

Permalink
Merge pull request #105 from antoniojimeneznieto/feature/add-masking
Browse files Browse the repository at this point in the history
Add Error Masking to UringSystem in Response to Issue #33
  • Loading branch information
armanbilge committed Aug 26, 2023
2 parents da04247 + b075456 commit 746e7c0
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 9 deletions.
19 changes: 12 additions & 7 deletions uring/src/main/scala/fs2/io/uring/Uring.scala
Expand Up @@ -31,14 +31,19 @@ import scala.scalanative.unsafe.Ptr
private[uring] final class Uring[F[_]](ring: UringExecutorScheduler)(implicit F: Async[F]) {

private[this] val noopRelease: Int => F[Unit] = _ => F.unit
private[this] val noopMask: Int => Boolean = _ => false

def call(prep: Ptr[io_uring_sqe] => Unit): F[Int] =
exec(prep)(noopRelease)
def call(prep: Ptr[io_uring_sqe] => Unit, mask: Int => Boolean = noopMask): F[Int] =
exec(prep, mask)(noopRelease)

def bracket(prep: Ptr[io_uring_sqe] => Unit)(release: Int => F[Unit]): Resource[F, Int] =
Resource.makeFull[F, Int](poll => poll(exec(prep)(release(_))))(release(_))
def bracket(prep: Ptr[io_uring_sqe] => Unit, mask: Int => Boolean = noopMask)(
release: Int => F[Unit]
): Resource[F, Int] =
Resource.makeFull[F, Int](poll => poll(exec(prep, mask)(release(_))))(release(_))

private def exec(prep: Ptr[io_uring_sqe] => Unit)(release: Int => F[Unit]): F[Int] =
private def exec(prep: Ptr[io_uring_sqe] => Unit, mask: Int => Boolean)(
release: Int => F[Unit]
): F[Int] =
F.cont {
new Cont[F, Int, Int] {
def apply[G[_]](implicit
Expand All @@ -59,13 +64,13 @@ private[uring] final class Uring[F[_]](ring: UringExecutorScheduler)(implicit F:
G.unit,
// if cannot cancel, fallback to get
get.flatMap { rtn =>
if (rtn < 0) G.raiseError(IOExceptionHelper(-rtn))
if (rtn < 0 && !mask(-rtn)) G.raiseError(IOExceptionHelper(-rtn))
else lift(release(rtn))
}
)
)
}
.flatTap(e => G.raiseWhen(e < 0)(IOExceptionHelper(-e)))
.flatTap(e => G.raiseWhen(e < 0 && !mask(-e))(IOExceptionHelper(-e)))
}
}
}
Expand Down
5 changes: 3 additions & 2 deletions uring/src/main/scala/fs2/io/uring/net/UringSocket.scala
Expand Up @@ -34,6 +34,7 @@ import fs2.io.uring.unsafe.util._
import java.io.IOException
import scala.scalanative.libc.errno._
import scala.scalanative.posix.sys.socket._
import scala.scalanative.posix.errno._
import scala.scalanative.unsafe._
import scala.scalanative.unsigned._

Expand Down Expand Up @@ -69,9 +70,9 @@ private[net] final class UringSocket[F[_]](

def reads: Stream[F, Byte] = Stream.repeatEval(read(defaultReadSize)).unNoneTerminate.unchunks

def endOfInput: F[Unit] = ring.call(io_uring_prep_shutdown(_, fd, 0)).void
def endOfInput: F[Unit] = ring.call(io_uring_prep_shutdown(_, fd, 0), _ == ENOTCONN).void

def endOfOutput: F[Unit] = ring.call(io_uring_prep_shutdown(_, fd, 1)).void
def endOfOutput: F[Unit] = ring.call(io_uring_prep_shutdown(_, fd, 1), _ == ENOTCONN).void

def isOpen: F[Boolean] = F.pure(true)

Expand Down
16 changes: 16 additions & 0 deletions uring/src/test/scala/fs2/io/uring/net/TcpSocketSuite.scala
Expand Up @@ -230,4 +230,20 @@ class TcpSocketSuite extends UringSuite {
}
}

test("endOfOutput / endOfInput ignores ENOTCONN") {
sg.serverResource().use { case (bindAddress, clients) =>
sg.client(bindAddress).surround(IO.sleep(100.millis)).background.surround {
clients
.take(1)
.foreach { socket =>
socket.write(Chunk.array("fs2.rocks".getBytes)) *>
IO.sleep(1.second) *>
socket.endOfOutput *> socket.endOfInput
}
.compile
.drain
}
}
}

}

0 comments on commit 746e7c0

Please sign in to comment.