diff --git a/symmetric-android/src/main/java/org/jumpmind/symmetric/android/AndroidJobManager.java b/symmetric-android/src/main/java/org/jumpmind/symmetric/android/AndroidJobManager.java index 1b43bf4247..353759adc7 100644 --- a/symmetric-android/src/main/java/org/jumpmind/symmetric/android/AndroidJobManager.java +++ b/symmetric-android/src/main/java/org/jumpmind/symmetric/android/AndroidJobManager.java @@ -58,10 +58,17 @@ public class AndroidJobManager implements IJobManager { protected long lastFileSyncTrackerTime = System.currentTimeMillis(); protected long lastFileSyncPushTime = System.currentTimeMillis(); + + protected boolean started = false; public AndroidJobManager(ISymmetricEngine engine) { this.engine = engine; } + + @Override + public boolean isStarted() { + return started; + } public List getJobs() { List jobs = new ArrayList(1); @@ -76,12 +83,14 @@ public void startJobs() { job = new Job(); job.start(); } + started = true; } public void stopJobs() { if (job != null) { job.stop(); } + started = false; } public IJob getJob(String name) { diff --git a/symmetric-assemble/src/asciidoc/manage/node-send.ad b/symmetric-assemble/src/asciidoc/manage/node-send.ad index c20513f417..209e6b310d 100644 --- a/symmetric-assemble/src/asciidoc/manage/node-send.ad +++ b/symmetric-assemble/src/asciidoc/manage/node-send.ad @@ -3,11 +3,8 @@ Events other than data changes can be sent to nodes in the synchronization netwo SQL Scripts:: Sql can be sent to be executed on a target node BSH Scripts:: Beanshell scripts can be sent to be executed on a target node Table Schema:: The schema the source node can be replicated to the target node individually -Table Data:: Tables can be loaded or reloaded individually -ifdef::pro[] -image::manage/manage-nodes-send.png[] -endif::pro[] +ifndef::pro[] ===== Table Data @@ -21,10 +18,6 @@ if more than one table is involved, be sure to send any tables which are referre the channel's synchronization will block because SymmetricDS is unable to insert or update the row because the foreign key relationship refers to a non-existent row in the destination! -ifdef::pro[] -If you click on the *Send* button and select *Table Data* you will be prompted to select the tables you want to send. Events will be queued up -for each node that is selected on the Manage Nodes screen. -endif::pro[] You can manually insert "reload" events into the <> table that represent the table/s to reload. These reload events are created in the source database. @@ -64,3 +57,5 @@ The following is an example insert statement: This insert statement generates N rows, one for each configured table that starts with sale_. It uses the most recent trigger history id for the corresponding table. It takes advantage of the initial load order for each trigger-router to create the three rows in the correct order (the order corresponding to the order in which the tables would have been initial loaded). + +endif::pro[] diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/job/JobManager.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/JobManager.java index d1958bc99a..d95630abc0 100644 --- a/symmetric-client/src/main/java/org/jumpmind/symmetric/job/JobManager.java +++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/JobManager.java @@ -39,6 +39,8 @@ public class JobManager implements IJobManager { private ThreadPoolTaskScheduler taskScheduler; + private boolean started = false; + public JobManager(ISymmetricEngine engine) { this.taskScheduler = new ThreadPoolTaskScheduler(); @@ -67,6 +69,11 @@ public JobManager(ISymmetricEngine engine) { this.jobs.add(new MonitorJob(engine, taskScheduler)); this.jobs.add(new ReportStatusJob(engine, taskScheduler)); } + + @Override + public boolean isStarted() { + return started; + } @Override public IJob getJob(String name) { @@ -91,6 +98,7 @@ public synchronized void startJobs() { log.info("Job {} not configured for auto start", job.getName()); } } + started = true; } @Override @@ -108,6 +116,7 @@ public synchronized void stopJobs() { job.stop(); } Thread.interrupted(); + started = false; } @Override diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/job/IJobManager.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/job/IJobManager.java index 8b8efaf14a..8da0db8de9 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/job/IJobManager.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/job/IJobManager.java @@ -42,4 +42,6 @@ public interface IJobManager { public IJob getJob(String name); + public boolean isStarted(); + } \ No newline at end of file diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/load/ConfigurationChangedDatabaseWriterFilter.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/load/ConfigurationChangedDatabaseWriterFilter.java index d3d39e3ff9..8974dfdd6d 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/load/ConfigurationChangedDatabaseWriterFilter.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/load/ConfigurationChangedDatabaseWriterFilter.java @@ -275,7 +275,7 @@ public void syncEnded(DataContext context, List batchesProcessed, if (context.get(CTX_KEY_RESTART_JOBMANAGER_NEEDED) != null) { IJobManager jobManager = engine.getJobManager(); - if (jobManager != null) { + if (jobManager != null && jobManager.isStarted()) { log.info("About to restart jobs because a new schedule came through the data loader"); jobManager.stopJobs(); jobManager.startJobs(); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IConfigurationService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IConfigurationService.java index 81c7c0a18d..7d08e6bcf6 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IConfigurationService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IConfigurationService.java @@ -118,6 +118,8 @@ public interface IConfigurationService { */ public boolean isMasterToMaster(); + public boolean containsMasterToMaster(); + public boolean isMasterToMasterOnly(); } \ No newline at end of file diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/ConfigurationService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/ConfigurationService.java index 01eeefc1d8..1c16e5861e 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/ConfigurationService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/ConfigurationService.java @@ -122,7 +122,7 @@ public boolean isMasterToMaster() { } @Override - public boolean isMasterToMasterOnly() { + public boolean containsMasterToMaster() { boolean masterToMasterOnly = false; Node me = nodeService.findIdentity(); if (me != null) { @@ -131,6 +131,24 @@ public boolean isMasterToMasterOnly() { return masterToMasterOnly; } + @Override + public boolean isMasterToMasterOnly() { + Node me = nodeService.findIdentity(); + int masterCount=0; + int otherCount=0; + if (me != null) { + for (NodeGroupLink nodeGroupLink : getNodeGroupLinksFor(me.getNodeGroupId(), false)) { + if (nodeGroupLink.getTargetNodeGroupId().equals(me.getNodeGroupId())) { + masterCount++; + } + else { + otherCount++; + } + } + } + return masterCount > 1 && otherCount == 0; + } + public boolean refreshFromDatabase() { Date date1 = sqlTemplate.queryForObject(getSql("selectMaxChannelLastUpdateTime"), Date.class); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/NodeService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/NodeService.java index 8a540cedb1..6eafce9fe4 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/NodeService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/NodeService.java @@ -160,7 +160,7 @@ public Node findNode(String id, boolean useCache) { } return nodeCache.get(id); } else { - return findAllNodesAsMap().get(id); + return findNode(id); } } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RegistrationService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RegistrationService.java index 7cdd5ba537..d10042f61f 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RegistrationService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RegistrationService.java @@ -547,7 +547,7 @@ protected String openRegistration(Node node, String remoteHost, String remoteAdd node.setNodeId(nodeId); node.setSyncEnabled(false); - boolean masterToMasterOnly = configurationService.isMasterToMasterOnly(); + boolean masterToMasterOnly = configurationService.containsMasterToMaster(); node.setCreatedAtNodeId(masterToMasterOnly ? null: me.getNodeId()); nodeService.save(node); diff --git a/symmetric-db/src/main/java/org/jumpmind/db/sql/SqlScriptReader.java b/symmetric-db/src/main/java/org/jumpmind/db/sql/SqlScriptReader.java index 7b3004d9e3..85aa61030f 100644 --- a/symmetric-db/src/main/java/org/jumpmind/db/sql/SqlScriptReader.java +++ b/symmetric-db/src/main/java/org/jumpmind/db/sql/SqlScriptReader.java @@ -103,7 +103,7 @@ public String readSqlStatement() { sql.setLength(0); } } else { - checkStatementEndsIndex = sql.length()-1; + checkStatementEndsIndex = sql.length(); } line = readLine(); } while (line != null); diff --git a/symmetric-db/src/test/java/org/jumpmind/db/sql/SqlScriptReaderTest.java b/symmetric-db/src/test/java/org/jumpmind/db/sql/SqlScriptReaderTest.java index 6b8e29eba5..3c1472230a 100644 --- a/symmetric-db/src/test/java/org/jumpmind/db/sql/SqlScriptReaderTest.java +++ b/symmetric-db/src/test/java/org/jumpmind/db/sql/SqlScriptReaderTest.java @@ -26,10 +26,21 @@ import org.junit.Test; +@SuppressWarnings("resource") public class SqlScriptReaderTest { + + @Test + public void testReadScript2() throws Exception { + SqlScriptReader reader = new SqlScriptReader(new InputStreamReader(getClass().getResourceAsStream("/test-script-2.sql"))); + int count = 0; + while (reader.readSqlStatement() != null) { + count++; + } + assertEquals(36, count); + } @Test - public void testReadScript() throws Exception { + public void testReadScript1() throws Exception { SqlScriptReader reader = new SqlScriptReader(new InputStreamReader(getClass().getResourceAsStream("/test-script-1.sql"))); String nextStatement = reader.readSqlStatement(); diff --git a/symmetric-db/src/test/resources/test-script-2.sql b/symmetric-db/src/test/resources/test-script-2.sql new file mode 100644 index 0000000000..321f4bd420 --- /dev/null +++ b/symmetric-db/src/test/resources/test-script-2.sql @@ -0,0 +1,308 @@ + +CREATE TABLE `test_very_long_table_name_1234`( + `id` VARCHAR(100) NOT NULL, + PRIMARY KEY (`id`) +); + +CREATE TABLE `test_extract_table`( + `id` INTEGER NOT NULL, + `varchar_value` VARCHAR(255) NULL, + `longvarchar_value` MEDIUMTEXT NULL, + `timestamp_value` DATETIME, + `date_value` DATE, + `bit_value` BIT, + `bigint_value` BIGINT, + `decimal_value` DECIMAL(18,15), + PRIMARY KEY (`id`) +); + +CREATE TABLE `test_triggers_table`( + `id` INTEGER NOT NULL AUTO_INCREMENT, + `string_one_value` VARCHAR(50) NULL, + `string_two_value` VARCHAR(255) NULL, + `long_string_value` VARCHAR(255) NULL, + `time_value` DATETIME, + `date_value` DATE, + `boolean_value` SMALLINT, + `bigint_value` BIGINT, + `decimal_value` DECIMAL, + PRIMARY KEY (`id`) +); + +CREATE TABLE `test_dataloader_table`( + `string_value` VARCHAR(50) NULL, + `string_required_value` VARCHAR(50) NOT NULL, + `char_value` CHAR(50) NULL, + `char_required_value` CHAR(50) NOT NULL, + `date_value` DATE, + `id` INTEGER NOT NULL AUTO_INCREMENT, + `time_value` DATETIME, + `boolean_value` BIT, + `integer_value` INTEGER, + `decimal_value` DECIMAL(10,2), + `double_value` DOUBLE, + PRIMARY KEY (`id`) +); + +CREATE TABLE `test_order_header`( + `order_id` VARCHAR(50) NOT NULL, + `customer_id` INTEGER NOT NULL, + `status` CHAR(1) NULL, + `deliver_date` DATE, + PRIMARY KEY (`order_id`) +); + +CREATE TABLE `test_column_mapping`( + `id` VARCHAR(50) NOT NULL, + `column1` VARCHAR(50) NULL, + `column2` VARCHAR(50) NULL, + `field1` INTEGER, + `time1` DATETIME, + `another_id_column` VARCHAR(50) NULL, + PRIMARY KEY (`id`) +); + +CREATE TABLE `test_order_detail`( + `order_id` VARCHAR(50) NOT NULL, + `line_number` INTEGER NOT NULL, + `item_type` CHAR(5) NOT NULL, + `item_id` VARCHAR(20) NOT NULL, + `quantity` INTEGER, + `price` DECIMAL(10,2), + PRIMARY KEY (`order_id`, `line_number`) +); + +CREATE TABLE `test_store_status`( + `store_id` CHAR(5) NOT NULL, + `register_id` CHAR(3) NOT NULL, + `status` INTEGER, + PRIMARY KEY (`store_id`, `register_id`) +); + +CREATE TABLE `test_key_word`( + `id` INTEGER NOT NULL, + PRIMARY KEY (`id`) +); + +CREATE TABLE `test_customer`( + `customer_id` INTEGER NOT NULL, + `name` VARCHAR(50) NOT NULL, + `is_active` CHAR(1) NULL, + `status` VARCHAR(20) NULL, + `address` VARCHAR(50) NOT NULL, + `city` VARCHAR(50) NOT NULL, + `state` VARCHAR(50) NOT NULL, + `zip` INTEGER, + `entry_timestamp` DATETIME, + `entry_time` TIME, + `notes` LONGTEXT NULL, + `icon` LONGBLOB NULL, + PRIMARY KEY (`customer_id`) +); + +CREATE TABLE `test_use_stream_lob`( + `test_id` INTEGER NOT NULL, + `test_clob` LONGTEXT NULL, + `test_blob` LONGBLOB NULL, + `test_varbinary` VARBINARY(254) NULL, + `test_binary` BINARY(254) NULL, + `test_longvarchar` MEDIUMTEXT NULL, + `test_longvarbinary` MEDIUMBLOB NULL, + PRIMARY KEY (`test_id`) +); + +CREATE TABLE `test_use_capture_lob`( + `test_id` INTEGER NOT NULL, + `test_clob` LONGTEXT NULL, + `test_blob` LONGBLOB NULL, + `test_varbinary` VARBINARY(254) NULL, + `test_binary` BINARY(254) NULL, + `test_longvarchar` MEDIUMTEXT NULL, + `test_longvarbinary` MEDIUMBLOB NULL, + PRIMARY KEY (`test_id`) +); + +CREATE TABLE `one_column_table`( + `my_one_column` INTEGER NOT NULL, + PRIMARY KEY (`my_one_column`) +); + +CREATE TABLE `no_primary_key_table`( + `one_column` INTEGER NOT NULL, + `two_column` INTEGER NOT NULL, + `three_column` VARCHAR(50) NOT NULL +); + +CREATE TABLE `init_load_from_client_table`( + `id` INTEGER NOT NULL, + `data` VARCHAR(10) NULL, + PRIMARY KEY (`id`) +); + +CREATE TABLE `test_sync_incoming_batch`( + `id` INTEGER NOT NULL, + `data` VARCHAR(10) NULL, + PRIMARY KEY (`id`) +); + +CREATE TABLE `test_sync_column_level`( + `id` INTEGER NOT NULL, + `string_value` VARCHAR(50) NULL, + `time_value` DATETIME, + `date_value` DATE, + `bigint_value` BIGINT, + `decimal_value` DECIMAL(10,2), + PRIMARY KEY (`id`) +); + +CREATE TABLE `test_all_caps`( + `all_caps_id` INTEGER NOT NULL, + `name` VARCHAR(50) NOT NULL, + PRIMARY KEY (`all_caps_id`) +); + +CREATE TABLE `Test_Mixed_Case`( + `Mixed_Case_Id` INTEGER NOT NULL, + `Name` VARCHAR(50) NOT NULL, + PRIMARY KEY (`Mixed_Case_Id`) +); + +CREATE TABLE `test_xml_publisher`( + `id1` INTEGER NOT NULL, + `id2` INTEGER NOT NULL, + `data1` VARCHAR(10) NULL, + `data2` INTEGER, + `data3` DATETIME, + PRIMARY KEY (`id1`) +); + +CREATE TABLE `test_add_dl_table_1`( + `pk1` VARCHAR(100) NOT NULL, + `pk2` VARCHAR(100) NOT NULL, + `add1` DECIMAL(10,0), + `add2` DECIMAL(10,2), + `add3` INTEGER, + `ovr1` DECIMAL(10,2), + `ovr2` INTEGER, + `ovr3` VARCHAR(10) NULL, + `nada1` INTEGER, + PRIMARY KEY (`pk1`, `pk2`) +); + +CREATE TABLE `test_add_dl_table_2`( + `pk1` VARCHAR(100) NOT NULL, + `add1` DECIMAL(10,0), + PRIMARY KEY (`pk1`) +); + +CREATE TABLE `test_target_table_a`( + `id_field` VARCHAR(100) NOT NULL, + `desc_field` VARCHAR(254) NULL, + PRIMARY KEY (`id_field`) +); + +CREATE TABLE `test_target_table_b`( + `id_field` VARCHAR(100) NOT NULL, + `desc_field` VARCHAR(254) NULL, + PRIMARY KEY (`id_field`) +); + +CREATE TABLE `test_changing_column_name`( + `id1` VARCHAR(100) NOT NULL, + `test` DECIMAL(10,0), + PRIMARY KEY (`id1`) +); + +CREATE TABLE `test_routing_data_1`( + `pk` INTEGER NOT NULL AUTO_INCREMENT, + `routing_int` INTEGER, + `routing_varchar` VARCHAR(10) NULL, + `data_blob` LONGBLOB NULL, + `my_time` DATETIME, + PRIMARY KEY (`pk`) +); + +CREATE TABLE `test_routing_data_2`( + `pk` INTEGER NOT NULL AUTO_INCREMENT, + `routing_int` INTEGER, + `routing_varchar` VARCHAR(10) NULL, + `data_blob` LONGBLOB NULL, + `my_time` DATETIME, + PRIMARY KEY (`pk`) +); + +CREATE TABLE `test_routing_data_subtable`( + `pk` INTEGER NOT NULL AUTO_INCREMENT, + `fk` INTEGER, + PRIMARY KEY (`pk`) +); + +CREATE TABLE `test_lookup_table`( + `column_one` VARCHAR(10) NULL, + `column_two` VARCHAR(10) NULL +); + +CREATE TABLE `test_transform_a`( + `id_a` INTEGER NOT NULL AUTO_INCREMENT, + `s1_a` VARCHAR(50) NULL, + `s2_a` VARCHAR(255) NULL, + `longstring_a` MEDIUMTEXT NULL, + `time_a` DATETIME, + `date_a` DATE, + `boolean_a` BIT, + `bigint_a` BIGINT, + `decimal_a` DECIMAL, + PRIMARY KEY (`id_a`) +); + +CREATE TABLE `test_transform_b`( + `id_b` INTEGER NOT NULL AUTO_INCREMENT, + `s1_b` VARCHAR(50) NULL, + `s2_b` VARCHAR(255) NULL, + `longstring_b` MEDIUMTEXT NULL, + `time_b` DATETIME, + `date_b` DATE, + `boolean_b` BIT, + `bigint_b` BIGINT, + `decimal_b` DECIMAL, + PRIMARY KEY (`id_b`) +); + +CREATE TABLE `test_transform_c`( + `id_c` INTEGER NOT NULL AUTO_INCREMENT, + `s1_c` VARCHAR(50) NULL, + `s2_c` VARCHAR(255) NULL, + `longstring_c` MEDIUMTEXT NULL, + `time_c` DATETIME, + `date_c` DATE, + `boolean_c` BIT, + `bigint_c` BIGINT, + `decimal_c` DECIMAL, + PRIMARY KEY (`id_c`) +); + +CREATE TABLE `test_transform_d`( + `id_d` INTEGER NOT NULL AUTO_INCREMENT, + `s1_d` VARCHAR(50) NULL, + `s2_d` VARCHAR(255) NULL, + `longstring_d` MEDIUMTEXT NULL, + `time_d` DATETIME, + `date_d` DATE, + `boolean_d` BIT, + `bigint_d` BIGINT, + `decimal_d` DECIMAL, + PRIMARY KEY (`id_d`) +); + +CREATE TABLE `test_a`( + `id` VARCHAR(10) NOT NULL, + PRIMARY KEY (`id`) +); + +CREATE TABLE `test_b`( + `id` VARCHAR(10) NOT NULL, + `aid` VARCHAR(10) NOT NULL, + PRIMARY KEY (`id`) +); +ALTER TABLE `test_b` + ADD CONSTRAINT `fk_b_to_a_id` FOREIGN KEY (`aid`) REFERENCES `test_a` (`id`); diff --git a/symmetric-server/src/main/java/org/jumpmind/symmetric/web/AckUriHandler.java b/symmetric-server/src/main/java/org/jumpmind/symmetric/web/AckUriHandler.java index b807cbbc95..0b4b7f5a1e 100644 --- a/symmetric-server/src/main/java/org/jumpmind/symmetric/web/AckUriHandler.java +++ b/symmetric-server/src/main/java/org/jumpmind/symmetric/web/AckUriHandler.java @@ -57,7 +57,6 @@ public void handle(HttpServletRequest req, HttpServletResponse res) throws IOExc if (log.isDebugEnabled()) { log.debug("Reading ack: {}", req.getParameterMap()); } - @SuppressWarnings("unchecked") List batches = AbstractTransportManager.readAcknowledgement(req .getParameterMap()); Collections.sort(batches, BATCH_ID_COMPARATOR);