Skip to content
Permalink
Browse files
JIRA-1146
closes #36
  • Loading branch information
Maja Kabiljo committed May 10, 2017
1 parent 4f9c6c2 commit d4596a029984865ace68fab981047bed1f1d1494
Show file tree
Hide file tree
Showing 19 changed files with 212 additions and 2 deletions.
@@ -23,6 +23,7 @@
import java.util.Random;

import org.apache.giraph.block_app.framework.block.Block;
import org.apache.giraph.block_app.framework.block.PieceCount;
import org.apache.giraph.block_app.framework.piece.AbstractPiece;
import org.apache.giraph.block_app.library.striping.StripingUtils;
import org.apache.giraph.function.Consumer;
@@ -145,6 +146,11 @@ public Iterator<AbstractPiece> iterator() {

@Override
public void forAllPossiblePieces(Consumer<AbstractPiece> consumer) { }

@Override
public PieceCount getPieceCount() {
return PieceCount.createUnknownCount();
}
})));
}

@@ -26,6 +26,7 @@
import org.apache.giraph.block_app.framework.api.giraph.BlockMasterCompute;
import org.apache.giraph.block_app.framework.api.giraph.BlockWorkerContext;
import org.apache.giraph.block_app.framework.block.Block;
import org.apache.giraph.block_app.framework.block.PieceCount;
import org.apache.giraph.block_app.framework.piece.AbstractPiece;
import org.apache.giraph.block_app.framework.piece.Piece;
import org.apache.giraph.conf.BooleanConfOption;
@@ -147,6 +148,11 @@ public static void initAndCheckConfig(GiraphConfiguration conf) {
checkBlockTypes(
executionBlock, blockFactory.createExecutionStage(immConf), immConf);

PieceCount pieceCount = executionBlock.getPieceCount();
if (pieceCount.isKnown()) {
GiraphConstants.SUPERSTEP_COUNT.set(conf, pieceCount.getCount() + 1);
}

// check for non 'static final' fields in BlockFactories
Class<?> bfClass = blockFactory.getClass();
while (!bfClass.equals(Object.class)) {
@@ -56,4 +56,12 @@ public interface Block extends Iterable<AbstractPiece> {
* without actually executing them.
*/
void forAllPossiblePieces(Consumer<AbstractPiece> consumer);

/**
* How many pieces are in this block.
* Sometimes we don't know (eg RepeatBlock).
*
* @return How many pieces are in this block.
*/
PieceCount getPieceCount();
}
@@ -36,4 +36,9 @@ public Iterator<AbstractPiece> iterator() {
@Override
public void forAllPossiblePieces(Consumer<AbstractPiece> consumer) {
}

@Override
public PieceCount getPieceCount() {
return new PieceCount(0);
}
}
@@ -110,4 +110,9 @@ public AbstractPiece apply(AbstractPiece input) {
public void forAllPossiblePieces(Consumer<AbstractPiece> consumer) {
block.forAllPossiblePieces(consumer);
}

@Override
public PieceCount getPieceCount() {
return block.getPieceCount();
}
}
@@ -60,6 +60,14 @@ public void forAllPossiblePieces(Consumer<AbstractPiece> consumer) {
elseBlock.forAllPossiblePieces(consumer);
}

@Override
public PieceCount getPieceCount() {
PieceCount thenCount = thenBlock.getPieceCount();
PieceCount elseCount = elseBlock.getPieceCount();
return thenCount.equals(elseCount) ?
thenCount : PieceCount.createUnknownCount();
}

@Override
public String toString() {
if (elseBlock instanceof EmptyBlock) {
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.giraph.block_app.framework.block;

import com.google.common.base.Objects;

/**
* Number of pieces
*/
public class PieceCount {
private boolean known;
private int count;

public PieceCount(int count) {
known = true;
this.count = count;
}

private PieceCount() {
known = false;
}

public static PieceCount createUnknownCount() {
return new PieceCount();
}


public PieceCount add(PieceCount other) {
if (!this.known || !other.known) {
known = false;
} else {
count += other.count;
}
return this;
}

public PieceCount multiply(int value) {
count *= value;
return this;
}

public int getCount() {
if (known) {
return count;
} else {
throw new IllegalStateException(
"Can't get superstep count when it's unknown");
}
}

public boolean isKnown() {
return known;
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj instanceof PieceCount) {
PieceCount other = (PieceCount) obj;
if (known) {
return other.known && other.count == count;
} else {
return !other.known;
}
}
return false;
}

@Override
public int hashCode() {
return Objects.hashCode(known, count);
}
}
@@ -32,10 +32,12 @@
@SuppressWarnings("rawtypes")
public final class RepeatBlock implements Block {
private final Block block;
private final boolean constantRepeatTimes;
private final IntSupplier repeatTimes;

public RepeatBlock(final int repeatTimes, Block block) {
this.block = block;
this.constantRepeatTimes = true;
this.repeatTimes = new IntSupplier() {
@Override
public int get() {
@@ -56,6 +58,7 @@ public int get() {
*/
public RepeatBlock(IntSupplier repeatTimes, Block block) {
this.block = block;
this.constantRepeatTimes = false;
this.repeatTimes = repeatTimes;
}

@@ -80,6 +83,13 @@ public void forAllPossiblePieces(Consumer<AbstractPiece> consumer) {
block.forAllPossiblePieces(consumer);
}

@Override
public PieceCount getPieceCount() {
return constantRepeatTimes ?
block.getPieceCount().multiply(repeatTimes.get()) :
PieceCount.createUnknownCount();
}

@Override
public String toString() {
return "RepeatBlock(" + repeatTimes + " * " + block + ")";
@@ -74,6 +74,11 @@ public void forAllPossiblePieces(Consumer<AbstractPiece> consumer) {
block.forAllPossiblePieces(consumer);
}

@Override
public PieceCount getPieceCount() {
return PieceCount.createUnknownCount();
}

@Override
public String toString() {
return "RepeatUntilBlock(" + repeatTimes + " * " + block + ")";
@@ -53,6 +53,15 @@ public void forAllPossiblePieces(Consumer<AbstractPiece> consumer) {
}
}

@Override
public PieceCount getPieceCount() {
PieceCount ret = new PieceCount(0);
for (Block block : blocks) {
ret.add(block.getPieceCount());
}
return ret;
}

@Override
public String toString() {
return "SequenceBlock" + Arrays.toString(blocks);
@@ -26,6 +26,7 @@
import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
import org.apache.giraph.block_app.framework.block.Block;
import org.apache.giraph.block_app.framework.block.PieceCount;
import org.apache.giraph.block_app.framework.piece.interfaces.VertexPostprocessor;
import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender;
@@ -262,6 +263,11 @@ public void forAllPossiblePieces(Consumer<AbstractPiece> consumer) {
consumer.apply(this);
}

@Override
public PieceCount getPieceCount() {
return new PieceCount(1);
}

@Override
public String toString() {
String name = getClass().getSimpleName();
@@ -26,6 +26,7 @@
import org.apache.giraph.block_app.framework.api.BlockWorkerContextSendApi;
import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
import org.apache.giraph.block_app.framework.block.PieceCount;
import org.apache.giraph.block_app.framework.piece.AbstractPiece;
import org.apache.giraph.block_app.framework.piece.interfaces.VertexPostprocessor;
import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
@@ -249,6 +250,11 @@ public void forAllPossiblePieces(Consumer<AbstractPiece> consumer) {
}
}

@Override
public PieceCount getPieceCount() {
return new PieceCount(1);
}

@SuppressWarnings("deprecation")
@Override
public void registerAggregators(BlockMasterApi master)
@@ -21,6 +21,7 @@

import org.apache.giraph.block_app.framework.AbstractBlockFactory;
import org.apache.giraph.block_app.framework.block.Block;
import org.apache.giraph.block_app.framework.block.PieceCount;
import org.apache.giraph.block_app.framework.block.SequenceBlock;
import org.apache.giraph.block_app.framework.piece.AbstractPiece;
import org.apache.giraph.block_app.framework.piece.Piece;
@@ -101,6 +102,11 @@ protected AbstractPiece computeNext() {
public void forAllPossiblePieces(Consumer<AbstractPiece> consumer) {
consumer.apply(curPiece);
}

@Override
public PieceCount getPieceCount() {
return curPiece.getPieceCount();
}
}
);
}
@@ -26,6 +26,7 @@
import org.apache.giraph.block_app.framework.api.local.LocalBlockRunner;
import org.apache.giraph.block_app.framework.block.Block;
import org.apache.giraph.block_app.framework.block.BlockWithApiHandle;
import org.apache.giraph.block_app.framework.block.PieceCount;
import org.apache.giraph.block_app.framework.piece.AbstractPiece;
import org.apache.giraph.block_app.framework.piece.DefaultParentPiece;
import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
@@ -198,6 +199,11 @@ public void forAllPossiblePieces(Consumer<AbstractPiece> consumer) {
piece.forAllPossiblePieces(consumer);
}

@Override
public PieceCount getPieceCount() {
return piece.getPieceCount();
}

@Override
public BlockApiHandle getBlockApiHandle() {
return handle;
@@ -22,6 +22,7 @@

import org.apache.giraph.block_app.framework.piece.Piece;
import org.apache.giraph.function.Supplier;
import org.junit.Assert;
import org.junit.Test;

public class TestIfBlock {
@@ -53,6 +54,7 @@ public void testIfBlockThen() throws Exception {
BlockTestingUtils.testIndependence(
Arrays.asList(piece1, piece2),
ifBlock);
Assert.assertFalse(ifBlock.getPieceCount().isKnown());
}

@Test
@@ -69,6 +71,7 @@ public void testIfBlockElse() throws Exception {
BlockTestingUtils.testIndependence(
Arrays.asList(piece1, piece2),
ifBlock);
Assert.assertFalse(ifBlock.getPieceCount().isKnown());
}

@Test
@@ -83,6 +86,19 @@ public void testIfNestedInRepeat() throws Exception {
BlockTestingUtils.testNestedRepeatBlock(
Arrays.asList(piece1, piece2),
ifBlock);
Assert.assertFalse(ifBlock.getPieceCount().isKnown());
}

@Test
public void testIfThenElsePieceCount() {
Piece piece1 = new Piece();
Piece piece2 = new Piece();
Block ifBlock = new IfBlock(
TRUE_SUPPLIER,
piece1,
piece2
);
Assert.assertTrue(ifBlock.getPieceCount().isKnown());
Assert.assertEquals(1, ifBlock.getPieceCount().getCount());
}
}
@@ -23,6 +23,7 @@

import org.apache.giraph.block_app.framework.piece.AbstractPiece;
import org.apache.giraph.block_app.framework.piece.Piece;
import org.junit.Assert;
import org.junit.Test;

import com.google.common.collect.Iterables;
@@ -46,6 +47,7 @@ public void testRepeatBlockBasic() throws Exception {
BlockTestingUtils.testIndependence(
Iterables.concat(Collections.nCopies(REPEAT_TIMES, Arrays.asList(piece1, piece2))),
repeatBlock);
Assert.assertEquals(REPEAT_TIMES * 2, repeatBlock.getPieceCount().getCount());
}

@Test
@@ -27,6 +27,7 @@
import org.apache.giraph.block_app.framework.piece.Piece;
import org.apache.giraph.function.Supplier;
import org.apache.giraph.function.primitive.PrimitiveRefs.IntRef;
import org.junit.Assert;
import org.junit.Test;

import com.google.common.collect.Iterables;
@@ -58,6 +59,8 @@ public void testRepeatUntilBlockBasic() throws Exception {
BlockTestingUtils.testIndependence(
Iterables.concat(Collections.nCopies(REPEAT_TIMES, Arrays.asList(piece1, piece2))),
repeatBlock);
Assert.assertEquals(2, innerBlock.getPieceCount().getCount());
Assert.assertFalse(repeatBlock.getPieceCount().isKnown());
}

@Test

0 comments on commit d4596a0

Please sign in to comment.