Permalink
Browse files

HADOOP-6685. Add new generic serialization interface.

  • Loading branch information... [page artifact: branch details did not finish loading when this page was captured]
1 parent ef7b9a9 commit ca7c66bffdf111b24c9127fbd837a2735c4a38b0 @omalley omalley committed Dec 4, 2010
Showing with 6,329 additions and 1,106 deletions.
  1. +1 −1 build.xml
  2. +16 −0 ivy.xml
  3. +15 −0 ivy/hadoop-common-template.xml
  4. +4 −0 ivy/ivysettings.xml
  5. +5 −0 ivy/libraries.properties
  6. +2 −2 src/java/core-default.xml
  7. +4 −2 src/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java
  8. +19 −8 src/java/org/apache/hadoop/io/ArrayFile.java
  9. +37 −10 src/java/org/apache/hadoop/io/BloomMapFile.java
  10. +16 −0 src/java/org/apache/hadoop/io/DataInputBuffer.java
  11. +15 −25 src/java/org/apache/hadoop/io/DefaultStringifier.java
  12. +285 −161 src/java/org/apache/hadoop/io/MapFile.java
  13. +5 −3 src/java/org/apache/hadoop/io/RawComparator.java
  14. +693 −698 src/java/org/apache/hadoop/io/SequenceFile.java
  15. +8 −4 src/java/org/apache/hadoop/io/SetFile.java
  16. +34 −6 src/java/org/apache/hadoop/io/file/tfile/BCFile.java
  17. +6 −17 src/java/org/apache/hadoop/io/file/tfile/CompareUtils.java
  18. +4 −4 src/java/org/apache/hadoop/io/file/tfile/Compression.java
  19. +1 −1 src/java/org/apache/hadoop/io/file/tfile/RawComparable.java
  20. +343 −81 src/java/org/apache/hadoop/io/file/tfile/TFile.java
  21. +1 −1 src/java/org/apache/hadoop/io/file/tfile/TFileDumper.java
  22. +38 −6 src/java/org/apache/hadoop/io/file/tfile/Utils.java
  23. +45 −0 src/java/org/apache/hadoop/io/serial/RawComparator.java
  24. +145 −0 src/java/org/apache/hadoop/io/serial/Serialization.java
  25. +139 −0 src/java/org/apache/hadoop/io/serial/SerializationFactory.java
  26. +140 −0 src/java/org/apache/hadoop/io/serial/TypedSerialization.java
  27. +146 −0 src/java/org/apache/hadoop/io/serial/lib/CompatibilitySerialization.java
  28. +81 −0 src/java/org/apache/hadoop/io/serial/lib/DeserializationRawComparator.java
  29. +110 −0 src/java/org/apache/hadoop/io/serial/lib/JavaSerialization.java
  30. +19 −0 src/java/org/apache/hadoop/io/serial/lib/MemcmpRawComparator.java
  31. +763 −0 src/java/org/apache/hadoop/io/serial/lib/SerializationMetadata.java
  32. +93 −0 src/java/org/apache/hadoop/io/serial/lib/WritableSerialization.java
  33. +47 −0 src/java/org/apache/hadoop/io/serial/lib/avro/AvroComparator.java
  34. +33 −0 src/java/org/apache/hadoop/io/serial/lib/avro/AvroReflectSerializable.java
  35. +333 −0 src/java/org/apache/hadoop/io/serial/lib/avro/AvroSerialization.java
  36. +14 −8 src/java/org/apache/hadoop/io/{serializer → serial/lib/avro}/package.html
  37. +490 −0 src/java/org/apache/hadoop/io/serial/lib/protobuf/ProtoBufComparator.java
  38. +114 −0 src/java/org/apache/hadoop/io/serial/lib/protobuf/ProtoBufSerialization.java
  39. +88 −0 src/java/org/apache/hadoop/io/serial/lib/thrift/StreamTransport.java
  40. +110 −0 src/java/org/apache/hadoop/io/serial/lib/thrift/ThriftSerialization.java
  41. +51 −0 src/java/org/apache/hadoop/io/serial/package-info.java
  42. +3 −0 src/java/org/apache/hadoop/io/serializer/Deserializer.java
  43. +4 −0 src/java/org/apache/hadoop/io/serializer/DeserializerComparator.java
  44. +3 −2 src/java/org/apache/hadoop/io/serializer/JavaSerialization.java
  45. +4 −0 src/java/org/apache/hadoop/io/serializer/JavaSerializationComparator.java
  46. +4 −0 src/java/org/apache/hadoop/io/serializer/Serialization.java
  47. +4 −0 src/java/org/apache/hadoop/io/serializer/SerializationFactory.java
  48. +4 −0 src/java/org/apache/hadoop/io/serializer/Serializer.java
  49. +4 −1 src/java/org/apache/hadoop/io/serializer/WritableSerialization.java
  50. +5 −1 src/java/org/apache/hadoop/io/serializer/avro/AvroReflectSerializable.java
  51. +3 −1 src/java/org/apache/hadoop/io/serializer/avro/AvroReflectSerialization.java
  52. +3 −0 src/java/org/apache/hadoop/io/serializer/avro/AvroSerialization.java
  53. +3 −0 src/java/org/apache/hadoop/io/serializer/avro/AvroSpecificSerialization.java
  54. +41 −0 src/java/org/apache/hadoop/io/serializer/package-info.java
  55. +1 −1 src/java/org/apache/hadoop/security/SaslRpcServer.java
  56. +23 −2 src/java/org/apache/hadoop/util/Options.java
  57. +41 −26 src/java/org/apache/hadoop/util/ReflectionUtils.java
  58. +15 −0 src/protobuf/SerializationMetadata.proto
  59. +21 −0 src/test/core/org/apache/hadoop/io/AvroKey.java
  60. +21 −0 src/test/core/org/apache/hadoop/io/AvroValue.java
  61. +641 −0 src/test/core/org/apache/hadoop/io/ProtoTest.java
  62. +11 −0 src/test/core/org/apache/hadoop/io/ProtoTest.proto
  63. +1 −1 src/test/core/org/apache/hadoop/io/RandomDatum.java
  64. +15 −6 src/test/core/org/apache/hadoop/io/TestDefaultStringifier.java
  65. +18 −12 src/test/core/org/apache/hadoop/io/TestMapFile.java
  66. +278 −14 src/test/core/org/apache/hadoop/io/TestSequenceFileSerialization.java
  67. +307 −0 src/test/core/org/apache/hadoop/io/ThriftKey.java
  68. +309 −0 src/test/core/org/apache/hadoop/io/ThriftValue.java
  69. +2 −1 src/test/core/org/apache/hadoop/io/file/tfile/TestTFileJClassComparatorByteArrays.java
  70. +10 −0 src/test/core/org/apache/hadoop/io/test.genavro
  71. +7 −0 src/test/core/org/apache/hadoop/io/test.thrift
  72. +10 −0 src/test/findbugsExcludeFile.xml
View
@@ -96,7 +96,7 @@
<property name="test.all.tests.file" value="${test.src.dir}/all-tests"/>
<property name="javadoc.link.java"
- value="http://java.sun.com/javase/6/docs/api/"/>
+ value="http://download.oracle.com/javase/6/docs/api"/>
<property name="javadoc.packages" value="org.apache.hadoop.*"/>
<property name="javadoc.maxmemory" value="512m" />
View
16 ivy.xml
@@ -269,6 +269,22 @@
<exclude module="jetty"/>
<exclude module="slf4j-simple"/>
</dependency>
+ <dependency org="com.google.protobuf"
+ name="protobuf-java"
+ rev="${protobuf.version}"
+ conf="common->default"/>
+ <dependency org="org.apache.hadoop"
+ name="libthrift"
+ rev="${thrift.version}"
+ conf="common->default">
+ <exclude module="servlet-api"/>
+ <exclude module="slf4j-api"/>
+ <exclude module="slf4j-log4j12"/>
+ </dependency>
+ <dependency org="org.yaml"
+ name="snakeyaml"
+ rev="${snakeyaml.version}"
+ conf="common->default"/>
<dependency org="org.codehaus.jackson"
name="jackson-mapper-asl"
rev="${jackson.version}"
@@ -119,6 +119,21 @@
<version>2.0.8</version>
</dependency>
<dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ <version>2.3.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>libthrift</artifactId>
+ <version>0.5.0.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.yaml</groupId>
+ <artifactId>snakeyaml</artifactId>
+ <version>1.7</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>avro</artifactId>
<version>1.3.2</version>
View
@@ -30,13 +30,17 @@
<resolvers>
<!--ibiblio resolvers-->
<ibiblio name="maven2" root="${repo.maven.org}" m2compatible="true"/>
+ <ibiblio name="apache"
+ root="https://repository.apache.org/content/repositories/releases"
+ m2compatible="true"/>
<filesystem name="fs" m2compatible="true" force="true">
<artifact pattern="${repo.dir}/[organisation]/[module]/[revision]/[module]-[revision].[ext]"/>
<ivy pattern="${repo.dir}/[organisation]/[module]/[revision]/[module]-[revision].pom"/>
</filesystem>
<chain name="default" dual="true">
+ <resolver ref="apache"/>
<resolver ref="maven2"/>
</chain>
View
@@ -62,13 +62,18 @@ mina-core.version=2.0.0-M5
oro.version=2.0.8
+protobuf.version=2.3.0
+
rats-lib.version=0.6
servlet.version=4.0.6
servlet-api-2.5.version=6.1.14
servlet-api.version=2.5
slf4j-api.version=1.5.11
slf4j-log4j12.version=1.5.11
+snakeyaml.version=1.7
+
+thrift.version=0.5.0.0
wagon-http.version=1.0-beta-2
@@ -155,8 +155,8 @@
</property>
<property>
- <name>io.serializations</name>
- <value>org.apache.hadoop.io.serializer.WritableSerialization,org.apache.hadoop.io.serializer.avro.AvroSpecificSerialization,org.apache.hadoop.io.serializer.avro.AvroReflectSerialization</value>
+ <name>hadoop.serializations</name>
+ <value>org.apache.hadoop.io.serial.lib.WritableSerialization,org.apache.hadoop.io.serial.lib.protobuf.ProtoBufSerialization,org.apache.hadoop.io.serial.lib.thrift.ThriftSerialization,org.apache.hadoop.io.serial.lib.avro.AvroSerialization,org.apache.hadoop.io.serial.lib.CompatibilitySerialization</value>
<description>A list of serialization classes that can be used for
obtaining serializers and deserializers.</description>
</property>
@@ -138,9 +138,11 @@
public static final String IO_SORT_FACTOR_KEY = "io.sort.factor";
/** Default value for IO_SORT_FACTOR_DEFAULT */
public static final int IO_SORT_FACTOR_DEFAULT = 100;
- /** See <a href="{@docRoot}/../core-default.html">core-default.xml</a> */
+ /** Defines the list of the deprecated serializations. */
public static final String IO_SERIALIZATIONS_KEY = "io.serializations";
-
+ /** Defines the list of serializations */
+ public static final String HADOOP_SERIALIZATIONS_KEY = "hadoop.serializations";
+
/** See <a href="{@docRoot}/../core-default.html">core-default.xml</a> */
public static final String TFILE_IO_CHUNK_SIZE_KEY = "tfile.io.chunk.size";
/** Default value for TFILE_IO_CHUNK_SIZE_DEFAULT */
@@ -40,15 +40,15 @@ protected ArrayFile() {} // no public ctor
/** Create the named file for values of the named class. */
public Writer(Configuration conf, FileSystem fs,
- String file, Class<? extends Writable> valClass)
+ String file, Class<?> valClass)
throws IOException {
super(conf, new Path(file), keyClass(LongWritable.class),
valueClass(valClass));
}
/** Create the named file for values of the named class. */
public Writer(Configuration conf, FileSystem fs,
- String file, Class<? extends Writable> valClass,
+ String file, Class<?> valClass,
CompressionType compress, Progressable progress)
throws IOException {
super(conf, new Path(file),
@@ -59,7 +59,7 @@ public Writer(Configuration conf, FileSystem fs,
}
/** Append a value to the file. */
- public synchronized void append(Writable value) throws IOException {
+ public synchronized void append(Object value) throws IOException {
super.append(count, value); // add to map
count.set(count.get()+1); // increment count
}
@@ -81,20 +81,31 @@ public synchronized void seek(long n) throws IOException {
seek(key);
}
- /** Read and return the next value in the file. */
+ @Deprecated
public synchronized Writable next(Writable value) throws IOException {
- return next(key, value) ? value : null;
+ return (Writable) next((Object) value);
+ }
+
+ /** Read and return the next value in the file. */
+ public synchronized Object next(Object value) throws IOException {
+ key = (LongWritable) nextKey(key);
+ return key == null? null : getCurrentValue(value);
}
/** Returns the key associated with the most recent call to {@link
- * #seek(long)}, {@link #next(Writable)}, or {@link
- * #get(long,Writable)}. */
+ * #seek(long)}, {@link #next(Object)}, or {@link
+ * #get(long,Object)}. */
public synchronized long key() throws IOException {
return key.get();
}
+ @Deprecated
+ public synchronized Writable get(long n, Writable value) throws IOException{
+ return (Writable) get(n, (Object) value);
+ }
+
/** Return the <code>n</code>th value in the file. */
- public synchronized Writable get(long n, Writable value)
+ public synchronized Object get(long n, Object value)
throws IOException {
key.set(n);
return get(key, value);
@@ -31,7 +31,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.util.Options;
+import org.apache.hadoop.io.serial.Serialization;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.bloom.DynamicBloomFilter;
import org.apache.hadoop.util.bloom.Filter;
@@ -42,7 +42,7 @@
* This class extends {@link MapFile} and provides very much the same
* functionality. However, it uses dynamic Bloom filters to provide
* quick membership test for keys, and it offers a fast version of
- * {@link Reader#get(WritableComparable, Writable)} operation, especially in
+ * {@link Reader#get(Object, Object)} operation, especially in
* case of sparsely populated MapFile-s.
*/
@InterfaceAudience.Public
@@ -82,7 +82,9 @@ public static void delete(FileSystem fs, String name) throws IOException {
private DataOutputBuffer buf = new DataOutputBuffer();
private FileSystem fs;
private Path dir;
+ private final Serialization<Object> keySerialization;
+ @SuppressWarnings("unchecked")
@Deprecated
public Writer(Configuration conf, FileSystem fs, String dirName,
Class<? extends WritableComparable> keyClass,
@@ -92,6 +94,7 @@ public Writer(Configuration conf, FileSystem fs, String dirName,
compression(compress, codec), progressable(progress));
}
+ @SuppressWarnings("unchecked")
@Deprecated
public Writer(Configuration conf, FileSystem fs, String dirName,
Class<? extends WritableComparable> keyClass,
@@ -101,6 +104,7 @@ public Writer(Configuration conf, FileSystem fs, String dirName,
compression(compress), progressable(progress));
}
+ @SuppressWarnings("unchecked")
@Deprecated
public Writer(Configuration conf, FileSystem fs, String dirName,
Class<? extends WritableComparable> keyClass,
@@ -110,6 +114,7 @@ public Writer(Configuration conf, FileSystem fs, String dirName,
compression(compress));
}
+ @SuppressWarnings("unchecked")
@Deprecated
public Writer(Configuration conf, FileSystem fs, String dirName,
WritableComparator comparator, Class valClass,
@@ -120,6 +125,7 @@ public Writer(Configuration conf, FileSystem fs, String dirName,
progressable(progress));
}
+ @SuppressWarnings("unchecked")
@Deprecated
public Writer(Configuration conf, FileSystem fs, String dirName,
WritableComparator comparator, Class valClass,
@@ -129,6 +135,7 @@ public Writer(Configuration conf, FileSystem fs, String dirName,
progressable(progress));
}
+ @SuppressWarnings("unchecked")
@Deprecated
public Writer(Configuration conf, FileSystem fs, String dirName,
WritableComparator comparator, Class valClass, CompressionType compress)
@@ -137,26 +144,30 @@ public Writer(Configuration conf, FileSystem fs, String dirName,
valueClass(valClass), compression(compress));
}
+ @SuppressWarnings("unchecked")
@Deprecated
public Writer(Configuration conf, FileSystem fs, String dirName,
WritableComparator comparator, Class valClass) throws IOException {
this(conf, new Path(dirName), comparator(comparator),
valueClass(valClass));
}
+ @SuppressWarnings("unchecked")
@Deprecated
public Writer(Configuration conf, FileSystem fs, String dirName,
Class<? extends WritableComparable> keyClass,
Class valClass) throws IOException {
this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass));
}
+ @SuppressWarnings("unchecked")
public Writer(Configuration conf, Path dir,
SequenceFile.Writer.Option... options) throws IOException {
super(conf, dir, options);
this.fs = dir.getFileSystem(conf);
this.dir = dir;
initBloomFilter(conf);
+ keySerialization = (Serialization<Object>) getKeySerialization();
}
private synchronized void initBloomFilter(Configuration conf) {
@@ -174,11 +185,10 @@ private synchronized void initBloomFilter(Configuration conf) {
}
@Override
- public synchronized void append(WritableComparable key, Writable val)
- throws IOException {
+ public synchronized void append(Object key, Object val) throws IOException {
super.append(key, val);
buf.reset();
- key.write(buf);
+ keySerialization.serialize(buf, key);
bloomKey.set(byteArrayForBloomKey(buf), 1.0);
bloomFilter.add(bloomKey);
}
@@ -198,11 +208,14 @@ public synchronized void close() throws IOException {
private DynamicBloomFilter bloomFilter;
private DataOutputBuffer buf = new DataOutputBuffer();
private Key bloomKey = new Key();
+ private final Serialization<Object> keySerialization;
+ @SuppressWarnings("unchecked")
public Reader(Path dir, Configuration conf,
SequenceFile.Reader.Option... options) throws IOException {
super(dir, conf, options);
initBloomFilter(dir, conf);
+ keySerialization = (Serialization<Object>) getKeySerialization();
}
@Deprecated
@@ -245,26 +258,40 @@ private void initBloomFilter(Path dirName,
* @return false iff key doesn't exist, true if key probably exists.
* @throws IOException
*/
- public boolean probablyHasKey(WritableComparable key) throws IOException {
+ public boolean probablyHasKey(Object key) throws IOException {
if (bloomFilter == null) {
return true;
}
buf.reset();
- key.write(buf);
+ keySerialization.serialize(buf, key);
bloomKey.set(byteArrayForBloomKey(buf), 1.0);
return bloomFilter.membershipTest(bloomKey);
}
/**
* Fast version of the
- * {@link MapFile.Reader#get(WritableComparable, Writable)} method. First
+ * {@link MapFile.Reader#get(Object, Object)} method. First
* it checks the Bloom filter for the existence of the key, and only if
* present it performs the real get operation. This yields significant
* performance improvements for get operations on sparsely populated files.
*/
+ @SuppressWarnings("unchecked")
+ @Deprecated
@Override
- public synchronized Writable get(WritableComparable key, Writable val)
- throws IOException {
+ public synchronized Writable get(WritableComparable key,
+ Writable value) throws IOException {
+ return (Writable) get((Object) key, (Object) value);
+ }
+
+ /**
+ * Fast version of the
+ * {@link MapFile.Reader#get(Object, Object)} method. First
+ * it checks the Bloom filter for the existence of the key, and only if
+ * present it performs the real get operation. This yields significant
+ * performance improvements for get operations on sparsely populated files.
+ */
+ @Override
+ public synchronized Object get(Object key, Object val) throws IOException {
if (!probablyHasKey(key)) {
return null;
}
@@ -93,4 +93,20 @@ public void reset(byte[] input, int start, int length) {
/** Returns the length of the input. */
public int getLength() { return buffer.getLength(); }
+ public String toString() {
+ StringBuilder sb = new StringBuilder(3 * buffer.getLength() + 10);
+ byte[] bytes = getData();
+ for(int i=0; i < buffer.getLength(); i++) {
+ sb.append(' ');
+ String num = Integer.toHexString(0xff & bytes[i]);
+ // if it is only one digit, add a leading 0.
+ if (num.length() < 2) {
+ sb.append('0');
+ }
+ sb.append(num);
+ }
+ sb.append("; pos=");
+ sb.append(buffer.getPosition());
+ return sb.toString();
+ }
}
Oops, something went wrong. [page artifact: GitHub failed to render the remaining diffs — files 12–72 of the change listing above (MapFile.java, SequenceFile.java, TFile.java, the new org.apache.hadoop.io.serial packages, tests, etc.) are not shown on this capture]

0 comments on commit ca7c66b

Please sign in to comment.