Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[NEMO-429] SWPP TEAM19 Code Smell Fix #269

Merged
merged 13 commits into from Dec 17, 2019
1 change: 1 addition & 0 deletions common/src/main/java/org/apache/nemo/common/Pair.java
Expand Up @@ -28,6 +28,7 @@
* @param <B> type of the right element.
*/
public final class Pair<A, B> implements Serializable {
// TODO #430: Pair elements should be serializable
private final A left;
private final B right;

Expand Down
Expand Up @@ -60,7 +60,7 @@ public String toString() {
*/
private final class BytesDecoder implements Decoder<byte[]> {

private final InputStream inputStream;
private final transient InputStream inputStream;
private boolean returnedArray;

/**
Expand Down
Expand Up @@ -88,7 +88,7 @@ private LoopVertex(final LoopVertex that) {
that.iterativeIncomingEdges.forEach((v, es) -> es.forEach(this::addIterativeIncomingEdge));
that.nonIterativeIncomingEdges.forEach((v, es) -> es.forEach(this::addNonIterativeIncomingEdge));
that.dagOutgoingEdges.forEach(((v, es) -> es.forEach(this::addDagOutgoingEdge)));
that.edgeWithLoopToEdgeWithInternalVertex.forEach((eLoop, eInternal) -> this.mapEdgeWithLoop(eLoop, eInternal));
that.edgeWithLoopToEdgeWithInternalVertex.forEach(this::mapEdgeWithLoop);
this.maxNumberOfIterations = that.maxNumberOfIterations;
this.terminationCondition = that.terminationCondition;
}
Expand All @@ -106,7 +106,7 @@ public DAGBuilder<IRVertex, IREdge> getBuilder() {
}

/**
* @return the DAG of rthe LoopVertex
* @return the DAG of the LoopVertex
*/
public DAG<IRVertex, IREdge> getDAG() {
return builder.buildWithoutSourceSinkCheck();
Expand Down
2 changes: 1 addition & 1 deletion common/src/test/java/org/apache/nemo/common/DAGTest.java
Expand Up @@ -138,7 +138,7 @@ public void testNormalDAG() {
assertEquals(descendants.size(), 0);

descendants = dag.getDescendants("2");
assertEquals(descendants.size(), 1);
assertEquals(1, descendants.size());
assertTrue(descendants.contains(new IntegerVertex(3)));
}

Expand Down
Expand Up @@ -109,8 +109,8 @@ public void setCurrentWatermarkOfAllMainAndSideInputs(final long newWatermark) {

this.curWatermark = newWatermark;
// TODO #282: Handle late data
inMemorySideInputs.entrySet().removeIf(entry -> {
return entry.getKey().right().maxTimestamp().getMillis() <= this.curWatermark; // Discard old sideinputs.
});
inMemorySideInputs.entrySet().removeIf(entry ->
entry.getKey().right().maxTimestamp().getMillis() <= this.curWatermark // Discard old sideinputs.
);
}
}
Expand Up @@ -78,7 +78,7 @@ public SerializedMemoryBlock(final String blockId,
*/
@Override
public void write(final K key,
final Object element) throws BlockWriteException {
final Object element) {
if (committed) {
throw new BlockWriteException(new Throwable("The partition is already committed!"));
} else {
Expand All @@ -104,7 +104,7 @@ public void write(final K key,
* @throws BlockWriteException for any error occurred while trying to write a block.
*/
@Override
public void writePartitions(final Iterable<NonSerializedPartition<K>> partitions) throws BlockWriteException {
public void writePartitions(final Iterable<NonSerializedPartition<K>> partitions) {
if (!committed) {
try {
final Iterable<SerializedPartition<K>> convertedPartitions = DataUtil.convertToSerPartitions(
Expand All @@ -127,7 +127,7 @@ public void writePartitions(final Iterable<NonSerializedPartition<K>> partitions
* @throws BlockWriteException for any error occurred while trying to write a block.
*/
@Override
public void writeSerializedPartitions(final Iterable<SerializedPartition<K>> partitions) throws BlockWriteException {
public void writeSerializedPartitions(final Iterable<SerializedPartition<K>> partitions) {
if (!committed) {
partitions.forEach(serializedPartitions::add);
} else {
Expand All @@ -145,7 +145,7 @@ public void writeSerializedPartitions(final Iterable<SerializedPartition<K>> par
* @throws BlockFetchException for any error occurred while trying to fetch a block.
*/
@Override
public Iterable<NonSerializedPartition<K>> readPartitions(final KeyRange keyRange) throws BlockFetchException {
public Iterable<NonSerializedPartition<K>> readPartitions(final KeyRange keyRange) {
try {
return DataUtil.convertToNonSerPartitions(serializer, readSerializedPartitions(keyRange));
} catch (final IOException e) {
Expand All @@ -162,7 +162,7 @@ public Iterable<NonSerializedPartition<K>> readPartitions(final KeyRange keyRang
* @throws BlockFetchException for any error occurred while trying to fetch a block.
*/
@Override
public Iterable<SerializedPartition<K>> readSerializedPartitions(final KeyRange keyRange) throws BlockFetchException {
public Iterable<SerializedPartition<K>> readSerializedPartitions(final KeyRange keyRange) {
if (committed) {
final List<SerializedPartition<K>> partitionsInRange = new ArrayList<>();
serializedPartitions.forEach(serializedPartition -> {
Expand All @@ -186,7 +186,7 @@ public Iterable<SerializedPartition<K>> readSerializedPartitions(final KeyRange
* @throws BlockWriteException for any error occurred while trying to write a block.
*/
@Override
public synchronized Optional<Map<K, Long>> commit() throws BlockWriteException {
public synchronized Optional<Map<K, Long>> commit() {
try {
if (!committed) {
commitPartitions();
Expand All @@ -213,7 +213,7 @@ public synchronized Optional<Map<K, Long>> commit() throws BlockWriteException {
* Commits all un-committed partitions.
*/
@Override
public synchronized void commitPartitions() throws BlockWriteException {
public synchronized void commitPartitions() {
try {
for (final SerializedPartition<K> partition : nonCommittedPartitionsMap.values()) {
partition.commit();
Expand Down