Skip to content

Commit

Permalink
feat: make use of Kotlin coroutines
Browse files Browse the repository at this point in the history
  • Loading branch information
Cubxity committed Jun 23, 2021
1 parent efe13db commit 4544e1a
Show file tree
Hide file tree
Showing 16 changed files with 109 additions and 171 deletions.
2 changes: 1 addition & 1 deletion .idea/misc.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ package dev.cubxity.plugins.metrics.api
import dev.cubxity.plugins.metrics.api.logging.Logger
import dev.cubxity.plugins.metrics.api.metric.MetricsManager
import dev.cubxity.plugins.metrics.api.platform.Platform
import dev.cubxity.plugins.metrics.api.scheduler.SchedulerAdapter
import kotlinx.coroutines.CoroutineDispatcher

/**
* The UnifiedMetrics API
Expand All @@ -48,9 +48,9 @@ interface UnifiedMetrics {
val logger: Logger

/**
* The platform's scheduler.
* The platform's dispatcher.
*/
val scheduler: SchedulerAdapter
val dispatcher: CoroutineDispatcher

/**
* The metrics api.
Expand Down

This file was deleted.

1 change: 1 addition & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ subprojects {
tasks.withType<KotlinCompile> {
kotlinOptions {
jvmTarget = "1.8"
freeCompilerArgs = listOf("-Xopt-in=kotlin.RequiresOptIn")
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* UnifiedMetrics is a fully-featured metrics collection plugin for Minecraft servers.
* Copyright (C) 2021 Cubxity
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/

package dev.cubxity.plugins.metrics.bukkit

import kotlinx.coroutines.*
import org.bukkit.Bukkit
import org.bukkit.plugin.java.JavaPlugin
import kotlin.coroutines.CoroutineContext

@OptIn(InternalCoroutinesApi::class)
class BukkitDispatcher(private val plugin: JavaPlugin) : CoroutineDispatcher(), Delay {
@OptIn(ExperimentalCoroutinesApi::class)
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
val task = plugin.server.scheduler.runTaskLater(
plugin,
Runnable {
continuation.apply { resumeUndispatched(Unit) }
},
timeMillis / 50
)
continuation.invokeOnCancellation { task.cancel() }
}

override fun dispatch(context: CoroutineContext, block: Runnable) {
if (!context.isActive) {
return
}

if (Bukkit.isPrimaryThread()) {
block.run()
} else {
plugin.server.scheduler.runTask(plugin, block)
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@
package dev.cubxity.plugins.metrics.bukkit.bootstrap

import dev.cubxity.plugins.metrics.api.platform.PlatformType
import dev.cubxity.plugins.metrics.bukkit.BukkitSchedulerAdapter
import dev.cubxity.plugins.metrics.bukkit.BukkitDispatcher
import dev.cubxity.plugins.metrics.bukkit.UnifiedMetricsBukkitPlugin
import dev.cubxity.plugins.metrics.common.UnifiedMetricsBootstrap
import dev.cubxity.plugins.metrics.common.plugin.logger.JavaLogger
import kotlinx.coroutines.CoroutineDispatcher
import org.bukkit.plugin.java.JavaPlugin
import java.nio.file.Path

Expand All @@ -47,7 +48,7 @@ class UnifiedMetricsBukkitBootstrap : JavaPlugin(), UnifiedMetricsBootstrap {

override val logger = JavaLogger(getLogger())

override val scheduler = BukkitSchedulerAdapter(this)
override val dispatcher: CoroutineDispatcher = BukkitDispatcher(this)

override fun onEnable() {
plugin.enable()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package dev.cubxity.plugins.metrics.common

import dev.cubxity.plugins.metrics.api.logging.Logger
import dev.cubxity.plugins.metrics.api.platform.PlatformType
import dev.cubxity.plugins.metrics.api.scheduler.SchedulerAdapter
import kotlinx.coroutines.CoroutineDispatcher
import java.nio.file.Path

interface UnifiedMetricsBootstrap {
Expand Down Expand Up @@ -55,7 +55,7 @@ interface UnifiedMetricsBootstrap {
val logger: Logger

/**
* The platform's scheduler
* The platform's dispatcher
*/
val scheduler: SchedulerAdapter
val dispatcher: CoroutineDispatcher
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ import dev.cubxity.plugins.metrics.api.UnifiedMetrics
import dev.cubxity.plugins.metrics.api.logging.Logger
import dev.cubxity.plugins.metrics.api.metric.MetricsManager
import dev.cubxity.plugins.metrics.api.platform.Platform
import dev.cubxity.plugins.metrics.api.scheduler.SchedulerAdapter
import dev.cubxity.plugins.metrics.common.config.ServerSpec
import dev.cubxity.plugins.metrics.common.plugin.UnifiedMetricsPlugin
import kotlinx.coroutines.CoroutineDispatcher

open class UnifiedMetricsApiProvider(val plugin: UnifiedMetricsPlugin) : UnifiedMetrics {
override val platform: Platform = PlatformImpl(plugin)
Expand All @@ -35,8 +35,8 @@ open class UnifiedMetricsApiProvider(val plugin: UnifiedMetricsPlugin) : Unified
override val logger: Logger
get() = plugin.bootstrap.logger

override val scheduler: SchedulerAdapter
get() = plugin.bootstrap.scheduler
override val dispatcher: CoroutineDispatcher
get() = plugin.bootstrap.dispatcher

override val metricsManager: MetricsManager =
MetricsManagerImpl(plugin)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,6 @@ abstract class AbstractUnifiedMetricsPlugin : UnifiedMetricsPlugin {

UnifiedMetricsProvider.unregister()
_apiProvider = null

bootstrap.scheduler.shutdown()
}

abstract fun registerPlatformService(api: UnifiedMetrics)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,14 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/

package dev.cubxity.plugins.metrics.api.scheduler
package dev.cubxity.plugins.metrics.common.plugin.dispatcher

import java.util.concurrent.Executor
import java.util.concurrent.TimeUnit
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Runnable
import kotlin.coroutines.CoroutineContext

interface SchedulerAdapter {
val sync: Executor

fun asyncRepeating(task: Runnable, interval: Long, unit: TimeUnit): SchedulerTask

fun shutdown()
object CurrentThreadDispatcher : CoroutineDispatcher() {
override fun dispatch(context: CoroutineContext, block: Runnable) {
block.run()
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@ import dev.cubxity.plugins.metrics.api.metric.MetricsDriver
import dev.cubxity.plugins.metrics.api.metric.collect
import dev.cubxity.plugins.metrics.api.metric.data.MetricSample
import dev.cubxity.plugins.metrics.influx.config.InfluxSpec
import java.util.concurrent.TimeUnit
import kotlinx.coroutines.*
import com.influxdb.client.write.Point as InfluxPoint

class InfluxMetricsDriver(private val api: UnifiedMetrics, private val config: Config) : MetricsDriver {
private val coroutineScope = CoroutineScope(Dispatchers.Default) + SupervisorJob()

private var influxDBClient: InfluxDBClient? = null
private var writeApi: WriteApi? = null

Expand All @@ -53,6 +55,7 @@ class InfluxMetricsDriver(private val api: UnifiedMetrics, private val config: C
}

override fun close() {
coroutineScope.cancel()
writeApi?.close()
influxDBClient?.close()
influxDBClient = null
Expand All @@ -61,15 +64,19 @@ class InfluxMetricsDriver(private val api: UnifiedMetrics, private val config: C
private fun scheduleTasks() {
val interval = config[InfluxSpec.interval]

val scheduler = api.scheduler

scheduler.asyncRepeating({
try {
writeSamples(api.metricsManager.collect())
} catch (error: Throwable) {
api.logger.severe("An error occurred whilst writing samples to InfluxDB", error)
coroutineScope.launch {
while (true) {
try {
val samples = withContext(api.dispatcher) {
api.metricsManager.collect()
}
writeSamples(samples)
} catch (error: Throwable) {
api.logger.severe("An error occurred whilst writing samples to InfluxDB", error)
}
delay(interval * 1000)
}
}, interval, TimeUnit.SECONDS)
}
}

private fun writeSamples(samples: List<MetricSample>) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,26 @@ import dev.cubxity.plugins.metrics.api.UnifiedMetrics
import dev.cubxity.plugins.metrics.api.metric.collect
import dev.cubxity.plugins.metrics.api.metric.data.MetricType
import io.prometheus.client.Collector
import kotlinx.coroutines.runBlocking

class UnifiedMetricsCollector(private val api: UnifiedMetrics) : Collector() {
override fun collect(): List<MetricFamilySamples> {
return api.metricsManager.collect().map {
val keys = it.tags.keys.toList()
val values = it.tags.values.toList()
val sample = MetricFamilySamples.Sample(it.name, keys, values, it.value)
val type = when (it.type) {
MetricType.Counter -> Type.COUNTER
MetricType.Gauge -> Type.GAUGE
else -> Type.UNKNOWN
}
override fun collect(): List<MetricFamilySamples> = runBlocking(api.dispatcher) {
try {
api.metricsManager.collect().map {
val keys = it.tags.keys.toList()
val values = it.tags.values.toList()
val sample = MetricFamilySamples.Sample(it.name, keys, values, it.value)
val type = when (it.type) {
MetricType.Counter -> Type.COUNTER
MetricType.Gauge -> Type.GAUGE
else -> Type.UNKNOWN
}

MetricFamilySamples(it.name, type, "", listOf(sample))
MetricFamilySamples(it.name, type, "", listOf(sample))
}
} catch (exception: Exception) {
api.logger.severe("An error occurred whilst collecting metrics", exception)
}
emptyList()
}
}

0 comments on commit 4544e1a

Please sign in to comment.