Skip to content

Commit

Permalink
Add brokerStarted and brokerStopped callbacks in MQTT Broker
Browse files Browse the repository at this point in the history
  • Loading branch information
davidepianca98 committed Jun 13, 2024
1 parent 8e7fd8f commit 225f2a0
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 1 deletion.
28 changes: 27 additions & 1 deletion kmqtt-broker/src/commonMain/kotlin/mqtt/broker/Broker.kt
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ public class Broker(
public val persistence: Persistence? = null,
public val cluster: ClusterSettings? = null,
public val enableUdp: Boolean = false,
public val webSocketPort: Int? = null
public val webSocketPort: Int? = null,
private val miscCallbacks: MiscCallbacks? = null
) {

private val server = ServerSocketLoop(this)
Expand All @@ -55,6 +56,9 @@ public class Broker(

internal val lock = reentrantLock()

private var startCallbackCalled = false
private var stopCallbackCalled = false

init {
if (enableUdp && maximumPacketSize > 65535u) {
throw IllegalArgumentException("When UDP is enabled the maximum packet size can't be bigger than the datagram maximum size")
Expand All @@ -65,14 +69,32 @@ public class Broker(
* Starts the broker (blocking run)
*/
public fun listen() {
if (!startCallbackCalled) {
miscCallbacks?.brokerStarted()
startCallbackCalled = true
}
server.run()
if (!stopCallbackCalled) {
miscCallbacks?.brokerStopped()
stopCallbackCalled = true
}
}

/**
* Run a single iteration of the broker (non blocking run)
*/
public fun step() {
if (!startCallbackCalled) {
miscCallbacks?.brokerStarted()
startCallbackCalled = true
}
server.step()
if (!server.isRunning()) {
if (!stopCallbackCalled) {
miscCallbacks?.brokerStopped()
stopCallbackCalled = true
}
}
}

internal fun sendWill(session: Session?) {
Expand Down Expand Up @@ -414,6 +436,10 @@ public class Broker(
}
server.stop()
}
if (!stopCallbackCalled) {
miscCallbacks?.brokerStopped()
stopCallbackCalled = true
}
}

internal fun addClusterConnection(address: String) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package mqtt.broker.interfaces

public interface MiscCallbacks {

/**
* Called when the broker has started listening
*/
public fun brokerStarted()

/**
* Called when the broker has stopped listening
*/
public fun brokerStopped()
}
4 changes: 4 additions & 0 deletions kmqtt-broker/src/commonMain/kotlin/socket/ServerSocketLoop.kt
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ internal class ServerSocketLoop(private val broker: Broker) {
}
}

fun isRunning(): Boolean {
return serverSocket.isRunning()
}

private fun selectCallback(attachment: Any?, state: SocketState): Boolean {
return broker.lock.withLock {
when (attachment) {
Expand Down

0 comments on commit 225f2a0

Please sign in to comment.