diff --git a/pom.xml b/pom.xml
index 37952561..6321a105 100644
--- a/pom.xml
+++ b/pom.xml
@@ -30,13 +30,13 @@
true
UTF-8
- 6.0.0
+ 6.1.0-SNAPSHOT
4.12
2.6
2.6
2.3.0
2.1.3
- 2.2.0
+ 2.3.0-SNAPSHOT
3.8.1
45.0.0
4.0.0
@@ -428,8 +428,8 @@
1.1.0
- system:cdap-data-pipeline[6.0.0-SNAPSHOT,7.0.0-SNAPSHOT)
- system:cdap-data-streams[6.0.0-SNAPSHOT,7.0.0-SNAPSHOT)
+ system:cdap-data-pipeline[6.1.0-SNAPSHOT,7.0.0-SNAPSHOT)
+ system:cdap-data-streams[6.1.0-SNAPSHOT,7.0.0-SNAPSHOT)
diff --git a/src/main/java/io/cdap/plugin/salesforce/InvalidConfigException.java b/src/main/java/io/cdap/plugin/salesforce/InvalidConfigException.java
new file mode 100644
index 00000000..7ee5d402
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/salesforce/InvalidConfigException.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.salesforce;
+
+/**
+ * Thrown when a plugin configuration property has an invalid value; carries the name of the offending property.
+ */
+public class InvalidConfigException extends RuntimeException {
+ private final String property;
+
+ public InvalidConfigException(String message, String property) {
+ super(message);
+ this.property = property;
+ }
+
+ public InvalidConfigException(String message, Throwable cause, String property) {
+ super(message, cause);
+ this.property = property;
+ }
+
+ public String getProperty() {
+ return property;
+ }
+}
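
For orientation, the new exception is meant to be thrown by config getters and translated into FailureCollector failures by the validate methods changed below. A minimal, self-contained sketch of that round trip (the class name, helper method, sample value, and the literal property key "maxRecordsPerBatch" are illustrative, not part of this change):

import io.cdap.plugin.salesforce.InvalidConfigException;

public class InvalidConfigExceptionExample {

  // Mirrors the pattern used by the sink/source config getters in this change:
  // parse the raw value and carry the offending property name in the exception.
  private static long parseMaxRecordsPerBatch(String raw) {
    try {
      return Long.parseLong(raw);
    } catch (NumberFormatException e) {
      throw new InvalidConfigException("Unsupported value for maxRecordsPerBatch: " + raw, e, "maxRecordsPerBatch");
    }
  }

  public static void main(String[] args) {
    try {
      parseMaxRecordsPerBatch("not-a-number");
    } catch (InvalidConfigException e) {
      // validate(FailureCollector) methods catch this and call
      // collector.addFailure(e.getMessage(), null).withConfigProperty(e.getProperty());
      System.out.println(e.getProperty() + ": " + e.getMessage());
    }
  }
}
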
diff --git a/src/main/java/io/cdap/plugin/salesforce/SalesforceSchemaUtil.java b/src/main/java/io/cdap/plugin/salesforce/SalesforceSchemaUtil.java
index 3c69aca9..1ecec724 100644
--- a/src/main/java/io/cdap/plugin/salesforce/SalesforceSchemaUtil.java
+++ b/src/main/java/io/cdap/plugin/salesforce/SalesforceSchemaUtil.java
@@ -22,6 +22,7 @@
import com.sforce.soap.partner.PartnerConnection;
import com.sforce.ws.ConnectionException;
import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.plugin.salesforce.authenticator.AuthenticatorCredentials;
import java.util.ArrayList;
@@ -88,20 +89,28 @@ public static Schema getSchema(AuthenticatorCredentials credentials, SObjectDesc
* Validates that fields from given CDAP schema are of supported schema.
*
* @param schema CDAP schema
+ * @param collector that collects failures
*/
- public static void validateFieldSchemas(Schema schema) {
+ public static void validateFieldSchemas(Schema schema, FailureCollector collector) {
for (Schema.Field field : Objects.requireNonNull(schema.getFields(), "Schema must have fields")) {
Schema fieldSchema = field.getSchema();
fieldSchema = fieldSchema.isNullable() ? fieldSchema.getNonNullable() : fieldSchema;
+
if (!SUPPORTED_TYPES.contains(fieldSchema.getType())) {
- throw new IllegalArgumentException(
- String.format("Field '%s' is of unsupported type '%s'", field.getName(), fieldSchema.getType()));
+ collector.addFailure(
+ String.format("Field '%s' is of unsupported type '%s'.", field.getName(), fieldSchema.getDisplayName()),
+ String.format("Supported types are: '%s'", SUPPORTED_TYPES.stream().map(Enum::name)
+ .collect(Collectors.joining(", "))))
+ .withOutputSchemaField(field.getName()).withInputSchemaField(field.getName());
}
Schema.LogicalType logicalType = fieldSchema.getLogicalType();
if (logicalType != null && !SUPPORTED_LOGICAL_TYPES.contains(logicalType)) {
- throw new IllegalArgumentException(
- String.format("Field '%s' is of unsupported type '%s'", field.getName(), logicalType.getToken()));
+ collector.addFailure(
+ String.format("Field '%s' is of unsupported type '%s'.", field.getName(), fieldSchema.getDisplayName()),
+ String.format("Supported types are: '%s'", SUPPORTED_LOGICAL_TYPES.stream().map(Enum::name)
+ .collect(Collectors.joining(", "))))
+ .withOutputSchemaField(field.getName()).withInputSchemaField(field.getName());
}
}
}
diff --git a/src/main/java/io/cdap/plugin/salesforce/plugin/BaseSalesforceConfig.java b/src/main/java/io/cdap/plugin/salesforce/plugin/BaseSalesforceConfig.java
index d97e824f..8488f388 100644
--- a/src/main/java/io/cdap/plugin/salesforce/plugin/BaseSalesforceConfig.java
+++ b/src/main/java/io/cdap/plugin/salesforce/plugin/BaseSalesforceConfig.java
@@ -19,6 +19,7 @@
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.annotation.Name;
+import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.plugin.common.ReferencePluginConfig;
import io.cdap.plugin.salesforce.SalesforceConnectionUtil;
import io.cdap.plugin.salesforce.SalesforceConstants;
@@ -84,8 +85,14 @@ public String getLoginUrl() {
return loginUrl;
}
- public void validate() {
- validateConnection();
+ public void validate(FailureCollector collector) {
+ try {
+ validateConnection();
+ } catch (Exception e) {
+ collector.addFailure("Error encountered while establishing connection: " + e.getMessage(), null)
+ .withStacktrace(e.getStackTrace());
+ }
+ collector.getOrThrowException();
}
public AuthenticatorCredentials getAuthenticatorCredentials() {
@@ -115,7 +122,7 @@ private void validateConnection() {
try {
SalesforceConnectionUtil.getPartnerConnection(this.getAuthenticatorCredentials());
} catch (ConnectionException e) {
- throw new RuntimeException("There was issue communicating with Salesforce", e);
+ throw new RuntimeException("There was issue communicating with Salesforce. " + e.getMessage(), e);
}
}
}
diff --git a/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceBatchSink.java b/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceBatchSink.java
index c750152e..110926cd 100644
--- a/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceBatchSink.java
+++ b/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceBatchSink.java
@@ -24,7 +24,9 @@
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.dataset.lib.KeyValue;
import io.cdap.cdap.etl.api.Emitter;
+import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
+import io.cdap.cdap.etl.api.StageConfigurer;
import io.cdap.cdap.etl.api.batch.BatchRuntimeContext;
import io.cdap.cdap.etl.api.batch.BatchSink;
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
@@ -57,13 +59,16 @@ public SalesforceBatchSink(SalesforceSinkConfig config) throws ConnectionExcepti
@Override
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
super.configurePipeline(pipelineConfigurer);
- config.validate(pipelineConfigurer.getStageConfigurer().getInputSchema());
+ StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
+ config.validate(stageConfigurer.getInputSchema(), stageConfigurer.getFailureCollector());
}
@Override
public void prepareRun(BatchSinkContext context) {
Schema inputSchema = context.getInputSchema();
- config.validate(inputSchema);
+ FailureCollector collector = context.getFailureCollector();
+ config.validate(inputSchema, collector);
+ collector.getOrThrowException();
context.addOutput(Output.of(config.referenceName, new SalesforceOutputFormatProvider(config)));
diff --git a/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceSinkConfig.java b/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceSinkConfig.java
index 1464eef4..2d78f113 100644
--- a/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceSinkConfig.java
+++ b/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceSinkConfig.java
@@ -24,8 +24,9 @@
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.data.schema.Schema;
-import io.cdap.cdap.etl.api.validation.InvalidConfigPropertyException;
+import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.validation.InvalidStageException;
+import io.cdap.plugin.salesforce.InvalidConfigException;
import io.cdap.plugin.salesforce.SObjectDescriptor;
import io.cdap.plugin.salesforce.SObjectsDescribeResult;
import io.cdap.plugin.salesforce.SalesforceSchemaUtil;
@@ -128,8 +129,8 @@ public OperationEnum getOperationEnum() {
try {
return OperationEnum.valueOf(operation.toLowerCase());
} catch (IllegalArgumentException ex) {
- throw new InvalidConfigPropertyException("Unsupported value for operation: " + operation,
- SalesforceSinkConfig.PROPERTY_OPERATION);
+ throw new InvalidConfigException("Unsupported value for operation: " + operation,
+ SalesforceSinkConfig.PROPERTY_OPERATION);
}
}
@@ -141,8 +142,8 @@ public Long getMaxBytesPerBatch() {
try {
return Long.parseLong(maxBytesPerBatch);
} catch (NumberFormatException ex) {
- throw new InvalidConfigPropertyException("Unsupported value for maxBytesPerBatch: " + maxBytesPerBatch,
- SalesforceSinkConfig.PROPERTY_MAX_BYTES_PER_BATCH);
+ throw new InvalidConfigException("Unsupported value for maxBytesPerBatch: " + maxBytesPerBatch,
+ SalesforceSinkConfig.PROPERTY_MAX_BYTES_PER_BATCH);
}
}
@@ -150,28 +151,36 @@ public Long getMaxRecordsPerBatch() {
try {
return Long.parseLong(maxRecordsPerBatch);
} catch (NumberFormatException ex) {
- throw new InvalidConfigPropertyException("Unsupported value for maxRecordsPerBatch: " + maxRecordsPerBatch,
- SalesforceSinkConfig.PROPERTY_MAX_RECORDS_PER_BATCH);
+ throw new InvalidConfigException("Unsupported value for maxRecordsPerBatch: " + maxRecordsPerBatch,
+ SalesforceSinkConfig.PROPERTY_MAX_RECORDS_PER_BATCH);
}
}
public ErrorHandling getErrorHandling() {
return ErrorHandling.fromValue(errorHandling)
- .orElseThrow(() -> new InvalidConfigPropertyException("Unsupported error handling value: " + errorHandling,
- SalesforceSinkConfig.PROPERTY_ERROR_HANDLING));
+ .orElseThrow(() -> new InvalidConfigException("Unsupported error handling value: " + errorHandling,
+ SalesforceSinkConfig.PROPERTY_ERROR_HANDLING));
}
- public void validate(Schema schema) {
- super.validate();
+ public void validate(Schema schema, FailureCollector collector) {
+ super.validate(collector);
if (!containsMacro(PROPERTY_ERROR_HANDLING)) {
// triggering getter will also trigger value validity check
- getErrorHandling();
+ try {
+ getErrorHandling();
+ } catch (InvalidConfigException e) {
+ collector.addFailure(e.getMessage(), null).withConfigProperty(PROPERTY_ERROR_HANDLING);
+ }
}
if (!containsMacro(PROPERTY_OPERATION)) {
// triggering getter will also trigger value validity check
- getOperationEnum();
+ try {
+ getOperationEnum();
+ } catch (InvalidConfigException e) {
+ collector.addFailure(e.getMessage(), null).withConfigProperty(PROPERTY_OPERATION);
+ }
}
if (!containsMacro(PROPERTY_MAX_BYTES_PER_BATCH)) {
@@ -181,8 +190,7 @@ public void validate(Schema schema) {
String errorMessage = String.format(
"Unsupported value for maxBytesPerBatch: %d. Value should be between 1 and %d",
maxBytesPerBatch, MAX_BYTES_PER_BATCH_LIMIT);
-
- throw new InvalidConfigPropertyException(errorMessage, SalesforceSinkConfig.PROPERTY_MAX_BYTES_PER_BATCH);
+ collector.addFailure(errorMessage, null).withConfigProperty(PROPERTY_MAX_BYTES_PER_BATCH);
}
}
@@ -194,18 +202,18 @@ public void validate(Schema schema) {
String errorMessage = String.format(
"Unsupported value for maxRecordsPerBatch: %d. Value should be between 1 and %d",
maxRecordsPerBatch, MAX_RECORDS_PER_BATCH_LIMIT);
-
- throw new InvalidConfigPropertyException(errorMessage, SalesforceSinkConfig.PROPERTY_MAX_RECORDS_PER_BATCH);
+ collector.addFailure(errorMessage, null).withConfigProperty(PROPERTY_MAX_RECORDS_PER_BATCH);
}
}
-
- validateSchema(schema);
+ collector.getOrThrowException();
+ validateSchema(schema, collector);
}
- private void validateSchema(Schema schema) {
+ private void validateSchema(Schema schema, FailureCollector collector) {
List<Schema.Field> fields = schema.getFields();
if (fields == null || fields.isEmpty()) {
- throw new InvalidStageException("Sink schema must contain at least one field");
+ collector.addFailure("Sink schema must contain at least one field", null);
+ throw collector.getOrThrowException();
}
if (!canAttemptToEstablishConnection() || containsMacro(PROPERTY_SOBJECT)
@@ -213,7 +221,7 @@ private void validateSchema(Schema schema) {
return;
}
- SObjectsDescribeResult describeResult = getSObjectDescribeResult();
+ SObjectsDescribeResult describeResult = getSObjectDescribeResult(collector);
Set<String> creatableSObjectFields = getCreatableSObjectFields(describeResult);
Set<String> inputFields = schema.getFields()
@@ -234,38 +242,40 @@ private void validateSchema(Schema schema) {
externalIdFieldName = SALESFORCE_ID_FIELD;
break;
default:
- throw new InvalidConfigPropertyException("Unsupported value for operation: " + operation,
- SalesforceSinkConfig.PROPERTY_OPERATION);
+ collector.addFailure("Unsupported value for operation: " + operation, null)
+ .withConfigProperty(PROPERTY_OPERATION);
}
if (operation == OperationEnum.upsert) {
Field externalIdField = describeResult.getField(sObject, externalIdFieldName);
if (externalIdField == null) {
- throw new InvalidConfigPropertyException(
- String.format("SObject '%s' does not contain external id field '%s'", sObject, externalIdFieldName),
- SalesforceSinkConfig.PROPERTY_EXTERNAL_ID_FIELD);
+ collector.addFailure(
+ String.format("SObject '%s' does not contain external id field '%s'", sObject, externalIdFieldName), null)
+ .withConfigProperty(SalesforceSinkConfig.PROPERTY_EXTERNAL_ID_FIELD);
} else if (!externalIdField.isExternalId() && !externalIdField.getName().equals(SALESFORCE_ID_FIELD)) {
- throw new InvalidConfigPropertyException(
- String.format("Field '%s' is not configured as external id in Salesforce", externalIdFieldName),
- SalesforceSinkConfig.PROPERTY_EXTERNAL_ID_FIELD);
+ collector.addFailure(
+ String.format("Field '%s' is not configured as external id in Salesforce", externalIdFieldName), null)
+ .withConfigProperty(SalesforceSinkConfig.PROPERTY_EXTERNAL_ID_FIELD);
}
- } else {
+ } else if (operation == OperationEnum.insert || operation == OperationEnum.update) {
if (!Strings.isNullOrEmpty(getExternalIdField())) {
- throw new InvalidConfigPropertyException(
- String.format("External id field must not be set for operation='%s'", operation),
- SalesforceSinkConfig.PROPERTY_EXTERNAL_ID_FIELD);
+ collector.addFailure(String.format("External id field must not be set for operation='%s'", operation), null)
+ .withConfigProperty(SalesforceSinkConfig.PROPERTY_EXTERNAL_ID_FIELD);
}
}
if (externalIdFieldName != null && !inputFields.remove(externalIdFieldName)) {
- throw new InvalidStageException(String.format("Schema must contain external id field '%s'", externalIdFieldName));
+ collector.addFailure(String.format("Schema must contain external id field '%s'", externalIdFieldName), null)
+ .withConfigProperty(SalesforceSinkConfig.PROPERTY_EXTERNAL_ID_FIELD);
}
inputFields.removeAll(creatableSObjectFields);
if (!inputFields.isEmpty()) {
- throw new InvalidStageException(String.format("Following schema fields: '%s' are not present " +
- "or not creatable in target Salesforce sObject '%s'",
- String.join(",", inputFields), this.getSObject()));
+ for (String inputField : inputFields) {
+ collector.addFailure(
+ String.format("Field '%s' is not present or not creatable in target Salesforce sObject.", inputField), null)
+ .withInputSchemaField(inputField);
+ }
}
}
@@ -280,16 +290,17 @@ private Set getCreatableSObjectFields(SObjectsDescribeResult describeRes
return creatableSObjectFields;
}
- private SObjectsDescribeResult getSObjectDescribeResult() {
+ private SObjectsDescribeResult getSObjectDescribeResult(FailureCollector collector) {
AuthenticatorCredentials credentials = this.getAuthenticatorCredentials();
try {
PartnerConnection partnerConnection = new PartnerConnection(Authenticator.createConnectorConfig(credentials));
SObjectDescriptor sObjectDescriptor = SObjectDescriptor.fromName(this.getSObject(),
this.getAuthenticatorCredentials());
return SObjectsDescribeResult.of(partnerConnection,
- sObjectDescriptor.getName(), sObjectDescriptor.getFeaturedSObjects());
+ sObjectDescriptor.getName(), sObjectDescriptor.getFeaturedSObjects());
} catch (ConnectionException e) {
- throw new InvalidStageException("There was issue communicating with Salesforce", e);
+ collector.addFailure("There was issue communicating with Salesforce", null).withStacktrace(e.getStackTrace());
+ throw collector.getOrThrowException();
}
}
diff --git a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBaseSourceConfig.java b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBaseSourceConfig.java
index edfdcd1f..c191a17f 100644
--- a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBaseSourceConfig.java
+++ b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBaseSourceConfig.java
@@ -20,7 +20,8 @@
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.data.schema.Schema;
-import io.cdap.cdap.etl.api.validation.InvalidConfigPropertyException;
+import io.cdap.cdap.etl.api.FailureCollector;
+import io.cdap.plugin.salesforce.InvalidConfigException;
import io.cdap.plugin.salesforce.SObjectDescriptor;
import io.cdap.plugin.salesforce.SObjectFilterDescriptor;
import io.cdap.plugin.salesforce.SalesforceConstants;
@@ -109,11 +110,27 @@ public String getDatetimeBefore() {
return datetimeBefore;
}
- protected void validateFilters() {
- validateIntervalFilterProperty(SalesforceSourceConstants.PROPERTY_DATETIME_AFTER, getDatetimeAfter());
- validateIntervalFilterProperty(SalesforceSourceConstants.PROPERTY_DATETIME_BEFORE, getDatetimeBefore());
- validateRangeFilterProperty(SalesforceSourceConstants.PROPERTY_DURATION, getDuration());
- validateRangeFilterProperty(SalesforceSourceConstants.PROPERTY_OFFSET, getOffset());
+ protected void validateFilters(FailureCollector collector) {
+ try {
+ validateIntervalFilterProperty(SalesforceSourceConstants.PROPERTY_DATETIME_AFTER, getDatetimeAfter());
+ } catch (InvalidConfigException e) {
+ collector.addFailure(e.getMessage(), null).withConfigProperty(e.getProperty());
+ }
+ try {
+ validateIntervalFilterProperty(SalesforceSourceConstants.PROPERTY_DATETIME_BEFORE, getDatetimeBefore());
+ } catch (InvalidConfigException e) {
+ collector.addFailure(e.getMessage(), null).withConfigProperty(e.getProperty());
+ }
+ try {
+ validateRangeFilterProperty(SalesforceSourceConstants.PROPERTY_DURATION, getDuration());
+ } catch (InvalidConfigException e) {
+ collector.addFailure(e.getMessage(), null).withConfigProperty(e.getProperty());
+ }
+ try {
+ validateRangeFilterProperty(SalesforceSourceConstants.PROPERTY_OFFSET, getOffset());
+ } catch (InvalidConfigException e) {
+ collector.addFailure(e.getMessage(), null).withConfigProperty(e.getProperty());
+ }
}
/**
@@ -179,7 +196,7 @@ private void validateIntervalFilterProperty(String propertyName, String datetime
try {
parseDatetime(datetime);
} catch (DateTimeParseException e) {
- throw new InvalidConfigPropertyException(
+ throw new InvalidConfigException(
String.format("Invalid SObject '%s' value: '%s'. Value must be in Salesforce Date Formats. For example, "
+ "2019-01-01T23:01:01Z", propertyName, datetime), propertyName);
}
@@ -194,7 +211,7 @@ private void validateRangeFilterProperty(String propertyName, Map extractRangeValue(String propertyName, String r
keyValue -> parseUnitType(propertyName, keyValue[1]),
keyValue -> parseUnitValue(propertyName, keyValue[0]),
(o, n) -> {
- throw new InvalidConfigPropertyException(
+ throw new InvalidConfigException(
String.format("'%s' has duplicate unit types '%s'",
propertyName, rangeValue), propertyName);
}
@@ -221,7 +238,7 @@ private Map extractRangeValue(String propertyName, String r
private void validateUnitKeyValue(String propertyName, String rangeValue, String[] keyValue) {
if (keyValue.length < 2) {
- throw new InvalidConfigPropertyException(
+ throw new InvalidConfigException(
String.format("'%s' has invalid format '%s'. "
+ "Expected format is , ... . "
+ "For example, '1 days, 2 hours, 30 minutes'", propertyName, rangeValue), propertyName);
@@ -233,7 +250,7 @@ private ChronoUnit parseUnitType(String propertyName, String value) {
try {
return ChronoUnit.valueOf(value.trim().toUpperCase());
} catch (IllegalArgumentException e) {
- throw new InvalidConfigPropertyException(
+ throw new InvalidConfigException(
String.format("'%s' has invalid unit type '%s'", propertyName, value), e, propertyName);
}
}
@@ -242,7 +259,7 @@ private int parseUnitValue(String propertyName, String value) {
try {
return Integer.parseInt(value.trim());
} catch (NumberFormatException e) {
- throw new InvalidConfigPropertyException(
+ throw new InvalidConfigException(
String.format("'%s' has invalid unit value '%s'", propertyName, value), e, propertyName);
}
}
diff --git a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBatchMultiSource.java b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBatchMultiSource.java
index 5c65cbb5..859fc86a 100644
--- a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBatchMultiSource.java
+++ b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBatchMultiSource.java
@@ -24,7 +24,9 @@
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.dataset.lib.KeyValue;
import io.cdap.cdap.etl.api.Emitter;
+import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
+import io.cdap.cdap.etl.api.StageConfigurer;
import io.cdap.cdap.etl.api.action.SettableArguments;
import io.cdap.cdap.etl.api.batch.BatchRuntimeContext;
import io.cdap.cdap.etl.api.batch.BatchSource;
@@ -59,13 +61,17 @@ public SalesforceBatchMultiSource(SalesforceMultiSourceConfig config) {
@Override
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
- config.validate(); // validate before macros are substituted
- pipelineConfigurer.getStageConfigurer().setOutputSchema(null);
+ StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
+ config.validate(stageConfigurer.getFailureCollector()); // validate before macros are substituted
+ stageConfigurer.setOutputSchema(null);
}
@Override
public void prepareRun(BatchSourceContext context) throws ConnectionException {
- config.validate();
+ FailureCollector collector = context.getFailureCollector();
+ config.validate(collector);
+ collector.getOrThrowException();
+
List<String> queries = config.getQueries(context.getLogicalStartTime());
Map<String, Schema> schemas = config.getSObjectsSchemas(queries);
diff --git a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBatchSource.java b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBatchSource.java
index f13bdf13..36dede82 100644
--- a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBatchSource.java
+++ b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBatchSource.java
@@ -26,6 +26,7 @@
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.dataset.lib.KeyValue;
import io.cdap.cdap.etl.api.Emitter;
+import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.batch.BatchRuntimeContext;
import io.cdap.cdap.etl.api.batch.BatchSource;
@@ -61,7 +62,8 @@ public SalesforceBatchSource(SalesforceSourceConfig config) {
@Override
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
- config.validate(); // validate when macros not yet substituted
+ // validate when macros not yet substituted
+ config.validate(pipelineConfigurer.getStageConfigurer().getFailureCollector());
if (config.containsMacro(SalesforceSourceConstants.PROPERTY_SCHEMA)) {
// schema will be available later during `prepareRun` stage
@@ -84,7 +86,9 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
@Override
public void prepareRun(BatchSourceContext context) {
- config.validate(); // validate when macros are already substituted
+ FailureCollector collector = context.getFailureCollector();
+ config.validate(collector); // validate when macros are already substituted
+ collector.getOrThrowException();
if (schema == null) {
schema = retrieveSchema();
diff --git a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceMultiSourceConfig.java b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceMultiSourceConfig.java
index 00682d75..e3e880c5 100644
--- a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceMultiSourceConfig.java
+++ b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceMultiSourceConfig.java
@@ -24,6 +24,7 @@
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.plugin.salesforce.SObjectDescriptor;
import io.cdap.plugin.salesforce.SObjectsDescribeResult;
import io.cdap.plugin.salesforce.SalesforceConnectionUtil;
@@ -100,9 +101,9 @@ public String getSObjectNameField() {
}
@Override
- public void validate() {
- super.validate();
- validateFilters();
+ public void validate(FailureCollector collector) {
+ super.validate(collector);
+ validateFilters(collector);
}
/**
diff --git a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceSourceConfig.java b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceSourceConfig.java
index 7c8a55d6..7db1979a 100644
--- a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceSourceConfig.java
+++ b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceSourceConfig.java
@@ -22,7 +22,8 @@
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.data.schema.Schema;
-import io.cdap.cdap.etl.api.validation.InvalidConfigPropertyException;
+import io.cdap.cdap.etl.api.FailureCollector;
+import io.cdap.plugin.salesforce.InvalidConfigException;
import io.cdap.plugin.salesforce.SObjectDescriptor;
import io.cdap.plugin.salesforce.SalesforceConstants;
import io.cdap.plugin.salesforce.SalesforceQueryUtil;
@@ -104,7 +105,7 @@ public Schema getSchema() {
try {
return Strings.isNullOrEmpty(schema) ? null : Schema.parseJson(schema);
} catch (IOException e) {
- throw new InvalidConfigPropertyException("Unable to parse output schema: " +
+ throw new InvalidConfigException("Unable to parse output schema: " +
schema, e, SalesforceSourceConstants.PROPERTY_SCHEMA);
}
}
@@ -115,52 +116,66 @@ public boolean isSoqlQuery() {
} else if (!Strings.isNullOrEmpty(sObjectName)) {
return false;
}
- throw new InvalidConfigPropertyException("SOQL query or SObject name must be provided",
+ throw new InvalidConfigException("SOQL query or SObject name must be provided",
SalesforceSourceConstants.PROPERTY_QUERY);
}
@Override
- public void validate() {
- super.validate();
+ public void validate(FailureCollector collector) {
+ super.validate(collector);
if (!containsMacro(SalesforceSourceConstants.PROPERTY_QUERY) && !Strings.isNullOrEmpty(query)) {
if (!SalesforceQueryUtil.isQueryUnderLengthLimit(query) && SalesforceQueryParser.isRestrictedQuery(query)) {
- throw new InvalidConfigPropertyException(String.format(
- "SOQL Query with restricted field types (function calls, sub-query fields) or "
- + "GROUP BY [ROLLUP / CUBE], OFFSET clauses cannot exceed SOQL query length: '%d'. "
- + "Unsupported SOQL query: '%s'", SalesforceConstants.SOQL_MAX_LENGTH, query),
- SalesforceSourceConstants.PROPERTY_QUERY);
+ collector.addFailure(
+ String.format(
+ "SOQL Query with restricted field types (function calls, sub-query fields) or "
+ + "GROUP BY [ROLLUP / CUBE], OFFSET clauses cannot exceed SOQL query length: '%d'. "
+ + "Unsupported SOQL query: '%s'", SalesforceConstants.SOQL_MAX_LENGTH, query), null)
+ .withConfigProperty(SalesforceSourceConstants.PROPERTY_QUERY);
+ throw collector.getOrThrowException();
}
SObjectDescriptor queryDescriptor;
try {
queryDescriptor = SalesforceQueryParser.getObjectDescriptorFromQuery(query);
} catch (SOQLParsingException e) {
- throw new InvalidConfigPropertyException(String.format("Invalid SOQL query: '%s'", query), e,
- SalesforceSourceConstants.PROPERTY_QUERY);
+ collector.addFailure(String.format("Invalid SOQL query '%s' : %s", query, e.getMessage()), null)
+ .withStacktrace(e.getStackTrace())
+ .withConfigProperty(SalesforceSourceConstants.PROPERTY_QUERY);
+ throw collector.getOrThrowException();
}
if (canAttemptToEstablishConnection()) {
- validateCompoundFields(queryDescriptor.getName(), queryDescriptor.getFieldsNames());
+ validateCompoundFields(queryDescriptor.getName(), queryDescriptor.getFieldsNames(), collector);
}
}
if (!containsMacro(SalesforceSourceConstants.PROPERTY_QUERY)
- && !containsMacro(SalesforceSourceConstants.PROPERTY_SOBJECT_NAME)
- && !isSoqlQuery()) {
- validateFilters();
+ && !containsMacro(SalesforceSourceConstants.PROPERTY_SOBJECT_NAME)) {
+ try {
+ boolean isSoql = isSoqlQuery();
+ if (!isSoql) {
+ validateFilters(collector);
+ }
+ } catch (InvalidConfigException e) {
+ collector.addFailure(e.getMessage(), null).withConfigProperty(e.getProperty());
+ }
}
- validateSchema();
+ validateSchema(collector);
}
- private void validateSchema() {
+ private void validateSchema(FailureCollector collector) {
if (containsMacro(SalesforceSourceConstants.PROPERTY_SCHEMA)) {
return;
}
- Schema schema = getSchema();
- if (schema != null) {
- SalesforceSchemaUtil.validateFieldSchemas(schema);
+ try {
+ Schema schema = getSchema();
+ if (schema != null) {
+ SalesforceSchemaUtil.validateFieldSchemas(schema, collector);
+ }
+ } catch (InvalidConfigException e) {
+ collector.addFailure(e.getMessage(), null).withConfigProperty(e.getProperty());
}
}
- private void validateCompoundFields(String sObjectName, List<String> fieldNames) {
+ private void validateCompoundFields(String sObjectName, List<String> fieldNames, FailureCollector collector) {
try {
SObjectDescriptor sObjectDescriptor = SObjectDescriptor.fromName(sObjectName,
getAuthenticatorCredentials());
@@ -170,17 +185,18 @@ private void validateCompoundFields(String sObjectName, List fieldNames)
.map(SObjectDescriptor.FieldDescriptor::getName)
.collect(Collectors.toList());
if (!compoundFieldNames.isEmpty()) {
- throw new InvalidConfigPropertyException(
+ collector.addFailure(
String.format("Compound fields %s cannot be fetched when a SOQL query is given. "
+ "Please specify the individual attributes instead of compound field name in SOQL query. "
+ "For example, instead of 'Select BillingAddress ...', use "
+ "'Select BillingCountry, BillingCity, BillingStreet ...'",
- compoundFieldNames),
- SalesforceSourceConstants.PROPERTY_QUERY);
+ compoundFieldNames), null)
+ .withConfigProperty(SalesforceSourceConstants.PROPERTY_QUERY);
}
} catch (ConnectionException e) {
- throw new IllegalStateException(
- String.format("Cannot establish connection to Salesforce to describe SObject: '%s'", sObjectName), e);
+ collector.addFailure(
+ String.format("Cannot establish connection to Salesforce to describe SObject: '%s'", sObjectName), null)
+ .withStacktrace(e.getStackTrace());
}
}
}
diff --git a/src/main/java/io/cdap/plugin/salesforce/plugin/source/streaming/SalesforceStreamingSource.java b/src/main/java/io/cdap/plugin/salesforce/plugin/source/streaming/SalesforceStreamingSource.java
index 7218f133..bb0a2ce7 100644
--- a/src/main/java/io/cdap/plugin/salesforce/plugin/source/streaming/SalesforceStreamingSource.java
+++ b/src/main/java/io/cdap/plugin/salesforce/plugin/source/streaming/SalesforceStreamingSource.java
@@ -24,33 +24,20 @@
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.data.format.StructuredRecord;
-import io.cdap.cdap.api.data.format.UnexpectedFormatException;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.dataset.DatasetProperties;
+import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.streaming.StreamingContext;
import io.cdap.cdap.etl.api.streaming.StreamingSource;
-import io.cdap.cdap.etl.api.validation.InvalidStageException;
import io.cdap.plugin.common.Constants;
import io.cdap.plugin.common.IdUtils;
import io.cdap.plugin.salesforce.SObjectDescriptor;
import io.cdap.plugin.salesforce.SalesforceSchemaUtil;
import io.cdap.plugin.salesforce.authenticator.Authenticator;
import io.cdap.plugin.salesforce.authenticator.AuthenticatorCredentials;
-import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.json.JSONException;
-import org.json.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.time.Instant;
-import java.time.LocalTime;
-import java.time.temporal.ChronoUnit;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.TimeUnit;
import javax.ws.rs.Path;
/**
@@ -61,12 +48,9 @@
@Name(SalesforceStreamingSource.NAME)
@Description(SalesforceStreamingSource.DESCRIPTION)
public class SalesforceStreamingSource extends StreamingSource<StructuredRecord> {
- private static final Logger LOG = LoggerFactory.getLogger(SalesforceStreamingSource.class);
-
static final String NAME = "Salesforce";
static final String DESCRIPTION = "Streams data updates from Salesforce using Salesforce Streaming API";
private SalesforceStreamingSourceConfig config;
- private Schema schema;
public SalesforceStreamingSource(SalesforceStreamingSourceConfig config) {
this.config = config;
@@ -74,12 +58,13 @@ public SalesforceStreamingSource(SalesforceStreamingSourceConfig config) {
@Override
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
+ FailureCollector collector = pipelineConfigurer.getStageConfigurer().getFailureCollector();
// Verify that reference name meets dataset id constraints
- IdUtils.validateId(config.referenceName);
+ IdUtils.validateReferenceName(config.referenceName, collector);
pipelineConfigurer.createDataset(config.referenceName, Constants.EXTERNAL_DATASET_TYPE, DatasetProperties.EMPTY);
try {
- config.validate(); // validate when macros are not substituted
+ config.validate(collector); // validate when macros are not substituted
config.ensurePushTopicExistAndWithCorrectFields(); // run when macros are not substituted
String query = config.getQuery();
@@ -93,100 +78,18 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
pipelineConfigurer.getStageConfigurer().setOutputSchema(schema);
}
} catch (ConnectionException e) {
- throw new InvalidStageException("There was issue communicating with Salesforce", e);
+ collector.addFailure("There was issue communicating with Salesforce: " + e.getMessage(), null)
+ .withStacktrace(e.getStackTrace());
}
}
@Override
public JavaDStream<StructuredRecord> getStream(StreamingContext streamingContext) throws ConnectionException {
- config.validate(); // validate when macros are substituted
- config.ensurePushTopicExistAndWithCorrectFields(); // run when macros are substituted
-
- this.schema = streamingContext.getOutputSchema();
-
- if (this.schema == null) { // if was not set in configurePipeline due to fields containing macro
- this.schema = SalesforceSchemaUtil.getSchema(config.getAuthenticatorCredentials(),
- SObjectDescriptor.fromQuery(config.getQuery()));
- }
- LOG.debug("Schema is {}", schema);
-
- JavaStreamingContext jssc = streamingContext.getSparkStreamingContext();
-
- return jssc.receiverStream(new SalesforceReceiver(this.config.getAuthenticatorCredentials(),
- this.config.getPushTopicName())).
- map((Function<String, StructuredRecord>) this::getStructuredRecord).filter(Objects::nonNull);
- }
-
- private StructuredRecord getStructuredRecord(String jsonMessage) {
- StructuredRecord.Builder builder = StructuredRecord.builder(schema);
-
- JSONObject sObjectFields;
- try {
- sObjectFields = new JSONObject(jsonMessage) // throws a JSONException if failed to decode
- .getJSONObject("sobject"); // throws a JSONException if not found
- } catch (JSONException e) {
- throw new IllegalStateException(
- String.format("Cannot retrieve /data/sobject from json message %s", jsonMessage), e);
- }
-
- for (Map.Entry<String, Object> entry : sObjectFields.toMap().entrySet()) {
- String fieldName = entry.getKey();
- Object value = entry.getValue();
-
- Schema.Field field = schema.getField(fieldName, true);
-
- if (field == null) {
- continue; // this field is not in schema
- }
-
- builder.set(field.getName(), convertValue(value, field));
- }
- return builder.build();
- }
-
- private Object convertValue(Object value, Schema.Field field) {
- if (value == null) {
- return null;
- }
-
- Schema fieldSchema = field.getSchema();
-
- if (fieldSchema.isNullable()) {
- fieldSchema = fieldSchema.getNonNullable();
- }
-
- Schema.Type fieldSchemaType = fieldSchema.getType();
- Schema.LogicalType logicalType = fieldSchema.getLogicalType();
-
- if (fieldSchema.getLogicalType() != null) {
- String valueString = (String) value;
- switch (logicalType) {
- case DATE:
- return Math.toIntExact(ChronoUnit.DAYS.between(Instant.EPOCH, Instant.parse(valueString)));
- case TIMESTAMP_MICROS:
- return TimeUnit.MILLISECONDS.toMicros(Instant.parse(valueString).toEpochMilli());
- case TIME_MICROS:
- return TimeUnit.NANOSECONDS.toMicros(LocalTime.parse(valueString).toNanoOfDay());
- default:
- throw new UnexpectedFormatException(String.format("Field '%s' is of unsupported type '%s'",
- field.getName(), logicalType.getToken()));
- }
- }
-
- // Found a single field (Opportunity.Fiscal) which is documented as string and has a string type
- // in describe result, however in Salesforce Streaming API reponse json is represented as json.
- // Converting it and similar back to string, since it does not comply with generated schema.
- if (value instanceof Map) {
- if (fieldSchemaType.equals(Schema.Type.STRING)) {
- return value.toString();
- } else {
- throw new UnexpectedFormatException(
- String.format("Field '%s' is of type '%s', but value found is '%s'",
- field.getName(), fieldSchemaType.toString(), value.toString()));
- }
- }
+ FailureCollector collector = streamingContext.getFailureCollector();
+ config.validate(collector); // validate when macros are substituted
+ collector.getOrThrowException();
- return value;
+ return SalesforceStreamingSourceUtil.getStructuredRecordJavaDStream(streamingContext, config);
}
@Path("outputSchema")
@@ -207,6 +110,6 @@ public Schema outputSchema(SalesforceStreamingSourceConfig config) throws Except
}
return SalesforceSchemaUtil.getSchema(authenticatorCredentials,
- SObjectDescriptor.fromQuery(query));
+ SObjectDescriptor.fromQuery(query));
}
}
diff --git a/src/main/java/io/cdap/plugin/salesforce/plugin/source/streaming/SalesforceStreamingSourceConfig.java b/src/main/java/io/cdap/plugin/salesforce/plugin/source/streaming/SalesforceStreamingSourceConfig.java
index a6453a52..a0863748 100644
--- a/src/main/java/io/cdap/plugin/salesforce/plugin/source/streaming/SalesforceStreamingSourceConfig.java
+++ b/src/main/java/io/cdap/plugin/salesforce/plugin/source/streaming/SalesforceStreamingSourceConfig.java
@@ -25,8 +25,8 @@
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.annotation.Name;
-import io.cdap.cdap.etl.api.validation.InvalidConfigPropertyException;
import io.cdap.cdap.etl.api.validation.InvalidStageException;
+import io.cdap.plugin.salesforce.InvalidConfigException;
import io.cdap.plugin.salesforce.SObjectDescriptor;
import io.cdap.plugin.salesforce.SObjectFilterDescriptor;
import io.cdap.plugin.salesforce.SalesforceConstants;
@@ -177,9 +177,9 @@ public void ensurePushTopicExistAndWithCorrectFields() {
LOG.info("Creating PushTopic {}", pushTopicName);
if (Strings.isNullOrEmpty(query)) {
- throw new InvalidConfigPropertyException("SOQL query or SObject name must be provided, unless " +
+ throw new InvalidConfigException("SOQL query or SObject name must be provided, unless " +
"existing pushTopic is used",
- SalesforceStreamingSourceConfig.PROPERTY_PUSH_TOPIC_QUERY);
+ SalesforceStreamingSourceConfig.PROPERTY_PUSH_TOPIC_QUERY);
}
pushTopic = new SObjectBuilder()
diff --git a/src/main/java/io/cdap/plugin/salesforce/plugin/source/streaming/SalesforceStreamingSourceUtil.java b/src/main/java/io/cdap/plugin/salesforce/plugin/source/streaming/SalesforceStreamingSourceUtil.java
new file mode 100644
index 00000000..9274770b
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/salesforce/plugin/source/streaming/SalesforceStreamingSourceUtil.java
@@ -0,0 +1,143 @@
+/*
+ * Copyright 2019 Google Inc. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.salesforce.plugin.source.streaming;
+
+import com.sforce.ws.ConnectionException;
+import io.cdap.cdap.api.data.format.StructuredRecord;
+import io.cdap.cdap.api.data.format.UnexpectedFormatException;
+import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.cdap.etl.api.streaming.StreamingContext;
+import io.cdap.plugin.salesforce.SObjectDescriptor;
+import io.cdap.plugin.salesforce.SalesforceSchemaUtil;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.time.LocalTime;
+import java.time.temporal.ChronoUnit;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Utility methods for the Salesforce streaming source.
+ */
+final class SalesforceStreamingSourceUtil {
+ private static final Logger LOG = LoggerFactory.getLogger(SalesforceStreamingSourceUtil.class);
+
+ static JavaDStream<StructuredRecord> getStructuredRecordJavaDStream(StreamingContext streamingContext,
+ SalesforceStreamingSourceConfig config)
+ throws ConnectionException {
+ config.ensurePushTopicExistAndWithCorrectFields(); // run when macros are substituted
+
+ Schema schema = streamingContext.getOutputSchema();
+
+ if (schema == null) { // if was not set in configurePipeline due to fields containing macro
+ schema = SalesforceSchemaUtil.getSchema(config.getAuthenticatorCredentials(),
+ SObjectDescriptor.fromQuery(config.getQuery()));
+ }
+ LOG.debug("Schema is {}", schema);
+
+ JavaStreamingContext jssc = streamingContext.getSparkStreamingContext();
+
+ final Schema finalSchema = schema;
+ return jssc.receiverStream(new SalesforceReceiver(config.getAuthenticatorCredentials(),
+ config.getPushTopicName()))
+ .map(jsonMessage -> getStructuredRecord(jsonMessage, finalSchema))
+ .filter(Objects::nonNull);
+ }
+
+ private static StructuredRecord getStructuredRecord(String jsonMessage, Schema schema) {
+ StructuredRecord.Builder builder = StructuredRecord.builder(schema);
+
+ JSONObject sObjectFields;
+ try {
+ sObjectFields = new JSONObject(jsonMessage) // throws a JSONException if failed to decode
+ .getJSONObject("sobject"); // throws a JSONException if not found
+ } catch (JSONException e) {
+ throw new IllegalStateException(
+ String.format("Cannot retrieve /data/sobject from json message %s", jsonMessage), e);
+ }
+
+ for (Map.Entry<String, Object> entry : sObjectFields.toMap().entrySet()) {
+ String fieldName = entry.getKey();
+ Object value = entry.getValue();
+
+ Schema.Field field = schema.getField(fieldName, true);
+
+ if (field == null) {
+ continue; // this field is not in schema
+ }
+
+ builder.set(field.getName(), convertValue(value, field));
+ }
+ return builder.build();
+ }
+
+ private static Object convertValue(Object value, Schema.Field field) {
+ if (value == null) {
+ return null;
+ }
+
+ Schema fieldSchema = field.getSchema();
+
+ if (fieldSchema.isNullable()) {
+ fieldSchema = fieldSchema.getNonNullable();
+ }
+
+ Schema.Type fieldSchemaType = fieldSchema.getType();
+ Schema.LogicalType logicalType = fieldSchema.getLogicalType();
+
+ if (fieldSchema.getLogicalType() != null) {
+ String valueString = (String) value;
+ switch (logicalType) {
+ case DATE:
+ return Math.toIntExact(ChronoUnit.DAYS.between(Instant.EPOCH, Instant.parse(valueString)));
+ case TIMESTAMP_MICROS:
+ return TimeUnit.MILLISECONDS.toMicros(Instant.parse(valueString).toEpochMilli());
+ case TIME_MICROS:
+ return TimeUnit.NANOSECONDS.toMicros(LocalTime.parse(valueString).toNanoOfDay());
+ default:
+ throw new UnexpectedFormatException(String.format("Field '%s' is of unsupported type '%s'",
+ field.getName(), logicalType.getToken()));
+ }
+ }
+
+ // Found a single field (Opportunity.Fiscal) which is documented as string and has a string type
+ // in the describe result, but is represented as a JSON object in the Salesforce Streaming API response.
+ // Convert it (and similar fields) back to string, since the JSON form does not comply with the generated schema.
+ if (value instanceof Map) {
+ if (fieldSchemaType.equals(Schema.Type.STRING)) {
+ return value.toString();
+ } else {
+ throw new UnexpectedFormatException(
+ String.format("Field '%s' is of type '%s', but value found is '%s'",
+ field.getName(), fieldSchemaType.toString(), value.toString()));
+ }
+ }
+
+ return value;
+ }
+
+ private SalesforceStreamingSourceUtil() {
+ // no-op
+ }
+}
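
The logical-type branches in convertValue above map Salesforce ISO-8601 strings onto CDAP's epoch-based representations (days since the epoch for DATE, microseconds for timestamps and times). A small runnable illustration of that arithmetic, using made-up sample values (class name and inputs are illustrative only):

import java.time.Instant;
import java.time.LocalTime;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.TimeUnit;

public class LogicalTypeConversionExample {
  public static void main(String[] args) {
    // DATE: whole days between the epoch and the parsed instant
    int days = Math.toIntExact(ChronoUnit.DAYS.between(Instant.EPOCH, Instant.parse("1970-01-02T00:00:00Z")));
    System.out.println(days); // 1

    // TIMESTAMP_MICROS: epoch milliseconds widened to microseconds
    long tsMicros = TimeUnit.MILLISECONDS.toMicros(Instant.parse("2019-01-01T23:01:01Z").toEpochMilli());
    System.out.println(tsMicros); // 1546383661000000

    // TIME_MICROS: nanoseconds of day narrowed to microseconds
    long timeMicros = TimeUnit.NANOSECONDS.toMicros(LocalTime.parse("23:01:01").toNanoOfDay());
    System.out.println(timeMicros); // 82861000000
  }
}
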
diff --git a/src/test/java/io/cdap/plugin/salesforce/SalesforceSchemaUtilTest.java b/src/test/java/io/cdap/plugin/salesforce/SalesforceSchemaUtilTest.java
index ca6028a2..5f849673 100644
--- a/src/test/java/io/cdap/plugin/salesforce/SalesforceSchemaUtilTest.java
+++ b/src/test/java/io/cdap/plugin/salesforce/SalesforceSchemaUtilTest.java
@@ -18,6 +18,7 @@
import com.sforce.soap.partner.Field;
import com.sforce.soap.partner.FieldType;
import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.cdap.etl.mock.validation.MockFailureCollector;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
@@ -148,7 +149,8 @@ public void testValidateSupportedFieldSchemas() {
Schema.Field.of("TimeField", Schema.of(Schema.LogicalType.TIMESTAMP_MICROS)),
Schema.Field.of("StringField", Schema.of(Schema.Type.STRING)));
- SalesforceSchemaUtil.validateFieldSchemas(schema);
+ MockFailureCollector collector = new MockFailureCollector();
+ SalesforceSchemaUtil.validateFieldSchemas(schema, collector);
}
@Test
@@ -157,9 +159,9 @@ public void testValidateUnsupportedFieldSchema() {
Schema.Field.of("IntField", Schema.of(Schema.Type.INT)),
Schema.Field.of("BytesField", Schema.of(Schema.Type.BYTES)));
- thrown.expect(IllegalArgumentException.class);
-
- SalesforceSchemaUtil.validateFieldSchemas(schema);
+ MockFailureCollector collector = new MockFailureCollector();
+ SalesforceSchemaUtil.validateFieldSchemas(schema, collector);
+ Assert.assertEquals(1, collector.getValidationFailures().size());
}
@Test
diff --git a/src/test/java/io/cdap/plugin/salesforce/etl/SalesforceBatchSinkETLTest.java b/src/test/java/io/cdap/plugin/salesforce/etl/SalesforceBatchSinkETLTest.java
index ea40ef48..11096004 100644
--- a/src/test/java/io/cdap/plugin/salesforce/etl/SalesforceBatchSinkETLTest.java
+++ b/src/test/java/io/cdap/plugin/salesforce/etl/SalesforceBatchSinkETLTest.java
@@ -20,7 +20,7 @@
import com.sforce.soap.partner.sobject.SObject;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
-import io.cdap.cdap.etl.api.validation.InvalidStageException;
+import io.cdap.cdap.etl.mock.validation.MockFailureCollector;
import io.cdap.cdap.test.ApplicationManager;
import io.cdap.plugin.salesforce.plugin.sink.batch.SalesforceSinkConfig;
import org.junit.Assert;
@@ -365,14 +365,9 @@ public void testCompoundFieldsValidation() {
Schema.Field.of("BillingAddress", Schema.of(Schema.Type.STRING))
);
- try {
- getDefaultConfig(sObject).validate(schema);
- Assert.fail("Validation was expected to fail due to compound field 'BillingAddress' in schema");
- } catch (InvalidStageException ex) {
- if (!ex.getMessage().contains("Following schema fields: 'BillingAddress' are not present or not creatable")) {
- throw ex;
- }
- }
+ MockFailureCollector collector = new MockFailureCollector();
+ getDefaultConfig(sObject).validate(schema, collector);
+ Assert.assertEquals(1, collector.getValidationFailures().size());
}
@Test
@@ -390,14 +385,9 @@ public void testNonCreatableFieldsValidation() {
Schema.Field.of("HasOverdueTask", Schema.of(Schema.Type.STRING))
);
- try {
- getDefaultConfig(sObject).validate(schema);
- Assert.fail("Validation was expected to fail due to compound field 'BillingAddress' in schema");
- } catch (InvalidStageException ex) {
- if (!ex.getMessage().contains("Following schema fields: 'HasOverdueTask' are not present or not creatable")) {
- throw ex;
- }
- }
+ MockFailureCollector collector = new MockFailureCollector();
+ getDefaultConfig(sObject).validate(schema, collector);
+ Assert.assertEquals(1, collector.getValidationFailures().size());
}
@Test
@@ -414,14 +404,8 @@ public void testNonExistingFieldsValidation() {
Schema.Field.of("SomethingNotExistant", Schema.of(Schema.Type.STRING))
);
- try {
- getDefaultConfig(sObject).validate(schema);
- Assert.fail("Validation was expected to fail due to compound field 'BillingAddress' in schema");
- } catch (InvalidStageException ex) {
- if (!ex.getMessage().contains(
- "Following schema fields: 'SomethingNotExistant' are not present or not creatable")) {
- throw ex;
- }
- }
+ MockFailureCollector collector = new MockFailureCollector();
+ getDefaultConfig(sObject).validate(schema, collector);
+ Assert.assertEquals(1, collector.getValidationFailures().size());
}
}
diff --git a/src/test/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceSourceConfigTest.java b/src/test/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceSourceConfigTest.java
index 703a7c4d..8af5c59f 100644
--- a/src/test/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceSourceConfigTest.java
+++ b/src/test/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceSourceConfigTest.java
@@ -16,7 +16,7 @@
package io.cdap.plugin.salesforce.plugin.source.batch;
-import io.cdap.cdap.etl.api.validation.InvalidConfigPropertyException;
+import io.cdap.plugin.salesforce.InvalidConfigException;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
@@ -99,7 +99,7 @@ public void testIsSoqlQueryException() {
.setSObjectName(null)
.build();
- thrown.expect(InvalidConfigPropertyException.class);
+ thrown.expect(InvalidConfigException.class);
config.isSoqlQuery();
}
@@ -139,7 +139,7 @@ public void testGetDurationException() {
config.getDuration();
Assert.fail(String.format("Exception is not thrown for value '%s'", value));
- } catch (InvalidConfigPropertyException e) {
+ } catch (InvalidConfigException e) {
// expected failure, do nothing
}
});
@@ -180,7 +180,7 @@ public void testGetOffsetException() {
config.getOffset();
Assert.fail(String.format("Exception is not thrown for value '%s'", value));
- } catch (InvalidConfigPropertyException e) {
+ } catch (InvalidConfigException e) {
// expected failure, do nothing
}
});