Start to refactor
Comment out the classes still to be refactored so that the module builds

Refactor TestCopyUtils.java

Before refactor: TestReplDumpTask

Refactor TestReplDumpTask.java

Before refactor: TestAtlasLoadTask

Refactor TestAtlasLoadTask.java

Refactor TestRangerDumpTask.java

Before refactor: TestCompactionHeartbeatService

Refactor TestCompactionHeartbeatService.java

Before refactor: TestRetryable

Refactor TestRetryable.java

Before refactor: TestRangerLoadTask

Refactor: RangerLoadTask

Before refactor: TestAtlasDumpTask

Refactor: AtlasDumpTask

Before refactor: TestPrimaryToReplicaResourceFunction

Refactor TestPrimaryToReplicaResourceFunction

Before refactor: TestExportService

Refactor: TestExportService

HIVE-26522: Added test for HIVE-22033 regarding delegation token renewal (apache#3585)

HIVE-26676: Count distinct in subquery returning wrong results (Steve Carlin, reviewed by Alessandro Solimando, Aman Sinha, Krisztian Kasa)

HIVE-26736: Authorization failure for nested Views having WITH clause. (apache#3760). (Ayush Saxena, reviewed by Denys Kuzmenko)

HIVE-26628: Iceberg table is created when running explain ctas command (Krisztian Kasa, reviewed by Denys Kuzmenko)

HIVE-26734: Iceberg: Add an option to allow positional delete files without actual row data. (apache#3758). (Ayush Saxena, reviewed by Adam Szita, Denys Kuzmenko)

HIVE-26524: Use Calcite to remove sections of a query plan known never produces rows - ADDENDUM (Krisztian Kasa, reviewed by Stamatis Zampetakis)

HIVE-26740: HS2 makes direct connections to HMS backend DB due to Compaction/StatsUpdater (apache#3765) (Adam Szita, reviewed by Zhihua Deng)

HIVE-26631: Remove unused Thrift config parameters login.timeout and exponential.backoff.slot.length (xiuzhu9527 reviewed by Stamatis Zampetakis)

Closes apache#3672

HIVE-26747: Remove implementor from HiveRelNode (Krisztian Kasa, reviewed by Stamatis Zampetakis)

HIVE-26747: Remove implementor from HiveRelNode (Krisztian Kasa, reviewed by Stamatis Zampetakis) ADDENDUM

HIVE-26745: HPL unable to handle Decimal or null values in hplsql mode (apache#3769) (Adam Szita, reviewed by Attila Magyar and Denys Kuzmenko)

HIVE-26722: HiveFilterSetOpTransposeRule incorrectly prunes UNION ALL operands.  (apache#3748). (Alessandro Solimando, reviewed by Ayush Saxena, Simhadri Govindappa)

HIVE-26746: Request tracking: change to X-Request-ID header (apache#3770) (Laszlo Bodor reviewed by Zhihua Deng)

HIVE-26624: Set repl.background.enable on target after failover completion (Vinit Patni, reviewed by László Pintér, Teddy Choi)

Co-authored-by: vpatni <vpatni@cloudera.com>

HIVE-26712: HCatMapReduceTest writes test files in project base directory instead of build directory. (apache#3738) (Chris Nauroth reviewed by Ayush Saxena)

HIVE-26726: Tinyint column with windowing fn crashes at runtime (Steve Carlin, reviewed by Aman Sinha, Krisztian Kasa)

HIVE-26680: Make CMV use Direct Insert Semantics (Sourabh Badhya, reviewed by Denys Kuzmenko, Laszlo Vegh)

Closes apache#3715

HIVE-26243: Add vectorized implementation of the 'ds_kll_sketch' UDAF (Alessandro Solimando, reviewed by Denys Kuzmenko, Zoltan Haindrich)

Closes apache#3317

HIVE-26761: Add result sorting to complex_alias.q (apache#3783) (Balazs Cseh reviewed by Laszlo Bodor)

HIVE-26759: Update SHOW COMPACTIONS query to support Postgres HMS (Akshat Mathur, reviewed by Denys Kuzmenko, Zsolt Miskolczi)

Closes apache#3782

HIVE-26765: Hive Ranger URL policy for insert overwrite directory denies access when fully qualified paths are passed (apache#3790) (Simhadri Govindappa, reviewed by Adam Szita)

Small refactors

Fix bug
zsmiskolczi committed Sep 11, 2023
1 parent e49fe08 commit b9b2895
Showing 28 changed files with 736 additions and 459 deletions.
19 changes: 6 additions & 13 deletions ql/pom.xml
@@ -25,7 +25,6 @@
<name>Hive Query Language</name>
<properties>
<hive.path.to.root>..</hive.path.to.root>
<powermock.version>2.0.2</powermock.version>
<reflections.version>0.10.2</reflections.version>
<atlas.version>2.1.0</atlas.version>
</properties>
@@ -813,18 +812,6 @@
<version>${mockito-core.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-module-junit4</artifactId>
<version>${powermock.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-api-mockito2</artifactId>
<version>${powermock.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava-testlib</artifactId>
@@ -858,6 +845,12 @@
<groupId>org.codehaus.janino</groupId>
<artifactId>janino</artifactId>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-inline</artifactId>
<version>3.4.4</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.sun.jersey.contribs</groupId>
<artifactId>jersey-multipart</artifactId>
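The swap above removes the two PowerMock test modules and adds mockito-inline, whose mock maker has supported static and final-class mocking since Mockito 3.4.0, so the 3.4.4 pin suffices. A minimal, self-contained sketch of the migration pattern — IdSource is hypothetical, standing in for whatever static utility a PowerMock test used to stub:

import static org.junit.Assert.assertEquals;

import org.junit.Test;
import org.mockito.MockedStatic;
import org.mockito.Mockito;

public class StaticMockingSketchTest {

  // Hypothetical helper; stands in for a real utility with static methods.
  static class IdSource {
    static long next() {
      return System.nanoTime();
    }
  }

  @Test
  public void staticCallIsStubbedWithPlainMockito() {
    // mockito-inline intercepts the static call only inside this scope,
    // replacing PowerMockito.mockStatic and the PowerMockRunner.
    try (MockedStatic<IdSource> mocked = Mockito.mockStatic(IdSource.class)) {
      mocked.when(IdSource::next).thenReturn(42L);
      assertEquals(42L, IdSource.next());
    }
    // Outside the try-with-resources block the real IdSource.next() is back.
  }
}

The scoped try-with-resources form also avoids the cross-test leakage that PowerMock's class-loader manipulation could cause.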
22 changes: 19 additions & 3 deletions ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java
@@ -36,11 +36,13 @@
import org.apache.hadoop.hive.ql.exec.util.Retryable;
import org.apache.hadoop.hive.ql.parse.EximUtil;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
import org.apache.hadoop.hive.ql.parse.repl.dump.log.AtlasDumpLogger;
import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status;
import org.apache.hadoop.hive.ql.plan.api.StageType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -68,16 +70,23 @@ public class AtlasDumpTask extends Task<AtlasDumpWork> implements Serializable {
private static final transient Logger LOG = LoggerFactory.getLogger(AtlasDumpTask.class);
private static final long serialVersionUID = 1L;
private transient AtlasRestClient atlasRestClient;
private AtlasDumpLogger replLogger;

public AtlasDumpTask() {
super();
}

@VisibleForTesting
AtlasDumpTask(final AtlasRestClient atlasRestClient, final HiveConf conf, final AtlasDumpWork work) {
AtlasDumpTask(final AtlasRestClient atlasRestClient, final HiveConf conf, final AtlasDumpWork work, AtlasDumpLogger replLogger) {
this.conf = conf;
this.work = work;
this.atlasRestClient = atlasRestClient;
this.replLogger = replLogger;
}

@VisibleForTesting
AtlasDumpTask(final AtlasRestClient atlasRestClient, final HiveConf conf, final AtlasDumpWork work) {
this(atlasRestClient, conf, work, null);
}

@Override
@@ -87,8 +96,7 @@ public int execute() {
AtlasReplInfo atlasReplInfo = createAtlasReplInfo();
LOG.info("Dumping Atlas metadata of srcDb: {}, for TgtDb: {} to staging location:",
atlasReplInfo.getSrcDB(), atlasReplInfo.getTgtDB(), atlasReplInfo.getStagingDir());
AtlasDumpLogger replLogger = new AtlasDumpLogger(atlasReplInfo.getSrcDB(),
atlasReplInfo.getStagingDir().toString());
initializeReplLogger(atlasReplInfo);
replLogger.startLog();
Map<String, Long> metricMap = new HashMap<>();
metricMap.put(ReplUtils.MetricName.ENTITIES.name(), 0L);
@@ -129,6 +137,14 @@ public int execute() {
}
}

private void initializeReplLogger(AtlasReplInfo atlasReplInfo) {
if (this.replLogger == null) {
this.replLogger = new AtlasDumpLogger(atlasReplInfo.getSrcDB(),
atlasReplInfo.getStagingDir().toString());
}
}

private AtlasReplInfo createAtlasReplInfo() throws SemanticException, MalformedURLException {
String errorFormat = "%s is mandatory config for Atlas metadata replication";
//Also validates URL for endpoint.
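The four-argument constructor above makes the logger an injectable collaborator, and initializeReplLogger only builds a real AtlasDumpLogger when nothing was injected. A hedged fragment of how a test might use this seam, assuming a test class in the same package as AtlasDumpTask; the mocks shown would need further stubbing before execute() can run end to end:

AtlasRestClient client = Mockito.mock(AtlasRestClient.class);
AtlasDumpWork work = Mockito.mock(AtlasDumpWork.class);
AtlasDumpLogger logger = Mockito.mock(AtlasDumpLogger.class);

// The injected non-null logger short-circuits initializeReplLogger(), so
// the task logs through the mock and the test can verify startLog()/endLog().
AtlasDumpTask task = new AtlasDumpTask(client, new HiveConf(), work, logger);

The same null-check injection pattern recurs in AtlasLoadTask, RangerDumpTask, and RangerLoadTask below, so those tests no longer need PowerMock to intercept logger construction.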
18 changes: 13 additions & 5 deletions ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java
@@ -34,7 +34,7 @@
import org.apache.hadoop.hive.ql.exec.util.Retryable;
import org.apache.hadoop.hive.ql.parse.EximUtil;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
import org.apache.hadoop.hive.ql.parse.repl.load.log.AtlasLoadLogger;
import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status;
import org.apache.hadoop.hive.ql.plan.api.StageType;
@@ -50,7 +50,6 @@
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;

/**
* Atlas Metadata Replication Load Task.
@@ -59,14 +58,17 @@ public class AtlasLoadTask extends Task<AtlasLoadWork> implements Serializable {
private static final long serialVersionUID = 1L;
private static final transient Logger LOG = LoggerFactory.getLogger(AtlasLoadTask.class);

private ReplLogger replLogger = null;

public AtlasLoadTask() {
super();
}

@VisibleForTesting
AtlasLoadTask(final HiveConf conf, final AtlasLoadWork work) {
AtlasLoadTask(final HiveConf conf, final AtlasLoadWork work, ReplLogger replLogger) {
this.conf = conf;
this.work = work;
this.replLogger = replLogger;
}

@Override
@@ -79,8 +81,7 @@ public int execute() {
work.getMetricCollector().reportStageStart(getName(), metricMap);
LOG.info("Loading atlas metadata from srcDb: {} to tgtDb: {} from staging: {}",
atlasReplInfo.getSrcDB(), atlasReplInfo.getTgtDB(), atlasReplInfo.getStagingDir());
AtlasLoadLogger replLogger = new AtlasLoadLogger(atlasReplInfo.getSrcDB(), atlasReplInfo.getTgtDB(),
atlasReplInfo.getStagingDir().toString());
initializeReplLogger(atlasReplInfo);
replLogger.startLog();
int importCount = importAtlasMetadata(atlasReplInfo);
replLogger.endLog(importCount);
@@ -113,6 +114,13 @@ public int execute() {
}
}

private void initializeReplLogger(AtlasReplInfo atlasReplInfo) {
if (this.replLogger == null) {
this.replLogger = new AtlasLoadLogger(atlasReplInfo.getSrcDB(), atlasReplInfo.getTgtDB(),
atlasReplInfo.getStagingDir().toString());
}
}

AtlasReplInfo createAtlasReplInfo() throws SemanticException, MalformedURLException {
String errorFormat = "%s is mandatory config for Atlas metadata replication";
//Also validates URL for endpoint.
ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpTask.java
@@ -33,7 +33,6 @@
import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
import org.apache.hadoop.hive.ql.parse.repl.dump.log.RangerDumpLogger;
import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status;
import org.apache.hadoop.hive.ql.plan.api.StageType;
@@ -65,10 +64,16 @@ public RangerDumpTask() {
}

@VisibleForTesting
RangerDumpTask(final RangerRestClient rangerRestClient, final HiveConf conf, final RangerDumpWork work) {
RangerDumpTask(final RangerRestClient rangerRestClient, final HiveConf conf, final RangerDumpWork work, ReplLogger replLogger) {
this.conf = conf;
this.work = work;
this.rangerRestClient = rangerRestClient;
this.replLogger = replLogger;
}

@VisibleForTesting
RangerDumpTask(final RangerRestClient rangerRestClient, final HiveConf conf, final RangerDumpWork work) {
this(rangerRestClient, conf, work, null);
}

@Override
@@ -86,7 +91,7 @@ public int execute() {
Map<String, Long> metricMap = new HashMap<>();
metricMap.put(ReplUtils.MetricName.POLICIES.name(), 0L);
work.getMetricCollector().reportStageStart(getName(), metricMap);
replLogger = new RangerDumpLogger(work.getDbName(), work.getCurrentDumpPath().toString());
initializeReplLogger(work);
replLogger.startLog();
if (rangerRestClient == null) {
rangerRestClient = getRangerRestClient();
@@ -158,6 +163,12 @@ public int execute() {
}
}

private void initializeReplLogger(RangerDumpWork work) {
if (this.replLogger == null) {
this.replLogger = new RangerDumpLogger(work.getDbName(), work.getCurrentDumpPath().toString());
}
}

private RangerRestClient getRangerRestClient() {
if (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST_REPL) || conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)) {
return new NoOpRangerRestClient();
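With the logger injected, a test can assert the dump task's logging behavior directly. Another hedged fragment, under the same same-package assumption as the AtlasDumpTask sketch above:

ReplLogger replLogger = Mockito.mock(ReplLogger.class);
RangerDumpTask task = new RangerDumpTask(Mockito.mock(RangerRestClient.class),
    new HiveConf(), Mockito.mock(RangerDumpWork.class), replLogger);

// After execute() is driven with a suitably stubbed work/client pair:
// Mockito.verify(replLogger).startLog();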
ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadTask.java
@@ -37,6 +37,7 @@
import org.apache.hadoop.hive.ql.parse.repl.load.log.RangerLoadLogger;
import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status;
import org.apache.hadoop.hive.ql.plan.api.StageType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -67,9 +68,15 @@ public RangerLoadTask() {

@VisibleForTesting
RangerLoadTask(final RangerRestClient rangerRestClient, final HiveConf conf, final RangerLoadWork work) {
this(rangerRestClient, conf, work, null);
}

@VisibleForTesting
RangerLoadTask(final RangerRestClient rangerRestClient, final HiveConf conf, final RangerLoadWork work, ReplLogger replLogger) {
this.conf = conf;
this.work = work;
this.rangerRestClient = rangerRestClient;
this.replLogger = replLogger;
}

@Override
@@ -111,8 +118,7 @@ public int execute() {
rangerExportPolicyList = rangerRestClient.readRangerPoliciesFromJsonFile(new Path(work.getCurrentDumpPath(),
ReplUtils.HIVE_RANGER_POLICIES_FILE_NAME), conf);
int expectedPolicyCount = rangerExportPolicyList == null ? 0 : rangerExportPolicyList.getListSize();
replLogger = new RangerLoadLogger(work.getSourceDbName(), work.getTargetDbName(),
work.getCurrentDumpPath().toString(), expectedPolicyCount);
initializeReplLogger(expectedPolicyCount);
replLogger.startLog();
Map<String, Long> metricMap = new HashMap<>();
metricMap.put(ReplUtils.MetricName.POLICIES.name(), (long) expectedPolicyCount);
@@ -170,6 +176,13 @@ public int execute() {
}
}

private void initializeReplLogger(int expectedPolicyCount) {
if (this.replLogger == null) {
this.replLogger = new RangerLoadLogger(work.getSourceDbName(), work.getTargetDbName(),
work.getCurrentDumpPath().toString(), expectedPolicyCount);
}
}

private RangerRestClient getRangerRestClient() {
if (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST_REPL) || conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)) {
return new NoOpRangerRestClient();
ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -1406,7 +1406,7 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb)
for (String tblName : Utils.matchesTbl(hiveDb, dbName, work.replScope)) {
Table table = null;
try {
HiveWrapper.Tuple<Table> tableTuple = new HiveWrapper(hiveDb, dbName).table(tblName, conf);
HiveWrapper.Tuple<Table> tableTuple = createHiveWrapper(hiveDb, dbName).table(tblName, conf);
table = tableTuple != null ? tableTuple.object : null;

//disable materialized-view replication if not configured
@@ -1805,6 +1805,10 @@ void dumpConstraintMetadata(String dbName, String tblName, Path dbRoot, Hive hiv
}
}

HiveWrapper createHiveWrapper(Hive hiveDb, String dbName) {
return new HiveWrapper(hiveDb, dbName);
}

private HiveWrapper.Tuple<Function> functionTuple(String functionName, String dbName, Hive hiveDb) {
try {
HiveWrapper.Tuple<Function> tuple = new HiveWrapper(hiveDb, dbName).function(functionName);
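Routing new HiveWrapper(...) through the package-private createHiveWrapper factory gives tests a plain-Java seam of the kind PowerMock's whenNew used to provide. A fragment, assuming ReplDumpTask's default constructor is reachable from a same-package test and that mockWrapper is a pre-stubbed HiveWrapper mock (hypothetical, not shown):

ReplDumpTask task = new ReplDumpTask() {
  @Override
  HiveWrapper createHiveWrapper(Hive hiveDb, String dbName) {
    // Return the canned wrapper instead of touching a live metastore.
    return mockWrapper;
  }
};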
ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java
@@ -1,4 +1,3 @@

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -40,6 +39,7 @@
import org.apache.hadoop.hive.ql.parse.repl.load.MetaData;
import org.apache.hadoop.hive.ql.plan.CopyWork;
import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
import org.apache.hadoop.hive.ql.util.TimeUtil;

import java.io.IOException;
import java.util.ArrayList;
@@ -154,12 +154,20 @@ static class PrimaryToReplicaResourceFunction
private final String functionsRootDir;
private String destinationDbName;

private TimeUtil timeUtil;

PrimaryToReplicaResourceFunction(Context context, MetaData metadata,
String destinationDbName) {
String destinationDbName, TimeUtil timeUtil) {
this.context = context;
this.metadata = metadata;
this.destinationDbName = destinationDbName;
this.functionsRootDir = context.hiveConf.getVar(HiveConf.ConfVars.REPL_FUNCTIONS_ROOT_DIR);
this.timeUtil = timeUtil;
}

PrimaryToReplicaResourceFunction(Context context, MetaData metadata,
String destinationDbName) {
this(context, metadata, destinationDbName, new TimeUtil());
}

@Override
@@ -187,7 +195,7 @@ ResourceUri destinationResourceUri(ResourceUri resourceUri)
pathBuilder
.addDescendant(destinationDbName.toLowerCase())
.addDescendant(metadata.function.getFunctionName().toLowerCase())
.addDescendant(String.valueOf(System.nanoTime()))
.addDescendant(String.valueOf(timeUtil.getNanoSeconds()))
.addDescendant(split[split.length - 1])
.build(),
new Path(functionsRootDir).getFileSystem(context.hiveConf)
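Injecting TimeUtil replaces the direct System.nanoTime() call in destinationResourceUri, so the generated path segment becomes deterministic under test, while the production three-argument constructor still wires a real TimeUtil. A fragment, assuming context and metadata are pre-built mocks (hypothetical, not shown):

TimeUtil timeUtil = Mockito.mock(TimeUtil.class);
Mockito.when(timeUtil.getNanoSeconds()).thenReturn(1234567890L);

// Every path built through this function now embeds the stubbed timestamp,
// so the test can assert the full destination URI as a plain string.
PrimaryToReplicaResourceFunction function =
    new PrimaryToReplicaResourceFunction(context, metadata, "dest_db", timeUtil);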
1 change: 0 additions & 1 deletion ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java
@@ -137,7 +137,6 @@ public void setJobProperties(Map<String, String> jobProperties) {
this.jobProperties = jobProperties;
}

@Explain(displayName = "jobProperties", explainLevels = { Level.EXTENDED })
public Map<String, String> getJobProperties() {
return jobProperties;
}
