Merge branch '3.8' of https://github.com/JumpMind/symmetric-ds.git into 3.8
mmichalek committed Oct 1, 2016
2 parents 5a57306 + 350a540 commit eb50ea1
Showing 17 changed files with 245 additions and 51 deletions.
@@ -230,10 +230,16 @@ public static File createSnapshot(ISymmetricEngine engine) {
extract(export, new File(tmpDir, "sym_node_communication.csv"),
TableConstants.getTableName(tablePrefix, TableConstants.SYM_NODE_COMMUNICATION));

extract(export, 5000, "order by create_time desc", new File(tmpDir, "sym_outgoing_batch.csv"),
extract(export, 10000, "order by create_time desc", new File(tmpDir, "sym_outgoing_batch.csv"),
TableConstants.getTableName(tablePrefix, TableConstants.SYM_OUTGOING_BATCH));

extract(export, 5000, "order by create_time desc", new File(tmpDir, "sym_incoming_batch.csv"),

extract(export, 10000, "where status != 'OK' order by create_time", new File(tmpDir, "sym_outgoing_batch_not_ok.csv"),
TableConstants.getTableName(tablePrefix, TableConstants.SYM_OUTGOING_BATCH));

extract(export, 10000, "order by create_time desc", new File(tmpDir, "sym_incoming_batch.csv"),
TableConstants.getTableName(tablePrefix, TableConstants.SYM_INCOMING_BATCH));

extract(export, 10000, "where status != 'OK' order by create_time", new File(tmpDir, "sym_incoming_batch_not_ok.csv"),
TableConstants.getTableName(tablePrefix, TableConstants.SYM_INCOMING_BATCH));

extract(export, 5000, "order by start_id, end_id desc", new File(tmpDir, "sym_data_gap.csv"),
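The snapshot now caps the batch-table extracts at 10,000 rows instead of 5,000 and adds dedicated sym_outgoing_batch_not_ok.csv and sym_incoming_batch_not_ok.csv files, so batches stuck in a non-OK status stay visible even when they are older than the newest 10,000 rows. A self-contained sketch of the capped, filtered dump idea; the real work is done by this class's extract(...) helper, and everything below (names, CSV handling) is illustrative JDBC, not the project's API:

    import java.io.FileWriter;
    import java.io.IOException;
    import java.io.PrintWriter;
    import java.sql.Connection;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.sql.Statement;

    class CappedCsvDump {
        static void dump(Connection conn, String table, String clause,
                int maxRows, String file) throws SQLException, IOException {
            try (Statement st = conn.createStatement()) {
                st.setMaxRows(maxRows); // cap the result set, like the 10000 above
                try (ResultSet rs = st.executeQuery("select * from " + table + " " + clause);
                        PrintWriter out = new PrintWriter(new FileWriter(file))) {
                    int cols = rs.getMetaData().getColumnCount();
                    while (rs.next()) {
                        StringBuilder line = new StringBuilder();
                        for (int i = 1; i <= cols; i++) {
                            if (i > 1) {
                                line.append(',');
                            }
                            line.append(rs.getString(i)); // no quoting; illustration only
                        }
                        out.println(line);
                    }
                }
            }
        }
    }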
@@ -649,50 +649,47 @@ public synchronized void uninstall() {

try {
String prefix = parameterService.getTablePrefix();
Table table = platform.readTableFromDatabase(null, null, TableConstants.getTableName(prefix, TableConstants.SYM_TRIGGER_ROUTER));
if (table != null) {

if (platform.readTableFromDatabase(null, null, TableConstants.getTableName(prefix, TableConstants.SYM_GROUPLET)) != null) {
groupletService.deleteAllGrouplets();
}

if (platform.readTableFromDatabase(null, null, TableConstants.getTableName(prefix, TableConstants.SYM_TRIGGER_ROUTER)) != null) {
triggerRouterService.deleteAllTriggerRouters();
}

if (platform.readTableFromDatabase(null, null, TableConstants.getTableName(prefix, TableConstants.SYM_FILE_TRIGGER_ROUTER)) != null) {
fileSyncService.deleteAllFileTriggerRouters();
}

if (platform.readTableFromDatabase(null, null, TableConstants.getTableName(prefix, TableConstants.SYM_ROUTER)) != null) {
triggerRouterService.deleteAllRouters();
}

table = platform.readTableFromDatabase(null, null, TableConstants.getTableName(prefix, TableConstants.SYM_CONFLICT));
if (table != null) {
// need to remove all conflicts before we can remove the node group links

if (platform.readTableFromDatabase(null, null, TableConstants.getTableName(prefix, TableConstants.SYM_CONFLICT)) != null) {
dataLoaderService.deleteAllConflicts();
}

table = platform.readTableFromDatabase(null, null, TableConstants.getTableName(prefix, TableConstants.SYM_TRANSFORM_TABLE));
if (table != null) {
// need to remove all transforms before we can remove the node group links
if (platform.readTableFromDatabase(null, null, TableConstants.getTableName(prefix, TableConstants.SYM_TRANSFORM_TABLE)) != null) {
transformService.deleteAllTransformTables();
}

table = platform.readTableFromDatabase(null, null, TableConstants.getTableName(prefix, TableConstants.SYM_ROUTER));
if (table != null) {
if (platform.readTableFromDatabase(null, null, TableConstants.getTableName(prefix, TableConstants.SYM_ROUTER)) != null) {
triggerRouterService.deleteAllRouters();
}

table = platform.readTableFromDatabase(null, null, TableConstants.getTableName(prefix, TableConstants.SYM_CONFLICT));
if (table != null) {
if (platform.readTableFromDatabase(null, null, TableConstants.getTableName(prefix, TableConstants.SYM_CONFLICT)) != null) {
dataLoaderService.deleteAllConflicts();
}

table = platform.readTableFromDatabase(null, null, TableConstants.getTableName(prefix, TableConstants.SYM_NODE_GROUP_LINK));
if (table != null) {
// remove the links so the triggers on the sym tables will be removed
if (platform.readTableFromDatabase(null, null, TableConstants.getTableName(prefix, TableConstants.SYM_NODE_GROUP_LINK)) != null) {
configurationService.deleteAllNodeGroupLinks();
}

if (table != null) {
// this should remove all triggers because we have removed all the
// trigger configuration
triggerRouterService.syncTriggers();
}

// this should remove all triggers because we have removed all the trigger configuration
triggerRouterService.syncTriggers();
} catch (SqlException ex) {
log.warn("Error while trying remove triggers on tables", ex);
log.warn("Error while trying to remove triggers on tables", ex);
}

// remove any additional triggers that may remain because they were not in trigger history
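uninstall() no longer threads a reused Table variable through the checks; each configuration table is tested for existence inline, grouplet and file trigger router cleanup are added, and syncTriggers() now runs unconditionally once all trigger configuration is gone. The comments in the diff capture the ordering constraints: conflicts and transforms must be deleted before the node group links, and the links before syncTriggers() so the triggers on the sym tables are removed. The repeated guard could be factored like this (a sketch, not part of the commit; assumes Java 8 lambdas and the same platform/TableConstants API used above):

    private void ifInstalled(String prefix, String tableSuffix, Runnable cleanup) {
        String tableName = TableConstants.getTableName(prefix, tableSuffix);
        if (platform.readTableFromDatabase(null, null, tableName) != null) {
            cleanup.run();
        }
    }

    // usage, preserving the order that matters:
    ifInstalled(prefix, TableConstants.SYM_GROUPLET, groupletService::deleteAllGrouplets);
    ifInstalled(prefix, TableConstants.SYM_NODE_GROUP_LINK, configurationService::deleteAllNodeGroupLinks);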
@@ -202,10 +202,14 @@ public long getCurrentLoadId() {
return currentLoadId;
}

public String getCurrentChannelId() {
public String getCurrentChannelThread() {
if (getKey().getChannelId() != null && getKey().getChannelId().length() > 0) {
return getKey().getChannelId();
}
return "";
}

public String getCurrentChannelId() {
return currentChannelId;
}
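This hunk splits one accessor into two: the old getCurrentChannelId() body (the channel id from the process key, or "") is renamed getCurrentChannelThread(), and getCurrentChannelId() now returns the currentChannelId field that tracks the channel actually being worked. A hypothetical caller, assuming this class is ProcessInfo:

    String threadChannel = processInfo.getCurrentChannelThread(); // channel from the key, "" if none
    String workingChannel = processInfo.getCurrentChannelId();    // channel currently in progress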

@@ -579,7 +579,7 @@ public FutureOutgoingBatch call() throws Exception {
*/
log.info("Batch {} is marked as ready but it has been deleted. Rescheduling it for extraction",
extractBatch.getNodeBatchId());
if (changeBatchStatus(Status.RQ, extractBatch, mode)) {
if (mode != ExtractMode.EXTRACT_ONLY) {
resetExtractRequest(extractBatch);
}
status.shouldExtractSkip = outgoingBatch.isExtractSkipped = true;
@@ -1213,8 +1213,28 @@ public List<ExtractRequest> getExtractRequestsForNode(NodeCommunication nodeComm
}

protected void resetExtractRequest(OutgoingBatch batch) {
sqlTemplate.update(getSql("resetExtractRequestStatus"), ExtractStatus.NE.name(),
ISqlTransaction transaction = null;
try {
transaction = sqlTemplate.startSqlTransaction();
batch.setStatus(Status.RQ);
outgoingBatchService.updateOutgoingBatch(transaction, batch);

transaction.prepareAndExecute(getSql("resetExtractRequestStatus"), ExtractStatus.NE.name(),
batch.getBatchId(), batch.getBatchId(), batch.getNodeId());
transaction.commit();
} catch (Error ex) {
if (transaction != null) {
transaction.rollback();
}
throw ex;
} catch (RuntimeException ex) {
if (transaction != null) {
transaction.rollback();
}
throw ex;
} finally {
close(transaction);
}
}
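resetExtractRequest(...) previously issued a single bare update; it now marks the outgoing batch RQ and flips the extract request back to NE atomically, rolling back on both Error and RuntimeException and closing the transaction in a finally block. That shape is the codebase's standard transaction idiom; a generic sketch (the helper and interface names are ours, and it assumes ISqlTransaction exposes close(), as the close(transaction) call above suggests):

    interface TxWork {
        void run(ISqlTransaction transaction);
    }

    void inTransaction(ISqlTemplate sqlTemplate, TxWork work) {
        ISqlTransaction transaction = null;
        try {
            transaction = sqlTemplate.startSqlTransaction();
            work.run(transaction);      // e.g. update the batch and the request
            transaction.commit();
        } catch (Error | RuntimeException ex) {
            if (transaction != null) {
                transaction.rollback(); // undo both updates together
            }
            throw ex;
        } finally {
            if (transaction != null) {
                transaction.close();
            }
        }
    }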

public void requestExtractRequest(ISqlTransaction transaction, String nodeId, String queue,
@@ -1282,10 +1302,15 @@ public void execute(NodeCommunication nodeCommunication, RemoteNodeStatus status
* "Trick" the extractor to extract one reload batch, but we
* will split it across the N batches when writing it
*/
processInfo.setCurrentLoadId(batches.get(0).getLoadId());
OutgoingBatch firstBatch = batches.get(0);
processInfo.setCurrentLoadId(firstBatch.getLoadId());
IStagedResource resource = getStagedResource(firstBatch);
if (resource != null && resource.exists() && resource.getState() != State.CREATE) {
resource.delete();
}
extractOutgoingBatch(processInfo, targetNode,
new MultiBatchStagingWriter(identity.getNodeId(), stagingManager,
batches, channel.getMaxBatchSize(), processInfo), batches.get(0), false,
new MultiBatchStagingWriter(request, identity.getNodeId(), stagingManager,
batches, channel.getMaxBatchSize(), processInfo), firstBatch, false,
false, ExtractMode.FOR_SYM_CLIENT);

} else {
@@ -1336,7 +1361,7 @@ public void execute(NodeCommunication nodeCommunication, RemoteNodeStatus status
request.getEndBatchId() });
}
transaction.commit();

log.info("Done extracting {} batches for request {}", (request.getEndBatchId() - request.getStartBatchId()) + 1, request.getRequestId());
} catch (Error ex) {
if (transaction != null) {
transaction.rollback();
@@ -1382,6 +1407,8 @@ public ExtractRequest mapRow(Row row) {

public class MultiBatchStagingWriter implements IDataWriter {

ExtractRequest request;

long maxBatchSize;

StagingDataWriter currentDataWriter;
@@ -1405,15 +1432,19 @@ public class MultiBatchStagingWriter implements IDataWriter {
boolean inError = false;

ProcessInfo processInfo;

long startTime, ts, rowCount, byteCount;

public MultiBatchStagingWriter(String sourceNodeId, IStagingManager stagingManager,
public MultiBatchStagingWriter(ExtractRequest request, String sourceNodeId, IStagingManager stagingManager,
List<OutgoingBatch> batches, long maxBatchSize, ProcessInfo processInfo) {
this.request = request;
this.sourceNodeId = sourceNodeId;
this.maxBatchSize = maxBatchSize;
this.stagingManager = stagingManager;
this.batches = new ArrayList<OutgoingBatch>(batches);
this.finishedBatches = new ArrayList<OutgoingBatch>(batches.size());
this.processInfo = processInfo;
this.startTime = this.ts = System.currentTimeMillis();
}

public void open(DataContext context) {
@@ -1428,9 +1459,14 @@ public void open(DataContext context) {
}

public void close() {
if (this.currentDataWriter != null) {
this.currentDataWriter.close();
}
while (batches.size() > 0) {
startNewBatch();
end(this.table);
end(this.batch, false);
}
if (this.currentDataWriter != null) {
this.currentDataWriter.close();
}
}

public Map<Batch, Statistics> getStatistics() {
@@ -1471,7 +1507,12 @@ public void write(CsvData data) {
checkSend();
startNewBatch();
}

if (System.currentTimeMillis() - ts > 60000) {
log.info("Request {} has been processing for {} seconds. BATCHES={}, ROWS={}, BYTES={}, RANGE={}-{}, CURRENT={}",
request.getRequestId(), (System.currentTimeMillis() - startTime) / 1000, finishedBatches.size(), rowCount, byteCount,
request.getStartBatchId(), request.getEndBatchId(), batch.getBatchId());
ts = System.currentTimeMillis();
}
}
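The new block in write() throttles progress reporting: one timestamp (startTime) tracks total elapsed time while another (ts) gates logging to at most once per minute. The same pattern in isolation (a self-contained demo, not project code):

    class ProgressLogDemo {
        public static void main(String[] args) {
            long startTime = System.currentTimeMillis();
            long ts = startTime; // last time we logged
            long rowCount = 0;
            for (int i = 0; i < 100_000_000; i++) {
                rowCount++; // stand-in for writing one row
                if (System.currentTimeMillis() - ts > 60000) {
                    System.out.printf("%d rows in %d seconds%n", rowCount,
                            (System.currentTimeMillis() - startTime) / 1000);
                    ts = System.currentTimeMillis(); // reset the interval clock
                }
            }
        }
    }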

public void checkSend() {
@@ -1520,10 +1561,15 @@ public void end(Batch batch, boolean inError) {
protected void nextBatch() {
if (this.outgoingBatch != null) {
this.finishedBatches.add(outgoingBatch);
rowCount += this.outgoingBatch.getDataEventCount();
byteCount += this.outgoingBatch.getByteCount();
}
this.outgoingBatch = this.batches.remove(0);
this.outgoingBatch.setDataEventCount(0);
this.outgoingBatch.setInsertEventCount(0);
if (this.finishedBatches.size() > 0) {
this.outgoingBatch.setExtractCount(this.outgoingBatch.getExtractCount() + 1);
}

/*
* Update the last update time so the batch
@@ -373,7 +373,7 @@ auto.start.engine=true
# Type: boolean
auto.config.database=true

# If this is true, when symmetric starts up it will make sure the triggers in the database are up to date.
# If this is true, triggers will be created or dropped to match configuration during the sync triggers process.
#
# DatabaseOverridable: true
# Tags: general
@@ -779,7 +779,7 @@ job.refresh.cache.cron=0/30 * * * * *
#
# DatabaseOverridable: true
# Tags: jobs
job.stage.management.period.time.ms=15000
job.stage.management.period.time.ms=900000
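The stage management job's default period moves from every 15 seconds (15000 ms) to every 15 minutes (900000 ms). Like the other defaults in this file it can still be overridden, for example in an engine properties file:

    # run stage management every 5 minutes instead of the 15-minute default
    job.stage.management.period.time.ms=300000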

# This is how often the initial load extract queue job will run in the background
#
@@ -751,6 +751,10 @@ protected void filterColumnSqlType(StringBuilder sqlType) {
int identityIndex = sqlType.indexOf("identity");
if (identityIndex > 0) {
sqlType.replace(identityIndex, sqlType.length(), "");
int parensIndex = sqlType.indexOf("()");
if (parensIndex > 0) {
sqlType.replace(parensIndex, sqlType.length(), "");
}
}
if (sqlType.indexOf("datetimeoffset") >= 0) {
sqlType.setLength(0);
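The added block finishes the type cleanup for identity columns (the method appears to target SQL Server; note the datetimeoffset handling): after the existing code cuts the type string at "identity", any leftover "()" pair is now stripped as well. A worked example with a hypothetical input:

    StringBuilder sqlType = new StringBuilder("bigint() identity");
    int identityIndex = sqlType.indexOf("identity");           // 9
    if (identityIndex > 0) {
        sqlType.replace(identityIndex, sqlType.length(), "");  // -> "bigint() "
        int parensIndex = sqlType.indexOf("()");               // 6
        if (parensIndex > 0) {
            sqlType.replace(parensIndex, sqlType.length(), ""); // -> "bigint"
        }
    }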
@@ -110,6 +110,12 @@ public <T> ISqlReadCursor<T> queryForCursor(String sql, ISqlRowMapper<T> mapper)
return this.queryForCursor(sql, mapper, null, null);
}

public <T> ISqlReadCursor<T> queryForCursor(String sql, ISqlRowMapper<T> mapper,
IConnectionHandler connectionHandler, Object[] args,
int[] types) {
return queryForCursor(sql, mapper, args, types);
}

public List<Row> query(String sql) {
return query(sql, (Object[])null, (int[]) null);
}
@@ -118,6 +124,11 @@ public <T> List<T> query(String sql, ISqlRowMapper<T> mapper, Object... args) {
return query(sql, mapper, args, null);
}

@Override
public <T> List<T> queryWithHandler(String sql, ISqlRowMapper<T> mapper, IConnectionHandler conHandler, Object... params) {
return query(sql, mapper, conHandler, params, null);
}

public Row queryForRow(String sql, Object... args) {
return queryForObject(sql, new ISqlRowMapper<Row>() {
public Row mapRow(Row row) {
@@ -176,10 +187,19 @@ public Row mapRow(Row row) {
public <T> List<T> query(String sql, ISqlRowMapper<T> mapper, Object[] args, int[] types) {
return query(sql, -1, mapper, args, types);
}

public <T> List<T> query(String sql, ISqlRowMapper<T> mapper, IConnectionHandler handler, Object[] args, int[] types) {
return query(sql, -1, mapper, handler, args, types);
}

public <T> List<T> query(String sql, int maxNumberOfRowsToFetch, ISqlRowMapper<T> mapper,
Object[] args, int[] types) {
ISqlReadCursor<T> cursor = queryForCursor(sql, mapper, args, types);
IConnectionHandler handler = null;
return query(sql, maxNumberOfRowsToFetch, mapper, handler, args, types);
}

public <T> List<T> query(String sql, int maxNumberOfRowsToFetch, ISqlRowMapper<T> mapper,
IConnectionHandler handler, Object[] args, int[] types) {
ISqlReadCursor<T> cursor = queryForCursor(sql, mapper, handler, args, types);
try {
T next = null;
List<T> list = new ArrayList<T>();
@@ -0,0 +1,45 @@
package org.jumpmind.db.sql;

import java.sql.Connection;
import java.sql.SQLException;

public class ChangeCatalogConnectionHandler implements IConnectionHandler {

private String previousCatalog;

private String changeCatalog;

public ChangeCatalogConnectionHandler(String newCatalog) {
changeCatalog = newCatalog;
}

@Override
public void before(Connection connection) {
if (changeCatalog != null) {
try {
previousCatalog = connection.getCatalog();
connection.setCatalog(changeCatalog);
} catch (SQLException e) {
if (changeCatalog != null) {
try {
connection.setCatalog(previousCatalog);
} catch (SQLException ex) {
// could not restore the previous catalog; fall through and report the original failure below
}
}
throw new SqlException(e);
}
}

}

@Override
public void after(Connection connection) {
try {
if (previousCatalog != null) {
connection.setCatalog(previousCatalog);
}
} catch (SQLException ex) {
// restoring the catalog is best effort; ignore failures here
}
}

}
@@ -0,0 +1,11 @@
package org.jumpmind.db.sql;

import java.sql.Connection;

public interface IConnectionHandler {

public void before(Connection connection);

public void after(Connection connection);

}
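Taken together, the IConnectionHandler hook, its ChangeCatalogConnectionHandler implementation, and the new queryWithHandler / queryForCursor overloads let a query run against a different catalog on the same connection: before(...) switches the catalog ahead of statement creation and after(...) restores it, so pooled connections are not left pointing elsewhere. A usage sketch (how the template is obtained and the catalog name are illustrative; queryWithHandler is assumed to be declared on ISqlTemplate, as the @Override above suggests):

    ISqlTemplate sqlTemplate = platform.getSqlTemplate();
    List<Row> rows = sqlTemplate.queryWithHandler(
            "select * from sym_node",
            new ISqlRowMapper<Row>() {
                public Row mapRow(Row row) {
                    return row;
                }
            },
            new ChangeCatalogConnectionHandler("other_catalog"));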
