Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Reset StreamCache in getBodyAs to be able to re-read it, see #2724

  • Loading branch information...
commit 99b02c08268fe363ec6ee5687dc5365a3ec56c78 1 parent 04c30fb
Patrik Nordwall patriknw authored
18 akka-camel/src/main/scala/akka/camel/CamelMessage.scala
View
@@ -5,7 +5,7 @@
package akka.camel
import java.util.{ Map JMap, Set JSet }
-import org.apache.camel.{ CamelContext, Message JCamelMessage }
+import org.apache.camel.{ CamelContext, Message JCamelMessage, StreamCache }
import akka.AkkaException
import scala.reflect.ClassTag
import scala.util.Try
@@ -108,7 +108,21 @@ case class CamelMessage(body: Any, headers: Map[String, Any]) {
* Java API
*
*/
- def getBodyAs[T](clazz: Class[T], camelContext: CamelContext): T = camelContext.getTypeConverter.mandatoryConvertTo[T](clazz, body)
+ def getBodyAs[T](clazz: Class[T], camelContext: CamelContext): T = {
+ val result = camelContext.getTypeConverter.mandatoryConvertTo[T](clazz, body)
+ // to be able to re-read a StreamCache we must "undo" the side effect by resetting the StreamCache
+ resetStreamCache()
+ result
+ }
+
+ /**
+ * Reset StreamCache body. Nothing is done if the body is not a StreamCache.
+ * See http://camel.apache.org/stream-caching.html
+ */
+ def resetStreamCache(): Unit = body match {
+ case stream: StreamCache stream.reset
+ case _
+ }
/**
* Returns a new CamelMessage with a new body, while keeping the same headers.
9 akka-camel/src/test/java/akka/camel/MessageJavaTestBase.java
View
@@ -8,6 +8,7 @@
import akka.dispatch.Mapper;
import akka.japi.Function;
import org.apache.camel.NoTypeConversionAvailableException;
+import org.apache.camel.converter.stream.InputStreamCache;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -100,6 +101,14 @@ public void shouldReturnAllHeadersUnmodifiable() {
message("test1" , createMap("A", "1")).withHeaders(createMap("C", "3")));
}
+ @Test
+ public void shouldBeAbleToReReadStreamCacheBody() throws Exception {
+ CamelMessage msg = new CamelMessage(new InputStreamCache("test1".getBytes("utf-8")), empty);
+ assertEquals("test1", msg.getBodyAs(String.class, camel.context()));
+ // re-read
+ assertEquals("test1", msg.getBodyAs(String.class, camel.context()));
+ }
+
private static Set<String> createSet(String... entries) {
HashSet<String> set = new HashSet<String>();
set.addAll(Arrays.asList(entries));
9 akka-camel/src/test/scala/akka/camel/MessageScalaTest.scala
View
@@ -5,11 +5,11 @@
package akka.camel
import java.io.InputStream
-
import org.apache.camel.NoTypeConversionAvailableException
import akka.camel.TestSupport.{ SharedCamelSystem }
import org.scalatest.FunSuite
import org.scalatest.matchers.MustMatchers
+import org.apache.camel.converter.stream.InputStreamCache
class MessageScalaTest extends FunSuite with MustMatchers with SharedCamelSystem {
implicit def camelContext = camel.context
@@ -44,12 +44,17 @@ class MessageScalaTest extends FunSuite with MustMatchers with SharedCamelSystem
test("mustSetBodyAndPreserveHeaders") {
CamelMessage("test1", Map("A" -> "1")).copy(body = "test2") must be(
CamelMessage("test2", Map("A" -> "1")))
-
}
test("mustSetHeadersAndPreserveBody") {
CamelMessage("test1", Map("A" -> "1")).copy(headers = Map("C" -> "3")) must be(
CamelMessage("test1", Map("C" -> "3")))
+ }
+ test("mustBeAbleToReReadStreamCacheBody") {
+ val msg = CamelMessage(new InputStreamCache("test1".getBytes("utf-8")), Map.empty)
+ msg.bodyAs[String] must be("test1")
+ // re-read
+ msg.bodyAs[String] must be("test1")
}
}

1 comment on commit 99b02c0

Raymond Roestenburg

Nice find, LGTM

Please sign in to comment.
Something went wrong with that request. Please try again.