Skip to content

Commit

Permalink
addressing comments
Browse files Browse the repository at this point in the history
  • Loading branch information
nsivabalan committed Nov 9, 2022
1 parent e321dfb commit 2432ff3
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 27 deletions.
Expand Up @@ -20,6 +20,7 @@

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
Expand Down Expand Up @@ -348,6 +349,6 @@ public static class HoodieTestSuiteConfig extends HoodieDeltaStreamer.Config {
public String indexType = "SIMPLE";

@Parameter(names = {"--enable-metadata-on-read"}, description = "Enable's metadata for queries")
public Boolean enableMetadataOnRead = false;
public Boolean enableMetadataOnRead = HoodieMetadataConfig.ENABLE.key();
}
}
Expand Up @@ -35,31 +35,32 @@ public PrestoQueryNode(DeltaConfig.Config config) {

@Override
public void execute(ExecutionContext context, int curItrCount) throws Exception {
if (context.getHoodieTestSuiteWriter().getCfg().enablePrestoValidation) {
int validateOnceEveryItr = config.validateOnceEveryIteration();
int itrCountToExecute = config.getIterationCountToExecute();
if ((itrCountToExecute != -1 && itrCountToExecute == curItrCount)
|| (itrCountToExecute == -1 && ((curItrCount % validateOnceEveryItr) == 0))) {
log.info("Executing presto query node {}", this.getName());
String url = context.getHoodieTestSuiteWriter().getCfg().prestoJdbcUrl;
if (StringUtils.isNullOrEmpty(url)) {
throw new IllegalArgumentException("Presto JDBC connection url not provided. Please set --presto-jdbc-url.");
}
String user = context.getHoodieTestSuiteWriter().getCfg().prestoUsername;
String pass = context.getHoodieTestSuiteWriter().getCfg().prestoPassword;
try {
Class.forName("com.facebook.presto.jdbc.PrestoDriver");
} catch (ClassNotFoundException e) {
throw new HoodieValidationException("Presto query validation failed due to " + e.getMessage(), e);
}
try (Connection connection = DriverManager.getConnection(url, user, pass)) {
Statement stmt = connection.createStatement();
setSessionProperties(this.config.getPrestoProperties(), stmt);
executeAndValidateQueries(this.config.getPrestoQueries(), stmt);
stmt.close();
} catch (Exception e) {
throw new HoodieValidationException("Presto query validation failed due to " + e.getMessage(), e);
}
if (!context.getHoodieTestSuiteWriter().getCfg().enablePrestoValidation) {
return;
}
int validateOnceEveryItr = config.validateOnceEveryIteration();
int itrCountToExecute = config.getIterationCountToExecute();
if ((itrCountToExecute != -1 && itrCountToExecute == curItrCount)
|| (itrCountToExecute == -1 && ((curItrCount % validateOnceEveryItr) == 0))) {
log.info("Executing presto query node {}", this.getName());
String url = context.getHoodieTestSuiteWriter().getCfg().prestoJdbcUrl;
if (StringUtils.isNullOrEmpty(url)) {
throw new IllegalArgumentException("Presto JDBC connection url not provided. Please set --presto-jdbc-url.");
}
String user = context.getHoodieTestSuiteWriter().getCfg().prestoUsername;
String pass = context.getHoodieTestSuiteWriter().getCfg().prestoPassword;
try {
Class.forName("com.facebook.presto.jdbc.PrestoDriver");
} catch (ClassNotFoundException e) {
throw new HoodieValidationException("Presto query validation failed due to " + e.getMessage(), e);
}
try (Connection connection = DriverManager.getConnection(url, user, pass)) {
Statement stmt = connection.createStatement();
setSessionProperties(this.config.getPrestoProperties(), stmt);
executeAndValidateQueries(this.config.getPrestoQueries(), stmt);
stmt.close();
} catch (Exception e) {
throw new HoodieValidationException("Presto query validation failed due to " + e.getMessage(), e);
}
}
}
Expand Down
Expand Up @@ -72,7 +72,7 @@ class SparkInsertNode(dagNodeConfig: Config) extends DagNode[RDD[WriteStatus]] {
.option(DataSourceWriteOptions.TABLE_TYPE.key, context.getHoodieTestSuiteWriter.getCfg.tableType)
.option(HoodieIndexConfig.INDEX_TYPE.key, context.getHoodieTestSuiteWriter.getCfg.indexType)
.option(DataSourceWriteOptions.OPERATION.key, getOperation())
.option("hoodie.index.type", context.getHoodieTestSuiteWriter.getCfg.indexType)
.option(HoodieIndexConfig.INDEX_TYPE.key, context.getHoodieTestSuiteWriter.getCfg.indexType)
.option(HoodieWriteConfig.TBL_NAME.key, context.getHoodieTestSuiteWriter.getCfg.targetTableName)
.mode(SaveMode.Append)
.save(context.getHoodieTestSuiteWriter.getWriteConfig.getBasePath)
Expand Down

0 comments on commit 2432ff3

Please sign in to comment.