Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Allow enabling / disabling of some strategies globally or at the Came…

…lContext or Route level

git-svn-id: https://svn.apache.org/repos/asf/servicemix/smx5/trunk@1151448 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information...
commit 501eadb7b17460b36d755be7e3eed1f8dbe89c36 1 parent 4075df1
@gnodet gnodet authored
View
71 core/src/main/scala/org/apache/servicemix/core/Breadcrumbs.scala
@@ -16,11 +16,10 @@
*/
package org.apache.servicemix.core
-import org.apache.camel.{AsyncCallback, Exchange, Processor, CamelContext}
import org.apache.camel.processor.{DelegateProcessor, DelegateAsyncProcessor}
import org.apache.camel.processor.aggregate.{AggregationStrategy, AggregateProcessor}
-import collection.mutable.HashSet
import collection.Iterable
+import org.apache.camel._
/**
* The ServiceMix bread crumb strategy adds a header to the message to ensure we can follow the message throughout
@@ -28,11 +27,11 @@ import collection.Iterable
*/
class Breadcrumbs extends DelegateProcessorFactory {
- import Breadcrumbs.{hasBreadCrumb, addBreadCrumb, getBreadCrumb}
+ import Breadcrumbs._
def create(delegate: Processor) = new DelegateAsyncProcessor(process(delegate)) {
override def process(exchange: Exchange, callback: AsyncCallback) = {
- if (!hasBreadCrumb(exchange)) {
+ if (isEnabled(exchange) && !hasBreadCrumb(exchange)) {
addBreadCrumb(exchange)
}
processNext(exchange, callback)
@@ -50,10 +49,11 @@ class Breadcrumbs extends DelegateProcessorFactory {
val strategy = new AggregationStrategy {
def aggregate(oldExchange: Exchange, newExchange: Exchange) : Exchange = {
val ex = oldstrat.aggregate(oldExchange, newExchange)
- if (oldExchange == null)
- addBreadCrumb(ex, List(getBreadCrumb(newExchange)))
- else
- addBreadCrumb(ex, List(getBreadCrumb(oldExchange), getBreadCrumb(newExchange)))
+ if (isEnabled(ex)) {
+ val bcs = if (oldExchange == null) getBreadCrumbs(ex) ++ getBreadCrumbs(newExchange)
+ else getBreadCrumbs(ex) ++ getBreadCrumbs(oldExchange) ++ getBreadCrumbs(newExchange)
+ setBreadCrumbs(ex, bcs)
+ }
ex
}
}
@@ -63,7 +63,7 @@ class Breadcrumbs extends DelegateProcessorFactory {
}
}
-object Breadcrumbs {
+object Breadcrumbs extends Switchable {
/**
* ServiceMix bread crumb header name
@@ -90,61 +90,36 @@ object Breadcrumbs {
/**
* Add a ServiceMix bread crumb to an Exchange
*/
- def addBreadCrumb(exchange: Exchange) : Unit = setBreadCrumb(exchange, exchange.getContext.getUuidGenerator.generateUuid())
-
- /**
- * Add a number of ServiceMix bread crumbs to an Exchange
- */
- def addBreadCrumb(exchange: Exchange, breadcrumbs: Iterable[String]) : Unit = {
- var bcs = new HashSet[String]()
- bcs = bcs ++ getBreadCrumbs(exchange)
- for (bc <- breadcrumbs) {
- bcs = bcs ++ getBreadCrumbs(bc)
- }
- setBreadCrumb(exchange, bcs)
+ def addBreadCrumb(exchange: Exchange) {
+ setBreadCrumb(exchange, exchange.getContext.getUuidGenerator.generateUuid())
}
/**
* Set the ServiceMix bread crumb to an Exchange
*/
- def setBreadCrumb(exchange: Exchange, breadcrumb: String) : Unit = exchange.getIn.setHeader(SERVICEMIX_BREAD_CRUMB, breadcrumb)
+ def setBreadCrumb(exchange: Exchange, breadcrumb: String) {
+ exchange.getIn.setHeader(SERVICEMIX_BREAD_CRUMB, breadcrumb)
+ }
/**
* Set the ServiceMix bread crumbs to an Exchange
*/
- def setBreadCrumb(exchange: Exchange, breadcrumbs: Iterable[String]) : Unit = setBreadCrumb(exchange, breadcrumbs.mkString(","))
+ def setBreadCrumbs(exchange: Exchange, breadcrumbs: Iterable[String]) {
+ setBreadCrumb(exchange, breadcrumbs.mkString(","))
+ }
/**
- * Enable bread crumbs on the target CamelContext
+ * Enable bread crumbs on the ServiceMix Container
*/
- def enable(context: CamelContext) = {
- context.getProcessorFactory match {
- case global: GlobalProcessorFactory => global.addFactory(new Breadcrumbs)
- case _ => //unable to enable bread crumbs
- }
+ def register(container: ServiceMixContainer = ServiceMixContainer.instance) {
+ container.register(classOf[Breadcrumbs])
}
/**
- * Disable bread crumbs on the target CamelContext
+ * Disable bread crumbs on the ServiceMix Container
*/
- def disable(context: CamelContext) = {
- context.getProcessorFactory match {
- case global: GlobalProcessorFactory => for (breadcrumb <- global.factories.filter(_.isInstanceOf[Breadcrumbs])) {
- global.removeFactory(breadcrumb)
- }
- case _ => //unable to enable bread crumbs
- }
- }
-
- private def nullOrElse[S,T](value: S)(function: S => T) : T = if (value == null) {
- null.asInstanceOf[T]
- } else {
- function(value)
- }
- private def nullOrElse[S,T](value: S, default: T)(function: S => T) : T = if (value == null) {
- default
- } else {
- function(value)
+ def unregister(container: ServiceMixContainer = ServiceMixContainer.instance) {
+ container.unregister(classOf[Breadcrumbs])
}
}
View
46 core/src/main/scala/org/apache/servicemix/core/GlobalProcessorFactory.scala
@@ -18,10 +18,11 @@ package org.apache.servicemix.core
import org.apache.camel.spi.{RouteContext, ProcessorFactory}
import org.apache.camel.model.ProcessorDefinition
-import collection.mutable.ListBuffer
-import org.apache.camel.processor.DelegateAsyncProcessor
-import org.apache.camel.{AsyncProcessor, Processor, AsyncCallback, Exchange}
import java.util.concurrent.atomic.AtomicInteger
+import collection.mutable.ListBuffer
+import org.apache.camel._
+import processor.DelegateAsyncProcessor
+import GlobalProcessorFactory._
/**
* Global ServiceMix ProcessorFactory implementation, which will take care of wrapping processors with the additional
@@ -43,37 +44,44 @@ class GlobalProcessorFactory extends ProcessorFactory {
nullOrElse(definition.createProcessor(context))(new GlobalDelegateProcessor(context, definition, _))
}
- def nullOrElse[S,T](value: S)(function: S => T) = if (value == null) {
- null.asInstanceOf[T]
- } else {
- function(value)
- }
-
def triggerUpdate(block: => Unit) = {
block
version.incrementAndGet()
}
- def configure(original: AsyncProcessor) : AsyncProcessor = {
- factories.foldLeft(original){ (delegate: AsyncProcessor, factory: DelegateProcessorFactory) =>
- factory.create(delegate)
- }
- }
+ class GlobalDelegateProcessor(routeContext: RouteContext, definition: ProcessorDefinition[_], target: Processor) extends DelegateAsyncProcessor(target) {
- class GlobalDelegateProcessor(context: RouteContext, definition: ProcessorDefinition[_], target: Processor) extends DelegateAsyncProcessor(target) {
-
- var currentProcessor = GlobalProcessorFactory.this.configure(getProcessor())
+ var currentProcessor = configure(getProcessor)
var version = GlobalProcessorFactory.this.version.get()
override def process(exchange: Exchange, callback: AsyncCallback) = {
// let's check if processor factories have changed and reconfigure things if necessary
if (version < GlobalProcessorFactory.this.version.get) {
- currentProcessor = GlobalProcessorFactory.this.configure(getProcessor())
+ currentProcessor = configure(getProcessor)
}
currentProcessor.process(exchange, callback)
}
override def toString = "ServiceMix Wrapper[" + processor + "]"
+
+ def configure(original: AsyncProcessor) : AsyncProcessor = {
+ factories.foldLeft(original) { (delegate: AsyncProcessor, factory: DelegateProcessorFactory) => {
+ factory.create(delegate)
+ }
+ }
+ }
+
}
-}
+}
+
+object GlobalProcessorFactory {
+
+ private def nullOrElse[S,T](value: S)(function: S => T) = if (value == null) {
+ null.asInstanceOf[T]
+ } else {
+ function(value)
+ }
+
+}
+
View
52 core/src/main/scala/org/apache/servicemix/core/ServiceMixContainer.scala
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.core
+
+import org.apache.camel.spi.Container
+import org.apache.camel._
+
+class ServiceMixContainer extends Container {
+
+ val processorFactory = new GlobalProcessorFactory
+
+ def manage(camelContext: CamelContext) {
+ if (camelContext.getProcessorFactory == null) {
+ camelContext.setProcessorFactory(processorFactory)
+ }
+ }
+ def register(factory: Class[_ <: DelegateProcessorFactory]) {
+ if (processorFactory.factories.filter(_.getClass == factory).isEmpty) {
+ processorFactory.addFactory(factory.newInstance())
+ }
+ }
+ def unregister(factory: Class[_ <: DelegateProcessorFactory]) {
+ for (f <- processorFactory.factories.filter(_.getClass == factory)) {
+ processorFactory.removeFactory(f)
+ }
+ }
+
+}
+
+object ServiceMixContainer {
+
+ val instance = new ServiceMixContainer
+
+ def init() {
+ Container.Instance.set(instance)
+ }
+
+}
View
70 core/src/main/scala/org/apache/servicemix/core/Switchable.scala
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.core
+
+import collection.mutable.HashMap
+import org.apache.camel.{Route, Exchange, CamelContext}
+
+trait Switchable {
+
+ def enable() {
+ global = true
+ }
+
+ def disable() {
+ global = false
+ }
+
+ def enable(camelContext: CamelContext) {
+ perContext += camelContext.getName -> true
+ }
+
+ def disable(camelContext: CamelContext) {
+ perContext += camelContext.getName -> false
+ }
+
+ def clear(camelContext: CamelContext) {
+ perContext -= camelContext.getName
+ }
+
+ def enable(route: Route) {
+ perRoute += route.getId -> true
+ }
+
+ def disable(route: Route) {
+ perRoute += route.getId -> false
+ }
+
+ def clear(route: Route) {
+ perRoute -= route.getId
+ }
+
+ def reset() {
+ global = true
+ perContext.clear()
+ perRoute.clear()
+ }
+
+ def isEnabled(exchange: Exchange) : Boolean = isRouteEnabled(exchange).getOrElse(isContextEnabled(exchange).getOrElse(global))
+ def isContextEnabled(exchange: Exchange): Option[Boolean] = perContext.get(exchange.getContext.getName)
+ def isRouteEnabled(exchange: Exchange): Option[Boolean] = if (exchange.getFromRouteId != null) perRoute.get(exchange.getFromRouteId) else Some(true)
+
+ private var global: Boolean = true
+ private val perContext = new HashMap[String, Boolean]
+ private val perRoute = new HashMap[String, Boolean]
+
+}
View
28 core/src/test/scala/org/apache/servicemix/core/BreadcrumbsTest.scala
@@ -1,5 +1,3 @@
-package org.apache.servicemix.core
-
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@@ -16,6 +14,8 @@ package org.apache.servicemix.core
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.servicemix.core
+
import _root_.scala.Predef._
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
@@ -37,7 +37,6 @@ class BreadcrumbsTest extends FunSuite with RouteBuilderSupport with BeforeAndAf
lazy val context = {
val result = new DefaultCamelContext()
- result.setProcessorFactory(new GlobalProcessorFactory)
result.addRoutes(createRouteBuilder())
result.start()
result
@@ -49,14 +48,23 @@ class BreadcrumbsTest extends FunSuite with RouteBuilderSupport with BeforeAndAf
result
}
- override protected def afterAll() = {
+
+ override protected def beforeEach() {
+ Breadcrumbs.reset()
+ }
+
+ override protected def beforeAll() {
+ ServiceMixContainer.init()
+ Breadcrumbs.register()
+ }
+
+ override protected def afterAll() {
template.stop()
context.stop()
+ Breadcrumbs.unregister()
}
test("add breadcrumbs to message headers") {
- Breadcrumbs.enable(context)
-
for (body <- messages) {
template.sendBody("direct:test", body)
}
@@ -77,8 +85,6 @@ class BreadcrumbsTest extends FunSuite with RouteBuilderSupport with BeforeAndAf
}
test("bread crumb strategy can be disabled if necessary") {
- Breadcrumbs.enable(context)
-
for (body <- messages) {
template.sendBody("direct:test", body)
}
@@ -119,8 +125,6 @@ class BreadcrumbsTest extends FunSuite with RouteBuilderSupport with BeforeAndAf
}
test("bread crumb strategy with aggregator") {
- Breadcrumbs.enable(context)
-
for (body <- messages) {
template.sendBody("direct:aggregate", body)
}
@@ -131,11 +135,11 @@ class BreadcrumbsTest extends FunSuite with RouteBuilderSupport with BeforeAndAf
val exchange = aggres.getExchanges.get(0)
val bcs = getBreadCrumbs(exchange)
- assert(bcs.size == messages.size, "There should be no more bread crumbs here")
+ expect(messages.size, "The number of breadcrumbs from the aggregator is wrong")(bcs.size)
}
- override protected def afterEach() = {
+ override protected def afterEach() {
MockEndpoint.resetMocks(context)
context.getProcessorFactory.asInstanceOf[GlobalProcessorFactory].factories.clear
}
View
6 core/src/test/scala/org/apache/servicemix/core/ProfilerStrategyTest.scala
@@ -31,14 +31,14 @@ import org.apache.camel.{Exchange, ProducerTemplate}
@RunWith(classOf[JUnitRunner])
class ProfilerStrategyTest extends FunSuite with BeforeAndAfterAll with BeforeAndAfterEach {
- override protected def beforeAll() = {
+ override protected def beforeAll() {
}
- override protected def afterAll() = {
+ override protected def afterAll() {
}
- def sleep() = {
+ def sleep() {
Thread.sleep(2)
}
View
88 core/src/test/scala/org/apache/servicemix/core/SwitchableTest.scala
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.core
+
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfterEach, BeforeAndAfterAll, FunSuite}
+import org.apache.camel.scala.dsl.builder.{RouteBuilder, RouteBuilderSupport}
+import org.apache.camel.impl._
+
+@RunWith(classOf[JUnitRunner])
+class SwitchableTest extends FunSuite with RouteBuilderSupport with BeforeAndAfterAll with BeforeAndAfterEach {
+
+ lazy val context = {
+ val ctx = new DefaultCamelContext()
+ ctx.setName("contextId")
+ ctx.addRoutes(new RouteBuilder() { "direct:a" --> "direct:b" routeId("routeId") })
+ ctx.start()
+ ctx
+ }
+ lazy val route = context.getRoutes.get(0)
+ lazy val exchange = {
+ val ex = new DefaultExchange(context)
+ ex.setFromRouteId(route.getId)
+ ex
+ }
+ lazy val switchable = new Switchable() {}
+
+ override def beforeEach() {
+ switchable.reset()
+ }
+
+ test("global level") {
+ switchable.reset()
+ assert(switchable.isEnabled(exchange))
+
+ switchable.disable()
+ assert(!switchable.isEnabled(exchange))
+ }
+
+ test("context level") {
+ switchable.disable()
+ switchable.enable(context)
+ assert(switchable.isEnabled(exchange))
+
+ switchable.clear(context)
+ assert(!switchable.isEnabled(exchange))
+
+ switchable.enable()
+ assert(switchable.isEnabled(exchange))
+
+ switchable.disable(context)
+ assert(!switchable.isEnabled(exchange))
+ }
+
+ test("route level") {
+ switchable.disable()
+ switchable.enable(route)
+ assert(switchable.isEnabled(exchange))
+
+ switchable.clear(route)
+ assert(!switchable.isEnabled(exchange))
+
+ switchable.enable()
+ assert(switchable.isEnabled(exchange))
+
+ switchable.disable(route)
+ assert(!switchable.isEnabled(exchange))
+
+ switchable.enable(context)
+ assert(!switchable.isEnabled(exchange))
+ }
+
+}
Please sign in to comment.
Something went wrong with that request. Please try again.