From 584bbedaecfad8fecdcc60b65bf2f1677ca8df64 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Isma=C3=ABl=20Mej=C3=ADa?= Date: Wed, 1 Apr 2026 10:22:40 +0000 Subject: [PATCH 1/4] AVRO-4068: Java Code Cleanup (#3192) Cherry-picked from branch-1.12 (bb2e45262). Conflicts resolved: - PrimitivesArrays.java: kept deleted (removed in branch-1.11) - BinaryData.java: used 3-param hashBytes consistently - BufferedBinaryEncoder.java: took AVRO-4068 javadoc improvement - JsonEncoder.java: kept branch-1.11 refactored version (no JsonOptions enum) --- .../java/org/apache/avro/Conversions.java | 2 +- .../java/org/apache/avro/JsonProperties.java | 8 +-- .../main/java/org/apache/avro/Protocol.java | 12 ++-- .../main/java/org/apache/avro/Resolver.java | 2 +- .../java/org/apache/avro/file/BZip2Codec.java | 2 +- .../apache/avro/file/DataFileConstants.java | 1 - .../org/apache/avro/file/DataFileStream.java | 24 ++----- .../org/apache/avro/file/DataFileWriter.java | 15 ++--- .../org/apache/avro/file/DeflateCodec.java | 6 +- .../java/org/apache/avro/file/FileReader.java | 4 +- .../org/apache/avro/file/SnappyCodec.java | 4 +- .../java/org/apache/avro/file/XZCodec.java | 7 +- .../org/apache/avro/file/ZstandardCodec.java | 3 +- .../org/apache/avro/generic/GenericData.java | 12 ++-- .../avro/generic/GenericDatumReader.java | 2 +- .../avro/generic/GenericDatumWriter.java | 6 +- .../java/org/apache/avro/io/BinaryData.java | 10 +-- .../org/apache/avro/io/BinaryDecoder.java | 12 ++-- .../apache/avro/io/BlockingBinaryEncoder.java | 9 ++- .../avro/io/BlockingDirectBinaryEncoder.java | 2 +- .../apache/avro/io/BufferedBinaryEncoder.java | 4 +- .../main/java/org/apache/avro/io/Decoder.java | 66 +++++++++---------- .../apache/avro/io/DirectBinaryDecoder.java | 2 +- .../main/java/org/apache/avro/io/Encoder.java | 58 ++++++++-------- .../org/apache/avro/io/EncoderFactory.java | 4 +- .../org/apache/avro/io/FastReaderBuilder.java | 25 ++++--- .../java/org/apache/avro/io/JsonDecoder.java | 7 +- .../java/org/apache/avro/io/JsonEncoder.java | 6 +- .../org/apache/avro/io/ResolvingDecoder.java | 6 +- .../org/apache/avro/io/ValidatingEncoder.java | 8 +-- .../org/apache/avro/io/parsing/Parser.java | 2 +- .../io/parsing/ResolvingGrammarGenerator.java | 4 +- .../org/apache/avro/io/parsing/Symbol.java | 18 ++--- .../parsing/ValidatingGrammarGenerator.java | 2 +- .../avro/message/BinaryMessageDecoder.java | 2 +- .../avro/message/RawMessageDecoder.java | 4 +- .../avro/message/RawMessageEncoder.java | 3 +- .../avro/path/TracingAvroTypeException.java | 4 +- .../avro/reflect/FieldAccessReflect.java | 6 +- .../org/apache/avro/reflect/ReflectData.java | 3 +- .../avro/specific/ExternalizableInput.java | 5 -- .../apache/avro/specific/SpecificData.java | 2 +- .../avro/util/ByteBufferInputStream.java | 4 +- 43 files changed, 171 insertions(+), 217 deletions(-) diff --git a/lang/java/avro/src/main/java/org/apache/avro/Conversions.java b/lang/java/avro/src/main/java/org/apache/avro/Conversions.java index 9d7f7f91091..531d06728ce 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/Conversions.java +++ b/lang/java/avro/src/main/java/org/apache/avro/Conversions.java @@ -174,7 +174,7 @@ public BigDecimal fromBytes(final ByteBuffer value, final Schema schema, final L BigInteger bg = null; ByteBuffer buffer = decoder.readBytes(null); byte[] array = buffer.array(); - if (array != null && array.length > 0) { + if (array.length > 0) { bg = new BigInteger(array); } diff --git a/lang/java/avro/src/main/java/org/apache/avro/JsonProperties.java b/lang/java/avro/src/main/java/org/apache/avro/JsonProperties.java index b53bc6cb2ba..fb5fa0f018d 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/JsonProperties.java +++ b/lang/java/avro/src/main/java/org/apache/avro/JsonProperties.java @@ -148,9 +148,9 @@ private Null() { // Also, we only ever ADD to the collection, never changing a value, so // putWithAbsent is the // only modifier - private ConcurrentMap props = new ConcurrentHashMap() { + private final ConcurrentMap props = new ConcurrentHashMap() { private static final long serialVersionUID = 1L; - private Queue> propOrder = new ConcurrentLinkedQueue<>(); + private final Queue> propOrder = new ConcurrentLinkedQueue<>(); @Override public JsonNode putIfAbsent(String key, JsonNode value) { @@ -194,7 +194,7 @@ public int size() { } }; - private Set reserved; + private final Set reserved; JsonProperties(Set reserved) { this.reserved = reserved; @@ -204,7 +204,7 @@ public int size() { this.reserved = reserved; for (Entry a : propMap.entrySet()) { Object v = a.getValue(); - JsonNode json = null; + JsonNode json; if (v instanceof String) { json = TextNode.valueOf((String) v); } else if (v instanceof JsonNode) { diff --git a/lang/java/avro/src/main/java/org/apache/avro/Protocol.java b/lang/java/avro/src/main/java/org/apache/avro/Protocol.java index e01a3c73ea9..13a765b58bf 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/Protocol.java +++ b/lang/java/avro/src/main/java/org/apache/avro/Protocol.java @@ -31,10 +31,8 @@ import java.nio.charset.StandardCharsets; import java.security.MessageDigest; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; @@ -70,11 +68,9 @@ public class Protocol extends JsonProperties { public static final long VERSION = 1; // Support properties for both Protocol and Message objects - private static final Set MESSAGE_RESERVED = Collections - .unmodifiableSet(new HashSet<>(Arrays.asList("doc", "response", "request", "errors", "one-way"))); + private static final Set MESSAGE_RESERVED = Set.of("doc", "response", "request", "errors", "one-way"); - private static final Set FIELD_RESERVED = Collections - .unmodifiableSet(new HashSet<>(Arrays.asList("name", "type", "doc", "default", "aliases"))); + private static final Set FIELD_RESERVED = Set.of("name", "type", "doc", "default", "aliases"); /** A protocol message. */ public class Message extends JsonProperties { @@ -255,8 +251,8 @@ void toJson1(JsonGenerator gen) throws IOException { /** Union type for generating system errors. */ public static final Schema SYSTEM_ERRORS = Schema.createUnion(Collections.singletonList(SYSTEM_ERROR)); - private static final Set PROTOCOL_RESERVED = Collections - .unmodifiableSet(new HashSet<>(Arrays.asList("namespace", "protocol", "doc", "messages", "types", "errors"))); + private static final Set PROTOCOL_RESERVED = Set.of("namespace", "protocol", "doc", "messages", "types", + "errors"); private Protocol() { super(PROTOCOL_RESERVED); diff --git a/lang/java/avro/src/main/java/org/apache/avro/Resolver.java b/lang/java/avro/src/main/java/org/apache/avro/Resolver.java index 117c9e3911f..8b62b24d757 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/Resolver.java +++ b/lang/java/avro/src/main/java/org/apache/avro/Resolver.java @@ -435,7 +435,7 @@ public static class RecordAdjust extends Action { * fields that will be read from the writer: these n are in the order * dictated by writer's schema. The remaining m fields will be read from * default values (actions for these default values are found in - * {@link RecordAdjust#defaults}. + * {@link RecordAdjust#defaults}). */ public final Field[] readerOrder; diff --git a/lang/java/avro/src/main/java/org/apache/avro/file/BZip2Codec.java b/lang/java/avro/src/main/java/org/apache/avro/file/BZip2Codec.java index fe90557fa2e..8fd6b6a09bd 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/file/BZip2Codec.java +++ b/lang/java/avro/src/main/java/org/apache/avro/file/BZip2Codec.java @@ -64,7 +64,7 @@ public ByteBuffer decompress(ByteBuffer compressedData) throws IOException { try (BZip2CompressorInputStream inputStream = new BZip2CompressorInputStream(bais)) { - int readCount = -1; + int readCount; while ((readCount = inputStream.read(buffer, compressedData.position(), buffer.length)) > 0) { baos.write(buffer, 0, readCount); } diff --git a/lang/java/avro/src/main/java/org/apache/avro/file/DataFileConstants.java b/lang/java/avro/src/main/java/org/apache/avro/file/DataFileConstants.java index fe269ca06b2..4664f5410df 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/file/DataFileConstants.java +++ b/lang/java/avro/src/main/java/org/apache/avro/file/DataFileConstants.java @@ -27,7 +27,6 @@ private DataFileConstants() { public static final byte VERSION = 1; public static final byte[] MAGIC = new byte[] { (byte) 'O', (byte) 'b', (byte) 'j', VERSION }; - public static final long FOOTER_BLOCK = -1; public static final int SYNC_SIZE = 16; public static final int DEFAULT_SYNC_INTERVAL = 4000 * SYNC_SIZE; diff --git a/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java b/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java index a96b694abd4..809c5e4ab14 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java +++ b/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java @@ -62,7 +62,7 @@ private Header() { } } - private DatumReader reader; + private final DatumReader reader; private long blockSize; private boolean availableBlock = false; private Header header; @@ -93,7 +93,7 @@ public DataFileStream(InputStream in, DatumReader reader) throws IOException /** * create an uninitialized DataFileStream */ - protected DataFileStream(DatumReader reader) throws IOException { + protected DataFileStream(DatumReader reader) { this.reader = reader; } @@ -146,7 +146,7 @@ void initialize(InputStream in, byte[] magic) throws IOException { } /** Initialize the stream without reading from it. */ - void initialize(Header header) throws IOException { + void initialize(Header header) { this.header = header; this.codec = resolveCodec(); reader.setSchema(header.schema); @@ -302,7 +302,7 @@ boolean hasNextBlock() { blockRemaining = vin.readLong(); // read block count blockSize = vin.readLong(); // read block size if (blockSize > Integer.MAX_VALUE || blockSize < 0) { - throw new IOException("Block size invalid or too large for this " + "implementation: " + blockSize); + throw new IOException("Block size invalid or too large for this implementation: " + blockSize); } blockCount = blockRemaining; availableBlock = true; @@ -365,22 +365,6 @@ private DataBlock(long numEntries, int blockSize) { this.numEntries = numEntries; } - byte[] getData() { - return data; - } - - long getNumEntries() { - return numEntries; - } - - int getBlockSize() { - return blockSize; - } - - boolean isFlushOnWrite() { - return flushOnWrite; - } - void setFlushOnWrite(boolean flushOnWrite) { this.flushOnWrite = flushOnWrite; } diff --git a/lang/java/avro/src/main/java/org/apache/avro/file/DataFileWriter.java b/lang/java/avro/src/main/java/org/apache/avro/file/DataFileWriter.java index 37d67322e9c..2275ea9afe6 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/file/DataFileWriter.java +++ b/lang/java/avro/src/main/java/org/apache/avro/file/DataFileWriter.java @@ -25,7 +25,7 @@ import org.apache.avro.io.DatumWriter; import org.apache.avro.io.EncoderFactory; import org.apache.avro.util.NonCopyingByteArrayOutputStream; -import org.apache.commons.compress.utils.IOUtils; +import org.apache.commons.io.IOUtils; import java.io.BufferedOutputStream; import java.io.Closeable; @@ -55,7 +55,7 @@ */ public class DataFileWriter implements Closeable, Flushable { private Schema schema; - private DatumWriter dout; + private final DatumWriter dout; private OutputStream underlyingStream; @@ -116,11 +116,10 @@ public DataFileWriter setCodec(CodecFactory c) { * is written. In this case, the {@linkplain #flush()} must be called to flush * the stream. * - * Invalid values throw IllegalArgumentException - * * @param syncInterval the approximate number of uncompressed bytes to write in * each block * @return this DataFileWriter + * @throws IllegalArgumentException if syncInterval is invalid */ public DataFileWriter setSyncInterval(int syncInterval) { if (syncInterval < 32 || syncInterval > (1 << 30)) { @@ -192,7 +191,7 @@ public DataFileWriter create(Schema schema, OutputStream outs, byte[] sync) t * Set whether this writer should flush the block to the stream every time a * sync marker is written. By default, the writer will flush the buffer each * time a sync marker is written (if the block size limit is reached or the - * {@linkplain #sync()} is called. + * {@linkplain #sync()} is called). * * @param flushOnEveryBlock - If set to false, this writer will not flush the * block to the stream until {@linkplain #flush()} is @@ -472,11 +471,11 @@ public void close() throws IOException { } } - private class BufferedFileOutputStream extends BufferedOutputStream { + private static class BufferedFileOutputStream extends BufferedOutputStream { private long position; // start of buffer private class PositionFilter extends FilterOutputStream { - public PositionFilter(OutputStream out) throws IOException { + public PositionFilter(OutputStream out) { super(out); } @@ -487,7 +486,7 @@ public void write(byte[] b, int off, int len) throws IOException { } } - public BufferedFileOutputStream(OutputStream out) throws IOException { + public BufferedFileOutputStream(OutputStream out) { super(null); this.out = new PositionFilter(out); } diff --git a/lang/java/avro/src/main/java/org/apache/avro/file/DeflateCodec.java b/lang/java/avro/src/main/java/org/apache/avro/file/DeflateCodec.java index 87498d3ee82..e6d58e46a13 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/file/DeflateCodec.java +++ b/lang/java/avro/src/main/java/org/apache/avro/file/DeflateCodec.java @@ -40,7 +40,7 @@ public class DeflateCodec extends Codec { private static final int DEFAULT_BUFFER_SIZE = 8192; static class Option extends CodecFactory { - private int compressionLevel; + private final int compressionLevel; Option(int compressionLevel) { this.compressionLevel = compressionLevel; @@ -55,8 +55,8 @@ protected Codec createInstance() { private Deflater deflater; private Inflater inflater; // currently only do 'nowrap' -- RFC 1951, not zlib - private boolean nowrap = true; - private int compressionLevel; + private final boolean nowrap = true; + private final int compressionLevel; public DeflateCodec(int compressionLevel) { this.compressionLevel = compressionLevel; diff --git a/lang/java/avro/src/main/java/org/apache/avro/file/FileReader.java b/lang/java/avro/src/main/java/org/apache/avro/file/FileReader.java index 07229d59ee8..9a54cf055ef 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/file/FileReader.java +++ b/lang/java/avro/src/main/java/org/apache/avro/file/FileReader.java @@ -31,7 +31,7 @@ public interface FileReader extends Iterator, Iterable, Closeable { /** * Read the next datum from the file. - * + * * @param reuse an instance to reuse. * @throws NoSuchElementException if no more remain in the file. */ @@ -39,7 +39,7 @@ public interface FileReader extends Iterator, Iterable, Closeable { /** * Move to the next synchronization point after a position. To process a range - * of file entires, call this with the starting position, then check + * of file entries, call this with the starting position, then check * {@link #pastSync(long)} with the end point before each call to * {@link #next()}. */ diff --git a/lang/java/avro/src/main/java/org/apache/avro/file/SnappyCodec.java b/lang/java/avro/src/main/java/org/apache/avro/file/SnappyCodec.java index 72bf0b74822..454d2925deb 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/file/SnappyCodec.java +++ b/lang/java/avro/src/main/java/org/apache/avro/file/SnappyCodec.java @@ -26,7 +26,7 @@ /** * Implements Snappy compression and decompression. */ public class SnappyCodec extends Codec { - private CRC32 crc32 = new CRC32(); + private final CRC32 crc32 = new CRC32(); static class Option extends CodecFactory { static { @@ -72,7 +72,7 @@ public ByteBuffer decompress(ByteBuffer in) throws IOException { crc32.reset(); crc32.update(out.array(), 0, size); - if (in.getInt(((Buffer) in).limit() - 4) != (int) crc32.getValue()) + if (in.getInt(in.limit() - 4) != (int) crc32.getValue()) throw new IOException("Checksum failure"); return out; diff --git a/lang/java/avro/src/main/java/org/apache/avro/file/XZCodec.java b/lang/java/avro/src/main/java/org/apache/avro/file/XZCodec.java index 3052f2a4160..bc674b73466 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/file/XZCodec.java +++ b/lang/java/avro/src/main/java/org/apache/avro/file/XZCodec.java @@ -26,7 +26,6 @@ import org.apache.avro.util.NonCopyingByteArrayOutputStream; import org.apache.commons.compress.compressors.xz.XZCompressorInputStream; import org.apache.commons.compress.compressors.xz.XZCompressorOutputStream; -import org.apache.commons.compress.utils.IOUtils; /** * Implements xz compression and decompression. */ public class XZCodec extends Codec { @@ -34,7 +33,7 @@ public class XZCodec extends Codec { private static final int DEFAULT_BUFFER_SIZE = 8192; static class Option extends CodecFactory { - private int compressionLevel; + private final int compressionLevel; Option(int compressionLevel) { this.compressionLevel = compressionLevel; @@ -46,7 +45,7 @@ protected Codec createInstance() { } } - private int compressionLevel; + private final int compressionLevel; public XZCodec(int compressionLevel) { this.compressionLevel = compressionLevel; @@ -72,7 +71,7 @@ public ByteBuffer decompress(ByteBuffer data) throws IOException { InputStream bytesIn = new ByteArrayInputStream(data.array(), computeOffset(data), data.remaining()); try (InputStream ios = new XZCompressorInputStream(bytesIn)) { - IOUtils.copy(ios, baos); + ios.transferTo(baos); } return baos.asByteBuffer(); } diff --git a/lang/java/avro/src/main/java/org/apache/avro/file/ZstandardCodec.java b/lang/java/avro/src/main/java/org/apache/avro/file/ZstandardCodec.java index f778b2fe356..0d4e31958d9 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/file/ZstandardCodec.java +++ b/lang/java/avro/src/main/java/org/apache/avro/file/ZstandardCodec.java @@ -24,7 +24,6 @@ import java.nio.ByteBuffer; import org.apache.avro.util.NonCopyingByteArrayOutputStream; -import org.apache.commons.compress.utils.IOUtils; public class ZstandardCodec extends Codec { public final static int DEFAULT_COMPRESSION = 3; @@ -82,7 +81,7 @@ public ByteBuffer decompress(ByteBuffer compressedData) throws IOException { InputStream bytesIn = new ByteArrayInputStream(compressedData.array(), computeOffset(compressedData), compressedData.remaining()); try (InputStream ios = ZstandardLoader.input(bytesIn, useBufferPool)) { - IOUtils.copy(ios, baos); + ios.transferTo(baos); } return baos.asByteBuffer(); } diff --git a/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java b/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java index 875abc7d70d..a4292c0e44d 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java +++ b/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java @@ -137,9 +137,9 @@ private void loadConversions() { } } - private Map> conversions = new HashMap<>(); + private final Map> conversions = new HashMap<>(); - private Map, Map>> conversionsByClass = new IdentityHashMap<>(); + private final Map, Map>> conversionsByClass = new IdentityHashMap<>(); public Collection> getConversions() { return conversions.values(); @@ -523,8 +523,8 @@ public int compareTo(Fixed that) { /** Default implementation of {@link GenericEnumSymbol}. */ public static class EnumSymbol implements GenericEnumSymbol { - private Schema schema; - private String symbol; + private final Schema schema; + private final String symbol; public EnumSymbol(Schema schema, String symbol) { this.schema = schema; @@ -1208,9 +1208,7 @@ protected int compareMaps(final Map m1, final Map m2) { } } } - } catch (ClassCastException unused) { - return 1; - } catch (NullPointerException unused) { + } catch (ClassCastException | NullPointerException unused) { return 1; } diff --git a/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java b/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java index 3c5d1316cb3..b818a070c18 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java +++ b/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java @@ -519,7 +519,7 @@ public int hashCode() { @Override public boolean equals(Object obj) { - if (obj == null || !(obj instanceof GenericDatumReader.IdentitySchemaKey)) { + if (!(obj instanceof GenericDatumReader.IdentitySchemaKey)) { return false; } IdentitySchemaKey key = (IdentitySchemaKey) obj; diff --git a/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumWriter.java b/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumWriter.java index deeac0b1f2b..20a856c4dc3 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumWriter.java +++ b/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumWriter.java @@ -219,9 +219,7 @@ protected ClassCastException addClassCastMsg(ClassCastException e, String s) { /** Helper method for adding a message to an Avro Type Exception . */ protected AvroTypeException addAvroTypeMsg(AvroTypeException e, String s) { - AvroTypeException result = new AvroTypeException(e.getMessage() + s); - result.initCause(e.getCause() == null ? e : e.getCause()); - return result; + return new AvroTypeException(e.getMessage() + s, e.getCause() == null ? e : e.getCause()); } /** @@ -282,7 +280,7 @@ protected void writeArray(Schema schema, Object datum, Encoder out) throws IOExc long actualSize = 0; out.writeArrayStart(); out.setItemCount(size); - for (Iterator it = getArrayElements(datum); it.hasNext();) { + for (Iterator it = getArrayElements(datum); it.hasNext();) { out.startItem(); try { write(element, it.next(), out); diff --git a/lang/java/avro/src/main/java/org/apache/avro/io/BinaryData.java b/lang/java/avro/src/main/java/org/apache/avro/io/BinaryData.java index f925bcd9698..cc1eb20f0a3 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/io/BinaryData.java +++ b/lang/java/avro/src/main/java/org/apache/avro/io/BinaryData.java @@ -261,11 +261,11 @@ private static int hashCode(HashData data, Schema schema) throws IOException { case UNION: return hashCode(data, schema.getTypes().get(decoder.readInt())); case FIXED: - return hashBytes(1, data, schema.getFixedSize(), false); + return hashBytes(data, schema.getFixedSize(), false); case STRING: - return hashBytes(0, data, decoder.readInt(), false); + return hashBytes(data, decoder.readInt(), false); case BYTES: - return hashBytes(1, data, decoder.readInt(), true); + return hashBytes(data, decoder.readInt(), true); case NULL: return 0; default: @@ -273,8 +273,8 @@ private static int hashCode(HashData data, Schema schema) throws IOException { } } - private static int hashBytes(int init, HashData data, int len, boolean rev) throws IOException { - int hashCode = init; + private static int hashBytes(HashData data, int len, boolean rev) throws IOException { + int hashCode = 1; byte[] bytes = data.decoder.getBuf(); int start = data.decoder.getPos(); int end = start + len; diff --git a/lang/java/avro/src/main/java/org/apache/avro/io/BinaryDecoder.java b/lang/java/avro/src/main/java/org/apache/avro/io/BinaryDecoder.java index 3fa675d793a..4e45781b030 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/io/BinaryDecoder.java +++ b/lang/java/avro/src/main/java/org/apache/avro/io/BinaryDecoder.java @@ -200,7 +200,7 @@ public long readLong() throws IOException { if (b > 0x7f) { // only the low 28 bits can be set, so this won't carry // the sign bit to the long - l = innerLongDecode((long) n); + l = innerLongDecode(n); } else { l = n; } @@ -777,7 +777,7 @@ public int available() throws IOException { } private static class InputStreamByteSource extends ByteSource { - private InputStream in; + private final InputStream in; protected boolean isEof = false; private InputStreamByteSource(InputStream in) { @@ -907,7 +907,7 @@ public void close() throws IOException { */ private static class ByteArrayByteSource extends ByteSource { private static final int MIN_SIZE = 16; - private byte[] data; + private final byte[] data; private int position; private int max; private boolean compacted = false; @@ -947,7 +947,7 @@ protected void skipSourceBytes(long length) throws IOException { } @Override - protected long trySkipBytes(long length) throws IOException { + protected long trySkipBytes(long length) { // the buffer is shared, so this should return 0 max = ba.getLim(); position = ba.getPos(); @@ -972,13 +972,13 @@ protected void readRaw(byte[] data, int off, int len) throws IOException { } @Override - protected int tryReadRaw(byte[] data, int off, int len) throws IOException { + protected int tryReadRaw(byte[] data, int off, int len) { // the buffer is shared, nothing to read return 0; } @Override - protected void compactAndFill(byte[] buf, int pos, int minPos, int remaining) throws IOException { + protected void compactAndFill(byte[] buf, int pos, int minPos, int remaining) { // this implementation does not want to mutate the array passed in, // so it makes a new tiny buffer unless it has been compacted once before if (!compacted) { diff --git a/lang/java/avro/src/main/java/org/apache/avro/io/BlockingBinaryEncoder.java b/lang/java/avro/src/main/java/org/apache/avro/io/BlockingBinaryEncoder.java index d0bfc8f075e..9a0d9e414b0 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/io/BlockingBinaryEncoder.java +++ b/lang/java/avro/src/main/java/org/apache/avro/io/BlockingBinaryEncoder.java @@ -89,7 +89,7 @@ public enum State { * this case, {@link BlockedValue#start} is zero. The header for such a block * has _already been written_ (we've written out a header indicating that the * block has a single item, and we put a "zero" down for the byte-count to - * indicate that we don't know the physical length of the buffer. Any blocks + * indicate that we don't know the physical length of the buffer). Any blocks * _containing_ this block must be in the {@link #OVERFLOW} state. */ OVERFLOW @@ -130,7 +130,7 @@ public BlockedValue() { * Check invariants of this and also the BlockedValue * containing this. */ - public boolean check(BlockedValue prev, int pos) { + public void check(BlockedValue prev, int pos) { assert state != State.ROOT || type == null; assert (state == State.ROOT || type == Schema.Type.ARRAY || type == Schema.Type.MAP); @@ -156,7 +156,6 @@ public boolean check(BlockedValue prev, int pos) { assert prev.state == State.ROOT || prev.state == State.OVERFLOW; break; } - return false; } } @@ -179,7 +178,7 @@ public boolean check(BlockedValue prev, int pos) { // buffer large enough for up to two ints for a block header // rounded up to a multiple of 4 bytes. - private byte[] headerBuffer = new byte[12]; + private final byte[] headerBuffer = new byte[12]; private boolean check() { assert buf != null; @@ -438,7 +437,7 @@ private void endBlockedValue() throws IOException { * Called when we've finished writing the last item in an overflow buffer. When * this is finished, the top of the stack will be an empty block in the * "regular" state. - * + * * @throws IOException */ private void finishOverflow() throws IOException { diff --git a/lang/java/avro/src/main/java/org/apache/avro/io/BlockingDirectBinaryEncoder.java b/lang/java/avro/src/main/java/org/apache/avro/io/BlockingDirectBinaryEncoder.java index 2ef2375e640..9f391a31921 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/io/BlockingDirectBinaryEncoder.java +++ b/lang/java/avro/src/main/java/org/apache/avro/io/BlockingDirectBinaryEncoder.java @@ -60,7 +60,7 @@ public class BlockingDirectBinaryEncoder extends DirectBinaryEncoder { * Create a writer that sends its output to the underlying stream * out. * - * @param out The Outputstream to write to + * @param out The OutputStream to write to */ public BlockingDirectBinaryEncoder(OutputStream out) { super(out); diff --git a/lang/java/avro/src/main/java/org/apache/avro/io/BufferedBinaryEncoder.java b/lang/java/avro/src/main/java/org/apache/avro/io/BufferedBinaryEncoder.java index 376289ec882..62e7a01afe7 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/io/BufferedBinaryEncoder.java +++ b/lang/java/avro/src/main/java/org/apache/avro/io/BufferedBinaryEncoder.java @@ -105,8 +105,8 @@ private void flushBuffer() throws IOException { * current position and the end. This will not expand the buffer larger than its * current size, for writes larger than or near to the size of the buffer, we * flush the buffer and write directly to the output, bypassing the buffer. - * - * @param num + * + * @param num the number of bytes to ensure are available * @throws IOException */ private void ensureBounds(int num) throws IOException { diff --git a/lang/java/avro/src/main/java/org/apache/avro/io/Decoder.java b/lang/java/avro/src/main/java/org/apache/avro/io/Decoder.java index a0f4049f023..11fc28d762e 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/io/Decoder.java +++ b/lang/java/avro/src/main/java/org/apache/avro/io/Decoder.java @@ -30,11 +30,11 @@ *

* The other type of methods support the reading of maps and arrays. These * methods are {@link #readArrayStart}, {@link #arrayNext}, and similar methods - * for maps). See {@link #readArrayStart} for details on these methods.) + * for maps. See {@link #readArrayStart} for details on these methods. *

* {@link DecoderFactory} contains Decoder construction and configuration * facilities. - * + * * @see DecoderFactory * @see Encoder */ @@ -44,7 +44,7 @@ public abstract class Decoder { /** * "Reads" a null value. (Doesn't actually read anything, but advances the state * of the parser if the implementation is stateful.) - * + * * @throws AvroTypeException If this is a stateful reader and null is not the * type of the next value to be read */ @@ -52,7 +52,7 @@ public abstract class Decoder { /** * Reads a boolean value written by {@link Encoder#writeBoolean}. - * + * * @throws AvroTypeException If this is a stateful reader and boolean is not the * type of the next value to be read */ @@ -61,7 +61,7 @@ public abstract class Decoder { /** * Reads an integer written by {@link Encoder#writeInt}. - * + * * @throws AvroTypeException If encoded value is larger than 32-bits * @throws AvroTypeException If this is a stateful reader and int is not the * type of the next value to be read @@ -70,7 +70,7 @@ public abstract class Decoder { /** * Reads a long written by {@link Encoder#writeLong}. - * + * * @throws AvroTypeException If this is a stateful reader and long is not the * type of the next value to be read */ @@ -78,7 +78,7 @@ public abstract class Decoder { /** * Reads a float written by {@link Encoder#writeFloat}. - * + * * @throws AvroTypeException If this is a stateful reader and is not the type of * the next value to be read */ @@ -86,7 +86,7 @@ public abstract class Decoder { /** * Reads a double written by {@link Encoder#writeDouble}. - * + * * @throws AvroTypeException If this is a stateful reader and is not the type of * the next value to be read */ @@ -94,7 +94,7 @@ public abstract class Decoder { /** * Reads a char-string written by {@link Encoder#writeString}. - * + * * @throws AvroTypeException If this is a stateful reader and char-string is not * the type of the next value to be read */ @@ -102,7 +102,7 @@ public abstract class Decoder { /** * Reads a char-string written by {@link Encoder#writeString}. - * + * * @throws AvroTypeException If this is a stateful reader and char-string is not * the type of the next value to be read */ @@ -110,7 +110,7 @@ public abstract class Decoder { /** * Discards a char-string written by {@link Encoder#writeString}. - * + * * @throws AvroTypeException If this is a stateful reader and char-string is not * the type of the next value to be read */ @@ -120,7 +120,7 @@ public abstract class Decoder { * Reads a byte-string written by {@link Encoder#writeBytes}. if old is * not null and has sufficient capacity to take in the bytes being read, the * bytes are returned in old. - * + * * @throws AvroTypeException If this is a stateful reader and byte-string is not * the type of the next value to be read */ @@ -128,7 +128,7 @@ public abstract class Decoder { /** * Discards a byte-string written by {@link Encoder#writeBytes}. - * + * * @throws AvroTypeException If this is a stateful reader and byte-string is not * the type of the next value to be read */ @@ -136,7 +136,7 @@ public abstract class Decoder { /** * Reads fixed sized binary object. - * + * * @param bytes The buffer to store the contents being read. * @param start The position where the data needs to be written. * @param length The size of the binary object. @@ -149,7 +149,7 @@ public abstract class Decoder { /** * A shorthand for readFixed(bytes, 0, bytes.length). - * + * * @throws AvroTypeException If this is a stateful reader and fixed sized binary * object is not the type of the next value to be read * or the length is incorrect. @@ -161,7 +161,7 @@ public void readFixed(byte[] bytes) throws IOException { /** * Discards fixed sized binary object. - * + * * @param length The size of the binary object to be skipped. * @throws AvroTypeException If this is a stateful reader and fixed sized binary * object is not the type of the next value to be read @@ -172,7 +172,7 @@ public void readFixed(byte[] bytes) throws IOException { /** * Reads an enumeration. - * + * * @return The enumeration's value. * @throws AvroTypeException If this is a stateful reader and enumeration is not * the type of the next value to be read. @@ -185,7 +185,7 @@ public void readFixed(byte[] bytes) throws IOException { * returns non-zero, then the caller should read the indicated number of items, * and then call {@link #arrayNext} to find out the number of items in the next * block. The typical pattern for consuming an array looks like: - * + * *

    *   for(long i = in.readArrayStart(); i != 0; i = in.arrayNext()) {
    *     for (long j = 0; j < i; j++) {
@@ -193,7 +193,7 @@ public void readFixed(byte[] bytes) throws IOException {
    *     }
    *   }
    * 
- * + * * @throws AvroTypeException If this is a stateful reader and array is not the * type of the next value to be read */ @@ -201,9 +201,9 @@ public void readFixed(byte[] bytes) throws IOException { /** * Processes the next block of an array and returns the number of items in the - * block and let's the caller read those items. - * - * @throws AvroTypeException When called outside of an array context + * block and lets the caller read those items. + * + * @throws AvroTypeException When called outside an array context */ public abstract long arrayNext() throws IOException; @@ -216,7 +216,7 @@ public void readFixed(byte[] bytes) throws IOException { * possible. It will return zero if there are no more items to skip through, or * an item count if it needs the client's help in skipping. The typical usage * pattern is: - * + * *
    *   for(long i = in.skipArray(); i != 0; i = i.skipArray()) {
    *     for (long j = 0; j < i; j++) {
@@ -224,7 +224,7 @@ public void readFixed(byte[] bytes) throws IOException {
    *     }
    *   }
    * 
- * + * * Note that this method can automatically skip through items if a byte-count is * found in the underlying data, or if a schema has been provided to the * implementation, but otherwise the client will have to skip through items @@ -240,9 +240,9 @@ public void readFixed(byte[] bytes) throws IOException { * {@link #readArrayStart}. * * As an example, let's say you want to read a map of records, the record - * consisting of an Long field and a Boolean field. Your code would look + * consisting of a Long field and a Boolean field. Your code would look * something like this: - * + * *
    * Map m = new HashMap();
    * Record reuse = new Record();
@@ -255,7 +255,7 @@ public void readFixed(byte[] bytes) throws IOException {
    *   }
    * }
    * 
- * + * * @throws AvroTypeException If this is a stateful reader and map is not the * type of the next value to be read */ @@ -264,8 +264,8 @@ public void readFixed(byte[] bytes) throws IOException { /** * Processes the next block of map entries and returns the count of them. * Similar to {@link #arrayNext}. See {@link #readMapStart} for details. - * - * @throws AvroTypeException When called outside of a map context + * + * @throws AvroTypeException When called outside a map context */ public abstract long mapNext() throws IOException; @@ -273,9 +273,9 @@ public void readFixed(byte[] bytes) throws IOException { * Support for quickly skipping through a map similar to {@link #skipArray}. * * As an example, let's say you want to skip a map of records, the record - * consisting of an Long field and a Boolean field. Your code would look + * consisting of a Long field and a Boolean field. Your code would look * something like this: - * + * *
    * for (long i = in.skipMap(); i != 0; i = in.skipMap()) {
    *   for (long j = 0; j < i; j++) {
@@ -285,7 +285,7 @@ public void readFixed(byte[] bytes) throws IOException {
    *   }
    * }
    * 
- * + * * @throws AvroTypeException If this is a stateful reader and array is not the * type of the next value to be read */ @@ -294,7 +294,7 @@ public void readFixed(byte[] bytes) throws IOException { /** * Reads the tag of a union written by {@link Encoder#writeIndex}. - * + * * @throws AvroTypeException If this is a stateful reader and union is not the * type of the next value to be read */ diff --git a/lang/java/avro/src/main/java/org/apache/avro/io/DirectBinaryDecoder.java b/lang/java/avro/src/main/java/org/apache/avro/io/DirectBinaryDecoder.java index 6f07b13eee2..2cf64ed75af 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/io/DirectBinaryDecoder.java +++ b/lang/java/avro/src/main/java/org/apache/avro/io/DirectBinaryDecoder.java @@ -190,7 +190,7 @@ public InputStream inputStream() { } @Override - public boolean isEnd() throws IOException { + public boolean isEnd() { throw new UnsupportedOperationException(); } } diff --git a/lang/java/avro/src/main/java/org/apache/avro/io/Encoder.java b/lang/java/avro/src/main/java/org/apache/avro/io/Encoder.java index db3e88b6c85..85d5c421fb6 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/io/Encoder.java +++ b/lang/java/avro/src/main/java/org/apache/avro/io/Encoder.java @@ -39,7 +39,7 @@ *

* {@link EncoderFactory} contains Encoder construction and configuration * facilities. - * + * * @see EncoderFactory * @see Decoder */ @@ -48,7 +48,7 @@ public abstract class Encoder implements Flushable { /** * "Writes" a null value. (Doesn't actually write anything, but advances the * state of the parser if this class is stateful.) - * + * * @throws AvroTypeException If this is a stateful writer and a null is not * expected */ @@ -56,7 +56,7 @@ public abstract class Encoder implements Flushable { /** * Write a boolean value. - * + * * @throws AvroTypeException If this is a stateful writer and a boolean is not * expected */ @@ -64,7 +64,7 @@ public abstract class Encoder implements Flushable { /** * Writes a 32-bit integer. - * + * * @throws AvroTypeException If this is a stateful writer and an integer is not * expected */ @@ -72,7 +72,7 @@ public abstract class Encoder implements Flushable { /** * Write a 64-bit integer. - * + * * @throws AvroTypeException If this is a stateful writer and a long is not * expected */ @@ -80,7 +80,7 @@ public abstract class Encoder implements Flushable { /** * Write a float. - * + * * @throws IOException * @throws AvroTypeException If this is a stateful writer and a float is not * expected @@ -89,7 +89,7 @@ public abstract class Encoder implements Flushable { /** * Write a double. - * + * * @throws AvroTypeException If this is a stateful writer and a double is not * expected */ @@ -97,7 +97,7 @@ public abstract class Encoder implements Flushable { /** * Write a Unicode character string. - * + * * @throws AvroTypeException If this is a stateful writer and a char-string is * not expected */ @@ -107,7 +107,7 @@ public abstract class Encoder implements Flushable { * Write a Unicode character string. The default implementation converts the * String to a {@link org.apache.avro.util.Utf8}. Some Encoder implementations * may want to do something different as a performance optimization. - * + * * @throws AvroTypeException If this is a stateful writer and a char-string is * not expected */ @@ -119,7 +119,7 @@ public void writeString(String str) throws IOException { * Write a Unicode character string. If the CharSequence is an * {@link org.apache.avro.util.Utf8} it writes this directly, otherwise the * CharSequence is converted to a String via toString() and written. - * + * * @throws AvroTypeException If this is a stateful writer and a char-string is * not expected */ @@ -132,7 +132,7 @@ public void writeString(CharSequence charSequence) throws IOException { /** * Write a byte string. - * + * * @throws AvroTypeException If this is a stateful writer and a byte-string is * not expected */ @@ -140,7 +140,7 @@ public void writeString(CharSequence charSequence) throws IOException { /** * Write a byte string. - * + * * @throws AvroTypeException If this is a stateful writer and a byte-string is * not expected */ @@ -149,7 +149,7 @@ public void writeString(CharSequence charSequence) throws IOException { /** * Writes a byte string. Equivalent to * writeBytes(bytes, 0, bytes.length) - * + * * @throws IOException * @throws AvroTypeException If this is a stateful writer and a byte-string is * not expected @@ -160,7 +160,7 @@ public void writeBytes(byte[] bytes) throws IOException { /** * Writes a fixed size binary object. - * + * * @param bytes The contents to write * @param start The position within bytes where the contents start. * @param len The number of bytes to write. @@ -172,8 +172,8 @@ public void writeBytes(byte[] bytes) throws IOException { /** * A shorthand for writeFixed(bytes, 0, bytes.length) - * - * @param bytes + * + * @param bytes the data */ public void writeFixed(byte[] bytes) throws IOException { writeFixed(bytes, 0, bytes.length); @@ -194,8 +194,8 @@ public void writeFixed(ByteBuffer bytes) throws IOException { /** * Writes an enumeration. - * - * @param e + * + * @param e the enumeration to write * @throws AvroTypeException If this is a stateful writer and an enumeration is * not expected or the e is out of range. * @throws IOException @@ -214,9 +214,9 @@ public void writeFixed(ByteBuffer bytes) throws IOException { * the array have been written, call {@link #writeArrayEnd}. * * As an example, let's say you want to write an array of records, the record - * consisting of an Long field and a Boolean field. Your code would look + * consisting of a Long field and a Boolean field. Your code would look * something like this: - * + * *

    * out.writeArrayStart();
    * out.setItemCount(list.size());
@@ -227,7 +227,7 @@ public void writeFixed(ByteBuffer bytes) throws IOException {
    * }
    * out.writeArrayEnd();
    * 
- * + * * @throws AvroTypeException If this is a stateful writer and an array is not * expected */ @@ -248,8 +248,8 @@ public void writeFixed(ByteBuffer bytes) throws IOException { /** * Start a new item of an array or map. See {@link #writeArrayStart} for usage * information. - * - * @throws AvroTypeException If called outside of an array or map context + * + * @throws AvroTypeException If called outside an array or map context */ public abstract void startItem() throws IOException; @@ -268,9 +268,9 @@ public void writeFixed(ByteBuffer bytes) throws IOException { * usage. * * As an example of usage, let's say you want to write a map of records, the - * record consisting of an Long field and a Boolean field. Your code would look + * record consisting of a Long field and a Boolean field. Your code would look * something like this: - * + * *
    * out.writeMapStart();
    * out.setItemCount(list.size());
@@ -282,7 +282,7 @@ public void writeFixed(ByteBuffer bytes) throws IOException {
    * }
    * out.writeMapEnd();
    * 
- * + * * @throws AvroTypeException If this is a stateful writer and a map is not * expected */ @@ -302,15 +302,15 @@ public void writeFixed(ByteBuffer bytes) throws IOException { * Call this method to write the tag of a union. * * As an example of usage, let's say you want to write a union, whose second - * branch is a record consisting of an Long field and a Boolean field. Your code + * branch is a record consisting of a Long field and a Boolean field. Your code * would look something like this: - * + * *
    * out.writeIndex(1);
    * out.writeLong(record.longField);
    * out.writeBoolean(record.boolField);
    * 
- * + * * @throws AvroTypeException If this is a stateful writer and a map is not * expected */ diff --git a/lang/java/avro/src/main/java/org/apache/avro/io/EncoderFactory.java b/lang/java/avro/src/main/java/org/apache/avro/io/EncoderFactory.java index bc412d58da4..b91794be3ee 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/io/EncoderFactory.java +++ b/lang/java/avro/src/main/java/org/apache/avro/io/EncoderFactory.java @@ -165,7 +165,6 @@ public int getBlockSize() { * reuse is null, this will be a new instance. If reuse is * not null, then the returned instance may be a new instance or * reuse reconfigured to use out. - * @throws IOException * @see BufferedBinaryEncoder * @see Encoder */ @@ -286,7 +285,6 @@ public BinaryEncoder blockingDirectBinaryEncoder(OutputStream out, BinaryEncoder * reuse is null, this will be a new instance. If reuse is * not null, then the returned instance may be a new instance or * reuse reconfigured to use out. - * @throws IOException * @see BlockingBinaryEncoder * @see Encoder */ @@ -370,7 +368,7 @@ JsonEncoder jsonEncoder(Schema schema, JsonGenerator gen) throws IOException { * {@link ValidatingEncoder} is not thread-safe. * * @param schema The Schema to validate operations against. Cannot be null. - * @param encoder The Encoder to wrap. Cannot be be null. + * @param encoder The Encoder to wrap. Cannot be null. * @return A ValidatingEncoder configured to wrap encoder and validate * against schema * @throws IOException diff --git a/lang/java/avro/src/main/java/org/apache/avro/io/FastReaderBuilder.java b/lang/java/avro/src/main/java/org/apache/avro/io/FastReaderBuilder.java index f6e1ed5aae2..dbd06f305e8 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/io/FastReaderBuilder.java +++ b/lang/java/avro/src/main/java/org/apache/avro/io/FastReaderBuilder.java @@ -140,7 +140,7 @@ private RecordReader createRecordReader(RecordAdjust action) throws IOException return recordReader; } - private RecordReader initializeRecordReader(RecordReader recordReader, RecordAdjust action) throws IOException { + private void initializeRecordReader(RecordReader recordReader, RecordAdjust action) throws IOException { recordReader.startInitialization(); // generate supplier for the new object instances @@ -171,7 +171,6 @@ private RecordReader initializeRecordReader(RecordReader recordReader, RecordAdj } recordReader.finishInitialization(readSteps, action.reader, action.instanceSupplier); - return recordReader; } private ExecutionStep createFieldSetter(Field field, FieldReader reader) { @@ -277,7 +276,7 @@ private FieldReader getNonConvertedReader(Action action) throws IOException { throw new IllegalStateException("Error getting reader for action type " + action.getClass()); } case DO_NOTHING: - return getReaderForBaseType(action.reader, action.writer); + return getReaderForBaseType(action.reader); case RECORD: return createRecordReader((RecordAdjust) action); case ENUM: @@ -297,7 +296,7 @@ private FieldReader getNonConvertedReader(Action action) throws IOException { } } - private FieldReader getReaderForBaseType(Schema readerSchema, Schema writerSchema) throws IOException { + private FieldReader getReaderForBaseType(Schema readerSchema) { switch (readerSchema.getType()) { case NULL: return (old, decoder) -> { @@ -307,7 +306,7 @@ private FieldReader getReaderForBaseType(Schema readerSchema, Schema writerSchem case BOOLEAN: return (old, decoder) -> decoder.readBoolean(); case STRING: - return createStringReader(readerSchema, writerSchema); + return createStringReader(readerSchema); case INT: return (old, decoder) -> decoder.readInt(); case LONG: @@ -319,7 +318,7 @@ private FieldReader getReaderForBaseType(Schema readerSchema, Schema writerSchem case BYTES: return createBytesReader(); case FIXED: - return createFixedReader(readerSchema, writerSchema); + return createFixedReader(readerSchema); case RECORD: // covered by action type case UNION: // covered by action type case ENUM: // covered by action type @@ -330,7 +329,7 @@ private FieldReader getReaderForBaseType(Schema readerSchema, Schema writerSchem } } - private FieldReader createPromotingReader(Promote promote) throws IOException { + private FieldReader createPromotingReader(Promote promote) { switch (promote.reader.getType()) { case BYTES: return (reuse, decoder) -> ByteBuffer.wrap(decoder.readString(null).getBytes()); @@ -364,7 +363,7 @@ private FieldReader createPromotingReader(Promote promote) throws IOException { "No promotion possible for type " + promote.writer.getType() + " to " + promote.reader.getType()); } - private FieldReader createStringReader(Schema readerSchema, Schema writerSchema) { + private FieldReader createStringReader(Schema readerSchema) { FieldReader stringReader = createSimpleStringReader(readerSchema); if (isClassPropEnabled()) { return getTransformingStringReader(readerSchema.getProp(SpecificData.CLASS_PROP), stringReader); @@ -497,7 +496,7 @@ private FieldReader createEnumReader(EnumAdjust action) { }); } - private FieldReader createFixedReader(Schema readerSchema, Schema writerSchema) { + private FieldReader createFixedReader(Schema readerSchema) { return reusingReader((reuse, decoder) -> { GenericFixed fixed = (GenericFixed) data.createFixed(reuse, readerSchema); decoder.readFixed(fixed.bytes(), 0, readerSchema.getFixedSize()); @@ -516,9 +515,9 @@ public static FieldReader reusingReader(ReusingFieldReader reader) { public interface FieldReader extends DatumReader { @Override - public Object read(Object reuse, Decoder decoder) throws IOException; + Object read(Object reuse, Decoder decoder) throws IOException; - public default boolean canReuse() { + default boolean canReuse() { return false; } @@ -530,7 +529,7 @@ default void setSchema(Schema schema) { public interface ReusingFieldReader extends FieldReader { @Override - public default boolean canReuse() { + default boolean canReuse() { return true; } } @@ -608,7 +607,7 @@ public Object read(Object reuse, Decoder decoder) throws IOException { } public interface ExecutionStep { - public void execute(Object record, Decoder decoder) throws IOException; + void execute(Object record, Decoder decoder) throws IOException; } } diff --git a/lang/java/avro/src/main/java/org/apache/avro/io/JsonDecoder.java b/lang/java/avro/src/main/java/org/apache/avro/io/JsonDecoder.java index e666647f18b..ac365ee0f96 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/io/JsonDecoder.java +++ b/lang/java/avro/src/main/java/org/apache/avro/io/JsonDecoder.java @@ -47,7 +47,7 @@ */ public class JsonDecoder extends ParsingDecoder implements Parser.ActionHandler { private JsonParser in; - private static JsonFactory jsonFactory = new JsonFactory(); + private static final JsonFactory JSON_FACTORY = new JsonFactory(); Stack reorderBuffers = new Stack<>(); ReorderBuffer currentReorderBuffer; @@ -97,7 +97,7 @@ public JsonDecoder configure(InputStream in) throws IOException { parser.reset(); reorderBuffers.clear(); currentReorderBuffer = null; - this.in = jsonFactory.createParser(in); + this.in = JSON_FACTORY.createParser(in); this.in.nextToken(); return this; } @@ -298,8 +298,7 @@ public ByteBuffer readBytes(ByteBuffer old) throws IOException { } private byte[] readByteArray() throws IOException { - byte[] result = in.getText().getBytes(StandardCharsets.ISO_8859_1); - return result; + return in.getText().getBytes(StandardCharsets.ISO_8859_1); } @Override diff --git a/lang/java/avro/src/main/java/org/apache/avro/io/JsonEncoder.java b/lang/java/avro/src/main/java/org/apache/avro/io/JsonEncoder.java index 71cc690b8a4..8c7746d99c7 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/io/JsonEncoder.java +++ b/lang/java/avro/src/main/java/org/apache/avro/io/JsonEncoder.java @@ -47,7 +47,7 @@ * JsonEncoder is not thread-safe. */ public class JsonEncoder extends ParsingEncoder implements Parser.ActionHandler { - private static final String LINE_SEPARATOR = System.getProperty("line.separator"); + private static final String LINE_SEPARATOR = System.lineSeparator(); final Parser parser; private JsonGenerator out; private boolean includeNamespace = true; @@ -137,15 +137,13 @@ public JsonEncoder configure(OutputStream out) throws IOException { * @param generator The JsonGenerator to direct output to. Cannot be null. * @throws IOException * @throws NullPointerException if {@code generator} is {@code null} - * @return this JsonEncoder */ - private JsonEncoder configure(JsonGenerator generator) throws IOException { + private void configure(JsonGenerator generator) throws IOException { Objects.requireNonNull(generator, "JsonGenerator cannot be null"); if (null != parser) { flush(); } this.out = generator; - return this; } @Override diff --git a/lang/java/avro/src/main/java/org/apache/avro/io/ResolvingDecoder.java b/lang/java/avro/src/main/java/org/apache/avro/io/ResolvingDecoder.java index 6f119a39b65..6bdb16a332c 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/io/ResolvingDecoder.java +++ b/lang/java/avro/src/main/java/org/apache/avro/io/ResolvingDecoder.java @@ -140,7 +140,7 @@ public final Schema.Field[] readFieldOrderIfDiff() throws IOException { /** * Consume any more data that has been written by the writer but not needed by - * the reader so that the the underlying decoder is in proper shape for the next + * the reader so that the underlying decoder is in proper shape for the next * record. This situation happens when, for example, the writer writes a record * with two fields and the reader needs only the first field. * @@ -187,11 +187,11 @@ public float readFloat() throws IOException { public double readDouble() throws IOException { Symbol actual = parser.advance(Symbol.DOUBLE); if (actual == Symbol.INT) { - return (double) in.readInt(); + return in.readInt(); } else if (actual == Symbol.LONG) { return (double) in.readLong(); } else if (actual == Symbol.FLOAT) { - return (double) in.readFloat(); + return in.readFloat(); } else { assert actual == Symbol.DOUBLE; return in.readDouble(); diff --git a/lang/java/avro/src/main/java/org/apache/avro/io/ValidatingEncoder.java b/lang/java/avro/src/main/java/org/apache/avro/io/ValidatingEncoder.java index d7440c7406e..d61967751a0 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/io/ValidatingEncoder.java +++ b/lang/java/avro/src/main/java/org/apache/avro/io/ValidatingEncoder.java @@ -36,7 +36,7 @@ * and configure. *

* ValidatingEncoder is not thread-safe. - * + * * @see Encoder * @see EncoderFactory */ @@ -44,12 +44,12 @@ public class ValidatingEncoder extends ParsingEncoder implements Parser.ActionHa protected Encoder out; protected final Parser parser; - ValidatingEncoder(Symbol root, Encoder out) throws IOException { + ValidatingEncoder(Symbol root, Encoder out) { this.out = out; this.parser = new Parser(root, this); } - ValidatingEncoder(Schema schema, Encoder in) throws IOException { + ValidatingEncoder(Schema schema, Encoder in) { this(new ValidatingGrammarGenerator().generate(schema), in); } @@ -60,7 +60,7 @@ public void flush() throws IOException { /** * Reconfigures this ValidatingEncoder to wrap the encoder provided. - * + * * @param encoder The Encoder to wrap for validation. * @return This ValidatingEncoder. */ diff --git a/lang/java/avro/src/main/java/org/apache/avro/io/parsing/Parser.java b/lang/java/avro/src/main/java/org/apache/avro/io/parsing/Parser.java index 12fc4044a9c..89269578d2c 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/io/parsing/Parser.java +++ b/lang/java/avro/src/main/java/org/apache/avro/io/parsing/Parser.java @@ -139,7 +139,7 @@ public final void processTrailingImplicitActions() throws IOException { * repeater and input is either {@link Symbol#ARRAY_END} or * {@link Symbol#MAP_END} pushes nothing. * - * @param sym + * @param sym the symbol */ public final void pushProduction(Symbol sym) { Symbol[] p = sym.production; diff --git a/lang/java/avro/src/main/java/org/apache/avro/io/parsing/ResolvingGrammarGenerator.java b/lang/java/avro/src/main/java/org/apache/avro/io/parsing/ResolvingGrammarGenerator.java index 77fbe1c7ad0..275ee35bc70 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/io/parsing/ResolvingGrammarGenerator.java +++ b/lang/java/avro/src/main/java/org/apache/avro/io/parsing/ResolvingGrammarGenerator.java @@ -223,7 +223,7 @@ private Symbol simpleGen(Schema s, Map seen) { } } - private static EncoderFactory factory = new EncoderFactory().configureBufferSize(32); + private final static EncoderFactory ENCODER_FACTORY = new EncoderFactory().configureBufferSize(32); /** * Returns the Avro binary encoded version of n according to the schema @@ -236,7 +236,7 @@ private Symbol simpleGen(Schema s, Map seen) { */ private static byte[] getBinary(Schema s, JsonNode n) throws IOException { ByteArrayOutputStream out = new ByteArrayOutputStream(); - Encoder e = factory.binaryEncoder(out, null); + Encoder e = ENCODER_FACTORY.binaryEncoder(out, null); encode(e, s, n); e.flush(); return out.toByteArray(); diff --git a/lang/java/avro/src/main/java/org/apache/avro/io/parsing/Symbol.java b/lang/java/avro/src/main/java/org/apache/avro/io/parsing/Symbol.java index a18f3fdbcd5..e905c968c83 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/io/parsing/Symbol.java +++ b/lang/java/avro/src/main/java/org/apache/avro/io/parsing/Symbol.java @@ -51,15 +51,15 @@ public enum Kind { IMPLICIT_ACTION, /** non-terminal action symbol which is explicitly consumed */ EXPLICIT_ACTION - }; + } /// The kind of this symbol. public final Kind kind; /** * The production for this symbol. If this symbol is a terminal this is - * null. Otherwise this holds the the sequence of the symbols that - * forms the production for this symbol. The sequence is in the reverse order of + * null. Otherwise this holds the sequence of the symbols that forms + * the production for this symbol. The sequence is in the reverse order of * production. This is useful for easy copying onto parsing stack. * * Please note that this is a final. So the production for a symbol should be @@ -94,7 +94,7 @@ static Symbol root(Symbol... symbols) { /** * A convenience method to construct a sequence. - * + * * @param production The constituent symbols of the sequence. */ static Symbol seq(Symbol... production) { @@ -103,7 +103,7 @@ static Symbol seq(Symbol... production) { /** * A convenience method to construct a repeater. - * + * * @param symsToRepeat The symbols to repeat in the repeater. */ static Symbol repeat(Symbol endSymbol, Symbol... symsToRepeat) { @@ -119,7 +119,7 @@ static Symbol alt(Symbol[] symbols, String[] labels) { /** * A convenience method to construct an ErrorAction. - * + * * @param e */ static Symbol error(String e) { @@ -128,7 +128,7 @@ static Symbol error(String e) { /** * A convenience method to construct a ResolvingAction. - * + * * @param w The writer symbol * @param r The reader symbol */ @@ -201,7 +201,7 @@ public int flattenedSize() { * @param skip The position where the output input sub-array starts. * @param map A map of symbols which have already been expanded. Useful for * handling recursive definitions and for caching. - * @param map2 A map to to store the list of fix-ups. + * @param map2 A map to store the list of fix-ups. */ static void flatten(Symbol[] in, int start, Symbol[] out, int skip, Map map, Map> map2) { @@ -238,7 +238,7 @@ private static void copyFixups(List fixups, Symbol[] out, int outPos, Sym /** * Returns the amount of space required to flatten the given sub-array of * symbols. - * + * * @param symbols The array of input symbols. * @param start The index where the subarray starts. * @return The number of symbols that will be produced if one expands the given diff --git a/lang/java/avro/src/main/java/org/apache/avro/io/parsing/ValidatingGrammarGenerator.java b/lang/java/avro/src/main/java/org/apache/avro/io/parsing/ValidatingGrammarGenerator.java index 7798f520ae6..2f2e9cdc1c0 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/io/parsing/ValidatingGrammarGenerator.java +++ b/lang/java/avro/src/main/java/org/apache/avro/io/parsing/ValidatingGrammarGenerator.java @@ -41,7 +41,7 @@ public Symbol generate(Schema schema) { * given schema sc. If there is already an entry for the given schema * in the given map seen then that entry is returned. Otherwise a new * symbol is generated and an entry is inserted into the map. - * + * * @param sc The schema for which the start symbol is required * @param seen A map of schema to symbol mapping done so far. * @return The start symbol for the schema diff --git a/lang/java/avro/src/main/java/org/apache/avro/message/BinaryMessageDecoder.java b/lang/java/avro/src/main/java/org/apache/avro/message/BinaryMessageDecoder.java index d835bd3fc8e..46d1d04b8bd 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/message/BinaryMessageDecoder.java +++ b/lang/java/avro/src/main/java/org/apache/avro/message/BinaryMessageDecoder.java @@ -122,7 +122,7 @@ public BinaryMessageDecoder(GenericData model, Schema readSchema, SchemaStore re public void addSchema(Schema writeSchema) { long fp = SchemaNormalization.parsingFingerprint64(writeSchema); final Schema actualReadSchema = this.readSchema != null ? this.readSchema : writeSchema; - codecByFingerprint.put(fp, new RawMessageDecoder(model, writeSchema, actualReadSchema)); + codecByFingerprint.put(fp, new RawMessageDecoder<>(model, writeSchema, actualReadSchema)); } private RawMessageDecoder getDecoder(long fp) { diff --git a/lang/java/avro/src/main/java/org/apache/avro/message/RawMessageDecoder.java b/lang/java/avro/src/main/java/org/apache/avro/message/RawMessageDecoder.java index ad2b1d31e49..917e5be88e3 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/message/RawMessageDecoder.java +++ b/lang/java/avro/src/main/java/org/apache/avro/message/RawMessageDecoder.java @@ -78,9 +78,7 @@ public RawMessageDecoder(GenericData model, Schema schema) { * @param writeSchema the {@link Schema} used to decode buffers */ public RawMessageDecoder(GenericData model, Schema writeSchema, Schema readSchema) { - Schema writeSchema1 = writeSchema; - Schema readSchema1 = readSchema; - this.reader = model.createDatumReader(writeSchema1, readSchema1); + this.reader = model.createDatumReader(writeSchema, readSchema); } @Override diff --git a/lang/java/avro/src/main/java/org/apache/avro/message/RawMessageEncoder.java b/lang/java/avro/src/main/java/org/apache/avro/message/RawMessageEncoder.java index 4df0d4c3683..230c6c1feab 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/message/RawMessageEncoder.java +++ b/lang/java/avro/src/main/java/org/apache/avro/message/RawMessageEncoder.java @@ -81,9 +81,8 @@ public RawMessageEncoder(GenericData model, Schema schema) { * @param shouldCopy whether to copy buffers before returning encoded results */ public RawMessageEncoder(GenericData model, Schema schema, boolean shouldCopy) { - Schema writeSchema = schema; this.copyOutputBytes = shouldCopy; - this.writer = model.createDatumWriter(writeSchema); + this.writer = model.createDatumWriter(schema); } @Override diff --git a/lang/java/avro/src/main/java/org/apache/avro/path/TracingAvroTypeException.java b/lang/java/avro/src/main/java/org/apache/avro/path/TracingAvroTypeException.java index 4aed18b91de..f7dae885d5d 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/path/TracingAvroTypeException.java +++ b/lang/java/avro/src/main/java/org/apache/avro/path/TracingAvroTypeException.java @@ -59,8 +59,6 @@ public AvroTypeException summarize(Schema root) { sb.append(step.toString()); } } - AvroTypeException summary = new AvroTypeException(sb.toString()); - summary.initCause(cause); - return summary; + return new AvroTypeException(sb.toString(), cause); } } diff --git a/lang/java/avro/src/main/java/org/apache/avro/reflect/FieldAccessReflect.java b/lang/java/avro/src/main/java/org/apache/avro/reflect/FieldAccessReflect.java index c790dbfb886..d47d82c3838 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/reflect/FieldAccessReflect.java +++ b/lang/java/avro/src/main/java/org/apache/avro/reflect/FieldAccessReflect.java @@ -40,8 +40,8 @@ protected FieldAccessor getAccessor(Field field) { private static class ReflectionBasedAccessor extends FieldAccessor { protected final Field field; - private boolean isStringable; - private boolean isCustomEncoded; + private final boolean isStringable; + private final boolean isCustomEncoded; public ReflectionBasedAccessor(Field field) { this.field = field; @@ -83,7 +83,7 @@ protected boolean isCustomEncoded() { private static final class ReflectionBasesAccessorCustomEncoded extends ReflectionBasedAccessor { - private CustomEncoding encoding; + private final CustomEncoding encoding; public ReflectionBasesAccessorCustomEncoded(Field f, CustomEncoding encoding) { super(f); diff --git a/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java b/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java index 8cfbdb0529c..bd8f84f91a6 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java +++ b/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java @@ -545,8 +545,7 @@ Schema createNonStringMapSchema(Type keyType, Type valueType, Map meth = CTOR_CACHE.apply(c); - Object[] params = useSchema ? new Object[] { schema } : (Object[]) null; + Object[] params = useSchema ? new Object[] { schema } : null; return (old, sch) -> { try { diff --git a/lang/java/avro/src/main/java/org/apache/avro/util/ByteBufferInputStream.java b/lang/java/avro/src/main/java/org/apache/avro/util/ByteBufferInputStream.java index f0ae5cc8a5e..6abb62015dc 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/util/ByteBufferInputStream.java +++ b/lang/java/avro/src/main/java/org/apache/avro/util/ByteBufferInputStream.java @@ -25,7 +25,7 @@ /** Utility to present {@link ByteBuffer} data as an {@link InputStream}. */ public class ByteBufferInputStream extends InputStream { - private List buffers; + private final List buffers; private int current; public ByteBufferInputStream(List buffers) { @@ -90,7 +90,7 @@ public ByteBuffer readBuffer(int length) throws IOException { /** * Returns the next non-empty buffer. */ - private ByteBuffer getBuffer() throws IOException { + private ByteBuffer getBuffer() { while (current < buffers.size()) { ByteBuffer buffer = buffers.get(current); if (buffer.hasRemaining()) From 25a9335d6e09e04a72f8fd432fe97267954bcc94 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Isma=C3=ABl=20Mej=C3=ADa?= Date: Tue, 31 Mar 2026 09:49:46 +0200 Subject: [PATCH 2/4] AVRO-4189: [Java] Use ClassUtils.forName() in FastReaderBuilder for consistency (#3693) FastReaderBuilder.findClass() was using data.getClassLoader().loadClass() directly, bypassing ClassUtils.forName() which is used everywhere else in the codebase. This change aligns FastReaderBuilder with the standard class loading path and adds tests for class loading behavior. --- .../org/apache/avro/io/FastReaderBuilder.java | 5 +- .../io/TestFastReaderBuilderClassLoading.java | 120 ++++++++++++++++++ .../avro/util/TestClassSecurityValidator.java | 23 ++++ 3 files changed, 146 insertions(+), 2 deletions(-) create mode 100644 lang/java/avro/src/test/java/org/apache/avro/io/TestFastReaderBuilderClassLoading.java diff --git a/lang/java/avro/src/main/java/org/apache/avro/io/FastReaderBuilder.java b/lang/java/avro/src/main/java/org/apache/avro/io/FastReaderBuilder.java index dbd06f305e8..7ff80e2a4f5 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/io/FastReaderBuilder.java +++ b/lang/java/avro/src/main/java/org/apache/avro/io/FastReaderBuilder.java @@ -52,6 +52,7 @@ import org.apache.avro.reflect.ReflectionUtil; import org.apache.avro.specific.SpecificData; import org.apache.avro.specific.SpecificRecordBase; +import org.apache.avro.util.ClassUtils; import org.apache.avro.util.Utf8; import org.apache.avro.util.WeakIdentityHashMap; import org.apache.avro.util.internal.Accessor; @@ -446,8 +447,8 @@ private FieldReader getTransformingStringReader(String valueClass, FieldReader s private Optional> findClass(String clazz) { try { - return Optional.of(data.getClassLoader().loadClass(clazz)); - } catch (ReflectiveOperationException e) { + return Optional.of(ClassUtils.forName(data.getClassLoader(), clazz)); + } catch (ClassNotFoundException e) { return Optional.empty(); } } diff --git a/lang/java/avro/src/test/java/org/apache/avro/io/TestFastReaderBuilderClassLoading.java b/lang/java/avro/src/test/java/org/apache/avro/io/TestFastReaderBuilderClassLoading.java new file mode 100644 index 00000000000..2feedaedaaf --- /dev/null +++ b/lang/java/avro/src/test/java/org/apache/avro/io/TestFastReaderBuilderClassLoading.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.avro.io; + +import static org.junit.jupiter.api.Assertions.*; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.net.URI; +import java.util.Collections; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.GenericRecordBuilder; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.util.ClassSecurityValidator; +import org.apache.avro.util.ClassSecurityValidator.ClassSecurityPredicate; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +/** + * Tests that FastReaderBuilder.findClass() routes class loading through + * ClassUtils.forName(), so that ClassSecurityValidator is applied consistently. + */ +public class TestFastReaderBuilderClassLoading { + + private static final String TEST_VALUE = "https://example.com"; + + private ClassSecurityPredicate originalValidator; + + @BeforeEach + public void saveValidator() { + originalValidator = ClassSecurityValidator.getGlobal(); + } + + @AfterEach + public void restoreValidator() { + ClassSecurityValidator.setGlobal(originalValidator); + } + + /** + * When the validator blocks a class referenced by java-class, FastReaderBuilder + * must propagate the SecurityException so the caller knows why the class was + * rejected. + */ + @Test + void blockedClassThrowsSecurityException() { + // Block java.net.URI + ClassSecurityValidator.setGlobal(ClassSecurityValidator.composite(ClassSecurityValidator.DEFAULT_TRUSTED_CLASSES, + ClassSecurityValidator.builder().add("org.apache.avro.util.Utf8").build())); + + assertThrows(SecurityException.class, () -> readWithJavaClass("java.net.URI"), + "Blocked class should cause a SecurityException to propagate"); + } + + /** + * When the validator trusts a class referenced by java-class, FastReaderBuilder + * should instantiate it normally. + */ + @Test + void trustedClassIsInstantiated() { + ClassSecurityValidator.setGlobal(ClassSecurityValidator.composite(ClassSecurityValidator.DEFAULT_TRUSTED_CLASSES, + ClassSecurityValidator.builder().add("java.net.URI").add("org.apache.avro.util.Utf8").build())); + + GenericRecord result = readWithJavaClass("java.net.URI"); + + assertInstanceOf(URI.class, result.get("value")); + assertEquals(URI.create(TEST_VALUE), result.get("value")); + } + + /** + * Encode a string, then read it back through FastReaderBuilder with the given + * java-class. + */ + private GenericRecord readWithJavaClass(String javaClass) { + try { + Schema stringSchema = Schema.create(Schema.Type.STRING); + stringSchema.addProp(SpecificData.CLASS_PROP, javaClass); + stringSchema.addProp(GenericData.STRING_PROP, GenericData.StringType.String.name()); + + Schema recordSchema = Schema.createRecord("TestRecord", null, "test", false); + recordSchema.setFields(Collections.singletonList(new Schema.Field("value", stringSchema, null, null))); + + // Encode + GenericRecord record = new GenericRecordBuilder(recordSchema).set("value", TEST_VALUE).build(); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + Encoder encoder = EncoderFactory.get().binaryEncoder(out, null); + new GenericDatumWriter(recordSchema).write(record, encoder); + encoder.flush(); + + // Decode with fast reader enabled + GenericData data = new GenericData(); + data.setFastReaderEnabled(true); + GenericDatumReader reader = new GenericDatumReader<>(recordSchema, recordSchema, data); + return reader.read(null, DecoderFactory.get().binaryDecoder(new ByteArrayInputStream(out.toByteArray()), null)); + } catch (IOException e) { + return fail("Unexpected IOException during encode/decode", e); + } + } +} diff --git a/lang/java/avro/src/test/java/org/apache/avro/util/TestClassSecurityValidator.java b/lang/java/avro/src/test/java/org/apache/avro/util/TestClassSecurityValidator.java index 7e58a66a073..33d7c1cdba9 100644 --- a/lang/java/avro/src/test/java/org/apache/avro/util/TestClassSecurityValidator.java +++ b/lang/java/avro/src/test/java/org/apache/avro/util/TestClassSecurityValidator.java @@ -97,6 +97,29 @@ public void forbiddenClass(String className) { assertEquals("Not inner", e.getMessage()); } + @Test + void testClassUtilsEnforcesValidator() { + ClassSecurityValidator.setGlobal(ClassSecurityValidator.builder().add("java.lang.String").build()); + + assertThrows(SecurityException.class, () -> ClassUtils.forName("java.net.URI"), + "ClassUtils.forName should reject classes not in the trusted set"); + + assertDoesNotThrow(() -> ClassUtils.forName("java.lang.String"), + "ClassUtils.forName should allow classes in the trusted set"); + } + + @Test + void testDirectLoadClassDoesNotUseValidator() throws ClassNotFoundException { + ClassSecurityValidator.setGlobal(ClassSecurityValidator.builder().add("java.lang.String").build()); + + ClassLoader cl = Thread.currentThread().getContextClassLoader(); + Class loaded = cl.loadClass("java.net.URI"); + assertNotNull(loaded, "Direct ClassLoader.loadClass() loads any class regardless of the validator"); + + assertThrows(SecurityException.class, () -> ClassUtils.forName("java.net.URI"), + "ClassUtils.forName correctly applies the validator"); + } + @Test void testBuildComplexPredicate() { ClassSecurityValidator.setGlobal(ClassSecurityValidator.composite( From 21fe903b05fdcd0eff1fec344981c9fd8a1bcecb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Isma=C3=ABl=20Mej=C3=ADa?= Date: Fri, 1 May 2026 12:49:38 +0200 Subject: [PATCH 3/4] AVRO-4241: [Java] Validate remaining bytes before allocating collections to prevent OOM from malicious data --- .../avro/generic/GenericDatumReader.java | 118 ++++++- .../org/apache/avro/io/BinaryDecoder.java | 65 ++++ .../main/java/org/apache/avro/io/Decoder.java | 10 + .../org/apache/avro/io/ValidatingDecoder.java | 5 + .../avro/util/ByteBufferInputStream.java | 12 + .../avro/generic/TestGenericDatumReader.java | 186 ++++++++++++ .../org/apache/avro/io/TestBinaryDecoder.java | 45 +++ .../test/generic/MinBytesPerElementTest.java | 287 ++++++++++++++++++ 8 files changed, 726 insertions(+), 2 deletions(-) create mode 100644 lang/java/perf/src/main/java/org/apache/avro/perf/test/generic/MinBytesPerElementTest.java diff --git a/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java b/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java index b818a070c18..f8f092b3994 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java +++ b/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java @@ -17,12 +17,16 @@ */ package org.apache.avro.generic; +import java.io.EOFException; import java.io.IOException; import java.lang.reflect.Constructor; import java.nio.ByteBuffer; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; +import java.util.IdentityHashMap; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; @@ -285,6 +289,7 @@ protected Object readArray(Object old, Schema expected, ResolvingDecoder in) thr long l = in.readArrayStart(); long base = 0; if (l > 0) { + ensureAvailableCollectionBytes(in, l, expectedType); LogicalType logicalType = expectedType.getLogicalType(); Conversion conversion = getData().getConversionFor(logicalType); Object array = newArray(old, (int) l, expected); @@ -300,13 +305,23 @@ protected Object readArray(Object old, Schema expected, ResolvingDecoder in) thr } } base += l; - } while ((l = in.arrayNext()) > 0); + } while ((l = arrayNext(in, expectedType)) > 0); return pruneArray(array); } else { return pruneArray(newArray(old, 0, expected)); } } + /** + * Reads the next array block count and validates remaining bytes before the + * caller allocates storage. + */ + private long arrayNext(ResolvingDecoder in, Schema elementType) throws IOException { + long l = in.arrayNext(); + ensureAvailableCollectionBytes(in, l, elementType); + return l; + } + private Object pruneArray(Object object) { if (object instanceof GenericArray) { ((GenericArray) object).prune(); @@ -342,6 +357,7 @@ protected Object readMap(Object old, Schema expected, ResolvingDecoder in) throw long l = in.readMapStart(); LogicalType logicalType = eValue.getLogicalType(); Conversion conversion = getData().getConversionFor(logicalType); + ensureAvailableMapBytes(in, l, eValue); Object map = newMap(old, (int) l); if (l > 0) { do { @@ -355,11 +371,39 @@ protected Object readMap(Object old, Schema expected, ResolvingDecoder in) throw addToMap(map, readMapKey(null, expected, in), readWithoutConversion(null, eValue, in)); } } - } while ((l = in.mapNext()) > 0); + } while ((l = mapNext(in, eValue)) > 0); } return map; } + /** + * Reads the next map block count and validates remaining bytes before the + * caller allocates storage. + */ + private long mapNext(ResolvingDecoder in, Schema valueType) throws IOException { + long l = in.mapNext(); + ensureAvailableMapBytes(in, l, valueType); + return l; + } + + /** + * Validates remaining bytes for a map block. Each map entry has a string key + * (at least 1 byte for the length varint) plus a value, so the minimum bytes + * per entry is {@code 1 + minBytesPerElement(valueSchema)}. + */ + private static void ensureAvailableMapBytes(Decoder decoder, long count, Schema valueSchema) throws EOFException { + if (count <= 0) { + return; + } + // Map keys are always strings: at least 1 byte for the length varint + long minBytesPerEntry = 1L + minBytesPerElement(valueSchema); + int remaining = decoder.remainingBytes(); + if (remaining >= 0 && count * minBytesPerEntry > remaining) { + throw new EOFException("Map claims " + count + " entries with at least " + minBytesPerEntry + + " bytes each, but only " + remaining + " bytes are available"); + } + } + /** * Called by the default implementation of {@link #readMap} to read a key value. * The default implementation returns delegates to @@ -378,6 +422,76 @@ protected void addToMap(Object map, Object key, Object value) { ((Map) map).put(key, value); } + /** + * Returns the minimum number of bytes required to encode a single value of the + * given schema in Avro binary format. Used to validate that the decoder has + * enough data remaining before allocating collection backing arrays. + *

+ * Returns 0 for types whose binary encoding is empty ({@code null}, zero-length + * {@code fixed}, records with only zero-byte fields). Returns a positive value + * for all other types. + */ + static int minBytesPerElement(Schema schema) { + return minBytesPerElement(schema, Collections.newSetFromMap(new IdentityHashMap<>())); + } + + private static int minBytesPerElement(Schema schema, Set visited) { + switch (schema.getType()) { + case NULL: + return 0; + case FIXED: + return schema.getFixedSize(); + case FLOAT: + return 4; + case DOUBLE: + return 8; + case RECORD: + if (!visited.add(schema)) { + return 0; // break recursion for self-referencing schemas + } + long sum = 0; + for (Schema.Field f : schema.getFields()) { + sum += minBytesPerElement(f.schema(), visited); + if (sum >= Integer.MAX_VALUE) { + sum = Integer.MAX_VALUE; + break; + } + } + visited.remove(schema); + return (int) sum; + case UNION: + // The branch index varint is always at least 1 byte + return 1; + default: + // BOOLEAN, INT, LONG, ENUM, STRING, BYTES, ARRAY, MAP are all >= 1 byte + return 1; + } + } + + /** + * Validates that the decoder has enough remaining bytes to hold {@code count} + * elements of the given schema, assuming each element requires at least + * {@link #minBytesPerElement} bytes. Throws {@link EOFException} if the decoder + * reports fewer remaining bytes than required. + *

+ * This check prevents out-of-memory errors from pre-allocating huge backing + * arrays when the source data is truncated or malicious. + */ + private static void ensureAvailableCollectionBytes(Decoder decoder, long count, Schema elementSchema) + throws EOFException { + if (count <= 0) { + return; + } + int minBytes = minBytesPerElement(elementSchema); + if (minBytes > 0) { + int remaining = decoder.remainingBytes(); + if (remaining >= 0 && count * (long) minBytes > remaining) { + throw new EOFException("Collection claims " + count + " elements with at least " + minBytes + + " bytes each, but only " + remaining + " bytes are available"); + } + } + } + /** * Called to read a fixed value. May be overridden for alternate fixed * representations. By default, returns {@link GenericFixed}. diff --git a/lang/java/avro/src/main/java/org/apache/avro/io/BinaryDecoder.java b/lang/java/avro/src/main/java/org/apache/avro/io/BinaryDecoder.java index 4e45781b030..67ffa34f9c1 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/io/BinaryDecoder.java +++ b/lang/java/avro/src/main/java/org/apache/avro/io/BinaryDecoder.java @@ -17,6 +17,7 @@ */ package org.apache.avro.io; +import java.io.ByteArrayInputStream; import java.io.EOFException; import java.io.IOException; import java.io.InputStream; @@ -27,6 +28,7 @@ import org.apache.avro.AvroRuntimeException; import org.apache.avro.InvalidNumberEncodingException; import org.apache.avro.SystemLimitException; +import org.apache.avro.util.ByteBufferInputStream; import org.apache.avro.util.Utf8; /** @@ -283,6 +285,7 @@ public double readDouble() throws IOException { @Override public Utf8 readString(Utf8 old) throws IOException { int length = SystemLimitException.checkMaxStringLength(readLong()); + ensureAvailableBytes(length); Utf8 result = (old != null ? old : new Utf8()); result.setByteLength(length); if (0 != length) { @@ -306,6 +309,7 @@ public void skipString() throws IOException { @Override public ByteBuffer readBytes(ByteBuffer old) throws IOException { int length = SystemLimitException.checkMaxBytesLength(readLong()); + ensureAvailableBytes(length); final ByteBuffer result; if (old != null && length <= old.capacity()) { result = old; @@ -493,6 +497,21 @@ public boolean isEnd() throws IOException { return (0 == read); } + /** + * Returns the total number of bytes remaining that can be read from this + * decoder (including any buffered bytes), or {@code -1} if the total is + * unknown. + *

+ * Byte-array-backed decoders return an exact count. InputStream-backed decoders + * return an exact count only when the wrapped stream can report one. + *

+ * {@link DirectBinaryDecoder} always returns {@code -1}. + */ + @Override + public int remainingBytes() { + return source != null ? source.remainingBytes() : -1; + } + /** * Ensures that buf[pos + num - 1] is not out of the buffer array bounds. * However, buf[pos + num -1] may be >= limit if there is not enough data left @@ -515,6 +534,27 @@ private void ensureBounds(int num) throws IOException { } } + /** + * Validates that the source has at least {@code length} bytes remaining before + * proceeding. Throws early if the declared length is inconsistent with the + * available data. + *

+ * This check is only applied when the decoder knows the exact remaining byte + * count. + * + * @param length the number of bytes expected to be available + * @throws EOFException if the source is known to have fewer bytes remaining + */ + private void ensureAvailableBytes(int length) throws EOFException { + if (source != null && length > 0) { + int remaining = source.remainingBytes(); + if (remaining >= 0 && length > remaining) { + throw new EOFException( + "Attempted to read " + length + " bytes, but only " + remaining + " bytes are available"); + } + } + } + /** * Returns an {@link java.io.InputStream} that is aware of any buffering that * may occur in this BinaryDecoder. Readers that need to interleave decoding @@ -649,6 +689,12 @@ protected ByteSource() { abstract boolean isEof(); + /** + * Returns the total number of bytes remaining that can be read from this source + * (including any buffered bytes), or {@code -1} if the total is unknown. + */ + protected abstract int remainingBytes(); + protected void attach(int bufferSize, BinaryDecoder decoder) { decoder.buf = new byte[bufferSize]; decoder.pos = 0; @@ -895,6 +941,20 @@ public boolean isEof() { return isEof; } + @Override + protected int remainingBytes() { + int buffered = ba.getLim() - ba.getPos(); + try { + if (in.getClass() == ByteArrayInputStream.class || in.getClass() == ByteBufferInputStream.class) { + long total = (long) buffered + in.available(); + return (int) Math.min(total, Integer.MAX_VALUE); + } + } catch (IOException e) { + return -1; + } + return -1; + } + @Override public void close() throws IOException { in.close(); @@ -1013,5 +1073,10 @@ public boolean isEof() { int remaining = ba.getLim() - ba.getPos(); return (remaining == 0); } + + @Override + protected int remainingBytes() { + return ba.getLim() - ba.getPos(); + } } } diff --git a/lang/java/avro/src/main/java/org/apache/avro/io/Decoder.java b/lang/java/avro/src/main/java/org/apache/avro/io/Decoder.java index 11fc28d762e..80640a61aa0 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/io/Decoder.java +++ b/lang/java/avro/src/main/java/org/apache/avro/io/Decoder.java @@ -299,4 +299,14 @@ public void readFixed(byte[] bytes) throws IOException { * type of the next value to be read */ public abstract int readIndex() throws IOException; + + /** + * Returns the total number of bytes remaining that can be read from this + * decoder, or {@code -1} if the total is unknown. Implementations that can + * determine remaining capacity (for example, byte-array-backed decoders) should + * override this method. The default returns {@code -1}. + */ + public int remainingBytes() { + return -1; + } } diff --git a/lang/java/avro/src/main/java/org/apache/avro/io/ValidatingDecoder.java b/lang/java/avro/src/main/java/org/apache/avro/io/ValidatingDecoder.java index dbee4458575..26f79a16ff2 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/io/ValidatingDecoder.java +++ b/lang/java/avro/src/main/java/org/apache/avro/io/ValidatingDecoder.java @@ -246,4 +246,9 @@ public int readIndex() throws IOException { public Symbol doAction(Symbol input, Symbol top) throws IOException { return null; } + + @Override + public int remainingBytes() { + return in != null ? in.remainingBytes() : -1; + } } diff --git a/lang/java/avro/src/main/java/org/apache/avro/util/ByteBufferInputStream.java b/lang/java/avro/src/main/java/org/apache/avro/util/ByteBufferInputStream.java index 6abb62015dc..375abc23fbf 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/util/ByteBufferInputStream.java +++ b/lang/java/avro/src/main/java/org/apache/avro/util/ByteBufferInputStream.java @@ -65,6 +65,18 @@ public int read(byte[] b, int off, int len) throws IOException { } } + @Override + public int available() throws IOException { + long remaining = 0; + for (int i = current; i < buffers.size(); i++) { + remaining += buffers.get(i).remaining(); + if (remaining >= Integer.MAX_VALUE) { + return Integer.MAX_VALUE; + } + } + return (int) remaining; + } + /** * Read a buffer from the input without copying, if possible. */ diff --git a/lang/java/avro/src/test/java/org/apache/avro/generic/TestGenericDatumReader.java b/lang/java/avro/src/test/java/org/apache/avro/generic/TestGenericDatumReader.java index 3ec67cd1139..714d2e60814 100644 --- a/lang/java/avro/src/test/java/org/apache/avro/generic/TestGenericDatumReader.java +++ b/lang/java/avro/src/test/java/org/apache/avro/generic/TestGenericDatumReader.java @@ -17,17 +17,26 @@ */ package org.apache.avro.generic; +import java.io.ByteArrayOutputStream; +import java.io.EOFException; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Random; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.avro.Schema; +import org.apache.avro.io.BinaryDecoder; +import org.apache.avro.io.BinaryEncoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.io.EncoderFactory; import org.junit.Test; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; public class TestGenericDatumReader { @@ -117,4 +126,181 @@ private void sleep() { } } } + + // --- minBytesPerElement tests --- + + @Test + public void testMinBytesPerElementPrimitives() { + assertEquals(0, GenericDatumReader.minBytesPerElement(Schema.create(Schema.Type.NULL))); + assertEquals(1, GenericDatumReader.minBytesPerElement(Schema.create(Schema.Type.BOOLEAN))); + assertEquals(1, GenericDatumReader.minBytesPerElement(Schema.create(Schema.Type.INT))); + assertEquals(1, GenericDatumReader.minBytesPerElement(Schema.create(Schema.Type.LONG))); + assertEquals(4, GenericDatumReader.minBytesPerElement(Schema.create(Schema.Type.FLOAT))); + assertEquals(8, GenericDatumReader.minBytesPerElement(Schema.create(Schema.Type.DOUBLE))); + assertEquals(1, GenericDatumReader.minBytesPerElement(Schema.create(Schema.Type.STRING))); + assertEquals(1, GenericDatumReader.minBytesPerElement(Schema.create(Schema.Type.BYTES))); + } + + @Test + public void testMinBytesPerElementFixed() { + assertEquals(0, GenericDatumReader.minBytesPerElement(Schema.createFixed("ZeroFixed", null, "test", 0))); + assertEquals(5, GenericDatumReader.minBytesPerElement(Schema.createFixed("FiveFixed", null, "test", 5))); + assertEquals(16, GenericDatumReader.minBytesPerElement(Schema.createFixed("SixteenFixed", null, "test", 16))); + } + + @Test + public void testMinBytesPerElementUnion() { + // Union always >= 1 byte (branch index varint) + Schema nullableInt = Schema.createUnion(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.INT)); + assertEquals(1, GenericDatumReader.minBytesPerElement(nullableInt)); + } + + @Test + public void testMinBytesPerElementRecord() { + // Empty record = 0 bytes + Schema emptyRecord = Schema.createRecord("Empty", null, "test", false); + emptyRecord.setFields(Collections.emptyList()); + assertEquals(0, GenericDatumReader.minBytesPerElement(emptyRecord)); + + // Record with a single non-null field >= 1 byte + Schema recWithInt = Schema.createRecord("WithInt", null, "test", false); + recWithInt.setFields(Collections.singletonList(new Schema.Field("x", Schema.create(Schema.Type.INT)))); + assertEquals(1, GenericDatumReader.minBytesPerElement(recWithInt)); + + // Record with only null fields = 0 bytes + Schema recWithNull = Schema.createRecord("WithNull", null, "test", false); + recWithNull.setFields(Collections.singletonList(new Schema.Field("n", Schema.create(Schema.Type.NULL)))); + assertEquals(0, GenericDatumReader.minBytesPerElement(recWithNull)); + + Schema recWithMultipleFields = Schema.createRecord("WithMultipleFields", null, "test", false); + recWithMultipleFields.setFields(Arrays.asList(new Schema.Field("f", Schema.create(Schema.Type.FLOAT)), + new Schema.Field("d", Schema.create(Schema.Type.DOUBLE)))); + assertEquals(12, GenericDatumReader.minBytesPerElement(recWithMultipleFields)); + } + + @Test + public void testMinBytesPerElementNestedCollections() { + // Array and map types are >= 1 byte (count varint) + assertEquals(1, GenericDatumReader.minBytesPerElement(Schema.createArray(Schema.create(Schema.Type.INT)))); + assertEquals(1, GenericDatumReader.minBytesPerElement(Schema.createMap(Schema.create(Schema.Type.INT)))); + } + + // --- Collection byte validation end-to-end tests --- + + /** + * Encodes the given longs as Avro varints into a byte array. + */ + private static byte[] encodeVarints(long... values) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + BinaryEncoder enc = EncoderFactory.get().directBinaryEncoder(baos, null); + for (long v : values) { + enc.writeLong(v); + } + enc.flush(); + return baos.toByteArray(); + } + + /** + * Verify that reading an array of ints with a huge count but no element data + * throws EOFException from the schema-aware byte check. + */ + @Test + public void arrayOfIntsRejectsHugeCount() throws Exception { + Schema schema = Schema.createArray(Schema.create(Schema.Type.INT)); + GenericDatumReader reader = new GenericDatumReader<>(schema); + + // Binary: varint(10_000_000) for block count, varint(0) for terminator. + // No actual element data -- the reader should reject before allocating. + byte[] data = encodeVarints(10_000_000L, 0L); + BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(data, null); + assertThrows(EOFException.class, () -> reader.read(null, decoder)); + } + + /** + * Verify that reading an array of nulls with a large count SUCCEEDS because + * null elements are 0 bytes each, so the byte check is correctly skipped. + */ + @Test + public void arrayOfNullsAcceptsLargeCount() throws Exception { + Schema schema = Schema.createArray(Schema.create(Schema.Type.NULL)); + GenericDatumReader reader = new GenericDatumReader<>(schema); + + // Binary: varint(1000) for block count, varint(0) for terminator. + // 1000 null elements = 0 bytes of element data. + byte[] data = encodeVarints(1000L, 0L); + BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(data, null); + GenericData.Array result = (GenericData.Array) reader.read(null, decoder); + assertEquals(1000, result.size()); + } + + /** + * Verify that reading a map of string->int with a huge count throws + * EOFException. Each map entry needs at least 2 bytes (1 for key length varint + * + 1 for int value). + */ + @Test + public void mapOfStringToIntRejectsHugeCount() throws Exception { + Schema schema = Schema.createMap(Schema.create(Schema.Type.INT)); + GenericDatumReader reader = new GenericDatumReader<>(schema); + + byte[] data = encodeVarints(10_000_000L, 0L); + BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(data, null); + assertThrows(EOFException.class, () -> reader.read(null, decoder)); + } + + /** + * Verify that reading a map of string->null with a huge count also throws + * EOFException because map keys are always strings (at least 1 byte each). + */ + @Test + public void mapOfStringToNullRejectsHugeCount() throws Exception { + Schema schema = Schema.createMap(Schema.create(Schema.Type.NULL)); + GenericDatumReader reader = new GenericDatumReader<>(schema); + + byte[] data = encodeVarints(10_000_000L, 0L); + BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(data, null); + assertThrows(EOFException.class, () -> reader.read(null, decoder)); + } + + /** + * Verify that reading an array of zero-length fixed elements with a large count + * SUCCEEDS because zero-length fixed elements are 0 bytes each. + */ + @Test + public void arrayOfZeroLengthFixedAcceptsLargeCount() throws Exception { + Schema fixedSchema = Schema.createFixed("Empty", null, "test", 0); + Schema schema = Schema.createArray(fixedSchema); + GenericDatumReader reader = new GenericDatumReader<>(schema); + + byte[] data = encodeVarints(500L, 0L); + BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(data, null); + GenericData.Array result = (GenericData.Array) reader.read(null, decoder); + assertEquals(500, result.size()); + } + + @Test + public void arrayOfRecordsRejectsHugeCountUsingFullRecordSize() throws Exception { + Schema recordSchema = Schema.createRecord("Element", null, "test", false); + recordSchema.setFields(Arrays.asList(new Schema.Field("f", Schema.create(Schema.Type.FLOAT)), + new Schema.Field("d", Schema.create(Schema.Type.DOUBLE)))); + Schema schema = Schema.createArray(recordSchema); + GenericDatumReader reader = new GenericDatumReader<>(schema); + + byte[] data = encodeVarints(2L, 0L); + BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(data, null); + assertThrows(EOFException.class, () -> reader.read(null, decoder)); + } + + @Test + public void mapOfRecordsRejectsHugeCountUsingFullRecordSize() throws Exception { + Schema recordSchema = Schema.createRecord("MapValue", null, "test", false); + recordSchema.setFields(Arrays.asList(new Schema.Field("f", Schema.create(Schema.Type.FLOAT)), + new Schema.Field("d", Schema.create(Schema.Type.DOUBLE)))); + Schema schema = Schema.createMap(recordSchema); + GenericDatumReader reader = new GenericDatumReader<>(schema); + + byte[] data = encodeVarints(1L, 0L); + BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(data, null); + assertThrows(EOFException.class, () -> reader.read(null, decoder)); + } } diff --git a/lang/java/avro/src/test/java/org/apache/avro/io/TestBinaryDecoder.java b/lang/java/avro/src/test/java/org/apache/avro/io/TestBinaryDecoder.java index 6010fc9c69f..f1713666563 100644 --- a/lang/java/avro/src/test/java/org/apache/avro/io/TestBinaryDecoder.java +++ b/lang/java/avro/src/test/java/org/apache/avro/io/TestBinaryDecoder.java @@ -682,4 +682,49 @@ void eof(boolean useDirect) throws IOException { Assertions.assertThrows(EOFException.class, () -> d.readInt()); } + /** + * Verify that a byte-array-backed decoder rejects a string whose varint length + * exceeds the remaining bytes, throwing {@link EOFException} before + * allocating the buffer. + */ + @Test + public void testStringLengthExceedsAvailableBytes() throws IOException { + // Encode a varint claiming 10_000_000 bytes of string data, but supply none. + // The byte-array-backed decoder knows it has only a few bytes left after + // the varint, so ensureAvailableBytes must throw EOFException. + BinaryDecoder bd = newDecoder(false, 10_000_000L); + Assertions.assertThrows(EOFException.class, () -> bd.readString(null)); + } + + /** + * Same as {@link #testStringLengthExceedsAvailableBytes()} but for + * {@link BinaryDecoder#readBytes(ByteBuffer)}. + */ + @Test + public void testBytesLengthExceedsAvailableBytes() throws IOException { + BinaryDecoder bd = newDecoder(false, 10_000_000L); + Assertions.assertThrows(EOFException.class, () -> bd.readBytes(null)); + } + + @Test + public void testStringLengthDoesNotTrustUnknownAvailable() throws IOException { + byte[] encoded; + try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { + BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(baos, null); + encoder.writeString("hello"); + encoder.flush(); + encoded = baos.toByteArray(); + } + + InputStream in = new ByteArrayInputStream(encoded) { + @Override + public synchronized int available() { + return 0; + } + }; + + BinaryDecoder decoder = factory.binaryDecoder(in, null); + Assertions.assertEquals("hello", decoder.readString(null).toString()); + } + } diff --git a/lang/java/perf/src/main/java/org/apache/avro/perf/test/generic/MinBytesPerElementTest.java b/lang/java/perf/src/main/java/org/apache/avro/perf/test/generic/MinBytesPerElementTest.java new file mode 100644 index 00000000000..dc831c73d57 --- /dev/null +++ b/lang/java/perf/src/main/java/org/apache/avro/perf/test/generic/MinBytesPerElementTest.java @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.avro.perf.test.generic; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Random; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumReader; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.io.Decoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.io.Encoder; +import org.apache.avro.io.EncoderFactory; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +import java.util.concurrent.TimeUnit; + +/** + * Benchmarks to measure the overhead of {@code minBytesPerElement} computation + * during array/map decoding via {@link GenericDatumReader}. Tests complex, + * wide, and recursive schema structures to verify that the per-block schema + * traversal cost is acceptable. + */ +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.SECONDS) +@Warmup(iterations = 3, time = 2) +@Measurement(iterations = 5, time = 3) +@Fork(1) +@State(Scope.Thread) +public class MinBytesPerElementTest { + + /** + * Schema complexity levels to benchmark: - WIDE: Record with 50 fields of + * various types - DEEP: Deeply nested records (10 levels) - RECURSIVE: + * Self-referencing record (linked-list style) + */ + @Param({ "WIDE", "DEEP", "RECURSIVE" }) + public String schemaType; + + private byte[] encodedArrayData; + private byte[] encodedMapData; + private DatumReader arrayReader; + private DatumReader mapReader; + private Schema arrayWrapperSchema; + private Schema mapWrapperSchema; + + @Setup(Level.Trial) + public void setup() throws IOException { + Schema elementSchema = buildElementSchema(schemaType); + + // Wrap in array and map schemas for testing collection decoding paths + arrayWrapperSchema = Schema.createRecord("ArrayWrapper", null, "test", false); + arrayWrapperSchema + .setFields(Arrays.asList(new Schema.Field("items", Schema.createArray(elementSchema), null, null))); + + mapWrapperSchema = Schema.createRecord("MapWrapper", null, "test", false); + mapWrapperSchema.setFields(Arrays.asList(new Schema.Field("entries", Schema.createMap(elementSchema), null, null))); + + arrayReader = new GenericDatumReader<>(arrayWrapperSchema); + mapReader = new GenericDatumReader<>(mapWrapperSchema); + + // Encode test data: array with 1000 elements + encodedArrayData = encodeArrayData(arrayWrapperSchema, elementSchema, 1000); + // Encode test data: map with 1000 entries + encodedMapData = encodeMapData(mapWrapperSchema, elementSchema, 1000); + } + + @Benchmark + public void decodeArrayOfComplexRecords(Blackhole bh) throws IOException { + Decoder decoder = DecoderFactory.get().binaryDecoder(encodedArrayData, null); + GenericRecord result = arrayReader.read(null, decoder); + bh.consume(result); + } + + @Benchmark + public void decodeMapOfComplexRecords(Blackhole bh) throws IOException { + Decoder decoder = DecoderFactory.get().binaryDecoder(encodedMapData, null); + GenericRecord result = mapReader.read(null, decoder); + bh.consume(result); + } + + private static Schema buildElementSchema(String type) { + switch (type) { + case "WIDE": + return buildWideSchema(); + case "DEEP": + return buildDeepSchema(); + case "RECURSIVE": + return buildRecursiveSchema(); + default: + throw new IllegalArgumentException("Unknown schema type: " + type); + } + } + + /** + * Wide record: 50 fields of mixed types (int, long, double, float, string, + * boolean, bytes, nested record). + */ + private static Schema buildWideSchema() { + Schema innerRecord = Schema.createRecord("Inner", null, "test", false); + innerRecord.setFields(Arrays.asList(new Schema.Field("x", Schema.create(Schema.Type.INT), null, null), + new Schema.Field("y", Schema.create(Schema.Type.DOUBLE), null, null))); + + List fields = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + fields.add(new Schema.Field("int_" + i, Schema.create(Schema.Type.INT), null, null)); + } + for (int i = 0; i < 10; i++) { + fields.add(new Schema.Field("long_" + i, Schema.create(Schema.Type.LONG), null, null)); + } + for (int i = 0; i < 10; i++) { + fields.add(new Schema.Field("double_" + i, Schema.create(Schema.Type.DOUBLE), null, null)); + } + for (int i = 0; i < 10; i++) { + fields.add(new Schema.Field("str_" + i, Schema.create(Schema.Type.STRING), null, null)); + } + for (int i = 0; i < 5; i++) { + fields.add(new Schema.Field("bool_" + i, Schema.create(Schema.Type.BOOLEAN), null, null)); + } + for (int i = 0; i < 5; i++) { + fields.add(new Schema.Field("rec_" + i, innerRecord, null, null)); + } + + Schema wide = Schema.createRecord("WideRecord", null, "test", false); + wide.setFields(fields); + return wide; + } + + /** + * Deeply nested: 10 levels of records, each containing an int and the next + * level. + */ + private static Schema buildDeepSchema() { + Schema current = Schema.createRecord("Level10", null, "test", false); + current.setFields(Arrays.asList(new Schema.Field("value", Schema.create(Schema.Type.INT), null, null))); + + for (int i = 9; i >= 1; i--) { + Schema next = Schema.createRecord("Level" + i, null, "test", false); + next.setFields(Arrays.asList(new Schema.Field("value", Schema.create(Schema.Type.INT), null, null), + new Schema.Field("child", current, null, null))); + current = next; + } + return current; + } + + /** + * Recursive: self-referencing record (linked list). The "next" field is a union + * of null and the record itself. + */ + private static Schema buildRecursiveSchema() { + Schema recursive = Schema.createRecord("LinkedNode", null, "test", false); + Schema nullSchema = Schema.create(Schema.Type.NULL); + Schema union = Schema.createUnion(Arrays.asList(nullSchema, recursive)); + recursive.setFields(Arrays.asList(new Schema.Field("value", Schema.create(Schema.Type.INT), null, null), + new Schema.Field("next", union, null, null))); + return recursive; + } + + private static byte[] encodeArrayData(Schema wrapperSchema, Schema elementSchema, int count) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DatumWriter writer = new GenericDatumWriter<>(wrapperSchema); + Encoder encoder = EncoderFactory.get().binaryEncoder(baos, null); + + GenericRecord wrapper = new GenericData.Record(wrapperSchema); + List items = new ArrayList<>(count); + Random r = new Random(42); + for (int i = 0; i < count; i++) { + items.add(buildRecord(elementSchema, r, 0)); + } + wrapper.put("items", items); + writer.write(wrapper, encoder); + encoder.flush(); + return baos.toByteArray(); + } + + private static byte[] encodeMapData(Schema wrapperSchema, Schema elementSchema, int count) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DatumWriter writer = new GenericDatumWriter<>(wrapperSchema); + Encoder encoder = EncoderFactory.get().binaryEncoder(baos, null); + + GenericRecord wrapper = new GenericData.Record(wrapperSchema); + HashMap map = new HashMap<>(); + Random r = new Random(42); + for (int i = 0; i < count; i++) { + map.put("key_" + i, buildRecord(elementSchema, r, 0)); + } + wrapper.put("entries", map); + writer.write(wrapper, encoder); + encoder.flush(); + return baos.toByteArray(); + } + + private static GenericRecord buildRecord(Schema schema, Random r, int depth) { + GenericRecord rec = new GenericData.Record(schema); + for (Schema.Field field : schema.getFields()) { + rec.put(field.pos(), buildValue(field.schema(), r, depth)); + } + return rec; + } + + private static Object buildValue(Schema schema, Random r, int depth) { + switch (schema.getType()) { + case INT: + return r.nextInt(); + case LONG: + return r.nextLong(); + case FLOAT: + return r.nextFloat(); + case DOUBLE: + return r.nextDouble(); + case BOOLEAN: + return r.nextBoolean(); + case STRING: + return "s" + r.nextInt(1000); + case BYTES: + byte[] b = new byte[4]; + r.nextBytes(b); + return java.nio.ByteBuffer.wrap(b); + case RECORD: + return buildRecord(schema, r, depth + 1); + case UNION: + // For recursive schemas, limit depth + List types = schema.getTypes(); + if (depth > 3) { + // Pick the null branch to terminate recursion + for (int i = 0; i < types.size(); i++) { + if (types.get(i).getType() == Schema.Type.NULL) { + return null; + } + } + } + // Pick non-null branch + for (int i = 0; i < types.size(); i++) { + if (types.get(i).getType() != Schema.Type.NULL) { + return buildValue(types.get(i), r, depth); + } + } + return null; + case ARRAY: + return new ArrayList<>(); + case MAP: + return new HashMap<>(); + case NULL: + return null; + default: + return null; + } + } +} From a1c2374471b99da361992ad216a8bdf42d75b1f5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Isma=C3=ABl=20Mej=C3=ADa?= Date: Fri, 1 May 2026 13:40:57 +0200 Subject: [PATCH 4/4] AVRO-4250 [build] Run github actions test workflows for the maintenance branches --- .github/workflows/test-lang-c++.yml | 4 ++-- .github/workflows/test-lang-c.yml | 4 ++-- .github/workflows/test-lang-csharp.yml | 4 ++-- .github/workflows/test-lang-java.yml | 4 ++-- .github/workflows/test-lang-js.yml | 4 ++-- .github/workflows/test-lang-perl.yml | 4 ++-- .github/workflows/test-lang-php.yml | 4 ++-- .github/workflows/test-lang-py.yml | 4 ++-- .github/workflows/test-lang-ruby.yml | 4 ++-- 9 files changed, 18 insertions(+), 18 deletions(-) diff --git a/.github/workflows/test-lang-c++.yml b/.github/workflows/test-lang-c++.yml index bc96d2df131..e3ca1884a73 100644 --- a/.github/workflows/test-lang-c++.yml +++ b/.github/workflows/test-lang-c++.yml @@ -17,9 +17,9 @@ name: Test C++ on: workflow_dispatch: push: - branches: [ master ] + branches: [master, branch-1.11, branch-1.12] pull_request: - branches: [ master ] + branches: [master, branch-1.11, branch-1.12] paths: - '.github/workflows/test-lang-c\+\+.yml' - 'lang/c\+\+/**' diff --git a/.github/workflows/test-lang-c.yml b/.github/workflows/test-lang-c.yml index f59d244d17a..679a5abf4eb 100644 --- a/.github/workflows/test-lang-c.yml +++ b/.github/workflows/test-lang-c.yml @@ -17,9 +17,9 @@ name: Test C on: workflow_dispatch: push: - branches: [ master ] + branches: [master, branch-1.11, branch-1.12] pull_request: - branches: [ master ] + branches: [master, branch-1.11, branch-1.12] paths: - .github/workflows/test-lang-c.yml - lang/c/** diff --git a/.github/workflows/test-lang-csharp.yml b/.github/workflows/test-lang-csharp.yml index 1dd00fc4ce4..2b69c3a19f9 100644 --- a/.github/workflows/test-lang-csharp.yml +++ b/.github/workflows/test-lang-csharp.yml @@ -17,9 +17,9 @@ name: 'Test C#' on: workflow_dispatch: push: - branches: [ master ] + branches: [master, branch-1.11, branch-1.12] pull_request: - branches: [ master ] + branches: [master, branch-1.11, branch-1.12] paths: - .github/workflows/test-lang-csharp.yml - lang/csharp/** diff --git a/.github/workflows/test-lang-java.yml b/.github/workflows/test-lang-java.yml index 96f0fd7b312..e9ca5c6bf1b 100644 --- a/.github/workflows/test-lang-java.yml +++ b/.github/workflows/test-lang-java.yml @@ -17,9 +17,9 @@ name: 'Test Java' on: workflow_dispatch: push: - branches: [ master ] + branches: [master, branch-1.11, branch-1.12] pull_request: - branches: [ master ] + branches: [master, branch-1.11, branch-1.12] paths: - .github/workflows/test-lang-java.yml - lang/java/** diff --git a/.github/workflows/test-lang-js.yml b/.github/workflows/test-lang-js.yml index a3000e05d6f..744777c8e7f 100644 --- a/.github/workflows/test-lang-js.yml +++ b/.github/workflows/test-lang-js.yml @@ -17,9 +17,9 @@ name: 'Test JavaScript' on: workflow_dispatch: push: - branches: [ master ] + branches: [master, branch-1.11, branch-1.12] pull_request: - branches: [ master ] + branches: [master, branch-1.11, branch-1.12] paths: - .github/workflows/test-lang-js.yml - lang/js/** diff --git a/.github/workflows/test-lang-perl.yml b/.github/workflows/test-lang-perl.yml index a9c923fc3a6..34726338e0b 100644 --- a/.github/workflows/test-lang-perl.yml +++ b/.github/workflows/test-lang-perl.yml @@ -17,9 +17,9 @@ name: 'Test Perl' on: workflow_dispatch: push: - branches: [ master ] + branches: [master, branch-1.11, branch-1.12] pull_request: - branches: [ master ] + branches: [master, branch-1.11, branch-1.12] paths: - .github/workflows/test-lang-perl.yml - lang/perl/** diff --git a/.github/workflows/test-lang-php.yml b/.github/workflows/test-lang-php.yml index 8c91f9de3d9..9638f9ee8a4 100644 --- a/.github/workflows/test-lang-php.yml +++ b/.github/workflows/test-lang-php.yml @@ -17,9 +17,9 @@ name: 'Test PHP' on: workflow_dispatch: push: - branches: [ master ] + branches: [master, branch-1.11, branch-1.12] pull_request: - branches: [ master ] + branches: [master, branch-1.11, branch-1.12] paths: - .github/workflows/test-lang-php.yml - lang/php/** diff --git a/.github/workflows/test-lang-py.yml b/.github/workflows/test-lang-py.yml index aba38f40595..aa31acc3339 100644 --- a/.github/workflows/test-lang-py.yml +++ b/.github/workflows/test-lang-py.yml @@ -17,9 +17,9 @@ name: 'Test Python' on: workflow_dispatch: push: - branches: [ master ] + branches: [master, branch-1.11, branch-1.12] pull_request: - branches: [ master ] + branches: [master, branch-1.11, branch-1.12] paths: - .github/workflows/test-lang-py.yml - lang/py/** diff --git a/.github/workflows/test-lang-ruby.yml b/.github/workflows/test-lang-ruby.yml index 642d08cc11a..aed789b2624 100644 --- a/.github/workflows/test-lang-ruby.yml +++ b/.github/workflows/test-lang-ruby.yml @@ -17,9 +17,9 @@ name: 'Test Ruby' on: workflow_dispatch: push: - branches: [ master ] + branches: [master, branch-1.11, branch-1.12] pull_request: - branches: [ master ] + branches: [master, branch-1.11, branch-1.12] paths: - .github/workflows/test-lang-ruby.yml - lang/ruby/**