Skip to content

Commit

Permalink
Minor clean up of the JMS module
Browse files Browse the repository at this point in the history
  • Loading branch information
slandelle committed May 5, 2017
1 parent 636445c commit ac28feb
Show file tree
Hide file tree
Showing 11 changed files with 28 additions and 31 deletions.
8 changes: 4 additions & 4 deletions gatling-core/src/main/scala/io/gatling/core/check/Check.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import io.gatling.core.session.{ Expression, Session }

object Check {

def check[R](response: R, session: Session, checks: List[Check[R]])(implicit cache: mutable.Map[Any, Any] = mutable.Map.empty[Any, Any]): (Session => Session, Option[Failure]) = {
def check[R](response: R, session: Session, checks: List[Check[R]])(implicit preparedCache: mutable.Map[Any, Any] = mutable.Map.empty[Any, Any]): (Session => Session, Option[Failure]) = {

@tailrec
def checkRec(session: Session, checks: List[Check[R]], update: Session => Session, failure: Option[Failure]): (Session => Session, Option[Failure]) =
Expand Down Expand Up @@ -61,7 +61,7 @@ object Check {

trait Check[R] {

def check(response: R, session: Session)(implicit cache: mutable.Map[Any, Any]): Validation[CheckResult]
def check(response: R, session: Session)(implicit preparedCache: mutable.Map[Any, Any]): Validation[CheckResult]
}

case class CheckBase[R, P, X](
Expand All @@ -71,9 +71,9 @@ case class CheckBase[R, P, X](
saveAs: Option[String]
) extends Check[R] {

def check(response: R, session: Session)(implicit cache: mutable.Map[Any, Any]): Validation[CheckResult] = {
def check(response: R, session: Session)(implicit preparedCache: mutable.Map[Any, Any]): Validation[CheckResult] = {

def memoizedPrepared: Validation[P] = cache
def memoizedPrepared: Validation[P] = preparedCache
.getOrElseUpdate(preparer, preparer(response))
.asInstanceOf[Validation[P]]

Expand Down
2 changes: 1 addition & 1 deletion gatling-jms/src/main/scala/io/gatling/jms/JmsDsl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ trait JmsDsl extends JmsCheckSupport {
* DSL text to start the jms builder
*
* @param requestName human readable name of request
* @return a PingBuilder instance which can be used to build up a ping
* @return a JmsDslBuilderBase instance which can be used to build up a JMS action
*/
def jms(requestName: Expression[String]) = JmsDslBuilderBase(requestName)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,11 @@ class JmsXPathProvider(xmlParsers: XmlParsers) extends CheckProtocolProvider[XPa

private val ErrorMapper = "Could not parse response into a DOM Document: " + _

private def xpathPreparer[T](f: InputSource => T)(message: Message): Validation[Option[T]] =
safely(ErrorMapper) {
override val preparer: Preparer[Message, Option[Dom]] =
message => safely(ErrorMapper) {
message match {
case tm: TextMessage => Some(f(new InputSource(new StringReader(tm.getText)))).success
case tm: TextMessage => Some(xmlParsers.parse(new InputSource(new StringReader(tm.getText)))).success
case _ => "Unsupported message type".failure
}
}

override val preparer: Preparer[Message, Option[Dom]] =
xpathPreparer(xmlParsers.parse)
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package io.gatling.jms.client
import io.gatling.core.action.Action
import io.gatling.core.session.Session
import io.gatling.jms._
import io.gatling.jms.action.MessageSent

import akka.actor.ActorRef

Expand Down Expand Up @@ -47,7 +46,7 @@ class JmsTracker(listenerThread: ListenerThread, actor: ActorRef) {
}

class ListenerThread(runnable: Runnable) extends Thread(runnable) {
def close() = {
def close(): Unit = {
interrupt()
join(1000)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import io.gatling.commons.util.ClockSingleton._
import io.gatling.core.config.GatlingConfiguration
import io.gatling.core.stats.StatsEngine
import io.gatling.core.util.NameGen
import io.gatling.jms.action.{ JmsLogging, MessageReceived, Tracker }
import io.gatling.jms.action.JmsLogging
import io.gatling.jms.protocol.JmsMessageMatcher

import akka.actor.ActorSystem
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.gatling.jms.action
package io.gatling.jms.client

import javax.jms.Message

Expand Down Expand Up @@ -59,18 +59,18 @@ case class MessageReceived(
case object TimeoutScan

object Tracker {
def props(statsEngine: StatsEngine, configuration: GatlingConfiguration) = Props(new Tracker(statsEngine, configuration))
def props(statsEngine: StatsEngine, configuration: GatlingConfiguration) =
Props(new Tracker(statsEngine, configuration.jms.replyTimeoutScanPeriod milliseconds))
}

/**
* Bookkeeping actor to correlate request and response JMS messages
* Once a message is correlated, it publishes to the Gatling core DataWriter
*/
class Tracker(statsEngine: StatsEngine, configuration: GatlingConfiguration) extends BaseActor {
class Tracker(statsEngine: StatsEngine, replyTimeoutScanPeriod: FiniteDuration) extends BaseActor {

private val sentMessages = mutable.HashMap.empty[String, MessageSent]
private val timedOutMessages = mutable.ArrayBuffer.empty[MessageSent]
private val replyTimeoutScanPeriod = configuration.jms.replyTimeoutScanPeriod milliseconds
private var periodicTimeoutScanTriggered = false

def triggerPeriodicTimeoutScan(): Unit =
Expand All @@ -81,7 +81,7 @@ class Tracker(statsEngine: StatsEngine, configuration: GatlingConfiguration) ext
}
}

def receive = {
override def receive: Receive = {
// message was sent; add the timestamps to the map
case messageSent: MessageSent =>
sentMessages += messageSent.matchId -> messageSent
Expand All @@ -106,7 +106,7 @@ class Tracker(statsEngine: StatsEngine, configuration: GatlingConfiguration) ext
}
}

for (MessageSent(matchId, sent, receivedTimeout, checks, session, next, requestName) <- timedOutMessages) {
for (MessageSent(matchId, sent, receivedTimeout, _, session, next, requestName) <- timedOutMessages) {
sentMessages.remove(matchId)
executeNext(session.markAsFailed, sent, now, KO, next, requestName, Some(s"Reply timeout after $receivedTimeout ms"))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,6 @@ import io.gatling.jms.client.JmsConnectionPool

case class JmsComponents(jmsProtocol: JmsProtocol, jmsConnectionPool: JmsConnectionPool) extends ProtocolComponents {

def onStart: Option[Session => Session] = None
def onExit: Option[Session => Unit] = None
override def onStart: Option[Session => Session] = None
override def onExit: Option[Session => Unit] = None
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ case class JmsProtocolBuilder(
def receiveTimeout(timeout: Long): JmsProtocolBuilder = this
def replyTimeout(timeout: Long): JmsProtocolBuilder = copy(replyTimeout = Some(timeout))

def build = new JmsProtocol(
def build = JmsProtocol(
credentials = creds,
deliveryMode = deliveryMode,
messageMatcher = messageMatcher,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import io.gatling.core.session.{ Expression, ExpressionSuccessWrapper }
import io.gatling.jms.JmsCheck
import io.gatling.jms.action.{ RequestReplyBuilder, SendBuilder }

import com.softwaremill.quicklens.ModifyPimp
import com.softwaremill.quicklens._

case class JmsDslBuilderBase(requestName: Expression[String]) {

Expand Down Expand Up @@ -119,7 +119,7 @@ case class RequestReplyDslBuilder(attributes: JmsAttributes, factory: JmsAttribu
def jmsType(jmsType: Expression[String]) = this.modify(_.attributes.jmsType).setTo(Some(jmsType))

/**
* Add a check that will be perfomed on each received JMS response message before giving Gatling on OK/KO response
* Add a check that will be performed on each received JMS response message before giving Gatling on OK/KO response
*/
def check(checks: JmsCheck*) = this.modify(_.attributes.checks).using(_ ::: checks.toList)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ package io.gatling.jms

import javax.jms._

import scala.concurrent.duration.DurationInt
import scala.concurrent.duration._

import io.gatling.core.Predef._
import io.gatling.jms.Predef._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,15 @@ import io.gatling.core.stats.message.ResponseTimings
import io.gatling.core.stats.writer.ResponseMessage
import io.gatling.jms._
import io.gatling.jms.check.JmsSimpleCheck
import io.gatling.jms.client.{ MessageReceived, MessageSent, Tracker }

import akka.testkit.TestActorRef

class TrackerSpec extends AkkaSpec with CoreDsl with JmsDsl with MockMessage {

val configuration = GatlingConfiguration.loadForTest()

def ignoreDrift(actual: Session) = {
private def ignoreDrift(actual: Session) = {
actual.drift shouldBe >(0L)
actual.setDrift(0)
}
Expand All @@ -41,7 +42,7 @@ class TrackerSpec extends AkkaSpec with CoreDsl with JmsDsl with MockMessage {

"JmsRequestTrackerActor" should "pass to next to next actor when matching message is received" in {
val statsEngine = new MockStatsEngine
val tracker = TestActorRef(new Tracker(statsEngine, configuration))
val tracker = TestActorRef(Tracker.props(statsEngine, configuration))

tracker ! MessageSent("1", 15, 0, Nil, session, new ActorDelegatingAction("next", testActor), "success")
tracker ! MessageReceived("1", 30, textMessage("test"))
Expand All @@ -56,7 +57,7 @@ class TrackerSpec extends AkkaSpec with CoreDsl with JmsDsl with MockMessage {
it should "pass KO to next actor when check fails" in {
val failedCheck = JmsSimpleCheck(_ => false)
val statsEngine = new MockStatsEngine
val tracker = TestActorRef(new Tracker(statsEngine, configuration))
val tracker = TestActorRef(Tracker.props(statsEngine, configuration))

tracker ! MessageSent("1", 15, 0, List(failedCheck), session, new ActorDelegatingAction("next", testActor), "failure")
tracker ! MessageReceived("1", 30, textMessage("test"))
Expand All @@ -71,7 +72,7 @@ class TrackerSpec extends AkkaSpec with CoreDsl with JmsDsl with MockMessage {
it should "pass updated session to next actor if modified by checks" in {
val check: JmsCheck = xpath("/id").saveAs("id")
val statsEngine = new MockStatsEngine
val tracker = TestActorRef(new Tracker(statsEngine, configuration))
val tracker = TestActorRef(Tracker.props(statsEngine, configuration))

tracker ! MessageSent("1", 15, 0, List(check), session, new ActorDelegatingAction("next", testActor), "updated")
tracker ! MessageReceived("1", 30, textMessage("<id>5</id>"))
Expand All @@ -85,7 +86,7 @@ class TrackerSpec extends AkkaSpec with CoreDsl with JmsDsl with MockMessage {

it should "pass information to session about response time in case group are used" in {
val statsEngine = new MockStatsEngine
val tracker = TestActorRef(new Tracker(statsEngine, configuration))
val tracker = TestActorRef(Tracker.props(statsEngine, configuration))

val groupSession = session.enterGroup("group")
tracker ! MessageSent("1", 15, 0, Nil, groupSession, new ActorDelegatingAction("next", testActor), "logGroupResponse")
Expand Down

0 comments on commit ac28feb

Please sign in to comment.