Skip to content

Commit

Permalink
use "node" instead of "client"
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Oct 24, 2007
1 parent 0f3499b commit abec013
Show file tree
Hide file tree
Showing 30 changed files with 232 additions and 234 deletions.
Expand Up @@ -257,7 +257,7 @@ public void heartbeat() {
}

/**
* Open up registration for client to attach.
* Open up registration for node to attach.
* @see IRegistrationService#openRegistration(String, String)
*/
public void openRegistration(String groupId, String externalId) {
Expand Down
Expand Up @@ -50,7 +50,7 @@ public void initTrigger(DataEventType dml, Trigger config,

public String getTransactionTriggerExpression();

public String createInitalLoadSqlFor(Node client, Trigger config);
public String createInitalLoadSqlFor(Node node, Trigger config);

public String createPurgeSqlFor(Node node, Trigger trig);

Expand Down
Expand Up @@ -29,7 +29,7 @@ public class DataLoaderContext implements IDataLoaderContext {

private String version;

private String clientId;
private String nodeId;

private String tableName;

Expand Down Expand Up @@ -63,12 +63,12 @@ public void setBatchId(String batchId) {
isSkipping = false;
}

public String getClientId() {
return clientId;
public String getNodeId() {
return nodeId;
}

public void setClientId(String locationId) {
this.clientId = locationId;
public void setNodeId(String nodeId) {
this.nodeId = nodeId;
}

public String getTableName() {
Expand Down
Expand Up @@ -25,7 +25,7 @@ public interface IDataLoaderContext {

public String getBatchId();

public String getClientId();
public String getNodeId();

public String getTableName();

Expand Down
Expand Up @@ -90,7 +90,7 @@ public boolean hasNext() throws IOException {
stats = new DataLoaderStatistics();
return true;
} else if (tokens[0].equals(CsvConstants.NODEID)) {
context.setClientId(tokens[1]);
context.setNodeId(tokens[1]);
} else if (tokens[0].equals(CsvConstants.VERSION)) {
context.setVersion(tokens[1] + "." + tokens[2] + "." + tokens[3]);
} else if (isMetaTokenParsed(tokens)) {
Expand Down
Expand Up @@ -35,7 +35,7 @@ public enum Status {

private String batchId;

private String clientId;
private String nodeId;

private Status status;

Expand All @@ -46,12 +46,12 @@ public IncomingBatch() {

public IncomingBatch(IDataLoaderContext context) {
batchId = context.getBatchId();
clientId = context.getClientId();
nodeId = context.getNodeId();
status = Status.OK;
}

public String getClientBatchId() {
return clientId + "-" + batchId;
public String getNodeBatchId() {
return nodeId + "-" + batchId;
}

public String getBatchId() {
Expand All @@ -70,12 +70,12 @@ public void setStatus(Status status) {
this.status = status;
}

public String getClientId() {
return clientId;
public String getNodeId() {
return nodeId;
}

public void setClientId(String clientId) {
this.clientId = clientId;
public void setNodeId(String nodeId) {
this.nodeId = nodeId;
}

public boolean isRetry() {
Expand Down
Expand Up @@ -39,7 +39,7 @@ public enum Status {

private String batchId;

private String clientId;
private String nodeId;

private Status status;

Expand Down Expand Up @@ -74,7 +74,7 @@ public IncomingBatchHistory() {

public IncomingBatchHistory(IDataLoaderContext context) {
batchId = context.getBatchId();
clientId = context.getClientId();
nodeId = context.getNodeId();
status = Status.OK;
startTime = new Date();
}
Expand All @@ -91,8 +91,8 @@ public void setValues(IDataLoaderStatistics statistics, boolean isSuccess) {
}
}

public String getClientBatchId() {
return clientId + "-" + batchId;
public String getNodeBatchId() {
return nodeId + "-" + batchId;
}

public String getBatchId() {
Expand Down Expand Up @@ -147,12 +147,12 @@ public String getHostName() {
return hostName;
}

public String getClientId() {
return clientId;
public String getNodeId() {
return nodeId;
}

public void setClientId(String clientId) {
this.clientId = clientId;
public void setNodeId(String nodeId) {
this.nodeId = nodeId;
}

public Date getStartTime() {
Expand Down
Expand Up @@ -74,8 +74,8 @@ public Node(IRuntimeConfig runtimeConfig, IDbDialect dbDialect) {
setSchemaVersion(runtimeConfig.getSchemaVersion());
}

public Node(String clientId, String syncURL, String version) {
this.nodeId = clientId;
public Node(String nodeId, String syncURL, String version) {
this.nodeId = nodeId;
this.syncURL = syncURL;
this.schemaVersion = version;
}
Expand All @@ -84,8 +84,8 @@ public String getNodeId() {
return nodeId;
}

public void setNodeId(String clientId) {
this.nodeId = clientId;
public void setNodeId(String nodeId) {
this.nodeId = nodeId;
}

public String getSyncURL() {
Expand Down
Expand Up @@ -39,8 +39,8 @@ public String getNodeId() {
return nodeId;
}

public void setNodeId(String clientId) {
this.nodeId = clientId;
public void setNodeId(String nodeId) {
this.nodeId = nodeId;
}

public String getPassword() {
Expand Down
Expand Up @@ -45,8 +45,8 @@ public enum Status {
public OutgoingBatch() {
}

public OutgoingBatch(Node client, String channelId, BatchType batchType) {
this.nodeId = client.getNodeId();
public OutgoingBatch(Node node, String channelId, BatchType batchType) {
this.nodeId = node.getNodeId();
this.channelId = channelId;
this.status = Status.NE;
this.batchType = batchType;
Expand Down
Expand Up @@ -28,7 +28,7 @@

public interface IDataExtractorService {

public void extractClientIdentityFor(Node client, IOutgoingTransport transport);
public void extractNodeIdentityFor(Node node, IOutgoingTransport transport);

public OutgoingBatch extractInitialLoadFor(Node node, Trigger config, IOutgoingTransport transport);

Expand Down
Expand Up @@ -29,9 +29,9 @@

public interface IIncomingBatchService {

public IncomingBatch findIncomingBatch(String batchId, String clientId);
public IncomingBatch findIncomingBatch(String batchId, String nodeId);

public List<IncomingBatchHistory> findIncomingBatchHistory(String batchId, String clientId);
public List<IncomingBatchHistory> findIncomingBatchHistory(String batchId, String nodeId);

@Transactional
public boolean acquireIncomingBatch(IncomingBatch status);
Expand Down
Expand Up @@ -30,15 +30,15 @@

public interface INodeService {

public Node findNode(String clientId);
public Node findNode(String nodeId);

public Node findNodeByExternalId(String externalId);

public NodeSecurity findNodeSecurity(String clientId);
public NodeSecurity findNodeSecurity(String nodeId);

public void ignoreNodeChannelForExternalId(boolean ignore, String channelId, String externalId);

public boolean isNodeAuthorized(String clientId, String password);
public boolean isNodeAuthorized(String nodeId, String password);

public boolean isRegistrationEnabled(String nodeId);

Expand Down
Expand Up @@ -28,7 +28,7 @@

public interface IRegistrationService {

public boolean registerNode(Node client, OutputStream out) throws IOException;
public boolean registerNode(Node node, OutputStream out) throws IOException;

public void openRegistration(String nodeGroupId, String externalId);

Expand Down
Expand Up @@ -302,8 +302,8 @@ public void setAutoConfigureDatabase(boolean autoConfigureDatabase) {
this.autoConfigureDatabase = autoConfigureDatabase;
}

public void setNodeService(INodeService clientService) {
this.nodeService = clientService;
public void setNodeService(INodeService nodeService) {
this.nodeService = nodeService;
}

public void setTransportManager(ITransportManager transportManager) {
Expand Down
Expand Up @@ -69,7 +69,7 @@ public class DataExtractorService implements IDataExtractorService {

private String selectEventDataToExtractSql;

public void extractClientIdentityFor(Node node, IOutgoingTransport transport) {
public void extractNodeIdentityFor(Node node, IOutgoingTransport transport) {
String tableName = tablePrefix + "_node_identity";
OutgoingBatch batch = new OutgoingBatch(node, Constants.CHANNEL_CONFIG,
BatchType.INITIAL_LOAD);
Expand Down Expand Up @@ -168,9 +168,9 @@ public boolean extract(Node node, final IExtractListener handler)
selectEventDataToExtract(handler, batch);
handler.endBatch(batch);

// At this point, we've already sent the data to the client, so if
// At this point, we've already sent the data to the node, so if
// updating the batch to 'sent' fails, all this means is that the batch
// will be sent to the client again. This is expected to happen so
// will be sent to the node again. This is expected to happen so
// infrequently, that the inefficiencies associated with re-sending a batch
// are negligible.
outgoingBatchService.markOutgoingBatchSent(batch);
Expand Down
Expand Up @@ -117,7 +117,7 @@ protected List<IncomingBatchHistory> loadDataAndReturnBatches(
} catch (Exception e) {
if (status != null) {
logger.error("Failed to load batch "
+ status.getClientBatchId(), e);
+ status.getNodeBatchId(), e);
history.setValues(dataLoader.getStatistics(), false);
handleBatchError(status, history);
} else {
Expand Down Expand Up @@ -179,14 +179,14 @@ protected void handleBatchError(final IncomingBatch status,
}
} catch (Exception e) {
logger.error("Failed to record status of batch "
+ status.getClientBatchId());
+ status.getNodeBatchId());
}
try {
history.setStatus(IncomingBatchHistory.Status.ER);
incomingBatchService.insertIncomingBatchHistory(history);
} catch (Exception e) {
logger.error("Failed to record history of batch "
+ status.getClientBatchId());
+ status.getNodeBatchId());
}
}

Expand Down
Expand Up @@ -50,19 +50,19 @@ public class IncomingBatchService extends AbstractService implements IIncomingBa

private boolean skipDuplicateBatches = true;

public IncomingBatch findIncomingBatch(String batchId, String clientId) {
public IncomingBatch findIncomingBatch(String batchId, String nodeId) {
try {
return (IncomingBatch) jdbcTemplate.queryForObject(findIncomingBatchSql,
new Object[] { batchId, clientId }, new IncomingBatchMapper());
new Object[] { batchId, nodeId }, new IncomingBatchMapper());
} catch (EmptyResultDataAccessException e) {
return null;
}
}

@SuppressWarnings("unchecked")
public List<IncomingBatchHistory> findIncomingBatchHistory(String batchId, String clientId) {
public List<IncomingBatchHistory> findIncomingBatchHistory(String batchId, String nodeId) {
return (List<IncomingBatchHistory>) jdbcTemplate.query(findIncomingBatchHistorySql, new Object[] { batchId,
clientId }, new IncomingBatchHistoryMapper());
nodeId }, new IncomingBatchHistoryMapper());
}

public boolean acquireIncomingBatch(IncomingBatch status) {
Expand All @@ -73,26 +73,26 @@ public boolean acquireIncomingBatch(IncomingBatch status) {
status.setRetry(true);
okayToProcess = updateIncomingBatch(status) > 0 || (! skipDuplicateBatches);
if (okayToProcess) {
logger.warn("Retrying batch " + status.getClientBatchId());
logger.warn("Retrying batch " + status.getNodeBatchId());
} else {
logger.warn("Skipping batch " + status.getClientBatchId());
logger.warn("Skipping batch " + status.getNodeBatchId());
}
}
return okayToProcess;
}

public void insertIncomingBatch(IncomingBatch status) {
jdbcTemplate.update(insertIncomingBatchSql, new Object[] { status.getBatchId(), status.getClientId(),
jdbcTemplate.update(insertIncomingBatchSql, new Object[] { status.getBatchId(), status.getNodeId(),
status.getStatus().toString() });
}

public int updateIncomingBatch(IncomingBatch status) {
return jdbcTemplate.update(updateIncomingBatchSql, new Object[] { status.getStatus().toString(),
status.getBatchId(), status.getClientId(), IncomingBatch.Status.ER.toString() });
status.getBatchId(), status.getNodeId(), IncomingBatch.Status.ER.toString() });
}

public void insertIncomingBatchHistory(IncomingBatchHistory history) {
jdbcTemplate.update(insertIncomingBatchHistorySql, new Object[] { history.getBatchId(), history.getClientId(),
jdbcTemplate.update(insertIncomingBatchHistorySql, new Object[] { history.getBatchId(), history.getNodeId(),
history.getStatus().toString(), history.getHostName(), history.getStatementCount(),
history.getFallbackInsertCount(), history.getFallbackUpdateCount(), history.getMissingDeleteCount(),
history.getFailedRowNumber(), history.getStartTime(), history.getEndTime() });
Expand All @@ -102,7 +102,7 @@ class IncomingBatchMapper implements RowMapper {
public Object mapRow(ResultSet rs, int num) throws SQLException {
IncomingBatch batch = new IncomingBatch();
batch.setBatchId(rs.getString(1));
batch.setClientId(rs.getString(2));
batch.setNodeId(rs.getString(2));
batch.setStatus(IncomingBatch.Status.valueOf(rs.getString(3)));
return batch;
}
Expand All @@ -112,7 +112,7 @@ class IncomingBatchHistoryMapper implements RowMapper {
public Object mapRow(ResultSet rs, int num) throws SQLException {
IncomingBatchHistory history = new IncomingBatchHistory();
history.setBatchId(rs.getString(1));
history.setClientId(rs.getString(2));
history.setNodeId(rs.getString(2));
history.setStatus(IncomingBatchHistory.Status.valueOf(rs.getString(3)));
history.setStartTime(rs.getTime(4));
history.setEndTime(rs.getTime(5));
Expand Down

0 comments on commit abec013

Please sign in to comment.