Skip to content

Commit

Permalink
NIFI-7394
Browse files Browse the repository at this point in the history
Add support for sending Multipart/FORM data to InvokeHTTP.

By using dynamic properties with a prefix naming scheme, allow
definition of the parts, including the name to give the Flowfile content
part, and optionally it's file name.
  • Loading branch information
ottobackwards committed Apr 25, 2020
1 parent cd10435 commit 04ff9ae
Show file tree
Hide file tree
Showing 2 changed files with 314 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,7 @@

import static org.apache.commons.lang3.StringUtils.trimToEmpty;

import com.burgstaller.okhttp.AuthenticationCacheInterceptor;
import com.burgstaller.okhttp.CachingAuthenticatorDecorator;
import com.burgstaller.okhttp.digest.CachingAuthenticator;
import com.burgstaller.okhttp.digest.DigestAuthenticator;
import com.google.common.io.Files;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
Expand Down Expand Up @@ -66,9 +61,17 @@
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
import javax.net.ssl.X509TrustManager;

import com.burgstaller.okhttp.AuthenticationCacheInterceptor;
import com.burgstaller.okhttp.CachingAuthenticatorDecorator;
import com.burgstaller.okhttp.digest.CachingAuthenticator;
import com.burgstaller.okhttp.digest.DigestAuthenticator;
import com.google.common.io.Files;
import okhttp3.Cache;
import okhttp3.Credentials;
import okhttp3.MediaType;
import okhttp3.MultipartBody;
import okhttp3.MultipartBody.Builder;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
Expand All @@ -77,6 +80,7 @@
import okio.BufferedSink;
import org.apache.commons.io.input.TeeInputStream;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.DynamicProperties;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
Expand Down Expand Up @@ -131,9 +135,19 @@
@WritesAttribute(attribute = "invokehttp.java.exception.message", description = "The Java exception message raised when the processor fails"),
@WritesAttribute(attribute = "user-defined", description = "If the 'Put Response Body In Attribute' property is set then whatever it is set to "
+ "will become the attribute key and the value would be the body of the HTTP response.")})
@DynamicProperty(name = "Header Name", value = "Attribute Expression Language", expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
description = "Send request header with a key matching the Dynamic Property Key and a value created by evaluating "
+ "the Attribute Expression Language set in the value of the Dynamic Property.")
@DynamicProperties ({
@DynamicProperty(name = "Header Name", value = "Attribute Expression Language", expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
description =
"Send request header with a key matching the Dynamic Property Key and a value created by evaluating "
+ "the Attribute Expression Language set in the value of the Dynamic Property."),
@DynamicProperty(name = "post.form.<NAME>", value = "Attribute Expression Language", expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
description =
"When the HTTP Method is POST, dynamic properties with the property name in the form of post.form.<NAME>,"
+ " where the <NAME> will be the form data name, will be used to fill out the multipart form parts."
+ " If the value is the literal 'FLOWFILE_CONTENT', that form part will have the flowfile's data."
+ " If send message body is set, the properties will be ignored."
+ " The property name post.form.filename will set the filename to set for the flowfile content.")
})
public final class InvokeHTTP extends AbstractProcessor {
// flowfile attribute keys returned after reading the response
public final static String STATUS_CODE = "invokehttp.status.code";
Expand All @@ -148,6 +162,9 @@ public final class InvokeHTTP extends AbstractProcessor {

public static final String DEFAULT_CONTENT_TYPE = "application/octet-stream";

public static final String FLOWFILE_CONTENT_FORM_MARKER = "FLOWFILE_CONTENT";
public static final String FLOWFILE_CONTENT_FORM_FILE_NAME = "post.form.filename";

// Set of flowfile attributes which we generally always ignore during
// processing, including when converting http headers, copying attributes, etc.
// This set includes our strings defined above as well as some standard flowfile
Expand All @@ -163,6 +180,8 @@ public final class InvokeHTTP extends AbstractProcessor {
public static final String HTTP = "http";
public static final String HTTPS = "https";

private static final Pattern DYNAMIC_FORM_PARAMETER_NAME = Pattern.compile("post\\.form\\.(?<formDataName>.*)$");

// properties
public static final PropertyDescriptor PROP_METHOD = new PropertyDescriptor.Builder()
.name("HTTP Method")
Expand Down Expand Up @@ -512,6 +531,22 @@ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {

@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptorName) {
if (propertyDescriptorName.startsWith("post.form.")) {

Matcher matcher = DYNAMIC_FORM_PARAMETER_NAME.matcher(propertyDescriptorName);
if (matcher.matches()) {
return new PropertyDescriptor.Builder()
.required(false)
.name(propertyDescriptorName)
.description("Form Data " + matcher.group("formDataName"))
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true))
.dynamic(true)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
}
return null;
}

return new PropertyDescriptor.Builder()
.required(false)
.name(propertyDescriptorName)
Expand Down Expand Up @@ -1025,24 +1060,65 @@ private Request configureRequest(final ProcessContext context, final ProcessSess

private RequestBody getRequestBodyToSend(final ProcessSession session, final ProcessContext context, final FlowFile requestFlowFile) {
if(context.getProperty(PROP_SEND_BODY).asBoolean()) {
return new RequestBody() {
@Override
public MediaType contentType() {
String contentType = context.getProperty(PROP_CONTENT_TYPE).evaluateAttributeExpressions(requestFlowFile).getValue();
contentType = StringUtils.isBlank(contentType) ? DEFAULT_CONTENT_TYPE : contentType;
return MediaType.parse(contentType);
String evalContentType = context.getProperty(PROP_CONTENT_TYPE)
.evaluateAttributeExpressions(requestFlowFile).getValue();
final String contentType = StringUtils.isBlank(evalContentType) ? DEFAULT_CONTENT_TYPE : evalContentType;
HashMap<String,PropertyDescriptor> propertyDescriptors = new HashMap<>();
for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
Matcher matcher = DYNAMIC_FORM_PARAMETER_NAME.matcher(entry.getKey().getName());
if (matcher.matches()) {
propertyDescriptors.put(matcher.group("formDataName"),entry.getKey());
}

@Override
public void writeTo(BufferedSink sink) throws IOException {
session.exportTo(requestFlowFile, sink.outputStream());
}
if (propertyDescriptors.size() > 0) {
// we have form data
MultipartBody.Builder builder = new Builder().setType(MultipartBody.FORM);
String contentKey = null;
String contentFileName = null;

// loop through the dynamic form parameters
// we can't add the content before we know if there is a file name or not, so we
// get the keys and do them after this
for (final Map.Entry<String, PropertyDescriptor> entry : propertyDescriptors.entrySet()) {
final String propValue = context.getProperty(entry.getValue().getName()).evaluateAttributeExpressions(requestFlowFile).getValue();
if (propValue.equals(FLOWFILE_CONTENT_FORM_MARKER)) {
contentKey = entry.getKey();
} else if (entry.getValue().getName().equals(FLOWFILE_CONTENT_FORM_FILE_NAME)) {
contentFileName = propValue;
} else {
builder.addFormDataPart(entry.getKey(), propValue);
}
}

@Override
public long contentLength(){
return useChunked ? -1 : requestFlowFile.getSize();
if (contentKey != null) {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
session.exportTo(requestFlowFile, byteArrayOutputStream);
if (contentFileName != null) {
builder.addFormDataPart(contentKey, contentFileName,
RequestBody.create(MediaType.parse(contentType), byteArrayOutputStream.toByteArray()));
} else {
builder.addFormDataPart(contentKey, null,
RequestBody.create(MediaType.parse(contentType), byteArrayOutputStream.toByteArray()));
}
}
};
return builder.build();
} else {
return new RequestBody() {
@Override
public MediaType contentType() {
return MediaType.parse(contentType);
}

@Override
public void writeTo(BufferedSink sink) throws IOException {
session.exportTo(requestFlowFile, sink.outputStream());
}

@Override
public long contentLength() {
return useChunked ? -1 : requestFlowFile.getSize();
}
};
}
} else {
return RequestBody.create(null, new byte[0]);
}
Expand All @@ -1063,6 +1139,12 @@ private Request.Builder setHeaderProperties(final ProcessContext context, Reques
logger.warn(excludedHeaders.get(headerKey), new Object[]{headerKey});
continue;
}

// don't include dynamic form data properties
if ( DYNAMIC_FORM_PARAMETER_NAME.matcher(headerKey).matches()) {
continue;
}

requestBuilder = requestBuilder.addHeader(headerKey, headerValue);
}

Expand Down

0 comments on commit 04ff9ae

Please sign in to comment.