Skip to content

Commit

Permalink
added additional statistics
Browse files Browse the repository at this point in the history
  • Loading branch information
chenson42 committed Apr 17, 2008
1 parent dedabfe commit 4169cab
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 39 deletions.
Expand Up @@ -32,6 +32,7 @@
import java.util.List;
import java.util.Map;

import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jumpmind.symmetric.common.Constants;
Expand Down Expand Up @@ -59,7 +60,6 @@
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.core.NestedRuntimeException;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;
Expand All @@ -79,7 +79,7 @@ public class DataLoaderService extends AbstractService implements IDataLoaderSer
protected BeanFactory beanFactory;

protected List<IDataLoaderFilter> filters;

protected IStatisticManager statisticManager;

protected Map<String, IColumnFilter> columnFilters = new HashMap<String, IColumnFilter>();
Expand Down Expand Up @@ -127,7 +127,7 @@ private void sendAck(Node remote, Node local, List<IncomingBatchHistory> list) t
}
if (!sendAck) {
if (i < numberOfStatusSendRetries - 1) {
sleep();
sleepBetweenFailedAcks();
} else if (error instanceof RuntimeException) {
throw (RuntimeException) error;
} else if (error instanceof IOException) {
Expand All @@ -137,7 +137,7 @@ private void sendAck(Node remote, Node local, List<IncomingBatchHistory> list) t
}
}

private final void sleep() {
private final void sleepBetweenFailedAcks() {
try {
Thread.sleep(timeBetweenStatusSendRetriesMs);
} catch (InterruptedException e) {
Expand Down Expand Up @@ -168,36 +168,35 @@ protected List<IncomingBatchHistory> loadDataAndReturnBatches(IIncomingTransport
throw ex;
} catch (ConnectException ex) {
logger.warn(ErrorConstants.COULD_NOT_CONNECT_TO_TRANSPORT);
statisticManager.getStatistic(StatisticName.INCOMING_TRANSPORT_CONNECT_ERROR_COUNT).increment();
} catch (UnknownHostException ex) {
logger.warn(ErrorConstants.COULD_NOT_CONNECT_TO_TRANSPORT + " Unknown host name of "
+ ex.getMessage());
statisticManager.getStatistic(StatisticName.INCOMING_TRANSPORT_CONNECT_ERROR_COUNT).increment();
} catch (RegistrationNotOpenException ex) {
logger.warn(ErrorConstants.REGISTRATION_NOT_OPEN);
} catch (ConnectionRejectedException ex) {
logger.warn(ErrorConstants.TRANSPORT_REJECTED_CONNECTION);
statisticManager.getStatistic(StatisticName.INCOMING_TRANSPORT_REJECTED_COUNT).increment();
} catch (AuthenticationException ex) {
logger.warn(ErrorConstants.NOT_AUTHENTICATED);
} catch (Exception e) {
if (status != null) {
if (e instanceof IOException || e instanceof TransportException) {
logger.warn("Failed to load batch " + status.getNodeBatchId() + " because: "
+ e.getMessage());
statisticManager.getStatistic(StatisticName.INCOMING_TRANSPORT_ERROR_COUNT).increment();
} else {
logger.error("Failed to load batch " + status.getNodeBatchId(), e);
}
if (e instanceof NestedRuntimeException) {
SQLException se = null;
NestedRuntimeException ne = (NestedRuntimeException) e;
if (ne.getCause() instanceof SQLException) {
se = (SQLException)ne.getCause();
} else if (ne.getRootCause() instanceof SQLException) {
se = (SQLException) ne.getRootCause();
}

SQLException se = unwrapSqlException(e);
if (se != null) {
statisticManager.getStatistic(StatisticName.INCOMING_DATABASE_ERROR_COUNT)
.increment();
history.setSqlState(se.getSQLState());
history.setSqlCode(se.getErrorCode());
history.setSqlMessage(se.getMessage());
} else {
statisticManager.getStatistic(StatisticName.INCOMING_OTHER_ERROR_COUNT).increment();
}
}
history.setValues(dataLoader.getStatistics(), false);
Expand All @@ -209,20 +208,35 @@ protected List<IncomingBatchHistory> loadDataAndReturnBatches(IIncomingTransport
logger.error("Failed while parsing batch.", e);
}
}
} finally {
} finally {
dataLoader.close();
recordStatistics(list);
}
return list;
}


@SuppressWarnings("unchecked")
private SQLException unwrapSqlException(Exception e) {
    // Walk the exception cause chain from the outermost throwable inward and
    // return the first SQLException found, or null when the chain holds none.
    // The suppression is required because commons-lang's getThrowableList()
    // returns a raw List.
    List<Throwable> causeChain = ExceptionUtils.getThrowableList(e);
    for (int i = 0; i < causeChain.size(); i++) {
        Throwable candidate = causeChain.get(i);
        if (candidate instanceof SQLException) {
            return (SQLException) candidate;
        }
    }
    return null;
}

/**
 * Folds the outcome of a finished load into the statistic manager: the
 * number of batches, rows, database time per row, and skipped batches.
 * A null list means the transport produced no batches; nothing is recorded.
 */
private void recordStatistics(List<IncomingBatchHistory> list) {
    if (list != null) {
        statisticManager.getStatistic(StatisticName.INCOMING_BATCH_COUNT).add(list.size());
        for (IncomingBatchHistory incomingBatchHistory : list) {
            statisticManager.getStatistic(StatisticName.INCOMING_ROW_COUNT).add(
                    incomingBatchHistory.getStatementCount());
            statisticManager.getStatistic(StatisticName.INCOMING_MS_PER_ROW).add(
                    incomingBatchHistory.getDatabaseMillis(), incomingBatchHistory.getStatementCount());
            // NOTE(review): the source showed two interleaved diff versions of
            // this loop; this reconstruction keeps the per-batch skip
            // accounting from the newer hunk -- confirm against the committed file.
            if (IncomingBatchHistory.Status.SK.equals(incomingBatchHistory.getStatus())) {
                statisticManager.getStatistic(StatisticName.INCOMING_SKIP_BATCH_COUNT).increment();
            }
        }
    }
}
}

Expand Down
Expand Up @@ -35,9 +35,14 @@ public void setStatisticManager(IStatisticManager statisticManager) {
this.statisticManager = statisticManager;
}

@ManagedAttribute(description = "Get the number of milliseconds the system is currently taking to commit a data row coming in from another node")
public BigDecimal getDatabaseMsPerRow() {
@ManagedAttribute(description = "Get the number of milliseconds the system is currently taking to commit a data row coming in from another node since the last time statistics were flushed")
public BigDecimal getDatabaseMsPerRowSinceLastFlush() {
return this.statisticManager.getStatistic(StatisticName.INCOMING_MS_PER_ROW).getAverageValue();
}


@ManagedAttribute(description = "Get the number of milliseconds the system is currently taking to commit a data row coming in from another node for the lifetime of the server")
public BigDecimal getDatabaseMsPerRowForLifetime() {
return this.statisticManager.getStatistic(StatisticName.INCOMING_MS_PER_ROW).getLifetimeAverageValue();
}

}
Expand Up @@ -21,6 +21,8 @@

import java.math.BigDecimal;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

public class Statistic {

Expand All @@ -31,19 +33,33 @@ public class Statistic {
private Date captureStartTimeMs;

private BigDecimal total;

private long count;

// Lifetime (since JVM start) accumulators shared by all Statistic instances,
// keyed by statistic name.  Both maps are fully populated in the static
// initializer, so later put() calls only replace existing entries and never
// restructure the map.
// NOTE(review): writes are guarded by synchronized(name) in the add()
// methods, which serializes writers per statistic name but does not create a
// happens-before edge for unsynchronized readers -- confirm stale reads are
// acceptable for these monitoring values.
private static Map<StatisticName, BigDecimal> lifeTimeTotal = new HashMap<StatisticName, BigDecimal>(
        StatisticName.values().length);

private static Map<StatisticName, Long> lifeTimeCount = new HashMap<StatisticName, Long>(StatisticName
        .values().length);

static {
    // Seed every statistic with zero so the getters never observe null.
    // Long.valueOf is used instead of new Long(0) to reuse cached boxes.
    StatisticName[] allStatistics = StatisticName.values();
    for (StatisticName statisticName : allStatistics) {
        lifeTimeCount.put(statisticName, Long.valueOf(0));
        lifeTimeTotal.put(statisticName, BigDecimal.ZERO);
    }
}

/**
 * Creates a statistic whose capture window starts now.
 */
public Statistic(StatisticName name, String nodeId) {
    this(name, nodeId, new Date());
}

/**
 * Creates a statistic whose capture window starts at the supplied time.
 *
 * @param name      which counter this instance tracks
 * @param nodeId    the node the samples belong to
 * @param startTime the beginning of the capture window
 */
public Statistic(StatisticName name, String nodeId, Date startTime) {
    this.name = name;
    this.nodeId = nodeId;
    this.captureStartTimeMs = startTime;
    this.total = BigDecimal.ZERO;
}

public StatisticName getName() {
return name;
Expand All @@ -56,40 +72,72 @@ public String getNodeId() {
/**
 * Returns the value accumulated during the current capture window.
 */
public BigDecimal getTotal() {
    return total;
}


/**
 * Returns the value accumulated for this statistic name since the JVM
 * started, across all capture windows.
 */
public BigDecimal getLifetimeTotal() {
    return lifeTimeTotal.get(name);
}

/**
 * Returns the number of samples recorded for this statistic name since the
 * JVM started.  The map is pre-seeded for every name, so the unboxing here
 * cannot hit a null.
 */
public long getLifetimeCount() {
    return lifeTimeCount.get(name);
}

/**
 * Returns total / count over the lifetime of the server, or zero when no
 * samples have been recorded.
 */
public BigDecimal getLifetimeAverageValue() {
    return getAverageValue(getLifetimeTotal(), getLifetimeCount());
}

/**
 * Returns total / count for the current capture window, or zero when no
 * samples have been recorded.
 */
public BigDecimal getAverageValue() {
    return getAverageValue(this.total, this.count);
}

/**
 * Computes total / count, returning zero when there is nothing to average.
 *
 * Bug fix: BigDecimal.divide with no rounding mode throws
 * ArithmeticException for any non-terminating quotient (e.g. a total of 1
 * over a count of 3), which would make the JMX average attributes blow up.
 * The exact quotient is still returned when it terminates, preserving the
 * previous behavior for all inputs that used to work.
 */
private BigDecimal getAverageValue(BigDecimal total, long count) {
    if (total != null && count > 0) {
        try {
            return total.divide(new BigDecimal(count));
        } catch (ArithmeticException nonTerminating) {
            return total.divide(new BigDecimal(count), 10, BigDecimal.ROUND_HALF_UP);
        }
    } else {
        return BigDecimal.ZERO;
    }
}

/**
 * Records a single sample with a value of one.
 */
public void increment() {
    add(1,1);
}

/**
 * Records a single sample of the given value.  (The source showed the
 * delegate call duplicated by diff residue, which would double every tally;
 * only one invocation belongs here.)
 */
public void add(int v) {
    add(v, 1);
}

/**
 * Records a single sample of the given value.  (The source showed the
 * delegate call duplicated by diff residue, which would double every tally;
 * only one invocation belongs here.)
 */
public void add(BigDecimal v) {
    add(v, 1);
}

/**
 * Records a single sample of the given value.  (The source showed the
 * delegate call duplicated by diff residue, which would double every tally;
 * only one invocation belongs here.)
 */
public void add(long v) {
    add(v, 1);
}

/**
 * Records {@code count} samples totalling {@code v}, updating both the
 * current capture window and the shared lifetime accumulators.  Locking on
 * the name enum serializes updates to the lifetime maps for this statistic.
 */
public void add(long v, long count) {
    synchronized (name) {
        this.total = this.total.add(new BigDecimal(v));
        // Bug fix: the original read "this.count += this.count;", which
        // doubles the existing counter and discards the count parameter.
        this.count += count;
        lifeTimeCount.put(name, Long.valueOf(lifeTimeCount.get(name).longValue() + count));
        lifeTimeTotal.put(name, lifeTimeTotal.get(name).add(new BigDecimal(v)));
    }
}

/**
 * Records {@code count} samples totalling {@code v}, updating both the
 * current capture window and the shared lifetime accumulators.  Locking on
 * the name enum serializes updates to the lifetime maps for this statistic.
 */
public void add(int v, long count) {
    synchronized (name) {
        this.total = this.total.add(new BigDecimal(v));
        // Bug fix: the original read "this.count += this.count;", which
        // doubles the existing counter and discards the count parameter.
        this.count += count;
        lifeTimeCount.put(name, Long.valueOf(lifeTimeCount.get(name).longValue() + count));
        lifeTimeTotal.put(name, lifeTimeTotal.get(name).add(new BigDecimal(v)));
    }
}

/**
 * Records {@code count} samples totalling {@code v}, updating both the
 * current capture window and the shared lifetime accumulators.  Locking on
 * the name enum serializes updates to the lifetime maps for this statistic.
 */
public void add(BigDecimal v, long count) {
    synchronized (name) {
        this.total = this.total.add(v);
        // Bug fix: the original read "this.count += this.count;", which
        // doubles the existing counter and discards the count parameter.
        this.count += count;
        lifeTimeCount.put(name, Long.valueOf(lifeTimeCount.get(name).longValue() + count));
        lifeTimeTotal.put(name, lifeTimeTotal.get(name).add(v));
    }
}

public Date getCaptureStartTimeMs() {
Expand Down
Expand Up @@ -21,11 +21,14 @@

public enum StatisticName {

INCOMING_NETWORK_ERRORS,
INCOMING_DATABASE_ERRORS,
INCOMING_TRANSPORT_ERROR_COUNT,
INCOMING_TRANSPORT_CONNECT_ERROR_COUNT,
INCOMING_TRANSPORT_REJECTED_COUNT,
INCOMING_DATABASE_ERROR_COUNT,
INCOMING_OTHER_ERROR_COUNT,
INCOMING_MS_PER_ROW,
INCOMING_BATCH_COUNT,
INCOMING_ROW_COUNT,
INCOMING_SKIP_BATCH_COUNT,

OUTGOING_NETWORK_ERRORS,
OUTGOING_DATABASE_ERRORS,
Expand Down

0 comments on commit 4169cab

Please sign in to comment.