Skip to content
This repository has been archived by the owner on Oct 30, 2023. It is now read-only.

GIRAPH-1188 #70

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
@@ -1,3 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.giraph.block_app.framework.no_vtx;

import java.util.Iterator;
Expand Down
10 changes: 8 additions & 2 deletions giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
Expand Up @@ -25,6 +25,7 @@
import org.apache.giraph.partition.GraphPartitionerFactory;
import org.apache.giraph.utils.CheckpointingUtils;
import org.apache.giraph.worker.WorkerInfo;
import org.apache.giraph.writable.kryo.GiraphClassResolver;
import org.apache.giraph.zk.BspEvent;
import org.apache.giraph.zk.PredicateLock;
import org.apache.giraph.zk.ZooKeeperExt;
Expand Down Expand Up @@ -81,7 +82,9 @@ public abstract class BspService<I extends WritableComparable,
/** Input splits all done node*/
public static final String INPUT_SPLITS_ALL_DONE_NODE =
"/_inputSplitsAllDone";

/** Directory to store kryo className-ID assignment */
public static final String KRYO_REGISTERED_CLASS_DIR =
"/_kryo";
/** Directory of attempts of this application */
public static final String APPLICATION_ATTEMPTS_DIR =
"/_applicationAttemptsDir";
Expand Down Expand Up @@ -155,6 +158,8 @@ public abstract class BspService<I extends WritableComparable,
protected final String haltComputationPath;
/** Path where memory observer stores data */
protected final String memoryObserverPath;
/** Kryo className-ID mapping directory */
protected final String kryoRegisteredClassPath;
/** Private ZooKeeper instance that implements the service */
private final ZooKeeperExt zk;
/** Has the Connection occurred? */
Expand Down Expand Up @@ -250,7 +255,7 @@ public BspService(
inputSplitsAllDonePath = basePath + INPUT_SPLITS_ALL_DONE_NODE;
applicationAttemptsPath = basePath + APPLICATION_ATTEMPTS_DIR;
cleanedUpPath = basePath + CLEANED_UP_DIR;

kryoRegisteredClassPath = basePath + KRYO_REGISTERED_CLASS_DIR;


String restartJobId = RESTART_JOB_ID.get(conf);
Expand Down Expand Up @@ -289,6 +294,7 @@ public BspService(
throw new RuntimeException(e);
}

GiraphClassResolver.setZookeeperInfo(zk, kryoRegisteredClassPath);
this.taskId = (int) getApplicationAttempt() * conf.getMaxWorkers() +
conf.getTaskPartition();
this.hostnameTaskId = hostname + "_" + getTaskId();
Expand Down
@@ -0,0 +1,333 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.giraph.writable.kryo;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.KryoException;
import com.esotericsoftware.kryo.Registration;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.util.DefaultClassResolver;
import com.esotericsoftware.kryo.util.ObjectMap;
import org.apache.giraph.zk.ZooKeeperExt;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import java.util.HashMap;
import java.util.Map;
import java.util.List;

import static com.esotericsoftware.kryo.util.Util.getWrapperClass;

/**
* This class resolver assigns unique classIds for every class that was not
* explicitly registered. It uses zookeeper for consistent mapping across all
* nodes.
*/
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add some more documentation the describes how this mechanism works?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added more documentation.

public class GiraphClassResolver extends DefaultClassResolver {
/** Base ID to start for class name assignments.
* This number has to be high enough to not conflict with
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is a conflict something we can detect?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could if this information was not kept private in the base class. The application will certainly fail if there is a conflict.

* explicity registered class IDs.
* */
private static final int BASE_CLASS_ID = 1000;

/** Class name to ID cache */
private static Map<String, Integer> CLASS_NAME_TO_ID = new HashMap();
/** ID to class name cache */
private static Map<Integer, String> ID_TO_CLASS_NAME = new HashMap();
/** Zookeeper */
private static ZooKeeperExt ZK;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's generally not a nice design to have these static, but not sure if because of some details in kryo there is no other way?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Without static, we have to pass this information as a parameter. But there are so many places where the kryo objects are created, so it would not be pretty to delegate this information from BspService to all these places.

/** Zookeeper path for automatic class registrations */
private static String KRYO_REGISTERED_CLASS_PATH;
/** Minimum class ID assigned by zookeeper sequencing */
private static int MIN_CLASS_ID = -1;

/** Memoized class id*/
private int memoizedClassId = -1;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this to speed up the case when single class is used very frequently?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct.

/** Memoized class registration */
private Registration memoizedClassIdValue;

/**
* Sets zookeeper informaton.
* @param zookeeperExt ZookeeperExt
* @param kryoClassPath Zookeeper directory path where class Name-ID
* mapping is stored.
*/
public static void setZookeeperInfo(ZooKeeperExt zookeeperExt,
String kryoClassPath) {
ZK = zookeeperExt;
KRYO_REGISTERED_CLASS_PATH = kryoClassPath;
}

/**
* Creates a new node for the given class name.
* Creation mode is persistent sequential, i.e.
* ZK will always create a new node . There could be
* multiple entries for the same class name but since
* the lowest index is used, this is not a problem.
* @param className Class name
*/
public static void createClassName(String className) {
try {
String path = KRYO_REGISTERED_CLASS_PATH + "/" + className;
ZK.createExt(path,
null,
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT_SEQUENTIAL,
true);
} catch (KeeperException e) {
throw new IllegalStateException(
"Failed to create class " + className, e);
} catch (InterruptedException e) {
throw new IllegalStateException(
"Interrupted while creating " + className, e);
}
}

/**
* Refreshes class-ID mapping from zookeeper.
* Not thread safe.
*/
public static void refreshCache() {
try {
ZK.createOnceExt(KRYO_REGISTERED_CLASS_PATH,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can avoid calling this every time you refresh the cache and, instead, call it once during initialization (e.g. in setZookeeperInfo.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have added a boolean flag to avoid this repeated call.

null,
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT,
true);
} catch (KeeperException e) {
throw new IllegalStateException(
"Failed to refresh kryo cache " +
KRYO_REGISTERED_CLASS_PATH, e);
} catch (InterruptedException e) {
throw new IllegalStateException(
"Interrupted while refreshing kryo cache " +
KRYO_REGISTERED_CLASS_PATH, e);
}

List<String> registeredList;
try {
registeredList =
ZK.getChildrenExt(KRYO_REGISTERED_CLASS_PATH,
false,
true,
false);
} catch (KeeperException e) {
throw new IllegalStateException(
"Failed to retrieve child nodes for " + KRYO_REGISTERED_CLASS_PATH, e);
} catch (InterruptedException e) {
throw new IllegalStateException(
"Interrupted while retrieving child nodes for " +
KRYO_REGISTERED_CLASS_PATH, e);
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add some debug logging here that prints the returned list? With the usual pattern:

if (LOG.isDebugEnabled()) {
LOG.debug("...");
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok.

for (String name : registeredList) {
// Since these files are created with PERSISTENT_SEQUENTIAL mode,
// Kryo appends a sequential number to their file name.
String className = name.substring(0,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So when we create PERSISTENT_SEQUENTIAL node, zookeeper adds the sequential number to its file name? Maybe just add a comment stating that

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

name.length() - ZooKeeperExt.SEQUENCE_NUMBER_LENGTH);
int classId = Integer.parseInt(
name.substring(name.length() - ZooKeeperExt.SEQUENCE_NUMBER_LENGTH));

if (MIN_CLASS_ID == -1) {
MIN_CLASS_ID = classId;
}

int adjustedId = classId - MIN_CLASS_ID + BASE_CLASS_ID;
if (CLASS_NAME_TO_ID.putIfAbsent(className, adjustedId) == null) {
ID_TO_CLASS_NAME.put(adjustedId, className);
}
}
}

/**
* Gets ID for the given class name.
* @param className Class name
* @return class id Class ID
*/
public static int getClassId(String className) {
if (CLASS_NAME_TO_ID.containsKey(className)) {
return CLASS_NAME_TO_ID.get(className);
}
synchronized (GiraphClassResolver.class) {
if (CLASS_NAME_TO_ID.containsKey(className)) {
return CLASS_NAME_TO_ID.get(className);
}
refreshCache();

if (!CLASS_NAME_TO_ID.containsKey(className)) {
createClassName(className);
refreshCache();
}
}

if (!CLASS_NAME_TO_ID.containsKey(className)) {
throw new IllegalStateException("Failed to assigned id to " + className);
}

return CLASS_NAME_TO_ID.get(className);
}

/**
* Get class name for given ID.
* @param id class ID
* @return class name
*/
public static String getClassName(int id) {
if (ID_TO_CLASS_NAME.containsKey(id)) {
return ID_TO_CLASS_NAME.get(id);
}
synchronized (GiraphClassResolver.class) {
if (ID_TO_CLASS_NAME.containsKey(id)) {
return ID_TO_CLASS_NAME.get(id);
}
refreshCache();
}

if (!ID_TO_CLASS_NAME.containsKey(id)) {
throw new IllegalStateException("ID " + id + " doesn't exist");
}
return ID_TO_CLASS_NAME.get(id);
}

@Override
public Registration register(Registration registration) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does this method differ from implementation in DefaultClassResolver?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default implementation also handles the case when the class is non-explicitly registered, i.e. it is not assigned to an an integer. Since in this implementation all classes are assigned to integers agreed by all kryo instances, they all treated like explicitly assigned classes. This implementation throws an exception if it encounters a class that has not been assigned to an integer yet.

if (registration == null) {
throw new IllegalArgumentException("registration cannot be null");
}
if (registration.getId() == NAME) {
throw new IllegalArgumentException("Invalid registration ID");
}

idToRegistration.put(registration.getId(), registration);
classToRegistration.put(registration.getType(), registration);
if (registration.getType().isPrimitive()) {
classToRegistration.put(getWrapperClass(registration.getType()),
registration);
}
return registration;
}

@Override
public Registration registerImplicit(Class type) {
return register(new Registration(type, kryo.getDefaultSerializer(type),
getClassId(type.getName())));
}

@Override
public Registration writeClass(Output output, Class type) {
if (type == null) {
output.writeVarInt(Kryo.NULL, true);
return null;
}

Registration registration = kryo.getRegistration(type);
if (registration.getId() == NAME) {
throw new IllegalStateException("Invalid registration ID");
} else {
output.writeVarInt(registration.getId() + 2, true);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the offset 2 for?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Class ID's are incremented by 2 when writing, because 0 is used for null, and 1 is used for non-explicitly registered classes, though non-explicit class registration is an error for this implementation.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add this as a comment here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok.

}
return registration;
}

@Override
public Registration readClass(Input input) {
int classID = input.readVarInt(true);
if (classID == Kryo.NULL) {
return null;
} else if (classID == NAME + 2) {
throw new IllegalStateException("Invalid class ID");
}
if (classID == memoizedClassId) {
return memoizedClassIdValue;
}
Registration registration = idToRegistration.get(classID - 2);
if (registration == null) {
String className = getClassName(classID - 2);
Class type = getTypeByName(className);
if (type == null) {
try {
type = Class.forName(className, false, kryo.getClassLoader());
} catch (ClassNotFoundException ex) {
throw new KryoException("Unable to find class: " + className, ex);
}
if (nameToClass == null) {
nameToClass = new ObjectMap();
}
nameToClass.put(className, type);
}
registration = new Registration(type, kryo.getDefaultSerializer(type),
classID - 2);
register(registration);
}
memoizedClassId = classID;
memoizedClassIdValue = registration;
return registration;
}

/**
* Reset the internal state
* Reset clears two hash tables:
* 1 - Class name to ID: Every non-explicitly registered class takes the
* ID agreed by all kryo instances, and it doesn't change across
* serializations, so this reset is not required.
* 2- Reference tracking: Not required because it is disabled.
*
* Therefore, this method should not be invoked.
*
*/
public void reset() {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please add a comment for what this and the two methods below are for and why we don't have to implement them?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added the comments.

throw new IllegalStateException("Not implemented");
}

/**
* This method writes the class name for the first encountered
* non-explicitly registered class. Since all non-explicitly registered
* classes take the ID agreed by all kryo instances, there is no need
* to write the class name, so this method should not be invoked.
* @param output Output stream
* @param type CLass type
* @param registration Registration
*/
@Override
protected void writeName(Output output, Class type,
Registration registration) {
throw new IllegalStateException("Not implemented");
}

/**
* This method reads the class name for the first encountered
* non-explicitly registered class. Since all non-explicitly registered
* classes take the ID agreed by all kryo instances, class name is
* never written, so this method should not be invoked.
* @param input Input stream
* @return Registration
*/
@Override
protected Registration readName(Input input) {
throw new IllegalStateException("Not implemented");
}

/**
* Get type by class name.
* @param className Class name
* @return class type
*/
protected Class<?> getTypeByName(final String className) {
return nameToClass != null ? nameToClass.get(className) : null;
}
}