Commit

dev checkin. additional unit tests.
chenson42 committed Feb 14, 2012
1 parent 0343e64 commit 03d1ab7
Showing 12 changed files with 308 additions and 219 deletions.
@@ -373,10 +373,12 @@ public String sendSQL(String nodeId, String catalogName, String schemaName, Stri
}

public RemoteNodeStatuses push() {
MDC.put("engineName", getEngineName());
return pushService.pushData();
}

public void syncTriggers() {
MDC.put("engineName", getEngineName());
triggerRouterService.syncTriggers();
}

@@ -385,14 +387,17 @@ public NodeStatus getNodeStatus() {
}

public RemoteNodeStatuses pull() {
MDC.put("engineName", getEngineName());
return pullService.pullData();
}

public void route() {
MDC.put("engineName", getEngineName());
routerService.routeData();
}

public void purge() {
MDC.put("engineName", getEngineName());
purgeService.purgeOutgoing();
purgeService.purgeIncoming();
purgeService.purgeDataGaps();
@@ -470,14 +475,17 @@ public boolean isConfigured() {
}

public void heartbeat(boolean force) {
MDC.put("engineName", getEngineName());
dataService.heartbeat(force);
}

public void openRegistration(String nodeGroupId, String externalId) {
MDC.put("engineName", getEngineName());
registrationService.openRegistration(nodeGroupId, externalId);
}

public void reOpenRegistration(String nodeId) {
MDC.put("engineName", getEngineName());
registrationService.reOpenRegistration(nodeId);
}

@@ -202,6 +202,7 @@ public boolean invoke(boolean force) {
* This method is called from the job
*/
public void run() {
MDC.put("engineName", engine != null ? engine.getEngineName() : "unknown");
invoke(false);
}

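The MDC calls added throughout these hunks follow the standard Mapped Diagnostic Context pattern: entries live in a thread-local map that the logging layout can stamp onto every message, for example with a %X{engineName} conversion pattern in log4j. Below is a minimal, self-contained sketch of the idiom, assuming SLF4J's MDC facade (the diff does not show which MDC import the project uses) and a hypothetical EngineTask class:

import org.slf4j.MDC;

public class EngineTask implements Runnable {

    private final String engineName; // stand-in for engine.getEngineName()

    public EngineTask(String engineName) {
        this.engineName = engineName;
    }

    public void run() {
        // Fall back to "unknown" when no engine is available, as the
        // job's run() above does.
        MDC.put("engineName", engineName != null ? engineName : "unknown");
        try {
            // ... do the work; every log statement on this thread can
            // now carry the engine name via %X{engineName} ...
        } finally {
            MDC.remove("engineName"); // keep pooled threads from leaking context
        }
    }
}

The try/finally cleanup is not part of the hunks above; it is the usual guard when jobs run on shared thread pools.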
@@ -131,18 +131,18 @@ private boolean matchesTable(Table table, String tableSuffix) {
public <R extends IDataReader, W extends IDataWriter> void batchCommitted(
DataContext context) {
if (context.get(CTX_KEY_FLUSH_CHANNELS_NEEDED) != null) {
log.info("ChannelFlushed");
log.info("Channels flushed because new channels came through the data loader");
configurationService.reloadChannels();
}
if (context.get(CTX_KEY_RESYNC_NEEDED) != null
&& parameterService.is(ParameterConstants.AUTO_SYNC_CONFIGURATION)
&& parameterService.is(ParameterConstants.AUTO_SYNC_TRIGGERS)) {
log.info(".");
log.info("About to syncTriggers because new configuration came through the data loader");
triggerRouterService.syncTriggers();
}
if (context.get(CTX_KEY_FLUSH_TRANSFORMS_NEEDED) != null
&& parameterService.is(ParameterConstants.AUTO_SYNC_CONFIGURATION)) {
log.info(".");
log.info("About to refresh the cache of transformation because new configuration come through the data loader");
transformService.resetCache();
}
}
@@ -93,7 +93,8 @@ public Set<String> routeToNodes(SimpleRouterContext routingContext, DataMetaData
// if this is sym_node or sym_node_security determine which nodes it
// goes to.
if (tableMatches(dataMetaData, TableConstants.SYM_NODE)
|| tableMatches(dataMetaData, TableConstants.SYM_NODE_SECURITY)) {
|| tableMatches(dataMetaData, TableConstants.SYM_NODE_SECURITY)
|| tableMatches(dataMetaData, TableConstants.SYM_NODE_HOST)) {

if (didNodeSecurityChangeForNodeInitialization(dataMetaData)) {
return null;
@@ -251,17 +252,17 @@ private boolean isLinked(String nodeIdInQuestion, Node nodeThatCouldBeRoutedTo,
@Override
public void contextCommitted(SimpleRouterContext routingContext) {
if (routingContext.getContextCache().get(CTX_KEY_FLUSH_CHANNELS_NEEDED) != null) {
log.info("Channels flushed because new channels came through the dataloader");
log.info("Channels flushed because new channels came through the data router");
configurationService.reloadChannels();
}
if (routingContext.getContextCache().get(CTX_KEY_RESYNC_NEEDED) != null
&& parameterService.is(ParameterConstants.AUTO_SYNC_TRIGGERS)) {
log.info("About to syncTriggers because new configuration came through the dataloader");
log.info("About to syncTriggers because new configuration came through the data router");
triggerRouterService.syncTriggers();
}
if (routingContext.getContextCache().get(CTX_KEY_FLUSH_TRANSFORMS_NEEDED) != null
&& parameterService.is(ParameterConstants.AUTO_SYNC_CONFIGURATION)) {
log.info("About to refresh the cache of transformation because new configuration come through the dataloader");
log.info("About to refresh the cache of transformation because new configuration come through the data router");
transformService.resetCache();
}
}
@@ -319,7 +319,10 @@ public void extract(Node targetNode, IOutgoingTransport targetTransport,
int batchesSentCount = 0;

OutgoingBatch currentBatch = null;

try {
IDataWriter dataWriter = null;

for (int i = 0; i < activeBatches.size(); i++) {
currentBatch = activeBatches.get(i);

@@ -382,7 +385,9 @@ public void extract(Node targetNode, IOutgoingTransport targetTransport,
currentBatch.getBatchId());
if (extractedBatch != null) {
IDataReader dataReader = new ProtocolDataReader(extractedBatch);
IDataWriter dataWriter = new ProtocolDataWriter(targetTransport.open());
if (dataWriter == null) {
dataWriter = new ProtocolDataWriter(targetTransport.open());
}
new DataProcessor(dataReader, dataWriter).process();
}

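The change above hoists the IDataWriter out of the per-batch loop: the writer, and the transport it opens, is now created lazily on the first batch that actually extracts data and reused for every batch after it, rather than a new ProtocolDataWriter being opened per batch. A self-contained sketch of the same lazy-initialization idiom, with java.io.Writer standing in for IDataWriter:

import java.io.IOException;
import java.io.StringWriter;
import java.io.Writer;
import java.util.Arrays;
import java.util.List;

public class LazyWriterSketch {
    public static void main(String[] args) throws IOException {
        List<String> batches = Arrays.asList("batch-1", "batch-2", "batch-3");
        Writer writer = null; // deliberately not opened yet
        for (String batch : batches) {
            if (writer == null) {
                writer = new StringWriter(); // the expensive open happens at most once
            }
            writer.write(batch + "\n");
        }
        if (writer != null) {
            System.out.print(writer); // prints the accumulated batches
        }
    }
}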
@@ -230,7 +230,7 @@ private int purgeByMinMax(long[] minMax, MinMaxDeleteSql identifier, Date retent
break;
case STRANDED_DATA:
deleteSql = getSql("deleteStrandedData");
args = new Object[] { minId, maxId, cutoffTime, minId, maxId, minId, maxId };
args = new Object[] { minId, maxId, cutoffTime, minId, maxId };
break;
}

@@ -274,7 +274,7 @@ private long purgeIncomingBatch(final Calendar time) {
getSql("selectIncomingBatchRangeSql"), new ISqlRowMapper<NodeBatchRange>() {
public NodeBatchRange mapRow(Row rs) {
return new NodeBatchRange(rs.getString("node_id"), rs
.getLong("min_batch_id"), rs.getLong("max_batch_id"));
.getLong("min_id"), rs.getLong("max_id"));
}
}, time.getTime());
int incomingBatchesPurgedCount = purgeByNodeBatchRangeList(
@@ -8,20 +8,22 @@ public class PurgeServiceSqlMap extends AbstractSqlMap {

public PurgeServiceSqlMap(IDatabasePlatform platform, Map<String, String> replacementTokens) {
super(platform, replacementTokens);

// @formatter:off

putSql("selectOutgoingBatchRangeSql" ,"" +
"select min(batch_id) as min_id, max(batch_id) as max_id from $(outgoing_batch) where " +
" create_time < ? and status in ('OK','IG') and batch_id < (select max(batch_id) from $(outgoing_batch)) " );

putSql("deleteOutgoingBatchSql" ,"" +
"delete from $(outgoing_batch) where status in ('OK','IG') and batch_id between :MIN " +
" and :MAX and batch_id not in (select batch_id from $(data_event) where batch_id between :MIN " +
" and :MAX) " );
"delete from $(outgoing_batch) where status in ('OK','IG') and batch_id between ? \n" +
" and ? and batch_id not in (select batch_id from $(data_event) where batch_id between ? \n" +
" and ?) \n" );

putSql("deleteDataEventSql" ,"" +
"delete from $(data_event) where batch_id not in (select batch_id from " +
" $(outgoing_batch) where batch_id between :MIN and :MAX and status not in ('OK','IG')) " +
" and batch_id between :MIN and :MAX " );
"delete from $(data_event) where batch_id not in (select batch_id from \n" +
" $(outgoing_batch) where batch_id between ? and ? and status not in ('OK','IG')) \n" +
" and batch_id between ? and ? \n" );

putSql("deleteUnroutedDataEventSql" ,"" +
"delete from $(data_event) where " +
@@ -35,27 +37,27 @@ public PurgeServiceSqlMap(IDatabasePlatform platform, Map<String, String> replac
" in (select node_id from $(node) where sync_enabled=1) and status != 'OK' " );

putSql("deleteStrandedData" ,"" +
"delete from $(data) where " +
" data_id between :MIN and :MAX and " +
" data_id < (select max(ref_data_id) from $(data)_ref) and " +
" create_time < :CUTOFF_TIME and " +
" data_id not in (select e.data_id from $(data_event) e where " +
" e.data_id between :MIN and :MAX) " );
"delete from $(data) where \n" +
" data_id between ? and ? and \n" +
" data_id < (select min(start_id) from sym_data_gap) and \n" +
" create_time < ? and \n" +
" data_id not in (select e.data_id from $(data_event) e where \n" +
" e.data_id between ? and ?) \n" );

putSql("deleteDataSql" ,"" +
"delete from $(data) where " +
" data_id between :MIN and :MAX and " +
" create_time < :CUTOFF_TIME and " +
" data_id in (select e.data_id from $(data_event) e where " +
" e.data_id between :MIN and :MAX) " +
" and " +
" data_id not in " +
" (select e.data_id from $(data_event) e where " +
" e.data_id between :MIN and :MAX and " +
" (e.data_id is null or " +
" e.batch_id in " +
" (select batch_id from $(outgoing_batch) where " +
" status not in ('OK','IG')))) " );
"delete from $(data) where \n" +
" data_id between ? and ? and \n" +
" create_time < ? and \n" +
" data_id in (select e.data_id from $(data_event) e where \n" +
" e.data_id between ? and ?) \n" +
" and \n" +
" data_id not in \n" +
" (select e.data_id from $(data_event) e where \n" +
" e.data_id between ? and ? and \n" +
" (e.data_id is null or \n" +
" e.batch_id in \n" +
" (select batch_id from $(outgoing_batch) where \n" +
" status not in ('OK','IG')))) \n" );

putSql("selectIncomingBatchRangeSql" ,"" +
"select node_id, min(batch_id) as min_id, max(batch_id) as max_id from $(incoming_batch) where " +
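The rewritten statements replace the :MIN/:MAX/:CUTOFF_TIME tokens with plain JDBC ? placeholders, which is why the args array in purgeByMinMax above shrank to one value per placeholder: positional parameters are bound strictly in order, so a range the SQL uses twice must be passed twice. A sketch of that binding for deleteStrandedData, assuming the $(data) and $(data_event) tokens resolve to the default sym_-prefixed table names:

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;

public class PositionalParamsSketch {

    // Mirrors the five-element args array above: minId and maxId appear
    // twice because the statement uses the id range twice; the
    // sym_data_gap subselect binds no parameter at all.
    static int purgeStranded(Connection conn, long minId, long maxId,
            Timestamp cutoffTime) throws SQLException {
        String sql = "delete from sym_data where "
                + "data_id between ? and ? and "
                + "data_id < (select min(start_id) from sym_data_gap) and "
                + "create_time < ? and "
                + "data_id not in (select e.data_id from sym_data_event e "
                + "where e.data_id between ? and ?)";
        Object[] args = { minId, maxId, cutoffTime, minId, maxId };
        PreparedStatement ps = conn.prepareStatement(sql);
        try {
            for (int i = 0; i < args.length; i++) {
                ps.setObject(i + 1, args[i]); // JDBC parameters are 1-indexed
            }
            return ps.executeUpdate();
        } finally {
            ps.close();
        }
    }
}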
@@ -120,6 +120,7 @@ public TriggerRouterService(IParameterService parameterService,
configTables.add(TableConstants.getTableName(tablePrefix,
TableConstants.SYM_NODE_GROUP_LINK));
configTables.add(TableConstants.getTableName(tablePrefix, TableConstants.SYM_NODE));
configTables.add(TableConstants.getTableName(tablePrefix, TableConstants.SYM_NODE_HOST));
configTables
.add(TableConstants.getTableName(tablePrefix, TableConstants.SYM_NODE_SECURITY));
configTables.add(TableConstants.getTableName(tablePrefix, TableConstants.SYM_PARAMETER));
@@ -414,16 +415,12 @@ protected boolean isMatch(String catalogName, String schemaName, String tableNam
*/
protected List<TriggerRouter> getConfigurationTablesTriggerRoutersForCurrentNode(
String sourceNodeGroupId) {
List<TriggerRouter> triggers = new ArrayList<TriggerRouter>();
List<TriggerRouter> triggerRouters = new ArrayList<TriggerRouter>();
List<NodeGroupLink> links = configurationService.getNodeGroupLinksFor(sourceNodeGroupId);
for (NodeGroupLink nodeGroupLink : links) {
triggers.addAll(buildTriggerRoutersForSymmetricTables(Version.version(), nodeGroupLink));
if (NodeGroupLinkAction.P == nodeGroupLink.getDataEventAction()) {
triggers.add(buildTriggerRoutersForSymmetricTables(Version.version(),
buildTriggerForSymmetricTable(TableConstants.getTableName(tablePrefix, TableConstants.SYM_NODE_HOST)), nodeGroupLink));
}
triggerRouters.addAll(buildTriggerRoutersForSymmetricTables(Version.version(), nodeGroupLink));
}
return triggers;
return triggerRouters;
}

protected void mergeInConfigurationTablesTriggerRoutersForCurrentNode(String sourceNodeGroupId,
4 changes: 4 additions & 0 deletions symmetric/symmetric-core/src/test/resources/log4j.xml
@@ -12,6 +12,10 @@
<category name="org.jumpmind">
<priority value="INFO" />
</category>

<category name="org.junit">
<priority value="DEBUG" />
</category>

<!--
<category name="org.jumpmind.symmetric.io.data.writer">
@@ -1,4 +1,4 @@
test.root=firebird
test.root=h2
test.client=h2

mysql.db.driver=com.mysql.jdbc.Driver
91 changes: 91 additions & 0 deletions symmetric/symmetric-server/src/test/java/TestArrayProc.java
@@ -0,0 +1,91 @@
import java.sql.*;

import oracle.jdbc.OracleCallableStatement;
import oracle.jdbc.OracleTypes;
import oracle.sql.ARRAY;
import oracle.sql.ArrayDescriptor;

public class TestArrayProc {

public static void main(String[] args)
throws ClassNotFoundException, SQLException
{
long ts = System.currentTimeMillis();
DriverManager.registerDriver (new oracle.jdbc.OracleDriver());

String url = "jdbc:oracle:thin:@//localhost:1521/XE";

Connection conn =
DriverManager.getConnection(url,"SampleRoot","admin");

conn.setAutoCommit(false);

// Create descriptors for each Oracle collection type required
ArrayDescriptor oracleVarchar2Collection =
ArrayDescriptor.createDescriptor("VARCHAR2_T",conn);

ArrayDescriptor oracleIntegerCollection =
ArrayDescriptor.createDescriptor("INTEGER_T",conn);

ArrayDescriptor oracleNumberCollection =
ArrayDescriptor.createDescriptor("NUMBER_T",conn);

CallableStatement stmt =
conn.prepareCall("{ call insert_payments_a(?, ?, ?, ?, ?, ? ) }");

int INSERT_COUNT = 100000;
// JAVA arrays to hold the data.
double[] payment_amount_array = new double[INSERT_COUNT];
String[] card_number_array = new String[INSERT_COUNT];
String[] expire_month_array = new String[INSERT_COUNT];
String[] expire_year_array = new String[INSERT_COUNT];
String[] name_on_card_array = new String[INSERT_COUNT];

// Fill the Java arrays.
for (int i=0; i< INSERT_COUNT; i++) {
payment_amount_array[i] = 99.99;
card_number_array[i] = "1234567890123456";
expire_month_array[i] = "12";
expire_year_array[i] = "15";
name_on_card_array[i] = "Mr S ODONNELL";
}

payment_amount_array[50] = 1001.00;

// Cast the Java arrays into Oracle arrays
ARRAY ora_payment_amount = new ARRAY (oracleNumberCollection, conn, payment_amount_array);
ARRAY ora_card_number = new ARRAY (oracleVarchar2Collection, conn, card_number_array);
ARRAY ora_expire_month = new ARRAY (oracleVarchar2Collection, conn, expire_month_array);
ARRAY ora_expire_year = new ARRAY (oracleVarchar2Collection, conn, expire_year_array);
ARRAY ora_name_on_card = new ARRAY (oracleVarchar2Collection, conn, name_on_card_array);

// Bind the input arrays.
stmt.setObject(1, ora_payment_amount);
stmt.setObject(2, ora_card_number);
stmt.setObject(3, ora_expire_month);
stmt.setObject(4, ora_expire_year);
stmt.setObject(5, ora_name_on_card);

// Bind the output array, this will contain any exception indexes.
stmt.registerOutParameter(6, OracleTypes.ARRAY, "INTEGER_T");

stmt.execute();

// Get any exceptions. Remember Oracle arrays index from 1,
// so all indexes are +1 off.
ARRAY ora_errors = ((OracleCallableStatement) stmt).getARRAY(6);
// Convert the Oracle array back to a Java int array; any failure
// indexes returned by Oracle need 1 subtracted from them to
// reference the correct Java array element.
int[] errors = ora_errors.getIntArray();
System.out.println(errors.length);
for (int i : errors) {
System.out.println("error:"+i);
}
conn.commit();

stmt.close();
conn.close();

ts = System.currentTimeMillis() - ts;
System.err.println("It took " + ts + "ms to insert " + INSERT_COUNT + " rows at " + ((double) ts / (double) INSERT_COUNT) + "ms/row");
}
}
