Skip to content

Commit

Permalink
support custom parser with 1. active mode (default), 2. passive mode
Browse files Browse the repository at this point in the history
  • Loading branch information
LoranceChen committed May 16, 2018
1 parent b2b12df commit 36a57e5
Show file tree
Hide file tree
Showing 23 changed files with 963 additions and 408 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,14 @@ class JProtoServer(jProtos: Observable[JProtocol], routes: List[Router]) {
("type" -> "once") ~
("status" -> "error") ~
("load" -> e.getStackTrace.toString)
skt.send(finalJson).flatMap(_ => Continue)
skt.sendRaw(finalJson).flatMap(_ => Continue)
case Success(rst) =>
val finalJson =
("taskId" -> taskId) ~
("type" -> "once") ~
("status" -> "end") ~
("load" -> rst)
skt.send(finalJson).flatMap(_ => Continue)
skt.sendRaw(finalJson).flatMap(_ => Continue)
}
rst
case FurEndPoint(jValRstFur) =>
Expand All @@ -77,7 +77,7 @@ class JProtoServer(jProtos: Observable[JProtocol], routes: List[Router]) {
("type" -> "once") ~
("status" -> "end") ~
("load" -> jValRst)
p.tryCompleteWith(skt.send(finalJson))
p.tryCompleteWith(skt.sendRaw(finalJson))

})
jValRstFur.failed.map { error =>
Expand All @@ -88,7 +88,7 @@ class JProtoServer(jProtos: Observable[JProtocol], routes: List[Router]) {
("type" -> "once") ~
("status" -> "error") ~
("load" -> error.getStackTrace.mkString)
p.tryCompleteWith(skt.send(finalJson))
p.tryCompleteWith(skt.sendRaw(finalJson))
}

val rst: Future[Ack] = p.future.flatMap(_ => Continue)
Expand All @@ -99,33 +99,33 @@ class JProtoServer(jProtos: Observable[JProtocol], routes: List[Router]) {
event => {
val finalJson: JValue =
("taskId" -> taskId) ~
("type" -> "stream") ~
("status" -> "on") ~
("load" -> event)
("type" -> "stream") ~
("status" -> "on") ~
("load" -> event)

//this stream i
skt.send(finalJson).flatMap(_ => Continue)
skt.sendRaw(finalJson).flatMap(_ => Continue)
},
error => {
logger.error("StreamEndPoint failed:", error)
val finalJson: JValue =
("taskId" -> taskId) ~
("type" -> "stream") ~
("status" -> "error") ~
("load" -> error.getStackTrace.mkString)
("type" -> "stream") ~
("status" -> "error") ~
("load" -> error.getStackTrace.mkString)

val rst: Future[Ack] = skt.send(finalJson).flatMap(_ => Continue)
val rst: Future[Ack] = skt.sendRaw(finalJson).flatMap(_ => Continue)

promise.tryCompleteWith(Continue)
Unit
},
() => {
val finalJson: JValue =
("taskId" -> taskId) ~
("type" -> "stream") ~
("status" -> "end")
("type" -> "stream") ~
("status" -> "end")

skt.send(finalJson)
skt.sendRaw(finalJson)

promise.tryCompleteWith(Continue)
Unit
Expand Down
97 changes: 70 additions & 27 deletions src/main/scala/lorance/rxsocket/presentation/json/JProtocol.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import monix.execution.Scheduler.Implicits.global
/**
* create a JProtocol to dispatch all json relate info bind with socket and it's read stream
*/
class JProtocol(val connectedSocket: ConnectedSocket, read: Observable[CompletedProto]) {
class JProtocol(val connectedSocket: ConnectedSocket[CompletedProto], read: Observable[CompletedProto]) {
private val logger: Logger = LoggerFactory.getLogger(getClass)

private val tasks = new ConcurrentHashMap[String, PublishSubject[JValue]]()
Expand All @@ -52,6 +52,7 @@ class JProtocol(val connectedSocket: ConnectedSocket, read: Observable[Completed

/**
* inner json subscribe which parse json format as this:
* todo: the json format should be custom beside of the class as JsonParser
* 1. simple dispatch mode
* {
* taskId: ...
Expand Down Expand Up @@ -143,8 +144,11 @@ class JProtocol(val connectedSocket: ConnectedSocket, read: Observable[Completed
Continue
}


def send(any: Any) = {
/**
* make sure you have organized json format, recommand use `send` method below for
* auto assemble json.
*/
def sendRaw(any: Any): Future[Unit] = {
if(connectedSocket.isSocketClosed) {
Future.failed(SocketClosedException)
} else {
Expand All @@ -153,7 +157,7 @@ class JProtocol(val connectedSocket: ConnectedSocket, read: Observable[Completed
}
}

def send(jValue: JValue) = {
def sendRaw(jValue: JValue): Future[Unit] = {
if(connectedSocket.isSocketClosed) {
Future.failed(SocketClosedException)
} else {
Expand All @@ -162,6 +166,24 @@ class JProtocol(val connectedSocket: ConnectedSocket, read: Observable[Completed
}
}

def send(any: Any, taskId: String): Future[Unit] = {
if(connectedSocket.isSocketClosed) {
Future.failed(SocketClosedException)
} else {
val bytes = JsonParse.enCodeWithTaskId(any, taskId)
connectedSocket.send(ByteBuffer.wrap(bytes))
}
}

def send(jValue: JValue, taskId: String): Future[Unit] = {
if(connectedSocket.isSocketClosed) {
Future.failed(SocketClosedException)
} else {
val bytes = JsonParse.enCodeWithTaskId(jValue, taskId)
connectedSocket.send(ByteBuffer.wrap(bytes))
}
}

/**
* protocol:
* {
Expand All @@ -181,28 +203,42 @@ class JProtocol(val connectedSocket: ConnectedSocket, read: Observable[Completed
val taskId = Task.getId
this.addTask(taskId, register)

val resultFur = register.map { s => s.extract[Rsp]}
.timeoutOnSlowUpstream(Duration(presentation.JPROTO_TIMEOUT, TimeUnit.SECONDS))
.future

resultFur.onComplete({
case Failure(ex) =>
logger.error(s"[Throw] JProtocol.taskResult - $taskId", ex)
this.removeTask(taskId)
case Success(_) =>
this.removeTask(taskId)
})
val resultObv = register.map { s => s.extract[Rsp]}
// val resultFur = register.map { s => s.extract[Rsp]}
// .timeoutOnSlowUpstream(Duration(presentation.JPROTO_TIMEOUT, TimeUnit.SECONDS))
// .future
//
// resultFur.onComplete({
// case Failure(ex) =>
// logger.error(s"[Throw] JProtocol.taskResult - $taskId", ex)
// this.removeTask(taskId)
// case Success(_) =>
// this.removeTask(taskId)
// })

//send msg after prepare stream
val mergeTaskId: JObject =
("taskId" -> taskId) ~
("load" -> decompose(any))
("load" -> decompose(any))

val bytes = JsonParse.enCode(mergeTaskId)
val sendFur = connectedSocket.send(ByteBuffer.wrap(bytes))
// println(s"JProtocol.sendWithRsp resultFur - $any")
// resultFur
sendFur.flatMap(_ => resultFur)

sendFur.flatMap(_ => {
val resultFur = resultObv
.timeoutOnSlowUpstream(Duration(presentation.JPROTO_TIMEOUT, TimeUnit.SECONDS))
.future

resultFur.onComplete({
case Failure(ex) =>
logger.error(s"[Throw] JProtocol.taskResult - $taskId", ex)
this.removeTask(taskId)
case Success(_) =>
this.removeTask(taskId)
})

resultFur
})
}
}

Expand All @@ -225,22 +261,29 @@ class JProtocol(val connectedSocket: ConnectedSocket, read: Observable[Completed
}

val resultStream = additional.map(f => f(extract)).getOrElse(extract)
.timeoutOnSlowUpstream(Duration(presentation.JPROTO_TIMEOUT, TimeUnit.SECONDS))
.doOnError { e => logger.error(s"[Throw] JProtocol.taskResult - $any", e) }
.doOnComplete { () =>
this.removeTask(taskId)
}
.doOnEarlyStop(() => this.removeTask(taskId))


//send msg after prepare stream
val mergeTaskId =
("taskId" -> taskId) ~
("load" -> decompose(any))
("load" -> decompose(any))
val bytes = JsonParse.enCode(mergeTaskId)
val sendFur = connectedSocket.send(ByteBuffer.wrap(bytes))

Observable.fromFuture(sendFur).flatMap(_ =>
resultStream.share
resultStream
.timeoutOnSlowUpstream(Duration(presentation.JPROTO_TIMEOUT, TimeUnit.SECONDS))
.doOnError { e =>
logger.error(s"JProtocol.taskResult - $any", e)
this.removeTask(taskId)
}
.doOnComplete { () =>
this.removeTask(taskId)
}
.doOnEarlyStop(() => this.removeTask(taskId))
)
.share

}
}

Expand Down
11 changes: 11 additions & 0 deletions src/main/scala/lorance/rxsocket/presentation/json/JsonParse.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package lorance.rxsocket.presentation.json
import java.nio.charset.StandardCharsets

import org.json4s.JValue
import org.json4s.JsonDSL._
import org.json4s._
import org.json4s.Extraction._
import org.json4s.native.JsonMethods._
Expand All @@ -27,6 +28,14 @@ object JsonParse {
session.enCode(1.toByte, jStr)
}

def enCodeWithTaskId(jValue: JValue, taskId: String): Array[Byte] = {
enCode(("taskId" -> taskId) ~ ("load" -> jValue))
}
def enCodeWithTaskId(obj: Any, taskId: String): Array[Byte] = {
enCode(("taskId" -> taskId) ~ ("load" -> decompose(obj)))
}


def deCode[A](jValue: JValue)(implicit mf: scala.reflect.Manifest[A]): A = {
jValue.extract[A]
}
Expand All @@ -38,4 +47,6 @@ object JsonParse {
def deCode[A](jsonArray: Array[Byte])(implicit mf: scala.reflect.Manifest[A]): A = {
deCode(parse(new String(jsonArray, StandardCharsets.UTF_8)))
}


}
Loading

0 comments on commit 36a57e5

Please sign in to comment.