Skip to content

Commit

Permalink
0005389: Implemented conflict_win_count and conflict_lose_count columns
Browse files Browse the repository at this point in the history
  • Loading branch information
evan-miller-jumpmind committed Aug 2, 2022
1 parent 67eacf1 commit 5ff8830
Show file tree
Hide file tree
Showing 12 changed files with 123 additions and 50 deletions.
Expand Up @@ -101,6 +101,8 @@ public static String getNameFromDescription(String description) {
private boolean bulkLoadFlag;
private long fallbackInsertCount;
private long fallbackUpdateCount;
private long conflictWinCount;
private long conflictLoseCount;
private long ignoreRowCount;
private long missingDeleteCount;
private long skipCount;
Expand Down Expand Up @@ -557,6 +559,22 @@ public void setFallbackUpdateCount(long fallbackUpdateCount) {
this.fallbackUpdateCount = fallbackUpdateCount;
}

public long getConflictWinCount() {
return conflictWinCount;
}

public void setConflictWinCount(long conflictWinCount) {
this.conflictWinCount = conflictWinCount;
}

public long getConflictLoseCount() {
return conflictLoseCount;
}

public void setConflictLoseCount(long conflictLoseCount) {
this.conflictLoseCount = conflictLoseCount;
}

public long getMissingDeleteCount() {
return missingDeleteCount;
}
Expand Down
Expand Up @@ -51,6 +51,8 @@ public class BatchAck implements Serializable {
private long loadDeleteRowCount;
private long fallbackInsertCount;
private long fallbackUpdateCount;
private long conflictWinCount;
private long conflictLoseCount;
private long ignoreRowCount;
private long missingDeleteCount;
private long skipCount;
Expand Down Expand Up @@ -227,6 +229,22 @@ public void setFallbackUpdateCount(long fallbackUpdateCount) {
this.fallbackUpdateCount = fallbackUpdateCount;
}

public long getConflictWinCount() {
return conflictWinCount;
}

public void setConflictWinCount(long conflictWinCount) {
this.conflictWinCount = conflictWinCount;
}

public long getConflictLoseCount() {
return conflictLoseCount;
}

public void setConflictLoseCount(long conflictLoseCount) {
this.conflictLoseCount = conflictLoseCount;
}

public long getIgnoreRowCount() {
return ignoreRowCount;
}
Expand Down
Expand Up @@ -88,6 +88,8 @@ public void setValues(Statistics readerStatistics, Statistics writerStatistics,
setTransformLoadMillis(writerStatistics.get(TRANSFORMMILLIS));
setFallbackInsertCount(writerStatistics.get(FALLBACKINSERTCOUNT));
setFallbackUpdateCount(writerStatistics.get(FALLBACKUPDATECOUNT));
setConflictWinCount(writerStatistics.get(DataWriterStatisticConstants.CONFLICTWINCOUNT));
setConflictLoseCount(writerStatistics.get(DataWriterStatisticConstants.CONFLICTLOSECOUNT));
setMissingDeleteCount(writerStatistics.get(DataWriterStatisticConstants.MISSINGDELETECOUNT));
setIgnoreCount(writerStatistics.get(DataWriterStatisticConstants.IGNORECOUNT));
setIgnoreRowCount(writerStatistics.get(DataWriterStatisticConstants.IGNOREROWCOUNT));
Expand Down
Expand Up @@ -94,6 +94,8 @@ public BatchAckResult ack(final BatchAck batch) {
outgoingBatch.setLoadDeleteRowCount(batch.getLoadDeleteRowCount());
outgoingBatch.setFallbackInsertCount(batch.getFallbackInsertCount());
outgoingBatch.setFallbackUpdateCount(batch.getFallbackUpdateCount());
outgoingBatch.setConflictWinCount(batch.getConflictWinCount());
outgoingBatch.setConflictLoseCount(batch.getConflictLoseCount());
outgoingBatch.setIgnoreRowCount(batch.getIgnoreRowCount());
outgoingBatch.setMissingDeleteCount(batch.getMissingDeleteCount());
outgoingBatch.setSkipCount(batch.getSkipCount());
Expand Down
Expand Up @@ -290,8 +290,8 @@ public void insertIncomingBatch(ISqlTransaction transaction, IncomingBatch batch
new Object[] { batch.getBatchId(), batch.getNodeId(), batch.getChannelId(), batch.getStatus().name(),
batch.getNetworkMillis(), batch.getFilterMillis(), batch.getLoadMillis(), batch.getFailedRowNumber(),
batch.getFailedLineNumber(), batch.getByteCount(), batch.getLoadRowCount(), batch.getFallbackInsertCount(),
batch.getFallbackUpdateCount(), batch.getIgnoreCount(), batch.getIgnoreRowCount(),
batch.getMissingDeleteCount(), batch.getSkipCount(), batch.getSqlState(), batch.getSqlCode(),
batch.getFallbackUpdateCount(), batch.getConflictWinCount(), batch.getConflictLoseCount(), batch.getIgnoreCount(),
batch.getIgnoreRowCount(), batch.getMissingDeleteCount(), batch.getSkipCount(), batch.getSqlState(), batch.getSqlCode(),
FormatUtils.abbreviateForLogging(batch.getSqlMessage()), batch.getLastUpdatedHostName(), new Date(), batch.getSummary(),
new Date(), batch.isLoadFlag(), batch.getExtractCount(), batch.getSentCount(), batch.getLoadCount(), batch.getLoadId(),
batch.isCommonFlag(), batch.getRouterMillis(), batch.getExtractMillis(), batch.getTransformExtractMillis(),
Expand All @@ -302,11 +302,12 @@ public void insertIncomingBatch(ISqlTransaction transaction, IncomingBatch batch
batch.getExtractDeleteRowCount(), batch.getFailedDataId() },
new int[] { Types.NUMERIC, Types.VARCHAR, Types.VARCHAR, Types.CHAR, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC,
Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC,
Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.VARCHAR, Types.NUMERIC, Types.VARCHAR, Types.VARCHAR,
Types.TIMESTAMP, Types.VARCHAR, Types.TIMESTAMP, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC,
Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.VARCHAR, Types.NUMERIC,
Types.VARCHAR, Types.VARCHAR, Types.TIMESTAMP, Types.VARCHAR, Types.TIMESTAMP, Types.NUMERIC, Types.NUMERIC,
Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC,
Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC,
Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC });
Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC,
Types.NUMERIC });
}
}
}
Expand Down Expand Up @@ -375,11 +376,11 @@ public int updateIncomingBatch(ISqlTransaction transaction, IncomingBatch batch)
count = transaction.prepareAndExecute(sql,
new Object[] { batch.getStatus().name(), batch.isErrorFlag() ? 1 : 0, batch.getNetworkMillis(), batch.getFilterMillis(),
batch.getLoadMillis(), batch.getFailedRowNumber(), batch.getFailedLineNumber(), batch.getByteCount(),
batch.getLoadRowCount(), batch.getFallbackInsertCount(), batch.getFallbackUpdateCount(), batch.getIgnoreCount(),
batch.getIgnoreRowCount(), batch.getMissingDeleteCount(), batch.getSkipCount(), batch.getSqlState(),
batch.getSqlCode(), FormatUtils.abbreviateForLogging(batch.getSqlMessage()), batch.getLastUpdatedHostName(), new Date(),
batch.getSummary(), batch.isLoadFlag(), batch.getExtractCount(), batch.getSentCount(), batch.getLoadCount(),
batch.getLoadId(), batch.isCommonFlag(), batch.getRouterMillis(), batch.getExtractMillis(),
batch.getLoadRowCount(), batch.getFallbackInsertCount(), batch.getFallbackUpdateCount(), batch.getConflictWinCount(),
batch.getConflictLoseCount(), batch.getIgnoreCount(), batch.getIgnoreRowCount(), batch.getMissingDeleteCount(),
batch.getSkipCount(), batch.getSqlState(), batch.getSqlCode(), FormatUtils.abbreviateForLogging(batch.getSqlMessage()),
batch.getLastUpdatedHostName(), new Date(), batch.getSummary(), batch.isLoadFlag(), batch.getExtractCount(), batch.getSentCount(),
batch.getLoadCount(), batch.getLoadId(), batch.isCommonFlag(), batch.getRouterMillis(), batch.getExtractMillis(),
batch.getTransformExtractMillis(), batch.getTransformLoadMillis(), batch.getReloadRowCount(),
batch.getOtherRowCount(), batch.getDataRowCount(), batch.getDataInsertRowCount(), batch.getDataUpdateRowCount(),
batch.getDataDeleteRowCount(), batch.getExtractRowCount(), batch.getExtractInsertRowCount(),
Expand All @@ -388,11 +389,12 @@ public int updateIncomingBatch(ISqlTransaction transaction, IncomingBatch batch)
batch.getNodeId() },
new int[] { Types.CHAR, Types.SMALLINT, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC,
Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC,
Types.NUMERIC, Types.VARCHAR, Types.NUMERIC, Types.VARCHAR, Types.VARCHAR, Types.TIMESTAMP, Types.VARCHAR,
Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC,
Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.VARCHAR, Types.NUMERIC, Types.VARCHAR, Types.VARCHAR,
Types.TIMESTAMP, Types.VARCHAR, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC,
Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC,
Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC,
Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, symmetricDialect.getSqlTypeForIds(), Types.VARCHAR });
Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC,
symmetricDialect.getSqlTypeForIds(), Types.VARCHAR });
}
return count;
}
Expand Down Expand Up @@ -553,6 +555,8 @@ public IncomingBatch mapRow(Row rs) {
batch.setOtherRowCount(rs.getLong("other_row_count"));
batch.setFallbackInsertCount(rs.getLong("fallback_insert_count"));
batch.setFallbackUpdateCount(rs.getLong("fallback_update_count"));
batch.setConflictWinCount(rs.getLong("conflict_win_count"));
batch.setConflictLoseCount(rs.getLong("conflict_lose_count"));
batch.setIgnoreCount(rs.getLong("ignore_count"));
batch.setIgnoreRowCount(rs.getLong("ignore_row_count"));
batch.setMissingDeleteCount(rs.getLong("missing_delete_count"));
Expand Down
Expand Up @@ -34,9 +34,10 @@ public IncomingBatchServiceSqlMap(IDatabasePlatform platform, Map<String, String
putSql("selectIncomingBatchPrefixSql" ,""
+ "select batch_id, node_id, channel_id, status, network_millis, filter_millis, load_millis, "
+ " failed_row_number, failed_line_number, byte_count, load_row_count, fallback_insert_count, "
+ " fallback_update_count, ignore_count, ignore_row_count, missing_delete_count, skip_count, "
+ " sql_state, sql_code, sql_message, last_update_hostname, last_update_time, create_time, "
+ " error_flag, summary, load_insert_row_count, load_update_row_count, load_delete_row_count, "
+ " fallback_update_count, conflict_win_count, conflict_lose_count, ignore_count, "
+ " ignore_row_count, missing_delete_count, skip_count, sql_state, sql_code, sql_message, "
+ " last_update_hostname, last_update_time, create_time, error_flag, summary, "
+ " load_insert_row_count, load_update_row_count, load_delete_row_count, "
+ " data_insert_row_count, data_update_row_count, data_delete_row_count, "
+ " data_row_count, extract_insert_row_count, extract_update_row_count, "
+ " extract_delete_row_count, extract_row_count, reload_row_count, other_row_count, "
Expand Down Expand Up @@ -68,22 +69,22 @@ public IncomingBatchServiceSqlMap(IDatabasePlatform platform, Map<String, String

putSql("insertIncomingBatchSql" ,"" +
"insert into $(incoming_batch) (batch_id, node_id, channel_id, status, network_millis, filter_millis, load_millis, failed_row_number, failed_line_number, byte_count, " +
" load_row_count, fallback_insert_count, fallback_update_count, ignore_count, ignore_row_count, missing_delete_count, skip_count, sql_state, sql_code, sql_message, " +
" last_update_hostname, last_update_time, summary, create_time, load_flag, extract_count, sent_count, load_count, load_id, common_flag, router_millis, extract_millis, " +
" transform_extract_millis, transform_load_millis, reload_row_count, other_row_count, data_row_count, data_insert_row_count, data_update_row_count, " +
" data_delete_row_count, extract_row_count, extract_insert_row_count, extract_update_row_count, extract_delete_row_count, load_insert_row_count, " +
" load_update_row_count, load_delete_row_count, failed_data_id) " +
" load_row_count, fallback_insert_count, fallback_update_count, conflict_win_count, conflict_lose_count, ignore_count, ignore_row_count, missing_delete_count, " +
" skip_count, sql_state, sql_code, sql_message, last_update_hostname, last_update_time, summary, create_time, load_flag, extract_count, sent_count, load_count, " +
" load_id, common_flag, router_millis, extract_millis, transform_extract_millis, transform_load_millis, reload_row_count, other_row_count, data_row_count, " +
" data_insert_row_count, data_update_row_count, data_delete_row_count, extract_row_count, extract_insert_row_count, extract_update_row_count, " +
" extract_delete_row_count, load_insert_row_count, load_update_row_count, load_delete_row_count, failed_data_id) " +
" values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, " +
" ?, ?, ?, ?, ?, ?)");
" ?, ?, ?, ?, ?, ?, ?, ?)");

putSql("updateIncomingBatchSql" ,"" +
"update $(incoming_batch) set status = ?, error_flag=?, network_millis = ?, filter_millis = ?, load_millis = ?, failed_row_number = ?, failed_line_number = ?, byte_count = ?, " +
" load_row_count = ?, fallback_insert_count = ?, fallback_update_count = ?, ignore_count = ?, ignore_row_count = ?, missing_delete_count = ?, skip_count = ?, sql_state = ?, " +
" sql_code = ?, sql_message = ?, last_update_hostname = ?, last_update_time = ?, summary = ?, load_flag = ?, extract_count = ?, sent_count = ?, " +
" load_count = ?, load_id = ?, common_flag = ?, router_millis = ?, extract_millis = ?, transform_extract_millis = ?, transform_load_millis = ?, reload_row_count = ?, " +
" other_row_count = ?, data_row_count = ?, data_insert_row_count = ?, data_update_row_count = ?, data_delete_row_count = ?, extract_row_count = ?, " +
" extract_insert_row_count = ?, extract_update_row_count = ?, extract_delete_row_count = ?, load_insert_row_count = ?, load_update_row_count = ?, load_delete_row_count = ?, " +
" failed_data_id = ?, bulk_loader_flag=? where batch_id = ? and node_id = ? " );
" load_row_count = ?, fallback_insert_count = ?, fallback_update_count = ?, conflict_win_count = ?, conflict_lose_count = ?, ignore_count = ?, ignore_row_count = ?, " +
" missing_delete_count = ?, skip_count = ?, sql_state = ?, sql_code = ?, sql_message = ?, last_update_hostname = ?, last_update_time = ?, summary = ?, load_flag = ?, " +
" extract_count = ?, sent_count = ?, load_count = ?, load_id = ?, common_flag = ?, router_millis = ?, extract_millis = ?, transform_extract_millis = ?, " +
" transform_load_millis = ?, reload_row_count = ?, other_row_count = ?, data_row_count = ?, data_insert_row_count = ?, data_update_row_count = ?, data_delete_row_count = ?, " +
" extract_row_count = ?, extract_insert_row_count = ?, extract_update_row_count = ?, extract_delete_row_count = ?, load_insert_row_count = ?, load_update_row_count = ?, " +
" load_delete_row_count = ?, failed_data_id = ?, bulk_loader_flag=? where batch_id = ? and node_id = ? " );

putSql("statusNotOk", " and status not in ('OK', 'IG')");

Expand Down

0 comments on commit 5ff8830

Please sign in to comment.