Skip to content

Commit 64e56eb

Browse files
committed
Convert bukkit async scheduler to coroutine
1 parent b0cd7b4 commit 64e56eb

File tree

6 files changed

+108
-72
lines changed

6 files changed

+108
-72
lines changed

bukkit/src/main/kotlin/io/github/rothes/esu/bukkit/module/ChatAntiSpamModule.kt

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,13 @@ import io.github.rothes.esu.bukkit.user
1212
import io.github.rothes.esu.bukkit.user.ConsoleUser
1313
import io.github.rothes.esu.bukkit.user.PlayerUser
1414
import io.github.rothes.esu.bukkit.util.ComponentBukkitUtils.user
15-
import io.github.rothes.esu.bukkit.util.scheduler.ScheduledTask
16-
import io.github.rothes.esu.bukkit.util.scheduler.Scheduler
1715
import io.github.rothes.esu.bukkit.util.version.adapter.PlayerAdapter.Companion.displayName_
1816
import io.github.rothes.esu.core.configuration.ConfigurationPart
1917
import io.github.rothes.esu.core.configuration.data.MessageData
2018
import io.github.rothes.esu.core.configuration.data.MessageData.Companion.message
2119
import io.github.rothes.esu.core.configuration.meta.Comment
2220
import io.github.rothes.esu.core.configuration.serializer.MapSerializer.DefaultedEnumMap
21+
import io.github.rothes.esu.core.coroutine.AsyncScope
2322
import io.github.rothes.esu.core.module.configuration.BaseModuleConfiguration
2423
import io.github.rothes.esu.core.user.User
2524
import io.github.rothes.esu.core.util.ComponentUtils.component
@@ -28,6 +27,10 @@ import io.github.rothes.esu.core.util.ComponentUtils.parsed
2827
import io.github.rothes.esu.core.util.ComponentUtils.unparsed
2928
import io.github.rothes.esu.lib.adventure.text.minimessage.tag.resolver.TagResolver
3029
import io.github.rothes.esu.lib.configurate.objectmapping.meta.PostProcess
30+
import kotlinx.coroutines.Job
31+
import kotlinx.coroutines.delay
32+
import kotlinx.coroutines.isActive
33+
import kotlinx.coroutines.launch
3134
import org.bukkit.Bukkit
3235
import org.incendo.cloud.component.DefaultValue
3336
import java.time.Duration
@@ -40,11 +43,16 @@ import kotlin.time.toJavaDuration
4043

4144
object ChatAntiSpamModule: BukkitModule<ChatAntiSpamModule.ModuleConfig, ChatAntiSpamModule.ModuleLocale>() {
4245

43-
private var purgeTask: ScheduledTask? = null
46+
private var purgeTask: Job? = null
4447

4548
override fun onEnable() {
4649
CasDataManager
47-
purgeTask = Scheduler.asyncTicks(20, 5 * 60 * 20) { CasDataManager.purgeCache(true) }
50+
purgeTask = AsyncScope.launch {
51+
while (isActive) {
52+
CasDataManager.purgeCache(true)
53+
delay(5.minutes)
54+
}
55+
}
4856
CasListeners.enable()
4957
Bukkit.getOnlinePlayers().map { it.user }.forEach {
5058
if (it.hasPerm("notify"))

bukkit/src/main/kotlin/io/github/rothes/esu/bukkit/module/CommandAntiSpamModule.kt

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4,40 +4,47 @@ import com.google.common.collect.HashBasedTable
44
import io.github.rothes.esu.bukkit.user
55
import io.github.rothes.esu.bukkit.util.extension.ListenerExt.register
66
import io.github.rothes.esu.bukkit.util.extension.ListenerExt.unregister
7-
import io.github.rothes.esu.bukkit.util.scheduler.ScheduledTask
8-
import io.github.rothes.esu.bukkit.util.scheduler.Scheduler
97
import io.github.rothes.esu.core.configuration.ConfigurationPart
108
import io.github.rothes.esu.core.configuration.meta.Comment
9+
import io.github.rothes.esu.core.coroutine.AsyncScope
1110
import io.github.rothes.esu.core.module.configuration.BaseModuleConfiguration
1211
import io.github.rothes.esu.core.user.User
1312
import io.github.rothes.esu.core.util.extension.DurationExt.compareTo
13+
import kotlinx.coroutines.Job
14+
import kotlinx.coroutines.delay
15+
import kotlinx.coroutines.isActive
16+
import kotlinx.coroutines.launch
1417
import org.bukkit.event.EventHandler
1518
import org.bukkit.event.EventPriority
1619
import org.bukkit.event.Listener
1720
import org.bukkit.event.player.PlayerCommandPreprocessEvent
1821
import java.time.Duration
22+
import kotlin.time.Duration.Companion.minutes
1923
import kotlin.time.Duration.Companion.seconds
2024
import kotlin.time.toJavaDuration
2125

2226
object CommandAntiSpamModule: BukkitModule<CommandAntiSpamModule.ModuleConfig, CommandAntiSpamModule.ModuleLocale>() {
2327

24-
private var cacheTask: ScheduledTask? = null
28+
private var cacheTask: Job? = null
2529

2630
override fun onEnable() {
2731
Listeners.register()
28-
cacheTask = Scheduler.asyncTicks(5 * 60 * 20L, 5 * 60 * 20L) {
29-
val now = System.currentTimeMillis()
30-
Listeners.hits.cellSet().toList().forEach { cell ->
31-
val cmd = cell.columnKey
32-
val queue = cell.value
33-
val conf = config.commands.find { it.commands.any { regex -> regex.containsMatchIn(cmd) } }
34-
if (conf == null) {
35-
queue.clear()
36-
} else {
37-
queue.removeIf { now - it > conf.expireInterval }
38-
}
39-
if (queue.isEmpty()) {
40-
Listeners.hits.remove(cell.rowKey, cell.columnKey)
32+
cacheTask = AsyncScope.launch {
33+
while (isActive) {
34+
delay(5.minutes)
35+
val now = System.currentTimeMillis()
36+
Listeners.hits.cellSet().toList().forEach { cell ->
37+
val cmd = cell.columnKey
38+
val queue = cell.value
39+
val conf = config.commands.find { it.commands.any { regex -> regex.containsMatchIn(cmd) } }
40+
if (conf == null) {
41+
queue.clear()
42+
} else {
43+
queue.removeIf { now - it > conf.expireInterval }
44+
}
45+
if (queue.isEmpty()) {
46+
Listeners.hits.remove(cell.rowKey, cell.columnKey)
47+
}
4148
}
4249
}
4350
}

bukkit/src/main/kotlin/io/github/rothes/esu/bukkit/module/exploitfix/VaultUnlocking.kt

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,18 @@ import io.github.rothes.esu.bukkit.util.CoordinateUtils.blockKey_
77
import io.github.rothes.esu.bukkit.util.ServerCompatibility
88
import io.github.rothes.esu.bukkit.util.extension.ListenerExt.register
99
import io.github.rothes.esu.bukkit.util.extension.ListenerExt.unregister
10-
import io.github.rothes.esu.bukkit.util.scheduler.ScheduledTask
11-
import io.github.rothes.esu.bukkit.util.scheduler.Scheduler
10+
import io.github.rothes.esu.core.coroutine.AsyncScope
1211
import io.github.rothes.esu.core.util.CollectionUtils.removeWhile
1312
import io.github.rothes.esu.core.util.ComponentUtils.duration
1413
import io.github.rothes.esu.core.util.version.Version
1514
import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap
1615
import it.unimi.dsi.fastutil.longs.LongArrayList
1716
import it.unimi.dsi.fastutil.longs.LongList
1817
import it.unimi.dsi.fastutil.objects.Object2ObjectOpenHashMap
18+
import kotlinx.coroutines.Job
19+
import kotlinx.coroutines.delay
20+
import kotlinx.coroutines.isActive
21+
import kotlinx.coroutines.launch
1922
import org.bukkit.Material
2023
import org.bukkit.World
2124
import org.bukkit.block.Block
@@ -35,7 +38,7 @@ internal object VaultUnlocking: Listener {
3538

3639
private var enabled = AtomicBoolean(false)
3740
private val opened = Object2ObjectOpenHashMap<World, Long2ObjectOpenHashMap<LongList>>()
38-
private var purgeTask: ScheduledTask? = null
41+
private var purgeTask: Job? = null
3942

4043
private fun getBlockTimes(block: Block) =
4144
opened.getOrPut(block.world) { Long2ObjectOpenHashMap() }.getOrPut(block.blockKey_) { LongArrayList() }
@@ -48,21 +51,24 @@ internal object VaultUnlocking: Listener {
4851
}
4952
if (enabled.compareAndSet(expectedValue = false, newValue = true)) {
5053
this.register()
51-
purgeTask = Scheduler.async(30.minutes, 30.minutes) {
52-
val now = System.currentTimeMillis()
53-
val config = ExploitFixesModule.config.vaultUnlocking
54-
val expired = now - config.unlockExpiry.toMillis()
55-
val iterator = opened.iterator()
56-
for ((_, map) in iterator) {
57-
val iterator2 = map.iterator()
58-
for ((_, list) in iterator2) {
59-
list.removeWhile { it < expired }
60-
if (list.isEmpty()) {
61-
iterator2.remove()
54+
purgeTask = AsyncScope.launch {
55+
while (isActive) {
56+
delay(30.minutes)
57+
val now = System.currentTimeMillis()
58+
val config = ExploitFixesModule.config.vaultUnlocking
59+
val expired = now - config.unlockExpiry.toMillis()
60+
val iterator = opened.iterator()
61+
for ((_, map) in iterator) {
62+
val iterator2 = map.iterator()
63+
for ((_, list) in iterator2) {
64+
list.removeWhile { it < expired }
65+
if (list.isEmpty()) {
66+
iterator2.remove()
67+
}
68+
}
69+
if (map.isEmpty()) {
70+
iterator.remove()
6271
}
63-
}
64-
if (map.isEmpty()) {
65-
iterator.remove()
6672
}
6773
}
6874
}

bukkit/src/main/kotlin/io/github/rothes/esu/bukkit/module/networkthrottle/HighLatencyAdjust.kt

Lines changed: 32 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -13,22 +13,26 @@ import io.github.rothes.esu.bukkit.util.ServerCompatibility
1313
import io.github.rothes.esu.bukkit.util.extension.ListenerExt.register
1414
import io.github.rothes.esu.bukkit.util.extension.ListenerExt.unregister
1515
import io.github.rothes.esu.bukkit.util.extension.checkPacketEvents
16-
import io.github.rothes.esu.bukkit.util.scheduler.ScheduledTask
17-
import io.github.rothes.esu.bukkit.util.scheduler.Scheduler
1816
import io.github.rothes.esu.core.configuration.ConfigurationPart
1917
import io.github.rothes.esu.core.configuration.data.MessageData
2018
import io.github.rothes.esu.core.configuration.data.MessageData.Companion.message
2119
import io.github.rothes.esu.core.configuration.meta.Comment
20+
import io.github.rothes.esu.core.coroutine.AsyncScope
2221
import io.github.rothes.esu.core.module.CommonFeature
2322
import io.github.rothes.esu.core.module.Feature
2423
import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap
24+
import kotlinx.coroutines.Job
25+
import kotlinx.coroutines.delay
26+
import kotlinx.coroutines.isActive
27+
import kotlinx.coroutines.launch
2528
import org.bukkit.Bukkit
2629
import org.bukkit.entity.Player
2730
import org.bukkit.event.EventHandler
2831
import org.bukkit.event.Listener
2932
import org.bukkit.event.player.PlayerQuitEvent
3033
import java.time.Duration
3134
import kotlin.math.min
35+
import kotlin.time.Duration.Companion.seconds
3236
import kotlin.time.toJavaDuration
3337

3438
object HighLatencyAdjust: CommonFeature<HighLatencyAdjust.FeatureConfig, HighLatencyAdjust.FeatureLang>(), Listener {
@@ -37,7 +41,7 @@ object HighLatencyAdjust: CommonFeature<HighLatencyAdjust.FeatureConfig, HighLat
3741

3842
val adjusted = hashMapOf<Player, Int>()
3943
val startTime = Object2LongOpenHashMap<Player>().also { it.defaultReturnValue(NO_TIME) }
40-
var task: ScheduledTask? = null
44+
var task: Job? = null
4145

4246
@EventHandler
4347
fun onQuit(e: PlayerQuitEvent) {
@@ -63,32 +67,35 @@ object HighLatencyAdjust: CommonFeature<HighLatencyAdjust.FeatureConfig, HighLat
6367
}
6468
data.originalViewDistance.clear()
6569

66-
task = Scheduler.asyncTicks(0, 15 * 20) {
67-
val now = System.currentTimeMillis()
68-
for (player in Bukkit.getOnlinePlayers()) {
69-
if (player.ping >= config.latencyThreshold) {
70-
val last = startTime.getLong(player)
71-
if (last == NO_TIME) {
72-
startTime[player] = now
73-
continue
74-
} else if ((now - last) < config.duration.toMillis()) {
75-
continue
76-
}
77-
startTime.removeLong(player)
78-
79-
if (!adjusted.containsKey(player)) {
80-
adjusted[player] = player.clientViewDistance
81-
player.sendViewDistance = min(player.clientViewDistance, player.viewDistance) - 1
82-
} else {
83-
if (player.sendViewDistance <= config.minViewDistance) {
70+
task = AsyncScope.launch {
71+
while (isActive) {
72+
val now = System.currentTimeMillis()
73+
for (player in Bukkit.getOnlinePlayers()) {
74+
if (player.ping >= config.latencyThreshold) {
75+
val last = startTime.getLong(player)
76+
if (last == NO_TIME) {
77+
startTime[player] = now
78+
continue
79+
} else if ((now - last) < config.duration.toMillis()) {
8480
continue
8581
}
86-
player.sendViewDistance--
82+
startTime.removeLong(player)
83+
84+
if (!adjusted.containsKey(player)) {
85+
adjusted[player] = player.clientViewDistance
86+
player.sendViewDistance = min(player.clientViewDistance, player.viewDistance) - 1
87+
} else {
88+
if (player.sendViewDistance <= config.minViewDistance) {
89+
continue
90+
}
91+
player.sendViewDistance--
92+
}
93+
player.user.message(lang, { adjustedWarning })
94+
} else {
95+
startTime.removeLong(player)
8796
}
88-
player.user.message(lang, { adjustedWarning })
89-
} else {
90-
startTime.removeLong(player)
9197
}
98+
delay(15.seconds)
9299
}
93100
}
94101
PacketEvents.getAPI().eventManager.registerListener(PacketListeners)

bukkit/src/main/kotlin/io/github/rothes/esu/bukkit/module/news/NewsDataManager.kt

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,21 @@
11
package io.github.rothes.esu.bukkit.module.news
22

33
import io.github.rothes.esu.bukkit.module.NewsModule
4-
import io.github.rothes.esu.bukkit.util.scheduler.ScheduledTask
5-
import io.github.rothes.esu.bukkit.util.scheduler.Scheduler
64
import io.github.rothes.esu.core.storage.StorageManager
75
import io.github.rothes.esu.core.storage.StorageManager.database
86
import io.github.rothes.esu.core.storage.StorageManager.upgrader
97
import io.github.rothes.esu.core.user.User
108
import io.github.rothes.esu.core.util.DataSerializer.deserialize
119
import io.github.rothes.esu.core.util.DataSerializer.serialize
12-
import kotlinx.coroutines.launch
10+
import kotlinx.coroutines.*
1311
import kotlinx.datetime.LocalDateTime
1412
import org.jetbrains.exposed.v1.core.*
1513
import org.jetbrains.exposed.v1.datetime.datetime
1614
import org.jetbrains.exposed.v1.jdbc.*
1715
import org.jetbrains.exposed.v1.jdbc.transactions.transaction
1816
import org.jetbrains.exposed.v1.json.json
17+
import kotlin.time.Duration.Companion.minutes
18+
import kotlin.time.Duration.Companion.seconds
1919

2020
object NewsDataManager {
2121

@@ -36,7 +36,7 @@ object NewsDataManager {
3636
override val primaryKey: PrimaryKey = PrimaryKey(user, channel)
3737
}
3838

39-
private var task: ScheduledTask? = null
39+
private var task: Job? = null
4040
var news: List<NewsItem> = emptyList()
4141
private set
4242

@@ -57,15 +57,12 @@ object NewsDataManager {
5757

5858
fun start() {
5959
task?.cancel()
60-
fetchNews()
61-
fun schedule() {
62-
// We want a random offset
63-
task = Scheduler.asyncTicks(20 * 60L + (-200 .. 200).random()) {
60+
task = CoroutineScope(Dispatchers.IO).launch {
61+
while (isActive) {
6462
fetchNews()
65-
schedule()
63+
delay(1.minutes + (1 .. 10).random().seconds) // Random offset to stagger the queries
6664
}
6765
}
68-
schedule()
6966
}
7067

7168
fun shutdown() {
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package io.github.rothes.esu.core.coroutine
2+
3+
import kotlinx.coroutines.CoroutineScope
4+
import kotlinx.coroutines.Dispatchers
5+
import kotlin.coroutines.CoroutineContext
6+
7+
object AsyncScope : CoroutineScope {
8+
9+
override val coroutineContext: CoroutineContext = Dispatchers.Default
10+
11+
}

0 commit comments

Comments
 (0)