Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,6 @@
*/
package org.apache.nifi.processors.standard;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import jakarta.activation.DataHandler;
import jakarta.mail.Authenticator;
import jakarta.mail.Message;
Expand All @@ -50,7 +32,6 @@
import jakarta.mail.internet.MimeUtility;
import jakarta.mail.internet.PreencodedMimeBodyPart;
import jakarta.mail.util.ByteArrayDataSource;

import org.apache.commons.codec.binary.Base64;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
Expand Down Expand Up @@ -79,6 +60,24 @@
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.StreamUtils;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

@SupportsBatching
@Tags({"email", "put", "notify", "smtp"})
@InputRequirement(Requirement.INPUT_REQUIRED)
Expand Down Expand Up @@ -245,6 +244,17 @@ public class PutEmail extends AbstractProcessor {
.allowableValues("true", "false")
.defaultValue("false")
.build();
// Processor property naming the character set applied to FlowFile content.
// It is required, validated with the standard character-set validator, and defaults to UTF-8.
public static final PropertyDescriptor INPUT_CHARACTER_SET = new PropertyDescriptor.Builder()
.name("input-character-set")
.displayName("Input Character Set")
.description("Specifies the character set of the FlowFile contents "
+ "for reading input FlowFile contents to generate the message body "
+ "or as an attachment to the message. "
+ "If not set, UTF-8 will be the default value.")
.required(true)
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
.defaultValue(StandardCharsets.UTF_8.name())
.build();

public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
Expand All @@ -255,7 +265,6 @@ public class PutEmail extends AbstractProcessor {
.description("FlowFiles that fail to send will be routed to this relationship")
.build();

private static final Charset CONTENT_CHARSET = StandardCharsets.UTF_8;

private List<PropertyDescriptor> properties;

Expand Down Expand Up @@ -297,8 +306,10 @@ protected void init(final ProcessorInitializationContext context) {
properties.add(SUBJECT);
properties.add(MESSAGE);
properties.add(CONTENT_AS_MESSAGE);
properties.add(INPUT_CHARACTER_SET);
properties.add(ATTACH_FILE);
properties.add(INCLUDE_ALL_ATTRIBUTES);

this.properties = Collections.unmodifiableList(properties);

final Set<Relationship> relationships = new HashSet<>();
Expand Down Expand Up @@ -390,13 +401,25 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
final String messageText = getMessage(flowFile, context, session);

final String contentType = context.getProperty(CONTENT_TYPE).evaluateAttributeExpressions(flowFile).getValue();
message.setContent(messageText, contentType);
final Charset charset = getCharset(context);

message.setContent(messageText, contentType + String.format("; charset=\"%s\"", MimeUtility.mimeCharset(charset.name())));

message.setSentDate(new Date());

if (context.getProperty(ATTACH_FILE).asBoolean()) {
final MimeBodyPart mimeText = new PreencodedMimeBodyPart("base64");
mimeText.setDataHandler(new DataHandler(new ByteArrayDataSource(
Base64.encodeBase64(messageText.getBytes(CONTENT_CHARSET)), contentType + "; charset=\"utf-8\"")));
final String encoding = getEncoding(context);
final MimeBodyPart mimeText = new PreencodedMimeBodyPart(encoding);
final byte[] messageBytes = messageText.getBytes(charset);
final byte[] encodedMessageBytes = "base64".equals(encoding) ? Base64.encodeBase64(messageBytes) : messageBytes;
final DataHandler messageDataHandler = new DataHandler(
new ByteArrayDataSource(
encodedMessageBytes,
contentType + String.format("; charset=\"%s\"", MimeUtility.mimeCharset(charset.name()))
)
);
mimeText.setDataHandler(messageDataHandler);
mimeText.setHeader("Content-Transfer-Encoding", MimeUtility.getEncoding(mimeText.getDataHandler()));
final MimeBodyPart mimeFile = new MimeBodyPart();
session.read(flowFile, stream -> {
try {
Expand All @@ -406,13 +429,21 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
}
});

mimeFile.setFileName(MimeUtility.encodeText(flowFile.getAttribute(CoreAttributes.FILENAME.key()), CONTENT_CHARSET.name(), null));
mimeFile.setFileName(MimeUtility.encodeText(flowFile.getAttribute(CoreAttributes.FILENAME.key()), charset.name(), null));
mimeFile.setHeader("Content-Transfer-Encoding", MimeUtility.getEncoding(mimeFile.getDataHandler()));
final MimeMultipart multipart = new MimeMultipart();
multipart.addBodyPart(mimeText);
multipart.addBodyPart(mimeFile);

message.setContent(multipart);
} else {
// message is not a Multipart, need to set Content-Transfer-Encoding header at the message level
message.setHeader("Content-Transfer-Encoding", MimeUtility.getEncoding(message.getDataHandler()));
}


message.saveChanges();

send(message);

session.getProvenanceReporter().send(flowFile, "mailto:" + message.getAllRecipients()[0].toString());
Expand All @@ -433,7 +464,8 @@ private String getMessage(final FlowFile flowFile, final ProcessContext context,
final byte[] byteBuffer = new byte[(int) flowFile.getSize()];
session.read(flowFile, in -> StreamUtils.fillBuffer(in, byteBuffer, false));

messageText = new String(byteBuffer, 0, byteBuffer.length, CONTENT_CHARSET);
final Charset charset = getCharset(context);
messageText = new String(byteBuffer, 0, byteBuffer.length, charset);
} else if (context.getProperty(MESSAGE).isSet()) {
messageText = context.getProperty(MESSAGE).evaluateAttributeExpressions(flowFile).getValue();
}
Expand Down Expand Up @@ -588,4 +620,27 @@ public ValidationResult validate(String subject, String input, ValidationContext
.build();
}
}

/**
 * Resolves the value of the {@code INPUT_CHARACTER_SET} property into a {@link Charset}.
 *
 * @param context the ProcessContext whose property value is read
 * @return the Charset named by the property value
 */
private Charset getCharset(final ProcessContext context) {
    final String charsetName = context.getProperty(INPUT_CHARACTER_SET).getValue();
    return Charset.forName(charsetName);
}

/**
 * Selects the transfer encoding that matches the {@code INPUT_CHARACTER_SET} property.
 * US-ASCII maps to {@code 7bit}; every other charset maps to {@code base64}.
 *
 * @param context the ProcessContext
 * @return {@code "7bit"} for US-ASCII, otherwise {@code "base64"}
 */
private String getEncoding(final ProcessContext context) {
    // Go through the shared helper so the property lookup lives in a single place
    final Charset charset = getCharset(context);
    if (StandardCharsets.US_ASCII.equals(charset)) {
        return "7bit";
    }
    // Every other charset in StandardCharsets uses 8 bits or more, so base64 is the default
    return "base64";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.junit.jupiter.api.Test;

import java.io.InputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
Expand All @@ -41,6 +42,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNull;

public class TestPutEmail {
Expand Down Expand Up @@ -115,6 +117,7 @@ public void testOutgoingMessage() throws Exception {
runner.setProperty(PutEmail.FROM, "test@apache.org");
runner.setProperty(PutEmail.MESSAGE, "Message Body");
runner.setProperty(PutEmail.TO, "recipient@apache.org");
runner.setProperty(PutEmail.INPUT_CHARACTER_SET, StandardCharsets.UTF_8.name());

runner.enqueue("Some Text".getBytes());

Expand All @@ -128,7 +131,7 @@ public void testOutgoingMessage() throws Exception {
Message message = processor.getMessages().get(0);
assertEquals("test@apache.org", message.getFrom()[0].toString());
assertEquals("TestingNiFi", message.getHeader("X-Mailer")[0], "X-Mailer Header");
assertEquals("Message Body", message.getContent());
assertEquals("Message Body", getMessageText(message, StandardCharsets.UTF_8));
assertEquals("recipient@apache.org", message.getRecipients(RecipientType.TO)[0].toString());
assertNull(message.getRecipients(RecipientType.BCC));
assertNull(message.getRecipients(RecipientType.CC));
Expand All @@ -145,6 +148,7 @@ public void testOutgoingMessageWithOptionalProperties() throws Exception {
runner.setProperty(PutEmail.BCC, "${bcc}");
runner.setProperty(PutEmail.CC, "${cc}");
runner.setProperty(PutEmail.ATTRIBUTE_NAME_REGEX, "Precedence.*");
runner.setProperty(PutEmail.INPUT_CHARACTER_SET, StandardCharsets.UTF_8.name());

Map<String, String> attributes = new HashMap<>();
attributes.put("from", "test@apache.org <NiFi>");
Expand All @@ -166,7 +170,7 @@ public void testOutgoingMessageWithOptionalProperties() throws Exception {
Message message = processor.getMessages().get(0);
assertEquals("\"test@apache.org\" <NiFi>", message.getFrom()[0].toString());
assertEquals("TestingNíFiNonASCII", MimeUtility.decodeText(message.getHeader("X-Mailer")[0]), "X-Mailer Header");
assertEquals("the message body", message.getContent());
assertEquals("the message body", getMessageText(message, StandardCharsets.UTF_8));
assertEquals(1, message.getRecipients(RecipientType.TO).length);
assertEquals("to@apache.org", message.getRecipients(RecipientType.TO)[0].toString());
assertEquals(1, message.getRecipients(RecipientType.BCC).length);
Expand Down Expand Up @@ -220,6 +224,8 @@ public void testOutgoingMessageAttachment() throws Exception {
runner.setProperty(PutEmail.MESSAGE, "Message Body");
runner.setProperty(PutEmail.ATTACH_FILE, "true");
runner.setProperty(PutEmail.CONTENT_TYPE, "text/html");
runner.setProperty(PutEmail.TO, "recipient@apache.org");
runner.setProperty(PutEmail.INPUT_CHARACTER_SET, StandardCharsets.UTF_8.name());

Map<String, String> attributes = new HashMap<>();
attributes.put(CoreAttributes.FILENAME.key(), "test한的ほу́.pdf");
Expand All @@ -240,10 +246,8 @@ public void testOutgoingMessageAttachment() throws Exception {
assertInstanceOf(MimeMultipart.class, message.getContent());

final MimeMultipart multipart = (MimeMultipart) message.getContent();
final BodyPart part = multipart.getBodyPart(0);
final InputStream is = part.getDataHandler().getInputStream();
final String decodedText = StringUtils.newStringUtf8(Base64.decodeBase64(IOUtils.toString(is, StandardCharsets.UTF_8)));
assertEquals("Message Body", decodedText);

assertEquals("Message Body", getMessageText(message, StandardCharsets.UTF_8));

final BodyPart attachPart = multipart.getBodyPart(1);
final InputStream attachIs = attachPart.getDataHandler().getInputStream();
Expand All @@ -263,6 +267,7 @@ public void testOutgoingMessageWithFlowfileContent() throws Exception {
runner.setProperty(PutEmail.CC, "recipientcc@apache.org,anothercc@apache.org");
runner.setProperty(PutEmail.BCC, "recipientbcc@apache.org,anotherbcc@apache.org");
runner.setProperty(PutEmail.CONTENT_AS_MESSAGE, "${sendContent}");
runner.setProperty(PutEmail.INPUT_CHARACTER_SET, StandardCharsets.UTF_8.name());

Map<String, String> attributes = new HashMap<>();
attributes.put("sendContent", "true");
Expand All @@ -280,7 +285,7 @@ public void testOutgoingMessageWithFlowfileContent() throws Exception {
assertEquals("test@apache.org", message.getFrom()[0].toString());
assertEquals("from@apache.org", message.getFrom()[1].toString());
assertEquals("TestingNiFi", message.getHeader("X-Mailer")[0], "X-Mailer Header");
assertEquals("Some Text", message.getContent());
assertEquals("Some Text", getMessageText(message, StandardCharsets.UTF_8));
assertEquals("recipient@apache.org", message.getRecipients(RecipientType.TO)[0].toString());
assertEquals("another@apache.org", message.getRecipients(RecipientType.TO)[1].toString());
assertEquals("recipientcc@apache.org", message.getRecipients(RecipientType.CC)[0].toString());
Expand All @@ -307,7 +312,6 @@ public void testInvalidDynamicMailPropertyName() {
runner.setProperty(PutEmail.CONTENT_AS_MESSAGE, "${sendContent}");

runner.setProperty("mail.", "sample_value");
runner.assertNotValid();
}

@Test
Expand All @@ -320,11 +324,70 @@ public void testOverwritingDynamicMailProperty() {
runner.assertNotValid();
}

@Test
public void testUnrecognizedCharset() {
    // deliberately not the name of any character set
    final String bogusCharsetName = "NOT A CHARACTER SET";

    runner.setProperty(PutEmail.SMTP_HOSTNAME, "smtp-host");
    runner.setProperty(PutEmail.HEADER_XMAILER, "TestingNiFi");
    runner.setProperty(PutEmail.FROM, "test@apache.org");
    runner.setProperty(PutEmail.MESSAGE, "test message");
    runner.setProperty(PutEmail.TO, "recipient@apache.org");
    runner.setProperty(PutEmail.INPUT_CHARACTER_SET, bogusCharsetName);

    // the processor configuration is expected to be rejected
    runner.assertNotValid();
}

/**
 * Sends a message whose text was decoded with the wrong charset and checks that the
 * text read back from the sent message differs from the original string.
 */
@Test
public void testPutEmailWithMismatchedCharset() throws Exception {
    // String specifically chosen to have characters encoded differently in US_ASCII and UTF_8
    final String rawString = "SoftwÄrë Ënginëër Ön NiFi";
    final byte[] rawBytes = rawString.getBytes(StandardCharsets.US_ASCII);
    final byte[] rawBytesUTF8 = rawString.getBytes(StandardCharsets.UTF_8);

    // verify that the message bytes are different (some messages are not).
    // Arrays must be compared by content: assertNotEquals on two byte[] only compares
    // references and would pass even if both encodings produced identical bytes.
    assertFalse(java.util.Arrays.equals(rawBytes, rawBytesUTF8));

    runner.setProperty(PutEmail.SMTP_HOSTNAME, "smtp-host");
    runner.setProperty(PutEmail.HEADER_XMAILER, "TestingNiFi");
    runner.setProperty(PutEmail.FROM, "test@apache.org");
    // Decode the UTF-8 bytes as US-ASCII on purpose to produce the mismatched message text
    runner.setProperty(PutEmail.MESSAGE, new String(rawBytesUTF8, StandardCharsets.US_ASCII));
    runner.setProperty(PutEmail.TO, "recipient@apache.org");
    runner.setProperty(PutEmail.INPUT_CHARACTER_SET, StandardCharsets.UTF_8.name());

    runner.enqueue("Some Text".getBytes());

    runner.run();

    runner.assertQueueEmpty();
    runner.assertAllFlowFilesTransferred(PutEmail.REL_SUCCESS);

    // Verify that the Message was populated correctly
    assertEquals(1, processor.getMessages().size(), "Expected a single message to be sent");
    Message message = processor.getMessages().get(0);
    final String retrievedMessageText = getMessageText(message, StandardCharsets.UTF_8);
    assertNotEquals(rawString, retrievedMessageText);
}

/**
 * Applies the baseline SMTP host, X-Mailer header, sender and recipient properties to the runner.
 * Individual tests may override any of these values afterwards.
 *
 * @param runner the TestRunner to configure
 */
private void setRequiredProperties(final TestRunner runner) {
    final String smtpHost = "smtp-host";
    final String mailerHeader = "TestingNiFi";
    final String senders = "test@apache.org,from@apache.org";
    final String recipients = "recipient@apache.org,another@apache.org";

    runner.setProperty(PutEmail.SMTP_HOSTNAME, smtpHost);
    runner.setProperty(PutEmail.HEADER_XMAILER, mailerHeader);
    runner.setProperty(PutEmail.FROM, senders);
    runner.setProperty(PutEmail.TO, recipients);
}

/**
 * Extracts the text of a sent message.
 * For a multipart message the first body part is read and, unless the charset is US-ASCII,
 * base64-decoded before being converted to a String. A non-multipart message returns its
 * content directly.
 *
 * @param message the message to read
 * @param charset the charset used to decode the text of a multipart message
 * @return the message text
 * @throws Exception if the message content cannot be read
 */
private String getMessageText(final Message message, final Charset charset) throws Exception {
    // Fetch the content once instead of re-evaluating it for the type check and the cast
    final Object content = message.getContent();
    if (!(content instanceof MimeMultipart)) {
        return (String) content;
    }

    final BodyPart part = ((MimeMultipart) content).getBodyPart(0);
    final byte[] partBytes;
    // Close the part's stream once its bytes have been read
    try (InputStream is = part.getDataHandler().getInputStream()) {
        partBytes = IOUtils.toByteArray(is);
    }

    // Mirrors the processor: US-ASCII text is sent as 7bit, everything else as base64
    final boolean base64Encoded = !StandardCharsets.US_ASCII.equals(charset);
    final byte[] decodedTextBytes = base64Encoded ? Base64.decodeBase64(partBytes) : partBytes;
    return StringUtils.newString(decodedTextBytes, charset.name());
}
}