Skip to content
Permalink
Browse files
GIRAPH-1185
closes #69
  • Loading branch information
Maja Kabiljo committed Apr 12, 2018
1 parent 3273f25 commit 60752aab7f8c3ce2b1fd57e232c7305adfb6200a
Show file tree
Hide file tree
Showing 6 changed files with 717 additions and 22 deletions.
@@ -92,12 +92,12 @@ public BlockOutputHandle getOutputHandle() {

@Override
public void write(DataOutput out) throws IOException {
HadoopKryo.writeClassAndObject(out, workerLogic);
HadoopKryo.writeClassAndObj(out, workerLogic);
}

@Override
public void readFields(DataInput in) throws IOException {
workerLogic = HadoopKryo.readClassAndObject(in);
workerLogic = HadoopKryo.readClassAndObj(in);
workerLogic.getOutputHandle().initialize(getConf(), getContext());
}
}
@@ -26,11 +26,15 @@
import java.util.Map.Entry;
import java.util.Random;

import com.esotericsoftware.kryo.util.DefaultClassResolver;
import org.apache.giraph.conf.GiraphConfigurationSettable;
import com.esotericsoftware.kryo.ClassResolver;
import com.esotericsoftware.kryo.ReferenceResolver;
import com.esotericsoftware.kryo.util.MapReferenceResolver;
import org.apache.giraph.types.ops.collections.Basic2ObjectMap;
import org.apache.giraph.types.ops.collections.BasicSet;
import org.apache.giraph.writable.kryo.markers.KryoIgnoreWritable;
import org.apache.giraph.writable.kryo.markers.NonKryoWritable;
import org.apache.giraph.writable.kryo.markers.KryoIgnoreWritable;
import org.apache.giraph.writable.kryo.serializers.ArraysAsListSerializer;
import org.apache.giraph.writable.kryo.serializers.CollectionsNCopiesSerializer;
import org.apache.giraph.writable.kryo.serializers.DirectWritableSerializer;
@@ -69,16 +73,26 @@
* It extends Kryo to reuse KryoPool functionality, but have additional needed
* objects cached as well. If we move to ThreadLocal or other caching
* technique, we can use composition, instead of inheritance here.
*
* TODO: Refactor this class into two separate classes depending on
* whether the reference tracking is enabled or disabled.
*/
public class HadoopKryo extends Kryo {
/** Pool of reusable Kryo objects, since they are expensive to create */
private static final KryoPool KRYO_POOL = new KryoPool.Builder(
new KryoFactory() {
@Override
public Kryo create() {
return createKryo();
return createKryo(true, true);
}
}).build();
/** Thread local HadoopKryo object */
private static final ThreadLocal<HadoopKryo> KRYO =
new ThreadLocal<HadoopKryo>() {
@Override protected HadoopKryo initialValue() {
return createKryo(false, false);
}
};

/**
* List of interfaces/parent classes that will not be allowed to be
@@ -111,19 +125,15 @@ public Kryo create() {
"Logger must be a static field");
}

// Use chunked streams, so within same stream we can use both kryo and
// non-kryo serialization.
/** Reusable Input object */
private final InputChunked input = new InputChunked(4096);
private InputChunked input;
/** Reusable Output object */
private final OutputChunked output = new OutputChunked(4096);
private OutputChunked output;

/** Reusable DataInput wrapper stream */
private final DataInputWrapperStream dataInputWrapperStream =
new DataInputWrapperStream();
private DataInputWrapperStream dataInputWrapperStream;
/** Reusable DataOutput wrapper stream */
private final DataOutputWrapperStream dataOutputWrapperStream =
new DataOutputWrapperStream();
private DataOutputWrapperStream dataOutputWrapperStream;

/**
* Map of already initialized serializers used
@@ -136,17 +146,27 @@ public Kryo create() {
private HadoopKryo() {
}

/**
* Constructor that takes custom class resolver and reference resolver.
* @param classResolver Class resolver
* @param referenceResolver Reference resolver
*/
private HadoopKryo(ClassResolver classResolver,
ReferenceResolver referenceResolver) {
super(classResolver, referenceResolver);
}

// Public API:

/**
* Write type of given object and the object itself to the output stream.
* Inverse of readClassAndObject.
* Inverse of readClassAndObj.
*
* @param out Output stream
* @param object Object to write
*/
public static void writeClassAndObject(
final DataOutput out, final Object object) {
public static void writeClassAndObj(
final DataOutput out, final Object object) {
writeInternal(out, object, false);
}

@@ -159,7 +179,7 @@ public static void writeClassAndObject(
* @return Deserialized object
* @param <T> Type of the object being read
*/
public static <T> T readClassAndObject(DataInput in) {
public static <T> T readClassAndObj(DataInput in) {
return readInternal(in, null, false);
}

@@ -186,6 +206,62 @@ public static void readIntoObject(DataInput in, Object object) {
readInternal(in, object, true);
}

/**
* Writes class and object to specified output stream with specified
* Kryo object. It does not use interim buffers.
* @param kryo Kryo object
* @param out Output stream
* @param object Object
*/
public static void writeWithKryo(
final HadoopKryo kryo, final Output out,
final Object object) {
kryo.writeClassAndObject(out, object);
out.close();
}

/**
* Write out of object with given kryo
* @param kryo Kryo object
* @param out Output
* @param object Object to write
*/
public static void writeWithKryoOutOfObject(
final HadoopKryo kryo, final Output out,
final Object object) {
kryo.writeOutOfObject(out, object);
out.close();
}

/**
* Reads class and object from specified input stream with
* specified kryo object.
* it does not use interim buffers.
* @param kryo Kryo object
* @param in Input buffer
* @param <T> Object type parameter
* @return Object
*/
public static <T> T readWithKryo(
final HadoopKryo kryo, final Input in) {
T object;
object = (T) kryo.readClassAndObject(in);
in.close();
return object;
}

/**
* Read into object with given kryo.
* @param kryo Kryo object
* @param in Input
* @param object Object to read into
*/
public static void readWithKryoIntoObject(
final HadoopKryo kryo, final Input in, Object object) {
kryo.readIntoObject(in, object);
in.close();
}

/**
* Create copy of the object, by magically recursively copying
* all of it's fields, keeping reference structures (like cycles)
@@ -203,15 +279,39 @@ public T execute(Kryo kryo) {
});
}

/**
* Returns a kryo which doesn't track objects, hence
* serialization of recursive/nested objects is not
* supported.
*
* Reference tracking significantly degrades the performance
* since kryo has to store all serialized objects and search
* the history to check if an object has been already serialized.
*
* @return Hadoop kryo which doesn't track objects.
*/
public static HadoopKryo getNontrackingKryo() {
return KRYO.get();
}

// Private implementation:

/**
* Create new instance of HadoopKryo, properly initialized.
*
* @return New HadoopKryo instnace
* @param trackReferences if true, object references are tracked.
* @param hasBuffer if true, an interim buffer is used.
* @return new HadoopKryo instance
*/
private static HadoopKryo createKryo() {
HadoopKryo kryo = new HadoopKryo();
private static HadoopKryo createKryo(boolean trackReferences,
boolean hasBuffer) {
HadoopKryo kryo;
if (trackReferences) {
kryo = new HadoopKryo();
} else {
// TODO: if trackReferences is false use custom class resolver.
kryo = new HadoopKryo(new DefaultClassResolver(),
new MapReferenceResolver());
}

String version = System.getProperty("java.version");
char minor = version.charAt(2);
@@ -296,6 +396,19 @@ public void write(Kryo kryo, Output output, Object object) {
kryo.addDefaultSerializer(Writable.class, customSerializerFactory);
kryo.setDefaultSerializer(customSerializerFactory);

if (hasBuffer) {
kryo.input = new InputChunked(4096);
kryo.output = new OutputChunked(4096);
kryo.dataInputWrapperStream = new DataInputWrapperStream();
kryo.dataOutputWrapperStream = new DataOutputWrapperStream();
}

if (!trackReferences) {
kryo.setReferences(false);

// TODO: Enable the following when a custom class resolver is created.
// kryo.setAutoReset(false);
}
return kryo;
}

@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.giraph.writable.kryo;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import org.apache.hadoop.io.Writable;

/**
* Generic wrapper object, making any object writable.
*
* Usage of this class is similar to KryoWritableWrapper but
* unlike KryoWritableWrapper, this class does not
* support recursive/nested objects to provide better
* performance.
*
* If the underlying stream is a kryo output stream than the read/write
* happens with a kryo object that doesn't track references, providing
* significantly better performance.
*
* @param <T> Object type
*/
public class KryoSimpleWrapper<T> implements Writable {

/** Wrapped object */
private T object;

/**
* Create wrapper given an object.
* @param object Object instance
*/
public KryoSimpleWrapper(T object) {
this.object = object;
}

/**
* Creates wrapper initialized with null.
*/
public KryoSimpleWrapper() {
}

/**
* Unwrap the object value
* @return Object value
*/
public T get() {
return object;
}

/**
* Set wrapped object value
* @param object New object value
*/
public void set(T object) {
this.object = object;
}

@Override
public void readFields(DataInput in) throws java.io.IOException {
if (in instanceof Input) {
Input inp = (Input) in;
object = HadoopKryo.readWithKryo(HadoopKryo.getNontrackingKryo(), inp);
} else {
object = HadoopKryo.readClassAndObj(in);
}
}

@Override
public void write(DataOutput out) throws IOException {
if (out instanceof Output) {
Output outp = (Output) out;
HadoopKryo.writeWithKryo(HadoopKryo.getNontrackingKryo(), outp, object);
} else {
HadoopKryo.writeClassAndObj(out, object);
}
}
}

0 comments on commit 60752aa

Please sign in to comment.