Skip to content

Commit

Permalink
Block on System initialization before starting simulation, close #1018,
Browse files Browse the repository at this point in the history
close #1019
  • Loading branch information
Stephane Landelle committed Mar 15, 2013
1 parent 306c84f commit c4e33ff
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 34 deletions.
@@ -0,0 +1,27 @@
/**
* Copyright 2011-2013 eBusiness Information, Groupe Excilys (www.excilys.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.excilys.ebi.gatling.core.action

import scala.concurrent.duration.DurationInt

import com.excilys.ebi.gatling.core.config.GatlingConfiguration.configuration

import akka.util.Timeout

trait AkkaDefaults {

implicit val defaultTimeOut = Timeout(configuration.timeOut.actor seconds)
}
Expand Up @@ -15,18 +15,12 @@
*/
package com.excilys.ebi.gatling.core.action

import scala.concurrent.duration.DurationInt

import com.excilys.ebi.gatling.core.config.GatlingConfiguration.configuration
import com.excilys.ebi.gatling.core.util.ClassSimpleNameToString

import akka.actor.{ Actor, Terminated }
import akka.util.Timeout
import grizzled.slf4j.Logging

trait BaseActor extends Actor with ClassSimpleNameToString with Logging {

implicit val timeout = Timeout(configuration.timeOut.actor seconds)
trait BaseActor extends Actor with AkkaDefaults with ClassSimpleNameToString with Logging {

override def unhandled(message: Any) {
message match {
Expand Down
Expand Up @@ -20,22 +20,22 @@ import java.util.concurrent.CountDownLatch
import scala.concurrent.Future
import scala.util.{ Failure, Success }

import com.excilys.ebi.gatling.core.action.{ BaseActor, system }
import com.excilys.ebi.gatling.core.action.{ AkkaDefaults, BaseActor, system }
import com.excilys.ebi.gatling.core.result.message.Flush

import akka.actor.{ ActorRef, Props }
import akka.pattern.ask

object Terminator {
object Terminator extends AkkaDefaults {

private val terminator = system.actorOf(Props[Terminator])

def init(latch: CountDownLatch, userCount: Int) {
terminator ! Initialize(latch, userCount)
def askInit(latch: CountDownLatch, userCount: Int): Future[Any] = {
terminator ? Initialize(latch, userCount)
}

def registerDataWriter(dataWriter: ActorRef) {
terminator ! RegisterDataWriter(dataWriter)
def askDataWriterRegistration(dataWriter: ActorRef): Future[Any] = {
terminator ? RegisterDataWriter(dataWriter)
}

def endUser {
Expand All @@ -47,7 +47,7 @@ object Terminator {
}
}

class Terminator extends BaseActor {
class Terminator extends BaseActor with AkkaDefaults {

import context._

Expand All @@ -62,14 +62,17 @@ class Terminator extends BaseActor {
def uninitialized: Receive = {

case Initialize(latch, userCount) =>
info("Initializing")
this.latch = latch
this.userCount = userCount
registeredDataWriters = Nil
context.become(initialized)
sender ! true
info("Initialized")
}

def flush {
Future.sequence(registeredDataWriters.map(_.ask(Flush)))
Future.sequence(registeredDataWriters.map(_ ? Flush))
.onComplete {
case Success(_) =>
latch.countDown
Expand All @@ -82,6 +85,8 @@ class Terminator extends BaseActor {

case RegisterDataWriter(dataWriter: ActorRef) =>
registeredDataWriters = dataWriter :: registeredDataWriters
sender ! true
info("DataWriter registered")

case EndUser =>
userCount = userCount - 1
Expand Down
Expand Up @@ -15,38 +15,45 @@
*/
package com.excilys.ebi.gatling.core.result.writer

import scala.collection.immutable
import scala.concurrent.Future

import com.excilys.ebi.gatling.core.action.{ BaseActor, system }
import com.excilys.ebi.gatling.core.action.{ AkkaDefaults, BaseActor, system }
import com.excilys.ebi.gatling.core.action.system.dispatcher
import com.excilys.ebi.gatling.core.config.GatlingConfiguration.configuration
import com.excilys.ebi.gatling.core.result.message.{ Flush, GroupRecord, Init, RequestRecord, RequestStatus, RunRecord, ScenarioRecord, ShortScenarioDescription }
import com.excilys.ebi.gatling.core.result.terminator.Terminator
import com.excilys.ebi.gatling.core.scenario.Scenario
import com.excilys.ebi.gatling.core.util.TimeHelper.nowMillis

import akka.actor.{ Actor, ActorRef, Props }
import akka.routing.BroadcastRouter
import akka.pattern.ask
import grizzled.slf4j.Logging

object DataWriter {
object DataWriter extends AkkaDefaults with Logging {

private val dataWriters: immutable.Iterable[ActorRef] = configuration.data.dataWriterClasses.map { className =>
private val dataWriters: List[ActorRef] = configuration.data.dataWriterClasses.map { className =>
val clazz = Class.forName(className).asInstanceOf[Class[Actor]]
system.actorOf(Props(clazz))
}

private val router = system.actorOf(Props[Actor].withRouter(BroadcastRouter(routees = dataWriters)))
private def tellAll(message: Any) {
dataWriters.foreach(_ ! message)
}

def init(runRecord: RunRecord, scenarios: Seq[Scenario]) {
def askInit(runRecord: RunRecord, scenarios: Seq[Scenario]) = {
val shortScenarioDescriptions = scenarios.map(scenario => ShortScenarioDescription(scenario.name, scenario.configuration.users))
router ! Init(runRecord, shortScenarioDescriptions)

val responses = dataWriters.map(_ ? Init(runRecord, shortScenarioDescriptions))

Future.sequence(responses)
}

def user(scenarioName: String, userId: Int, event: String) {
router ! ScenarioRecord(scenarioName, userId, event, nowMillis)
tellAll(ScenarioRecord(scenarioName, userId, event, nowMillis))
}

def group(scenarioName: String, groupName: String, userId: Int, event: String) {
router ! GroupRecord(scenarioName, groupName, userId, event, nowMillis)
tellAll(GroupRecord(scenarioName, groupName, userId, event, nowMillis))
}

def logRequest(
Expand All @@ -61,7 +68,7 @@ object DataWriter {
requestMessage: Option[String] = None,
extraInfo: List[Any] = Nil) {

router ! RequestRecord(
tellAll(RequestRecord(
scenarioName,
userId,
requestName,
Expand All @@ -71,7 +78,7 @@ object DataWriter {
executionEndDate,
requestResult,
requestMessage,
extraInfo)
extraInfo))
}
}

Expand All @@ -96,9 +103,18 @@ trait DataWriter extends BaseActor {
def uninitialized: Receive = {
case Init(runRecord, scenarios) =>

Terminator.registerDataWriter(self)
onInitializeDataWriter(runRecord, scenarios)
context.become(initialized)
info("Initializing")

val originalSender = sender

Terminator.askDataWriterRegistration(self).onComplete {
case _ =>
info(s"Going on with initialization after Terminator registration")
onInitializeDataWriter(runRecord, scenarios)
context.become(initialized)
originalSender ! true
info("Initialized")
}
}

def initialized: Receive = {
Expand Down
Expand Up @@ -18,9 +18,12 @@ package com.excilys.ebi.gatling.core.runner
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit.SECONDS

import scala.concurrent.Await

import org.joda.time.DateTime.now

import com.excilys.ebi.gatling.core.action.system
import com.excilys.ebi.gatling.core.action.{ AkkaDefaults, system }
import com.excilys.ebi.gatling.core.action.system.dispatcher
import com.excilys.ebi.gatling.core.config.GatlingConfiguration.configuration
import com.excilys.ebi.gatling.core.result.message.RunRecord
import com.excilys.ebi.gatling.core.result.terminator.Terminator
Expand All @@ -29,7 +32,7 @@ import com.excilys.ebi.gatling.core.scenario.configuration.Simulation

import grizzled.slf4j.Logging

class Runner(selection: Selection) extends Logging {
class Runner(selection: Selection) extends AkkaDefaults with Logging {

def run: (String, Simulation) = {

Expand All @@ -50,8 +53,12 @@ class Runner(selection: Selection) extends Logging {
info(s"Total number of users : $totalNumberOfUsers")

val terminatorLatch = new CountDownLatch(1)
Terminator.init(terminatorLatch, totalNumberOfUsers)
DataWriter.init(runRecord, scenarios)

val init = Terminator
.askInit(terminatorLatch, totalNumberOfUsers)
.flatMap(_ => DataWriter.askInit(runRecord, scenarios))

Await.result(init, defaultTimeOut.duration)

debug("Launching All Scenarios")

Expand Down

0 comments on commit c4e33ff

Please sign in to comment.