Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CDAP-15787] Add failure collector apis #11

Merged
merged 1 commit into from Nov 1, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions pom.xml
Expand Up @@ -80,14 +80,14 @@
</repositories>

<properties>
<cdap.version>6.0.0-SNAPSHOT</cdap.version>
<cdap.version>6.1.0-SNAPSHOT</cdap.version>
<junit.version>4.11</junit.version>
<commons-lang.version>2.6</commons-lang.version>
<guava.version>19.0</guava.version>
<widgets.dir>widgets</widgets.dir>
<docs.dir>docs</docs.dir>
<app.parents>
system:cdap-etl-batch[6.0.0-SNAPSHOT,7.0.0-SNAPSHOT),system:cdap-etl-realtime[6.0.0-SNAPSHOT,7.0.0-SNAPSHOT),system:cdap-data-pipeline[6.0.0-SNAPSHOT,7.0.0-SNAPSHOT),system:cdap-data-streams[6.0.0-SNAPSHOT,7.0.0-SNAPSHOT)
system:cdap-data-pipeline[6.1.0-SNAPSHOT,7.0.0-SNAPSHOT),system:cdap-data-streams[6.1.0-SNAPSHOT,7.0.0-SNAPSHOT)
</app.parents>
<!-- this is here because project.basedir evaluates to null in the script build step -->
<main.basedir>${project.basedir}</main.basedir>
Expand Down
45 changes: 26 additions & 19 deletions src/main/java/io/cdap/plugin/batch/S3ToRedshiftAction.java
Expand Up @@ -21,6 +21,7 @@
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.plugin.PluginConfig;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.action.Action;
import io.cdap.cdap.etl.api.action.ActionContext;
Expand All @@ -46,13 +47,16 @@ public S3ToRedshiftAction(S3ToRedshiftConfig config) {
}

@Override
public void configurePipeline(PipelineConfigurer pipelineConfigurer) throws IllegalArgumentException {
config.checkKeysAndRoleForConnection();
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
config.checkKeysAndRoleForConnection(pipelineConfigurer.getStageConfigurer().getFailureCollector());
}

@Override
public void run(ActionContext actionContext) throws Exception {
config.checkKeysAndRoleForConnection();
public void run(ActionContext actionContext) {
FailureCollector collector = actionContext.getFailureCollector();
config.checkKeysAndRoleForConnection(collector);
collector.getOrThrowException();

Connection connection = null;
Statement statement = null;
try {
Expand Down Expand Up @@ -139,6 +143,9 @@ private String buildCopyCommand() {
* Config class for S3ToRedshiftAction.
*/
public static class S3ToRedshiftConfig extends PluginConfig {
private static final String IAM_ROLE = "iamRole";
private static final String ACCESS_KEY = "accessKey";
private static final String SECRET_ACCESS_KEY = "secretAccessKey";

@Description("Access key for AWS S3 to connect to. Either provide 'Keys(Access and Secret Access keys)' or 'IAM " +
"Role' for connecting to AWS S3 bucket. (Macro-enabled)")
Expand Down Expand Up @@ -214,24 +221,24 @@ public S3ToRedshiftConfig(@Nullable String accessKey, @Nullable String secretAcc
/**
 * Checks whether the keys and the IAM role are both present or both empty at the same time, for connecting to the S3 bucket.
*/
private void checkKeysAndRoleForConnection() {
if (!Strings.isNullOrEmpty(iamRole) || this.containsMacro("iamRole")) {
if (!((Strings.isNullOrEmpty(accessKey) && !this.containsMacro("accessKey")) &&
(Strings.isNullOrEmpty(secretAccessKey) && !this.containsMacro("secretAccessKey")))) {
throw new IllegalArgumentException("Both configurations 'Keys(Access and Secret Access keys)' and 'IAM " +
"Role' can not be provided at the same time. Either provide the 'Keys" +
"(Access and Secret Access keys)' or 'IAM Role' for connecting to S3 " +
"bucket.");
private void checkKeysAndRoleForConnection(FailureCollector collector) {
if (!Strings.isNullOrEmpty(iamRole) || this.containsMacro(IAM_ROLE)) {
if (!((Strings.isNullOrEmpty(accessKey) && !this.containsMacro(ACCESS_KEY)) &&
(Strings.isNullOrEmpty(secretAccessKey) && !this.containsMacro(SECRET_ACCESS_KEY)))) {
collector.addFailure(
"Both configurations 'Keys'(Access and Secret Access keys) and 'IAM " +
"Role' can not be provided at the same time.",
"Either provide the 'Keys' (Access and Secret Access keys) or 'IAM Role' for connecting to S3 bucket.")
.withConfigProperty(IAM_ROLE).withConfigProperty(ACCESS_KEY).withConfigProperty(SECRET_ACCESS_KEY);
}
}

if (Strings.isNullOrEmpty(iamRole)) {
} else if (Strings.isNullOrEmpty(iamRole)) {
if (!((!Strings.isNullOrEmpty(accessKey) || this.containsMacro("accessKey")) &&
(!Strings.isNullOrEmpty(secretAccessKey) || this.containsMacro("secretAccessKey")))) {
throw new IllegalArgumentException("Both configurations 'Keys(Access and Secret Access keys)' and 'IAM " +
"Role' can not be empty at the same time. Either provide the 'Keys" +
"(Access and Secret Access keys)' or 'IAM Role' for connecting to S3 " +
"bucket.");
collector.addFailure(
"Both configurations 'Keys'(Access and Secret Access keys) and " +
"'IAM Role' can not be empty at the same time.",
"Either provide the 'Keys'(Access and Secret Access keys) or 'IAM Role' for connecting to S3 bucket.")
.withConfigProperty(IAM_ROLE).withConfigProperty(ACCESS_KEY).withConfigProperty(SECRET_ACCESS_KEY);
}
}
}
Expand Down
119 changes: 65 additions & 54 deletions src/test/java/io/cdap/plugin/S3ToRedshiftConfigTest.java
Expand Up @@ -15,6 +15,8 @@
*/
package io.cdap.plugin;

import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.validation.CauseAttributes;
import io.cdap.cdap.etl.mock.common.MockPipelineConfigurer;
import io.cdap.plugin.batch.S3ToRedshiftAction;
import org.junit.Assert;
Expand All @@ -24,116 +26,125 @@
* Unit Tests for S3ToRedshiftConfig.
*/
public class S3ToRedshiftConfigTest {
private static final String IAM_ROLE = "iamRole";
private static final String ACCESS_KEY = "accessKey";
private static final String SECRET_ACCESS_KEY = "secretAccessKey";

@Test
public void testIfBothKeysAndRoleIsNotPresent() throws Exception {
public void testIfBothKeysAndRoleIsNotPresent() {
S3ToRedshiftAction.S3ToRedshiftConfig config = new
S3ToRedshiftAction.S3ToRedshiftConfig("", "", "", "", "s3://test-bucket/test/2017-02-22",
"jdbc:redshift://x.y.us-east-1.redshift.amazonaws.com:5439/dev",
"masterUser", "masterPassword", "redshifttable", "");

MockPipelineConfigurer configurer = new MockPipelineConfigurer(null);
try {
new S3ToRedshiftAction(config).configurePipeline(configurer);
Assert.fail();
} catch (IllegalArgumentException e) {
Assert.assertEquals("Both configurations 'Keys(Access and Secret Access keys)' and 'IAM Role' can not be " +
"empty at the same time. Either provide the 'Keys(Access and Secret Access keys)' or " +
"'IAM Role' for connecting to S3 bucket.", e.getMessage());
}
FailureCollector collector = configurer.getStageConfigurer().getFailureCollector();
new S3ToRedshiftAction(config).configurePipeline(configurer);
Assert.assertEquals(1, collector.getValidationFailures().size());
Assert.assertEquals(IAM_ROLE, collector.getValidationFailures().get(0).getCauses().get(0)
.getAttribute(CauseAttributes.STAGE_CONFIG));
Assert.assertEquals(ACCESS_KEY, collector.getValidationFailures().get(0).getCauses().get(1)
.getAttribute(CauseAttributes.STAGE_CONFIG));
Assert.assertEquals(SECRET_ACCESS_KEY, collector.getValidationFailures().get(0).getCauses().get(2)
.getAttribute(CauseAttributes.STAGE_CONFIG));
}

@Test
public void testIfEitherKeyAndRoleIsNotPresent() throws Exception {
public void testIfEitherKeyAndRoleIsNotPresent() {
S3ToRedshiftAction.S3ToRedshiftConfig config = new
S3ToRedshiftAction.S3ToRedshiftConfig("", "testaccesskey", "", "", "s3://test-bucket/test/2017-02-22",
"jdbc:redshift://x.y.us-east-1.redshift.amazonaws.com:5439/dev",
"masterUser", "masterPassword", "redshifttable", "");

MockPipelineConfigurer configurer = new MockPipelineConfigurer(null);
try {
new S3ToRedshiftAction(config).configurePipeline(configurer);
Assert.fail();
} catch (IllegalArgumentException e) {
Assert.assertEquals("Both configurations 'Keys(Access and Secret Access keys)' and 'IAM Role' can not be " +
"empty at the same time. Either provide the 'Keys(Access and Secret Access keys)' or " +
"'IAM Role' for connecting to S3 bucket.", e.getMessage());
}
FailureCollector collector = configurer.getStageConfigurer().getFailureCollector();
new S3ToRedshiftAction(config).configurePipeline(configurer);
Assert.assertEquals(1, collector.getValidationFailures().size());
Assert.assertEquals(IAM_ROLE, collector.getValidationFailures().get(0).getCauses().get(0)
.getAttribute(CauseAttributes.STAGE_CONFIG));
Assert.assertEquals(ACCESS_KEY, collector.getValidationFailures().get(0).getCauses().get(1)
.getAttribute(CauseAttributes.STAGE_CONFIG));
Assert.assertEquals(SECRET_ACCESS_KEY, collector.getValidationFailures().get(0).getCauses().get(2)
.getAttribute(CauseAttributes.STAGE_CONFIG));
}

@Test
public void testIfBothKeysAndRoleIsPresent() throws Exception {
public void testIfBothKeysAndRoleIsPresent() {
S3ToRedshiftAction.S3ToRedshiftConfig config = new
S3ToRedshiftAction.S3ToRedshiftConfig("testAccessKey", "testSecretAccessKey", "testIamRole", "",
"s3://test-bucket/test/2017-02-22", "jdbc:redshift://x.y.us-east-1" +
".redshift.amazonaws.com:5439/dev", "masterUser", "masterPassword",
"redshifttable", "");

MockPipelineConfigurer configurer = new MockPipelineConfigurer(null);
try {
new S3ToRedshiftAction(config).configurePipeline(configurer);
Assert.fail();
} catch (IllegalArgumentException e) {
Assert.assertEquals("Both configurations 'Keys(Access and Secret Access keys)' and 'IAM Role' can not be " +
"provided at the same time. Either provide the 'Keys(Access and Secret Access keys)' or " +
"'IAM Role' for connecting to S3 bucket.", e.getMessage());
}
FailureCollector collector = configurer.getStageConfigurer().getFailureCollector();
new S3ToRedshiftAction(config).configurePipeline(configurer);
Assert.assertEquals(1, collector.getValidationFailures().size());
Assert.assertEquals(IAM_ROLE, collector.getValidationFailures().get(0).getCauses().get(0)
.getAttribute(CauseAttributes.STAGE_CONFIG));
Assert.assertEquals(ACCESS_KEY, collector.getValidationFailures().get(0).getCauses().get(1)
.getAttribute(CauseAttributes.STAGE_CONFIG));
Assert.assertEquals(SECRET_ACCESS_KEY, collector.getValidationFailures().get(0).getCauses().get(2)
.getAttribute(CauseAttributes.STAGE_CONFIG));
}

@Test
public void testIfEitherKeyAndRoleIsPresent() throws Exception {
public void testIfEitherKeyAndRoleIsPresent() {
S3ToRedshiftAction.S3ToRedshiftConfig config = new
S3ToRedshiftAction.S3ToRedshiftConfig("testAccessKey", "testSecretAccessKey", "testIamRole", "",
"s3://test-bucket/test/2017-02-22", "jdbc:redshift://x.y.us-east-1" +
".redshift.amazonaws.com:5439/dev", "masterUser", "masterPassword",
"redshifttable", "");

MockPipelineConfigurer configurer = new MockPipelineConfigurer(null);
try {
new S3ToRedshiftAction(config).configurePipeline(configurer);
Assert.fail();
} catch (IllegalArgumentException e) {
Assert.assertEquals("Both configurations 'Keys(Access and Secret Access keys)' and 'IAM Role' can not be " +
"provided at the same time. Either provide the 'Keys(Access and Secret Access keys)' or " +
"'IAM Role' for connecting to S3 bucket.", e.getMessage());
}
FailureCollector collector = configurer.getStageConfigurer().getFailureCollector();
new S3ToRedshiftAction(config).configurePipeline(configurer);
Assert.assertEquals(1, collector.getValidationFailures().size());
Assert.assertEquals(IAM_ROLE, collector.getValidationFailures().get(0).getCauses().get(0)
.getAttribute(CauseAttributes.STAGE_CONFIG));
Assert.assertEquals(ACCESS_KEY, collector.getValidationFailures().get(0).getCauses().get(1)
.getAttribute(CauseAttributes.STAGE_CONFIG));
Assert.assertEquals(SECRET_ACCESS_KEY, collector.getValidationFailures().get(0).getCauses().get(2)
.getAttribute(CauseAttributes.STAGE_CONFIG));
}

@Test
public void testIfBothKeysAndRolePresentAsMacro() throws Exception {
public void testIfBothKeysAndRolePresentAsMacro() {
S3ToRedshiftAction.S3ToRedshiftConfig config = new
S3ToRedshiftAction.S3ToRedshiftConfig("${accessKey}", "${secretAccessKey}", "${iamRole}", "",
"s3://test-bucket/test/2017-02-22", "jdbc:redshift://x.y.us-east-1" +
".redshift.amazonaws.com:5439/dev", "masterUser", "masterPassword",
"redshifttable", "");

MockPipelineConfigurer configurer = new MockPipelineConfigurer(null);
try {
new S3ToRedshiftAction(config).configurePipeline(configurer);
Assert.fail();
} catch (IllegalArgumentException e) {
Assert.assertEquals("Both configurations 'Keys(Access and Secret Access keys)' and 'IAM Role' can not be " +
"provided at the same time. Either provide the 'Keys(Access and Secret Access keys)' or " +
"'IAM Role' for connecting to S3 bucket.", e.getMessage());
}
FailureCollector collector = configurer.getStageConfigurer().getFailureCollector();
new S3ToRedshiftAction(config).configurePipeline(configurer);
Assert.assertEquals(1, collector.getValidationFailures().size());
Assert.assertEquals(IAM_ROLE, collector.getValidationFailures().get(0).getCauses().get(0)
.getAttribute(CauseAttributes.STAGE_CONFIG));
Assert.assertEquals(ACCESS_KEY, collector.getValidationFailures().get(0).getCauses().get(1)
.getAttribute(CauseAttributes.STAGE_CONFIG));
Assert.assertEquals(SECRET_ACCESS_KEY, collector.getValidationFailures().get(0).getCauses().get(2)
.getAttribute(CauseAttributes.STAGE_CONFIG));
}

@Test
public void testIfKeysAndRoleIsPresentAsMacro() throws Exception {
public void testIfKeysAndRoleIsPresentAsMacro() {
S3ToRedshiftAction.S3ToRedshiftConfig config = new
S3ToRedshiftAction.S3ToRedshiftConfig("testAccessKey", "testSecretAccessKey", "${iamRole}", "",
"s3://test-bucket/test/2017-02-22", "jdbc:redshift://x.y.us-east-1" +
".redshift.amazonaws.com:5439/dev", "masterUser", "masterPassword",
"redshifttable", "");

MockPipelineConfigurer configurer = new MockPipelineConfigurer(null);
try {
new S3ToRedshiftAction(config).configurePipeline(configurer);
Assert.fail();
} catch (IllegalArgumentException e) {
Assert.assertEquals("Both configurations 'Keys(Access and Secret Access keys)' and 'IAM Role' can not be " +
"provided at the same time. Either provide the 'Keys(Access and Secret Access keys)' or " +
"'IAM Role' for connecting to S3 bucket.", e.getMessage());
}
FailureCollector collector = configurer.getStageConfigurer().getFailureCollector();
new S3ToRedshiftAction(config).configurePipeline(configurer);
Assert.assertEquals(1, collector.getValidationFailures().size());
Assert.assertEquals(IAM_ROLE, collector.getValidationFailures().get(0).getCauses().get(0)
.getAttribute(CauseAttributes.STAGE_CONFIG));
Assert.assertEquals(ACCESS_KEY, collector.getValidationFailures().get(0).getCauses().get(1)
.getAttribute(CauseAttributes.STAGE_CONFIG));
Assert.assertEquals(SECRET_ACCESS_KEY, collector.getValidationFailures().get(0).getCauses().get(2)
.getAttribute(CauseAttributes.STAGE_CONFIG));
}
}