Skip to content

Commit

Permalink
YARN-2958. Made RMStateStore not update the last sequence number when…
Browse files Browse the repository at this point in the history
… updating the delegation token. Contributed by Varun Saxena.
  • Loading branch information
zjshen14 committed Jan 5, 2015
1 parent dfd2589 commit 562a701
Show file tree
Hide file tree
Showing 11 changed files with 131 additions and 144 deletions.
3 changes: 3 additions & 0 deletions hadoop-yarn-project/CHANGES.txt
Expand Up @@ -311,6 +311,9 @@ Release 2.7.0 - UNRELEASED
YARN-2922. ConcurrentModificationException in CapacityScheduler's LeafQueue. YARN-2922. ConcurrentModificationException in CapacityScheduler's LeafQueue.
(Rohith Sharmaks via ozawa) (Rohith Sharmaks via ozawa)


YARN-2958. Made RMStateStore not update the last sequence number when updating the
delegation token. (Varun Saxena via zjshen)

Release 2.6.0 - 2014-11-18 Release 2.6.0 - 2014-11-18


INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES
Expand Down
Expand Up @@ -60,8 +60,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.EpochPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.EpochPBImpl;
import org.apache.hadoop.yarn.util.ConverterUtils;

import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;


@Private @Private
Expand Down Expand Up @@ -452,11 +450,10 @@ public synchronized void removeApplicationStateInternal(
} }


@Override @Override
public synchronized void storeRMDelegationTokenAndSequenceNumberState( public synchronized void storeRMDelegationTokenState(
RMDelegationTokenIdentifier identifier, Long renewDate, RMDelegationTokenIdentifier identifier, Long renewDate)
int latestSequenceNumber) throws Exception { throws Exception {
storeOrUpdateRMDelegationTokenAndSequenceNumberState( storeOrUpdateRMDelegationTokenState(identifier, renewDate, false);
identifier, renewDate,latestSequenceNumber, false);
} }


@Override @Override
Expand All @@ -469,16 +466,15 @@ public synchronized void removeRMDelegationTokenState(
} }


@Override @Override
protected void updateRMDelegationTokenAndSequenceNumberInternal( protected void updateRMDelegationTokenState(
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate, RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate)
int latestSequenceNumber) throws Exception { throws Exception {
storeOrUpdateRMDelegationTokenAndSequenceNumberState( storeOrUpdateRMDelegationTokenState(rmDTIdentifier, renewDate, true);
rmDTIdentifier, renewDate,latestSequenceNumber, true);
} }


private void storeOrUpdateRMDelegationTokenAndSequenceNumberState( private void storeOrUpdateRMDelegationTokenState(
RMDelegationTokenIdentifier identifier, Long renewDate, RMDelegationTokenIdentifier identifier, Long renewDate,
int latestSequenceNumber, boolean isUpdate) throws Exception { boolean isUpdate) throws Exception {
Path nodeCreatePath = Path nodeCreatePath =
getNodePath(rmDTSecretManagerRoot, getNodePath(rmDTSecretManagerRoot,
DELEGATION_TOKEN_PREFIX + identifier.getSequenceNumber()); DELEGATION_TOKEN_PREFIX + identifier.getSequenceNumber());
Expand All @@ -490,23 +486,24 @@ private void storeOrUpdateRMDelegationTokenAndSequenceNumberState(
} else { } else {
LOG.info("Storing RMDelegationToken_" + identifier.getSequenceNumber()); LOG.info("Storing RMDelegationToken_" + identifier.getSequenceNumber());
writeFile(nodeCreatePath, identifierData.toByteArray()); writeFile(nodeCreatePath, identifierData.toByteArray());
}


// store sequence number // store sequence number
Path latestSequenceNumberPath = getNodePath(rmDTSecretManagerRoot, Path latestSequenceNumberPath = getNodePath(rmDTSecretManagerRoot,
DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX + latestSequenceNumber); DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX
LOG.info("Storing " + DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX + identifier.getSequenceNumber());
+ latestSequenceNumber); LOG.info("Storing " + DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX
if (dtSequenceNumberPath == null) { + identifier.getSequenceNumber());
if (!createFile(latestSequenceNumberPath)) { if (dtSequenceNumberPath == null) {
throw new Exception("Failed to create " + latestSequenceNumberPath); if (!createFile(latestSequenceNumberPath)) {
} throw new Exception("Failed to create " + latestSequenceNumberPath);
} else { }
if (!renameFile(dtSequenceNumberPath, latestSequenceNumberPath)) { } else {
throw new Exception("Failed to rename " + dtSequenceNumberPath); if (!renameFile(dtSequenceNumberPath, latestSequenceNumberPath)) {
throw new Exception("Failed to rename " + dtSequenceNumberPath);
}
} }
dtSequenceNumberPath = latestSequenceNumberPath;
} }
dtSequenceNumberPath = latestSequenceNumberPath;
} }


@Override @Override
Expand Down
Expand Up @@ -544,31 +544,30 @@ protected void removeApplicationStateInternal(ApplicationStateData appState)
throw new IOException(e); throw new IOException(e);
} }
} }


@Override private void storeOrUpdateRMDT(RMDelegationTokenIdentifier tokenId,
protected void storeRMDelegationTokenAndSequenceNumberState( Long renewDate, boolean isUpdate) throws IOException {
RMDelegationTokenIdentifier tokenId, Long renewDate,
int latestSequenceNumber) throws IOException {
String tokenKey = getRMDTTokenNodeKey(tokenId); String tokenKey = getRMDTTokenNodeKey(tokenId);
RMDelegationTokenIdentifierData tokenData = RMDelegationTokenIdentifierData tokenData =
new RMDelegationTokenIdentifierData(tokenId, renewDate); new RMDelegationTokenIdentifierData(tokenId, renewDate);
ByteArrayOutputStream bs = new ByteArrayOutputStream();
DataOutputStream ds = new DataOutputStream(bs);
try {
ds.writeInt(latestSequenceNumber);
} finally {
ds.close();
}
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Storing token to " + tokenKey); LOG.debug("Storing token to " + tokenKey);
LOG.debug("Storing " + latestSequenceNumber + " to "
+ RM_DT_SEQUENCE_NUMBER_KEY);
} }
try { try {
WriteBatch batch = db.createWriteBatch(); WriteBatch batch = db.createWriteBatch();
try { try {
batch.put(bytes(tokenKey), tokenData.toByteArray()); batch.put(bytes(tokenKey), tokenData.toByteArray());
batch.put(bytes(RM_DT_SEQUENCE_NUMBER_KEY), bs.toByteArray()); if(!isUpdate) {
ByteArrayOutputStream bs = new ByteArrayOutputStream();
try (DataOutputStream ds = new DataOutputStream(bs)) {
ds.writeInt(tokenId.getSequenceNumber());
}
if (LOG.isDebugEnabled()) {
LOG.debug("Storing " + tokenId.getSequenceNumber() + " to "
+ RM_DT_SEQUENCE_NUMBER_KEY);
}
batch.put(bytes(RM_DT_SEQUENCE_NUMBER_KEY), bs.toByteArray());
}
db.write(batch); db.write(batch);
} finally { } finally {
batch.close(); batch.close();
Expand All @@ -579,11 +578,17 @@ protected void storeRMDelegationTokenAndSequenceNumberState(
} }


@Override @Override
protected void updateRMDelegationTokenAndSequenceNumberInternal( protected void storeRMDelegationTokenState(
RMDelegationTokenIdentifier tokenId, Long renewDate, RMDelegationTokenIdentifier tokenId, Long renewDate)
int latestSequenceNumber) throws IOException { throws IOException {
storeRMDelegationTokenAndSequenceNumberState(tokenId, renewDate, storeOrUpdateRMDT(tokenId, renewDate, false);
latestSequenceNumber); }

@Override
protected void updateRMDelegationTokenState(
RMDelegationTokenIdentifier tokenId, Long renewDate)
throws IOException {
storeOrUpdateRMDT(tokenId, renewDate, true);
} }


@Override @Override
Expand Down
Expand Up @@ -149,23 +149,30 @@ public synchronized void removeApplicationStateInternal(
} }
} }


@Override private void storeOrUpdateRMDT(RMDelegationTokenIdentifier rmDTIdentifier,
public synchronized void storeRMDelegationTokenAndSequenceNumberState( Long renewDate, boolean isUpdate) throws Exception {
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
int latestSequenceNumber) throws Exception {
Map<RMDelegationTokenIdentifier, Long> rmDTState = Map<RMDelegationTokenIdentifier, Long> rmDTState =
state.rmSecretManagerState.getTokenState(); state.rmSecretManagerState.getTokenState();
if (rmDTState.containsKey(rmDTIdentifier)) { if (rmDTState.containsKey(rmDTIdentifier)) {
IOException e = new IOException("RMDelegationToken: " + rmDTIdentifier IOException e = new IOException("RMDelegationToken: " + rmDTIdentifier
+ "is already stored."); + "is already stored.");
LOG.info("Error storing info for RMDelegationToken: " + rmDTIdentifier, e); LOG.info("Error storing info for RMDelegationToken: " + rmDTIdentifier, e);
throw e; throw e;
} }
rmDTState.put(rmDTIdentifier, renewDate); rmDTState.put(rmDTIdentifier, renewDate);
state.rmSecretManagerState.dtSequenceNumber = latestSequenceNumber; if(!isUpdate) {
state.rmSecretManagerState.dtSequenceNumber =
rmDTIdentifier.getSequenceNumber();
}
LOG.info("Store RMDT with sequence number " LOG.info("Store RMDT with sequence number "
+ rmDTIdentifier.getSequenceNumber() + rmDTIdentifier.getSequenceNumber());
+ ". And the latest sequence number is " + latestSequenceNumber); }

@Override
public synchronized void storeRMDelegationTokenState(
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate)
throws Exception {
storeOrUpdateRMDT(rmDTIdentifier, renewDate, false);
} }


@Override @Override
Expand All @@ -179,12 +186,11 @@ public synchronized void removeRMDelegationTokenState(
} }


@Override @Override
protected void updateRMDelegationTokenAndSequenceNumberInternal( protected void updateRMDelegationTokenState(
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate, RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate)
int latestSequenceNumber) throws Exception { throws Exception {
removeRMDelegationTokenState(rmDTIdentifier); removeRMDelegationTokenState(rmDTIdentifier);
storeRMDelegationTokenAndSequenceNumberState( storeOrUpdateRMDT(rmDTIdentifier, renewDate, true);
rmDTIdentifier, renewDate, latestSequenceNumber);
LOG.info("Update RMDT with sequence number " LOG.info("Update RMDT with sequence number "
+ rmDTIdentifier.getSequenceNumber()); + rmDTIdentifier.getSequenceNumber());
} }
Expand Down
Expand Up @@ -77,9 +77,9 @@ protected void removeApplicationStateInternal(ApplicationStateData appState)
} }


@Override @Override
public void storeRMDelegationTokenAndSequenceNumberState( public void storeRMDelegationTokenState(
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate, RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate)
int latestSequenceNumber) throws Exception { throws Exception {
// Do nothing // Do nothing
} }


Expand All @@ -90,9 +90,9 @@ public void removeRMDelegationTokenState(RMDelegationTokenIdentifier rmDTIdentif
} }


@Override @Override
protected void updateRMDelegationTokenAndSequenceNumberInternal( protected void updateRMDelegationTokenState(
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate, RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate)
int latestSequenceNumber) throws Exception { throws Exception {
// Do nothing // Do nothing
} }


Expand Down
Expand Up @@ -296,9 +296,8 @@ public void transition(RMStateStore store, RMStateStoreEvent event) {
RMStateStoreRMDTEvent dtEvent = (RMStateStoreRMDTEvent) event; RMStateStoreRMDTEvent dtEvent = (RMStateStoreRMDTEvent) event;
try { try {
LOG.info("Storing RMDelegationToken and SequenceNumber"); LOG.info("Storing RMDelegationToken and SequenceNumber");
store.storeRMDelegationTokenAndSequenceNumberState( store.storeRMDelegationTokenState(
dtEvent.getRmDTIdentifier(), dtEvent.getRenewDate(), dtEvent.getRmDTIdentifier(), dtEvent.getRenewDate());
dtEvent.getLatestSequenceNumber());
} catch (Exception e) { } catch (Exception e) {
LOG.error("Error While Storing RMDelegationToken and SequenceNumber ", LOG.error("Error While Storing RMDelegationToken and SequenceNumber ",
e); e);
Expand Down Expand Up @@ -341,9 +340,8 @@ public void transition(RMStateStore store, RMStateStoreEvent event) {
RMStateStoreRMDTEvent dtEvent = (RMStateStoreRMDTEvent) event; RMStateStoreRMDTEvent dtEvent = (RMStateStoreRMDTEvent) event;
try { try {
LOG.info("Updating RMDelegationToken and SequenceNumber"); LOG.info("Updating RMDelegationToken and SequenceNumber");
store.updateRMDelegationTokenAndSequenceNumberInternal( store.updateRMDelegationTokenState(
dtEvent.getRmDTIdentifier(), dtEvent.getRenewDate(), dtEvent.getRmDTIdentifier(), dtEvent.getRenewDate());
dtEvent.getLatestSequenceNumber());
} catch (Exception e) { } catch (Exception e) {
LOG.error("Error While Updating RMDelegationToken and SequenceNumber ", LOG.error("Error While Updating RMDelegationToken and SequenceNumber ",
e); e);
Expand Down Expand Up @@ -672,29 +670,28 @@ protected abstract void updateApplicationAttemptStateInternal(
* RMDTSecretManager call this to store the state of a delegation token * RMDTSecretManager call this to store the state of a delegation token
* and sequence number * and sequence number
*/ */
public void storeRMDelegationTokenAndSequenceNumber( public void storeRMDelegationToken(
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate, RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate) {
int latestSequenceNumber) {
handleStoreEvent(new RMStateStoreRMDTEvent(rmDTIdentifier, renewDate, handleStoreEvent(new RMStateStoreRMDTEvent(rmDTIdentifier, renewDate,
latestSequenceNumber, RMStateStoreEventType.STORE_DELEGATION_TOKEN)); RMStateStoreEventType.STORE_DELEGATION_TOKEN));
} }


/** /**
* Blocking API * Blocking API
* Derived classes must implement this method to store the state of * Derived classes must implement this method to store the state of
* RMDelegationToken and sequence number * RMDelegationToken and sequence number
*/ */
protected abstract void storeRMDelegationTokenAndSequenceNumberState( protected abstract void storeRMDelegationTokenState(
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate, RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate)
int latestSequenceNumber) throws Exception; throws Exception;


/** /**
* RMDTSecretManager call this to remove the state of a delegation token * RMDTSecretManager call this to remove the state of a delegation token
*/ */
public void removeRMDelegationToken( public void removeRMDelegationToken(
RMDelegationTokenIdentifier rmDTIdentifier, int sequenceNumber) { RMDelegationTokenIdentifier rmDTIdentifier) {
handleStoreEvent(new RMStateStoreRMDTEvent(rmDTIdentifier, null, handleStoreEvent(new RMStateStoreRMDTEvent(rmDTIdentifier, null,
sequenceNumber, RMStateStoreEventType.REMOVE_DELEGATION_TOKEN)); RMStateStoreEventType.REMOVE_DELEGATION_TOKEN));
} }


/** /**
Expand All @@ -708,21 +705,20 @@ protected abstract void removeRMDelegationTokenState(
* RMDTSecretManager call this to update the state of a delegation token * RMDTSecretManager call this to update the state of a delegation token
* and sequence number * and sequence number
*/ */
public void updateRMDelegationTokenAndSequenceNumber( public void updateRMDelegationToken(
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate, RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate) {
int latestSequenceNumber) {
handleStoreEvent(new RMStateStoreRMDTEvent(rmDTIdentifier, renewDate, handleStoreEvent(new RMStateStoreRMDTEvent(rmDTIdentifier, renewDate,
latestSequenceNumber, RMStateStoreEventType.UPDATE_DELEGATION_TOKEN)); RMStateStoreEventType.UPDATE_DELEGATION_TOKEN));
} }


/** /**
* Blocking API * Blocking API
* Derived classes must implement this method to update the state of * Derived classes must implement this method to update the state of
* RMDelegationToken and sequence number * RMDelegationToken and sequence number
*/ */
protected abstract void updateRMDelegationTokenAndSequenceNumberInternal( protected abstract void updateRMDelegationTokenState(
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate, RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate)
int latestSequenceNumber) throws Exception; throws Exception;


/** /**
* RMDTSecretManager call this to store the state of a master key * RMDTSecretManager call this to store the state of a master key
Expand Down
Expand Up @@ -23,18 +23,16 @@
public class RMStateStoreRMDTEvent extends RMStateStoreEvent { public class RMStateStoreRMDTEvent extends RMStateStoreEvent {
private RMDelegationTokenIdentifier rmDTIdentifier; private RMDelegationTokenIdentifier rmDTIdentifier;
private Long renewDate; private Long renewDate;
private int latestSequenceNumber;


public RMStateStoreRMDTEvent(RMStateStoreEventType type) { public RMStateStoreRMDTEvent(RMStateStoreEventType type) {
super(type); super(type);
} }


public RMStateStoreRMDTEvent(RMDelegationTokenIdentifier rmDTIdentifier, public RMStateStoreRMDTEvent(RMDelegationTokenIdentifier rmDTIdentifier,
Long renewDate, int latestSequenceNumber, RMStateStoreEventType type) { Long renewDate, RMStateStoreEventType type) {
this(type); this(type);
this.rmDTIdentifier = rmDTIdentifier; this.rmDTIdentifier = rmDTIdentifier;
this.renewDate = renewDate; this.renewDate = renewDate;
this.latestSequenceNumber = latestSequenceNumber;
} }


public RMDelegationTokenIdentifier getRmDTIdentifier() { public RMDelegationTokenIdentifier getRmDTIdentifier() {
Expand All @@ -44,8 +42,4 @@ public RMDelegationTokenIdentifier getRmDTIdentifier() {
public Long getRenewDate() { public Long getRenewDate() {
return renewDate; return renewDate;
} }

}
public int getLatestSequenceNumber() {
return latestSequenceNumber;
}
}

0 comments on commit 562a701

Please sign in to comment.