Skip to content

Commit

Permalink
XmlIO and XmlSource now take an optional validationEventHandler to th…
Browse files Browse the repository at this point in the history
…row a runtime exception when validation fails (for example)
  • Loading branch information
dgouyette committed Apr 27, 2017
1 parent 6ca7c51 commit 98164e8
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 24 deletions.
Expand Up @@ -27,6 +27,8 @@
import javax.annotation.Nullable;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBException;
import javax.xml.bind.ValidationEventHandler;

import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.CompressedSource;
import org.apache.beam.sdk.io.FileBasedSink;
Expand Down Expand Up @@ -253,6 +255,9 @@ public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>>

abstract Builder<T> toBuilder();

/**
 * Returns the JAXB {@link ValidationEventHandler} configured on this transform, or
 * {@code null} when none has been set.
 */
@Nullable
abstract ValidationEventHandler getValidationEventHandler();

@AutoValue.Builder
abstract static class Builder<T> {
abstract Builder<T> setFileOrPatternSpec(String fileOrPatternSpec);
Expand All @@ -269,6 +274,8 @@ abstract static class Builder<T> {

abstract Builder<T> setCharset(String charset);

abstract Builder<T> setValidationEventHandler(ValidationEventHandler validationEventHandler);

abstract Read<T> build();
}

Expand Down Expand Up @@ -365,6 +372,13 @@ public Read<T> withCharset(Charset charset) {
return toBuilder().setCharset(charset.name()).build();
}

/**
 * Returns a {@code Read} built from this transform's builder with the given JAXB
 * {@link ValidationEventHandler} set on it.
 *
 * @param validationEventHandler handler to record on the resulting transform
 */
public Read<T> withValidationEventHandler(ValidationEventHandler validationEventHandler) {
  Builder<T> updated = toBuilder().setValidationEventHandler(validationEventHandler);
  return updated.build();
}

@Override
public void validate(PBegin input) {
checkNotNull(
Expand Down Expand Up @@ -401,7 +415,7 @@ public void populateDisplayData(DisplayData.Builder builder) {

@VisibleForTesting
BoundedSource<T> createSource() {
XmlSource<T> source = new XmlSource<>(this);
XmlSource<T> source = new XmlSource<>(this, getValidationEventHandler());
switch (getCompressionType()) {
case UNCOMPRESSED:
return source;
Expand Down
Expand Up @@ -33,7 +33,6 @@
import javax.xml.bind.JAXBElement;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Unmarshaller;
import javax.xml.bind.ValidationEvent;
import javax.xml.bind.ValidationEventHandler;
import javax.xml.stream.FactoryConfigurationError;
import javax.xml.stream.XMLInputFactory;
Expand All @@ -55,24 +54,30 @@ public class XmlSource<T> extends FileBasedSource<T> {

private final XmlIO.Read<T> spec;

XmlSource(XmlIO.Read<T> spec) {
private ValidationEventHandler validationEventHandler;

/**
 * Creates a source from a read specification.
 *
 * @param spec read configuration; supplies the file-or-pattern spec and the minimum bundle
 *     size passed to the superclass
 * @param validationEventHandler handler stored on this source; may be {@code null}
 */
// NOTE(review): presumably this source is serialized when the pipeline is executed, in which
// case the stored handler would have to be serializable as well — confirm.
XmlSource(XmlIO.Read<T> spec, ValidationEventHandler validationEventHandler) {
super(StaticValueProvider.of(spec.getFileOrPatternSpec()), spec.getMinBundleSize());
this.spec = spec;
this.validationEventHandler = validationEventHandler;
}

private XmlSource(XmlIO.Read<T> spec, Metadata metadata, long startOffset, long endOffset) {
/**
 * Creates a source restricted to the byte range {@code [startOffset, endOffset)} of a single
 * file (range semantics are those of the superclass constructor — TODO confirm).
 *
 * @param spec read configuration; supplies the minimum bundle size
 * @param metadata metadata of the file this source reads
 * @param startOffset start of the range handed to the superclass
 * @param endOffset end of the range handed to the superclass
 * @param validationEventHandler handler stored on this source; may be {@code null}
 */
private XmlSource(XmlIO.Read<T> spec, Metadata metadata, long startOffset, long endOffset,
ValidationEventHandler validationEventHandler) {
super(metadata, spec.getMinBundleSize(), startOffset, endOffset);
this.spec = spec;
this.validationEventHandler = validationEventHandler;
}

@Override
protected FileBasedSource<T> createForSubrangeOfFile(Metadata metadata, long start, long end) {
return new XmlSource<T>(spec.from(metadata.toString()), metadata, start, end);
return new XmlSource<T>(spec.from(metadata.toString()), metadata, start, end,
validationEventHandler);
}

@Override
protected FileBasedReader<T> createSingleFileReader(PipelineOptions options) {
return new XMLReader<T>(this);
return new XMLReader<T>(this, validationEventHandler);
}

@Override
Expand All @@ -91,6 +96,11 @@ public Coder<T> getDefaultOutputCoder() {
return JAXBCoder.of(spec.getRecordClass());
}

/**
 * Returns the {@link ValidationEventHandler} this source was constructed with.
 *
 * <p>The value may be {@code null}; the reader checks for that before installing the handler
 * on its unmarshaller.
 */
public ValidationEventHandler getValidationEventHandler() {
  return this.validationEventHandler;
}


/**
* A {@link Source.Reader} for reading JAXB annotated Java objects from an XML file. The XML
* file should be of the form defined at {@link XmlSource}.
Expand Down Expand Up @@ -134,21 +144,16 @@ private static class XMLReader<T> extends FileBasedReader<T> {
// Byte offset of the current record in the XML file provided when creating the source.
private long currentByteOffset = 0;

public XMLReader(XmlSource<T> source) {
public XMLReader(XmlSource<T> source, ValidationEventHandler validationEventHandler) {
super(source);

// Set up a JAXB Unmarshaller that can be used to unmarshall record objects.
try {
JAXBContext jaxbContext = JAXBContext.newInstance(getCurrentSource().spec.getRecordClass());
jaxbUnmarshaller = jaxbContext.createUnmarshaller();

// Throw errors if validation fails. JAXB by default ignores validation errors.
jaxbUnmarshaller.setEventHandler(new ValidationEventHandler() {
@Override
public boolean handleEvent(ValidationEvent event) {
throw new RuntimeException(event.getMessage(), event.getLinkedException());
}
});
if (source.getValidationEventHandler() != null){
jaxbUnmarshaller.setEventHandler(source.getValidationEventHandler());
}
} catch (JAXBException e) {
throw new RuntimeException(e);
}
Expand Down
Expand Up @@ -38,6 +38,8 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import javax.xml.bind.ValidationEvent;
import javax.xml.bind.ValidationEventHandler;
import javax.xml.bind.annotation.XmlAttribute;
import javax.xml.bind.annotation.XmlRootElement;
import org.apache.beam.sdk.io.BoundedSource;
Expand Down Expand Up @@ -163,6 +165,30 @@ public class XmlSourceTest {
+ "<train size=\"small\"><name>Cédric</name><number>7</number><color>blue</color></train>"
+ "</trains>";

/**
 * Minimal JAXB-annotated record type used by the tests: a train that carries only a name.
 */
@XmlRootElement
static class TinyTrain {
  /** Train name; {@code null} when the record did not provide one. */
  public String name = null;

  /** No-argument constructor, needed so JAXB can instantiate the class. */
  public TinyTrain() {
  }

  /**
   * Creates a train with the given name.
   *
   * @param name the train's name; may be {@code null}
   */
  public TinyTrain(String name) {
    this.name = name;
  }

  /**
   * Renders this train as {@code Train[name=<name>]}, or {@code Train[]} when the name is
   * {@code null}.
   */
  @Override
  public String toString() {
    StringBuilder text = new StringBuilder("Train[");
    if (name != null) {
      text.append("name=").append(name);
    }
    return text.append("]").toString();
  }
}


@XmlRootElement
static class Train {
public static final int TRAIN_NUMBER_UNDEFINED = -1;
Expand Down Expand Up @@ -277,10 +303,10 @@ private File createRandomTrainXML(String fileName, List<Train> trains) throws IO
return file;
}

private List<Train> readEverythingFromReader(Reader<Train> reader) throws IOException {
List<Train> results = new ArrayList<>();
private <T> List<T> readEverythingFromReader(Reader<T> reader) throws IOException {
List<T> results = new ArrayList<>();
for (boolean available = reader.start(); available; available = reader.advance()) {
Train train = reader.getCurrent();
T train = reader.getCurrent();
results.add(train);
}
return results;
Expand Down Expand Up @@ -403,6 +429,14 @@ List<String> trainsToStrings(List<Train> input) {
return strings;
}

/**
 * Converts every {@link TinyTrain} in {@code input} to its {@code toString()} form, keeping
 * the input order.
 *
 * @param input trains to render
 * @return a new list holding one string per input element
 */
List<String> tinyTrainsToStrings(List<TinyTrain> input) {
  List<String> rendered = new ArrayList<>();
  for (TinyTrain tinyTrain : input) {
    rendered.add(tinyTrain.toString());
  }
  return rendered;
}

@Test
public void testReadXMLSmall() throws IOException {
File file = tempFolder.newFile("trainXMLSmall");
Expand Down Expand Up @@ -526,13 +560,21 @@ public void testReadXMLInvalidRecordClass() throws IOException {
File file = tempFolder.newFile("trainXMLSmall");
Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8));

ValidationEventHandler validationEventHandler = new ValidationEventHandler() {
@Override
public boolean handleEvent(ValidationEvent event) {
throw new RuntimeException(event.getMessage(), event.getLinkedException());
}
};

BoundedSource<WrongTrainType> source =
XmlIO.<WrongTrainType>read()
.from(file.toPath().toString())
.withRootElement("trains")
.withRecordElement("train")
.withRecordClass(WrongTrainType.class)
.createSource();
XmlIO.<WrongTrainType>read()
.from(file.toPath().toString())
.withRootElement("trains")
.withRecordElement("train")
.withRecordClass(WrongTrainType.class)
.withValidationEventHandler(validationEventHandler)
.createSource();

exception.expect(RuntimeException.class);

Expand Down

0 comments on commit 98164e8

Please sign in to comment.