Skip to content

Commit

Permalink
0005811: When Capture of Old Data is disabled, Deletes get an Index Out
Browse files Browse the repository at this point in the history
Of Bounds Error with Kafka
  • Loading branch information
jakobvanmeter committed Apr 27, 2023
1 parent f1b498c commit 5769814
Showing 1 changed file with 82 additions and 29 deletions.
Expand Up @@ -614,6 +614,7 @@ public void batchComplete(DataContext context) {

public int writeKafka(CsvData data, Table table) {
String[] rowData = data.getParsedData(CsvData.ROW_DATA);
String[] oldData = data.getParsedData(CsvData.OLD_DATA);
if (data.getDataEventType() == DataEventType.DELETE) {
rowData = data.getParsedData(CsvData.OLD_DATA);
if (rowData == null) {
Expand Down Expand Up @@ -651,11 +652,21 @@ public int writeKafka(CsvData data, Table table) {
.append("\"data\": { ");
// Let Gson escape the json values
Gson gson = new Gson();
for (int i = 0; i < table.getColumnNames().length; i++) {
kafkaText.append("\"").append(table.getColumnNames()[i]).append("\": ");
kafkaText.append(gson.toJson(rowData[i]));
if (i + 1 < table.getColumnNames().length) {
kafkaText.append(",");
if (oldData != null) {
for (int i = 0; i < table.getColumnCount(); i++) {
kafkaText.append("\"").append(table.getColumnNames()[i]).append("\": ");
kafkaText.append(gson.toJson(rowData[i]));
if (i + 1 < table.getColumnCount()) {
kafkaText.append(",");
}
}
} else {
for (int i = 0; i < table.getPrimaryKeyColumnCount(); i++) {
kafkaText.append("\"").append(table.getColumnNames()[i]).append("\": ");
kafkaText.append(gson.toJson(rowData[i]));
if (i + 1 < table.getPrimaryKeyColumnCount()) {
kafkaText.append(",");
}
}
}
kafkaText.append(" } } }");
Expand All @@ -664,22 +675,42 @@ public int writeKafka(CsvData data, Table table) {
// doubling the quote character
kafkaText.append("\n\"TABLE\"").append(",\"").append(table.getName()).append("\",\"").append("EVENT").append("\",\"")
.append(data.getDataEventType()).append("\",");
for (int i = 0; i < table.getColumnNames().length; i++) {
kafkaText.append("\"").append(StringUtils.replace(table.getColumnNames()[i], "\"", "\"\"")).append("\",");
if (rowData[i] != null) {
kafkaText.append("\"").append(StringUtils.replace(rowData[i], "\"", "\"\"")).append("\"");
if (oldData != null) {
for (int i = 0; i < table.getColumnNames().length; i++) {
kafkaText.append("\"").append(StringUtils.replace(table.getColumnNames()[i], "\"", "\"\"")).append("\",");
if (rowData[i] != null) {
kafkaText.append("\"").append(StringUtils.replace(rowData[i], "\"", "\"\"")).append("\"");
}
if (i + 1 < table.getColumnNames().length) {
kafkaText.append(",");
}
}
if (i + 1 < table.getColumnNames().length) {
kafkaText.append(",");
} else {
for (int i = 0; i < table.getPrimaryKeyColumnCount(); i++) {
kafkaText.append("\"").append(StringUtils.replace(table.getPrimaryKeyColumnNames()[i], "\"", "\"\"")).append("\",");
if (rowData[i] != null) {
kafkaText.append("\"").append(StringUtils.replace(rowData[i], "\"", "\"\"")).append("\"");
}
if (i + 1 < table.getColumnNames().length) {
kafkaText.append(",");
}
}
}
} else if (outputFormat.equals(KAFKA_FORMAT_XML)) {
kafkaText.append("<row entity=\"").append(StringEscapeUtils.escapeXml11(table.getName())).append("\"").append(" dml=\"")
.append(data.getDataEventType()).append("\">");
for (int i = 0; i < table.getColumnNames().length; i++) {
kafkaText.append("<data key=\"").append(StringEscapeUtils.escapeXml11(table.getColumnNames()[i])).append("\">")
.append(StringEscapeUtils.escapeXml11(rowData[i])).append("</data>");
if (oldData != null) {
for (int i = 0; i < table.getColumnNames().length; i++) {
kafkaText.append("<data key=\"").append(StringEscapeUtils.escapeXml11(table.getColumnNames()[i])).append("\">")
.append(StringEscapeUtils.escapeXml11(rowData[i])).append("</data>");
}
} else {
for (int i = 0; i < table.getPrimaryKeyColumnCount(); i++) {
kafkaText.append("<data key=\"").append(StringEscapeUtils.escapeXml11(table.getPrimaryKeyColumnNames()[i])).append("\">")
.append(StringEscapeUtils.escapeXml11(rowData[i])).append("</data>");
}
}

kafkaText.append("</row>");
} else if (outputFormat.equals(KAFKA_FORMAT_AVRO)) {
if (confluentUrl != null) {
Expand All @@ -689,22 +720,44 @@ public int writeKafka(CsvData data, Table table) {
if (curClass != null) {
Constructor<?> defaultConstructor = curClass.getConstructor();
Object pojo = defaultConstructor.newInstance();
for (int i = 0; i < table.getColumnNames().length; i++) {
String colName = getColumnName(table.getName(), table.getColumnNames()[i], pojo);
if (colName != null) {
Class<?> propertyTypeClass = PropertyUtils.getPropertyType(pojo, colName);
if (CharSequence.class.equals(propertyTypeClass)) {
PropertyUtils.setSimpleProperty(pojo, colName, rowData[i]);
} else if (Long.class.equals(propertyTypeClass)) {
Date date = null;
try {
date = DateUtils.parseDate(rowData[i], parseDatePatterns);
} catch (Exception e) {
log.debug(rowData[i] + " was not a recognized date format so treating it as a long.");
if (oldData != null) {
for (int i = 0; i < table.getColumnNames().length; i++) {
String colName = getColumnName(table.getName(), table.getColumnNames()[i], pojo);
if (colName != null) {
Class<?> propertyTypeClass = PropertyUtils.getPropertyType(pojo, colName);
if (CharSequence.class.equals(propertyTypeClass)) {
PropertyUtils.setSimpleProperty(pojo, colName, rowData[i]);
} else if (Long.class.equals(propertyTypeClass)) {
Date date = null;
try {
date = DateUtils.parseDate(rowData[i], parseDatePatterns);
} catch (Exception e) {
log.debug(rowData[i] + " was not a recognized date format so treating it as a long.");
}
BeanUtils.setProperty(pojo, colName, date != null ? date.getTime() : rowData[i]);
} else {
BeanUtils.setProperty(pojo, colName, rowData[i]);
}
}
}
} else {
for (int i = 0; i < table.getPrimaryKeyColumnCount(); i++) {
String colName = getColumnName(table.getName(), table.getPrimaryKeyColumnNames()[i], pojo);
if (colName != null) {
Class<?> propertyTypeClass = PropertyUtils.getPropertyType(pojo, colName);
if (CharSequence.class.equals(propertyTypeClass)) {
PropertyUtils.setSimpleProperty(pojo, colName, rowData[i]);
} else if (Long.class.equals(propertyTypeClass)) {
Date date = null;
try {
date = DateUtils.parseDate(rowData[i], parseDatePatterns);
} catch (Exception e) {
log.debug(rowData[i] + " was not a recognized date format so treating it as a long.");
}
BeanUtils.setProperty(pojo, colName, date != null ? date.getTime() : rowData[i]);
} else {
BeanUtils.setProperty(pojo, colName, rowData[i]);
}
BeanUtils.setProperty(pojo, colName, date != null ? date.getTime() : rowData[i]);
} else {
BeanUtils.setProperty(pojo, colName, rowData[i]);
}
}
}
Expand Down

0 comments on commit 5769814

Please sign in to comment.