3.0 JdbcDataLoaderServiceTest now working
chenson42 committed Jan 12, 2012
1 parent f3ee8b5 commit 905abe1
Showing 49 changed files with 250 additions and 230 deletions.
@@ -98,44 +98,24 @@ protected static boolean isConnectionValid(Properties properties) throws Exception
     }
 
     protected static void removeEmbededdedDatabases() {
-        File derby = new File("target/derby");
-        if (derby.exists()) {
+        File clientDbDir = new File("target/clientdbs");
+        if (clientDbDir.exists()) {
             try {
-                TestUtils.getLog().info("Removing derby database files.");
-                FileUtils.deleteDirectory(derby);
+                TestUtils.getLog().info("Removing client database files.");
+                FileUtils.deleteDirectory(clientDbDir);
             } catch (IOException e) {
                 TestUtils.getLog().error(e);
             }
         }
-        File h2 = new File("target/h2");
-        if (h2.exists()) {
+        File rootDbDir = new File("target/rootdbs");
+        if (rootDbDir.exists()) {
             try {
-                TestUtils.getLog().info("Removing h2 database files");
-                FileUtils.deleteDirectory(h2);
+                TestUtils.getLog().info("Removing root database files");
+                FileUtils.deleteDirectory(rootDbDir);
             } catch (IOException e) {
                 TestUtils.getLog().error(e);
             }
         }
-        File hsqldb = new File("target/hsqldb");
-        if (hsqldb.exists()) {
-            try {
-                TestUtils.getLog().info("Removing hsqldb database files");
-                FileUtils.deleteDirectory(hsqldb);
-            } catch (IOException e) {
-                TestUtils.getLog().error(e);
-            }
-        }
-        File sqlitedb = new File("target/sqlite");
-        if (sqlitedb.exists() && FileUtils.listFiles(sqlitedb, null, true).size() > 0) {
-            try {
-                TestUtils.getLog().info("Removing sqlite database files");
-                FileUtils.deleteDirectory(sqlitedb);
-
-            } catch (IOException e) {
-                TestUtils.getLog().error(e);
-            }
-        }
-        sqlitedb.mkdirs();
     }
 
     protected static URL getResource(String resource) {
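Note: the cleanup above replaces four engine-specific folders (derby, h2, hsqldb, sqlite) with two role-based ones. A standalone condensation of the new logic, runnable outside the test harness — the directory names come from the diff, while the class name is illustrative:

import java.io.File;
import java.io.IOException;

import org.apache.commons.io.FileUtils;

public class DbCleanupSketch {
    public static void main(String[] args) {
        // Mirror the new removeEmbededdedDatabases() behavior: delete the
        // client and root database directories if they exist.
        for (String path : new String[] { "target/clientdbs", "target/rootdbs" }) {
            File dbDir = new File(path);
            if (dbDir.exists()) {
                try {
                    FileUtils.deleteDirectory(dbDir);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}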
@@ -50,5 +50,16 @@ public SymmetricException(String message, Throwable cause) {
     public SymmetricException(String message, Throwable cause, Object... args) {
         super(String.format(message, args), cause);
     }
 
+    public Throwable getRootCause() {
+        Throwable rootCause = null;
+        Throwable cause = getCause();
+        while (cause != null && cause != rootCause) {
+            rootCause = cause;
+            cause = cause.getCause();
+        }
+        return rootCause;
+    }
+
 }
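The new helper walks the cause chain to its deepest throwable; the cause != rootCause guard stops the loop on self-referential causes. A minimal usage sketch, assuming SymmetricException lives in org.jumpmind.symmetric (class and wrapped exceptions here are illustrative):

import java.sql.SQLException;

import org.jumpmind.symmetric.SymmetricException;

public class RootCauseSketch {
    public static void main(String[] args) {
        // Build a nested failure: a SQLException wrapped in a RuntimeException,
        // wrapped again in a SymmetricException via the varargs constructor.
        SQLException sqlEx = new SQLException("unique constraint violated");
        SymmetricException ex = new SymmetricException("Could not insert %s",
                new RuntimeException(sqlEx), "sym_node");

        // getRootCause() returns the deepest cause, here the SQLException.
        System.out.println(ex.getRootCause() == sqlEx); // prints true
    }
}

This is the hook the DataService change further down relies on when it checks getRootCause() instanceof DataTruncation.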
@@ -39,14 +39,14 @@
 import org.jumpmind.db.sql.AbstractSqlMap;
 import org.jumpmind.db.sql.ISqlRowMapper;
 import org.jumpmind.db.sql.Row;
+import org.jumpmind.db.sql.UniqueKeyException;
 import org.jumpmind.log.Log;
 import org.jumpmind.symmetric.common.ParameterConstants;
 import org.jumpmind.symmetric.db.ISymmetricDialect;
 import org.jumpmind.symmetric.model.Lock;
 import org.jumpmind.symmetric.service.IClusterService;
 import org.jumpmind.symmetric.service.IParameterService;
 import org.jumpmind.symmetric.util.AppUtils;
-import org.springframework.dao.DataIntegrityViolationException;
 
 /**
  * @see IClusterService
@@ -76,7 +76,7 @@ public void initLockTable(final String action) {
             sqlTemplate.update(getSql("insertLockSql"), new Object[] { action });
             log.debug("Inserted into the NODE_LOCK table for %s", action);
 
-        } catch (final DataIntegrityViolationException ex) {
+        } catch (UniqueKeyException ex) {
             log.debug("Failed to insert to the NODE_LOCK table for %s. Must be initialized already.", action);
         }
     }
@@ -39,6 +39,7 @@
 import org.jumpmind.db.sql.ISqlRowMapper;
 import org.jumpmind.db.sql.Row;
 import org.jumpmind.db.sql.SqlScript;
+import org.jumpmind.db.sql.UniqueKeyException;
 import org.jumpmind.log.Log;
 import org.jumpmind.symmetric.common.Constants;
 import org.jumpmind.symmetric.common.ParameterConstants;
@@ -55,7 +56,6 @@
 import org.jumpmind.symmetric.service.IConfigurationService;
 import org.jumpmind.symmetric.service.INodeService;
 import org.jumpmind.symmetric.service.IParameterService;
-import org.springframework.dao.DataIntegrityViolationException;
 
 /**
  * @see IConfigurationService
@@ -365,7 +365,7 @@ protected void autoConfigRegistrationServer() {
         String nodeId = parameterService.getExternalId();
         try {
             nodeService.insertNode(nodeId, nodeGroupId, nodeId, nodeId);
-        } catch (DataIntegrityViolationException ex) {
+        } catch (UniqueKeyException ex) {
             log.warn("Not inserting node row for %s because it already exists", nodeId);
         }
         nodeService.insertNodeIdentity(nodeId);
@@ -189,7 +189,8 @@ public void loadDataFromPull(Node remote, RemoteNodeStatus status) throws IOException
             loadDataFromPull(null, status);
             nodeService.findIdentity(false);
         } catch (MalformedURLException e) {
-            log.error("Could not connect to the %s node's transport because of a bad URL: %s", remote.getNodeId(), remote.getSyncUrl());
+            log.error("Could not connect to the %s node's transport because of a bad URL: %s",
+                    remote.getNodeId(), remote.getSyncUrl());
             throw e;
         }
     }
@@ -208,10 +209,12 @@ private void sendAck(Node remote, Node local, NodeSecurity localSecurity,
                 sendAck = transportManager.sendAcknowledgement(remote, list, local,
                         localSecurity.getNodePassword(), parameterService.getRegistrationUrl());
             } catch (IOException ex) {
-                log.warn("Ack was not sent successfully on try number %d. %s", (i + 1), ex.getMessage());
+                log.warn("Ack was not sent successfully on try number %d. %s", (i + 1),
+                        ex.getMessage());
                 error = ex;
             } catch (RuntimeException ex) {
-                log.warn("Ack was not sent successfully on try number %d. %s", (i + 1), ex.getMessage());
+                log.warn("Ack was not sent successfully on try number %d. %s", (i + 1),
+                        ex.getMessage());
                 error = ex;
             }
             if (sendAck != HttpURLConnection.HTTP_OK) {
@@ -271,6 +274,7 @@ public void end(Batch batch, IoResource resource) {
                     transforms = transformsList != null ? transformsList
                             .toArray(new TransformTable[transformsList.size()]) : null;
                 }
+
                 TransformDatabaseWriter writer = new TransformDatabaseWriter(
                         symmetricDialect.getPlatform(), settings, null, transforms,
                         filters.toArray(new IDatabaseWriterFilter[filters.size()]));
@@ -288,10 +292,10 @@ public void end(Batch batch, IoResource resource) {
                 }
 
                 for (IncomingBatch incomingBatch : batchesProcessed) {
-                    // TODO I wonder if there is a way to avoid the second update?
                     if (incomingBatch.getBatchId() != BatchInfo.VIRTUAL_BATCH_FOR_REGISTRATION
                             && incomingBatchService.updateIncomingBatch(incomingBatch) == 0) {
-                        log.error("Failed to update batch %d. Zero rows returned.", incomingBatch.getBatchId());
+                        log.error("Failed to update batch %d. Zero rows returned.",
+                                incomingBatch.getBatchId());
                     }
                 }
 
@@ -300,7 +304,8 @@ public void end(Batch batch, IoResource resource) {
             } catch (ConnectException ex) {
                 throw ex;
             } catch (UnknownHostException ex) {
-                log.warn("Could not connect to the transport because the host was unknown: %s", ex.getMessage());
+                log.warn("Could not connect to the transport because the host was unknown: %s",
+                        ex.getMessage());
                 throw ex;
             } catch (RegistrationNotOpenException ex) {
                 log.warn("Registration attempt failed. Registration was not open for the node.");
@@ -473,28 +478,29 @@ protected void enableSyncTriggers(DataContext<IDataReader, TransformDatabaseWriter>
 
     public void batchInError(DataContext<IDataReader, TransformDatabaseWriter> context,
             Exception ex) {
-        Batch batch = context.getBatch();
-        this.currentBatch.setValues(context.getReader().getStatistics().get(batch), context
-                .getWriter().getStatistics().get(batch), false);
-        enableSyncTriggers(context);
-        statisticManager.incrementDataLoadedErrors(this.currentBatch.getChannelId(), 1);
-        if (ex instanceof IOException || ex instanceof TransportException) {
-            log.warn("Failed to load batch %s because: %s", this.currentBatch.getNodeBatchId(), ex.getMessage());
-            this.currentBatch.setSqlMessage(ex.getMessage());
-        } else {
-            log.error("Failed to load batch %s because: %s", ex, this.currentBatch.getNodeBatchId(),
-                    ex.getMessage());
-            SQLException se = unwrapSqlException(ex);
-            if (se != null) {
-                this.currentBatch.setSqlState(se.getSQLState());
-                this.currentBatch.setSqlCode(se.getErrorCode());
-                this.currentBatch.setSqlMessage(se.getMessage());
-            } else {
-                this.currentBatch.setSqlMessage(ex.getMessage());
-            }
-        }
+        try {
+            Batch batch = context.getBatch();
+            this.currentBatch.setValues(context.getReader().getStatistics().get(batch), context
+                    .getWriter().getStatistics().get(batch), false);
+            enableSyncTriggers(context);
+            statisticManager.incrementDataLoadedErrors(this.currentBatch.getChannelId(), 1);
+            if (ex instanceof IOException || ex instanceof TransportException) {
+                log.warn("Failed to load batch %s because: %s",
+                        this.currentBatch.getNodeBatchId(), ex.getMessage());
+                this.currentBatch.setSqlMessage(ex.getMessage());
+            } else {
+                log.error("Failed to load batch %s because: %s", ex,
+                        this.currentBatch.getNodeBatchId(), ex.getMessage());
+                SQLException se = unwrapSqlException(ex);
+                if (se != null) {
+                    this.currentBatch.setSqlState(se.getSQLState());
+                    this.currentBatch.setSqlCode(se.getErrorCode());
+                    this.currentBatch.setSqlMessage(se.getMessage());
+                } else {
+                    this.currentBatch.setSqlMessage(ex.getMessage());
+                }
+            }
 
-        try {
             // If we were in the process of skipping a batch
             // then its status would have been OK. We should not
             // set the status to ER.
@@ -503,7 +509,9 @@ public void batchInError(DataContext<IDataReader, TransformDatabaseWriter> context,
             }
             incomingBatchService.updateIncomingBatch(this.currentBatch);
         } catch (Exception e) {
-            log.error("Failed to record status of batch %s", this.currentBatch.getNodeBatchId());
+            log.error("Failed to record status of batch %s",
+                    this.currentBatch != null ? this.currentBatch.getNodeBatchId() : context
+                            .getBatch().getNodeBatchId());
         }
     }

@@ -44,6 +44,7 @@
 import org.jumpmind.db.sql.ISqlRowMapper;
 import org.jumpmind.db.sql.ISqlTransaction;
 import org.jumpmind.db.sql.Row;
+import org.jumpmind.db.sql.UniqueKeyException;
 import org.jumpmind.db.sql.mapper.NumberMapper;
 import org.jumpmind.log.Log;
 import org.jumpmind.symmetric.Version;
@@ -80,8 +81,6 @@
 import org.jumpmind.symmetric.service.ITriggerRouterService;
 import org.jumpmind.symmetric.statistic.IStatisticManager;
 import org.jumpmind.symmetric.util.AppUtils;
-import org.springframework.dao.DataIntegrityViolationException;
-import org.springframework.dao.EmptyResultDataAccessException;
 
 /**
  * @see IDataService
@@ -106,11 +105,11 @@ public class DataService extends AbstractService implements IDataService {
 
     private IStatisticManager statisticManager;
 
-    public DataService(Log log, IParameterService parameterService, ISymmetricDialect symmetricDialect,
-            DeploymentType deploymentType, ITriggerRouterService triggerRouterService,
-            INodeService nodeService, IPurgeService purgeService,
-            IConfigurationService configurationService, IOutgoingBatchService outgoingBatchService,
-            IStatisticManager statisticManager) {
+    public DataService(Log log, IParameterService parameterService,
+            ISymmetricDialect symmetricDialect, DeploymentType deploymentType,
+            ITriggerRouterService triggerRouterService, INodeService nodeService,
+            IPurgeService purgeService, IConfigurationService configurationService,
+            IOutgoingBatchService outgoingBatchService, IStatisticManager statisticManager) {
         super(log, parameterService, symmetricDialect);
         this.deploymentType = deploymentType;
         this.triggerRouterService = triggerRouterService;
@@ -215,8 +214,9 @@ public void checkForAndUpdateMissingChannelIds(long firstDataId, long lastDataId) {
         int numberUpdated = sqlTemplate.update(getSql("checkForAndUpdateMissingChannelIdSql"),
                 Constants.CHANNEL_DEFAULT, firstDataId, lastDataId);
         if (numberUpdated > 0) {
-            log.warn("There were %d data records found between %d and %d that an invalid channel_id. Updating them to be on the '%s' channel.", numberUpdated, firstDataId, lastDataId,
-                    Constants.CHANNEL_DEFAULT);
+            log.warn(
+                    "There were %d data records found between %d and %d that an invalid channel_id. Updating them to be on the '%s' channel.",
+                    numberUpdated, firstDataId, lastDataId, Constants.CHANNEL_DEFAULT);
         }
     }
@@ -235,7 +235,7 @@ public void insertCreateEvent(final Node targetNode, final TriggerRouter trigger
         try {
             insertDataAndDataEventAndOutgoingBatch(data, targetNode.getNodeId(),
                     Constants.UNKNOWN_ROUTER_ID, isLoad);
-        } catch (DataIntegrityViolationException e) {
+        } catch (UniqueKeyException e) {
             if (e.getRootCause() != null && e.getRootCause() instanceof DataTruncation) {
                 log.error("Table data definition XML was too large and failed. The feature to send table creates during the initial load may be limited on your platform. You may need to set the initial.load.create.first parameter to false.");
             }
@@ -522,10 +522,14 @@ public void insertHeartbeatEvent(Node node, boolean isReload) {
                 if (data != null) {
                     insertData(data);
                 } else {
-                    log.warn("Not generating data/data events for table %s because a trigger or trigger hist is not created yet.", tableName);
+                    log.warn(
+                            "Not generating data/data events for table %s because a trigger or trigger hist is not created yet.",
+                            tableName);
                 }
             } else {
-                log.warn("Not generating data/data events for table %s because a trigger or trigger hist is not created yet.", tableName);
+                log.warn(
+                        "Not generating data/data events for table %s because a trigger or trigger hist is not created yet.",
+                        tableName);
             }
         }
     }
@@ -625,8 +629,9 @@ public void insertDataGap(DataGap gap) {
         sqlTemplate.update(getSql("insertDataGapSql"), new Object[] { DataGap.Status.GP.name(),
                 AppUtils.getHostName(), gap.getStartId(), gap.getEndId() }, new int[] {
                 Types.VARCHAR, Types.VARCHAR, Types.NUMERIC, Types.NUMERIC });
-        } catch (DataIntegrityViolationException ex) {
-            log.warn("A gap already existed for %d to %d. Updating instead.", gap.getStartId(), gap.getEndId());
+        } catch (UniqueKeyException ex) {
+            log.warn("A gap already existed for %d to %d. Updating instead.", gap.getStartId(),
+                    gap.getEndId());
             updateDataGap(gap, DataGap.Status.GP);
         }
     }
@@ -640,21 +645,13 @@ public void updateDataGap(DataGap gap, DataGap.Status status) {
     }
 
     public Date findCreateTimeOfEvent(long dataId) {
-        try {
-            return sqlTemplate.queryForObject(getSql("findDataEventCreateTimeSql"), Date.class,
-                    new Object[] { dataId }, new int[] { Types.NUMERIC });
-        } catch (EmptyResultDataAccessException ex) {
-            return null;
-        }
+        return sqlTemplate.queryForObject(getSql("findDataEventCreateTimeSql"), Date.class,
+                new Object[] { dataId }, new int[] { Types.NUMERIC });
     }
 
     public Date findCreateTimeOfData(long dataId) {
-        try {
-            return sqlTemplate.queryForObject(getSql("findDataCreateTimeSql"), Date.class,
-                    new Object[] { dataId }, new int[] { Types.NUMERIC });
-        } catch (EmptyResultDataAccessException ex) {
-            return null;
-        }
+        return sqlTemplate.queryForObject(getSql("findDataCreateTimeSql"), Date.class,
+                new Object[] { dataId }, new int[] { Types.NUMERIC });
     }
 
     public Map<String, String> getRowDataAsMap(Data data) {
@@ -30,6 +30,7 @@
 import org.jumpmind.db.sql.AbstractSqlMap;
 import org.jumpmind.db.sql.ISqlRowMapper;
 import org.jumpmind.db.sql.Row;
+import org.jumpmind.db.sql.UniqueKeyException;
 import org.jumpmind.db.sql.mapper.DateMapper;
 import org.jumpmind.log.Log;
 import org.jumpmind.symmetric.common.ParameterConstants;
@@ -39,7 +40,6 @@
 import org.jumpmind.symmetric.service.IIncomingBatchService;
 import org.jumpmind.symmetric.service.IParameterService;
 import org.jumpmind.symmetric.util.AppUtils;
-import org.springframework.dao.DataIntegrityViolationException;
 import org.springframework.transaction.annotation.Transactional;
 
 /**
@@ -148,7 +148,7 @@ public boolean acquireIncomingBatch(IncomingBatch batch) {
 
             try {
                 insertIncomingBatch(batch);
-            } catch (DataIntegrityViolationException e) {
+            } catch (UniqueKeyException e) {
                 batch.setRetry(true);
                 existingBatch = findIncomingBatch(batch.getBatchId(), batch.getNodeId());
             }
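Taken together, the commit swaps Spring's DataIntegrityViolationException for the platform-neutral org.jumpmind.db.sql.UniqueKeyException at every insert-then-catch site (ClusterService, ConfigurationService, DataService, IncomingBatchService). A minimal sketch of that idiom, assuming ISqlTemplate exposes the update(String, Object[]) call used in the hunks above; the class, table, and column names are placeholders:

import org.jumpmind.db.sql.ISqlTemplate;
import org.jumpmind.db.sql.UniqueKeyException;

public class InsertIfAbsentSketch {
    /**
     * Attempt an insert and treat a duplicate-key failure as "row already
     * exists", mirroring initLockTable() and acquireIncomingBatch() above.
     * Returns true only when this call created the row.
     */
    public static boolean insertIfAbsent(ISqlTemplate sqlTemplate, String action) {
        try {
            // Placeholder SQL; the real services load theirs via getSql(...).
            sqlTemplate.update("insert into sym_lock (lock_action) values (?)",
                    new Object[] { action });
            return true;
        } catch (UniqueKeyException ex) {
            return false; // another node or an earlier run got there first
        }
    }
}

The payoff is that these services no longer depend on Spring's exception translation: the jumpmind SQL layer is expected to map vendor-specific duplicate-key errors to UniqueKeyException uniformly across dialects.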