diff --git a/src/e2e-test/features/Pipeline.feature b/src/e2e-test/features/Pipeline.feature index 549dc38..0b6a7e5 100644 --- a/src/e2e-test/features/Pipeline.feature +++ b/src/e2e-test/features/Pipeline.feature @@ -15,7 +15,7 @@ # Feature: Oracle - Verify Oracle source data transfer to Big Query - @ORACLE_SOURCE @BIGQUERY_TARGET + @ENV_VARIABLES #@ORACLE_SOURCE @ORACLE_DELETE @BIGQUERY_DELETE Scenario: To verify replication of snapshot and cdc data from Oracle to Big Query successfully with Sanity test Given Open DataFusion Project with replication to configure pipeline When Enter input plugin property: "name" with value: "pipelineName" diff --git a/src/e2e-test/java/io.cdap.plugin/actions/ReplicationActions.java b/src/e2e-test/java/io.cdap.plugin/actions/ReplicationActions.java index a5ae274..ad659f4 100644 --- a/src/e2e-test/java/io.cdap.plugin/actions/ReplicationActions.java +++ b/src/e2e-test/java/io.cdap.plugin/actions/ReplicationActions.java @@ -20,6 +20,7 @@ import io.cdap.e2e.pages.locators.CdfPipelineRunLocators; import io.cdap.e2e.utils.*; import io.cdap.plugin.locators.ReplicationLocators; +import io.cdap.plugin.utils.BigQuery; import io.cdap.plugin.utils.OracleClient; import io.cdap.plugin.utils.ValidationHelper; import org.apache.commons.lang.StringUtils; @@ -59,7 +60,7 @@ public static void clickOnOraclePlugin() { public static void selectTable() { String table = schemaName + "." + tableName; - WaitHelper.waitForElementToBeDisplayed(ReplicationLocators.selectTable(table)); + WaitHelper.waitForElementToBeDisplayed(ReplicationLocators.selectTable(table),300); AssertionHelper.verifyElementDisplayed(ReplicationLocators.selectTable(table)); ElementHelper.clickOnElement(ReplicationLocators.selectTable(table)); } @@ -113,7 +114,7 @@ public static void waitTillPipelineIsRunningAndCheckForErrors() throws Interrupt int defaultTimeout = Integer.parseInt(PluginPropertyUtils.pluginProp("pipeline-initialization")); TimeUnit time = TimeUnit.SECONDS; time.sleep(defaultTimeout); - ValidationHelper.waitForFlush(); + BigQuery.waitForFlush(); // Checking if an error message is displayed. Assert.assertFalse(ElementHelper.isElementDisplayed(ReplicationLocators.error)); } @@ -140,18 +141,18 @@ public static void insertRecordAndWait() throws InterruptedException, SQLException, ClassNotFoundException { OracleClient.insertRow(tableName, schemaName, datatypeValues); OracleClient.forceFlushCDC(); - ValidationHelper.waitForFlush(); + BigQuery.waitForFlush(); } public static void deleteRecordAndWait() throws SQLException, ClassNotFoundException, InterruptedException { OracleClient.deleteRow(tableName, schemaName, deleteCondition); OracleClient.forceFlushCDC(); - ValidationHelper.waitForFlush(); + BigQuery.waitForFlush(); } public static void updateRecordAndWait() throws SQLException, ClassNotFoundException, InterruptedException { OracleClient.updateRow(tableName, schemaName, updateCondition, updatedValue ); OracleClient.forceFlushCDC(); - ValidationHelper.waitForFlush(); + BigQuery.waitForFlush(); } } \ No newline at end of file diff --git a/src/e2e-test/java/io.cdap.plugin/hooks/TestSetUpHooks.java b/src/e2e-test/java/io.cdap.plugin/hooks/TestSetUpHooks.java index 5972bf8..ac59b3b 100644 --- a/src/e2e-test/java/io.cdap.plugin/hooks/TestSetUpHooks.java +++ b/src/e2e-test/java/io.cdap.plugin/hooks/TestSetUpHooks.java @@ -18,6 +18,7 @@ import com.google.cloud.bigquery.BigQueryException; import io.cdap.e2e.utils.BigQueryClient; import io.cdap.e2e.utils.PluginPropertyUtils; +import io.cdap.plugin.utils.BigQuery; import io.cdap.plugin.utils.OracleClient; import io.cucumber.java.After; import io.cucumber.java.Before; @@ -43,7 +44,7 @@ public class TestSetUpHooks { public static String row1 = PluginPropertyUtils.pluginProp("datatypeValuesRow1"); public static String row2= PluginPropertyUtils.pluginProp("datatypeValuesRow2"); - @Before(order = 1, value = "@ORACLE_SOURCE") + @Before(order = 1, value = "@ENV_VARIABLES") public static void overridePropertiesFromEnvVarsIfProvided() { String projectId = System.getenv("PROJECT_ID"); if (projectId != null && !projectId.isEmpty()) { @@ -89,12 +90,12 @@ public static void getOracleRecordsAsMap() throws SQLException, ClassNotFoundExc BeforeActions.scenario.write("Expected Oracle records : " + sourceOracleRecords); } - @After(order = 1, value = "@ORACLE_SOURCE") + @After(order = 1, value = "@ORACLE_DELETE") public static void dropTables() throws SQLException, ClassNotFoundException { OracleClient.deleteTables(schemaName, tableName); } - @After(order = 1, value = "@BIGQUERY_TARGET") + @After(order = 1, value = "@BIGQUERY_DELETE") public static void deleteTempTargetBQTable() throws IOException, InterruptedException { try { BigQueryClient.dropBqQuery(tableName); diff --git a/src/e2e-test/java/io.cdap.plugin/stepsdesign/StepDefinition.java b/src/e2e-test/java/io.cdap.plugin/stepsdesign/StepDefinition.java index caa42af..b3af423 100644 --- a/src/e2e-test/java/io.cdap.plugin/stepsdesign/StepDefinition.java +++ b/src/e2e-test/java/io.cdap.plugin/stepsdesign/StepDefinition.java @@ -44,7 +44,7 @@ public void selectSourceAsOracle() { } @Then("Validate Source table is available and select it") - public void selectTable1() { + public void selectTable() { ReplicationActions.selectTable(); } @Then("Deploy the replication pipeline") diff --git a/src/e2e-test/java/io.cdap.plugin/utils/BigQuery.java b/src/e2e-test/java/io.cdap.plugin/utils/BigQuery.java new file mode 100644 index 0000000..3cd3df7 --- /dev/null +++ b/src/e2e-test/java/io.cdap.plugin/utils/BigQuery.java @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2023. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.utils; + +import com.google.cloud.bigquery.BigQueryException; +import io.cdap.e2e.utils.BigQueryClient; +import io.cdap.e2e.utils.PluginPropertyUtils; +import org.junit.Assert; +import stepsdesign.BeforeActions; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +public class BigQuery { + public static void waitForFlush() throws InterruptedException { + int flushInterval = Integer.parseInt(PluginPropertyUtils.pluginProp("loadInterval")); + TimeUnit time = TimeUnit.SECONDS; + time.sleep(2 * flushInterval + 60); + } + + public static void deleteTable(String tableName) throws IOException, InterruptedException{ + try { + BigQueryClient.dropBqQuery(tableName); + BeforeActions.scenario.write("BQ Target table - " + tableName + " deleted successfully"); + } catch (BigQueryException e) { + if (e.getMessage().contains("Not found: Table")) { + BeforeActions.scenario.write("BQ Target Table does not exist"); + } else { + Assert.fail(e.getMessage()); + } + } + } +} \ No newline at end of file diff --git a/src/e2e-test/java/io.cdap.plugin/utils/ValidationHelper.java b/src/e2e-test/java/io.cdap.plugin/utils/ValidationHelper.java index 8c15eae..b351b36 100644 --- a/src/e2e-test/java/io.cdap.plugin/utils/ValidationHelper.java +++ b/src/e2e-test/java/io.cdap.plugin/utils/ValidationHelper.java @@ -66,7 +66,7 @@ public static void validateRecords(List> sourceOracleRecords String uniqueField = PluginPropertyUtils.pluginProp("primaryKey"); // Logic to maintain the order of both lists and validate records based on that order. - Map BqUniqueIdMap = (Map)targetBigQueryRecords.stream() + Map bqUniqueIdMap = (Map)targetBigQueryRecords.stream() .filter(t -> t.get("_is_deleted")==null) .collect(Collectors.toMap( t -> t.get(uniqueField), @@ -84,11 +84,10 @@ public static void validateRecords(List> sourceOracleRecords for (int record = 0; record < sourceOracleRecords.size(); record++) { Map oracleRecord = sourceOracleRecords.get(record); Object uniqueId = oracleRecord.get(uniqueField); - Map bqRow = (Map) BqUniqueIdMap.get(uniqueId); + Map bqRow = (Map) bqUniqueIdMap.get(uniqueId); if (bqRow != null) { bqRow.remove("_is_deleted"); bqRow.remove("_sequence_num"); - bqRow.forEach((key, value) -> System.out.println("Bq record" + key + ":" + value)); } compareRecords(bqRow, oracleRecord); } @@ -106,10 +105,4 @@ public static void compareRecords(Map targetBigQueryRecords, } } - public static void waitForFlush() throws InterruptedException { - int flushInterval = Integer.parseInt(PluginPropertyUtils.pluginProp("loadInterval")); - TimeUnit time = TimeUnit.SECONDS; - time.sleep(2 * flushInterval); - } - } diff --git a/src/e2e-test/resources/pluginParameters.properties b/src/e2e-test/resources/pluginParameters.properties index 7bf3452..57b6385 100644 --- a/src/e2e-test/resources/pluginParameters.properties +++ b/src/e2e-test/resources/pluginParameters.properties @@ -1,15 +1,15 @@ projectId=PROJECT_ID -dataset=XE -schema=ABC +dataset=ORCL +schema=HR sourceTable=SOURCE_TABLE host=ORACLE_HOST port=ORACLE_PORT username=ORACLE_USERNAME password=ORACLE_PASSWORD -pipelineName=e2e-sanity +pipelineName=e2e-sanity-existing-table pipeline-initialization=200 loadInterval=30