Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

AVRO-1200. DatumWriter can write malformed data if collection is modi…

…fied during write.

git-svn-id: https://svn.apache.org/repos/asf/avro/trunk@1407419 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information...
commit 121b98ddc5a9db44040260bf5a6ec7bb106ad3de 1 parent f261733
Thomas White authored
View
5 CHANGES.txt
@@ -21,7 +21,7 @@ Trunk (not yet released)
(Hernan Otero via cutting)
AVRO-983. maven-avro-plugin: Allow maven schema compiler to support
- external reference in an another avro schema file (tomwhite)
+ external reference in an another avro schema file. (tomwhite)
BUG FIXES
@@ -34,6 +34,9 @@ Trunk (not yet released)
AVRO-1197. Java: Expose mapreduce tests so that 'maven install'
works correctly. (Mike Percy via cutting)
+ AVRO-1200. DatumWriter can write malformed data if collection is
+ modified during write. (tomwhite)
+
Avro 1.7.2 (20 October 2012)
View
13 lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumWriter.java
@@ -19,6 +19,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.Map;
import java.util.Collection;
@@ -122,13 +123,19 @@ protected void writeArray(Schema schema, Object datum, Encoder out)
throws IOException {
Schema element = schema.getElementType();
long size = getArraySize(datum);
+ long actualSize = 0;
out.writeArrayStart();
out.setItemCount(size);
for (Iterator<? extends Object> it = getArrayElements(datum); it.hasNext();) {
out.startItem();
write(element, it.next(), out);
+ actualSize++;
}
out.writeArrayEnd();
+ if (actualSize != size) {
+ throw new ConcurrentModificationException("Size of array written was " +
+ size + ", but number of elements written was " + actualSize + ". ");
+ }
}
/** Called to find the index for a datum within a union. By default calls
@@ -157,14 +164,20 @@ protected void writeMap(Schema schema, Object datum, Encoder out)
throws IOException {
Schema value = schema.getValueType();
int size = getMapSize(datum);
+ int actualSize = 0;
out.writeMapStart();
out.setItemCount(size);
for (Map.Entry<Object,Object> entry : getMapEntries(datum)) {
out.startItem();
writeString(entry.getKey().toString(), out);
write(value, entry.getValue(), out);
+ actualSize++;
}
out.writeMapEnd();
+ if (actualSize != size) {
+ throw new ConcurrentModificationException("Size of map written was " +
+ size + ", but number of entries written was " + actualSize + ". ");
+ }
}
/** Called by the default implementation of {@link #writeMap} to get the size
View
161 lang/java/avro/src/test/java/org/apache/avro/generic/TestGenericDatumWriter.java
@@ -18,17 +18,32 @@
package org.apache.avro.generic;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.ConcurrentModificationException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import org.apache.avro.Schema;
import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.DirectBinaryEncoder;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.io.JsonDecoder;
import org.junit.Test;
+import org.apache.avro.util.Utf8;
public class TestGenericDatumWriter {
@Test
@@ -50,4 +65,150 @@ public void testWrite() throws IOException {
DecoderFactory.get().jsonDecoder(s, new ByteArrayInputStream(bao.toByteArray())));
assertEquals(r, o);
}
+
+ @Test
+ public void testArrayConcurrentModification() throws Exception {
+ String json = "{\"type\": \"array\", \"items\": \"int\" }";
+ Schema s = Schema.parse(json);
+ final GenericArray<Integer> a = new GenericData.Array<Integer>(1, s);
+ ByteArrayOutputStream bao = new ByteArrayOutputStream();
+ final GenericDatumWriter<GenericArray<Integer>> w =
+ new GenericDatumWriter<GenericArray<Integer>>(s);
+
+ CountDownLatch sizeWrittenSignal = new CountDownLatch(1);
+ CountDownLatch eltAddedSignal = new CountDownLatch(1);
+
+ final TestEncoder e = new TestEncoder(EncoderFactory.get()
+ .directBinaryEncoder(bao, null), sizeWrittenSignal, eltAddedSignal);
+
+ // call write in another thread
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ Future<Void> result = executor.submit(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ w.write(a, e);
+ return null;
+ }
+ });
+ sizeWrittenSignal.await();
+ // size has been written so now add an element to the array
+ a.add(7);
+ // and signal for the element to be written
+ eltAddedSignal.countDown();
+ try {
+ result.get();
+ fail("Expected ConcurrentModificationException");
+ } catch (ExecutionException ex) {
+ assertTrue(ex.getCause() instanceof ConcurrentModificationException);
+ }
+ }
+
+
+ @Test
+ public void testMapConcurrentModification() throws Exception {
+ String json = "{\"type\": \"map\", \"values\": \"int\" }";
+ Schema s = Schema.parse(json);
+ final Map<String, Integer> m = new HashMap<String, Integer>();
+ ByteArrayOutputStream bao = new ByteArrayOutputStream();
+ final GenericDatumWriter<Map<String, Integer>> w =
+ new GenericDatumWriter<Map<String, Integer>>(s);
+
+ CountDownLatch sizeWrittenSignal = new CountDownLatch(1);
+ CountDownLatch eltAddedSignal = new CountDownLatch(1);
+
+ final TestEncoder e = new TestEncoder(EncoderFactory.get()
+ .directBinaryEncoder(bao, null), sizeWrittenSignal, eltAddedSignal);
+
+ // call write in another thread
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ Future<Void> result = executor.submit(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ w.write(m, e);
+ return null;
+ }
+ });
+ sizeWrittenSignal.await();
+ // size has been written so now add an entry to the map
+ m.put("a", 7);
+ // and signal for the entry to be written
+ eltAddedSignal.countDown();
+ try {
+ result.get();
+ fail("Expected ConcurrentModificationException");
+ } catch (ExecutionException ex) {
+ assertTrue(ex.getCause() instanceof ConcurrentModificationException);
+ }
+ }
+
+ static class TestEncoder extends Encoder {
+
+ Encoder e;
+ CountDownLatch sizeWrittenSignal;
+ CountDownLatch eltAddedSignal;
+
+ TestEncoder(Encoder encoder, CountDownLatch sizeWrittenSignal,
+ CountDownLatch eltAddedSignal) {
+ this.e = encoder;
+ this.sizeWrittenSignal = sizeWrittenSignal;
+ this.eltAddedSignal = eltAddedSignal;
+ }
+
+ @Override
+ public void writeArrayStart() throws IOException {
+ e.writeArrayStart();
+ sizeWrittenSignal.countDown();
+ try {
+ eltAddedSignal.await();
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+
+ @Override
+ public void writeMapStart() throws IOException {
+ e.writeMapStart();
+ sizeWrittenSignal.countDown();
+ try {
+ eltAddedSignal.await();
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+
+ @Override
+ public void flush() throws IOException { e.flush(); }
+ @Override
+ public void writeNull() throws IOException { e.writeNull(); }
+ @Override
+ public void writeBoolean(boolean b) throws IOException { e.writeBoolean(b); }
+ @Override
+ public void writeInt(int n) throws IOException { e.writeInt(n); }
+ @Override
+ public void writeLong(long n) throws IOException { e.writeLong(n); }
+ @Override
+ public void writeFloat(float f) throws IOException { e.writeFloat(f); }
+ @Override
+ public void writeDouble(double d) throws IOException { e.writeDouble(d); }
+ @Override
+ public void writeString(Utf8 utf8) throws IOException { e.writeString(utf8); }
+ @Override
+ public void writeBytes(ByteBuffer bytes) throws IOException { e.writeBytes(bytes); }
+ @Override
+ public void writeBytes(byte[] bytes, int start, int len) throws IOException { e.writeBytes(bytes, start, len); }
+ @Override
+ public void writeFixed(byte[] bytes, int start, int len) throws IOException { e.writeFixed(bytes, start, len); }
+ @Override
+ public void writeEnum(int en) throws IOException { e.writeEnum(en); }
+ @Override
+ public void setItemCount(long itemCount) throws IOException { e.setItemCount(itemCount); }
+ @Override
+ public void startItem() throws IOException { e.startItem(); }
+ @Override
+ public void writeArrayEnd() throws IOException { e.writeArrayEnd(); }
+ @Override
+ public void writeMapEnd() throws IOException { e.writeMapEnd(); }
+ @Override
+ public void writeIndex(int unionIndex) throws IOException { e.writeIndex(unionIndex); }
+ };
}
Please sign in to comment.
Something went wrong with that request. Please try again.