Skip to content

Commit

Permalink
Gracefully handle SIGTERM, GameDataEvent performance improvements (#116)
Browse files Browse the repository at this point in the history
* Increase server.nettyThreadpoolSize default to 30, minor optimizations for GameDataEvents, gracefully handle SIGTERM

* Do not attempt to post duplicate tweets (they will fail).

* Remove improvedLagstat switch. More performance improvements.

* flags refactorings
  • Loading branch information
hopskipnfall committed May 24, 2024
1 parent 94eafae commit 5b0b2c8
Show file tree
Hide file tree
Showing 17 changed files with 166 additions and 128 deletions.
16 changes: 8 additions & 8 deletions emulinker/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ repositories {

dependencies {
api("org.jetbrains.kotlin:kotlin-stdlib:1.8.21")
api("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.1")

implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.5.1")

Expand All @@ -44,13 +43,14 @@ dependencies {
api("commons-pool:commons-pool:1.2")

val ktorVersion = "2.3.9"
api("io.ktor:ktor-network-jvm:$ktorVersion")
api("io.ktor:ktor-server-core-jvm:$ktorVersion")
api("io.ktor:ktor-server-netty-jvm:$ktorVersion")
api("io.ktor:ktor-server-status-pages-jvm:$ktorVersion")
api("io.ktor:ktor-server-default-headers-jvm:$ktorVersion")

api("io.reactivex.rxjava3:rxjava:3.1.1")
implementation("io.ktor:ktor-network-jvm:$ktorVersion")
implementation("io.ktor:ktor-server-core-jvm:$ktorVersion")
implementation("io.ktor:ktor-server-netty-jvm:$ktorVersion")
implementation("io.ktor:ktor-server-status-pages-jvm:$ktorVersion")
implementation("io.ktor:ktor-server-default-headers-jvm:$ktorVersion")

// This is only used by the fake testing client, hopefully we can remove this.
testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.1")

testImplementation("junit:junit:4.13.2")
testImplementation("com.google.truth:truth:1.1.3")
Expand Down
8 changes: 3 additions & 5 deletions emulinker/conf/emulinker.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
# CHARSET CONFIGURATION
# =====================
# Do not use UTF-8, cause standard clients do not support it.
# Cyrillic - Cp1251; Japanese - Shift_JIS; Latin - Cp1252; Korean - Cp949 etc.
emulinker.charset=Cp1251
# Cyrillic - Cp1251; Latin - Cp1252; Korean - Cp949 etc.
emulinker.charset=Shift_JIS

# NETWORKING CONFIGURATION
# ========================
Expand Down Expand Up @@ -153,6 +153,4 @@ twitter.auth.oAuthAccessTokenSecret=
twitter.auth.oAuthConsumerKey=
twitter.auth.oAuthConsumerSecret=

server.improvedLagstatEnabled=true

server.nettyThreadpoolSize=15
# server.nettyThreadpoolSize=30
9 changes: 6 additions & 3 deletions emulinker/src/main/java/org/emulinker/config/RuntimeFlags.kt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ data class RuntimeFlags(
val allowSinglePlayer: Boolean,
val charset: Charset,
val chatFloodTime: Int,
// tbh I have no idea what this does.
val clientTypes: List<String>,
val connectionTypes: List<String>,
val coreThreadPoolSize: Int,
val createGameFloodTime: Int,
Expand All @@ -22,7 +24,6 @@ data class RuntimeFlags(
val gameDesynchTimeouts: Int,
val gameTimeout: Duration,
val idleTimeout: Duration,
val improvedLagstatEnabled: Boolean,
val keepAliveTimeout: Duration,
val maxChatLength: Int,
val maxClientNameLength: Int,
Expand All @@ -38,6 +39,7 @@ data class RuntimeFlags(
val serverAddress: String,
val serverLocation: String,
val serverName: String,
val serverPort: Int,
val serverWebsite: String,
val touchEmulinker: Boolean,
val touchKaillera: Boolean,
Expand Down Expand Up @@ -87,6 +89,7 @@ data class RuntimeFlags(
allowSinglePlayer = config.getBoolean("server.allowSinglePlayer", true),
charset = Charset.forName(config.getString("emulinker.charset")),
chatFloodTime = config.getInt("server.chatFloodTime"),
clientTypes = config.getStringArray("controllers.v086.clientTypes.clientType").toList(),
connectionTypes = config.getList("server.allowedConnectionTypes") as List<String>,
coreThreadPoolSize = config.getInt("server.coreThreadpoolSize", 5),
createGameFloodTime = config.getInt("server.createGameFloodTime"),
Expand All @@ -95,7 +98,6 @@ data class RuntimeFlags(
gameDesynchTimeouts = config.getInt("game.desynchTimeouts"),
gameTimeout = config.getInt("game.timeoutMillis").milliseconds,
idleTimeout = config.getInt("server.idleTimeout").seconds,
improvedLagstatEnabled = config.getBoolean("server.improvedLagstatEnabled", true),
keepAliveTimeout = config.getInt("server.keepAliveTimeout").seconds,
maxChatLength = config.getInt("server.maxChatLength"),
maxClientNameLength = config.getInt("server.maxClientNameLength"),
Expand All @@ -111,6 +113,7 @@ data class RuntimeFlags(
serverAddress = config.getString("masterList.serverConnectAddress", ""),
serverLocation = config.getString("masterList.serverLocation", "Unknown"),
serverName = config.getString("masterList.serverName", "Emulinker Server"),
serverPort = config.getInt("controllers.connect.port"),
serverWebsite = config.getString("masterList.serverWebsite", ""),
touchEmulinker = config.getBoolean("masterList.touchEmulinker", false),
touchKaillera = config.getBoolean("masterList.touchKaillera", false),
Expand All @@ -125,7 +128,7 @@ data class RuntimeFlags(
config.getStringArray("twitter.preventBroadcastNameSuffixes").toList(),
v086BufferSize = config.getInt("controllers.v086.bufferSize", 4096),
// TODO(nue): This default works well, but maybe we can experiment further.
nettyFlags = config.getInt("server.nettyThreadpoolSize", 15),
nettyFlags = config.getInt("server.nettyThreadpoolSize", 30),
)
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,17 @@
package org.emulinker.kaillera.access

import java.time.Instant
import java.util.*
import java.util.Locale
import kotlin.time.Duration
import kotlin.time.toJavaDuration
import org.emulinker.util.WildcardStringPattern

sealed class TemporaryAttribute(accessStr: String, val duration: Duration) {
private val patterns =
accessStr
.lowercase(Locale.getDefault())
.splitToSequence("|")
.map { WildcardStringPattern(it) }
.toList()
private val patterns: List<WildcardStringPattern> =
accessStr.lowercase(Locale.getDefault()).split("|").map { WildcardStringPattern(it) }

private val endTime = Instant.now().plus(duration.toJavaDuration())
private val endTimeMs = System.currentTimeMillis() + duration.inWholeMilliseconds

val isExpired
get() = Instant.now().isAfter(endTime)
get() = System.currentTimeMillis() > endTimeMs

fun matches(address: String): Boolean {
return patterns.any { it.match(address) }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import java.net.InetSocketAddress
import java.nio.ByteOrder
import java.util.concurrent.ConcurrentHashMap
import javax.inject.Inject
import kotlin.concurrent.thread
import org.apache.commons.configuration.Configuration
import org.emulinker.config.RuntimeFlags
import org.emulinker.kaillera.access.AccessManager
Expand Down Expand Up @@ -56,6 +57,21 @@ constructor(
fun bind(port: Int) {
embeddedServer(Netty, port = port) {
val group = NioEventLoopGroup(flags.nettyFlags)

Runtime.getRuntime()
.addShutdownHook(
thread(start = false) {
logger.atInfo().log("Received SIGTERM, shutting down gracefully.")
try {
clientHandlers.values.forEach { handler ->
handler.user.server.quit(handler.user, "Server shutting down")
}
} finally {
nettyChannel.close()
}
}
)

try {
Bootstrap().apply {
group(group)
Expand Down Expand Up @@ -129,8 +145,7 @@ constructor(
}

val buf = nettyChannel.alloc().buffer(bufferSize)
RequestPrivateKailleraPortResponse(config.getInt("controllers.connect.port"))
.writeTo(buf)
RequestPrivateKailleraPortResponse(flags.serverPort).writeTo(buf)
ctx.writeAndFlush(DatagramPacket(buf, remoteSocketAddress))
}
else -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,25 @@ import dagger.assisted.AssistedInject
import io.netty.buffer.ByteBuf
import io.netty.channel.socket.DatagramPacket
import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.nio.ByteOrder
import org.emulinker.config.RuntimeFlags
import org.emulinker.kaillera.controller.CombinedKailleraController
import org.emulinker.kaillera.controller.messaging.MessageFormatException
import org.emulinker.kaillera.controller.messaging.ParseException
import org.emulinker.kaillera.controller.v086.action.FatalActionException
import org.emulinker.kaillera.controller.v086.action.GameDataAction
import org.emulinker.kaillera.controller.v086.action.V086Action
import org.emulinker.kaillera.controller.v086.action.V086GameEventHandler
import org.emulinker.kaillera.controller.v086.action.V086ServerEventHandler
import org.emulinker.kaillera.controller.v086.action.V086UserEventHandler
import org.emulinker.kaillera.controller.v086.protocol.GameData
import org.emulinker.kaillera.controller.v086.protocol.V086Bundle
import org.emulinker.kaillera.controller.v086.protocol.V086Bundle.Companion.parse
import org.emulinker.kaillera.controller.v086.protocol.V086BundleFormatException
import org.emulinker.kaillera.controller.v086.protocol.V086Message
import org.emulinker.kaillera.model.KailleraUser
import org.emulinker.kaillera.model.event.GameDataEvent
import org.emulinker.kaillera.model.event.GameEvent
import org.emulinker.kaillera.model.event.KailleraEvent
import org.emulinker.kaillera.model.event.KailleraEventListener
Expand All @@ -41,6 +45,7 @@ class V086ClientHandler
constructor(
metrics: MetricRegistry,
private val flags: RuntimeFlags,
private val gameDataAction: GameDataAction,
// TODO(nue): Try to replace this with remoteSocketAddress.
/** I think this is the address from when the user called the connect controller. */
@Assisted val connectRemoteSocketAddress: InetSocketAddress,
Expand Down Expand Up @@ -68,7 +73,12 @@ constructor(
)
return
}
clientRequestTimer.time().use { handleReceivedInternal(buf) }
if (flags.metricsEnabled) {
// TODO: Can we use measureNanoTime?
clientRequestTimer.time().use { handleReceivedInternal(buf) }
} else {
handleReceivedInternal(buf)
}
}

lateinit var user: KailleraUser
Expand Down Expand Up @@ -157,6 +167,7 @@ constructor(
private fun handleReceivedInternal(buffer: ByteBuf) {
val inBundle: V086Bundle =
if (CompiledFlags.USE_BYTEREADPACKET_INSTEAD_OF_BYTEBUFFER) {
// Note: This is currently DISABLED as it's unstable (see tests marked as @Ignore).
try {
parse(buffer, lastMessageNumber)
} catch (e: ParseException) {
Expand Down Expand Up @@ -190,7 +201,7 @@ constructor(
// TODO: datagram.packet.release()
}
} else {
val newBuffer = buffer.nioBuffer()
val newBuffer: ByteBuffer = buffer.nioBuffer()
try {
parse(newBuffer, lastMessageNumber)
} catch (e: ParseException) {
Expand Down Expand Up @@ -240,14 +251,23 @@ constructor(
val messages = inBundle.messages
// TODO(nue): Combine these two cases? This seems unnecessary.
if (inBundle.numMessages == 1) {
lastMessageNumber = messages[0]!!.messageNumber
val action = controller.actions[messages[0]!!.messageTypeId.toInt()]
val m: V086Message = messages[0]!!
lastMessageNumber = m.messageNumber
val messageTypeId = m.messageTypeId
val action: V086Action<out V086Message>? =
// Checking for GameData first is a speed optimization.
if (messageTypeId == GameData.ID) {
gameDataAction
} else {
controller.actions[m.messageTypeId.toInt()]
}
if (action == null) {
logger
.atSevere()
.log("No action defined to handle client message: %s", messages.firstOrNull())
return
}
(action as V086Action<V086Message>?)!!.performAction(messages[0]!!, this)
(action as V086Action<V086Message>).performAction(m, this)
} else {
// read the bundle from back to front to process the oldest messages first
for (i in inBundle.numMessages - 1 downTo 0) {
Expand All @@ -257,7 +277,8 @@ constructor(
* if (messages [i].getNumber() > lastMessageNumber)
*/
prevMessageNumber = lastMessageNumber
lastMessageNumber = messages[i]!!.messageNumber
val m: V086Message = messages[i]!!
lastMessageNumber = m.messageNumber
if (prevMessageNumber + 1 != lastMessageNumber) {
if (prevMessageNumber == 0xFFFF && lastMessageNumber == 0) {
// exception; do nothing
Expand All @@ -268,12 +289,19 @@ constructor(
user.droppedPacket()
}
}
val action = controller.actions[messages[i]!!.messageTypeId.toInt()]
val messageTypeId = m.messageTypeId
val action: V086Action<out V086Message>? =
// Checking for GameData first is a speed optimization.
if (messageTypeId == GameData.ID) {
gameDataAction
} else {
controller.actions[m.messageTypeId.toInt()]
}
if (action == null) {
logger.atSevere().log("No action defined to handle client message: %s", messages[i])
} else {
// logger.atFine().log(user + " -> " + message);
(action as V086Action<V086Message>).performAction(messages[i]!!, this)
(action as V086Action<V086Message>).performAction(m, this)
}
}
}
Expand All @@ -285,6 +313,10 @@ constructor(

override fun actionPerformed(event: KailleraEvent) {
when (event) {
// Check for GameDataEvent first to avoid map lookup slowness.
is GameDataEvent -> {
gameDataAction.handleEvent(event, this)
}
is GameEvent -> {
val eventHandler = controller.gameEventHandlers[event::class]
if (eventHandler == null) {
Expand Down Expand Up @@ -330,14 +362,14 @@ constructor(
var numToSend = 3 * timeoutCounter
if (numToSend > V086Controller.MAX_BUNDLE_SIZE) numToSend = V086Controller.MAX_BUNDLE_SIZE
logger.atFine().log("%s: resending last %d messages", this, numToSend)
send(null, numToSend)
resendFromCache(numToSend)
lastResend = System.currentTimeMillis()
} else {
logger.atFine().log("Skipping resend...")
}
}

fun send(outMessage: V086Message?, numToSend: Int = 5) {
private fun resendFromCache(numToSend: Int = 5) {
synchronized(sendMutex) {
var numToSend = numToSend

Expand All @@ -346,9 +378,24 @@ constructor(
.alloc()
.directBuffer(flags.v086BufferSize)
.order(ByteOrder.LITTLE_ENDIAN)
if (outMessage != null) {
lastMessageBuffer.add(outMessage)
}
numToSend = lastMessageBuffer.fill(outMessages, numToSend)
val outBundle = V086Bundle(outMessages, numToSend)
stripFromProdBinary { logger.atFinest().log("<- TO P%d: (RESEND)", user.id) }
outBundle.writeTo(buf)
combinedKailleraController.send(DatagramPacket(buf, remoteSocketAddress!!))
}
}

fun send(outMessage: V086Message, numToSend: Int = 5) {
synchronized(sendMutex) {
var numToSend = numToSend

val buf =
combinedKailleraController.nettyChannel
.alloc()
.directBuffer(flags.v086BufferSize)
.order(ByteOrder.LITTLE_ENDIAN)
lastMessageBuffer.add(outMessage)
numToSend = lastMessageBuffer.fill(outMessages, numToSend)
val outBundle = V086Bundle(outMessages, numToSend)
stripFromProdBinary { logger.atFinest().log("<- TO P%d: %s", user.id, outMessage) }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,7 @@ internal constructor(
) : KailleraServerController {
private var isRunning = false

override val clientTypes: Array<String> =
config.getStringArray("controllers.v086.clientTypes.clientType")
override val clientTypes: Array<String> = flags.clientTypes.toTypedArray()

var clientHandlers: MutableMap<Int, V086ClientHandler> = ConcurrentHashMap()

Expand Down
Loading

0 comments on commit 5b0b2c8

Please sign in to comment.