diff --git a/.idea/kotlinc.xml b/.idea/kotlinc.xml index f203762..ebd2e7b 100644 --- a/.idea/kotlinc.xml +++ b/.idea/kotlinc.xml @@ -2,6 +2,6 @@ \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml index 94a25f7..35eb1dd 100644 --- a/.idea/vcs.xml +++ b/.idea/vcs.xml @@ -1,6 +1,6 @@ - + \ No newline at end of file diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index df596c0..afdab33 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -1,7 +1,8 @@ - [versions] +polar = "2.3.0" [libraries] +polar = { module = "top.polar:api", version.ref = "polar" } [bundles] diff --git a/surf-queue-api/src/main/kotlin/dev/slne/surf/queue/api/InternalSurfQueueApi.kt b/surf-queue-api/src/main/kotlin/dev/slne/surf/queue/api/InternalSurfQueueApi.kt index 8f3615a..9f44a6b 100644 --- a/surf-queue-api/src/main/kotlin/dev/slne/surf/queue/api/InternalSurfQueueApi.kt +++ b/surf-queue-api/src/main/kotlin/dev/slne/surf/queue/api/InternalSurfQueueApi.kt @@ -1,7 +1,18 @@ package dev.slne.surf.queue.api +import dev.slne.surf.surfapi.shared.api.annotation.InternalAPIMarker + +/** + * Opt-in annotation marking internal surf-queue APIs that are not intended for + * external use. + * + * Any declaration annotated with this marker requires an explicit `@OptIn(InternalSurfQueueApi::class)` + * at the call site, ensuring consumers acknowledge they are using an unsupported, + * internal API that may change without notice. + */ @RequiresOptIn( level = RequiresOptIn.Level.ERROR, message = "This API is internal and should not be used outside of the library" ) +@InternalAPIMarker annotation class InternalSurfQueueApi diff --git a/surf-queue-api/src/main/kotlin/dev/slne/surf/queue/api/SurfQueue.kt b/surf-queue-api/src/main/kotlin/dev/slne/surf/queue/api/SurfQueue.kt index c52a540..fb6c595 100644 --- a/surf-queue-api/src/main/kotlin/dev/slne/surf/queue/api/SurfQueue.kt +++ b/surf-queue-api/src/main/kotlin/dev/slne/surf/queue/api/SurfQueue.kt @@ -1,34 +1,107 @@ package dev.slne.surf.queue.api +import dev.slne.surf.core.api.common.SurfCoreApi import dev.slne.surf.core.api.common.server.SurfServer -import dev.slne.surf.core.api.common.surfCoreApi import dev.slne.surf.queue.api.service.SurfQueueService import it.unimi.dsi.fastutil.objects.Object2IntMap +import it.unimi.dsi.fastutil.objects.ObjectList import java.util.* +/** + * Public API interface for a server-specific player queue. + * + * Each instance represents the queue for a single target server, identified by [serverName]. + * All queue operations are suspend functions and safe to call from any coroutine context. + */ interface SurfQueue { + + /** The name of the target server this queue belongs to. */ val serverName: String - fun server() = surfCoreApi.getServerByName(serverName) + /** + * Returns the [SurfServer] instance for this queue's target server. + */ + fun server() = SurfCoreApi.getServerByName(serverName) + /** + * Enqueues [uuid] using the priority resolved from LuckPerms. + * + * @return `true` if the player was newly added, `false` if already queued. + */ suspend fun enqueue(uuid: UUID): Boolean + + /** + * Enqueues [uuid] with an explicit [priority]. + * Priorities above the maximum representable value are capped automatically. + * + * @return `true` if the player was newly added, `false` if already queued. + */ suspend fun enqueue(uuid: UUID, priority: Int): Boolean + + /** + * Removes [uuid] from the queue. + * + * @return `true` if the player was removed, `false` if they were not queued. + */ suspend fun dequeue(uuid: UUID): Boolean + + /** + * Returns `true` if [uuid] is currently in the queue. + */ suspend fun isQueued(uuid: UUID): Boolean + + /** + * Returns the 0-based position of [uuid] in the queue, or `null` if not queued. + */ suspend fun getPosition(uuid: UUID): Int? + + /** + * Returns the total number of players currently in the queue. + */ suspend fun size(): Int + /** + * Returns `true` if the queue is paused. A paused queue stops processing transfers. + */ suspend fun isPaused(): Boolean + + /** Pauses the queue. */ suspend fun pause() + + /** Resumes the queue. */ suspend fun resume() - suspend fun getAllUuidsWithPosition(): Collection> + /** + * Returns all queued UUIDs together with their 1-based position. + * + * @deprecated Use [getAllUuidsOrderedByPosition] for better performance. + */ + @Deprecated( + "Use getAllUuidsOrderedByPosition for better performance", + ReplaceWith("getAllUuidsOrderedByPosition()") + ) + suspend fun getAllUuidsWithPosition(): ObjectList> + + /** + * Returns all queued UUIDs in ascending position order (position 1 first). + */ + suspend fun getAllUuidsOrderedByPosition(): ObjectList @OptIn(InternalSurfQueueApi::class) companion object { - fun byServer(serverName: String) = SurfQueueService.instance.get(serverName) + /** + * Returns the [SurfQueue] for the given [serverName], or creates a new one if it doesn't exist. + */ + fun byServer(serverName: String) = SurfQueueService.instance.getQueueByName(serverName) + + /** + * Returns the [SurfQueue] for the given [server], or creates a new one if it doesn't exist. + */ fun byServer(server: SurfServer) = byServer(server.name) } } +/** + * Convenience extension to retrieve the queue for this server. + */ fun SurfServer.queue() = SurfQueue.byServer(this) \ No newline at end of file diff --git a/surf-queue-api/src/main/kotlin/dev/slne/surf/queue/api/service/SurfQueueService.kt b/surf-queue-api/src/main/kotlin/dev/slne/surf/queue/api/service/SurfQueueService.kt index 0c3521a..cfce2cd 100644 --- a/surf-queue-api/src/main/kotlin/dev/slne/surf/queue/api/service/SurfQueueService.kt +++ b/surf-queue-api/src/main/kotlin/dev/slne/surf/queue/api/service/SurfQueueService.kt @@ -4,9 +4,24 @@ import dev.slne.surf.queue.api.InternalSurfQueueApi import dev.slne.surf.queue.api.SurfQueue import dev.slne.surf.surfapi.core.api.util.requiredService +/** + * Internal service interface responsible for creating and caching [SurfQueue] instances. + * + * The concrete implementation is + * loaded via `@AutoService` and accessed through the [instance] companion property. + * This interface is marked as [InternalSurfQueueApi] and should not be used directly; + * prefer [SurfQueue.byServer] instead. + */ @InternalSurfQueueApi interface SurfQueueService { - fun get(serverName: String): SurfQueue + /** + * Returns the [SurfQueue] for the given [serverName], creating a new one if it + * does not already exist in the cache. + * + * @param serverName the name of the target server + * @return the queue instance for that server + */ + fun getQueueByName(serverName: String): SurfQueue companion object { val instance = requiredService() diff --git a/surf-queue-common/src/main/kotlin/dev/slne/surf/queue/common/QueueInstance.kt b/surf-queue-common/src/main/kotlin/dev/slne/surf/queue/common/QueueInstance.kt new file mode 100644 index 0000000..aec53d7 --- /dev/null +++ b/surf-queue-common/src/main/kotlin/dev/slne/surf/queue/common/QueueInstance.kt @@ -0,0 +1,69 @@ +package dev.slne.surf.queue.common + +import dev.slne.surf.queue.common.queue.AbstractQueue +import dev.slne.surf.queue.common.queue.tick.QueueTicker +import dev.slne.surf.queue.common.queue.RedisQueueService +import dev.slne.surf.queue.common.redis.RedisInstance +import dev.slne.surf.surfapi.core.api.component.SurfComponentApi +import dev.slne.surf.surfapi.core.api.util.requiredService +import org.jetbrains.annotations.MustBeInvokedByOverriders + +/** + * Abstract base class that bootstraps the queue system. + * + * Responsible for connecting to Redis, loading queue state, managing + * [SurfComponentApi] lifecycle, and tearing down the [QueueTicker]. + * Platform-specific subclasses (Paper, Velocity) provide the [componentOwner] + * and implement [createQueue] to return the appropriate [AbstractQueue] variant. + * + * Implementations are responsible for starting the [QueueTicker] at the + * appropriate time during their platform's lifecycle. + */ +abstract class QueueInstance { // Implementations are responsible for starting the queue ticker task + /** The platform-specific owner object used with [SurfComponentApi]. */ + protected abstract val componentOwner: Any + + /** + * Connects to Redis, fetches existing queues, and loads all [dev.slne.surf.surfapi.shared.api.component.SurfComponent]s. + * Subclasses **must** call `super.load()`. + */ + @MustBeInvokedByOverriders + open suspend fun load() { + RedisInstance.get().connect() + RedisQueueService.get().fetchFromRedis() + SurfComponentApi.load(componentOwner) + } + + /** + * Enables all [dev.slne.surf.surfapi.shared.api.component.SurfComponent]s. Subclasses **must** call `super.enable()`. + */ + @MustBeInvokedByOverriders + open suspend fun enable() { + SurfComponentApi.enable(componentOwner) + } + + /** + * Stops the [QueueTicker], disables all [dev.slne.surf.surfapi.shared.api.component.SurfComponent]s, and disconnects + * from Redis. Subclasses **must** call `super.disable()`. + */ + @MustBeInvokedByOverriders + open suspend fun disable() { + QueueTicker.dispose() + + SurfComponentApi.disable(componentOwner) + RedisInstance.get().disconnect() + } + + /** + * Factory method to create a new [AbstractQueue] for the given [serverName]. + * + * @param serverName the name of the target server + * @return a platform-specific queue implementation + */ + abstract fun createQueue(serverName: String): AbstractQueue + + companion object { + val instance = requiredService() + fun get() = instance + } +} \ No newline at end of file diff --git a/surf-queue-common/src/main/kotlin/dev/slne/surf/queue/common/SurfQueueInstance.kt b/surf-queue-common/src/main/kotlin/dev/slne/surf/queue/common/SurfQueueInstance.kt deleted file mode 100644 index 6ce476f..0000000 --- a/surf-queue-common/src/main/kotlin/dev/slne/surf/queue/common/SurfQueueInstance.kt +++ /dev/null @@ -1,37 +0,0 @@ -package dev.slne.surf.queue.common - -import dev.slne.surf.queue.common.queue.AbstractSurfQueue -import dev.slne.surf.queue.common.queue.RedisQueueService -import dev.slne.surf.queue.common.redis.RedisInstance -import dev.slne.surf.surfapi.core.api.component.SurfComponentApi -import dev.slne.surf.surfapi.core.api.util.requiredService -import org.jetbrains.annotations.MustBeInvokedByOverriders - -abstract class SurfQueueInstance { - protected abstract val componentOwner: Any - - @MustBeInvokedByOverriders - open suspend fun load() { - RedisInstance.get().connect() - RedisQueueService.get().fetchFromRedis() - SurfComponentApi.load(componentOwner) - } - - @MustBeInvokedByOverriders - open suspend fun enable() { - SurfComponentApi.enable(componentOwner) - } - - @MustBeInvokedByOverriders - open suspend fun disable() { - SurfComponentApi.disable(componentOwner) - RedisInstance.get().disconnect() - } - - abstract fun createQueue(serverName: String): AbstractSurfQueue - - companion object { - val instance = requiredService() - fun get() = instance - } -} \ No newline at end of file diff --git a/surf-queue-common/src/main/kotlin/dev/slne/surf/queue/common/hook/priority/FallbackPriority.kt b/surf-queue-common/src/main/kotlin/dev/slne/surf/queue/common/hook/priority/FallbackPriority.kt index 20eb103..a7ccb3a 100644 --- a/surf-queue-common/src/main/kotlin/dev/slne/surf/queue/common/hook/priority/FallbackPriority.kt +++ b/surf-queue-common/src/main/kotlin/dev/slne/surf/queue/common/hook/priority/FallbackPriority.kt @@ -4,6 +4,12 @@ import dev.slne.surf.surfapi.shared.api.component.ComponentMeta import dev.slne.surf.surfapi.shared.api.component.requirement.ConditionalOnMissingComponent import java.util.UUID +/** + * Default [PriorityHook] implementation that always returns priority `0`. + * + * Activated automatically by `@ConditionalOnMissingComponent` when no other + * [PriorityHook] (e.g., [LuckpermsPriorityHook]) is available. + */ @ComponentMeta @ConditionalOnMissingComponent(PriorityHook::class) class FallbackPriority : PriorityHook { diff --git a/surf-queue-common/src/main/kotlin/dev/slne/surf/queue/common/hook/priority/LuckpermsPriorityHook.kt b/surf-queue-common/src/main/kotlin/dev/slne/surf/queue/common/hook/priority/LuckpermsPriorityHook.kt index 3566774..21a5d9d 100644 --- a/surf-queue-common/src/main/kotlin/dev/slne/surf/queue/common/hook/priority/LuckpermsPriorityHook.kt +++ b/surf-queue-common/src/main/kotlin/dev/slne/surf/queue/common/hook/priority/LuckpermsPriorityHook.kt @@ -5,10 +5,18 @@ import net.luckperms.api.LuckPermsProvider import net.luckperms.api.node.NodeType import java.util.* +/** + * [PriorityHook] implementation that resolves queue priority from a LuckPerms + * meta key. + * + * Reads the `queue-priority` meta value from the player's inherited nodes. + * If the meta key is absent or not a valid integer, the priority defaults to `0`. + */ //@ComponentMeta //@DependsOnClass(LuckPerms::class) object LuckpermsPriorityHook : PriorityHook { // companion object { + /** The LuckPerms meta key used to store a player's queue priority. */ const val KEY = "queue-priority" // } diff --git a/surf-queue-common/src/main/kotlin/dev/slne/surf/queue/common/hook/priority/PriorityHook.kt b/surf-queue-common/src/main/kotlin/dev/slne/surf/queue/common/hook/priority/PriorityHook.kt index d22a044..9f6698e 100644 --- a/surf-queue-common/src/main/kotlin/dev/slne/surf/queue/common/hook/priority/PriorityHook.kt +++ b/surf-queue-common/src/main/kotlin/dev/slne/surf/queue/common/hook/priority/PriorityHook.kt @@ -4,10 +4,25 @@ import dev.slne.surf.surfapi.core.api.component.SurfComponentApi import dev.slne.surf.surfapi.shared.api.component.SurfComponent import java.util.* +/** + * Hook interface for resolving a player's queue priority. + * + * Implementations are registered as [SurfComponent]s. The active implementation + * is determined by the component system — [LuckpermsPriorityHook] when LuckPerms + * is present, otherwise [FallbackPriority]. + */ interface PriorityHook : SurfComponent { + + /** + * Returns the queue priority for the player identified by [uuid]. + * + * @param uuid the player's unique identifier + * @return a non-negative priority value; higher values mean higher priority + */ suspend fun getPriority(uuid: UUID): Int companion object { + /** Returns the first loaded [PriorityHook] implementation. */ fun get() = SurfComponentApi.componentsOfTypeLoaded(PriorityHook::class.java).first() } } diff --git a/surf-queue-common/src/main/kotlin/dev/slne/surf/queue/common/queue/AbstractQueue.kt b/surf-queue-common/src/main/kotlin/dev/slne/surf/queue/common/queue/AbstractQueue.kt new file mode 100644 index 0000000..0dd6be8 --- /dev/null +++ b/surf-queue-common/src/main/kotlin/dev/slne/surf/queue/common/queue/AbstractQueue.kt @@ -0,0 +1,213 @@ +package dev.slne.surf.queue.common.queue + +import dev.slne.surf.queue.api.SurfQueue +import dev.slne.surf.queue.common.hook.priority.LuckpermsPriorityHook +import dev.slne.surf.queue.common.queue.entry.QueueEntry +import dev.slne.surf.surfapi.core.api.util.logger +import it.unimi.dsi.fastutil.objects.Object2IntMap +import it.unimi.dsi.fastutil.objects.ObjectArrayList +import it.unimi.dsi.fastutil.objects.ObjectList +import org.jetbrains.annotations.MustBeInvokedByOverriders +import java.time.Instant +import java.util.* +import java.util.concurrent.atomic.AtomicInteger + +/** + * Base implementation of [SurfQueue] backed by Redis. + * + * Subclasses must supply a [serverName] and may override [onEnqueued] / [onDequeued] + * for platform-specific side effects (e.g., metrics). They may also override [tick] + * to add periodic processing, but **must** call `super.tick()`. + * + * Ordering is determined by a packed Redis score that encodes priority, enqueue + * timestamp (relative to a per-queue epoch), and a monotonic sequence number for + * tie-breaking within the same millisecond. + * + * @param serverName The name of the server this queue targets. + */ +abstract class AbstractQueue(override val serverName: String) : SurfQueue { + + /** + * Redis keys used for queue storage and synchronization. + */ + protected val keys = RedisQueueKeys(serverName) + + /** + * Persistent storage for queue entries and scores. + */ + protected val store = RedisQueueStore(keys) + + /** + * Distributed lock manager for this queue. + */ + protected val lockManager = RedisQueueLockManager(keys) + + /** + * Millisecond epoch used to make timestamps relative, reducing score magnitude. + */ + protected val epochMs = store.initEpochMs() + + /** + * Monotonically increasing sequence counter used to break ties when + * multiple players enqueue within the same millisecond. Wraps around + * at MAX_SEQUENCE; by the time it wraps, the millisecond will have + * advanced so no collision occurs. + */ + private val enqueueSequence = AtomicInteger(0) + + /** Number of times [tick] has been called since creation. */ + var tickCount = 0 + private set + + companion object { + private val log = logger() + + /** + * Caps [priority] to [RedisQueueScore.MAX_PRIORITY] and logs a warning if it exceeds the limit. + */ + fun fixPriority(uuid: UUID, priority: Int): Int { + return if (priority <= RedisQueueScore.MAX_PRIORITY) { + priority + } else { + log.atWarning() + .log( + "Priority %d for %s exceeds max representable priority, capping to %d", + priority, + uuid, + RedisQueueScore.MAX_PRIORITY + ) + + RedisQueueScore.MAX_PRIORITY + } + } + } + + /** + * Increments [tickCount]. Subclasses that override this **must** invoke `super.tick()`. + */ + @MustBeInvokedByOverriders + open suspend fun tick() { + tickCount++ + } + + override suspend fun enqueue(uuid: UUID): Boolean { + val priority = LuckpermsPriorityHook.getPriority(uuid) + return enqueue(uuid, priority) + } + + override suspend fun enqueue(uuid: UUID, priority: Int): Boolean { + val priorityFixed = fixPriority(uuid, priority) + val now = Instant.now().toEpochMilli() + val sequence = enqueueSequence.getAndUpdate { current -> + if (current >= RedisQueueScore.MAX_SEQUENCE) 0 else current + 1 + } + + val score = RedisQueueScore.pack( + priorityFixed, + now - epochMs, + sequence + ) + + val meta = QueueEntry(uuid, now, priorityFixed) + val added = store.enqueueIfAbsent(uuid, meta, score) + + if (added) { + onEnqueued() + log.atInfo() + .log("Enqueued %s in queue %s with priority %d", uuid, serverName, priorityFixed) + } + + return added + } + + /** + * Called after a player is successfully enqueued. Override for side effects such as + * recording metrics. No-op by default. + */ + protected open fun onEnqueued() {} + + /** + * Called after a player is successfully dequeued. Override for side effects such as + * recording metrics. No-op by default. + */ + protected open fun onDequeued() {} + + override suspend fun dequeue(uuid: UUID): Boolean { + val removed = store.dequeue(uuid) + if (removed) { + onDequeued() + } + return removed + } + + override suspend fun isQueued(uuid: UUID): Boolean { + return store.isQueued(uuid) + } + + override suspend fun getPosition(uuid: UUID): Int? { + return store.rank(uuid) + } + + @Deprecated( + "Use getAllUuidsOrderedByPosition for better performance", + replaceWith = ReplaceWith("getAllUuidsOrderedByPosition()") + ) + override suspend fun getAllUuidsWithPosition(): ObjectList> { + val entries = store.readAllEntries() + val uuidsWithPosition = ObjectArrayList>(entries.size) + + for ((index, entry) in entries.withIndex()) { + uuidsWithPosition.add(Object2IntMap.entry(entry.value, index + 1)) + } + + return uuidsWithPosition + } + + override suspend fun getAllUuidsOrderedByPosition(): ObjectList { + val entries = store.readAllEntries() + val uuids = ObjectArrayList(entries.size) + + for (entry in entries) { + uuids.add(entry.value) + } + + return uuids + } + + override suspend fun size(): Int { + return store.size() + } + + override suspend fun isPaused(): Boolean { + return store.isPaused() + } + + override suspend fun resume() { + store.setPaused(false) + } + + override suspend fun pause() { + store.setPaused(true) + } + + /** + * Returns the raw [QueueEntry] metadata for [uuid], or `null` if not queued. + */ + suspend fun getEntryMeta(uuid: UUID): QueueEntry? = store.getMeta(uuid) + + /** + * Returns the packed [RedisQueueScore] for [uuid], or `null` if not queued. + */ + suspend fun getEntryScore(uuid: UUID): RedisQueueScore? = store.getScore(uuid) + + /** + * Returns the last-seen timestamp (epoch ms) for [uuid], or `null` if not recorded. + * Used to track disconnected players within their grace period. + */ + suspend fun getEntryLastSeen(uuid: UUID): Long? = store.getLastSeen(uuid) + + /** + * Returns the number of transfer retry attempts for [uuid], or `null` if not queued. + */ + suspend fun getEntryRetryCount(uuid: UUID): Int? = store.getRetryCount(uuid) +} \ No newline at end of file diff --git a/surf-queue-common/src/main/kotlin/dev/slne/surf/queue/common/queue/AbstractSurfQueue.kt b/surf-queue-common/src/main/kotlin/dev/slne/surf/queue/common/queue/AbstractSurfQueue.kt deleted file mode 100644 index b2a006d..0000000 --- a/surf-queue-common/src/main/kotlin/dev/slne/surf/queue/common/queue/AbstractSurfQueue.kt +++ /dev/null @@ -1,105 +0,0 @@ -package dev.slne.surf.queue.common.queue - -import dev.slne.surf.queue.api.SurfQueue -import dev.slne.surf.queue.common.hook.priority.LuckpermsPriorityHook -import dev.slne.surf.surfapi.core.api.util.logger -import it.unimi.dsi.fastutil.objects.Object2IntMap -import java.time.Instant -import java.util.* - -abstract class AbstractSurfQueue(override val serverName: String) : SurfQueue { - protected val keys = RedisQueueKeys(serverName) - protected val store = RedisQueueStore(keys) - protected val lockManager = RedisQueueLockManager(keys) - protected val epochMs = store.initEpochMs() - - companion object { - private val log = logger() - - fun fixPriority(uuid: UUID, priority: Int): Int { - return if (priority <= RedisQueueScorePacker.MAX_PRIORITY) { - priority - } else { - log.atWarning() - .log( - "Priority %d for %s exceeds max representable priority, capping to %d", - priority, - uuid, - RedisQueueScorePacker.MAX_PRIORITY - ) - - RedisQueueScorePacker.MAX_PRIORITY - } - } - } - - override suspend fun enqueue(uuid: UUID): Boolean { - val priority = LuckpermsPriorityHook.getPriority(uuid) - return enqueue(uuid, priority) - } - - override suspend fun enqueue(uuid: UUID, priority: Int): Boolean { - val priorityFixed = fixPriority(uuid, priority) - val now = Instant.now().toEpochMilli() - - val score = RedisQueueScorePacker.pack( - priorityFixed, - now - epochMs, - 0 - ) // TODO: set sequence if it happens to enqueue multiple times in the same ms - - - val meta = QueueEntry(uuid, now, priorityFixed) - val added = store.enqueueIfAbsent(uuid, meta, score) - - if (added) { - onEnqueued() - log.atInfo() - .log("Enqueued %s in queue %s with priority %d", uuid, serverName, priorityFixed) - } - - return added - } - - protected open fun onEnqueued() {} - protected open fun onDequeued() {} - - override suspend fun dequeue(uuid: UUID): Boolean { - val removed = store.dequeue(uuid) - if (removed) { - onDequeued() - } - return removed - } - - override suspend fun isQueued(uuid: UUID): Boolean { - return store.isQueued(uuid) - } - - override suspend fun getPosition(uuid: UUID): Int? { - return store.rank(uuid) - } - - override suspend fun getAllUuidsWithPosition(): Collection> { - return store.readAllEntries() - .mapIndexed { index, entry -> - Object2IntMap.entry(entry.value, index + 1) - } - } - - override suspend fun size(): Int { - return store.size() - } - - override suspend fun isPaused(): Boolean { - return store.isPaused() - } - - override suspend fun resume() { - store.setPaused(false) - } - - override suspend fun pause() { - store.setPaused(true) - } -} \ No newline at end of file diff --git a/surf-queue-common/src/main/kotlin/dev/slne/surf/queue/common/queue/QueueEntry.kt b/surf-queue-common/src/main/kotlin/dev/slne/surf/queue/common/queue/QueueEntry.kt deleted file mode 100644 index dc2d093..0000000 --- a/surf-queue-common/src/main/kotlin/dev/slne/surf/queue/common/queue/QueueEntry.kt +++ /dev/null @@ -1,17 +0,0 @@ -package dev.slne.surf.queue.common.queue - -import java.io.Serial -import java.io.Serializable -import java.util.UUID - -data class QueueEntry( - val uuid: UUID, - val addedAt: Long, - val priority: Int, -) : Serializable { - - companion object { - @Serial - const val serialVersionUID = 2L - } -} \ No newline at end of file diff --git a/surf-queue-common/src/main/kotlin/dev/slne/surf/queue/common/queue/RedisQueueKeys.kt b/surf-queue-common/src/main/kotlin/dev/slne/surf/queue/common/queue/RedisQueueKeys.kt index df413ec..e0568ce 100644 --- a/surf-queue-common/src/main/kotlin/dev/slne/surf/queue/common/queue/RedisQueueKeys.kt +++ b/surf-queue-common/src/main/kotlin/dev/slne/surf/queue/common/queue/RedisQueueKeys.kt @@ -15,7 +15,7 @@ data class RedisQueueKeys( val epochMsKey = "$QUEUE_PREFIX$serverName$EPOCH_MS_SUFFIX" companion object { - val QUEUE_PREFIX = RedisInstance.namespaced("queue:") + val QUEUE_PREFIX = RedisInstance.namespaced("queue:v2:") val EPOCH_MS_KEY_PATTERN = "$QUEUE_PREFIX*$EPOCH_MS_SUFFIX" const val EPOCH_MS_SUFFIX = ":epoch-ms" } diff --git a/surf-queue-common/src/main/kotlin/dev/slne/surf/queue/common/queue/RedisQueueLockManager.kt b/surf-queue-common/src/main/kotlin/dev/slne/surf/queue/common/queue/RedisQueueLockManager.kt index 3ff1135..35cd380 100644 --- a/surf-queue-common/src/main/kotlin/dev/slne/surf/queue/common/queue/RedisQueueLockManager.kt +++ b/surf-queue-common/src/main/kotlin/dev/slne/surf/queue/common/queue/RedisQueueLockManager.kt @@ -1,12 +1,17 @@ package dev.slne.surf.queue.common.queue import dev.slne.surf.queue.common.redis.redisApi +import dev.slne.surf.surfapi.core.api.util.logger import kotlinx.coroutines.future.await class RedisQueueLockManager(private val keys: RedisQueueKeys) { private val transferLock = redisApi.redisson.getLock(keys.transferLockKey) private val cleanupLock = redisApi.redisson.getLock(keys.cleanupLockKey) + companion object { + private val log = logger() + } + suspend fun withTransferLock( block: suspend (acquired: Boolean) -> T ): T { @@ -17,7 +22,13 @@ class RedisQueueLockManager(private val keys: RedisQueueKeys) { try { return block(true) } finally { - transferLock.unlockAsync(threadId).await() + try { + transferLock.unlockAsync(threadId).await() + } catch (e: Exception) { + log.atWarning() + .withCause(e) + .log("Failed to release transfer lock for %s", keys.serverName) + } } } @@ -29,7 +40,13 @@ class RedisQueueLockManager(private val keys: RedisQueueKeys) { try { block() } finally { - cleanupLock.unlockAsync(threadId).await() + try { + cleanupLock.unlockAsync(threadId).await() + } catch (e: Exception) { + log.atWarning() + .withCause(e) + .log("Failed to release cleanup lock for %s", keys.serverName) + } } } } \ No newline at end of file diff --git a/surf-queue-common/src/main/kotlin/dev/slne/surf/queue/common/queue/RedisQueueScore.kt b/surf-queue-common/src/main/kotlin/dev/slne/surf/queue/common/queue/RedisQueueScore.kt new file mode 100644 index 0000000..350405e --- /dev/null +++ b/surf-queue-common/src/main/kotlin/dev/slne/surf/queue/common/queue/RedisQueueScore.kt @@ -0,0 +1,112 @@ +package dev.slne.surf.queue.common.queue + +/** + * Packs queue metadata into a single 53-bit value stored as a Redis ZSET score (Double). + * + * The layout is designed to: + * - Preserve total ordering inside Redis sorted sets + * - Avoid precision loss (fits exactly into IEEE-754 double: 53 bits) + * - Support priority, timestamp, and tie-breaking sequence + * + * ## Bit layout: + * + * `priority:7`|`deltaMs:40`|`sequence:6` + * + * + * Total: 53 bits → exactly representable as Double + * + * ## Field details: + * + * - priority (7 bits) + * Higher priority should come first. + * To achieve this with ascending ZSET ordering, the value is inverted: + * `storedPriority = MAX_PRIORITY - priority` + * + * - deltaMs (40 bits) + * Time component (usually epoch delta in milliseconds). + * Provides the primary ordering. + * Range: ~34.8 years + * + * - sequence (6 bits) + * Tie-breaker for entries created within the same millisecond. + * Range: 0–63 (64 entries per millisecond). + * + * Ordering behavior in Redis ZSET: + * 1. Lower storedPriority → higher logical priority + * 2. Lower deltaMs → earlier timestamp + * 3. Lower sequence → earlier insertion within same millisecond + * + * Important: + * - The total bit size MUST NOT exceed 53 bits, otherwise precision loss occurs in Double. + * - All values must be within their defined ranges. + */ +@JvmInline +value class RedisQueueScore(val packed: Double) { + companion object { + private const val PRIORITY_BITS = 7 + private const val DELTA_MS_BITS = 40 + private const val SEQUENCE_BITS = 6 + + private const val DELTA_MS_SHIFT = SEQUENCE_BITS + private const val PRIORITY_SHIFT = DELTA_MS_BITS + SEQUENCE_BITS + + private const val SEQUENCE_MASK = (1L shl SEQUENCE_BITS) - 1 // 0x3F (63) + private const val DELTA_MS_MASK = (1L shl DELTA_MS_BITS) - 1 // 0xFFFFFFFFFF + private const val PRIORITY_MASK = (1L shl PRIORITY_BITS) - 1 // 0x7F + + const val MAX_PRIORITY = PRIORITY_MASK.toInt() + const val MAX_DELTA_MS = DELTA_MS_MASK + const val MAX_SEQUENCE = SEQUENCE_MASK.toInt() + + /** + * Packs priority, timestamp and sequence into a single Double score. + * + * @param priority logical priority (0..MAX_PRIORITY), higher = more important + * @param deltaMs time value (usually epoch delta in ms) + * @param sequence tie-breaker for same timestamp (0..MAX_SEQUENCE) + * + * @return packed score suitable for Redis ZSET + * + * @throws IllegalArgumentException if any value is out of range + */ + fun pack(priority: Int, deltaMs: Long, sequence: Int): RedisQueueScore { + require(priority in 0..MAX_PRIORITY) { "priority out of range" } + require(deltaMs in 0..MAX_DELTA_MS) { "deltaMs out of range" } + require(sequence in 0..MAX_SEQUENCE) { "sequence out of range" } + + val invertedPriority = (MAX_PRIORITY - priority).toLong() + + val value = + (invertedPriority shl PRIORITY_SHIFT) or + (deltaMs shl DELTA_MS_SHIFT) or + sequence.toLong() + + return RedisQueueScore(value.toDouble()) + } + + fun optional(value: Double?): RedisQueueScore? = value?.let { RedisQueueScore(it) } + } + + /** + * @return packed score as a Long + */ + private val packedLong get() = packed.toLong() + + /** + * @return logical priority (0..[MAX_PRIORITY]), higher = more important + */ + val priority: Int + get() = MAX_PRIORITY - ((packedLong shr PRIORITY_SHIFT) and PRIORITY_MASK).toInt() + + /** + * @return time value (usually epoch delta in ms) + */ + val deltaMs: Long + get() = (packedLong shr DELTA_MS_SHIFT) and DELTA_MS_MASK + + /** + * @return tie-breaker for same timestamp (0..[MAX_SEQUENCE]) + */ + val sequence: Int + get() = (packedLong and SEQUENCE_MASK).toInt() +} \ No newline at end of file diff --git a/surf-queue-common/src/main/kotlin/dev/slne/surf/queue/common/queue/RedisQueueScorePacker.kt b/surf-queue-common/src/main/kotlin/dev/slne/surf/queue/common/queue/RedisQueueScorePacker.kt deleted file mode 100644 index bbc35e9..0000000 --- a/surf-queue-common/src/main/kotlin/dev/slne/surf/queue/common/queue/RedisQueueScorePacker.kt +++ /dev/null @@ -1,43 +0,0 @@ -package dev.slne.surf.queue.common.queue - -object RedisQueueScorePacker { - private const val PRIORITY_BITS = 7 - private const val DELTA_MS_BITS = 42 - private const val SEQUENCE_BITS = 4 - - private const val DELTA_MS_SHIFT = SEQUENCE_BITS - private const val PRIORITY_SHIFT = DELTA_MS_BITS + SEQUENCE_BITS - - private const val SEQUENCE_MASK = (1L shl SEQUENCE_BITS) - 1 // 0xF - private const val DELTA_MS_MASK = (1L shl DELTA_MS_BITS) - 1 // 0x3FFFFFFFFFF - private const val PRIORITY_MASK = (1L shl PRIORITY_BITS) - 1 // 0x7F - - const val MAX_PRIORITY = PRIORITY_MASK.toInt() - const val MAX_DELTA_MS = DELTA_MS_MASK - const val MAX_SEQUENCE = SEQUENCE_MASK - - fun unpack(score: Double): Unpacked { - val value = score.toLong() - val priority = PRIORITY_MASK - ((value shr (DELTA_MS_BITS + SEQUENCE_BITS)) and PRIORITY_MASK).toInt() - val deltaMs = (value shr SEQUENCE_BITS) and DELTA_MS_MASK - val sequence = (value and SEQUENCE_MASK).toInt() - return Unpacked(priority.toInt(), deltaMs, sequence) - } - - fun pack(priority: Int, deltaMs: Long, sequence: Int): Double { - // Priority: invert for desired direction. - val invPriority = PRIORITY_MASK - (priority.toLong() and PRIORITY_MASK) - - require(invPriority in 0..PRIORITY_MASK) { "Priority bounds" } - require(deltaMs in 0..DELTA_MS_MASK) { "deltaMs out of range" } - require(sequence in 0..SEQUENCE_MASK) { "sequence out of range" } - - val value = (invPriority shl (DELTA_MS_BITS + SEQUENCE_BITS)) or - ((deltaMs and DELTA_MS_MASK) shl SEQUENCE_BITS) or - (sequence.toLong() and SEQUENCE_MASK) - - return value.toDouble() - } - - data class Unpacked(val priority: Int, val deltaMs: Long, val sequence: Int) -} \ No newline at end of file diff --git a/surf-queue-common/src/main/kotlin/dev/slne/surf/queue/common/queue/RedisQueueService.kt b/surf-queue-common/src/main/kotlin/dev/slne/surf/queue/common/queue/RedisQueueService.kt index 43a9aec..e34af2e 100644 --- a/surf-queue-common/src/main/kotlin/dev/slne/surf/queue/common/queue/RedisQueueService.kt +++ b/surf-queue-common/src/main/kotlin/dev/slne/surf/queue/common/queue/RedisQueueService.kt @@ -4,19 +4,20 @@ import com.github.benmanes.caffeine.cache.Caffeine import com.google.auto.service.AutoService import dev.slne.surf.queue.api.InternalSurfQueueApi import dev.slne.surf.queue.api.service.SurfQueueService -import dev.slne.surf.queue.common.SurfQueueInstance +import dev.slne.surf.queue.common.QueueInstance import dev.slne.surf.queue.common.redis.redisApi import dev.slne.surf.redis.libs.redisson.api.options.KeysScanOptions -import dev.slne.surf.surfapi.core.api.util.logger -import kotlinx.coroutines.reactive.asFlow +import kotlinx.coroutines.reactive.collect +import java.time.Duration @OptIn(InternalSurfQueueApi::class) @AutoService(SurfQueueService::class) class RedisQueueService : SurfQueueService { private val queues = Caffeine.newBuilder() - .build { serverName -> SurfQueueInstance.get().createQueue(serverName) } + .expireAfterWrite(Duration.ofSeconds(QUEUE_REFRESH_INTERVAL_SECONDS * 4L)) + .build { serverName -> QueueInstance.get().createQueue(serverName) } - override fun get(serverName: String) = queues.get(serverName) + override fun getQueueByName(serverName: String) = queues.get(serverName) fun getAll() = queues.asMap().values fun delete(serverName: String) = queues.invalidate(serverName) @@ -26,20 +27,16 @@ class RedisQueueService : SurfQueueService { .getKeys( KeysScanOptions.defaults() .pattern(RedisQueueKeys.EPOCH_MS_KEY_PATTERN) - ).asFlow() - .collect { - val serverName = - it.replaceFirst(RedisQueueKeys.QUEUE_PREFIX, "").replaceFirst(RedisQueueKeys.EPOCH_MS_SUFFIX, "") - - log.atInfo() - .log("Found queue for server $serverName in Redis, fetching...") - - get(serverName) - } + ).map(::extractServerName) + .collect(::getQueueByName) } + private fun extractServerName(key: String) = key + .replaceFirst(RedisQueueKeys.QUEUE_PREFIX, "") + .replaceFirst(RedisQueueKeys.EPOCH_MS_SUFFIX, "") + companion object { - private val log = logger() + const val QUEUE_REFRESH_INTERVAL_SECONDS = 30 fun get() = SurfQueueService.instance as RedisQueueService } } \ No newline at end of file diff --git a/surf-queue-common/src/main/kotlin/dev/slne/surf/queue/common/queue/RedisQueueStore.kt b/surf-queue-common/src/main/kotlin/dev/slne/surf/queue/common/queue/RedisQueueStore.kt index e110f8e..704bc54 100644 --- a/surf-queue-common/src/main/kotlin/dev/slne/surf/queue/common/queue/RedisQueueStore.kt +++ b/surf-queue-common/src/main/kotlin/dev/slne/surf/queue/common/queue/RedisQueueStore.kt @@ -1,9 +1,12 @@ package dev.slne.surf.queue.common.queue -import dev.slne.surf.queue.common.queue.codec.QueueEntryCodec +import dev.slne.surf.queue.common.queue.entry.QueueEntryCodec +import dev.slne.surf.queue.common.queue.entry.QueueEntry import dev.slne.surf.queue.common.redis.redisApi import dev.slne.surf.redis.codec.UUIDCodec import dev.slne.surf.redis.libs.redisson.api.BatchOptions +import dev.slne.surf.redis.libs.redisson.api.BatchResult +import dev.slne.surf.redis.libs.redisson.api.RBatch import dev.slne.surf.redis.libs.redisson.client.codec.IntegerCodec import dev.slne.surf.redis.libs.redisson.client.codec.LongCodec import dev.slne.surf.redis.libs.redisson.client.protocol.ScoredEntry @@ -13,7 +16,7 @@ import org.jetbrains.annotations.Blocking import java.time.Instant import java.util.* -class RedisQueueStore(private val keys: RedisQueueKeys) { +class RedisQueueStore(keys: RedisQueueKeys) { private val scoredSet = redisApi.redisson.getScoredSortedSet(keys.entriesKey, UUIDCodec.INSTANCE) private val metaMap = redisApi.redisson.getMap( keys.metaKey, @@ -32,38 +35,49 @@ class RedisQueueStore(private val keys: RedisQueueKeys) { private val epochMsBucket = redisApi.redisson.getBucket(keys.epochMsKey, LongCodec.INSTANCE) private val pausedBucket = redisApi.redisson.getBucket(keys.pausedKey, IntegerCodec.INSTANCE) - companion object { private fun atomicBatchOptions(): BatchOptions { return BatchOptions.defaults().executionMode(BatchOptions.ExecutionMode.IN_MEMORY_ATOMIC) } } + // region RBatch helper + private inline fun createAtomicBatch(block: RBatch.() -> R) = + redisApi.redisson.createBatch(atomicBatchOptions()).run(block) + + private suspend inline fun executeAtomicBatch(block: RBatch.() -> Unit): BatchResult<*> = createAtomicBatch { + block() + executeAsync().await() + } + + private fun RBatch.getQueueScoredSet() = getScoredSortedSet(scoredSet.name, scoredSet.codec) + private fun RBatch.getQueueMetaMap() = getMap(metaMap.name, metaMap.codec) + private fun RBatch.getQueueLastSeenMap() = getMap(lastSeenMap.name, lastSeenMap.codec) + private fun RBatch.getQueueRetryCountMap() = getMap(retryCountMap.name, retryCountMap.codec) + // endregion + @Blocking fun initEpochMs(): Long { epochMsBucket.setIfAbsent(Instant.now().toEpochMilli()) return epochMsBucket.get() } - suspend fun enqueueIfAbsent(uuid: UUID, meta: QueueEntry, score: Double): Boolean { - val batch = redisApi.redisson.createBatch(atomicBatchOptions()) - - val addAsync = batch.getScoredSortedSet(scoredSet.name, scoredSet.codec) - .addIfAbsentAsync(score, uuid) - batch.getMap(metaMap.name, metaMap.codec) - .fastPutIfAbsentAsync(uuid, meta) + suspend fun enqueueIfAbsent(uuid: UUID, meta: QueueEntry, score: RedisQueueScore): Boolean { + val result = executeAtomicBatch { + getQueueScoredSet().addIfAbsentAsync(score.packed, uuid) + getQueueMetaMap().fastPutIfAbsentAsync(uuid, meta) + } - batch.executeAsync().await() - return addAsync.await() + return result.responses.first() as Boolean } suspend fun dequeue(uuid: UUID): Boolean { - return batchRemove(uuid) + return batchRemove(uuid) } suspend fun isQueued(uuid: UUID): Boolean = scoredSet.containsAsync(uuid).await() suspend fun rank(uuid: UUID): Int? = scoredSet.rankAsync(uuid).await() - suspend fun getScore(uuid: UUID): Double? = scoredSet.getScoreAsync(uuid).await() + suspend fun getScore(uuid: UUID): RedisQueueScore? = RedisQueueScore.optional(scoredSet.getScoreAsync(uuid).await()) suspend fun size(): Int = scoredSet.sizeAsync().await() suspend fun top1(): UUID? = scoredSet.entryRangeAsync(0, 0).await().firstOrNull()?.value @@ -71,13 +85,15 @@ class RedisQueueStore(private val keys: RedisQueueKeys) { suspend fun readAllEntries(): Collection> = scoredSet.entryRangeAsync(0, -1).await() suspend fun entriesAfter(uuid: UUID, limit: Int): Collection> { val currentScore = getScore(uuid) ?: return emptyList() - return scoredSet.entryRangeAsync(currentScore, false, Double.MAX_VALUE, true, 0, limit).await() + return scoredSet.entryRangeAsync(currentScore.packed, false, Double.MAX_VALUE, true, 0, limit).await() } suspend fun incrementRetryCount(uuid: UUID): Int { return retryCountMap.addAndGetAsync(uuid, 1).await() } + suspend fun getRetryCount(uuid: UUID): Int? = retryCountMap.getAsync(uuid).await() + suspend fun clearRetryCount(uuid: UUID) { retryCountMap.removeAsync(uuid).await() } @@ -100,24 +116,29 @@ class RedisQueueStore(private val keys: RedisQueueKeys) { suspend fun readAllLastSeen(): Map = lastSeenMap.readAllMapAsync().await() ?: emptyMap() - suspend fun addOrUpdateScore(uuid: UUID, score: Double): Boolean { - return scoredSet.addAsync(score, uuid).await() + suspend fun addOrUpdateScore(uuid: UUID, score: RedisQueueScore): Boolean { + return scoredSet.addAsync(score.packed, uuid).await() } private suspend fun batchRemove(uuid: UUID): Boolean { - val batch = redisApi.redisson.createBatch(atomicBatchOptions()) - - val removeAsync = batch.getScoredSortedSet(scoredSet.name, scoredSet.codec) - .removeAsync(uuid) - batch.getMap(metaMap.name, metaMap.codec) - .removeAsync(uuid) - batch.getMap(lastSeenMap.name, lastSeenMap.codec) - .removeAsync(uuid) - batch.getMap(retryCountMap.name, retryCountMap.codec) - .removeAsync(uuid) - - batch.executeAsync().await() - return removeAsync.await() + try { + val result = executeAtomicBatch { + getQueueScoredSet().removeAsync(uuid) + getQueueMetaMap().removeAsync(uuid) + getQueueLastSeenMap().removeAsync(uuid) + getQueueRetryCountMap().removeAsync(uuid) + } + + return result.responses.first() as Boolean + } catch (e: Exception) { + // If the batch fails, we need try to remove the individual elements manually + runCatching { scoredSet.removeAsync(uuid).await() } + runCatching { metaMap.removeAsync(uuid).await() } + runCatching { lastSeenMap.removeAsync(uuid).await() } + runCatching { retryCountMap.removeAsync(uuid).await() } + + throw e + } } suspend fun deleteAll() { diff --git a/surf-queue-common/src/main/kotlin/dev/slne/surf/queue/common/queue/entry/QueueEntry.kt b/surf-queue-common/src/main/kotlin/dev/slne/surf/queue/common/queue/entry/QueueEntry.kt new file mode 100644 index 0000000..612aec6 --- /dev/null +++ b/surf-queue-common/src/main/kotlin/dev/slne/surf/queue/common/queue/entry/QueueEntry.kt @@ -0,0 +1,16 @@ +package dev.slne.surf.queue.common.queue.entry + +import java.util.* + +/** + * Immutable data class representing a queued player entry. + * + * @property uuid the player's unique identifier + * @property addedAt the epoch millisecond timestamp when the player was enqueued + * @property priority the player's queue priority (higher = more important) + */ +data class QueueEntry( + val uuid: UUID, + val addedAt: Long, + val priority: Int, +) \ No newline at end of file diff --git a/surf-queue-common/src/main/kotlin/dev/slne/surf/queue/common/queue/codec/QueueEntryCodec.kt b/surf-queue-common/src/main/kotlin/dev/slne/surf/queue/common/queue/entry/QueueEntryCodec.kt similarity index 72% rename from surf-queue-common/src/main/kotlin/dev/slne/surf/queue/common/queue/codec/QueueEntryCodec.kt rename to surf-queue-common/src/main/kotlin/dev/slne/surf/queue/common/queue/entry/QueueEntryCodec.kt index ddea1be..2aa68db 100644 --- a/surf-queue-common/src/main/kotlin/dev/slne/surf/queue/common/queue/codec/QueueEntryCodec.kt +++ b/surf-queue-common/src/main/kotlin/dev/slne/surf/queue/common/queue/entry/QueueEntryCodec.kt @@ -1,16 +1,25 @@ -package dev.slne.surf.queue.common.queue.codec +package dev.slne.surf.queue.common.queue.entry -import dev.slne.surf.queue.common.queue.QueueEntry import dev.slne.surf.redis.libs.redisson.client.codec.BaseCodec import dev.slne.surf.redis.libs.redisson.client.protocol.Decoder import dev.slne.surf.redis.libs.redisson.client.protocol.Encoder import dev.slne.surf.redis.shaded.io.netty.buffer.Unpooled -import java.util.* +import java.util.UUID class QueueEntryCodec : BaseCodec() { + companion object { + // @formatter:off + private const val BUFFER_SIZE = ( + (Long.SIZE_BYTES * 2) // uuid + + (Long.SIZE_BYTES) // addedAt + + (Int.SIZE_BYTES) // priority + ) + // @formatter:on + } + private val encoder = Encoder { obj -> val entry = obj as QueueEntry - val buf = Unpooled.buffer() + val buf = Unpooled.buffer(BUFFER_SIZE, BUFFER_SIZE) try { buf.writeLong(entry.uuid.mostSignificantBits) diff --git a/surf-queue-common/src/main/kotlin/dev/slne/surf/queue/common/queue/tick/QueueTicker.kt b/surf-queue-common/src/main/kotlin/dev/slne/surf/queue/common/queue/tick/QueueTicker.kt new file mode 100644 index 0000000..fc4770c --- /dev/null +++ b/surf-queue-common/src/main/kotlin/dev/slne/surf/queue/common/queue/tick/QueueTicker.kt @@ -0,0 +1,59 @@ +package dev.slne.surf.queue.common.queue.tick + +import dev.slne.surf.queue.common.queue.RedisQueueService +import dev.slne.surf.surfapi.core.api.util.logger +import kotlinx.coroutines.* +import java.util.concurrent.Executors +import kotlin.time.Duration.Companion.seconds + +/** + * Singleton that runs the periodic queue tick loop on a dedicated single thread. + * + * Every second it ticks all known queues via [dev.slne.surf.queue.common.queue.AbstractQueue.tick] and periodically + * refreshes the queue list from Redis (every [dev.slne.surf.queue.common.queue.RedisQueueService.Companion.QUEUE_REFRESH_INTERVAL_SECONDS]). + * Tick failures for individual queues are caught by [SafeQueueTick] so one queue + * cannot crash the entire loop. + */ +@OptIn(ExperimentalCoroutinesApi::class, DelicateCoroutinesApi::class) +object QueueTicker { + private val log = logger() + + private val dispatcher = Executors + .newSingleThreadExecutor { r -> Thread(r, "QueueTicker") } + .asCoroutineDispatcher() + + private val scope = CoroutineScope( + SupervisorJob() + + dispatcher + + CoroutineExceptionHandler { _, t -> + log.atSevere() + .withCause(t) + .log("Unhandled exception in QueueTicker:") + } + ) + + fun start() { + var secondsElapsed = 0 + + scope.launch { + while (isActive) { + delay(1.seconds) + secondsElapsed++ + + if (secondsElapsed % RedisQueueService.QUEUE_REFRESH_INTERVAL_SECONDS == 0) { + RedisQueueService.get().fetchFromRedis() + } + + for (queue in RedisQueueService.get().getAll()) { + SafeQueueTick.tickSafe(queue, "heartbeat") { + queue.tick() + } + } + } + } + } + + fun dispose() { + scope.cancel("QueueTicker disposed") + } +} \ No newline at end of file diff --git a/surf-queue-common/src/main/kotlin/dev/slne/surf/queue/common/queue/tick/SafeQueueTick.kt b/surf-queue-common/src/main/kotlin/dev/slne/surf/queue/common/queue/tick/SafeQueueTick.kt new file mode 100644 index 0000000..4c99373 --- /dev/null +++ b/surf-queue-common/src/main/kotlin/dev/slne/surf/queue/common/queue/tick/SafeQueueTick.kt @@ -0,0 +1,20 @@ +package dev.slne.surf.queue.common.queue.tick + +import dev.slne.surf.queue.common.queue.AbstractQueue +import dev.slne.surf.surfapi.core.api.util.logger +import kotlin.coroutines.cancellation.CancellationException + +object SafeQueueTick { + val log = logger() + + inline fun tickSafe(queue: AbstractQueue, component: String, block: () -> Unit) { + try { + block() + } catch (e: Exception) { + if (e is CancellationException) throw e + log.atWarning() + .withCause(e) + .log("Failed to tick %s for queue %s", component, queue.serverName) + } + } +} \ No newline at end of file diff --git a/surf-queue-common/src/main/kotlin/dev/slne/surf/queue/common/redis/RedisInstance.kt b/surf-queue-common/src/main/kotlin/dev/slne/surf/queue/common/redis/RedisInstance.kt index 9a60b7c..a797c79 100644 --- a/surf-queue-common/src/main/kotlin/dev/slne/surf/queue/common/redis/RedisInstance.kt +++ b/surf-queue-common/src/main/kotlin/dev/slne/surf/queue/common/redis/RedisInstance.kt @@ -1,7 +1,9 @@ package dev.slne.surf.queue.common.redis +import com.google.auto.service.AutoService import dev.slne.surf.redis.RedisApi import dev.slne.surf.surfapi.core.api.util.requiredService +import net.kyori.adventure.util.Services import org.jetbrains.annotations.MustBeInvokedByOverriders abstract class RedisInstance { @@ -26,6 +28,9 @@ abstract class RedisInstance { fun get() = instance fun namespaced(key: String) = "surf-queue:$key" + + @AutoService(RedisInstance::class) + class Fallback : RedisInstance(), Services.Fallback } } diff --git a/surf-queue-paper/build.gradle.kts b/surf-queue-paper/build.gradle.kts index b6d8de3..5c4ed89 100644 --- a/surf-queue-paper/build.gradle.kts +++ b/surf-queue-paper/build.gradle.kts @@ -1,4 +1,5 @@ import dev.slne.surf.surfapi.gradle.util.registerRequired +import dev.slne.surf.surfapi.gradle.util.registerSoft plugins { id("dev.slne.surf.surfapi.gradle.paper-plugin") @@ -12,9 +13,11 @@ surfPaperPluginApi { authors.addAll(providers.gradleProperty("authors").map { it.split(",") }) serverDependencies { registerRequired("LuckPerms") + registerSoft("PolarLoader") } } dependencies { api(project(":surf-queue-common")) + compileOnly(libs.polar) } \ No newline at end of file diff --git a/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/PaperMain.kt b/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/PaperMain.kt index c6b21a8..ee3c4c0 100644 --- a/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/PaperMain.kt +++ b/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/PaperMain.kt @@ -1,20 +1,20 @@ package dev.slne.surf.queue.paper import com.github.shynixn.mccoroutine.folia.SuspendingJavaPlugin -import dev.slne.surf.queue.common.SurfQueueInstance +import dev.slne.surf.queue.common.QueueInstance import org.bukkit.plugin.java.JavaPlugin class PaperMain : SuspendingJavaPlugin() { override suspend fun onLoadAsync() { - SurfQueueInstance.get().load() + QueueInstance.get().load() } override suspend fun onEnableAsync() { - SurfQueueInstance.get().enable() + QueueInstance.get().enable() } override suspend fun onDisableAsync() { - SurfQueueInstance.get().disable() + QueueInstance.get().disable() } } diff --git a/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/PaperSurfQueueInstance.kt b/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/PaperSurfQueueInstance.kt index 80db9ad..4bd612d 100644 --- a/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/PaperSurfQueueInstance.kt +++ b/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/PaperSurfQueueInstance.kt @@ -1,15 +1,41 @@ package dev.slne.surf.queue.paper import com.google.auto.service.AutoService -import dev.slne.surf.queue.common.SurfQueueInstance -import dev.slne.surf.queue.common.queue.AbstractSurfQueue -import dev.slne.surf.queue.paper.queue.PaperSurfQueue +import dev.slne.surf.queue.common.QueueInstance +import dev.slne.surf.queue.common.queue.AbstractQueue +import dev.slne.surf.queue.common.queue.tick.QueueTicker +import dev.slne.surf.queue.paper.commands.queueCommand +import dev.slne.surf.queue.paper.config.SurfQueueConfig +import dev.slne.surf.queue.paper.hook.startup.QueueStartHook +import dev.slne.surf.queue.paper.listener.PlayerKickedDueToFullServerListener +import dev.slne.surf.queue.paper.metrics.QueueMetricsLogger +import dev.slne.surf.queue.paper.queue.PaperQueueImpl +import dev.slne.surf.surfapi.bukkit.api.event.register -@AutoService(SurfQueueInstance::class) -class PaperSurfQueueInstance : SurfQueueInstance() { - override val componentOwner get() = dev.slne.surf.queue.paper.plugin +@AutoService(QueueInstance::class) +class PaperSurfQueueInstance : QueueInstance() { + override val componentOwner get() = plugin - override fun createQueue(serverName: String): AbstractSurfQueue { - return PaperSurfQueue(serverName) + override suspend fun load() { + SurfQueueConfig.init() + super.load() + QueueStartHook.get().onServerReady { + QueueTicker.start() + } + } + + override suspend fun enable() { + super.enable() + PlayerKickedDueToFullServerListener.register() + queueCommand() + } + + override suspend fun disable() { + super.disable() + QueueMetricsLogger.stop() + } + + override fun createQueue(serverName: String): AbstractQueue { + return PaperQueueImpl(serverName) } } \ No newline at end of file diff --git a/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/commands/QueueCommand.kt b/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/commands/QueueCommand.kt new file mode 100644 index 0000000..9c3b416 --- /dev/null +++ b/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/commands/QueueCommand.kt @@ -0,0 +1,23 @@ +package dev.slne.surf.queue.paper.commands + +import dev.jorel.commandapi.kotlindsl.commandAPICommand +import dev.slne.surf.queue.paper.commands.sub.metricsCommand +import dev.slne.surf.queue.paper.commands.sub.queueDequeue +import dev.slne.surf.queue.paper.commands.sub.queueCleanup +import dev.slne.surf.queue.paper.commands.sub.queueClear +import dev.slne.surf.queue.paper.commands.sub.queueEnqueue +import dev.slne.surf.queue.paper.commands.sub.queueInfo +import dev.slne.surf.queue.paper.commands.sub.queueList +import dev.slne.surf.queue.paper.permission.PaperQueuePermissions + +fun queueCommand() = commandAPICommand("squeue") { + withPermission(PaperQueuePermissions.COMMAND_QUEUE) + + queueCleanup() + queueClear() + queueDequeue() + queueEnqueue() + queueInfo() + queueList() + metricsCommand() +} \ No newline at end of file diff --git a/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/commands/sub/QueueCleanupCommand.kt b/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/commands/sub/QueueCleanupCommand.kt new file mode 100644 index 0000000..ab632e6 --- /dev/null +++ b/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/commands/sub/QueueCleanupCommand.kt @@ -0,0 +1,41 @@ +package dev.slne.surf.queue.paper.commands.sub + +import dev.jorel.commandapi.CommandAPICommand +import dev.jorel.commandapi.kotlindsl.getValue +import dev.jorel.commandapi.kotlindsl.subcommand +import dev.slne.surf.core.api.common.server.SurfServer +import dev.slne.surf.core.api.paper.command.argument.surfBackendServerArgument +import dev.slne.surf.queue.common.queue.RedisQueueService +import dev.slne.surf.queue.paper.permission.PaperQueuePermissions +import dev.slne.surf.queue.paper.queue.PaperQueueImpl +import dev.slne.surf.surfapi.bukkit.api.command.executors.anyExecutorSuspend +import dev.slne.surf.surfapi.core.api.messages.adventure.sendText + +fun CommandAPICommand.queueCleanup() = subcommand("cleanup") { + withPermission(PaperQueuePermissions.COMMAND_CLEANUP) + surfBackendServerArgument("server", optional = true) + + anyExecutorSuspend { sender, arguments -> + val server: SurfServer? by arguments + val serverName = server?.name ?: SurfServer.current().name + val queue = RedisQueueService.get().getQueueByName(serverName) as PaperQueueImpl + + val sizeBefore = queue.size() + queue.forceCleanup() + val sizeAfter = queue.size() + val removed = sizeBefore - sizeAfter + + sender.sendText { + appendSuccessPrefix() + success("Forced cleanup of queue '") + variableValue(serverName) + success("' complete. Removed ") + variableValue("$removed") + success(" expired entries (") + variableValue("$sizeBefore") + success(" —> ") + variableValue("$sizeAfter") + success(").") + } + } +} \ No newline at end of file diff --git a/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/commands/sub/QueueClearCommand.kt b/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/commands/sub/QueueClearCommand.kt new file mode 100644 index 0000000..4f1c6cb --- /dev/null +++ b/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/commands/sub/QueueClearCommand.kt @@ -0,0 +1,35 @@ +package dev.slne.surf.queue.paper.commands.sub + +import dev.jorel.commandapi.CommandAPICommand +import dev.jorel.commandapi.kotlindsl.getValue +import dev.jorel.commandapi.kotlindsl.subcommand +import dev.slne.surf.core.api.common.server.SurfServer +import dev.slne.surf.core.api.paper.command.argument.surfBackendServerArgument +import dev.slne.surf.queue.common.queue.RedisQueueService +import dev.slne.surf.queue.paper.permission.PaperQueuePermissions +import dev.slne.surf.queue.paper.queue.PaperQueueImpl +import dev.slne.surf.surfapi.bukkit.api.command.executors.anyExecutorSuspend +import dev.slne.surf.surfapi.core.api.messages.adventure.sendText + +fun CommandAPICommand.queueClear() = subcommand("clear") { + withPermission(PaperQueuePermissions.COMMAND_CLEAR) + surfBackendServerArgument("server", optional = true) + + anyExecutorSuspend { sender, arguments -> + val server: SurfServer? by arguments + val serverName = server?.name ?: SurfServer.current().name + val queue = RedisQueueService.get().getQueueByName(serverName) as PaperQueueImpl + + val size = queue.size() + queue.delete() + + sender.sendText { + appendSuccessPrefix() + success("Cleared queue for server ") + variableValue(serverName) + success(". Removed ") + variableValue("$size") + success(" entries.") + } + } +} \ No newline at end of file diff --git a/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/commands/sub/QueueDequeCommand.kt b/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/commands/sub/QueueDequeCommand.kt new file mode 100644 index 0000000..a31902a --- /dev/null +++ b/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/commands/sub/QueueDequeCommand.kt @@ -0,0 +1,48 @@ +package dev.slne.surf.queue.paper.commands.sub + +import dev.jorel.commandapi.CommandAPICommand +import dev.jorel.commandapi.arguments.AsyncPlayerProfileArgument +import dev.jorel.commandapi.kotlindsl.argument +import dev.jorel.commandapi.kotlindsl.getValue +import dev.jorel.commandapi.kotlindsl.subcommand +import dev.slne.surf.core.api.common.server.SurfServer +import dev.slne.surf.core.api.paper.command.argument.surfBackendServerArgument +import dev.slne.surf.queue.common.queue.RedisQueueService +import dev.slne.surf.queue.paper.permission.PaperQueuePermissions +import dev.slne.surf.surfapi.bukkit.api.command.executors.anyExecutorSuspend +import dev.slne.surf.surfapi.bukkit.api.command.util.awaitAsyncPlayerProfile +import dev.slne.surf.surfapi.bukkit.api.command.util.idOrThrow +import dev.slne.surf.surfapi.core.api.messages.adventure.sendText + +fun CommandAPICommand.queueDequeue() = subcommand("dequeue") { + withPermission(PaperQueuePermissions.COMMAND_DEQUEUE) + argument(AsyncPlayerProfileArgument("player")) + surfBackendServerArgument("server", optional = true) + + anyExecutorSuspend { sender, arguments -> + val profile = arguments.awaitAsyncPlayerProfile("player") + val uuid = profile.idOrThrow() + val server: SurfServer? by arguments + val serverName = server?.name ?: SurfServer.current().name + val queue = RedisQueueService.get().getQueueByName(serverName) + + val dequeued = queue.dequeue(uuid) + if (dequeued) { + sender.sendText { + appendSuccessPrefix() + success("Removed ") + variableValue(profile.name ?: uuid.toString()) + success(" from the queue ") + variableValue(serverName) + success(".") + } + } else { + sender.sendText { + appendErrorPrefix() + error("Player ") + variableValue(profile.name ?: uuid.toString()) + error(" is not in the queue.") + } + } + } +} \ No newline at end of file diff --git a/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/commands/sub/QueueEnqueueCommand.kt b/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/commands/sub/QueueEnqueueCommand.kt new file mode 100644 index 0000000..c44799a --- /dev/null +++ b/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/commands/sub/QueueEnqueueCommand.kt @@ -0,0 +1,58 @@ +package dev.slne.surf.queue.paper.commands.sub + +import dev.jorel.commandapi.CommandAPICommand +import dev.jorel.commandapi.arguments.AsyncPlayerProfileArgument +import dev.jorel.commandapi.kotlindsl.argument +import dev.jorel.commandapi.kotlindsl.getValue +import dev.jorel.commandapi.kotlindsl.integerArgument +import dev.jorel.commandapi.kotlindsl.subcommand +import dev.slne.surf.core.api.common.server.SurfServer +import dev.slne.surf.core.api.paper.command.argument.surfBackendServerArgument +import dev.slne.surf.queue.common.queue.RedisQueueScore +import dev.slne.surf.queue.common.queue.RedisQueueService +import dev.slne.surf.queue.paper.permission.PaperQueuePermissions +import dev.slne.surf.surfapi.bukkit.api.command.executors.anyExecutorSuspend +import dev.slne.surf.surfapi.bukkit.api.command.util.awaitAsyncPlayerProfile +import dev.slne.surf.surfapi.bukkit.api.command.util.idOrThrow +import dev.slne.surf.surfapi.core.api.messages.adventure.sendText + +fun CommandAPICommand.queueEnqueue() = subcommand("enqueue") { + withPermission(PaperQueuePermissions.COMMAND_ENQUEUE) + + argument(AsyncPlayerProfileArgument("player")) + surfBackendServerArgument("server", optional = true) + integerArgument("priority", optional = true, min = 0, max = RedisQueueScore.MAX_PRIORITY) + + anyExecutorSuspend { sender, arguments -> + val profile = arguments.awaitAsyncPlayerProfile("player") + val uuid = profile.idOrThrow() + val server: SurfServer? by arguments + val priority: Int? by arguments + + val serverName = server?.name ?: SurfServer.current().name + val queue = RedisQueueService.get().getQueueByName(serverName) + + val enqueued = priority?.let { queue.enqueue(uuid, it) } ?: queue.enqueue(uuid) + if (enqueued) { + sender.sendText { + appendSuccessPrefix() + success("Enqueued ") + variableValue(profile.name ?: uuid.toString()) + success(" to the queue ") + variableValue(serverName) + priority?.let { + success(" with priority ") + variableValue(it) + } + success(".") + } + } else { + sender.sendText { + appendErrorPrefix() + error("Player ") + variableValue(profile.name ?: uuid.toString()) + error(" is already in the queue.") + } + } + } +} \ No newline at end of file diff --git a/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/commands/sub/QueueInfoCommand.kt b/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/commands/sub/QueueInfoCommand.kt new file mode 100644 index 0000000..aa00116 --- /dev/null +++ b/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/commands/sub/QueueInfoCommand.kt @@ -0,0 +1,134 @@ +package dev.slne.surf.queue.paper.commands.sub + +import dev.jorel.commandapi.CommandAPICommand +import dev.jorel.commandapi.arguments.AsyncPlayerProfileArgument +import dev.jorel.commandapi.kotlindsl.argument +import dev.jorel.commandapi.kotlindsl.getValue +import dev.jorel.commandapi.kotlindsl.subcommand +import dev.slne.surf.core.api.common.server.SurfServer +import dev.slne.surf.core.api.paper.command.argument.surfBackendServerArgument +import dev.slne.surf.queue.common.queue.RedisQueueService +import dev.slne.surf.queue.paper.permission.PaperQueuePermissions +import dev.slne.surf.surfapi.bukkit.api.command.executors.anyExecutorSuspend +import dev.slne.surf.surfapi.bukkit.api.command.util.awaitAsyncPlayerProfile +import dev.slne.surf.surfapi.bukkit.api.command.util.idOrThrow +import dev.slne.surf.surfapi.core.api.messages.adventure.sendText +import java.time.Duration +import java.time.Instant +import kotlin.time.Duration.Companion.seconds + +fun CommandAPICommand.queueInfo() = subcommand("info") { + withPermission(PaperQueuePermissions.COMMAND_INFO) + + argument(AsyncPlayerProfileArgument("player")) + surfBackendServerArgument("server", optional = true) + + anyExecutorSuspend { sender, arguments -> + val profile = arguments.awaitAsyncPlayerProfile("player") + val uuid = profile.idOrThrow() + val server: SurfServer? by arguments + val serverName = server?.name ?: SurfServer.current().name + val queue = RedisQueueService.get().getQueueByName(serverName) + + val isQueued = queue.isQueued(uuid) + if (!isQueued) { + sender.sendText { + appendErrorPrefix() + error("Player ") + variableValue(profile.name ?: uuid.toString()) + error(" is not in the queue.") + } + } + + val position = queue.getPosition(uuid) + val size = queue.size() + val meta = queue.getEntryMeta(uuid) + val score = queue.getEntryScore(uuid) + val lastSeen = queue.getEntryLastSeen(uuid) + val retryCount = queue.getEntryRetryCount(uuid) + + sender.sendText { + append { + appendSuccessPrefix() + success("=== Player Info: ") + variableValue(profile.name ?: uuid.toString()) + success(" ===") + } + appendNewline { + appendSuccessPrefix() + variableKey("UUID: ") + variableValue("$uuid") + } + appendNewline { + appendSuccessPrefix() + variableKey("Position: ") + variableValue("${(position ?: -1) + 1}") + spacer(" / ") + variableValue("$size") + } + if (meta != null) { + appendNewline { + appendSuccessPrefix() + variableKey("Priority: ") + variableValue("${meta.priority}") + } + appendNewline { + val addedAt = Instant.ofEpochMilli(meta.addedAt) + val timeInQueue = Duration.between(addedAt, Instant.now()).toSeconds().seconds + appendSuccessPrefix() + variableKey("Added at: ") + variableValue(addedAt.toString()) + spacer(" (") + variableValue(timeInQueue.toString()) + success(" ago)") + } + } + if (score != null) { + appendNewline { + appendSuccessPrefix() + variableKey("Score: ") + variableValue(String.format("%.0f", score.packed)) + spacer(" (") + variableKey("priority") + spacer("=") + variableValue(score.priority) + spacer(", ") + variableKey("deltaMs") + spacer("=") + variableValue(score.deltaMs) + spacer(", ") + variableKey("seq") + spacer("=") + variableValue(score.sequence) + spacer(")") + } + } + + if (lastSeen != null) { + appendNewline { + val lastSeenAt = Instant.ofEpochMilli(lastSeen) + val timeSinceLastSeen = Duration.between(lastSeenAt, Instant.now()).toSeconds().seconds + + appendSuccessPrefix() + variableKey("Last seen: ") + variableValue(lastSeenAt.toString()) + spacer(" (") + variableValue(timeSinceLastSeen.toString()) + success(" ago)") + } + } else { + appendNewline { + appendSuccessPrefix() + variableKey("Last seen: ") + variableValue("n/a (online or no data)") + } + } + + appendNewline { + appendSuccessPrefix() + variableKey("Retry count: ") + variableValue("${retryCount ?: 0}") + } + } + } +} \ No newline at end of file diff --git a/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/commands/sub/QueueListCommand.kt b/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/commands/sub/QueueListCommand.kt new file mode 100644 index 0000000..9124f96 --- /dev/null +++ b/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/commands/sub/QueueListCommand.kt @@ -0,0 +1,49 @@ +package dev.slne.surf.queue.paper.commands.sub + +import dev.jorel.commandapi.CommandAPICommand +import dev.jorel.commandapi.kotlindsl.getValue +import dev.jorel.commandapi.kotlindsl.subcommand +import dev.slne.surf.core.api.common.server.SurfServer +import dev.slne.surf.core.api.paper.command.argument.surfBackendServerArgument +import dev.slne.surf.queue.common.queue.RedisQueueService +import dev.slne.surf.queue.paper.permission.PaperQueuePermissions +import dev.slne.surf.surfapi.bukkit.api.command.executors.anyExecutorSuspend +import dev.slne.surf.surfapi.core.api.messages.adventure.buildText +import dev.slne.surf.surfapi.core.api.messages.adventure.sendText +import dev.slne.surf.surfapi.core.api.messages.pagination.Pagination +import it.unimi.dsi.fastutil.objects.Object2IntMap +import java.util.* + +private val pagination = Pagination> { + rowRenderer { value, _ -> + listOf(buildText { + val uuid = value.key + val position = value.intValue + variableKey(position) + info("—") + variableValue(uuid.toString()) + }) + } +} + +fun CommandAPICommand.queueList() = subcommand("list") { + withPermission(PaperQueuePermissions.COMMAND_LIST) + surfBackendServerArgument("server", optional = true) + + anyExecutorSuspend { sender, arguments -> + val server: SurfServer? by arguments + val serverName = server?.name ?: SurfServer.current().name + val queue = RedisQueueService.get().getQueueByName(serverName) + val entries = queue.getAllUuidsWithPosition() + + if (entries.isEmpty()) { + sender.sendText { + appendInfoPrefix() + info("The queue is empty.") + } + return@anyExecutorSuspend + } + + sender.sendMessage(pagination.renderComponent(entries)) + } +} \ No newline at end of file diff --git a/surf-queue-velocity/src/main/kotlin/dev/slne/surf/queue/velocity/command/metrics/MetricsCommand.kt b/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/commands/sub/QueueMetricsCommand.kt similarity index 84% rename from surf-queue-velocity/src/main/kotlin/dev/slne/surf/queue/velocity/command/metrics/MetricsCommand.kt rename to surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/commands/sub/QueueMetricsCommand.kt index 059f755..d0a38cc 100644 --- a/surf-queue-velocity/src/main/kotlin/dev/slne/surf/queue/velocity/command/metrics/MetricsCommand.kt +++ b/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/commands/sub/QueueMetricsCommand.kt @@ -1,18 +1,18 @@ -package dev.slne.surf.queue.velocity.command.metrics +package dev.slne.surf.queue.paper.commands.sub -import dev.jorel.commandapi.CommandTree +import dev.jorel.commandapi.CommandAPICommand import dev.jorel.commandapi.kotlindsl.anyExecutor -import dev.jorel.commandapi.kotlindsl.literalArgument -import dev.slne.surf.queue.velocity.metrics.QueueMetrics -import dev.slne.surf.queue.velocity.metrics.QueueMetricsLogger -import dev.slne.surf.queue.velocity.permission.SurfQueuePermissions +import dev.jorel.commandapi.kotlindsl.subcommand +import dev.slne.surf.queue.paper.metrics.QueueMetrics +import dev.slne.surf.queue.paper.metrics.QueueMetricsLogger +import dev.slne.surf.queue.paper.permission.PaperQueuePermissions +import dev.slne.surf.surfapi.bukkit.api.command.executors.anyExecutorSuspend import dev.slne.surf.surfapi.core.api.messages.adventure.sendText -import dev.slne.surf.surfapi.velocity.api.command.executors.anyExecutorSuspend -fun CommandTree.metricsCommand() = literalArgument("metrics") { - withPermission(SurfQueuePermissions.COMMAND_METRICS) +fun CommandAPICommand.metricsCommand() = subcommand("metrics") { + withPermission(PaperQueuePermissions.COMMAND_METRICS) - literalArgument("startLogging") { + subcommand("startLogging") { anyExecutor { source, arguments -> QueueMetricsLogger.stop() QueueMetricsLogger.start() @@ -23,7 +23,7 @@ fun CommandTree.metricsCommand() = literalArgument("metrics") { } } - literalArgument("stopLogging") { + subcommand("stopLogging") { anyExecutor { source, arguments -> QueueMetricsLogger.stop() source.sendText { @@ -33,7 +33,7 @@ fun CommandTree.metricsCommand() = literalArgument("metrics") { } } - literalArgument("snapshot") { + subcommand("snapshot") { anyExecutorSuspend { sender, _ -> val snapshot = QueueMetrics.snapshot() val queueSizes = QueueMetrics.collectQueueSizes() diff --git a/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/commands/sub/QueuePauseCommand.kt b/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/commands/sub/QueuePauseCommand.kt new file mode 100644 index 0000000..8a721c8 --- /dev/null +++ b/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/commands/sub/QueuePauseCommand.kt @@ -0,0 +1,62 @@ +package dev.slne.surf.queue.paper.commands.sub + +import dev.jorel.commandapi.CommandAPICommand +import dev.jorel.commandapi.kotlindsl.getValue +import dev.jorel.commandapi.kotlindsl.subcommand +import dev.slne.surf.core.api.common.server.SurfServer +import dev.slne.surf.core.api.paper.command.argument.surfBackendServerArgument +import dev.slne.surf.queue.common.queue.RedisQueueService +import dev.slne.surf.queue.paper.permission.PaperQueuePermissions +import dev.slne.surf.surfapi.bukkit.api.command.executors.anyExecutorSuspend +import dev.slne.surf.surfapi.core.api.messages.adventure.sendText + +fun CommandAPICommand.queuePause() = subcommand("pause") { + withPermission(PaperQueuePermissions.COMMAND_PAUSE) + + subcommand("pause") { + surfBackendServerArgument("server", optional = true) + anyExecutorSuspend { sender, arguments -> + val server: SurfServer? by arguments + val serverName = server?.name ?: SurfServer.current().name + val queue = RedisQueueService.get().getQueueByName(serverName) + queue.pause() + + sender.sendText { + appendSuccessPrefix() + success("Paused queue") + } + } + } + + subcommand("resume") { + surfBackendServerArgument("server", optional = true) + anyExecutorSuspend { sender, arguments -> + val server: SurfServer? by arguments + val serverName = server?.name ?: SurfServer.current().name + val queue = RedisQueueService.get().getQueueByName(serverName) + queue.resume() + sender.sendText { + appendSuccessPrefix() + success("Resumed queue") + } + } + } + + subcommand("status") { + surfBackendServerArgument("server", optional = true) + anyExecutorSuspend { sender, arguments -> + val server: SurfServer? by arguments + val serverName = server?.name ?: SurfServer.current().name + val queue = RedisQueueService.get().getQueueByName(serverName) + val isPaused = queue.isPaused() + sender.sendText { + appendSuccessPrefix() + if (isPaused) { + info("The queue is currently paused") + } else { + info("The queue is currently running") + } + } + } + } +} \ No newline at end of file diff --git a/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/config/SurfQueueConfig.kt b/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/config/SurfQueueConfig.kt new file mode 100644 index 0000000..243a965 --- /dev/null +++ b/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/config/SurfQueueConfig.kt @@ -0,0 +1,16 @@ +package dev.slne.surf.queue.paper.config + +import dev.slne.surf.queue.paper.plugin +import dev.slne.surf.surfapi.core.api.config.SpongeYmlConfigClass +import org.spongepowered.configurate.objectmapping.ConfigSerializable + +@ConfigSerializable +data class SurfQueueConfig( + val maxTransfersPerSecond: Int = 20, +) { + companion object : SpongeYmlConfigClass( + SurfQueueConfig::class.java, + plugin.dataPath, + "config.yml" + ) +} \ No newline at end of file diff --git a/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/hook/startup/DefaultQueueStartHook.kt b/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/hook/startup/DefaultQueueStartHook.kt new file mode 100644 index 0000000..2d217d4 --- /dev/null +++ b/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/hook/startup/DefaultQueueStartHook.kt @@ -0,0 +1,13 @@ +package dev.slne.surf.queue.paper.hook.startup + +import dev.slne.surf.surfapi.shared.api.component.ComponentMeta +import dev.slne.surf.surfapi.shared.api.component.requirement.ConditionalOnMissingComponent + +@ComponentMeta +@ConditionalOnMissingComponent(QueueStartHook::class) +class DefaultQueueStartHook : QueueStartHook() { + + override suspend fun onEnable() { + runServerReadyTasks() + } +} \ No newline at end of file diff --git a/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/hook/startup/PolarQueueStartHook.kt b/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/hook/startup/PolarQueueStartHook.kt new file mode 100644 index 0000000..2053737 --- /dev/null +++ b/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/hook/startup/PolarQueueStartHook.kt @@ -0,0 +1,14 @@ +package dev.slne.surf.queue.paper.hook.startup + +import dev.slne.surf.surfapi.shared.api.component.ComponentMeta +import dev.slne.surf.surfapi.shared.api.component.requirement.DependsOnClass +import top.polar.api.loader.LoaderApi + +@ComponentMeta +@DependsOnClass(LoaderApi::class) +class PolarQueueStartHook : QueueStartHook() { + + override suspend fun onLoad() { + LoaderApi.registerEnableCallback(::runServerReadyTasks) + } +} \ No newline at end of file diff --git a/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/hook/startup/QueueStartHook.kt b/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/hook/startup/QueueStartHook.kt new file mode 100644 index 0000000..c0ac566 --- /dev/null +++ b/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/hook/startup/QueueStartHook.kt @@ -0,0 +1,25 @@ +package dev.slne.surf.queue.paper.hook.startup + +import dev.slne.surf.surfapi.core.api.component.AbstractComponent +import dev.slne.surf.surfapi.core.api.component.surfComponentApi +import java.util.concurrent.ConcurrentLinkedQueue + +abstract class QueueStartHook : AbstractComponent() { + private val serverReadyTasks = ConcurrentLinkedQueue<() -> Unit>() + + protected fun runServerReadyTasks() { + val iterator = serverReadyTasks.iterator() + while (iterator.hasNext()) { + iterator.next().invoke() + iterator.remove() + } + } + + fun onServerReady(block: () -> Unit) { + serverReadyTasks.add(block) + } + + companion object { + fun get() = surfComponentApi.componentsOfTypeLoaded(QueueStartHook::class.java).first() + } +} \ No newline at end of file diff --git a/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/listener/PlayerKickedDueToFullServerListener.kt b/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/listener/PlayerKickedDueToFullServerListener.kt new file mode 100644 index 0000000..40bdb56 --- /dev/null +++ b/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/listener/PlayerKickedDueToFullServerListener.kt @@ -0,0 +1,36 @@ +package dev.slne.surf.queue.paper.listener + +import com.github.benmanes.caffeine.cache.Caffeine +import com.sksamuel.aedile.core.expireAfterWrite +import io.papermc.paper.event.player.PlayerServerFullCheckEvent +import org.bukkit.event.EventHandler +import org.bukkit.event.EventPriority +import org.bukkit.event.Listener +import org.bukkit.event.player.AsyncPlayerPreLoginEvent +import java.util.* +import kotlin.time.Duration.Companion.minutes + +object PlayerKickedDueToFullServerListener : Listener { + private val kicks = Caffeine.newBuilder() + .expireAfterWrite(2.minutes) + .maximumSize(10000) + .build() + + @EventHandler(priority = EventPriority.MONITOR) + fun onPlayerServerFullCheck(event: PlayerServerFullCheckEvent) { + if (!event.isAllowed) { + event.playerProfile.id?.let { kicks.put(it, true) } + } + } + + @EventHandler(priority = EventPriority.MONITOR) + fun onAsyncPlayerPreLogin(event: AsyncPlayerPreLoginEvent) { + if (event.loginResult == AsyncPlayerPreLoginEvent.Result.KICK_FULL) { + event.uniqueId.let { kicks.put(it, true) } + } + } + + fun consumeWasKickedDueToFullServer(uuid: UUID): Boolean { + return kicks.asMap().remove(uuid) ?: false + } +} \ No newline at end of file diff --git a/surf-queue-velocity/src/main/kotlin/dev/slne/surf/queue/velocity/metrics/QueueBstatsIntegration.kt b/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/metrics/QueueBstatsIntegration.kt similarity index 86% rename from surf-queue-velocity/src/main/kotlin/dev/slne/surf/queue/velocity/metrics/QueueBstatsIntegration.kt rename to surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/metrics/QueueBstatsIntegration.kt index 97e9c1e..30bec9c 100644 --- a/surf-queue-velocity/src/main/kotlin/dev/slne/surf/queue/velocity/metrics/QueueBstatsIntegration.kt +++ b/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/metrics/QueueBstatsIntegration.kt @@ -1,10 +1,9 @@ -package dev.slne.surf.queue.velocity.metrics +package dev.slne.surf.queue.paper.metrics import dev.slne.surf.queue.common.queue.RedisQueueService -import dev.slne.surf.queue.velocity.plugin -import dev.slne.surf.queue.velocity.queue.VelocitySurfQueue +import dev.slne.surf.queue.paper.plugin +import dev.slne.surf.surfapi.bukkit.api.metrics.Metrics import dev.slne.surf.surfapi.core.api.util.logger -import dev.slne.surf.surfapi.velocity.api.metrics.Metrics import kotlinx.coroutines.runBlocking import java.util.concurrent.atomic.AtomicLong @@ -15,8 +14,8 @@ object QueueBstatsIntegration { private var lastEnqueues = AtomicLong(0) private var lastFailedTransfers = AtomicLong(0) - fun setup(metricsFactory: Metrics.Factory) { - val metrics = metricsFactory.make(plugin, 29544) + fun setup() { + val metrics = Metrics(plugin, 30644) metrics.addCustomChart(Metrics.SimplePie("queue_count") { try { @@ -57,7 +56,6 @@ object QueueBstatsIntegration { metrics.addCustomChart(Metrics.AdvancedPie("transfers_per_queue") { try { RedisQueueService.get().getAll() - .filterIsInstance() .associate { it.serverName to QueueMetrics.getTransfersFor(it.serverName).toInt() } .filterValues { it > 0 } } catch (_: Exception) { diff --git a/surf-queue-velocity/src/main/kotlin/dev/slne/surf/queue/velocity/metrics/QueueMetrics.kt b/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/metrics/QueueMetrics.kt similarity index 96% rename from surf-queue-velocity/src/main/kotlin/dev/slne/surf/queue/velocity/metrics/QueueMetrics.kt rename to surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/metrics/QueueMetrics.kt index ca8807b..b3797dc 100644 --- a/surf-queue-velocity/src/main/kotlin/dev/slne/surf/queue/velocity/metrics/QueueMetrics.kt +++ b/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/metrics/QueueMetrics.kt @@ -1,7 +1,6 @@ -package dev.slne.surf.queue.velocity.metrics +package dev.slne.surf.queue.paper.metrics import dev.slne.surf.queue.common.queue.RedisQueueService -import dev.slne.surf.queue.velocity.queue.VelocitySurfQueue import dev.slne.surf.surfapi.core.api.util.logger import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.atomic.AtomicLong @@ -114,7 +113,6 @@ object QueueMetrics { suspend fun collectQueueSizes(): Map { return try { RedisQueueService.get().getAll() - .filterIsInstance() .associate { it.serverName to it.size() } } catch (e: Exception) { log.atWarning() diff --git a/surf-queue-velocity/src/main/kotlin/dev/slne/surf/queue/velocity/metrics/QueueMetricsLogger.kt b/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/metrics/QueueMetricsLogger.kt similarity index 92% rename from surf-queue-velocity/src/main/kotlin/dev/slne/surf/queue/velocity/metrics/QueueMetricsLogger.kt rename to surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/metrics/QueueMetricsLogger.kt index c73c8ca..48323aa 100644 --- a/surf-queue-velocity/src/main/kotlin/dev/slne/surf/queue/velocity/metrics/QueueMetricsLogger.kt +++ b/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/metrics/QueueMetricsLogger.kt @@ -1,7 +1,7 @@ -package dev.slne.surf.queue.velocity.metrics +package dev.slne.surf.queue.paper.metrics -import com.github.shynixn.mccoroutine.velocity.launch -import dev.slne.surf.queue.velocity.plugin +import com.github.shynixn.mccoroutine.folia.launch +import dev.slne.surf.queue.paper.plugin import dev.slne.surf.surfapi.core.api.util.logger import kotlinx.coroutines.Job import kotlinx.coroutines.delay @@ -13,7 +13,7 @@ object QueueMetricsLogger { private var job: Job? = null fun start() { - job = plugin.container.launch { + job = plugin.launch { while (isActive) { delay(5.minutes) try { diff --git a/surf-queue-velocity/src/main/kotlin/dev/slne/surf/queue/velocity/metrics/QueueMetricsSnapshot.kt b/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/metrics/QueueMetricsSnapshot.kt similarity index 98% rename from surf-queue-velocity/src/main/kotlin/dev/slne/surf/queue/velocity/metrics/QueueMetricsSnapshot.kt rename to surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/metrics/QueueMetricsSnapshot.kt index 4027105..12a8dc1 100644 --- a/surf-queue-velocity/src/main/kotlin/dev/slne/surf/queue/velocity/metrics/QueueMetricsSnapshot.kt +++ b/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/metrics/QueueMetricsSnapshot.kt @@ -1,4 +1,4 @@ -package dev.slne.surf.queue.velocity.metrics +package dev.slne.surf.queue.paper.metrics data class QueueMetricsSnapshot( val totalTransfers: Long, diff --git a/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/permission/PaperQueuePermissions.kt b/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/permission/PaperQueuePermissions.kt new file mode 100644 index 0000000..42b880f --- /dev/null +++ b/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/permission/PaperQueuePermissions.kt @@ -0,0 +1,18 @@ +package dev.slne.surf.queue.paper.permission + +import dev.slne.surf.surfapi.bukkit.api.permission.PermissionRegistry + +object PaperQueuePermissions : PermissionRegistry() { + private const val PREFIX = "surf.queue." + private const val COMMAND_PREFIX = PREFIX + "command" + + val COMMAND_QUEUE = create("$COMMAND_PREFIX.queue") + val COMMAND_CLEANUP = create("$COMMAND_QUEUE.cleanup") + val COMMAND_CLEAR = create("$COMMAND_QUEUE.clear") + val COMMAND_DEQUEUE = create("$COMMAND_QUEUE.dequeue") + val COMMAND_ENQUEUE = create("$COMMAND_QUEUE.enqueue") + val COMMAND_INFO = create("$COMMAND_QUEUE.info") + val COMMAND_LIST = create("$COMMAND_QUEUE.list") + val COMMAND_PAUSE = create("$COMMAND_QUEUE.pause") + val COMMAND_METRICS = create("$COMMAND_PREFIX.metrics") +} \ No newline at end of file diff --git a/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/queue/PaperQueueImpl.kt b/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/queue/PaperQueueImpl.kt new file mode 100644 index 0000000..9434576 --- /dev/null +++ b/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/queue/PaperQueueImpl.kt @@ -0,0 +1,45 @@ +package dev.slne.surf.queue.paper.queue + +import dev.slne.surf.core.api.common.server.SurfServer +import dev.slne.surf.queue.common.queue.AbstractQueue +import dev.slne.surf.queue.common.queue.tick.SafeQueueTick +import dev.slne.surf.queue.paper.metrics.QueueMetrics +import dev.slne.surf.queue.paper.queue.cleanup.PaperQueueCleanup +import dev.slne.surf.queue.paper.queue.transfer.PaperQueueTransferProcessor +import kotlin.time.Duration.Companion.minutes + +class PaperQueueImpl(serverName: String) : AbstractQueue(serverName) { + private val transferProcessor = PaperQueueTransferProcessor(serverName, store, lockManager, GRACE_PERIOD_MS) + private val cleanup = PaperQueueCleanup(this, store, lockManager) + + private val isTargetServer = SurfServer.current().name == serverName + + companion object { + val GRACE_PERIOD_MS = 1.minutes.inWholeMilliseconds + } + + override fun onEnqueued() { + QueueMetrics.recordEnqueue(serverName) + } + + override fun onDequeued() { + QueueMetrics.recordDequeue(serverName) + } + + override suspend fun tick() { + super.tick() + if (isTargetServer) { + QueueMetrics.recordTick() + SafeQueueTick.tickSafe(this, "cleanup") { cleanup.tick() } + SafeQueueTick.tickSafe(this, "transfers") { transferProcessor.tick() } + } + } + + suspend fun delete() { + store.deleteAll() + } + + suspend fun forceCleanup() { + cleanup.cleanupExpiredEntries() + } +} \ No newline at end of file diff --git a/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/queue/PaperSurfQueue.kt b/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/queue/PaperSurfQueue.kt deleted file mode 100644 index dcdb3bd..0000000 --- a/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/queue/PaperSurfQueue.kt +++ /dev/null @@ -1,5 +0,0 @@ -package dev.slne.surf.queue.paper.queue - -import dev.slne.surf.queue.common.queue.AbstractSurfQueue - -class PaperSurfQueue(serverName: String) : AbstractSurfQueue(serverName) \ No newline at end of file diff --git a/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/queue/cleanup/PaperQueueCleanup.kt b/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/queue/cleanup/PaperQueueCleanup.kt new file mode 100644 index 0000000..e9b505f --- /dev/null +++ b/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/queue/cleanup/PaperQueueCleanup.kt @@ -0,0 +1,80 @@ +package dev.slne.surf.queue.paper.queue.cleanup + +import dev.slne.surf.queue.common.queue.RedisQueueLockManager +import dev.slne.surf.queue.common.queue.RedisQueueStore +import dev.slne.surf.queue.paper.metrics.QueueMetrics +import dev.slne.surf.queue.paper.queue.PaperQueueImpl +import dev.slne.surf.surfapi.core.api.util.logger +import java.time.Instant +import java.util.UUID +import kotlin.collections.iterator +import kotlin.coroutines.cancellation.CancellationException + +class PaperQueueCleanup( + private val queue: PaperQueueImpl, + private val store: RedisQueueStore, + private val lockManager: RedisQueueLockManager +) { + companion object { + private val log = logger() + private const val CLEANUP_INTERVAL_TICKS = 10L + + @JvmStatic + private fun isExpired(now: Long, lastSeenTime: Long): Boolean { + return now - lastSeenTime >= PaperQueueImpl.GRACE_PERIOD_MS + } + } + + suspend fun tick() { + if (queue.tickCount % CLEANUP_INTERVAL_TICKS == 0L) { + lockManager.withCleanupLock { + cleanupExpiredEntries() + } + } + } + + suspend fun cleanupExpiredEntries() { + val now = Instant.now().toEpochMilli() + val allLastSeen = store.readAllLastSeen() + var removals = 0 + + try { + for ((uuid, lastSeenTime) in allLastSeen) { + if (isExpired(now, lastSeenTime)) { + removals += processExpiredEntry(uuid) + } + } + } catch (e: Exception) { + if (e is CancellationException) throw e + log.atWarning() + .withCause(e) + .log("Failed to cleanup expired entries for queue %s", queue.serverName) + } + + QueueMetrics.recordCleanupCycle(removals) + } + + private suspend fun processExpiredEntry(uuid: UUID): Int = try { + queue.dequeue(uuid) + log.atInfo() + .log("Cleanup: removed expired entry %s from queue %s", uuid, queue.serverName) + 1 + } catch (e: Exception) { + if (e is CancellationException) throw e + log.atWarning() + .withCause(e) + .log("Cleanup: dequeue failed for %s in queue %s, attempting forced removal", uuid, queue.serverName) + forceRemoveEntry(uuid) + } + + private suspend fun forceRemoveEntry(uuid: UUID): Int = try { + store.removeAllFor(uuid) + 1 + } catch (e: Exception) { + if (e is CancellationException) throw e + log.atWarning() + .withCause(e) + .log("Cleanup: forced removal also failed for %s in queue %s", uuid, queue.serverName) + 0 + } +} \ No newline at end of file diff --git a/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/queue/transfer/PaperQueueTransfer.kt b/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/queue/transfer/PaperQueueTransfer.kt new file mode 100644 index 0000000..97aaf1f --- /dev/null +++ b/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/queue/transfer/PaperQueueTransfer.kt @@ -0,0 +1,93 @@ +package dev.slne.surf.queue.paper.queue.transfer + +import dev.slne.surf.core.api.common.SurfCoreApi +import dev.slne.surf.core.api.common.player.SurfPlayer +import dev.slne.surf.core.api.common.server.SurfServer +import dev.slne.surf.core.api.common.server.connection.SurfServerConnectResult +import dev.slne.surf.queue.paper.config.SurfQueueConfig +import dev.slne.surf.queue.paper.listener.PlayerKickedDueToFullServerListener +import dev.slne.surf.surfapi.core.api.util.logger +import kotlinx.coroutines.TimeoutCancellationException +import kotlinx.coroutines.withTimeout +import net.kyori.adventure.text.Component +import org.bukkit.Bukkit +import java.util.* +import kotlin.coroutines.cancellation.CancellationException +import kotlin.math.min +import kotlin.time.Duration.Companion.seconds + +class PaperQueueTransfer( + private val processor: PaperQueueTransferProcessor, + private val serverName: String +) { + + companion object { + private val log = logger() + } + + suspend fun tryTransfer(): Int { + val availableSlots = Bukkit.getMaxPlayers() - Bukkit.getOnlinePlayers().size + + if (availableSlots <= 0) return 0 + val coreServer = SurfServer[serverName] ?: return 0 + val maxTransfers = min(availableSlots, SurfQueueConfig.getConfig().maxTransfersPerSecond) + + return processor.processTransfers(maxTransfers) { (uuid) -> + transferEntry(uuid, coreServer) + } + } + + private suspend fun transferEntry(uuid: UUID, targetServer: SurfServer): Pair { + try { + val corePlayer = SurfCoreApi.getPlayer(uuid) ?: return TransferAction.PLAYER_NOT_FOUND to null + val currentPlayerServer = corePlayer.currentServer + val currentPlayerServerName = currentPlayerServer?.name + ?: return TransferAction.PLAYER_NOT_CONNECTED_TO_A_SERVER to null // Probably transferring to another proxy + + if (currentPlayerServerName == serverName) { + return TransferAction.PLAYER_ALREADY_ON_SERVER to null + } + + return tryTransferPlayer(corePlayer, targetServer) + } catch (e: Exception) { + if (e is CancellationException) throw e + log.atWarning() + .withCause(e) + .log("Error during transfer for queue %s", serverName) + return TransferAction.ERROR to null + } + } + + private suspend fun tryTransferPlayer( + player: SurfPlayer, + targetServer: SurfServer + ): Pair { + val (status, message) = try { + withTimeout(30.seconds) { + SurfCoreApi.sendPlayerAwaiting(player, targetServer) + } + } catch (e: TimeoutCancellationException) { + log.atWarning() + .withCause(e) + .log("Timed out waiting for player %s to connect to server %s", player.uuid, targetServer.name) + return TransferAction.TIMEOUT to null + } + + return when (status) { + SurfServerConnectResult.Status.SERVER_NOT_FOUND -> TransferAction.SERVER_NOT_FOUND + SurfServerConnectResult.Status.ALREADY_CONNECTED -> TransferAction.PLAYER_ALREADY_ON_SERVER + SurfServerConnectResult.Status.CONNECTION_CANCELLED -> TransferAction.PLUGIN_CANCELLED_TRANSFER + SurfServerConnectResult.Status.CONNECTION_IN_PROGRESS -> TransferAction.PLAYER_ALREADY_CONNECTING + SurfServerConnectResult.Status.SERVER_DISCONNECTED -> { + if (PlayerKickedDueToFullServerListener.consumeWasKickedDueToFullServer(player.uuid)) { + TransferAction.SERVER_FULL + } else { + TransferAction.PLAYER_KICKED_FROM_SERVER + } + } + + SurfServerConnectResult.Status.SUCCESS -> TransferAction.DONE + SurfServerConnectResult.Status.UNKNOWN_ERROR -> TransferAction.ERROR + } to message + } +} \ No newline at end of file diff --git a/surf-queue-velocity/src/main/kotlin/dev/slne/surf/queue/velocity/queue/RedisQueueTransferProcessor.kt b/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/queue/transfer/PaperQueueTransferProcessor.kt similarity index 54% rename from surf-queue-velocity/src/main/kotlin/dev/slne/surf/queue/velocity/queue/RedisQueueTransferProcessor.kt rename to surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/queue/transfer/PaperQueueTransferProcessor.kt index ea58b27..f5268c3 100644 --- a/surf-queue-velocity/src/main/kotlin/dev/slne/surf/queue/velocity/queue/RedisQueueTransferProcessor.kt +++ b/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/queue/transfer/PaperQueueTransferProcessor.kt @@ -1,24 +1,27 @@ -package dev.slne.surf.queue.velocity.queue +package dev.slne.surf.queue.paper.queue.transfer -import dev.slne.surf.queue.common.queue.QueueEntry +import dev.slne.surf.core.api.common.SurfCoreApi +import dev.slne.surf.core.api.common.util.sendText +import dev.slne.surf.queue.common.queue.entry.QueueEntry import dev.slne.surf.queue.common.queue.RedisQueueLockManager -import dev.slne.surf.queue.common.queue.RedisQueueScorePacker +import dev.slne.surf.queue.common.queue.RedisQueueScore import dev.slne.surf.queue.common.queue.RedisQueueStore -import dev.slne.surf.queue.velocity.metrics.QueueMetrics +import dev.slne.surf.queue.paper.metrics.QueueMetrics import dev.slne.surf.redis.libs.redisson.config.DecorrelatedJitterDelay import dev.slne.surf.redis.libs.redisson.config.DelayStrategy import dev.slne.surf.surfapi.core.api.util.logger +import net.kyori.adventure.text.Component import java.time.Duration import java.time.Instant import java.util.* -class RedisQueueTransferProcessor( +class PaperQueueTransferProcessor( private val serverName: String, private val store: RedisQueueStore, private val lockManager: RedisQueueLockManager, private val gracePeriodMs: Long ) { - private val transfer = QueueTransfer(this, serverName) + private val transfer = PaperQueueTransfer(this, serverName) private var delay = createDelay() private var attempts: Int = 0 private var nextTransferTime = System.currentTimeMillis() @@ -26,25 +29,27 @@ class RedisQueueTransferProcessor( companion object { private val log = logger() private fun createDelay(): DelayStrategy = - DecorrelatedJitterDelay(Duration.ofSeconds(2), Duration.ofSeconds(10)) + DecorrelatedJitterDelay(Duration.ofSeconds(2), Duration.ofSeconds(5)) } suspend fun tick() { if (store.isPaused()) return try { - // decrease CPU usage and redis commands when the queue is empty or the server is full -// if (System.currentTimeMillis() < nextTransferTime) return + // Exponential backoff: decrease CPU usage and Redis commands when the + // queue is empty or the target server is full. + if (System.currentTimeMillis() < nextTransferTime) return val transferred = transfer.tryTransfer() -// if (transferred <= 0) { -// val delay = delay.calcDelay(attempts) -// nextTransferTime = System.currentTimeMillis() + delay.toMillis() -// attempts++ -// } else { -// attempts = 0 -// delay = createDelay() -// } + if (transferred <= 0) { + val delayDuration = delay.calcDelay(attempts) + nextTransferTime = System.currentTimeMillis() + delayDuration.toMillis() + attempts++ + } else { + attempts = 0 + delay = createDelay() + nextTransferTime = System.currentTimeMillis() + } } catch (e: Exception) { log.atWarning() @@ -55,21 +60,19 @@ class RedisQueueTransferProcessor( suspend fun processTransfers( maxTransfers: Int, - tryTransfer: suspend (QueueEntry) -> TransferAction - ): Int { - return lockManager.withTransferLock { acquired -> - QueueMetrics.recordLockAttempt(acquired) - if (acquired) { - doProcessTransfers(maxTransfers, tryTransfer) - } else { - 0 - } + tryTransfer: suspend (QueueEntry) -> Pair + ): Int = lockManager.withTransferLock { acquired -> + QueueMetrics.recordLockAttempt(acquired) + if (acquired) { + doProcessTransfers(maxTransfers, tryTransfer) + } else { + 0 } } private suspend fun doProcessTransfers( maxTransfers: Int, - tryTransfer: suspend (QueueEntry) -> TransferAction + tryTransfer: suspend (QueueEntry) -> Pair ): Int { var transferred = 0 @@ -83,7 +86,8 @@ class RedisQueueTransferProcessor( } try { - when (val result = tryTransfer(entry)) { + val (action, message) = tryTransfer(entry) + when (action) { TransferAction.DONE -> { store.dequeue(uuid) transferred++ @@ -105,7 +109,9 @@ class RedisQueueTransferProcessor( TransferAction.PLAYER_KICKED_FROM_SERVER -> { QueueMetrics.recordFailedTransfer(serverName) - retryEntry(uuid, entry, maxRetries = 5) + retryEntry(uuid, entry, maxRetries = 5) { + sendConnectionResultMessage(entry.uuid, message) + } } TransferAction.PLAYER_ALREADY_CONNECTING -> { @@ -114,9 +120,28 @@ class RedisQueueTransferProcessor( } TransferAction.PLUGIN_CANCELLED_TRANSFER, - TransferAction.ERROR, TransferAction.TIMEOUT -> { + TransferAction.ERROR -> { QueueMetrics.recordFailedTransfer(serverName) - retryEntry(uuid, entry, maxRetries = 3) + retryEntry(uuid, entry, maxRetries = 3) { + sendConnectionResultMessage(entry.uuid, message) + } + } + + TransferAction.TIMEOUT -> { + // Timeout means the target server is likely unreachable. + // Dequeue immediately instead of retrying with another 30s timeout + // to avoid blocking the entire queue for extended periods. + store.dequeue(uuid) + QueueMetrics.recordFailedTransfer(serverName) + QueueMetrics.recordDequeue(serverName) + sendConnectionResultMessage(entry.uuid, message) + log.atWarning() + .log( + "Player %s removed from queue %s due to transfer timeout", + uuid, + serverName + ) + break } TransferAction.SERVER_FULL -> break @@ -130,12 +155,13 @@ class RedisQueueTransferProcessor( return transferred } - private suspend fun retryEntry(uuid: UUID, meta: QueueEntry, maxRetries: Int) { + private suspend fun retryEntry(uuid: UUID, meta: QueueEntry, maxRetries: Int, onMaxRetriesReached: () -> Unit) { val retryCount = store.incrementRetryCount(uuid) if (retryCount >= maxRetries) { store.dequeue(uuid) QueueMetrics.recordRetryExhausted() QueueMetrics.recordDequeue(serverName) + onMaxRetriesReached() log.atWarning() .log("Player %s removed from queue %s after %d failed transfer attempts", uuid, serverName, retryCount) } else { @@ -173,27 +199,52 @@ class RedisQueueTransferProcessor( .log("Player %s removed from queue %s (offline > %dms)", uuid, serverName, gracePeriodMs) } + /** + * Moves a queue entry behind the next entry in the sorted set so that + * the transfer loop can proceed to other players. + * + * If the entry is the last one in the queue (no next entry exists), + * we simply leave it in place and return instead of aborting the + * entire transfer loop. + */ private suspend fun skipEntry(uuid: UUID, meta: QueueEntry) { val currentScore = store.getScore(uuid) ?: throw AbortException() val nextEntries = store.entriesAfter(uuid, limit = 1) - if (nextEntries.isEmpty()) throw AbortException() + if (nextEntries.isEmpty()) { + // This entry is the last in the queue — nothing to skip past. + return + } val nextScoreRaw = nextEntries.first().score - val nextScore = RedisQueueScorePacker.unpack(nextScoreRaw) + val nextScore = RedisQueueScore(nextScoreRaw) - var nextSequence = nextScore.sequence + 1 - if (nextSequence > RedisQueueScorePacker.MAX_SEQUENCE) { - nextSequence = nextScore.sequence - } + val sequenceOverflow = nextScore.sequence >= RedisQueueScore.MAX_SEQUENCE + val nextSequence = if (sequenceOverflow) 0 else nextScore.sequence + 1 + // On overflow, also bump deltaMs so the new score is strictly greater + // than the entry we are skipping past. + val nextDeltaMs = if (sequenceOverflow) nextScore.deltaMs + 1 else nextScore.deltaMs - val newScore = RedisQueueScorePacker.pack( + val newScore = RedisQueueScore.pack( meta.priority, - if (nextSequence == nextScore.sequence) nextScore.deltaMs + 1 else nextScore.deltaMs, + nextDeltaMs, nextSequence ) store.addOrUpdateScore(uuid, newScore) } + private fun sendConnectionResultMessage(uuid: UUID, message: Component?) { + try { + if (message != null) { + val player = SurfCoreApi.getPlayer(uuid) ?: return + player.sendText { append(message) } + } + } catch (e: Exception) { + log.atWarning() + .withCause(e) + .log("Failed to send connection result message for player %s", uuid) + } + } + private class AbortException : Exception() } \ No newline at end of file diff --git a/surf-queue-velocity/src/main/kotlin/dev/slne/surf/queue/velocity/queue/TransferAction.kt b/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/queue/transfer/TransferAction.kt similarity index 85% rename from surf-queue-velocity/src/main/kotlin/dev/slne/surf/queue/velocity/queue/TransferAction.kt rename to surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/queue/transfer/TransferAction.kt index ed5727f..2c232fc 100644 --- a/surf-queue-velocity/src/main/kotlin/dev/slne/surf/queue/velocity/queue/TransferAction.kt +++ b/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/queue/transfer/TransferAction.kt @@ -1,4 +1,4 @@ -package dev.slne.surf.queue.velocity.queue +package dev.slne.surf.queue.paper.queue.transfer enum class TransferAction { DONE, diff --git a/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/redis/PaperRedisInstance.kt b/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/redis/PaperRedisInstance.kt deleted file mode 100644 index 9c8c567..0000000 --- a/surf-queue-paper/src/main/kotlin/dev/slne/surf/queue/paper/redis/PaperRedisInstance.kt +++ /dev/null @@ -1,7 +0,0 @@ -package dev.slne.surf.queue.paper.redis - -import com.google.auto.service.AutoService -import dev.slne.surf.queue.common.redis.RedisInstance - -@AutoService(RedisInstance::class) -class PaperRedisInstance : RedisInstance() \ No newline at end of file diff --git a/surf-queue-velocity/src/main/kotlin/dev/slne/surf/queue/velocity/VelocityMain.kt b/surf-queue-velocity/src/main/kotlin/dev/slne/surf/queue/velocity/VelocityMain.kt index 309cd6c..76654da 100644 --- a/surf-queue-velocity/src/main/kotlin/dev/slne/surf/queue/velocity/VelocityMain.kt +++ b/surf-queue-velocity/src/main/kotlin/dev/slne/surf/queue/velocity/VelocityMain.kt @@ -7,33 +7,31 @@ import com.velocitypowered.api.event.proxy.ProxyInitializeEvent import com.velocitypowered.api.event.proxy.ProxyShutdownEvent import com.velocitypowered.api.plugin.PluginContainer import com.velocitypowered.api.proxy.ProxyServer -import dev.slne.surf.queue.common.SurfQueueInstance -import dev.slne.surf.surfapi.velocity.api.metrics.Metrics +import dev.slne.surf.queue.common.QueueInstance import kotlinx.coroutines.runBlocking class VelocityMain @Inject constructor( val proxy: ProxyServer, val container: PluginContainer, - val suspendingPluginContainer: SuspendingPluginContainer, - val metricsFactory: Metrics.Factory + suspendingPluginContainer: SuspendingPluginContainer, ) { init { suspendingPluginContainer.initialize(this) plugin = this runBlocking { - SurfQueueInstance.get().load() + QueueInstance.get().load() } } @Subscribe suspend fun onProxyInitialize(event: ProxyInitializeEvent) { - SurfQueueInstance.get().enable() + QueueInstance.get().enable() } @Subscribe suspend fun onProxyShutdown(event: ProxyShutdownEvent) { - SurfQueueInstance.get().disable() + QueueInstance.get().disable() } } diff --git a/surf-queue-velocity/src/main/kotlin/dev/slne/surf/queue/velocity/VelocitySurfQueueInstance.kt b/surf-queue-velocity/src/main/kotlin/dev/slne/surf/queue/velocity/VelocitySurfQueueInstance.kt index b50aae1..1b0c4fe 100644 --- a/surf-queue-velocity/src/main/kotlin/dev/slne/surf/queue/velocity/VelocitySurfQueueInstance.kt +++ b/surf-queue-velocity/src/main/kotlin/dev/slne/surf/queue/velocity/VelocitySurfQueueInstance.kt @@ -1,48 +1,26 @@ package dev.slne.surf.queue.velocity import com.google.auto.service.AutoService -import dev.slne.surf.queue.common.SurfQueueInstance -import dev.slne.surf.queue.common.queue.AbstractSurfQueue +import dev.slne.surf.queue.common.QueueInstance +import dev.slne.surf.queue.common.queue.AbstractQueue +import dev.slne.surf.queue.common.queue.tick.QueueTicker import dev.slne.surf.queue.velocity.command.queueCommand import dev.slne.surf.queue.velocity.listener.QueuePlayerListener -import dev.slne.surf.queue.velocity.metrics.QueueBstatsIntegration -import dev.slne.surf.queue.velocity.metrics.QueueMetricsLogger -import dev.slne.surf.queue.velocity.queue.QueueTickTask -import dev.slne.surf.queue.velocity.queue.VelocitySurfQueue -import dev.slne.surf.surfapi.core.api.util.logger +import dev.slne.surf.queue.velocity.queue.VelocityQueueImpl -@AutoService(SurfQueueInstance::class) -class VelocitySurfQueueInstance : SurfQueueInstance() { +@AutoService(QueueInstance::class) +class VelocitySurfQueueInstance : QueueInstance() { override val componentOwner get() = plugin.container override suspend fun enable() { super.enable() plugin.proxy.eventManager.register(plugin, QueuePlayerListener) - QueueTickTask.startTransferring() queueCommand() - - try { - QueueBstatsIntegration.setup(plugin.metricsFactory) - } catch (e: Exception) { - log.atWarning() - .withCause(e) - .log("Failed to initialize bStats integration") - } - } - - override suspend fun disable() { - QueueMetricsLogger.stop() - QueueTickTask.shutdown() - - super.disable() - } - - override fun createQueue(serverName: String): AbstractSurfQueue { - return VelocitySurfQueue(serverName) + QueueTicker.start() } - companion object { - private val log = logger() + override fun createQueue(serverName: String): AbstractQueue { + return VelocityQueueImpl(serverName) } } \ No newline at end of file diff --git a/surf-queue-velocity/src/main/kotlin/dev/slne/surf/queue/velocity/command/QueueCommand.kt b/surf-queue-velocity/src/main/kotlin/dev/slne/surf/queue/velocity/command/QueueCommand.kt index a4e2a0f..a0d9b58 100644 --- a/surf-queue-velocity/src/main/kotlin/dev/slne/surf/queue/velocity/command/QueueCommand.kt +++ b/surf-queue-velocity/src/main/kotlin/dev/slne/surf/queue/velocity/command/QueueCommand.kt @@ -1,14 +1,10 @@ package dev.slne.surf.queue.velocity.command import dev.jorel.commandapi.kotlindsl.commandTree -import dev.slne.surf.queue.velocity.command.metrics.metricsCommand import dev.slne.surf.queue.velocity.command.pause.queuePauseCommand -import dev.slne.surf.queue.velocity.command.test.testQueueCommands import dev.slne.surf.queue.velocity.permission.SurfQueuePermissions -fun queueCommand() = commandTree("squeue") { +fun queueCommand() = commandTree("squeue-velocity") { withPermission(SurfQueuePermissions.COMMAND_QUEUE) - testQueueCommands() - metricsCommand() queuePauseCommand() } \ No newline at end of file diff --git a/surf-queue-velocity/src/main/kotlin/dev/slne/surf/queue/velocity/command/test/TestQueueCommands.kt b/surf-queue-velocity/src/main/kotlin/dev/slne/surf/queue/velocity/command/test/TestQueueCommands.kt deleted file mode 100644 index 587b4a3..0000000 --- a/surf-queue-velocity/src/main/kotlin/dev/slne/surf/queue/velocity/command/test/TestQueueCommands.kt +++ /dev/null @@ -1,75 +0,0 @@ -package dev.slne.surf.queue.velocity.command.test - -import dev.jorel.commandapi.CommandTree -import dev.jorel.commandapi.kotlindsl.anyExecutor -import dev.jorel.commandapi.kotlindsl.getValue -import dev.jorel.commandapi.kotlindsl.literalArgument -import dev.slne.surf.core.api.common.player.SurfPlayer -import dev.slne.surf.core.api.common.server.SurfServer -import dev.slne.surf.core.api.velocity.command.argument.surfBackendServerArgument -import dev.slne.surf.core.api.velocity.command.argument.surfPlayerArgument -import dev.slne.surf.queue.common.queue.RedisQueueService -import dev.slne.surf.queue.velocity.queue.QueueTickTask -import dev.slne.surf.surfapi.core.api.messages.adventure.sendText -import dev.slne.surf.surfapi.velocity.api.command.executors.anyExecutorSuspend - -fun CommandTree.testQueueCommands() = literalArgument("test-queue") { - - literalArgument("enqueue") { - surfPlayerArgument("player") { - surfBackendServerArgument("server") { - anyExecutorSuspend { sender, args -> - val player: SurfPlayer by args - val server: SurfServer by args - - val queue = RedisQueueService.get().get(server.name) - queue.enqueue(player.uuid) - sender.sendText { - appendSuccessPrefix() - success("Enqueued player ") - variableValue(player.lastKnownName ?: player.uuid.toString()) - success(" to queue ") - variableValue(server.name) - } - } - } - } - } - - literalArgument("tickQueues") { - anyExecutorSuspend { sender, args -> - QueueTickTask.tick() - sender.sendText { - appendSuccessPrefix() - success("Ticked queues") - } - } - } - - literalArgument("transferTask") { - literalArgument("start") { - anyExecutorSuspend { source, arguments -> - QueueTickTask.shutdown() - QueueTickTask.startTransferring() - source.sendText { - appendSuccessPrefix() - success("Started transfer task") - } - } - } - - literalArgument("stop") { - anyExecutorSuspend { source, arguments -> - QueueTickTask.shutdown() - source.sendText { - appendSuccessPrefix() - success("Stopped transfer task") - } - } - } - } - - literalArgument("test-display") { - - } -} \ No newline at end of file diff --git a/surf-queue-velocity/src/main/kotlin/dev/slne/surf/queue/velocity/listener/QueuePlayerListener.kt b/surf-queue-velocity/src/main/kotlin/dev/slne/surf/queue/velocity/listener/QueuePlayerListener.kt index a979e70..62118c4 100644 --- a/surf-queue-velocity/src/main/kotlin/dev/slne/surf/queue/velocity/listener/QueuePlayerListener.kt +++ b/surf-queue-velocity/src/main/kotlin/dev/slne/surf/queue/velocity/listener/QueuePlayerListener.kt @@ -1,12 +1,10 @@ package dev.slne.surf.queue.velocity.listener -import com.github.shynixn.mccoroutine.velocity.launch import com.velocitypowered.api.event.Subscribe import com.velocitypowered.api.event.connection.DisconnectEvent import com.velocitypowered.api.event.connection.PostLoginEvent import dev.slne.surf.queue.common.queue.RedisQueueService -import dev.slne.surf.queue.velocity.plugin -import dev.slne.surf.queue.velocity.queue.VelocitySurfQueue +import dev.slne.surf.queue.velocity.queue.VelocityQueueImpl import dev.slne.surf.surfapi.core.api.util.logger import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.launch @@ -15,11 +13,11 @@ object QueuePlayerListener { private val log = logger() @Subscribe - fun onPostLogin(event: PostLoginEvent) { + suspend fun onPostLogin(event: PostLoginEvent) { val uuid = event.player.uniqueId - plugin.container.launch { + coroutineScope { for (queue in RedisQueueService.get().getAll()) { - require(queue is VelocitySurfQueue) { "Queue must be VelocitySurfQueue" } + require(queue is VelocityQueueImpl) { "Queue must be VelocityQueueImpl" } launch { try { queue.markPlayerReconnected(uuid) @@ -31,6 +29,7 @@ object QueuePlayerListener { } } } + } @Subscribe @@ -38,12 +37,10 @@ object QueuePlayerListener { val uuid = event.player.uniqueId coroutineScope { for (queue in RedisQueueService.get().getAll()) { - require(queue is VelocitySurfQueue) + require(queue is VelocityQueueImpl) launch { try { - if (queue.isQueued(uuid)) { - queue.markPlayerDisconnected(uuid) - } + queue.markPlayerDisconnected(uuid) } catch (e: Exception) { log.atWarning() .withCause(e) diff --git a/surf-queue-velocity/src/main/kotlin/dev/slne/surf/queue/velocity/permission/SurfQueuePermissions.kt b/surf-queue-velocity/src/main/kotlin/dev/slne/surf/queue/velocity/permission/SurfQueuePermissions.kt index 3d74b3e..b2377a3 100644 --- a/surf-queue-velocity/src/main/kotlin/dev/slne/surf/queue/velocity/permission/SurfQueuePermissions.kt +++ b/surf-queue-velocity/src/main/kotlin/dev/slne/surf/queue/velocity/permission/SurfQueuePermissions.kt @@ -5,7 +5,5 @@ object SurfQueuePermissions { private const val COMMAND_PREFIX = PREFIX + "command." const val COMMAND_QUEUE = COMMAND_PREFIX + "queue" - - const val COMMAND_METRICS = COMMAND_PREFIX + "metrics" const val COMMAND_PAUSE = COMMAND_PREFIX + "pause" } \ No newline at end of file diff --git a/surf-queue-velocity/src/main/kotlin/dev/slne/surf/queue/velocity/queue/display/QueueDisplay.kt b/surf-queue-velocity/src/main/kotlin/dev/slne/surf/queue/velocity/queue/QueueDisplay.kt similarity index 68% rename from surf-queue-velocity/src/main/kotlin/dev/slne/surf/queue/velocity/queue/display/QueueDisplay.kt rename to surf-queue-velocity/src/main/kotlin/dev/slne/surf/queue/velocity/queue/QueueDisplay.kt index 7d6724a..ab3aee9 100644 --- a/surf-queue-velocity/src/main/kotlin/dev/slne/surf/queue/velocity/queue/display/QueueDisplay.kt +++ b/surf-queue-velocity/src/main/kotlin/dev/slne/surf/queue/velocity/queue/QueueDisplay.kt @@ -1,12 +1,11 @@ -package dev.slne.surf.queue.velocity.queue.display +package dev.slne.surf.queue.velocity.queue -import dev.slne.surf.queue.velocity.queue.VelocitySurfQueue import dev.slne.surf.queue.velocity.util.toVelocityPlayer import dev.slne.surf.surfapi.core.api.messages.adventure.buildText -import it.unimi.dsi.fastutil.objects.Object2IntMap +import it.unimi.dsi.fastutil.objects.ObjectList import java.util.* -class QueueDisplay(private val queue: VelocitySurfQueue) { +class QueueDisplay(private val queue: VelocityQueueImpl) { companion object { private val spinner = arrayOf("∙∙∙", "●∙∙", "∙ ●∙", "∙∙ ●", "∙∙∙") @@ -14,11 +13,11 @@ class QueueDisplay(private val queue: VelocitySurfQueue) { private const val PAUSE_CHAR = '⏸' } - private var cachedUuidsWithPosition: Collection>? = null + private var cachedUuidsWithPosition: ObjectList? = null suspend fun tick() { - if (queue.getTickCount() % 3L == 0L) { - cachedUuidsWithPosition = queue.getAllUuidsWithPosition() + if (queue.tickCount % 3L == 0L) { + cachedUuidsWithPosition = queue.getAllUuidsOrderedByPosition() } updateActionBars() @@ -26,14 +25,12 @@ class QueueDisplay(private val queue: VelocitySurfQueue) { private suspend fun updateActionBars() { val uuidsWithPosition = cachedUuidsWithPosition ?: return - val spinnerIndex = (queue.getTickCount() % spinner.size).toInt() + val spinnerIndex = (queue.tickCount % spinner.size) val spinnerEnd = spinner[spinnerIndex] val spinnerStart = spinnerReversed[spinnerIndex] val paused = queue.isPaused() - for (entry in uuidsWithPosition) { - val uuid = entry.key - val position = entry.intValue + for ((index, uuid) in uuidsWithPosition.withIndex()) { val player = uuid.toVelocityPlayer() ?: continue player.sendActionBar(buildText { @@ -52,7 +49,7 @@ class QueueDisplay(private val queue: VelocitySurfQueue) { appendSpace() spacer('|') appendSpace() - variableValue("$position/${uuidsWithPosition.size}") + variableValue("${index + 1}/${uuidsWithPosition.size}") appendSpace() spacer(spinnerEnd) } diff --git a/surf-queue-velocity/src/main/kotlin/dev/slne/surf/queue/velocity/queue/QueueTickTask.kt b/surf-queue-velocity/src/main/kotlin/dev/slne/surf/queue/velocity/queue/QueueTickTask.kt deleted file mode 100644 index 47dce93..0000000 --- a/surf-queue-velocity/src/main/kotlin/dev/slne/surf/queue/velocity/queue/QueueTickTask.kt +++ /dev/null @@ -1,53 +0,0 @@ -package dev.slne.surf.queue.velocity.queue - -import com.github.shynixn.mccoroutine.velocity.launch -import dev.slne.surf.queue.common.queue.RedisQueueService -import dev.slne.surf.queue.velocity.plugin -import dev.slne.surf.surfapi.core.api.util.logger -import kotlinx.coroutines.* -import kotlin.time.Duration.Companion.seconds - -object QueueTickTask { - - private val log = logger() - private var job: Job? = null - - private var lastFetch = 0L - - fun startTransferring() { - job = plugin.container.launch { - while (isActive) { - delay(1.seconds) - tick() - } - } - } - - suspend fun shutdown() { - job?.cancelAndJoin() - job = null - } - - suspend fun tick() { - val now = System.currentTimeMillis() - if (now - lastFetch > 30_000) { - lastFetch = now - RedisQueueService.get().fetchFromRedis() - } - - coroutineScope { - for (queue in RedisQueueService.get().getAll()) { - require(queue is VelocitySurfQueue) { "Queue must be VelocitySurfQueue" } - launch { - try { - queue.tickSecond() - } catch (e: Exception) { - log.atWarning() - .withCause(e) - .log("Error during tickSecond for queue %s", queue.serverName) - } - } - } - } - } -} \ No newline at end of file diff --git a/surf-queue-velocity/src/main/kotlin/dev/slne/surf/queue/velocity/queue/QueueTransfer.kt b/surf-queue-velocity/src/main/kotlin/dev/slne/surf/queue/velocity/queue/QueueTransfer.kt deleted file mode 100644 index 5f7a473..0000000 --- a/surf-queue-velocity/src/main/kotlin/dev/slne/surf/queue/velocity/queue/QueueTransfer.kt +++ /dev/null @@ -1,86 +0,0 @@ -package dev.slne.surf.queue.velocity.queue - -import dev.slne.surf.core.api.common.player.SurfPlayer -import dev.slne.surf.core.api.common.server.SurfServer -import dev.slne.surf.core.api.common.server.connection.SurfServerConnectResult -import dev.slne.surf.core.api.common.surfCoreApi -import dev.slne.surf.surfapi.core.api.util.logger -import kotlinx.coroutines.TimeoutCancellationException -import kotlinx.coroutines.withTimeout -import kotlin.time.Duration.Companion.seconds - -class QueueTransfer(private val processor: RedisQueueTransferProcessor, private val serverName: String) { - - companion object { - private val log = logger() - } - - suspend fun tryTransfer(): Int { - val coreServer = surfCoreApi.getServerByName(serverName) ?: return 0 - - val playerCount = coreServer.getPlayerCount() - val maxPlayers = coreServer.maxPlayers - val availableSlots = maxPlayers - playerCount - - if (availableSlots <= 0) return 0 - - return processor.processTransfers(availableSlots) { entry -> - try { - val corePlayer = surfCoreApi.getPlayer(entry.uuid) - if (corePlayer == null) { - TransferAction.PLAYER_NOT_FOUND - } else { - val currentPlayerServer = corePlayer.currentServer - val currentPlayerServerName = currentPlayerServer?.name - - if (currentPlayerServer == null) { // Probably transferring to another proxy - TransferAction.PLAYER_NOT_CONNECTED_TO_A_SERVER - } else if (currentPlayerServerName == serverName) { - TransferAction.PLAYER_ALREADY_ON_SERVER - } else { - tryTransferPlayer(corePlayer, coreServer) - } - } - - } catch (e: Exception) { - log.atWarning() - .withCause(e) - .log("Error during transfer for queue %s", serverName) - TransferAction.ERROR - } - } - } - - private suspend fun tryTransferPlayer( - player: SurfPlayer, - targetServer: SurfServer - ): TransferAction { - val (status, message) = try { - withTimeout(30.seconds) { - surfCoreApi.sendPlayerAwaiting(player, targetServer) - } - } catch (e: TimeoutCancellationException) { - log.atWarning() - .withCause(e) - .log("Timed out waiting for player %s to connect to server %s", player.uuid, targetServer.name) - return TransferAction.TIMEOUT - } - - return when (status) { - SurfServerConnectResult.Status.SERVER_NOT_FOUND -> TransferAction.SERVER_NOT_FOUND - SurfServerConnectResult.Status.ALREADY_CONNECTED -> TransferAction.PLAYER_ALREADY_ON_SERVER - SurfServerConnectResult.Status.CONNECTION_CANCELLED -> TransferAction.PLUGIN_CANCELLED_TRANSFER - SurfServerConnectResult.Status.CONNECTION_IN_PROGRESS -> TransferAction.PLAYER_ALREADY_CONNECTING - SurfServerConnectResult.Status.SERVER_DISCONNECTED -> { - if (targetServer.maxPlayers <= targetServer.getPlayerCount()) { - TransferAction.SERVER_FULL - } else { - TransferAction.PLAYER_KICKED_FROM_SERVER - } - } - - SurfServerConnectResult.Status.SUCCESS -> TransferAction.DONE - SurfServerConnectResult.Status.UNKNOWN_ERROR -> TransferAction.ERROR - } - } -} \ No newline at end of file diff --git a/surf-queue-velocity/src/main/kotlin/dev/slne/surf/queue/velocity/queue/RedisQueueCleanup.kt b/surf-queue-velocity/src/main/kotlin/dev/slne/surf/queue/velocity/queue/RedisQueueCleanup.kt deleted file mode 100644 index bc023a2..0000000 --- a/surf-queue-velocity/src/main/kotlin/dev/slne/surf/queue/velocity/queue/RedisQueueCleanup.kt +++ /dev/null @@ -1,54 +0,0 @@ -package dev.slne.surf.queue.velocity.queue - -import dev.slne.surf.queue.common.queue.RedisQueueLockManager -import dev.slne.surf.queue.common.queue.RedisQueueStore -import dev.slne.surf.queue.velocity.metrics.QueueMetrics -import dev.slne.surf.surfapi.core.api.util.logger -import java.time.Instant - -class RedisQueueCleanup( - private val queue: VelocitySurfQueue, - private val store: RedisQueueStore, - private val lockManager: RedisQueueLockManager -) { - - companion object { - private val log = logger() - } - - suspend fun tick() { - if (queue.getTickCount() % 30 == 0L) { - lockManager.withCleanupLock { - cleanupExpiredEntries() - } - } - } - - suspend fun cleanupExpiredEntries() { - val now = Instant.now().toEpochMilli() - val allLastSeen = store.readAllLastSeen() - var removals = 0 - - try { - for ((uuid, lastSeenTime) in allLastSeen) { - if (now - lastSeenTime >= VelocitySurfQueue.GRACE_PERIOD_MS) { - try { - queue.dequeue(uuid) - removals++ - log.atInfo() - .log("Cleanup: removed expired entry %s from queue %s", uuid, queue.serverName) - } catch (_: Exception) { - store.removeAllFor(uuid) - removals++ - } - } - } - } catch (e: Exception) { - log.atWarning() - .withCause(e) - .log("Failed to cleanup expired entries for queue %s", queue.serverName) - } - - QueueMetrics.recordCleanupCycle(removals) - } -} \ No newline at end of file diff --git a/surf-queue-velocity/src/main/kotlin/dev/slne/surf/queue/velocity/queue/VelocityQueueImpl.kt b/surf-queue-velocity/src/main/kotlin/dev/slne/surf/queue/velocity/queue/VelocityQueueImpl.kt new file mode 100644 index 0000000..8d9f1ce --- /dev/null +++ b/surf-queue-velocity/src/main/kotlin/dev/slne/surf/queue/velocity/queue/VelocityQueueImpl.kt @@ -0,0 +1,23 @@ +package dev.slne.surf.queue.velocity.queue + +import dev.slne.surf.queue.common.queue.AbstractQueue +import dev.slne.surf.queue.common.queue.tick.SafeQueueTick +import java.time.Instant +import java.util.* + +class VelocityQueueImpl(serverName: String) : AbstractQueue(serverName) { + val display = QueueDisplay(this) + + suspend fun markPlayerReconnected(uuid: UUID) { + store.clearLastSeen(uuid) + } + + suspend fun markPlayerDisconnected(uuid: UUID) { + store.putLastSeen(uuid, Instant.now().toEpochMilli()) + } + + override suspend fun tick() { + super.tick() + SafeQueueTick.tickSafe(this, "display") { display.tick() } + } +} \ No newline at end of file diff --git a/surf-queue-velocity/src/main/kotlin/dev/slne/surf/queue/velocity/queue/VelocitySurfQueue.kt b/surf-queue-velocity/src/main/kotlin/dev/slne/surf/queue/velocity/queue/VelocitySurfQueue.kt deleted file mode 100644 index 4419f80..0000000 --- a/surf-queue-velocity/src/main/kotlin/dev/slne/surf/queue/velocity/queue/VelocitySurfQueue.kt +++ /dev/null @@ -1,68 +0,0 @@ -package dev.slne.surf.queue.velocity.queue - -import dev.slne.surf.queue.common.queue.AbstractSurfQueue -import dev.slne.surf.queue.velocity.metrics.QueueMetrics -import dev.slne.surf.queue.velocity.queue.display.QueueDisplay -import dev.slne.surf.surfapi.core.api.util.logger -import java.time.Instant -import java.util.* -import java.util.concurrent.atomic.AtomicLong -import kotlin.time.Duration.Companion.minutes - -class VelocitySurfQueue(serverName: String) : AbstractSurfQueue(serverName) { - private val transferProcessor = RedisQueueTransferProcessor(serverName, store, lockManager, GRACE_PERIOD_MS) - private val cleanup = RedisQueueCleanup(this, store, lockManager) - - private val tickCount = AtomicLong(0) - - val display = QueueDisplay(this) - - companion object { - private val log = logger() - - val GRACE_PERIOD_MS = 1.minutes.inWholeMilliseconds - const val LOCK_LEASE_SECONDS = 30L - } - - override fun onEnqueued() { - QueueMetrics.recordEnqueue(serverName) - } - - override fun onDequeued() { - QueueMetrics.recordDequeue(serverName) - } - - suspend fun markPlayerReconnected(uuid: UUID) { - store.clearLastSeen(uuid) - } - - suspend fun markPlayerDisconnected(uuid: UUID) { - store.putLastSeen(uuid, Instant.now().toEpochMilli()) - } - - fun getTickCount() = tickCount.get() - - suspend fun tickSecond() { - tickCount.incrementAndGet() - QueueMetrics.recordTick() - - safeTick("cleanup") { cleanup.tick() } - safeTick("transfers") { transferProcessor.tick() } - safeTick("display") { display.tick() } - } - - private inline fun safeTick(component: String, block: () -> Unit) { - try { - block() - } catch (e: Exception) { - log.atWarning() - .withCause(e) - .log("Failed to tick %s for queue %s", component, serverName) - } - } - - suspend fun delete() { - store.deleteAll() - } - -} \ No newline at end of file diff --git a/surf-queue-velocity/src/main/kotlin/dev/slne/surf/queue/velocity/redis/VelocityRedisInstance.kt b/surf-queue-velocity/src/main/kotlin/dev/slne/surf/queue/velocity/redis/VelocityRedisInstance.kt deleted file mode 100644 index deac7c3..0000000 --- a/surf-queue-velocity/src/main/kotlin/dev/slne/surf/queue/velocity/redis/VelocityRedisInstance.kt +++ /dev/null @@ -1,11 +0,0 @@ -package dev.slne.surf.queue.velocity.redis - -import com.google.auto.service.AutoService -import dev.slne.surf.queue.common.redis.RedisInstance - -@AutoService(RedisInstance::class) -class VelocityRedisInstance : RedisInstance() { - override fun register() { - super.register() - } -} \ No newline at end of file