Skip to content

Commit

Permalink
NIFI-7394: Add support for sending Multipart/FORM data to InvokeHTTP.
Browse files Browse the repository at this point in the history
By using dynamic properties with a prefix naming scheme, allow
definition of the parts, including the name to give the Flowfile content
part, and optionally it's file name.
After review:
- change so that we can send just the form content or just form data
  without the flowfile
- change the content name and content file name from dynamic properties
  to properties
- change the dynamic name to be an invalid http header "post:form:xxxx"
- add validation and more tests

This closes #4234.

Signed-off-by: Mark Payne <markap14@hotmail.com>
  • Loading branch information
ottobackwards authored and markap14 committed Apr 29, 2020
1 parent 1259bd5 commit 659a383
Show file tree
Hide file tree
Showing 2 changed files with 461 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,6 @@

import static org.apache.commons.lang3.StringUtils.trimToEmpty;

import com.burgstaller.okhttp.AuthenticationCacheInterceptor;
import com.burgstaller.okhttp.CachingAuthenticatorDecorator;
import com.burgstaller.okhttp.digest.CachingAuthenticator;
import com.burgstaller.okhttp.digest.DigestAuthenticator;
import com.google.common.io.Files;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
Expand Down Expand Up @@ -57,6 +51,7 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory;
Expand All @@ -66,9 +61,17 @@
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
import javax.net.ssl.X509TrustManager;

import com.burgstaller.okhttp.AuthenticationCacheInterceptor;
import com.burgstaller.okhttp.CachingAuthenticatorDecorator;
import com.burgstaller.okhttp.digest.CachingAuthenticator;
import com.burgstaller.okhttp.digest.DigestAuthenticator;
import com.google.common.io.Files;
import okhttp3.Cache;
import okhttp3.Credentials;
import okhttp3.MediaType;
import okhttp3.MultipartBody;
import okhttp3.MultipartBody.Builder;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
Expand All @@ -77,6 +80,7 @@
import okio.BufferedSink;
import org.apache.commons.io.input.TeeInputStream;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.DynamicProperties;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
Expand Down Expand Up @@ -131,9 +135,17 @@
@WritesAttribute(attribute = "invokehttp.java.exception.message", description = "The Java exception message raised when the processor fails"),
@WritesAttribute(attribute = "user-defined", description = "If the 'Put Response Body In Attribute' property is set then whatever it is set to "
+ "will become the attribute key and the value would be the body of the HTTP response.")})
@DynamicProperty(name = "Header Name", value = "Attribute Expression Language", expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
description = "Send request header with a key matching the Dynamic Property Key and a value created by evaluating "
+ "the Attribute Expression Language set in the value of the Dynamic Property.")
@DynamicProperties ({
@DynamicProperty(name = "Header Name", value = "Attribute Expression Language", expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
description =
"Send request header with a key matching the Dynamic Property Key and a value created by evaluating "
+ "the Attribute Expression Language set in the value of the Dynamic Property."),
@DynamicProperty(name = "post:form:<NAME>", value = "Attribute Expression Language", expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
description =
"When the HTTP Method is POST, dynamic properties with the property name in the form of post:form:<NAME>,"
+ " where the <NAME> will be the form data name, will be used to fill out the multipart form parts."
+ " If send message body is false, the flowfile will not be sent, but any other form data will be.")
})
public final class InvokeHTTP extends AbstractProcessor {
// flowfile attribute keys returned after reading the response
public final static String STATUS_CODE = "invokehttp.status.code";
Expand All @@ -148,6 +160,8 @@ public final class InvokeHTTP extends AbstractProcessor {

public static final String DEFAULT_CONTENT_TYPE = "application/octet-stream";

public static final String FORM_BASE= "post:form";

// Set of flowfile attributes which we generally always ignore during
// processing, including when converting http headers, copying attributes, etc.
// This set includes our strings defined above as well as some standard flowfile
Expand All @@ -163,6 +177,8 @@ public final class InvokeHTTP extends AbstractProcessor {
public static final String HTTP = "http";
public static final String HTTPS = "https";

private static final Pattern DYNAMIC_FORM_PARAMETER_NAME = Pattern.compile("post:form:(?<formDataName>.*)$");

// properties
public static final PropertyDescriptor PROP_METHOD = new PropertyDescriptor.Builder()
.name("HTTP Method")
Expand Down Expand Up @@ -297,6 +313,30 @@ public final class InvokeHTTP extends AbstractProcessor {
.required(false)
.build();

public static final PropertyDescriptor PROP_FORM_BODY_FORM_NAME = new PropertyDescriptor.Builder()
.name("form-body-form-name")
.displayName("Flowfile Form Data Name")
.description("When Send Message Body is true, and Flowfile Form Data Name is set, "
+ " the Flowfile will be sent as the message body in multipart/form format with this value "
+ "as the form data name.")
.required(false)
.addValidator(
StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true))
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();

public static final PropertyDescriptor PROP_SET_FORM_FILE_NAME = new PropertyDescriptor.Builder()
.name("set-form-filename")
.displayName("Set Flowfile Form Data File Name")
.description(
"When Send Message Body is true, Flowfile Form Data Name is set, "
+ "and Set Flowfile Form Data File Name is true, the Flowfile's fileName value "
+ "will be set as the filename property of the form data.")
.required(false)
.defaultValue("true")
.allowableValues("true","false")
.build();

// Per RFC 7235, 2617, and 2616.
// basic-credentials = base64-user-pass
// base64-user-pass = userid ":" password
Expand Down Expand Up @@ -450,7 +490,9 @@ public final class InvokeHTTP extends AbstractProcessor {
PROP_PENALIZE_NO_RETRY,
PROP_USE_ETAG,
PROP_ETAG_MAX_CACHE_SIZE,
IGNORE_RESPONSE_CONTENT));
IGNORE_RESPONSE_CONTENT,
PROP_FORM_BODY_FORM_NAME,
PROP_SET_FORM_FILE_NAME));

// relationships
public static final Relationship REL_SUCCESS_REQ = new Relationship.Builder()
Expand Down Expand Up @@ -512,6 +554,22 @@ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {

@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptorName) {
if (propertyDescriptorName.startsWith(FORM_BASE)) {

Matcher matcher = DYNAMIC_FORM_PARAMETER_NAME.matcher(propertyDescriptorName);
if (matcher.matches()) {
return new PropertyDescriptor.Builder()
.required(false)
.name(propertyDescriptorName)
.description("Form Data " + matcher.group("formDataName"))
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true))
.dynamic(true)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
}
return null;
}

return new PropertyDescriptor.Builder()
.required(false)
.name(propertyDescriptorName)
Expand Down Expand Up @@ -594,6 +652,35 @@ protected Collection<ValidationResult> customValidate(final ValidationContext va
}
}

// Check for dynamic properties for form components.
// Even if the flowfile is not sent, we may still send form parameters.
boolean hasFormData = false;
Map<String, PropertyDescriptor> propertyDescriptors = new HashMap<>();
for (final Map.Entry<PropertyDescriptor, String> entry : validationContext.getProperties().entrySet()) {
Matcher matcher = DYNAMIC_FORM_PARAMETER_NAME.matcher(entry.getKey().getName());
if (matcher.matches()) {
hasFormData = true;
break;
}
}

// if form data exists, and send body is true, Flowfile Form Data Name must be set.
final boolean sendBody = validationContext.getProperty(PROP_SEND_BODY).asBoolean();
final boolean contentNameSet = validationContext.getProperty(PROP_FORM_BODY_FORM_NAME).isSet();
if (hasFormData) {
if (sendBody && !contentNameSet) {
results.add(new ValidationResult.Builder().subject(PROP_FORM_BODY_FORM_NAME.getDisplayName())
.valid(false).explanation(
"If dynamic form data properties are set, and send body is true, Flowfile Form Data Name must be configured.")
.build());
}
}
if (!sendBody && contentNameSet) {
results.add(new ValidationResult.Builder().subject(PROP_FORM_BODY_FORM_NAME.getDisplayName())
.valid(false).explanation("If Flowfile Form Data Name is configured, Send Message Body must be true.")
.build());
}

return results;
}

Expand Down Expand Up @@ -1023,29 +1110,66 @@ private Request configureRequest(final ProcessContext context, final ProcessSess
return requestBuilder.build();
}

private RequestBody getRequestBodyToSend(final ProcessSession session, final ProcessContext context, final FlowFile requestFlowFile) {
if(context.getProperty(PROP_SEND_BODY).asBoolean()) {
return new RequestBody() {
@Override
public MediaType contentType() {
String contentType = context.getProperty(PROP_CONTENT_TYPE).evaluateAttributeExpressions(requestFlowFile).getValue();
contentType = StringUtils.isBlank(contentType) ? DEFAULT_CONTENT_TYPE : contentType;
return MediaType.parse(contentType);
}
private RequestBody getRequestBodyToSend(final ProcessSession session, final ProcessContext context,
final FlowFile requestFlowFile) {

@Override
public void writeTo(BufferedSink sink) throws IOException {
session.exportTo(requestFlowFile, sink.outputStream());
}
boolean sendBody = context.getProperty(PROP_SEND_BODY).asBoolean();

@Override
public long contentLength(){
return useChunked ? -1 : requestFlowFile.getSize();
}
};
} else {
return RequestBody.create(null, new byte[0]);
String evalContentType = context.getProperty(PROP_CONTENT_TYPE)
.evaluateAttributeExpressions(requestFlowFile).getValue();
final String contentType = StringUtils.isBlank(evalContentType) ? DEFAULT_CONTENT_TYPE : evalContentType;
String contentKey = context.getProperty(PROP_FORM_BODY_FORM_NAME).evaluateAttributeExpressions(requestFlowFile).getValue();

// Check for dynamic properties for form components.
// Even if the flowfile is not sent, we may still send form parameters.
Map<String, PropertyDescriptor> propertyDescriptors = new HashMap<>();
for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
Matcher matcher = DYNAMIC_FORM_PARAMETER_NAME.matcher(entry.getKey().getName());
if (matcher.matches()) {
propertyDescriptors.put(matcher.group("formDataName"), entry.getKey());
}
}

RequestBody requestBody = new RequestBody() {
@Nullable
@Override
public MediaType contentType() {
return MediaType.parse(contentType);
}

@Override
public void writeTo(BufferedSink sink) throws IOException {
session.exportTo(requestFlowFile, sink.outputStream());
}

@Override
public long contentLength() {
return useChunked ? -1 : requestFlowFile.getSize();
}
};

if (propertyDescriptors.size() > 0 || StringUtils.isNotEmpty(contentKey)) {
// we have form data
MultipartBody.Builder builder = new Builder().setType(MultipartBody.FORM);
boolean useFileName = context.getProperty(PROP_SET_FORM_FILE_NAME).asBoolean();
String contentFileName = null;
if (useFileName) {
contentFileName = requestFlowFile.getAttribute(CoreAttributes.FILENAME.key());
}
// loop through the dynamic form parameters
for (final Map.Entry<String, PropertyDescriptor> entry : propertyDescriptors.entrySet()) {
final String propValue = context.getProperty(entry.getValue().getName())
.evaluateAttributeExpressions(requestFlowFile).getValue();
builder.addFormDataPart(entry.getKey(), propValue);
}
if (sendBody) {
builder.addFormDataPart(contentKey, contentFileName, requestBody);
}
return builder.build();
} else if (sendBody) {
return requestBody;
}
return RequestBody.create(null, new byte[0]);
}

private Request.Builder setHeaderProperties(final ProcessContext context, Request.Builder requestBuilder, final FlowFile requestFlowFile) {
Expand All @@ -1063,6 +1187,12 @@ private Request.Builder setHeaderProperties(final ProcessContext context, Reques
logger.warn(excludedHeaders.get(headerKey), new Object[]{headerKey});
continue;
}

// don't include dynamic form data properties
if ( DYNAMIC_FORM_PARAMETER_NAME.matcher(headerKey).matches()) {
continue;
}

requestBuilder = requestBuilder.addHeader(headerKey, headerValue);
}

Expand Down

0 comments on commit 659a383

Please sign in to comment.