Skip to content

Commit

Permalink
0003282: Common batch extracts may fail when a cluster is using a shared
Browse files Browse the repository at this point in the history
staging area
  • Loading branch information
mmichalek committed Oct 17, 2017
1 parent 05cc777 commit 50da75c
Show file tree
Hide file tree
Showing 6 changed files with 273 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand All @@ -53,6 +54,7 @@

import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.time.DurationFormatUtils;
import org.jumpmind.db.io.DatabaseXmlUtil;
import org.jumpmind.db.model.Column;
import org.jumpmind.db.model.Database;
Expand Down Expand Up @@ -101,6 +103,7 @@
import org.jumpmind.symmetric.io.stage.IStagedResource;
import org.jumpmind.symmetric.io.stage.IStagedResource.State;
import org.jumpmind.symmetric.io.stage.IStagingManager;
import org.jumpmind.symmetric.io.stage.StagingFileLock;
import org.jumpmind.symmetric.model.Channel;
import org.jumpmind.symmetric.model.ChannelMap;
import org.jumpmind.symmetric.model.Data;
Expand Down Expand Up @@ -148,10 +151,13 @@
import org.jumpmind.symmetric.transport.IOutgoingTransport;
import org.jumpmind.symmetric.transport.TransportUtils;
import org.jumpmind.symmetric.util.SymmetricUtils;
import org.jumpmind.util.AppUtils;
import org.jumpmind.util.CustomizableThreadFactory;
import org.jumpmind.util.FormatUtils;
import org.jumpmind.util.Statistics;

import sun.java2d.BackBufferCapsProvider;

/**
* @see IDataExtractorService
*/
Expand Down Expand Up @@ -186,7 +192,7 @@ protected enum ExtractMode { FOR_SYM_CLIENT, FOR_PAYLOAD_CLIENT, EXTRACT_ONLY };

private IClusterService clusterService;

private Map<String, Semaphore> locks = new HashMap<String, Semaphore>();
private Map<String, Semaphore> locks = new ConcurrentHashMap<String, Semaphore>();

private CustomizableThreadFactory threadPoolFactory;

Expand Down Expand Up @@ -783,9 +789,9 @@ protected FutureOutgoingBatch extractBatch(OutgoingBatch extractBatch, FutureExt

if (status.byteExtractCount >= maxBytesToSync && status.batchExtractCount < activeBatches.size()) {
log.info(
"Reached the total byte threshold after {} of {} batches were extracted for node '{}'. "
+ "The remaining batches will be extracted on a subsequent sync",
new Object[] { status.batchExtractCount, activeBatches.size(), targetNode.getNodeId() });
"Reached the total byte threshold after {} of {} batches were extracted for node '{}' (extracted {} bytes, the max is {}). "
+ "The remaining batches will be extracted on a subsequent sync.",
new Object[] { status.batchExtractCount, activeBatches.size(), targetNode.getNodeId(), status.byteExtractCount, maxBytesToSync });
status.shouldExtractSkip = true;
}
} catch (Exception e) {
Expand Down Expand Up @@ -869,22 +875,10 @@ protected OutgoingBatch extractOutgoingBatch(ProcessInfo processInfo, Node targe
if (currentBatch.getStatus() == Status.IG) {
cleanupIgnoredBatch(sourceNode, targetNode, currentBatch, writer);
} else if (!isPreviouslyExtracted(currentBatch, true)) {
String semaphoreKey = useStagingDataWriter ? Long.toString(currentBatch
.getBatchId()) : currentBatch.getNodeBatchId();
Semaphore lock = null;

BatchLock lock = null;
try {
synchronized (locks) {
lock = locks.get(semaphoreKey);
if (lock == null) {
lock = new Semaphore(1);
locks.put(semaphoreKey, lock);
}
try {
lock.acquire();
} catch (InterruptedException e) {
throw new org.jumpmind.exception.InterruptedException(e);
}
}
lock = acquireLock(currentBatch, useStagingDataWriter);

if (!isPreviouslyExtracted(currentBatch, true)) {
currentBatch.setExtractCount(currentBatch.getExtractCount() + 1);
Expand Down Expand Up @@ -928,10 +922,7 @@ protected OutgoingBatch extractOutgoingBatch(ProcessInfo processInfo, Node targe
if (resource != null) {
resource.setState(State.DONE);
}
lock.release();
synchronized (locks) {
locks.remove(semaphoreKey);
}
releaseLock(lock, currentBatch, useStagingDataWriter);
}
}

Expand Down Expand Up @@ -966,6 +957,104 @@ protected OutgoingBatch extractOutgoingBatch(ProcessInfo processInfo, Node targe
processInfo.incrementCurrentBatchCount();
return currentBatch;
}

protected String getSemaphoreKey(OutgoingBatch batch, boolean useStagingDataWriter) {
return useStagingDataWriter ? Long.toString(batch.getBatchId()) : batch.getNodeBatchId();
}

private BatchLock acquireLock(OutgoingBatch batch, boolean useStagingDataWriter) {
String semaphoreKey = getSemaphoreKey(batch, useStagingDataWriter);

BatchLock batchLock = new BatchLock(semaphoreKey);

Semaphore lock = locks.get(semaphoreKey);
if (lock == null) {
lock = new Semaphore(1);
locks.put(semaphoreKey, lock);
}
try {
lock.acquire(); // In-memory, intra-process lock.
batchLock.inMemoryLock = lock;
if (isStagingFileLockRequired(batch)) { // File-system, inter-process lock for clustering.
StagingFileLock fileLock = acquireStagingFileLock(batch);
if (fileLock.isAcquired()) {
batchLock.fileLock = fileLock;
} else { // Didn't get the fileLock, ditch the in-memory lock as well.
locks.remove(semaphoreKey);
lock.release();
batchLock.inMemoryLock = null;
throw new SymmetricException("Failed to get extract lock on batch " + batch.getNodeBatchId());
}
}
} catch (InterruptedException e) {
throw new org.jumpmind.exception.InterruptedException(e);
}

return batchLock;
}

protected StagingFileLock acquireStagingFileLock(OutgoingBatch batch) {
boolean stagingFileAcquired = false;
long startTimeMs = System.currentTimeMillis();

StagingFileLock fileLock = null;

int iterations = 0;

while (!stagingFileAcquired) {
fileLock = stagingManager.acquireFileLock(getLockingServerInfo(), Constants.STAGING_CATEGORY_OUTGOING,
batch.getStagedLocation(), batch.getBatchId());
stagingFileAcquired = fileLock.isAcquired();
if (!stagingFileAcquired) {
if (fileLock.getLockFile() == null) {
log.warn("Staging lock file not acquired " + fileLock.getLockFailureMessage());
return fileLock;
}
long lockAge = fileLock.getLockAge();
if (lockAge >= parameterService.getLong(ParameterConstants.LOCK_TIMEOUT_MS)) {
log.warn("Lock {} in place for {} > about to BREAK the lock.", fileLock.getLockFile(), DurationFormatUtils.formatDurationWords(lockAge, true, true));
fileLock.breakLock();
} else {
if ((iterations % 10) == 0) {
log.info("Lock {} in place for {}, waiting...", fileLock.getLockFile(), DurationFormatUtils.formatDurationWords(lockAge, true, true));
} else {
log.debug("Lock {} in place for {}, waiting...", fileLock.getLockFile(), DurationFormatUtils.formatDurationWords(lockAge, true, true));
}
try {
Thread.sleep(parameterService.getLong(ParameterConstants.LOCK_WAIT_RETRY_MILLIS));
} catch (InterruptedException ex) {
log.debug("Interrupted.", ex);
}
}
}
iterations++;
}

return fileLock;
}

private String getLockingServerInfo() {
return String.format("Server: '%s' Host: '%s' IP: '%s'", clusterService.getServerId(), AppUtils.getHostName(), AppUtils.getIpAddress());
}

protected void releaseLock(BatchLock lock, OutgoingBatch batch, boolean useStagingDataWriter) {
if (lock != null) {
if (lock.inMemoryLock != null) {
lock.inMemoryLock.release();
locks.remove(lock.semaphoreKey);
}
if (lock.fileLock != null) {
lock.fileLock.releaseLock();
}
}
}

protected boolean isStagingFileLockRequired(OutgoingBatch batch) {
return batch.isCommonFlag()
&& parameterService.is(ParameterConstants.CLUSTER_LOCKING_ENABLED)
&& parameterService.is(ParameterConstants.STREAM_TO_FILE_ENABLED)
&& parameterService.is(ParameterConstants.INITIAL_LOAD_USE_EXTRACT_JOB);
}

protected void triggerReExtraction(OutgoingBatch currentBatch) {
// Allow user to reset batch status to NE in the DB to trigger a batch re-extract
Expand Down Expand Up @@ -1034,6 +1123,8 @@ protected void cleanupIgnoredBatch(Node sourceNode, Node targetNode, OutgoingBat
writer.close();
}
}



protected IStagedResource getStagedResource(OutgoingBatch currentBatch) {
return stagingManager.find(Constants.STAGING_CATEGORY_OUTGOING,
Expand Down Expand Up @@ -2235,6 +2326,15 @@ public boolean isRetry() {
return isRetry;
}
}

class BatchLock {
public BatchLock(String semaphoreKey) {
this.semaphoreKey = semaphoreKey;
}
String semaphoreKey;
Semaphore inMemoryLock;
StagingFileLock fileLock;
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@ protected void print(Batch batch, String data) {
int size = data.length();
for (int i = 0; i < size; i = i + 1024) {
int end = i + 1024;
// try {
// Thread.sleep(5);
// } catch (InterruptedException ex) {
// }
writer.append(data, i, end < size ? end : size);
}
} catch (IOException ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,6 @@ public interface IStagingManager {

public Set<String> getResourceReferences();

public StagingFileLock acquireFileLock(String serverInfo, Object... path);

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.io.StringReader;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
Expand All @@ -47,7 +48,7 @@ public class StagedResource implements IStagedResource {

static final Logger log = LoggerFactory.getLogger(StagedResource.class);

private int references = 0;
private AtomicInteger references = new AtomicInteger(0);

private File directory;

Expand Down Expand Up @@ -103,18 +104,18 @@ protected static String toPath(File directory, File file) {

@Override
public void reference() {
references++;
references.incrementAndGet();
log.debug("Increased reference to {} for {} by {}", references, path, Thread.currentThread().getName());
}

@Override
public void dereference() {
references--;
references.decrementAndGet();
log.debug("Decreased reference to {} for {} by {}", references, path, Thread.currentThread().getName());
}

public boolean isInUse() {
return references > 0 || (readers != null && readers.size() > 0) || writer != null ||
return references.get() > 0 || (readers != null && readers.size() > 0) || writer != null ||
(inputStreams != null && inputStreams.size() > 0) ||
outputStream != null;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/**
* Licensed to JumpMind Inc under one or more contributor
* license agreements. See the NOTICE file distributed
* with this work for additional information regarding
* copyright ownership. JumpMind Inc licenses this file
* to you under the GNU General Public License, version 3.0 (GPLv3)
* (the "License"); you may not use this file except in compliance
* with the License.
*
* You should have received a copy of the GNU General Public License,
* version 3.0 (GPLv3) along with this library; if not, see
* <http://www.gnu.org/licenses/>.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.jumpmind.symmetric.io.stage;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.attribute.FileTime;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StagingFileLock {

protected static final Logger log = LoggerFactory.getLogger(StagingFileLock.class);

boolean acquired = false;
private File lockFile;
private String lockFailureMessage;

public long getLockAge() {
if (lockFile != null) {
FileTime lastModifiedTime;
try {
lastModifiedTime = Files.getLastModifiedTime(lockFile.toPath());
return System.currentTimeMillis() - lastModifiedTime.toMillis();
} catch (IOException ex) {
if (log.isDebugEnabled()) {
log.debug("Failed to get last modified time for file " + lockFile, ex);
}
return 0;
}
} else {
return 0;
}
}

public boolean isAcquired() {
return acquired;
}
public void setAcquired(boolean acquired) {
this.acquired = acquired;
}
public File getLockFile() {
return lockFile;
}
public void setLockFile(File lockFile) {
this.lockFile = lockFile;
}
public String getLockFailureMessage() {
return lockFailureMessage;
}
public void setLockFailureMessage(String lockFailureMessage) {
this.lockFailureMessage = lockFailureMessage;
}

public void releaseLock() {
if (lockFile.delete()) {
log.debug("Lock {} released successfully.", lockFile);
} else {
log.warn("Failed to release lock {}", lockFile);
}
}

public void breakLock() {
if (lockFile.delete()) {
log.info("Lock {} broken successfully.", lockFile);
} else {
log.warn("Failed to break lock {}", lockFile);
}
}

@Override
public String toString() {
return String.format("%s [%s]", super.toString(), lockFile);
}
}
Loading

0 comments on commit 50da75c

Please sign in to comment.