diff --git a/pom.xml b/pom.xml
index 8ebf573a..c485ff8c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -439,8 +439,12 @@
1.1.0
- system:cdap-data-pipeline[6.1.0-SNAPSHOT,7.0.0-SNAPSHOT)
- system:cdap-data-streams[6.1.0-SNAPSHOT,7.0.0-SNAPSHOT)
+
+ system:cdap-data-pipeline[6.4.0-SNAPSHOT,7.0.0-SNAPSHOT)
+ system:cdap-data-streams[6.4.0-SNAPSHOT,7.0.0-SNAPSHOT)
diff --git a/src/main/java/io/cdap/plugin/salesforce/SalesforceConnectionUtil.java b/src/main/java/io/cdap/plugin/salesforce/SalesforceConnectionUtil.java
index 61eeee8e..d6e10b3b 100644
--- a/src/main/java/io/cdap/plugin/salesforce/SalesforceConnectionUtil.java
+++ b/src/main/java/io/cdap/plugin/salesforce/SalesforceConnectionUtil.java
@@ -20,6 +20,7 @@
import com.sforce.ws.ConnectorConfig;
import io.cdap.plugin.salesforce.authenticator.Authenticator;
import io.cdap.plugin.salesforce.authenticator.AuthenticatorCredentials;
+import io.cdap.plugin.salesforce.plugin.OAuthInfo;
import org.apache.hadoop.conf.Configuration;
/**
@@ -41,22 +42,6 @@ public static PartnerConnection getPartnerConnection(AuthenticatorCredentials cr
return new PartnerConnection(connectorConfig);
}
- /**
- * Creates {@link AuthenticatorCredentials} instance based on given parameters.
- *
- * @param username Salesforce username
- * @param password Salesforce password
- * @param consumerKey Salesforce consumer key
- * @param consumerSecret Salesforce consumer secret
- * @param loginUrl Salesforce authentication url
- * @return authenticator credentials
- */
- public static AuthenticatorCredentials getAuthenticatorCredentials(String username, String password,
- String consumerKey, String consumerSecret,
- String loginUrl) {
- return new AuthenticatorCredentials(username, password, consumerKey, consumerSecret, loginUrl);
- }
-
/**
* Creates {@link AuthenticatorCredentials} instance based on given {@link Configuration}.
*
@@ -64,10 +49,16 @@ public static AuthenticatorCredentials getAuthenticatorCredentials(String userna
* @return authenticator credentials
*/
public static AuthenticatorCredentials getAuthenticatorCredentials(Configuration conf) {
- return getAuthenticatorCredentials(conf.get(SalesforceConstants.CONFIG_USERNAME),
- conf.get(SalesforceConstants.CONFIG_PASSWORD),
- conf.get(SalesforceConstants.CONFIG_CONSUMER_KEY),
- conf.get(SalesforceConstants.CONFIG_CONSUMER_SECRET),
- conf.get(SalesforceConstants.CONFIG_LOGIN_URL));
+ String oAuthToken = conf.get(SalesforceConstants.CONFIG_OAUTH_TOKEN);
+ String instanceURL = conf.get(SalesforceConstants.CONFIG_OAUTH_INSTANCE_URL);
+ if (oAuthToken != null && instanceURL != null) {
+ return new AuthenticatorCredentials(new OAuthInfo(oAuthToken, instanceURL));
+ }
+
+ return new AuthenticatorCredentials(conf.get(SalesforceConstants.CONFIG_USERNAME),
+ conf.get(SalesforceConstants.CONFIG_PASSWORD),
+ conf.get(SalesforceConstants.CONFIG_CONSUMER_KEY),
+ conf.get(SalesforceConstants.CONFIG_CONSUMER_SECRET),
+ conf.get(SalesforceConstants.CONFIG_LOGIN_URL));
}
}
diff --git a/src/main/java/io/cdap/plugin/salesforce/SalesforceConstants.java b/src/main/java/io/cdap/plugin/salesforce/SalesforceConstants.java
index 33ee3756..3bc53840 100644
--- a/src/main/java/io/cdap/plugin/salesforce/SalesforceConstants.java
+++ b/src/main/java/io/cdap/plugin/salesforce/SalesforceConstants.java
@@ -29,7 +29,10 @@ public class SalesforceConstants {
public static final String PROPERTY_PASSWORD = "password";
public static final String PROPERTY_SECURITY_TOKEN = "securityToken";
public static final String PROPERTY_LOGIN_URL = "loginUrl";
+ public static final String PROPERTY_OAUTH_INFO = "oAuthInfo";
+ public static final String CONFIG_OAUTH_TOKEN = "mapred.salesforce.oauth.token";
+ public static final String CONFIG_OAUTH_INSTANCE_URL = "mapred.salesforce.oauth.instance.url";
public static final String CONFIG_CONSUMER_KEY = "mapred.salesforce.consumer.key";
public static final String CONFIG_PASSWORD = "mapred.salesforce.password";
public static final String CONFIG_USERNAME = "mapred.salesforce.user";
diff --git a/src/main/java/io/cdap/plugin/salesforce/authenticator/Authenticator.java b/src/main/java/io/cdap/plugin/salesforce/authenticator/Authenticator.java
index 79b00702..d67646f0 100644
--- a/src/main/java/io/cdap/plugin/salesforce/authenticator/Authenticator.java
+++ b/src/main/java/io/cdap/plugin/salesforce/authenticator/Authenticator.java
@@ -20,6 +20,7 @@
import com.google.gson.Gson;
import com.sforce.ws.ConnectorConfig;
import io.cdap.plugin.salesforce.SalesforceConstants;
+import io.cdap.plugin.salesforce.plugin.OAuthInfo;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.util.ssl.SslContextFactory;
@@ -39,12 +40,12 @@ public class Authenticator {
*/
public static ConnectorConfig createConnectorConfig(AuthenticatorCredentials credentials) {
try {
- AuthResponse authResponse = oauthLogin(credentials);
+ OAuthInfo oAuthInfo = getOAuthInfo(credentials);
ConnectorConfig connectorConfig = new ConnectorConfig();
- connectorConfig.setSessionId(authResponse.getAccessToken());
+ connectorConfig.setSessionId(oAuthInfo.getAccessToken());
String apiVersion = SalesforceConstants.API_VERSION;
- String restEndpoint = String.format("%s/services/async/%s", authResponse.getInstanceUrl(), apiVersion);
- String serviceEndPoint = String.format("%s/services/Soap/u/%s", authResponse.getInstanceUrl(), apiVersion);
+ String restEndpoint = String.format("%s/services/async/%s", oAuthInfo.getInstanceURL(), apiVersion);
+ String serviceEndPoint = String.format("%s/services/Soap/u/%s", oAuthInfo.getInstanceURL(), apiVersion);
connectorConfig.setRestEndpoint(restEndpoint);
connectorConfig.setServiceEndpoint(serviceEndPoint);
// This should only be false when doing debugging.
@@ -65,7 +66,12 @@ public static ConnectorConfig createConnectorConfig(AuthenticatorCredentials cre
*
* @return AuthResponse response to http request
*/
- public static AuthResponse oauthLogin(AuthenticatorCredentials credentials) throws Exception {
+ public static OAuthInfo getOAuthInfo(AuthenticatorCredentials credentials) throws Exception {
+ OAuthInfo oAuthInfo = credentials.getOAuthInfo();
+ if (oAuthInfo != null) {
+ return oAuthInfo;
+ }
+
SslContextFactory sslContextFactory = new SslContextFactory();
HttpClient httpClient = new HttpClient(sslContextFactory);
try {
@@ -83,7 +89,7 @@ public static AuthResponse oauthLogin(AuthenticatorCredentials credentials) thro
String.format("Cannot authenticate to Salesforce with given credentials. ServerResponse='%s'", response));
}
- return authResponse;
+ return new OAuthInfo(authResponse.getAccessToken(), authResponse.getInstanceUrl());
} finally {
httpClient.stop();
}
diff --git a/src/main/java/io/cdap/plugin/salesforce/authenticator/AuthenticatorCredentials.java b/src/main/java/io/cdap/plugin/salesforce/authenticator/AuthenticatorCredentials.java
index 98df68ed..65d50449 100644
--- a/src/main/java/io/cdap/plugin/salesforce/authenticator/AuthenticatorCredentials.java
+++ b/src/main/java/io/cdap/plugin/salesforce/authenticator/AuthenticatorCredentials.java
@@ -16,21 +16,41 @@
package io.cdap.plugin.salesforce.authenticator;
+import io.cdap.plugin.salesforce.plugin.OAuthInfo;
+
import java.io.Serializable;
import java.util.Objects;
+import javax.annotation.Nullable;
/**
* Stores information to connect to salesforce via oauth2
*/
public class AuthenticatorCredentials implements Serializable {
+
+ private final OAuthInfo oAuthInfo;
private final String username;
private final String password;
private final String consumerKey;
private final String consumerSecret;
private final String loginUrl;
+ public AuthenticatorCredentials(OAuthInfo oAuthInfo) {
+ this(Objects.requireNonNull(oAuthInfo), null, null, null, null, null);
+ }
+
public AuthenticatorCredentials(String username, String password,
String consumerKey, String consumerSecret, String loginUrl) {
+ this(null, Objects.requireNonNull(username), Objects.requireNonNull(password), Objects.requireNonNull(consumerKey),
+ Objects.requireNonNull(consumerSecret), Objects.requireNonNull(loginUrl));
+ }
+
+ private AuthenticatorCredentials(@Nullable OAuthInfo oAuthInfo,
+ @Nullable String username,
+ @Nullable String password,
+ @Nullable String consumerKey,
+ @Nullable String consumerSecret,
+ @Nullable String loginUrl) {
+ this.oAuthInfo = oAuthInfo;
this.username = username;
this.password = password;
this.consumerKey = consumerKey;
@@ -38,22 +58,32 @@ public AuthenticatorCredentials(String username, String password,
this.loginUrl = loginUrl;
}
+ @Nullable
+ public OAuthInfo getOAuthInfo() {
+ return oAuthInfo;
+ }
+
+ @Nullable
public String getUsername() {
return username;
}
+ @Nullable
public String getPassword() {
return password;
}
+ @Nullable
public String getConsumerKey() {
return consumerKey;
}
+ @Nullable
public String getConsumerSecret() {
return consumerSecret;
}
+ @Nullable
public String getLoginUrl() {
return loginUrl;
}
diff --git a/src/main/java/io/cdap/plugin/salesforce/plugin/BaseSalesforceConfig.java b/src/main/java/io/cdap/plugin/salesforce/plugin/BaseSalesforceConfig.java
index ba8dcffb..97802d25 100644
--- a/src/main/java/io/cdap/plugin/salesforce/plugin/BaseSalesforceConfig.java
+++ b/src/main/java/io/cdap/plugin/salesforce/plugin/BaseSalesforceConfig.java
@@ -32,40 +32,59 @@
*/
public class BaseSalesforceConfig extends ReferencePluginConfig {
+ @Name(SalesforceConstants.PROPERTY_OAUTH_INFO)
+ @Description("OAuth information for connecting to Salesforce. " +
+ "It is expected to be an json string containing two properties, \"accessToken\" and \"instanceURL\", " +
+ "which carry the OAuth access token and the URL to connect to respectively. " +
+ "Use the ${oauth(provider, credentialId)} macro function for acquiring OAuth information dynamically. ")
+ @Macro
+ @Nullable
+ private OAuthInfo oAuthInfo;
+
@Name(SalesforceConstants.PROPERTY_CONSUMER_KEY)
@Description("Salesforce connected app's consumer key")
@Macro
+ @Nullable
private String consumerKey;
@Name(SalesforceConstants.PROPERTY_CONSUMER_SECRET)
@Description("Salesforce connected app's client secret key")
@Macro
+ @Nullable
private String consumerSecret;
@Name(SalesforceConstants.PROPERTY_USERNAME)
@Description("Salesforce username")
@Macro
+ @Nullable
private String username;
@Name(SalesforceConstants.PROPERTY_PASSWORD)
@Description("Salesforce password")
@Macro
+ @Nullable
private String password;
@Name(SalesforceConstants.PROPERTY_SECURITY_TOKEN)
@Description("Salesforce security token")
- @Nullable
@Macro
+ @Nullable
private String securityToken;
@Name(SalesforceConstants.PROPERTY_LOGIN_URL)
@Description("Endpoint to authenticate to")
@Macro
+ @Nullable
private String loginUrl;
- public BaseSalesforceConfig(String referenceName, String consumerKey, String consumerSecret,
- String username, String password, String loginUrl,
- @Nullable String securityToken) {
+ public BaseSalesforceConfig(String referenceName,
+ @Nullable String consumerKey,
+ @Nullable String consumerSecret,
+ @Nullable String username,
+ @Nullable String password,
+ @Nullable String loginUrl,
+ @Nullable String securityToken,
+ @Nullable OAuthInfo oAuthInfo) {
super(referenceName);
this.consumerKey = consumerKey;
this.consumerSecret = consumerSecret;
@@ -73,24 +92,35 @@ public BaseSalesforceConfig(String referenceName, String consumerKey, String con
this.password = password;
this.loginUrl = loginUrl;
this.securityToken = securityToken;
+ this.oAuthInfo = oAuthInfo;
+ }
+
+ @Nullable
+ public OAuthInfo getOAuthInfo() {
+ return oAuthInfo;
}
+ @Nullable
public String getConsumerKey() {
return consumerKey;
}
+ @Nullable
public String getConsumerSecret() {
return consumerSecret;
}
+ @Nullable
public String getUsername() {
return username;
}
+ @Nullable
public String getPassword() {
return constructPasswordWithToken(password, securityToken);
}
+ @Nullable
public String getLoginUrl() {
return loginUrl;
}
@@ -99,15 +129,21 @@ public void validate(FailureCollector collector) {
try {
validateConnection();
} catch (Exception e) {
- collector.addFailure("Error encountered while establishing connection: " + e.getMessage(), null)
+ collector.addFailure("Error encountered while establishing connection: " + e.getMessage(),
+ "Please verify authentication properties are provided correctly")
.withStacktrace(e.getStackTrace());
}
collector.getOrThrowException();
}
public AuthenticatorCredentials getAuthenticatorCredentials() {
- return SalesforceConnectionUtil.getAuthenticatorCredentials(username, getPassword(),
- consumerKey, consumerSecret, loginUrl);
+ OAuthInfo oAuthInfo = getOAuthInfo();
+ if (oAuthInfo != null) {
+ return new AuthenticatorCredentials(oAuthInfo);
+ }
+
+ return new AuthenticatorCredentials(getUsername(), getPassword(), getConsumerKey(),
+ getConsumerSecret(), getLoginUrl());
}
/**
@@ -117,6 +153,16 @@ public AuthenticatorCredentials getAuthenticatorCredentials() {
* @return true if none of the connection properties contains macro, false otherwise
*/
public boolean canAttemptToEstablishConnection() {
+ // If OAuth token is configured, use it to establish connection
+ if (getOAuthInfo() != null) {
+ return true;
+ }
+
+ // At configurePipeline time, macro is not resolved, hence the OAuth field will be null.
+ if (containsMacro(SalesforceConstants.PROPERTY_OAUTH_INFO)) {
+ return false;
+ }
+
return !(containsMacro(SalesforceConstants.PROPERTY_CONSUMER_KEY)
|| containsMacro(SalesforceConstants.PROPERTY_CONSUMER_SECRET)
|| containsMacro(SalesforceConstants.PROPERTY_USERNAME)
diff --git a/src/main/java/io/cdap/plugin/salesforce/plugin/OAuthInfo.java b/src/main/java/io/cdap/plugin/salesforce/plugin/OAuthInfo.java
new file mode 100644
index 00000000..b41e8569
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/salesforce/plugin/OAuthInfo.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright © 2021 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.salesforce.plugin;
+
+/**
+ * Class to carry OAuth information returned by the {@code ${oauth}} macro function.
+ */
+public final class OAuthInfo {
+
+ private final String accessToken;
+ private final String instanceURL;
+
+ public OAuthInfo(String accessToken, String instanceURL) {
+ this.accessToken = accessToken;
+ this.instanceURL = instanceURL;
+ }
+
+ public String getAccessToken() {
+ return accessToken;
+ }
+
+ public String getInstanceURL() {
+ return instanceURL;
+ }
+}
diff --git a/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceOutputFormatProvider.java b/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceOutputFormatProvider.java
index 7ac28e6a..a2596fcf 100644
--- a/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceOutputFormatProvider.java
+++ b/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceOutputFormatProvider.java
@@ -18,8 +18,10 @@
import com.google.common.collect.ImmutableMap;
import io.cdap.cdap.api.data.batch.OutputFormatProvider;
import io.cdap.plugin.salesforce.SalesforceConstants;
+import io.cdap.plugin.salesforce.plugin.OAuthInfo;
import java.util.Map;
+import java.util.Objects;
/**
* Provides SalesforceOutputFormat's class name and configuration.
@@ -34,17 +36,26 @@ public class SalesforceOutputFormatProvider implements OutputFormatProvider {
*/
public SalesforceOutputFormatProvider(SalesforceSinkConfig config) {
ImmutableMap.Builder configBuilder = new ImmutableMap.Builder()
- .put(SalesforceConstants.CONFIG_USERNAME, config.getUsername())
- .put(SalesforceConstants.CONFIG_PASSWORD, config.getPassword())
- .put(SalesforceConstants.CONFIG_CONSUMER_KEY, config.getConsumerKey())
- .put(SalesforceConstants.CONFIG_CONSUMER_SECRET, config.getConsumerSecret())
- .put(SalesforceConstants.CONFIG_LOGIN_URL, config.getLoginUrl())
.put(SalesforceSinkConstants.CONFIG_SOBJECT, config.getSObject())
.put(SalesforceSinkConstants.CONFIG_OPERATION, config.getOperation())
.put(SalesforceSinkConstants.CONFIG_ERROR_HANDLING, config.getErrorHandling().getValue())
.put(SalesforceSinkConstants.CONFIG_MAX_BYTES_PER_BATCH, config.getMaxBytesPerBatch().toString())
.put(SalesforceSinkConstants.CONFIG_MAX_RECORDS_PER_BATCH, config.getMaxRecordsPerBatch().toString());
+ OAuthInfo oAuthInfo = config.getOAuthInfo();
+ if (oAuthInfo != null) {
+ configBuilder
+ .put(SalesforceConstants.CONFIG_OAUTH_TOKEN, oAuthInfo.getAccessToken())
+ .put(SalesforceConstants.CONFIG_OAUTH_INSTANCE_URL, oAuthInfo.getInstanceURL());
+ } else {
+ configBuilder
+ .put(SalesforceConstants.CONFIG_USERNAME, Objects.requireNonNull(config.getUsername()))
+ .put(SalesforceConstants.CONFIG_PASSWORD, Objects.requireNonNull(config.getPassword()))
+ .put(SalesforceConstants.CONFIG_CONSUMER_KEY, Objects.requireNonNull(config.getConsumerKey()))
+ .put(SalesforceConstants.CONFIG_CONSUMER_SECRET, Objects.requireNonNull(config.getConsumerSecret()))
+ .put(SalesforceConstants.CONFIG_LOGIN_URL, Objects.requireNonNull(config.getLoginUrl()));
+ }
+
if (config.getExternalIdField() != null) {
configBuilder.put(SalesforceSinkConstants.CONFIG_EXTERNAL_ID_FIELD, config.getExternalIdField());
}
diff --git a/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceSinkConfig.java b/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceSinkConfig.java
index 916578e4..36de061b 100644
--- a/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceSinkConfig.java
+++ b/src/main/java/io/cdap/plugin/salesforce/plugin/sink/batch/SalesforceSinkConfig.java
@@ -33,6 +33,7 @@
import io.cdap.plugin.salesforce.authenticator.Authenticator;
import io.cdap.plugin.salesforce.authenticator.AuthenticatorCredentials;
import io.cdap.plugin.salesforce.plugin.BaseSalesforceConfig;
+import io.cdap.plugin.salesforce.plugin.OAuthInfo;
import java.util.List;
import java.util.Set;
@@ -102,13 +103,19 @@ public class SalesforceSinkConfig extends BaseSalesforceConfig {
@Macro
private String errorHandling;
- public SalesforceSinkConfig(String referenceName, String clientId,
- String clientSecret, String username,
- String password, String loginUrl, String sObject,
+ public SalesforceSinkConfig(String referenceName,
+ @Nullable String clientId,
+ @Nullable String clientSecret,
+ @Nullable String username,
+ @Nullable String password,
+ @Nullable String loginUrl,
+ String sObject,
String operation, String externalIdField,
String maxBytesPerBatch, String maxRecordsPerBatch,
- String errorHandling, @Nullable String securityToken) {
- super(referenceName, clientId, clientSecret, username, password, loginUrl, securityToken);
+ String errorHandling,
+ @Nullable String securityToken,
+ @Nullable OAuthInfo oAuthInfo) {
+ super(referenceName, clientId, clientSecret, username, password, loginUrl, securityToken, oAuthInfo);
this.sObject = sObject;
this.operation = operation;
this.externalIdField = externalIdField;
diff --git a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBaseSourceConfig.java b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBaseSourceConfig.java
index 836842da..e875709d 100644
--- a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBaseSourceConfig.java
+++ b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceBaseSourceConfig.java
@@ -28,6 +28,7 @@
import io.cdap.plugin.salesforce.SalesforceQueryUtil;
import io.cdap.plugin.salesforce.SalesforceSchemaUtil;
import io.cdap.plugin.salesforce.plugin.BaseSalesforceConfig;
+import io.cdap.plugin.salesforce.plugin.OAuthInfo;
import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSourceConstants;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
@@ -76,17 +77,18 @@ public abstract class SalesforceBaseSourceConfig extends BaseSalesforceConfig {
private String offset;
protected SalesforceBaseSourceConfig(String referenceName,
- String consumerKey,
- String consumerSecret,
- String username,
- String password,
- String loginUrl,
+ @Nullable String consumerKey,
+ @Nullable String consumerSecret,
+ @Nullable String username,
+ @Nullable String password,
+ @Nullable String loginUrl,
@Nullable String datetimeAfter,
@Nullable String datetimeBefore,
@Nullable String duration,
@Nullable String offset,
- @Nullable String securityToken) {
- super(referenceName, consumerKey, consumerSecret, username, password, loginUrl, securityToken);
+ @Nullable String securityToken,
+ @Nullable OAuthInfo oAuthInfo) {
+ super(referenceName, consumerKey, consumerSecret, username, password, loginUrl, securityToken, oAuthInfo);
this.datetimeAfter = datetimeAfter;
this.datetimeBefore = datetimeBefore;
this.duration = duration;
diff --git a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceInputFormatProvider.java b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceInputFormatProvider.java
index e2190e48..d3cc971f 100644
--- a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceInputFormatProvider.java
+++ b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceInputFormatProvider.java
@@ -20,10 +20,12 @@
import com.google.gson.Gson;
import io.cdap.cdap.api.data.batch.InputFormatProvider;
import io.cdap.plugin.salesforce.SalesforceConstants;
+import io.cdap.plugin.salesforce.plugin.OAuthInfo;
import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSourceConstants;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import javax.annotation.Nullable;
/**
@@ -39,20 +41,29 @@ public SalesforceInputFormatProvider(SalesforceBaseSourceConfig config,
List queries,
Map schemas,
@Nullable String sObjectNameField) {
- ImmutableMap.Builder builder = new ImmutableMap.Builder()
- .put(SalesforceConstants.CONFIG_USERNAME, config.getUsername())
- .put(SalesforceConstants.CONFIG_PASSWORD, config.getPassword())
- .put(SalesforceConstants.CONFIG_CONSUMER_KEY, config.getConsumerKey())
- .put(SalesforceConstants.CONFIG_CONSUMER_SECRET, config.getConsumerSecret())
- .put(SalesforceConstants.CONFIG_LOGIN_URL, config.getLoginUrl())
+ ImmutableMap.Builder configBuilder = new ImmutableMap.Builder()
.put(SalesforceSourceConstants.CONFIG_QUERIES, GSON.toJson(queries))
.put(SalesforceSourceConstants.CONFIG_SCHEMAS, GSON.toJson(schemas));
+ OAuthInfo oAuthInfo = config.getOAuthInfo();
+ if (oAuthInfo != null) {
+ configBuilder
+ .put(SalesforceConstants.CONFIG_OAUTH_TOKEN, oAuthInfo.getAccessToken())
+ .put(SalesforceConstants.CONFIG_OAUTH_INSTANCE_URL, oAuthInfo.getInstanceURL());
+ } else {
+ configBuilder
+ .put(SalesforceConstants.CONFIG_USERNAME, Objects.requireNonNull(config.getUsername()))
+ .put(SalesforceConstants.CONFIG_PASSWORD, Objects.requireNonNull(config.getPassword()))
+ .put(SalesforceConstants.CONFIG_CONSUMER_KEY, Objects.requireNonNull(config.getConsumerKey()))
+ .put(SalesforceConstants.CONFIG_CONSUMER_SECRET, Objects.requireNonNull(config.getConsumerSecret()))
+ .put(SalesforceConstants.CONFIG_LOGIN_URL, Objects.requireNonNull(config.getLoginUrl()));
+ }
+
if (sObjectNameField != null) {
- builder.put(SalesforceSourceConstants.CONFIG_SOBJECT_NAME_FIELD, sObjectNameField);
+ configBuilder.put(SalesforceSourceConstants.CONFIG_SOBJECT_NAME_FIELD, sObjectNameField);
}
- this.conf = builder.build();
+ this.conf = configBuilder.build();
}
@Override
diff --git a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceMultiSourceConfig.java b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceMultiSourceConfig.java
index a1fea127..c4760bfa 100644
--- a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceMultiSourceConfig.java
+++ b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceMultiSourceConfig.java
@@ -29,6 +29,7 @@
import io.cdap.plugin.salesforce.SObjectsDescribeResult;
import io.cdap.plugin.salesforce.SalesforceConnectionUtil;
import io.cdap.plugin.salesforce.SalesforceSchemaUtil;
+import io.cdap.plugin.salesforce.plugin.OAuthInfo;
import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSourceConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,21 +70,22 @@ public class SalesforceMultiSourceConfig extends SalesforceBaseSourceConfig {
private String sObjectNameField;
public SalesforceMultiSourceConfig(String referenceName,
- String consumerKey,
- String consumerSecret,
- String username,
- String password,
- String loginUrl,
- @Nullable String datetimeAfter,
- @Nullable String datetimeBefore,
- @Nullable String duration,
- @Nullable String offset,
- @Nullable String whiteList,
- @Nullable String blackList,
- @Nullable String sObjectNameField,
- @Nullable String securityToken) {
+ @Nullable String consumerKey,
+ @Nullable String consumerSecret,
+ @Nullable String username,
+ @Nullable String password,
+ @Nullable String loginUrl,
+ @Nullable String datetimeAfter,
+ @Nullable String datetimeBefore,
+ @Nullable String duration,
+ @Nullable String offset,
+ @Nullable String whiteList,
+ @Nullable String blackList,
+ @Nullable String sObjectNameField,
+ @Nullable String securityToken,
+ @Nullable OAuthInfo oAuthInfo) {
super(referenceName, consumerKey, consumerSecret, username, password, loginUrl,
- datetimeAfter, datetimeBefore, duration, offset, securityToken);
+ datetimeAfter, datetimeBefore, duration, offset, securityToken, oAuthInfo);
this.whiteList = whiteList;
this.blackList = blackList;
this.sObjectNameField = sObjectNameField;
diff --git a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceSourceConfig.java b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceSourceConfig.java
index 835ad68d..6ae12c0e 100644
--- a/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceSourceConfig.java
+++ b/src/main/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceSourceConfig.java
@@ -30,6 +30,7 @@
import io.cdap.plugin.salesforce.SalesforceSchemaUtil;
import io.cdap.plugin.salesforce.parser.SOQLParsingException;
import io.cdap.plugin.salesforce.parser.SalesforceQueryParser;
+import io.cdap.plugin.salesforce.plugin.OAuthInfo;
import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSourceConstants;
import java.io.IOException;
@@ -64,21 +65,22 @@ public class SalesforceSourceConfig extends SalesforceBaseSourceConfig {
@VisibleForTesting
SalesforceSourceConfig(String referenceName,
- String consumerKey,
- String consumerSecret,
- String username,
- String password,
- String loginUrl,
- @Nullable String query,
- @Nullable String sObjectName,
- @Nullable String datetimeAfter,
- @Nullable String datetimeBefore,
- @Nullable String duration,
- @Nullable String offset,
- @Nullable String schema,
- @Nullable String securityToken) {
+ @Nullable String consumerKey,
+ @Nullable String consumerSecret,
+ @Nullable String username,
+ @Nullable String password,
+ @Nullable String loginUrl,
+ @Nullable String query,
+ @Nullable String sObjectName,
+ @Nullable String datetimeAfter,
+ @Nullable String datetimeBefore,
+ @Nullable String duration,
+ @Nullable String offset,
+ @Nullable String schema,
+ @Nullable String securityToken,
+ @Nullable OAuthInfo oAuthInfo) {
super(referenceName, consumerKey, consumerSecret, username, password, loginUrl,
- datetimeAfter, datetimeBefore, duration, offset, securityToken);
+ datetimeAfter, datetimeBefore, duration, offset, securityToken, oAuthInfo);
this.query = query;
this.sObjectName = sObjectName;
this.schema = schema;
diff --git a/src/main/java/io/cdap/plugin/salesforce/plugin/source/streaming/SalesforcePushTopicListener.java b/src/main/java/io/cdap/plugin/salesforce/plugin/source/streaming/SalesforcePushTopicListener.java
index 89465aa8..e6ced7e1 100644
--- a/src/main/java/io/cdap/plugin/salesforce/plugin/source/streaming/SalesforcePushTopicListener.java
+++ b/src/main/java/io/cdap/plugin/salesforce/plugin/source/streaming/SalesforcePushTopicListener.java
@@ -17,9 +17,9 @@
package io.cdap.plugin.salesforce.plugin.source.streaming;
import io.cdap.plugin.salesforce.SalesforceConstants;
-import io.cdap.plugin.salesforce.authenticator.AuthResponse;
import io.cdap.plugin.salesforce.authenticator.Authenticator;
import io.cdap.plugin.salesforce.authenticator.AuthenticatorCredentials;
+import io.cdap.plugin.salesforce.plugin.OAuthInfo;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionTimeoutException;
import org.cometd.client.BayeuxClient;
@@ -102,9 +102,7 @@ public String getMessage(long timeout, TimeUnit unit) throws InterruptedExceptio
}
private BayeuxClient getClient(AuthenticatorCredentials credentials) throws Exception {
- AuthResponse authResponse = Authenticator.oauthLogin(credentials);
- String acessToken = authResponse.getAccessToken();
- String instanceUrl = authResponse.getInstanceUrl();
+ OAuthInfo oAuthInfo = Authenticator.getOAuthInfo(credentials);
SslContextFactory sslContextFactory = new SslContextFactory();
@@ -125,12 +123,12 @@ private BayeuxClient getClient(AuthenticatorCredentials credentials) throws Exce
@Override
protected void customize(Request exchange) {
super.customize(exchange);
- exchange.header("Authorization", "OAuth " + acessToken);
+ exchange.header("Authorization", "OAuth " + oAuthInfo.getAccessToken());
}
};
// Now set up the Bayeux client itself
- BayeuxClient client = new BayeuxClient(instanceUrl + DEFAULT_PUSH_ENDPOINT, transport);
+ BayeuxClient client = new BayeuxClient(oAuthInfo.getInstanceURL() + DEFAULT_PUSH_ENDPOINT, transport);
client.handshake();
return client;
diff --git a/src/main/java/io/cdap/plugin/salesforce/plugin/source/streaming/SalesforceStreamingSourceConfig.java b/src/main/java/io/cdap/plugin/salesforce/plugin/source/streaming/SalesforceStreamingSourceConfig.java
index 83f50435..19ac1716 100644
--- a/src/main/java/io/cdap/plugin/salesforce/plugin/source/streaming/SalesforceStreamingSourceConfig.java
+++ b/src/main/java/io/cdap/plugin/salesforce/plugin/source/streaming/SalesforceStreamingSourceConfig.java
@@ -33,6 +33,7 @@
import io.cdap.plugin.salesforce.SalesforceQueryUtil;
import io.cdap.plugin.salesforce.authenticator.Authenticator;
import io.cdap.plugin.salesforce.plugin.BaseSalesforceConfig;
+import io.cdap.plugin.salesforce.plugin.OAuthInfo;
import io.cdap.plugin.salesforce.soap.SObjectBuilder;
import io.cdap.plugin.salesforce.soap.SObjectUtil;
import org.slf4j.Logger;
@@ -105,10 +106,16 @@ public class SalesforceStreamingSourceConfig extends BaseSalesforceConfig implem
@Name("pushTopicNotifyForFields")
private String pushTopicNotifyForFields;
- public SalesforceStreamingSourceConfig(String referenceName, String consumerKey, String consumerSecret,
- String username, String password, String loginUrl,
- String pushTopicName, String sObjectName, @Nullable String securityToken) {
- super(referenceName, consumerKey, consumerSecret, username, password, loginUrl, securityToken);
+ public SalesforceStreamingSourceConfig(String referenceName,
+ @Nullable String consumerKey,
+ @Nullable String consumerSecret,
+ @Nullable String username,
+ @Nullable String password,
+ @Nullable String loginUrl,
+ String pushTopicName, String sObjectName,
+ @Nullable String securityToken,
+ @Nullable OAuthInfo oAuthInfo) {
+ super(referenceName, consumerKey, consumerSecret, username, password, loginUrl, securityToken, oAuthInfo);
this.pushTopicName = pushTopicName;
this.sObjectName = sObjectName;
}
diff --git a/src/test/java/io/cdap/plugin/salesforce/etl/BaseSalesforceBatchSinkETLTest.java b/src/test/java/io/cdap/plugin/salesforce/etl/BaseSalesforceBatchSinkETLTest.java
index e2b7c055..ddce35ad 100644
--- a/src/test/java/io/cdap/plugin/salesforce/etl/BaseSalesforceBatchSinkETLTest.java
+++ b/src/test/java/io/cdap/plugin/salesforce/etl/BaseSalesforceBatchSinkETLTest.java
@@ -229,6 +229,7 @@ protected SalesforceSinkConfig getDefaultConfig(String sObject) {
BaseSalesforceETLTest.USERNAME, BaseSalesforceETLTest.PASSWORD,
BaseSalesforceETLTest.LOGIN_URL, sObject, "Insert", null,
"1000000", "10000", "Stop on Error",
- BaseSalesforceETLTest.SECURITY_TOKEN);
+ BaseSalesforceETLTest.SECURITY_TOKEN,
+ null);
}
}
diff --git a/src/test/java/io/cdap/plugin/salesforce/etl/BaseSalesforceETLTest.java b/src/test/java/io/cdap/plugin/salesforce/etl/BaseSalesforceETLTest.java
index d40b5e75..5dd6add8 100644
--- a/src/test/java/io/cdap/plugin/salesforce/etl/BaseSalesforceETLTest.java
+++ b/src/test/java/io/cdap/plugin/salesforce/etl/BaseSalesforceETLTest.java
@@ -88,8 +88,8 @@ public static void initializeTests() throws ConnectionException {
throw e;
}
- AuthenticatorCredentials credentials = SalesforceConnectionUtil.getAuthenticatorCredentials(
- USERNAME, PASSWORD, CONSUMER_KEY, CONSUMER_SECRET, LOGIN_URL);
+ AuthenticatorCredentials credentials = new AuthenticatorCredentials(USERNAME, PASSWORD, CONSUMER_KEY,
+ CONSUMER_SECRET, LOGIN_URL);
partnerConnection = SalesforceConnectionUtil.getPartnerConnection(credentials);
}
diff --git a/src/test/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceSourceConfigBuilder.java b/src/test/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceSourceConfigBuilder.java
index 52015df9..5b245387 100644
--- a/src/test/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceSourceConfigBuilder.java
+++ b/src/test/java/io/cdap/plugin/salesforce/plugin/source/batch/SalesforceSourceConfigBuilder.java
@@ -109,6 +109,6 @@ public SalesforceSourceConfigBuilder setSecurityToken(String securityToken) {
public SalesforceSourceConfig build() {
return new SalesforceSourceConfig(referenceName, consumerKey, consumerSecret, username, password, loginUrl,
query, sObjectName, datetimeAfter, datetimeBefore, duration, offset, schema,
- securityToken);
+ securityToken, null);
}
}
diff --git a/widgets/Salesforce-batchsink.json b/widgets/Salesforce-batchsink.json
index 6ca04d1e..1602b825 100644
--- a/widgets/Salesforce-batchsink.json
+++ b/widgets/Salesforce-batchsink.json
@@ -16,6 +16,11 @@
{
"label": "Authentication",
"properties": [
+ {
+ "widget-type" : "hidden",
+ "label": "OAuth Information",
+ "name": "oAuthInfo"
+ },
{
"widget-type": "textbox",
"label": "Username",
diff --git a/widgets/Salesforce-batchsource.json b/widgets/Salesforce-batchsource.json
index 2f12eaf3..e1915ef1 100644
--- a/widgets/Salesforce-batchsource.json
+++ b/widgets/Salesforce-batchsource.json
@@ -16,6 +16,11 @@
{
"label": "Authentication",
"properties": [
+ {
+ "widget-type" : "hidden",
+ "label": "OAuth Information",
+ "name": "oAuthInfo"
+ },
{
"widget-type": "textbox",
"label": "Username",
diff --git a/widgets/Salesforce-streamingsource.json b/widgets/Salesforce-streamingsource.json
index b336b424..a50815d3 100644
--- a/widgets/Salesforce-streamingsource.json
+++ b/widgets/Salesforce-streamingsource.json
@@ -16,6 +16,11 @@
{
"label": "Authentication",
"properties": [
+ {
+ "widget-type" : "hidden",
+ "label": "OAuth Information",
+ "name": "oAuthInfo"
+ },
{
"widget-type": "textbox",
"label": "Username",
diff --git a/widgets/SalesforceMultiObjects-batchsource.json b/widgets/SalesforceMultiObjects-batchsource.json
index 27e2a2d2..aafa0185 100644
--- a/widgets/SalesforceMultiObjects-batchsource.json
+++ b/widgets/SalesforceMultiObjects-batchsource.json
@@ -16,6 +16,11 @@
{
"label": "Authentication",
"properties": [
+ {
+ "widget-type" : "hidden",
+ "label": "OAuth Information",
+ "name": "oAuthInfo"
+ },
{
"widget-type": "textbox",
"label": "Username",