diff --git a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/KafkaWriter.java b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/KafkaWriter.java index 6579ae161d..3d2583fe99 100644 --- a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/KafkaWriter.java +++ b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/KafkaWriter.java @@ -614,6 +614,7 @@ public void batchComplete(DataContext context) { public int writeKafka(CsvData data, Table table) { String[] rowData = data.getParsedData(CsvData.ROW_DATA); + String[] oldData = data.getParsedData(CsvData.OLD_DATA); if (data.getDataEventType() == DataEventType.DELETE) { rowData = data.getParsedData(CsvData.OLD_DATA); if (rowData == null) { @@ -651,11 +652,21 @@ public int writeKafka(CsvData data, Table table) { .append("\"data\": { "); // Let Gson escape the json values Gson gson = new Gson(); - for (int i = 0; i < table.getColumnNames().length; i++) { - kafkaText.append("\"").append(table.getColumnNames()[i]).append("\": "); - kafkaText.append(gson.toJson(rowData[i])); - if (i + 1 < table.getColumnNames().length) { - kafkaText.append(","); + if (oldData != null) { + for (int i = 0; i < table.getColumnCount(); i++) { + kafkaText.append("\"").append(table.getColumnNames()[i]).append("\": "); + kafkaText.append(gson.toJson(rowData[i])); + if (i + 1 < table.getColumnCount()) { + kafkaText.append(","); + } + } + } else { + for (int i = 0; i < table.getPrimaryKeyColumnCount(); i++) { + kafkaText.append("\"").append(table.getColumnNames()[i]).append("\": "); + kafkaText.append(gson.toJson(rowData[i])); + if (i + 1 < table.getPrimaryKeyColumnCount()) { + kafkaText.append(","); + } } } kafkaText.append(" } } }"); @@ -664,22 +675,42 @@ public int writeKafka(CsvData data, Table table) { // doubling the quote character kafkaText.append("\n\"TABLE\"").append(",\"").append(table.getName()).append("\",\"").append("EVENT").append("\",\"") .append(data.getDataEventType()).append("\","); - for (int i = 0; i < table.getColumnNames().length; i++) { - kafkaText.append("\"").append(StringUtils.replace(table.getColumnNames()[i], "\"", "\"\"")).append("\","); - if (rowData[i] != null) { - kafkaText.append("\"").append(StringUtils.replace(rowData[i], "\"", "\"\"")).append("\""); + if (oldData != null) { + for (int i = 0; i < table.getColumnNames().length; i++) { + kafkaText.append("\"").append(StringUtils.replace(table.getColumnNames()[i], "\"", "\"\"")).append("\","); + if (rowData[i] != null) { + kafkaText.append("\"").append(StringUtils.replace(rowData[i], "\"", "\"\"")).append("\""); + } + if (i + 1 < table.getColumnNames().length) { + kafkaText.append(","); + } } - if (i + 1 < table.getColumnNames().length) { - kafkaText.append(","); + } else { + for (int i = 0; i < table.getPrimaryKeyColumnCount(); i++) { + kafkaText.append("\"").append(StringUtils.replace(table.getPrimaryKeyColumnNames()[i], "\"", "\"\"")).append("\","); + if (rowData[i] != null) { + kafkaText.append("\"").append(StringUtils.replace(rowData[i], "\"", "\"\"")).append("\""); + } + if (i + 1 < table.getColumnNames().length) { + kafkaText.append(","); + } } } } else if (outputFormat.equals(KAFKA_FORMAT_XML)) { kafkaText.append(""); - for (int i = 0; i < table.getColumnNames().length; i++) { - kafkaText.append("") - .append(StringEscapeUtils.escapeXml11(rowData[i])).append(""); + if (oldData != null) { + for (int i = 0; i < table.getColumnNames().length; i++) { + kafkaText.append("") + .append(StringEscapeUtils.escapeXml11(rowData[i])).append(""); + } + } else { + for (int i = 0; i < table.getPrimaryKeyColumnCount(); i++) { + kafkaText.append("") + .append(StringEscapeUtils.escapeXml11(rowData[i])).append(""); + } } + kafkaText.append(""); } else if (outputFormat.equals(KAFKA_FORMAT_AVRO)) { if (confluentUrl != null) { @@ -689,22 +720,44 @@ public int writeKafka(CsvData data, Table table) { if (curClass != null) { Constructor defaultConstructor = curClass.getConstructor(); Object pojo = defaultConstructor.newInstance(); - for (int i = 0; i < table.getColumnNames().length; i++) { - String colName = getColumnName(table.getName(), table.getColumnNames()[i], pojo); - if (colName != null) { - Class propertyTypeClass = PropertyUtils.getPropertyType(pojo, colName); - if (CharSequence.class.equals(propertyTypeClass)) { - PropertyUtils.setSimpleProperty(pojo, colName, rowData[i]); - } else if (Long.class.equals(propertyTypeClass)) { - Date date = null; - try { - date = DateUtils.parseDate(rowData[i], parseDatePatterns); - } catch (Exception e) { - log.debug(rowData[i] + " was not a recognized date format so treating it as a long."); + if (oldData != null) { + for (int i = 0; i < table.getColumnNames().length; i++) { + String colName = getColumnName(table.getName(), table.getColumnNames()[i], pojo); + if (colName != null) { + Class propertyTypeClass = PropertyUtils.getPropertyType(pojo, colName); + if (CharSequence.class.equals(propertyTypeClass)) { + PropertyUtils.setSimpleProperty(pojo, colName, rowData[i]); + } else if (Long.class.equals(propertyTypeClass)) { + Date date = null; + try { + date = DateUtils.parseDate(rowData[i], parseDatePatterns); + } catch (Exception e) { + log.debug(rowData[i] + " was not a recognized date format so treating it as a long."); + } + BeanUtils.setProperty(pojo, colName, date != null ? date.getTime() : rowData[i]); + } else { + BeanUtils.setProperty(pojo, colName, rowData[i]); + } + } + } + } else { + for (int i = 0; i < table.getPrimaryKeyColumnCount(); i++) { + String colName = getColumnName(table.getName(), table.getPrimaryKeyColumnNames()[i], pojo); + if (colName != null) { + Class propertyTypeClass = PropertyUtils.getPropertyType(pojo, colName); + if (CharSequence.class.equals(propertyTypeClass)) { + PropertyUtils.setSimpleProperty(pojo, colName, rowData[i]); + } else if (Long.class.equals(propertyTypeClass)) { + Date date = null; + try { + date = DateUtils.parseDate(rowData[i], parseDatePatterns); + } catch (Exception e) { + log.debug(rowData[i] + " was not a recognized date format so treating it as a long."); + } + BeanUtils.setProperty(pojo, colName, date != null ? date.getTime() : rowData[i]); + } else { + BeanUtils.setProperty(pojo, colName, rowData[i]); } - BeanUtils.setProperty(pojo, colName, date != null ? date.getTime() : rowData[i]); - } else { - BeanUtils.setProperty(pojo, colName, rowData[i]); } } }