0005338: Flag for If Batch Was Bulk Loaded and Showing Percentage of Batches Bulk Loaded in a Load
JumpMind authored and JumpMind committed Jun 21, 2022
1 parent 472476b commit aeb6db6
Showing 9 changed files with 37 additions and 37 deletions.
@@ -49,7 +49,7 @@ public void start(Batch batch) {
super.start(batch);
batch.setBulkLoaderFlag(true);
if (isFallBackToDefault()) {
batch.setBulkLoaderFlag(false);
getTransaction().setInBatchMode(false);
getTransaction().clearBatch();
log.debug("Writing batch " + batch.getBatchId() + " on channel " + batch.getChannelId() + " to node " + batch.getTargetNodeId()
@@ -703,12 +703,11 @@ public void setTableExtractedCount(Map<String, Map<String, Long>> tableExtractedCount) {
this.tableExtractedCount = tableExtractedCount;
}

public boolean isBulkLoadFlag() {
return bulkLoadFlag;
}

public void setBulkLoadFlag(boolean bulkLoadFlag) {
this.bulkLoadFlag = bulkLoadFlag;
}

}
@@ -259,12 +259,11 @@ public long getTransformLoadMillis() {
return transformLoadMillis;
}

public boolean isBulkLoaderFlag() {
return bulkLoaderFlag;
}

public void setBulkLoaderFlag(boolean bulkLoaderFlag) {
this.bulkLoaderFlag = bulkLoaderFlag;
}

}
@@ -50,7 +50,7 @@ public class TableReloadStatus {
protected Date lastUpdateTime = new Date();
protected int numBatchesBulkLoaded;

@Override
public int hashCode() {
final int prime = 31;
int result = 1;
@@ -307,12 +307,12 @@ public int getTotalBatchCount() {
public int getTotalBatchLoaded() {
return this.setupBatchLoaded + this.dataBatchLoaded + this.finalizeBatchLoaded;
}

public int getNumBatchesBulkLoaded() {
return numBatchesBulkLoaded;
}

public void setNumBatchesBulkLoaded(int numBatchesBulkLoaded) {
this.numBatchesBulkLoaded = numBatchesBulkLoaded;
}
}
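Note: together with the existing getTotalBatchCount() accessor referenced in this hunk, the new numBatchesBulkLoaded counter is what allows the "percentage of batches bulk loaded" from the commit title to be derived. A minimal sketch of that calculation follows; the helper name and the guard against a zero batch count are assumptions for illustration, not code from this commit.

// Illustrative helper only: derive the percentage of a load's batches that
// went through the bulk loader.
public static int percentBulkLoaded(TableReloadStatus status) {
    int total = status.getTotalBatchCount();
    if (total <= 0) {
        return 0; // nothing registered yet; avoid divide-by-zero
    }
    return (int) Math.round(100.0 * status.getNumBatchesBulkLoaded() / total);
}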
@@ -450,7 +450,7 @@ batchId, batchCount, batchId, batchCount, batchId, batchCount, new Date(),
new int[] { idType, Types.NUMERIC, idType, Types.NUMERIC, idType, Types.NUMERIC,
idType, Types.NUMERIC, idType, Types.NUMERIC, idType, Types.NUMERIC, Types.TIMESTAMP,
idType, Types.NUMERIC, idType, Types.NUMERIC, idType, Types.NUMERIC, idType,
Types.VARCHAR, Types.TIMESTAMP, idType, Types.NUMERIC, idType });
List<TableReloadStatus> status = transaction.query(getSql("selectTableReloadStatusByLoadId"),
new TableReloadStatusMapper(), new Object[] { loadId }, new int[] { idType });
if (status != null && status.size() > 0 && count > 0) {
@@ -141,7 +141,7 @@ public DataServiceSqlMap(IDatabasePlatform platform, Map<String, String> replace
+ " setup_batch_count <= (case when ? < start_data_batch_id then setup_batch_loaded + ? else setup_batch_loaded end) and "
+ " finalize_batch_loaded <= (case when ? > end_data_batch_id then finalize_batch_loaded + ? else finalize_batch_loaded end)) "
+ " then ? else end_time end, "
+ " data_batch_loaded = case when ? between start_data_batch_id and end_data_batch_id then data_batch_loaded + ? else data_batch_loaded end, "
+ " data_batch_loaded = case when ? between start_data_batch_id and end_data_batch_id then data_batch_loaded + ? else data_batch_loaded end, "
+ " setup_batch_loaded = case when ? < start_data_batch_id then setup_batch_loaded + ? else setup_batch_loaded end, "
+ " finalize_batch_loaded = case when ? > end_data_batch_id then finalize_batch_loaded + ? else finalize_batch_loaded end, "
+ " rows_loaded = (select case when sum(loaded_rows) is null then 0 else sum(loaded_rows) end from $(extract_request) where load_id = ? and source_node_id = ?), "
@@ -184,9 +184,9 @@ public void batchInError(DataContext context, Throwable ex) {
throw ex;
}
if (context.get(ContextConstants.CONTEXT_BULK_WRITER_TO_USE) != null && context.get(ContextConstants.CONTEXT_BULK_WRITER_TO_USE).equals("bulk")) {
currentBatch.setBulkLoadFlag(false);
log.info("Bulk loading failed for this batch " + context.getBatch().getBatchId() + ", falling back to default loading. (" + ex + ")");
log.debug("Bulk loading error.", ex);
log.debug("Bulk loading error.", ex);
} else {
/*
* Reread batch to make sure it wasn't set to IG or OK
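The hunk above clears the outgoing batch's bulk-load flag when bulk loading throws, so a batch that falls back to the default loader is not reported as bulk loaded. Combined with the first hunk, where the writer marks the batch as bulk loaded up front, the pattern looks roughly like the sketch below; the method name and the Runnable stand-ins for the two load paths are hypothetical.

// Sketch only: the real code lives in the bulk writer and the error handler above.
void loadWithBulkFallback(OutgoingBatch currentBatch, Runnable bulkLoad, Runnable defaultLoad) {
    currentBatch.setBulkLoadFlag(true); // optimistic: assume the bulk path succeeds
    try {
        bulkLoad.run();
    } catch (RuntimeException ex) {
        currentBatch.setBulkLoadFlag(false); // batch will be loaded the default way
        defaultLoad.run();
    }
}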
@@ -215,7 +215,8 @@ public void updateOutgoingBatch(ISqlTransaction transaction, OutgoingBatch outgo
outgoingBatch.getFallbackInsertCount(), outgoingBatch.getFallbackUpdateCount(), outgoingBatch.getIgnoreRowCount(),
outgoingBatch.getMissingDeleteCount(), outgoingBatch.getSkipCount(), outgoingBatch.getExtractRowCount(),
outgoingBatch.getExtractInsertRowCount(), outgoingBatch.getExtractUpdateRowCount(),
outgoingBatch.getExtractDeleteRowCount(), outgoingBatch.getTransformExtractMillis(), outgoingBatch.getTransformLoadMillis(),
outgoingBatch.isBulkLoadFlag(),
outgoingBatch.getBatchId(), outgoingBatch.getNodeId() },
new int[] { Types.CHAR, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC,
Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC,
@@ -257,7 +258,8 @@ public void updateOutgoingBatches(ISqlTransaction transaction, List<OutgoingBatc
outgoingBatch.getFallbackInsertCount(), outgoingBatch.getFallbackUpdateCount(), outgoingBatch.getIgnoreRowCount(),
outgoingBatch.getMissingDeleteCount(), outgoingBatch.getSkipCount(), outgoingBatch.getExtractRowCount(),
outgoingBatch.getExtractInsertRowCount(), outgoingBatch.getExtractUpdateRowCount(),
outgoingBatch.getExtractDeleteRowCount(), outgoingBatch.getTransformExtractMillis(), outgoingBatch.getTransformLoadMillis(),
outgoingBatch.isBulkLoadFlag(),
outgoingBatch.getBatchId(), outgoingBatch.getNodeId() }, types);
if (++count >= flushSize) {
transaction.flush();
@@ -56,16 +56,16 @@ public enum BatchType {
protected Statistics statistics;
protected boolean invalidRetry = false;
protected boolean bulkLoaderFlag;

public boolean isBulkLoaderFlag() {
return bulkLoaderFlag;
}

public void setBulkLoaderFlag(boolean bulkLoaderFlag) {
this.bulkLoaderFlag = bulkLoaderFlag;
}

protected Map<String, Long> timers = new HashMap<String, Long>();

public Batch(BatchType batchType, long batchId, String channelId, BinaryEncoding binaryEncoding, String sourceNodeId, String targetNodeId, boolean common) {
this.batchType = batchType;
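The Batch class now carries the per-batch bulkLoaderFlag that the writer sets in the first hunk. One plausible way that flag could feed the per-load numBatchesBulkLoaded counter on TableReloadStatus is sketched below; the method is hypothetical, not the project's actual acknowledgment code.

// Hypothetical rollup (illustration only): count a batch toward the load's
// bulk-loaded total when its flag is set.
void recordBatchCompleted(TableReloadStatus status, Batch batch) {
    if (batch.isBulkLoaderFlag()) {
        status.setNumBatchesBulkLoaded(status.getNumBatchesBulkLoaded() + 1);
    }
}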
