Skip to content

Commit

Permalink
feat: begin work on request handler
Browse files Browse the repository at this point in the history
Signed-off-by: Sphericalkat <amolele@gmail.com>
  • Loading branch information
SphericalKat committed May 1, 2023
1 parent 3857de5 commit f13f65a
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 2 deletions.
10 changes: 10 additions & 0 deletions atomic/src/main/kotlin/jp/co/goalist/AtomicBroker.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import jp.co.goalist.transport.Transport
import jp.co.goalist.utils.getSystemName
import org.slf4j.LoggerFactory
import java.util.UUID
import kotlin.system.exitProcess

class AtomicBroker(
nodeID: String? = null,
Expand All @@ -16,6 +17,8 @@ class AtomicBroker(
var nodeID: String
private set

val instanceID: String

var transporter: String = transporter
private set

Expand All @@ -30,6 +33,7 @@ class AtomicBroker(

init {
this.nodeID = nodeID ?: "${getSystemName()}-${UUID.randomUUID()}"
this.instanceID = UUID.randomUUID().toString()
this.transport = Transport.resolveTransport(this)
this.serializer = Serializers.resolve(serializer, this)

Expand All @@ -44,6 +48,12 @@ class AtomicBroker(
logger.info("ServiceBroker with this.services.length service(s) started successfully.")
}

fun fatal(message: String, e: Exception? = null, needExit: Boolean = true) {
logger.error("Fatal error: $message $e")

if (needExit) exitProcess(1)
}

companion object {
const val projectName = "ATOMIC"
val logger = LoggerFactory.getLogger(AtomicBroker::class.java)
Expand Down
2 changes: 2 additions & 0 deletions atomic/src/main/kotlin/jp/co/goalist/Errors.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,6 @@ package jp.co.goalist

class Errors {
class BrokerOptionsException(message: String): Exception(message)

class AtomicServerError(message: String): Exception(message)
}
2 changes: 1 addition & 1 deletion atomic/src/main/kotlin/jp/co/goalist/Packets.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,5 @@ enum class Packets(val string: String) {
data class Packet(
val type: Packets = Packets.PACKET_UNKNOWN,
val target: String? = null,
val payload: MutableMap<String, Any>
val payload: MutableMap<String, Any>?
)
55 changes: 54 additions & 1 deletion atomic/src/main/kotlin/jp/co/goalist/transport/Transport.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ import org.slf4j.LoggerFactory
abstract class Transport(protected val broker: AtomicBroker) {
protected var prefix: String = "ATOM"
protected var nodeID: String
protected val instanceID: String
var connected: Boolean

init {
this.connected = false
this.nodeID = this.broker.nodeID
this.instanceID = this.broker.instanceID
}

abstract fun connect()
Expand All @@ -30,13 +32,64 @@ abstract class Transport(protected val broker: AtomicBroker) {

protected fun receive(cmd: Packets, msg: ByteArray) {
try {
// deserialize here
val packet = this.deserialize(cmd, msg)
} catch (e: Exception) {
logger.warn("Invalid incoming packet. Type: ${cmd.string}", e)
logger.debug("Content: {}", msg)
}
}

protected fun messageHandler(cmd: Packets, packet: Packet): Boolean {
try {
val payload = packet.payload ?: throw Errors.AtomicServerError("Missing response payload.")

// TODO: check protocol version here

// packet came from our own node
if (payload["sender"] as String == this.nodeID) {
// detect nodeID conflicts
if (cmd == Packets.PACKET_INFO && payload["instanceID"] as String == this.instanceID) {
this.broker.fatal("ServiceBroker has detected a nodeID conflict. Use unique nodeIDs. ServiceBroker stopped.")
return false
}

// skip our own packets
if (cmd != Packets.PACKET_EVENT && cmd != Packets.PACKET_REQUEST && cmd != Packets.PACKET_RESPONSE) {
return false
}
}

// request
if (cmd == Packets.PACKET_REQUEST) {

}

return true
} catch (e: Exception) {
logger.error("cmd: $cmd packet: $packet", e)
return false
}
}

private fun requestHandler(payload: MutableMap<String, Any>) {
val requestID = payload["requestID"]?.let { "with requestID '$it'" } ?: ""
logger.debug("<= Request '{}' {} received from '{}' node.", payload["action"], requestID, payload["sender"])

try {
if (this.broker.stopping) {
logger.warn(
"Incoming '{}' {} received from '{}' node is dropped because broker is stopped.",
payload["action"],
requestID,
payload["sender"]
)

// throw new error here
throw Errors.AtomicServerError("${payload["action"]} ${this.nodeID}")
}
}
}

protected fun deserialize(type: Packets, buf: ByteArray): Packet? {
if (buf.isEmpty()) return null

Expand Down

0 comments on commit f13f65a

Please sign in to comment.