Skip to content
Permalink
Browse files
BATCHEE-72 commons-csv integration
  • Loading branch information
Romain Manni-Bucau committed Nov 20, 2015
1 parent a44fad9 commit 20fad4b9d3a00a710a5b0b5736b8fdc0eced5ec9
Show file tree
Hide file tree
Showing 30 changed files with 1,653 additions and 17 deletions.
@@ -0,0 +1,46 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<!-- Parent module: groupId and version of this artifact are inherited from it. -->
<parent>
<artifactId>batchee-extensions</artifactId>
<groupId>org.apache.batchee</groupId>
<version>0.3-incubating-SNAPSHOT</version>
</parent>

<!-- Coordinates and display name of the commons-csv integration module. -->
<artifactId>batchee-commons-csv</artifactId>
<name>BatchEE :: Extensions :: Commons CSV</name>

<dependencies>
<!-- BatchEE extras, kept on the same version as this module via ${project.version}. -->
<dependency>
<groupId>org.apache.batchee</groupId>
<artifactId>batchee-extras</artifactId>
<version>${project.version}</version>
</dependency>
<!-- Apache Commons CSV, pinned to release 1.2. -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-csv</artifactId>
<version>1.2</version>
</dependency>
</dependencies>
</project>
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.batchee.csv;

import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.QuoteMode;

/**
 * Builds a commons-csv {@link CSVFormat} from string-typed configuration values.
 *
 * Every option is optional: a {@code null} value leaves the corresponding setting of the
 * base format untouched. Options that are reduced to a single character or used as the
 * record separator are also ignored when empty, the same way blank {@code header} and
 * {@code headerComments} values are ignored.
 */
final class CSVFormatFactory {
    /** Regular expression splitting a comma separated list, swallowing spaces around each comma. */
    private static final String LIST_SEPARATOR = " *, *";

    private CSVFormatFactory() {
        // no-op
    }

    /**
     * Creates the format described by the given settings.
     *
     * @param format name of a predefined format resolved through {@link CSVFormat#valueOf(String)},
     *               {@link CSVFormat#DEFAULT} is the base format when {@code null}
     * @param delimiter value delimiter, only its first character is used
     * @param quoteCharacter quote character, only its first character is used
     * @param quoteMode name of a {@link QuoteMode} constant
     * @param commentMarker comment marker, only its first character is used
     * @param escapeCharacter escape character, only its first character is used
     * @param ignoreSurroundingSpaces parsed with {@link Boolean#parseBoolean(String)}
     * @param ignoreEmptyLines parsed with {@link Boolean#parseBoolean(String)}
     * @param recordSeparator record separator, used as a whole so multi-character separators are kept
     * @param nullString string standing for {@code null} values
     * @param headerComments comma separated header comments
     * @param header comma separated header names, takes precedence over {@code readHeaders}
     * @param skipHeaderRecord parsed with {@link Boolean#parseBoolean(String)}
     * @param allowMissingColumnNames parsed with {@link Boolean#parseBoolean(String)}
     * @param readHeaders when {@code "true"} an empty header is set on the format
     * @return the configured format
     * @throws IllegalArgumentException if {@code format} or {@code quoteMode} does not name a known constant
     */
    //CHECKSTYLE:OFF
    static CSVFormat newFormat(final String format,
                               final String delimiter,
                               final String quoteCharacter,
                               final String quoteMode,
                               final String commentMarker,
                               final String escapeCharacter,
                               final String ignoreSurroundingSpaces,
                               final String ignoreEmptyLines,
                               final String recordSeparator,
                               final String nullString,
                               final String headerComments,
                               final String header,
                               final String skipHeaderRecord,
                               final String allowMissingColumnNames,
                               final String readHeaders) {
    //CHECKSTYLE:ON
        CSVFormat out = format == null ? CSVFormat.DEFAULT : CSVFormat.valueOf(format);
        if (isSet(delimiter)) {
            out = out.withDelimiter(delimiter.charAt(0));
        }
        if (isSet(quoteCharacter)) {
            out = out.withQuote(quoteCharacter.charAt(0));
        }
        if (quoteMode != null) {
            out = out.withQuoteMode(QuoteMode.valueOf(quoteMode));
        }
        if (isSet(commentMarker)) {
            out = out.withCommentMarker(commentMarker.charAt(0));
        }
        if (isSet(escapeCharacter)) {
            out = out.withEscape(escapeCharacter.charAt(0));
        }
        if (ignoreSurroundingSpaces != null) {
            out = out.withIgnoreSurroundingSpaces(Boolean.parseBoolean(ignoreSurroundingSpaces));
        }
        if (ignoreEmptyLines != null) {
            out = out.withIgnoreEmptyLines(Boolean.parseBoolean(ignoreEmptyLines));
        }
        if (isSet(recordSeparator)) {
            // pass the whole string: taking only the first character would turn "\r\n" into "\r"
            out = out.withRecordSeparator(recordSeparator);
        }
        if (nullString != null) {
            out = out.withNullString(nullString);
        }
        if (headerComments != null && !headerComments.trim().isEmpty()) {
            out = out.withHeaderComments(headerComments.split(LIST_SEPARATOR));
        }
        if (Boolean.parseBoolean(readHeaders)) {
            out = out.withHeader();
        }
        // applied after readHeaders on purpose: explicit names replace the empty header set above
        if (header != null && !header.trim().isEmpty()) {
            out = out.withHeader(header.split(LIST_SEPARATOR));
        }
        if (skipHeaderRecord != null) {
            out = out.withSkipHeaderRecord(Boolean.parseBoolean(skipHeaderRecord));
        }
        if (allowMissingColumnNames != null) {
            out = out.withAllowMissingColumnNames(Boolean.parseBoolean(allowMissingColumnNames));
        }
        return out;
    }

    /** Tells whether an option holds at least one character, which makes charAt(0) safe. */
    private static boolean isSet(final String value) {
        return value != null && !value.isEmpty();
    }
}
@@ -0,0 +1,174 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.batchee.csv;

import org.apache.batchee.csv.mapper.DefaultMapper;
import org.apache.batchee.extras.buffered.IteratorReader;
import org.apache.batchee.extras.locator.BeanLocator;
import org.apache.batchee.extras.transaction.CountedReader;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;

import javax.batch.api.BatchProperty;
import javax.inject.Inject;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;

public class CommonsCsvReader extends CountedReader {
private static final CsvReaderMapper<CSVRecord> NOOP_MAPPER = new CsvReaderMapper<CSVRecord>() {
@Override
public CSVRecord fromRecord(final CSVRecord record) {
return record;
}
};

@Inject
@BatchProperty
private String format;

@Inject
@BatchProperty
private String input;

@Inject
@BatchProperty
private String encoding;

@Inject
@BatchProperty
private String mapper;

@Inject
@BatchProperty
private String mapping;

@Inject
@BatchProperty
private String locator;

@Inject
@BatchProperty
private String allowMissingColumnNames;

@Inject
@BatchProperty
private String delimiter;

@Inject
@BatchProperty
private String quoteCharacter;

@Inject
@BatchProperty
private String quoteMode;

@Inject
@BatchProperty
private String commentMarker;

@Inject
@BatchProperty
private String escapeCharacter;

@Inject
@BatchProperty
private String ignoreSurroundingSpaces;

@Inject
@BatchProperty
private String ignoreEmptyLines;

@Inject
@BatchProperty
private String recordSeparator;

@Inject
@BatchProperty
private String nullString;

@Inject
@BatchProperty
private String headerComments;

@Inject
@BatchProperty
private String header;

@Inject
@BatchProperty
private String skipHeaderRecord;

@Inject
@BatchProperty
private String readHeaders;

private IteratorReader<CSVRecord> iterator;
private CSVParser parser;
private BeanLocator.LocatorInstance<CsvReaderMapper> mapperInstance;

@Override
public void open(final Serializable checkpoint) throws Exception {
final CSVFormat csvFormat = newFormat();
parser = csvFormat.parse(newReader());
iterator = new IteratorReader<CSVRecord>(parser.iterator());

mapperInstance = mapper == null ?
new BeanLocator.LocatorInstance<CsvReaderMapper>(
mapping != null ? new DefaultMapper(Thread.currentThread().getContextClassLoader().loadClass(mapping)) : NOOP_MAPPER, null) :
BeanLocator.Finder.get(locator).newInstance(CsvReaderMapper.class, mapper);


super.open(checkpoint);
}

@Override
protected Object doRead() throws Exception {
final CSVRecord read = iterator.read();
return read != null ? mapperInstance.getValue().fromRecord(read) : null;
}

@Override
public void close() throws Exception {
mapperInstance.release();
if (parser != null) {
parser.close();
}
}

protected Reader newReader() {
try { // no need of BufferedReader since [csv] does it
return encoding != null ? new InputStreamReader(new FileInputStream(input), encoding) : new FileReader(input);
} catch (final FileNotFoundException e) {
throw new IllegalArgumentException(e);
} catch (final UnsupportedEncodingException e) {
throw new IllegalArgumentException(e);
}
}

protected CSVFormat newFormat() {
return CSVFormatFactory.newFormat(
format, delimiter, quoteCharacter, quoteMode, commentMarker, escapeCharacter, ignoreSurroundingSpaces,
ignoreEmptyLines, recordSeparator, nullString, headerComments, header, skipHeaderRecord, allowMissingColumnNames,
readHeaders);
}
}

0 comments on commit 20fad4b

Please sign in to comment.