Skip to content

Commit

Permalink
0003250: Fix batch re-extraction issue on clustered setup with shared
Browse files Browse the repository at this point in the history
cache.
  • Loading branch information
gregwilmer committed Sep 15, 2017
1 parent a818175 commit b71874f
Show file tree
Hide file tree
Showing 6 changed files with 18 additions and 8 deletions.
Expand Up @@ -104,7 +104,7 @@ protected IDatabasePlatform createDatabasePlatform(TypedProperties properties) {
protected IStagingManager createStagingManager() {
String directory = androidContext.getCacheDir().toString();
log.info("Staging manager directory: " + directory);
return new StagingManager(directory);
return new StagingManager(directory,false);
}

@Override
Expand Down
Expand Up @@ -54,7 +54,7 @@ public static void setup() throws Exception {
.equals("net.sourceforge.jtds.jdbc.Driver")) {
platform = DbTestUtils.createDatabasePlatform(DbTestUtils.ROOT);
platform.createDatabase(platform.readDatabaseFromXml("/testBulkWriter.xml", true), true, false);
stagingManager = new StagingManager("target/tmp");
stagingManager = new StagingManager("target/tmp",false);
}
}

Expand Down
Expand Up @@ -45,7 +45,7 @@ public static void setup() throws Exception {
.equals("com.mysql.jdbc.Driver")) {
platform = DbTestUtils.createDatabasePlatform(DbTestUtils.ROOT);
platform.createDatabase(platform.readDatabaseFromXml("/testBulkWriter.xml", true), true, false);
stagingManager = new StagingManager("tmp");
stagingManager = new StagingManager("tmp",false);
}
}

Expand Down
Expand Up @@ -18,7 +18,7 @@ public class BatchStagingManager extends StagingManager {
ISymmetricEngine engine;

public BatchStagingManager(ISymmetricEngine engine, String directory) {
super(directory);
super(directory,engine.getParameterService().is(ParameterConstants.CLUSTER_LOCKING_ENABLED));
this.engine = engine;
}

Expand Down
Expand Up @@ -45,13 +45,16 @@ public class StagingManager implements IStagingManager {
protected Set<String> resourcePaths;

protected Map<String, IStagedResource> inUse;

boolean clusterEnabled;

public StagingManager(String directory) {
public StagingManager(String directory, boolean clusterEnabled) {
log.info("The staging directory was initialized at the following location: " + directory);
this.directory = new File(directory);
this.directory.mkdirs();
this.resourcePaths = Collections.synchronizedSet(new TreeSet<String>());
this.inUse = new ConcurrentHashMap<String, IStagedResource>();
this.clusterEnabled = clusterEnabled;
refreshResourceList();
}

Expand Down Expand Up @@ -176,8 +179,15 @@ protected String buildFilePath(Object... path) {

public IStagedResource find(String path) {
IStagedResource resource = inUse.get(path);
if (resource == null && resourcePaths.contains(path)) {
resource = new StagedResource(directory, path, this);
if (resource == null) {
boolean foundResourcePath = resourcePaths.contains(path);
if (clusterEnabled) {
refreshResourceList();
foundResourcePath = resourcePaths.contains(path);
}
if (foundResourcePath) {
resource = new StagedResource(directory, path, this);
}
}
return resource;
}
Expand Down
Expand Up @@ -78,7 +78,7 @@ public void readThenWrite(long threshold) throws Exception {
String origCsv = IOUtils.toString(is);
is.close();

StagingManager stagingManager = new StagingManager(DIR.getAbsolutePath());
StagingManager stagingManager = new StagingManager(DIR.getAbsolutePath(),false);
ProtocolDataReader reader = new ProtocolDataReader(BatchType.LOAD, "test", origCsv);
StagingDataWriter writer = new StagingDataWriter(threshold, false, "aaa", "test", stagingManager, new BatchListener());
DataProcessor processor = new DataProcessor(reader, writer, "test");
Expand Down

0 comments on commit b71874f

Please sign in to comment.