Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NIFI-4416 CSVRecordReader does not accept escaped character as delimiter #2172

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -99,6 +99,7 @@
<exclude>src/test/resources/csv/extra-white-space.csv</exclude>
<exclude>src/test/resources/csv/multi-bank-account.csv</exclude>
<exclude>src/test/resources/csv/single-bank-account.csv</exclude>
<exclude>src/test/resources/csv/multi-bank-account_escapedchar.csv</exclude>
<exclude>src/test/resources/grok/error-with-stack-trace.log</exclude>
<exclude>src/test/resources/grok/nifi-log-sample-multiline-with-stacktrace.log</exclude>
<exclude>src/test/resources/grok/nifi-log-sample.log</exclude>
Expand Down
Expand Up @@ -19,6 +19,7 @@

import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.QuoteMode;
import org.apache.commons.lang3.StringEscapeUtils;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
Expand Down Expand Up @@ -48,15 +49,15 @@ public class CSVUtils {
static final PropertyDescriptor VALUE_SEPARATOR = new PropertyDescriptor.Builder()
.name("Value Separator")
.description("The character that is used to separate values/fields in a CSV Record")
.addValidator(new SingleCharacterValidator())
.addValidator(CSVValidators.UNESCAPED_SINGLE_CHAR_VALIDATOR)
.expressionLanguageSupported(false)
.defaultValue(",")
.required(true)
.build();
static final PropertyDescriptor QUOTE_CHAR = new PropertyDescriptor.Builder()
.name("Quote Character")
.description("The character that is used to quote values so that escape characters do not have to be used")
.addValidator(new SingleCharacterValidator())
.addValidator(new CSVValidators.SingleCharacterValidator())
.expressionLanguageSupported(false)
.defaultValue("\"")
.required(true)
Expand Down Expand Up @@ -89,14 +90,14 @@ public class CSVUtils {
static final PropertyDescriptor COMMENT_MARKER = new PropertyDescriptor.Builder()
.name("Comment Marker")
.description("The character that is used to denote the start of a comment. Any line that begins with this comment will be ignored.")
.addValidator(new SingleCharacterValidator())
.addValidator(new CSVValidators.SingleCharacterValidator())
.expressionLanguageSupported(false)
.required(false)
.build();
static final PropertyDescriptor ESCAPE_CHAR = new PropertyDescriptor.Builder()
.name("Escape Character")
.description("The character that is used to escape characters that would otherwise have a specific meaning to the CSV Parser.")
.addValidator(new SingleCharacterValidator())
.addValidator(new CSVValidators.SingleCharacterValidator())
.expressionLanguageSupported(false)
.defaultValue("\\")
.required(true)
Expand Down Expand Up @@ -179,12 +180,16 @@ static CSVFormat createCSVFormat(final ConfigurationContext context) {
}
}

/**
 * Looks up the configured value of {@code property}, runs it through
 * {@link StringEscapeUtils#unescapeJava(String)} and returns the first character of the result.
 *
 * @param context  the configuration context the property value is read from
 * @param property the property whose value holds the (possibly escaped) character
 * @return the first character of the unescaped property value
 */
private static char getUnescapedChar(final ConfigurationContext context, final PropertyDescriptor property) {
    final String configuredValue = context.getProperty(property).getValue();
    final String unescapedValue = StringEscapeUtils.unescapeJava(configuredValue);
    return unescapedValue.charAt(0);
}

/**
 * Looks up the configured value of {@code property}, passes it through
 * {@link CSVUtils#unescape(String)} and returns the first character of the result.
 *
 * @param context  the configuration context the property value is read from
 * @param property the property whose value holds the character
 * @return the first character of the unescaped property value
 */
private static char getChar(final ConfigurationContext context, final PropertyDescriptor property) {
    final String configuredValue = context.getProperty(property).getValue();
    final String unescapedValue = CSVUtils.unescape(configuredValue);
    return unescapedValue.charAt(0);
}

private static CSVFormat buildCustomFormat(final ConfigurationContext context) {
final char valueSeparator = getChar(context, VALUE_SEPARATOR);
final char valueSeparator = getUnescapedChar(context, VALUE_SEPARATOR);
CSVFormat format = CSVFormat.newFormat(valueSeparator)
.withAllowMissingColumnNames()
.withIgnoreEmptyLines();
Expand Down
@@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.csv;

import org.apache.commons.lang3.StringEscapeUtils;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;

import java.util.HashSet;
import java.util.Set;

/**
 * Validators used by the CSV property descriptors to check single-character settings
 * such as the value separator, quote character, comment marker and escape character.
 */
public class CSVValidators {

    /**
     * Builds a failed {@link ValidationResult} for the given subject/input pair.
     */
    private static ValidationResult invalid(final String subject, final String input, final String explanation) {
        return new ValidationResult.Builder()
                .input(input)
                .subject(subject)
                .valid(false)
                .explanation(explanation)
                .build();
    }

    /**
     * Accepts a value that, once passed through {@link CSVUtils#unescape(String)}, is exactly one
     * character long and is neither a carriage return nor a line feed.
     */
    public static class SingleCharacterValidator implements Validator {
        // Characters that are rejected even though they are a single character.
        private static final Set<String> ILLEGAL_CHARS = new HashSet<>();

        static {
            ILLEGAL_CHARS.add("\r");
            ILLEGAL_CHARS.add("\n");
        }

        /**
         * @param subject the name of what is being validated, echoed into the result
         * @param input   the raw configured value; {@code null} is reported as invalid
         * @param context the validation context (not consulted by this validator)
         * @return a result that is valid only for a single, non line-break character
         */
        @Override
        public ValidationResult validate(final String subject, final String input, final ValidationContext context) {

            if (input == null) {
                return invalid(subject, input, "Input is null for this property");
            }

            final String unescaped = CSVUtils.unescape(input);
            if (unescaped.length() != 1) {
                return invalid(subject, input, "Value must be exactly 1 character but was " + input.length() + " in length");
            }

            if (ILLEGAL_CHARS.contains(unescaped)) {
                return invalid(subject, input, input + " is not a valid character for this property");
            }

            return new ValidationResult.Builder()
                    .input(input)
                    .subject(subject)
                    .valid(true)
                    .build();
        }

    }

    /**
     * Accepts a value that, after Java-style unescaping, is exactly one character long and is not the
     * NUL character (code point 0). A value for which
     * {@link ValidationContext#isExpressionLanguagePresent(String)} returns {@code true} is accepted
     * regardless of its length.
     */
    public static final Validator UNESCAPED_SINGLE_CHAR_VALIDATOR = new Validator() {
        @Override
        public ValidationResult validate(final String subject, final String input, final ValidationContext context) {

            if (input == null) {
                return invalid(subject, input, "Input is null for this property");
            }

            final String unescapeString;
            try {
                unescapeString = unescapeString(input);
            } catch (final IllegalArgumentException e) {
                // unescapeJava rejects malformed unicode escapes (e.g. too few hex digits) by throwing;
                // report that as a validation outcome instead of letting the exception escape validate().
                return new ValidationResult.Builder()
                        .subject(subject)
                        .input(input)
                        .explanation("Value contains an escape sequence that cannot be unescaped: " + e.getMessage())
                        .valid(context.isExpressionLanguagePresent(input))
                        .build();
            }

            return new ValidationResult.Builder()
                    .subject(subject)
                    .input(unescapeString)
                    .explanation("Only non-null single characters are supported")
                    .valid((unescapeString.length() == 1 && unescapeString.charAt(0) != 0) || context.isExpressionLanguagePresent(input))
                    .build();
        }

        /**
         * Unescapes {@code input} only when it is longer than one character, so that a lone
         * backslash (or any other single character) is taken literally.
         */
        private String unescapeString(String input) {
            if (input != null && input.length() > 1) {
                input = StringEscapeUtils.unescapeJava(input);
            }
            return input;
        }
    };

}

This file was deleted.

Expand Up @@ -33,6 +33,7 @@
import java.util.TimeZone;

import org.apache.commons.csv.CSVFormat;
import org.apache.commons.lang3.StringEscapeUtils;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.SimpleRecordSchema;
Expand All @@ -57,7 +58,7 @@ private List<RecordField> getDefaultFields() {
return fields;
}

private CSVRecordReader createReader(final InputStream in, final RecordSchema schema) throws IOException {
private CSVRecordReader createReader(final InputStream in, final RecordSchema schema, CSVFormat format) throws IOException {
return new CSVRecordReader(in, Mockito.mock(ComponentLog.class), schema, format, true, false,
RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat());
}
Expand Down Expand Up @@ -93,7 +94,7 @@ public void testSimpleParse() throws IOException, MalformedRecordException {
final RecordSchema schema = new SimpleRecordSchema(fields);

try (final InputStream fis = new FileInputStream(new File("src/test/resources/csv/single-bank-account.csv"));
final CSVRecordReader reader = createReader(fis, schema)) {
final CSVRecordReader reader = createReader(fis, schema, format)) {

final Object[] record = reader.nextRecord().getValues();
final Object[] expectedValues = new Object[] {"1", "John Doe", 4750.89D, "123 My Street", "My City", "MS", "11111", "USA"};
Expand All @@ -111,7 +112,7 @@ public void testMultipleRecords() throws IOException, MalformedRecordException {
final RecordSchema schema = new SimpleRecordSchema(fields);

try (final InputStream fis = new FileInputStream(new File("src/test/resources/csv/multi-bank-account.csv"));
final CSVRecordReader reader = createReader(fis, schema)) {
final CSVRecordReader reader = createReader(fis, schema, format)) {

final Object[] firstRecord = reader.nextRecord().getValues();
final Object[] firstExpectedValues = new Object[] {"1", "John Doe", 4750.89D, "123 My Street", "My City", "MS", "11111", "USA"};
Expand All @@ -133,7 +134,7 @@ public void testExtraWhiteSpace() throws IOException, MalformedRecordException {
final RecordSchema schema = new SimpleRecordSchema(fields);

try (final InputStream fis = new FileInputStream(new File("src/test/resources/csv/extra-white-space.csv"));
final CSVRecordReader reader = createReader(fis, schema)) {
final CSVRecordReader reader = createReader(fis, schema, format)) {

final Object[] firstRecord = reader.nextRecord().getValues();
final Object[] firstExpectedValues = new Object[] {"1", "John Doe", 4750.89D, "123 My Street", "My City", "MS", "11111", "USA"};
Expand All @@ -160,7 +161,7 @@ public void testMissingField() throws IOException, MalformedRecordException {
final byte[] inputData = csvData.getBytes();

try (final InputStream bais = new ByteArrayInputStream(inputData);
final CSVRecordReader reader = createReader(bais, schema)) {
final CSVRecordReader reader = createReader(bais, schema, format)) {

final Record record = reader.nextRecord();
assertNotNull(record);
Expand Down Expand Up @@ -190,7 +191,7 @@ public void testReadRawWithDifferentFieldName() throws IOException, MalformedRec

// test nextRecord does not contain a 'continent' field
try (final InputStream bais = new ByteArrayInputStream(inputData);
final CSVRecordReader reader = createReader(bais, schema)) {
final CSVRecordReader reader = createReader(bais, schema, format)) {

final Record record = reader.nextRecord();
assertNotNull(record);
Expand All @@ -210,7 +211,7 @@ public void testReadRawWithDifferentFieldName() throws IOException, MalformedRec

// test nextRawRecord does contain 'continent' field
try (final InputStream bais = new ByteArrayInputStream(inputData);
final CSVRecordReader reader = createReader(bais, schema)) {
final CSVRecordReader reader = createReader(bais, schema, format)) {

final Record record = reader.nextRecord(false, false);
assertNotNull(record);
Expand Down Expand Up @@ -241,7 +242,7 @@ public void testFieldInSchemaButNotHeader() throws IOException, MalformedRecordE
final byte[] inputData = csvData.getBytes();

try (final InputStream bais = new ByteArrayInputStream(inputData);
final CSVRecordReader reader = createReader(bais, schema)) {
final CSVRecordReader reader = createReader(bais, schema, format)) {

final Record record = reader.nextRecord();
assertNotNull(record);
Expand Down Expand Up @@ -304,7 +305,7 @@ public void testExtraFieldNotInHeader() throws IOException, MalformedRecordExcep

// test nextRecord does not contain a 'continent' field
try (final InputStream bais = new ByteArrayInputStream(inputData);
final CSVRecordReader reader = createReader(bais, schema)) {
final CSVRecordReader reader = createReader(bais, schema, format)) {

final Record record = reader.nextRecord(false, false);
assertNotNull(record);
Expand All @@ -322,4 +323,30 @@ public void testExtraFieldNotInHeader() throws IOException, MalformedRecordExcep
assertNull(reader.nextRecord(false, false));
}
}

/**
 * Reads a two-record CSV file whose fields are separated by the control character U+0001, with the
 * delimiter obtained by unescaping its textual Java escape sequence, and checks both records.
 */
@Test
public void testMultipleRecordsEscapedWithSpecialChar() throws IOException, MalformedRecordException {

    // The backslash is doubled on purpose: a single-backslash unicode escape is translated by the Java
    // compiler itself, which would hand unescapeJava the raw control character and leave the
    // unescaping step untested. Here unescapeJava receives the six-character escaped text.
    char delimiter = StringEscapeUtils.unescapeJava("\\u0001").charAt(0);

    final CSVFormat format = CSVFormat.DEFAULT.withFirstRecordAsHeader().withTrim().withQuote('"').withDelimiter(delimiter);
    final List<RecordField> fields = getDefaultFields();
    fields.replaceAll(f -> f.getFieldName().equals("balance") ? new RecordField("balance", doubleDataType) : f);

    final RecordSchema schema = new SimpleRecordSchema(fields);

    try (final InputStream fis = new FileInputStream(new File("src/test/resources/csv/multi-bank-account_escapedchar.csv"));
        final CSVRecordReader reader = createReader(fis, schema, format)) {

        final Object[] firstRecord = reader.nextRecord().getValues();
        final Object[] firstExpectedValues = new Object[] {"1", "John Doe", 4750.89D, "123 My Street", "My City", "MS", "11111", "USA"};
        Assert.assertArrayEquals(firstExpectedValues, firstRecord);

        final Object[] secondRecord = reader.nextRecord().getValues();
        final Object[] secondExpectedValues = new Object[] {"2", "Jane Doe", 4820.09D, "321 Your Street", "Your City", "NY", "33333", "USA"};
        Assert.assertArrayEquals(secondExpectedValues, secondRecord);

        // No further records are expected after the second one.
        assertNull(reader.nextRecord());
    }
}
}