Skip to content

Commit

Permalink
dev checkin. conflict resolution infrastructure.
Browse files Browse the repository at this point in the history
  • Loading branch information
chenson42 committed Mar 6, 2012
1 parent 604bc85 commit da6c7db
Show file tree
Hide file tree
Showing 23 changed files with 852 additions and 208 deletions.
14 changes: 10 additions & 4 deletions symmetric/symmetric-assemble/TODO.txt
Expand Up @@ -58,11 +58,10 @@ Documentation
* The db.metadata.ignore.case property is no long available. All configured database names will be case sensitive
* dont.include.keys.in.update.statement is not longer supported. if old_data is not provided then pks will not be in the update
* db.spring.bean.name is no longer available
* dataloader will now load each batch as it arrives.
* dataloader will now load each batch as it arrives
* datareload.batch.insert.transactional is no longer available
* routing.data.reader.type is no longer available. data_ref has been removed.
* jdbc datasources are no longer supported
* All data loader properties will be configurable by channel
* routing.data.reader.type is no longer available. data_ref has been removed
* jndi datasources are no longer supported
* IDatabaseWriterFilter replaces IDataLoaderFilter
* IDatabaseWriterFilter replaces IBatchListener
* db.default.schema is no longer used
Expand All @@ -75,3 +74,10 @@ Documentation
* SymmetricDS tables are now created in the default case of the database
* Now supports mixed case tables on Firebird.
* Got rid of the isAutoRegister() method from IExtensionPoint
* data loaders are pluggable
* There is no longer a data extractor ext point
* The following parameters have been removed in favor of conflict resolution
* dataloader.allow.missing.delete
* dataloader.enable.fallback.insert
* dataloader.enable.fallback.update
* The following property is not longer needed: dataloader.enable.fallback.savepoint
Expand Up @@ -104,10 +104,6 @@ private ParameterConstants() {
public final static String DATA_LOADER_ENABLED = "dataloader.enable";
public final static String DATA_LOADER_NUM_OF_ACK_RETRIES = "num.of.ack.retries";
public final static String DATA_LOADER_TIME_BETWEEN_ACK_RETRIES = "time.between.ack.retries.ms";
public final static String DATA_LOADER_ENABLE_FALLBACK_UPDATE = "dataloader.enable.fallback.update";
public final static String DATA_LOADER_ENABLE_FALLBACK_SAVEPOINT = "dataloader.enable.fallback.savepoint";
public final static String DATA_LOADER_ENABLE_FALLBACK_INSERT = "dataloader.enable.fallback.insert";
public final static String DATA_LOADER_ALLOW_MISSING_DELETE = "dataloader.allow.missing.delete";
public final static String DATA_LOADER_MAX_ROWS_BEFORE_COMMIT = "dataloader.max.rows.before.commit";
public final static String DATA_LOADER_TREAT_DATETIME_AS_VARCHAR = "db.treat.date.time.as.varchar.enabled";

Expand Down Expand Up @@ -141,6 +137,7 @@ private ParameterConstants() {
public final static String CACHE_TIMEOUT_TRIGGER_ROUTER_IN_MS = "cache.trigger.router.time.ms";
public final static String CACHE_TIMEOUT_CHANNEL_IN_MS = "cache.channel.time.ms";
public final static String CACHE_TIMEOUT_TRANSFORM_IN_MS = "cache.transform.time.ms";
public final static String CACHE_TIMEOUT_CONFLICT_IN_MS = "cache.conflict.time.ms";
public final static String CACHE_TIMEOUT_TABLES_IN_MS = "cache.table.time.ms";

public final static String TRIGGER_UPDATE_CAPTURE_CHANGED_DATA_ONLY = "trigger.update.capture.changed.data.only.enabled";
Expand Down
Expand Up @@ -56,6 +56,8 @@ public class TableConstants {
public static final String SYM_REGISTRATION_REQUEST = "registration_request";
public static final String SYM_REGISTRATION_REDIRECT = "registration_redirect";
public static final String SYM_NODE_CHANNEL_CTL = "node_channel_ctl";
public static final String SYM_CONFLICT_SETTINGS = "conflict_settings";
public static final String SYM_CONFLICT_EVENT = "conflict_event";
public static final String SYM_NODE_GROUP_CHANNEL_WINDOW = "node_group_channel_window";
public static final String SYM_NODE_HOST_CHANNEL_STATS = "node_host_channel_stats";

Expand Down Expand Up @@ -109,6 +111,8 @@ protected static Set<String> populateAllTables(String tablePrefix) {
tables.add(getTableName(tablePrefix, SYM_OUTGOING_BATCH));
tables.add(getTableName(tablePrefix, SYM_INCOMING_BATCH));
tables.add(getTableName(tablePrefix, SYM_LOCK));
tables.add(getTableName(tablePrefix, SYM_CONFLICT_SETTINGS));
tables.add(getTableName(tablePrefix, SYM_CONFLICT_EVENT));
return tables;
}

Expand Down
@@ -1,8 +1,14 @@
package org.jumpmind.symmetric.load;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.jumpmind.db.platform.IDatabasePlatform;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.io.data.IDataWriter;
import org.jumpmind.symmetric.io.data.writer.ConflictSettings;
import org.jumpmind.symmetric.io.data.writer.DatabaseWriter;
import org.jumpmind.symmetric.io.data.writer.DatabaseWriterSettings;
import org.jumpmind.symmetric.io.data.writer.DefaultTransformWriterConflictResolver;
Expand All @@ -23,32 +29,43 @@ public String getTypeName() {
}

public IDataWriter getDataWriter(String sourceNodeId, IDatabasePlatform platform,
TransformWriter transformWriter, IDatabaseWriterFilter[] filters) {
DatabaseWriterSettings defaultSettings = buildDatabaseWriterSettings();
DatabaseWriter writer = new DatabaseWriter(platform, defaultSettings, null, filters);
writer.setConflictResolver(new DefaultTransformWriterConflictResolver(transformWriter));
TransformWriter transformWriter, List<IDatabaseWriterFilter> filters,
List<? extends ConflictSettings> conflictSettings) {
DatabaseWriter writer = new DatabaseWriter(platform,
new DefaultTransformWriterConflictResolver(transformWriter),
buildDatabaseWriterSettings(filters, conflictSettings));
return writer;
}

public boolean isPlatformSupported(IDatabasePlatform platform) {
return true;
}

protected DatabaseWriterSettings buildDatabaseWriterSettings() {
protected DatabaseWriterSettings buildDatabaseWriterSettings(
List<IDatabaseWriterFilter> filters, List<? extends ConflictSettings> conflictSettings) {
DatabaseWriterSettings settings = new DatabaseWriterSettings();
settings.setConflictResolutionDeletes(parameterService
.is(ParameterConstants.DATA_LOADER_ALLOW_MISSING_DELETE) ? DatabaseWriterSettings.ConflictResolutionDeletes.IGNORE_CONTINUE
: DatabaseWriterSettings.ConflictResolutionDeletes.ERROR_STOP);
settings.setConflictResolutionInserts(parameterService
.is(ParameterConstants.DATA_LOADER_ENABLE_FALLBACK_UPDATE) ? DatabaseWriterSettings.ConflictResolutionInserts.FALLBACK_UPDATE
: DatabaseWriterSettings.ConflictResolutionInserts.ERROR_STOP);
settings.setConflictResolutionUpdates(parameterService
.is(ParameterConstants.DATA_LOADER_ENABLE_FALLBACK_INSERT) ? DatabaseWriterSettings.ConflictResolutionUpdates.FALLBACK_INSERT
: DatabaseWriterSettings.ConflictResolutionUpdates.ERROR_STOP);
settings.setDatabaseWriterFilters(filters);
settings.setMaxRowsBeforeCommit(parameterService
.getLong(ParameterConstants.DATA_LOADER_MAX_ROWS_BEFORE_COMMIT));
settings.setTreatDateTimeFieldsAsVarchar(parameterService
.is(ParameterConstants.DATA_LOADER_TREAT_DATETIME_AS_VARCHAR));

Map<String, ConflictSettings> byChannel = new HashMap<String, ConflictSettings>();
Map<String, ConflictSettings> byTable = new HashMap<String, ConflictSettings>();
if (conflictSettings != null) {
for (ConflictSettings conflictSetting : conflictSettings) {
String qualifiedTableName = conflictSetting.toQualifiedTableName();
if (StringUtils.isNotBlank(qualifiedTableName)) {
byTable.put(qualifiedTableName, conflictSetting);
} else if (StringUtils.isNotBlank(conflictSetting.getTargetChannelId())) {
byChannel.put(conflictSetting.getTargetChannelId(), conflictSetting);
} else {
settings.setDefaultConflictSetting(conflictSetting);
}
}
}
settings.setConflictSettingsByChannel(byChannel);
settings.setConflictSettingsByTable(byTable);
return settings;
}

Expand Down
@@ -1,17 +1,22 @@
package org.jumpmind.symmetric.load;

import java.util.List;

import org.jumpmind.db.platform.IDatabasePlatform;
import org.jumpmind.extension.IExtensionPoint;
import org.jumpmind.symmetric.io.data.IDataWriter;
import org.jumpmind.symmetric.io.data.writer.ConflictSettings;
import org.jumpmind.symmetric.io.data.writer.IDatabaseWriterFilter;
import org.jumpmind.symmetric.io.data.writer.TransformWriter;

public interface IDataLoaderFactory extends IExtensionPoint {

public String getTypeName();

public IDataWriter getDataWriter(String sourceNodeId, IDatabasePlatform platform, TransformWriter transformWriter, IDatabaseWriterFilter[] filters);


public IDataWriter getDataWriter(String sourceNodeId, IDatabasePlatform platform,
TransformWriter transformWriter, List<IDatabaseWriterFilter> filters,
List<? extends ConflictSettings> conflictSettings);

public boolean isPlatformSupported(IDatabasePlatform platform);

}
Expand Up @@ -30,7 +30,9 @@
import org.jumpmind.symmetric.load.IDataLoaderFactory;
import org.jumpmind.symmetric.model.IncomingBatch;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.model.NodeGroupLink;
import org.jumpmind.symmetric.model.RemoteNodeStatus;
import org.jumpmind.symmetric.service.impl.DataLoaderService.ConflictSettingsNodeGroupLink;

/**
* This service provides an API to load data into a SymmetricDS node's database
Expand All @@ -53,5 +55,11 @@ public interface IDataLoaderService {
public void removeDatabaseWriterFilter(IDatabaseWriterFilter filter);

public List<IncomingBatch> loadDataBatch(String batchData) throws IOException;

public List<ConflictSettingsNodeGroupLink> getConflictSettingsNodeGroupLinks(NodeGroupLink link, boolean refreshCache);

public void delete(ConflictSettingsNodeGroupLink settings);

public void save(ConflictSettingsNodeGroupLink settings);

}

0 comments on commit da6c7db

Please sign in to comment.