diff --git a/gatling-core/src/main/scala/io/gatling/core/controller/inject/Injector.scala b/gatling-core/src/main/scala/io/gatling/core/controller/inject/Injector.scala index 4dc8d8f775..f024cd9677 100644 --- a/gatling-core/src/main/scala/io/gatling/core/controller/inject/Injector.scala +++ b/gatling-core/src/main/scala/io/gatling/core/controller/inject/Injector.scala @@ -84,6 +84,8 @@ private[inject] case class Injection(count: Long, continue: Boolean) { object Injector { val InjectorActorName = "gatling-injector" + val TickPeriod = 1 second + val InitialBatchWindow = TickPeriod * 2 def apply(system: ActorSystem, controller: ActorRef, statsEngine: StatsEngine, scenarios: List[Scenario]): ActorRef = { val userStreams: Map[String, UserStream] = scenarios.map(scenario => scenario.name -> UserStream(scenario, new PushbackIterator(scenario.injectionProfile.allUsers)))(breakOut) @@ -93,13 +95,11 @@ object Injector { private[inject] class Injector(controller: ActorRef, statsEngine: StatsEngine, defaultStreams: Map[String, UserStream]) extends InjectorFSM { + import Injector._ import InjectorState._ import InjectorData._ import InjectorCommand._ - private val tickPeriod = 1 second - private val initialBatchWindow = tickPeriod * 2 - val userIdGen = new LongCounter private def inject(streams: Map[String, UserStream], batchWindow: FiniteDuration, startMillis: Long, count: Long, timer: Cancellable): State = { @@ -147,12 +147,12 @@ private[inject] class Injector(controller: ActorRef, statsEngine: StatsEngine, d when(WaitingToStart) { case Event(Start, NoData) => - val timer = system.scheduler.schedule(initialBatchWindow, tickPeriod, self, Tick) - inject(defaultStreams, initialBatchWindow, nowMillis, 0, timer) + val timer = system.scheduler.schedule(InitialBatchWindow, TickPeriod, self, Tick) + inject(defaultStreams, InitialBatchWindow, nowMillis, 0, timer) } when(Started) { case Event(Tick, StartedData(startMillis, count, timer)) => - inject(defaultStreams, tickPeriod, startMillis, count, timer) + inject(defaultStreams, TickPeriod, startMillis, count, timer) } } diff --git a/gatling-core/src/test/scala/io/gatling/core/controller/inject/UserStreamSpec.scala b/gatling-core/src/test/scala/io/gatling/core/controller/inject/UserStreamSpec.scala index 502333276e..5d2139a8be 100644 --- a/gatling-core/src/test/scala/io/gatling/core/controller/inject/UserStreamSpec.scala +++ b/gatling-core/src/test/scala/io/gatling/core/controller/inject/UserStreamSpec.scala @@ -18,38 +18,93 @@ package io.gatling.core.controller.inject import scala.concurrent.duration._ import io.gatling.BaseSpec +import io.gatling.core.controller.inject.Injector._ import io.gatling.commons.util.PushbackIterator import io.gatling.commons.util.ClockSingleton._ import io.gatling.core.scenario.Scenario -import io.gatling.core.util.Shard - -import com.softwaremill.quicklens._ class UserStreamSpec extends BaseSpec { - "UserStream" should "continue injecting after first batch" in { + "UserStream" should "stream users properly over a long period" in { + + val scenario = Scenario("scenario", null, null, null, null, null) + val ramp = RampInjection(9000, 9 hours) + val injectionProfile = InjectionProfile(List(ramp)) + + val startTime = nowMillis + + val userStream = UserStream(scenario, new PushbackIterator(injectionProfile.allUsers)) + + var count = 0 + var cont = true + + var lastBatchTimeSinceStart = 0 + var lastBatchMaxOffset = Duration.Zero + + while (cont) { + // batches are scheduled every 1 second + lastBatchTimeSinceStart = count * (TickPeriod.toMillis.toInt + 5) // 5 ms scheduler drift on each iteration + + val injection = userStream.withStream(InitialBatchWindow, lastBatchTimeSinceStart + startTime, startTime) { + case (_, duration) => + // calls are sorted + lastBatchMaxOffset = duration + } + + count += 1 + cont = injection.continue + } + + val lastSchedulingOffset = lastBatchMaxOffset + lastBatchTimeSinceStart.millis + lastSchedulingOffset.toMillis shouldBe (9 hours).toMillis +- (4 seconds).toMillis + } + + it should "continue injecting after first batch" in { val scenario = Scenario("scenario", null, null, null, null, null) val ramp = RampInjection(144000000 / 30, 18000 seconds) - val initialBatchWindow = 2 seconds - val nodeCount = 30 + val injectionProfile = InjectionProfile(List(ramp)) val startTime = nowMillis - for { - nodeId <- 0 until nodeCount - shardCount = Shard.shard(_: Int, nodeId, nodeCount).length - shardedRamp = ramp.modify(_.users).using(shardCount) - injectionProfile = InjectionProfile(List(shardedRamp)) - timeSinceStart <- -1001 until 2000 - } { + for (timeSinceStart <- -1001 until 2000) { val userStream = UserStream(scenario, new PushbackIterator(injectionProfile.allUsers)) - val injection = userStream.withStream(initialBatchWindow, timeSinceStart + startTime, startTime) { - case (scn, duration) => // nothing + val injection = userStream.withStream(InitialBatchWindow, timeSinceStart + startTime, startTime) { + case _ => // nothing } injection.continue shouldBe true } } + + // [fl] + // + // + // + // + // + // + // + // + // + // + // + // + // + // + // + // + // + // + // + // + // + // + // + // + // + // + // + // [fl] }