diff --git a/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcCamelCodec.java b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcCamelCodec.java index a1340556de57c..31785a0161252 100644 --- a/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcCamelCodec.java +++ b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcCamelCodec.java @@ -19,8 +19,10 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.InputStream; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; +import java.io.OutputStream; import org.apache.camel.CamelContext; import org.apache.camel.Endpoint; @@ -35,6 +37,12 @@ public class JdbcCamelCodec { public byte[] marshallExchange(CamelContext camelContext, Exchange exchange, boolean allowSerializedHeaders) throws IOException { + ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); + marshallExchange(camelContext, exchange, allowSerializedHeaders, bytesOut); + return bytesOut.toByteArray(); + } + + public void marshallExchange(CamelContext camelContext, Exchange exchange, boolean allowSerializedHeaders, OutputStream outputStream) throws IOException { // use DefaultExchangeHolder to marshal to a serialized object DefaultExchangeHolder pe = DefaultExchangeHolder.marshal(exchange, false, allowSerializedHeaders); // add the aggregated size and timeout property as the only properties we want to retain @@ -51,11 +59,15 @@ public byte[] marshallExchange(CamelContext camelContext, Exchange exchange, boo if (exchange.getFromEndpoint() != null) { DefaultExchangeHolder.addProperty(pe, "CamelAggregatedFromEndpoint", exchange.getFromEndpoint().getEndpointUri()); } - return encode(pe); + encode(pe, outputStream); } public Exchange unmarshallExchange(CamelContext camelContext, byte[] buffer) throws IOException, ClassNotFoundException { - DefaultExchangeHolder pe = decode(camelContext, buffer); + return unmarshallExchange(camelContext, new ByteArrayInputStream(buffer)); + } + + public Exchange unmarshallExchange(CamelContext camelContext, InputStream inputStream) throws IOException, ClassNotFoundException { + DefaultExchangeHolder pe = decode(camelContext, inputStream); Exchange answer = new DefaultExchange(camelContext); DefaultExchangeHolder.unmarshal(answer, pe); // restore the from endpoint @@ -69,18 +81,13 @@ public Exchange unmarshallExchange(CamelContext camelContext, byte[] buffer) thr return answer; } - private byte[] encode(Object object) throws IOException { - ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); - ObjectOutputStream objectOut = new ObjectOutputStream(bytesOut); - objectOut.writeObject(object); - objectOut.close(); - byte[] data = bytesOut.toByteArray(); - return data; + private void encode(Object object, OutputStream bytesOut) throws IOException { + try (ObjectOutputStream objectOut = new ObjectOutputStream(bytesOut)) { + objectOut.writeObject(object); + } } - private DefaultExchangeHolder decode(CamelContext camelContext, byte[] dataIn) throws IOException, ClassNotFoundException { - ByteArrayInputStream bytesIn = new ByteArrayInputStream(dataIn); - + private DefaultExchangeHolder decode(CamelContext camelContext, InputStream bytesIn) throws IOException, ClassNotFoundException { ObjectInputStream objectIn = null; Object obj = null; try {