Skip to content
Permalink
Browse files
JIRA-1171
closes #60
  • Loading branch information
Maja Kabiljo committed Feb 21, 2018
1 parent 94a3ac5 commit 39eb2533b8f7c6c8cb17ee30d5c2005e74873ff4
Show file tree
Hide file tree
Showing 10 changed files with 72 additions and 16 deletions.
@@ -206,6 +206,14 @@ public Collection<PartitionStats> call() {
partitionStatsList.size() + " partitions, " +
partitionStore.getNumPartitions() + " remaining " +
MemoryUtils.getRuntimeMemoryStats());
long timeDoingGCWhileProcessing =
taskManager.getSuperstepGCTime() - startGCTime;
timeDoingGC += timeDoingGCWhileProcessing;
long timeProcessingPartition =
System.currentTimeMillis() - startProcessingTime -
timeDoingGCWhileProcessing;
timeProcessing += timeProcessingPartition;
partitionStats.setComputeMs(timeProcessingPartition);
} catch (IOException e) {
throw new IllegalStateException("call: Caught unexpected IOException," +
" failing.", e);
@@ -215,11 +223,6 @@ public Collection<PartitionStats> call() {
} finally {
partitionStore.putPartition(partition);
}
long timeDoingGCWhileProcessing =
taskManager.getSuperstepGCTime() - startGCTime;
timeDoingGC += timeDoingGCWhileProcessing;
timeProcessing += System.currentTimeMillis() - startProcessingTime -
timeDoingGCWhileProcessing;
histogramComputePerPartition.update(
System.currentTimeMillis() - startTime);
}
@@ -279,7 +282,8 @@ private PartitionStats computePartition(
boolean ignoreExistingVertices)
throws IOException, InterruptedException {
PartitionStats partitionStats =
new PartitionStats(partition.getId(), 0, 0, 0, 0, 0);
new PartitionStats(partition.getId(), 0, 0, 0, 0, 0,
serviceWorker.getWorkerInfo().getHostnameId());
final LongRef verticesComputedProgress = new LongRef(0);

Progressable verticesProgressable = new Progressable() {
@@ -985,7 +985,8 @@ private GlobalStats aggregateWorkerStats(long superstep) {
printAggregatedMetricsToHDFS(superstep, aggregatedMetrics);
}
for (MasterObserver observer : observers) {
observer.superstepMetricsUpdate(superstep, aggregatedMetrics);
observer.superstepMetricsUpdate(
superstep, aggregatedMetrics, allPartitionStatsList);
}
}

@@ -20,6 +20,9 @@

import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.metrics.AggregatedMetrics;
import org.apache.giraph.partition.PartitionStats;

import java.util.List;

/**
* A no-op implementation of MasterObserver to make it easier for users.
@@ -55,5 +58,6 @@ public void postSuperstep(long superstep) { }

@Override
public void superstepMetricsUpdate(long superstep,
AggregatedMetrics aggregatedMetrics) { }
AggregatedMetrics aggregatedMetrics,
List<PartitionStats> partitionStatsList) { }
}
@@ -20,6 +20,9 @@

import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
import org.apache.giraph.metrics.AggregatedMetrics;
import org.apache.giraph.partition.PartitionStats;

import java.util.List;

/**
* Observer for Master.
@@ -62,7 +65,9 @@ public interface MasterObserver extends ImmutableClassesGiraphConfigurable {
*
* @param superstep Superstep number
* @param aggregatedMetrics Metrics
* @param partitionStatsList List of partition stats
*/
void superstepMetricsUpdate(
long superstep, AggregatedMetrics aggregatedMetrics);
long superstep, AggregatedMetrics aggregatedMetrics,
List<PartitionStats> partitionStatsList);
}
@@ -368,7 +368,8 @@ protected long offloadInMemoryPartitionData(
partitionStore.getPartitionEdgeCount(partitionId));
Partition<I, V, E> partition =
partitionStore.removePartition(partitionId);
LOG.debug("Offloading partition " + partition + " DataIndex[" + index + "]");
LOG.debug(
"Offloading partition " + partition + " DataIndex[" + index + "]");
index.addIndex(DataIndex.TypeIndexEntry.PARTITION_VERTICES);
OutOfCoreDataAccessor.DataOutputWrapper outputWrapper =
dataAccessor.prepareOutput(ioThreadId, index.copy(), false);
@@ -40,6 +40,13 @@ public class PartitionStats implements Writable {
private long messagesSentCount = 0;
/** Message bytes sent from this partition */
private long messageBytesSentCount = 0;
/**
* How long did compute take on this partition
* (excluding time spent in GC) (TODO and waiting on open requests)
*/
private long computeMs;
/** Hostname and id of worker owning this partition */
private String workerHostnameId;

/**
* Default constructor for reflection.
@@ -55,19 +62,22 @@ public PartitionStats() { }
* @param edgeCount Edge count.
* @param messagesSentCount Number of messages sent
* @param messageBytesSentCount Number of message bytes sent
* @param workerHostnameId Hostname and id of worker owning this partition
*/
public PartitionStats(int partitionId,
long vertexCount,
long finishedVertexCount,
long edgeCount,
long messagesSentCount,
long messageBytesSentCount) {
long messageBytesSentCount,
String workerHostnameId) {
this.partitionId = partitionId;
this.vertexCount = vertexCount;
this.finishedVertexCount = finishedVertexCount;
this.edgeCount = edgeCount;
this.messagesSentCount = messagesSentCount;
this.messageBytesSentCount = messageBytesSentCount;
this.workerHostnameId = workerHostnameId;
}

/**
@@ -174,6 +184,18 @@ public long getMessageBytesSentCount() {
return messageBytesSentCount;
}

/**
* Get how long compute took on this partition, in milliseconds
* (excluding time spent in GC).
*
* @return Compute time for this partition
*/
public long getComputeMs() {
return computeMs;
}

/**
* Set how long compute took on this partition.
*
* @param computeMs Compute time in milliseconds; the compute callable
*                  passes elapsed wall-clock time minus time spent in GC
*/
public void setComputeMs(long computeMs) {
this.computeMs = computeMs;
}

/**
* Get the hostname and id of the worker owning this partition.
*
* @return Worker hostname and id, as passed to the constructor or read
*         by readFields
*/
public String getWorkerHostnameId() {
return workerHostnameId;
}

@Override
public void readFields(DataInput input) throws IOException {
partitionId = input.readInt();
@@ -182,6 +204,8 @@ public void readFields(DataInput input) throws IOException {
edgeCount = input.readLong();
messagesSentCount = input.readLong();
messageBytesSentCount = input.readLong();
computeMs = input.readLong();
workerHostnameId = input.readUTF();
}

@Override
@@ -192,13 +216,16 @@ public void write(DataOutput output) throws IOException {
output.writeLong(edgeCount);
output.writeLong(messagesSentCount);
output.writeLong(messageBytesSentCount);
output.writeLong(computeMs);
output.writeUTF(workerHostnameId);
}

@Override
public String toString() {
return "(id=" + partitionId + ",vtx=" + vertexCount + ",finVtx=" +
finishedVertexCount + ",edges=" + edgeCount + ",msgsSent=" +
messagesSentCount + ",msgBytesSent=" +
messageBytesSentCount + ")";
messageBytesSentCount + ",computeMs=" + computeMs +
")";
}
}
@@ -22,9 +22,12 @@
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.master.MasterObserver;
import org.apache.giraph.metrics.AggregatedMetrics;
import org.apache.giraph.partition.PartitionStats;
import org.apache.giraph.worker.WorkerObserver;
import org.apache.log4j.Logger;

import java.util.List;

/**
* An observer for both worker and master that periodically dumps the memory
* usage using jmap tool.
@@ -102,7 +105,8 @@ public void postSuperstep(long superstep) { }

@Override
public void superstepMetricsUpdate(long superstep,
AggregatedMetrics aggregatedMetrics) { }
AggregatedMetrics aggregatedMetrics,
List<PartitionStats> partitionStatsList) { }

@Override
public void applicationFailed(Exception e) { }
@@ -20,8 +20,11 @@
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.master.MasterObserver;
import org.apache.giraph.metrics.AggregatedMetrics;
import org.apache.giraph.partition.PartitionStats;
import org.apache.giraph.worker.WorkerObserver;

import java.util.List;

/**
* Logs versions of Giraph dependencies on job start.
*/
@@ -59,5 +62,6 @@ public void postSuperstep(long superstep) { }

@Override
public void superstepMetricsUpdate(long superstep,
AggregatedMetrics aggregatedMetrics) { }
AggregatedMetrics aggregatedMetrics,
List<PartitionStats> partitionStatsList) { }
}
@@ -23,9 +23,12 @@
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.master.MasterObserver;
import org.apache.giraph.metrics.AggregatedMetrics;
import org.apache.giraph.partition.PartitionStats;
import org.apache.giraph.worker.WorkerObserver;
import org.apache.log4j.Logger;

import java.util.List;

/**
* An observer for both worker and master that periodically checks if available
* memory on heap is below certain threshold, and if found to be the case
@@ -113,7 +116,8 @@ public void postSuperstep(long superstep) { }

@Override
public void superstepMetricsUpdate(long superstep,
AggregatedMetrics aggregatedMetrics) { }
AggregatedMetrics aggregatedMetrics,
List<PartitionStats> partitionStatsList) { }

@Override
public void applicationFailed(Exception e) { }
@@ -608,7 +608,9 @@ public FinishedSuperstepStats setup() {
partitionStore.getPartitionVertexCount(partitionId),
0,
partitionStore.getPartitionEdgeCount(partitionId),
0, 0);
0,
0,
workerInfo.getHostnameId());
partitionStatsList.add(partitionStats);
}
workerGraphPartitioner.finalizePartitionStats(

0 comments on commit 39eb253

Please sign in to comment.