Skip to content

Commit

Permalink
Finished and added test coverage for #34
Browse files Browse the repository at this point in the history
  • Loading branch information
pereferrera committed Dec 17, 2013
1 parent de53d92 commit d5abb34
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 29 deletions.
Expand Up @@ -18,7 +18,9 @@
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.Serializable;
import java.io.Writer;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
Expand All @@ -37,7 +39,8 @@
* (http://opencsv.sourceforge.net/).
*/
@SuppressWarnings("serial")
public class TupleTextOutputFormat extends FileOutputFormat<ITuple, NullWritable> implements Serializable {
public class TupleTextOutputFormat extends FileOutputFormat<ITuple, NullWritable> implements
Serializable {

public static final char NO_QUOTE_CHARACTER = CSVWriter.NO_QUOTE_CHARACTER;
public static final char NO_ESCAPE_CHARACTER = CSVWriter.NO_ESCAPE_CHARACTER;
Expand All @@ -48,22 +51,22 @@ public class TupleTextOutputFormat extends FileOutputFormat<ITuple, NullWritable
private final char escapeCharacter;
private final boolean addHeader;
private final String nullString;
public TupleTextOutputFormat(Schema schema, boolean addHeader, char separatorCharacter, char quoteCharacter,
char escapeCharacter) {

public TupleTextOutputFormat(Schema schema, boolean addHeader, char separatorCharacter,
char quoteCharacter, char escapeCharacter) {
this(schema, addHeader, separatorCharacter, quoteCharacter, escapeCharacter, null);
}

/**
* You must specify the Schema that will be used for Tuples being written and the CSV semantics (if any). Use
* {@link #NO_ESCAPE_CHARACTER} and {@link #NO_QUOTE_CHARACTER} if you don't want to add CSV semantics to the output.
* If addHeader is true, the name of the Fields in the Schema will be used to add a header to the file.
* <p>
* Use "nullString" to replace nulls with some string.
*/
public TupleTextOutputFormat(Schema schema, boolean addHeader, char separatorCharacter, char quoteCharacter,
char escapeCharacter, String nullString) {
public TupleTextOutputFormat(Schema schema, boolean addHeader, char separatorCharacter,
char quoteCharacter, char escapeCharacter, String nullString) {

this.schema = schema;
this.addHeader = addHeader;
this.separatorCharacter = separatorCharacter;
Expand All @@ -73,36 +76,119 @@ public TupleTextOutputFormat(Schema schema, boolean addHeader, char separatorCha
}

@Override
public RecordWriter<ITuple, NullWritable> getRecordWriter(TaskAttemptContext context) throws IOException,
InterruptedException {
public RecordWriter<ITuple, NullWritable> getRecordWriter(TaskAttemptContext context)
throws IOException, InterruptedException {

Path file = getDefaultWorkFile(context, "");
BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(file.getFileSystem(context.getConfiguration())
.create(file)));
CSVWriter csvWriter = new CSVWriter(writer, separatorCharacter, quoteCharacter, escapeCharacter);
BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(file.getFileSystem(
context.getConfiguration()).create(file)));
CustomCSVWriter csvWriter = new CustomCSVWriter(writer, separatorCharacter, quoteCharacter,
escapeCharacter, nullString);
if(addHeader) {
String[] header = new String[schema.getFields().size()];
for(int i = 0; i < schema.getFields().size(); i++) {
header[i] = schema.getFields().get(i).getName();
}
csvWriter.writeNext(header);
}
return new TupleTextRecordWriter(schema, csvWriter, nullString);
return new TupleTextRecordWriter(schema, csvWriter);
}

/**
* We had to almost re-implement CSVWriter for properly supporting null strings. We can't reuse a lot of code due to
* inheritance / visibility problems.
*/
public static class CustomCSVWriter {

String nullString;
private Writer rawWriter;
private PrintWriter pw;
private char separator;
private char quotechar;
private char escapechar;

public CustomCSVWriter(Writer writer, char separator, char quotechar, char escapechar,
String nullString) {
this.rawWriter = writer;
this.pw = new PrintWriter(writer);
this.nullString = nullString;
this.separator = separator;
this.quotechar = quotechar;
this.escapechar = escapechar;
}

public void writeNext(String[] toWrite) throws IOException {
if(toWrite == null)
return;

StringBuilder sb = new StringBuilder(CSVWriter.INITIAL_STRING_SIZE);
for(int i = 0; i < toWrite.length; i++) {

if(i != 0) {
sb.append(separator);
}

String nextElement = toWrite[i];
if(nextElement == null) {
if(nullString == null) {
throw new IOException("Null field and no null string specified by constructor.");
}
sb.append(nullString);
} else {
if(quotechar != NO_QUOTE_CHARACTER)
sb.append(quotechar);

sb.append(stringContainsSpecialCharacters(nextElement) ? processLine(nextElement)
: nextElement);

if(quotechar != NO_QUOTE_CHARACTER)
sb.append(quotechar);
}
}

sb.append(CSVWriter.DEFAULT_LINE_END);
pw.write(sb.toString());
}

protected StringBuilder processLine(String nextElement) {
StringBuilder sb = new StringBuilder(CSVWriter.INITIAL_STRING_SIZE);
for(int j = 0; j < nextElement.length(); j++) {
char nextChar = nextElement.charAt(j);
if(escapechar != NO_ESCAPE_CHARACTER && nextChar == quotechar) {
sb.append(escapechar).append(nextChar);
} else if(escapechar != NO_ESCAPE_CHARACTER && nextChar == escapechar) {
sb.append(escapechar).append(nextChar);
} else {
sb.append(nextChar);
}
}

return sb;
}

private boolean stringContainsSpecialCharacters(String line) {
return line.indexOf(quotechar) != -1 || line.indexOf(escapechar) != -1;
}

public void close() throws IOException {
pw.flush();
pw.close();
rawWriter.close();
}

}

public static class TupleTextRecordWriter extends RecordWriter<ITuple, NullWritable> {

private final CSVWriter writer;
private final CustomCSVWriter writer;
private final Schema schema;
private final String[] lineToWrite;
private final String nullString;

public TupleTextRecordWriter(Schema schema, CSVWriter writer, String nullString) {
public TupleTextRecordWriter(Schema schema, CustomCSVWriter writer) {
this.writer = writer;
this.schema = schema;
int nFields = schema.getFields().size();
lineToWrite = new String[nFields];
this.nullString = nullString;
}

@Override
Expand All @@ -118,18 +204,17 @@ public void write(ITuple tuple, NullWritable toIgnore) throws IOException, Inter
+ "] does not match output format Schema [" + schema.getName() + "]");
}
if(tuple.getSchema().getFields().size() != schema.getFields().size()) {
throw new IOException("Input schema has different number of fields [" + tuple.getSchema().getFields().size()
+ "] not matching output format Schema fields [" + schema.getFields().size() + "]");
throw new IOException("Input schema has different number of fields ["
+ tuple.getSchema().getFields().size() + "] not matching output format Schema fields ["
+ schema.getFields().size() + "]");
}
// Convert the tuple to an array of Strings
for(int i = 0; i < tuple.getSchema().getFields().size(); i++) {
Object obj = tuple.get(i);
if(obj != null) {
lineToWrite[i] = obj.toString();
} else if(nullString != null) {
lineToWrite[i] = nullString;
} else {
throw new IOException("Null object in field (" + tuple.getSchema().getField(i).getName() + ") and no null string specified by constructor.");
lineToWrite[i] = null;
}
}
// Write it to the CSV writer
Expand Down
Expand Up @@ -2,17 +2,13 @@

import static org.junit.Assert.assertEquals;

import java.io.File;
import java.io.IOException;
import java.io.StringReader;
import java.nio.charset.Charset;
import java.util.Arrays;

import org.junit.Test;

import com.datasalt.pangool.tuplemr.mapred.lib.input.NullableCSVTokenizer;
import com.datasalt.pangool.tuplemr.mapred.lib.input.TupleTextInputFormat;
import com.google.common.io.Files;
import com.googlecode.jcsv.CSVStrategy;
import com.googlecode.jcsv.reader.CSVReader;
import com.googlecode.jcsv.reader.internal.CSVReaderBuilder;
Expand Down
Expand Up @@ -333,6 +333,7 @@ public void testNulls() throws IOException, InterruptedException, ClassNotFoundE
TupleMRException, URISyntaxException {

String line1 = "\"Joe\",\\N,,\"\\\"Joan\\\"\",\"\"";
System.out.println(line1);

CommonUtils.writeTXT(line1, new File(IN));
Configuration conf = getConf();
Expand All @@ -347,7 +348,7 @@ public void testNulls() throws IOException, InterruptedException, ClassNotFoundE
MapOnlyJobBuilder mO = new MapOnlyJobBuilder(conf);
mO.addInput(inPath, new TupleTextInputFormat(schema, false, true, ',', '"', '\\',
FieldSelector.NONE, TupleTextInputFormat.NO_NULL_STRING),
new MapOnlyMapper<ITuple, NullWritable, NullWritable, NullWritable>() {
new MapOnlyMapper<ITuple, NullWritable, ITuple, NullWritable>() {

protected void map(ITuple key, NullWritable value, Context context,
MultipleOutputsCollector collector) throws IOException, InterruptedException {
Expand All @@ -358,18 +359,21 @@ protected void map(ITuple key, NullWritable value, Context context,
Assert.assertEquals("Joe", key.get("name"));
Assert.assertEquals("\"Joan\"", key.get("name3"));
Assert.assertEquals("", key.get("emptystring"));
context.write(key, value);
} catch(Throwable t) {
t.printStackTrace();
throw new RuntimeException(t);
}
}
});

mO.setOutput(outPath, new HadoopOutputFormat(NullOutputFormat.class), NullWritable.class,
mO.setOutput(outPath, new TupleTextOutputFormat(schema, false, ',', '"', '\\', "\\N"), NullWritable.class,
NullWritable.class);
Job job = mO.createJob();
try {
assertTrue(job.waitForCompletion(true));
String str = Files.toString(new File(outPath.toString() + "/part-m-00000"), Charset.defaultCharset());
assertEquals("\"Joe\",\\N,\\N,\"\\\"Joan\\\"\",\"\"", str.trim());
} finally {
mO.cleanUpInstanceFiles();
}
Expand Down

0 comments on commit d5abb34

Please sign in to comment.