Skip to content

Commit

Permalink
[1908764] Incoming batch timings for network, filter, and database ac…
Browse files Browse the repository at this point in the history
…cess
  • Loading branch information
erilong committed Mar 6, 2008
1 parent 37aac07 commit 0e22913
Show file tree
Hide file tree
Showing 9 changed files with 167 additions and 9 deletions.
Expand Up @@ -27,6 +27,14 @@ public class DataLoaderStatistics implements IDataLoaderStatistics {

private Date startTime;

private long networkMillis;

private long filterMillis;

private long databaseMillis;

private long byteCount;

private long lineCount;

private long statementCount;
Expand All @@ -36,6 +44,8 @@ public class DataLoaderStatistics implements IDataLoaderStatistics {
private long fallbackUpdateCount;

private long missingDeleteCount;

private long timerMillis;

public DataLoaderStatistics() {
this.startTime = new Date();
Expand All @@ -60,7 +70,31 @@ public long incrementMissingDeleteCount() {
public long incrementStatementCount() {
return ++statementCount;
}

public void incrementNetworkMillis(long millis) {
networkMillis += millis;
}

public void incrementFilterMillis(long millis) {
filterMillis += millis;
}

public void incrementDatabaseMillis(long millis) {
databaseMillis += millis;
}

public void incrementByteCount(long count) {
byteCount += count;
}

public void startTimer() {
timerMillis = System.currentTimeMillis();
}

public long endTimer() {
return System.currentTimeMillis() - timerMillis;
}

public long getFallbackInsertCount() {
return fallbackInsertCount;
}
Expand Down Expand Up @@ -109,4 +143,36 @@ public void setMissingDeleteCount(long missingDeleteCount) {
this.missingDeleteCount = missingDeleteCount;
}

public long getDatabaseMillis() {
return databaseMillis;
}

public void setDatabaseMillis(long databaseMillis) {
this.databaseMillis = databaseMillis;
}

public long getFilterMillis() {
return filterMillis;
}

public void setFilterMillis(long filterMillis) {
this.filterMillis = filterMillis;
}

public long getNetworkMillis() {
return networkMillis;
}

public void setNetworkMillis(long networkMillis) {
this.networkMillis = networkMillis;
}

public long getByteCount() {
return byteCount;
}

public void setByteCount(long byteCount) {
this.byteCount = byteCount;
}

}
Expand Up @@ -25,6 +25,14 @@

public interface IDataLoaderStatistics {

public long getByteCount();

public long getNetworkMillis();

public long getFilterMillis();

public long getDatabaseMillis();

public long getFallbackInsertCount();

public long getFallbackUpdateCount();
Expand Down
Expand Up @@ -87,6 +87,7 @@ public void open(BufferedReader reader, List<IDataLoaderFilter> filters, Map<Str
public boolean hasNext() throws IOException {
while (csvReader.readRecord()) {
String[] tokens = csvReader.getValues();
stats.incrementByteCount(csvReader.getRawRecord().length());

if (tokens[0].equals(CsvConstants.BATCH)) {
context.setBatchId(tokens[1]);
Expand Down Expand Up @@ -116,6 +117,7 @@ public void load() throws IOException {
while (csvReader.readRecord()) {
String[] tokens = csvReader.getValues();
stats.incrementLineCount();
stats.incrementByteCount(csvReader.getRawRecord().length());

if (tokens[0].equals(CsvConstants.INSERT)) {
if (!context.getTableTemplate().isIgnoreThisTable() && !context.isSkipping()) {
Expand Down Expand Up @@ -192,13 +194,16 @@ protected int insert(String[] tokens, BinaryEncoding encoding) {
int rows = 0;

if (filters != null) {
stats.startTimer();
for (IDataLoaderFilter filter : filters) {
filter.filterInsert(context, columnValues);
}
stats.incrementFilterMillis(stats.endTimer());
}

Object savepoint = null;
try {
stats.startTimer();
if (enableFallbackUpdate) {
savepoint = dbDialect.createSavepointForFallback();
}
Expand All @@ -223,6 +228,8 @@ protected int insert(String[] tokens, BinaryEncoding encoding) {
// TODO: log the PK information as an ERROR level.
throw e;
}
} finally {
stats.incrementDatabaseMillis(stats.endTimer());
}
return rows;
}
Expand All @@ -233,11 +240,14 @@ protected int update(String[] tokens, BinaryEncoding encoding) {
String keyValues[] = parseKeys(tokens, 1 + columnValues.length);

if (filters != null) {
stats.startTimer();
for (IDataLoaderFilter filter : filters) {
filter.filterUpdate(context, columnValues, keyValues);
}
stats.incrementFilterMillis(stats.endTimer());
}

stats.startTimer();
int rows = context.getTableTemplate().update(columnValues, keyValues, encoding);
if (rows == 0) {
if (enableFallbackInsert) {
Expand All @@ -246,16 +256,18 @@ protected int update(String[] tokens, BinaryEncoding encoding) {
+ ArrayUtils.toString(tokens));
}
stats.incrementFallbackInsertCount();
return context.getTableTemplate().insert(columnValues, encoding);
rows = context.getTableTemplate().insert(columnValues, encoding);
} else {
// TODO: log the PK information as an ERROR level.
stats.incrementDatabaseMillis(stats.endTimer());
throw new RuntimeException("Unable to update " + context.getTableName() + ": "
+ ArrayUtils.toString(tokens));
}
} else if (rows > 1) {
logger.warn("Too many rows (" + rows + ") updated for " + context.getTableName() + ": "
+ ArrayUtils.toString(tokens));
}
stats.incrementDatabaseMillis(stats.endTimer());
return rows;
}

Expand All @@ -264,12 +276,16 @@ protected int delete(String[] tokens) {
String keyValues[] = parseKeys(tokens, 1);

if (filters != null) {
stats.startTimer();
for (IDataLoaderFilter filter : filters) {
filter.filterDelete(context, keyValues);
}
stats.incrementFilterMillis(stats.endTimer());
}

stats.startTimer();
int rows = context.getTableTemplate().delete(keyValues);
stats.incrementDatabaseMillis(stats.endTimer());
if (rows == 0) {
if (allowMissingDelete) {
logger
Expand Down
Expand Up @@ -45,6 +45,14 @@ public enum Status {

private static String hostName;

private long byteCount;

private long networkMillis;

private long filterMillis;

private long databaseMillis;

private long statementCount;

private long fallbackInsertCount;
Expand Down Expand Up @@ -80,6 +88,10 @@ public IncomingBatchHistory(IDataLoaderContext context) {
}

public void setValues(IDataLoaderStatistics statistics, boolean isSuccess) {
byteCount = statistics.getByteCount();
networkMillis = statistics.getNetworkMillis();
filterMillis = statistics.getFilterMillis();
databaseMillis = statistics.getDatabaseMillis();
statementCount = statistics.getStatementCount();
fallbackInsertCount = statistics.getFallbackInsertCount();
fallbackUpdateCount = statistics.getFallbackUpdateCount();
Expand Down Expand Up @@ -179,4 +191,36 @@ public void setMissingDeleteCount(long missingDeleteCount) {
this.missingDeleteCount = missingDeleteCount;
}

public long getByteCount() {
return byteCount;
}

public void setByteCount(long byteCount) {
this.byteCount = byteCount;
}

public long getDatabaseMillis() {
return databaseMillis;
}

public void setDatabaseMillis(long databaseMillis) {
this.databaseMillis = databaseMillis;
}

public long getFilterMillis() {
return filterMillis;
}

public void setFilterMillis(long filterMillis) {
this.filterMillis = filterMillis;
}

public long getNetworkMillis() {
return networkMillis;
}

public void setNetworkMillis(long networkMillis) {
this.networkMillis = networkMillis;
}

}
Expand Up @@ -108,8 +108,9 @@ public int updateIncomingBatch(IncomingBatch status) {

public void insertIncomingBatchHistory(IncomingBatchHistory history) {
jdbcTemplate.update(insertIncomingBatchHistorySql, new Object[] { Long.valueOf(history.getBatchId()),
history.getNodeId(), history.getStatus().toString(), history.getHostName(),
history.getStatementCount(), history.getFallbackInsertCount(),
history.getNodeId(), history.getStatus().toString(), history.getNetworkMillis(),
history.getFilterMillis(), history.getDatabaseMillis(), history.getHostName(),
history.getByteCount(), history.getStatementCount(), history.getFallbackInsertCount(),
history.getFallbackUpdateCount(), history.getMissingDeleteCount(),
history.getFailedRowNumber(), history.getStartTime(), history.getEndTime() });
}
Expand All @@ -134,10 +135,14 @@ public Object mapRow(ResultSet rs, int num) throws SQLException {
history.setStartTime(rs.getTime(4));
history.setEndTime(rs.getTime(5));
history.setFailedRowNumber(rs.getLong(6));
history.setStatementCount(rs.getLong(7));
history.setFallbackInsertCount(rs.getLong(8));
history.setFallbackUpdateCount(rs.getLong(9));
history.setMissingDeleteCount(rs.getLong(10));
history.setByteCount(rs.getLong(7));
history.setNetworkMillis(rs.getLong(8));
history.setFilterMillis(rs.getLong(9));
history.setDatabaseMillis(rs.getLong(10));
history.setStatementCount(rs.getLong(11));
history.setFallbackInsertCount(rs.getLong(12));
history.setFallbackUpdateCount(rs.getLong(13));
history.setMissingDeleteCount(rs.getLong(14));
return history;
}
}
Expand Down
4 changes: 4 additions & 0 deletions symmetric/src/main/resources/ddl-config.xml
Expand Up @@ -241,8 +241,12 @@
<column name="status" type="CHAR" size="2" />
<column name="start_time" type="TIMESTAMP" />
<column name="end_time" type="TIMESTAMP" />
<column name="network_millis" type="INTEGER" />
<column name="filter_millis" type="INTEGER" />
<column name="database_millis" type="INTEGER" />
<column name="host_name" type="VARCHAR" size="50" />
<column name="failed_row_number" type="INTEGER" />
<column name="byte_count" type="INTEGER" />
<column name="statement_count" type="INTEGER" />
<column name="fallback_insert_count" type="INTEGER" />
<column name="fallback_update_count" type="INTEGER" />
Expand Down
6 changes: 4 additions & 2 deletions symmetric/src/main/resources/symmetric-services.xml
Expand Up @@ -401,6 +401,7 @@
<property name="findIncomingBatchHistorySql">
<value>
select batch_id, node_id, status, start_time, end_time, failed_row_number,
byte_count, network_millis, filter_millis, database_millis,
statement_count, fallback_insert_count, fallback_update_count, missing_delete_count
from ${sync.table.prefix}_incoming_batch_hist where batch_id = ? and node_id = ?
order by start_time
Expand All @@ -421,9 +422,10 @@
<property name="insertIncomingBatchHistorySql">
<value>
insert into ${sync.table.prefix}_incoming_batch_hist (batch_id, node_id, status,
host_name, statement_count, fallback_insert_count, fallback_update_count,
network_millis, filter_millis, database_millis,
host_name, byte_count, statement_count, fallback_insert_count, fallback_update_count,
missing_delete_count, failed_row_number, start_time, end_time) values (?, ?, ?, ?,
?, ?, ?, ?, ?, ?, ?)
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
</value>
</property>
</bean>
Expand Down
12 changes: 12 additions & 0 deletions symmetric/src/main/resources/symmetric-upgrade.xml
Expand Up @@ -183,6 +183,18 @@
create index idx_de_btchd_nid_cid on
${sync.table.prefix}_data_event(batched, node_id, channel_id)
</value>
<value>
alter table ${sync.table.prefix}_incoming_batch_hist add (byte_count numeric(22))
</value>
<value>
alter table ${sync.table.prefix}_incoming_batch_hist add (network_millis numeric(22))
</value>
<value>
alter table ${sync.table.prefix}_incoming_batch_hist add (filter_millis numeric(22))
</value>
<value>
alter table ${sync.table.prefix}_incoming_batch_hist add (database_millis numeric(22))
</value>
</list>
</property>
</bean>
Expand Down
Expand Up @@ -126,6 +126,7 @@ public void testStatistics() throws Exception {
Assert.assertNotNull(history.getStartTime(), "Start time cannot be null");
Assert.assertNotNull(history.getEndTime(), "End time cannot be null");
Assert.assertEquals(history.getFailedRowNumber(), 8, "Wrong failed row number");
Assert.assertEquals(history.getByteCount(), 290, "Wrong byte count");
Assert.assertEquals(history.getStatementCount(), 8, "Wrong statement count");
Assert.assertEquals(history.getFallbackInsertCount(), 1, "Wrong fallback insert count");
Assert.assertEquals(history.getFallbackUpdateCount(), 2, "Wrong fallback update count");
Expand Down

0 comments on commit 0e22913

Please sign in to comment.