Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CAMEL-17027 stream into buffer of configured size #11153

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view

Large diffs are not rendered by default.

Expand Up @@ -48,6 +48,8 @@ public boolean configure(CamelContext camelContext, Object obj, String name, Obj
case "batchSize": getOrCreateConfiguration(target).setBatchSize(property(camelContext, int.class, value)); return true;
case "bridgeerrorhandler":
case "bridgeErrorHandler": target.setBridgeErrorHandler(property(camelContext, boolean.class, value)); return true;
case "buffersize":
case "bufferSize": getOrCreateConfiguration(target).setBufferSize(property(camelContext, int.class, value)); return true;
case "configuration": target.setConfiguration(property(camelContext, org.apache.camel.component.aws2.s3.AWS2S3Configuration.class, value)); return true;
case "customeralgorithm":
case "customerAlgorithm": getOrCreateConfiguration(target).setCustomerAlgorithm(property(camelContext, java.lang.String.class, value)); return true;
Expand Down Expand Up @@ -166,6 +168,8 @@ public Class<?> getOptionType(String name, boolean ignoreCase) {
case "batchSize": return int.class;
case "bridgeerrorhandler":
case "bridgeErrorHandler": return boolean.class;
case "buffersize":
case "bufferSize": return int.class;
case "configuration": return org.apache.camel.component.aws2.s3.AWS2S3Configuration.class;
case "customeralgorithm":
case "customerAlgorithm": return java.lang.String.class;
Expand Down Expand Up @@ -280,6 +284,8 @@ public Object getOptionValue(Object obj, String name, boolean ignoreCase) {
case "batchSize": return getOrCreateConfiguration(target).getBatchSize();
case "bridgeerrorhandler":
case "bridgeErrorHandler": return target.isBridgeErrorHandler();
case "buffersize":
case "bufferSize": return getOrCreateConfiguration(target).getBufferSize();
case "configuration": return target.getConfiguration();
case "customeralgorithm":
case "customerAlgorithm": return getOrCreateConfiguration(target).getCustomerAlgorithm();
Expand Down
Expand Up @@ -45,6 +45,8 @@ public boolean configure(CamelContext camelContext, Object obj, String name, Obj
case "batchSize": target.getConfiguration().setBatchSize(property(camelContext, int.class, value)); return true;
case "bridgeerrorhandler":
case "bridgeErrorHandler": target.setBridgeErrorHandler(property(camelContext, boolean.class, value)); return true;
case "buffersize":
case "bufferSize": target.getConfiguration().setBufferSize(property(camelContext, int.class, value)); return true;
case "customeralgorithm":
case "customerAlgorithm": target.getConfiguration().setCustomerAlgorithm(property(camelContext, java.lang.String.class, value)); return true;
case "customerkeyid":
Expand Down Expand Up @@ -193,6 +195,8 @@ public Class<?> getOptionType(String name, boolean ignoreCase) {
case "batchSize": return int.class;
case "bridgeerrorhandler":
case "bridgeErrorHandler": return boolean.class;
case "buffersize":
case "bufferSize": return int.class;
case "customeralgorithm":
case "customerAlgorithm": return java.lang.String.class;
case "customerkeyid":
Expand Down Expand Up @@ -337,6 +341,8 @@ public Object getOptionValue(Object obj, String name, boolean ignoreCase) {
case "batchSize": return target.getConfiguration().getBatchSize();
case "bridgeerrorhandler":
case "bridgeErrorHandler": return target.isBridgeErrorHandler();
case "buffersize":
case "bufferSize": return target.getConfiguration().getBufferSize();
case "customeralgorithm":
case "customerAlgorithm": return target.getConfiguration().getCustomerAlgorithm();
case "customerkeyid":
Expand Down
Expand Up @@ -21,7 +21,7 @@ public class AWS2S3EndpointUriFactory extends org.apache.camel.support.component
private static final Set<String> SECRET_PROPERTY_NAMES;
private static final Set<String> MULTI_VALUE_PREFIXES;
static {
Set<String> props = new HashSet<>(73);
Set<String> props = new HashSet<>(74);
props.add("accessKey");
props.add("amazonS3Client");
props.add("amazonS3Presigner");
Expand All @@ -35,6 +35,7 @@ public class AWS2S3EndpointUriFactory extends org.apache.camel.support.component
props.add("batchSize");
props.add("bridgeErrorHandler");
props.add("bucketNameOrArn");
props.add("bufferSize");
props.add("customerAlgorithm");
props.add("customerKeyId");
props.add("customerKeyMD5");
Expand Down

Large diffs are not rendered by default.

Expand Up @@ -128,6 +128,8 @@ public class AWS2S3Configuration implements Cloneable {
private int batchMessageNumber = 10;
@UriParam(defaultValue = "1000000", label = "producer")
private int batchSize = 1000000;
@UriParam(defaultValue = "1000000", label = "producer")
private int bufferSize = 1000000;
@UriParam(defaultValue = "progressive", label = "producer")
private AWSS3NamingStrategyEnum namingStrategy = AWSS3NamingStrategyEnum.progressive;
@UriParam(label = "producer")
Expand Down Expand Up @@ -665,6 +667,17 @@ public void setBatchSize(int batchSize) {
this.batchSize = batchSize;
}

public int getBufferSize() {
return bufferSize;
}

/**
* The buffer size (in bytes) in streaming upload mode
*/
public void setBufferSize(int bufferSize) {
this.bufferSize = bufferSize;
}

public AWSS3NamingStrategyEnum getNamingStrategy() {
return namingStrategy;
}
Expand Down
Expand Up @@ -52,7 +52,6 @@
import software.amazon.awssdk.services.s3.model.S3Object;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Iterable;
import software.amazon.awssdk.utils.IoUtils;

/**
* A Producer which sends messages to the Amazon Web Service Simple Storage Service
Expand Down Expand Up @@ -133,7 +132,7 @@ public void run() {
public void process(final Exchange exchange) throws Exception {
InputStream is = exchange.getIn().getMandatoryBody(InputStream.class);

buffer.write(IoUtils.toByteArray(is));
buffer.write(AWS2S3Utils.toByteArray(is, getConfiguration().getBufferSize()));

final String keyName = getConfiguration().getKeyName();
final String fileName = AWS2S3Utils.determineFileName(keyName);
Expand Down
Expand Up @@ -17,6 +17,7 @@
package org.apache.camel.component.aws2.s3.utils;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

Expand Down Expand Up @@ -112,6 +113,19 @@ public static long determineLengthInputStream(InputStream is) throws IOException
return size;
}

public static byte[] toByteArray(InputStream is, final int size) throws IOException {
try (ByteArrayOutputStream output = new ByteArrayOutputStream()) {
byte[] data = new byte[4096];
int total = 0;
int n = 0;
while (total < size && (n = is.read(data)) != -1) {
output.write(data, 0, n);
total += n;
}
return output.toByteArray();
}
}

public static String determineKey(final Exchange exchange, AWS2S3Configuration configuration) {
String key = exchange.getIn().getHeader(AWS2S3Constants.KEY, String.class);
if (ObjectHelper.isEmpty(key)) {
Expand Down