Skip to content
Permalink
Browse files
GIRAPH-1174
closes #62
  • Loading branch information
Maja Kabiljo committed Mar 13, 2018
1 parent d86d0d5 commit 8e6ec266190d59558f369f808c53cf9186887ce5
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 0 deletions.
@@ -51,6 +51,7 @@
import org.apache.giraph.partition.GraphPartitionerFactory;
import org.apache.giraph.partition.Partition;
import org.apache.giraph.partition.ReusesObjectsPartition;
import org.apache.giraph.utils.GcObserver;
import org.apache.giraph.utils.ReflectionUtils;
import org.apache.giraph.worker.WorkerContext;
import org.apache.giraph.worker.WorkerObserver;
@@ -323,6 +324,16 @@ public final void addMapperObserverClass(
MAPPER_OBSERVER_CLASSES.add(this, mapperObserverClass);
}

/**
* Add a GcObserver class (optional)
*
* @param gcObserverClass GcObserver class to add.
*/
public final void addGcObserverClass(
Class<? extends GcObserver> gcObserverClass) {
GC_OBSERVER_CLASSES.add(this, gcObserverClass);
}

/**
* Get job observer class
*
@@ -706,6 +717,15 @@ public Class<? extends MapperObserver>[] getMapperObserverClasses() {
return MAPPER_OBSERVER_CLASSES.getArray(this);
}

/**
* Get array of GcObserver classes set in configuration.
*
* @return array of GcObserver classes.
*/
public Class<? extends GcObserver>[] getGcObserverClasses() {
return GC_OBSERVER_CLASSES.getArray(this);
}

/**
* Whether to track, print, and aggregate metrics.
*
@@ -84,6 +84,7 @@
import org.apache.giraph.partition.HashPartitionerFactory;
import org.apache.giraph.partition.Partition;
import org.apache.giraph.partition.SimplePartition;
import org.apache.giraph.utils.GcObserver;
import org.apache.giraph.worker.DefaultWorkerContext;
import org.apache.giraph.worker.WorkerContext;
import org.apache.giraph.worker.WorkerObserver;
@@ -229,6 +230,10 @@ public interface GiraphConstants {
ClassConfOption<MapperObserver> MAPPER_OBSERVER_CLASSES =
ClassConfOption.create("giraph.mapper.observers", null,
MapperObserver.class, "Classes for Mapper Observer - optional");
/** Classes for GC Observer - optional */
ClassConfOption<GcObserver> GC_OBSERVER_CLASSES =
ClassConfOption.create("giraph.gc.observers", null,
GcObserver.class, "Classes for GC oObserver - optional");
/** Message combiner class - optional */
ClassConfOption<MessageCombiner> MESSAGE_COMBINER_CLASS =
ClassConfOption.create("giraph.messageCombinerClass", null,
@@ -76,6 +76,7 @@
import org.apache.giraph.utils.ExtendedByteArrayDataOutput;
import org.apache.giraph.utils.ExtendedDataInput;
import org.apache.giraph.utils.ExtendedDataOutput;
import org.apache.giraph.utils.GcObserver;
import org.apache.giraph.utils.ReflectionUtils;
import org.apache.giraph.utils.UnsafeByteArrayInputStream;
import org.apache.giraph.utils.UnsafeByteArrayOutputStream;
@@ -786,6 +787,22 @@ public MapperObserver[] createMapperObservers(
return objects;
}

/**
* Create array of GcObservers.
*
* @param context Mapper context
* @return Instantiated array of GcObservers.
*/
public GcObserver[] createGcObservers(
Mapper<?, ?, ?, ?>.Context context) {
Class<? extends GcObserver>[] klasses = getGcObserverClasses();
GcObserver[] objects = new GcObserver[klasses.length];
for (int i = 0; i < klasses.length; ++i) {
objects[i] = ReflectionUtils.newInstance(klasses[i], this, context);
}
return objects;
}

/**
* Create job observer
*
@@ -58,6 +58,7 @@
import org.apache.giraph.partition.PartitionStore;
import org.apache.giraph.scripting.ScriptLoader;
import org.apache.giraph.utils.CallableFactory;
import org.apache.giraph.utils.GcObserver;
import org.apache.giraph.utils.MemoryUtils;
import org.apache.giraph.utils.ProgressableUtils;
import org.apache.giraph.worker.BspServiceWorker;
@@ -650,6 +651,7 @@ private void instantiateBspService()
* notifies an out-of-core engine (if any is used) about the GC.
*/
private void installGCMonitoring() {
final GcObserver[] gcObservers = conf.createGcObservers(context);
List<GarbageCollectorMXBean> mxBeans = ManagementFactory
.getGarbageCollectorMXBeans();
final OutOfCoreEngine oocEngine =
@@ -674,6 +676,9 @@ public void handleNotification(Notification notification,
}
gcTimeMetric.inc(info.getGcInfo().getDuration());
GiraphMetrics.get().getGcTracker().gcOccurred(info.getGcInfo());
for (GcObserver gcObserver : gcObservers) {
gcObserver.gcOccurred(info);
}
if (oocEngine != null) {
oocEngine.gcCompleted(info);
}
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.giraph.utils;

import com.sun.management.GarbageCollectionNotificationInfo;

/**
* Observer for when GCs occur
*/
public interface GcObserver {
/**
* Called to notify that GC occurred
*
* @param gcInfo GC info
*/
void gcOccurred(GarbageCollectionNotificationInfo gcInfo);
}

0 comments on commit 8e6ec26

Please sign in to comment.