Skip to content

Commit

Permalink
remove file sync tables for point release. remove process info.
Browse files Browse the repository at this point in the history
  • Loading branch information
chenson42 committed Apr 26, 2013
1 parent cbb8504 commit 31397e1
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 54 deletions.
Expand Up @@ -126,8 +126,8 @@ public void exportTestDatabaseSQL() throws Exception {
export.setCompatible(Compatible.H2);
String output = export.exportTables(tables).toLowerCase();

Assert.assertEquals(output, 39, StringUtils.countMatches(output, "create table \"sym_"));
Assert.assertEquals(35,
Assert.assertEquals(output, 36, StringUtils.countMatches(output, "create table \"sym_"));
Assert.assertEquals(36,
StringUtils.countMatches(output, "varchar(" + Integer.MAX_VALUE + ")"));
}

Expand Down
Expand Up @@ -222,10 +222,6 @@ public String showInError(String identityNodeId) {
return key.getSourceNodeId();
case GAP_DETECT:
return key.getSourceNodeId();
case REGISTRATION_ATTEMPT:
return null;
case REGISTRATION_HANDLER:
return key.getTargetNodeId();
default:
return null;
}
Expand Down
Expand Up @@ -27,7 +27,7 @@ public class ProcessInfoKey implements Serializable {
private static final long serialVersionUID = 1L;

public enum ProcessType {
PUSH_JOB, PULL_JOB, PUSH_HANDLER, PULL_HANDLER, ROUTER_JOB, GAP_DETECT, ROUTER_READER, MANUAL_LOAD, REGISTRATION_ATTEMPT, REGISTRATION_HANDLER;
PUSH_JOB, PULL_JOB, PUSH_HANDLER, PULL_HANDLER, ROUTER_JOB, GAP_DETECT, ROUTER_READER, MANUAL_LOAD;

public String toString() {
switch (this) {
Expand All @@ -47,10 +47,6 @@ public String toString() {
return "Routing Reader";
case GAP_DETECT:
return "Gap Detection";
case REGISTRATION_ATTEMPT:
return "Registration Attempt";
case REGISTRATION_HANDLER:
return "Service Registration";
default:
return name();
}
Expand Down
Expand Up @@ -30,7 +30,7 @@ public class RegistrationRequest implements Serializable {
private static final long serialVersionUID = 1L;

public static enum RegistrationStatus {
OK, RQ, IG, RR
OK, RQ, IG, RR, ER
};

private String nodeGroupId;
Expand All @@ -40,6 +40,7 @@ public static enum RegistrationStatus {
private String ipAddress;
private long attemptCount;
private String registeredNodeId;
private String errorMessage;
private Date createTime = new Date();
private String lastUpdateBy = "engine";
private Date lastUpdateTime = new Date();
Expand Down Expand Up @@ -137,5 +138,13 @@ public Date getLastUpdateTime() {
public void setLastUpdateTime(Date lastUpdateTime) {
this.lastUpdateTime = lastUpdateTime;
}

public void setErrorMessage(String message) {
this.errorMessage = message;
}

public String getErrorMessage() {
return errorMessage;
}

}
Expand Up @@ -40,9 +40,6 @@
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.model.NodeGroupLink;
import org.jumpmind.symmetric.model.NodeSecurity;
import org.jumpmind.symmetric.model.ProcessInfo;
import org.jumpmind.symmetric.model.ProcessInfoKey;
import org.jumpmind.symmetric.model.ProcessInfoKey.ProcessType;
import org.jumpmind.symmetric.model.RegistrationRequest;
import org.jumpmind.symmetric.model.RegistrationRequest.RegistrationStatus;
import org.jumpmind.symmetric.model.RemoteNodeStatus.Status;
Expand Down Expand Up @@ -117,16 +114,14 @@ public boolean registerNode(Node nodePriorToRegistration, String remoteHost,
throws IOException {
Node identity = nodeService.findIdentity();
if (identity == null) {
saveRegisgtrationRequest(new RegistrationRequest(nodePriorToRegistration,
RegistrationStatus.RQ, remoteHost, remoteAddress));
log.warn("Registration is not allowed until this node has an identity");
RegistrationRequest req = new RegistrationRequest(nodePriorToRegistration,
RegistrationStatus.ER, remoteHost, remoteAddress);
req.setErrorMessage("Registration is not allowed until this node has an identity");
saveRegisgtrationRequest(req);
log.warn(req.getErrorMessage());
return false;
}

ProcessInfo processInfo = statisticManager.newProcessInfo(new ProcessInfoKey(identity.getNodeId(),
nodePriorToRegistration.getNodeId() == null ? nodePriorToRegistration
.getExternalId() : nodePriorToRegistration.getNodeId(),
ProcessType.REGISTRATION_HANDLER));
try {

if (!nodeService.isRegistrationServer()) {
Expand All @@ -136,10 +131,11 @@ public boolean registerNode(Node nodePriorToRegistration, String remoteHost,
*/
NodeSecurity security = nodeService.findNodeSecurity(identity.getNodeId());
if (security == null || security.getInitialLoadTime() == null) {
saveRegisgtrationRequest(new RegistrationRequest(nodePriorToRegistration,
RegistrationStatus.RQ, remoteHost, remoteAddress));
log.warn("Registration is not allowed until this node has an initial load (ie. node_security.initial_load_time is a non null value)");
processInfo.setStatus(ProcessInfo.Status.ERROR);
RegistrationRequest req = new RegistrationRequest(nodePriorToRegistration,
RegistrationStatus.ER, remoteHost, remoteAddress);
req.setErrorMessage("Registration is not allowed until this node has an initial load (ie. node_security.initial_load_time is a non null value)");
saveRegisgtrationRequest(req);
log.warn(req.getErrorMessage());
return false;
}
}
Expand All @@ -150,7 +146,6 @@ public boolean registerNode(Node nodePriorToRegistration, String remoteHost,
nodePriorToRegistration.getExternalId(), redirectUrl);
saveRegisgtrationRequest(new RegistrationRequest(nodePriorToRegistration,
RegistrationStatus.RR, remoteHost, remoteAddress));
processInfo.setStatus(ProcessInfo.Status.DONE);
throw new RegistrationRedirectException(redirectUrl);
}

Expand All @@ -163,12 +158,12 @@ public boolean registerNode(Node nodePriorToRegistration, String remoteHost,
if (link == null
&& parameterService.is(ParameterConstants.REGISTRATION_REQUIRE_NODE_GROUP_LINK,
true)) {
saveRegisgtrationRequest(new RegistrationRequest(nodePriorToRegistration,
RegistrationStatus.RQ, remoteHost, remoteAddress));
log.warn(
"Registration is not allowed unless a node group link exists so the registering node can receive configuration updates. Please add a group link where the source group id is {} and the target group id is {}",
identity.getNodeGroupId(), nodePriorToRegistration.getNodeGroupId());
processInfo.setStatus(ProcessInfo.Status.ERROR);
RegistrationRequest req = new RegistrationRequest(nodePriorToRegistration,
RegistrationStatus.ER, remoteHost, remoteAddress);
req.setErrorMessage(String.format("Registration is not allowed unless a node group link exists so the registering node can receive configuration updates. Please add a group link where the source group id is %s and the target group id is %s",
identity.getNodeGroupId(), nodePriorToRegistration.getNodeGroupId()));
saveRegisgtrationRequest(req);
log.warn(req.getErrorMessage());
return false;
}

Expand All @@ -189,7 +184,6 @@ public boolean registerNode(Node nodePriorToRegistration, String remoteHost,
|| !security.isRegistrationEnabled()) {
saveRegisgtrationRequest(new RegistrationRequest(nodePriorToRegistration,
RegistrationStatus.RQ, remoteHost, remoteAddress));
processInfo.setStatus(ProcessInfo.Status.ERROR);
return false;
}

Expand Down Expand Up @@ -224,25 +218,21 @@ public boolean registerNode(Node nodePriorToRegistration, String remoteHost,

statisticManager.incrementNodesRegistered(1);

processInfo.setStatus(ProcessInfo.Status.DONE);

return true;

} catch (RegistrationNotOpenException ex) {
if (StringUtils.isNotBlank(ex.getMessage())) {
log.warn("Registration not allowed for {} because {}",
nodePriorToRegistration.toString(), ex.getMessage());
}
processInfo.setStatus(ProcessInfo.Status.ERROR);
return false;
}
}

public List<RegistrationRequest> getRegistrationRequests(
boolean includeNodesWithOpenRegistrations) {
List<RegistrationRequest> requests = sqlTemplate.query(
getSql("selectRegistrationRequestSql"), new RegistrationRequestMapper(),
RegistrationStatus.RQ.name());
getSql("selectRegistrationRequestSql"), new RegistrationRequestMapper());
if (!includeNodesWithOpenRegistrations) {
Collection<Node> nodes = nodeService.findNodesWithOpenRegistration();
Iterator<RegistrationRequest> i = requests.iterator();
Expand Down Expand Up @@ -273,15 +263,14 @@ public void saveRegisgtrationRequest(RegistrationRequest request) {
int count = sqlTemplate.update(
getSql("updateRegistrationRequestSql"),
new Object[] { request.getLastUpdateBy(), request.getLastUpdateTime(),
request.getRegisteredNodeId(), request.getStatus().name(), nodeGroupId,
externalId, request.getIpAddress(), request.getHostName(),
RegistrationStatus.RQ.name() });
request.getRegisteredNodeId(), request.getStatus().name(), request.getErrorMessage(),
nodeGroupId, externalId, request.getIpAddress(), request.getHostName() });
if (count == 0) {
sqlTemplate.update(
getSql("insertRegistrationRequestSql"),
new Object[] { request.getLastUpdateBy(), request.getLastUpdateTime(),
request.getRegisteredNodeId(), request.getStatus().name(), nodeGroupId,
externalId, request.getIpAddress(), request.getHostName() });
externalId, request.getIpAddress(), request.getHostName(), request.getErrorMessage() });
}

}
Expand Down Expand Up @@ -462,7 +451,6 @@ protected String openRegistration(Node node, String remoteHost, String remoteAdd
} else {
reOpenRegistration(nodeId);
}
statisticManager.removeProcessInfo(new ProcessInfoKey(me.getNodeId(), nodeId, ProcessType.REGISTRATION_HANDLER));
return nodeId;
} else {
throw new IllegalStateException(
Expand Down Expand Up @@ -509,6 +497,7 @@ public RegistrationRequest mapRow(Row rs) {
request.setCreateTime(rs.getDateTime("create_time"));
request.setLastUpdateBy(rs.getString("last_update_by"));
request.setLastUpdateTime(rs.getDateTime("last_update_time"));
request.setErrorMessage(rs.getString("error_message"));
return request;
}
}
Expand Down
Expand Up @@ -41,25 +41,25 @@ public RegistrationServiceSqlMap(IDatabasePlatform platform,

putSql("insertRegistrationRequestSql",
""
+ "insert into $(registration_request) "
+ " (last_update_by, last_update_time, attempt_count, registered_node_id, status, "
+ " node_group_id, external_id, ip_address, host_name, create_time) "
+ " values (?,?,1,?,?,?,?,?,?,current_timestamp) ");
+ "insert into $(registration_request) "
+ " (last_update_by, last_update_time, attempt_count, registered_node_id, status, "
+ " node_group_id, external_id, ip_address, host_name, error_message, create_time) "
+ " values (?,?,1,?,?,?,?,?,?,?,current_timestamp) ");

putSql("updateRegistrationRequestSql",
""
+ "update $(registration_request) "
+ " set "
+ " last_update_by=?, last_update_time=?, attempt_count=attempt_count+1, registered_node_id=?, status=? "
+ " last_update_by=?, last_update_time=?, attempt_count=attempt_count+1, registered_node_id=?, status=?, error_message=? "
+ " where "
+ " node_group_id=? and external_id=? and ip_address=? and host_name=? and status=? ");
+ " node_group_id=? and external_id=? and ip_address=? and host_name=? and status in ('RQ','ER') ");

putSql("selectRegistrationRequestSql",
""
+ "select node_group_id, external_id, status, host_name, ip_address, "
+ "select node_group_id, external_id, status, host_name, ip_address, error_message, "
+ " attempt_count, registered_node_id, create_time, last_update_by, last_update_time "
+ " from $(registration_request) "
+ " where status=? ");
+ " from $(registration_request) "
+ " where status in ('RQ','ER') ");

putSql("deleteRegistrationRequestSql",
"delete from $(registration_request) where node_group_id=? and external_id=? and ip_address=? and host_name=? and status=?");
Expand Down
4 changes: 3 additions & 1 deletion symmetric-core/src/main/resources/symmetric-schema.xml
Expand Up @@ -312,6 +312,7 @@
<column name="ip_address" type="VARCHAR" size="50" required="true" description="The ip address for the host." />
<column name="attempt_count" type="INTEGER" default="0" description="The number of registration attempts." />
<column name="registered_node_id" type="VARCHAR" size="50" description="A unique identifier for a node." />
<column name="error_message" type="LONGVARCHAR" description="Record any errors or warnings that occurred when attempting to register." />
<column name="create_time" primaryKey="true" type="TIMESTAMP" required="true" description="Timestamp when this entry was created." />
<column name="last_update_by" type="VARCHAR" size="50" description="The user who last updated this entry." />
<column name="last_update_time" type="TIMESTAMP" required="true" description="Timestamp when a user last updated this entry." />
Expand Down Expand Up @@ -645,6 +646,7 @@
</foreign-key>
</table>

<!--
<table name="file_trigger" description="This table defines files or sets of files for which changes will be captured for file synchronization">
<column name="trigger_id" type="VARCHAR" size="50" required="true" primaryKey="true" description="Unique identifier for a trigger." />
<column name="channel_id" type="VARCHAR" size="20" required="true" description="The channel_id of the channel that data changes will flow through." />
Expand Down Expand Up @@ -692,6 +694,6 @@
<column name="target_base_dir" type="VARCHAR" size="255" required="true" description="" />
<column name="create_time" type="TIMESTAMP" required="true" description="Timestamp when this entry was created." />
</table>

-->

</database>

0 comments on commit 31397e1

Please sign in to comment.