Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Loading…

Reset StreamCache in getBodyAs to be able to re-read it, see #2724 #894

Merged
merged 1 commit into from

5 participants

@patriknw
Owner

Should be backported to release-2.1

@RayRoestenburg

Nice find, LGTM

@akka-ci
Owner

Started jenkins job akka-pr-validator at https://jenkins.akka.io/job/akka-pr-validator/131/

@akka-ci
Owner

jenkins job akka-pr-validator: Success - https://jenkins.akka.io/job/akka-pr-validator/131/

@viktorklang viktorklang commented on the diff
akka-camel/src/main/scala/akka/camel/CamelMessage.scala
@@ -108,7 +108,21 @@ case class CamelMessage(body: Any, headers: Map[String, Any]) {
* Java API
*
*/
- def getBodyAs[T](clazz: Class[T], camelContext: CamelContext): T = camelContext.getTypeConverter.mandatoryConvertTo[T](clazz, body)
+ def getBodyAs[T](clazz: Class[T], camelContext: CamelContext): T = {
+ val result = camelContext.getTypeConverter.mandatoryConvertTo[T](clazz, body)
+ // to be able to re-read a StreamCache we must "undo" the side effect by resetting the StreamCache
+ resetStreamCache()
@viktorklang Owner

What happens if resetStreamCache is called by one Thread as another thread is reading the body? (i.e. the same CamelMessage is sent to 2 actors)

@patriknw Owner

yes, that will not end well, body is Any and can be whatever mutable non-thread safe instance, so it's not much we can do about it.
I would say that the recommendation should be that the user should convert the body to a real immutable instance in the first endpoint (actor) before passing it on to other actors.

This fix is still relevant, because you expect to be able to call getBodyAs several times (in same actor) with the same result (e.g. adding a debug log stmt should not break things).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@bantonsson
Owner

LGTM

@patriknw patriknw merged commit e5fce9e into master
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
This page is out of date. Refresh to see the latest.
View
18 akka-camel/src/main/scala/akka/camel/CamelMessage.scala
@@ -5,7 +5,7 @@
package akka.camel
import java.util.{ Map JMap, Set JSet }
-import org.apache.camel.{ CamelContext, Message JCamelMessage }
+import org.apache.camel.{ CamelContext, Message JCamelMessage, StreamCache }
import akka.AkkaException
import scala.reflect.ClassTag
import scala.util.Try
@@ -108,7 +108,21 @@ case class CamelMessage(body: Any, headers: Map[String, Any]) {
* Java API
*
*/
- def getBodyAs[T](clazz: Class[T], camelContext: CamelContext): T = camelContext.getTypeConverter.mandatoryConvertTo[T](clazz, body)
+ def getBodyAs[T](clazz: Class[T], camelContext: CamelContext): T = {
+ val result = camelContext.getTypeConverter.mandatoryConvertTo[T](clazz, body)
+ // to be able to re-read a StreamCache we must "undo" the side effect by resetting the StreamCache
+ resetStreamCache()
@viktorklang Owner

What happens if resetStreamCache is called by one Thread as another thread is reading the body? (i.e. the same CamelMessage is sent to 2 actors)

@patriknw Owner

yes, that will not end well, body is Any and can be whatever mutable non-thread safe instance, so it's not much we can do about it.
I would say that the recommendation should be that the user should convert the body to a real immutable instance in the first endpoint (actor) before passing it on to other actors.

This fix is still relevant, because you expect to be able to call getBodyAs several times (in same actor) with the same result (e.g. adding a debug log stmt should not break things).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
+ result
+ }
+
+ /**
+ * Reset StreamCache body. Nothing is done if the body is not a StreamCache.
+ * See http://camel.apache.org/stream-caching.html
+ */
+ def resetStreamCache(): Unit = body match {
+ case stream: StreamCache stream.reset
+ case _
+ }
/**
* Returns a new CamelMessage with a new body, while keeping the same headers.
View
9 akka-camel/src/test/java/akka/camel/MessageJavaTestBase.java
@@ -8,6 +8,7 @@
import akka.dispatch.Mapper;
import akka.japi.Function;
import org.apache.camel.NoTypeConversionAvailableException;
+import org.apache.camel.converter.stream.InputStreamCache;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -100,6 +101,14 @@ public void shouldReturnAllHeadersUnmodifiable() {
message("test1" , createMap("A", "1")).withHeaders(createMap("C", "3")));
}
+ @Test
+ public void shouldBeAbleToReReadStreamCacheBody() throws Exception {
+ CamelMessage msg = new CamelMessage(new InputStreamCache("test1".getBytes("utf-8")), empty);
+ assertEquals("test1", msg.getBodyAs(String.class, camel.context()));
+ // re-read
+ assertEquals("test1", msg.getBodyAs(String.class, camel.context()));
+ }
+
private static Set<String> createSet(String... entries) {
HashSet<String> set = new HashSet<String>();
set.addAll(Arrays.asList(entries));
View
9 akka-camel/src/test/scala/akka/camel/MessageScalaTest.scala
@@ -5,11 +5,11 @@
package akka.camel
import java.io.InputStream
-
import org.apache.camel.NoTypeConversionAvailableException
import akka.camel.TestSupport.{ SharedCamelSystem }
import org.scalatest.FunSuite
import org.scalatest.matchers.MustMatchers
+import org.apache.camel.converter.stream.InputStreamCache
class MessageScalaTest extends FunSuite with MustMatchers with SharedCamelSystem {
implicit def camelContext = camel.context
@@ -44,12 +44,17 @@ class MessageScalaTest extends FunSuite with MustMatchers with SharedCamelSystem
test("mustSetBodyAndPreserveHeaders") {
CamelMessage("test1", Map("A" -> "1")).copy(body = "test2") must be(
CamelMessage("test2", Map("A" -> "1")))
-
}
test("mustSetHeadersAndPreserveBody") {
CamelMessage("test1", Map("A" -> "1")).copy(headers = Map("C" -> "3")) must be(
CamelMessage("test1", Map("C" -> "3")))
+ }
+ test("mustBeAbleToReReadStreamCacheBody") {
+ val msg = CamelMessage(new InputStreamCache("test1".getBytes("utf-8")), Map.empty)
+ msg.bodyAs[String] must be("test1")
+ // re-read
+ msg.bodyAs[String] must be("test1")
}
}
Something went wrong with that request. Please try again.