Skip to content

Commit

Permalink
DRILL-5498: Better handling of CSV column headers
Browse files Browse the repository at this point in the history
See DRILL-5498 for details.

Replaced the repeated varchar reader for reading columns with a purpose
built column parser. Implemented rules to recover from invalid column
headers.
  • Loading branch information
Paul Rogers committed May 19, 2017
1 parent 41ffed5 commit 68ce6dc
Show file tree
Hide file tree
Showing 5 changed files with 660 additions and 9 deletions.
Expand Up @@ -179,11 +179,7 @@ public void setup(OperatorContext context, OutputMutator outputMutator) throws E
// don't skip header in case skipFirstLine is set true
settings.setSkipFirstLine(false);

// setup Output using OutputMutator
// we should use a separate output mutator to avoid reshaping query output with header data
HeaderOutputMutator hOutputMutator = new HeaderOutputMutator();
TextOutput hOutput = new RepeatedVarCharOutput(hOutputMutator, getColumns(), true);
this.allocate(hOutputMutator.fieldVectorMap);
HeaderBuilder hOutput = new HeaderBuilder();

// setup Input using InputStream
// we should read file header irrespective of split given given to this reader
Expand All @@ -198,11 +194,10 @@ public void setup(OperatorContext context, OutputMutator outputMutator) throws E
reader.parseNext();

// grab the field names from output
String [] fieldNames = ((RepeatedVarCharOutput)hOutput).getTextOutput();
String [] fieldNames = hOutput.getHeaders();

// cleanup and set to skip the first line next time we read input
reader.close();
hOutputMutator.close();
settings.setSkipFirstLine(true);

return fieldNames;
Expand Down
@@ -0,0 +1,274 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.store.easy.text.compliant;

import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import com.google.common.base.Charsets;

/**
* Text output that implements a header reader/parser.
* The caller parses out the characters of each header;
* this class assembles UTF-8 bytes into Unicode characters,
* fixes invalid characters (those not legal for SQL symbols),
* and maps duplicate names to unique names.
* <p>
* That is, this class is as permissive as possible with file
* headers to avoid spurious query failures for trivial reasons.
*/

// Note: this class uses Java heap strings and the usual Java
// convenience classes. Since we do heavy Unicode string operations,
// and read a single row, there is no good reason to try to use
// value vectors and direct memory for this task.

public class HeaderBuilder extends TextOutput {

/**
* Maximum Drill symbol length, as enforced for headers.
* @see <a href="https://drill.apache.org/docs/lexical-structure/#identifier">
* identifier documentation</a>
*/
// TODO: Replace with the proper constant, if available
public static final int MAX_HEADER_LEN = 1024;

/**
* Prefix used to replace non-alphabetic characters at the start of
* a column name. For example, $foo becomes col_foo. Used
* because SQL does not allow _foo.
*/

public static final String COLUMN_PREFIX = "col_";

/**
* Prefix used to create numbered columns for missing
* headers. Typical names: column_1, column_2, ...
*/

public static final String ANONYMOUS_COLUMN_PREFIX = "column_";

/**
* Exception that reports header errors. Is an unchecked exception
* to avoid cluttering the normal field reader interface.
*/
public static class HeaderError extends RuntimeException {

private static final long serialVersionUID = 1L;

public HeaderError(String msg) {
super(msg);
}

public HeaderError(int colIndex, String msg) {
super("Column " + (colIndex + 1) + ": " + msg);
}
}

public final List<String> headers = new ArrayList<>();
public final ByteBuffer currentField = ByteBuffer.allocate(MAX_HEADER_LEN);

@Override
public void startField(int index) {
currentField.clear();
}

@Override
public boolean endField() {
String header = new String(currentField.array(), 0, currentField.position(), Charsets.UTF_8);
header = validateSymbol(header);
headers.add(header);
return true;
}

@Override
public boolean endEmptyField() {

// Empty header will be rewritten to "column_<n>".

return endField();
}

/**
* Validate the header name according to the SQL lexical rules.
* @see <a href="https://drill.apache.org/docs/lexical-structure/#identifier">
* identifier documentation</a>
* @param header the header name to validate
*/

// TODO: Replace with existing code, if any.
private String validateSymbol(String header) {
header = header.trim();

// To avoid unnecessary query failures, just make up a column name
// if the name is missing or all blanks.

if (header.isEmpty()) {
return ANONYMOUS_COLUMN_PREFIX + (headers.size() + 1);
}
if (! Character.isAlphabetic(header.charAt(0))) {
return rewriteHeader(header);
}
for (int i = 1; i < header.length(); i++) {
char ch = header.charAt(i);
if (! Character.isAlphabetic(ch) &&
! Character.isDigit(ch) && ch != '_') {
return rewriteHeader(header);
}
}
return header;
}

/**
* Given an invalid header, rewrite it to replace illegal characters
* with valid ones. The header won't be what the user specified,
* but it will be a valid SQL identifier. This solution avoids failing
* queries due to corrupted or invalid header data.
* <p>
* Names with invalid first characters are mapped to "col_". Example:
* $foo maps to col_foo. If the only character is non-alphabetic, treat
* the column as anonymous and create a generic name: column_4, etc.
* <p>
* This mapping could create a column that exceeds the maximum length
* of 1024. Since that is not really a hard limit, we just live with the
* extra few characters.
*
* @param header the original header
* @return the rewritten header, valid for SQL
*/

private String rewriteHeader(String header) {
final StringBuilder buf = new StringBuilder();

// If starts with non-alphabetic, can't map the character to
// underscore, so just tack on a prefix.

char ch = header.charAt(0);
if (Character.isAlphabetic(ch)) {
buf.append(ch);
} else if (Character.isDigit(ch)) {
buf.append(COLUMN_PREFIX);
buf.append(ch);

// For the strange case of only one character, format
// the same as an empty header.

} else if (header.length() == 1) {
return ANONYMOUS_COLUMN_PREFIX + (headers.size() + 1);
} else {
buf.append(COLUMN_PREFIX);
}

// Convert all remaining invalid characters to underscores

for (int i = 1; i < header.length(); i++) {
ch = header.charAt(i);
if (Character.isAlphabetic(ch) ||
Character.isDigit(ch) || ch == '_') {
buf.append(ch);
} else {
buf.append("_");
}
}
return buf.toString();
}

@Override
public void append(byte data) {

// Ensure the data fits. Note that, if the name is Unicode, the actual
// number of characters might be less than the limit even though the
// byte count exceeds the limit. Fixing this, in general, would require
// a buffer four times larger, so we leave that as a later improvement
// if ever needed.

try {
currentField.put(data);
} catch (BufferOverflowException e) {
throw new HeaderError(headers.size(), "Column exceeds maximum length of " + MAX_HEADER_LEN);
}
}

@Override
public void finishRecord() {
if (headers.isEmpty()) {
throw new HeaderError("The file must define at least one header.");
}

// Force headers to be unique.

final Set<String> idents = new HashSet<String>();
for (int i = 0; i < headers.size(); i++) {
String header = headers.get(i);
String key = header.toLowerCase();

// Is the header a duplicate?

if (idents.contains(key)) {

// Make header unique by appending a suffix.
// This loop must end because we have a finite
// number of headers.
// The original column is assumed to be "1", so
// the first duplicate is "2", and so on.
// Note that this will map columns of the form:
// "col,col,col_2,col_2_2" to
// "col", "col_2", "col_2_2", "col_2_2_2".
// No mapping scheme is perfect...

for (int l = 2; ; l++) {
final String rewritten = header + "_" + l;
key = rewritten.toLowerCase();
if (! idents.contains(key)) {
headers.set(i, rewritten);
break;
}
}
}
idents.add(key);
}
}

@Override
public long getRecordCount() { return 1; }

@Override
public void startBatch() { }

@Override
public void finishBatch() { }

@Override
public boolean rowHasData() {
return ! headers.isEmpty();
}

public String[] getHeaders() {

// Just return the headers: any needed checks were done in
// finishRecord()

final String array[] = new String[headers.size()];
return headers.toArray(array);
}

}

0 comments on commit 68ce6dc

Please sign in to comment.