Skip to content

Commit

Permalink
[MINOR] Fix typos (#4053)
Browse files Browse the repository at this point in the history
  • Loading branch information
dongkelun committed Nov 21, 2021
1 parent 887787e commit 2533a9c
Showing 1 changed file with 12 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,14 @@ public static class Config implements Serializable {

private Connection connection;
protected final Config config;
private final ST incrementalPullSQLtemplate;
private final ST incrementalPullSQLTemplate;

public HiveIncrementalPuller(Config config) throws IOException {
this.config = config;
validateConfig(config);
String templateContent =
FileIOUtils.readAsUTFString(this.getClass().getResourceAsStream("/IncrementalPull.sqltemplate"));
incrementalPullSQLtemplate = new ST(templateContent);
incrementalPullSQLTemplate = new ST(templateContent);
}

private void validateConfig(Config config) {
Expand Down Expand Up @@ -165,19 +165,19 @@ public void saveDelta() throws IOException {
stmt.close();
}
} catch (SQLException e) {
LOG.error("Could not close the resultset opened ", e);
LOG.error("Could not close the resultSet opened ", e);
}
}
}

private void executeIncrementalSQL(String tempDbTable, String tempDbTablePath, Statement stmt)
throws FileNotFoundException, SQLException {
incrementalPullSQLtemplate.add("tempDbTable", tempDbTable);
incrementalPullSQLtemplate.add("tempDbTablePath", tempDbTablePath);
incrementalPullSQLTemplate.add("tempDbTable", tempDbTable);
incrementalPullSQLTemplate.add("tempDbTablePath", tempDbTablePath);

String storedAsClause = getStoredAsClause();

incrementalPullSQLtemplate.add("storedAsClause", storedAsClause);
incrementalPullSQLTemplate.add("storedAsClause", storedAsClause);
String incrementalSQL = new Scanner(new File(config.incrementalSQLFile)).useDelimiter("\\Z").next();
if (!incrementalSQL.contains(config.sourceDb + "." + config.sourceTable)) {
LOG.error("Incremental SQL does not have " + config.sourceDb + "." + config.sourceTable
Expand All @@ -194,8 +194,8 @@ private void executeIncrementalSQL(String tempDbTable, String tempDbTablePath, S
+ "means its not pulling incrementally");
}

incrementalPullSQLtemplate.add("incrementalSQL", String.format(incrementalSQL, config.fromCommitTime));
String sql = incrementalPullSQLtemplate.render();
incrementalPullSQLTemplate.add("incrementalSQL", String.format(incrementalSQL, config.fromCommitTime));
String sql = incrementalPullSQLTemplate.render();
// Check if the SQL is pulling from the right database
executeStatement(sql, stmt);
}
Expand All @@ -208,13 +208,13 @@ private void initHiveBeelineProperties(Statement stmt) throws SQLException {
LOG.info("Setting up Hive JDBC Session with properties");
// set the queue
executeStatement("set mapred.job.queue.name=" + config.yarnQueueName, stmt);
// Set the inputformat to HoodieCombineHiveInputFormat
// Set the inputFormat to HoodieCombineHiveInputFormat
executeStatement("set hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat", stmt);
// Allow queries without partition predicate
executeStatement("set hive.strict.checks.large.query=false", stmt);
// Dont gather stats for the table created
// Don't gather stats for the table created
executeStatement("set hive.stats.autogather=false", stmt);
// Set the hoodie modie
// Set the hoodie mode
executeStatement("set hoodie." + config.sourceTable + ".consume.mode=INCREMENTAL", stmt);
// Set the from commit time
executeStatement("set hoodie." + config.sourceTable + ".consume.start.timestamp=" + config.fromCommitTime, stmt);
Expand Down Expand Up @@ -263,7 +263,7 @@ private String getTableLocation(String db, String table) {
resultSet.close();
}
} catch (SQLException e) {
LOG.error("Could not close the resultset opened ", e);
LOG.error("Could not close the resultSet opened ", e);
}
}
return null;
Expand Down

0 comments on commit 2533a9c

Please sign in to comment.