Skip to content
Permalink
Browse files

0003824: Additional metrics on table_reload_request and extract_request

to support loads
Description
  • Loading branch information...
jumpmind-josh committed Dec 10, 2018
1 parent 4c71e6d commit 77b85e2d8188797af5549dacae81a55bdd0d929d
@@ -420,10 +420,8 @@ protected IStatisticManager createStatisticManager() {
String statisticManagerClassName = parameterService.getString(ParameterConstants.STATISTIC_MANAGER_CLASS);
if (statisticManagerClassName != null) {
try {
Constructor<?> cons = Class.forName(statisticManagerClassName).getConstructor(IParameterService.class,
INodeService.class, IConfigurationService.class, IStatisticService.class, IClusterService.class);
return (IStatisticManager) cons.newInstance(parameterService, nodeService,
configurationService, statisticService, clusterService);
Constructor<?> cons = Class.forName(statisticManagerClassName).getConstructor(ISymmetricEngine.class);
return (IStatisticManager) cons.newInstance(this);
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -57,7 +57,7 @@

public List<TableReloadRequest> getTableReloadRequestsByLoadId();

public void updateTableReloadRequestsCounts(long loadId, int batchCount, long rowsCount);
public void updateTableReloadRequestsCounts(ISqlTransaction transaction, long loadId, int batchCount, long rowsCount);

public void updateTableReloadRequestsLoadedCounts(ISqlTransaction transcation, long loadId, int batchCount, long rowsCount);

@@ -375,53 +375,61 @@ public void updateTableReloadRequestsLoadedCounts(ISqlTransaction transaction, l

}

public void updateTableReloadRequestsCounts(long loadId, int batchCount, long rowsCount) {
ISqlTransaction transaction = null;
try {
transaction = sqlTemplate.startSqlTransaction();
transaction.prepareAndExecute(getSql("updateTableReloadRequestCounts"),
new Object[] { batchCount, rowsCount, new Date(), loadId },
new int[] { Types.NUMERIC, Types.NUMERIC, Types.TIMESTAMP, Types.NUMERIC});
transaction.commit();
} catch (Error ex) {
if (transaction != null) {
transaction.rollback();
}
throw ex;
} catch (RuntimeException ex) {
if (transaction != null) {
transaction.rollback();
public void updateTableReloadRequestsCounts(ISqlTransaction transaction, long loadId, int batchCount, long rowsCount) {
String sql = getSql("updateTableReloadRequestCounts");
Object[] args = new Object[] {batchCount, rowsCount, new Date(), loadId};
int[] types = new int[] { Types.NUMERIC, Types.NUMERIC, Types.TIMESTAMP, Types.NUMERIC};

if (transaction == null) {
try {
transaction = sqlTemplate.startSqlTransaction();
transaction.prepareAndExecute(sql, args, types);
transaction.commit();
} catch (Error ex) {
if (transaction != null) {
transaction.rollback();
}
throw ex;
} catch (RuntimeException ex) {
if (transaction != null) {
transaction.rollback();
}
throw ex;
} finally {
close(transaction);
}
throw ex;
} finally {
close(transaction);
} else {
transaction.prepareAndExecute(sql, args, types);
}
}

public void updateTableReloadRequestsLoadAndTableCounts(long loadId, int tableCount, TableReloadRequest request) {
ISqlTransaction transaction = null;
try {
transaction = sqlTemplate.startSqlTransaction();
transaction.prepareAndExecute(getSql("updateTableReloadRequestLoadId"),
new Object[] {
loadId, tableCount, new Date(), request.getTargetNodeId(), request.getSourceNodeId(),
request.getTriggerId(), request.getRouterId(), request.getCreateTime()
},
new int[] { Types.NUMERIC, Types.NUMERIC, Types.TIMESTAMP, Types.VARCHAR, Types.VARCHAR,
Types.VARCHAR, Types.VARCHAR, Types.TIMESTAMP});
transaction.commit();
} catch (Error ex) {
if (transaction != null) {
transaction.rollback();
}
throw ex;
} catch (RuntimeException ex) {
if (transaction != null) {
transaction.rollback();
public void updateTableReloadRequestsLoadAndTableCounts(ISqlTransaction transaction, long loadId, int tableCount, TableReloadRequest request) {
Object[] args = new Object[] { loadId, tableCount, new Date(), request.getTargetNodeId(), request.getSourceNodeId(),
request.getTriggerId(), request.getRouterId(), request.getCreateTime() };
String sql = getSql("updateTableReloadRequestLoadId");
int[] types = new int[] { Types.NUMERIC, Types.NUMERIC, Types.TIMESTAMP, Types.VARCHAR, Types.VARCHAR,
Types.VARCHAR, Types.VARCHAR, Types.TIMESTAMP};

if (transaction == null) {
try {
transaction = sqlTemplate.startSqlTransaction();
transaction.prepareAndExecute(sql, args, types);
transaction.commit();
} catch (Error ex) {
if (transaction != null) {
transaction.rollback();
}
throw ex;
} catch (RuntimeException ex) {
if (transaction != null) {
transaction.rollback();
}
throw ex;
} finally {
close(transaction);
}
throw ex;
} finally {
close(transaction);
} else {
transaction.prepareAndExecute(sql, args, types);
}
}

@@ -677,7 +685,9 @@ public void insertReloadEvents(Node targetNode, boolean reverse, List<TableReloa

if (reloadRequests != null && reloadRequests.size() > 0) {
for (TableReloadRequest request : reloadRequests) {
updateTableReloadRequestsLoadAndTableCounts(loadId, totalTableCount, request);
updateTableReloadRequestsLoadAndTableCounts(
platform.supportsMultiThreadedTransactions() ? null : transaction,
loadId, totalTableCount, request);
}
}

@@ -1197,7 +1207,8 @@ private int insertLoadBatchesForReload(Node targetNode, long loadId, String crea
}
}

updateTableReloadRequestsCounts(loadId, new Long(numberOfBatches).intValue(), rowCount);
updateTableReloadRequestsCounts(platform.supportsMultiThreadedTransactions() ? null : transaction,
loadId, new Long(numberOfBatches).intValue(), rowCount);
totalBatchCount += numberOfBatches;

engine.getDataExtractorService().requestExtractRequest(transaction, targetNode.getNodeId(), channel.getQueue(),
@@ -112,6 +112,8 @@

protected Boolean supportsTransactions;

protected Boolean supportsMultiThreadedTransactions;

public AbstractDatabasePlatform(SqlTemplateSettings settings) {
this.settings = settings;
}
@@ -1143,6 +1145,11 @@ public boolean supportsTransactions() {
return supportsTransactions;
}

@Override
public boolean supportsMultiThreadedTransactions() {
return true;
}

public long getEstimatedRowCount(Table table) {
DatabaseInfo dbInfo = getDatabaseInfo();
String quote = dbInfo.getDelimiterToken();
@@ -193,6 +193,8 @@ public DmlStatement createDmlStatement(DmlType dmlType, String catalogName, Stri

public boolean supportsTransactions();

public boolean supportsMultiThreadedTransactions();

public long getEstimatedRowCount(Table table);

}
@@ -103,4 +103,9 @@ public PermissionResult getCreateSymTriggerPermission() {
return result;
}

@Override
public boolean supportsMultiThreadedTransactions() {
return false;
}

}

0 comments on commit 77b85e2

Please sign in to comment.
You can’t perform that action at this time.