Skip to content

Commit

Permalink
0003142: Sync Columns Between Incoming and Outgoing Batch
Browse files Browse the repository at this point in the history
  • Loading branch information
maxwellpettit committed Jul 5, 2017
1 parent cd34255 commit 6bd0591
Show file tree
Hide file tree
Showing 18 changed files with 317 additions and 285 deletions.

Large diffs are not rendered by default.

Expand Up @@ -158,7 +158,7 @@ protected void bulkWrite(CsvData data) {
throw getPlatform().getSqlTemplate().translate(ex);
} finally {
statistics.get(batch).stopTimer(DataWriterStatisticConstants.LOADMILLIS);
statistics.get(batch).increment(DataWriterStatisticConstants.STATEMENTCOUNT);
statistics.get(batch).increment(DataWriterStatisticConstants.ROWCOUNT);
statistics.get(batch).increment(DataWriterStatisticConstants.LINENUMBER);
statistics.get(batch).stopTimer(DataWriterStatisticConstants.LOADMILLIS);
}
Expand Down
Expand Up @@ -97,7 +97,7 @@ protected void bulkWrite(CsvData data) {
boolean requiresFlush = false;
switch (dataEventType) {
case INSERT:
statistics.get(batch).increment(DataWriterStatisticConstants.STATEMENTCOUNT);
statistics.get(batch).increment(DataWriterStatisticConstants.ROWCOUNT);
statistics.get(batch).increment(DataWriterStatisticConstants.LINENUMBER);
if (filterBefore(data)) {
Object[] rowData = platform.getObjectValues(batch.getBinaryEncoding(), getRowData(data, CsvData.ROW_DATA),
Expand Down Expand Up @@ -300,7 +300,7 @@ protected void flush() {

if (errors.length > 0) {
// set the statement count so the failed row number get reported correctly
statistics.get(batch).set(DataWriterStatisticConstants.STATEMENTCOUNT,
statistics.get(batch).set(DataWriterStatisticConstants.ROWCOUNT,
errors[0]);

throw new BulkSqlException(errors, lastEventType.toString(), sql);
Expand Down
Expand Up @@ -124,7 +124,7 @@ protected void bulkWrite(CsvData data) {
}
}
statistics.get(batch).stopTimer(DataWriterStatisticConstants.LOADMILLIS);
statistics.get(batch).increment(DataWriterStatisticConstants.STATEMENTCOUNT);
statistics.get(batch).increment(DataWriterStatisticConstants.ROWCOUNT);
statistics.get(batch).increment(DataWriterStatisticConstants.LINENUMBER);
}

Expand Down Expand Up @@ -176,7 +176,7 @@ protected void endCopy() {
copyIn.endCopy();
}
} catch (Exception ex) {
statistics.get(batch).set(DataWriterStatisticConstants.STATEMENTCOUNT, 0);
statistics.get(batch).set(DataWriterStatisticConstants.ROWCOUNT, 0);
statistics.get(batch).set(DataWriterStatisticConstants.LINENUMBER, 0);

throw getPlatform().getSqlTemplate().translate(ex);
Expand Down
Expand Up @@ -123,7 +123,7 @@ public void bulkWrite(CsvData data) {

switch (dataEventType) {
case INSERT:
statistics.get(batch).increment(DataWriterStatisticConstants.STATEMENTCOUNT);
statistics.get(batch).increment(DataWriterStatisticConstants.ROWCOUNT);
statistics.get(batch).increment(DataWriterStatisticConstants.LINENUMBER);
statistics.get(batch).startTimer(DataWriterStatisticConstants.LOADMILLIS);
try {
Expand Down
Expand Up @@ -479,7 +479,7 @@ public long getDataDeleteRowCount() {
return dataDeleteRowCount;
}

public void incrementEventCount(DataEventType type) {
public void incrementRowCount(DataEventType type) {
switch (type) {
case RELOAD:
reloadRowCount++;
Expand All @@ -498,6 +498,26 @@ public void incrementEventCount(DataEventType type) {
break;
}
}

public void incrementExtractRowCount(DataEventType type) {
switch (type) {
case INSERT:
extractInsertRowCount++;
break;
case UPDATE:
extractUpdateRowCount++;
break;
case DELETE:
extractDeleteRowCount++;
break;
default:
break;
}
}

public void incrementExtractRowCount() {
this.extractRowCount++;
}

public void setDataInsertRowCount(long dataInsertRowCount) {
this.dataInsertRowCount = dataInsertRowCount;
Expand Down
Expand Up @@ -58,7 +58,7 @@ public void setValues(Statistics readerStatistics, Statistics writerStatistics,
if (writerStatistics != null) {
setFilterMillis(writerStatistics.get(DataWriterStatisticConstants.FILTERMILLIS));
setLoadMillis(writerStatistics.get(DataWriterStatisticConstants.LOADMILLIS));
setLoadRowCount(writerStatistics.get(DataWriterStatisticConstants.STATEMENTCOUNT));
setLoadRowCount(writerStatistics.get(DataWriterStatisticConstants.ROWCOUNT));
setFallbackInsertCount(writerStatistics.get(DataWriterStatisticConstants.FALLBACKINSERTCOUNT));
setFallbackUpdateCount(writerStatistics.get(DataWriterStatisticConstants.FALLBACKUPDATECOUNT));
setMissingDeleteCount(writerStatistics.get(DataWriterStatisticConstants.MISSINGDELETECOUNT));
Expand Down
Expand Up @@ -899,12 +899,16 @@ protected OutgoingBatch extractOutgoingBatch(ProcessInfo processInfo, Node targe
Statistics stats = getExtractStats(writer);
if (stats != null) {
transformTimeInMs = stats.get(DataWriterStatisticConstants.TRANSFORMMILLIS);
currentBatch.setExtractCount(stats.get(DataWriterStatisticConstants.STATEMENTCOUNT));
currentBatch.setExtractCount(stats.get(DataWriterStatisticConstants.ROWCOUNT));
currentBatch.setDataRowCount(stats.get(DataWriterStatisticConstants.ROWCOUNT));
currentBatch.setDataInsertRowCount(stats.get(DataWriterStatisticConstants.INSERTCOUNT));
currentBatch.setDataUpdateRowCount(stats.get(DataWriterStatisticConstants.UPDATECOUNT));
currentBatch.setDataDeleteRowCount(stats.get(DataWriterStatisticConstants.DELETECOUNT));
extractTimeInMs = extractTimeInMs - transformTimeInMs;
byteCount = stats.get(DataWriterStatisticConstants.BYTECOUNT);
statisticManager.incrementDataBytesExtracted(currentBatch.getChannelId(), byteCount);
statisticManager.incrementDataExtracted(currentBatch.getChannelId(),
stats.get(DataWriterStatisticConstants.STATEMENTCOUNT));
stats.get(DataWriterStatisticConstants.ROWCOUNT));
}

}
Expand Down Expand Up @@ -1173,7 +1177,6 @@ protected void transferFromStaging(ExtractMode mode, BatchType batchType, Outgoi
}
char[] buffer = new char[bufferSize];

//TODO: Write Batch Stats
boolean batchStatsWritten = false;
String prevBuffer = "";
while ((numCharsRead = reader.read(buffer)) != -1) {
Expand Down Expand Up @@ -1253,7 +1256,7 @@ protected int findStatsIndex(String bufferString, String prevBuffer) {
int index = -1;
String fullBuffer = prevBuffer + bufferString;

String pattern = "\n" + CsvConstants.BATCH + ",\\d*\r*\n";
String pattern = "\n" + CsvConstants.BATCH + "\\s*,\\s*\\d*\r*\n";
Pattern r = Pattern.compile(pattern);
Matcher m = r.matcher(fullBuffer);
if (m.find()) {
Expand Down Expand Up @@ -1904,7 +1907,8 @@ public CsvData next() {
|| symmetricDialect.getName().equals(
DatabaseNamesConstants.MSSQL2008));

outgoingBatch.incrementDataRowCount();
outgoingBatch.incrementExtractRowCount();
outgoingBatch.incrementExtractRowCount(data.getDataEventType());
} else {
log.error(
"Could not locate a trigger with the id of {} for {}. It was recorded in the hist table with a hist id of {}",
Expand Down Expand Up @@ -2068,8 +2072,8 @@ public CsvData next() {
.isNotBlank(triggerRouter.getInitialLoadSelect()), triggerRouter));

if (data != null && outgoingBatch != null && !outgoingBatch.isExtractJobFlag()) {
outgoingBatch.incrementDataRowCount();
outgoingBatch.incrementEventCount(data.getDataEventType());
outgoingBatch.incrementExtractRowCount();
outgoingBatch.incrementExtractRowCount(data.getDataEventType());
}

return data;
Expand Down
Expand Up @@ -1374,7 +1374,7 @@ protected long insertDataEventAndOutgoingBatch(ISqlTransaction transaction, long
outgoingBatch.setLoadId(loadId);
outgoingBatch.setCreateBy(createBy);
outgoingBatch.setLoadFlag(isLoad);
outgoingBatch.incrementEventCount(eventType);
outgoingBatch.incrementRowCount(eventType);
outgoingBatch.incrementDataRowCount();
if (tableName != null) {
outgoingBatch.incrementTableCount(tableName.toLowerCase());
Expand Down
Expand Up @@ -207,26 +207,30 @@ public void updateOutgoingBatch(OutgoingBatch outgoingBatch) {
public void updateOutgoingBatch(ISqlTransaction transaction, OutgoingBatch outgoingBatch) {
outgoingBatch.setLastUpdatedTime(new Date());
outgoingBatch.setLastUpdatedHostName(clusterService.getServerId());
transaction.prepareAndExecute(getSql("updateOutgoingBatchSql"), new Object[] { outgoingBatch.getStatus().name(),
outgoingBatch.getLoadId(), outgoingBatch.isExtractJobFlag() ? 1 : 0, outgoingBatch.isLoadFlag() ? 1 : 0,
outgoingBatch.isErrorFlag() ? 1 : 0, outgoingBatch.getByteCount(), outgoingBatch.getExtractCount(),
outgoingBatch.getSentCount(), outgoingBatch.getLoadCount(), outgoingBatch.getDataRowCount(),
outgoingBatch.getReloadRowCount(), outgoingBatch.getDataInsertRowCount(), outgoingBatch.getDataUpdateRowCount(),
outgoingBatch.getDataDeleteRowCount(), outgoingBatch.getOtherRowCount(), outgoingBatch.getIgnoreCount(),
outgoingBatch.getRouterMillis(), outgoingBatch.getNetworkMillis(), outgoingBatch.getFilterMillis(),
outgoingBatch.getLoadMillis(), outgoingBatch.getExtractMillis(), outgoingBatch.getExtractStartTime(),
outgoingBatch.getTransferStartTime(), outgoingBatch.getLoadStartTime(), outgoingBatch.getSqlState(),
outgoingBatch.getSqlCode(), FormatUtils.abbreviateForLogging(outgoingBatch.getSqlMessage()), outgoingBatch.getFailedDataId(),
outgoingBatch.getLastUpdatedHostName(), outgoingBatch.getSummary(), outgoingBatch.getLoadRowCount(),
outgoingBatch.getLoadInsertRowCount(), outgoingBatch.getLoadUpdateRowCount(), outgoingBatch.getLoadDeleteRowCount(),
outgoingBatch.getFallbackInsertCount(), outgoingBatch.getFallbackUpdateCount(), outgoingBatch.getIgnoreRowCount(),
outgoingBatch.getMissingDeleteCount(), outgoingBatch.getSkipCount(), outgoingBatch.getBatchId(), outgoingBatch.getNodeId() },
new int[] { Types.CHAR, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC,
transaction.prepareAndExecute(getSql("updateOutgoingBatchSql"),
new Object[] { outgoingBatch.getStatus().name(), outgoingBatch.getLoadId(), outgoingBatch.isExtractJobFlag() ? 1 : 0,
outgoingBatch.isLoadFlag() ? 1 : 0, outgoingBatch.isErrorFlag() ? 1 : 0, outgoingBatch.getByteCount(),
outgoingBatch.getExtractCount(), outgoingBatch.getSentCount(), outgoingBatch.getLoadCount(),
outgoingBatch.getDataRowCount(), outgoingBatch.getReloadRowCount(), outgoingBatch.getDataInsertRowCount(),
outgoingBatch.getDataUpdateRowCount(), outgoingBatch.getDataDeleteRowCount(), outgoingBatch.getOtherRowCount(),
outgoingBatch.getIgnoreCount(), outgoingBatch.getRouterMillis(), outgoingBatch.getNetworkMillis(),
outgoingBatch.getFilterMillis(), outgoingBatch.getLoadMillis(), outgoingBatch.getExtractMillis(),
outgoingBatch.getExtractStartTime(), outgoingBatch.getTransferStartTime(), outgoingBatch.getLoadStartTime(),
outgoingBatch.getSqlState(), outgoingBatch.getSqlCode(),
FormatUtils.abbreviateForLogging(outgoingBatch.getSqlMessage()), outgoingBatch.getFailedDataId(),
outgoingBatch.getLastUpdatedHostName(), outgoingBatch.getSummary(), outgoingBatch.getLoadRowCount(),
outgoingBatch.getLoadInsertRowCount(), outgoingBatch.getLoadUpdateRowCount(), outgoingBatch.getLoadDeleteRowCount(),
outgoingBatch.getFallbackInsertCount(), outgoingBatch.getFallbackUpdateCount(), outgoingBatch.getIgnoreRowCount(),
outgoingBatch.getMissingDeleteCount(), outgoingBatch.getSkipCount(), outgoingBatch.getExtractRowCount(),
outgoingBatch.getExtractInsertRowCount(), outgoingBatch.getExtractUpdateRowCount(),
outgoingBatch.getExtractDeleteRowCount(), outgoingBatch.getBatchId(), outgoingBatch.getNodeId() },
new int[] { Types.CHAR, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC,
Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC,
Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.TIMESTAMP,
Types.TIMESTAMP, Types.TIMESTAMP, Types.VARCHAR, Types.NUMERIC, Types.VARCHAR, Types.NUMERIC, Types.VARCHAR,
Types.VARCHAR, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC,
Types.NUMERIC, Types.NUMERIC, Types.NUMERIC,
Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC,
Types.TIMESTAMP, Types.TIMESTAMP, Types.TIMESTAMP, Types.VARCHAR, Types.NUMERIC, Types.VARCHAR, Types.NUMERIC,
Types.VARCHAR, Types.VARCHAR, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC,
Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC,
Types.NUMERIC,

symmetricDialect.getSqlTypeForIds(), Types.VARCHAR });
}
Expand Down
Expand Up @@ -64,7 +64,8 @@ public OutgoingBatchServiceSqlMap(IDatabasePlatform platform,
+ " failed_data_id=?, last_update_hostname=?, last_update_time=current_timestamp, summary=?, "
+ " load_row_count=?, load_insert_row_count=?, load_update_row_count=?, load_delete_row_count=?, "
+ " fallback_insert_count=?, fallback_update_count=?, ignore_row_count=?, missing_delete_count=?, "
+ " skip_count=? where batch_id=? and node_id=? ");
+ " skip_count=?, extract_row_count=?, extract_insert_row_count=?, extract_update_row_count=?, extract_delete_row_count=? "
+ " where batch_id=? and node_id=? ");

putSql("findOutgoingBatchSql", "where batch_id=? and node_id=? ");

Expand Down
Expand Up @@ -1039,7 +1039,7 @@ protected int insertDataEvents(ProcessInfo processInfo, ChannelRouterContext con
context.setLastLoadId(-1);
}

batch.incrementEventCount(dataMetaData.getData().getDataEventType());
batch.incrementRowCount(dataMetaData.getData().getDataEventType());
batch.incrementDataRowCount();
batch.incrementTableCount(dataMetaData.getTable().getNameLowerCase());

Expand Down
Expand Up @@ -151,7 +151,7 @@ public void write(CsvData data) {
|| (targetTable == null && data.getDataEventType() == DataEventType.SQL)) {
try {

statistics.get(batch).increment(DataWriterStatisticConstants.STATEMENTCOUNT);
statistics.get(batch).increment(DataWriterStatisticConstants.ROWCOUNT);
statistics.get(batch).increment(DataWriterStatisticConstants.LINENUMBER);
if (filterBefore(data)) {

Expand Down
Expand Up @@ -38,7 +38,7 @@ public void needsResolved(AbstractDatabaseWriter writer, CsvData data, LoadStatu
DatabaseWriterSettings writerSettings = writer.getWriterSettings();
Conflict conflict = writerSettings.pickConflict(writer.getTargetTable(), writer.getBatch());
Statistics statistics = writer.getStatistics().get(writer.getBatch());
long statementCount = statistics.get(DataWriterStatisticConstants.STATEMENTCOUNT);
long statementCount = statistics.get(DataWriterStatisticConstants.ROWCOUNT);
long lineNumber = statistics.get(DataWriterStatisticConstants.LINENUMBER);
ResolvedData resolvedData = writerSettings.getResolvedData(statementCount);

Expand Down
Expand Up @@ -154,11 +154,12 @@ public void write(CsvData data) {
println(CsvConstants.NO_BINARY_OLD_DATA, Boolean.toString(noBinaryOldData));
}

statistics.get(batch).increment(DataWriterStatisticConstants.STATEMENTCOUNT);
statistics.get(batch).increment(DataWriterStatisticConstants.ROWCOUNT);
statistics.get(batch).increment(DataWriterStatisticConstants.LINENUMBER);
switch (data.getDataEventType()) {
case INSERT:
println(CsvConstants.INSERT, data.getCsvData(CsvData.ROW_DATA));
statistics.get(batch).increment(DataWriterStatisticConstants.INSERTCOUNT);
break;

case UPDATE:
Expand All @@ -170,6 +171,7 @@ public void write(CsvData data) {
}
println(CsvConstants.UPDATE, data.getCsvData(CsvData.ROW_DATA),
data.getCsvData(CsvData.PK_DATA));
statistics.get(batch).increment(DataWriterStatisticConstants.UPDATECOUNT);
break;

case DELETE:
Expand All @@ -180,6 +182,7 @@ public void write(CsvData data) {
}
}
println(CsvConstants.DELETE, data.getCsvData(CsvData.PK_DATA));
statistics.get(batch).increment(DataWriterStatisticConstants.DELETECOUNT);
break;

case CREATE:
Expand Down
Expand Up @@ -27,7 +27,7 @@ abstract public class DataWriterStatisticConstants {
public static final String TRANSFORMMILLIS = "TRANSFORMMILLIS";
public static final String FILTERMILLIS = "FILTERMILLIS";
public static final String LOADMILLIS = "LOADMILLIS";
public static final String STATEMENTCOUNT = "STATEMENTCOUNT";
public static final String ROWCOUNT = "STATEMENTCOUNT";
public static final String INSERTCOUNT = "INSERTCOUNT";
public static final String DELETECOUNT = "DELETECOUNT";
public static final String UPDATECOUNT = "UPDATECOUNT";
Expand Down

0 comments on commit 6bd0591

Please sign in to comment.