From 2be8cb82deb9a198605ab6a30315897e2b5414c1 Mon Sep 17 00:00:00 2001 From: chenson42 Date: Mon, 22 Sep 2014 00:22:34 +0000 Subject: [PATCH] 0001942: PostGIS initial load fails unless you set initial.load.concat.csv.in.sql.enabled=true --- .../symmetric/db/AbstractTriggerTemplate.java | 21 ++++++++---------- .../service/impl/DataExtractorService.java | 22 ++++++++++++++----- 2 files changed, 25 insertions(+), 18 deletions(-) diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/db/AbstractTriggerTemplate.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/db/AbstractTriggerTemplate.java index 808db1c219..d3803c8b91 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/db/AbstractTriggerTemplate.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/db/AbstractTriggerTemplate.java @@ -124,19 +124,18 @@ public String createInitalLoadSql(Node node, TriggerRouter triggerRouter, Table String sql = null; + String tableAlias = symmetricDialect.getInitialLoadTableAlias(); if (parameterService.is( ParameterConstants.INITIAL_LOAD_CONCAT_CSV_IN_SQL_ENABLED)) { sql = sqlTemplates.get(INITIAL_LOAD_SQL_TEMPLATE); - String columnsText = buildColumnsString(symmetricDialect.getInitialLoadTableAlias(), - symmetricDialect.getInitialLoadTableAlias(), "", columns, DataEventType.INSERT, + String columnsText = buildColumnsString(tableAlias, + tableAlias, "", columns, DataEventType.INSERT, false, channel, triggerRouter.getTrigger()).columnString; if (isNotBlank(textColumnExpression)) { columnsText = textColumnExpression.replace("$(columnName)", columnsText); } sql = FormatUtils.replace("columns", columnsText, sql); } else { - boolean dateTimeAsString = parameterService.is( - ParameterConstants.DATA_LOADER_TREAT_DATETIME_AS_VARCHAR); sql = "select $(columns) from $(schemaName)$(tableName) t where $(whereClause)"; StringBuilder columnList = new StringBuilder(); for (int i = 0; i < columns.length; i++) { @@ -149,16 +148,14 @@ public String createInitalLoadSql(Node node, TriggerRouter triggerRouter, Table columnList.append(","); } - String columnExpression = SymmetricUtils.quote(symmetricDialect, - column.getName()); - - if (dateTimeAsString && TypeMap.isDateTimeType(column.getMappedTypeCode())) { - columnExpression = castDatetimeColumnToString(column.getName()); - } else if (isNotBlank(textColumnExpression) && TypeMap.isTextType(column.getMappedTypeCode())) { - columnExpression = textColumnExpression.replace("$(columnName)", columnExpression) + " as " + column.getName(); + ColumnString columnString = fillOutColumnTemplate(tableAlias, tableAlias, + "", column, DataEventType.INSERT, false, channel, triggerRouter.getTrigger()); + String columnExpression = columnString.columnString; + if (isNotBlank(textColumnExpression) && TypeMap.isTextType(column.getMappedTypeCode())) { + columnExpression = textColumnExpression.replace("$(columnName)", columnExpression); } - columnList.append(columnExpression); + columnList.append(columnExpression).append(" as ").append(column.getName()); } } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java index f663386e43..0e1a930144 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java @@ -37,6 +37,7 @@ import org.apache.commons.lang.StringUtils; import org.jumpmind.db.io.DatabaseXmlUtil; +import org.jumpmind.db.model.Column; import org.jumpmind.db.model.Database; import org.jumpmind.db.model.Table; import org.jumpmind.db.platform.DatabaseNamesConstants; @@ -1594,17 +1595,26 @@ protected void startNewCursor(final TriggerHistory triggerHistory, configurationService.getChannel(triggerRouter.getTrigger().getChannelId()), overrideSelectSql); this.cursor = sqlTemplate.queryForCursor(initialLoadSql, new ISqlRowMapper() { - public Data mapRow(Row rs) { + public Data mapRow(Row row) { String csvRow = null; if (symmetricDialect.getParameterService().is( ParameterConstants.INITIAL_LOAD_CONCAT_CSV_IN_SQL_ENABLED)) { - csvRow = rs.stringValue(); + csvRow = row.stringValue(); } else { - String[] rowData = platform.getStringValues( - symmetricDialect.getBinaryEncoding(), sourceTable.getColumns(), rs, - false); - csvRow = CsvUtils.escapeCsvData(rowData, '\0', '"'); + StringBuilder concatedRow = new StringBuilder(); + Column[] columns = sourceTable.getColumns(); + int index = 0; + for (Column column : columns) { + if (index > 0) { + concatedRow.append(","); + } + String value = row.getString(column.getName()); + concatedRow.append(value == null ? "" : value); + index++; + } + csvRow = concatedRow.toString(); + } int commaCount = StringUtils.countMatches(csvRow, ","); if (expectedCommaCount <= commaCount) {