Skip to content
2 changes: 1 addition & 1 deletion .idea/kotlinc.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion .idea/vcs.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@

[versions]
polar = "2.3.0"

[libraries]
polar = { module = "top.polar:api", version.ref = "polar" }

[bundles]

Expand Down
Original file line number Diff line number Diff line change
@@ -1,34 +1,107 @@
package dev.slne.surf.queue.api

import dev.slne.surf.core.api.common.SurfCoreApi
import dev.slne.surf.core.api.common.server.SurfServer
import dev.slne.surf.core.api.common.surfCoreApi
import dev.slne.surf.queue.api.service.SurfQueueService
import it.unimi.dsi.fastutil.objects.Object2IntMap
import it.unimi.dsi.fastutil.objects.ObjectList
import java.util.*

/**
* Public API interface for a server-specific player queue.
*
* Each instance represents the queue for a single target server, identified by [serverName].
* All queue operations are suspend functions and safe to call from any coroutine context.
*/
interface SurfQueue {

/** The name of the target server this queue belongs to. */
val serverName: String

fun server() = surfCoreApi.getServerByName(serverName)
/**
* Returns the [SurfServer] instance for this queue's target server.
*/
fun server() = SurfCoreApi.getServerByName(serverName)

/**
* Enqueues [uuid] using the priority resolved from LuckPerms.
*
* @return `true` if the player was newly added, `false` if already queued.
*/
suspend fun enqueue(uuid: UUID): Boolean

/**
* Enqueues [uuid] with an explicit [priority].
* Priorities above [RedisQueueScore.MAX_PRIORITY] are capped automatically.
*
* @return `true` if the player was newly added, `false` if already queued.
*/
suspend fun enqueue(uuid: UUID, priority: Int): Boolean

/**
* Removes [uuid] from the queue.
*
* @return `true` if the player was removed, `false` if they were not queued.
*/
suspend fun dequeue(uuid: UUID): Boolean

/**
* Returns `true` if [uuid] is currently in the queue.
*/
suspend fun isQueued(uuid: UUID): Boolean

/**
* Returns the 0-based position of [uuid] in the queue, or `null` if not queued.
*/
suspend fun getPosition(uuid: UUID): Int?

/**
* Returns the total number of players currently in the queue.
*/
suspend fun size(): Int

/**
* Returns `true` if the queue is paused. A paused queue stops processing transfers.
*/
suspend fun isPaused(): Boolean

/** Pauses the queue. */
suspend fun pause()

/** Resumes the queue. */
suspend fun resume()

suspend fun getAllUuidsWithPosition(): Collection<Object2IntMap.Entry<UUID>>
/**
* Returns all queued UUIDs together with their 1-based position.
*
* @deprecated Use [getAllUuidsOrderedByPosition] for better performance.
*/
@Deprecated(
"Use getAllUuidsOrderedByPosition for better performance",
ReplaceWith("getAllUuidsOrderedByPosition()")
)
suspend fun getAllUuidsWithPosition(): ObjectList<Object2IntMap.Entry<UUID>>

/**
* Returns all queued UUIDs in ascending position order (position 1 first).
*/
suspend fun getAllUuidsOrderedByPosition(): ObjectList<UUID>

@OptIn(InternalSurfQueueApi::class)
companion object {
/**
* Returns the [SurfQueue] for the given [serverName], or creates a new one if it doesn't exist.
*/
fun byServer(serverName: String) = SurfQueueService.instance.get(serverName)

/**
* Returns the [SurfQueue] for the given [server], or creates a new one if it doesn't exist.
*/
fun byServer(server: SurfServer) = byServer(server.name)
}
}

/**
* Convenience extension to retrieve the queue for this server.
*/
fun SurfServer.queue() = SurfQueue.byServer(this)
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package dev.slne.surf.queue.common

import dev.slne.surf.queue.common.queue.AbstractSurfQueue
import dev.slne.surf.queue.common.queue.AbstractQueue
import dev.slne.surf.queue.common.queue.QueueTicker
import dev.slne.surf.queue.common.queue.RedisQueueService
import dev.slne.surf.queue.common.redis.RedisInstance
import dev.slne.surf.surfapi.core.api.component.SurfComponentApi
import dev.slne.surf.surfapi.core.api.util.requiredService
import org.jetbrains.annotations.MustBeInvokedByOverriders

abstract class SurfQueueInstance {
abstract class QueueInstance { // Implementations are responsible for starting the queue ticker task
protected abstract val componentOwner: Any

@MustBeInvokedByOverriders
Expand All @@ -24,14 +25,16 @@ abstract class SurfQueueInstance {

@MustBeInvokedByOverriders
open suspend fun disable() {
QueueTicker.dispose()

SurfComponentApi.disable(componentOwner)
RedisInstance.get().disconnect()
}

abstract fun createQueue(serverName: String): AbstractSurfQueue
abstract fun createQueue(serverName: String): AbstractQueue

companion object {
val instance = requiredService<SurfQueueInstance>()
val instance = requiredService<QueueInstance>()
fun get() = instance
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
package dev.slne.surf.queue.common.queue

import dev.slne.surf.queue.api.SurfQueue
import dev.slne.surf.queue.common.hook.priority.LuckpermsPriorityHook
import dev.slne.surf.surfapi.core.api.util.logger
import it.unimi.dsi.fastutil.objects.Object2IntMap
import it.unimi.dsi.fastutil.objects.ObjectArrayList
import it.unimi.dsi.fastutil.objects.ObjectList
import org.jetbrains.annotations.MustBeInvokedByOverriders
import java.time.Instant
import java.util.*
import java.util.concurrent.atomic.AtomicInteger

/**
* Base implementation of [SurfQueue] backed by Redis.
*
* Subclasses must supply a [serverName] and may override [onEnqueued] / [onDequeued]
* for platform-specific side effects (e.g., metrics). They may also override [tick]
* to add periodic processing, but **must** call `super.tick()`.
*
* Ordering is determined by a packed Redis score that encodes priority, enqueue
* timestamp (relative to a per-queue epoch), and a monotonic sequence number for
* tie-breaking within the same millisecond.
*
* @param serverName The name of the server this queue targets.
*/
abstract class AbstractQueue(override val serverName: String) : SurfQueue {

/**
* Redis keys used for queue storage and synchronization.
*/
protected val keys = RedisQueueKeys(serverName)

/**
* Persistent storage for queue entries and scores.
*/
protected val store = RedisQueueStore(keys)

/**
* Distributed lock manager for this queue.
*/
protected val lockManager = RedisQueueLockManager(keys)

/**
* Millisecond epoch used to make timestamps relative, reducing score magnitude.
*/
protected val epochMs = store.initEpochMs()

/**
* Monotonically increasing sequence counter used to break ties when
* multiple players enqueue within the same millisecond. Wraps around
* at MAX_SEQUENCE; by the time it wraps, the millisecond will have
* advanced so no collision occurs.
*/
private val enqueueSequence = AtomicInteger(0)

/** Number of times [tick] has been called since creation. */
var tickCount = 0
private set

companion object {
private val log = logger()

/**
* Caps [priority] to [RedisQueueScore.MAX_PRIORITY] and logs a warning if it exceeds the limit.
*/
fun fixPriority(uuid: UUID, priority: Int): Int {
return if (priority <= RedisQueueScore.MAX_PRIORITY) {
priority
} else {
log.atWarning()
.log(
"Priority %d for %s exceeds max representable priority, capping to %d",
priority,
uuid,
RedisQueueScore.MAX_PRIORITY
)

RedisQueueScore.MAX_PRIORITY
}
}
}

/**
* Increments [tickCount]. Subclasses that override this **must** invoke `super.tick()`.
*/
@MustBeInvokedByOverriders
open suspend fun tick() {
tickCount++
}

override suspend fun enqueue(uuid: UUID): Boolean {
val priority = LuckpermsPriorityHook.getPriority(uuid)
return enqueue(uuid, priority)
}

override suspend fun enqueue(uuid: UUID, priority: Int): Boolean {
val priorityFixed = fixPriority(uuid, priority)
val now = Instant.now().toEpochMilli()
val sequence = enqueueSequence.getAndUpdate { current ->
if (current >= RedisQueueScore.MAX_SEQUENCE) 0 else current + 1
}

val score = RedisQueueScore.pack(
priorityFixed,
now - epochMs,
sequence
)

val meta = QueueEntry(uuid, now, priorityFixed)
val added = store.enqueueIfAbsent(uuid, meta, score)

if (added) {
onEnqueued()
log.atInfo()
.log("Enqueued %s in queue %s with priority %d", uuid, serverName, priorityFixed)
}

return added
}

/**
* Called after a player is successfully enqueued. Override for side effects such as
* recording metrics. No-op by default.
*/
protected open fun onEnqueued() {}

/**
* Called after a player is successfully dequeued. Override for side effects such as
* recording metrics. No-op by default.
*/
protected open fun onDequeued() {}

override suspend fun dequeue(uuid: UUID): Boolean {
val removed = store.dequeue(uuid)
if (removed) {
onDequeued()
}
return removed
}

override suspend fun isQueued(uuid: UUID): Boolean {
return store.isQueued(uuid)
}

override suspend fun getPosition(uuid: UUID): Int? {
return store.rank(uuid)
}

@Deprecated(
"Use getAllUuidsOrderedByPosition for better performance",
replaceWith = ReplaceWith("getAllUuidsOrderedByPosition()")
)
override suspend fun getAllUuidsWithPosition(): ObjectList<Object2IntMap.Entry<UUID>> {
val entries = store.readAllEntries()
val uuidsWithPosition = ObjectArrayList<Object2IntMap.Entry<UUID>>(entries.size)

for ((index, entry) in entries.withIndex()) {
uuidsWithPosition.add(Object2IntMap.entry(entry.value, index + 1))
}

return uuidsWithPosition
}

override suspend fun getAllUuidsOrderedByPosition(): ObjectList<UUID> {
val entries = store.readAllEntries()
val uuids = ObjectArrayList<UUID>(entries.size)

for (entry in entries) {
uuids.add(entry.value)
}

return uuids
}

override suspend fun size(): Int {
return store.size()
}

override suspend fun isPaused(): Boolean {
return store.isPaused()
}

override suspend fun resume() {
store.setPaused(false)
}

override suspend fun pause() {
store.setPaused(true)
}

/**
* Returns the raw [QueueEntry] metadata for [uuid], or `null` if not queued.
*/
suspend fun getEntryMeta(uuid: UUID): QueueEntry? = store.getMeta(uuid)

/**
* Returns the packed [RedisQueueScore] for [uuid], or `null` if not queued.
*/
suspend fun getEntryScore(uuid: UUID): RedisQueueScore? = store.getScore(uuid)

/**
* Returns the last-seen timestamp (epoch ms) for [uuid], or `null` if not recorded.
* Used to track disconnected players within their grace period.
*/
suspend fun getEntryLastSeen(uuid: UUID): Long? = store.getLastSeen(uuid)

/**
* Returns the number of transfer retry attempts for [uuid], or `null` if not queued.
*/
suspend fun getEntryRetryCount(uuid: UUID): Int? = store.getRetryCount(uuid)
}
Loading
Loading