Skip to content

Commit

Permalink
improve webUI about Hierarchy
Browse files Browse the repository at this point in the history
  • Loading branch information
Yours-shupeng committed Feb 9, 2015
1 parent d014c21 commit bda3829
Show file tree
Hide file tree
Showing 9 changed files with 921 additions and 527 deletions.
22 changes: 11 additions & 11 deletions core/src/main/java/tachyon/master/MasterClient.java
Expand Up @@ -732,14 +732,14 @@ public synchronized boolean user_freepath(int fileId, String path, boolean recur
return false; return false;
} }


public synchronized void worker_cacheBlock(long workerId, long workerUsedBytes, public synchronized void worker_cacheBlock(long workerId, long usedBytesOfAlias,
long storageDirId, long blockId, long length) throws IOException, FileDoesNotExistException, long storageDirId, long blockId, long length) throws IOException, FileDoesNotExistException,
SuspectedFileSizeException, BlockInfoException { SuspectedFileSizeException, BlockInfoException {
while (!mIsShutdown) { while (!mIsShutdown) {
connect(); connect();


try { try {
mClient.worker_cacheBlock(workerId, workerUsedBytes, storageDirId, blockId, length); mClient.worker_cacheBlock(workerId, usedBytesOfAlias, storageDirId, blockId, length);
return; return;
} catch (FileDoesNotExistException e) { } catch (FileDoesNotExistException e) {
throw e; throw e;
Expand Down Expand Up @@ -781,14 +781,14 @@ public synchronized List<Integer> worker_getPriorityDependencyList() throws IOEx
return new ArrayList<Integer>(); return new ArrayList<Integer>();
} }


public synchronized Command worker_heartbeat(long workerId, long usedBytes, public synchronized Command worker_heartbeat(long workerId, List<Long> usedBytesByAlias,
List<Long> removedBlockIds, Map<Long, List<Long>> addedBlockIds) List<Long> removedBlockIds, Map<Long, List<Long>> addedBlockIds)
throws IOException { throws IOException {
while (!mIsShutdown) { while (!mIsShutdown) {
connect(); connect();


try { try {
return mClient.worker_heartbeat(workerId, usedBytes, removedBlockIds, addedBlockIds); return mClient.worker_heartbeat(workerId, usedBytesByAlias, removedBlockIds, addedBlockIds);
} catch (BlockInfoException e) { } catch (BlockInfoException e) {
throw new IOException(e); throw new IOException(e);
} catch (TException e) { } catch (TException e) {
Expand All @@ -803,22 +803,22 @@ public synchronized Command worker_heartbeat(long workerId, long usedBytes,
* Register the worker to the master. * Register the worker to the master.
* *
* @param workerNetAddress Worker's NetAddress * @param workerNetAddress Worker's NetAddress
* @param totalBytes Worker's capacity * @param totalBytesByAlias capacity of different storage alias in the work in bytes
* @param usedBytes Worker's used storage * @param usedBytesByAlias the number of bytes of different storage alias in the work
* @param currentBlockList Blocks in worker's space. * @param currentBlockList Blocks in worker's space.
* @return the worker id assigned by the master. * @return the worker id assigned by the master.
* @throws BlockInfoException * @throws BlockInfoException
* @throws TException * @throws TException
*/ */
public synchronized long worker_register(NetAddress workerNetAddress, long totalBytes, public synchronized long worker_register(NetAddress workerNetAddress,
long usedBytes, Map<Long, List<Long>> currentBlockList) List<Long> totalBytesByAlias,List<Long> usedBytesByAlias,
throws BlockInfoException, IOException { Map<Long, List<Long>> currentBlockList) throws BlockInfoException, IOException {
while (!mIsShutdown) { while (!mIsShutdown) {
connect(); connect();


try { try {
long ret = long ret = mClient.worker_register(workerNetAddress, totalBytesByAlias, usedBytesByAlias,
mClient.worker_register(workerNetAddress, totalBytes, usedBytes, currentBlockList); currentBlockList);
LOG.info("Registered at the master " + mMasterAddress + " from worker " + workerNetAddress LOG.info("Registered at the master " + mMasterAddress + " from worker " + workerNetAddress
+ " , got WorkerId " + ret); + " , got WorkerId " + ret);
return ret; return ret;
Expand Down
108 changes: 76 additions & 32 deletions core/src/main/java/tachyon/master/MasterInfo.java
Expand Up @@ -52,6 +52,8 @@
import tachyon.HeartbeatThread; import tachyon.HeartbeatThread;
import tachyon.Pair; import tachyon.Pair;
import tachyon.PrefixList; import tachyon.PrefixList;
import tachyon.StorageDirId;
import tachyon.StorageLevelAlias;
import tachyon.TachyonURI; import tachyon.TachyonURI;
import tachyon.UnderFileSystem; import tachyon.UnderFileSystem;
import tachyon.UnderFileSystem.SpaceType; import tachyon.UnderFileSystem.SpaceType;
Expand Down Expand Up @@ -897,7 +899,7 @@ private void addToInodeMap(Inode inode, Map<Integer, Inode> map) {
* A worker cache a block in its memory. * A worker cache a block in its memory.
* *
* @param workerId * @param workerId
* @param workerUsedBytes * @param usedBytesOfAlias
* @param blockId * @param blockId
* @param length * @param length
* @return the dependency id of the file if it has not been checkpointed. -1 means the file either * @return the dependency id of the file if it has not been checkpointed. -1 means the file either
Expand All @@ -906,15 +908,16 @@ private void addToInodeMap(Inode inode, Map<Integer, Inode> map) {
* @throws SuspectedFileSizeException * @throws SuspectedFileSizeException
* @throws BlockInfoException * @throws BlockInfoException
*/ */
public int cacheBlock(long workerId, long workerUsedBytes, long storageDirId, long blockId, public int cacheBlock(long workerId, long usedBytesOfAlias, long storageDirId, long blockId,
long length) long length)
throws FileDoesNotExistException, SuspectedFileSizeException, BlockInfoException { throws FileDoesNotExistException, SuspectedFileSizeException, BlockInfoException {
LOG.debug("Cache block: {}", LOG.debug("Cache block: {}",
CommonUtils.parametersToString(workerId, workerUsedBytes, blockId, length)); CommonUtils.parametersToString(workerId, usedBytesOfAlias, blockId, length));

MasterWorkerInfo tWorkerInfo = getWorkerInfo(workerId); MasterWorkerInfo tWorkerInfo = getWorkerInfo(workerId);
int storageLevelAliasValue = StorageDirId.getStorageLevelAliasValue(storageDirId);
tWorkerInfo.updateBlock(true, blockId); tWorkerInfo.updateBlock(true, blockId);
tWorkerInfo.updateUsedBytes(workerUsedBytes); tWorkerInfo.updateUsedBytes(storageLevelAliasValue, usedBytesOfAlias);
tWorkerInfo.updateLastUpdatedTimeMs(); tWorkerInfo.updateLastUpdatedTimeMs();


int fileId = BlockInfo.computeInodeId(blockId); int fileId = BlockInfo.computeInodeId(blockId);
Expand Down Expand Up @@ -1376,6 +1379,42 @@ public List<ClientFileInfo> getFilesInfo(TachyonURI path) throws FileDoesNotExis
} }
return ret; return ret;
} }

/**
* @return the capacities of different storage level alias in all workers in bytes.
*/
public List<Long> getHierarchyTotalBytesByAlias() {
List<Long> ret = new ArrayList<Long>();
synchronized (mWorkers) {
for (int i = 0; i < StorageLevelAlias.values().length; i ++) {
ret.add((long) 0);
}
for (MasterWorkerInfo worker : mWorkers.values()) {
for (int i = 0; i < worker.getTotalBytesByAlias().size(); i ++) {
ret.set(i, ret.get(i) + worker.getTotalBytesByAlias().get(i));
}
}
}
return ret;
}

/**
* @return the number of bytes used of different storage level alias in all workers.
*/
public List<Long> getHierarchyUsedBytesByAlias() {
List<Long> ret = new ArrayList<Long>();
synchronized (mWorkers) {
for (int i = 0; i < StorageLevelAlias.values().length; i ++) {
ret.add((long) 0);
}
for (MasterWorkerInfo worker : mWorkers.values()) {
for (int i = 0; i < worker.getUsedBytesByAlias().size(); i ++) {
ret.set(i, ret.get(i) + worker.getUsedBytesByAlias().get(i));
}
}
}
return ret;
}


/** /**
* Get absolute paths of all in memory files. * Get absolute paths of all in memory files.
Expand Down Expand Up @@ -1987,15 +2026,17 @@ private void recomputePinnedFiles(Inode inode, Optional<Boolean> setPinState) {
* blocks. * blocks.
* *
* @param workerNetAddress The address of the worker to register * @param workerNetAddress The address of the worker to register
* @param totalBytes The capacity of the worker in bytes * @param totalBytesByAlias The capacity of different storage alias in the worker in bytes
* @param usedBytes The number of bytes already used in the worker * @param usedBytesByAlias The number of bytes used of different storage alias in the worker
* @param currentBlockIds Mapping from id of the StorageDir to id list of the blocks * @param currentBlockIds Mapping from id of the StorageDir to id list of the blocks
* @return the new id of the registered worker * @return the new id of the registered worker
* @throws BlockInfoException * @throws BlockInfoException
*/ */
public long registerWorker(NetAddress workerNetAddress, long totalBytes, long usedBytes, public long registerWorker(NetAddress workerNetAddress, List<Long> totalBytesByAlias,
Map<Long, List<Long>> currentBlockIds) throws BlockInfoException { List<Long> usedBytesByAlias, Map<Long, List<Long>> currentBlockIds)
throws BlockInfoException {
long id = 0; long id = 0;
long capacityBytes = 0;
NetAddress workerAddress = new NetAddress(workerNetAddress); NetAddress workerAddress = new NetAddress(workerNetAddress);
LOG.info("registerWorker(): WorkerNetAddress: " + workerAddress); LOG.info("registerWorker(): WorkerNetAddress: " + workerAddress);


Expand All @@ -2012,8 +2053,12 @@ public long registerWorker(NetAddress workerNetAddress, long totalBytes, long us
LOG.warn("The worker with id " + id + " has been removed."); LOG.warn("The worker with id " + id + " has been removed.");
} }
id = mStartTimeNSPrefix + mWorkerCounter.incrementAndGet(); id = mStartTimeNSPrefix + mWorkerCounter.incrementAndGet();
MasterWorkerInfo tWorkerInfo = new MasterWorkerInfo(id, workerAddress, totalBytes); for (long b : totalBytesByAlias) {
tWorkerInfo.updateUsedBytes(usedBytes); capacityBytes += b;
}
MasterWorkerInfo tWorkerInfo =
new MasterWorkerInfo(id, workerAddress, totalBytesByAlias, capacityBytes);
tWorkerInfo.updateUsedBytes(usedBytesByAlias);
for (List<Long> blockIds : currentBlockIds.values()) { for (List<Long> blockIds : currentBlockIds.values()) {
tWorkerInfo.updateBlocks(true, blockIds); tWorkerInfo.updateBlocks(true, blockIds);
} }
Expand Down Expand Up @@ -2142,14 +2187,14 @@ public void setPinned(int fileId, boolean pinned) throws FileDoesNotExistExcepti
} }
} }


/** /**
* Free the file/folder based on the files' ID * Free the file/folder based on the files' ID
* *
* @param fileId the file/folder to be freed. * @param fileId the file/folder to be freed.
* @param recursive whether free the folder recursively or not * @param recursive whether free the folder recursively or not
* @return succeed or not * @return succeed or not
* @throws TachyonException * @throws TachyonException
*/ */
boolean freepath(int fileId, boolean recursive) throws TachyonException { boolean freepath(int fileId, boolean recursive) throws TachyonException {
LOG.info("free(" + fileId + ")"); LOG.info("free(" + fileId + ")");
synchronized (mRootLock) { synchronized (mRootLock) {
Expand Down Expand Up @@ -2197,14 +2242,14 @@ boolean freepath(int fileId, boolean recursive) throws TachyonException {
return true; return true;
} }


/** /**
* Frees files based on the path * Frees files based on the path
* *
* @param path The file to be freed. * @param path The file to be freed.
* @param recursive whether delete the file recursively or not. * @param recursive whether delete the file recursively or not.
* @return succeed or not * @return succeed or not
* @throws TachyonException * @throws TachyonException
*/ */
public boolean freepath(TachyonURI path, boolean recursive) throws TachyonException { public boolean freepath(TachyonURI path, boolean recursive) throws TachyonException {
LOG.info("free(" + path + ")"); LOG.info("free(" + path + ")");
synchronized (mRootLock) { synchronized (mRootLock) {
Expand Down Expand Up @@ -2324,15 +2369,14 @@ public void updateRawTableMetadata(int tableId, ByteBuffer metadata)
* block id's. * block id's.
* *
* @param workerId The id of the worker to deal with * @param workerId The id of the worker to deal with
* @param usedBytes The number of bytes used in the worker * @param usedBytesByAlias The number of bytes used of different storage alias in the worker
* @param removedBlockIds The list of removed block ids * @param removedBlockIds The list of removed block ids
* @param addedBlockIds Mapping from id of the StorageDir and id list of blocks evicted in * @param addedBlockIds Mapping from id of the StorageDir and id list of blocks evicted in
* @return a command specifying an action to take * @return a command specifying an action to take
* @throws BlockInfoException * @throws BlockInfoException
*/ */
public Command workerHeartbeat(long workerId, long usedBytes, List<Long> removedBlockIds, public Command workerHeartbeat(long workerId, List<Long> usedBytesByAlias,
Map<Long, List<Long>> addedBlockIds) List<Long> removedBlockIds, Map<Long, List<Long>> addedBlockIds) throws BlockInfoException {
throws BlockInfoException {
LOG.debug("WorkerId: {}", workerId); LOG.debug("WorkerId: {}", workerId);
synchronized (mRootLock) { synchronized (mRootLock) {
synchronized (mWorkers) { synchronized (mWorkers) {
Expand All @@ -2344,7 +2388,7 @@ public Command workerHeartbeat(long workerId, long usedBytes, List<Long> removed
return new Command(CommandType.Register, new ArrayList<Long>()); return new Command(CommandType.Register, new ArrayList<Long>());
} }


tWorkerInfo.updateUsedBytes(usedBytes); tWorkerInfo.updateUsedBytes(usedBytesByAlias);
tWorkerInfo.updateBlocks(false, removedBlockIds); tWorkerInfo.updateBlocks(false, removedBlockIds);
tWorkerInfo.updateToRemovedBlocks(false, removedBlockIds); tWorkerInfo.updateToRemovedBlocks(false, removedBlockIds);
tWorkerInfo.updateLastUpdatedTimeMs(); tWorkerInfo.updateLastUpdatedTimeMs();
Expand Down
16 changes: 9 additions & 7 deletions core/src/main/java/tachyon/master/MasterServiceHandler.java
Expand Up @@ -296,10 +296,10 @@ public void user_updateRawTableMetadata(int tableId, ByteBuffer metadata)
} }


@Override @Override
public void worker_cacheBlock(long workerId, long workerUsedBytes, long storageDirId, public void worker_cacheBlock(long workerId, long usedBytesOfAlias, long storageDirId,
long blockId, long length) throws FileDoesNotExistException, SuspectedFileSizeException, long blockId, long length) throws FileDoesNotExistException, SuspectedFileSizeException,
BlockInfoException, TException { BlockInfoException, TException {
mMasterInfo.cacheBlock(workerId, workerUsedBytes, storageDirId, blockId, length); mMasterInfo.cacheBlock(workerId, usedBytesOfAlias, storageDirId, blockId, length);
} }


@Override @Override
Expand All @@ -314,15 +314,17 @@ public List<Integer> worker_getPriorityDependencyList() throws TException {
} }


@Override @Override
public Command worker_heartbeat(long workerId, long usedBytes, public Command worker_heartbeat(long workerId, List<Long> usedBytesByAlias,
List<Long> removedBlockIds, Map<Long, List<Long>> addedBlockIds) List<Long> removedBlockIds, Map<Long, List<Long>> addedBlockIds)
throws BlockInfoException, TException { throws BlockInfoException, TException {
return mMasterInfo.workerHeartbeat(workerId, usedBytes, removedBlockIds, addedBlockIds); return mMasterInfo.workerHeartbeat(workerId, usedBytesByAlias, removedBlockIds, addedBlockIds);
} }


@Override @Override
public long worker_register(NetAddress workerNetAddress, long totalBytes, long usedBytes, public long worker_register(NetAddress workerNetAddress, List<Long> totalBytesByAlias,
Map<Long, List<Long>> currentBlockIds) throws BlockInfoException, TException { List<Long> usedBytesByAlias, Map<Long, List<Long>> currentBlockIds)
return mMasterInfo.registerWorker(workerNetAddress, totalBytes, usedBytes, currentBlockIds); throws BlockInfoException, TException {
return mMasterInfo.registerWorker(workerNetAddress, totalBytesByAlias, usedBytesByAlias,
currentBlockIds);
} }
} }
54 changes: 50 additions & 4 deletions core/src/main/java/tachyon/master/MasterWorkerInfo.java
Expand Up @@ -46,10 +46,16 @@ public class MasterWorkerInfo {
private Set<Long> mBlocks; private Set<Long> mBlocks;
/** IDs of blocks the worker should remove **/ /** IDs of blocks the worker should remove **/
private Set<Long> mToRemoveBlocks; private Set<Long> mToRemoveBlocks;
/** Total bytes of different storage level alias **/
private List<Long> mTotalBytesByAlias;
/** Used bytes of different storage level alias **/
private List<Long> mUsedBytesByAlias;


public MasterWorkerInfo(long id, NetAddress address, long capacityBytes) { public MasterWorkerInfo(long id, NetAddress address, List<Long> totalBytesByAlias,
long capacityBytes) {
mId = id; mId = id;
mWorkerAddress = address; mWorkerAddress = address;
mTotalBytesByAlias = totalBytesByAlias;
mCapacityBytes = capacityBytes; mCapacityBytes = capacityBytes;
mStartTimeMs = System.currentTimeMillis(); mStartTimeMs = System.currentTimeMillis();


Expand Down Expand Up @@ -131,6 +137,31 @@ public synchronized long getUsedBytes() {
return mUsedBytes; return mUsedBytes;
} }


/**
* @return the total bytes of each storage alias in the work
*/
public synchronized List<Long> getTotalBytesByAlias() {
return mTotalBytesByAlias;
}

/**
* @return the used space of each storage alias in bytes
*/
public synchronized List<Long> getUsedBytesByAlias() {
return mUsedBytesByAlias;
}

/**
* @return the free space of each storage alias in bytes
*/
public synchronized List<Long> getFreeBytesByAlias() {
List<Long> freeCapacityBytes = new ArrayList<Long>();
for (int i = 0; i < mTotalBytesByAlias.size(); i ++) {
freeCapacityBytes.add(mTotalBytesByAlias.get(i) - mUsedBytesByAlias.get(i));
}
return freeCapacityBytes;
}

@Override @Override
public synchronized String toString() { public synchronized String toString() {
StringBuilder sb = new StringBuilder("MasterWorkerInfo("); StringBuilder sb = new StringBuilder("MasterWorkerInfo(");
Expand Down Expand Up @@ -214,9 +245,24 @@ public synchronized void updateToRemovedBlocks(boolean add, Collection<Long> blo
/** /**
* Set the used space of the worker in bytes. * Set the used space of the worker in bytes.
* *
* @param usedBytes the used space in bytes * @param usedBytesByAlias the used space of each storage alias in bytes
*/
public synchronized void updateUsedBytes(List<Long> usedBytesByAlias) {
mUsedBytes = 0;
mUsedBytesByAlias = usedBytesByAlias;
for (long t : mUsedBytesByAlias) {
mUsedBytes += t;
}
}

/**
* Set the used space of the worker in bytes.
*
* @param aliasValue storage level alias value on the worker.
* @param usedBytes the used space of the storage alias in bytes.
*/ */
public synchronized void updateUsedBytes(long usedBytes) { public synchronized void updateUsedBytes(int aliasValue, long usedBytesOfAlias) {
mUsedBytes = usedBytes; mUsedBytes += usedBytesOfAlias - mUsedBytesByAlias.get(aliasValue - 1);
mUsedBytesByAlias.set(aliasValue - 1, usedBytesOfAlias);
} }
} }

0 comments on commit bda3829

Please sign in to comment.