Skip to content
Permalink
Browse files
BATCHEE-70 added header support for jsefa
  • Loading branch information
rsandtner committed Dec 2, 2015
1 parent 6cb7b37 commit 711054266b945b38477495dc7361199a87f98323
Showing 21 changed files with 1,215 additions and 14 deletions.
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.batchee.jsefa;

import org.apache.batchee.doc.api.Documentation;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
* Annotation to add a custom Header for a {@link net.sf.jsefa.csv.annotation.CsvField}.
*/
@Target(value = ElementType.FIELD)
@Retention(value = RetentionPolicy.RUNTIME)
@Documentation("This annotation is used to specify a custom header for a annotated @CsvField.")
public @interface Header {


/**
* @return header for the annotated field
*/
String value();
}
@@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.batchee.jsefa;

import net.sf.jsefa.csv.annotation.CsvDataType;
import net.sf.jsefa.csv.annotation.CsvField;

import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

public class JSefaCsvMapping {

/**
* storage for CsvHeaders
*/
private final List<String> headers = new ArrayList<String>();


private JSefaCsvMapping(Class<?> type) {
calculateHeaders(type);
}


public Iterable<String> getHeader() {
return headers;
}



private void calculateHeaders(Class<?> type) {

SortedMap<Integer, SortedMap<Integer, String>> allHeaders = new TreeMap<Integer, SortedMap<Integer, String>>();

calculateHeaders(type, allHeaders, 0);

for (SortedMap<Integer, String> headerMap : allHeaders.values()) {

for (String header : headerMap.values()) {
headers.add(header);
}
}
}

private void calculateHeaders(Class<?> type, SortedMap<Integer, SortedMap<Integer, String>> allHeaders, int index) {

if (type.getSuperclass() != Object.class) {
// decrement index for inheritance, because index will be the same
// but if we have another object within OR the same positions specified in the @CsvField annotation
// we override the values
calculateHeaders(type.getSuperclass(), allHeaders, index - 1);
}

SortedMap<Integer, String> typeHeaders = new TreeMap<Integer, String>();

for (Field field : type.getDeclaredFields()) {

CsvField fieldAnnotation = field.getAnnotation(CsvField.class);
if (fieldAnnotation == null) {
continue;
}

if (field.getType().getAnnotation(CsvDataType.class) != null) {
calculateHeaders(field.getType(), allHeaders, fieldAnnotation.pos());
} else {

String header = null;

Header headerAnnotation = field.getAnnotation(Header.class);
if (headerAnnotation != null) {
header = headerAnnotation.value();
}

// use field.getName() as default
if (header == null || header.isEmpty()) {
header = field.getName();
}

String previousField = typeHeaders.put(fieldAnnotation.pos(), header);
if (previousField != null) {
throw new IllegalArgumentException(String.format("multiple fields for position %d defined! Fields: %s! Type: %s", fieldAnnotation.pos(),
previousField + ", " + field.getName(),
type.getName()));
}
}
}

allHeaders.put(index, typeHeaders);
}


public static List<JSefaCsvMapping> forTypes(Class<?>... types) {
List<JSefaCsvMapping> mappings = new ArrayList<JSefaCsvMapping>(types.length);

for (Class<?> type : types) {
mappings.add(new JSefaCsvMapping(type));
}

return mappings;
}

}
@@ -17,7 +17,9 @@
package org.apache.batchee.jsefa;

import net.sf.jsefa.Deserializer;
import net.sf.jsefa.common.lowlevel.filter.HeaderAndFooterFilter;
import net.sf.jsefa.csv.CsvIOFactory;
import net.sf.jsefa.csv.config.CsvConfiguration;
import org.apache.batchee.doc.api.Documentation;

import javax.batch.api.BatchProperty;
@@ -80,14 +82,33 @@ public class JSefaCsvReader extends JSefaReader {
@Documentation("should deliimter be used after last field")
private String useDelimiterAfterLastField;

@Inject
@BatchProperty
@Documentation("should the header be ignored")
private Boolean ignoreHeader;

@Inject
@BatchProperty()
@Documentation("number of header lines")
private Integer headerSize;


@Override
protected Deserializer initDeserializer() throws Exception {
return CsvIOFactory.createFactory(
JsefaConfigurations.newCsvConfiguration(
CsvConfiguration config = JsefaConfigurations.newCsvConfiguration(
defaultNoValueString, defaultQuoteMode, fieldDelimiter, lineBreak, quoteCharacter,
quoteCharacterEscapeMode, useDelimiterAfterLastField, validationMode, validationProvider,
lineFilter, lowLevelConfiguration, lineFilterLimit, objectAccessorProvider,
specialRecordDelimiter, simpleTypeProvider, typeMappingRegistry),
JsefaConfigurations.createObjectTypes(objectTypes)).createDeserializer();
specialRecordDelimiter, simpleTypeProvider, typeMappingRegistry);

if (config.getLineFilter() == null &&
Boolean.TRUE.equals(ignoreHeader) &&
headerSize != null && headerSize > 0) {

config.setLineFilter(new HeaderAndFooterFilter(headerSize, false, false));
}

return CsvIOFactory.createFactory(config, JsefaConfigurations.createObjectTypes(objectTypes))
.createDeserializer();
}
}
@@ -18,10 +18,13 @@

import net.sf.jsefa.Serializer;
import net.sf.jsefa.csv.CsvIOFactory;
import net.sf.jsefa.csv.CsvSerializer;
import net.sf.jsefa.csv.lowlevel.config.CsvLowLevelConfiguration;
import org.apache.batchee.doc.api.Documentation;

import javax.batch.api.BatchProperty;
import javax.inject.Inject;
import java.io.Serializable;

@Documentation("Writes a CSV file using JSefa.")
public class JSefaCsvWriter extends JSefaWriter {
@@ -80,6 +83,17 @@ public class JSefaCsvWriter extends JSefaWriter {
@Documentation("should deliimter be used after last field")
private String useDelimiterAfterLastField;

@Inject
@BatchProperty
@Documentation("the header for the file")
private String header;

@Inject
@BatchProperty
@Documentation("Should the header be calculated from @Header or fieldnames if @Header is not present on fields annotated with @CsvField. " +
"This property will be ignored if the header property is set.")
private Boolean writeHeader;

@Override
protected Serializer createSerializer() throws Exception {
return CsvIOFactory.createFactory(
@@ -91,4 +105,47 @@ protected Serializer createSerializer() throws Exception {
JsefaConfigurations.createObjectTypes(objectTypes))
.createSerializer();
}

@Override
public void open(Serializable checkpoint) throws Exception {
super.open(checkpoint);

// write header only on first run
if (checkpoint != null) {
return;
}

char delimiter;
if (fieldDelimiter != null && !fieldDelimiter.isEmpty()) {
delimiter = fieldDelimiter.charAt(0);
} else {
delimiter = CsvLowLevelConfiguration.Defaults.DEFAULT_FIELD_DELIMITER;
}

//X TODO align behavoir of DefaultBAtchArtifactFactory and CDIBatchArtifactFactory?
//X DefaultBatchArtifactFactory only resolves properties which are set, while
//X CDIBatchArtifactFactory resolves every field...
if (header == null && writeHeader != null && writeHeader) {
Class<?>[] classes = JsefaConfigurations.createObjectTypes(objectTypes);

StringBuilder headerBuilder = new StringBuilder(50);
for (JSefaCsvMapping mapping : JSefaCsvMapping.forTypes(classes)) {

for (String headerPart : mapping.getHeader()) {

if (headerBuilder.length() > 0) {
headerBuilder.append(delimiter);
}

headerBuilder.append(headerPart);
}
}

header = headerBuilder.toString();
}

if (header != null && !header.trim().isEmpty()) {
((CsvSerializer) serializer).getLowLevelSerializer().writeLine(header.trim());
}
}
}
@@ -16,9 +16,14 @@
*/
package org.apache.batchee.jsefa;

import org.apache.batchee.extras.typed.TypedItemProcessor;
import org.apache.batchee.jsefa.bean.Address;
import org.apache.batchee.jsefa.bean.PersonWithAddress;
import org.apache.batchee.jsefa.bean.Record;
import org.apache.batchee.jsefa.util.CsvUtil;
import org.apache.batchee.jsefa.util.IOs;
import org.apache.batchee.util.Batches;
import org.testng.Assert;
import org.testng.annotations.Test;

import javax.batch.api.chunk.ItemProcessor;
@@ -51,6 +56,33 @@ public void read() throws Exception {
}
}


@Test
public void testReadWithHeader() {
String path = "target/work/JsefaCsvReaderWithHeader.csv";

Properties properties = new Properties();
properties.setProperty("input", path);

StringBuilder csvBuilder = new StringBuilder(200);
for (int i = 0; i < 10; i++) {
csvBuilder.append(IOs.LINE_SEPARATOR)
.append(CsvUtil.toCsv(new PersonWithAddress("firstName_" + i,
"lastName_" + i,
new Address("street_" + i,
"zip_" + i,
"city_" + i))));
}

IOs.write(path, csvBuilder.toString());

JobOperator operator = BatchRuntime.getJobOperator();
Batches.waitForEnd(operator, operator.start("jsefa-csv-reader-header", properties));

Assert.assertEquals(Storage.ITEMS.size(), 10);

}

public static class StoreItems implements ItemProcessor {
public static final List<Record> ITEMS = new ArrayList<Record>(3);

@@ -60,4 +92,15 @@ public Object processItem(final Object item) throws Exception {
return item;
}
}

public static class Storage extends TypedItemProcessor<PersonWithAddress, PersonWithAddress> {

static final List<PersonWithAddress> ITEMS = new ArrayList<PersonWithAddress>(10);

@Override
protected PersonWithAddress doProcessItem(PersonWithAddress item) {
ITEMS.add(item);
return item;
}
}
}

0 comments on commit 7110542

Please sign in to comment.