diff --git a/pom.xml b/pom.xml index d81813d..3dc339d 100644 --- a/pom.xml +++ b/pom.xml @@ -385,6 +385,129 @@ + + e2e-tests + + src/e2e-test/java + TestRunner.java + + + + + src/e2e-test/resources + + + + + org.apache.maven.plugins + maven-surefire-plugin + 2.18.1 + + true + + + + + org.apache.maven.plugins + maven-failsafe-plugin + 3.0.0-M5 + + + ${TEST_RUNNER} + + + classes + 2 + 2 + true + + + + ${GOOGLE_APPLICATION_CREDENTIALS} + + + ${SERVICE_ACCOUNT_TYPE} + + + ${SERVICE_ACCOUNT_FILE_PATH} + + + ${SERVICE_ACCOUNT_JSON} + + + + + + + integration-test + + + + + + + net.masterthought + maven-cucumber-reporting + 5.5.0 + + + + execution + verify + + generate + + + Cucumber Reports + target/cucumber-reports/advanced-reports + 1 + false + ${project.build.directory}/cucumber-reports + + **/*.json + + ${project.build.directory}/cucumber-reports + true + + + + + + + + + + + com.google.guava + guava + 31.1-jre + + + + + + + com.oracle.database.jdbc + ojdbc8 + 21.1.0.0 + test + + + io.cdap.tests.e2e + cdap-e2e-framework + 0.2.0-SNAPSHOT + test + + + ch.qos.logback + logback-classic + 1.2.8 + runtime + + + + + diff --git a/src/e2e-test/features/Pipeline.feature b/src/e2e-test/features/Pipeline.feature new file mode 100644 index 0000000..7df4ff5 --- /dev/null +++ b/src/e2e-test/features/Pipeline.feature @@ -0,0 +1,78 @@ +# +# Copyright © 2023 Cask Data, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. +# + +Feature: Oracle - Verify Oracle source data transfer + +# Scenario: Sanity test from Oracle to Big Query +# Given Open DataFusion Project with replication to configure pipeline +# When Enter input plugin property: "name" with value: "pipelineName" +# And Click on the Next button +# And Select Oracle as Source +# Then Replace input plugin property: "host" with value: "host" for Credentials and Authorization related fields +# Then Replace input plugin property: "port" with value: "port" for Credentials and Authorization related fields +# Then Click plugin property: "region" +# Then Click plugin property: "regionOption" +# Then Replace input plugin property: "user" with value: "username" for Credentials and Authorization related fields +# Then Replace input plugin property: "password" with value: "password" for Credentials and Authorization related fields +# Then Replace input plugin property: "sid" with value: "database" for Credentials and Authorization related fields +# Then Click on the Next button +# Then Replace input plugin property: "loadInterval" with value: "loadInterval" +# Then Click on the Next button +# Then Validate Table is available and can be selected "ABC.E2E-sanity" +# And Click on the Next button +# And Click on the Next button +# And Click on the Next button +# Then Deploy the replication pipeline +# And Run the replication Pipeline +# Then Open the logs +# And Wait till pipeline is in running state and check if no errors occurred +# Then Verify expected Oracle records in target BigQuery table +# And Capture raw logs +# Then Close the pipeline logs and stop the pipeline + + @ORACLE_SOURCE + Scenario: To verify snapshot and cdc from Oracle to Big Query successfully + Given Open DataFusion Project with replication to configure pipeline + When Enter input plugin property: "name" with value: "pipelineName" + And Click on the Next button + And Select Oracle as Source + Then Replace input plugin property: "host" with value: "host" for Credentials and Authorization related fields + Then Replace input plugin property: "port" with value: "port" for Credentials and Authorization related fields + Then Click plugin property: "region" + Then Click plugin property: "regionOption" + Then Replace input plugin property: "user" with value: "username" for Credentials and Authorization related fields + Then Replace input plugin property: "password" with value: "password" for Credentials and Authorization related fields + Then Replace input plugin property: "sid" with value: "database" for Credentials and Authorization related fields + Then Click on the Next button + Then Replace input plugin property: "loadInterval" with value: "loadInterval" + Then Click on the Next button + Then Validate Table is available and can be selected "ABC.E2E1" + And Click on the Next button + And Click on the Next button + And Click on the Next button + Then Deploy the replication pipeline + And Run the replication Pipeline + Then Open the logs + And Wait till pipeline is in running state and check if no errors occurred + Then Verify expected Oracle records in target BigQuery table + And Insert a record in the source table and wait for replication + Then Verify expected Oracle records in target BigQuery table + And Delete a record in the source table and wait for replication + Then Verify expected Oracle records in target BigQuery table + And Update a record in the source table and wait for replication + Then Verify expected Oracle records in target BigQuery table + And Capture raw logs + Then Close the pipeline logs and stop the pipeline \ No newline at end of file diff --git a/src/e2e-test/java/io.cdap.plugin/actions/ReplicationActions.java b/src/e2e-test/java/io.cdap.plugin/actions/ReplicationActions.java new file mode 100644 index 0000000..b92acb1 --- /dev/null +++ b/src/e2e-test/java/io.cdap.plugin/actions/ReplicationActions.java @@ -0,0 +1,154 @@ +/* + * Copyright (c) 2023. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.actions; + +import io.cdap.e2e.pages.actions.CdfPipelineRunAction; +import io.cdap.e2e.pages.locators.CdfPipelineRunLocators; +import io.cdap.e2e.utils.*; +import io.cdap.plugin.locators.ReplicationLocators; +import io.cdap.plugin.utils.OracleClient; +import io.cdap.plugin.utils.ValidationHelper; +import org.apache.commons.lang.StringUtils; +import org.junit.Assert; +import stepsdesign.BeforeActions; + +import java.io.IOException; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +public class ReplicationActions { + private static String parentWindow = StringUtils.EMPTY; + private static final String projectId = PluginPropertyUtils.pluginProp("projectId"); + private static final String database = PluginPropertyUtils.pluginProp("database"); + public static String tableName = PluginPropertyUtils.pluginProp("sourceTable"); + public static String schemaName = PluginPropertyUtils.pluginProp("schema"); + public static String datatypeColumnNames = PluginPropertyUtils.pluginProp("datatypeColumnNames"); + public static String datatypeValues = PluginPropertyUtils.pluginProp("datatypeValuesForInsertOperation"); + public static String deleteCondition = PluginPropertyUtils.pluginProp("deleteRowCondition"); + public static String updateCondition = PluginPropertyUtils.pluginProp("updateRowCondition"); + public static String updatedValue = PluginPropertyUtils.pluginProp("updatedRow"); + + static { + SeleniumHelper.getPropertiesLocators(ReplicationLocators.class); + } + public static void clickNextButton() throws InterruptedException { + TimeUnit time = TimeUnit.SECONDS; + time.sleep(1); + ElementHelper.clickOnElement(ReplicationLocators.next); + } + public static void clickOnOraclePlugin() { + ElementHelper.clickOnElement(ReplicationLocators.oraclePlugin); + } + + public static void selectTable(String tableName) { + WaitHelper.waitForElementToBeDisplayed(ReplicationLocators.selectTable(tableName)); + AssertionHelper.verifyElementDisplayed(ReplicationLocators.selectTable(tableName)); + ElementHelper.clickOnElement(ReplicationLocators.selectTable(tableName)); + } + + public static void deployPipeline() { + ElementHelper.clickOnElement(ReplicationLocators.deployPipeline); + } + + public static void startPipeline() { + ElementHelper.clickIfDisplayed(ReplicationLocators.start, ConstantsUtil.DEFAULT_TIMEOUT_SECONDS); + } + + public static void runThePipeline() { + startPipeline(); + WaitHelper.waitForElementToBeDisplayed(ReplicationLocators.running); + } + + public static void openAdvanceLogs() { + ReplicationLocators.logs.click(); + parentWindow = SeleniumDriver.getDriver().getWindowHandle(); + ArrayList tabs = new ArrayList(SeleniumDriver.getDriver().getWindowHandles()); + SeleniumDriver.getDriver().switchTo().window(tabs.get(tabs.indexOf(parentWindow) + 1)); + ReplicationLocators.advancedLogs.click(); + } + + public static void captureRawLog() { + //Capturing raw logs. + try { + String rawLogs = getRawLogs(); + String logsSeparatorMessage = ConstantsUtil.LOGS_SEPARATOR_MESSAGE + .replace("MESSAGE", "DEPLOYED PIPELINE RUNTIME LOGS"); + BeforeActions.scenario.write(rawLogs); + CdfPipelineRunAction.writeRawLogsToFile(BeforeActions.file, logsSeparatorMessage, rawLogs); + } catch (Exception e) { + BeforeActions.scenario.write("Exception in capturing logs : " + e); + } + } + + public static String getRawLogs() { + CdfPipelineRunAction.viewRawLogs(); + ArrayList tabs = new ArrayList(SeleniumDriver.getDriver().getWindowHandles()); + PageHelper.switchToWindow(tabs.indexOf(parentWindow) + 2); + String logs = CdfPipelineRunLocators.logsTextbox.getText(); + Assert.assertNotNull(logs); + PageHelper.closeCurrentWindow(); + return logs; + } + + public static void waitTillPipelineIsRunningAndCheckForErrors() throws InterruptedException { + //wait for datastream to startup + int defaultTimeout = Integer.parseInt(PluginPropertyUtils.pluginProp("datastream.timeout")); + TimeUnit time = TimeUnit.SECONDS; + time.sleep(defaultTimeout); + // Checking if an error message is displayed. + Assert.assertFalse(ElementHelper.isElementDisplayed(ReplicationLocators.error)); + } + + public static void closeTheLogsAndClickOnStopButton() { + //As the logs get opened in a new window in this plugin so after closing them we have to switch to parent window. + SeleniumDriver.getDriver().switchTo().window(parentWindow); + //Stopping the pipeline + ElementHelper.clickOnElement(ReplicationLocators.stop); + WaitHelper.waitForElementToBeDisplayed(ReplicationLocators.stopped); + } + public static void verifyTargetBigQueryRecordMatchesExpectedOracleRecord() + throws IOException, InterruptedException, SQLException, ClassNotFoundException { + // Checking if an error message is displayed. + Assert.assertFalse(ElementHelper.isElementDisplayed(ReplicationLocators.error)); + + List> sourceOracleRecords = OracleClient.getOracleRecordsAsMap(tableName, schemaName); + List> targetBigQueryRecords = ValidationHelper.getBigQueryRecordsAsMap(projectId, database, tableName); //+ "_v1`" + ValidationHelper.validateRecords(sourceOracleRecords, targetBigQueryRecords); + } + + public static void insertRecordAndWait() + throws IOException, InterruptedException, SQLException, ClassNotFoundException { //JCoException, + OracleClient.insertRow(tableName,schemaName, datatypeColumnNames,datatypeValues); + OracleClient.forceFlushCDC(); + ValidationHelper.waitForFlush(); + } + + public static void deleteRecordAndWait() throws SQLException, ClassNotFoundException, IOException, InterruptedException { + OracleClient.deleteRow(tableName,schemaName, deleteCondition); + OracleClient.forceFlushCDC(); + ValidationHelper.waitForFlush(); + } + + public static void updateRecordAndWait() throws SQLException, ClassNotFoundException, IOException, InterruptedException { + OracleClient.updateRow(tableName,schemaName, updateCondition, updatedValue ); + OracleClient.forceFlushCDC(); + ValidationHelper.waitForFlush(); + } +} \ No newline at end of file diff --git a/src/e2e-test/java/io.cdap.plugin/actions/package-info.java b/src/e2e-test/java/io.cdap.plugin/actions/package-info.java new file mode 100644 index 0000000..8e01adf --- /dev/null +++ b/src/e2e-test/java/io.cdap.plugin/actions/package-info.java @@ -0,0 +1,17 @@ +/* + * Copyright (c) 2023. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.actions; \ No newline at end of file diff --git a/src/e2e-test/java/io.cdap.plugin/hooks/TestSetUpHooks.java b/src/e2e-test/java/io.cdap.plugin/hooks/TestSetUpHooks.java new file mode 100644 index 0000000..dbffc05 --- /dev/null +++ b/src/e2e-test/java/io.cdap.plugin/hooks/TestSetUpHooks.java @@ -0,0 +1,86 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.hooks; +import io.cdap.e2e.utils.PluginPropertyUtils; +import io.cdap.plugin.utils.OracleClient; +import io.cucumber.java.After; +import io.cucumber.java.Before; +import stepsdesign.BeforeActions; + +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + + +/** + * Oracle test hooks. + */ +public class TestSetUpHooks { + public static List> sourceOracleRecords = new ArrayList<>(); + public static String tableName = PluginPropertyUtils.pluginProp("sourceTable"); + public static String schemaName = PluginPropertyUtils.pluginProp("schema"); + public static String primaryKey = PluginPropertyUtils.pluginProp("primaryKey"); + public static String datatypeColumns = PluginPropertyUtils.pluginProp("datatypeColumns"); + public static String datatypeColumnNames = PluginPropertyUtils.pluginProp("datatypeColumnNames"); + public static String row1 = PluginPropertyUtils.pluginProp("datatypeValuesRow1"); + public static String row2= PluginPropertyUtils.pluginProp("datatypeValuesRow2"); + + + @Before(order = 1, value = "@ORACLE_SOURCE") + public static void overridePropertiesFromEnvVarsIfProvided() { + String username = System.getenv("ORACLE_USERNAME"); + if (username != null && !username.isEmpty()) { + PluginPropertyUtils.addPluginProp("username", username); + } + String password = System.getenv("ORACLE_PASSWORD"); + if (password != null && !password.isEmpty()) { + PluginPropertyUtils.addPluginProp("password", password); + } + String port = System.getenv("ORACLE_PORT"); + if (port!= null && !port.isEmpty()) { + PluginPropertyUtils.addPluginProp("port", port); + } + String sapHost = System.getenv("ORACLE_HOST"); + if (sapHost != null && !sapHost.isEmpty()) { + PluginPropertyUtils.addPluginProp("host", sapHost); + } + } + + @Before(order = 2, value = "@ORACLE_SOURCE") + public static void createTable() throws SQLException, ClassNotFoundException { + OracleClient.createTable(tableName,schemaName,datatypeColumns, primaryKey); + } + + @Before(order = 3, value = "@ORACLE_SOURCE") + public static void insertRow() throws SQLException, ClassNotFoundException { + OracleClient.insertRow(tableName, schemaName, datatypeColumnNames, row1); + OracleClient.insertRow(tableName, schemaName, datatypeColumnNames, row2); + sourceOracleRecords = OracleClient.getOracleRecordsAsMap(tableName, schemaName); + } + + @Before(order = 4, value = "@ORACLE_SOURCE") + public static void getOracleRecordsAsMap() throws SQLException, ClassNotFoundException { + sourceOracleRecords = OracleClient.getOracleRecordsAsMap(tableName, schemaName); + BeforeActions.scenario.write("Expected Oracle records : " + sourceOracleRecords); + } + + @After(order = 1, value = "@ORACLE_SOURCE_TEMP") + public static void dropTables() throws SQLException, ClassNotFoundException { + OracleClient.deleteTables(schemaName, tableName); + } +} \ No newline at end of file diff --git a/src/e2e-test/java/io.cdap.plugin/locators/ReplicationLocators.java b/src/e2e-test/java/io.cdap.plugin/locators/ReplicationLocators.java new file mode 100644 index 0000000..a5690b2 --- /dev/null +++ b/src/e2e-test/java/io.cdap.plugin/locators/ReplicationLocators.java @@ -0,0 +1,49 @@ +/* + * Copyright (c) 2023. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.locators; + +import io.cdap.e2e.utils.SeleniumDriver; +import org.openqa.selenium.By; +import org.openqa.selenium.WebElement; +import org.openqa.selenium.support.FindBy; +import org.openqa.selenium.support.How; + +public class ReplicationLocators { + @FindBy(how = How.XPATH, using = "//*[contains(text(),'Next')]") + public static WebElement next; + @FindBy(how = How.XPATH, using ="//div[contains(text(),'Oracle')]") + public static WebElement oraclePlugin; + public static WebElement selectTable(String tableName) { + return SeleniumDriver.getDriver().findElement(By.xpath("//div[contains(text(),'" + tableName + "')]" + + "/preceding-sibling::div/span")); + } + @FindBy(how = How.XPATH, using = "//span[contains(text(),'Deploy Replication Job')]") + public static WebElement deployPipeline; + @FindBy(how = How.XPATH, using = "//*[contains(text(), 'Running')]") + public static WebElement running; + @FindBy(how = How.XPATH, using = "//*[contains(text(), 'Logs')]") + public static WebElement logs; + @FindBy(how = How.XPATH, using = "(//*[contains(text(), 'View')])[1]") + public static WebElement advancedLogs; + @FindBy(how = How.XPATH, using = "//*[contains(@class, 'icon-stop')]") + public static WebElement stop; + @FindBy(how = How.XPATH, using = "//div[@data-cy='log-viewer-row']//div[contains(text(),'ERROR')]") + public static WebElement error; + @FindBy(how = How.XPATH, using = "//*[contains(text(),'Stopped')]") + public static WebElement stopped; + public static By start = By.xpath("//*[contains(@class, 'icon-play ')]"); +} \ No newline at end of file diff --git a/src/e2e-test/java/io.cdap.plugin/locators/package-info.java b/src/e2e-test/java/io.cdap.plugin/locators/package-info.java new file mode 100644 index 0000000..e9988de --- /dev/null +++ b/src/e2e-test/java/io.cdap.plugin/locators/package-info.java @@ -0,0 +1,17 @@ +/* + * Copyright (c) 2023. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.locators; \ No newline at end of file diff --git a/src/e2e-test/java/io.cdap.plugin/stepsdesign/StepDefinition.java b/src/e2e-test/java/io.cdap.plugin/stepsdesign/StepDefinition.java new file mode 100644 index 0000000..5b73e6a --- /dev/null +++ b/src/e2e-test/java/io.cdap.plugin/stepsdesign/StepDefinition.java @@ -0,0 +1,99 @@ +/* + * Copyright (c) 2023. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.stepsdesign; + +import io.cdap.e2e.utils.CdfHelper; +import io.cdap.e2e.utils.PluginPropertyUtils; +import io.cdap.e2e.utils.SeleniumDriver; +import io.cdap.plugin.actions.ReplicationActions; +import io.cucumber.java.en.And; +import io.cucumber.java.en.Given; +import io.cucumber.java.en.Then; + +import java.io.IOException; +import java.sql.SQLException; + +public class StepDefinition implements CdfHelper { + @Given("Open DataFusion Project with replication to configure pipeline") + public void openDataFusionProjectWithReplicationToConfigurePipeline() throws IOException, InterruptedException { + openCdf(); + SeleniumDriver.getDriver().get(SeleniumDriver.getDriver().getCurrentUrl().replace( + PluginPropertyUtils.pluginProp("cdfUrl"), PluginPropertyUtils.pluginProp("replication.url"))); + } + @And("Click on the Next button") + public void clickOnTheNextButton() throws InterruptedException { + ReplicationActions.clickNextButton(); + } + @And("Select Oracle as Source") + public void selectSourceAsOracle() { + ReplicationActions.clickOnOraclePlugin(); + } + @Then("Validate Table is available and can be selected {string}") + public void selectTable(String table) { + ReplicationActions.selectTable(table); + } + @Then("Deploy the replication pipeline") + public void deployPipeline() { + ReplicationActions.deployPipeline(); + } + + @Then("Run the replication Pipeline") + public void runPipelineInRuntime() { + ReplicationActions.runThePipeline(); + } + + @Then("Open the logs") + public void openLogsOfSltPipeline() { + ReplicationActions.openAdvanceLogs(); + } + + @Then("Wait till pipeline is in running state and check if no errors occurred") + public void waitTillSltPipelineIsInRunningState() throws InterruptedException { + ReplicationActions.waitTillPipelineIsRunningAndCheckForErrors(); + } + + @Then("Close the pipeline logs and stop the pipeline") + public void closeLogsAndStopThePipeline() { + ReplicationActions.closeTheLogsAndClickOnStopButton(); + } + + @And("Capture raw logs") + public void captureRawLogs() { + ReplicationActions.captureRawLog(); + } + + @And("Insert a record in the source table and wait for replication") + public void triggerInsertCdcEvent() throws IOException, InterruptedException, SQLException, ClassNotFoundException { + ReplicationActions.insertRecordAndWait(); //JCoException, + } + + @And("Delete a record in the source table and wait for replication") + public void triggerDeleteCdcEvent() throws IOException, InterruptedException, SQLException, ClassNotFoundException { + ReplicationActions.deleteRecordAndWait(); //JCoException, + } + + @And("Update a record in the source table and wait for replication") + public void triggerUpdateCdcEvent() throws IOException, InterruptedException, SQLException, ClassNotFoundException { + ReplicationActions.updateRecordAndWait(); //JCoException, + } + @Then("Verify expected Oracle records in target BigQuery table") + public void verifyExpectedOracleRecordsInTargetBigQueryTable() throws IOException, InterruptedException, SQLException, ClassNotFoundException { + ReplicationActions.verifyTargetBigQueryRecordMatchesExpectedOracleRecord(); + } + + +} diff --git a/src/e2e-test/java/io.cdap.plugin/tests/TestRunner.java b/src/e2e-test/java/io.cdap.plugin/tests/TestRunner.java new file mode 100644 index 0000000..2959b32 --- /dev/null +++ b/src/e2e-test/java/io.cdap.plugin/tests/TestRunner.java @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2023. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package io.cdap.plugin.tests; + +import io.cucumber.junit.Cucumber; +import io.cucumber.junit.CucumberOptions; +import org.junit.runner.RunWith; + +/** + * Test Runner to execute Oracle plugin test cases. + */ +@RunWith(Cucumber.class) +@CucumberOptions( + features = {"src/e2e-test/features"}, + glue = {"stepsdesign", "io.cdap.plugin.common.stepsdesign", "io.cdap.plugin.oracle.stepsdesign"}, + tags = {"@Oracle"}, + plugin = {"pretty", "html:target/cucumber-html-report/oracle", + "json:target/cucumber-reports/cucumber-oracle.json", + "junit:target/cucumber-reports/cucumber-oracle.xml"} +) +public class TestRunner { +} diff --git a/src/e2e-test/java/io.cdap.plugin/tests/package-info.java b/src/e2e-test/java/io.cdap.plugin/tests/package-info.java new file mode 100644 index 0000000..05da828 --- /dev/null +++ b/src/e2e-test/java/io.cdap.plugin/tests/package-info.java @@ -0,0 +1,20 @@ +/* + * Copyright (c) 2023. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +/** + * Package contains the runners for Oracle features. + */ +package io.cdap.plugin.tests; diff --git a/src/e2e-test/java/io.cdap.plugin/utils/OracleClient.java b/src/e2e-test/java/io.cdap.plugin/utils/OracleClient.java new file mode 100644 index 0000000..b803f4d --- /dev/null +++ b/src/e2e-test/java/io.cdap.plugin/utils/OracleClient.java @@ -0,0 +1,132 @@ +/* + * Copyright (c) 2023. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.utils; + + +import io.cdap.e2e.utils.PluginPropertyUtils; + +import java.sql.*; +import java.time.Instant; +import java.util.*; + +import java.util.concurrent.TimeUnit; + +/** + * Oracle client. + */ +public class OracleClient { + private static Connection getOracleConnection() throws SQLException, ClassNotFoundException { + TimeZone timezone = TimeZone.getTimeZone("UTC"); + TimeZone.setDefault(timezone); + Class.forName("oracle.jdbc.driver.OracleDriver"); + String databaseName = PluginPropertyUtils.pluginProp("database"); + String host = PluginPropertyUtils.pluginProp("host"); + String port = PluginPropertyUtils.pluginProp("port"); + String username = PluginPropertyUtils.pluginProp("username"); + String password = PluginPropertyUtils.pluginProp("password"); + + return DriverManager.getConnection("jdbc:oracle:thin:@//" +host + + ":" + port + "/" + databaseName, + username, password); + } + + + public static void createTable(String table, String schema, String datatypeColumns, String primaryKey) throws SQLException, ClassNotFoundException { + try (Connection connect = getOracleConnection(); Statement statement = connect.createStatement()) { + String createTableQuery = "CREATE TABLE " + schema + "." + table + datatypeColumns; + statement.executeUpdate(createTableQuery); + } + } + + public static void forceFlushCDC() throws SQLException, ClassNotFoundException{ + try (Connection connect = getOracleConnection(); Statement statement = connect.createStatement()) { + //Oracle doesn't immediately flush CDC events, it can automatically happen based on time/log file size or you can force it + statement.executeUpdate("ALTER SYSTEM SWITCH LOGFILE"); + } + } + + + + public static void insertRow(String table, String schema, String datatypeColumns, String datatypeValues) throws SQLException, ClassNotFoundException { + try (Connection connect = getOracleConnection(); Statement statement = connect.createStatement()) { + // Insert dummy data. + statement.executeUpdate("INSERT INTO " + schema + "." + table + " " + datatypeColumns + + " VALUES " + datatypeValues); + + } + } + public static void deleteRow(String table, String schema, String deleteCondition) throws SQLException, ClassNotFoundException { + try (Connection connect = getOracleConnection(); Statement statement = connect.createStatement()) { + // Insert dummy data. + statement.executeUpdate("DELETE FROM " + schema + "." + table + " WHERE " + deleteCondition); + } + } + public static void updateRow(String table, String schema, String updateCondition, String updatedValue) throws SQLException, ClassNotFoundException { + try (Connection connect = getOracleConnection(); Statement statement = connect.createStatement()) { + // Insert dummy data. + statement.executeUpdate("UPDATE " + schema + "." + table + " SET " + updatedValue + + " WHERE " + updateCondition); + } + } + + public static List> getOracleRecordsAsMap(String table, String schema)throws SQLException, ClassNotFoundException { + try (Connection connect = getOracleConnection(); Statement statement = connect.createStatement()) { + // Insert dummy data. + List> oracleRecords = new ArrayList<>(); + String query = "select * from " + schema + "." + table; + ResultSet result = statement.executeQuery(query); + + ResultSetMetaData rsmd = result.getMetaData(); + int numberOfColumns = rsmd.getColumnCount(); + List columns = new ArrayList<>(); + columns.add(""); + for(int colIndex = 1 ; colIndex <= numberOfColumns ; colIndex++){ + columns.add(rsmd.getColumnName(colIndex) + "#" + rsmd.getColumnType(colIndex)); + } + while (result.next()) { + Map record = new HashMap<>(); + for(int colIndex = 1 ; colIndex <= numberOfColumns ; colIndex++){ + String columnName = columns.get(colIndex).split("#")[0]; + int type = Integer.parseInt(columns.get(colIndex).split("#")[1]); + Object value; + switch (type){ + case Types.TIMESTAMP: + Instant instant = result.getTimestamp(colIndex).toInstant(); + //Rounding off as BQ supports till microseconds + value = TimeUnit.SECONDS.toMicros(instant.getEpochSecond()) + TimeUnit.NANOSECONDS.toMicros(instant.getNano()); + break; + default: + //else convert all data types toString as bq converts certain data types to string to preserve precision and scale + value = result.getString(colIndex); + } + record.put(columnName,value); + } + oracleRecords.add(record); + } + statement.close(); + return oracleRecords; + } + } + + public static void deleteTables(String table, String schema) + throws SQLException, ClassNotFoundException { + try (Connection connect = getOracleConnection(); Statement statement = connect.createStatement()) { + String dropTableQuery = "DROP TABLE " + schema + "." + table; + statement.execute(dropTableQuery); + } + } +} \ No newline at end of file diff --git a/src/e2e-test/java/io.cdap.plugin/utils/ValidationHelper.java b/src/e2e-test/java/io.cdap.plugin/utils/ValidationHelper.java new file mode 100644 index 0000000..00344d9 --- /dev/null +++ b/src/e2e-test/java/io.cdap.plugin/utils/ValidationHelper.java @@ -0,0 +1,149 @@ +/* + * Copyright (c) 2023. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.utils; + +import com.google.cloud.bigquery.Field; +import com.google.cloud.bigquery.FieldValue; +import com.google.cloud.bigquery.FieldValueList; +import com.google.cloud.bigquery.TableResult; +import io.cdap.e2e.utils.BigQueryClient; +import io.cdap.e2e.utils.ConstantsUtil; +import io.cdap.e2e.utils.PluginPropertyUtils; +import net.jodah.failsafe.Failsafe; +import net.jodah.failsafe.RetryPolicy; +import org.junit.Assert; + +import java.io.IOException; +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.*; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +public class ValidationHelper { + + public static List> getBigQueryRecordsAsMap(String projectId, String database, String tableName) + throws IOException, InterruptedException { + String query = "SELECT * EXCEPT ( _row_id, _source_timestamp, _sort) FROM `" + projectId + + "." + database + "." + tableName + "`"; + List> bqRecords = new ArrayList<>(); + //Let us account for 90 seconds flush here.And for every flush, how long does it take to reflect the changes in bg by running queries. + //handle exceptions and retry + RetryPolicy retryPolicy = new RetryPolicy<>() + .withMaxAttempts(5) + .onRetry( e -> System.out.println("Retrying exception")) + .withDelay(Duration.ofSeconds(60)); + //.withMaxDuration(Duration.of(5, ChronoUnit.MINUTES)); + TableResult results = Failsafe.with(retryPolicy).get(() -> { + //Let changes reflect in bq making sure flush cycle is included +// TimeUnit time = TimeUnit.SECONDS; +// time.sleep(180); + return BigQueryClient.getQueryResult(query); + }); + List columns = new ArrayList<>(); + for (Field field : results.getSchema().getFields()) { + columns.add(field.getName() + "#" + field.getType()); + } + for (FieldValueList row : results.getValues()) { + Map record = new HashMap<>(); + int index = 0; + for (FieldValue fieldValue : row) { + String columnName = columns.get(index).split("#")[0]; + String dataType = columns.get(index).split("#")[1]; + Object value; + if(dataType.equalsIgnoreCase("TIMESTAMP")){ + value = fieldValue.getTimestampValue(); +// } else if (dataType.toUpperCase().equals("FLOAT")) { +// value = Float.parseFloat(fieldValue.getStringValue()); + } else { + value = fieldValue.getValue(); + value = value != null ? value.toString() : null; + } + record.put(columnName, value); + index++; + } + bqRecords.add(record); + } + return bqRecords; + } + + public static void validateRecords(List> sourceOracleRecords, + List> targetBigQueryRecords) { + + String uniqueField = PluginPropertyUtils.pluginProp("primaryKey"); + //Assert.assertEquals("Checking the size of lists ", targetBigQueryRecords.size(), sourceOracleRecords.size()); //doesn't make sense in our case, since few datatypes are dropped during assessment + + // Logic to maintain the order of both lists and validate records based on that order. + Map BqUniqueIdMap = (Map)targetBigQueryRecords.stream() + .filter(t -> t.get("_is_deleted")==null) + .collect(Collectors.toMap( + t -> t.get(uniqueField), + t -> t, + (x,y) -> { + Long xSeqNum = Long.parseLong(x.get("_sequence_num").toString()); + Long ySeqNum = Long.parseLong(y.get("_sequence_num").toString()); + if(xSeqNum > ySeqNum){ + return x; + } + return y; + })); + + + for (int record = 0; record < sourceOracleRecords.size(); record++) { + Map oracleRecord = sourceOracleRecords.get(record); + Object uniqueId = oracleRecord.get(uniqueField); + //EXP : In case data type not matched, convert to string and compare +// if( !uniqueId.getClass().equals(targetBigQueryRecords.get(0).get(uniqueField).getClass()) ){ +// uniqueId = uniqueId.toString(); +// } + Map bqRow = (Map) BqUniqueIdMap.get(uniqueId); + bqRow.remove("_is_deleted"); + bqRow.remove("_sequence_num"); + + ValidationHelper.compareBothResponses( bqRow, oracleRecord); + } + } + + public static void compareBothResponses(Map targetBigQueryRecords, + Map sourceOracleRecord) { + Assert.assertNotNull("Checking if target BigQuery Record is null", targetBigQueryRecords); + Set bigQueryKeySet = targetBigQueryRecords.keySet(); + for (String field : bigQueryKeySet) { + Object bigQueryFieldValue = targetBigQueryRecords.get(field); + Object sapFieldValue = sourceOracleRecord.get(field); +// if(!bigQueryFieldValue.getClass().equals(sapFieldValue.getClass())){ +// sapFieldValue = sapFieldValue.toString(); +// bigQueryFieldValue = bigQueryFieldValue.toString(); +// } + Assert.assertEquals(String.format("Field %s is not equal: expected %s but got %s", field, + sapFieldValue, bigQueryFieldValue), + sapFieldValue, bigQueryFieldValue); + } + } + + + + + public static void waitForFlush() throws InterruptedException { + int flushInterval = Integer.parseInt(PluginPropertyUtils.pluginProp("loadInterval")); + TimeUnit time = TimeUnit.SECONDS; + time.sleep(2*flushInterval); + } + + + +} diff --git a/src/e2e-test/resources/errorMessage.properties b/src/e2e-test/resources/errorMessage.properties new file mode 100644 index 0000000..283352d --- /dev/null +++ b/src/e2e-test/resources/errorMessage.properties @@ -0,0 +1 @@ +validationSuccessMessage=No errors found \ No newline at end of file diff --git a/src/e2e-test/resources/pluginDataCyAttributes.properties b/src/e2e-test/resources/pluginDataCyAttributes.properties new file mode 100644 index 0000000..2608ac4 --- /dev/null +++ b/src/e2e-test/resources/pluginDataCyAttributes.properties @@ -0,0 +1,6 @@ +host=host +user=user +password=password +region=select-region +regionOption=option-us-west1 + diff --git a/src/e2e-test/resources/pluginParameters.properties b/src/e2e-test/resources/pluginParameters.properties new file mode 100644 index 0000000..7fa82fc --- /dev/null +++ b/src/e2e-test/resources/pluginParameters.properties @@ -0,0 +1,42 @@ + +pipelineName=e2e-ds3 +projectId= + +database= +schema= +sourceTable=E2E1 +host= +port= +username= +password= + +datastream.timeout=300 +loadInterval=30 + +replication.url=http://localhost:11011/cdap/ns/default/replication/create +cdfUrl=http://localhost:11011/pipelines/ns/default/studio + +datatypeColumns= (ID number(38), LASTNAME varchar2(100), PRIMARY KEY (ID)) +datatypeColumnNames=(ID, LASTNAME) +primaryKey=ID + +datatypeValuesRow1=(1, 'Sheldon') +datatypeValuesRow2=(2, 'Shelby') + +datatypeValuesForInsertOperation=(3, 'Simpson') + +deleteRowCondition=ID=3 + +updateRowCondition=ID=1 +updatedRow=LASTNAME='Leonard' + +datatypeColumnsForSanityTesting=('USER1', 'M','ABCDEF','ABC','ABC','ä','ä½ å¥½ï¼?è¿?','ä½ å¥½ï¼?è¿?','AAAAaoAATAAABrXAAA',1234,1234.56789,\ + 1234.56789,1234.56789,1234.56789,1234.56789,1234.56789,1234.56789,1234.5679,\ + 1234.56789,1234.5679,1234.5679,1234.56789,TIMESTAMP'2023-01-01 2:00:00',TIMESTAMP'2023-01-01 2:00:00',\ + TIMESTAMP '2023-01-01 00:00:00.000000') + +datatypeValuesForSanityTesting=('USER1', 'M','ABCDEF','ABC','ABC','ä','ä½ å¥½ï¼?è¿?','ä½ å¥½ï¼?è¿?',\ + 'AAAAaoAATAAABrXAAA',1234,1234.56789,\ + 1234.56789,1234.56789,1234.56789,1234.56789,1234.56789,1234.56789,1234.5679,\ + 1234.56789,1234.5679,1234.5679,1234.56789,TIMESTAMP'2023-01-01 2:00:00',TIMESTAMP'2023-01-01 2:00:00',\ + TIMESTAMP '2023-01-01 00:00:00.000000') \ No newline at end of file