Skip to content
Permalink
Browse files

0003814: Better handling of low disk space

  • Loading branch information...
erilong committed Nov 30, 2018
1 parent 5c0ffcc commit 91252bc88e8f8a8273bf744696fb42e0f2344a97
@@ -433,6 +433,8 @@ private ParameterConstants() {
public final static String STAGING_MANAGER_CLASS = "staging.manager.class";

public final static String STAGING_DIR = "staging.dir";

public final static String STAGING_LOW_SPACE_THRESHOLD_MEGABYTES = "staging.low.space.threshold.megabytes";

public final static String STATISTIC_MANAGER_CLASS = "statistic.manager.class";

@@ -19,7 +19,8 @@
ISymmetricEngine engine;

public BatchStagingManager(ISymmetricEngine engine, String directory) {
super(directory,engine.getParameterService().is(ParameterConstants.CLUSTER_LOCKING_ENABLED));
super(directory, engine.getParameterService().is(ParameterConstants.CLUSTER_LOCKING_ENABLED),
engine.getParameterService().getLong(ParameterConstants.STAGING_LOW_SPACE_THRESHOLD_MEGABYTES, 0));
this.engine = engine;
}

@@ -112,6 +112,7 @@
import org.jumpmind.symmetric.io.stage.IStagedResource.State;
import org.jumpmind.symmetric.io.stage.IStagingManager;
import org.jumpmind.symmetric.io.stage.StagingFileLock;
import org.jumpmind.symmetric.io.stage.StagingLowFreeSpace;
import org.jumpmind.symmetric.load.IReloadVariableFilter;
import org.jumpmind.symmetric.model.AbstractBatch.Status;
import org.jumpmind.symmetric.model.Channel;
@@ -837,6 +838,8 @@ public boolean extractOnlyOutgoingBatch(String nodeId, long batchId, Writer writ
}
if (e.getCause() instanceof InterruptedException) {
log.info("Extract of batch {} was interrupted", currentBatch);
} else if (e instanceof StagingLowFreeSpace) {
log.error("Extract is disabled because disk is almost full: {}", e.getMessage());
} else {
log.error("Failed to extract batch {}", currentBatch, e);
}
@@ -2028,8 +2031,12 @@ public void execute(NodeCommunication nodeCommunication, RemoteNodeStatus status
resource.delete();
}
outgoingBatchService.updateOutgoingBatch(outgoingBatch);
}
throw ex;
}
if (ex instanceof StagingLowFreeSpace) {
log.error("Extract load is disabled because disk is almost full: {}", ex.getMessage());
} else {
throw ex;
}
}
}
}
@@ -95,6 +95,7 @@
import org.jumpmind.symmetric.io.stage.IStagedResource.State;
import org.jumpmind.symmetric.io.stage.IStagingManager;
import org.jumpmind.symmetric.io.stage.SimpleStagingDataWriter;
import org.jumpmind.symmetric.io.stage.StagingLowFreeSpace;
import org.jumpmind.symmetric.load.ConfigurationChangedDatabaseWriterFilter;
import org.jumpmind.symmetric.load.DefaultDataLoaderFactory;
import org.jumpmind.symmetric.load.DynamicDatabaseWriterFilter;
@@ -658,6 +659,8 @@ protected void logOrRethrow(Throwable ex) throws IOException {
throw (IOException) ex;
} else if (ex instanceof InvalidRetryException) {
throw (InvalidRetryException) ex;
} else if (ex instanceof StagingLowFreeSpace) {
log.error("Loading is disabled because disk is almost full: {}", ex.getMessage());
} else if (!(ex instanceof ConflictException) && !(ex instanceof SqlException)) {
log.error("Failed to process batch", ex);
} else {
@@ -2313,3 +2313,12 @@ allow.updates.with.results=false
# DatabaseOverridable: false
# Tags: init
staging.dir=

# Staging directory low disk space threshold in megabytes.
# When free space goes below the threshold, requests to create staging files will receive an exception.
# Data loading and extracting will stop and log an error until disk space becomes available.
# To disable free space checks, set to zero or negative.
#
# DatabaseOverridable: false
# Tags: init
staging.low.space.threshold.megabytes=100
@@ -0,0 +1,35 @@
/**
* Licensed to JumpMind Inc under one or more contributor
* license agreements. See the NOTICE file distributed
* with this work for additional information regarding
* copyright ownership. JumpMind Inc licenses this file
* to you under the GNU General Public License, version 3.0 (GPLv3)
* (the "License"); you may not use this file except in compliance
* with the License.
*
* You should have received a copy of the GNU General Public License,
* version 3.0 (GPLv3) along with this library; if not, see
* <http://www.gnu.org/licenses/>.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.jumpmind.symmetric.io.stage;

public class StagingLowFreeSpace extends RuntimeException {

private static final long serialVersionUID = 1L;

public StagingLowFreeSpace() {
super();
}

public StagingLowFreeSpace(String message) {
super(message);
}

}
@@ -48,17 +48,24 @@
protected Map<String, IStagedResource> inUse;

boolean clusterEnabled;

long lowFreeSpaceThresholdMegabytes;

public StagingManager(String directory, boolean clusterEnabled) {
public StagingManager(String directory, boolean clusterEnabled, long lowFreeSpaceThresholdMegabytes) {
log.info("The staging directory was initialized at the following location: " + directory);
this.directory = new File(directory);
this.directory.mkdirs();
this.resourcePaths = Collections.synchronizedSet(new TreeSet<String>());
this.inUse = new ConcurrentHashMap<String, IStagedResource>();
this.clusterEnabled = clusterEnabled;
this.lowFreeSpaceThresholdMegabytes = lowFreeSpaceThresholdMegabytes;
refreshResourceList();
}

public StagingManager(String directory, boolean clusterEnabled) {
this(directory, clusterEnabled, 0);
}

public Set<String> getResourceReferences() {
synchronized (resourcePaths) {
return new TreeSet<String>(resourcePaths);
@@ -152,6 +159,19 @@ public long clean(long ttlInMs) {
* Create a handle that can be written to
*/
public IStagedResource create(Object... path) {
if (lowFreeSpaceThresholdMegabytes > 0) {
long freeSpace = 0;
if (path.length == 0) {
freeSpace = directory.getFreeSpace() / 1000000;
} else {
freeSpace = new File(directory, (String) path[0]).getFreeSpace() / 1000000;
}
if (freeSpace <= lowFreeSpaceThresholdMegabytes) {
throw new StagingLowFreeSpace(String.format("Free disk space of %d MB is below threshold of %d MB",
freeSpace, lowFreeSpaceThresholdMegabytes));
}
}

String filePath = buildFilePath(path);
IStagedResource resource = createStagedResource(filePath);
if (resource.exists()) {

0 comments on commit 91252bc

Please sign in to comment.
You can’t perform that action at this time.