diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/db/oracle/OracleTriggerTemplate.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/db/oracle/OracleTriggerTemplate.java
index 9da5d5e839..4a972b3488 100644
--- a/symmetric-client/src/main/java/org/jumpmind/symmetric/db/oracle/OracleTriggerTemplate.java
+++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/db/oracle/OracleTriggerTemplate.java
@@ -43,7 +43,7 @@ public OracleTriggerTemplate(ISymmetricDialect symmetricDialect) {
         dateTimeWithLocalTimeZoneColumnTemplate = "decode($(tableAlias).\"$(columnName)\", null, '', concat(concat('\"',to_char(cast($(tableAlias).\"$(columnName)\" as timestamp), 'YYYY-MM-DD HH24:MI:SS.FF9')),'\"'))" ;
         timeColumnTemplate = "decode($(tableAlias).\"$(columnName)\", null, '', concat(concat('\"',to_char($(tableAlias).\"$(columnName)\", 'YYYY-MM-DD HH24:MI:SS','NLS_CALENDAR=''GREGORIAN''')),'\"'))" ;
         dateColumnTemplate = "decode($(tableAlias).\"$(columnName)\", null, '', concat(concat('\"',to_char($(tableAlias).\"$(columnName)\", 'YYYY-MM-DD HH24:MI:SS','NLS_CALENDAR=''GREGORIAN''')),'\"'))" ;
-        clobColumnTemplate = "decode(dbms_lob.getlength(to_clob($(tableAlias).\"$(columnName)\")), null, to_clob(''), '\"'||replace(replace($(tableAlias).\"$(columnName)\",'\\','\\\\'),'\"','\\\"')||'\"')" ;
+        clobColumnTemplate = "case when $(tableAlias).\"$(columnName)\" is null then null else '\"'||replace(replace($(tableAlias).\"$(columnName)\",'\\','\\\\'),'\"','\\\"')||'\"' end" ;
         blobColumnTemplate = "decode(dbms_lob.getlength($(tableAlias).\"$(columnName)\"), null, to_clob(''), '\"'||$(prefixName)_blob2clob($(tableAlias).\"$(columnName)\")||'\"')" ;
         longColumnTemplate = "$(oracleToClob)'\"\\b\"'";
         booleanColumnTemplate = "decode($(tableAlias).\"$(columnName)\", null, '', '\"'||cast($(tableAlias).\"$(columnName)\" as number("+symmetricDialect.getTemplateNumberPrecisionSpec()+"))||'\"')" ;
@@ -270,7 +270,7 @@ protected String getCreateTimeExpression(ISymmetricDialect symmetricDialect) {
     }
 
     protected String toClobExpression(Table table) {
-        if (table.hasNTypeColumns() || symmetricDialect.getParameterService().is(ParameterConstants.DBDIALECT_ORACLE_USE_NTYPES_FOR_SYNC)) {
+        if (symmetricDialect.getParameterService().is(ParameterConstants.DBDIALECT_ORACLE_USE_NTYPES_FOR_SYNC)) {
             return "to_nclob('')||";
         } else {
             return "to_clob('')||";
diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/ext/PostgresBulkDataLoaderFactory.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/ext/PostgresBulkDataLoaderFactory.java
index aba277c349..9e0594e1ec 100644
--- a/symmetric-client/src/main/java/org/jumpmind/symmetric/ext/PostgresBulkDataLoaderFactory.java
+++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/ext/PostgresBulkDataLoaderFactory.java
@@ -38,7 +38,7 @@ public class PostgresBulkDataLoaderFactory extends DefaultDataLoaderFactory {
 
     public PostgresBulkDataLoaderFactory(ISymmetricEngine engine) {
-        super(engine.getParameterService());
+        super(engine);
     }
 
     public String getTypeName() {
diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/job/InitialLoadExtractorJob.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/InitialLoadExtractorJob.java
index 3271fd0821..a2f12b546d 100644
--- a/symmetric-client/src/main/java/org/jumpmind/symmetric/job/InitialLoadExtractorJob.java
+++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/InitialLoadExtractorJob.java
@@ -42,9 +42,13 @@ public JobDefaults getDefaults() {
 
     @Override
     public void doJob(boolean force) throws Exception {
-        if (engine.getParameterService().is(ParameterConstants.FILE_SYNC_ENABLE)) {
-            engine.getFileSyncExtractorService().queueWork(force);
+        if (engine.getParameterService().is(ParameterConstants.INITIAL_LOAD_USE_EXTRACT_JOB)) {
+
+            if (engine.getParameterService().is(ParameterConstants.FILE_SYNC_ENABLE)) {
+                engine.getFileSyncExtractorService().queueWork(force);
+            }
+
+            engine.getDataExtractorService().queueWork(force);
         }
-        engine.getDataExtractorService().queueWork(force);
     }
 }
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/db/AbstractTriggerTemplate.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/db/AbstractTriggerTemplate.java
index 5c2f37aff4..7a15a1d1a5 100644
--- a/symmetric-core/src/main/java/org/jumpmind/symmetric/db/AbstractTriggerTemplate.java
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/db/AbstractTriggerTemplate.java
@@ -544,9 +544,9 @@ protected String replaceTemplateVariables(DataEventType dml, Trigger trigger,
         ddl = FormatUtils.replace("sourceNodeExpression",
                 symmetricDialect.getSourceNodeExpression(), ddl);
 
-        ddl = FormatUtils.replace("oracleLobType", trigger.isUseCaptureLobs() ? "clob" : "long",
+        ddl = FormatUtils.replace("oracleLobType", trigger.isUseCaptureLobs() ? getClobType(table) : "long",
                 ddl);
-        ddl = FormatUtils.replace("oracleLobTypeClobAlways", "clob", ddl);
+        ddl = FormatUtils.replace("oracleLobTypeClobAlways", getClobType(table), ddl);
 
         String syncTriggersExpression = symmetricDialect.getSyncTriggersExpression();
         ddl = FormatUtils.replace("syncOnIncomingBatchCondition",
@@ -680,13 +680,17 @@ private String getChannelExpression() {
     }
 
     protected String toClobExpression(Table table) {
-        if (table.hasNTypeColumns()) {
+        if (symmetricDialect.getParameterService().is(ParameterConstants.DBDIALECT_ORACLE_USE_NTYPES_FOR_SYNC)) {
             return "to_nclob('')||";
         } else {
             return "to_clob('')||";
         }
     }
-    
+
+    protected String getClobType(Table table) {
+        return symmetricDialect.getParameterService().is(ParameterConstants.DBDIALECT_ORACLE_USE_NTYPES_FOR_SYNC) ? "nclob" : "clob";
+    }
+
     protected String getChannelExpression(Trigger trigger, TriggerHistory history, Table originalTable) {
         if (trigger.getChannelId().equals(Constants.CHANNEL_DYNAMIC)) {
             if (StringUtils.isNotBlank(trigger.getChannelExpression())) {
@@ -910,6 +914,7 @@ else if (column.getJdbcTypeName() != null
                     break;
                 }
                 case Types.CLOB:
+                case Types.NCLOB:
                     if (isOld && symmetricDialect.needsToSelectLobData()) {
                         templateToUse = emptyColumnTemplate;
                     } else {
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/load/DefaultDataLoaderFactory.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/load/DefaultDataLoaderFactory.java
index 7f2be57e9a..9c569c4790 100644
--- a/symmetric-core/src/main/java/org/jumpmind/symmetric/load/DefaultDataLoaderFactory.java
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/load/DefaultDataLoaderFactory.java
@@ -47,7 +47,6 @@
 import org.jumpmind.symmetric.io.data.writer.KafkaWriter;
 import org.jumpmind.symmetric.io.data.writer.ResolvedData;
 import org.jumpmind.symmetric.io.data.writer.TransformWriter;
-import org.jumpmind.symmetric.service.IParameterService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -60,8 +59,9 @@ public class DefaultDataLoaderFactory extends AbstractDataLoaderFactory implemen
     public DefaultDataLoaderFactory() {
     }
 
-    public DefaultDataLoaderFactory(IParameterService parameterService) {
-        this.parameterService = parameterService;
+    public DefaultDataLoaderFactory(ISymmetricEngine engine) {
+        this.engine = engine;
+        this.parameterService = engine.getParameterService();
     }
 
     public String getTypeName() {
@@ -214,7 +214,7 @@ protected DatabaseWriterSettings buildDatabaseWriterSettings(List
diff --git a/.../KafkaWriterFilter.java b/.../KafkaWriterFilter.java
--- a/.../KafkaWriterFilter.java
+++ b/.../KafkaWriterFilter.java
@@ ... @@ public class KafkaWriterFilter implements IDatabaseWriterFilter {
-    protected Map<String, List<String>> kafkaDataMap = new HashMap<String, List<String>>();
+    protected Map<String, List<ProducerRecord<String, Object>>> kafkaDataMap = new HashMap<String, List<ProducerRecord<String, Object>>>();
     protected String kafkaDataKey;
 
     private final Logger log = LoggerFactory.getLogger(getClass());
@@ -131,6 +131,8 @@ public class KafkaWriterFilter implements IDatabaseWriterFilter {
     Map<String, String> tableNameCache = new HashMap<String, String>();
     Map<String, List<String>> tableColumnCache = new HashMap<String, List<String>>();
 
+    public static KafkaProducer kafkaProducer;
+
     public KafkaWriterFilter(IParameterService parameterService) {
         schema = parser.parse(AVRO_CDC_SCHEMA);
         this.url = parameterService.getString(ParameterConstants.LOAD_ONLY_PROPERTY_PREFIX + "db.url");
@@ -147,24 +149,28 @@ public KafkaWriterFilter(IParameterService parameterService) {
         this.confluentUrl = parameterService.getString(ParameterConstants.KAFKA_CONFLUENT_REGISTRY_URL);
         this.schemaPackage = parameterService.getString(ParameterConstants.KAFKA_AVRO_JAVA_PACKAGE);
 
-        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.url);
-        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
-        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
-        configs.put(ProducerConfig.CLIENT_ID_CONFIG, this.producer);
-
-        if (confluentUrl != null) {
-            configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
-            configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
-
-            configs.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, confluentUrl);
-        }
-
-        TypedProperties props = parameterService.getAllParameters();
-        for (Object key : props.keySet()) {
-            if (key.toString().startsWith("kafkaclient.")) {
-                configs.put(key.toString().substring(12), props.get(key));
-            }
-        }
+        if (kafkaProducer == null) {
+            configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.url);
+            configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+            configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+            configs.put(ProducerConfig.CLIENT_ID_CONFIG, this.producer);
+
+            if (confluentUrl != null) {
+                configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
+                configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
+
+                configs.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, confluentUrl);
+            }
+
+            TypedProperties props = parameterService.getAllParameters();
+            for (Object key : props.keySet()) {
+                if (key.toString().startsWith("kafkaclient.")) {
+                    configs.put(key.toString().substring(12), props.get(key));
+                }
+            }
+            kafkaProducer = new KafkaProducer(configs);
+            this.log.debug("Kafka client config: {}", configs);
+        }
     }
 
     public boolean beforeWrite(DataContext context, Table table, CsvData data) {
@@ -178,6 +184,19 @@ public boolean beforeWrite(DataContext context, Table table, CsvData data) {
         }
 
         StringBuffer kafkaText = new StringBuffer();
+        String kafkaKey = null;
+
+        if (messageBy.equals(KAFKA_MESSAGE_BY_ROW)) {
+            StringBuffer sb = new StringBuffer();
+            sb.append(table.getName()).append(":");
+            for (int i = 0; i < table.getPrimaryKeyColumnNames().length; i++) {
+                sb.append(":").append(rowData[i]);
+            }
+            kafkaKey = String.valueOf(sb.toString().hashCode());
+        } else if (messageBy.equals(KAFKA_MESSAGE_BY_BATCH)) {
+            String s = context.getBatch().getSourceNodeId() + "-" + context.getBatch().getBatchId();
+            kafkaKey = String.valueOf(s.hashCode());
+        }
 
         if (topicBy.equals(KAFKA_TOPIC_BY_CHANNEL)) {
             kafkaDataKey = context.getBatch().getChannelId();
@@ -188,9 +207,9 @@ public boolean beforeWrite(DataContext context, Table table, CsvData data) {
         log.debug("Processing table {} for Kafka on topic {}", table, kafkaDataKey);
 
         if (kafkaDataMap.get(kafkaDataKey) == null) {
-            kafkaDataMap.put(kafkaDataKey, new ArrayList());
+            kafkaDataMap.put(kafkaDataKey, new ArrayList<ProducerRecord<String, Object>>());
         }
-        List kafkaDataList = kafkaDataMap.get(kafkaDataKey);
+        List<ProducerRecord<String, Object>> kafkaDataList = kafkaDataMap.get(kafkaDataKey);
 
         if (outputFormat.equals(KAFKA_FORMAT_JSON)) {
             kafkaText.append("{\"").append(table.getName()).append("\": {").append("\"eventType\": \"" + data.getDataEventType() + "\",")
@@ -259,7 +278,7 @@ else if (Long.class.equals(propertyTypeClass)) {
                         }
                     }
                 }
-                sendKafkaMessageByObject(pojo, kafkaDataKey);
+                sendKafkaMessage(new ProducerRecord(kafkaDataKey, kafkaKey, pojo));
             } else {
                 throw new RuntimeException("Unable to find a POJO to load for AVRO based message onto Kafka for table : " + tableName);
             }
@@ -300,7 +319,7 @@ else if (Long.class.equals(propertyTypeClass)) {
                 }
             }
         }
-        kafkaDataList.add(kafkaText.toString());
+        kafkaDataList.add(new ProducerRecord(kafkaDataKey, kafkaKey, kafkaText.toString()));
     }
     return false;
 }
@@ -430,21 +449,22 @@ public void batchComplete(DataContext context) {
         try {
             if (confluentUrl == null && kafkaDataMap.size() > 0) {
                 StringBuffer kafkaText = new StringBuffer();
+                String kafkaKey = null;
 
-
-                for (Map.Entry<String, List<String>> entry : kafkaDataMap.entrySet()) {
-                    for (String row : entry.getValue()) {
+                for (Map.Entry<String, List<ProducerRecord<String, Object>>> entry : kafkaDataMap.entrySet()) {
+                    for (ProducerRecord<String, Object> record : entry.getValue()) {
                         if (messageBy.equals(KAFKA_MESSAGE_BY_ROW)) {
-                            sendKafkaMessage(producer, row, entry.getKey());
+                            sendKafkaMessage(record);
                         } else {
-                            kafkaText.append(row);
+                            kafkaKey = record.key();
+                            kafkaText.append(record.value());
                         }
                     }
                     if (messageBy.equals(KAFKA_MESSAGE_BY_BATCH)) {
-                        sendKafkaMessage(producer, kafkaText.toString(), entry.getKey());
+                        sendKafkaMessage(new ProducerRecord(entry.getKey(), kafkaKey, kafkaText.toString()));
                     }
                 }
-                kafkaDataMap = new HashMap<String, List<String>>();
+                kafkaDataMap = new HashMap<String, List<ProducerRecord<String, Object>>>();
             }
         } catch (Exception e) {
             log.warn("Unable to write batch to Kafka " + batchFileName, e);
@@ -464,16 +484,9 @@ public void batchCommitted(DataContext context) {
     public void batchRolledback(DataContext context) {
     }
 
-    public void sendKafkaMessage(KafkaProducer producer, String kafkaText, String topic) {
-        log.debug("Sending message (topic={}) {}", topic, kafkaText);
-        producer.send(new ProducerRecord(topic, kafkaText));
-    }
-
-    public void sendKafkaMessageByObject(Object bean, String topic) {
-        log.debug("Sending object (topic={}) {}", topic, bean);
-        KafkaProducer producer = new KafkaProducer(configs);
-        producer.send(new ProducerRecord(topic, bean));
-        producer.close();
+    public void sendKafkaMessage(ProducerRecord record) {
+        log.debug("Sending message (topic={}) (key={}) {}", record.topic(), record.key(), record.value());
+        kafkaProducer.send(record);
     }
 
     public static byte[] datumToByteArray(Schema schema, GenericRecord datum) throws IOException {
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/monitor/MonitorTypeLog.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/monitor/MonitorTypeLog.java
index 8f7815b829..ca7d759f1f 100644
--- a/symmetric-core/src/main/java/org/jumpmind/symmetric/monitor/MonitorTypeLog.java
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/monitor/MonitorTypeLog.java
@@ -75,7 +75,7 @@ public MonitorEvent check(Monitor monitor) {
     protected String serializeDetails(List logs) {
         String result = null;
         try {
-            GsonBuilder builder = new GsonBuilder();
+            GsonBuilder builder = new GsonBuilder().excludeFieldsWithoutExposeAnnotation();
             builder.addSerializationExclusionStrategy(new SuperClassExclusion());
             builder.addDeserializationExclusionStrategy(new SuperClassExclusion());
             result = builder.create().toJson(logs);
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java
index a82a439d7f..48a2715df2 100644
--- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java
@@ -42,6 +42,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CancellationException;
@@ -1001,8 +1002,11 @@ final protected boolean changeBatchStatus(Status status, OutgoingBatch currentBa
      */
     final protected OutgoingBatch requeryIfEnoughTimeHasPassed(long ts, OutgoingBatch currentBatch) {
         if (System.currentTimeMillis() - ts > MS_PASSED_BEFORE_BATCH_REQUERIED) {
-            currentBatch = outgoingBatchService.findOutgoingBatch(currentBatch.getBatchId(),
+            OutgoingBatch batch = outgoingBatchService.findOutgoingBatch(currentBatch.getBatchId(),
                     currentBatch.getNodeId());
+            if (batch != null && !batch.getStatus().equals(currentBatch.getStatus())) {
+                currentBatch.setStatus(batch.getStatus());
+            }
         }
         return currentBatch;
     }
@@ -1078,7 +1082,7 @@ protected OutgoingBatch extractOutgoingBatch(ProcessInfo extractInfo, Node targe
                     }
                 }
                 extractTimeInMs = System.currentTimeMillis() - ts;
-                Statistics stats = getExtractStats(writer);
+                Statistics stats = getExtractStats(writer, currentBatch);
                 if (stats != null) {
                     transformTimeInMs = stats.get(DataWriterStatisticConstants.TRANSFORMMILLIS);
                     currentBatch.setDataRowCount(stats.get(DataWriterStatisticConstants.ROWCOUNT));
@@ -1120,23 +1124,8 @@ protected OutgoingBatch extractOutgoingBatch(ProcessInfo extractInfo, Node targe
             }
 
             if (updateBatchStatistics) {
-                long dataEventCount = currentBatch.getDataRowCount();
-                long insertEventCount = currentBatch.getDataInsertRowCount();
-
                 currentBatch = requeryIfEnoughTimeHasPassed(ts, currentBatch);
 
-                // preserve in the case of a reload event
-                if (dataEventCount > currentBatch.getDataRowCount()) {
-                    currentBatch.setDataRowCount(dataEventCount);
-                }
-
-                // preserve in the case of a reload event
-                if (insertEventCount > currentBatch.getDataInsertRowCount()) {
-                    currentBatch.setDataInsertRowCount(insertEventCount);
-                }
-
-                // only update the current batch after we have possibly
-                // "re-queried"
                 if (extractTimeInMs > 0) {
                     currentBatch.setExtractMillis(extractTimeInMs);
                 }
@@ -1149,8 +1138,19 @@ protected OutgoingBatch extractOutgoingBatch(ProcessInfo extractInfo, Node targe
                     currentBatch.setTransformExtractMillis(transformTimeInMs);
                 }
-            }
-
+                if (currentBatch.getLoadId() > 0 && (currentBatch.getSummary() == null || !currentBatch.getSummary().startsWith(symmetricDialect.getTablePrefix()))) {
+                    if (currentBatch.getExtractRowCount() != currentBatch.getDataRowCount()) {
+                        currentBatch.setDataRowCount(currentBatch.getExtractRowCount());
+                        currentBatch.setDataInsertRowCount(currentBatch.getExtractInsertRowCount());
+                    }
+                    ExtractRequest extractRequest = getExtractRequestForBatch(currentBatch);
+                    if (extractRequest != null) {
+                        sqlTemplate.update(getSql("updateExtractRequestStatus"), ExtractStatus.OK.name(), currentBatch.getExtractRowCount(),
+                                currentBatch.getExtractMillis(), extractRequest.getRequestId());
+                        checkSendDeferredConstraints(extractRequest, null, targetNode);
+                    }
+                }
+            }
         }
 
         return currentBatch;
     }
@@ -1272,7 +1272,7 @@ protected ExtractDataReader buildExtractDataReader(Node sourceNode, Node targetN
                 new SelectFromSymDataSource(currentBatch, sourceNode, targetNode, processInfo, containsBigLob));
     }
 
-    protected Statistics getExtractStats(IDataWriter writer) {
+    protected Statistics getExtractStats(IDataWriter writer, OutgoingBatch currentBatch) {
         Map<Batch, Statistics> statisticsMap = null;
         if (writer instanceof TransformWriter) {
             statisticsMap = ((TransformWriter) writer).getNestedWriter().getStatistics();
@@ -1280,10 +1280,13 @@ protected Statistics getExtractStats(IDataWriter writer) {
             statisticsMap = writer.getStatistics();
         }
         if (statisticsMap.size() > 0) {
-            return statisticsMap.values().iterator().next();
-        } else {
-            return null;
+            for (Entry<Batch, Statistics> entry : statisticsMap.entrySet()) {
+                if (entry.getKey().getBatchId() == currentBatch.getBatchId()) {
+                    return entry.getValue();
+                }
+            }
         }
+        return null;
     }
 
     protected IDataWriter wrapWithTransformWriter(Node sourceNode, Node targetNode, ProcessInfo processInfo, IDataWriter dataWriter,
@@ -2092,7 +2095,7 @@ public void execute(NodeCommunication nodeCommunication, RemoteNodeStatus status
                         extractOutgoingBatch(processInfo, targetNode, multiBatchStagingWriter, firstBatch, false, false,
                                 ExtractMode.FOR_SYM_CLIENT, new ClusterLockRefreshListener(clusterService));
 
-                        checkSendDeferredConstraints(request, childRequests, targetNode, firstBatch);
+                        checkSendDeferredConstraints(request, childRequests, targetNode);
                     } else {
                         log.info("Batches already had an OK status for request {} to extract table {} for batches {} through {} for node {}. Not extracting.",
                                 new Object[] { request.getRequestId(), request.getTableName(), request.getStartBatchId(), request.getEndBatchId(), request.getNodeId() });
@@ -2258,7 +2261,7 @@ public void releaseMissedExtractRequests() {
         }
     }
 
-    protected void checkSendDeferredConstraints(ExtractRequest request, List childRequests, Node targetNode, OutgoingBatch batch) {
+    protected void checkSendDeferredConstraints(ExtractRequest request, List childRequests, Node targetNode) {
         if (parameterService.is(ParameterConstants.INITIAL_LOAD_DEFER_CREATE_CONSTRAINTS, false)) {
             TableReloadRequest reloadRequest = dataService.getTableReloadRequest(request.getLoadId(), request.getTriggerId(), request.getRouterId());
             if ((reloadRequest != null && reloadRequest.isCreateTable()) ||
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java
index eed21bf0bf..927b9548d8 100644
--- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java
@@ -189,7 +189,7 @@ public DataLoaderService(ISymmetricEngine engine) {
         this.stagingManager = engine.getStagingManager();
         this.setSqlMap(new DataLoaderServiceSqlMap(platform, createSqlReplacementTokens()));
         extensionService = engine.getExtensionService();
-        extensionService.addExtensionPoint(new DefaultDataLoaderFactory(parameterService));
+        extensionService.addExtensionPoint(new DefaultDataLoaderFactory(engine));
         extensionService.addExtensionPoint(new ConfigurationChangedDatabaseWriterFilter(engine));
         this.nodeCommunicationService = engine.getNodeCommunicationService();
         this.engine = engine;
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java
index be2bb30479..7fc88215a2 100644
--- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java
@@ -680,7 +680,7 @@ public TableReloadStatus mapRow(Row rs) {
 
     protected long insertRequestedOutgoingBatches(ISqlTransaction transaction, Node targetNode,
             TriggerRouter triggerRouter, TriggerHistory triggerHistory, String overrideInitialLoadSelect, long loadId, String createBy,
-            String channelId, long rowsPerBatch, long batchCount) {
+            String channelId, long totalRows, long maxRowsPerBatch, long batchCount) {
         long startBatchId = 0;
 
         if (platform.supportsMultiThreadedTransactions()) {
@@ -692,13 +692,17 @@ protected long insertRequestedOutgoingBatches(ISqlTransaction transaction, Node
 
         for (int i = 0; i < batchCount; i++) {
             long batchId = startBatchId + i;
+            long rowCount = totalRows;
+            if (rowCount > maxRowsPerBatch) {
+                rowCount = maxRowsPerBatch;
+            }
             OutgoingBatch batch = new OutgoingBatch(targetNode.getNodeId(), channelId, Status.RQ);
             batch.setBatchId(batchId);
             batch.setLoadId(loadId);
             batch.setCreateBy(createBy);
             batch.setLoadFlag(true);
             batch.incrementRowCount(DataEventType.RELOAD);
-            batch.setDataRowCount(rowsPerBatch);
+            batch.setDataRowCount(rowCount);
             batch.incrementTableCount(tableName);
             batch.setExtractJobFlag(true);
             engine.getOutgoingBatchService().insertOutgoingBatch(transaction, batch);
@@ -712,6 +716,7 @@ protected long insertRequestedOutgoingBatches(ISqlTransaction transaction, Node
                 long dataId = insertData(transaction, data);
                 insertDataEvent(transaction, new DataEvent(dataId, batchId));
             }
+            totalRows -= rowCount;
         }
 
         return startBatchId;
@@ -1464,72 +1469,67 @@ private Map insertLoadBatchesForReload(Node targetNode,
                             : triggerRouter.getInitialLoadSelect();
                 }
             }
+
+            Table table = getTargetPlatform(triggerHistory.getSourceTableName()).getTableFromCache(
+                    triggerHistory.getSourceCatalogName(), triggerHistory.getSourceSchemaName(),
+                    triggerHistory.getSourceTableName(), false);
 
-            if (parameterService.is(ParameterConstants.INITIAL_LOAD_USE_EXTRACT_JOB)) {
+            if (table != null) {
+                processInfo.setCurrentTableName(table.getName());
                 Trigger trigger = triggerRouter.getTrigger();
                 String reloadChannel = getReloadChannelIdForTrigger(trigger, channels);
                 Channel channel = channels.get(reloadChannel);
-
-                Table table = getTargetPlatform(triggerHistory.getSourceTableName()).getTableFromCache(
-                        triggerHistory.getSourceCatalogName(), triggerHistory.getSourceSchemaName(),
-                        triggerHistory.getSourceTableName(), false);
-
-                if (table != null) {
-                    processInfo.setCurrentTableName(table.getName());
-
-                    long rowCount = -1;
-                    long parentRequestId = 0;
-                    ExtractRequest parentRequest = requests.get(triggerHistory.getTriggerHistoryId());
-
-                    if (parentRequest != null) {
-                        Router router = engine.getTriggerRouterService().getRouterById(triggerRouter.getRouterId(), false);
-                        if (router != null && router.getRouterType().equals("default")) {
-                            parentRequestId = parentRequest.getRequestId();
-                            rowCount = parentRequest.getRows();
-                        }
-                    }
+                long rowCount = -1;
+                long parentRequestId = 0;
+                ExtractRequest parentRequest = requests.get(triggerHistory.getTriggerHistoryId());
+
+                if (parentRequest != null) {
+                    Router router = engine.getTriggerRouterService().getRouterById(triggerRouter.getRouterId(), false);
+                    if (router != null && router.getRouterType().equals("default")) {
+                        parentRequestId = parentRequest.getRequestId();
+                        rowCount = parentRequest.getRows();
+                    }
+                }
 
-                    if (rowCount == -1) {
-                        rowCount = getDataCountForReload(table, targetNode, selectSql);
-                    }
+                if (rowCount == -1) {
+                    rowCount = getDataCountForReload(table, targetNode, selectSql);
+                }
 
-                    long transformMultiplier = getTransformMultiplier(table, triggerRouter);
-
-                    // calculate the number of batches needed for table.
-                    long numberOfBatches = 1;
+                long transformMultiplier = getTransformMultiplier(table, triggerRouter);
+                long startBatchId = 0;
+                long numberOfBatches = 1;
+                if (parameterService.is(ParameterConstants.INITIAL_LOAD_USE_EXTRACT_JOB)) {
                     if (rowCount > 0) {
                         numberOfBatches = (long) Math.ceil((rowCount * transformMultiplier) / (channel.getMaxBatchSize() * 1f));
                     }
-                    long startBatchId = insertRequestedOutgoingBatches(transaction, targetNode, triggerRouter, triggerHistory, selectSql,
-                            loadId, createBy, reloadChannel, channel.getMaxBatchSize(), numberOfBatches);
-                    long endBatchId = startBatchId + numberOfBatches - 1;
+                    startBatchId = insertRequestedOutgoingBatches(transaction, targetNode, triggerRouter, triggerHistory, selectSql,
+                            loadId, createBy, reloadChannel, rowCount, channel.getMaxBatchSize(), numberOfBatches);
+                } else {
+                    startBatchId = insertReloadEvent(transaction, targetNode, triggerRouter, triggerHistory,
+                            selectSql, true, loadId, createBy, Status.NE, null, -1);
+                }
 
-                    firstBatchId = firstBatchId == 0 ? startBatchId : firstBatchId;
-
-                    if (table.getNameLowerCase().startsWith(symmetricDialect.getTablePrefix() + "_" + TableConstants.SYM_FILE_SNAPSHOT)) {
-                        TableReloadStatus reloadStatus = getTableReloadStatusByLoadId(loadId);
-                        firstBatchId = reloadStatus.getStartDataBatchId() > 0 ? reloadStatus.getStartDataBatchId() : firstBatchId;
-                    }
-
-                    updateTableReloadStatusDataCounts(platform.supportsMultiThreadedTransactions() ? null : transaction,
-                            loadId, firstBatchId, endBatchId, numberOfBatches, rowCount);
+                long endBatchId = startBatchId + numberOfBatches - 1;
+                firstBatchId = firstBatchId == 0 ? startBatchId : firstBatchId;
 
-                    ExtractRequest request = engine.getDataExtractorService().requestExtractRequest(transaction, targetNode.getNodeId(), channel.getQueue(),
-                            triggerRouter, startBatchId, endBatchId, loadId, table.getName(), rowCount, parentRequestId);
-                    if (parentRequestId == 0) {
-                        requests.put(triggerHistory.getTriggerHistoryId(), request);
-                    }
-                } else {
-                    log.warn("The table defined by trigger_hist row %d no longer exists. A load will not be queue'd up for the table", triggerHistory.getTriggerHistoryId());
-
+                if (table.getNameLowerCase().startsWith(symmetricDialect.getTablePrefix() + "_" + TableConstants.SYM_FILE_SNAPSHOT)) {
+                    TableReloadStatus reloadStatus = getTableReloadStatusByLoadId(loadId);
+                    firstBatchId = reloadStatus.getStartDataBatchId() > 0 ? reloadStatus.getStartDataBatchId() : firstBatchId;
+                }
+
+                updateTableReloadStatusDataCounts(platform.supportsMultiThreadedTransactions() ? null : transaction,
+                        loadId, firstBatchId, endBatchId, numberOfBatches, rowCount);
+
+                ExtractRequest request = engine.getDataExtractorService().requestExtractRequest(transaction, targetNode.getNodeId(), channel.getQueue(),
+                        triggerRouter, startBatchId, endBatchId, loadId, table.getName(), rowCount, parentRequestId);
+                if (parentRequestId == 0) {
+                    requests.put(triggerHistory.getTriggerHistoryId(), request);
                 }
             } else {
-                insertReloadEvent(transaction, targetNode, triggerRouter, triggerHistory,
-                        selectSql, true, loadId, createBy, Status.NE, null, -1);
+                log.warn("The table defined by trigger_hist row %d no longer exists. A load will not be queue'd up for the table", triggerHistory.getTriggerHistoryId());
             }
-
             if (!transactional) {
                 transaction.commit();
             }
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataServiceSqlMap.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataServiceSqlMap.java
index 6c436bdfd0..8b38fe645d 100644
--- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataServiceSqlMap.java
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataServiceSqlMap.java
@@ -113,7 +113,7 @@ public DataServiceSqlMap(IDatabasePlatform platform, Map replace
                 + " from $(table_reload_status) "
                 + " where target_node_id = ?");
 
-        putSql("updateProcessedTableReloadRequest", "update $(table_reload_request) set last_update_time = ?, processed = 1 where load_id = ?");
+        putSql("updateProcessedTableReloadRequest", "update $(table_reload_request) set last_update_time = ?, processed = 1 where load_id = ? and processed = 0");
 
         putSql("cancelTableReloadRequest", "update $(table_reload_request) set last_update_time = ?, processed = 1 where source_node_id=? and target_node_id=? and trigger_id=? and router_id=? and create_time=?");
@@ -138,14 +138,14 @@ public DataServiceSqlMap(IDatabasePlatform platform, Map replace
         putSql("updateTableReloadStatusDataLoaded",
                 "update $(table_reload_status) "
                 + " set completed = case when ("
-                + "   data_batch_count = (case when ? between start_data_batch_id and end_data_batch_id then data_batch_loaded + ? else data_batch_loaded end) and "
-                + "   setup_batch_count = (case when ? < start_data_batch_id then setup_batch_loaded + ? else setup_batch_loaded end) and "
-                + "   finalize_batch_count = (case when ? > end_data_batch_id then finalize_batch_loaded + ? else finalize_batch_loaded end)) "
+                + "   data_batch_count <= (case when ? between start_data_batch_id and end_data_batch_id then data_batch_loaded + ? else data_batch_loaded end) and "
+                + "   setup_batch_count <= (case when ? < start_data_batch_id then setup_batch_loaded + ? else setup_batch_loaded end) and "
+                + "   finalize_batch_count <= (case when ? > end_data_batch_id then finalize_batch_loaded + ? else finalize_batch_loaded end)) "
                 + " then 1 else 0 end, "
                 + " end_time = case when ("
-                + "   data_batch_count = (case when ? between start_data_batch_id and end_data_batch_id then data_batch_loaded + ? else data_batch_loaded end) and "
-                + "   setup_batch_count = (case when ? < start_data_batch_id then setup_batch_loaded + ? else setup_batch_loaded end) and "
-                + "   finalize_batch_loaded = (case when ? > end_data_batch_id then finalize_batch_loaded + ? else finalize_batch_loaded end)) "
+                + "   data_batch_count <= (case when ? between start_data_batch_id and end_data_batch_id then data_batch_loaded + ? else data_batch_loaded end) and "
+                + "   setup_batch_count <= (case when ? < start_data_batch_id then setup_batch_loaded + ? else setup_batch_loaded end) and "
+                + "   finalize_batch_loaded <= (case when ? > end_data_batch_id then finalize_batch_loaded + ? else finalize_batch_loaded end)) "
                 + " then ? else end_time end, "
                 + " data_batch_loaded = case when ? between start_data_batch_id and end_data_batch_id then data_batch_loaded + ? else data_batch_loaded end, "
                 + " setup_batch_loaded = case when ? < start_data_batch_id then setup_batch_loaded + ? else setup_batch_loaded end, "
@@ -160,7 +160,7 @@ public DataServiceSqlMap(IDatabasePlatform platform, Map replace
         putSql("updateTableReloadStatusCancelled",
                 "update $(table_reload_status) set "
                 + " cancelled = 1, completed = 1, end_time = ?, last_update_time = ? "
-                + " where load_id = ?");
+                + " where load_id = ? and cancelled = 0 and completed = 0");
 
         putSql("updateTableReloadStatusError", "update $(table_reload_status) set "
                 + " error_flag = 1, sql_code = ?, sql_state = ?, sql_message = ? where load_id = ?");
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/InitialLoadService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/InitialLoadService.java
index f96c1d02fd..ba9971e7f7 100644
--- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/InitialLoadService.java
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/InitialLoadService.java
@@ -62,6 +62,8 @@ public class InitialLoadService extends AbstractService implements IInitialLoadS
 
     protected boolean syncTriggersBeforeInitialLoadAttempted = false;
 
+    protected int lastLoadCountToProcess;
+
     public InitialLoadService(ISymmetricEngine engine) {
         super(engine.getParameterService(), engine.getSymmetricDialect());
 
@@ -203,9 +205,10 @@ protected void processTableRequestLoads(Node source, ProcessInfo processInfo) {
         int maxLoadCount = parameterService.getInt(ParameterConstants.INITIAL_LOAD_EXTRACT_THREAD_COUNT_PER_SERVER, 20);
         int activeLoadCount = engine.getDataService().getActiveTableReloadStatus().size();
-        String maxLoadsReachedMessage = "Max initial/partial loads of {} are already active";
+        int loadCountToProcess = loadsToProcess.size();
+
         if (activeLoadCount >= maxLoadCount) {
-            log.debug(maxLoadsReachedMessage, activeLoadCount);
+            logActiveLoadCount(activeLoadCount, loadCountToProcess);
             return;
         }
@@ -227,7 +230,7 @@ protected void processTableRequestLoads(Node source, ProcessInfo processInfo) {
             }
         }
 
-        log.info("Found " + loadsToProcess.size() + " table reload requests to process.");
+        log.info("Found {} table reload requests to process.", loadCountToProcess);
 
         boolean streamToFile = parameterService.is(ParameterConstants.STREAM_TO_FILE_ENABLED, false);
         Map<String, List<TableReloadRequest>> requestsSplitByLoad = new HashMap<String, List<TableReloadRequest>>();
@@ -255,8 +258,9 @@ protected void processTableRequestLoads(Node source, ProcessInfo processInfo) {
                     extractRequests = engine.getDataService().insertReloadEvents(targetNode, false, fullLoad, processInfo,
                             activeHistories, triggerRouters, extractRequests);
+                    loadCountToProcess--;
                     if (++activeLoadCount >= maxLoadCount) {
-                        log.debug(maxLoadsReachedMessage, activeLoadCount);
+                        logActiveLoadCount(activeLoadCount, loadCountToProcess);
                         return;
                     }
                 } else {
@@ -305,14 +309,25 @@ protected void processTableRequestLoads(Node source, ProcessInfo processInfo) {
                     extractRequests = engine.getDataService().insertReloadEvents(targetNode, false, entry.getValue(), processInfo,
                             activeHistories, triggerRouters, extractRequests);
+                    loadCountToProcess--;
                     if (++activeLoadCount >= maxLoadCount) {
-                        log.debug(maxLoadsReachedMessage, activeLoadCount);
+                        logActiveLoadCount(activeLoadCount, loadCountToProcess);
                         return;
                     }
                 }
             }
         }
 
+    protected void logActiveLoadCount(int activeLoadCount, int loadCountToProcess) {
+        String message = "Max outgoing loads of {} are active, while {} outgoing loads are pending";
+        if (loadCountToProcess != lastLoadCountToProcess) {
+            log.warn(message, activeLoadCount, loadCountToProcess);
+        } else {
+            log.debug(message, activeLoadCount, loadCountToProcess);
+        }
+        lastLoadCountToProcess = loadCountToProcess;
+    }
+
     protected List<TriggerRouter> getTriggerRoutersForNodeGroup(Map<String, List<TriggerRouter>> triggerRoutersByNodeGroup, String nodeGroupId) {
         List<TriggerRouter> list = triggerRoutersByNodeGroup.get(nodeGroupId);
         if (list == null) {
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceSqlMap.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceSqlMap.java
index 4672d02a69..d08acd3a4d 100644
--- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceSqlMap.java
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceSqlMap.java
@@ -42,7 +42,7 @@ public OutgoingBatchServiceSqlMap(IDatabasePlatform platform,
 
         putSql("cancelLoadBatchesSql",
                 "update $(outgoing_batch) set ignore_count=1, status=case when sent_count > 0 then 'IG' else 'OK' end, " +
-                "error_flag=0, last_update_time=current_timestamp where load_id=?");
+                "error_flag=0, last_update_time=current_timestamp where load_id=? and status not in ('OK','IG')");
 
         putSql("insertOutgoingBatchSql",
                 "insert into $(outgoing_batch) "
diff --git a/symmetric-jdbc/src/main/java/org/jumpmind/db/platform/raima/RaimaDatabasePlatform.java b/symmetric-jdbc/src/main/java/org/jumpmind/db/platform/raima/RaimaDatabasePlatform.java
index d9b19cb508..5dcc119b15 100644
--- a/symmetric-jdbc/src/main/java/org/jumpmind/db/platform/raima/RaimaDatabasePlatform.java
+++ b/symmetric-jdbc/src/main/java/org/jumpmind/db/platform/raima/RaimaDatabasePlatform.java
@@ -37,6 +37,7 @@ public class RaimaDatabasePlatform extends AbstractJdbcDatabasePlatform {
     public RaimaDatabasePlatform(DataSource dataSource, SqlTemplateSettings settings) {
         super(dataSource, settings);
         supportsTruncate = false;
+        getDatabaseInfo().setRequiresAutoCommitForDdl(true);
     }
 
     @Override
diff --git a/symmetric-jdbc/src/main/java/org/jumpmind/db/platform/raima/RaimaJdbcSqlTemplate.java b/symmetric-jdbc/src/main/java/org/jumpmind/db/platform/raima/RaimaJdbcSqlTemplate.java
index 87239985b2..d76130b940 100644
--- a/symmetric-jdbc/src/main/java/org/jumpmind/db/platform/raima/RaimaJdbcSqlTemplate.java
+++ b/symmetric-jdbc/src/main/java/org/jumpmind/db/platform/raima/RaimaJdbcSqlTemplate.java
@@ -26,7 +26,9 @@
 import javax.sql.DataSource;
 
 import org.jumpmind.db.platform.DatabaseInfo;
+import org.jumpmind.db.sql.ISqlTransaction;
 import org.jumpmind.db.sql.JdbcSqlTemplate;
+import org.jumpmind.db.sql.JdbcSqlTransaction;
 import org.jumpmind.db.sql.SqlTemplateSettings;
 import org.jumpmind.db.sql.SymmetricLobHandler;
 
@@ -52,5 +54,10 @@ protected int verifyArgType(Object arg, int argType) {
     public String getSelectLastInsertIdSql(String sequenceName) {
         return "select last_insert_id()";
     }
+
+    @Override
+    public ISqlTransaction startSqlTransaction() {
+        return new JdbcSqlTransaction(this, true);
+    }
 }
diff --git a/symmetric-server/src/main/java/org/jumpmind/symmetric/web/ServerSymmetricEngine.java b/symmetric-server/src/main/java/org/jumpmind/symmetric/web/ServerSymmetricEngine.java
index 000cfcb06f..d1dfaeeea6 100644
--- a/symmetric-server/src/main/java/org/jumpmind/symmetric/web/ServerSymmetricEngine.java
+++ b/symmetric-server/src/main/java/org/jumpmind/symmetric/web/ServerSymmetricEngine.java
@@ -88,7 +88,7 @@ protected void init() {
         this.uriHandlers = new ArrayList();
         this.uriHandlers.add(new AckUriHandler(parameterService, acknowledgeService,
-                add(customInterceptors, authInterceptor)));
+                add(customInterceptors, authInterceptor, concurrencyInterceptor)));
         this.uriHandlers.add(new PingUriHandler(parameterService, customInterceptors));
         this.uriHandlers
                 .add(new InfoUriHandler(parameterService, nodeService, configurationService, customInterceptors));
diff --git a/symmetric-util/src/main/java/org/jumpmind/util/LogSummary.java b/symmetric-util/src/main/java/org/jumpmind/util/LogSummary.java
index 70c7202c30..220e063eff 100644
--- a/symmetric-util/src/main/java/org/jumpmind/util/LogSummary.java
+++ b/symmetric-util/src/main/java/org/jumpmind/util/LogSummary.java
@@ -29,6 +29,7 @@ public class LogSummary implements Comparable {
 
+    @Expose
     private Level level;
 
     @Expose
diff --git a/symmetric-wrapper/src/main/resources/symmetricds.initd b/symmetric-wrapper/src/main/resources/symmetricds.initd
index 89729b147d..ce74f357bf 100644
--- a/symmetric-wrapper/src/main/resources/symmetricds.initd
+++ b/symmetric-wrapper/src/main/resources/symmetricds.initd
@@ -17,6 +17,7 @@
 
 JAVA="${wrapper.java.command}"
 JARFILE="${wrapper.jarfile}"
+SYM_OPTIONS="-Djava.io.tmpdir=${wrapper.home}/tmp"
 SYM_HOME="${wrapper.home}"
 export SYM_HOME
 RUN_AS_USER="${wrapper.run.as.user}"
@@ -47,16 +48,16 @@ cd "$SYM_HOME"
 
 case "$1" in
     start)
-        "$JAVA" -jar "$JARFILE" start
+        "$JAVA" "$SYM_OPTIONS" -jar "$JARFILE" start
         ;;
     stop)
-        "$JAVA" -jar "$JARFILE" stop
+        "$JAVA" "$SYM_OPTIONS" -jar "$JARFILE" stop
         ;;
     restart)
-        "$JAVA" -jar "$JARFILE" restart
+        "$JAVA" "$SYM_OPTIONS" -jar "$JARFILE" restart
         ;;
     status)
-        "$JAVA" -jar "$JARFILE" status
+        "$JAVA" "$SYM_OPTIONS" -jar "$JARFILE" status
         ;;
     *)
         echo "Usage: `basename $0` {start|stop|restart|status}"
diff --git a/symmetric-wrapper/src/main/resources/symmetricds.systemd b/symmetric-wrapper/src/main/resources/symmetricds.systemd
index 63e7ad9e64..a51cd054b1 100644
--- a/symmetric-wrapper/src/main/resources/symmetricds.systemd
+++ b/symmetric-wrapper/src/main/resources/symmetricds.systemd
@@ -7,9 +7,9 @@
 Type=forking
 Environment="SYM_HOME=${wrapper.home}"
-ExecStart=${wrapper.java.command} -jar ${wrapper.jarfile} start
-ExecStop=${wrapper.java.command} -jar ${wrapper.jarfile} stop
-ExecReload=${wrapper.java.command} -jar ${wrapper.jarfile} restart
+ExecStart=${wrapper.java.command} -Djava.io.tmpdir=${wrapper.home}/tmp -jar ${wrapper.jarfile} start
+ExecStop=${wrapper.java.command} -Djava.io.tmpdir=${wrapper.home}/tmp -jar ${wrapper.jarfile} stop
+ExecReload=${wrapper.java.command} -Djava.io.tmpdir=${wrapper.home}/tmp -jar ${wrapper.jarfile} restart
 Restart=no
 PIDFile=${wrapper.pidfile}
 User=${wrapper.run.as.user}