Skip to content

Commit

Permalink
Create a ClientMetrics class to wrap the list of longs
Browse files Browse the repository at this point in the history
  • Loading branch information
carsonwang committed May 26, 2015
1 parent d046d41 commit a489ec3
Show file tree
Hide file tree
Showing 7 changed files with 115 additions and 88 deletions.
14 changes: 7 additions & 7 deletions clients/unshaded/src/main/java/tachyon/client/FileOutStream.java
Expand Up @@ -95,7 +95,7 @@ public void close() throws IOException {
}
if (mCurrentBlockOutStream != null) {
mPreviousBlockOutStreams.add(mCurrentBlockOutStream);
mTachyonFS.incBlocksWrittenLocal(1);
mTachyonFS.getClientMetrics().incBlocksWrittenLocal(1);
}

Boolean canComplete = false;
Expand Down Expand Up @@ -157,7 +157,7 @@ private void getNextBlock() throws IOException {
throw new IOException("The current block still has space left, no need to get new block");
}
mPreviousBlockOutStreams.add(mCurrentBlockOutStream);
mTachyonFS.incBlocksWrittenLocal(1);
mTachyonFS.getClientMetrics().incBlocksWrittenLocal(1);
}

if (mWriteType.isCache()) {
Expand Down Expand Up @@ -197,14 +197,14 @@ public void write(byte[] b, int off, int len) throws IOException {
mCurrentBlockOutStream.write(b, tOff, tLen);
mCurrentBlockLeftByte -= tLen;
mCachedBytes += tLen;
mTachyonFS.incBytesWrittenLocal(tLen);
mTachyonFS.getClientMetrics().incBytesWrittenLocal(tLen);
tLen = 0;
} else {
mCurrentBlockOutStream.write(b, tOff, (int) mCurrentBlockLeftByte);
tOff += mCurrentBlockLeftByte;
tLen -= mCurrentBlockLeftByte;
mCachedBytes += mCurrentBlockLeftByte;
mTachyonFS.incBytesWrittenLocal(mCurrentBlockLeftByte);
mTachyonFS.getClientMetrics().incBytesWrittenLocal(mCurrentBlockLeftByte);
mCurrentBlockLeftByte = 0;
}
}
Expand All @@ -220,7 +220,7 @@ public void write(byte[] b, int off, int len) throws IOException {

if (mWriteType.isThrough()) {
mCheckpointOutputStream.write(b, off, len);
mTachyonFS.incBytesWrittenUfs(len);
mTachyonFS.getClientMetrics().incBytesWrittenUfs(len);
}
}

Expand All @@ -235,7 +235,7 @@ public void write(int b) throws IOException {
mCurrentBlockOutStream.write(b);
mCurrentBlockLeftByte --;
mCachedBytes ++;
mTachyonFS.incBytesWrittenLocal(1);
mTachyonFS.getClientMetrics().incBytesWrittenLocal(1);
} catch (IOException e) {
if (mWriteType.isMustCache()) {
LOG.error(e.getMessage(), e);
Expand All @@ -248,7 +248,7 @@ public void write(int b) throws IOException {

if (mWriteType.isThrough()) {
mCheckpointOutputStream.write(b);
mTachyonFS.incBytesWrittenUfs(1);
mTachyonFS.getClientMetrics().incBytesWrittenUfs(1);
}
}
}
Expand Up @@ -49,7 +49,7 @@ public void close() throws IOException {
if (!mClosed) {
mTachyonBuffer.close();
if (mBytesReadLocal > 0) {
mTachyonFS.incBlocksReadLocal(1);
mTachyonFS.getClientMetrics().incBlocksReadLocal(1);
}
}
mClosed = true;
Expand All @@ -62,7 +62,7 @@ public int read() throws IOException {
return -1;
}
mBytesReadLocal ++;
mTachyonFS.incBytesReadLocal(1);
mTachyonFS.getClientMetrics().incBytesReadLocal(1);
return mBuffer.get() & 0xFF;
}

Expand All @@ -88,7 +88,7 @@ public int read(byte[] b, int off, int len) throws IOException {
}
mBuffer.get(b, off, ret);
mBytesReadLocal += ret;
mTachyonFS.incBytesReadLocal(ret);
mTachyonFS.getClientMetrics().incBytesReadLocal(ret);
return ret;
}

Expand Down
Expand Up @@ -168,7 +168,7 @@ public void close() throws IOException {
mCheckpointInputStream.close();
}
if (mBytesReadRemote > 0) {
mTachyonFS.incBlocksReadRemote(1);
mTachyonFS.getClientMetrics().incBlocksReadRemote(1);
}
mClosed = true;
}
Expand Down Expand Up @@ -227,7 +227,7 @@ public int read(byte[] b, int off, int len) throws IOException {
mBlockPos += bytesToRead;
}
mBytesReadRemote += len - bytesLeft;
mTachyonFS.incBytesReadRemote(len - bytesLeft);
mTachyonFS.getClientMetrics().incBytesReadRemote(len - bytesLeft);

if (bytesLeft > 0) {
// Unable to read from worker memory, reading this block from underfs in the future.
Expand All @@ -253,7 +253,7 @@ public int read(byte[] b, int off, int len) throws IOException {
bytesLeft -= readBytes;
mBlockPos += readBytes;
mCheckpointPos += readBytes;
mTachyonFS.incBytesReadUfs(readBytes);
mTachyonFS.getClientMetrics().incBytesReadUfs(readBytes);
}
}
return len;
Expand Down
73 changes: 11 additions & 62 deletions clients/unshaded/src/main/java/tachyon/client/TachyonFS.java
Expand Up @@ -19,8 +19,6 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -51,6 +49,7 @@
import tachyon.util.CommonUtils;
import tachyon.util.NetworkUtils;
import tachyon.util.ThreadFactoryUtils;
import tachyon.worker.ClientMetrics;
import tachyon.worker.WorkerClient;

/**
Expand Down Expand Up @@ -181,14 +180,11 @@ public static synchronized TachyonFS get(TachyonConf tachyonConf) throws IOExcep
private final AtomicInteger mBlockLockId = new AtomicInteger(0);

private TachyonURI mRootUri = null;
private List<Long> mClientMetrics = new ArrayList<Long>(Collections.nCopies(
Constants.CLIENT_METRICS_SIZE, 0L));
private ClientMetrics mClientMetrics = new ClientMetrics();

private TachyonFS(TachyonConf tachyonConf) throws IOException {
super(tachyonConf);

mClientMetrics.set(Constants.CLIENT_METRICS_VERSION_INDEX, Constants.CLIENT_METRICS_VERSION);

String masterHost =
tachyonConf.get(Constants.MASTER_HOSTNAME, NetworkUtils.getLocalHostName(tachyonConf));
int masterPort = tachyonConf.getInt(Constants.MASTER_PORT, Constants.DEFAULT_MASTER_PORT);
Expand Down Expand Up @@ -464,6 +460,15 @@ public synchronized ClientDependencyInfo getClientDependencyInfo(int depId) thro
return mMasterClient.getClientDependencyInfo(depId);
}

/**
* Get the user's ClientMetrics.
*
* @return the ClientMetrics object.
*/
ClientMetrics getClientMetrics() {
return mClientMetrics;
}

/**
* Get <code>TachyonFile</code> based on the file id.
*
Expand Down Expand Up @@ -742,62 +747,6 @@ public synchronized boolean hasLocalWorker() throws IOException {
return mWorkerClient.isLocal();
}

void incBlocksReadLocal(long n) {
synchronized (mClientMetrics) {
mClientMetrics.set(Constants.BLOCKS_READ_LOCAL_INDEX,
mClientMetrics.get(Constants.BLOCKS_READ_LOCAL_INDEX) + n);
}
}

void incBlocksReadRemote(long n) {
synchronized (mClientMetrics) {
mClientMetrics.set(Constants.BLOCKS_READ_REMOTE_INDEX,
mClientMetrics.get(Constants.BLOCKS_READ_REMOTE_INDEX) + n);
}
}

void incBlocksWrittenLocal(long n) {
synchronized (mClientMetrics) {
mClientMetrics.set(Constants.BLOCKS_WRITTEN_LOCAL_INDEX,
mClientMetrics.get(Constants.BLOCKS_WRITTEN_LOCAL_INDEX) + n);
}
}

void incBytesReadLocal(long n) {
synchronized (mClientMetrics) {
mClientMetrics.set(Constants.BYTES_READ_LOCAL_INDEX,
mClientMetrics.get(Constants.BYTES_READ_LOCAL_INDEX) + n);
}
}

void incBytesReadRemote(long n) {
synchronized (mClientMetrics) {
mClientMetrics.set(Constants.BYTES_READ_REMOTE_INDEX,
mClientMetrics.get(Constants.BYTES_READ_REMOTE_INDEX) + n);
}
}

void incBytesReadUfs(long n) {
synchronized (mClientMetrics) {
mClientMetrics.set(Constants.BYTES_READ_UFS_INDEX,
mClientMetrics.get(Constants.BYTES_READ_UFS_INDEX) + n);
}
}

void incBytesWrittenLocal(long n) {
synchronized (mClientMetrics) {
mClientMetrics.set(Constants.BYTES_WRITTEN_LOCAL_INDEX,
mClientMetrics.get(Constants.BYTES_WRITTEN_LOCAL_INDEX) + n);
}
}

void incBytesWrittenUfs(long n) {
synchronized (mClientMetrics) {
mClientMetrics.set(Constants.BYTES_WRITTEN_UFS_INDEX,
mClientMetrics.get(Constants.BYTES_WRITTEN_UFS_INDEX) + n);
}
}

/**
* Check if this client is connected to master.
*
Expand Down
85 changes: 85 additions & 0 deletions common/src/main/java/tachyon/worker/ClientMetrics.java
@@ -0,0 +1,85 @@
/*
* Licensed to the University of California, Berkeley under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package tachyon.worker;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import tachyon.Constants;

/**
* ClientMetrics is used to pass client metrics from client to worker by user heartbeat.
*/
public class ClientMetrics {
private List<Long> mMetrics;

public ClientMetrics() {
mMetrics = createDefaultMetrics();
}

private List<Long> createDefaultMetrics() {
List<Long> defaultMetrics =
new ArrayList<Long>(Collections.nCopies(Constants.CLIENT_METRICS_SIZE, 0L));
defaultMetrics.set(Constants.CLIENT_METRICS_VERSION_INDEX, Constants.CLIENT_METRICS_VERSION);
return defaultMetrics;
}

public synchronized List<Long> getHeartbeatData() {
List<Long> ret = mMetrics;
mMetrics = createDefaultMetrics();
return ret;
}

public synchronized void incBlocksReadLocal(long n) {
mMetrics.set(Constants.BLOCKS_READ_LOCAL_INDEX,
mMetrics.get(Constants.BLOCKS_READ_LOCAL_INDEX) + n);
}

public synchronized void incBlocksReadRemote(long n) {
mMetrics.set(Constants.BLOCKS_READ_REMOTE_INDEX,
mMetrics.get(Constants.BLOCKS_READ_REMOTE_INDEX) + n);
}

public synchronized void incBlocksWrittenLocal(long n) {
mMetrics.set(Constants.BLOCKS_WRITTEN_LOCAL_INDEX,
mMetrics.get(Constants.BLOCKS_WRITTEN_LOCAL_INDEX) + n);
}

public synchronized void incBytesReadLocal(long n) {
mMetrics.set(Constants.BYTES_READ_LOCAL_INDEX,
mMetrics.get(Constants.BYTES_READ_LOCAL_INDEX) + n);
}

public synchronized void incBytesReadRemote(long n) {
mMetrics.set(Constants.BYTES_READ_REMOTE_INDEX,
mMetrics.get(Constants.BYTES_READ_REMOTE_INDEX) + n);
}

public synchronized void incBytesReadUfs(long n) {
mMetrics.set(Constants.BYTES_READ_UFS_INDEX, mMetrics.get(Constants.BYTES_READ_UFS_INDEX) + n);
}

public synchronized void incBytesWrittenLocal(long n) {
mMetrics.set(Constants.BYTES_WRITTEN_LOCAL_INDEX,
mMetrics.get(Constants.BYTES_WRITTEN_LOCAL_INDEX) + n);
}

public synchronized void incBytesWrittenUfs(long n) {
mMetrics.set(Constants.BYTES_WRITTEN_UFS_INDEX,
mMetrics.get(Constants.BYTES_WRITTEN_UFS_INDEX) + n);
}
}
14 changes: 4 additions & 10 deletions common/src/main/java/tachyon/worker/WorkerClient.java
Expand Up @@ -69,7 +69,7 @@ public class WorkerClient implements Closeable {
private HeartbeatExecutor mHeartbeatExecutor;

private final TachyonConf mTachyonConf;
private final List<Long> mClientMetrics;
private final ClientMetrics mClientMetrics;

/**
* Create a WorkerClient, with a given MasterClient.
Expand All @@ -81,7 +81,7 @@ public class WorkerClient implements Closeable {
* @throws IOException
*/
public WorkerClient(MasterClient masterClient, ExecutorService executorService,
TachyonConf conf, List<Long> clientMetrics) throws IOException {
TachyonConf conf, ClientMetrics clientMetrics) throws IOException {
mMasterClient = masterClient;
mExecutorService = executorService;
mTachyonConf = conf;
Expand Down Expand Up @@ -441,17 +441,11 @@ public synchronized boolean unlockBlock(long blockId) throws IOException {
* @param userId The id of the user
* @throws IOException
*/
public synchronized void userHeartbeat(long userId, List<Long> metrics) throws IOException {
public synchronized void userHeartbeat(long userId, ClientMetrics metrics) throws IOException {
mustConnect();

try {
synchronized (metrics) {
mClient.userHeartbeat(userId, metrics);
for (int i = Constants.CLIENT_METRICS_VERSION_INDEX + 1; i < Constants.CLIENT_METRICS_SIZE;
i++) {
metrics.set(i, 0L);
}
}
mClient.userHeartbeat(userId, metrics.getHeartbeatData());
} catch (TException e) {
mConnected = false;
throw new IOException(e);
Expand Down
Expand Up @@ -16,7 +16,6 @@
package tachyon.worker;

import java.io.IOException;
import java.util.List;

import com.google.common.base.Throwables;

Expand All @@ -29,10 +28,10 @@
class WorkerClientHeartbeatExecutor implements HeartbeatExecutor {
private final WorkerClient mWorkerClient;
private final long mUserId;
private final List<Long> mClientMetrics;
private final ClientMetrics mClientMetrics;

public WorkerClientHeartbeatExecutor(WorkerClient workerClient, long userId,
List<Long> clientMetrics) {
ClientMetrics clientMetrics) {
mWorkerClient = workerClient;
mUserId = userId;
mClientMetrics = clientMetrics;
Expand Down

0 comments on commit a489ec3

Please sign in to comment.