Skip to content

Commit

Permalink
[core] move AbstractID from runtime to core
Browse files Browse the repository at this point in the history
  • Loading branch information
Maximilian Michels authored and mxm committed Feb 27, 2015
1 parent 557edff commit facf54e
Show file tree
Hide file tree
Showing 19 changed files with 97 additions and 63 deletions.
Expand Up @@ -16,17 +16,14 @@
* limitations under the License.
*/

package org.apache.flink.runtime;
package org.apache.flink.util;

import java.io.IOException;
import java.util.Random;

import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.util.StringUtils;

import io.netty.buffer.ByteBuf;

/**
* A statistically unique identification number.
Expand All @@ -46,10 +43,10 @@ public class AbstractID implements IOReadableWritable, Comparable<AbstractID>, j


/** The upper part of the actual ID */
private long upperPart;
protected long upperPart;

/** The lower part of the actual ID */
private long lowerPart;
protected long lowerPart;

// --------------------------------------------------------------------------------------------

Expand Down Expand Up @@ -148,11 +145,6 @@ public void write(DataOutputView out) throws IOException {
out.writeLong(this.upperPart);
}

public void writeTo(ByteBuf buf) {
buf.writeLong(this.lowerPart);
buf.writeLong(this.upperPart);
}

// --------------------------------------------------------------------------------------------
// Standard Utilities
// --------------------------------------------------------------------------------------------
Expand Down
Expand Up @@ -16,21 +16,19 @@
* limitations under the License.
*/

package org.apache.flink.runtime;
package org.apache.flink.util;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.junit.Test;

import org.apache.flink.runtime.testutils.CommonTestUtils;

import java.nio.ByteBuffer;

/**
* This class contains tests for the {@link org.apache.flink.runtime.AbstractID} class.
* This class contains tests for the {@link org.apache.flink.util.AbstractID} class.
*/
public class AbstractIDTest {
/**
Expand All @@ -40,7 +38,7 @@ public class AbstractIDTest {
public void testSerialization() {
final AbstractID origID = new AbstractID();
try {
final AbstractID copyID = (AbstractID) CommonTestUtils.createCopyWritable(origID);
final AbstractID copyID = CommonTestUtils.createCopy(origID);

assertEquals(origID.hashCode(), copyID.hashCode());
assertEquals(origID, copyID);
Expand Down Expand Up @@ -71,26 +69,6 @@ public void testConvertToBytes() {
}
}

@Test
public void testConvertToByteBuffer() {
try {
JobID origID = new JobID();

byte[] bytes = origID.getBytes();
ByteBuffer buffer = ByteBuffer.wrap(bytes);

JobID copy1 = JobID.fromByteBuffer(buffer);
JobID copy2 = JobID.fromByteArray(bytes);

assertEquals(origID, copy1);
assertEquals(origID, copy2);
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}

@Test
public void testCompare() {
try {
Expand All @@ -107,16 +85,16 @@ public void testCompare() {
AbstractID id10 = new AbstractID(Long.MIN_VALUE, Long.MAX_VALUE);

// test self equality
assertEquals(0, id1.compareTo(CommonTestUtils.createCopyWritable(id1)));
assertEquals(0, id2.compareTo(CommonTestUtils.createCopyWritable(id2)));
assertEquals(0, id3.compareTo(CommonTestUtils.createCopyWritable(id3)));
assertEquals(0, id4.compareTo(CommonTestUtils.createCopyWritable(id4)));
assertEquals(0, id5.compareTo(CommonTestUtils.createCopyWritable(id5)));
assertEquals(0, id6.compareTo(CommonTestUtils.createCopyWritable(id6)));
assertEquals(0, id7.compareTo(CommonTestUtils.createCopyWritable(id7)));
assertEquals(0, id8.compareTo(CommonTestUtils.createCopyWritable(id8)));
assertEquals(0, id9.compareTo(CommonTestUtils.createCopyWritable(id9)));
assertEquals(0, id10.compareTo(CommonTestUtils.createCopyWritable(id10)));
assertEquals(0, id1.compareTo(CommonTestUtils.createCopy(id1)));
assertEquals(0, id2.compareTo(CommonTestUtils.createCopy(id2)));
assertEquals(0, id3.compareTo(CommonTestUtils.createCopy(id3)));
assertEquals(0, id4.compareTo(CommonTestUtils.createCopy(id4)));
assertEquals(0, id5.compareTo(CommonTestUtils.createCopy(id5)));
assertEquals(0, id6.compareTo(CommonTestUtils.createCopy(id6)));
assertEquals(0, id7.compareTo(CommonTestUtils.createCopy(id7)));
assertEquals(0, id8.compareTo(CommonTestUtils.createCopy(id8)));
assertEquals(0, id9.compareTo(CommonTestUtils.createCopy(id9)));
assertEquals(0, id10.compareTo(CommonTestUtils.createCopy(id10)));

// test order
assertCompare(id1, id2, -1);
Expand Down
Expand Up @@ -19,7 +19,7 @@
package org.apache.flink.runtime.executiongraph;

import io.netty.buffer.ByteBuf;
import org.apache.flink.runtime.AbstractID;
import org.apache.flink.util.AbstractID;

/**
* Unique identifier for the attempt to execute a tasks. Multiple attempts happen
Expand All @@ -36,6 +36,11 @@ public ExecutionAttemptID(long lowerPart, long upperPart) {
super(lowerPart, upperPart);
}

public void writeTo(ByteBuf buf) {
buf.writeLong(this.lowerPart);
buf.writeLong(this.upperPart);
}

public static ExecutionAttemptID fromByteBuf(ByteBuf buf) {
long lower = buf.readLong();
long upper = buf.readLong();
Expand Down
Expand Up @@ -26,7 +26,7 @@
import java.util.Set;

import akka.actor.ActorRef;
import org.apache.flink.runtime.AbstractID;
import org.apache.flink.util.AbstractID;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.jobmanager.scheduler.SlotAvailabilityListener;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroupAssignment;
Expand Down
Expand Up @@ -18,7 +18,7 @@

package org.apache.flink.runtime.instance;

import org.apache.flink.runtime.AbstractID;
import org.apache.flink.util.AbstractID;

/**
* Class for statistically unique instance IDs.
Expand Down
Expand Up @@ -18,7 +18,7 @@

package org.apache.flink.runtime.instance;

import org.apache.flink.runtime.AbstractID;
import org.apache.flink.util.AbstractID;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroupAssignment;

Expand Down
Expand Up @@ -18,7 +18,7 @@

package org.apache.flink.runtime.instance;

import org.apache.flink.runtime.AbstractID;
import org.apache.flink.util.AbstractID;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.jobmanager.scheduler.Locality;
Expand Down
Expand Up @@ -18,7 +18,7 @@

package org.apache.flink.runtime.instance;

import org.apache.flink.runtime.AbstractID;
import org.apache.flink.util.AbstractID;
import org.apache.flink.runtime.jobgraph.JobID;

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
Expand Down
Expand Up @@ -19,7 +19,7 @@
package org.apache.flink.runtime.io.network.partition.consumer;

import io.netty.buffer.ByteBuf;
import org.apache.flink.runtime.AbstractID;
import org.apache.flink.util.AbstractID;

public class InputChannelID extends AbstractID {

Expand All @@ -37,6 +37,11 @@ public InputChannelID(AbstractID id) {
super(id);
}

public void writeTo(ByteBuf buf) {
buf.writeLong(this.lowerPart);
buf.writeLong(this.upperPart);
}

public static InputChannelID fromByteBuf(ByteBuf buf) {
long lower = buf.readLong();
long upper = buf.readLong();
Expand Down
Expand Up @@ -20,7 +20,7 @@

import java.util.UUID;

import org.apache.flink.runtime.AbstractID;
import org.apache.flink.util.AbstractID;

public class IntermediateDataSetID extends AbstractID {

Expand Down
Expand Up @@ -19,7 +19,7 @@
package org.apache.flink.runtime.jobgraph;

import io.netty.buffer.ByteBuf;
import org.apache.flink.runtime.AbstractID;
import org.apache.flink.util.AbstractID;

public class IntermediateResultPartitionID extends AbstractID {

Expand All @@ -36,6 +36,11 @@ public IntermediateResultPartitionID(long lowerPart, long upperPart) {
super(lowerPart, upperPart);
}

public void writeTo(ByteBuf buf) {
buf.writeLong(this.lowerPart);
buf.writeLong(this.upperPart);
}

public static IntermediateResultPartitionID fromByteBuf(ByteBuf buf) {
long lower = buf.readLong();
long upper = buf.readLong();
Expand Down
Expand Up @@ -21,7 +21,7 @@

import javax.xml.bind.DatatypeConverter;

import org.apache.flink.runtime.AbstractID;
import org.apache.flink.util.AbstractID;

import java.nio.ByteBuffer;

Expand Down
Expand Up @@ -20,7 +20,7 @@

import javax.xml.bind.DatatypeConverter;

import org.apache.flink.runtime.AbstractID;
import org.apache.flink.util.AbstractID;

/**
* A class for statistically unique job vertex IDs.
Expand Down
Expand Up @@ -18,7 +18,7 @@

package org.apache.flink.runtime.jobmanager.scheduler;

import org.apache.flink.runtime.AbstractID;
import org.apache.flink.util.AbstractID;
import org.apache.flink.runtime.instance.Instance;

import com.google.common.base.Preconditions;
Expand Down
Expand Up @@ -21,7 +21,7 @@
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.runtime.AbstractID;
import org.apache.flink.util.AbstractID;
import org.apache.flink.runtime.jobgraph.AbstractJobVertex;

import com.google.common.base.Preconditions;
Expand Down
Expand Up @@ -18,7 +18,7 @@

package org.apache.flink.runtime.jobmanager.scheduler;

import org.apache.flink.runtime.AbstractID;
import org.apache.flink.util.AbstractID;

public class ResourceId extends AbstractID {
private static final long serialVersionUID = 1L;
Expand Down
Expand Up @@ -37,7 +37,7 @@
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.AbstractID;
import org.apache.flink.util.AbstractID;
import org.apache.flink.runtime.instance.SharedSlot;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.slf4j.Logger;
Expand Down
Expand Up @@ -30,7 +30,7 @@

import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.flink.runtime.AbstractID;
import org.apache.flink.util.AbstractID;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.SharedSlot;
Expand Down
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.jobgraph;

import org.junit.Test;

import java.nio.ByteBuffer;

import static org.junit.Assert.fail;
import static org.junit.Assert.assertEquals;

public class JobIdTest {

@Test
public void testConvertToByteBuffer() {
try {
JobID origID = new JobID();

byte[] bytes = origID.getBytes();
ByteBuffer buffer = ByteBuffer.wrap(bytes);

JobID copy1 = JobID.fromByteBuffer(buffer);
JobID copy2 = JobID.fromByteArray(bytes);

assertEquals(origID, copy1);
assertEquals(origID, copy2);
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}

}

0 comments on commit facf54e

Please sign in to comment.