From 000ca914beee95dd728c47c8e0bfe10deb977574 Mon Sep 17 00:00:00 2001 From: Barbara Gomes Date: Fri, 22 Jul 2016 11:53:26 -0700 Subject: [PATCH 01/10] [IOTA-28] Preparing for Receiver actor - Creating separate object for fey_core ActorRef - Updating ORCHESTRATION_RECEIVED to accept Option[File] --- .../org/apache/iota/fey/Application.scala | 10 ++++- .../scala/org/apache/iota/fey/FeyCore.scala | 42 ++++++++++++++----- .../org/apache/iota/fey/FeyGenericActor.scala | 1 + .../apache/iota/fey/JsonReceiverActor.scala | 9 ++-- .../scala/org/apache/iota/fey/MyService.scala | 2 +- .../org/apache/iota/fey/FeyCoreSpec.scala | 12 +++--- 6 files changed, 54 insertions(+), 22 deletions(-) diff --git a/fey-core/src/main/scala/org/apache/iota/fey/Application.scala b/fey-core/src/main/scala/org/apache/iota/fey/Application.scala index eb08d63..637fbf0 100644 --- a/fey-core/src/main/scala/org/apache/iota/fey/Application.scala +++ b/fey-core/src/main/scala/org/apache/iota/fey/Application.scala @@ -42,8 +42,9 @@ object SYSTEM_ACTORS{ import FEY_SYSTEM._ - val fey = system.actorOf(FeyCore.props, name = "FEY-CORE") - fey ! FeyCore.START + FEY_CORE_ACTOR + + FEY_CORE_ACTOR.actorRef ! FeyCore.START val service = system.actorOf(Props[MyServiceActor], name = "FEY_REST_API") @@ -52,6 +53,11 @@ object SYSTEM_ACTORS{ } +object FEY_CORE_ACTOR{ + import FEY_SYSTEM._ + val actorRef = system.actorOf(FeyCore.props, name = "FEY-CORE") +} + object FEY_MONITOR{ import FEY_SYSTEM._ diff --git a/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala b/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala index 8028f05..4909a0b 100644 --- a/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala +++ b/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala @@ -52,17 +52,15 @@ protected class FeyCore extends Actor with ActorLogging{ val jsonReceiverActor: ActorRef = context.actorOf(Props[JsonReceiverActor], name = JSON_RECEIVER_NAME) context.watch(jsonReceiverActor) - case ORCHESTRATION_RECEIVED(orchestrationJson, file) => - log.info(s"NEW FILE ${file.getAbsolutePath}") - try{ - processJson(orchestrationJson) - renameProcessedFile(file, "processed") - }catch { - case e: Exception => - renameProcessedFile(file, "failed") - log.error(e, s"JSON not processed ${file.getAbsolutePath}") + case ORCHESTRATION_RECEIVED(orchestrationJson, optionFile) => + optionFile match { + case Some(file) => + orchestrationReceivedWithFile(orchestrationJson, file) + case None => + orchestrationReceivedNoFile(orchestrationJson) } + case STOP_EMPTY_ORCHESTRATION(orchID) => log.warning(s"Deleting Empty Orchestration $orchID") deleteOrchestration(orchID) @@ -76,6 +74,29 @@ protected class FeyCore extends Actor with ActorLogging{ } + private def orchestrationReceivedNoFile(json: JsValue) = { + val orchGUID = (json \ GUID).as[String] + log.info(s"Orchestration $orchGUID received") + try{ + processJson(json) + }catch { + case e: Exception => + log.error(e, s"JSON for orchestration $orchGUID could not be processed") + } + } + + private def orchestrationReceivedWithFile(json: JsValue, file: File) = { + log.info(s"NEW FILE ${file.getAbsolutePath}") + try{ + processJson(json) + renameProcessedFile(file, "processed") + }catch { + case e: Exception => + renameProcessedFile(file, "failed") + log.error(e, s"JSON not processed ${file.getAbsolutePath}") + } + } + private def processTerminatedMessage(actorRef: ActorRef) = { monitoring_actor ! Monitor.TERMINATE(actorRef.path.toString, Utils.getTimestamp) log.info(s"TERMINATED ${actorRef.path.name}") @@ -306,7 +327,8 @@ protected object FeyCore{ * @param json * @param file */ - case class ORCHESTRATION_RECEIVED(json: JsValue, file: File) + case class ORCHESTRATION_RECEIVED(json: JsValue, file: Option[File]) + case class STOP_EMPTY_ORCHESTRATION(orchID: String) diff --git a/fey-core/src/main/scala/org/apache/iota/fey/FeyGenericActor.scala b/fey-core/src/main/scala/org/apache/iota/fey/FeyGenericActor.scala index 9a6fb78..36f39fa 100644 --- a/fey-core/src/main/scala/org/apache/iota/fey/FeyGenericActor.scala +++ b/fey-core/src/main/scala/org/apache/iota/fey/FeyGenericActor.scala @@ -1,3 +1,4 @@ + /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with diff --git a/fey-core/src/main/scala/org/apache/iota/fey/JsonReceiverActor.scala b/fey-core/src/main/scala/org/apache/iota/fey/JsonReceiverActor.scala index 1664478..3fb5330 100644 --- a/fey-core/src/main/scala/org/apache/iota/fey/JsonReceiverActor.scala +++ b/fey-core/src/main/scala/org/apache/iota/fey/JsonReceiverActor.scala @@ -29,13 +29,16 @@ class JsonReceiverActor extends Actor with ActorLogging { import JsonReceiverActor._ val monitoring_actor = FEY_MONITOR.actorRef - val watchFileTask = new WatchServiceReceiver(self) - var watchThread = new Thread(watchFileTask, GLOBAL_DEFINITIONS.WATCH_SERVICE_THREAD) + var watchFileTask: WatchServiceReceiver = _ + var watchThread: Thread = _ override def preStart() { prepareDynamicJarRepo() processCheckpointFiles() + watchFileTask = new WatchServiceReceiver(self) + watchThread = new Thread(watchFileTask, GLOBAL_DEFINITIONS.WATCH_SERVICE_THREAD) + monitoring_actor ! Monitor.START(Utils.getTimestamp) watchThread.setDaemon(true) watchThread.start() @@ -74,7 +77,7 @@ class JsonReceiverActor extends Actor with ActorLogging { override def receive: Receive = { case JSON_RECEIVED(json, file) => log.info(s"JSON RECEIVED => ${Json.stringify(json)}") - context.parent ! FeyCore.ORCHESTRATION_RECEIVED(json, file) + context.parent ! FeyCore.ORCHESTRATION_RECEIVED(json, Some(file)) case _ => } diff --git a/fey-core/src/main/scala/org/apache/iota/fey/MyService.scala b/fey-core/src/main/scala/org/apache/iota/fey/MyService.scala index 41a6982..0935c5b 100644 --- a/fey-core/src/main/scala/org/apache/iota/fey/MyService.scala +++ b/fey-core/src/main/scala/org/apache/iota/fey/MyService.scala @@ -49,7 +49,7 @@ sealed trait MyService extends HttpService { get{ respondWithMediaType(`text/html`) { complete { - SYSTEM_ACTORS.fey ! JSON_TREE + FEY_CORE_ACTOR.actorRef ! JSON_TREE Thread.sleep(2000) val json = IdentifyFeyActors.generateTreeJson() IdentifyFeyActors.getHTMLTree(json) diff --git a/fey-core/src/test/scala/org/apache/iota/fey/FeyCoreSpec.scala b/fey-core/src/test/scala/org/apache/iota/fey/FeyCoreSpec.scala index b5a2ca9..c616821 100644 --- a/fey-core/src/test/scala/org/apache/iota/fey/FeyCoreSpec.scala +++ b/fey-core/src/test/scala/org/apache/iota/fey/FeyCoreSpec.scala @@ -62,7 +62,7 @@ class FeyCoreSpec extends BaseAkkaSpec { val orchestration_name = "TEST-ACTOR" "Sending FeyCore.ORCHESTRATION_RECEIVED with CREATE command to FeyCore" should { - feyCoreRef ! FeyCore.ORCHESTRATION_RECEIVED(getJSValueFromString(Utils_JSONTest.create_json_test), new File("/tmp/fey/test/json")) + feyCoreRef ! FeyCore.ORCHESTRATION_RECEIVED(getJSValueFromString(Utils_JSONTest.create_json_test), None) s"result in creating an Orchestration child actor with the name '$orchestration_name'" in { orchestrationref = TestProbe().expectActor(s"$feyPath/$orchestration_name") } @@ -85,14 +85,14 @@ class FeyCoreSpec extends BaseAkkaSpec { "Sending FeyCore.ORCHESTRATION_RECEIVED with UPDATE command to FeyCore" should { s"result in creating a new Performer child actor with the name '$orchestration_name/MY-ENSEMBLE-0001/TEST-0002'" in { - feyCoreRef ! FeyCore.ORCHESTRATION_RECEIVED(getJSValueFromString(Utils_JSONTest.update_json_test), new File("/tmp/fey/test/json")) + feyCoreRef ! FeyCore.ORCHESTRATION_RECEIVED(getJSValueFromString(Utils_JSONTest.update_json_test), None) ensemble1Test2ref = TestProbe().expectActor(s"$feyPath/$orchestration_name/MY-ENSEMBLE-0001/TEST-0002") } } "Sending FeyCore.ORCHESTRATION_RECEIVED with UPDATE command and DELETE ensemble to FeyCore" should { s"result in termination of Ensemble with the name '$orchestration_name/MY-ENSEMBLE-0001'" in { - feyCoreRef ! FeyCore.ORCHESTRATION_RECEIVED(getJSValueFromString(Utils_JSONTest.update_delete_json_test), new File("/tmp/fey/test/json")) + feyCoreRef ! FeyCore.ORCHESTRATION_RECEIVED(getJSValueFromString(Utils_JSONTest.update_delete_json_test), None) TestProbe().verifyActorTermination(ensemble1ref) } s"result in termination of Performer with the name '$orchestration_name/MY-ENSEMBLE-0001/TEST-0001'" in { @@ -106,7 +106,7 @@ class FeyCoreSpec extends BaseAkkaSpec { "Sending FeyCore.ORCHESTRATION_RECEIVED with RECREATE command and same Timestamp to FeyCore" should { s"result in logging a 'not recreated' message at Warn " in { EventFilter.warning(pattern = s".*$orchestration_name not recreated.*", occurrences = 1) intercept { - feyCoreRef ! FeyCore.ORCHESTRATION_RECEIVED(getJSValueFromString(Utils_JSONTest.recreate_timestamp_json_test), new File("/tmp/fey/test/json")) + feyCoreRef ! FeyCore.ORCHESTRATION_RECEIVED(getJSValueFromString(Utils_JSONTest.recreate_timestamp_json_test), None) } } } @@ -121,7 +121,7 @@ class FeyCoreSpec extends BaseAkkaSpec { "Sending FeyCore.ORCHESTRATION_RECEIVED with DELETE command to FeyCore" should { s"result in termination of Orchestration with the name '$orchestration_name'" in { - feyCoreRef ! FeyCore.ORCHESTRATION_RECEIVED(getJSValueFromString(Utils_JSONTest.delete_json_test), new File("/tmp/fey/test/json")) + feyCoreRef ! FeyCore.ORCHESTRATION_RECEIVED(getJSValueFromString(Utils_JSONTest.delete_json_test), None) TestProbe().verifyActorTermination(orchestrationref) } "result in sending TERMINATE message to Monitor actor" in { @@ -140,7 +140,7 @@ class FeyCoreSpec extends BaseAkkaSpec { "Sending FeyCore.STOP_EMPTY_ORCHESTRATION to FeyCore" should { s"result in termination of 'TEST-ORCH-2'" in { - feyCoreRef ! FeyCore.ORCHESTRATION_RECEIVED(getJSValueFromString(Utils_JSONTest.orchestration_test_json), new File("/tmp/fey/test/json")) + feyCoreRef ! FeyCore.ORCHESTRATION_RECEIVED(getJSValueFromString(Utils_JSONTest.orchestration_test_json), None) val ref = TestProbe().expectActor(s"$feyPath/TEST-ORCH-2") FEY_CACHE.activeOrchestrations should have size(1) FEY_CACHE.activeOrchestrations should contain key("TEST-ORCH-2") From a865f2fb25d3f8657ddae6620588eff232574b45 Mon Sep 17 00:00:00 2001 From: Barbara Gomes Date: Fri, 22 Jul 2016 14:36:32 -0700 Subject: [PATCH 02/10] [IOTA-28] Implementing GenericReceiver actor --- .../iota/fey/FeyGenericActorReceiver.scala | 198 ++++++++++++++++++ 1 file changed, 198 insertions(+) create mode 100644 fey-core/src/main/scala/org/apache/iota/fey/FeyGenericActorReceiver.scala diff --git a/fey-core/src/main/scala/org/apache/iota/fey/FeyGenericActorReceiver.scala b/fey-core/src/main/scala/org/apache/iota/fey/FeyGenericActorReceiver.scala new file mode 100644 index 0000000..07c7f25 --- /dev/null +++ b/fey-core/src/main/scala/org/apache/iota/fey/FeyGenericActorReceiver.scala @@ -0,0 +1,198 @@ + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iota.fey + +import java.io.{File, FileOutputStream} +import java.net.URL +import java.nio.file.{Files, Paths} +import com.eclipsesource.schema._ +import akka.actor.ActorRef +import com.eclipsesource.schema.SchemaValidator +import org.apache.commons.io.IOUtils +import play.api.libs.json._ +import scala.concurrent.duration._ +import scala.util.Properties._ + +abstract class FeyGenericActorReceiver(override val params: Map[String,String] = Map.empty, + override val backoff: FiniteDuration = 1.minutes, + override val connectTo: Map[String,ActorRef] = Map.empty, + override val schedulerTimeInterval: FiniteDuration = 2.seconds, + override val orchestrationName: String = "", + override val orchestrationID: String = "", + override val autoScale: Boolean = false) extends FeyGenericActor{ + + private[fey] val feyCore = FEY_CORE_ACTOR.actorRef + + override final def processMessage[T](message: T, sender: ActorRef): Unit = { + try { + val jsonString = getJSONString(message) + processJson(jsonString) + startBackoff() + }catch{ + case e: Exception => log.error(e, s"Could not process message $message") + } + } + + private[fey] def processJson(jsonString: String) = { + var orchID:String = "None" + try{ + val orchestrationJSON = Json.parse(jsonString) + orchID = (orchestrationJSON \ JSON_PATH.GUID).as[String] + val valid = validJson(orchestrationJSON) + if(valid && (orchestrationJSON \ JSON_PATH.COMMAND).as[String].toUpperCase != "DELETE"){ + checkForLocation(orchestrationJSON) + } + if(valid) { + feyCore ! FeyCore.ORCHESTRATION_RECEIVED(orchestrationJSON, None) + }else{ + log.warning(s"Could not forward Orchestration $orchID. Invalid JSON schema") + } + } catch { + case e: Exception => + log.error(e, s"Orchestration $orchID could not be forwarded") + } + } + + /** + * Return a JSON string + * @param input the received process message + * @tparam T + * @return String that can be converted to JSON + */ + def getJSONString[T](input: T): String + + /** + * Checks if JSON complies with defined Schema + * + * @param json + * @return true if it complies or false if it does not + */ + final def validJson(json: JsValue): Boolean = { + try { + val result = SchemaValidator.validate(CONFIG.JSON_SPEC, json) + if (result.isError) { + log.error("Incorrect JSON schema \n" + result.asEither.left.get.toJson.as[List[JsObject]].map(error => { + val path = (error \ "instancePath").as[String] + val msg = (error \ "msgs").as[List[String]].mkString("\n\t") + s"$path \n\tErrors: $msg" + }).mkString("\n")) + false + } else { + true + } + }catch{ + case e: Exception => + log.error("Error while validating JSON", e) + false + } + } + + /** + * Checks if any of the performers need to have its jar downloaded + * All the Receivers must call this method so the Jars can be downloaded at runtime + * + * @param json Orchestration JSON object + */ + final def checkForLocation(json: JsValue): Unit = { + (json \ JSON_PATH.ENSEMBLES).as[List[JsObject]].foreach(ensemble => { + (ensemble \ JSON_PATH.PERFORMERS).as[List[JsObject]].foreach(performer => { + if((performer \ JSON_PATH.SOURCE).as[JsObject].keys.contains(JSON_PATH.JAR_LOCATION)){ + val location = (performer \ JSON_PATH.SOURCE \ JSON_PATH.JAR_LOCATION).as[JsObject] + val jarName = (performer \ JSON_PATH.SOURCE \ JSON_PATH.SOURCE_NAME).as[String] + val url = (location \ JSON_PATH.JAR_LOCATION_URL).as[String].toLowerCase + if( (url.startsWith("https://") || url.startsWith("http://")) && !jarDownloaded(jarName)){ + + val credentials:Option[JsObject] = { + if(location.keys.contains(JSON_PATH.JAR_CREDENTIALS_URL)){ + Option((location \ JSON_PATH.JAR_CREDENTIALS_URL).as[JsObject]) + }else{ + None + } + } + + downloadJAR(url, jarName, credentials) + } + }else{ + log.debug("Location not defined in JSON") + } + }) + }) + } + + /** + * Checks if the jar already exists + * + * @param jarName + * @return + */ + private final def jarDownloaded(jarName: String): Boolean = { + try { + Files.exists(Paths.get(s"${CONFIG.DYNAMIC_JAR_REPO}/$jarName")) + }catch{ + case e: Exception => + log.error(s"Could not check if $jarName exists", e) + true + } + } + + private final def downloadJAR(url: String, jarName: String, credentials: Option[JsObject]): Unit = { + var outputStream: FileOutputStream = null + try{ + log.info(s"Downloading $jarName from $url") + + val connection = new URL(s"$url/$jarName").openConnection + + resolveCredentials(credentials) match{ + case Some(userpass) => + connection.setRequestProperty(HttpBasicAuth.AUTHORIZATION, HttpBasicAuth.getHeader(userpass._1, userpass._2)) + case None => + } + + // Download Jar + outputStream = new FileOutputStream(s"${CONFIG.DYNAMIC_JAR_REPO}/$jarName") + IOUtils.copy(connection.getInputStream,outputStream) + outputStream.close() + + }catch{ + case e: Exception => + if(outputStream != null) { + outputStream.close() + (new File(s"${CONFIG.DYNAMIC_JAR_REPO}/$jarName")).delete() + } + log.error(s"Could not download $jarName from $url", e) + } + } + + /** + * Tries to resolve the credentials looking to the environment variable + * If it is not possible to find a env var with that name, then use the name itself + * @param credentials + * @return (user, password) + */ + def resolveCredentials(credentials: Option[JsObject]):Option[(String, String)] = { + credentials match { + case None => None + case Some(cred) => + val user = (cred \ JSON_PATH.JAR_CRED_USER).as[String] + val password = (cred \ JSON_PATH.JAR_CRED_PASSWORD).as[String] + Option(envOrElse(user,user), envOrElse(password,password)) + } + } + +} From a37164e6150d8bb64bd4dc9cbc50f5ab27b97972 Mon Sep 17 00:00:00 2001 From: Barbara Gomes Date: Fri, 22 Jul 2016 14:38:05 -0700 Subject: [PATCH 03/10] [IOTA-28] Adding tests for GenericReceiver --- .../src/test/resources/fey-test-actor.jar | Bin 12703 -> 17735 bytes .../org/apache/iota/fey/BaseAkkaSpec.scala | 13 ++ .../org/apache/iota/fey/FeyCoreSpec.scala | 77 +++++++++- .../fey/FeyGenericActorReceiverSpec.scala | 139 ++++++++++++++++++ .../fey/FeyGenericActorReceiverTest.scala | 70 +++++++++ .../apache/iota/fey/JsonReceiverSpec.scala | 2 +- .../org/apache/iota/fey/Utils_JSONTest.scala | 131 +++++++++++++++++ 7 files changed, 428 insertions(+), 4 deletions(-) create mode 100644 fey-core/src/test/scala/org/apache/iota/fey/FeyGenericActorReceiverSpec.scala create mode 100644 fey-core/src/test/scala/org/apache/iota/fey/FeyGenericActorReceiverTest.scala diff --git a/fey-core/src/test/resources/fey-test-actor.jar b/fey-core/src/test/resources/fey-test-actor.jar index d440cd1a245c24f2634f6c994e4fddc24180f524..e072c7b9f23c05048c0203ad3a5952eaae673f21 100644 GIT binary patch delta 5094 zcmaKwWmJ@1*T?BbB^~J;dgyLx>4qVsVL-ZbfB}@DTTo&KK}rxLq(r&|hETdeK!u@0 zVSH}9*Ym#j^R8#T*Y)9du63QW&)OgMf9?G*b{vFblN)H`;*nrs5n*B3`JQKxv*X@E z6!-_$e18BJ#I5!@ukZMXf6I4J0(0Cyi+SP^ia(2T`XfBNzkX?Ejf3d3Y=tPZ1cZs- zO|o$Pz3&%$3<(`B9yJyb5z*}}npppMRI#!Qh3JTkoE@DZLC$_zb%Gqw6jM)QdQ<8s z(7Y3C9HkPxf_r8LXxCo;!9=SMcu!0`CRfQFyy_((X+PRPAB<+ZynoFXh0-f?lr`;= z-c0k8YsZt}Z~|WWOf36PjQZQ2prI2OJRhpS1e!$%q+{&N2JxVI3vA2)S?tNNdU$^z z#>cO%u~0SEs`>M2O&s)~b!r7I2b|&s^m+|{ZXFgMeD|teu+e%s=g#c5&4DKD#rL8z z?cU~wGL7qK4`2&;!R00cM4WT9Nrvlddjq z7D(O8op}^g?QHSJ8rDM0GTa59=SGI4*jHsa?4^IK^3?L>HtkT}+uQ1)AfME!2+2|w z@UGp@CLvBZnu4~PH%O($Q*^}hWtZn&>eW=Sr0g~L*sM^Vb+I^iBvqG7LHT;;1xK(CDNEc6Dn$fuCVZ+( zzM2O~z!$e_&;-xy8H>|Oq6IZA4(O;+``zlL=muo|0~wuZm)$YpGp;kX3!P!1Nl^(g zkA!w$K>}2*HP@!T*lbTZ{ir)Z(zMK8yXaJt6E4I}<0z(7yOmqxH(eTIX11Sj*u4-} zJ^~3h=x@2{WoC_SSl^$GpI%QkNzUyr+dxxV&G1>LhB~oZugX^%YvA;tBJweOD?_d*+KH6X$J&Ba`f%$lHh-JY-x(z7nE-p8xS8KyQ*c zV(4k_HJgy40l`7n@4E)tghX95$9hb7SXdf=>DR52{dbMw|D!R`e`}1gV}Q3G2gvb> zy}y5%>32OUWok^hK%)(>opLMeBOE^_r=0{jq*kRy#*&Dk&qeS*2_8PkjSD`0YKN1^ zp%8u7Sx!^{ZUTmR2DR;cx*Ctzk+{KZewV~59TGZiwf(Y2xU@u*uuZTuN1-v0a0nB( zC;4s=;tzFM3k#!8Mnt7C`}Ka+`(m-YK;?2z?Kz^dFKkY-b?gC>ubLDsnk1vbd*VH) zBiiSpv*=f2m&^L{+p1GVk)Z<)-3Hym3VXL{&-GtMZ(QZOao6`29iHy&D>2*eYMr@K zH3z=PFGx}HC9gS1Py*h+`1AG&K?v_(k4D3sN%i!RRbJ%^e zYl)=u)nK2oNvI_mFccFtNnBF)$yVZX7jG;{VWs>5j+*Dzct9ZN>-(u=hnP#K`Gm=Q zhhOWt1vi`;jXG;K$%!nY`57O$faqB%l`~6Fo$M{iTlclE$gCWkO!`G8o~MU&Gv31l z;M|3#Pr0>smCR*tM^zx`VKW-iqr-$f`gQ;tBiC%*af|t7-{T$9jW%7(010Mblnn@= z1LzF`cf?M!aL^d<8 zfEWDp=Jb9Uolm4UJQZ3vJCMK+Ue1h6Tk_Db$F*~VqGjv3?k+s|-d2}SdkUK<$tiCmPZP*6dN)REvOtDElln=bzxDg8Ei3-OXwF-B6nZ#98x4e!SmYMZX_oe$kK1k3z+N~ONN*tsGYhaR$Ju;^GGS%z0 zO#-_iNRihPU@l?PQ!$e*YCVNk67oEyMMz|rs2ky%>ArR>M*fE?>-y&D2udfey~fA! z%zSK{)aZ9Hf+0~6{mR#Z&M9`5rk19xZ!&c@pGmhBf)Bx&h_3GEPZ&)DO@(qia)2~% zk>ebsba&<Ic? z({mLpnskAxVqSC|*ga%ODG?PN82L0wO0RQfzE0E-CiW)t?)*%F$-bB8e$+B^eV0?P z$Ep{~4=Gw;@`ZQRe?7A3KW zwD6$FIwxn=&#ETK%9E-GFt%LY#@RKWYuft8B(PsQn|KnCva*(sHYrPpo9wlTq=bw% zC0OsPlPpw~%$Qa#dW=>BGDcq1E_DwGvjb~+_3Y@LTIPa40Xe5je(xJkO@W>^bQrcT zzQRlyRFz@n&Cm(o>==h@bTc~>yXPh#@ubG2ITG=pSd)9&b-eBcK$bYy$4G1*NHhxU zV=c&zhaCz(=4&r>{6-OstdVuyc{B+|P#4rs_N6|U75eVT*hrY>JQ@oY6v5GTn>Es} z@k=aMZ^);ZIQTXhFFCxe@9t%LojoD<4& zIVnLERVXNe+LIwouz*N6uG?Q7(nWac(6ld)IlHE{U9%V;L?^%FC|cW3p>LQ%mVorw1xESY#7Q~vwWTr|=nh`IhT3&Laj`Jla)NaZmD zV(WUx1-C;{KPE*fy>Q)=vJicVW5+7VT$czoj|4m&6h;ST9=Ur#;sH?_9QQRXDsu6kVu@8^vD_I*bUf~}&8ZAo!hYXJ8AtRu#?JND!L{w^ufx9$Z4Mb5*gyQS8DYgOf0(9?zDZvo~1|`UDW>>;ecP=_-?KWxMQ5e?Xk_^@GJOiv4=F zRqHUR;S%0Ut)@${{?cHzL+kBTr6BQ*XJU?&#Q7Jrtr#a|w>O}`{*X3m^k;r_$jcs= z0?YM2ir*QF^hs`*=0R6BPDGy8$?zfc(qWr_1bz9AwjgNh{wv+dkRc%wpx9aT&{Z6_ z2q^dHQi^?BY8RxkQpnJDl@I3zBks)CNCwrOo(z6U2a174qG)H2c^8J}?k~?D8XAcJx*+1vS%JRb7SFwYy@4UcQ9YThPw&rAUv17t3HkZ%wXi}}dH_D- z*yDlsYirk&y>h8x76KB?I(d&W}nBwS3UOL6s2uY63MxvrbaCZTZiPvc_HQ@1EkeqlK&} zZJDv71~d)nV~!#WVp~S>R@0%fjB2KB5uF0sHmzgDL*kXQyT_w|$#*f7>eClt?MFbT z?xF0+2H&^s!;WI@;fixrG~}fe1eO_NaDbCUZ}LrreVVY}I6;tp*{snk*2*(Y#G=`Z z%suIJV_3Ta$>Rp(t?V z3KRpQH6Ztej%`Xk-RitjbRiPqHkDK9?3Q~BLrh40u?7uh&LnU~9dvg_;|!1Q_~NQu zSfohXvvR(Bchbhpt}*cG$ikS=(vl@1o3UC|QIorPa_mJgk2;xKrO#m$Y3WSeTQwQk zEVj&Cw~X%3Rpl=>zd@D2MW=Lr;g2CBJTx!Q!0vW^%?pPruNNg*xWt{4e@KQ3H*~&! zzJO(1X4_J>+>I5hZ6=n4Lxx0AswW(1<1fkkcfq6QCJ^ZaHuXrKV!U&(1A z@e)22k{{;H9FNyIv^Qzb2U{tbWG}!8mS^#@f7T*LT5aFqj3K+;ogoUq?fKq$%njTaw*+v{XkkbC*`VLVFh!>OpQdYNHiC!S#F0=oiNKm*Pxx zE9zTQ^>Hq;b*dotzJ~04x(wqbfqGGA6c+rE2DX)&X?*hR?z-d)%=Swo`g=RsV3_$B zYP&^jqokT{O?EZz2UtS`@Yzlc!BT_J?Om!B->bd}wa_5=g{n9cmH{BNdWU)h2oI#A zD|$zu*rn&LKe6R6DdxMT?Eqi$DLnYmVs+C&G&8>fH40Kw|QfQhx$3Usm!dw#pf zj*Ql|7vm;Nea%tdpg*C6+1}#-$9h=3EXu z90(4MWsY4^wZa_~U;(c>V`76+V-*q0#l_=GENcYg9v^v={BV)}78GNA{_`kB)bwg; z25NP3+FHu%5?5sYVuK(}xwJgcT{2pX@Vv6g5_=!FJKxq49Ipo(AiH#>ZE&U&w}%cU zR6yBZ6Vv?0~wh zj&L|*LDTpPM3`vX>t`cv*AMds+pdlBGC_Q8HJG?9McXTvI_St}Tx!C7)fbSIxG7gK zAL14Lj_HQ2?3%tdPb(r0qdt2wb7Cp@q?6yk zJPBa&OYyc5tsKQxc`nQpkl>z$WRAqI6ByJt6tV@vWA4&)+ z-WpU{A%d=dxFM+jhhjo{e;6+W`NJb2!#`9LR_DdWp~m{(G4h`Srs_XzD=e%q(El~A zgjjCPt$#Mo{Qq%+MD+HA(tjI6Zxj3Hwf{_-?>~b0OWX$G`HvufciaZr_yo6G$^UxA M{+fwzly1NN1<49^0{{R3 delta 216 zcmX@!#W+8iH^7^jg^PiKgMne|gfA0$m6<`5u^O0)(FRjp`phMa$sEp^V7kOP5KNzNb`xP`V*mn> N>HN$L3{`p{9suv0P*VT^ diff --git a/fey-core/src/test/scala/org/apache/iota/fey/BaseAkkaSpec.scala b/fey-core/src/test/scala/org/apache/iota/fey/BaseAkkaSpec.scala index d77d28b..19c3b71 100644 --- a/fey-core/src/test/scala/org/apache/iota/fey/BaseAkkaSpec.scala +++ b/fey-core/src/test/scala/org/apache/iota/fey/BaseAkkaSpec.scala @@ -68,6 +68,19 @@ class BaseAkkaSpec extends BaseSpec with BeforeAndAfterAll with LoggingTest{ } } + def expectActorInSystem(path: String, lookInSystem: ActorSystem, max: FiniteDuration = 3.seconds): ActorRef = { + probe.within(max) { + var actor = null: ActorRef + probe.awaitAssert { + (lookInSystem actorSelection path).tell(Identify(path), probe.ref) + probe.expectMsgPF(100 milliseconds) { + case ActorIdentity(`path`, Some(ref)) => actor = ref + } + } + actor + } + } + def verifyActorTermination(actor: ActorRef)(implicit system: ActorSystem): Unit = { val watcher = TestProbe() watcher.watch(actor) diff --git a/fey-core/src/test/scala/org/apache/iota/fey/FeyCoreSpec.scala b/fey-core/src/test/scala/org/apache/iota/fey/FeyCoreSpec.scala index c616821..5ae55c2 100644 --- a/fey-core/src/test/scala/org/apache/iota/fey/FeyCoreSpec.scala +++ b/fey-core/src/test/scala/org/apache/iota/fey/FeyCoreSpec.scala @@ -18,11 +18,10 @@ package org.apache.iota.fey -import java.io.File +import java.nio.file.{Files, Paths} import akka.actor.{ActorRef, PoisonPill, Props} import akka.testkit.{EventFilter, TestProbe} - import scala.concurrent.duration.DurationInt class FeyCoreSpec extends BaseAkkaSpec { @@ -62,8 +61,8 @@ class FeyCoreSpec extends BaseAkkaSpec { val orchestration_name = "TEST-ACTOR" "Sending FeyCore.ORCHESTRATION_RECEIVED with CREATE command to FeyCore" should { - feyCoreRef ! FeyCore.ORCHESTRATION_RECEIVED(getJSValueFromString(Utils_JSONTest.create_json_test), None) s"result in creating an Orchestration child actor with the name '$orchestration_name'" in { + feyCoreRef ! FeyCore.ORCHESTRATION_RECEIVED(getJSValueFromString(Utils_JSONTest.create_json_test), None) orchestrationref = TestProbe().expectActor(s"$feyPath/$orchestration_name") } s"result in creating an Ensemble child actor with the name '$orchestration_name/MY-ENSEMBLE-0001'" in { @@ -155,10 +154,82 @@ class FeyCoreSpec extends BaseAkkaSpec { } } + var receiverRef:ActorRef = _ + var receiverEnsenble:ActorRef = _ + var receiverOrch:ActorRef = _ + val receiverJSON = getJSValueFromString(Utils_JSONTest.generic_receiver_json) + val receiverOrchName = (receiverJSON \ JSON_PATH.GUID).as[String] + + "Sending FeyCore.ORCHESTRATION_RECEIVED with CREATE command to FeyCore of a GenericReceiverActor" should { + s"result in creating an Orchestration child actor with the name '$receiverOrchName'" in { + feyCoreRef ! FeyCore.ORCHESTRATION_RECEIVED(receiverJSON, None) + receiverOrch = TestProbe().expectActor(s"$feyPath/$receiverOrchName") + } + s"result in creating an Ensemble child actor with the name '$receiverOrchName/RECEIVER-ENSEMBLE'" in { + receiverEnsenble = TestProbe().expectActor(s"$feyPath/$receiverOrchName/RECEIVER-ENSEMBLE") + } + s"result in creating a Performer child actor with the name '$receiverOrchName/RECEIVER-ENSEMBLE/MY_RECEIVER_PERFORMER'" in { + receiverRef = TestProbe().expectActor(s"$feyPath/$receiverOrchName/RECEIVER-ENSEMBLE/MY_RECEIVER_PERFORMER") + } + s"result in new entry to FEY_CACHE.activeOrchestrations with key '$receiverOrchName'" in { + FEY_CACHE.activeOrchestrations should contain key(receiverOrchName) + } + } + + "Sending PROCESS message to the Receiver Performer" should { + "Send FeyCore.ORCHESTRATION_RECEIVED to FeyCore" in { + receiverRef ! FeyGenericActor.PROCESS(Utils_JSONTest.json_for_receiver_test) + TestProbe().expectActorInSystem(s"${FEY_CORE_ACTOR.actorRef.path}/RECEIVED-BY-ACTOR-RECEIVER", FEY_SYSTEM.system) + } + s"result in creating an Orchestration child actor with the name 'RECEIVED-BY-ACTOR-RECEIVER'" in { + TestProbe().expectActorInSystem(s"${FEY_CORE_ACTOR.actorRef.path}/RECEIVED-BY-ACTOR-RECEIVER", FEY_SYSTEM.system) + } + s"result in creating an Ensemble child actor with the name 'RECEIVED-BY-ACTOR-RECEIVER/MY-ENSEMBLE-REC-0001'" in { + TestProbe().expectActorInSystem(s"${FEY_CORE_ACTOR.actorRef.path}/RECEIVED-BY-ACTOR-RECEIVER/MY-ENSEMBLE-REC-0001", FEY_SYSTEM.system) + } + s"result in creating an Ensemble child actor with the name 'RECEIVED-BY-ACTOR-RECEIVER/MY-ENSEMBLE-REC-0002'" in { + TestProbe().expectActorInSystem(s"${FEY_CORE_ACTOR.actorRef.path}/RECEIVED-BY-ACTOR-RECEIVER/MY-ENSEMBLE-REC-0002", FEY_SYSTEM.system) + } + s"result in creating a Performer child actor with the name 'RECEIVED-BY-ACTOR-RECEIVER/MY-ENSEMBLE-REC-0002/TEST-0001'" in { + TestProbe().expectActorInSystem(s"${FEY_CORE_ACTOR.actorRef.path}/RECEIVED-BY-ACTOR-RECEIVER/MY-ENSEMBLE-REC-0002/TEST-0001", FEY_SYSTEM.system) + } + s"result in creating a Performer child actor with the name 'RECEIVED-BY-ACTOR-RECEIVER/MY-ENSEMBLE-REC-0001/TEST-0001'" in { + TestProbe().expectActorInSystem(s"${FEY_CORE_ACTOR.actorRef.path}/RECEIVED-BY-ACTOR-RECEIVER/MY-ENSEMBLE-REC-0001/TEST-0001", FEY_SYSTEM.system) + } + s"result in one new entry to FEY_CACHE.activeOrchestrations with key 'RECEIVED-BY-ACTOR-RECEIVER'" in { + FEY_CACHE.activeOrchestrations should have size(2) + FEY_CACHE.activeOrchestrations should contain key(receiverOrchName) + FEY_CACHE.activeOrchestrations should contain key("RECEIVED-BY-ACTOR-RECEIVER") + } + } + + "Sending PROCESS message to the Receiver Performer with command DELETE" should { + "STOP running orchestration" in { + val ref = TestProbe().expectActorInSystem(s"${FEY_CORE_ACTOR.actorRef.path}/RECEIVED-BY-ACTOR-RECEIVER", FEY_SYSTEM.system) + receiverRef ! FeyGenericActor.PROCESS(Utils_JSONTest.json_for_receiver_test_delete) + TestProbe().verifyActorTermination(ref) + } + s"result in one entry in FEY_CACHE.activeOrchestrations" in { + FEY_CACHE.activeOrchestrations should have size(1) + FEY_CACHE.activeOrchestrations should contain key(receiverOrchName) + } + } + + "Sending PROCESS message to Receiver with checkpoint enabled" should { + "Save received JSON to checkpoint dir" in { + CONFIG.CHEKPOINT_ENABLED = true + receiverRef ! FeyGenericActor.PROCESS(Utils_JSONTest.json_for_receiver_test) + TestProbe().expectActorInSystem(s"${FEY_CORE_ACTOR.actorRef.path}/RECEIVED-BY-ACTOR-RECEIVER", FEY_SYSTEM.system) + Files.exists(Paths.get(s"${CONFIG.CHECKPOINT_DIR}/RECEIVED-BY-ACTOR-RECEIVER.json")) should be(true) + CONFIG.CHEKPOINT_ENABLED = false + } + } + "Stopping FeyCore" should { "result in sending STOP message to Monitor actor" in { feyCoreRef ! PoisonPill monitor.expectMsgClass(1.seconds, classOf[Monitor.STOP]) + TestProbe().verifyActorTermination(receiverRef) } } diff --git a/fey-core/src/test/scala/org/apache/iota/fey/FeyGenericActorReceiverSpec.scala b/fey-core/src/test/scala/org/apache/iota/fey/FeyGenericActorReceiverSpec.scala new file mode 100644 index 0000000..34235db --- /dev/null +++ b/fey-core/src/test/scala/org/apache/iota/fey/FeyGenericActorReceiverSpec.scala @@ -0,0 +1,139 @@ + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.iota.fey + +import java.nio.file.{Files, Paths} + +import akka.actor.Props +import akka.testkit.{EventFilter, TestActorRef, TestProbe} + +import scala.concurrent.duration.{DurationInt, FiniteDuration} + +class FeyGenericActorReceiverSpec extends BaseAkkaSpec{ + + val parent = TestProbe("GENERIC-RECEIVER-PARENT") + val monitor = TestProbe("MONITOR-GENERIC") + val feyTB = TestProbe("GENERIC-FEY") + val connectToTB = TestProbe("REC-CONNECT") + + val genericRef: TestActorRef[FeyGenericActorReceiverTest] = + TestActorRef[FeyGenericActorReceiverTest]( Props(new FeyGenericActorReceiverTest(Map.empty, 0.seconds, + Map("connect" -> connectToTB.ref),300.milliseconds,"MY-ORCH", "MY-ORCH", false){ + override private[fey] val monitoring_actor = monitor.ref + override private[fey] val feyCore = feyTB.ref + }),parent.ref, "GENERIC-RECEIVER-TEST") + + var genericState:FeyGenericActorReceiverTest = genericRef.underlyingActor + val path = genericRef.path.toString + + + "Creating a GenericActor with Schedule time defined" should { + "result in scheduler started" in{ + genericState.isShedulerRunning() should be(true) + } + "result in onStart method called" in { + genericState.started should be(true) + } + "result in START message sent to Monitor" in{ + monitor.expectMsgClass(classOf[Monitor.START]) + } + "result in one active actor" in { + globalIdentifierRef ! IdentifyFeyActors.IDENTIFY_TREE(parent.ref.path.toString) + Thread.sleep(500) + IdentifyFeyActors.actorsPath should have size(1) + IdentifyFeyActors.actorsPath should contain(s"${parent.ref.path}/GENERIC-RECEIVER-TEST") + } + s"result in normal functioning of GenericActor" in { + genericRef ! "PROPAGATE" + connectToTB.expectMsg(FeyGenericActor.PROCESS("PROPAGATE-CALLED")) + } + } + + "Sending PROCESS message to GenericReceiver" should { + "log message to Warn saying that the JSON could not be forwarded to FeyCore when JSON is invalid" in { + EventFilter.warning(message = s"Could not forward Orchestration TEST-ACTOR. Invalid JSON schema", occurrences = 1) intercept { + genericRef ! FeyGenericActor.PROCESS("INVALID_JSON") + feyTB.expectNoMsg(1.seconds) + } + } + "send ORCHESTRATION_RECEIVED to FeyCore when JSON to be processed has a valid schema" in { + genericRef ! FeyGenericActor.PROCESS("VALID_JSON") + feyTB.expectMsgClass(classOf[FeyCore.ORCHESTRATION_RECEIVED]) + } + "Download jar from location and send ORCHESTRATION_RECEIVED to FeyCore when JSON has a location defined" in { + genericRef ! FeyGenericActor.PROCESS("JSON_LOCATION") + Files.exists(Paths.get(s"${CONFIG.DYNAMIC_JAR_REPO}/fey-virtual-sensor.jar")) + feyTB.expectMsgClass(classOf[FeyCore.ORCHESTRATION_RECEIVED]) + } + } + + "Scheduler component" should { + "call execute() method" in { + genericState.executing should be(true) + } + } + + "Sending EXCEPTION(IllegalArgumentException) message to GenericActor" should { + "Throw IllegalArgumentException" in { + EventFilter[IllegalArgumentException](occurrences = 1) intercept { + genericRef ! FeyGenericActor.EXCEPTION(new IllegalArgumentException("Testing")) + } + } + "Result in restart of the actor with sequence of Monitoring: STOP -> RESTART -> START" in { + monitor.expectMsgClass(classOf[Monitor.STOP]) + monitor.expectMsgClass(classOf[Monitor.RESTART]) + //Restart does not change the actorRef but does change the object inside the ActorReference + genericState = genericRef.underlyingActor + monitor.expectMsgClass(classOf[Monitor.START]) + } + "call onStart method" in { + genericState.started should be(true) + } + "call onRestart method" in { + Thread.sleep(100) + genericState.restarted should be(true) + } + "restart scheduler" in { + genericState.isShedulerRunning() should be(true) + } + } + + "Sending STOP to GenericActor" should{ + "terminate GenericActor" in{ + genericRef ! FeyGenericActor.STOP + TestProbe().verifyActorTermination(genericRef) + TestProbe().notExpectActor(path) + } + "call onStop method" in { + genericState.stopped should be(true) + } + "cancel scheduler" in{ + genericState.isShedulerRunning() should be(false) + } + "send STOP - TERMINATE message to Monitor" in{ + monitor.expectMsgClass(classOf[Monitor.STOP]) + } + "result in no active actors" in { + globalIdentifierRef ! IdentifyFeyActors.IDENTIFY_TREE(parent.ref.path.toString) + Thread.sleep(500) + IdentifyFeyActors.actorsPath shouldBe empty + } + } +} diff --git a/fey-core/src/test/scala/org/apache/iota/fey/FeyGenericActorReceiverTest.scala b/fey-core/src/test/scala/org/apache/iota/fey/FeyGenericActorReceiverTest.scala new file mode 100644 index 0000000..c110923 --- /dev/null +++ b/fey-core/src/test/scala/org/apache/iota/fey/FeyGenericActorReceiverTest.scala @@ -0,0 +1,70 @@ + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iota.fey + +import akka.actor.ActorRef +import scala.concurrent.duration.{DurationInt, FiniteDuration} +import scala.concurrent.duration.FiniteDuration + +class FeyGenericActorReceiverTest(override val params: Map[String,String] = Map.empty, + override val backoff: FiniteDuration = 1.minutes, + override val connectTo: Map[String,ActorRef] = Map.empty, + override val schedulerTimeInterval: FiniteDuration = 2.seconds, + override val orchestrationName: String = "", + override val orchestrationID: String = "", + override val autoScale: Boolean = false) extends FeyGenericActorReceiver { + + override def customReceive:Receive = { + case "PROPAGATE" => propagateMessage("PROPAGATE-CALLED") + case x => log.debug(s"Message not treated: $x") + } + + override def getJSONString[T](input: T): String = { + input match{ + case "VALID_JSON" => Utils_JSONTest.create_json_test + case "INVALID_JSON" => Utils_JSONTest.test_json_schema_invalid + case "JSON_LOCATION" => Utils_JSONTest.location_test_2 + } + } + + var count = 0 + var started = false + var executing = false + var stopped = false + var restarted = false + + override def onStart(): Unit = { + started = true + } + + override def execute(): Unit = { + log.info(s"Executing action in ${self.path.name}") + executing = true + } + + override def onStop(): Unit = { + log.info(s"Actor ${self.path.name} stopped.") + stopped = true + } + + override def onRestart(reason: Throwable): Unit = { + restarted = true + } + +} diff --git a/fey-core/src/test/scala/org/apache/iota/fey/JsonReceiverSpec.scala b/fey-core/src/test/scala/org/apache/iota/fey/JsonReceiverSpec.scala index 1cf25ee..5691aef 100644 --- a/fey-core/src/test/scala/org/apache/iota/fey/JsonReceiverSpec.scala +++ b/fey-core/src/test/scala/org/apache/iota/fey/JsonReceiverSpec.scala @@ -63,7 +63,7 @@ class JsonReceiverSpec extends BaseAkkaSpec with LoggingTest{ } "download jar dynamically from URL" in { receiver.checkForLocation(getJSValueFromString(Utils_JSONTest.location_test)) - Files.exists(Paths.get(s"${CONFIG.DYNAMIC_JAR_REPO}/fey-stream.jar")) + Files.exists(Paths.get(s"${CONFIG.DYNAMIC_JAR_REPO}/fey-stream.jar")) should be(true) } } diff --git a/fey-core/src/test/scala/org/apache/iota/fey/Utils_JSONTest.scala b/fey-core/src/test/scala/org/apache/iota/fey/Utils_JSONTest.scala index 417ba54..70d3ddc 100644 --- a/fey-core/src/test/scala/org/apache/iota/fey/Utils_JSONTest.scala +++ b/fey-core/src/test/scala/org/apache/iota/fey/Utils_JSONTest.scala @@ -417,4 +417,135 @@ object Utils_JSONTest { } ] }""" + + val location_test_2 = + """{ + "guid": "Orch2", + "command": "CREATE", + "timestamp": "591997890", + "name": "DESCRIPTION", + "ensembles": [ + { + "guid": "En2", + "command": "NONE", + "performers": [ + { + "guid": "S2", + "schedule": 1000, + "backoff": 0, + "source": { + "name": "fey-virtual-sensor.jar", + "classPath": "org.apache.iota.fey.performer.Sensor", + "location" :{ + "url" : "https://github.com/apache/incubator-iota/raw/master/fey-examples/active-jar-repo" + }, + "parameters": { + } + } + } + ], + "connections": [ + ] + } + ] + }""" + + val generic_receiver_json = """{ + "guid": "RECEIVER_ORCHESTRATION", + "command": "CREATE", + "timestamp": "591997890", + "name": "DESCRIPTION", + "ensembles": [ + { + "guid": "RECEIVER-ENSEMBLE", + "command": "NONE", + "performers": [ + { + "guid": "MY_RECEIVER_PERFORMER", + "schedule": 0, + "backoff": 0, + "source": { + "name": "fey-test-actor.jar", + "classPath": "org.apache.iota.fey.TestReceiverActor", + "parameters": { + } + } + } + ], + "connections": [ + ] + } + ] + }""" + + val json_for_receiver_test = + """{ + "guid" : "RECEIVED-BY-ACTOR-RECEIVER", + "command" : "CREATE", + "timestamp": "213263914979", + "name" : "ORCHESTRATION FOR TEST", + "ensembles" : [ + { + "guid":"MY-ENSEMBLE-REC-0001", + "command": "NONE", + "performers":[ + { + "guid": "TEST-0001", + "schedule": 0, + "backoff": 0, + "source": { + "name": "fey-test-actor.jar", + "classPath": "org.apache.iota.fey.TestActor", + "parameters": {} + } + } + ], + "connections":[] + }, + { + "guid":"MY-ENSEMBLE-REC-0002", + "command": "NONE", + "performers":[ + { + "guid": "TEST-0001", + "schedule": 0, + "backoff": 0, + "source": { + "name": "fey-test-actor.jar", + "classPath": "org.apache.iota.fey.TestActor_2", + "parameters": {} + } + } + ], + "connections":[] + } + ] + }""" + + val json_for_receiver_test_delete = + """{ + "guid" : "RECEIVED-BY-ACTOR-RECEIVER", + "command" : "DELETE", + "timestamp": "213263914979", + "name" : "ORCHESTRATION FOR TEST", + "ensembles" : [ + { + "guid":"MY-ENSEMBLE-REC-0001", + "command": "NONE", + "performers":[ + { + "guid": "TEST-0001", + "schedule": 0, + "backoff": 0, + "source": { + "name": "fey-test-actor.jar", + "classPath": "org.apache.iota.fey.TestActor", + "parameters": {} + } + } + ], + "connections":[] + } + ] + }""" } From 8da99385e542479bf38e0c71c9ffbbe5e69540ef Mon Sep 17 00:00:00 2001 From: Barbara Gomes Date: Thu, 28 Jul 2016 17:31:42 -0700 Subject: [PATCH 04/10] Cleanup ORCHESTRATION_CACHE.orchestration_metadata when orchestration is terminated. --- fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala b/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala index 4909a0b..8541ee4 100644 --- a/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala +++ b/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala @@ -101,6 +101,7 @@ protected class FeyCore extends Actor with ActorLogging{ monitoring_actor ! Monitor.TERMINATE(actorRef.path.toString, Utils.getTimestamp) log.info(s"TERMINATED ${actorRef.path.name}") FEY_CACHE.activeOrchestrations.remove(actorRef.path.name) + ORCHESTRATION_CACHE.orchestration_metadata.remove(actorRef.path.name) if(!FEY_CACHE.orchestrationsAwaitingTermination.isEmpty) { checkForOrchestrationWaitingForTermination(actorRef.path.name) } From fedb5e2368e228eab047c1fb36a3a920ca53d4e7 Mon Sep 17 00:00:00 2001 From: Barbara Gomes Date: Thu, 28 Jul 2016 21:34:07 -0700 Subject: [PATCH 05/10] Adjusting GenericReceiver --- .../scala/org/apache/iota/fey/FeyGenericActorReceiver.scala | 4 +++- fey-core/src/main/scala/org/apache/iota/fey/Utils.scala | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/fey-core/src/main/scala/org/apache/iota/fey/FeyGenericActorReceiver.scala b/fey-core/src/main/scala/org/apache/iota/fey/FeyGenericActorReceiver.scala index 07c7f25..7361d01 100644 --- a/fey-core/src/main/scala/org/apache/iota/fey/FeyGenericActorReceiver.scala +++ b/fey-core/src/main/scala/org/apache/iota/fey/FeyGenericActorReceiver.scala @@ -42,7 +42,9 @@ abstract class FeyGenericActorReceiver(override val params: Map[String,String] = override final def processMessage[T](message: T, sender: ActorRef): Unit = { try { val jsonString = getJSONString(message) - processJson(jsonString) + if(jsonString != "{}") { + processJson(jsonString) + } startBackoff() }catch{ case e: Exception => log.error(e, s"Could not process message $message") diff --git a/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala b/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala index fb62f37..bb0f4d5 100644 --- a/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala +++ b/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala @@ -145,7 +145,7 @@ protected object Utils { val ensembleJSON = metadata.map(ensenble => ensenble._2) val orchestrationSpec = Json.obj(JSON_PATH.GUID -> orchestrationID, - JSON_PATH.COMMAND -> "CREATE", + JSON_PATH.COMMAND -> "RECREATE", JSON_PATH.ORCHESTRATION_NAME -> "I DONT KNOW HOW TO SAVE IT YET =P", JSON_PATH.ORCHESTRATION_TIMESTAMP -> System.currentTimeMillis.toString, JSON_PATH.ENSEMBLES -> ensembleJSON From 2135e87e0527c61eaafaba956d5b3b7c156d233c Mon Sep 17 00:00:00 2001 From: Barbara Gomes Date: Fri, 29 Jul 2016 11:40:59 -0700 Subject: [PATCH 06/10] New Feature: User should be able to define a custom dispatcher for the performer - Added "dispatcher" property to Performer JSON (Optional) - Added "custom-dispatcher" configuration to Fey --- fey-core/src/main/resources/application.conf | 8 ++++++++ .../resources/fey-json-schema-validator.json | 3 +++ .../org/apache/iota/fey/Application.scala | 2 +- .../scala/org/apache/iota/fey/Ensemble.scala | 20 +++++++++++++++---- .../scala/org/apache/iota/fey/Utils.scala | 10 +++++++++- 5 files changed, 37 insertions(+), 6 deletions(-) diff --git a/fey-core/src/main/resources/application.conf b/fey-core/src/main/resources/application.conf index 3816151..a1193af 100644 --- a/fey-core/src/main/resources/application.conf +++ b/fey-core/src/main/resources/application.conf @@ -75,6 +75,14 @@ fey-global-configuration{ messages-per-resize = 500 } + // Holds all the Akka dispatchers configured by the user. + // These dispatchers will be given to Fey ActorSystem and will be available + // to be used by the performers. To do so, just specify the property "dispatcher" + // on the performer json using the path of the dispatcher inside the custom-dispatcher + custom-dispatchers{ + // No default custom implementations + } + } // Fey akka configuration. Can not be overwritten by user diff --git a/fey-core/src/main/resources/fey-json-schema-validator.json b/fey-core/src/main/resources/fey-json-schema-validator.json index 5dbf58a..579fbca 100644 --- a/fey-core/src/main/resources/fey-json-schema-validator.json +++ b/fey-core/src/main/resources/fey-json-schema-validator.json @@ -32,6 +32,9 @@ "controlAware":{ "type":"boolean" }, + "dispatcher":{ + "type": "string" + }, "autoScale":{ "type":"integer", "minimum":0 diff --git a/fey-core/src/main/scala/org/apache/iota/fey/Application.scala b/fey-core/src/main/scala/org/apache/iota/fey/Application.scala index 637fbf0..d62c418 100644 --- a/fey-core/src/main/scala/org/apache/iota/fey/Application.scala +++ b/fey-core/src/main/scala/org/apache/iota/fey/Application.scala @@ -35,7 +35,7 @@ object Application extends App { } object FEY_SYSTEM{ - implicit val system = ActorSystem("FEY-MANAGEMENT-SYSTEM") + implicit val system = ActorSystem("FEY-MANAGEMENT-SYSTEM", CONFIG.getDispatcherForAkka().withFallback(ConfigFactory.load())) } object SYSTEM_ACTORS{ diff --git a/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala b/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala index cc7640c..afa39f2 100644 --- a/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala +++ b/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala @@ -205,16 +205,22 @@ protected class Ensemble(val orchestrationID: String, val clazz = loadClazzFromJar(performerInfo.classPath, s"${performerInfo.jarLocation}/${performerInfo.jarName}", performerInfo.jarName) val autoScale = if(performerInfo.autoScale > 0) true else false + val dispatcher = if(performerInfo.dispatcher != "") s"fey-custom-dispatchers.${performerInfo.dispatcher}" else "" val actorProps = Props(clazz, performerInfo.parameters, performerInfo.backoff, connections, performerInfo.schedule, orchestrationName, orchestrationID, autoScale) - if(performerInfo.controlAware){ + // dispatcher has higher priority than controlAware. That means that if both are defined + // then the custom dispatcher will be used + if(dispatcher != ""){ + log.info(s"Using dispatcher: $dispatcher") + actorProps.withDispatcher(dispatcher) + } + else if(performerInfo.controlAware){ actorProps.withDispatcher(CONFIG.CONTROL_AWARE_MAILBOX) }else{ actorProps } - } /** @@ -282,7 +288,9 @@ object Ensemble { val params:Map[String,String] = getMapOfParams((performer \ SOURCE \ SOURCE_PARAMS).as[JsObject]) val controlAware:Boolean = if (performer.keys.contains(CONTROL_AWARE)) (performer \ CONTROL_AWARE).as[Boolean] else false val location: String = if ( (performer \ SOURCE).as[JsObject].keys.contains(JAR_LOCATION) ) CONFIG.DYNAMIC_JAR_REPO else CONFIG.JAR_REPOSITORY - (id, new Performer(id, jarName, classPath,params,schedule.millisecond,backoff.millisecond, autoScale,controlAware, location)) + val dispatcher: String = if (performer.keys.contains(PERFORMER_DISPATCHER)) (performer \ PERFORMER_DISPATCHER).as[String] else "" + + (id, new Performer(id, jarName, classPath, params, schedule.millisecond, backoff.millisecond, autoScale,controlAware, location, dispatcher)) }).toMap } @@ -314,8 +322,12 @@ object Ensemble { * @param parameters performer params * @param schedule performer schedule interval * @param backoff performer backoff interval + * @param autoScale if actor was started as a router and can autoscala + * @param controlAware if the actor uses a controlAware mailbox + * @param jarLocation download jar + * @param dispatcher Akka dispatcher that the actor is using */ case class Performer(uid: String, jarName: String, classPath: String, parameters: Map[String,String], schedule: FiniteDuration, backoff: FiniteDuration, - autoScale: Int, controlAware: Boolean, jarLocation: String) + autoScale: Int, controlAware: Boolean, jarLocation: String, dispatcher: String) diff --git a/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala b/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala index bb0f4d5..8a0f36d 100644 --- a/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala +++ b/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala @@ -25,7 +25,7 @@ import ch.qos.logback.classic.{Level, Logger, LoggerContext} import ch.qos.logback.core.joran.spi.JoranException import ch.qos.logback.core.util.StatusPrinter import com.eclipsesource.schema.SchemaType -import com.typesafe.config.ConfigFactory +import com.typesafe.config.{Config, ConfigFactory, ConfigValue} import org.slf4j.LoggerFactory import play.api.libs.json.{JsObject, JsValue, Json} @@ -195,6 +195,7 @@ object JSON_PATH{ val JAR_CREDENTIALS_URL = "credentials" val JAR_CRED_USER = "user" val JAR_CRED_PASSWORD = "password" + val PERFORMER_DISPATCHER = "dispatcher" } object CONFIG{ @@ -215,6 +216,7 @@ object CONFIG{ var MESSAGES_PER_RESIZE:Int = 500 var DYNAMIC_JAR_REPO = "" var DYNAMIC_JAR_FORCE_PULL = false + var CUSTOM_DISPATCHERS: ConfigValue = null def loadUserConfiguration(path: String) : Unit = { @@ -238,10 +240,16 @@ object CONFIG{ MESSAGES_PER_RESIZE = app.getInt("auto-scale.messages-per-resize") DYNAMIC_JAR_REPO = app.getString("dynamic-jar-population.downloaded-repository") DYNAMIC_JAR_FORCE_PULL = app.getBoolean("dynamic-jar-population.force-pull") + CUSTOM_DISPATCHERS = app.getValue("custom-dispatchers") setLogbackConfiguration() } + def getDispatcherForAkka():Config = { + val config = ConfigFactory.parseString("") + config.withValue("fey-custom-dispatchers", CUSTOM_DISPATCHERS) + } + /** * Resets logback context configuration and loads the new one */ From adf71bb1d84f6397a315dfc7b042b351201a47ae Mon Sep 17 00:00:00 2001 From: Barbara Gomes Date: Fri, 29 Jul 2016 12:35:23 -0700 Subject: [PATCH 07/10] New feature: Implement simple monitoring: Simple monitoring keeps track only of the latest event for each actor - Reason: Less memory usage, specially when using routers --- fey-core/src/main/resources/application.conf | 11 +++++ .../scala/org/apache/iota/fey/Monitor.scala | 48 ++++++++++++++++++- .../scala/org/apache/iota/fey/MyService.scala | 10 +++- .../scala/org/apache/iota/fey/Utils.scala | 26 +++++----- 4 files changed, 81 insertions(+), 14 deletions(-) diff --git a/fey-core/src/main/resources/application.conf b/fey-core/src/main/resources/application.conf index a1193af..a80f13e 100644 --- a/fey-core/src/main/resources/application.conf +++ b/fey-core/src/main/resources/application.conf @@ -83,6 +83,17 @@ fey-global-configuration{ // No default custom implementations } + // Configure monitoring options. If enabled the actors events will be stored + // together with other information, and the user should be able to visualize + // using the rest-api. + // Types: + // COMPLETE: Keeps track of all the events for all of the actors. Backed by a Trie data structure + // SIMPLE: Keeps track only of the latest event for each actor. Backed by HashMap + monitoring{ + enable = true, + type = "COMPLETE" + } + } // Fey akka configuration. Can not be overwritten by user diff --git a/fey-core/src/main/scala/org/apache/iota/fey/Monitor.scala b/fey-core/src/main/scala/org/apache/iota/fey/Monitor.scala index 2de05b9..ded1c9f 100644 --- a/fey-core/src/main/scala/org/apache/iota/fey/Monitor.scala +++ b/fey-core/src/main/scala/org/apache/iota/fey/Monitor.scala @@ -32,12 +32,22 @@ protected class Monitor(eventsStore: Trie) extends Actor { val log: DiagnosticLoggingAdapter = Logging(this) log.mdc(Map("fileName" -> "monitor_events")) + + override def preStart(): Unit = { + if(CONFIG.MONITORING_ENABLED) { + if (CONFIG.MONITORING_TYPE == "SIMPLE") { + context.become(simple) + } else { + context.become(complete) + } + } + } + override def postStop(): Unit = { log.clearMDC() } - override def receive: Receive = { - + private def complete: Receive = { case START(timestamp, info) => logInfo(sender().path.toString, EVENTS.START, timestamp, info) eventsStore.append(sender().path.toString,MonitorEvent(EVENTS.START, timestamp, info)) @@ -53,7 +63,28 @@ protected class Monitor(eventsStore: Trie) extends Actor { case TERMINATE(actorPath, timestamp, info) => logInfo(actorPath, EVENTS.TERMINATE, timestamp, info) eventsStore.append(actorPath,MonitorEvent(EVENTS.TERMINATE, timestamp, info)) + } + + private def simple: Receive = { + case START(timestamp, info) => + logInfo(sender().path.toString, EVENTS.START, timestamp, info) + Monitor.simpleEvents.put(sender().path.toString, ('S',timestamp)) + case STOP(timestamp, info) => + logInfo(sender().path.toString, EVENTS.STOP, timestamp, info) + Monitor.simpleEvents.put(sender().path.toString, ('O',timestamp)) + + case RESTART(reason, timestamp) => + logInfo(sender().path.toString, EVENTS.RESTART, timestamp, "", reason) + Monitor.simpleEvents.put(sender().path.toString, ('R',timestamp)) + + case TERMINATE(actorPath, timestamp, info) => + logInfo(actorPath, EVENTS.TERMINATE, timestamp, info) + Monitor.simpleEvents.put(actorPath, ('T',timestamp)) + } + + override def receive: Receive = { + case _ => } def logInfo(path:String, event:String, timestamp: Long, info:String, reason:Throwable = null) = { @@ -80,6 +111,7 @@ protected object Monitor{ * Contains the lifecycle events for actors in Fey */ val events: Trie = new Trie("FEY-MANAGEMENT-SYSTEM") + val simpleEvents:scala.collection.mutable.HashMap[String,(Char, Long)] = scala.collection.mutable.HashMap.empty //Static HTML content from d3 val html = scala.io.Source.fromInputStream(getClass.getResourceAsStream("/eventsTable.html"), "UTF-8") @@ -100,6 +132,18 @@ protected object Monitor{ }).flatten } + def getSimpleHTMLEvents: String = { + val content = simpleEvents.map(event => { + event._2._1 match { + case 'S' => getTableLine(event._1, event._2._2, "START", "") + case 'O' => getTableLine(event._1, event._2._2, "STOP", "") + case 'R' => getTableLine(event._1, event._2._2, "RESTART", "") + case 'T' => getTableLine(event._1, event._2._2, "TERMINATE", "") + } + }).mkString("\n") + html.replace("$EVENTS_TABLE_CONTENT", content) + } + private def getTableLine(path: String,timestamp: Long, event: String, info: String):String = { s"$path$event$info$timestamp" } diff --git a/fey-core/src/main/scala/org/apache/iota/fey/MyService.scala b/fey-core/src/main/scala/org/apache/iota/fey/MyService.scala index 0935c5b..f686321 100644 --- a/fey-core/src/main/scala/org/apache/iota/fey/MyService.scala +++ b/fey-core/src/main/scala/org/apache/iota/fey/MyService.scala @@ -70,7 +70,15 @@ sealed trait MyService extends HttpService { get{ respondWithMediaType(`text/html`) { complete { - Monitor.getHTMLevents + try { + if(CONFIG.MONITORING_TYPE == "COMPLETE") { + Monitor.getHTMLevents + }else{ + Monitor.getSimpleHTMLEvents + } + }catch { + case e: Exception => "" + } } } } diff --git a/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala b/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala index 8a0f36d..095785b 100644 --- a/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala +++ b/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala @@ -217,6 +217,8 @@ object CONFIG{ var DYNAMIC_JAR_REPO = "" var DYNAMIC_JAR_FORCE_PULL = false var CUSTOM_DISPATCHERS: ConfigValue = null + var MONITORING_ENABLED: Boolean = true + var MONITORING_TYPE: String = "COMPLETE" def loadUserConfiguration(path: String) : Unit = { @@ -230,17 +232,19 @@ object CONFIG{ } }.getConfig("fey-global-configuration").resolve() - CHECKPOINT_DIR = app.getString("checkpoint-directory") - JSON_REPOSITORY = app.getString("json-repository") - JSON_EXTENSION = app.getString("json-extension") - JAR_REPOSITORY = app.getString("jar-repository") - CHEKPOINT_ENABLED = app.getBoolean("enable-checkpoint") - LOG_LEVEL = app.getString("log-level").toUpperCase() - LOG_APPENDER = app.getString("log-appender").toUpperCase() - MESSAGES_PER_RESIZE = app.getInt("auto-scale.messages-per-resize") - DYNAMIC_JAR_REPO = app.getString("dynamic-jar-population.downloaded-repository") - DYNAMIC_JAR_FORCE_PULL = app.getBoolean("dynamic-jar-population.force-pull") - CUSTOM_DISPATCHERS = app.getValue("custom-dispatchers") + CHECKPOINT_DIR = app.getString("checkpoint-directory") + JSON_REPOSITORY = app.getString("json-repository") + JSON_EXTENSION = app.getString("json-extension") + JAR_REPOSITORY = app.getString("jar-repository") + CHEKPOINT_ENABLED = app.getBoolean("enable-checkpoint") + LOG_LEVEL = app.getString("log-level").toUpperCase() + LOG_APPENDER = app.getString("log-appender").toUpperCase() + MESSAGES_PER_RESIZE = app.getInt("auto-scale.messages-per-resize") + DYNAMIC_JAR_REPO = app.getString("dynamic-jar-population.downloaded-repository") + DYNAMIC_JAR_FORCE_PULL = app.getBoolean("dynamic-jar-population.force-pull") + CUSTOM_DISPATCHERS = app.getValue("custom-dispatchers") + MONITORING_ENABLED = app.getBoolean("monitoring.enable") + MONITORING_TYPE = app.getString("monitoring.type").toUpperCase() setLogbackConfiguration() } From 5e0ccb890bbc7cfa26e327eeb46bc19a4559abc2 Mon Sep 17 00:00:00 2001 From: Barbara Gomes Date: Fri, 29 Jul 2016 13:10:49 -0700 Subject: [PATCH 08/10] Saving orchestration name on checkpoint --- .../src/main/scala/org/apache/iota/fey/Orchestration.scala | 4 ++++ fey-core/src/main/scala/org/apache/iota/fey/Utils.scala | 5 +++-- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/fey-core/src/main/scala/org/apache/iota/fey/Orchestration.scala b/fey-core/src/main/scala/org/apache/iota/fey/Orchestration.scala index dc2e64d..d247279 100644 --- a/fey-core/src/main/scala/org/apache/iota/fey/Orchestration.scala +++ b/fey-core/src/main/scala/org/apache/iota/fey/Orchestration.scala @@ -104,6 +104,7 @@ protected class Orchestration(val name: String, private def replayOrchestrationState() = { val ensemblesSpec = ORCHESTRATION_CACHE.orchestration_metadata.get(guid).get.map(_._2).toList ORCHESTRATION_CACHE.orchestration_metadata.remove(guid) + ORCHESTRATION_CACHE.orchestration_name.remove(guid) self ! CREATE_ENSEMBLES(ensemblesSpec) } @@ -155,6 +156,7 @@ protected class Orchestration(val name: String, case None => case Some(ensembles) => ORCHESTRATION_CACHE.orchestration_metadata.put(guid, (ensembles -- ids)) + ORCHESTRATION_CACHE.orchestration_name.put(guid, name) } Utils.updateOrchestrationState(guid) } @@ -187,6 +189,7 @@ protected class Orchestration(val name: String, case Some(cachedEnsemble) => ORCHESTRATION_CACHE.orchestration_metadata.put(guid, cachedEnsemble ++ (newEnsembles.map(ensemble => (ensemble._1, ensemble._3)).toMap)) } + ORCHESTRATION_CACHE.orchestration_name.put(guid, name) Utils.updateOrchestrationState(guid) } @@ -246,4 +249,5 @@ protected object ORCHESTRATION_CACHE{ * Value = Map[Ensemble GUID, JsObject of the ensemble] */ val orchestration_metadata: HashMap[String, Map[String,JsObject]] = HashMap.empty[String, Map[String,JsObject]] + val orchestration_name: HashMap[String, String] = HashMap.empty } \ No newline at end of file diff --git a/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala b/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala index 095785b..d4f3210 100644 --- a/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala +++ b/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala @@ -137,16 +137,17 @@ protected object Utils { file.delete() } ORCHESTRATION_CACHE.orchestration_metadata.remove(orchestrationID) + ORCHESTRATION_CACHE.orchestration_name.remove(orchestrationID) } case Some(orch) => ORCHESTRATION_CACHE.orchestration_metadata.get(orchestrationID) match { case None => log.warn(s"Could not save state for Orchestration ${orchestrationID}. No metadata defined.") case Some(metadata) => val ensembleJSON = metadata.map(ensenble => ensenble._2) - + val name: String = ORCHESTRATION_CACHE.orchestration_name.getOrElse(orchestrationID, "NOT SAVED") val orchestrationSpec = Json.obj(JSON_PATH.GUID -> orchestrationID, JSON_PATH.COMMAND -> "RECREATE", - JSON_PATH.ORCHESTRATION_NAME -> "I DONT KNOW HOW TO SAVE IT YET =P", + JSON_PATH.ORCHESTRATION_NAME -> name, JSON_PATH.ORCHESTRATION_TIMESTAMP -> System.currentTimeMillis.toString, JSON_PATH.ENSEMBLES -> ensembleJSON ) From 9d1a4e1dedf93af8c901d788b5d109909f0877d7 Mon Sep 17 00:00:00 2001 From: Barbara Gomes Date: Fri, 19 Aug 2016 15:33:49 -0700 Subject: [PATCH 09/10] Orchestration should not be deleted from checkpoint in case of failure, just when it is explicitly asked to be deleted --- .../main/scala/org/apache/iota/fey/FeyCore.scala | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala b/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala index 8541ee4..3018d03 100644 --- a/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala +++ b/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala @@ -63,7 +63,9 @@ protected class FeyCore extends Actor with ActorLogging{ case STOP_EMPTY_ORCHESTRATION(orchID) => log.warning(s"Deleting Empty Orchestration $orchID") - deleteOrchestration(orchID) + /* In most of the cases, this message will represent an orchestration that failed + * In this case, we don't want to remove it from the checkpoint dir */ + deleteOrchestration(orchID, false) case Terminated(actor) => processTerminatedMessage(actor) @@ -160,7 +162,7 @@ protected class FeyCore extends Actor with ActorLogging{ case "RECREATE" => recreateOrchestration(ensembles, orchestrationID, orchestrationName, orchestrationTimestamp) case "CREATE" => createOrchestration(ensembles, orchestrationID, orchestrationName, orchestrationTimestamp) case "UPDATE" => updateOrchestration(ensembles, orchestrationID, orchestrationName, orchestrationTimestamp) - case "DELETE" => deleteOrchestration(orchestrationID) + case "DELETE" => deleteOrchestration(orchestrationID,true) case x => throw new CommandNotRecognized(s"Command: $x") } } @@ -186,7 +188,7 @@ protected class FeyCore extends Actor with ActorLogging{ if(orchestration._1 != orchestrationTimestamp){ val orchestrationInfo = new OrchestrationInformation(ensemblesSpecJson,orchestrationID,orchestrationName,orchestrationTimestamp) FEY_CACHE.orchestrationsAwaitingTermination.put(orchestrationID, orchestrationInfo) - deleteOrchestration(orchestrationID) + deleteOrchestration(orchestrationID, true) }else{ log.warning(s"Orchestration ${orchestrationID} not recreated. Timestamp did not change.") } @@ -254,13 +256,15 @@ protected class FeyCore extends Actor with ActorLogging{ * @param orchestrationID * @return */ - private def deleteOrchestration(orchestrationID: String) = { + private def deleteOrchestration(orchestrationID: String, updateCheckpoint: Boolean) = { try{ FEY_CACHE.activeOrchestrations.get(orchestrationID) match { case Some(orchestration) => orchestration._2 ! PoisonPill FEY_CACHE.activeOrchestrations.remove(orchestrationID) - updateOrchestrationState(orchestrationID,true) + if(updateCheckpoint) { + updateOrchestrationState(orchestrationID, true) + } case None => log.warning(s"No active Orchestration $orchestrationID to be deleted") } From 9b619625d99fddc1593afa3278be735f681752e8 Mon Sep 17 00:00:00 2001 From: Barbara Gomes Date: Wed, 24 Aug 2016 16:07:51 -0700 Subject: [PATCH 10/10] SiftingAppender can not be stopped. Rolling to back to log to just one file for now. --- fey-core/src/main/resources/logback.xml | 45 +++++++------------ .../scala/org/apache/iota/fey/Monitor.scala | 10 ++--- 2 files changed, 21 insertions(+), 34 deletions(-) diff --git a/fey-core/src/main/resources/logback.xml b/fey-core/src/main/resources/logback.xml index 396832f..542ab89 100644 --- a/fey-core/src/main/resources/logback.xml +++ b/fey-core/src/main/resources/logback.xml @@ -28,36 +28,23 @@ - - - - - fileName - fey_core - - - - - - true - ${LOG_HOME}/${fileName}.log - - - ${LOG_HOME}/${fileName}-%d{yyyy-MM-dd}.%i.log - 1MB - 30 - 1GB - - - - UTF-8 - [%p] [%d{yy/MM/dd HH:mm:ss}] %c [%X{akkaSource}] : %msg%n - - - - + + + true + ${LOG_HOME}/fey_core.log + + + ${LOG_HOME}/fey_core-%d{yyyy-MM-dd}.%i.log + 1MB + 30 + 1GB + + + UTF-8 + [%p] [%d{yy/MM/dd HH:mm:ss}] %c [%X{akkaSource}] : %msg%n + diff --git a/fey-core/src/main/scala/org/apache/iota/fey/Monitor.scala b/fey-core/src/main/scala/org/apache/iota/fey/Monitor.scala index ded1c9f..2100cbd 100644 --- a/fey-core/src/main/scala/org/apache/iota/fey/Monitor.scala +++ b/fey-core/src/main/scala/org/apache/iota/fey/Monitor.scala @@ -17,7 +17,7 @@ package org.apache.iota.fey -import akka.actor.Actor +import akka.actor.{Actor, ActorLogging} import akka.event.{DiagnosticLoggingAdapter, Logging} import scala.collection.mutable.ArrayBuffer @@ -25,12 +25,12 @@ import scala.collection.mutable.ArrayBuffer /** * Created by barbaragomes on 7/8/16. */ -protected class Monitor(eventsStore: Trie) extends Actor { +protected class Monitor(eventsStore: Trie) extends Actor with ActorLogging { import Monitor._ - val log: DiagnosticLoggingAdapter = Logging(this) - log.mdc(Map("fileName" -> "monitor_events")) + //val log: DiagnosticLoggingAdapter = Logging(this) + //log.mdc(Map("fileName" -> "monitor_events")) override def preStart(): Unit = { @@ -44,7 +44,7 @@ protected class Monitor(eventsStore: Trie) extends Actor { } override def postStop(): Unit = { - log.clearMDC() + //log.clearMDC() } private def complete: Receive = {