Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[NEMO-429] SWPP TEAM3 Code Smell Fix #265

Merged
merged 19 commits into from
Dec 6, 2019
Merged
4 changes: 2 additions & 2 deletions common/src/test/java/org/apache/nemo/common/DAGTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ public void testSimpleDAG() {
assertEquals(dag.getIncomingEdgesOf(new IntegerVertex(1)).size(), 0);
assertEquals(dag.getOutgoingEdgesOf(new IntegerVertex(5)).size(), 0);
assertEquals(dag.getIncomingEdgesOf(new IntegerVertex(3)).size(), 1);
assertEquals(dag.getOutgoingEdgesOf(new IntegerVertex(4)).size(), 1);
assertEquals(dag.getTopologicalSort().size(), 5);
assertEquals(1, dag.getOutgoingEdgesOf(new IntegerVertex(4)).size());
assertEquals(5, dag.getTopologicalSort().size());
davin111 marked this conversation as resolved.
Show resolved Hide resolved

final List<IntegerVertex> topologicalOrder = dag.getTopologicalSort();
assertEquals(topologicalOrder.get(0).getValue(), 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
public final class ReduceByKeyTransform<K, V> extends NoWatermarkEmitTransform<Tuple2<K, V>, Tuple2<K, V>> {
private static final Logger LOG = LoggerFactory.getLogger(ReduceByKeyTransform.class.getName());

// TODO #431: Handle states in Transforms better
private final Map<K, List<V>> keyToValues;
private final Function2<V, V, V> func;
private OutputCollector<Tuple2<K, V>> outputCollector;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,23 @@
public final class ReduceTransform<T> implements Transform<T, T> {
private final Function2<T, T, T> func;
private OutputCollector<T> outputCollector;
// TODO #431: Handle states in Transforms better
private T result;
davin111 marked this conversation as resolved.
Show resolved Hide resolved

/**
* Constructor.
*
* @param func function to run for the reduce transform.
*/
// TODO #432: ReduceTransform Unit Test
public ReduceTransform(final Function2<T, T, T> func) {
davin111 marked this conversation as resolved.
Show resolved Hide resolved
this.func = func;
this.result = null;
davin111 marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public void prepare(final Context context, final OutputCollector<T> oc) {
this.outputCollector = oc;
this.result = null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public final class MetricUtils {
public static final String POSTGRESQL_METADATA_DB_NAME =
"jdbc:postgresql://nemo-optimization.cabbufr3evny.us-west-2.rds.amazonaws.com:5432/nemo_optimization";
private static final String METADATA_TABLE_NAME = "nemo_optimization_meta";
private static final String SAVING_METADATA_FAIL_MSG = "Saving of Metadata to DB failed: ";

/**
* Private constructor.
Expand Down Expand Up @@ -166,7 +167,7 @@ private static void updateMetaData() {
try {
insertOrUpdateMetadata(c, "EP_KEY_METADATA", l, r);
} catch (SQLException e) {
LOG.warn("Saving of Metadata to DB failed: ", e);
LOG.warn(SAVING_METADATA_FAIL_MSG, e);
}
});
LOG.info("EP Key Metadata saved to DB.");
Expand All @@ -177,14 +178,14 @@ private static void updateMetaData() {
try {
insertOrUpdateMetadata(c, "EP_METADATA", l.left() * 10000 + l.right(), r);
} catch (SQLException e) {
LOG.warn("Saving of Metadata to DB failed: ", e);
LOG.warn(SAVING_METADATA_FAIL_MSG, e);
}
});
LOG.info("EP Metadata saved to DB.");
}
}
} catch (SQLException e) {
LOG.warn("Saving of Metadata to DB failed: ", e);
LOG.warn(SAVING_METADATA_FAIL_MSG, e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public NonSerializedMemoryBlock createBlock(final String blockId) {
* @throws BlockWriteException if fail to write.
*/
@Override
public void writeBlock(final Block block) throws BlockWriteException {
public void writeBlock(final Block block) {
if (!(block instanceof NonSerializedMemoryBlock)) {
throw new BlockWriteException(new Throwable(
this.toString() + "only accept " + NonSerializedPartition.class.getName()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public Block createBlock(final String blockId) {
* @throws BlockWriteException if fail to write.
*/
@Override
public void writeBlock(final Block block) throws BlockWriteException {
public void writeBlock(final Block block) {
if (!(block instanceof SerializedMemoryBlock)) {
throw new BlockWriteException(new Throwable(
this.toString() + "only accept " + SerializedMemoryBlock.class.getName()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ private int getMessageId(final Set<StageEdge> stageEdges) {
.map(edge -> edge.getExecutionProperties()
.get(MessageIdEdgeProperty.class)
.<IllegalArgumentException>orElseThrow(() -> new IllegalArgumentException(edge.getId())))
.findFirst().<IllegalArgumentException>orElseThrow(() -> new IllegalArgumentException());
.findFirst().<IllegalArgumentException>orElseThrow(IllegalArgumentException::new);
// Type casting is needed. See: https://stackoverflow.com/a/40865318

return messageIds.iterator().next();
Expand Down Expand Up @@ -286,7 +286,7 @@ public void onTaskStateReportFromExecutor(final String executorId,
public void onSpeculativeExecutionCheck() {
MutableBoolean isNewCloneCreated = new MutableBoolean(false);

selectEarliestSchedulableGroup().ifPresent(scheduleGroup -> {
selectEarliestSchedulableGroup().ifPresent(scheduleGroup ->
scheduleGroup.stream().map(Stage::getId).forEach(stageId -> {
final Stage stage = planStateManager.getPhysicalPlan().getStageDAG().getVertexById(stageId);

Expand All @@ -296,8 +296,8 @@ public void onSpeculativeExecutionCheck() {
isNewCloneCreated.setValue(doSpeculativeExecution(stage, cloneConf));
}
});
});
});
})
);

if (isNewCloneCreated.booleanValue()) {
doSchedule(); // Do schedule the new clone.
Expand Down Expand Up @@ -513,7 +513,7 @@ private Set<StageEdge> getEdgesToOptimize(final String taskId) {
final Stage stagePutOnHold = stageDag.getVertices().stream()
.filter(stage -> stage.getId().equals(RuntimeIdManager.getStageIdFromTaskId(taskId)))
.findFirst()
.orElseThrow(() -> new RuntimeException());
.orElseThrow(RuntimeException::new);

// Stage put on hold, i.e. stage with vertex containing MessageAggregatorTransform
// should have a parent stage whose outgoing edges contain the target edge of dynamic optimization.
Expand Down