From f1da057b4f7826cb2f82326f5c1f524a586a5ef9 Mon Sep 17 00:00:00 2001 From: Eric Long Date: Tue, 5 Feb 2019 10:51:41 -0500 Subject: [PATCH] 0003785: Improve visibility of full and partial loads --- .../service/IOutgoingBatchService.java | 2 + .../service/impl/AcknowledgeService.java | 4 +- .../service/impl/DataExtractorService.java | 232 +++++++++++------- .../impl/DataExtractorServiceSqlMap.java | 15 +- .../service/impl/DataLoaderService.java | 3 +- .../impl/FileSyncExtractorService.java | 4 +- .../service/impl/MultiBatchStagingWriter.java | 75 +++--- .../service/impl/OutgoingBatchService.java | 7 + .../impl/OutgoingBatchServiceSqlMap.java | 6 +- .../symmetric/io/data/DataProcessor.java | 5 +- 10 files changed, 213 insertions(+), 140 deletions(-) diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IOutgoingBatchService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IOutgoingBatchService.java index c732ddb200..cbf5f42bab 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IOutgoingBatchService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IOutgoingBatchService.java @@ -79,6 +79,8 @@ public OutgoingBatches getOutgoingBatches(String nodeId, String channelThread, N public void updateOutgoingBatch(OutgoingBatch batch); + public void updateOutgoingBatchStatus(ISqlTransaction transaction, Status status, String nodeId, long startBatchId, long endBatchId); + public void updateCommonBatchExtractStatistics(OutgoingBatch batch); public void updateOutgoingBatch(ISqlTransaction transaction, OutgoingBatch outgoingBatch); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/AcknowledgeService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/AcknowledgeService.java index 9d7472281c..550491f18e 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/AcknowledgeService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/AcknowledgeService.java @@ -74,7 +74,7 @@ public BatchAckResult ack(final BatchAck batch) { .findOutgoingBatch(batch.getBatchId(), batch.getNodeId()); Status status = batch.isResend() ? Status.RS : batch.isOk() ? Status.OK : Status.ER; Status oldStatus = null; - if (outgoingBatch != null) { + if (outgoingBatch != null && outgoingBatch.getStatus() != Status.RQ) { // Allow an outside system/user to indicate that a batch // is OK. 
if (outgoingBatch.getStatus() != Status.OK && @@ -182,7 +182,7 @@ public BatchAckResult ack(final BatchAck batch) { } engine.getStatisticManager().removeRouterStatsByBatch(batch.getBatchId()); } - } else { + } else if (outgoingBatch == null) { log.error("Could not find batch {}-{} to acknowledge as {}", new Object[] {batch.getNodeId(), batch.getBatchId(), status.name()}); result.setOk(false); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java index ec348ac02e..32fecfc7fe 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java @@ -843,7 +843,7 @@ protected List extract(final ProcessInfo extractInfo, final Node resource.delete(); } } - if (e.getCause() instanceof InterruptedException) { + if (e.getCause() instanceof InterruptedException || e.getCause() instanceof CancellationException) { log.info("Extract of batch {} was interrupted", currentBatch); } else if (e instanceof StagingLowFreeSpace) { log.error("Extract is disabled because disk is almost full: {}", e.getMessage()); @@ -905,14 +905,14 @@ protected FutureOutgoingBatch extractBatch(OutgoingBatch extractBatch, FutureExt * the batch must have been purged. it needs to be * re-extracted */ - log.info("Batch {} is marked as ready but it has been deleted. Rescheduling it for extraction", + log.info("Batch {} is marked as ready but it is missing in staging. Rescheduling it for extraction.", extractBatch.getNodeBatchId()); if (mode != ExtractMode.EXTRACT_ONLY) { resetExtractRequest(extractBatch); } status.shouldExtractSkip = outgoingBatch.isExtractSkipped = true; } else if (extractBatch.getStatus() == Status.RQ) { - log.info("Batch {} is not ready for delivery. It is currently scheduled for extraction", + log.info("Batch {} is not ready for delivery. 
It is currently scheduled for extraction.", extractBatch.getNodeBatchId()); status.shouldExtractSkip = outgoingBatch.isExtractSkipped = true; } @@ -1905,6 +1905,11 @@ protected List getExtractRequestsForNode(NodeCommunication nodeC ExtractRequest.ExtractStatus.NE.name()); } + protected ExtractRequest getExtractRequestForBatch(OutgoingBatch batch) { + return sqlTemplate.queryForObject(getSql("selectExtractRequestForBatchSql"), + new ExtractRequestMapper(), batch.getBatchId(), batch.getBatchId(), batch.getNodeId(), batch.getLoadId()); + } + protected Map> getExtractChildRequestsForNode(NodeCommunication nodeCommunication, List parentRequests) { Map> requests = new HashMap>(); @@ -1923,29 +1928,33 @@ protected Map> getExtractChildRequestsForNode(NodeCom return requests; } + protected List getExtractChildRequestsForNode(ExtractRequest parentRequest) { + return sqlTemplate.query(getSql("selectExtractChildRequestsByParentSql"), new ExtractRequestMapper(), parentRequest.getRequestId()); + } + @Override public void resetExtractRequest(OutgoingBatch batch) { - ISqlTransaction transaction = null; - try { - transaction = sqlTemplate.startSqlTransaction(); - batch.setStatus(Status.RQ); - outgoingBatchService.updateOutgoingBatch(transaction, batch); - - transaction.prepareAndExecute(getSql("resetExtractRequestStatus"), ExtractStatus.NE.name(), - batch.getBatchId(), batch.getBatchId(), batch.getNodeId()); - transaction.commit(); - } catch (Error ex) { - if (transaction != null) { - transaction.rollback(); + ExtractRequest request = getExtractRequestForBatch(batch); + if (request != null) { + List infos = statisticManager.getProcessInfos(); + for (ProcessInfo info : infos) { + if (info.getProcessType().equals(ProcessType.INITIAL_LOAD_EXTRACT_JOB) && + request.getNodeId().equals(info.getTargetNodeId()) && + info.getCurrentBatchId() >= request.getStartBatchId() && + info.getCurrentBatchId() <= request.getEndBatchId()) { + log.info("Sending interrupt to " + info.getKey().toString()); + info.getThread().interrupt(); + } } - throw ex; - } catch (RuntimeException ex) { - if (transaction != null) { - transaction.rollback(); + + List batches = outgoingBatchService.getOutgoingBatchRange(request.getStartBatchId(), request.getEndBatchId()).getBatches(); + List childRequests = null; + if (request.getParentRequestId() == 0) { + childRequests = getExtractChildRequestsForNode(request); } - throw ex; - } finally { - close(transaction); + restartExtractRequest(batches, request, childRequests); + } else { + log.warn("Unable to find extract request for node {} batch {} load {}", batch.getNodeId(), batch.getBatchId(), batch.getLoadId()); } } @@ -2018,10 +2027,9 @@ public void execute(NodeCommunication nodeCommunication, RemoteNodeStatus status } Node identity = nodeService.findIdentity(); Node targetNode = nodeService.findNode(nodeCommunication.getNodeId(), true); - log.info("Begin extracting table {} request {} batches {} through {}.", - new Object[] { request.getTableName(), request.getRequestId(), request.getStartBatchId(), request.getEndBatchId() }); - List batches = outgoingBatchService.getOutgoingBatchRange( - request.getStartBatchId(), request.getEndBatchId()).getBatches(); + log.info("Starting request {} to extract table {} into batches {} through {} for node {}.", + new Object[] { request.getRequestId(), request.getTableName(), request.getStartBatchId(), request.getEndBatchId(), request.getNodeId() }); + List batches = outgoingBatchService.getOutgoingBatchRange(request.getStartBatchId(), 
request.getEndBatchId()).getBatches(); ProcessInfo processInfo = statisticManager.newProcessInfo(new ProcessInfoKey(identity .getNodeId(), nodeCommunication.getQueue(), nodeCommunication.getNodeId(), @@ -2030,71 +2038,41 @@ public void execute(NodeCommunication nodeCommunication, RemoteNodeStatus status List childRequests = allChildRequests.get(request.getRequestId()); try { - boolean areBatchesOk = true; - - /* - * check to see if batches have been OK'd by another reload - * request - */ + boolean isCanceled = true; + boolean isRestarted = false; for (OutgoingBatch outgoingBatch : batches) { - if (outgoingBatch.getStatus() != Status.OK) { - areBatchesOk = false; - break; + if (outgoingBatch.getStatus() != Status.OK && outgoingBatch.getStatus() != Status.IG) { + isCanceled = false; + } + if (outgoingBatch.getStatus() != Status.RQ) { + isRestarted = true; } } - if (!areBatchesOk) { + if (!isCanceled) { - Channel channel = configurationService - .getChannel(batches.get(0).getChannelId()); + Channel channel = configurationService.getChannel(batches.get(0).getChannelId()); /* * "Trick" the extractor to extract one reload batch, but we * will split it across the N batches when writing it */ OutgoingBatch firstBatch = batches.get(0); processInfo.setCurrentLoadId(firstBatch.getLoadId()); - IStagedResource resource = getStagedResource(firstBatch); - if (resource != null && resource.exists() && resource.getState() != State.CREATE) { - resource.delete(); + + if (isRestarted) { + restartExtractRequest(batches, request, childRequests); } - + MultiBatchStagingWriter multiBatchStagingWriter = - buildMultiBatchStagingWriter(request, childRequests, identity, targetNode, batches, processInfo, channel); + buildMultiBatchStagingWriter(request, childRequests, identity, targetNode, batches, processInfo, channel, isRestarted); extractOutgoingBatch(processInfo, targetNode, multiBatchStagingWriter, firstBatch, false, false, ExtractMode.FOR_SYM_CLIENT, new ClusterLockRefreshListener(clusterService)); - - for (OutgoingBatch outgoingBatch : batches) { - resource = getStagedResource(outgoingBatch); - if (resource != null) { - resource.setState(State.DONE); - } - } checkSendDeferredConstraints(request, targetNode, firstBatch); } else { - log.info("Batches already had an OK status for table {} request {} batches {} through {}. Not extracting", - new Object[] { request.getTableName(), request.getRequestId(), request.getStartBatchId(), request.getEndBatchId() }); - } - - /* - * re-query the batches to see if they have been OK'd while - * extracting - */ - List checkBatches = outgoingBatchService.getOutgoingBatchRange( - request.getStartBatchId(), request.getEndBatchId()).getBatches(); - - areBatchesOk = true; - - /* - * check to see if batches have been OK'd by another reload - * request while extracting - */ - for (OutgoingBatch outgoingBatch : checkBatches) { - if (outgoingBatch.getStatus() != Status.OK) { - areBatchesOk = false; - break; - } + log.info("Batches already had an OK status for request {} to extract table {} for batches {} through {} for node {}. 
Not extracting.", + new Object[] { request.getRequestId(), request.getTableName(), request.getStartBatchId(), request.getEndBatchId(), request.getNodeId() }); } ISqlTransaction transaction = null; @@ -2107,8 +2085,8 @@ public void execute(NodeCommunication nodeCommunication, RemoteNodeStatus status } } transaction.commit(); - log.info("Done extracting table {} request {} batches {} through {}", - request.getTableName(), request.getRequestId(), request.getStartBatchId(), request.getEndBatchId()); + log.info("Done with request {} to extract table {} into batches {} through {} for node {}", + request.getRequestId(), request.getTableName(), request.getStartBatchId(), request.getEndBatchId(), request.getNodeId()); } catch (Error ex) { if (transaction != null) { transaction.rollback(); @@ -2127,26 +2105,16 @@ public void execute(NodeCommunication nodeCommunication, RemoteNodeStatus status processInfo.setStatus(ProcessInfo.ProcessStatus.OK); } catch (CancellationException ex) { - log.info("Interrupted extract for table {} request {} batches {} through {}", - new Object[] { request.getTableName(), request.getRequestId(), request.getStartBatchId(), request.getEndBatchId() }); + log.info("Interrupted on request {} to extract table {} for batches {} through {} for node {}", + new Object[] { request.getRequestId(), request.getTableName(), request.getStartBatchId(), request.getEndBatchId(), request.getNodeId() }); processInfo.setStatus(ProcessInfo.ProcessStatus.OK); } catch (RuntimeException ex) { - log.warn("Failed to extract batches for table {} request {} batches {} through {}", - new Object[] { request.getTableName(), request.getRequestId(), request.getStartBatchId(), request.getEndBatchId() }); + log.warn("Failed on request {} to extract table {} into batches {} through {} for node {}", + new Object[] { request.getRequestId(), request.getTableName(), request.getStartBatchId(), request.getEndBatchId(), request.getNodeId() }); processInfo.setStatus(ProcessInfo.ProcessStatus.ERROR); - List checkBatches = outgoingBatchService.getOutgoingBatchRange( - request.getStartBatchId(), request.getEndBatchId()).getBatches(); - for (OutgoingBatch outgoingBatch : checkBatches) { - outgoingBatch.setStatus(Status.RQ); - IStagedResource resource = getStagedResource(outgoingBatch); - if (resource != null) { - resource.close(); - resource.delete(); - } - outgoingBatchService.updateOutgoingBatch(outgoingBatch); - } if (ex instanceof StagingLowFreeSpace) { log.error("Extract load is disabled because disk is almost full: {}", ex.getMessage()); + break; } else { throw ex; } @@ -2154,6 +2122,88 @@ public void execute(NodeCommunication nodeCommunication, RemoteNodeStatus status } } + protected void restartExtractRequest(List batches, ExtractRequest request, List childRequests) { + /* + * This extract request was interrupted and must start over + */ + log.info("Resetting status of request {} to extract table {} into batches {} through {} for node {}", + request.getRequestId(), request.getTableName(), request.getStartBatchId(), request.getEndBatchId(), request.getNodeId()); + + long batchLoadedCount = 0; + if (request.getLastLoadedBatchId() > 0) { + batchLoadedCount = request.getLastLoadedBatchId() - request.getStartBatchId() + 1; + } + long rowLoadedCount = request.getLoadedRows(); + + List allRequests = new ArrayList(); + allRequests.add(request); + if (childRequests != null) { + allRequests.addAll(childRequests); + } + + ISqlTransaction transaction = null; + try { + transaction = sqlTemplate.startSqlTransaction(); + + for 
(ExtractRequest extractRequest : allRequests) { + + // reset statistics for extract request + transaction.prepareAndExecute(getSql("restartExtractRequest"), ExtractStatus.NE.name(), extractRequest.getRequestId(), extractRequest.getNodeId()); + + // back out statistics from table reload request + if (batchLoadedCount > 0 || rowLoadedCount > 0) { + dataService.updateTableReloadRequestsLoadedCounts(transaction, extractRequest.getLoadId(), (int) batchLoadedCount * -1, rowLoadedCount * -1); + } + + // set status of batches back to requested + outgoingBatchService.updateOutgoingBatchStatus(transaction, Status.RQ, extractRequest.getNodeId(), extractRequest.getStartBatchId(), + extractRequest.getEndBatchId()); + } + + transaction.commit(); + } catch (Error ex) { + if (transaction != null) { + transaction.rollback(); + } + throw ex; + } catch (RuntimeException ex) { + if (transaction != null) { + transaction.rollback(); + } + throw ex; + } finally { + close(transaction); + } + + // remove the batches from staging + for (OutgoingBatch outgoingBatch : batches) { + IStagedResource resource = getStagedResource(outgoingBatch); + if (resource != null) { + resource.delete(); + } + if (childRequests != null) { + long batchIndex = outgoingBatch.getBatchId() - request.getStartBatchId(); + for (ExtractRequest extractRequest : childRequests) { + OutgoingBatch childBatch = new OutgoingBatch(extractRequest.getNodeId(), outgoingBatch.getChannelId(), outgoingBatch.getStatus()); + childBatch.setBatchId(outgoingBatch.getBatchId() + batchIndex); + resource = getStagedResource(childBatch); + if (resource != null) { + resource.delete(); + } + } + } + } + + // clear the incoming batch table for the batches at the target node, so the batches won't be skipped + for (ExtractRequest extractRequest : allRequests) { + String symNode = TableConstants.getTableName(parameterService.getTablePrefix(), TableConstants.SYM_NODE); + String symIncomingBatch = TableConstants.getTableName(parameterService.getTablePrefix(), TableConstants.SYM_INCOMING_BATCH); + String sql = "delete from " + symIncomingBatch + " where node_id = '" + nodeService.findIdentityNodeId() + + "' and batch_id between " + extractRequest.getStartBatchId() + " and " + extractRequest.getEndBatchId(); + dataService.sendSQL(extractRequest.getNodeId(), null, null, symNode, sql); + } + } + public void releaseMissedExtractRequests() { int missingCount = sqlTemplateDirty.queryForInt(getSql("countExtractChildRequestMissed"), Status.NE.name(), Status.OK.name()); if (missingCount > 0) { @@ -2202,9 +2252,9 @@ protected boolean isApplicable(NodeCommunication nodeCommunication) { } protected MultiBatchStagingWriter buildMultiBatchStagingWriter(ExtractRequest request, List childRequests, Node sourceNode, - Node targetNode, List batches, ProcessInfo processInfo, Channel channel) { + Node targetNode, List batches, ProcessInfo processInfo, Channel channel, boolean isRestarted) { MultiBatchStagingWriter multiBatchStatingWriter = new MultiBatchStagingWriter(this, request, childRequests, sourceNode.getNodeId(), stagingManager, - batches, channel.getMaxBatchSize(), processInfo); + batches, channel.getMaxBatchSize(), processInfo, isRestarted); return multiBatchStatingWriter; } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorServiceSqlMap.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorServiceSqlMap.java index 6a1ab170f7..507da0c958 100644 --- 
a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorServiceSqlMap.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorServiceSqlMap.java @@ -34,10 +34,14 @@ public DataExtractorServiceSqlMap(IDatabasePlatform platform, putSql("selectNodeIdsForExtractSql", "select node_id, queue from $(extract_request) where status=? and parent_request_id=0 group by node_id, queue"); putSql("selectExtractRequestForNodeSql", "select * from $(extract_request) where node_id=? and queue=? and status=? and parent_request_id=0 order by request_id"); - + + putSql("selectExtractRequestForBatchSql", "select * from $(extract_request) where start_batch_id <= ? and end_batch_id >= ? and node_id = ? and load_id = ?"); + putSql("selectExtractChildRequestForNodeSql", "select c.* from $(extract_request) c " + "inner join $(extract_request) p on p.request_id = c.parent_request_id where p.node_id=? and p.queue=? and p.status=? and p.parent_request_id=0"); + putSql("selectExtractChildRequestsByParentSql", "select * from $(extract_request) where parent_request_id = ?"); + putSql("countExtractChildRequestMissed", "select count(*) from $(extract_request) where status = ? and parent_request_id > 0 " + "and parent_request_id in (select request_id from sym_extract_request where parent_request_id = 0 and status = ?)"); @@ -57,10 +61,11 @@ public DataExtractorServiceSqlMap(IDatabasePlatform platform, putSql("updateExtractRequestTransferred", "update $(extract_request) set last_transferred_batch_id=?, transferred_rows = transferred_rows + ?, transferred_millis = ?" + " where start_batch_id <= ? and end_batch_id >= ? and node_id=? and load_id=? and (last_transferred_batch_id is null or last_transferred_batch_id < ?)"); - - putSql("resetExtractRequestStatus", "update $(extract_request) set status=?, parent_request_id=0, last_update_time= current_timestamp" - + " where start_batch_id <= ? and end_batch_id >= ? and node_id=?"); - + + putSql("restartExtractRequest", "update $(extract_request) set last_transferred_batch_id = null, transferred_rows = 0, transferred_millis = 0, " + + "last_loaded_batch_id = null, loaded_rows = 0, loaded_millis = 0, parent_request_id = 0, status = ? " + + "where request_id = ? and node_id = ?"); + putSql("cancelExtractRequests", "update $(extract_request) set status=?, last_update_time=current_timestamp where load_id = ?"); putSql("selectIncompleteTablesForExtractByLoadId", "select * from $(extract_request) where load_id = ? 
and loaded_time is null order by request_id desc"); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java index fb79a6af9a..b725c32f15 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java @@ -671,6 +671,7 @@ protected IDataWriter chooseDataWriter(Batch batch) { } protected void logOrRethrow(Throwable ex) throws IOException { + // Throwing exception will mean acks are not sent, so only certain exceptions should be thrown if (ex instanceof RegistrationRequiredException) { throw (RegistrationRequiredException) ex; } else if (ex instanceof ConnectException) { @@ -689,8 +690,6 @@ protected void logOrRethrow(Throwable ex) throws IOException { throw (SyncDisabledException) ex; } else if (ex instanceof HttpException) { throw (HttpException) ex; - } else if (ex instanceof IOException) { - throw (IOException) ex; } else if (ex instanceof InvalidRetryException) { throw (InvalidRetryException) ex; } else if (ex instanceof StagingLowFreeSpace) { diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/FileSyncExtractorService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/FileSyncExtractorService.java index 1f0e8ac8f6..dc3199ba9c 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/FileSyncExtractorService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/FileSyncExtractorService.java @@ -114,9 +114,9 @@ protected OutgoingBatch extractOutgoingBatch(ProcessInfo processInfo, Node targe @Override protected MultiBatchStagingWriter buildMultiBatchStagingWriter(ExtractRequest request, List childRequests, final Node sourceNode, final Node targetNode, - List batches, ProcessInfo processInfo, Channel channel) { + List batches, ProcessInfo processInfo, Channel channel, boolean isRestarted) { MultiBatchStagingWriter multiBatchStagingWriter = new MultiBatchStagingWriter(this, request, childRequests, sourceNode.getNodeId(), stagingManager, - batches, channel.getMaxBatchSize(), processInfo) { + batches, channel.getMaxBatchSize(), processInfo, isRestarted) { @Override protected IDataWriter buildWriter() { IStagedResource stagedResource = stagingManager.create( diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/MultiBatchStagingWriter.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/MultiBatchStagingWriter.java index c6d312c185..1e541b78b4 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/MultiBatchStagingWriter.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/MultiBatchStagingWriter.java @@ -87,17 +87,17 @@ public class MultiBatchStagingWriter implements IDataWriter { protected ProcessInfo processInfo; protected long startTime, ts, rowCount, byteCount; - - protected boolean cancelled = false; protected List childRequests; protected Map childBatches; protected long memoryThresholdInBytes; + + protected boolean isRestarted; public MultiBatchStagingWriter(DataExtractorService dataExtractorService, ExtractRequest request, List childRequests, String sourceNodeId, - IStagingManager stagingManager, List batches, long maxBatchSize, ProcessInfo processInfo) { + IStagingManager stagingManager, List batches, long maxBatchSize, ProcessInfo processInfo, 
boolean isRestarted) { this.dataExtractorService = dataExtractorService; this.request = request; this.sourceNodeId = sourceNodeId; @@ -110,6 +110,7 @@ public MultiBatchStagingWriter(DataExtractorService dataExtractorService, Extrac this.childRequests = childRequests; this.memoryThresholdInBytes = this.dataExtractorService.parameterService.getLong(ParameterConstants.STREAM_TO_FILE_THRESHOLD); this.childBatches = new HashMap(); + this.isRestarted = isRestarted; } @Override @@ -117,7 +118,7 @@ public void open(DataContext context) { this.context = context; this.nextBatch(); this.currentDataWriter = buildWriter(); - this.currentDataWriter.open(context); + this.currentDataWriter.open(context); } protected IDataWriter buildWriter() { @@ -127,18 +128,14 @@ protected IDataWriter buildWriter() { @Override public void close() { - while (!cancelled && batches.size() > 0 && table != null) { + while (!inError && batches.size() > 0 && table != null) { startNewBatch(); end(this.table); end(this.batch, false); - } - if (table == null && batch != null) { log.debug("Batch {} is empty", new Object[] { batch.getNodeBatchId() }); - - this.currentDataWriter.end(batch, false); - Statistics stats = this.closeCurrentDataWriter(); + Statistics stats = closeCurrentDataWriter(); checkSend(stats); - } + } closeCurrentDataWriter(); } @@ -150,7 +147,14 @@ private Statistics closeCurrentDataWriter() { this.outgoingBatch.setExtractMillis(System.currentTimeMillis() - batch.getStartTime().getTime()); this.currentDataWriter.close(); this.currentDataWriter = null; - checkSend(stats); + if (inError) { + IStagedResource resource = this.dataExtractorService.getStagedResource(outgoingBatch); + if (resource != null) { + resource.delete(); + } + } else { + checkSend(stats); + } } return stats; } @@ -196,34 +200,33 @@ public void write(CsvData data) { startNewBatch(); } if (System.currentTimeMillis() - ts > 60000) { + long currentRowCount = rowCount + this.currentDataWriter.getStatistics().get(batch).get(DataWriterStatisticConstants.ROWCOUNT); + long currentByteCount = byteCount + this.currentDataWriter.getStatistics().get(batch).get(DataWriterStatisticConstants.BYTECOUNT); this.dataExtractorService.log.info( - "Extracting table {} request {} for {} seconds, {} batches, {} rows, and {} bytes. Current batch {} of batches {} through {}.", - request.getTableName(), request.getRequestId(), (System.currentTimeMillis() - startTime) / 1000, finishedBatches.size(), rowCount, byteCount, - batch.getBatchId(), request.getStartBatchId(), request.getEndBatchId()); + "Extract request {} for table {} extracting for {} seconds, {} batches, {} rows, and {} bytes. 
Current batch is {} in range {}-{}.", + request.getRequestId(), request.getTableName(), (System.currentTimeMillis() - startTime) / 1000, finishedBatches.size() + 1, + currentRowCount, currentByteCount, batch.getBatchId(), request.getStartBatchId(), request.getEndBatchId()); ts = System.currentTimeMillis(); } } public void checkSend(Statistics stats) { - if (this.outgoingBatch.getStatus() != Status.OK) { - IStagedResource resource = this.dataExtractorService.getStagedResource(outgoingBatch); - if (resource != null) { - resource.setState(State.DONE); - } - OutgoingBatch batchFromDatabase = this.dataExtractorService.outgoingBatchService.findOutgoingBatch(outgoingBatch.getBatchId(), - outgoingBatch.getNodeId()); + IStagedResource resource = this.dataExtractorService.getStagedResource(outgoingBatch); + if (resource != null) { + resource.setState(State.DONE); + } + OutgoingBatch batchFromDatabase = this.dataExtractorService.outgoingBatchService.findOutgoingBatch(outgoingBatch.getBatchId(), + outgoingBatch.getNodeId()); - if (batchFromDatabase.getIgnoreCount() == 0) { - this.outgoingBatch.setStatus(Status.NE); - } else { - cancelled = true; - throw new CancellationException(); - } - + if (!batchFromDatabase.getStatus().equals(Status.OK) && !batchFromDatabase.getStatus().equals(Status.IG)) { + this.outgoingBatch.setStatus(Status.NE); checkSendChildRequests(batchFromDatabase, resource, stats); + this.dataExtractorService.outgoingBatchService.updateOutgoingBatch(this.outgoingBatch); + } else { + // The user canceled a batch before it tried to load, so they probably canceled all batches. + log.info("User cancelled batches, so cancelling extract request"); + throw new CancellationException(); } - - this.dataExtractorService.outgoingBatchService.updateOutgoingBatch(this.outgoingBatch); } protected void checkSendChildRequests(OutgoingBatch parentBatch, IStagedResource parentResource, Statistics stats) { @@ -261,11 +264,13 @@ protected void checkSendChildRequests(OutgoingBatch parentBatch, IStagedResource OutgoingBatch childBatch = this.dataExtractorService.outgoingBatchService.findOutgoingBatch(childBatchId, childRequest.getNodeId()); childBatch.setExtractStartTime(startExtractTime); childBatch.setExtractMillis(System.currentTimeMillis() - startExtractTime.getTime()); - childBatch.setDataRowCount(stats.get(DataWriterStatisticConstants.ROWCOUNT)); - childBatch.setDataInsertRowCount(stats.get(DataWriterStatisticConstants.INSERTCOUNT)); - childBatch.setByteCount(stats.get(DataWriterStatisticConstants.BYTECOUNT)); + if (stats != null) { + childBatch.setDataRowCount(stats.get(DataWriterStatisticConstants.ROWCOUNT)); + childBatch.setDataInsertRowCount(stats.get(DataWriterStatisticConstants.INSERTCOUNT)); + childBatch.setByteCount(stats.get(DataWriterStatisticConstants.BYTECOUNT)); + } - if (childBatch.getIgnoreCount() == 0) { + if (!childBatch.getStatus().equals(Status.OK) && !childBatch.getStatus().equals(Status.IG)) { childBatch.setStatus(Status.NE); this.dataExtractorService.outgoingBatchService.updateOutgoingBatch(childBatch); } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java index 4983f46532..1d673f5ef3 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java @@ -251,6 +251,13 @@ public void 
updateOutgoingBatch(ISqlTransaction transaction, OutgoingBatch outgo symmetricDialect.getSqlTypeForIds(), Types.VARCHAR }); } + public void updateOutgoingBatchStatus(ISqlTransaction transaction, Status status, String nodeId, long startBatchId, long endBatchId) { + transaction.prepareAndExecute(getSql("updateOutgoingBatchStatusSql"), + new Object[] { status.name(), new Date(), clusterService.getServerId(), nodeId, startBatchId, endBatchId }, + new int[] { Types.CHAR, Types.TIMESTAMP, Types.VARCHAR, Types.VARCHAR, + symmetricDialect.getSqlTypeForIds(), symmetricDialect.getSqlTypeForIds() }); + } + public void insertOutgoingBatch(final OutgoingBatch outgoingBatch) { ISqlTransaction transaction = null; try { diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceSqlMap.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceSqlMap.java index 21477464d9..a694b7ad10 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceSqlMap.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceSqlMap.java @@ -68,7 +68,11 @@ public OutgoingBatchServiceSqlMap(IDatabasePlatform platform, + " skip_count=?, extract_row_count=?, extract_insert_row_count=?, extract_update_row_count=?, extract_delete_row_count=?, " + " transform_extract_millis=?, transform_load_millis=? " + " where batch_id=? and node_id=? "); - + + putSql("updateOutgoingBatchStatusSql", + "update $(outgoing_batch) set status=?, last_update_time=?, last_update_hostname=? " + + "where node_id = ? and batch_id between ? and ?"); + putSql("updateCommonBatchExtractStatsSql", "update $(outgoing_batch) set byte_count=?, data_row_count=?, " + " data_insert_row_count=?, data_update_row_count=?, data_delete_row_count=?, other_row_count=?, " diff --git a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/DataProcessor.java b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/DataProcessor.java index 9da62100f0..576f979c53 100644 --- a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/DataProcessor.java +++ b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/DataProcessor.java @@ -20,9 +20,10 @@ */ package org.jumpmind.symmetric.io.data; +import java.util.concurrent.CancellationException; + import org.jumpmind.db.model.Table; import org.jumpmind.exception.InvalidRetryException; -import org.jumpmind.exception.IoException; import org.jumpmind.symmetric.io.data.Batch.BatchType; import org.jumpmind.symmetric.io.data.writer.IgnoreBatchException; import org.jumpmind.util.Statistics; @@ -232,7 +233,7 @@ protected int forEachDataInTable(DataContext context, boolean processTable, Batc } if (Thread.currentThread().isInterrupted()) { - throw new IoException("This thread was interrupted"); + throw new CancellationException("This thread was interrupted"); } } while (currentData != null);
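
A minimal usage sketch of the new IOutgoingBatchService.updateOutgoingBatchStatus() introduced above, following the transaction idiom the patch itself uses in restartExtractRequest(). The surrounding field names (sqlTemplate, outgoingBatchService, request, close()) mirror DataExtractorService conventions and are illustrative assumptions, not part of the patch:

    // Sketch: flip every batch in an extract request's range back to "requested" (RQ)
    // in a single transaction, using the new range-based status update.
    ISqlTransaction transaction = null;
    try {
        transaction = sqlTemplate.startSqlTransaction();
        // (transaction, status, nodeId, startBatchId, endBatchId) per the new interface method
        outgoingBatchService.updateOutgoingBatchStatus(transaction, OutgoingBatch.Status.RQ,
                request.getNodeId(), request.getStartBatchId(), request.getEndBatchId());
        transaction.commit();
    } catch (Error ex) {
        if (transaction != null) {
            transaction.rollback();
        }
        throw ex;
    } catch (RuntimeException ex) {
        if (transaction != null) {
            transaction.rollback();
        }
        throw ex;
    } finally {
        close(transaction);
    }

Because updateOutgoingBatchStatusSql updates by "node_id = ? and batch_id between ? and ?", the whole range is reset in one statement, which is why restartExtractRequest() no longer needs to load and save each OutgoingBatch individually when a load is restarted.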