Skip to content

Commit

Permalink
[FLINK-14405][runtime] Update ResourceProfile and ResourceSpec to ali…
Browse files Browse the repository at this point in the history
…gn with FLIP-49 resource dimensions.
  • Loading branch information
xintongsong committed Oct 18, 2019
1 parent 11b699b commit 02a54db
Show file tree
Hide file tree
Showing 12 changed files with 472 additions and 353 deletions.

Large diffs are not rendered by default.

Expand Up @@ -36,42 +36,42 @@ public class ResourceSpecTest extends TestLogger {

@Test
public void testIsValid() throws Exception {
ResourceSpec rs = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(100).build();
ResourceSpec rs = ResourceSpec.newBuilder().setCpuCores(1.0).setTaskHeapMemoryMB(100).build();
assertTrue(rs.isValid());

rs = ResourceSpec.newBuilder().
setCpuCores(1.0).
setHeapMemoryInMB(100).
setTaskHeapMemoryMB(100).
setGPUResource(1).
build();
assertTrue(rs.isValid());

rs = ResourceSpec.newBuilder().
setCpuCores(1.0).
setHeapMemoryInMB(100).
setTaskHeapMemoryMB(100).
setGPUResource(-1).
build();
assertFalse(rs.isValid());
}

@Test
public void testLessThanOrEqual() throws Exception {
ResourceSpec rs1 = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(100).build();
ResourceSpec rs2 = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(100).build();
ResourceSpec rs1 = ResourceSpec.newBuilder().setCpuCores(1.0).setTaskHeapMemoryMB(100).build();
ResourceSpec rs2 = ResourceSpec.newBuilder().setCpuCores(1.0).setTaskHeapMemoryMB(100).build();
assertTrue(rs1.lessThanOrEqual(rs2));
assertTrue(rs2.lessThanOrEqual(rs1));

ResourceSpec rs3 = ResourceSpec.newBuilder().
setCpuCores(1.0).
setHeapMemoryInMB(100).
setTaskHeapMemoryMB(100).
setGPUResource(1.1).
build();
assertTrue(rs1.lessThanOrEqual(rs3));
assertFalse(rs3.lessThanOrEqual(rs1));

ResourceSpec rs4 = ResourceSpec.newBuilder().
setCpuCores(1.0).
setHeapMemoryInMB(100).
setTaskHeapMemoryMB(100).
setGPUResource(2.2).
build();
assertFalse(rs4.lessThanOrEqual(rs3));
Expand All @@ -80,52 +80,52 @@ public void testLessThanOrEqual() throws Exception {

@Test
public void testEquals() throws Exception {
ResourceSpec rs1 = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(100).build();
ResourceSpec rs2 = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(100).build();
ResourceSpec rs1 = ResourceSpec.newBuilder().setCpuCores(1.0).setTaskHeapMemoryMB(100).build();
ResourceSpec rs2 = ResourceSpec.newBuilder().setCpuCores(1.0).setTaskHeapMemoryMB(100).build();
assertEquals(rs1, rs2);
assertEquals(rs2, rs1);

ResourceSpec rs3 = ResourceSpec.newBuilder().
setCpuCores(1.0).
setHeapMemoryInMB(100).
setTaskHeapMemoryMB(100).
setGPUResource(2.2).
build();
ResourceSpec rs4 = ResourceSpec.newBuilder().
setCpuCores(1.0).
setHeapMemoryInMB(100).
setTaskHeapMemoryMB(100).
setGPUResource(1).
build();
assertNotEquals(rs3, rs4);

ResourceSpec rs5 = ResourceSpec.newBuilder().
setCpuCores(1.0).
setHeapMemoryInMB(100).
setTaskHeapMemoryMB(100).
setGPUResource(2.2).
build();
assertEquals(rs3, rs5);
}

@Test
public void testHashCode() throws Exception {
ResourceSpec rs1 = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(100).build();
ResourceSpec rs2 = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(100).build();
ResourceSpec rs1 = ResourceSpec.newBuilder().setCpuCores(1.0).setTaskHeapMemoryMB(100).build();
ResourceSpec rs2 = ResourceSpec.newBuilder().setCpuCores(1.0).setTaskHeapMemoryMB(100).build();
assertEquals(rs1.hashCode(), rs2.hashCode());

ResourceSpec rs3 = ResourceSpec.newBuilder().
setCpuCores(1.0).
setHeapMemoryInMB(100).
setTaskHeapMemoryMB(100).
setGPUResource(2.2).
build();
ResourceSpec rs4 = ResourceSpec.newBuilder().
setCpuCores(1.0).
setHeapMemoryInMB(100).
setTaskHeapMemoryMB(100).
setGPUResource(1).
build();
assertNotEquals(rs3.hashCode(), rs4.hashCode());

ResourceSpec rs5 = ResourceSpec.newBuilder().
setCpuCores(1.0).
setHeapMemoryInMB(100).
setTaskHeapMemoryMB(100).
setGPUResource(2.2).
build();
assertEquals(rs3.hashCode(), rs5.hashCode());
Expand All @@ -135,10 +135,10 @@ public void testHashCode() throws Exception {
public void testMerge() throws Exception {
ResourceSpec rs1 = ResourceSpec.newBuilder().
setCpuCores(1.0).
setHeapMemoryInMB(100).
setTaskHeapMemoryMB(100).
setGPUResource(1.1).
build();
ResourceSpec rs2 = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(100).build();
ResourceSpec rs2 = ResourceSpec.newBuilder().setCpuCores(1.0).setTaskHeapMemoryMB(100).build();

ResourceSpec rs3 = rs1.merge(rs2);
assertEquals(1.1, rs3.getGPUResource(), 0.000001);
Expand All @@ -151,7 +151,7 @@ public void testMerge() throws Exception {
public void testSerializable() throws Exception {
ResourceSpec rs1 = ResourceSpec.newBuilder().
setCpuCores(1.0).
setHeapMemoryInMB(100).
setTaskHeapMemoryMB(100).
setGPUResource(1.1).
build();

Expand All @@ -164,7 +164,7 @@ public void testMergeThisUnknown() throws Exception {
final ResourceSpec spec1 = ResourceSpec.UNKNOWN;
final ResourceSpec spec2 = ResourceSpec.newBuilder()
.setCpuCores(1.0)
.setHeapMemoryInMB(100)
.setTaskHeapMemoryMB(100)
.setGPUResource(1.1)
.build();

Expand All @@ -177,7 +177,7 @@ public void testMergeThisUnknown() throws Exception {
public void testMergeOtherUnknown() throws Exception {
final ResourceSpec spec1 = ResourceSpec.newBuilder()
.setCpuCores(1.0)
.setHeapMemoryInMB(100)
.setTaskHeapMemoryMB(100)
.setGPUResource(1.1)
.build();
final ResourceSpec spec2 = ResourceSpec.UNKNOWN;
Expand Down
Expand Up @@ -60,8 +60,8 @@ public void testConfigurationOfResource() throws Exception{
opMethod.setAccessible(true);

// verify explicit change in resources
ResourceSpec minResources = ResourceSpec.newBuilder().setCpuCores(1.0).setHeapMemoryInMB(100).build();
ResourceSpec preferredResources = ResourceSpec.newBuilder().setCpuCores(2.0).setHeapMemoryInMB(200).build();
ResourceSpec minResources = ResourceSpec.newBuilder().setCpuCores(1.0).setTaskHeapMemoryMB(100).build();
ResourceSpec preferredResources = ResourceSpec.newBuilder().setCpuCores(2.0).setTaskHeapMemoryMB(200).build();
opMethod.invoke(operator, minResources, preferredResources);

assertEquals(minResources, operator.getMinResources());
Expand Down
Expand Up @@ -73,13 +73,13 @@ public class JobGraphGeneratorTest {
*/
@Test
public void testResourcesForChainedOperators() throws Exception {
ResourceSpec resource1 = ResourceSpec.newBuilder().setCpuCores(0.1).setHeapMemoryInMB(100).build();
ResourceSpec resource2 = ResourceSpec.newBuilder().setCpuCores(0.2).setHeapMemoryInMB(200).build();
ResourceSpec resource3 = ResourceSpec.newBuilder().setCpuCores(0.3).setHeapMemoryInMB(300).build();
ResourceSpec resource4 = ResourceSpec.newBuilder().setCpuCores(0.4).setHeapMemoryInMB(400).build();
ResourceSpec resource5 = ResourceSpec.newBuilder().setCpuCores(0.5).setHeapMemoryInMB(500).build();
ResourceSpec resource6 = ResourceSpec.newBuilder().setCpuCores(0.6).setHeapMemoryInMB(600).build();
ResourceSpec resource7 = ResourceSpec.newBuilder().setCpuCores(0.7).setHeapMemoryInMB(700).build();
ResourceSpec resource1 = ResourceSpec.newBuilder().setCpuCores(0.1).setTaskHeapMemoryMB(100).build();
ResourceSpec resource2 = ResourceSpec.newBuilder().setCpuCores(0.2).setTaskHeapMemoryMB(200).build();
ResourceSpec resource3 = ResourceSpec.newBuilder().setCpuCores(0.3).setTaskHeapMemoryMB(300).build();
ResourceSpec resource4 = ResourceSpec.newBuilder().setCpuCores(0.4).setTaskHeapMemoryMB(400).build();
ResourceSpec resource5 = ResourceSpec.newBuilder().setCpuCores(0.5).setTaskHeapMemoryMB(500).build();
ResourceSpec resource6 = ResourceSpec.newBuilder().setCpuCores(0.6).setTaskHeapMemoryMB(600).build();
ResourceSpec resource7 = ResourceSpec.newBuilder().setCpuCores(0.7).setTaskHeapMemoryMB(700).build();

Method opMethod = Operator.class.getDeclaredMethod("setResources", ResourceSpec.class);
opMethod.setAccessible(true);
Expand Down Expand Up @@ -147,12 +147,12 @@ public boolean filter(Long value) throws Exception {
*/
@Test
public void testResourcesForDeltaIteration() throws Exception{
ResourceSpec resource1 = ResourceSpec.newBuilder().setCpuCores(0.1).setHeapMemoryInMB(100).build();
ResourceSpec resource2 = ResourceSpec.newBuilder().setCpuCores(0.2).setHeapMemoryInMB(200).build();
ResourceSpec resource3 = ResourceSpec.newBuilder().setCpuCores(0.3).setHeapMemoryInMB(300).build();
ResourceSpec resource4 = ResourceSpec.newBuilder().setCpuCores(0.4).setHeapMemoryInMB(400).build();
ResourceSpec resource5 = ResourceSpec.newBuilder().setCpuCores(0.5).setHeapMemoryInMB(500).build();
ResourceSpec resource6 = ResourceSpec.newBuilder().setCpuCores(0.6).setHeapMemoryInMB(600).build();
ResourceSpec resource1 = ResourceSpec.newBuilder().setCpuCores(0.1).setTaskHeapMemoryMB(100).build();
ResourceSpec resource2 = ResourceSpec.newBuilder().setCpuCores(0.2).setTaskHeapMemoryMB(200).build();
ResourceSpec resource3 = ResourceSpec.newBuilder().setCpuCores(0.3).setTaskHeapMemoryMB(300).build();
ResourceSpec resource4 = ResourceSpec.newBuilder().setCpuCores(0.4).setTaskHeapMemoryMB(400).build();
ResourceSpec resource5 = ResourceSpec.newBuilder().setCpuCores(0.5).setTaskHeapMemoryMB(500).build();
ResourceSpec resource6 = ResourceSpec.newBuilder().setCpuCores(0.6).setTaskHeapMemoryMB(600).build();

Method opMethod = Operator.class.getDeclaredMethod("setResources", ResourceSpec.class);
opMethod.setAccessible(true);
Expand Down

0 comments on commit 02a54db

Please sign in to comment.