Skip to content

Commit

Permalink
resolved comments
Browse files Browse the repository at this point in the history
  • Loading branch information
yjhawar committed Apr 11, 2023
1 parent 700cf89 commit 28bcdf6
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 22 deletions.
2 changes: 1 addition & 1 deletion src/e2e-test/features/Pipeline.feature
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
#

Feature: Oracle - Verify Oracle source data transfer to Big Query
@ORACLE_SOURCE @BIGQUERY_TARGET
@ENV_VARIABLES #@ORACLE_SOURCE @ORACLE_DELETE @BIGQUERY_DELETE
Scenario: To verify replication of snapshot and cdc data from Oracle to Big Query successfully with Sanity test
Given Open DataFusion Project with replication to configure pipeline
When Enter input plugin property: "name" with value: "pipelineName"
Expand Down
11 changes: 6 additions & 5 deletions src/e2e-test/java/io.cdap.plugin/actions/ReplicationActions.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.cdap.e2e.pages.locators.CdfPipelineRunLocators;
import io.cdap.e2e.utils.*;
import io.cdap.plugin.locators.ReplicationLocators;
import io.cdap.plugin.utils.BigQuery;
import io.cdap.plugin.utils.OracleClient;
import io.cdap.plugin.utils.ValidationHelper;
import org.apache.commons.lang.StringUtils;
Expand Down Expand Up @@ -59,7 +60,7 @@ public static void clickOnOraclePlugin() {

public static void selectTable() {
String table = schemaName + "." + tableName;
WaitHelper.waitForElementToBeDisplayed(ReplicationLocators.selectTable(table));
WaitHelper.waitForElementToBeDisplayed(ReplicationLocators.selectTable(table),300);
AssertionHelper.verifyElementDisplayed(ReplicationLocators.selectTable(table));
ElementHelper.clickOnElement(ReplicationLocators.selectTable(table));
}
Expand Down Expand Up @@ -113,7 +114,7 @@ public static void waitTillPipelineIsRunningAndCheckForErrors() throws Interrupt
int defaultTimeout = Integer.parseInt(PluginPropertyUtils.pluginProp("pipeline-initialization"));
TimeUnit time = TimeUnit.SECONDS;
time.sleep(defaultTimeout);
ValidationHelper.waitForFlush();
BigQuery.waitForFlush();
// Checking if an error message is displayed.
Assert.assertFalse(ElementHelper.isElementDisplayed(ReplicationLocators.error));
}
Expand All @@ -140,18 +141,18 @@ public static void insertRecordAndWait()
throws InterruptedException, SQLException, ClassNotFoundException {
OracleClient.insertRow(tableName, schemaName, datatypeValues);
OracleClient.forceFlushCDC();
ValidationHelper.waitForFlush();
BigQuery.waitForFlush();
}

public static void deleteRecordAndWait() throws SQLException, ClassNotFoundException, InterruptedException {
OracleClient.deleteRow(tableName, schemaName, deleteCondition);
OracleClient.forceFlushCDC();
ValidationHelper.waitForFlush();
BigQuery.waitForFlush();
}

public static void updateRecordAndWait() throws SQLException, ClassNotFoundException, InterruptedException {
OracleClient.updateRow(tableName, schemaName, updateCondition, updatedValue );
OracleClient.forceFlushCDC();
ValidationHelper.waitForFlush();
BigQuery.waitForFlush();
}
}
7 changes: 4 additions & 3 deletions src/e2e-test/java/io.cdap.plugin/hooks/TestSetUpHooks.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.cloud.bigquery.BigQueryException;
import io.cdap.e2e.utils.BigQueryClient;
import io.cdap.e2e.utils.PluginPropertyUtils;
import io.cdap.plugin.utils.BigQuery;
import io.cdap.plugin.utils.OracleClient;
import io.cucumber.java.After;
import io.cucumber.java.Before;
Expand All @@ -43,7 +44,7 @@ public class TestSetUpHooks {
public static String row1 = PluginPropertyUtils.pluginProp("datatypeValuesRow1");
public static String row2= PluginPropertyUtils.pluginProp("datatypeValuesRow2");

@Before(order = 1, value = "@ORACLE_SOURCE")
@Before(order = 1, value = "@ENV_VARIABLES")
public static void overridePropertiesFromEnvVarsIfProvided() {
String projectId = System.getenv("PROJECT_ID");
if (projectId != null && !projectId.isEmpty()) {
Expand Down Expand Up @@ -89,12 +90,12 @@ public static void getOracleRecordsAsMap() throws SQLException, ClassNotFoundExc
BeforeActions.scenario.write("Expected Oracle records : " + sourceOracleRecords);
}

@After(order = 1, value = "@ORACLE_SOURCE")
@After(order = 1, value = "@ORACLE_DELETE")
public static void dropTables() throws SQLException, ClassNotFoundException {
OracleClient.deleteTables(schemaName, tableName);
}

@After(order = 1, value = "@BIGQUERY_TARGET")
@After(order = 1, value = "@BIGQUERY_DELETE")
public static void deleteTempTargetBQTable() throws IOException, InterruptedException {
try {
BigQueryClient.dropBqQuery(tableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public void selectSourceAsOracle() {
}

@Then("Validate Source table is available and select it")
public void selectTable1() {
public void selectTable() {
ReplicationActions.selectTable();
}
@Then("Deploy the replication pipeline")
Expand Down
47 changes: 47 additions & 0 deletions src/e2e-test/java/io.cdap.plugin/utils/BigQuery.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright (c) 2023.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.plugin.utils;

import com.google.cloud.bigquery.BigQueryException;
import io.cdap.e2e.utils.BigQueryClient;
import io.cdap.e2e.utils.PluginPropertyUtils;
import org.junit.Assert;
import stepsdesign.BeforeActions;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

public class BigQuery {
public static void waitForFlush() throws InterruptedException {
int flushInterval = Integer.parseInt(PluginPropertyUtils.pluginProp("loadInterval"));
TimeUnit time = TimeUnit.SECONDS;
time.sleep(2 * flushInterval + 60);
}

public static void deleteTable(String tableName) throws IOException, InterruptedException{
try {
BigQueryClient.dropBqQuery(tableName);
BeforeActions.scenario.write("BQ Target table - " + tableName + " deleted successfully");
} catch (BigQueryException e) {
if (e.getMessage().contains("Not found: Table")) {
BeforeActions.scenario.write("BQ Target Table does not exist");
} else {
Assert.fail(e.getMessage());
}
}
}
}
11 changes: 2 additions & 9 deletions src/e2e-test/java/io.cdap.plugin/utils/ValidationHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public static void validateRecords(List<Map<String, Object>> sourceOracleRecords

String uniqueField = PluginPropertyUtils.pluginProp("primaryKey");
// Logic to maintain the order of both lists and validate records based on that order.
Map BqUniqueIdMap = (Map)targetBigQueryRecords.stream()
Map bqUniqueIdMap = (Map)targetBigQueryRecords.stream()
.filter(t -> t.get("_is_deleted")==null)
.collect(Collectors.toMap(
t -> t.get(uniqueField),
Expand All @@ -84,11 +84,10 @@ public static void validateRecords(List<Map<String, Object>> sourceOracleRecords
for (int record = 0; record < sourceOracleRecords.size(); record++) {
Map<String, Object> oracleRecord = sourceOracleRecords.get(record);
Object uniqueId = oracleRecord.get(uniqueField);
Map<String, Object> bqRow = (Map<String, Object>) BqUniqueIdMap.get(uniqueId);
Map<String, Object> bqRow = (Map<String, Object>) bqUniqueIdMap.get(uniqueId);
if (bqRow != null) {
bqRow.remove("_is_deleted");
bqRow.remove("_sequence_num");
bqRow.forEach((key, value) -> System.out.println("Bq record" + key + ":" + value));
}
compareRecords(bqRow, oracleRecord);
}
Expand All @@ -106,10 +105,4 @@ public static void compareRecords(Map<String, Object> targetBigQueryRecords,
}
}

public static void waitForFlush() throws InterruptedException {
int flushInterval = Integer.parseInt(PluginPropertyUtils.pluginProp("loadInterval"));
TimeUnit time = TimeUnit.SECONDS;
time.sleep(2 * flushInterval);
}

}
6 changes: 3 additions & 3 deletions src/e2e-test/resources/pluginParameters.properties
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@


projectId=PROJECT_ID
dataset=XE
schema=ABC
dataset=ORCL
schema=HR
sourceTable=SOURCE_TABLE
host=ORACLE_HOST
port=ORACLE_PORT
username=ORACLE_USERNAME
password=ORACLE_PASSWORD

pipelineName=e2e-sanity
pipelineName=e2e-sanity-existing-table
pipeline-initialization=200
loadInterval=30

Expand Down

0 comments on commit 28bcdf6

Please sign in to comment.