Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@
<surefire.redirectTestOutputToFile>true</surefire.redirectTestOutputToFile>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<!-- version properties -->
<cdap.version>6.1.0-SNAPSHOT</cdap.version>
<junit.version>4.12</junit.version>
<commons.io.version>2.6</commons.io.version>
<commons.lang.version>2.6</commons.lang.version>
<hadoop.version>2.3.0</hadoop.version>
<spark2.version>2.1.3</spark2.version>
<hydrator.version>2.3.0-SNAPSHOT</hydrator.version>
<commons.version>3.8.1</commons.version>
<salesforce.api.version>45.0.0</salesforce.api.version>
<cometd.java.client.version>4.0.0</cometd.java.client.version>
Expand Down Expand Up @@ -428,8 +428,8 @@
<version>1.1.0</version>
<configuration>
<cdapArtifacts>
<parent>system:cdap-data-pipeline[6.1.0-SNAPSHOT,7.0.0-SNAPSHOT)</parent>
<parent>system:cdap-data-streams[6.1.0-SNAPSHOT,7.0.0-SNAPSHOT)</parent>
</cdapArtifacts>
</configuration>
<executions>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright © 2019 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.plugin.salesforce;

/**
* Represents invalid config exception.
*/
/**
 * Thrown when a plugin configuration property has an invalid value.
 * Carries the name of the offending property so callers can attribute
 * the failure to a specific configuration field.
 */
public class InvalidConfigException extends RuntimeException {
  // RuntimeException is Serializable; declare an explicit version id so
  // serialized form stays stable across compiler/JVM changes.
  private static final long serialVersionUID = 1L;

  // Name of the configuration property whose value is invalid.
  private final String property;

  /**
   * @param message  description of why the value is invalid
   * @param property name of the offending configuration property
   */
  public InvalidConfigException(String message, String property) {
    super(message);
    this.property = property;
  }

  /**
   * @param message  description of why the value is invalid
   * @param cause    underlying cause of the failure
   * @param property name of the offending configuration property
   */
  public InvalidConfigException(String message, Throwable cause, String property) {
    super(message, cause);
    this.property = property;
  }

  /**
   * @return name of the configuration property whose value is invalid
   */
  public String getProperty() {
    return property;
  }
}
19 changes: 14 additions & 5 deletions src/main/java/io/cdap/plugin/salesforce/SalesforceSchemaUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.sforce.soap.partner.PartnerConnection;
import com.sforce.ws.ConnectionException;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.plugin.salesforce.authenticator.AuthenticatorCredentials;

import java.util.ArrayList;
Expand Down Expand Up @@ -88,20 +89,28 @@ public static Schema getSchema(AuthenticatorCredentials credentials, SObjectDesc
* Validates that fields from given CDAP schema are of supported schema.
*
* @param schema CDAP schema
* @param collector that collects failures
*/
public static void validateFieldSchemas(Schema schema) {
public static void validateFieldSchemas(Schema schema, FailureCollector collector) {
for (Schema.Field field : Objects.requireNonNull(schema.getFields(), "Schema must have fields")) {
Schema fieldSchema = field.getSchema();
fieldSchema = fieldSchema.isNullable() ? fieldSchema.getNonNullable() : fieldSchema;

if (!SUPPORTED_TYPES.contains(fieldSchema.getType())) {
throw new IllegalArgumentException(
String.format("Field '%s' is of unsupported type '%s'", field.getName(), fieldSchema.getType()));
collector.addFailure(
String.format("Field '%s' is of unsupported type '%s'.", field.getName(), fieldSchema.getDisplayName()),
String.format("Supported types are: '%s'", SUPPORTED_TYPES.stream().map(Enum::name)
.collect(Collectors.joining(", "))))
.withOutputSchemaField(field.getName()).withInputSchemaField(field.getName());
}

Schema.LogicalType logicalType = fieldSchema.getLogicalType();
if (logicalType != null && !SUPPORTED_LOGICAL_TYPES.contains(logicalType)) {
throw new IllegalArgumentException(
String.format("Field '%s' is of unsupported type '%s'", field.getName(), logicalType.getToken()));
collector.addFailure(
String.format("Field '%s' is of unsupported type '%s'.", field.getName(), fieldSchema.getDisplayName()),
String.format("Supported types are: '%s'", SUPPORTED_LOGICAL_TYPES.stream().map(Enum::name)
.collect(Collectors.joining(", "))))
.withOutputSchemaField(field.getName()).withInputSchemaField(field.getName());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.plugin.common.ReferencePluginConfig;
import io.cdap.plugin.salesforce.SalesforceConnectionUtil;
import io.cdap.plugin.salesforce.SalesforceConstants;
Expand Down Expand Up @@ -84,8 +85,14 @@ public String getLoginUrl() {
return loginUrl;
}

public void validate() {
validateConnection();
public void validate(FailureCollector collector) {
try {
validateConnection();
} catch (Exception e) {
collector.addFailure("Error encountered while establishing connection: " + e.getMessage(), null)
.withStacktrace(e.getStackTrace());
}
collector.getOrThrowException();
}

public AuthenticatorCredentials getAuthenticatorCredentials() {
Expand Down Expand Up @@ -115,7 +122,7 @@ private void validateConnection() {
try {
SalesforceConnectionUtil.getPartnerConnection(this.getAuthenticatorCredentials());
} catch (ConnectionException e) {
throw new RuntimeException("There was issue communicating with Salesforce", e);
throw new RuntimeException("There was issue communicating with Salesforce. " + e.getMessage(), e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.dataset.lib.KeyValue;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.StageConfigurer;
import io.cdap.cdap.etl.api.batch.BatchRuntimeContext;
import io.cdap.cdap.etl.api.batch.BatchSink;
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
Expand Down Expand Up @@ -57,13 +59,16 @@ public SalesforceBatchSink(SalesforceSinkConfig config) throws ConnectionExcepti
@Override
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
  super.configurePipeline(pipelineConfigurer);
  // Validate config at deployment time, routing problems through the stage's
  // failure collector so they are reported against the correct stage.
  StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
  config.validate(stageConfigurer.getInputSchema(), stageConfigurer.getFailureCollector());
}

@Override
public void prepareRun(BatchSinkContext context) {
Schema inputSchema = context.getInputSchema();
config.validate(inputSchema);
FailureCollector collector = context.getFailureCollector();
config.validate(inputSchema, collector);
collector.getOrThrowException();

context.addOutput(Output.of(config.referenceName, new SalesforceOutputFormatProvider(config)));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.etl.api.validation.InvalidConfigPropertyException;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.validation.InvalidStageException;
import io.cdap.plugin.salesforce.InvalidConfigException;
import io.cdap.plugin.salesforce.SObjectDescriptor;
import io.cdap.plugin.salesforce.SObjectsDescribeResult;
import io.cdap.plugin.salesforce.SalesforceSchemaUtil;
Expand Down Expand Up @@ -128,8 +129,8 @@ public OperationEnum getOperationEnum() {
try {
return OperationEnum.valueOf(operation.toLowerCase());
} catch (IllegalArgumentException ex) {
throw new InvalidConfigPropertyException("Unsupported value for operation: " + operation,
SalesforceSinkConfig.PROPERTY_OPERATION);
throw new InvalidConfigException("Unsupported value for operation: " + operation,
SalesforceSinkConfig.PROPERTY_OPERATION);
}
}

Expand All @@ -141,37 +142,45 @@ public Long getMaxBytesPerBatch() {
try {
return Long.parseLong(maxBytesPerBatch);
} catch (NumberFormatException ex) {
throw new InvalidConfigPropertyException("Unsupported value for maxBytesPerBatch: " + maxBytesPerBatch,
SalesforceSinkConfig.PROPERTY_MAX_BYTES_PER_BATCH);
throw new InvalidConfigException("Unsupported value for maxBytesPerBatch: " + maxBytesPerBatch,
SalesforceSinkConfig.PROPERTY_MAX_BYTES_PER_BATCH);
}
}

/**
 * Parses the configured maximum number of records per batch.
 *
 * @return the configured value as a Long
 * @throws InvalidConfigException if the value is not a parseable long
 */
public Long getMaxRecordsPerBatch() {
  try {
    return Long.parseLong(maxRecordsPerBatch);
  } catch (NumberFormatException ex) {
    // Preserve the parse failure as the cause so the original error is not lost.
    throw new InvalidConfigException("Unsupported value for maxRecordsPerBatch: " + maxRecordsPerBatch,
                                     ex, SalesforceSinkConfig.PROPERTY_MAX_RECORDS_PER_BATCH);
  }
}

public ErrorHandling getErrorHandling() {
return ErrorHandling.fromValue(errorHandling)
.orElseThrow(() -> new InvalidConfigPropertyException("Unsupported error handling value: " + errorHandling,
SalesforceSinkConfig.PROPERTY_ERROR_HANDLING));
.orElseThrow(() -> new InvalidConfigException("Unsupported error handling value: " + errorHandling,
SalesforceSinkConfig.PROPERTY_ERROR_HANDLING));
}

public void validate(Schema schema) {
super.validate();
public void validate(Schema schema, FailureCollector collector) {
super.validate(collector);

if (!containsMacro(PROPERTY_ERROR_HANDLING)) {
// triggering getter will also trigger value validity check
getErrorHandling();
try {
getErrorHandling();
} catch (InvalidConfigException e) {
collector.addFailure(e.getMessage(), null).withConfigProperty(PROPERTY_ERROR_HANDLING);
}
}

if (!containsMacro(PROPERTY_OPERATION)) {
// triggering getter will also trigger value validity check
getOperationEnum();
try {
getOperationEnum();
} catch (InvalidConfigException e) {
collector.addFailure(e.getMessage(), null).withConfigProperty(PROPERTY_OPERATION);
}
}

if (!containsMacro(PROPERTY_MAX_BYTES_PER_BATCH)) {
Expand All @@ -181,8 +190,7 @@ public void validate(Schema schema) {
String errorMessage = String.format(
"Unsupported value for maxBytesPerBatch: %d. Value should be between 1 and %d",
maxBytesPerBatch, MAX_BYTES_PER_BATCH_LIMIT);

throw new InvalidConfigPropertyException(errorMessage, SalesforceSinkConfig.PROPERTY_MAX_BYTES_PER_BATCH);
collector.addFailure(errorMessage, null).withConfigProperty(PROPERTY_MAX_BYTES_PER_BATCH);
}
}

Expand All @@ -194,26 +202,26 @@ public void validate(Schema schema) {
String errorMessage = String.format(
"Unsupported value for maxRecordsPerBatch: %d. Value should be between 1 and %d",
maxRecordsPerBatch, MAX_RECORDS_PER_BATCH_LIMIT);

throw new InvalidConfigPropertyException(errorMessage, SalesforceSinkConfig.PROPERTY_MAX_RECORDS_PER_BATCH);
collector.addFailure(errorMessage, null).withConfigProperty(PROPERTY_MAX_RECORDS_PER_BATCH);
}
}

validateSchema(schema);
collector.getOrThrowException();
validateSchema(schema, collector);
}

private void validateSchema(Schema schema) {
private void validateSchema(Schema schema, FailureCollector collector) {
List<Schema.Field> fields = schema.getFields();
if (fields == null || fields.isEmpty()) {
throw new InvalidStageException("Sink schema must contain at least one field");
collector.addFailure("Sink schema must contain at least one field", null);
throw collector.getOrThrowException();
}

if (!canAttemptToEstablishConnection() || containsMacro(PROPERTY_SOBJECT)
|| containsMacro(PROPERTY_OPERATION) || containsMacro(PROPERTY_EXTERNAL_ID_FIELD)) {
return;
}

SObjectsDescribeResult describeResult = getSObjectDescribeResult();
SObjectsDescribeResult describeResult = getSObjectDescribeResult(collector);
Set<String> creatableSObjectFields = getCreatableSObjectFields(describeResult);

Set<String> inputFields = schema.getFields()
Expand All @@ -234,38 +242,40 @@ private void validateSchema(Schema schema) {
externalIdFieldName = SALESFORCE_ID_FIELD;
break;
default:
throw new InvalidConfigPropertyException("Unsupported value for operation: " + operation,
SalesforceSinkConfig.PROPERTY_OPERATION);
collector.addFailure("Unsupported value for operation: " + operation, null)
.withConfigProperty(PROPERTY_OPERATION);
}

if (operation == OperationEnum.upsert) {
Field externalIdField = describeResult.getField(sObject, externalIdFieldName);
if (externalIdField == null) {
throw new InvalidConfigPropertyException(
String.format("SObject '%s' does not contain external id field '%s'", sObject, externalIdFieldName),
SalesforceSinkConfig.PROPERTY_EXTERNAL_ID_FIELD);
collector.addFailure(
String.format("SObject '%s' does not contain external id field '%s'", sObject, externalIdFieldName), null)
.withConfigProperty(SalesforceSinkConfig.PROPERTY_EXTERNAL_ID_FIELD);
} else if (!externalIdField.isExternalId() && !externalIdField.getName().equals(SALESFORCE_ID_FIELD)) {
throw new InvalidConfigPropertyException(
String.format("Field '%s' is not configured as external id in Salesforce", externalIdFieldName),
SalesforceSinkConfig.PROPERTY_EXTERNAL_ID_FIELD);
collector.addFailure(
String.format("Field '%s' is not configured as external id in Salesforce", externalIdFieldName), null)
.withConfigProperty(SalesforceSinkConfig.PROPERTY_EXTERNAL_ID_FIELD);
}
} else {
} else if (operation == OperationEnum.insert || operation == OperationEnum.update) {
if (!Strings.isNullOrEmpty(getExternalIdField())) {
throw new InvalidConfigPropertyException(
String.format("External id field must not be set for operation='%s'", operation),
SalesforceSinkConfig.PROPERTY_EXTERNAL_ID_FIELD);
collector.addFailure(String.format("External id field must not be set for operation='%s'", operation), null)
.withConfigProperty(SalesforceSinkConfig.PROPERTY_EXTERNAL_ID_FIELD);
}
}

if (externalIdFieldName != null && !inputFields.remove(externalIdFieldName)) {
throw new InvalidStageException(String.format("Schema must contain external id field '%s'", externalIdFieldName));
collector.addFailure(String.format("Schema must contain external id field '%s'", externalIdFieldName), null)
.withConfigProperty(SalesforceSinkConfig.PROPERTY_EXTERNAL_ID_FIELD);
}
inputFields.removeAll(creatableSObjectFields);

if (!inputFields.isEmpty()) {
throw new InvalidStageException(String.format("Following schema fields: '%s' are not present " +
"or not creatable in target Salesforce sObject '%s'",
String.join(",", inputFields), this.getSObject()));
for (String inputField : inputFields) {
collector.addFailure(
String.format("Field '%s' is not present or not creatable in target Salesforce sObject.", inputField), null)
.withInputSchemaField(inputField);
}
}
}

Expand All @@ -280,16 +290,17 @@ private Set<String> getCreatableSObjectFields(SObjectsDescribeResult describeRes
return creatableSObjectFields;
}

private SObjectsDescribeResult getSObjectDescribeResult() {
private SObjectsDescribeResult getSObjectDescribeResult(FailureCollector collector) {
AuthenticatorCredentials credentials = this.getAuthenticatorCredentials();
try {
PartnerConnection partnerConnection = new PartnerConnection(Authenticator.createConnectorConfig(credentials));
SObjectDescriptor sObjectDescriptor = SObjectDescriptor.fromName(this.getSObject(),
this.getAuthenticatorCredentials());
return SObjectsDescribeResult.of(partnerConnection,
sObjectDescriptor.getName(), sObjectDescriptor.getFeaturedSObjects());
sObjectDescriptor.getName(), sObjectDescriptor.getFeaturedSObjects());
} catch (ConnectionException e) {
throw new InvalidStageException("There was issue communicating with Salesforce", e);
collector.addFailure("There was issue communicating with Salesforce", null).withStacktrace(e.getStackTrace());
throw collector.getOrThrowException();
}
}

Expand Down
Loading