diff --git a/ql/pom.xml b/ql/pom.xml
index d171633ac0fc..17ac8d80d7ff 100644
--- a/ql/pom.xml
+++ b/ql/pom.xml
@@ -25,7 +25,6 @@
   <name>Hive Query Language</name>
   <properties>
     <hive.path.to.root>..</hive.path.to.root>
-    <powermock.version>2.0.2</powermock.version>
@@ -813,18 +812,6 @@
       <version>${mockito-core.version}</version>
       <scope>test</scope>
     </dependency>
-    <dependency>
-      <groupId>org.powermock</groupId>
-      <artifactId>powermock-module-junit4</artifactId>
-      <version>${powermock.version}</version>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.powermock</groupId>
-      <artifactId>powermock-api-mockito2</artifactId>
-      <version>${powermock.version}</version>
-      <scope>test</scope>
-    </dependency>
     <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava-testlib</artifactId>
@@ -858,6 +845,12 @@
       <groupId>org.codehaus.janino</groupId>
       <artifactId>janino</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-inline</artifactId>
+      <version>3.4.4</version>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>com.sun.jersey.contribs</groupId>
       <artifactId>jersey-multipart</artifactId>
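
Note: with the PowerMock modules removed, scoped static mocking comes from
mockito-inline's MockedStatic API (available since Mockito 3.4.0). A minimal
sketch of the replacement pattern; UserGroupInformation is used purely as an
illustrative target here:

    import org.apache.hadoop.security.UserGroupInformation;
    import org.mockito.MockedStatic;
    import org.mockito.Mockito;

    public class StaticMockSketch {
      void stubCurrentUser() throws Exception {
        UserGroupInformation fakeUser = Mockito.mock(UserGroupInformation.class);
        // The stub lives only inside the try block and is reverted on close,
        // unlike PowerMock's classloader-wide @PrepareForTest rewiring.
        try (MockedStatic<UserGroupInformation> ugi =
                 Mockito.mockStatic(UserGroupInformation.class)) {
          ugi.when(UserGroupInformation::getCurrentUser).thenReturn(fakeUser);
          // Code under test invoked here sees fakeUser as the current user.
        }
      }
    }
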
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java
index 2c3a77851aa2..2bc9d0d6113d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java
@@ -36,11 +36,12 @@
import org.apache.hadoop.hive.ql.exec.util.Retryable;
import org.apache.hadoop.hive.ql.parse.EximUtil;
import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
import org.apache.hadoop.hive.ql.parse.repl.dump.log.AtlasDumpLogger;
import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status;
import org.apache.hadoop.hive.ql.plan.api.StageType;
import org.apache.hadoop.yarn.webapp.hamlet2.Hamlet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -68,16 +70,23 @@ public class AtlasDumpTask extends Task<AtlasDumpWork> implements Serializable {
private static final transient Logger LOG = LoggerFactory.getLogger(AtlasDumpTask.class);
private static final long serialVersionUID = 1L;
private transient AtlasRestClient atlasRestClient;
+  private ReplLogger replLogger;
public AtlasDumpTask() {
super();
}
@VisibleForTesting
- AtlasDumpTask(final AtlasRestClient atlasRestClient, final HiveConf conf, final AtlasDumpWork work) {
+  AtlasDumpTask(final AtlasRestClient atlasRestClient, final HiveConf conf, final AtlasDumpWork work, ReplLogger replLogger) {
this.conf = conf;
this.work = work;
this.atlasRestClient = atlasRestClient;
+ this.replLogger = replLogger;
+ }
+
+ @VisibleForTesting
+ AtlasDumpTask(final AtlasRestClient atlasRestClient, final HiveConf conf, final AtlasDumpWork work) {
+ this(atlasRestClient, conf, work, null);
}
@Override
@@ -87,8 +96,7 @@ public int execute() {
AtlasReplInfo atlasReplInfo = createAtlasReplInfo();
LOG.info("Dumping Atlas metadata of srcDb: {}, for TgtDb: {} to staging location:",
atlasReplInfo.getSrcDB(), atlasReplInfo.getTgtDB(), atlasReplInfo.getStagingDir());
- AtlasDumpLogger replLogger = new AtlasDumpLogger(atlasReplInfo.getSrcDB(),
- atlasReplInfo.getStagingDir().toString());
+ initializeReplLogger(atlasReplInfo);
replLogger.startLog();
     Map<String, Long> metricMap = new HashMap<>();
metricMap.put(ReplUtils.MetricName.ENTITIES.name(), 0L);
@@ -129,6 +137,13 @@ public int execute() {
}
}
+  private void initializeReplLogger(AtlasReplInfo atlasReplInfo) {
+    if (this.replLogger == null) {
+      this.replLogger = new AtlasDumpLogger(atlasReplInfo.getSrcDB(),
+          atlasReplInfo.getStagingDir().toString());
+    }
+  }
+
private AtlasReplInfo createAtlasReplInfo() throws SemanticException, MalformedURLException {
String errorFormat = "%s is mandatory config for Atlas metadata replication";
//Also validates URL for endpoint.
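
The four-argument constructor plus the null check in initializeReplLogger(...)
form a small injection seam: production paths still build the real
AtlasDumpLogger from the AtlasReplInfo, while a test can pass a prepared mock
and assert on it afterwards. A sketch of the intended use (the conf, work and
atlasRestClient mocks are assumed to be set up as in TestAtlasDumpTask below):

    AtlasDumpLogger logger = Mockito.mock(AtlasDumpLogger.class);
    AtlasDumpTask task = new AtlasDumpTask(atlasRestClient, conf, work, logger);
    task.execute();
    Mockito.verify(logger).startLog();  // interaction is now directly verifiable
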
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java
index a44aa435aa9b..f10ce52ef14f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java
@@ -34,7 +34,7 @@
import org.apache.hadoop.hive.ql.exec.util.Retryable;
import org.apache.hadoop.hive.ql.parse.EximUtil;
import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
+import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
import org.apache.hadoop.hive.ql.parse.repl.load.log.AtlasLoadLogger;
import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status;
import org.apache.hadoop.hive.ql.plan.api.StageType;
@@ -50,7 +50,6 @@
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.Callable;
/**
* Atlas Metadata Replication Load Task.
@@ -59,14 +58,17 @@ public class AtlasLoadTask extends Task<AtlasLoadWork> implements Serializable {
private static final long serialVersionUID = 1L;
private static final transient Logger LOG = LoggerFactory.getLogger(AtlasLoadTask.class);
+ private ReplLogger replLogger = null;
+
public AtlasLoadTask() {
super();
}
@VisibleForTesting
- AtlasLoadTask(final HiveConf conf, final AtlasLoadWork work) {
+ AtlasLoadTask(final HiveConf conf, final AtlasLoadWork work, ReplLogger replLogger) {
this.conf = conf;
this.work = work;
+ this.replLogger = replLogger;
}
@Override
@@ -79,8 +81,7 @@ public int execute() {
work.getMetricCollector().reportStageStart(getName(), metricMap);
LOG.info("Loading atlas metadata from srcDb: {} to tgtDb: {} from staging: {}",
atlasReplInfo.getSrcDB(), atlasReplInfo.getTgtDB(), atlasReplInfo.getStagingDir());
- AtlasLoadLogger replLogger = new AtlasLoadLogger(atlasReplInfo.getSrcDB(), atlasReplInfo.getTgtDB(),
- atlasReplInfo.getStagingDir().toString());
+ initializeReplLogger(atlasReplInfo);
replLogger.startLog();
int importCount = importAtlasMetadata(atlasReplInfo);
replLogger.endLog(importCount);
@@ -113,6 +114,13 @@ public int execute() {
}
}
+ private void initializeReplLogger(AtlasReplInfo atlasReplInfo) {
+    if (this.replLogger == null) {
+ this.replLogger = new AtlasLoadLogger(atlasReplInfo.getSrcDB(), atlasReplInfo.getTgtDB(),
+ atlasReplInfo.getStagingDir().toString());
+ }
+ }
+
AtlasReplInfo createAtlasReplInfo() throws SemanticException, MalformedURLException {
String errorFormat = "%s is mandatory config for Atlas metadata replication";
//Also validates URL for endpoint.
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpTask.java
index e7b403b8dbc2..153aaa0d14e4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpTask.java
@@ -33,7 +33,6 @@
import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
-import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
import org.apache.hadoop.hive.ql.parse.repl.dump.log.RangerDumpLogger;
import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status;
import org.apache.hadoop.hive.ql.plan.api.StageType;
@@ -65,10 +64,16 @@ public RangerDumpTask() {
}
@VisibleForTesting
- RangerDumpTask(final RangerRestClient rangerRestClient, final HiveConf conf, final RangerDumpWork work) {
+ RangerDumpTask(final RangerRestClient rangerRestClient, final HiveConf conf, final RangerDumpWork work, ReplLogger replLogger) {
this.conf = conf;
this.work = work;
this.rangerRestClient = rangerRestClient;
+ this.replLogger = replLogger;
+ }
+
+ @VisibleForTesting
+ RangerDumpTask(final RangerRestClient rangerRestClient, final HiveConf conf, final RangerDumpWork work) {
+ this(rangerRestClient, conf, work, null);
}
@Override
@@ -86,7 +91,7 @@ public int execute() {
     Map<String, Long> metricMap = new HashMap<>();
metricMap.put(ReplUtils.MetricName.POLICIES.name(), 0L);
work.getMetricCollector().reportStageStart(getName(), metricMap);
- replLogger = new RangerDumpLogger(work.getDbName(), work.getCurrentDumpPath().toString());
+ initializeReplLogger(work);
replLogger.startLog();
if (rangerRestClient == null) {
rangerRestClient = getRangerRestClient();
@@ -158,6 +163,12 @@ public int execute() {
}
}
+ private void initializeReplLogger(RangerDumpWork work) {
+    if (this.replLogger == null) {
+ this.replLogger = new RangerDumpLogger(work.getDbName(), work.getCurrentDumpPath().toString());
+ }
+ }
+
private RangerRestClient getRangerRestClient() {
if (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST_REPL) || conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)) {
return new NoOpRangerRestClient();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadTask.java
index d992c4984d7b..c8b50fdce3c4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadTask.java
@@ -67,9 +68,15 @@ public RangerLoadTask() {
@VisibleForTesting
RangerLoadTask(final RangerRestClient rangerRestClient, final HiveConf conf, final RangerLoadWork work) {
+ this(rangerRestClient, conf, work, null);
+ }
+
+ @VisibleForTesting
+ RangerLoadTask(final RangerRestClient rangerRestClient, final HiveConf conf, final RangerLoadWork work, ReplLogger replLogger) {
this.conf = conf;
this.work = work;
this.rangerRestClient = rangerRestClient;
+ this.replLogger = replLogger;
}
@Override
@@ -111,8 +118,7 @@ public int execute() {
rangerExportPolicyList = rangerRestClient.readRangerPoliciesFromJsonFile(new Path(work.getCurrentDumpPath(),
ReplUtils.HIVE_RANGER_POLICIES_FILE_NAME), conf);
int expectedPolicyCount = rangerExportPolicyList == null ? 0 : rangerExportPolicyList.getListSize();
- replLogger = new RangerLoadLogger(work.getSourceDbName(), work.getTargetDbName(),
- work.getCurrentDumpPath().toString(), expectedPolicyCount);
+ initializeReplLogger(expectedPolicyCount);
replLogger.startLog();
     Map<String, Long> metricMap = new HashMap<>();
metricMap.put(ReplUtils.MetricName.POLICIES.name(), (long) expectedPolicyCount);
@@ -170,6 +176,13 @@ public int execute() {
}
}
+ private void initializeReplLogger(int expectedPolicyCount) {
+    if (this.replLogger == null) {
+ this.replLogger = new RangerLoadLogger(work.getSourceDbName(), work.getTargetDbName(),
+ work.getCurrentDumpPath().toString(), expectedPolicyCount);
+ }
+ }
+
private RangerRestClient getRangerRestClient() {
if (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST_REPL) || conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)) {
return new NoOpRangerRestClient();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 31bf8fe2cfb7..8ee54fe8a1d8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -1406,7 +1406,7 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb)
for (String tblName : Utils.matchesTbl(hiveDb, dbName, work.replScope)) {
Table table = null;
try {
-          HiveWrapper.Tuple<Table> tableTuple = new HiveWrapper(hiveDb, dbName).table(tblName, conf);
+          HiveWrapper.Tuple<Table> tableTuple = createHiveWrapper(hiveDb, dbName).table(tblName, conf);
table = tableTuple != null ? tableTuple.object : null;
//disable materialized-view replication if not configured
@@ -1805,6 +1805,10 @@ void dumpConstraintMetadata(String dbName, String tblName, Path dbRoot, Hive hiv
}
}
+  HiveWrapper createHiveWrapper(Hive hiveDb, String dbName) {
+ return new HiveWrapper(hiveDb, dbName);
+ }
+
   private HiveWrapper.Tuple<Function> functionTuple(String functionName, String dbName, Hive hiveDb) {
     try {
       HiveWrapper.Tuple<Function> tuple = new HiveWrapper(hiveDb, dbName).function(functionName);
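
Wrapping the direct new HiveWrapper(...) call in the package-private
createHiveWrapper(...) factory gives tests a seam that previously required
PowerMock's constructor interception: a Mockito spy can substitute the wrapper.
A hedged sketch under that assumption (the stubbed wrapper's behaviour is
illustrative):

    ReplDumpTask task = Mockito.spy(new ReplDumpTask());
    HiveWrapper wrapper = Mockito.mock(HiveWrapper.class);
    Mockito.doReturn(wrapper).when(task)
        .createHiveWrapper(Mockito.any(Hive.class), Mockito.anyString());
    // bootStrapDump(...) now obtains its table tuples from the mocked wrapper.
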
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java
index d009541478c9..1c1517955265 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CreateFunctionHandler.java
@@ -1,4 +1,3 @@
-
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -40,6 +39,7 @@
import org.apache.hadoop.hive.ql.parse.repl.load.MetaData;
import org.apache.hadoop.hive.ql.plan.CopyWork;
import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
+import org.apache.hadoop.hive.ql.util.TimeUtil;
import java.io.IOException;
import java.util.ArrayList;
@@ -154,12 +154,20 @@ static class PrimaryToReplicaResourceFunction
private final String functionsRootDir;
private String destinationDbName;
+ private TimeUtil timeUtil;
+
PrimaryToReplicaResourceFunction(Context context, MetaData metadata,
- String destinationDbName) {
+ String destinationDbName, TimeUtil timeUtil) {
this.context = context;
this.metadata = metadata;
this.destinationDbName = destinationDbName;
this.functionsRootDir = context.hiveConf.getVar(HiveConf.ConfVars.REPL_FUNCTIONS_ROOT_DIR);
+ this.timeUtil = timeUtil;
+ }
+
+ PrimaryToReplicaResourceFunction(Context context, MetaData metadata,
+ String destinationDbName) {
+ this(context, metadata, destinationDbName, new TimeUtil());
}
@Override
@@ -187,7 +195,7 @@ ResourceUri destinationResourceUri(ResourceUri resourceUri)
pathBuilder
.addDescendant(destinationDbName.toLowerCase())
.addDescendant(metadata.function.getFunctionName().toLowerCase())
- .addDescendant(String.valueOf(System.nanoTime()))
+ .addDescendant(String.valueOf(timeUtil.getNanoSeconds()))
.addDescendant(split[split.length - 1])
.build(),
new Path(functionsRootDir).getFileSystem(context.hiveConf)
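
Routing the timestamp through TimeUtil instead of calling System.nanoTime()
inline makes the generated resource path deterministic under test. A minimal
sketch, assuming context and metadata are mocks prepared elsewhere in the test:

    TimeUtil fixedClock = Mockito.mock(TimeUtil.class);
    Mockito.when(fixedClock.getNanoSeconds()).thenReturn(1234L);
    PrimaryToReplicaResourceFunction fn =
        new PrimaryToReplicaResourceFunction(context, metadata, "dest_db", fixedClock);
    // destinationResourceUri(...) now embeds ".../1234/..." instead of a fresh
    // nano timestamp, so the expected copy path can be asserted exactly.
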
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java
index edc8e6fdeca3..8190871352e1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java
@@ -137,7 +137,6 @@ public void setJobProperties(Map<String, String> jobProperties) {
this.jobProperties = jobProperties;
}
- @Explain(displayName = "jobProperties", explainLevels = { Level.EXTENDED })
   public Map<String, String> getJobProperties() {
return jobProperties;
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/datasketches/kll/KllHistogramEstimator.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/datasketches/kll/KllHistogramEstimator.java
new file mode 100644
index 000000000000..4d0777c3ff98
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/datasketches/kll/KllHistogramEstimator.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.udf.datasketches.kll;
+
+import org.apache.datasketches.kll.KllFloatsSketch;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+public class KllHistogramEstimator {
+
+ private final KllFloatsSketch kll;
+
+ KllHistogramEstimator(int k) {
+ this.kll = new KllFloatsSketch(k);
+ }
+
+ KllHistogramEstimator(KllFloatsSketch kll) {
+ this.kll = kll;
+ }
+
+ public byte[] serialize() {
+ final ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ try {
+ KllUtils.serializeKll(bos, kll);
+ final byte[] result = bos.toByteArray();
+ bos.close();
+ return result;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void addToEstimator(long v) {
+ kll.update(v);
+ }
+
+ public void addToEstimator(double d) {
+ kll.update((float) d);
+ }
+
+ public void addToEstimator(HiveDecimal decimal) {
+ kll.update(decimal.floatValue());
+ }
+
+ public void mergeEstimators(KllHistogramEstimator o) {
+ kll.merge(o.kll);
+ }
+
+ public int lengthFor(JavaDataModel model) {
+ return KllUtils.lengthFor(model, kll);
+ }
+
+ public KllFloatsSketch getSketch() {
+ return kll;
+ }
+
+ public int getK() {
+ return kll.getK();
+ }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/datasketches/kll/KllHistogramEstimatorFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/datasketches/kll/KllHistogramEstimatorFactory.java
new file mode 100644
index 000000000000..72dc7d3b400c
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/datasketches/kll/KllHistogramEstimatorFactory.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.ql.udf.datasketches.kll;
+
+public class KllHistogramEstimatorFactory {
+
+ private KllHistogramEstimatorFactory() {
+    throw new AssertionError("Suppress default constructor for noninstantiability");
+ }
+
+ /**
+ * This function deserializes the serialized KLL histogram estimator from a byte array.
+ * @param buf to deserialize
+ * @param start start index for deserialization
+   * @param len number of bytes to deserialize, starting at start
+ * @return KLL histogram estimator
+ */
+ public static KllHistogramEstimator getKllHistogramEstimator(byte[] buf, int start, int len) {
+ return new KllHistogramEstimator(KllUtils.deserializeKll(buf, start, len));
+ }
+
+ /**
+ * This method creates an empty histogram estimator with a KLL sketch of a given k parameter.
+ * @param k the KLL parameter k for initializing the sketch
+ * @return an empty histogram estimator with a KLL sketch of a given k parameter
+ */
+ public static KllHistogramEstimator getEmptyHistogramEstimator(int k) {
+ return new KllHistogramEstimator(k);
+ }
+}
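
Together the factory and the estimator support a straightforward
serialize/deserialize round trip. A usage sketch; k = 200 is an illustrative
sketch parameter, not a value mandated by this patch:

    KllHistogramEstimator est =
        KllHistogramEstimatorFactory.getEmptyHistogramEstimator(200);
    for (long v = 1; v <= 1000; v++) {
      est.addToEstimator(v);           // feed values into the sketch
    }
    byte[] bytes = est.serialize();    // delegates to KllUtils.serializeKll
    KllHistogramEstimator restored =
        KllHistogramEstimatorFactory.getKllHistogramEstimator(bytes, 0, bytes.length);
    // restored.getSketch() answers the same quantile/rank queries as est.
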
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/datasketches/kll/KllUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/datasketches/kll/KllUtils.java
new file mode 100644
index 000000000000..2d9c08b586de
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/datasketches/kll/KllUtils.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.udf.datasketches.kll;
+
+import org.apache.datasketches.kll.KllFloatsSketch;
+import org.apache.datasketches.memory.Memory;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * KLL serialization utilities.
+ */
+public class KllUtils {
+
+ private KllUtils() {
+    throw new AssertionError("Suppress default constructor for noninstantiability");
+ }
+
+ /**
+ * KLL is serialized according to what provided by data-sketches library
+ * @param out output stream to write to
+ * @param kll KLL sketch that needs to be serialized
+ * @throws IOException if an error occurs during serialization
+ */
+ public static void serializeKll(OutputStream out, KllFloatsSketch kll) throws IOException {
+ out.write(kll.toByteArray());
+ }
+
+ /**
+ * This function deserializes the serialized KLL sketch from a stream.
+ * @param in input stream to be deserialized
+ * @return KLL sketch
+ * @throws IOException if errors occur while reading the stream
+ */
+ public static KllFloatsSketch deserializeKll(InputStream in) throws IOException {
+ final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+ final byte[] data = new byte[4];
+ int nRead;
+
+ while ((nRead = in.read(data, 0, data.length)) != -1) {
+ buffer.write(data, 0, nRead);
+ }
+
+ buffer.flush();
+ return KllFloatsSketch.heapify(Memory.wrap(buffer.toByteArray()));
+ }
+
+ /**
+ * This function deserializes the serialized KLL sketch from a byte array.
+ * @param buf to deserialize
+ * @param start start index for deserialization
+   * @param len number of bytes to deserialize, starting at start
+ * @return KLL sketch
+ */
+ public static KllFloatsSketch deserializeKll(byte[] buf, int start, int len) {
+ InputStream is = new ByteArrayInputStream(buf, start, len);
+ try {
+ KllFloatsSketch result = deserializeKll(is);
+ is.close();
+ return result;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Returns the length of the given KLL sketch according to the given java data model.
+ * @param model the java data model to compute the length
+ * @param kll the KLL sketch to compute the length for
+ * @return the length of the given KLL sketch according to the given java data model
+ */
+ public static int lengthFor(JavaDataModel model, KllFloatsSketch kll) {
+ return model == null ? KllFloatsSketch.getMaxSerializedSizeBytes(kll.getK(), kll.getN())
+ : (int) model.lengthForByteArrayOfSize(kll.getSerializedSizeBytes());
+ }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/util/TimeUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/util/TimeUtil.java
new file mode 100644
index 000000000000..ff1a372d5d17
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/util/TimeUtil.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.util;
+
+public class TimeUtil {
+  public long getNanoSeconds() {
+ return System.nanoTime();
+ }
+}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 566b12514278..4e6f9934162d 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -101,7 +101,8 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.mockito.ArgumentMatchers.any;
-import static org.powermock.api.mockito.PowerMockito.when;
+import static org.mockito.Mockito.when;
+
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
@@ -1452,8 +1453,8 @@ private void execDDLOpAndCompactionConcurrently(String opType, boolean isPartion
runStatementOnDriver("INSERT INTO " + tblName +
(isPartioned ? " PARTITION (p='" + partName + "')" : "") +
" VALUES (1, 'foo'),(2, 'bar'),(3, 'baz')");
- runStatementOnDriver("UPDATE " + tblName + " SET b = 'blah' WHERE a = 3");
+ runStatementOnDriver("UPDATE " + tblName + " SET b = 'blah' WHERE a = 3");
//run Worker to execute compaction
CompactionRequest req = new CompactionRequest("default", tblName, CompactionType.MAJOR);
if (isPartioned) {
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestAtlasDumpTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestAtlasDumpTask.java
index 7f3263bd4e63..8d23406ef8b3 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestAtlasDumpTask.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestAtlasDumpTask.java
@@ -36,7 +36,7 @@
import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRestClientImpl;
import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.parse.repl.ReplState;
+import org.apache.hadoop.hive.ql.parse.repl.dump.log.AtlasDumpLogger;
import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.Assert;
@@ -44,13 +44,9 @@
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
+import org.mockito.MockedStatic;
import org.mockito.Mockito;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-import org.powermock.reflect.Whitebox;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.mockito.junit.MockitoJUnitRunner;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.core.Response;
@@ -64,16 +60,17 @@
import java.util.Properties;
import java.util.concurrent.TimeUnit;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
-import static org.powermock.api.mockito.PowerMockito.mockStatic;
-import static org.powermock.api.mockito.PowerMockito.when;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
/**
* Unit test class for testing Atlas metadata Dump.
*/
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({LoggerFactory.class, UserGroupInformation.class, ConfigurationConverter.class})
-
+@RunWith(MockitoJUnitRunner.class)
public class TestAtlasDumpTask {
@Mock
@@ -92,48 +89,39 @@ public class TestAtlasDumpTask {
@Test
public void testAtlasDumpMetrics() throws Exception {
- Mockito.when(work.getMetricCollector()).thenReturn(metricCollector);
- Mockito.when(conf.get(HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname)).thenReturn("http://localhost:21000/atlas");
- Mockito.when(conf.get(HiveConf.ConfVars.REPL_ATLAS_REPLICATED_TO_DB.varname)).thenReturn("tgtDb");
- Mockito.when(conf.get(HiveConf.ConfVars.REPL_SOURCE_CLUSTER_NAME.varname)).thenReturn("srcCluster");
- Mockito.when(conf.get(HiveConf.ConfVars.REPL_TARGET_CLUSTER_NAME.varname)).thenReturn("tgtCluster");
- Mockito.when(conf.get(ReplUtils.DEFAULT_FS_CONFIG)).thenReturn("hdfs:tgtFsUri:8020");
- Mockito.when(work.getStagingDir()).thenReturn(new Path("hdfs://tmp:8020/staging"));
- Mockito.when(work.getSrcDB()).thenReturn("srcDB");
- Mockito.when(work.isBootstrap()).thenReturn(true);
- atlasDumpTask = new AtlasDumpTask(atlasRestClient, conf, work);
+ AtlasDumpLogger logger = mock(AtlasDumpLogger.class);
+ when(work.getMetricCollector()).thenReturn(metricCollector);
+ when(conf.get(HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname)).thenReturn("http://localhost:21000/atlas");
+ when(conf.get(HiveConf.ConfVars.REPL_ATLAS_REPLICATED_TO_DB.varname)).thenReturn("tgtDb");
+ when(conf.get(HiveConf.ConfVars.REPL_SOURCE_CLUSTER_NAME.varname)).thenReturn("srcCluster");
+ when(conf.get(HiveConf.ConfVars.REPL_TARGET_CLUSTER_NAME.varname)).thenReturn("tgtCluster");
+ when(conf.get(ReplUtils.DEFAULT_FS_CONFIG)).thenReturn("hdfs:tgtFsUri:8020");
+ when(work.getStagingDir()).thenReturn(new Path("hdfs://tmp:8020/staging"));
+ when(work.getSrcDB()).thenReturn("srcDB");
+ when(work.isBootstrap()).thenReturn(true);
+ atlasDumpTask = new AtlasDumpTask(atlasRestClient, conf, work, logger);
AtlasDumpTask atlasDumpTaskSpy = Mockito.spy(atlasDumpTask);
- Mockito.when(conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST_REPL)).thenReturn(true);
- Logger logger = Mockito.mock(Logger.class);
- Whitebox.setInternalState(ReplState.class, logger);
+ when(conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST_REPL)).thenReturn(true);
Mockito.doReturn(0L).when(atlasDumpTaskSpy)
.dumpAtlasMetaData(Mockito.any(AtlasRequestBuilder.class), Mockito.any(AtlasReplInfo.class));
Mockito.doNothing().when(atlasDumpTaskSpy).createDumpMetadata(Mockito.any(AtlasReplInfo.class),
Mockito.any(Long.class));
int status = atlasDumpTaskSpy.execute();
+
Assert.assertEquals(0, status);
-    ArgumentCaptor<String> replStateCaptor = ArgumentCaptor.forClass(String.class);
- ArgumentCaptor