Skip to content

Commit

Permalink
Add test checking scheduling offset
Browse files Browse the repository at this point in the history
  • Loading branch information
slandelle committed Oct 9, 2017
1 parent 6319735 commit 8173e90
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 21 deletions.
Expand Up @@ -84,6 +84,8 @@ private[inject] case class Injection(count: Long, continue: Boolean) {
object Injector {

val InjectorActorName = "gatling-injector"
val TickPeriod = 1 second
val InitialBatchWindow = TickPeriod * 2

def apply(system: ActorSystem, controller: ActorRef, statsEngine: StatsEngine, scenarios: List[Scenario]): ActorRef = {
val userStreams: Map[String, UserStream] = scenarios.map(scenario => scenario.name -> UserStream(scenario, new PushbackIterator(scenario.injectionProfile.allUsers)))(breakOut)
Expand All @@ -93,13 +95,11 @@ object Injector {

private[inject] class Injector(controller: ActorRef, statsEngine: StatsEngine, defaultStreams: Map[String, UserStream]) extends InjectorFSM {

import Injector._
import InjectorState._
import InjectorData._
import InjectorCommand._

private val tickPeriod = 1 second
private val initialBatchWindow = tickPeriod * 2

val userIdGen = new LongCounter

private def inject(streams: Map[String, UserStream], batchWindow: FiniteDuration, startMillis: Long, count: Long, timer: Cancellable): State = {
Expand Down Expand Up @@ -147,12 +147,12 @@ private[inject] class Injector(controller: ActorRef, statsEngine: StatsEngine, d

when(WaitingToStart) {
case Event(Start, NoData) =>
val timer = system.scheduler.schedule(initialBatchWindow, tickPeriod, self, Tick)
inject(defaultStreams, initialBatchWindow, nowMillis, 0, timer)
val timer = system.scheduler.schedule(InitialBatchWindow, TickPeriod, self, Tick)
inject(defaultStreams, InitialBatchWindow, nowMillis, 0, timer)
}

when(Started) {
case Event(Tick, StartedData(startMillis, count, timer)) =>
inject(defaultStreams, tickPeriod, startMillis, count, timer)
inject(defaultStreams, TickPeriod, startMillis, count, timer)
}
}
Expand Up @@ -18,38 +18,93 @@ package io.gatling.core.controller.inject
import scala.concurrent.duration._

import io.gatling.BaseSpec
import io.gatling.core.controller.inject.Injector._
import io.gatling.commons.util.PushbackIterator
import io.gatling.commons.util.ClockSingleton._
import io.gatling.core.scenario.Scenario
import io.gatling.core.util.Shard

import com.softwaremill.quicklens._

class UserStreamSpec extends BaseSpec {

"UserStream" should "continue injecting after first batch" in {
"UserStream" should "stream users properly over a long period" in {

val scenario = Scenario("scenario", null, null, null, null, null)
val ramp = RampInjection(9000, 9 hours)
val injectionProfile = InjectionProfile(List(ramp))

val startTime = nowMillis

val userStream = UserStream(scenario, new PushbackIterator(injectionProfile.allUsers))

var count = 0
var cont = true

var lastBatchTimeSinceStart = 0
var lastBatchMaxOffset = Duration.Zero

while (cont) {
// batches are scheduled every 1 second
lastBatchTimeSinceStart = count * (TickPeriod.toMillis.toInt + 5) // 5 ms scheduler drift on each iteration

val injection = userStream.withStream(InitialBatchWindow, lastBatchTimeSinceStart + startTime, startTime) {
case (_, duration) =>
// calls are sorted
lastBatchMaxOffset = duration
}

count += 1
cont = injection.continue
}

val lastSchedulingOffset = lastBatchMaxOffset + lastBatchTimeSinceStart.millis
lastSchedulingOffset.toMillis shouldBe (9 hours).toMillis +- (4 seconds).toMillis
}

it should "continue injecting after first batch" in {

val scenario = Scenario("scenario", null, null, null, null, null)
val ramp = RampInjection(144000000 / 30, 18000 seconds)
val initialBatchWindow = 2 seconds
val nodeCount = 30
val injectionProfile = InjectionProfile(List(ramp))

val startTime = nowMillis

for {
nodeId <- 0 until nodeCount
shardCount = Shard.shard(_: Int, nodeId, nodeCount).length
shardedRamp = ramp.modify(_.users).using(shardCount)
injectionProfile = InjectionProfile(List(shardedRamp))
timeSinceStart <- -1001 until 2000
} {
for (timeSinceStart <- -1001 until 2000) {
val userStream = UserStream(scenario, new PushbackIterator(injectionProfile.allUsers))

val injection = userStream.withStream(initialBatchWindow, timeSinceStart + startTime, startTime) {
case (scn, duration) => // nothing
val injection = userStream.withStream(InitialBatchWindow, timeSinceStart + startTime, startTime) {
case _ => // nothing
}

injection.continue shouldBe true
}
}

// [fl]
//
//
//
//
//
//
//
//
//
//
//
//
//
//
//
//
//
//
//
//
//
//
//
//
//
//
//
// [fl]
}

0 comments on commit 8173e90

Please sign in to comment.