Skip to content

Commit

Permalink
0002730: Incoming batch has a new ignore row count
Browse files Browse the repository at this point in the history
  • Loading branch information
jumpmind-josh committed Aug 16, 2016
1 parent febea79 commit 5b5f16b
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 13 deletions.
Expand Up @@ -74,6 +74,8 @@ public String toString() {

private long ignoreCount;

private long ignoreRowCount;

private long missingDeleteCount;

private long skipCount;
Expand Down Expand Up @@ -122,6 +124,7 @@ public void setValues(Statistics readerStatistics, Statistics writerStatistics,
missingDeleteCount = writerStatistics
.get(DataWriterStatisticConstants.MISSINGDELETECOUNT);
ignoreCount = writerStatistics.get(DataWriterStatisticConstants.IGNORECOUNT);
ignoreRowCount = writerStatistics.get(DataWriterStatisticConstants.IGNOREROWCOUNT);
lastUpdatedTime = new Date();
if (!isSuccess) {
failedRowNumber = statementCount;
Expand Down Expand Up @@ -358,7 +361,19 @@ public long getIgnoreCount() {
return ignoreCount;
}

public String getStagedLocation() {
public long getIgnoreRowCount() {
return ignoreRowCount;
}

public void incrementIgnoreRowCount() {
this.ignoreRowCount++;
}

public void setIgnoreRowCount(long ignoreRowCount) {
this.ignoreRowCount = ignoreRowCount;
}

public String getStagedLocation() {
return Batch.getStagedLocation(false, nodeId);
}

Expand Down
Expand Up @@ -262,16 +262,16 @@ public void insertIncomingBatch(ISqlTransaction transaction, IncomingBatch batch
batch.getFailedRowNumber(), batch.getFailedLineNumber(),
batch.getByteCount(), batch.getStatementCount(),
batch.getFallbackInsertCount(), batch.getFallbackUpdateCount(),
batch.getIgnoreCount(), batch.getMissingDeleteCount(),
batch.getIgnoreCount(), batch.getIgnoreRowCount(), batch.getMissingDeleteCount(),
batch.getSkipCount(), batch.getSqlState(), batch.getSqlCode(),
FormatUtils.abbreviateForLogging(batch.getSqlMessage()),
batch.getLastUpdatedHostName(), batch.getLastUpdatedTime() },
new int[] { Types.NUMERIC, Types.VARCHAR, Types.VARCHAR, Types.CHAR,
Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC,
Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC,
Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC,
Types.VARCHAR, Types.NUMERIC, Types.VARCHAR, Types.VARCHAR,
Types.TIMESTAMP });
Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC,
Types.NUMERIC, Types.VARCHAR, Types.NUMERIC, Types.VARCHAR,
Types.VARCHAR, Types.TIMESTAMP });
}
}
}
Expand Down Expand Up @@ -342,7 +342,7 @@ public int updateIncomingBatch(ISqlTransaction transaction , IncomingBatch batch
batch.getDatabaseMillis(), batch.getFailedRowNumber(),
batch.getFailedLineNumber(), batch.getByteCount(),
batch.getStatementCount(), batch.getFallbackInsertCount(),
batch.getFallbackUpdateCount(), batch.getIgnoreCount(),
batch.getFallbackUpdateCount(), batch.getIgnoreCount(), batch.getIgnoreRowCount(),
batch.getMissingDeleteCount(), batch.getSkipCount(),
batch.getSqlState(), batch.getSqlCode(),
FormatUtils.abbreviateForLogging(batch.getSqlMessage()),
Expand All @@ -351,7 +351,7 @@ public int updateIncomingBatch(ISqlTransaction transaction , IncomingBatch batch
Types.SMALLINT, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC,
Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC,
Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC,
Types.NUMERIC, Types.VARCHAR, Types.NUMERIC, Types.VARCHAR,
Types.NUMERIC, Types.NUMERIC, Types.VARCHAR, Types.NUMERIC, Types.VARCHAR,
Types.VARCHAR, Types.TIMESTAMP, symmetricDialect.getSqlTypeForIds(), Types.VARCHAR });
}
return count;
Expand Down Expand Up @@ -408,6 +408,7 @@ public IncomingBatch mapRow(Row rs) {
batch.setFallbackInsertCount(rs.getLong("fallback_insert_count"));
batch.setFallbackUpdateCount(rs.getLong("fallback_update_count"));
batch.setIgnoreCount(rs.getLong("ignore_count"));
batch.setIgnoreRowCount(rs.getLong("ignore_row_count"));
batch.setMissingDeleteCount(rs.getLong("missing_delete_count"));
batch.setSkipCount(rs.getLong("skip_count"));
batch.setSqlState(rs.getString("sql_state"));
Expand Down
Expand Up @@ -35,7 +35,7 @@ public IncomingBatchServiceSqlMap(IDatabasePlatform platform, Map<String, String

putSql("selectIncomingBatchPrefixSql" ,"" +
"select batch_id, node_id, channel_id, status, network_millis, filter_millis, database_millis, failed_row_number, failed_line_number, byte_count, " +
" statement_count, fallback_insert_count, fallback_update_count, ignore_count, missing_delete_count, skip_count, sql_state, sql_code, sql_message, " +
" statement_count, fallback_insert_count, fallback_update_count, ignore_count, ignore_row_count, missing_delete_count, skip_count, sql_state, sql_code, sql_message, " +
" last_update_hostname, last_update_time, create_time, error_flag from $(incoming_batch) " );

putSql("selectCreateTimePrefixSql" ,"" +
Expand All @@ -60,13 +60,13 @@ public IncomingBatchServiceSqlMap(IDatabasePlatform platform, Map<String, String

putSql("insertIncomingBatchSql" ,"" +
"insert into $(incoming_batch) (batch_id, node_id, channel_id, status, network_millis, filter_millis, database_millis, failed_row_number, failed_line_number, byte_count, " +
" statement_count, fallback_insert_count, fallback_update_count, ignore_count, missing_delete_count, skip_count, sql_state, sql_code, sql_message, " +
" statement_count, fallback_insert_count, fallback_update_count, ignore_count, ignore_row_count, missing_delete_count, skip_count, sql_state, sql_code, sql_message, " +
" last_update_hostname, last_update_time, create_time) " +
" values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, current_timestamp) " );
" values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, current_timestamp) " );

putSql("updateIncomingBatchSql" ,"" +
"update $(incoming_batch) set status = ?, error_flag=?, network_millis = ?, filter_millis = ?, database_millis = ?, failed_row_number = ?, failed_line_number = ?, byte_count = ?, " +
" statement_count = ?, fallback_insert_count = ?, fallback_update_count = ?, ignore_count = ?, missing_delete_count = ?, skip_count = ?, sql_state = ?, sql_code = ?, sql_message = ?, " +
" statement_count = ?, fallback_insert_count = ?, fallback_update_count = ?, ignore_count = ?, ignore_row_count = ?, missing_delete_count = ?, skip_count = ?, sql_state = ?, sql_code = ?, sql_message = ?, " +
" last_update_hostname = ?, last_update_time = ? where batch_id = ? and node_id = ? " );

putSql("deleteIncomingBatchSql" ,"" +
Expand Down
3 changes: 2 additions & 1 deletion symmetric-core/src/main/resources/symmetric-schema.xml
Expand Up @@ -247,7 +247,8 @@
<column name="statement_count" type="BIGINT" required="true" default="0" description="The number of statements run to load this batch." />
<column name="fallback_insert_count" type="BIGINT" required="true" default="0" description="The number of times an update was turned into an insert because the data was not already in the target database." />
<column name="fallback_update_count" type="BIGINT" required="true" default="0" description="The number of times an insert was turned into an update because a data row already existed in the target database." />
<column name="ignore_count" type="BIGINT" required="true" default="0" description="The number of times a row was ignored." />
<column name="ignore_count" type="BIGINT" required="true" default="0" description="The number of times a batch was ignored." />
<column name="ignore_row_count" type="BIGINT" required="true" default="0" description="The number of times a row was ignored." />
<column name="missing_delete_count" type="BIGINT" required="true" default="0" description="The number of times a delete did not affect the database because the row was already deleted." />
<column name="skip_count" type="BIGINT" required="true" default="0" description="The number of times a batch was sent and skipped because it had already been loaded according to incoming_batch." />
<column name="sql_state" type="VARCHAR" size="10" description="For a status of error (ER), this is the XOPEN or SQL 99 SQL State." />
Expand Down
Expand Up @@ -203,7 +203,7 @@ protected void logConflictHappened(Conflict conflict, CsvData data, AbstractData
protected void ignore(AbstractDatabaseWriter writer, Conflict conflict) {
if (conflict.isResolveRowOnly()) {
writer.getStatistics().get(writer.getBatch())
.increment(DataWriterStatisticConstants.IGNORECOUNT);
.increment(DataWriterStatisticConstants.IGNOREROWCOUNT);
} else {
throw new IgnoreBatchException();
}
Expand Down
Expand Up @@ -39,6 +39,7 @@ abstract public class DataWriterStatisticConstants {
public static final String FALLBACKUPDATECOUNT = "FALLBACKUPDATECOUNT";
public static final String MISSINGDELETECOUNT = "MISSINGDELETECOUNT";
public static final String IGNORECOUNT = "IGNORECOUNT";
public static final String IGNOREROWCOUNT = "IGNOREROWCOUNT";
public static final String LINENUMBER = "LINENUMBER";

}

0 comments on commit 5b5f16b

Please sign in to comment.