diff --git a/src/main/scala/tuples/space/client/JsonTupleSpace.scala b/src/main/scala/tuples/space/client/JsonTupleSpace.scala new file mode 100644 index 0000000..7fcd39d --- /dev/null +++ b/src/main/scala/tuples/space/client/JsonTupleSpace.scala @@ -0,0 +1,310 @@ +/* + * Copyright (c) 2023 Matteo Castellucci + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package io.github.cakelier +package tuples.space.client + +import java.util.UUID + +import scala.collection.mutable +import scala.concurrent.ExecutionContextExecutor +import scala.concurrent.Future +import scala.concurrent.Promise +import scala.concurrent.duration.DurationInt +import scala.util.Failure +import scala.util.Success + +import akka.Done +import akka.NotUsed +import akka.actor.ActorSystem +import akka.actor.typed.ActorRef +import akka.http.scaladsl.Http +import akka.http.scaladsl.HttpExt +import akka.http.scaladsl.model.StatusCodes +import akka.http.scaladsl.model.ws.* +import akka.stream.OverflowStrategy +import akka.stream.QueueCompletionResult +import akka.stream.QueueOfferResult +import akka.stream.RestartSettings +import akka.stream.scaladsl.Flow +import akka.stream.scaladsl.Keep +import akka.stream.scaladsl.RestartFlow +import akka.stream.scaladsl.Sink +import akka.stream.scaladsl.Source +import akka.stream.scaladsl.SourceQueueWithComplete +import io.circe.parser.* +import io.circe.syntax.* + +import tuples.space.* +import tuples.space.request.* +import tuples.space.request.Serializers.given +import tuples.space.response.* +import tuples.space.response.Serializers.given +import AnyOps.* + +trait JsonTupleSpace { + + def out(t: JsonTuple): Future[Unit] + + def rd(tt: JsonTemplate): Future[JsonTuple] + + def in(tt: JsonTemplate): Future[JsonTuple] + + def no(tt: JsonTemplate): Future[Unit] + + def outAll(ts: JsonTuple*): Future[Unit] + + def rdAll(tt: JsonTemplate): Future[Seq[JsonTuple]] + + def inAll(tt: JsonTemplate): Future[Seq[JsonTuple]] + + def rdp(tt: JsonTemplate): Future[Option[JsonTuple]] + + def inp(tt: JsonTemplate): Future[Option[JsonTuple]] + + def nop(tt: JsonTemplate): Future[Boolean] + + def close(): Future[Unit] +} + +object JsonTupleSpace { + + @SuppressWarnings( + Array( + "org.wartremover.warts.MutableDataStructures", + "org.wartremover.warts.Null", + "org.wartremover.warts.Var", + "scalafix:DisableSyntax.var", + "scalafix:DisableSyntax.null" + ) + ) + private class JsonTupleSpaceImpl( + uri: String, + bufferSize: Int, + completionPromise: Promise[Unit] + )( + using + system: ActorSystem + ) extends JsonTupleSpace { + + import system.dispatcher + + private val tupleRequests: mutable.Buffer[(JsonTuple, Promise[Unit])] = mutable.Buffer.empty + private val templateRdRequests: mutable.Buffer[(JsonTemplate, Promise[JsonTuple])] = mutable.Buffer.empty + private val templateInRequests: mutable.Buffer[(JsonTemplate, Promise[JsonTuple])] = mutable.Buffer.empty + private val templateNoRequests: mutable.Buffer[(JsonTemplate, Promise[Unit])] = mutable.Buffer.empty + private val templateRdpRequests: mutable.Buffer[(JsonTemplate, Promise[Option[JsonTuple]])] = mutable.Buffer.empty + private val templateInpRequests: mutable.Buffer[(JsonTemplate, Promise[Option[JsonTuple]])] = mutable.Buffer.empty + private val templateNopRequests: mutable.Buffer[(JsonTemplate, Promise[Boolean])] = mutable.Buffer.empty + private val templateRdAllRequests: mutable.Buffer[(JsonTemplate, Promise[Seq[JsonTuple]])] = mutable.Buffer.empty + private val templateInAllRequests: mutable.Buffer[(JsonTemplate, Promise[Seq[JsonTuple]])] = mutable.Buffer.empty + private val tupleSeqRequests: mutable.Buffer[(Seq[JsonTuple], Promise[Unit])] = mutable.Buffer.empty + + private var clientId: Option[UUID] = None + private var connectionCompletion: Promise[Unit] = Promise() + private val httpClient: HttpExt = Http() + + private val restartSettings: RestartSettings = RestartSettings( + minBackoff = 1.second, + maxBackoff = 30.seconds, + randomFactor = 0.1 + ) + + private val (queue: SourceQueueWithComplete[Message], source: Source[Message, NotUsed]) = + Source.queue[Message](bufferSize, OverflowStrategy.dropHead).preMaterialize() + + private val flowCompletion: Future[Done] = + source + .via( + RestartFlow.withBackoff(restartSettings)(() => + httpClient + .webSocketClientFlow(WebSocketRequest(uri)) + .mapError(t => { + synchronized { + failAllRemaining() + connectionCompletion = Promise() + } + t + }) + ) + ) + .mapAsync(1) { + case m: TextMessage.Strict => Future.successful(m) + case m: TextMessage.Streamed => m.textStream.runFold("")(_ + _).map(TextMessage.apply) + case _ => Future.successful(null) + } + .flatMapConcat(t => + (for { + j <- parse(t.text) + m <- j.as[Response] + } yield m).map(Source.single[Response]).getOrElse(Source.empty[Response]) + ) + .runWith(Sink.foreach { + case r: ConnectionSuccessResponse => + synchronized { + clientId match { + case Some(id) => + queue.offer(TextMessage(MergeRequest(r.clientId, id).asJson.noSpaces)).foreach { + case QueueOfferResult.Enqueued => () + case QueueOfferResult.Dropped => + connectionCompletion.failure(IllegalStateException("The request was dropped from its queue.")) + case QueueOfferResult.Failure(e) => connectionCompletion.failure(e) + case QueueOfferResult.QueueClosed => + connectionCompletion.failure(IllegalStateException("The queue was completed.")) + } + case None => + clientId = Some(r.clientId) + connectionCompletion.success(()) + completionPromise.success(()) + } + } + case r: MergeSuccessResponse => synchronized { connectionCompletion.success(()) } + case r: TupleResponse => completeRequest(r.request, tupleRequests, ()) + case r: SeqTupleResponse => completeRequest(r.request, tupleSeqRequests, ()) + case r: TemplateTupleResponse => + r.tpe match { + case TemplateTupleResponseType.In => completeRequest(r.request, templateInRequests, r.content) + case TemplateTupleResponseType.Rd => completeRequest(r.request, templateRdRequests, r.content) + } + case r: TemplateMaybeTupleResponse => + r.tpe match { + case TemplateMaybeTupleResponseType.Inp => completeRequest(r.request, templateInpRequests, r.content) + case TemplateMaybeTupleResponseType.Rdp => completeRequest(r.request, templateRdpRequests, r.content) + } + case r: TemplateSeqTupleResponse => + r.tpe match { + case TemplateSeqTupleResponseType.InAll => completeRequest(r.request, templateInAllRequests, r.content) + case TemplateSeqTupleResponseType.RdAll => completeRequest(r.request, templateRdAllRequests, r.content) + } + case r: TemplateResponse => completeRequest(r.request, templateNoRequests, ()) + case r: TemplateBooleanResponse => completeRequest(r.request, templateNopRequests, r.content) + }) + flowCompletion.onComplete(_ => synchronized { failAllRemaining() }) + + private def failAllRemaining(): Unit = + Seq(tupleRequests, templateRdRequests, templateInRequests, templateNoRequests) + .flatten + .map(_._2) + .foreach(_.failure(IllegalStateException("The queue was completed."))) + + private def completeRequest[A, B]( + request: A, + pendingRequests: mutable.Buffer[(A, Promise[B])], + response: B + ): Unit = synchronized { + pendingRequests + .find((r, _) => r === request) + .foreach((r, p) => { + p.success(response) + pendingRequests -= r -> p + }) + } + + private def failWithThrowable[A, B]( + e: Throwable + )( + content: A, + requestPromise: Promise[B], + pendingRequests: mutable.Buffer[(A, Promise[B])] + ): Future[B] = { + synchronized { pendingRequests -= content -> requestPromise } + Future.failed[B](e) + } + + private def doRequest[A, B]( + content: A, + requestBuilder: A => Request, + pendingRequests: mutable.Buffer[(A, Promise[B])] + ): Future[B] = connectionCompletion.future.flatMap { _ => + val requestPromise = Promise[B]() + synchronized { pendingRequests += content -> requestPromise } + queue + .offer(TextMessage.Strict(requestBuilder(content).asJson.noSpaces)) + .flatMap { + case QueueOfferResult.Enqueued => requestPromise.future + case QueueOfferResult.Dropped => + failWithThrowable( + IllegalStateException("The request was dropped from its queue.") + )( + content, + requestPromise, + pendingRequests + ) + case QueueOfferResult.QueueClosed => + failWithThrowable( + IllegalStateException("The queue was completed.") + )( + content, + requestPromise, + pendingRequests + ) + case QueueOfferResult.Failure(e) => failWithThrowable(e)(content, requestPromise, pendingRequests) + } + .transform { + case Failure(_) => Failure[B](IllegalStateException("The queue was completed.")) + case s => s + } + } + + override def out(t: JsonTuple): Future[Unit] = doRequest(t, TupleRequest.apply, tupleRequests) + + override def rd(tt: JsonTemplate): Future[JsonTuple] = + doRequest(tt, TemplateRequest(_, TemplateRequestType.Rd), templateRdRequests) + + override def in(tt: JsonTemplate): Future[JsonTuple] = + doRequest(tt, TemplateRequest(_, TemplateRequestType.In), templateInRequests) + + override def no(tt: JsonTemplate): Future[Unit] = + doRequest(tt, TemplateRequest(_, TemplateRequestType.No), templateNoRequests) + + override def outAll(ts: JsonTuple*): Future[Unit] = + doRequest(ts, SeqTupleRequest.apply, tupleSeqRequests) + + override def rdAll(tt: JsonTemplate): Future[Seq[JsonTuple]] = + doRequest(tt, TemplateRequest(_, TemplateRequestType.RdAll), templateRdAllRequests) + + override def inAll(tt: JsonTemplate): Future[Seq[JsonTuple]] = + doRequest(tt, TemplateRequest(_, TemplateRequestType.InAll), templateInAllRequests) + + override def nop(tt: JsonTemplate): Future[Boolean] = + doRequest(tt, TemplateRequest(_, TemplateRequestType.Nop), templateNopRequests) + + override def inp(tt: JsonTemplate): Future[Option[JsonTuple]] = + doRequest(tt, TemplateRequest(_, TemplateRequestType.Inp), templateInpRequests) + + override def rdp(tt: JsonTemplate): Future[Option[JsonTuple]] = + doRequest(tt, TemplateRequest(_, TemplateRequestType.Rdp), templateRdpRequests) + + override def close(): Future[Unit] = { + queue.complete() + flowCompletion.map(_ => ()) + } + } + + @SuppressWarnings(Array("org.wartremover.warts.DefaultArguments", "scalafix:DisableSyntax.defaultArgs")) + def apply(uri: String, bufferSize: Int = 1)(using system: ActorSystem = ActorSystem()): Future[JsonTupleSpace] = { + import system.dispatcher + + val completionPromise: Promise[Unit] = Promise[Unit]() + val tupleSpace = JsonTupleSpaceImpl(uri, bufferSize, completionPromise) + completionPromise.future.map(_ => tupleSpace) + } +} diff --git a/src/main/scala/tuples/space/request/Request.scala b/src/main/scala/tuples/space/request/Request.scala new file mode 100644 index 0000000..3a61410 --- /dev/null +++ b/src/main/scala/tuples/space/request/Request.scala @@ -0,0 +1,92 @@ +/* + * Copyright (c) 2023 Matteo Castellucci + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package io.github.cakelier +package tuples.space.request + +import java.util.UUID + +import io.circe.Decoder +import io.circe.DecodingFailure +import io.circe.Encoder +import io.circe.Json +import io.circe.syntax.* + +import tuples.space.* + +sealed trait Request + +object Request { + + sealed trait TupleRequest extends Request { + + val content: JsonTuple + } + + object TupleRequest { + + final private case class TupleRequestImpl(content: JsonTuple) extends TupleRequest + + def apply(content: JsonTuple): TupleRequest = TupleRequestImpl(content) + } + + sealed trait SeqTupleRequest extends Request { + + val content: Seq[JsonTuple] + } + + object SeqTupleRequest { + + final private case class SeqTupleRequestImpl(content: Seq[JsonTuple]) extends SeqTupleRequest + + def apply(content: Seq[JsonTuple]): SeqTupleRequest = SeqTupleRequestImpl(content) + } + + sealed trait TemplateRequest extends Request { + + val content: JsonTemplate + + val tpe: TemplateRequestType + } + + object TemplateRequest { + + final private case class TemplateRequestImpl(content: JsonTemplate, tpe: TemplateRequestType) extends TemplateRequest + + def apply(content: JsonTemplate, tpe: TemplateRequestType): TemplateRequest = TemplateRequestImpl(content, tpe) + } + + sealed trait MergeRequest extends Request { + + val clientId: UUID + + val oldClientId: UUID + } + + object MergeRequest { + + final private case class MergeRequestImpl(clientId: UUID, oldClientId: UUID) extends MergeRequest + + def apply(clientId: UUID, oldClientId: UUID): MergeRequest = MergeRequestImpl(clientId, oldClientId) + } +} + +export Request.* diff --git a/src/main/scala/tuples/space/request/Serializers.scala b/src/main/scala/tuples/space/request/Serializers.scala new file mode 100644 index 0000000..6f901b8 --- /dev/null +++ b/src/main/scala/tuples/space/request/Serializers.scala @@ -0,0 +1,137 @@ +/* + * Copyright (c) 2023 Matteo Castellucci + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package io.github.cakelier +package tuples.space.request + +import io.circe.Decoder +import io.circe.DecodingFailure +import io.circe.Encoder +import io.circe.Json +import io.circe.syntax.* + +import tuples.space.* +import tuples.space.JsonSerializable.given +import tuples.space.request.Request.MergeRequest +import AnyOps.* + +object Serializers { + + given Encoder[TupleRequest] = r => + Json.obj( + "content" -> r.content.asJson, + "type" -> "out".asJson + ) + + given Decoder[TupleRequest] = c => + for { + content <- c.downField("content").as[JsonTuple] + - <- c + .downField("type") + .as[String] + .filterOrElse( + _ === "out", + DecodingFailure( + DecodingFailure.Reason.CustomReason("The value for the type field was not valid"), + c.downField("type") + ) + ) + } yield TupleRequest(content) + + given Encoder[SeqTupleRequest] = r => + Json.obj( + "content" -> r.content.asJson, + "type" -> "outAll".asJson + ) + + given Decoder[SeqTupleRequest] = c => + for { + content <- c.downField("content").as[Seq[JsonTuple]] + - <- c + .downField("type") + .as[String] + .filterOrElse( + _ === "outAll", + DecodingFailure( + DecodingFailure.Reason.CustomReason("The value for the type field was not valid"), + c.downField("type") + ) + ) + } yield SeqTupleRequest(content) + + given Encoder[TemplateRequest] = r => + Json.obj( + "content" -> r.content.asJson, + "type" -> (r.tpe match { + case TemplateRequestType.In => "in" + case TemplateRequestType.Rd => "rd" + case TemplateRequestType.No => "no" + case TemplateRequestType.Inp => "inp" + case TemplateRequestType.Rdp => "rdp" + case TemplateRequestType.Nop => "nop" + case TemplateRequestType.InAll => "inAll" + case TemplateRequestType.RdAll => "rdAll" + }).asJson + ) + + given Decoder[TemplateRequest] = c => + for { + content <- c.downField("content").as[JsonTemplate] + tpe <- c.downField("type").as[String].flatMap { + case "in" => Right[DecodingFailure, TemplateRequestType](TemplateRequestType.In) + case "rd" => Right[DecodingFailure, TemplateRequestType](TemplateRequestType.Rd) + case "no" => Right[DecodingFailure, TemplateRequestType](TemplateRequestType.No) + case "inp" => Right[DecodingFailure, TemplateRequestType](TemplateRequestType.Inp) + case "rdp" => Right[DecodingFailure, TemplateRequestType](TemplateRequestType.Rdp) + case "nop" => Right[DecodingFailure, TemplateRequestType](TemplateRequestType.Nop) + case "inAll" => Right[DecodingFailure, TemplateRequestType](TemplateRequestType.InAll) + case "rdAll" => Right[DecodingFailure, TemplateRequestType](TemplateRequestType.RdAll) + case _ => + Left[DecodingFailure, TemplateRequestType]( + DecodingFailure( + DecodingFailure.Reason.CustomReason("The value for the type field was not valid"), + c.downField("type") + ) + ) + } + } yield TemplateRequest(content, tpe) + + given Encoder[MergeRequest] = r => + Json.obj( + "clientId" -> r.clientId.asJson, + "oldClientId" -> r.oldClientId.asJson + ) + + given Decoder[MergeRequest] = Decoder.forProduct2("clientId", "oldClientId")(MergeRequest.apply) + + given Encoder[Request] = { + case r: TupleRequest => r.asJson + case r: TemplateRequest => r.asJson + case r: SeqTupleRequest => r.asJson + case r: MergeRequest => r.asJson + } + + given Decoder[Request] = r => + r.as[TupleRequest] + .orElse[DecodingFailure, Request](r.as[SeqTupleRequest]) + .orElse[DecodingFailure, Request](r.as[MergeRequest]) + .orElse[DecodingFailure, Request](r.as[TemplateRequest]) +} diff --git a/src/main/scala/tuples/space/request/TemplateRequestType.scala b/src/main/scala/tuples/space/request/TemplateRequestType.scala new file mode 100644 index 0000000..97cf270 --- /dev/null +++ b/src/main/scala/tuples/space/request/TemplateRequestType.scala @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2023 Matteo Castellucci + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package io.github.cakelier +package tuples.space.request + +enum TemplateRequestType { + + case In extends TemplateRequestType + + case Rd extends TemplateRequestType + + case No extends TemplateRequestType + + case Inp extends TemplateRequestType + + case Rdp extends TemplateRequestType + + case Nop extends TemplateRequestType + + case InAll extends TemplateRequestType + + case RdAll extends TemplateRequestType +} diff --git a/src/main/scala/tuples/space/response/Response.scala b/src/main/scala/tuples/space/response/Response.scala new file mode 100644 index 0000000..fab61f4 --- /dev/null +++ b/src/main/scala/tuples/space/response/Response.scala @@ -0,0 +1,174 @@ +/* + * Copyright (c) 2023 Matteo Castellucci + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package io.github.cakelier +package tuples.space.response + +import java.util.UUID + +import tuples.space.* + +sealed trait Response + +object Response { + + sealed trait TupleResponse extends Response { + + val request: JsonTuple + } + + object TupleResponse { + + final private case class TupleResponseImpl(request: JsonTuple) extends TupleResponse + + def apply(request: JsonTuple): TupleResponse = TupleResponseImpl(request) + } + + sealed trait SeqTupleResponse extends Response { + + val request: Seq[JsonTuple] + } + + object SeqTupleResponse { + + final private case class SeqTupleResponseImpl(request: Seq[JsonTuple]) extends SeqTupleResponse + + def apply(request: Seq[JsonTuple]): SeqTupleResponse = SeqTupleResponseImpl(request) + } + + sealed trait TemplateTupleResponse extends Response { + + val request: JsonTemplate + + val tpe: TemplateTupleResponseType + + val content: JsonTuple + } + + object TemplateTupleResponse { + + final private case class TemplateTupleResponseImpl(request: JsonTemplate, tpe: TemplateTupleResponseType, content: JsonTuple) + extends TemplateTupleResponse + + def apply(request: JsonTemplate, tpe: TemplateTupleResponseType, content: JsonTuple): TemplateTupleResponse = + TemplateTupleResponseImpl(request, tpe, content) + } + + sealed trait TemplateMaybeTupleResponse extends Response { + + val request: JsonTemplate + + val tpe: TemplateMaybeTupleResponseType + + val content: Option[JsonTuple] + } + + object TemplateMaybeTupleResponse { + + final private case class TemplateMaybeTupleResponseImpl( + request: JsonTemplate, + tpe: TemplateMaybeTupleResponseType, + content: Option[JsonTuple] + ) extends TemplateMaybeTupleResponse + + def apply( + request: JsonTemplate, + tpe: TemplateMaybeTupleResponseType, + content: Option[JsonTuple] + ): TemplateMaybeTupleResponse = + TemplateMaybeTupleResponseImpl(request, tpe, content) + } + + sealed trait TemplateSeqTupleResponse extends Response { + + val request: JsonTemplate + + val tpe: TemplateSeqTupleResponseType + + val content: Seq[JsonTuple] + } + + object TemplateSeqTupleResponse { + + final private case class TemplateSeqTupleResponseImpl( + request: JsonTemplate, + tpe: TemplateSeqTupleResponseType, + content: Seq[JsonTuple] + ) extends TemplateSeqTupleResponse + + def apply(request: JsonTemplate, tpe: TemplateSeqTupleResponseType, content: Seq[JsonTuple]): TemplateSeqTupleResponse = + TemplateSeqTupleResponseImpl(request, tpe, content) + } + + sealed trait TemplateResponse extends Response { + + val request: JsonTemplate + } + + object TemplateResponse { + + final private case class TemplateResponseImpl(request: JsonTemplate) extends TemplateResponse + + def apply(request: JsonTemplate): TemplateResponse = TemplateResponseImpl(request) + } + + sealed trait TemplateBooleanResponse extends Response { + + val request: JsonTemplate + + val content: Boolean + } + + object TemplateBooleanResponse { + + final private case class TemplateBooleanResponseImpl(request: JsonTemplate, content: Boolean) extends TemplateBooleanResponse + + def apply(request: JsonTemplate, content: Boolean): TemplateBooleanResponse = TemplateBooleanResponseImpl(request, content) + } + + sealed trait ConnectionSuccessResponse extends Response { + + val clientId: UUID + } + + object ConnectionSuccessResponse { + + final private case class ConnectionSuccessResponseImpl(clientId: UUID) extends ConnectionSuccessResponse + + def apply(clientId: UUID): ConnectionSuccessResponse = ConnectionSuccessResponseImpl(clientId) + } + + sealed trait MergeSuccessResponse extends Response { + + val newClientId: UUID + + val oldClientId: UUID + } + + object MergeSuccessResponse { + + final private case class MergeSuccessResponseImpl(newClientId: UUID, oldClientId: UUID) extends MergeSuccessResponse + + def apply(newClientId: UUID, oldClientId: UUID): MergeSuccessResponse = MergeSuccessResponseImpl(newClientId, oldClientId) + } +} + +export Response.* diff --git a/src/main/scala/tuples/space/response/Serializers.scala b/src/main/scala/tuples/space/response/Serializers.scala new file mode 100644 index 0000000..c1864bc --- /dev/null +++ b/src/main/scala/tuples/space/response/Serializers.scala @@ -0,0 +1,248 @@ +/* + * Copyright (c) 2023 Matteo Castellucci + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package io.github.cakelier +package tuples.space.response + +import io.circe.Decoder +import io.circe.DecodingFailure +import io.circe.Encoder +import io.circe.Json +import io.circe.syntax.* + +import AnyOps.* +import tuples.space.* +import tuples.space.JsonSerializable.given + +object Serializers { + + given Encoder[TupleResponse] = r => + Json.obj( + "request" -> r.request.asJson, + "type" -> "out".asJson, + "content" -> ().asJson + ) + + given Decoder[TupleResponse] = c => + for { + request <- c.downField("request").as[JsonTuple] + _ <- c + .downField("type") + .as[String] + .filterOrElse( + _ === "out", + DecodingFailure( + DecodingFailure.Reason.CustomReason("The value for the type field was not valid"), + c.downField("type") + ) + ) + _ <- c.downField("content").as[Unit] + } yield TupleResponse(request) + + given Encoder[SeqTupleResponse] = r => + Json.obj( + "request" -> r.request.asJson, + "type" -> "outAll".asJson, + "content" -> ().asJson + ) + + given Decoder[SeqTupleResponse] = c => + for { + request <- c.downField("request").as[Seq[JsonTuple]] + _ <- c + .downField("type") + .as[String] + .filterOrElse( + _ === "outAll", + DecodingFailure( + DecodingFailure.Reason.CustomReason("The value for the type field was not valid"), + c.downField("type") + ) + ) + _ <- c.downField("content").as[Unit] + } yield SeqTupleResponse(request) + + given Encoder[TemplateTupleResponse] = r => + Json.obj( + "request" -> r.request.asJson, + "type" -> (r.tpe match { + case TemplateTupleResponseType.In => "in" + case TemplateTupleResponseType.Rd => "rd" + }).asJson, + "content" -> r.content.asJson + ) + + given Decoder[TemplateTupleResponse] = c => + for { + request <- c.downField("request").as[JsonTemplate] + tpe <- c.downField("type").as[String].flatMap { + case "in" => Right[DecodingFailure, TemplateTupleResponseType](TemplateTupleResponseType.In) + case "rd" => Right[DecodingFailure, TemplateTupleResponseType](TemplateTupleResponseType.Rd) + case _ => + Left[DecodingFailure, TemplateTupleResponseType]( + DecodingFailure( + DecodingFailure.Reason.CustomReason("The value for the type field was not valid"), + c.downField("type") + ) + ) + } + content <- c.downField("content").as[JsonTuple] + } yield TemplateTupleResponse(request, tpe, content) + + given Encoder[TemplateMaybeTupleResponse] = r => + Json.obj( + "request" -> r.request.asJson, + "type" -> (r.tpe match { + case TemplateMaybeTupleResponseType.Inp => "inp" + case TemplateMaybeTupleResponseType.Rdp => "rdp" + }).asJson, + "content" -> r.content.asJson + ) + + given Decoder[TemplateMaybeTupleResponse] = c => + for { + request <- c.downField("request").as[JsonTemplate] + tpe <- c.downField("type").as[String].flatMap { + case "inp" => Right[DecodingFailure, TemplateMaybeTupleResponseType](TemplateMaybeTupleResponseType.Inp) + case "rdp" => Right[DecodingFailure, TemplateMaybeTupleResponseType](TemplateMaybeTupleResponseType.Rdp) + case _ => + Left[DecodingFailure, TemplateMaybeTupleResponseType]( + DecodingFailure( + DecodingFailure.Reason.CustomReason("The value for the type field was not valid"), + c.downField("type") + ) + ) + } + content <- c.downField("content").as[Option[JsonTuple]] + } yield TemplateMaybeTupleResponse(request, tpe, content) + + given Encoder[TemplateSeqTupleResponse] = r => + Json.obj( + "request" -> r.request.asJson, + "type" -> (r.tpe match { + case TemplateSeqTupleResponseType.InAll => "inAll" + case TemplateSeqTupleResponseType.RdAll => "rdAll" + }).asJson, + "content" -> r.content.asJson + ) + + given Decoder[TemplateSeqTupleResponse] = c => + for { + request <- c.downField("request").as[JsonTemplate] + tpe <- c.downField("type").as[String].flatMap { + case "inAll" => Right[DecodingFailure, TemplateSeqTupleResponseType](TemplateSeqTupleResponseType.InAll) + case "rdAll" => Right[DecodingFailure, TemplateSeqTupleResponseType](TemplateSeqTupleResponseType.RdAll) + case _ => + Left[DecodingFailure, TemplateSeqTupleResponseType]( + DecodingFailure( + DecodingFailure.Reason.CustomReason("The value for the type field was not valid"), + c.downField("type") + ) + ) + } + content <- c.downField("content").as[Seq[JsonTuple]] + } yield TemplateSeqTupleResponse(request, tpe, content) + + given Encoder[TemplateResponse] = r => + Json.obj( + "request" -> r.request.asJson, + "type" -> "no".asJson, + "content" -> ().asJson + ) + + given Decoder[TemplateResponse] = c => + for { + request <- c.downField("request").as[JsonTemplate] + _ <- c + .downField("type") + .as[String] + .filterOrElse( + _ === "no", + DecodingFailure( + DecodingFailure.Reason.CustomReason("The value for the type field was not valid"), + c.downField("type") + ) + ) + _ <- c.downField("content").as[Unit] + } yield TemplateResponse(request) + + given Encoder[TemplateBooleanResponse] = r => + Json.obj( + "request" -> r.request.asJson, + "type" -> "nop".asJson, + "content" -> r.content.asJson + ) + + given Decoder[TemplateBooleanResponse] = c => + for { + request <- c.downField("request").as[JsonTemplate] + _ <- c + .downField("type") + .as[String] + .filterOrElse( + _ === "nop", + DecodingFailure( + DecodingFailure.Reason.CustomReason("The value for the type field was not valid"), + c.downField("type") + ) + ) + content <- c.downField("content").as[Boolean] + } yield TemplateBooleanResponse(request, content) + + given Encoder[ConnectionSuccessResponse] = r => + Json.obj( + "clientId" -> r.clientId.asJson + ) + + given Decoder[ConnectionSuccessResponse] = Decoder.forProduct1("clientId")(ConnectionSuccessResponse.apply) + + given Encoder[MergeSuccessResponse] = r => + Json.obj( + "newClientId" -> r.newClientId.asJson, + "oldClientId" -> r.oldClientId.asJson + ) + + given Decoder[MergeSuccessResponse] = + Decoder.forProduct2("newClientId", "oldClientId")(MergeSuccessResponse.apply) + + given Encoder[Response] = { + case r: TupleResponse => r.asJson + case r: SeqTupleResponse => r.asJson + case r: TemplateTupleResponse => r.asJson + case r: TemplateResponse => r.asJson + case r: TemplateBooleanResponse => r.asJson + case r: TemplateMaybeTupleResponse => r.asJson + case r: TemplateSeqTupleResponse => r.asJson + case r: ConnectionSuccessResponse => r.asJson + case r: MergeSuccessResponse => r.asJson + } + + given Decoder[Response] = r => + r.as[TupleResponse] + .orElse(r.as[SeqTupleResponse]) + .orElse(r.as[TemplateResponse]) + .orElse(r.as[TemplateBooleanResponse]) + .orElse(r.as[TemplateMaybeTupleResponse]) + .orElse(r.as[TemplateSeqTupleResponse]) + .orElse(r.as[TemplateTupleResponse]) + .orElse(r.as[ConnectionSuccessResponse]) + .orElse(r.as[MergeSuccessResponse]) +} diff --git a/src/main/scala/tuples/space/response/TemplateMaybeTupleResponseType.scala b/src/main/scala/tuples/space/response/TemplateMaybeTupleResponseType.scala new file mode 100644 index 0000000..1c4ad6b --- /dev/null +++ b/src/main/scala/tuples/space/response/TemplateMaybeTupleResponseType.scala @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2023 Matteo Castellucci + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package io.github.cakelier +package tuples.space.response + +enum TemplateMaybeTupleResponseType { + + case Inp extends TemplateMaybeTupleResponseType + + case Rdp extends TemplateMaybeTupleResponseType +} diff --git a/src/main/scala/tuples/space/response/TemplateSeqTupleResponseType.scala b/src/main/scala/tuples/space/response/TemplateSeqTupleResponseType.scala new file mode 100644 index 0000000..e74dbd6 --- /dev/null +++ b/src/main/scala/tuples/space/response/TemplateSeqTupleResponseType.scala @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2023 Matteo Castellucci + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package io.github.cakelier +package tuples.space.response + +enum TemplateSeqTupleResponseType { + case InAll extends TemplateSeqTupleResponseType + case RdAll extends TemplateSeqTupleResponseType +} diff --git a/src/main/scala/tuples/space/response/TemplateTupleResponseType.scala b/src/main/scala/tuples/space/response/TemplateTupleResponseType.scala new file mode 100644 index 0000000..2c5e229 --- /dev/null +++ b/src/main/scala/tuples/space/response/TemplateTupleResponseType.scala @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2023 Matteo Castellucci + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package io.github.cakelier +package tuples.space.response + +enum TemplateTupleResponseType { + + case In extends TemplateTupleResponseType + + case Rd extends TemplateTupleResponseType +} diff --git a/src/test/scala/tuples/space/client/JsonTupleSpaceTest.scala b/src/test/scala/tuples/space/client/JsonTupleSpaceTest.scala new file mode 100644 index 0000000..75ddead --- /dev/null +++ b/src/test/scala/tuples/space/client/JsonTupleSpaceTest.scala @@ -0,0 +1,415 @@ +/* + * Copyright (c) 2023 Matteo Castellucci + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package io.github.cakelier +package tuples.space.client + +import java.util.UUID +import java.util.concurrent.TimeoutException + +import scala.concurrent.Await +import scala.concurrent.Future +import scala.concurrent.duration.DurationInt + +import akka.actor.ActorSystem +import akka.actor.typed.ActorRef +import akka.http.scaladsl.Http +import akka.http.scaladsl.model.ws.Message +import akka.http.scaladsl.model.ws.TextMessage +import akka.http.scaladsl.server.Directives.* +import akka.stream.OverflowStrategy +import akka.stream.scaladsl.Flow +import akka.stream.scaladsl.Sink +import akka.stream.scaladsl.Source +import akka.stream.typed.scaladsl.ActorSource +import io.circe.parser.* +import io.circe.syntax.* +import org.scalatest.BeforeAndAfterAll +import org.scalatest.OptionValues.* +import org.scalatest.TryValues.* +import org.scalatest.concurrent.Eventually +import org.scalatest.concurrent.PatienceConfiguration.Timeout +import org.scalatest.funspec.AnyFunSpec +import org.scalatest.matchers.should.Matchers.* +import org.scalatest.time.Seconds +import org.scalatest.time.Span + +import AnyOps.* +import tuples.space.* +import tuples.space.JsonTuple.JsonNil +import tuples.space.request.* +import tuples.space.request.Request.MergeRequest +import tuples.space.request.Serializers.given +import tuples.space.response.* +import tuples.space.response.Response.{ConnectionSuccessResponse, MergeSuccessResponse} +import tuples.space.response.Serializers.given + +@SuppressWarnings(Array("org.wartremover.warts.Throw", "org.wartremover.warts.Var")) +class JsonTupleSpaceTest extends AnyFunSpec with BeforeAndAfterAll with Eventually { + + private var server: Option[Http.ServerBinding] = None + private val tuple: JsonTuple = 1 #: "Example" #: JsonNil + private val poisonPill: JsonTuple = "BOOM!" #: JsonNil + private val template: JsonTemplate = tuples.space.complete(int gte 1, string matches "[A-Z]".r) + private val foreverPendingTemplate: JsonTemplate = tuples.space.complete(nil) + private val nonMatchingTemplate: JsonTemplate = tuples.space.partial(double) + private var useFirstUUID: Boolean = true + private val firstUUID: UUID = UUID.randomUUID() + private val secondUUID: UUID = UUID.randomUUID() + private var actor: Option[ActorRef[Message]] = None + + override def beforeAll(): Unit = { + given ActorSystem = ActorSystem() + server = Some( + Await.result( + Http() + .newServerAt("localhost", 8080) + .bind(path("") { + handleWebSocketMessages { + val (actorRef, actorSource) = + ActorSource + .actorRef[Message](PartialFunction.empty, PartialFunction.empty, 100, OverflowStrategy.dropHead) + .preMaterialize() + actor = Some(actorRef) + actorRef ! TextMessage(ConnectionSuccessResponse(if (useFirstUUID) firstUUID else secondUUID).asJson.noSpaces) + + Flow[Message] + .mapConcat { + case t: TextMessage.Strict => + (for { + j <- parse(t.text) + m <- j.as[Request] + r = m match { + case t: TupleRequest if t.content === poisonPill => throw new RuntimeException() + case t: TupleRequest => TextMessage(TupleResponse(t.content).asJson.noSpaces) :: Nil + case t: TemplateRequest if t.content === foreverPendingTemplate => Nil + case t: TemplateRequest => + t.tpe match { + case TemplateRequestType.In => + TextMessage( + TemplateTupleResponse(t.content, TemplateTupleResponseType.In, tuple).asJson.noSpaces + ) :: Nil + case TemplateRequestType.Rd => + TextMessage( + TemplateTupleResponse(t.content, TemplateTupleResponseType.Rd, tuple).asJson.noSpaces + ) :: Nil + case TemplateRequestType.No => TextMessage(TemplateResponse(t.content).asJson.noSpaces) :: Nil + case TemplateRequestType.Inp if t.content === template => + TextMessage( + TemplateMaybeTupleResponse(t.content, TemplateMaybeTupleResponseType.Inp, Some(tuple)) + .asJson + .noSpaces + ) :: Nil + case TemplateRequestType.Inp => + TextMessage( + TemplateMaybeTupleResponse(t.content, TemplateMaybeTupleResponseType.Inp, None).asJson.noSpaces + ) :: Nil + case TemplateRequestType.Rdp if t.content === template => + TextMessage( + TemplateMaybeTupleResponse(t.content, TemplateMaybeTupleResponseType.Rdp, Some(tuple)) + .asJson + .noSpaces + ) :: Nil + case TemplateRequestType.Rdp => + TextMessage( + TemplateMaybeTupleResponse(t.content, TemplateMaybeTupleResponseType.Rdp, None).asJson.noSpaces + ) :: Nil + case TemplateRequestType.Nop if t.content === template => + TextMessage(TemplateBooleanResponse(t.content, true).asJson.noSpaces) :: Nil + case TemplateRequestType.Nop => + TextMessage(TemplateBooleanResponse(t.content, false).asJson.noSpaces) :: Nil + case TemplateRequestType.InAll if t.content === template => + TextMessage( + TemplateSeqTupleResponse(t.content, TemplateSeqTupleResponseType.InAll, Seq(tuple)) + .asJson + .noSpaces + ) :: Nil + case TemplateRequestType.InAll => + TextMessage( + TemplateSeqTupleResponse(t.content, TemplateSeqTupleResponseType.InAll, Seq.empty).asJson.noSpaces + ) :: Nil + case TemplateRequestType.RdAll if t.content === template => + TextMessage( + TemplateSeqTupleResponse(t.content, TemplateSeqTupleResponseType.RdAll, Seq(tuple)) + .asJson + .noSpaces + ) :: Nil + case TemplateRequestType.RdAll => + TextMessage( + TemplateSeqTupleResponse(t.content, TemplateSeqTupleResponseType.RdAll, Seq.empty).asJson.noSpaces + ) :: Nil + } + case t: SeqTupleRequest => TextMessage(SeqTupleResponse(t.content).asJson.noSpaces) :: Nil + case t: MergeRequest => + t.clientId shouldBe secondUUID + t.oldClientId shouldBe firstUUID + TextMessage(MergeSuccessResponse(t.clientId, t.oldClientId).asJson.noSpaces) :: Nil + } + } yield r).toOption.getOrElse(Nil) + case m => Nil + } + .via(Flow.fromSinkAndSourceCoupled(Sink.foreach(m => actor.foreach(_ ! m)), actorSource)) + } + }), + Integer.MAX_VALUE.seconds + ) + ) + } + + override def afterAll(): Unit = Await.result(server.getOrElse(fail()).unbind(), Integer.MAX_VALUE.seconds) + + describe("A tuple space") { + describe("when the server is active") { + it("should correctly connect to it") { + val client: Future[JsonTupleSpace] = JsonTupleSpace("ws://localhost:8080/") + val tupleSpace: JsonTupleSpace = eventually(Timeout(Span(10, Seconds))) { + client.value.value.success.value + } + Await.result(tupleSpace.close(), 10.seconds) + } + } + + describe("when the server is inactive") { + it("should not connect to it") { + val client: Future[JsonTupleSpace] = JsonTupleSpace("ws://localhost:8081/") + a[TimeoutException] should be thrownBy Await.ready(client, 10.seconds) + } + } + + describe("when the server is active but the path is wrong") { + it("should not connect to it") { + val client: Future[JsonTupleSpace] = JsonTupleSpace("ws://localhost:8080/hello") + a[TimeoutException] should be thrownBy Await.ready(client, 10.seconds) + } + } + + describe("when an out operation is issued") { + it("should complete with success") { + val client: JsonTupleSpace = Await.result(JsonTupleSpace("ws://localhost:8080/"), Integer.MAX_VALUE.seconds) + val result: Future[Unit] = client.out(tuple) + eventually(Timeout(Span(10, Seconds))) { + result.value.value.success.value + } shouldBe () + Await.result(client.close(), 10.seconds) + } + } + + describe("when an in operation is issued") { + it("should complete with success") { + val client: JsonTupleSpace = Await.result(JsonTupleSpace("ws://localhost:8080/"), Integer.MAX_VALUE.seconds) + val result: Future[JsonTuple] = client.in(template) + eventually(Timeout(Span(10, Seconds))) { + result.value.value.success.value + } shouldBe tuple + Await.result(client.close(), 10.seconds) + } + } + + describe("when a rd operation is issued") { + it("should complete with success") { + val client: JsonTupleSpace = Await.result(JsonTupleSpace("ws://localhost:8080/"), Integer.MAX_VALUE.seconds) + val result: Future[JsonTuple] = client.rd(template) + eventually(Timeout(Span(10, Seconds))) { + result.value.value.success.value + } shouldBe tuple + Await.result(client.close(), 10.seconds) + } + } + + describe("when a no operation is issued") { + it("should complete with success") { + val client: JsonTupleSpace = Await.result(JsonTupleSpace("ws://localhost:8080/"), Integer.MAX_VALUE.seconds) + val result: Future[Unit] = client.no(template) + eventually(Timeout(Span(10, Seconds))) { + result.value.value.success.value + } shouldBe () + Await.result(client.close(), 10.seconds) + } + } + + describe("when an operation is issued after closing the client") { + it("should complete with failure") { + val client: JsonTupleSpace = Await.result(JsonTupleSpace("ws://localhost:8080/"), Integer.MAX_VALUE.seconds) + Await.result(client.close(), 10.seconds) + val result: Future[Unit] = client.no(template) + val ex: Throwable = eventually(Timeout(Span(10, Seconds))) { + result.value.value.failure.exception + } + ex shouldBe a[IllegalStateException] + ex should have message "The queue was completed." + } + } + + describe("when is closed twice") { + it("should do nothing") { + val client: JsonTupleSpace = Await.result(JsonTupleSpace("ws://localhost:8080/"), Integer.MAX_VALUE.seconds) + val closeResult1 = client.close() + eventually(Timeout(Span(10, Seconds))) { + closeResult1.value.value.success.value + } + val closeResult2 = client.close() + eventually(Timeout(Span(10, Seconds))) { + closeResult2.value.value.success.value + } + } + } + + describe("when an operation is issued and before completing it the client is closed") { + it("should complete with failure") { + val client: JsonTupleSpace = Await.result(JsonTupleSpace("ws://localhost:8080/"), Integer.MAX_VALUE.seconds) + val result: Future[JsonTuple] = client.in(foreverPendingTemplate) + Await.result(client.close(), 10.seconds) + val ex: Throwable = eventually(Timeout(Span(10, Seconds))) { + result.value.value.failure.exception + } + ex shouldBe a[IllegalStateException] + ex should have message "The queue was completed." + } + } + + describe("when an inp operation with a template matching an existing tuple is issued") { + it("should complete with success") { + val client: JsonTupleSpace = Await.result(JsonTupleSpace("ws://localhost:8080/"), Integer.MAX_VALUE.seconds) + val result: Future[Option[JsonTuple]] = client.inp(template) + eventually(Timeout(Span(10, Seconds))) { + result.value.value.success.value + } shouldBe Some(tuple) + Await.result(client.close(), 10.seconds) + } + } + + describe("when an inp operation with a template matching an non-existing tuple is issued") { + it("should complete with success") { + val client: JsonTupleSpace = Await.result(JsonTupleSpace("ws://localhost:8080/"), Integer.MAX_VALUE.seconds) + val result: Future[Option[JsonTuple]] = client.inp(nonMatchingTemplate) + eventually(Timeout(Span(10, Seconds))) { + result.value.value.success.value + } shouldBe None + Await.result(client.close(), 10.seconds) + } + } + + describe("when a rdp operation with a template matching an existing tuple is issued") { + it("should complete with success") { + val client: JsonTupleSpace = Await.result(JsonTupleSpace("ws://localhost:8080/"), Integer.MAX_VALUE.seconds) + val result: Future[Option[JsonTuple]] = client.rdp(template) + eventually(Timeout(Span(10, Seconds))) { + result.value.value.success.value + } shouldBe Some(tuple) + Await.result(client.close(), 10.seconds) + } + } + + describe("when a rdp operation with a template matching an non-existing tuple is issued") { + it("should complete with success") { + val client: JsonTupleSpace = Await.result(JsonTupleSpace("ws://localhost:8080/"), Integer.MAX_VALUE.seconds) + val result: Future[Option[JsonTuple]] = client.rdp(nonMatchingTemplate) + eventually(Timeout(Span(10, Seconds))) { + result.value.value.success.value + } shouldBe None + Await.result(client.close(), 10.seconds) + } + } + + describe("when a nop operation with a template matching an existing tuple is issued") { + it("should complete with success") { + val client: JsonTupleSpace = Await.result(JsonTupleSpace("ws://localhost:8080/"), Integer.MAX_VALUE.seconds) + val result: Future[Boolean] = client.nop(template) + eventually(Timeout(Span(10, Seconds))) { + result.value.value.success.value + } shouldBe true + Await.result(client.close(), 10.seconds) + } + } + + describe("when a nop operation with a template matching an non-existing tuple is issued") { + it("should complete with success") { + val client: JsonTupleSpace = Await.result(JsonTupleSpace("ws://localhost:8080/"), Integer.MAX_VALUE.seconds) + val result: Future[Boolean] = client.nop(nonMatchingTemplate) + eventually(Timeout(Span(10, Seconds))) { + result.value.value.success.value + } shouldBe false + Await.result(client.close(), 10.seconds) + } + } + + describe("when an inAll operation with a template matching an existing tuple is issued") { + it("should complete with success") { + val client: JsonTupleSpace = Await.result(JsonTupleSpace("ws://localhost:8080/"), Integer.MAX_VALUE.seconds) + val result: Future[Seq[JsonTuple]] = client.inAll(template) + eventually(Timeout(Span(10, Seconds))) { + result.value.value.success.value + } shouldBe Seq(tuple) + Await.result(client.close(), 10.seconds) + } + } + + describe("when an inAll operation with a template matching an non-existing tuple is issued") { + it("should complete with success") { + val client: JsonTupleSpace = Await.result(JsonTupleSpace("ws://localhost:8080/"), Integer.MAX_VALUE.seconds) + val result: Future[Seq[JsonTuple]] = client.inAll(nonMatchingTemplate) + eventually(Timeout(Span(10, Seconds))) { + result.value.value.success.value + } shouldBe Seq.empty[JsonTuple] + Await.result(client.close(), 10.seconds) + } + } + + describe("when a rdAll operation with a template matching an existing tuple is issued") { + it("should complete with success") { + val client: JsonTupleSpace = Await.result(JsonTupleSpace("ws://localhost:8080/"), Integer.MAX_VALUE.seconds) + val result: Future[Seq[JsonTuple]] = client.rdAll(template) + eventually(Timeout(Span(10, Seconds))) { + result.value.value.success.value + } shouldBe Seq(tuple) + Await.result(client.close(), 10.seconds) + } + } + + describe("when a rdAll operation with a template matching an non-existing tuple is issued") { + it("should complete with success") { + val client: JsonTupleSpace = Await.result(JsonTupleSpace("ws://localhost:8080/"), Integer.MAX_VALUE.seconds) + val result: Future[Seq[JsonTuple]] = client.rdAll(nonMatchingTemplate) + eventually(Timeout(Span(10, Seconds))) { + result.value.value.success.value + } shouldBe Seq.empty[JsonTuple] + Await.result(client.close(), 10.seconds) + } + } + + describe("when a disconnection occurs") { + it("should reconnect with success, asking for its old client id") { + val client: JsonTupleSpace = Await.result(JsonTupleSpace("ws://localhost:8080/"), Integer.MAX_VALUE.seconds) + val outResult = client.out(tuple) + eventually(Timeout(Span(10, Seconds))) { + outResult.value.value.success.value + } shouldBe () + useFirstUUID = false + a[IllegalStateException] should be thrownBy Await.result(client.out(poisonPill), Integer.MAX_VALUE.seconds) + val inResult = client.in(template) + eventually(Timeout(Span(10, Seconds))) { + inResult.value.value.success.value + } shouldBe tuple + Await.result(client.close(), 10.seconds) + } + } + } +}