Skip to content

Commit

Permalink
continuing work on fs
Browse files Browse the repository at this point in the history
  • Loading branch information
chenson42 committed Oct 1, 2012
1 parent 4301840 commit 29d476f
Show file tree
Hide file tree
Showing 19 changed files with 450 additions and 106 deletions.
@@ -1,3 +1,23 @@
/*
* Licensed to JumpMind Inc under one or more contributor
* license agreements. See the NOTICE file distributed
* with this work for additional information regarding
* copyright ownership. JumpMind Inc licenses this file
* to you under the GNU Lesser General Public License (the
* "License"); you may not use this file except in compliance
* with the License.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, see
* <http://www.gnu.org/licenses/>.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.jumpmind.symmetric.fs;

final public class SyncParameterConstants {
Expand Down
Expand Up @@ -143,7 +143,7 @@ protected String getConfigDirectory() {
}

protected TransportConnectorFactory createTransportConnectorFactory(IPersisterServices persisterServices) {
return new TransportConnectorFactory(persisterServices);
return new TransportConnectorFactory(persisterServices, config.getProperties());
}

protected IPersisterServices createPersisterServices() {
Expand Down
Expand Up @@ -54,7 +54,7 @@ public class SyncJob implements Runnable {
protected Node serverNode;
protected SyncConfig syncConfig;
protected DirectoryChangeTracker directoryChangeTracker;
protected TransportConnectorFactory transportConnectorFactory;
protected ITransportConnector connector;
protected TypedProperties properties;
protected NodeDirectoryKey key;
protected RandomTimeSlot randomTimeSlot;
Expand All @@ -81,7 +81,6 @@ public SyncJob(TransportConnectorFactory transportConnectorFactory,
IPersisterServices persisterServices, IServerNodeLocker serverNodeLocker,
TaskScheduler taskScheduler, Node node, SyncConfig config, TypedProperties properties,
ISyncClientListener syncClientListener, ScriptAPI api) {
this.transportConnectorFactory = transportConnectorFactory;
this.persisterServices = persisterServices;
this.serverNodeLocker = serverNodeLocker;
this.syncClientListener = syncClientListener;
Expand All @@ -92,6 +91,8 @@ public SyncJob(TransportConnectorFactory transportConnectorFactory,
this.key = new NodeDirectoryKey(node, config.getClientDir());
this.randomTimeSlot = new RandomTimeSlot(serverNode.getNodeId(),
properties.getInt(SyncParameterConstants.JOB_RANDOM_MAX_START_TIME_MS));
this.connector = transportConnectorFactory.createTransportConnector(syncConfig,
serverNode);
}

public boolean isStarted() {
Expand Down Expand Up @@ -179,10 +180,18 @@ public boolean stop() {
} else {
log.warn("Failed to cancel this {} for node {}", syncConfig.getConfigId(),
serverNode.getNodeId());
}
}
}
}
return success;
}

public void destroy() {
stop();
if (connector != null) {
connector.destroy();
connector = null;
}
}

protected String getEngineName() {
return properties.getProperty(SyncParameterConstants.ENGINE_NAME);
Expand Down Expand Up @@ -235,17 +244,16 @@ public boolean invoke(boolean force) {

protected void doSync() {
if (serverNodeLocker.lock(serverNode)) {
ITransportConnector connector = null;
try {
connector = transportConnectorFactory.createTransportConnector(syncConfig,
serverNode);
connector.connect();

SyncStatus syncStatus = persisterServices.getSyncStatusPersister().get(
SyncStatus.class, key);
if (syncStatus == null) {
syncStatus = new SyncStatus(serverNode, syncConfig);
persisterServices.getSyncStatusPersister().save(syncStatus, key);
}

connector.connect(syncStatus);

initDirectoryChangeTracker();

Expand All @@ -258,7 +266,7 @@ protected void doSync() {
persisterServices.getSyncStatusPersister().save(syncStatus, key);
break;
case RAN_PRESCRIPT:
syncStatus.setDirectorySpecSnapshot(directoryChangeTracker
syncStatus.setClientSnapshot(directoryChangeTracker
.takeSnapshot());
syncStatus.setStage(Stage.RECORDED_FILES_TO_SEND);
persisterServices.getSyncStatusPersister().save(syncStatus, key);
Expand Down
Expand Up @@ -20,11 +20,14 @@
*/
package org.jumpmind.symmetric.fs.client;

import java.util.ArrayList;
import java.util.List;

import org.jumpmind.symmetric.fs.config.ConflictStrategy;
import org.jumpmind.symmetric.fs.config.Node;
import org.jumpmind.symmetric.fs.config.SyncConfig;
import org.jumpmind.symmetric.fs.track.DirectorySpecSnapshot;
import org.jumpmind.symmetric.fs.track.FileChange;

public class SyncStatus {

Expand All @@ -35,17 +38,24 @@ public enum Stage {
protected Node node;
protected Stage stage = Stage.START;
protected SyncConfig syncConfig;
protected DirectorySpecSnapshot directorySpecSnapshot;
protected DirectorySpecSnapshot clientSnapshot;
protected DirectorySpecSnapshot serverSnapshot;
protected List<String> filesToSend;
protected List<String> fileSent;
protected List<String> filesToReceive;
protected List<String> filesReceived;

protected List<String> filesInConflict;

public SyncStatus() {

filesToSend = new ArrayList<String>();
fileSent = new ArrayList<String>();
filesToReceive = new ArrayList<String>();
filesReceived = new ArrayList<String>();
filesInConflict = new ArrayList<String>();
}

public SyncStatus(Node node, SyncConfig syncConfig) {
this();
this.node = node;
this.syncConfig = syncConfig;
}
Expand All @@ -66,45 +76,66 @@ public void setStage(Stage stage) {
this.stage = stage;
}

public DirectorySpecSnapshot getDirectorySpecSnapshot() {
return directorySpecSnapshot;
public DirectorySpecSnapshot getClientSnapshot() {
return clientSnapshot;
}

public void setDirectorySpecSnapshot(DirectorySpecSnapshot directorySpecSnapshot) {
this.directorySpecSnapshot = directorySpecSnapshot;
public void setClientSnapshot(DirectorySpecSnapshot clientSnapshot) {
this.clientSnapshot = clientSnapshot;
List<FileChange> changes = clientSnapshot.getFiles();
for (FileChange fileChange : changes) {
filesToSend.add(fileChange.getFileName());
}
}

public void setServerSnapshot(DirectorySpecSnapshot serverSnapshot) {
this.serverSnapshot = serverSnapshot;
List<FileChange> changes = serverSnapshot.getFiles();
for (FileChange fileChange : changes) {
String fileName = fileChange.getFileName();
if (!filesToSend.contains(fileName)) {
filesToReceive.add(fileName);
} else {
ConflictStrategy conflictStrategy = syncConfig.getConflictStrategy();
switch (conflictStrategy) {
case CLIENT_WINS:
// do nothing
break;
case REPORT_ERROR:
filesInConflict.add(fileName);
filesToSend.remove(fileName);
break;
case SERVER_WINS:
filesToReceive.add(fileName);
filesToSend.remove(fileName);
break;
default:
break;
}
}

}
}

public List<String> getFilesInConflict() {
return filesInConflict;
}

public List<String> getFilesToSend() {
return filesToSend;
}

public void setFilesToSend(List<String> filesToSend) {
this.filesToSend = filesToSend;
}

public List<String> getFileSent() {
return fileSent;
}

public void setFileSent(List<String> fileSent) {
this.fileSent = fileSent;
}

public List<String> getFilesToReceive() {
return filesToReceive;
}

public void setFilesToReceive(List<String> filesToReceive) {
this.filesToReceive = filesToReceive;
}

public List<String> getFilesReceived() {
return filesReceived;
}

public void setFilesReceived(List<String> filesReceived) {
this.filesReceived = filesReceived;
}

public SyncConfig getSyncConfig() {
return syncConfig;
Expand All @@ -114,4 +145,8 @@ public void setSyncConfig(SyncConfig syncConfig) {
this.syncConfig = syncConfig;
}

public DirectorySpecSnapshot getServerSnapshot() {
return serverSnapshot;
}

}
Expand Up @@ -20,24 +20,24 @@
*/
package org.jumpmind.symmetric.fs.client.connector;

import org.jumpmind.properties.TypedProperties;
import org.jumpmind.symmetric.fs.client.SyncStatus;
import org.jumpmind.symmetric.fs.config.Node;
import org.jumpmind.symmetric.fs.config.SyncConfig;
import org.jumpmind.symmetric.fs.service.IPersisterServices;

public class AbstractTransportConnector implements ITransportConnector {

protected SyncConfig config;
protected Node node;
protected IPersisterServices persisterSerivces;
protected TypedProperties properties;

public void init(SyncConfig syncConfig, Node serverNode, IPersisterServices persisterServices) {
this.config = syncConfig;
public void init(Node serverNode, IPersisterServices persisterServices, TypedProperties properties) {
this.node = serverNode;
this.persisterSerivces = persisterServices;
this.properties = properties;
}

public void connect() {
public void connect(SyncStatus syncStatus) {
}

public void prepare(SyncStatus syncStatus) {
Expand All @@ -51,5 +51,8 @@ public void receive(SyncStatus syncStatus) {

public void close() {
}

public void destroy() {
}

}
Expand Up @@ -29,17 +29,14 @@ public ConnectorException() {

public ConnectorException(String message, Throwable cause) {
super(message, cause);

}

public ConnectorException(String message) {
super(message);

}

public ConnectorException(Throwable cause) {
super(cause);

}

}
Expand Up @@ -20,29 +20,6 @@
*/
package org.jumpmind.symmetric.fs.client.connector;

import org.jumpmind.symmetric.fs.client.SyncStatus;
import org.jumpmind.symmetric.fs.config.Node;
import org.jumpmind.symmetric.fs.config.SyncConfig;
import org.jumpmind.symmetric.fs.service.IPersisterServices;

public class HttpTransportConnector implements ITransportConnector {

public void init(SyncConfig syncConfig, Node serverNode, IPersisterServices persisterServices) {
}

public void connect() {
}

public void prepare(SyncStatus status) {
}

public void send(SyncStatus syncStatus) {
}

public void receive(SyncStatus syncStatus) {
}

public void close() {
}
public class HttpTransportConnector extends AbstractTransportConnector {

}
Expand Up @@ -20,9 +20,9 @@
*/
package org.jumpmind.symmetric.fs.client.connector;

import org.jumpmind.properties.TypedProperties;
import org.jumpmind.symmetric.fs.client.SyncStatus;
import org.jumpmind.symmetric.fs.config.Node;
import org.jumpmind.symmetric.fs.config.SyncConfig;
import org.jumpmind.symmetric.fs.service.IPersisterServices;

public interface ITransportConnector {
Expand All @@ -32,12 +32,13 @@ public interface ITransportConnector {
* {@link TransportConnectorFactory} then this method is called by the
* factory.
*/
public void init(SyncConfig syncConfig, Node serverNode, IPersisterServices persisterServices);
public void init(Node serverNode, IPersisterServices persisterServices, TypedProperties properties);

/**
* Validate the the destination is available. Initialize connections.
* @param syncStatus TODO
*/
public void connect();
public void connect(SyncStatus syncStatus);

/**
* Update the sync status with the list of files to receive.
Expand All @@ -49,5 +50,7 @@ public interface ITransportConnector {
public void receive(SyncStatus syncStatus);

public void close();

public void destroy();

}

0 comments on commit 29d476f

Please sign in to comment.