Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@
<exclude>src/test/resources/avro/multiple-types.avsc</exclude>
<exclude>src/test/resources/avro/simple.avsc</exclude>
<exclude>src/test/resources/avro/recursive.avsc</exclude>
<exclude>src/test/resources/avro/schemaless_simple_record.avsc</exclude>
<exclude>src/test/resources/avro/schemaless_simple_record_extra_field.avsc</exclude>

<exclude>src/test/resources/cef/empty-row.txt</exclude>
<exclude>src/test/resources/cef/misformatted-row.txt</exclude>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ protected GenericRecord nextAvroRecord() throws IOException {
try {
genericRecord = datumReader.read(null, decoder);
} catch (final EOFException eof) {
return null;
throw new IOException("Was expecting more data, but reached EOF.", eof);
}

return genericRecord;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,10 @@
*/
package org.apache.nifi.avro;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;

import java.io.File;
import java.io.FileInputStream;
Expand All @@ -32,11 +28,15 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.junit.jupiter.api.Test;

public class TestAvroReaderWithExplicitSchema {

Expand Down Expand Up @@ -105,6 +105,34 @@ public void testAvroExplicitReaderWithEmbeddedSchemaFileDifferentFromExplicitSch
assertThrows(IOException.class, () -> new AvroReaderWithExplicitSchema(fileInputStream, recordSchema, dataSchema));
}

@Test
public void testAvroExplicitReaderWithSchemalessFileAndExplicitSchema() throws Exception {
AvroReaderWithExplicitSchema avroReader = createAvroReaderWithExplicitSchema(
"src/test/resources/avro/schemaless_simple_record.avro",
"src/test/resources/avro/schemaless_simple_record.avsc"
);

GenericData.Record expected = new GenericData.Record(new Schema.Parser().parse(new File("src/test/resources/avro/schemaless_simple_record.avsc")));
expected.put("field_1", 123);
expected.put("field_2", "44");
expected.put("field_3", 5);

GenericRecord actual1 = avroReader.nextAvroRecord();
assertEquals(expected, actual1);

GenericRecord actual2 = avroReader.nextAvroRecord();
assertNull(actual2);
}

@Test
public void testAvroExplicitReaderWithSchemalessFileAndWrongExplicitSchema() throws Exception {
AvroReaderWithExplicitSchema avroReader = createAvroReaderWithExplicitSchema(
"src/test/resources/avro/schemaless_simple_record.avro",
"src/test/resources/avro/schemaless_simple_record_extra_field.avsc"
);
assertThrows(IOException.class, avroReader::nextAvroRecord);
}

@Test
public void testAvroExplicitReaderWithSchemalessFileDecimalValuesWithDifferentBufferSize() throws Exception {
// GIVEN
Expand Down
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
{
"type": "record",
"name": "nifiRecord",
"namespace": "org.apache.nifi",
"fields": [
{
"name": "field_1",
"type": [
"int",
"null"
]
},
{
"name": "field_2",
"type": [
"string",
"null"
]
},
{
"name": "field_3",
"type": [
"int",
"null"
]
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
{
"type":"record",
"name":"message_name",
"namespace":"message_namespace",
"fields":[
{
"name":"field_1",
"type":["long"]
},
{
"name":"field_2",
"type":["string"]
},
{
"name":"field_3",
"type":["int"]
},
{
"name":"extra",
"type":["long"]
}
]
}