Skip to content

Commit

Permalink
Port all prep functions to Scala
Browse files Browse the repository at this point in the history
  • Loading branch information
armanbilge committed Aug 28, 2023
1 parent d133eb2 commit 3e87161
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 77 deletions.
41 changes: 0 additions & 41 deletions uring/src/main/resources/scala-native/uring.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,44 +7,3 @@ struct io_uring_sqe *fs2_io_uring_get_sqe(struct io_uring *ring) {
void fs2_io_uring_cq_advance(struct io_uring *ring, unsigned nr) {
io_uring_cq_advance(ring, nr);
}

void fs2_io_uring_prep_nop(struct io_uring_sqe *sqe) { io_uring_prep_nop(sqe); }

void fs2_io_uring_prep_accept(struct io_uring_sqe *sqe, int fd,
struct sockaddr *addr, socklen_t *addrlen,
int flags) {
io_uring_prep_accept(sqe, fd, addr, addrlen, flags);
}

void fs2_io_uring_prep_cancel64(struct io_uring_sqe *sqe, __u64 user_data,
int flags) {
io_uring_prep_cancel64(sqe, user_data, flags);
}

void fs2_io_uring_prep_close(struct io_uring_sqe *sqe, int fd) {
io_uring_prep_close(sqe, fd);
}

void fs2_io_uring_prep_connect(struct io_uring_sqe *sqe, int fd,
const struct sockaddr *addr, socklen_t addrlen) {
io_uring_prep_connect(sqe, fd, addr, addrlen);
}

void fs2_io_uring_prep_recv(struct io_uring_sqe *sqe, int sockfd, void *buf,
size_t len, int flags) {
io_uring_prep_recv(sqe, sockfd, buf, len, flags);
}

void fs2_io_uring_prep_send(struct io_uring_sqe *sqe, int sockfd,
const void *buf, size_t len, int flags) {
io_uring_prep_send(sqe, sockfd, buf, len, flags);
}

void fs2_io_uring_prep_shutdown(struct io_uring_sqe *sqe, int fd, int how) {
io_uring_prep_shutdown(sqe, fd, how);
}

void fs2_io_uring_prep_socket(struct io_uring_sqe *sqe, int domain, int type,
int protocol, unsigned int flags) {
io_uring_prep_socket(sqe, domain, type, protocol, flags);
}
2 changes: 1 addition & 1 deletion uring/src/main/scala/fs2/io/uring/net/UringSocket.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import com.comcast.ip4s.IpAddress
import com.comcast.ip4s.SocketAddress
import fs2.Pipe
import fs2.io.net.Socket
import fs2.io.uring.unsafe.uring._
import fs2.io.uring.unsafe.uringOps._
import fs2.io.uring.unsafe.util._

import java.io.IOException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import fs2.io.net.Socket
import fs2.io.net.SocketGroup
import fs2.io.net.SocketOption
import fs2.io.uring.unsafe.netinetin._
import fs2.io.uring.unsafe.uring._
import fs2.io.uring.unsafe.uringOps._

import scala.scalanative.libc.errno._
import scala.scalanative.posix.sys.socket._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import fs2.io.net.unixsocket.UnixSockets
import fs2.io.uring.unsafe.syssocket._
import fs2.io.uring.unsafe.sysun._
import fs2.io.uring.unsafe.sysunOps._
import fs2.io.uring.unsafe.uring._
import fs2.io.uring.unsafe.uringOps._
import fs2.io.uring.unsafe.util._

import scala.scalanative.libc.errno._
Expand Down
136 changes: 104 additions & 32 deletions uring/src/main/scala/fs2/io/uring/unsafe/uring.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,16 @@ private[uring] object uring {
final val IORING_SETUP_SINGLE_ISSUER = 1 << 12
final val IORING_SETUP_DEFER_TASKRUN = 1 << 13

final val IORING_OP_NOP = 0
final val IORING_OP_ACCEPT = 13
final val IORING_OP_ASYNC_CANCEL = 14
final val IORING_OP_CONNECT = 16
final val IORING_OP_CLOSE = 19
final val IORING_OP_SEND = 26
final val IORING_OP_RECV = 27
final val IORING_OP_SHUTDOWN = 34
final val IORING_OP_SOCKET = 45

type __u8 = CUnsignedChar
type __u16 = CUnsignedShort
type __s32 = CInt
Expand All @@ -40,6 +50,8 @@ private[uring] object uring {
type __kernel_time64_t = CLongLong
type __kernel_timespec = CStruct2[__kernel_time64_t, CLongLong]

type __kernel_rwf_t = CUnsignedInt

type io_uring = CStruct9[
io_uring_sq,
io_uring_cq,
Expand Down Expand Up @@ -134,67 +146,91 @@ private[uring] object uring {
@name("fs2_io_uring_cq_advance")
def io_uring_cq_advance(ring: Ptr[io_uring], nr: CUnsignedInt): Unit = extern

@name("fs2_io_uring_prep_nop")
def io_uring_prep_nop(sqe: Ptr[io_uring_sqe]): Unit = extern
}

private[uring] object uringOps {

import uring._

def io_uring_prep_rw(
op: Int,
sqe: Ptr[io_uring_sqe],
fd: Int,
addr: Ptr[_],
len: CUnsignedInt,
offset: __u64
): Unit = {
sqe.opcode = op.toUByte
sqe.flags = 0.toUByte
sqe.ioprio = 0.toUShort
sqe.fd = fd
sqe.off = offset
sqe.addr = if (addr eq null) 0.toULong else addr.toLong.toULong
sqe.len = len
sqe.rw_flags = 0.toUInt
sqe.__pad2(0) = 0.toULong
sqe.__pad2(1) = 0.toULong
sqe.__pad2(2) = 0.toULong
}

def io_uring_prep_nop(sqe: Ptr[io_uring_sqe]): Unit =
io_uring_prep_rw(IORING_OP_NOP, sqe, -1, null, 0.toUInt, 0.toULong)

@name("fs2_io_uring_prep_accept")
def io_uring_prep_accept(
sqe: Ptr[io_uring_sqe],
fd: CInt,
addr: Ptr[sockaddr],
addrlen: Ptr[socklen_t],
flags: CInt
): Unit = extern
): Unit = {
io_uring_prep_rw(
IORING_OP_ACCEPT,
sqe,
fd,
addr,
0.toUInt,
if (addrlen eq null) 0.toULong else addrlen.toLong.toULong
)
sqe.accept_flags = flags.toUInt
}

@name("fs2_io_uring_prep_cancel64")
def io_uring_prep_cancel64(sqe: Ptr[io_uring_sqe], user_data: __u64, flags: CInt): Unit = extern
def io_uring_prep_cancel64(sqe: Ptr[io_uring_sqe], user_data: __u64, flags: CInt): Unit = {
io_uring_prep_rw(IORING_OP_ASYNC_CANCEL, sqe, -1, null, 0.toUInt, 0.toULong);
sqe.addr = user_data;
sqe.cancel_flags = flags.toUInt
}

@name("fs2_io_uring_prep_close")
def io_uring_prep_close(sqe: Ptr[io_uring_sqe], fd: CInt): Unit = extern
def io_uring_prep_close(sqe: Ptr[io_uring_sqe], fd: CInt): Unit =
io_uring_prep_rw(IORING_OP_CLOSE, sqe, fd, null, 0.toUInt, 0.toULong)

@name("fs2_io_uring_prep_connect")
def io_uring_prep_connect(
sqe: Ptr[io_uring_sqe],
fd: CInt,
addr: Ptr[sockaddr],
addrlen: socklen_t
): Unit = extern
): Unit = io_uring_prep_rw(IORING_OP_CONNECT, sqe, fd, addr, 0.toUInt, addrlen)

@name("fs2_io_uring_prep_recv")
def io_uring_prep_recv(
sqe: Ptr[io_uring_sqe],
sockfd: CInt,
buf: Ptr[Byte],
len: size_t,
flags: CInt
): Unit = extern
): Unit = {
io_uring_prep_rw(IORING_OP_RECV, sqe, sockfd, buf, len.toUInt, 0.toULong)
sqe.msg_flags = flags.toUInt
}

@name("fs2_io_uring_prep_send")
def io_uring_prep_send(
sqe: Ptr[io_uring_sqe],
sockfd: CInt,
buf: Ptr[Byte],
len: size_t,
flags: CInt
): Unit = extern

@name("fs2_io_uring_prep_shutdown")
def io_uring_prep_shutdown(sqe: Ptr[io_uring_sqe], fd: CInt, how: CInt): Unit = extern

@name("fs2_io_uring_prep_socket")
def io_uring_prep_socket(
sqe: Ptr[io_uring_sqe],
domain: CInt,
`type`: CInt,
protocol: CInt,
flags: CUnsignedInt
): Unit = extern

}

private[uring] object uringOps {

import uring._
): Unit = {
io_uring_prep_rw(IORING_OP_SEND, sqe, sockfd, buf, len.toUInt, 0.toULong)
sqe.msg_flags = flags.toUInt
}

def io_uring_sqe_set_data[A <: AnyRef](sqe: Ptr[io_uring_sqe], data: A): Unit =
sqe.user_data = castRawPtrToLong(castObjectToRawPtr(data)).toULong
Expand All @@ -205,16 +241,52 @@ private[uring] object uringOps {
implicit final class io_uring_sqeOps(val io_uring_sqe: Ptr[io_uring_sqe]) extends AnyVal {
def opcode: __u8 = io_uring_sqe._1
def opcode_=(opcode: __u8): Unit = !io_uring_sqe.at1 = opcode

def flags: __u8 = io_uring_sqe._2
def flags_=(flags: __u8): Unit = !io_uring_sqe.at2 = flags

def ioprio: __u16 = io_uring_sqe._3
def ioprio_=(ioprio: __u16): Unit = !io_uring_sqe.at3 = ioprio

def fd: __s32 = io_uring_sqe._4
def fd_=(fd: __s32): Unit = !io_uring_sqe.at4 = fd

def off: __u64 = io_uring_sqe._5
def off_=(off: __u64): Unit = !io_uring_sqe.at5 = off

def addr: __u64 = io_uring_sqe._6
def addr_=(addr: __u64): Unit = !io_uring_sqe.at6 = addr

def len: __u32 = io_uring_sqe._7
def len_=(len: __u32): Unit = !io_uring_sqe.at7 = len

def rw_flags: __kernel_rwf_t = io_uring_sqe._8
def rw_flags_=(rw_flags: __kernel_rwf_t): Unit = !io_uring_sqe.at8 = rw_flags
def msg_flags: __u32 = io_uring_sqe._8
def msg_flags_=(msg_flags: __u32): Unit = !io_uring_sqe.at8 = msg_flags
def accept_flags: __u32 = io_uring_sqe._8
def accept_flags_=(accept_flags: __u32): Unit = !io_uring_sqe.at8 = accept_flags
def cancel_flags: __u32 = io_uring_sqe._8
def cancel_flags_=(cancel_flags: __u32): Unit = !io_uring_sqe.at8 = cancel_flags

def user_data: __u64 = io_uring_sqe._9
def user_data_=(user_data: __u64): Unit = !io_uring_sqe.at9 = user_data

def __pad2: CArray[__u64, Nat._3] = io_uring_sqe._10
}

def io_uring_prep_shutdown(sqe: Ptr[io_uring_sqe], fd: CInt, how: CInt): Unit =
io_uring_prep_rw(IORING_OP_SHUTDOWN, sqe, fd, null, how.toUInt, 0.toULong)

def io_uring_prep_socket(
sqe: Ptr[io_uring_sqe],
domain: CInt,
`type`: CInt,
protocol: CInt,
flags: CUnsignedInt
): Unit = {
io_uring_prep_rw(IORING_OP_SOCKET, sqe, domain, null, protocol.toUInt, `type`.toULong)
sqe.rw_flags = flags.toUInt
}

implicit final class io_uring_cqeOps(val io_uring_cqe: Ptr[io_uring_cqe]) extends AnyVal {
Expand Down
2 changes: 1 addition & 1 deletion uring/src/test/scala/fs2/io/uring/UringRuntimeSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package fs2.io.uring
import cats.effect.IO
import fs2.io.uring.unsafe.UringRuntime
import fs2.io.uring.unsafe.UringExecutorScheduler
import fs2.io.uring.unsafe.uring._
import fs2.io.uring.unsafe.uringOps._

import scala.concurrent.duration._

Expand Down

0 comments on commit 3e87161

Please sign in to comment.