Skip to content

Commit

Permalink
Merge branch '3.14' of https://github.com/JumpMind/symmetric-ds into …
Browse files Browse the repository at this point in the history
…3.14
  • Loading branch information
JishLong committed Jun 30, 2023
2 parents 9ebf333 + aac7f6b commit 713313a
Show file tree
Hide file tree
Showing 64 changed files with 1,050 additions and 121 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
*/
package org.jumpmind.symmetric.android;

import java.util.ArrayList;
import java.util.List;

import org.jumpmind.symmetric.ISymmetricEngine;
Expand Down Expand Up @@ -137,4 +138,10 @@ public List<Monitor> getActiveMonitorsUnresolvedForNode(String nodeGroupId, Stri
public List<Monitor> getActiveMonitorsUnresolvedForNodeFromDb(String nodeGroupId, String externalId) {
return null;
}

@Override
public List<MonitorEvent> getMonitorEventsByMonitorId(String monitorId) {
return new ArrayList<MonitorEvent>();
}

}
21 changes: 21 additions & 0 deletions symmetric-assemble/src/asciidoc/appendix/sybase-ase.ad
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,27 @@ If you're unable to change NOCOUNT for the server, the "db.init.sql" parameter c
Connections are pooled and expected to be in the database context like a new connection, so avoid using the "USE database"
Transact-SQL statement in extension code.


.Supported Data Types
|===
|Data Type|Supported?

|BigInt, Int, SmallInt, TinyInt|Yes
|Decimal, Numeric|Yes
|Bit|Yes
|Money, SmallMoney|No
|Float, Real|Yes
|NChar, NVarchar|No
|UNIChar, UNIVarchar, UNIText|No
|Date, DateTime, SmallDatetime, Time|Yes
|Char, Varchar, Text|Yes
|Binary, Varbinary|Yes
|Image|Yes
|Blob|Yes
|===



==== Permissions
The SymmetricDS database user generally needs privileges for connecting and creating tables (including indexes), triggers, sequences,
and procedures (including packages and functions). In Sybase ASE, only the System Administrator can create or drop triggers on tables it does not own. The table owner can create or drop triggers freely on their tables. For change data capture, it is recommended that the application user who controls Sybase and SymmetricDS be the same, so that there will be no permissions errors with dropping or creating triggers. This means that all of the Sybase tables pertaining to and being used by SymmetricDS are owned by the same user, and that is the user who is assigned to SymmetricDS. If this is not possible, the SA user is the only other option, as users cannot be granted the permission to create or delete triggers on tables they do not own. Any outside users that will need to Insert, Update, Delete or read from a Sybase table that they do not own will need the following permissions for any tables they need access to:
Expand Down
2 changes: 2 additions & 0 deletions symmetric-assemble/src/asciidoc/configuration/monitors.ad
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ be compared to a threshold value.

|fileHandles|Percentage from 0 to 100 of Operating System's open file handles. Not implemented for Windows.

|job|Number of jobs that are in error. This only applies to jobs that record statistics in the <<NODE_HOST_JOB_STATS>> table. The built-in jobs that write to this table are Routing, Purge Outgoing, Purge Incoming, and SyncTriggers.

ifdef::pro[]
|licenseExpire|Percentage from 0 to 100 of the license usage, with expiration occurring at 100%.

Expand Down
52 changes: 52 additions & 0 deletions symmetric-assemble/src/asciidoc/configuration/transforms/types.ad
Original file line number Diff line number Diff line change
Expand Up @@ -564,3 +564,55 @@ This transformation checks to see if the source value is null and if so replaces

For an update, this transform returns a comma-separated list of columns names that were set to null and previously not null.

===== Java Transform

Java Transform ('java'): Use Java code in the transform expression that is included
in the transform method of a class that extends JavaColumnTransform. The class is compiled
whenever the transform expression changes and kept in memory for runtime.
The code must return a String for the new value of the column being mapped.

Some variables are provided to the code:

.Variables
|===
|Variable Name|Java Type|Description

|platform|org.jumpmind.db.platform.IDatabasePlatform|The platform for the database that this node is connected to
|context|org.jumpmind.symmetric.io.data.DataContext|The data cotext for the synchronization of the current batch
|column|org.jumpmind.symmetric.io.data.transform.TransformColumn|The transform column
|data|org.jumpmind.symmetric.io.data.transform.TransformedData|The transformed data
|sourceValues|java.util.Map<java.lang.String, java.lang.String>|The map of source values
|newValue|java.lang.String|The captured new value
|oldValue|java.lang.String|The captured old value

|===

.Transform Expression Example Returning a String
====

----
if (sourceValues.containsKey("OLDKEY")) {
return sourceValues.get("OLDKEY");
} else {
return sourceValues.get("NEWKEY");
}
----
====

ifndef::pro[]
[source, SQL]
----
INSERT INTO SYM_TRANSFORM_COLUMN (
transform_id, include_on, target_column_name, source_column_name, pk,
transform_type, transform_expression, transform_order, last_update_time,
last_update_by, create_time
) VALUES (
'testjava', '*', 'NEWKEY', null, 0,
'java', 'if (sourceValues.containsKey("OLDKEY")) {
return sourceValues.get("OLDKEY");
} else {
return sourceValues.get("NEWKEY");
}', 0, current_timestamp, 'Documentation', current_timestamp);
----
endif::pro[]

4 changes: 2 additions & 2 deletions symmetric-assemble/src/asciidoc/manage/monitors.ad
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ image::images/manage/manage-monitors.png[]
endif::pro[]

When a <<_monitors,Monitor>> is configured, it is run periodically to check the current value of a system metric and compare it to a threshold value.
Different monitor types can check the CPU usage, disk usage, memory usage, batch errors, outstanding batches, unrouted data, and number
of data gaps.
Different monitor types can check the CPU usage, disk usage, memory usage, batch errors, outstanding batches, unrouted data, number
of data gaps, and job errors.
Custom monitor types can be created using <<_extensions,Extensions>> that use the IMonitorType interface.
When the value returned from the check meets or exceeds the threshold value, a <<_monitor_event>> is recorded.
The <<_monitor_event>> table is synchronized on the "monitor" channel, which allows a central server to see events from remote nodes,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.commons.lang3.math.NumberUtils;
import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.SymmetricException;
import org.jumpmind.symmetric.common.Constants;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.model.JobDefinition;
import org.jumpmind.symmetric.model.Lock;
Expand Down Expand Up @@ -182,7 +181,6 @@ public boolean invoke(boolean force) {
try {
MDC.put("engineName", engine.getEngineName());
IParameterService parameterService = engine.getParameterService();
long recordStatisticsCountThreshold = parameterService.getLong(ParameterConstants.STATISTIC_RECORD_COUNT_THRESHOLD, -1);
boolean ok = checkPrerequsites(force);
if (!ok) {
return false;
Expand All @@ -209,11 +207,6 @@ public boolean invoke(boolean force) {
long endTime = System.currentTimeMillis();
lastExecutionTimeInMs = endTime - startTime;
totalExecutionTimeInMs += lastExecutionTimeInMs;
if (lastExecutionTimeInMs > Constants.LONG_OPERATION_THRESHOLD ||
(recordStatisticsCountThreshold > 0 && getProcessedCount() > recordStatisticsCountThreshold)) {
engine.getStatisticManager().addJobStats(targetNodeId, targetNodeCount, jobName,
startTime, endTime, getProcessedCount());
}
numberOfRuns++;
running.set(false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,14 @@ public static File createSnapshot(ISymmetricEngine engine, IProgressListener lis
}
}
}
File backupConfig = new File("conf/.config");
if (backupConfig.canRead()) {
try {
FileUtils.copyFileToDirectory(backupConfig, tmpDir);
} catch (IOException e) {
log.warn("Failed to copy {}", backupConfig.getName());
}
}
checkpoint(engine, listener, 5, 7);
File jarFile = null;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ public String createInitalLoadSql(Node node, TriggerRouter triggerRouter, Table
if (useTriggerTemplateForColumnTemplatesDuringInitialLoad(column)) {
ColumnString columnString = fillOutColumnTemplate(tableAlias,
tableAlias, "", table, column, DataEventType.INSERT, false, channel,
triggerRouter.getTrigger());
triggerRouter.getTrigger(), true);
columnExpression = columnString.columnString;
if (isNotBlank(textColumnExpression)
&& TypeMap.isTextType(column.getMappedTypeCode())) {
Expand Down Expand Up @@ -777,7 +777,7 @@ protected ColumnString buildColumnsString(String origTableAlias, String tableAli
Column column = columns[i];
if (column != null) {
ColumnString columnString = fillOutColumnTemplate(origTableAlias, tableAlias,
columnPrefix, table, column, dml, isOld, channel, trigger);
columnPrefix, table, column, dml, isOld, channel, trigger, false);
columnsText = columnsText + "\n " + columnString.columnString
+ lastCommandToken;
containsLob |= columnString.isBlobClob;
Expand All @@ -792,7 +792,7 @@ protected ColumnString buildColumnsString(String origTableAlias, String tableAli

protected ColumnString fillOutColumnTemplate(String origTableAlias, String tableAlias,
String columnPrefix, Table table, Column column, DataEventType dml, boolean isOld, Channel channel,
Trigger trigger) {
Trigger trigger, boolean ignoreStreamLobs) {
boolean isLob = symmetricDialect.getPlatform().isLob(column.getMappedTypeCode());
String templateToUse = null;
if (column.getJdbcTypeName() != null
Expand Down Expand Up @@ -925,7 +925,7 @@ protected ColumnString fillOutColumnTemplate(String origTableAlias, String table
}
if (dml == DataEventType.DELETE && isLob && requiresEmptyLobTemplateForDeletes()) {
templateToUse = emptyColumnTemplate;
} else if (isLob && trigger.isUseStreamLobs()) {
} else if (isLob && trigger.isUseStreamLobs() && !ignoreStreamLobs) {
templateToUse = emptyColumnTemplate;
}
if (templateToUse != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.jumpmind.symmetric.db.ISymmetricDialect;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.model.Router;
import org.jumpmind.symmetric.model.Trigger;
import org.jumpmind.symmetric.model.TriggerHistory;
import org.jumpmind.symmetric.service.ITriggerRouterService;
import org.jumpmind.symmetric.util.SymmetricUtils;
Expand Down Expand Up @@ -110,10 +111,12 @@ protected Table lookupAndOrderColumnsAccordingToTriggerHistory(String routerId,
table.setSchema(null);
} else if (StringUtils.isNotBlank(router.getTargetSchemaName())) {
table.setSchema(SymmetricUtils.replaceNodeVariables(sourceNode, targetNode, router.getTargetSchemaName()));
table.setSchema(SymmetricUtils.replaceCatalogSchemaVariables(catalogName, schemaName, router.getTargetSchemaName()));
}
if (StringUtils.isNotBlank(router.getTargetTableName())) {
table.setName(router.getTargetTableName());
}

}
return table;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Date;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
Expand All @@ -39,8 +40,11 @@
import org.jumpmind.symmetric.ext.IHeartbeatListener;
import org.jumpmind.symmetric.model.Channel;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.service.ClusterConstants;
import org.jumpmind.symmetric.service.IDataExtractorService;
import org.jumpmind.symmetric.service.IParameterService;
import org.jumpmind.symmetric.service.IStatisticService;
import org.jumpmind.symmetric.statistic.IStatisticManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -59,9 +63,59 @@ public void heartbeat(Node me) {
boolean updateWithBatchStatus = parameterService.is(ParameterConstants.HEARTBEAT_UPDATE_NODE_WITH_BATCH_STATUS, false);
int outgoingErrorCount = -1;
int outgoingUnsentCount = -1;
int outgoingUnsentRowCount = -1;
Date lastSuccessfulSyncTime = null;
String mostRecentActiveTableSynced = null;
int totalRowsLoaded = -1;
Date oldestLoadedDate = null;
long purgeOutgoingLastMs = -1;
Date purgeOutgoingLastRun = null;
long purgeOutgoingAverage = -1;

long routingLastMs = -1;
long routingAveragetMs = -1;
Date routingLastRun = null;
long symDataSize = -1;

if (updateWithBatchStatus) {
outgoingUnsentCount = engine.getOutgoingBatchService().countOutgoingBatchesUnsent();
outgoingErrorCount = engine.getOutgoingBatchService().countOutgoingBatchesInError();
int[] batchesRowsUnsent = engine.getOutgoingBatchService().countOutgoingNonSystemBatchesRowsUnsent();

outgoingUnsentCount = batchesRowsUnsent[0];
outgoingUnsentRowCount = batchesRowsUnsent[1];

Date outDate = engine.getOutgoingBatchService().getOutgoingBatchesLatestUpdateSql();
Date inDate = engine.getIncomingBatchService().getIncomingBatchesLatestUpdateSql();
if (outDate == null && inDate == null) {
lastSuccessfulSyncTime = null;
} else if (outDate == null) {
lastSuccessfulSyncTime = inDate;
} else if (inDate == null) {
lastSuccessfulSyncTime = outDate;
} else {
lastSuccessfulSyncTime = outDate.after(inDate) ? outDate : inDate;
}

IStatisticManager statisticsManager = engine.getStatisticManager();
mostRecentActiveTableSynced = statisticsManager.getMostRecentActiveTableSynced();
Map<Integer, Date> totalLoadedRowsMap = statisticsManager.getTotalLoadedRows();
if (totalLoadedRowsMap != null && totalLoadedRowsMap.size() == 1) {
totalRowsLoaded = totalLoadedRowsMap.keySet().iterator().next();
oldestLoadedDate = totalLoadedRowsMap.values().iterator().next();
}

IJob purgeOutgoingJob = engine.getJobManager().getJob(ClusterConstants.PURGE_OUTGOING);
purgeOutgoingLastMs = purgeOutgoingJob.getLastExecutionTimeInMs();
purgeOutgoingLastRun = purgeOutgoingJob.getLastFinishTime();
purgeOutgoingAverage = purgeOutgoingJob.getAverageExecutionTimeInMs();

IJob routeJob = engine.getJobManager().getJob(ClusterConstants.ROUTE);
routingAveragetMs = routeJob.getAverageExecutionTimeInMs();
routingLastRun = routeJob.getLastFinishTime();
routingLastMs = routeJob.getLastExecutionTimeInMs();

symDataSize = engine.getDataService().countData();

}
if (!parameterService.getExternalId().equals(me.getExternalId())
|| !parameterService.getNodeGroupId().equals(me.getNodeGroupId())
Expand All @@ -73,7 +127,16 @@ public void heartbeat(Node me) {
|| !symmetricDialect.getName().equals(me.getDatabaseType())
|| !symmetricDialect.getVersion().equals(me.getDatabaseVersion())
|| me.getBatchInErrorCount() != outgoingErrorCount
|| me.getBatchToSendCount() != outgoingUnsentCount) {
|| me.getBatchToSendCount() != outgoingUnsentCount
|| me.getLastSuccessfulSyncDate() != lastSuccessfulSyncTime
|| me.getMostRecentActiveTableSynced() != mostRecentActiveTableSynced
|| me.getPurgeOutgoingLastMs() != purgeOutgoingLastMs
|| me.getPurgeOutgoingLastRun() != purgeOutgoingLastRun
|| me.getPurgeOutgoingAverageMs() != purgeOutgoingAverage
|| me.getRoutingAverageMs() != routingAveragetMs
|| me.getRoutingLastRun() != routingLastRun
|| me.getRoutingLastMs() != routingLastMs
|| me.getSymDataSize() != symDataSize) {
log.info("Some attribute(s) of node changed. Recording changes");
me.setDeploymentType(engine.getDeploymentType());
me.setDeploymentSubType(engine.getDeploymentSubType());
Expand All @@ -83,6 +146,19 @@ public void heartbeat(Node me) {
me.setDatabaseName(engine.getDatabasePlatform().getName());
me.setBatchInErrorCount(outgoingErrorCount);
me.setBatchToSendCount(outgoingUnsentCount);
me.setLastSuccessfulSyncDate(lastSuccessfulSyncTime);
me.setDataRowsToSendCount(outgoingUnsentRowCount);
me.setMostRecentActiveTableSynced(mostRecentActiveTableSynced);
me.setDataRowsLoadedCount(totalRowsLoaded);
me.setOldestLoadTime(oldestLoadedDate);
me.setPurgeOutgoingLastMs(purgeOutgoingLastMs);
me.setPurgeOutgoingLastRun(purgeOutgoingLastRun);
me.setPurgeOutgoingAverageMs(purgeOutgoingAverage);
me.setRoutingAverageMs(routingAveragetMs);
me.setRoutingLastRun(routingLastRun);
me.setRoutingLastMs(routingLastMs);
me.setSymDataSize(symDataSize);

me.setSchemaVersion(parameterService.getString(ParameterConstants.SCHEMA_VERSION));
if (engine.getParameterService().isRegistrationServer()) {
me.setConfigVersion(Version.version());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,15 @@ public boolean beforeWrite(DataContext context, Table table, CsvData data) {
}
}
}
} else if (matchesTable(table, TableConstants.SYM_MONITOR) && (data.getDataEventType() == DataEventType.INSERT)) {
Map<String, String> newData = data.toColumnNameValuePairs(table.getColumnNames(), CsvData.ROW_DATA);
String monitorID = newData.get("MONITOR_ID");
if (monitorID != null && (monitorID.equals("SystemBatchErrorMonitor")
|| monitorID.equals("SystemLogMonitor")
|| monitorID.equals("SystemOfflineNodeMonitor"))) {
return false;
}

}
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class AbstractBatch implements Serializable {

public enum Status {
OK("Ok"), ER("Error"), RQ("Request"), NE("New"), QY("Querying"), SE("Sending"), LD("Loading"), RT("Routing"), IG("Ignored"), RS(
"Resend"), XX("Unknown");
"Resend"), XX("Unknown"), LS("LoadSetup");

private String description;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public class ExtractRequest implements Serializable {
private static final long serialVersionUID = 1L;

public enum ExtractStatus {
NE, OK
NE, OK, LS
};

private long requestId;
Expand Down

0 comments on commit 713313a

Please sign in to comment.