diff --git a/symmetric/symmetric-assemble/TODO.txt b/symmetric/symmetric-assemble/TODO.txt index c130f536ef..a89d639b3f 100644 --- a/symmetric/symmetric-assemble/TODO.txt +++ b/symmetric/symmetric-assemble/TODO.txt @@ -30,6 +30,7 @@ DONE = + * verify stats capturing, outgoing batch updates * unit test * purge job for staging manager + * make sure memory buffer is cleared when state goes to done * Hook up JMX @@ -49,6 +50,8 @@ DONE = + * SqlMap create constants for columns and table names * SqlMap, TriggerTemplate format better +* Test timezone columns. Do other databases other than oracle and postgres have timezone columns? + Performance Improvement Opportunities * Pluggable data loaders. * Sync based on updated column values (timestamp or flag) @@ -75,4 +78,5 @@ Documentation * db.default.schema is no longer used * No longer stop purge from running if there wasn't an initial load * Extensions no longer have services injected into them. If they need acccess to services, they should implement ISymmetricEngineAware -* Node concurrency manager no longer allows the same node to make a second request while it already has a reservation \ No newline at end of file +* Node concurrency manager no longer allows the same node to make a second request while it already has a reservation +* Added stage management job (that purges staged files) \ No newline at end of file diff --git a/symmetric/symmetric-client/src/main/java/org/jumpmind/symmetric/db/oracle/OracleTriggerTemplate.java b/symmetric/symmetric-client/src/main/java/org/jumpmind/symmetric/db/oracle/OracleTriggerTemplate.java index c4390c5235..b0adc67b01 100644 --- a/symmetric/symmetric-client/src/main/java/org/jumpmind/symmetric/db/oracle/OracleTriggerTemplate.java +++ b/symmetric/symmetric-client/src/main/java/org/jumpmind/symmetric/db/oracle/OracleTriggerTemplate.java @@ -14,6 +14,7 @@ public OracleTriggerTemplate() { arrayColumnTemplate = null; numberColumnTemplate = "decode($(tableAlias).\"$(columnName)\", null, '', '\"'||cast($(tableAlias).\"$(columnName)\" as number(30,10))||'\"')" ; datetimeColumnTemplate = "decode($(tableAlias).\"$(columnName)\", null, '', concat(concat('\"',to_char($(tableAlias).\"$(columnName)\", 'YYYY-MM-DD HH24:MI:SS.FF3')),'\"'))" ; + dateTimeWithTimeZoneTemplate = "decode($(tableAlias).\"$(columnName)\", null, '', concat(concat('\"',to_char($(tableAlias).\"$(columnName)\", 'YYYY-MM-DD HH24:MI:SS.FF TZH:TZM')),'\"'))" ; timeColumnTemplate = "decode($(tableAlias).\"$(columnName)\", null, '', concat(concat('\"',to_char($(tableAlias).\"$(columnName)\", 'YYYY-MM-DD HH24:MI:SS')),'\"'))" ; dateColumnTemplate = "decode($(tableAlias).\"$(columnName)\", null, '', concat(concat('\"',to_char($(tableAlias).\"$(columnName)\", 'YYYY-MM-DD HH24:MI:SS')),'\"'))" ; clobColumnTemplate = "decode(dbms_lob.getlength($(tableAlias).\"$(columnName)\"), null, to_clob(''), '\"'||replace(replace($(tableAlias).\"$(columnName)\",'\\','\\\\'),'\"','\\\"')||'\"')" ; diff --git a/symmetric/symmetric-client/src/main/java/org/jumpmind/symmetric/db/postgresql/PostgreSqlTriggerTemplate.java b/symmetric/symmetric-client/src/main/java/org/jumpmind/symmetric/db/postgresql/PostgreSqlTriggerTemplate.java index 9e3ea46106..f6cac32232 100644 --- a/symmetric/symmetric-client/src/main/java/org/jumpmind/symmetric/db/postgresql/PostgreSqlTriggerTemplate.java +++ b/symmetric/symmetric-client/src/main/java/org/jumpmind/symmetric/db/postgresql/PostgreSqlTriggerTemplate.java @@ -15,6 +15,7 @@ public PostgreSqlTriggerTemplate() { arrayColumnTemplate = "case when $(tableAlias).\"$(columnName)\" is null then '' else '\"' || replace(replace(cast($(tableAlias).\"$(columnName)\" as varchar),$$\\$$,$$\\\\$$),'\"',$$\\\"$$) || '\"' end" ; numberColumnTemplate = "case when $(tableAlias).\"$(columnName)\" is null then '' else '\"' || cast($(tableAlias).\"$(columnName)\" as varchar) || '\"' end" ; datetimeColumnTemplate = "case when $(tableAlias).\"$(columnName)\" is null then '' else '\"' || to_char($(tableAlias).\"$(columnName)\", 'YYYY-MM-DD HH24:MI:SS.US') || '\"' end" ; + dateTimeWithTimeZoneTemplate = "case when $(tableAlias).\"$(columnName)\" is null then '' else '\"' || to_char($(tableAlias).\"$(columnName)\", 'YYYY-MM-DD HH24:MI:SS.US ')||lpad(cast(extract(timezone_hour from $(tableAlias).\"$(columnName)\") as varchar),2,'0')||':'||lpad(cast(extract(timezone_minute from $(tableAlias).\"$(columnName)\") as varchar), 2, '0') || '\"' end" ; timeColumnTemplate = null; dateColumnTemplate = null; clobColumnTemplate = "case when $(tableAlias).\"$(columnName)\" is null then '' else '\"' || replace(replace($(tableAlias).\"$(columnName)\",$$\\$$,$$\\\\$$),'\"',$$\\\"$$) || '\"' end" ; diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java index fd059a54ec..2723239316 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java @@ -610,6 +610,10 @@ public ITransportManager getTransportManager() { public IExtensionPointManager getExtensionPointManager() { return extensionPointManger; } + + public IStagingManager getStagingManager() { + return stagingManager; + } private void removeMeFromMap(Map map) { Set keys = new HashSet(map.keySet()); diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/ISymmetricEngine.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/ISymmetricEngine.java index 5223d72db9..f5f7b9d8dc 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/ISymmetricEngine.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/ISymmetricEngine.java @@ -24,6 +24,7 @@ import org.jumpmind.symmetric.common.DeploymentType; import org.jumpmind.symmetric.db.ISymmetricDialect; import org.jumpmind.symmetric.ext.IExtensionPointManager; +import org.jumpmind.symmetric.io.stage.IStagingManager; import org.jumpmind.symmetric.job.IJobManager; import org.jumpmind.symmetric.job.OutgoingPurgeJob; import org.jumpmind.symmetric.job.PullJob; @@ -256,4 +257,6 @@ public interface ISymmetricEngine { public IExtensionPointManager getExtensionPointManager(); + public IStagingManager getStagingManager(); + } \ No newline at end of file diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/db/TriggerTemplate.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/db/TriggerTemplate.java index 76e1f64bd0..1c6922bd5d 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/db/TriggerTemplate.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/db/TriggerTemplate.java @@ -77,6 +77,8 @@ public class TriggerTemplate { protected String timeColumnTemplate; protected String dateColumnTemplate; + + protected String dateTimeWithTimeZoneTemplate; protected String clobColumnTemplate; @@ -625,6 +627,11 @@ protected ColumnString buildColumnString(String origTableAlias, String tableAlia case Types.OTHER: templateToUse = this.otherColumnTemplate; break; + case -101: + if (StringUtils.isNotBlank(this.dateTimeWithTimeZoneTemplate)) { + templateToUse = this.dateTimeWithTimeZoneTemplate; + break; + } case Types.JAVA_OBJECT: case Types.DISTINCT: case Types.STRUCT: diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/job/JobManager.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/job/JobManager.java index fa01fed703..8f1ee53d9a 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/job/JobManager.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/job/JobManager.java @@ -56,6 +56,7 @@ public JobManager(ISymmetricEngine engine) { this.jobs.add(new SyncTriggersJob(engine, taskScheduler)); this.jobs.add(new HeartbeatJob(engine, taskScheduler)); this.jobs.add(new WatchdogJob(engine, taskScheduler)); + this.jobs.add(new StageManagementJob(engine, taskScheduler, engine.getStagingManager())); } diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/job/StageManagementJob.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/job/StageManagementJob.java new file mode 100644 index 0000000000..d4822e18ba --- /dev/null +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/job/StageManagementJob.java @@ -0,0 +1,40 @@ +package org.jumpmind.symmetric.job; + +import org.jumpmind.symmetric.ISymmetricEngine; +import org.jumpmind.symmetric.io.stage.IStagingManager; +import org.jumpmind.symmetric.service.ClusterConstants; +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; + +public class StageManagementJob extends AbstractJob { + + private IStagingManager stagingManager; + + public StageManagementJob(ISymmetricEngine engine, ThreadPoolTaskScheduler taskScheduler, + IStagingManager stagingManager) { + super("job.stage.management", true, engine.getParameterService().is( + "start.stage.management.job"), engine, taskScheduler); + } + + public String getClusterLockName() { + return ClusterConstants.STAGE_MANAGEMENT; + } + + public boolean isClusterable() { + return true; + } + + @Override + long doJob() throws Exception { + if (stagingManager != null) { + long cleanupCount = stagingManager.clean(); + + // TODO it would be a nice feature to be able to import from an + // upload/import directory any files that are dropped there. + + return cleanupCount; + } else { + return 0; + } + } + +} diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/ClusterConstants.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/ClusterConstants.java index cd2700fd4a..647128644b 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/ClusterConstants.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/ClusterConstants.java @@ -21,10 +21,11 @@ package org.jumpmind.symmetric.service; /** - * + * Names for jobs as locked by the {@link IClusterService} */ public class ClusterConstants { - + + public static final String STAGE_MANAGEMENT = "STAGE_MANAGEMENT"; public static final String ROUTE = "ROUTE"; public static final String PUSH = "PUSH"; public static final String PULL = "PULL"; diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/util/ArgTypePreparedStatementSetter.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/util/ArgTypePreparedStatementSetter.java deleted file mode 100644 index 6f3dd93bda..0000000000 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/util/ArgTypePreparedStatementSetter.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to JumpMind Inc under one or more contributor - * license agreements. See the NOTICE file distributed - * with this work for additional information regarding - * copyright ownership. JumpMind Inc licenses this file - * to you under the GNU Lesser General Public License (the - * "License"); you may not use this file except in compliance - * with the License. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this library; if not, see - * . - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. */ - - -package org.jumpmind.symmetric.util; - -import java.sql.PreparedStatement; -import java.sql.SQLException; -import java.sql.Types; - -import org.springframework.jdbc.core.PreparedStatementSetter; -import org.springframework.jdbc.core.StatementCreatorUtils; -import org.springframework.jdbc.support.lob.LobHandler; - -/** - * - */ -public class ArgTypePreparedStatementSetter implements PreparedStatementSetter -{ - private final Object[] args; - - private final int[] argTypes; - - private final LobHandler lobHandler; - - public ArgTypePreparedStatementSetter(Object[] args, int[] argTypes, LobHandler lobHandler) { - this.args = args; - this.argTypes = argTypes; - this.lobHandler = lobHandler; - } - - public void setValues(PreparedStatement ps) throws SQLException { - for (int i = 1; i <= args.length; i++) { - Object arg = args[i-1]; - int argType = argTypes[i-1]; - if (argType == Types.BLOB && lobHandler != null) { - lobHandler.getLobCreator().setBlobAsBytes(ps, i, (byte[]) arg); - } - else if (argType == Types.CLOB && lobHandler != null) { - lobHandler.getLobCreator().setClobAsString(ps, i, (String) arg); - } - else { - StatementCreatorUtils.setParameterValue(ps, i, argType, arg); - } - } - } - - public void cleanupParameters() { - StatementCreatorUtils.cleanupParameters(args); - } -} \ No newline at end of file diff --git a/symmetric/symmetric-core/src/main/resources/symmetric-default.properties b/symmetric/symmetric-core/src/main/resources/symmetric-default.properties index 52ff6a3801..f1f44b06a4 100644 --- a/symmetric/symmetric-core/src/main/resources/symmetric-default.properties +++ b/symmetric/symmetric-core/src/main/resources/symmetric-default.properties @@ -64,6 +64,7 @@ db.pool.max.active=40 # This is the query to validate the database connection in Connection Pool. # It is database specific. The following are example statements for different databases. +# # MySQL # db.validation.query=select 1 # Oracle @@ -76,16 +77,19 @@ db.validation.query= # These are settings that will be passed to the JDBC driver as connection properties. # Suggested settings by database are as follows: # Oracle +# # db.connection.properties=oracle.net.CONNECT_TIMEOUT=300000;oracle.net.READ_TIMEOUT=300000;SetBigStringTryClob=true # Tags: database db.connection.properties= # When symmetric tables are created and accessed, this is the prefix to use for the tables. +# # Tags: database sync.table.prefix=sym # This is how long the default transaction time is. # This needs to be fairly big to account for large data loads. +# # Tags: database db.tx.timeout.seconds=7200 @@ -94,12 +98,14 @@ db.tx.timeout.seconds=7200 db.sql.query.timeout.seconds=300 # Set the name of the spring bean that is the DataSource to use. This property maybe be overridden so the end user can specify -# their own DataSource +# their own DataSource. +# # Tags: database db.spring.bean.name=symmetricBasicDataSource # Name of class that can extract native JDBC objects and interact directly with the driver. # Spring uses this to perform operations specific to database, like handling LOBs on Oracle. +# # Tags: database db.native.extractor=org.springframework.jdbc.support.nativejdbc.CommonsDbcpNativeJdbcExtractor @@ -107,11 +113,13 @@ db.native.extractor=org.springframework.jdbc.support.nativejdbc.CommonsDbcpNativ # Symmetric will handle transactionality. # If NOT using a JNDI connection pool, provide information about the database connection. # A DBCP connection pool will be created. +# # Tags: database db.jndi.name= # This is how long a request for a connection from the datasource will wait before # giving up. +# # Tags: database db.pool.max.wait.millis=30000 @@ -120,17 +128,19 @@ db.pool.max.wait.millis=30000 db.pool.min.evictable.idle.millis=120000 # This is the default fetch size for streaming result sets. +# # Tags: database db.jdbc.streaming.results.fetch.size=1000 # This is the default number of rows that will be sent to the database as a batch when # SymmetricDS uses the JDBC batch API. Currently, only routing uses JDBC batch. The # data loader does not. +# # Tags: database,routing db.jdbc.execute.batch.size=100 # This is the schema that will be used for metadata lookup. Some dialect automatically -# figure this out using database specific SQL to get the current schema +# figure this out using database specific SQL to get the current schema. # # DatabaseOverridable: true # Tags: database @@ -160,14 +170,14 @@ db.force.delimited.identifier.mode.off=false http.concurrent.workers.max=20 # This is the amount of time the host will keep a concurrent connection reservation after it has -# been attained by a client node while waiting for the subsequent reconnect to push +# been attained by a client node while waiting for the subsequent reconnect to push. # Tags: transport http.concurrent.reservation.timeout.ms=20000 # During SSL handshaking, if the URL's hostname and the server's # identification hostname mismatch, the verification mechanism # will check this comma separated list of server names to see if the -# cert should be accepted (see javax.net.ssl.HostnameVerifier) +# cert should be accepted (see javax.net.ssl.HostnameVerifier.) # Tags: transport https.verified.server.names= @@ -185,7 +195,7 @@ stream.to.file.enabled=true # If stream.to.file.enabled is true, then the threshold number of bytes at which a file # will be written is controlled by this property. Note that for a synchronization the # entire payload of the synchronization will be buffered in memory up to this number (at -# which point it will be written and continue to stream to disk) +# which point it will be written and continue to stream to disk.) # # DatabaseOverridable: false # Tags: transport @@ -250,7 +260,7 @@ http.push.stream.output.size=30720 # Type: boolean web.compression.disabled=false -# Set the compression level this node will use when compressing synchronization payloads +# Set the compression level this node will use when compressing synchronization payloads. # @see java.util.zip.Deflater # NO_COMPRESSION = 0 # BEST_SPEED = 1 @@ -261,7 +271,7 @@ web.compression.disabled=false # Tags: transport compression.level=-1 -# Set the compression strategy this node will use when compressing synchronization payloads +# Set the compression strategy this node will use when compressing synchronization payloads. # @see java.util.zip.Deflater # FILTERED = 1 # HUFFMAN_ONLY = 2 @@ -271,7 +281,7 @@ compression.level=-1 # Tags: transport compression.strategy=0 -# The base servlet path for embedding SymmetricDS with-in another web application +# The base servlet path for embedding SymmetricDS with-in another web application. # # Tags: transport web.base.servlet.path=/sync @@ -282,12 +292,12 @@ web.base.servlet.path=/sync # Type: boolean web.batch.servlet.enable=true -# Specify the transport type. Supported values currently include: http, internal +# Specify the transport type. Supported values currently include: http, internal. # # Tags: transport transport.type=http -# This is the number of maximum number of bytes to synchronize in one connect +# This is the number of maximum number of bytes to synchronize in one connect. # # DatabaseOverridable: true # Tags: transport @@ -299,30 +309,30 @@ transport.max.bytes.to.sync=1048576 # Tags: general engine.name=SymmetricDS -# If this is true, when symmetric starts up it will try to create the necessary tables +# If this is true, when symmetric starts up it will try to create the necessary tables. # # Tags: general auto.config.database=true -# If this is true. when symmetric starts up it will make sure the triggers in the database are up to date +# If this is true. when symmetric starts up it will make sure the triggers in the database are up to date. # # Tags: general # Type: boolean auto.sync.triggers=true -# If this is true, when symmetric starts up it will try to upgrade tables to latest version +# If this is true, when symmetric starts up it will try to upgrade tables to latest version. # # Tags: general # Type: boolean auto.upgrade=true -# Send symmetricds changes to client nodes when configuration changes +# Send symmetricds changes to client nodes when configuration changes. # # Tags: general # Type: boolean auto.sync.configuration=true -# Update the node row in the database from the local properties during a heartbeat operation +# Update the node row in the database from the local properties during a heartbeat operation. # # Tags: general # Type: boolean @@ -355,7 +365,7 @@ schema.version=? # Tags: general registration.number.of.attempts=-1 -# Set this if tables should be created prior to an initial load +# Set this if tables should be created prior to an initial load. # # DatabaseOverridable: true # Tags: general @@ -384,7 +394,7 @@ initial.load.delete.first.sql=delete from %s # Type: boolean initial.load.use.reload.channel=true -# If this is true, registration is opened automatically for nodes requesting it +# If this is true, registration is opened automatically for nodes requesting it. # # DatabaseOverridable: true # Tags: general @@ -410,7 +420,7 @@ cluster.server.id= # Tags: jobs cluster.lock.timeout.ms=1800000 -# Enables clustering of jobs +# Enables clustering of jobs. # # DatabaseOverridable: true # Tags: jobs @@ -429,7 +439,7 @@ job.push.period.time.ms=60000 # Tags: jobs job.pull.period.time.ms=60000 -# This is how often accumulated statistics will be flushed out to the database from memory +# This is how often accumulated statistics will be flushed out to the database from memory. # # DatabaseOverridable: true # Tags: jobs @@ -453,45 +463,51 @@ job.heartbeat.period.time.ms=1000 # Tags: jobs job.watchdog.period.time.ms=3600000 -# This is how often the purge job will be run +# This is how often the purge job will be run. # # DatabaseOverridable: true # Tags: jobs job.purge.datagaps.cron=0 0 0 * * * -# This is how often the purge job will be run +# This is how often the purge job will be run. # # DatabaseOverridable: true # Tags: jobs job.purge.incoming.cron=0 0 0 * * * -# This is how often the purge job will be run +# This is how often the purge job will be run. # # DatabaseOverridable: true # Tags: jobs job.purge.outgoing.cron=0 0 0 * * * -# This is when the sync triggers job will run +# This is when the sync triggers job will run. # # DatabaseOverridable: true # Tags: jobs job.synctriggers.cron=0 0 0 * * * -# This is the number of batches that will be purged from the data_event table in one database transaction +# This is when the stage management job will run. +# +# DatabaseOverridable: true +# Tags: jobs +job.stage.management=15000 + +# This is the number of batches that will be purged from the data_event table in one database transaction. # # DatabaseOverridable: true # Tags: purge job.purge.max.num.data.events.to.delete.in.tx=5 -# This is the number of batches that will be purged in one database transaction +# This is the number of batches that will be purged in one database transaction. # # DatabaseOverridable: true # Tags: purge job.purge.max.num.batches.to.delete.in.tx=5000 -# This is the number of data ids that will be purged in one database transaction +# This is the number of data ids that will be purged in one database transaction. # # DatabaseOverridable: true # Tags: purge @@ -545,6 +561,12 @@ start.stat.flush.job=true # Type: boolean start.watchdog.job=true +# Whether the stage management job is enabled for this node. +# +# Tags: jobs +# Type: boolean +start.stage.management.job=true + # This is the maximum number of events that will be peeked at to look for additional transaction rows after # the max batch size is reached. The more concurrency in your db and the longer the transaction takes the # bigger this value might have to be. diff --git a/symmetric/symmetric-core/src/test/java/org/jumpmind/symmetric/service/impl/AbstractDataLoaderServiceTest.java b/symmetric/symmetric-core/src/test/java/org/jumpmind/symmetric/service/impl/AbstractDataLoaderServiceTest.java index a64759571d..f5bc26b4b5 100644 --- a/symmetric/symmetric-core/src/test/java/org/jumpmind/symmetric/service/impl/AbstractDataLoaderServiceTest.java +++ b/symmetric/symmetric-core/src/test/java/org/jumpmind/symmetric/service/impl/AbstractDataLoaderServiceTest.java @@ -29,6 +29,7 @@ import org.jumpmind.symmetric.service.IParameterService; import org.jumpmind.symmetric.transport.MockTransportManager; import org.jumpmind.symmetric.transport.internal.InternalIncomingTransport; +import org.junit.After; import org.junit.Test; abstract public class AbstractDataLoaderServiceTest extends AbstractServiceTest { @@ -648,5 +649,10 @@ protected IDataLoaderService getDataLoaderService() { dataLoaderService.setTransportManager(transportManager); return dataLoaderService; } + + @After + public void cleanup() { + getSymmetricEngine().getStagingManager().clean(); + } } diff --git a/symmetric/symmetric-core/src/test/java/org/jumpmind/symmetric/service/impl/AbstractServiceTest.java b/symmetric/symmetric-core/src/test/java/org/jumpmind/symmetric/service/impl/AbstractServiceTest.java index 833656a4cc..6b4c782f00 100644 --- a/symmetric/symmetric-core/src/test/java/org/jumpmind/symmetric/service/impl/AbstractServiceTest.java +++ b/symmetric/symmetric-core/src/test/java/org/jumpmind/symmetric/service/impl/AbstractServiceTest.java @@ -9,6 +9,7 @@ import org.jumpmind.db.sql.ISqlTemplate; import org.jumpmind.symmetric.ISymmetricEngine; import org.jumpmind.symmetric.db.ISymmetricDialect; +import org.jumpmind.symmetric.io.stage.IStagingManager; import org.jumpmind.symmetric.service.IConfigurationService; import org.jumpmind.symmetric.service.IDataExtractorService; import org.jumpmind.symmetric.service.IDataService; @@ -130,6 +131,10 @@ protected IIncomingBatchService getIncomingBatchService() { protected ISqlTemplate getSqlTemplate() { return getSymmetricEngine().getSymmetricDialect().getPlatform().getSqlTemplate(); } + + protected IStagingManager getStagingManager() { + return getSymmetricEngine().getStagingManager(); + } protected void assertTrue(boolean condition, String message) { Assert.assertTrue(message, condition); diff --git a/symmetric/symmetric-db/src/main/java/org/jumpmind/db/model/TypeMap.java b/symmetric/symmetric-db/src/main/java/org/jumpmind/db/model/TypeMap.java index e886b6a94b..86184031b9 100644 --- a/symmetric/symmetric-db/src/main/java/org/jumpmind/db/model/TypeMap.java +++ b/symmetric/symmetric-db/src/main/java/org/jumpmind/db/model/TypeMap.java @@ -88,6 +88,9 @@ public abstract class TypeMap public static final String TIME = "TIME"; /** The string representation of the {@link java.sql.Types#TIMESTAMP} constant. */ public static final String TIMESTAMP = "TIMESTAMP"; + + public static final String TIMESTAMPTZ = "TIMESTAMPTZ"; + /** The string representation of the {@link java.sql.Types#TINYINT} constant. */ public static final String TINYINT = "TINYINT"; /** The string representation of the {@link java.sql.Types#VARBINARY} constant. */ @@ -138,6 +141,7 @@ public abstract class TypeMap registerJdbcType(Types.TINYINT, TINYINT, JdbcTypeCategoryEnum.NUMERIC); registerJdbcType(Types.VARBINARY, VARBINARY, JdbcTypeCategoryEnum.BINARY); registerJdbcType(Types.VARCHAR, VARCHAR, JdbcTypeCategoryEnum.TEXTUAL); + registerJdbcType(-101, TIMESTAMPTZ, JdbcTypeCategoryEnum.DATETIME); // only available in JDK 1.4 and above: if (PlatformUtils.supportsJava14JdbcTypes()) diff --git a/symmetric/symmetric-db/src/main/java/org/jumpmind/db/sql/DmlStatement.java b/symmetric/symmetric-db/src/main/java/org/jumpmind/db/sql/DmlStatement.java index c0dcec0a87..df0e80acce 100644 --- a/symmetric/symmetric-db/src/main/java/org/jumpmind/db/sql/DmlStatement.java +++ b/symmetric/symmetric-db/src/main/java/org/jumpmind/db/sql/DmlStatement.java @@ -126,32 +126,15 @@ protected int[] buildTypes(Column[] columns, boolean isDateOverrideToTimestamp) return types; } - public String buildInsertSql(String tableName, String[] columnNames) { - StringBuilder sql = new StringBuilder("insert into " + tableName + "("); - appendColumns(sql, columnNames); - sql.append(") values ("); - appendColumnQuestions(sql, columnNames.length); - sql.append(")"); - return sql.toString(); - } - public String buildInsertSql(String tableName, Column[] keys, Column[] columns) { StringBuilder sql = new StringBuilder("insert into " + tableName + "("); - int columnCount = appendColumns(sql, columns); + appendColumns(sql, columns); sql.append(") values ("); - appendColumnQuestions(sql, columnCount); + appendColumnQuestions(sql, columns); sql.append(")"); return sql.toString(); } - public String buildUpdateSql(String tableName, String[] keyNames, String[] columnNames) { - StringBuilder sql = new StringBuilder("update ").append(tableName).append(" set "); - appendColumnEquals(sql, columnNames, ", "); - sql.append(" where "); - appendColumnEquals(sql, keyNames, " and "); - return sql.toString(); - } - public String buildUpdateSql(String tableName, Column[] keyColumns, Column[] columns) { StringBuilder sql = new StringBuilder("update ").append(tableName).append(" set "); appendColumnEquals(sql, columns, ", "); @@ -160,12 +143,6 @@ public String buildUpdateSql(String tableName, Column[] keyColumns, Column[] col return sql.toString(); } - public String buildDeleteSql(String tableName, String[] keyNames) { - StringBuilder sql = new StringBuilder("delete from ").append(tableName).append(" where "); - appendColumnEquals(sql, keyNames, " and "); - return sql.toString(); - } - public String buildDeleteSql(String tableName, Column[] keyColumns) { StringBuilder sql = new StringBuilder("delete from ").append(tableName).append(" where "); appendColumnEquals(sql, keyColumns, " and "); @@ -179,13 +156,6 @@ public String buildCountSql(String tableName, Column[] keyColumns) { return sql.toString(); } - public void appendColumnEquals(StringBuilder sql, String[] names, String separator) { - for (int i = 0; i < names.length; i++) { - sql.append(quote).append(names[i]).append(quote).append(" = ?").append( - i + 1 < names.length ? separator : ""); - } - } - public void appendColumnEquals(StringBuilder sql, Column[] columns, String separator) { int existingCount = 0; for (int i = 0; i < columns.length; i++) { @@ -198,13 +168,6 @@ public void appendColumnEquals(StringBuilder sql, Column[] columns, String separ } } - public void appendColumns(StringBuilder sql, String[] names) { - for (int i = 0; i < names.length; i++) { - sql.append(quote).append(names[i]).append(quote) - .append(i + 1 < names.length ? "," : ""); - } - } - public int appendColumns(StringBuilder sql, Column[] columns) { int existingCount = 0; for (int i = 0; i < columns.length; i++) { @@ -218,10 +181,17 @@ public int appendColumns(StringBuilder sql, Column[] columns) { return existingCount; } - public void appendColumnQuestions(StringBuilder sql, int number) { - for (int i = 0; i < number; i++) { - sql.append("?").append(i + 1 < number ? "," : ""); - } + public void appendColumnQuestions(StringBuilder sql, Column[] columns) { + for (int i = 0; i < columns.length; i++) { + if (columns[i] != null) { + sql.append("?").append(","); + } + } + + if (columns.length > 0) { + sql.replace(sql.length()-1, sql.length(), ""); + } + } public String getSql() { diff --git a/symmetric/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/StagingDataWriter.java b/symmetric/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/StagingDataWriter.java index 8cf50540e0..709f13150f 100644 --- a/symmetric/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/StagingDataWriter.java +++ b/symmetric/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/StagingDataWriter.java @@ -45,7 +45,7 @@ protected void notifyEndBatch(Batch batch, IProtocolDataWriterListener listener) protected IStagedResource getStagedResource(Batch batch) { IStagedResource resource = stagingManager.find(category, batch.getNodeId(), batch.getBatchId()); - if (resource == null) { + if (resource == null || resource.getState() == State.DONE) { resource = stagingManager.create(category, batch.getNodeId(), batch.getBatchId()); } return resource; diff --git a/symmetric/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/IStagingManager.java b/symmetric/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/IStagingManager.java index b566e871b9..68990bf9f6 100644 --- a/symmetric/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/IStagingManager.java +++ b/symmetric/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/IStagingManager.java @@ -6,6 +6,6 @@ public interface IStagingManager { public abstract IStagedResource create(Object... path); - public abstract void clean(); + public abstract long clean(); } diff --git a/symmetric/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagedResource.java b/symmetric/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagedResource.java index 3fbe97afeb..3025e3cfe7 100644 --- a/symmetric/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagedResource.java +++ b/symmetric/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagedResource.java @@ -88,6 +88,9 @@ public void setState(State state) { } else { this.file = newFile; } + } else if (memoryBuffer != null && state == State.DONE) { + this.memoryBuffer.setLength(0); + this.memoryBuffer = null; } this.state = state; } diff --git a/symmetric/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagingManager.java b/symmetric/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagingManager.java index a5e1e6f179..fbd3e06355 100644 --- a/symmetric/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagingManager.java +++ b/symmetric/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagingManager.java @@ -56,22 +56,24 @@ protected void refreshResourceList() { * Clean up files that are older than {@link #timeToLiveInMs} and have been * marked as done. */ - public void clean() { + public long clean() { this.refreshResourceList(); Set keys = new HashSet(resourceList.keySet()); - int purgedCount = 0; + long purgedCount = 0; long purgedSize = 0; for (String key : keys) { IStagedResource resource = resourceList.get(key); - if (resource.getState() == State.DONE - && (System.currentTimeMillis() - resource.getCreateTime()) > timeToLiveInMs) { + boolean resourceIsOld = System.currentTimeMillis() - resource.getCreateTime() > timeToLiveInMs; + if (resource.getState() == State.DONE && (resourceIsOld || !resource.exists())) { purgedCount++; purgedSize += resource.getSize(); resource.delete(); resourceList.remove(key); } } - log.info("Purged {} staged files, freeing {} kb of disk space", purgedCount, (int)(purgedSize/1000)); + log.info("Purged {} staged files, freeing {} kb of disk space", purgedCount, + (int) (purgedSize / 1000)); + return purgedCount; } /** @@ -83,14 +85,14 @@ public IStagedResource create(Object... path) { this.resourceList.put(filePath, resource); return resource; } - + protected String buildFilePath(Object... path) { StringBuilder buffer = new StringBuilder(); - for(int i = 0; i < path.length; i++) { + for (int i = 0; i < path.length; i++) { buffer.append(path[i]); - if (i < path.length-1) { + if (i < path.length - 1) { buffer.append(System.getProperty("file.separator")); - } + } } return buffer.toString(); } diff --git a/symmetric/symmetric-jdbc/src/main/java/org/jumpmind/db/platform/oracle/OracleDdlReader.java b/symmetric/symmetric-jdbc/src/main/java/org/jumpmind/db/platform/oracle/OracleDdlReader.java index dfedf50ad9..67fb885799 100644 --- a/symmetric/symmetric-jdbc/src/main/java/org/jumpmind/db/platform/oracle/OracleDdlReader.java +++ b/symmetric/symmetric-jdbc/src/main/java/org/jumpmind/db/platform/oracle/OracleDdlReader.java @@ -132,7 +132,7 @@ protected Integer overrideJdbcTypeForColumn(Map values) { String typeName = (String) values.get("TYPE_NAME"); if (typeName != null && typeName.startsWith("DATE")) { return Types.DATE; - } else if (typeName != null && typeName.startsWith("TIMESTAMP")) { + } else if (typeName != null && typeName.startsWith("TIMESTAMP") && !typeName.endsWith("TIME ZONE")) { // This is for Oracle's TIMESTAMP(9) return Types.TIMESTAMP; } else if (typeName != null && typeName.startsWith("NVARCHAR")) { diff --git a/symmetric/symmetric-jdbc/src/main/java/org/jumpmind/db/platform/oracle/OracleDmlStatement.java b/symmetric/symmetric-jdbc/src/main/java/org/jumpmind/db/platform/oracle/OracleDmlStatement.java new file mode 100644 index 0000000000..cb60833f61 --- /dev/null +++ b/symmetric/symmetric-jdbc/src/main/java/org/jumpmind/db/platform/oracle/OracleDmlStatement.java @@ -0,0 +1,50 @@ +package org.jumpmind.db.platform.oracle; + +import org.jumpmind.db.model.Column; +import org.jumpmind.db.sql.DmlStatement; + +public class OracleDmlStatement extends DmlStatement { + + public OracleDmlStatement(DmlType type, String catalogName, String schemaName, + String tableName, Column[] keys, Column[] columns, boolean isDateOverrideToTimestamp, + String identifierQuoteString) { + super(type, catalogName, schemaName, tableName, keys, columns, isDateOverrideToTimestamp, + identifierQuoteString); + } + + @Override + public void appendColumnQuestions(StringBuilder sql, Column[] columns) { + for (int i = 0; i < columns.length; i++) { + if (columns[i] != null) { + if (columns[i].getTypeCode() == -101) { + sql.append("TO_TIMESTAMP_TZ(?, 'YYYY-MM-DD HH24:MI:SS.FF TZH:TZM')").append(","); + } else { + sql.append("?").append(","); + } + } + } + + if (columns.length > 0) { + sql.replace(sql.length()-1, sql.length(), ""); + } + } + + @Override + public void appendColumnEquals(StringBuilder sql, Column[] columns, String separator) { + for (int i = 0; i < columns.length; i++) { + if (columns[i] != null) { + if (columns[i].getTypeCode() == -101) { + sql.append(quote).append(columns[i].getName()).append(quote) + .append(" = TO_TIMESTAMP_TZ(?, 'YYYY-MM-DD HH24:MI:SS.FF TZH:TZM')").append(separator); + } else { + sql.append(quote).append(columns[i].getName()).append(quote).append(" = ?").append(separator); + } + } + } + + if (columns.length > 0) { + sql.replace(sql.length()-separator.length(), sql.length(), ""); + } + } + +} diff --git a/symmetric/symmetric-jdbc/src/main/java/org/jumpmind/db/platform/oracle/OraclePlatform.java b/symmetric/symmetric-jdbc/src/main/java/org/jumpmind/db/platform/oracle/OraclePlatform.java index 81050f31f3..f464055282 100644 --- a/symmetric/symmetric-jdbc/src/main/java/org/jumpmind/db/platform/oracle/OraclePlatform.java +++ b/symmetric/symmetric-jdbc/src/main/java/org/jumpmind/db/platform/oracle/OraclePlatform.java @@ -24,8 +24,11 @@ import javax.sql.DataSource; import org.apache.commons.lang.StringUtils; +import org.jumpmind.db.model.Column; import org.jumpmind.db.platform.AbstractJdbcDatabasePlatform; import org.jumpmind.db.platform.DatabasePlatformSettings; +import org.jumpmind.db.sql.DmlStatement; +import org.jumpmind.db.sql.DmlStatement.DmlType; import org.springframework.jdbc.support.lob.OracleLobHandler; /* @@ -129,4 +132,12 @@ public String getDefaultSchema() { return defaultSchema; } + @Override + public DmlStatement createDmlStatement(DmlType dmlType, String catalogName, String schemaName, + String tableName, Column[] keys, Column[] columns) { + return new OracleDmlStatement(dmlType, catalogName, schemaName, tableName, keys, columns, + getPlatformInfo().isDateOverridesToTimestamp(), + getPlatformInfo().getIdentifierQuoteString()); + } + } diff --git a/symmetric/symmetric-jdbc/src/main/java/org/jumpmind/db/platform/postgresql/PostgreSqlDdlReader.java b/symmetric/symmetric-jdbc/src/main/java/org/jumpmind/db/platform/postgresql/PostgreSqlDdlReader.java index 1acb523ace..2f6e0dfe83 100644 --- a/symmetric/symmetric-jdbc/src/main/java/org/jumpmind/db/platform/postgresql/PostgreSqlDdlReader.java +++ b/symmetric/symmetric-jdbc/src/main/java/org/jumpmind/db/platform/postgresql/PostgreSqlDdlReader.java @@ -84,6 +84,9 @@ protected Integer overrideJdbcTypeForColumn(Map values) { String typeName = (String) values.get("TYPE_NAME"); if (typeName != null && typeName.equalsIgnoreCase("ABSTIME")) { return Types.TIMESTAMP; + } else if (typeName != null && typeName.equalsIgnoreCase("TIMESTAMPTZ")) { + // lets use the same type code that oracle uses + return -101; } else if (typeName != null && typeName.equalsIgnoreCase("OID")) { return Types.BLOB; } else { diff --git a/symmetric/symmetric-jdbc/src/main/java/org/jumpmind/db/platform/postgresql/PostgreSqlDmlStatement.java b/symmetric/symmetric-jdbc/src/main/java/org/jumpmind/db/platform/postgresql/PostgreSqlDmlStatement.java index c595586497..1ca5d23776 100644 --- a/symmetric/symmetric-jdbc/src/main/java/org/jumpmind/db/platform/postgresql/PostgreSqlDmlStatement.java +++ b/symmetric/symmetric-jdbc/src/main/java/org/jumpmind/db/platform/postgresql/PostgreSqlDmlStatement.java @@ -39,9 +39,9 @@ public String buildInsertSql(String tableName, Column[] keyColumns, Column[] col StringBuilder sql = new StringBuilder("insert into "); sql.append(tableName); sql.append("("); - int columnCount = appendColumns(sql, columns); + appendColumns(sql, columns); sql.append(") (select "); - appendColumnQuestions(sql, columnCount); + appendColumnQuestions(sql, columns); sql.append(" where (select 1 from "); sql.append(tableName); sql.append(" where "); @@ -85,5 +85,41 @@ protected int[] buildTypes(Column[] keys, Column[] columns, boolean isDateOverri return super.buildTypes(keys, columns, isDateOverrideToTimestamp); } } + + @Override + public void appendColumnQuestions(StringBuilder sql, Column[] columns) { + for (int i = 0; i < columns.length; i++) { + if (columns[i] != null) { + if (columns[i].getTypeCode() == -101) { + sql.append("cast(? as timestamp with time zone)").append(","); + } else { + sql.append("?").append(","); + } + } + } + + if (columns.length > 0) { + sql.replace(sql.length() - 1, sql.length(), ""); + } + } + + @Override + public void appendColumnEquals(StringBuilder sql, Column[] columns, String separator) { + for (int i = 0; i < columns.length; i++) { + if (columns[i] != null) { + if (columns[i].getTypeCode() == -101) { + sql.append(quote).append(columns[i].getName()).append(quote) + .append(" = cast(? as timestamp with time zone)").append(separator); + } else { + sql.append(quote).append(columns[i].getName()).append(quote).append(" = ?") + .append(separator); + } + } + } + + if (columns.length > 0) { + sql.replace(sql.length() - separator.length(), sql.length(), ""); + } + } } diff --git a/symmetric/symmetric-jdbc/src/main/java/org/jumpmind/db/sql/jdbc/JdbcUtils.java b/symmetric/symmetric-jdbc/src/main/java/org/jumpmind/db/sql/jdbc/JdbcUtils.java index c564cc9817..2e41db50d1 100644 --- a/symmetric/symmetric-jdbc/src/main/java/org/jumpmind/db/sql/jdbc/JdbcUtils.java +++ b/symmetric/symmetric-jdbc/src/main/java/org/jumpmind/db/sql/jdbc/JdbcUtils.java @@ -23,10 +23,18 @@ public static void setValues(PreparedStatement ps, Object[] args, int[] argTypes } else if (argType == Types.CLOB && lobHandler != null) { lobHandler.getLobCreator().setClobAsString(ps, i, (String) arg); } else { - StatementCreatorUtils.setParameterValue(ps, i, argType, arg); + StatementCreatorUtils.setParameterValue(ps, i, verifyArgType(argType), arg); } } } + + protected static int verifyArgType(int argType) { + if (argType == -101) { + return SqlTypeValue.TYPE_UNKNOWN; + } else { + return argType; + } + } public static void setValues(PreparedStatement ps, Object[] args) throws SQLException { if (args != null) {