diff --git a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/MapToRecordTransformer.java b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/MapToRecordTransformer.java index 363460fa..351a817b 100644 --- a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/MapToRecordTransformer.java +++ b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/MapToRecordTransformer.java @@ -52,7 +52,12 @@ private void transformRecord(Schema schema, Map record, StructuredRec private Object convertValue(String fieldName, Object value, Schema fieldSchema) { if (fieldSchema.isNullable()) { - return convertValue(fieldName, value, fieldSchema.getNonNullable()); + return value == null ? null : convertValue(fieldName, value, fieldSchema.getNonNullable()); + } + + if (value == null) { + throw new RuntimeException( + String.format("Found null value for non nullable field %s", fieldName)); } Schema.Type fieldSchemaType = fieldSchema.getType(); diff --git a/src/test/java/io/cdap/plugin/salesforce/plugin/source/batch/MapToRecordTransformerTest.java b/src/test/java/io/cdap/plugin/salesforce/plugin/source/batch/MapToRecordTransformerTest.java index b1db56e2..31a0def9 100644 --- a/src/test/java/io/cdap/plugin/salesforce/plugin/source/batch/MapToRecordTransformerTest.java +++ b/src/test/java/io/cdap/plugin/salesforce/plugin/source/batch/MapToRecordTransformerTest.java @@ -26,6 +26,7 @@ import java.time.ZoneOffset; import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -92,4 +93,30 @@ public void testTransform() { Assert.assertEquals(ZonedDateTime.parse(arrayField.get(0).get("nested_timestamp"), DateTimeFormatter.ISO_DATE_TIME), arrayRecord.get(0).getTimestamp("nested_timestamp", ZoneOffset.UTC)); } + + @Test + public void testNullableFields() { + Schema schema = Schema.recordOf("output", + Schema.Field.of("string_field", Schema.nullableOf(Schema.of(Schema.Type.STRING))), + Schema.Field.of("double_field", Schema.nullableOf(Schema.of(Schema.Type.DOUBLE)))); + Map records = new HashMap<>(); + records.put("string_field", null); + records.put("double_field", null); + MapToRecordTransformer recordTransformer = new MapToRecordTransformer(); + StructuredRecord structuredRecord = recordTransformer.transform(schema, records); + Assert.assertNull(structuredRecord.get("string_field")); + Assert.assertNull(structuredRecord.get("double_field")); + } + + @Test(expected = RuntimeException.class) + public void testNonNullableFieldWithNull() { + Schema schema = Schema.recordOf("output", + Schema.Field.of("string_field", Schema.nullableOf(Schema.of(Schema.Type.STRING))), + Schema.Field.of("double_field", Schema.of(Schema.Type.DOUBLE))); + Map records = new HashMap<>(); + records.put("string_field", ""); + records.put("double_field", null); + MapToRecordTransformer recordTransformer = new MapToRecordTransformer(); + recordTransformer.transform(schema, records); + } } diff --git a/src/test/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBulkRecordReaderTest.java b/src/test/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBulkRecordReaderTest.java index adae2e8d..06542eb1 100644 --- a/src/test/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBulkRecordReaderTest.java +++ b/src/test/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBulkRecordReaderTest.java @@ -19,6 +19,7 @@ import com.google.common.collect.ImmutableMap; import io.cdap.cdap.api.data.format.StructuredRecord; import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.plugin.salesforce.SalesforceSchemaUtil; import org.junit.Assert; import org.junit.Test; @@ -86,7 +87,9 @@ public void testUTF8InKeysAndValues() throws Exception { Schema schema = Schema.recordOf("output", Schema.Field.of("Id", Schema.of(Schema.Type.STRING)), - Schema.Field.of("IsDeleted\u0628\u0633\u0645", Schema.of(Schema.Type.BOOLEAN)), + Schema.Field + .of(SalesforceSchemaUtil.normalizeAvroName("IsDeleted\u0628\u0633\u0645"), + Schema.of(Schema.Type.BOOLEAN)), Schema.Field.of("ExpectedRevenue", Schema.of(Schema.Type.DOUBLE)), Schema.Field.of("LastModifiedDate", Schema.of(Schema.LogicalType.TIMESTAMP_MICROS)), Schema.Field.of("CloseDate", Schema.of(Schema.LogicalType.DATE)) @@ -95,7 +98,7 @@ public void testUTF8InKeysAndValues() throws Exception { List> expectedRecords = new ImmutableList.Builder>() .add(new ImmutableMap.Builder() .put("Id", "0061i000003XNcBAAW\u0628\u0633\u0645") - .put("IsDeleted\u0628\u0633\u0645", false) + .put(SalesforceSchemaUtil.normalizeAvroName("IsDeleted\u0628\u0633\u0645"), false) .put("ExpectedRevenue", 1500.0) .put("LastModifiedDate", 1550819001000000L) .put("CloseDate", 17897) @@ -103,7 +106,7 @@ public void testUTF8InKeysAndValues() throws Exception { ) .add(new ImmutableMap.Builder() .put("Id", "0061i000003XNcCAAW") - .put("IsDeleted\u0628\u0633\u0645", false) + .put(SalesforceSchemaUtil.normalizeAvroName("IsDeleted\u0628\u0633\u0645"), false) .put("ExpectedRevenue", 112500.0) .put("LastModifiedDate", 1550819001000000L) .put("CloseDate", 17885) @@ -111,7 +114,7 @@ public void testUTF8InKeysAndValues() throws Exception { ) .add(new ImmutableMap.Builder() .put("Id", "0061i000003XNcDAAW") - .put("IsDeleted\u0628\u0633\u0645", false) + .put(SalesforceSchemaUtil.normalizeAvroName("IsDeleted\u0628\u0633\u0645"), false) .put("ExpectedRevenue", 220000.0) .put("LastModifiedDate", 1550819001000000L) .put("CloseDate", 17850)