Skip to content

Commit

Permalink
Save one copy of BlockLocation
Browse files Browse the repository at this point in the history
In `DefaultBlockMaster` we create extra copies of `BlockLocation` while
we can just use the one given.

pr-link: #13461
change-id: cid-2b46998b82a66be582be2ed0a3cdef06348f8a18
  • Loading branch information
jiacheliu3 committed May 25, 2021
1 parent 412524c commit 23a31c0
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 19 deletions.
Expand Up @@ -76,7 +76,7 @@ public void blockHeartbeat(BlockHeartbeatPRequest request,
final Map<String, StorageList> lostStorageMap = request.getLostStorageMap();

final Map<Block.BlockLocation, List<Long>> addedBlocksMap =
reconstructBlocksOnLocationMap(request.getAddedBlocksList());
reconstructBlocksOnLocationMap(request.getAddedBlocksList(), workerId);

final List<Metric> metrics = request.getOptions().getMetricsList()
.stream().map(Metric::fromProto).collect(Collectors.toList());
Expand Down Expand Up @@ -143,7 +143,7 @@ public void registerWorker(RegisterWorkerPRequest request,
final Map<String, StorageList> lostStorageMap = request.getLostStorageMap();

final Map<Block.BlockLocation, List<Long>> currBlocksOnLocationMap =
reconstructBlocksOnLocationMap(request.getCurrentBlocksList());
reconstructBlocksOnLocationMap(request.getCurrentBlocksList(), workerId);

RegisterWorkerPOptions options = request.getOptions();
RpcUtils.call(LOG,
Expand All @@ -162,11 +162,11 @@ public void registerWorker(RegisterWorkerPRequest request,
* tier alias and medium type.
* */
private Map<Block.BlockLocation, List<Long>> reconstructBlocksOnLocationMap(
List<LocationBlockIdListEntry> entries) {
List<LocationBlockIdListEntry> entries, long workerId) {
return entries.stream().collect(
Collectors.toMap(
e -> Block.BlockLocation.newBuilder().setTier(e.getKey().getTierAlias())
.setMediumType(e.getKey().getMediumType()).build(),
.setMediumType(e.getKey().getMediumType()).setWorkerId(workerId).build(),
e -> e.getValue().getBlockIdList(),
/**
* The merger function is invoked on key collisions to merge the values.
Expand Down
Expand Up @@ -999,12 +999,12 @@ private void processWorkerAddedBlocks(MasterWorkerInfo workerInfo,
Optional<BlockMeta> block = mBlockStore.getBlock(blockId);
if (block.isPresent()) {
workerInfo.addBlock(blockId);
BlockLocation blockLocation = BlockLocation.newBuilder()
.setWorkerId(workerInfo.getId())
.setTier(entry.getKey().getTier())
.setMediumType(entry.getKey().getMediumType())
.build();
mBlockStore.addLocation(blockId, blockLocation);
BlockLocation location = entry.getKey();
Preconditions.checkState(location.getWorkerId() == workerInfo.getId(),
String.format("BlockLocation has a different workerId %s from "
+ "the request sender's workerId %s!",
location.getWorkerId(), workerInfo.getId()));
mBlockStore.addLocation(blockId, location);
mLostBlocks.remove(blockId);
} else {
LOG.warn("Invalid block: {} from worker {}.", blockId,
Expand Down
Expand Up @@ -76,8 +76,6 @@ public class BlockMasterTest {
private static final Map<Block.BlockLocation, List<Long>> NO_BLOCKS_ON_LOCATION
= ImmutableMap.of();
private static final Map<String, StorageList> NO_LOST_STORAGE = ImmutableMap.of();
private static final Block.BlockLocation BLOCK_LOCATION = Block.BlockLocation.newBuilder()
.setTier(Constants.MEDIUM_MEM).setMediumType(Constants.MEDIUM_MEM).build();

private BlockMaster mBlockMaster;
private MasterRegistry mRegistry;
Expand Down Expand Up @@ -240,16 +238,20 @@ public void removeBlockTellsWorkersToRemoveTheBlock() throws Exception {
@Test
public void registerCleansUpOrphanedBlocks() throws Exception {
// Create a worker with unknown blocks.
long worker = mBlockMaster.getWorkerId(NET_ADDRESS_1);
long workerId = mBlockMaster.getWorkerId(NET_ADDRESS_1);
List<Long> orphanedBlocks = Arrays.asList(1L, 2L);
Map<String, Long> memUsage = ImmutableMap.of(Constants.MEDIUM_MEM, 10L);
mBlockMaster.workerRegister(worker, Arrays.asList(Constants.MEDIUM_MEM),

Block.BlockLocation blockLoc = Block.BlockLocation.newBuilder()
.setWorkerId(workerId).setTier(Constants.MEDIUM_MEM)
.setMediumType(Constants.MEDIUM_MEM).build();
mBlockMaster.workerRegister(workerId, Arrays.asList(Constants.MEDIUM_MEM),
ImmutableMap.of(Constants.MEDIUM_MEM, 100L),
memUsage, ImmutableMap.of(BLOCK_LOCATION, orphanedBlocks), NO_LOST_STORAGE,
memUsage, ImmutableMap.of(blockLoc, orphanedBlocks), NO_LOST_STORAGE,
RegisterWorkerPOptions.getDefaultInstance());

// Check that the worker heartbeat tells the worker to remove the blocks.
alluxio.grpc.Command heartBeat = mBlockMaster.workerHeartbeat(worker, null,
alluxio.grpc.Command heartBeat = mBlockMaster.workerHeartbeat(workerId, null,
memUsage, NO_BLOCKS, NO_BLOCKS_ON_LOCATION, NO_LOST_STORAGE, mMetrics);
assertEquals(orphanedBlocks, heartBeat.getDataList());
}
Expand Down Expand Up @@ -313,9 +315,12 @@ public void workerHeartbeatUpdatesAddedBlocks() throws Exception {

// Send a heartbeat from worker2 saying that it's added blockId.
List<Long> addedBlocks = ImmutableList.of(blockId);
Block.BlockLocation blockOnWorker2 = Block.BlockLocation.newBuilder()
.setWorkerId(worker2).setTier(Constants.MEDIUM_MEM)
.setMediumType(Constants.MEDIUM_MEM).build();
mBlockMaster.workerHeartbeat(worker2, null,
ImmutableMap.of(Constants.MEDIUM_MEM, 0L), NO_BLOCKS,
ImmutableMap.of(BLOCK_LOCATION, addedBlocks),
ImmutableMap.of(blockOnWorker2, addedBlocks),
NO_LOST_STORAGE, mMetrics);

// The block now has two locations.
Expand Down
Expand Up @@ -268,8 +268,8 @@ private long createWorkerHelper(int workerIndex) throws Exception {
private void heartbeatToAddLocationHelper(long blockId, long workerId) throws Exception {
List<Long> addedBlocks = ImmutableList.of(blockId);
Block.BlockLocation blockLocation =
Block.BlockLocation.newBuilder().setTier(Constants.MEDIUM_MEM)
.setMediumType(Constants.MEDIUM_MEM).build();
Block.BlockLocation.newBuilder().setWorkerId(workerId)
.setTier(Constants.MEDIUM_MEM).setMediumType(Constants.MEDIUM_MEM).build();

mBlockMaster.workerHeartbeat(workerId, null,
ImmutableMap.of(Constants.MEDIUM_MEM, 0L), NO_BLOCKS,
Expand Down

0 comments on commit 23a31c0

Please sign in to comment.