Skip to content

Commit

Permalink
[FOLLOWUP] Store app user in shuffleTaskInfo (#181)
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
Store app user information in shuffleTaskInfo.

### Why are the changes needed?
Reduce cache.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Passed the ut.
  • Loading branch information
smallzhongfeng committed Aug 23, 2022
1 parent e968c00 commit 5864420
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@

import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import com.google.common.collect.Maps;
import org.roaringbitmap.longlong.Roaring64NavigableMap;

/**
* ShuffleTaskInfo contains the information of submitting the shuffle,
* the information of the cache block, and the timestamp corresponding to the app
* the information of the cache block, user and timestamp corresponding to the app
*/
public class ShuffleTaskInfo {

Expand All @@ -39,12 +40,14 @@ public class ShuffleTaskInfo {
* shuffleId -> blockIds
*/
private Map<Integer, Roaring64NavigableMap> cachedBlockIds;
private AtomicReference<String> user;

public ShuffleTaskInfo() {
this.currentTimes = System.currentTimeMillis();
this.commitCounts = Maps.newConcurrentMap();
this.commitLocks = Maps.newConcurrentMap();
this.cachedBlockIds = Maps.newConcurrentMap();
this.user = new AtomicReference<>();
}

public Long getCurrentTimes() {
Expand All @@ -66,4 +69,12 @@ public Map<Integer, Object> getCommitLocks() {
public Map<Integer, Roaring64NavigableMap> getCachedBlockIds() {
return cachedBlockIds;
}

public String getUser() {
return user.get();
}

public void setUser(String user) {
this.user.set(user);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,6 @@ public class ShuffleTaskManager {
private Map<Long, PreAllocatedBufferInfo> requireBufferIds = Maps.newConcurrentMap();
private Runnable clearResourceThread;
private BlockingQueue<String> expiredAppIdQueue = Queues.newLinkedBlockingQueue();
// appId -> user
private Map<String, String> appUserMap = Maps.newConcurrentMap();
// appId -> shuffleId -> serverReadHandler

public ShuffleTaskManager(
Expand Down Expand Up @@ -130,7 +128,7 @@ public StatusCode registerShuffle(
RemoteStorageInfo remoteStorageInfo,
String user) {
refreshAppId(appId);
appUserMap.putIfAbsent(appId, user);
shuffleTaskInfos.get(appId).setUser(user);
partitionsToBlockIds.putIfAbsent(appId, Maps.newConcurrentMap());
for (PartitionRange partitionRange : partitionRanges) {
shuffleBufferManager.registerBuffer(appId, shuffleId, partitionRange.getStart(), partitionRange.getEnd());
Expand Down Expand Up @@ -384,9 +382,8 @@ public void removeResources(String appId) {
shuffleBufferManager.removeBuffer(appId);
shuffleFlushManager.removeResources(appId);
if (!shuffleToCachedBlockIds.isEmpty()) {
storageManager.removeResources(appId, shuffleToCachedBlockIds.keySet(), appUserMap.get(appId));
storageManager.removeResources(appId, shuffleToCachedBlockIds.keySet(), getUserByAppId(appId));
}
appUserMap.remove(appId);
shuffleTaskInfos.remove(appId);
LOG.info("Finish remove resource for appId[" + appId + "] cost " + (System.currentTimeMillis() - start) + " ms");
}
Expand Down Expand Up @@ -424,7 +421,7 @@ public int getRequireBufferSize(long requireId) {
}

public String getUserByAppId(String appId) {
return appUserMap.get(appId);
return shuffleTaskInfos.computeIfAbsent(appId, x -> new ShuffleTaskInfo()).getUser();
}

@VisibleForTesting
Expand Down

0 comments on commit 5864420

Please sign in to comment.