Skip to content

Commit

Permalink
dev checkin.
Browse files Browse the repository at this point in the history
  • Loading branch information
chenson42 committed Mar 30, 2012
1 parent 0d42156 commit e92d5bd
Show file tree
Hide file tree
Showing 13 changed files with 333 additions and 259 deletions.
Expand Up @@ -318,7 +318,7 @@ public synchronized boolean start(boolean startJobs) {
}
triggerRouterService.syncTriggers();
heartbeat(false);
if (startJobs) {
if (startJobs && jobManager != null) {
jobManager.startJobs();
}
log.info("Started SymmetricDS");
Expand Down
Expand Up @@ -51,6 +51,7 @@ private ParameterConstants() {
public final static String START_HEARTBEAT_JOB = "start.heartbeat.job";
public final static String START_SYNCTRIGGERS_JOB = "start.synctriggers.job";
public final static String START_STATISTIC_FLUSH_JOB = "start.stat.flush.job";
public final static String START_STAGE_MGMT_JOB = "start.stage.management.job";
public final static String START_WATCHDOG_JOB = "start.watchdog.job";

public final static String JOB_RANDOM_MAX_START_TIME_MS = "job.random.max.start.time.ms";
Expand Down
Expand Up @@ -224,8 +224,12 @@ public void loadDataFromPull(Node remote, RemoteNodeStatus status) throws IOExce
loadDataFromPull(null, status);
nodeService.findIdentity(false);
} catch (MalformedURLException e) {
log.error("Could not connect to the {} node's transport because of a bad URL: {}",
remote.getNodeId(), remote.getSyncUrl());
if (remote != null) {
log.error("Could not connect to the {} node's transport because of a bad URL: {}",
remote.getNodeId(), remote.getSyncUrl());
} else {
log.error(e.getMessage(), e);
}
throw e;
}
}
Expand Down Expand Up @@ -438,16 +442,16 @@ protected IDataLoaderFactory getFactory(String channelId) {

return factory;
}

public List<ConflictSettingNodeGroupLink> getConflictSettingsNodeGroupLinks() {
List<ConflictSettingNodeGroupLink> list = new ArrayList<DataLoaderService.ConflictSettingNodeGroupLink>();
list = sqlTemplate.query(getSql("selectConflictSettingsSql"),
new ConflictSettingsNodeGroupLinkMapper());
return list;
}

public List<ConflictSettingNodeGroupLink> getConflictSettingsNodeGroupLinks(
NodeGroupLink link, boolean refreshCache) {
public List<ConflictSettingNodeGroupLink> getConflictSettingsNodeGroupLinks(NodeGroupLink link,
boolean refreshCache) {
if (link != null) {
long cacheTime = parameterService
.getLong(ParameterConstants.CACHE_TIMEOUT_CONFLICT_IN_MS);
Expand Down Expand Up @@ -487,16 +491,15 @@ public void save(ConflictSettingNodeGroupLink setting) {
.getSourceNodeGroupId(), setting.getNodeGroupLink().getTargetNodeGroupId(), setting
.getTargetChannelId(), setting.getTargetCatalogName(), setting
.getTargetSchemaName(), setting.getTargetTableName(), setting.getDetectType()
.name(), setting.getResolveType().name(),
setting.isResolveChangesOnly() ? 1 : 0, setting.isResolveRowOnly() ? 1 : 0, setting
.getDetectExpression(), setting.getLastUpdateBy(), setting
.getConflictId()) == 0) {
.name(), setting.getResolveType().name(), setting.isResolveChangesOnly() ? 1 : 0,
setting.isResolveRowOnly() ? 1 : 0, setting.getDetectExpression(), setting
.getLastUpdateBy(), setting.getConflictId()) == 0) {
sqlTemplate.update(getSql("insertConflictSettingsSql"), setting.getNodeGroupLink()
.getSourceNodeGroupId(), setting.getNodeGroupLink().getTargetNodeGroupId(),
setting.getTargetChannelId(), setting.getTargetCatalogName(), setting
.getTargetSchemaName(), setting.getTargetTableName(), setting
.getDetectType().name(), setting.getResolveType().name(),
setting.isResolveChangesOnly() ? 1 : 0, setting.isResolveRowOnly() ? 1 : 0,
.getDetectType().name(), setting.getResolveType().name(), setting
.isResolveChangesOnly() ? 1 : 0, setting.isResolveRowOnly() ? 1 : 0,
setting.getDetectExpression(), setting.getLastUpdateBy(), setting
.getConflictId());
}
Expand All @@ -517,7 +520,7 @@ public void insertIncomingError(IncomingError incomingError) {
incomingError.getNodeId(), incomingError.getFailedRowNumber(),
incomingError.getFailedLineNumber(), incomingError.getTargetCatalogName(),
incomingError.getTargetSchemaName(), incomingError.getTargetTableName(),
incomingError.getEventType().getCode(), incomingError.getColumnNames(),
incomingError.getEventType().getCode(), incomingError.getColumnNames(),
incomingError.getPrimaryKeyColumnNames(), incomingError.getRowData(),
incomingError.getOldData(), incomingError.getResolveData(),
incomingError.getResolveData(), incomingError.getCreateTime(),
Expand Down Expand Up @@ -547,10 +550,9 @@ public ConflictSettingNodeGroupLink mapRow(Row rs) {
setting.setTargetCatalogName(rs.getString("target_catalog_name"));
setting.setTargetSchemaName(rs.getString("target_schema_name"));
setting.setTargetTableName(rs.getString("target_table_name"));
setting.setDetectType(DetectConflict.valueOf(rs.getString(
"detect_type").toUpperCase()));
setting.setResolveType(ResolveConflict.valueOf(rs.getString(
"resolve_type").toUpperCase()));
setting.setDetectType(DetectConflict.valueOf(rs.getString("detect_type").toUpperCase()));
setting.setResolveType(ResolveConflict.valueOf(rs.getString("resolve_type")
.toUpperCase()));
setting.setResolveChangesOnly(rs.getBoolean("resolve_changes_only"));
setting.setResolveRowOnly(rs.getBoolean("resolve_row_only"));
setting.setDetectExpression(rs.getString("detect_expression"));
Expand Down Expand Up @@ -737,8 +739,10 @@ public void batchInError(DataContext context, Exception ex) {
IncomingError error = new IncomingError();
error.setBatchId(this.currentBatch.getBatchId());
error.setNodeId(this.currentBatch.getNodeId());
error.setColumnNames(Table.getCommaDeliminatedColumns(context.getTable().getColumns()));
error.setPrimaryKeyColumnNames(Table.getCommaDeliminatedColumns(context.getTable().getPrimaryKeyColumns()));
error.setColumnNames(Table.getCommaDeliminatedColumns(context
.getTable().getColumns()));
error.setPrimaryKeyColumnNames(Table.getCommaDeliminatedColumns(context
.getTable().getPrimaryKeyColumns()));
error.setCsvData(context.getData());
error.setEventType(context.getData().getDataEventType());
error.setFailedLineNumber(this.currentBatch.getFailedLineNumber());
Expand Down
Expand Up @@ -903,8 +903,9 @@ protected void updateOrCreateDatabaseTriggers(List<Trigger> triggers, StringBuil
}
}
} catch (Exception ex) {
log.error("Failed to create triggers for {}", ex,
trigger.qualifiedSourceTableName());
log.error(
String.format("Failed to create triggers for %s",
trigger.qualifiedSourceTableName()), ex);

if (newestHistory != null) {
// Make sure all the triggers are removed from the table
Expand Down

0 comments on commit e92d5bd

Please sign in to comment.