Skip to content

Commit

Permalink
Merge branch '3.8' of https://github.com/JumpMind/symmetric-ds.git in…
Browse files Browse the repository at this point in the history
…to 3.8
  • Loading branch information
mmichalek committed Feb 8, 2017
2 parents 41b564f + 97ed77b commit 42a4853
Show file tree
Hide file tree
Showing 13 changed files with 368 additions and 15 deletions.
Expand Up @@ -58,10 +58,17 @@ public class AndroidJobManager implements IJobManager {
protected long lastFileSyncTrackerTime = System.currentTimeMillis();

protected long lastFileSyncPushTime = System.currentTimeMillis();

protected boolean started = false;

public AndroidJobManager(ISymmetricEngine engine) {
this.engine = engine;
}

@Override
public boolean isStarted() {
return started;
}

public List<IJob> getJobs() {
List<IJob> jobs = new ArrayList<IJob>(1);
Expand All @@ -76,12 +83,14 @@ public void startJobs() {
job = new Job();
job.start();
}
started = true;
}

public void stopJobs() {
if (job != null) {
job.stop();
}
started = false;
}

public IJob getJob(String name) {
Expand Down
11 changes: 3 additions & 8 deletions symmetric-assemble/src/asciidoc/manage/node-send.ad
Expand Up @@ -3,11 +3,8 @@ Events other than data changes can be sent to nodes in the synchronization netwo
SQL Scripts:: Sql can be sent to be executed on a target node
BSH Scripts:: Beanshell scripts can be sent to be executed on a target node
Table Schema:: The schema the source node can be replicated to the target node individually
Table Data:: Tables can be loaded or reloaded individually

ifdef::pro[]
image::manage/manage-nodes-send.png[]
endif::pro[]
ifndef::pro[]

===== Table Data

Expand All @@ -21,10 +18,6 @@ if more than one table is involved, be sure to send any tables which are referre
the channel's synchronization will block because SymmetricDS is unable to insert or update the row because the foreign key relationship refers to
a non-existent row in the destination!

ifdef::pro[]
If you click on the *Send* button and select *Table Data* you will be prompted to select the tables you want to send. Events will be queued up
for each node that is selected on the Manage Nodes screen.
endif::pro[]

You can manually insert "reload" events into the <<DATA>> table that represent the table/s to reload. These reload events are created in
the source database.
Expand Down Expand Up @@ -64,3 +57,5 @@ The following is an example insert statement:
This insert statement generates N rows, one for each configured table that starts with sale_. It uses the most recent
trigger history id for the corresponding table. It takes advantage of the initial load order for each trigger-router to
create the three rows in the correct order (the order corresponding to the order in which the tables would have been initial loaded).

endif::pro[]
Expand Up @@ -39,6 +39,8 @@ public class JobManager implements IJobManager {

private ThreadPoolTaskScheduler taskScheduler;

private boolean started = false;

public JobManager(ISymmetricEngine engine) {

this.taskScheduler = new ThreadPoolTaskScheduler();
Expand Down Expand Up @@ -67,6 +69,11 @@ public JobManager(ISymmetricEngine engine) {
this.jobs.add(new MonitorJob(engine, taskScheduler));
this.jobs.add(new ReportStatusJob(engine, taskScheduler));
}

@Override
public boolean isStarted() {
return started;
}

@Override
public IJob getJob(String name) {
Expand All @@ -91,6 +98,7 @@ public synchronized void startJobs() {
log.info("Job {} not configured for auto start", job.getName());
}
}
started = true;
}

@Override
Expand All @@ -108,6 +116,7 @@ public synchronized void stopJobs() {
job.stop();
}
Thread.interrupted();
started = false;
}

@Override
Expand Down
Expand Up @@ -42,4 +42,6 @@ public interface IJobManager {

public IJob getJob(String name);

public boolean isStarted();

}
Expand Up @@ -275,7 +275,7 @@ public void syncEnded(DataContext context, List<IncomingBatch> batchesProcessed,

if (context.get(CTX_KEY_RESTART_JOBMANAGER_NEEDED) != null) {
IJobManager jobManager = engine.getJobManager();
if (jobManager != null) {
if (jobManager != null && jobManager.isStarted()) {
log.info("About to restart jobs because a new schedule came through the data loader");
jobManager.stopJobs();
jobManager.startJobs();
Expand Down
Expand Up @@ -118,6 +118,8 @@ public interface IConfigurationService {
*/
public boolean isMasterToMaster();

public boolean containsMasterToMaster();

public boolean isMasterToMasterOnly();

}
Expand Up @@ -122,7 +122,7 @@ public boolean isMasterToMaster() {
}

@Override
public boolean isMasterToMasterOnly() {
public boolean containsMasterToMaster() {
boolean masterToMasterOnly = false;
Node me = nodeService.findIdentity();
if (me != null) {
Expand All @@ -131,6 +131,24 @@ public boolean isMasterToMasterOnly() {
return masterToMasterOnly;
}

@Override
public boolean isMasterToMasterOnly() {
Node me = nodeService.findIdentity();
int masterCount=0;
int otherCount=0;
if (me != null) {
for (NodeGroupLink nodeGroupLink : getNodeGroupLinksFor(me.getNodeGroupId(), false)) {
if (nodeGroupLink.getTargetNodeGroupId().equals(me.getNodeGroupId())) {
masterCount++;
}
else {
otherCount++;
}
}
}
return masterCount > 1 && otherCount == 0;
}

public boolean refreshFromDatabase() {
Date date1 = sqlTemplate.queryForObject(getSql("selectMaxChannelLastUpdateTime"),
Date.class);
Expand Down
Expand Up @@ -160,7 +160,7 @@ public Node findNode(String id, boolean useCache) {
}
return nodeCache.get(id);
} else {
return findAllNodesAsMap().get(id);
return findNode(id);
}
}

Expand Down
Expand Up @@ -547,7 +547,7 @@ protected String openRegistration(Node node, String remoteHost, String remoteAdd
node.setNodeId(nodeId);
node.setSyncEnabled(false);

boolean masterToMasterOnly = configurationService.isMasterToMasterOnly();
boolean masterToMasterOnly = configurationService.containsMasterToMaster();
node.setCreatedAtNodeId(masterToMasterOnly ? null: me.getNodeId());
nodeService.save(node);

Expand Down
Expand Up @@ -103,7 +103,7 @@ public String readSqlStatement() {
sql.setLength(0);
}
} else {
checkStatementEndsIndex = sql.length()-1;
checkStatementEndsIndex = sql.length();
}
line = readLine();
} while (line != null);
Expand Down
Expand Up @@ -26,10 +26,21 @@

import org.junit.Test;

@SuppressWarnings("resource")
public class SqlScriptReaderTest {

@Test
public void testReadScript2() throws Exception {
SqlScriptReader reader = new SqlScriptReader(new InputStreamReader(getClass().getResourceAsStream("/test-script-2.sql")));
int count = 0;
while (reader.readSqlStatement() != null) {
count++;
}
assertEquals(36, count);
}

@Test
public void testReadScript() throws Exception {
public void testReadScript1() throws Exception {
SqlScriptReader reader = new SqlScriptReader(new InputStreamReader(getClass().getResourceAsStream("/test-script-1.sql")));

String nextStatement = reader.readSqlStatement();
Expand Down

0 comments on commit 42a4853

Please sign in to comment.