diff --git a/pom.xml b/pom.xml
index bcfa9708..ca6ae2d8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -22,7 +22,7 @@
Salesforce plugins
io.cdap.plugin
salesforce-plugins
- 1.3.0
+ 1.3.1
jar
@@ -509,6 +509,7 @@
*.rst
*.md
+ **/*.iml
**/*.cdap
**/*.yaml
**/*.md
diff --git a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/MapToRecordTransformer.java b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/MapToRecordTransformer.java
index be6ec797..d26fd336 100644
--- a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/MapToRecordTransformer.java
+++ b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/MapToRecordTransformer.java
@@ -46,7 +46,12 @@ private void transformRecord(Schema schema, Map record, StructuredRec
private Object convertValue(String fieldName, Object value, Schema fieldSchema) {
if (fieldSchema.isNullable()) {
- return convertValue(fieldName, value, fieldSchema.getNonNullable());
+ return value == null ? null : convertValue(fieldName, value, fieldSchema.getNonNullable());
+ }
+
+ if (value == null) {
+ throw new RuntimeException(
+ String.format("Found null value for non nullable field %s", fieldName));
}
Schema.Type fieldSchemaType = fieldSchema.getType();
diff --git a/src/test/java/io/cdap/plugin/salesforce/plugin/source/batch/MapToRecordTransformerTest.java b/src/test/java/io/cdap/plugin/salesforce/plugin/source/batch/MapToRecordTransformerTest.java
index 172ed0e8..339df619 100644
--- a/src/test/java/io/cdap/plugin/salesforce/plugin/source/batch/MapToRecordTransformerTest.java
+++ b/src/test/java/io/cdap/plugin/salesforce/plugin/source/batch/MapToRecordTransformerTest.java
@@ -26,6 +26,7 @@
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -90,4 +91,30 @@ public void testTransform() {
Assert.assertEquals(ZonedDateTime.parse(arrayField.get(0).get("nested_timestamp"), DateTimeFormatter.ISO_DATE_TIME),
arrayRecord.get(0).getTimestamp("nested_timestamp", ZoneOffset.UTC));
}
+
+ @Test
+ public void testNullableFields() {
+ Schema schema = Schema.recordOf("output",
+ Schema.Field.of("string_field", Schema.nullableOf(Schema.of(Schema.Type.STRING))),
+ Schema.Field.of("double_field", Schema.nullableOf(Schema.of(Schema.Type.DOUBLE))));
+ Map records = new HashMap<>();
+ records.put("string_field", null);
+ records.put("double_field", null);
+ MapToRecordTransformer recordTransformer = new MapToRecordTransformer();
+ StructuredRecord structuredRecord = recordTransformer.transform(schema, records);
+ Assert.assertNull(structuredRecord.get("string_field"));
+ Assert.assertNull(structuredRecord.get("double_field"));
+ }
+
+ @Test(expected = RuntimeException.class)
+ public void testNonNullableFieldWithNull() {
+ Schema schema = Schema.recordOf("output",
+ Schema.Field.of("string_field", Schema.nullableOf(Schema.of(Schema.Type.STRING))),
+ Schema.Field.of("double_field", Schema.of(Schema.Type.DOUBLE)));
+ Map records = new HashMap<>();
+ records.put("string_field", "");
+ records.put("double_field", null);
+ MapToRecordTransformer recordTransformer = new MapToRecordTransformer();
+ recordTransformer.transform(schema, records);
+ }
}