Skip to content

Commit

Permalink
Use CountDownLatches to avoid calls to sleep in ApplicationMaster
Browse files Browse the repository at this point in the history
  • Loading branch information
aaudiber committed Dec 5, 2015
1 parent 100ec6e commit 20fbe93
Showing 1 changed file with 11 additions and 17 deletions.
28 changes: 11 additions & 17 deletions integration/yarn/src/main/java/tachyon/yarn/ApplicationMaster.java
Expand Up @@ -71,14 +71,14 @@ public final class ApplicationMaster implements AMRMClientAsync.CallbackHandler
private AMRMClientAsync<ContainerRequest> mRMClient;
/** Client to talk to Node Manager */
private NMClient mNMClient;
/** Whether a container for Tachyon master is allocated */
private volatile boolean mMasterContainerAllocated;
/** The count starts at 1, then becomes 0 when we allocate a container for the Tachyon master */
private CountDownLatch mMasterContainerAllocated;
/** Latch counting down the number of workers to allocate before all mNumWorkers are allocated */
private CountDownLatch mAllWorkersAllocatedLatch;
/** Network address of the container allocated for Tachyon master */
private String mMasterContainerNetAddress;

private volatile boolean mApplicationDone;
/** The count starts at 1, then becomes 0 when the application is done */
private CountDownLatch mApplicationDone;

public ApplicationMaster(int numWorkers, String tachyonHome, String masterAddress) {
mMasterCpu = mTachyonConf.getInt(Constants.INTEGRATION_MASTER_RESOURCE_CPU);
Expand All @@ -94,9 +94,9 @@ public ApplicationMaster(int numWorkers, String tachyonHome, String masterAddres
mNumWorkers = numWorkers;
mTachyonHome = tachyonHome;
mMasterAddress = masterAddress;
mMasterContainerAllocated = false;
mMasterContainerAllocated = new CountDownLatch(1);
mAllWorkersAllocatedLatch = new CountDownLatch(mNumWorkers);
mApplicationDone = false;
mApplicationDone = new CountDownLatch(1);
}

/**
Expand All @@ -123,7 +123,7 @@ public static void main(String[] args) {

@Override
public void onContainersAllocated(List<Container> containers) {
if (!mMasterContainerAllocated) {
if (mMasterContainerAllocated.getCount() != 0) {
launchTachyonMasterContainers(containers);
} else {
launchTachyonWorkerContainers(containers);
Expand All @@ -142,7 +142,7 @@ public void onNodesUpdated(List<NodeReport> updated) {}

@Override
public void onShutdownRequest() {
mApplicationDone = true;
mApplicationDone.countDown();
}

@Override
Expand Down Expand Up @@ -193,9 +193,7 @@ public void requestContainers() throws Exception {
mRMClient.addContainerRequest(masterContainerAsk);

// Wait until Tachyon master container has been allocated
while (!mMasterContainerAllocated) {
Thread.sleep(1000);
}
mMasterContainerAllocated.await();

// Resource requirements for master containers
Resource workerResource = Records.newRecord(Resource.class);
Expand All @@ -216,11 +214,7 @@ public void requestContainers() throws Exception {
mAllWorkersAllocatedLatch.await();

LOG.info("Master and workers are launched");
// Wait for 5 more seconds to avoid application unregistered before some container fully
// launched.
while (!mApplicationDone) {
Thread.sleep(5000);
}
mApplicationDone.await();
}

public void stop() {
Expand Down Expand Up @@ -254,7 +248,7 @@ private void launchTachyonMasterContainers(List<Container> containers) {
String containerUri = container.getNodeHttpAddress(); // in the form of 1.2.3.4:8042
mMasterContainerNetAddress = containerUri.split(":")[0];
LOG.info("Master address: " + mMasterContainerNetAddress);
mMasterContainerAllocated = true;
mMasterContainerAllocated.countDown();
return;
} catch (Exception ex) {
LOG.error("Error launching container " + container.getId() + " " + ex);
Expand Down

0 comments on commit 20fbe93

Please sign in to comment.