diff --git a/phoenix-core/pom.xml b/phoenix-core/pom.xml
index 018d0546213..3007d3ba03a 100644
--- a/phoenix-core/pom.xml
+++ b/phoenix-core/pom.xml
@@ -397,6 +397,14 @@
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-common</artifactId>
    </dependency>
+   <dependency>
+     <groupId>org.apache.hbase</groupId>
+     <artifactId>hbase-metrics-api</artifactId>
+   </dependency>
+   <dependency>
+     <groupId>org.apache.hbase</groupId>
+     <artifactId>hbase-metrics</artifactId>
+   </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-common</artifactId>
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ColumnEncodedImmutableNonTxStatsCollectorIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ColumnEncodedImmutableNonTxStatsCollectorIT.java
deleted file mode 100644
index 0481ba54a8f..00000000000
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ColumnEncodedImmutableNonTxStatsCollectorIT.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.end2end;
-
-import java.util.Arrays;
-import java.util.Collection;
-
-import org.apache.phoenix.schema.stats.StatsCollectorIT;
-import org.junit.runners.Parameterized.Parameters;
-
-public class ColumnEncodedImmutableNonTxStatsCollectorIT extends StatsCollectorIT {
-
- public ColumnEncodedImmutableNonTxStatsCollectorIT(boolean mutable, String transactionProvider,
- boolean userTableNamespaceMapped, boolean columnEncoded) {
- super(mutable, transactionProvider, userTableNamespaceMapped, columnEncoded);
- }
-
- @Parameters(name = "mutable={0},transactionProvider={1},isUserTableNamespaceMapped={2},columnEncoded={3}")
- public static Collection data() {
- return Arrays.asList(new Object[][] {
- { false, null, false, true }, { false, null, true, true }
- });
- }
-}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ColumnEncodedImmutableTxStatsCollectorIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ColumnEncodedImmutableTxStatsCollectorIT.java
deleted file mode 100644
index 073dc07dc37..00000000000
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ColumnEncodedImmutableTxStatsCollectorIT.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.end2end;
-
-import java.util.Arrays;
-import java.util.Collection;
-
-import org.apache.phoenix.schema.stats.StatsCollectorIT;
-import org.apache.phoenix.util.TestUtil;
-import org.junit.runners.Parameterized.Parameters;
-
-public class ColumnEncodedImmutableTxStatsCollectorIT extends StatsCollectorIT {
-
- public ColumnEncodedImmutableTxStatsCollectorIT(boolean mutable, String transactionProvider,
- boolean userTableNamespaceMapped, boolean columnEncoded) {
- super(mutable, transactionProvider, userTableNamespaceMapped, columnEncoded);
- }
-
- @Parameters(name = "mutable={0},transactionProvider={1},isUserTableNamespaceMapped={2},columnEncoded={3}")
- public static Collection data() {
- return TestUtil.filterTxParamData(
- Arrays.asList(
- new Object[][] {
- { false, "TEPHRA", false, true },
- { false, "TEPHRA", true, true }, }), 1);
- }
-}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ColumnEncodedMutableTxStatsCollectorIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ColumnEncodedMutableTxStatsCollectorIT.java
deleted file mode 100644
index 29e000ef878..00000000000
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ColumnEncodedMutableTxStatsCollectorIT.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.end2end;
-
-import java.util.Arrays;
-import java.util.Collection;
-
-import org.apache.phoenix.schema.stats.StatsCollectorIT;
-import org.apache.phoenix.util.TestUtil;
-import org.junit.runners.Parameterized.Parameters;
-
-public class ColumnEncodedMutableTxStatsCollectorIT extends StatsCollectorIT {
-
- public ColumnEncodedMutableTxStatsCollectorIT(boolean mutable, String transactionProvider,
- boolean userTableNamespaceMapped, boolean columnEncoded) {
- super(mutable, transactionProvider, userTableNamespaceMapped, columnEncoded);
- }
-
- @Parameters(name = "mutable={0},transactionProvider={1},isUserTableNamespaceMapped={2},columnEncoded={3}")
- public static Collection data() {
- return TestUtil.filterTxParamData(
- Arrays.asList(new Object[][] {
- { true, "TEPHRA", false, true },
- { true, "TEPHRA", true, true } }), 1);
- }
-}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ColumnEncodedMutableNonTxStatsCollectorIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/NamespaceDisabledStatsCollectorIT.java
similarity index 53%
rename from phoenix-core/src/it/java/org/apache/phoenix/end2end/ColumnEncodedMutableNonTxStatsCollectorIT.java
rename to phoenix-core/src/it/java/org/apache/phoenix/end2end/NamespaceDisabledStatsCollectorIT.java
index 248f6fbf666..525837a47a4 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ColumnEncodedMutableNonTxStatsCollectorIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/NamespaceDisabledStatsCollectorIT.java
@@ -15,24 +15,31 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.phoenix.end2end;
+import org.apache.phoenix.schema.stats.BaseStatsCollectorIT;
+import org.junit.runners.Parameterized;
+
import java.util.Arrays;
import java.util.Collection;
-import org.apache.phoenix.schema.stats.StatsCollectorIT;
-import org.junit.runners.Parameterized.Parameters;
+public class NamespaceDisabledStatsCollectorIT extends BaseStatsCollectorIT {
-public class ColumnEncodedMutableNonTxStatsCollectorIT extends StatsCollectorIT {
-
- public ColumnEncodedMutableNonTxStatsCollectorIT(boolean mutable, String transactionProvider,
- boolean userTableNamespaceMapped, boolean columnEncoded) {
- super(mutable, transactionProvider, userTableNamespaceMapped, columnEncoded);
+ public NamespaceDisabledStatsCollectorIT(boolean userTableNamespaceMapped, boolean collectStatsOnSnapshot) {
+ super(userTableNamespaceMapped, collectStatsOnSnapshot);
}
- @Parameters(name = "mutable={0},transactionProvider={1},isUserTableNamespaceMapped={2},columnEncoded={3}")
- public static Collection data() {
+ @Parameterized.Parameters(name = "userTableNamespaceMapped={0},collectStatsOnSnapshot={1}")
+ public static Collection provideData() {
return Arrays.asList(
- new Object[][] { { true, null, false, true }, { true, null, true, true } });
+ new Object[][] {
+ // Collect stats on snapshots using UpdateStatisticsTool
+ { false, true },
+ // Collect stats via `UPDATE STATISTICS` SQL
+ { false, false }
+ }
+ );
}
+
}
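
Aside: all of these ITs lean on JUnit 4's Parameterized runner. For orientation, a minimal self-contained sketch of the pattern (class and parameter names are illustrative, not from this patch):

```java
import java.util.Arrays;
import java.util.Collection;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

// The runner instantiates the class once per Object[] returned by the
// @Parameters method, passing the array elements to the constructor.
@RunWith(Parameterized.class)
public class ParameterizedPatternExample {

    private final boolean namespaceMapped;
    private final boolean statsOnSnapshot;

    public ParameterizedPatternExample(boolean namespaceMapped, boolean statsOnSnapshot) {
        this.namespaceMapped = namespaceMapped;
        this.statsOnSnapshot = statsOnSnapshot;
    }

    @Parameters(name = "namespaceMapped={0},statsOnSnapshot={1}")
    public static Collection<Object[]> data() {
        return Arrays.asList(new Object[][] { { false, true }, { false, false } });
    }

    @Test
    public void runsOncePerParameterRow() {
        // Each test method executes once for every row returned by data().
    }
}
```
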
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SysTableNamespaceMappedStatsCollectorIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/NamespaceEnabledStatsCollectorIT.java
similarity index 61%
rename from phoenix-core/src/it/java/org/apache/phoenix/end2end/SysTableNamespaceMappedStatsCollectorIT.java
rename to phoenix-core/src/it/java/org/apache/phoenix/end2end/NamespaceEnabledStatsCollectorIT.java
index 8f0a1d8fd3e..7baacf263e3 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SysTableNamespaceMappedStatsCollectorIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/NamespaceEnabledStatsCollectorIT.java
@@ -17,48 +17,46 @@
*/
package org.apache.phoenix.end2end;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Map;
-
+import com.google.common.collect.Maps;
import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.schema.stats.StatsCollectorIT;
+import org.apache.phoenix.schema.stats.BaseStatsCollectorIT;
import org.apache.phoenix.util.ReadOnlyProps;
-import org.apache.phoenix.util.TestUtil;
import org.junit.BeforeClass;
-import org.junit.runners.Parameterized.Parameters;
+import org.junit.runners.Parameterized;
-import com.google.common.collect.Maps;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
-public class SysTableNamespaceMappedStatsCollectorIT extends StatsCollectorIT {
+public class NamespaceEnabledStatsCollectorIT extends BaseStatsCollectorIT {
- public SysTableNamespaceMappedStatsCollectorIT(boolean mutable, String transactionProvider,
- boolean userTableNamespaceMapped, boolean columnEncoded) {
- super(mutable, transactionProvider, userTableNamespaceMapped, columnEncoded);
+ public NamespaceEnabledStatsCollectorIT(boolean userTableNamespaceMapped, boolean collectStatsOnSnapshot) {
+ super(userTableNamespaceMapped, collectStatsOnSnapshot);
}
- @Parameters(name = "mutable={0},transactionProvider={1},isUserTableNamespaceMapped={2},columnEncoded={3}")
- public static Collection data() {
- return TestUtil.filterTxParamData(Arrays.asList(
- new Object[][] {
- { true, "TEPHRA", false, false }, { true, "TEPHRA", false, true },
- { true, "OMID", false, false },
- }), 1);
+ @Parameterized.Parameters(name = "userTableNamespaceMapped={0},collectStatsOnSnapshot={1}")
+ public static Collection provideData() {
+ return Arrays.asList(
+ new Object[][] {
+ // Collect stats on snapshots using UpdateStatisticsTool
+ { true, true },
+ // Collect stats via `UPDATE STATISTICS` SQL
+ { true, false }
+ }
+ );
}
@BeforeClass
public static void doSetup() throws Exception {
// enable name space mapping at global level on both client and server side
 Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(7);
- serverProps.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, "true");
+ serverProps.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.TRUE.toString());
serverProps.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, Long.toString(20));
- serverProps.put(QueryServices.IS_SYSTEM_TABLE_MAPPED_TO_NAMESPACE, "true");
 Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(2);
- clientProps.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, "true");
+ clientProps.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.TRUE.toString());
clientProps.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, Long.toString(20));
- clientProps.put(QueryServices.IS_SYSTEM_TABLE_MAPPED_TO_NAMESPACE, "true");
setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()),
- new ReadOnlyProps(clientProps.entrySet().iterator()));
+ new ReadOnlyProps(clientProps.entrySet().iterator()));
}
}
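
As a usage note, the namespace-mapping switch that doSetup wires into the mini-cluster corresponds to an ordinary client-side connection property. A minimal sketch, assuming a reachable Phoenix cluster (the ZooKeeper quorum in the URL is a placeholder):

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.Properties;

public class NamespaceMappedConnectionExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Same key as QueryServices.IS_NAMESPACE_MAPPING_ENABLED; it must match
        // the server-side setting or the connection fails the consistency check.
        props.setProperty("phoenix.schema.isNamespaceMappingEnabled", "true");
        try (Connection conn = DriverManager.getConnection(
                "jdbc:phoenix:localhost:2181", props)) {
            // CREATE SCHEMA is only permitted when namespace mapping is enabled.
            conn.createStatement().execute("CREATE SCHEMA IF NOT EXISTS MY_SCHEMA");
        }
    }
}
```
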
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/NonColumnEncodedImmutableTxStatsCollectorIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/NonColumnEncodedImmutableTxStatsCollectorIT.java
deleted file mode 100644
index 04485c4aa88..00000000000
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/NonColumnEncodedImmutableTxStatsCollectorIT.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.end2end;
-
-import java.util.Arrays;
-import java.util.Collection;
-
-import org.apache.phoenix.schema.stats.StatsCollectorIT;
-import org.apache.phoenix.util.TestUtil;
-import org.junit.runners.Parameterized.Parameters;
-
-public class NonColumnEncodedImmutableTxStatsCollectorIT extends StatsCollectorIT {
-
- public NonColumnEncodedImmutableTxStatsCollectorIT(boolean mutable, String transactionProvider,
- boolean userTableNamespaceMapped, boolean columnEncoded) {
- super(mutable,transactionProvider, userTableNamespaceMapped, columnEncoded);
- }
-
- @Parameters(name = "mutable={0},transactionProvider={1},isUserTableNamespaceMapped={2},columnEncoded={3}")
- public static Collection data() {
- return TestUtil.filterTxParamData(Arrays.asList(
- new Object[][] {
- { false, "TEPHRA", false, false }, { false, "TEPHRA", true, false },
- { false, "OMID", false, false }, { false, "OMID", true, false },
- }),1);
- }
-}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/NonColumnEncodedImmutableNonTxStatsCollectorIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/NonTxStatsCollectorIT.java
similarity index 55%
rename from phoenix-core/src/it/java/org/apache/phoenix/end2end/NonColumnEncodedImmutableNonTxStatsCollectorIT.java
rename to phoenix-core/src/it/java/org/apache/phoenix/end2end/NonTxStatsCollectorIT.java
index fec890ec96f..220f82fd1d3 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/NonColumnEncodedImmutableNonTxStatsCollectorIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/NonTxStatsCollectorIT.java
@@ -20,19 +20,28 @@
import java.util.Arrays;
import java.util.Collection;
-import org.apache.phoenix.schema.stats.StatsCollectorIT;
+import org.apache.phoenix.schema.stats.BaseStatsCollectorIT;
import org.junit.runners.Parameterized.Parameters;
-public class NonColumnEncodedImmutableNonTxStatsCollectorIT extends StatsCollectorIT {
+public class NonTxStatsCollectorIT extends BaseStatsCollectorIT {
- public NonColumnEncodedImmutableNonTxStatsCollectorIT(boolean mutable, String transactionProvider,
- boolean userTableNamespaceMapped, boolean columnEncoded) {
- super(mutable, transactionProvider, userTableNamespaceMapped, columnEncoded);
+ public NonTxStatsCollectorIT(boolean mutable,
+ String transactionProvider, boolean columnEncoded) {
+ super(mutable, transactionProvider, columnEncoded);
}
- @Parameters(name = "mutable={0},transactionProvider={1},isUserTableNamespaceMapped={2},columnEncoded={3}")
- public static Collection data() {
+ @Parameters(name = "mutable={0},transactionProvider={1},columnEncoded={2}")
+ public static Collection provideData() {
return Arrays.asList(
- new Object[][] { { false, null, false, false }, { false, null, true, false } });
+ new Object[][] {
+ // Immutable, Column Encoded
+ { false, null, true },
+ // Mutable, Column Encoded
+ { true, null, true },
+ // Immutable, Not Column Encoded
+ { false, null, false }
+ }
+ );
}
+
}
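
The (mutable, transactionProvider, columnEncoded) tuples above feed the DDL-options builder in BaseStatsCollectorIT (only partially visible in this patch). Roughly, the flags surface as standard Phoenix table options; the exact builder logic here is a hedged illustration, not copied from the patch:

```java
public class TableDdlOptionsSketch {
    public static void main(String[] args) {
        boolean mutable = false;
        boolean columnEncoded = false;
        String transactionProvider = "TEPHRA";
        StringBuilder opts = new StringBuilder();
        if (!mutable) opts.append("IMMUTABLE_ROWS=true");
        if (!columnEncoded) {
            if (opts.length() > 0) opts.append(",");
            opts.append("COLUMN_ENCODED_BYTES=0"); // disables column encoding
        }
        if (transactionProvider != null) {
            if (opts.length() > 0) opts.append(",");
            opts.append("TRANSACTIONAL=true,TRANSACTION_PROVIDER='")
                .append(transactionProvider).append("'");
        }
        System.out.println("CREATE TABLE T (K VARCHAR PRIMARY KEY, V VARCHAR) " + opts);
    }
}
```
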
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TxStatsCollectorIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TxStatsCollectorIT.java
new file mode 100644
index 00000000000..eff52500485
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TxStatsCollectorIT.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.phoenix.schema.stats.BaseStatsCollectorIT;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+public class TxStatsCollectorIT extends BaseStatsCollectorIT {
+
+ public TxStatsCollectorIT(boolean mutable, String transactionProvider, boolean columnEncoded) {
+ super(mutable, transactionProvider, columnEncoded);
+ }
+
+ @Parameterized.Parameters(name = "mutable={0},transactionProvider={1},columnEncoded={2}")
+ public static Collection data() {
+ return TestUtil.filterTxParamData(
+ Arrays.asList(
+ new Object[][] {
+ // Immutable, TEPHRA, Column Encoded
+ { false, "TEPHRA", true },
+ // Immutable, TEPHRA, Non Column Encoded
+ { false, "TEPHRA", false },
+ // Immutable, OMID, Non Column Encoded
+ { false, "OMID", false },
+
+ // Mutable, TEPHRA, Column Encoded
+ { true, "TEPHRA", true },
+ // Mutable, TEPHRA, Non Column Encoded
+ { true, "TEPHRA", false },
+ // Mutable, OMID, Non Column Encoded
+ { true, "OMID", false }}), 1);
+ }
+}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/schema/stats/StatsCollectorIT.java b/phoenix-core/src/it/java/org/apache/phoenix/schema/stats/BaseStatsCollectorIT.java
similarity index 85%
rename from phoenix-core/src/it/java/org/apache/phoenix/schema/stats/StatsCollectorIT.java
rename to phoenix-core/src/it/java/org/apache/phoenix/schema/stats/BaseStatsCollectorIT.java
index f1c4e450ea9..78c4fafccb3 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/schema/stats/StatsCollectorIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/schema/stats/BaseStatsCollectorIT.java
@@ -19,6 +19,8 @@
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_STATS_TABLE;
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.MAPREDUCE_JOB_TYPE;
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.MRJobType.UPDATE_STATS;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
import static org.apache.phoenix.util.TestUtil.getAllSplits;
import static org.junit.Assert.assertEquals;
@@ -38,16 +40,22 @@
import java.util.Properties;
import java.util.Random;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Job;
import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver;
import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
import org.apache.phoenix.jdbc.PhoenixConnection;
@@ -63,6 +71,7 @@
import org.apache.phoenix.transaction.PhoenixTransactionProvider.Feature;
import org.apache.phoenix.transaction.TransactionFactory;
import org.apache.phoenix.util.MetaDataUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.ReadOnlyProps;
@@ -77,8 +86,19 @@
import com.google.common.collect.Maps;
+/**
+ * Base test class for all statistics collection tests.
+ * Exercises stats collection across the scenario parameters:
+ * 1. Column encoding
+ * 2. Transactions
+ * 3. Namespaces
+ * 4. Stats collection via SQL or an MR job
+ */
@RunWith(Parameterized.class)
-public abstract class StatsCollectorIT extends BaseUniqueNamesOwnClusterIT {
+public abstract class BaseStatsCollectorIT extends BaseUniqueNamesOwnClusterIT {
+
+ private static final Log LOG = LogFactory.getLog(BaseStatsCollectorIT.class);
+
private final String tableDDLOptions;
private final boolean columnEncoded;
private String tableName;
@@ -89,8 +109,18 @@ public abstract class StatsCollectorIT extends BaseUniqueNamesOwnClusterIT {
private final boolean mutable;
private final String transactionProvider;
private static final int defaultGuidePostWidth = 20;
-
- protected StatsCollectorIT(boolean mutable, String transactionProvider, boolean userTableNamespaceMapped, boolean columnEncoded) {
+ private boolean collectStatsOnSnapshot;
+
+ protected BaseStatsCollectorIT(boolean userTableNamespaceMapped, boolean collectStatsOnSnapshot) {
+ this(false, null, userTableNamespaceMapped, false, collectStatsOnSnapshot);
+ }
+
+ protected BaseStatsCollectorIT(boolean mutable, String transactionProvider, boolean columnEncoded) {
+ this(mutable, transactionProvider, false, columnEncoded, false);
+ }
+
+ private BaseStatsCollectorIT(boolean mutable, String transactionProvider,
+ boolean userTableNamespaceMapped, boolean columnEncoded, boolean collectStatsOnSnapshot) {
this.transactionProvider = transactionProvider;
StringBuilder sb = new StringBuilder();
if (columnEncoded) {
@@ -112,16 +142,17 @@ protected StatsCollectorIT(boolean mutable, String transactionProvider, boolean
this.userTableNamespaceMapped = userTableNamespaceMapped;
this.columnEncoded = columnEncoded;
this.mutable = mutable;
+ this.collectStatsOnSnapshot = collectStatsOnSnapshot;
}
@BeforeClass
public static void doSetup() throws Exception {
- // enable name space mapping at global level on both client and server side
+ // disable name space mapping at global level on both client and server side
 Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(7);
- serverProps.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, "true");
+ serverProps.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.FALSE.toString());
serverProps.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, Long.toString(defaultGuidePostWidth));
 Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(2);
- clientProps.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, "true");
+ clientProps.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.FALSE.toString());
clientProps.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, Long.toString(defaultGuidePostWidth));
setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
}
@@ -148,18 +179,57 @@ private Connection getConnection(Integer statsUpdateFreq) throws SQLException {
props.setProperty(QueryServices.EXPLAIN_CHUNK_COUNT_ATTRIB, Boolean.TRUE.toString());
props.setProperty(QueryServices.EXPLAIN_ROW_COUNT_ATTRIB, Boolean.TRUE.toString());
props.setProperty(QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB, Integer.toString(statsUpdateFreq));
- // enable/disable namespace mapping at connection level
props.setProperty(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.toString(userTableNamespaceMapped));
return DriverManager.getConnection(getUrl(), props);
}
-
+
+ private void collectStatistics(Connection conn, String fullTableName) throws Exception {
+ collectStatistics(conn, fullTableName, null);
+ }
+
+ private void collectStatistics(Connection conn, String fullTableName,
+ String guidePostWidth) throws Exception {
+
+ String localPhysicalTableName = SchemaUtil.getPhysicalTableName(fullTableName.getBytes(),
+ userTableNamespaceMapped).getNameAsString();
+
+ if (collectStatsOnSnapshot) {
+ collectStatsOnSnapshot(conn, fullTableName, guidePostWidth, localPhysicalTableName);
+ invalidateStats(conn, fullTableName);
+ } else {
+ String updateStatisticsSql = "UPDATE STATISTICS " + fullTableName;
+ if (guidePostWidth != null) {
+ updateStatisticsSql += " SET \"" + QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB + "\" = " + guidePostWidth;
+ }
+ LOG.info("Running SQL to collect stats: " + updateStatisticsSql);
+ conn.createStatement().execute(updateStatisticsSql);
+ }
+ }
+
+ private void collectStatsOnSnapshot(Connection conn, String fullTableName,
+ String guidePostWidth, String localPhysicalTableName) throws Exception {
+ UpdateStatisticsTool tool = new UpdateStatisticsTool();
+ Configuration conf = utility.getConfiguration();
+ HBaseAdmin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
+ String snapshotName = "UpdateStatisticsTool_" + generateUniqueName();
+ admin.snapshot(snapshotName, localPhysicalTableName);
+ LOG.info("Successfully created snapshot " + snapshotName + " for " + localPhysicalTableName);
+ Path randomDir = getUtility().getRandomDir();
+ if (guidePostWidth != null) {
+ conn.createStatement().execute("ALTER TABLE " + fullTableName + " SET GUIDE_POSTS_WIDTH = " + guidePostWidth);
+ }
+ Job job = tool.configureJob(conf, fullTableName, snapshotName, randomDir);
+ assertEquals(job.getConfiguration().get(MAPREDUCE_JOB_TYPE), UPDATE_STATS.name());
+ tool.runJob(job, true);
+ }
+
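
Taken together, the snapshot path above amounts to: snapshot the physical table, point UpdateStatisticsTool at the snapshot, and run the resulting MR job. A condensed, hedged driver sketch (URL, table name, and restore path are placeholders; the configureJob/runJob signatures mirror their usage above; the tool's package is assumed):

```java
import java.sql.DriverManager;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.mapreduce.Job;
import org.apache.phoenix.jdbc.PhoenixConnection;
// Package assumed; the test references UpdateStatisticsTool without an import,
// which suggests it lives alongside BaseStatsCollectorIT's production package.
import org.apache.phoenix.schema.stats.UpdateStatisticsTool;

public class SnapshotStatsDriverSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (PhoenixConnection conn = DriverManager
                .getConnection("jdbc:phoenix:localhost").unwrap(PhoenixConnection.class)) {
            HBaseAdmin admin = conn.getQueryServices().getAdmin();
            String snapshotName = "stats_snapshot_" + System.currentTimeMillis();
            admin.snapshot(snapshotName, "MY_TABLE"); // snapshot the physical table
            UpdateStatisticsTool tool = new UpdateStatisticsTool();
            Job job = tool.configureJob(conf, "MY_TABLE", snapshotName,
                    new Path("/tmp/phoenix-stats-restore")); // scratch restore dir
            tool.runJob(job, true); // run in the foreground
        }
    }
}
```
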
@Test
public void testUpdateEmptyStats() throws Exception {
Connection conn = getConnection();
conn.setAutoCommit(true);
conn.createStatement().execute(
"CREATE TABLE " + fullTableName +" ( k CHAR(1) PRIMARY KEY )" + tableDDLOptions);
- conn.createStatement().execute("UPDATE STATISTICS " + fullTableName);
+ collectStatistics(conn, fullTableName);
ResultSet rs = conn.createStatement().executeQuery("EXPLAIN SELECT * FROM " + fullTableName);
String explainPlan = QueryUtil.getExplainPlan(rs);
assertEquals(
@@ -168,7 +238,7 @@ public void testUpdateEmptyStats() throws Exception {
explainPlan);
conn.close();
}
-
+
@Test
public void testSomeUpdateEmptyStats() throws Exception {
Connection conn = getConnection();
@@ -176,7 +246,7 @@ public void testSomeUpdateEmptyStats() throws Exception {
conn.createStatement().execute(
"CREATE TABLE " + fullTableName +" ( k VARCHAR PRIMARY KEY, a.v1 VARCHAR, b.v2 VARCHAR ) " + tableDDLOptions + (tableDDLOptions.isEmpty() ? "" : ",") + "SALT_BUCKETS = 3");
conn.createStatement().execute("UPSERT INTO " + fullTableName + "(k,v1) VALUES('a','123456789')");
- conn.createStatement().execute("UPDATE STATISTICS " + fullTableName);
+ collectStatistics(conn, fullTableName);
ResultSet rs;
String explainPlan;
@@ -206,8 +276,7 @@ public void testSomeUpdateEmptyStats() throws Exception {
}
@Test
- public void testUpdateStats() throws SQLException, IOException,
- InterruptedException {
+ public void testUpdateStats() throws Exception {
Connection conn;
PreparedStatement stmt;
ResultSet rs;
@@ -220,9 +289,10 @@ public void testUpdateStats() throws SQLException, IOException,
String[] s;
Array array;
conn = upsertValues(props, fullTableName);
- // CAll the update statistics query here. If already major compaction has run this will not get executed.
- stmt = conn.prepareStatement("UPDATE STATISTICS " + fullTableName);
- stmt.execute();
+ collectStatistics(conn, fullTableName);
+ rs = conn.createStatement().executeQuery("EXPLAIN SELECT k FROM " + fullTableName);
+ rs.next();
+ long rows1 = (Long) rs.getObject(PhoenixRuntime.EXPLAIN_PLAN_ESTIMATED_ROWS_READ_COLUMN);
stmt = upsertStmt(conn, fullTableName);
stmt.setString(1, "z");
s = new String[] { "xyz", "def", "ghi", "jkll", null, null, "xxx" };
@@ -232,10 +302,12 @@ public void testUpdateStats() throws SQLException, IOException,
array = conn.createArrayOf("VARCHAR", s);
stmt.setArray(3, array);
stmt.execute();
- stmt = conn.prepareStatement("UPDATE STATISTICS " + fullTableName);
- stmt.execute();
- rs = conn.createStatement().executeQuery("SELECT k FROM " + fullTableName);
- assertTrue(rs.next());
+ conn.commit();
+ collectStatistics(conn, fullTableName);
+ rs = conn.createStatement().executeQuery("EXPLAIN SELECT k FROM " + fullTableName);
+ rs.next();
+ long rows2 = (Long) rs.getObject(PhoenixRuntime.EXPLAIN_PLAN_ESTIMATED_ROWS_READ_COLUMN);
+ assertNotEquals(rows1, rows2);
conn.close();
}
@@ -250,7 +322,7 @@ private void testNoDuplicatesAfterUpdateStats(String splitKey) throws Throwable
conn.createStatement().execute("upsert into " + fullTableName + " values ('abc',1,3)");
conn.createStatement().execute("upsert into " + fullTableName + " values ('def',2,4)");
conn.commit();
- conn.createStatement().execute("UPDATE STATISTICS " + fullTableName);
+ collectStatistics(conn, fullTableName);
rs = conn.createStatement().executeQuery("SELECT k FROM " + fullTableName + " order by k desc");
assertTrue(rs.next());
assertEquals("def", rs.getString(1));
@@ -270,57 +342,6 @@ public void testNoDuplicatesAfterUpdateStatsWithDesc() throws Throwable {
testNoDuplicatesAfterUpdateStats(null);
}
- @Test
- public void testUpdateStatsWithMultipleTables() throws Throwable {
- String fullTableName2 = SchemaUtil.getTableName(schemaName, "T_" + generateUniqueName());
- Connection conn;
- PreparedStatement stmt;
- ResultSet rs;
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- conn = getConnection();
- conn.createStatement().execute(
- "CREATE TABLE " + fullTableName +" ( k VARCHAR, a_string_array VARCHAR(100) ARRAY[4], b_string_array VARCHAR(100) ARRAY[4] \n"
- + " CONSTRAINT pk PRIMARY KEY (k, b_string_array DESC))" + tableDDLOptions );
- conn.createStatement().execute(
- "CREATE TABLE " + fullTableName2 +" ( k VARCHAR, a_string_array VARCHAR(100) ARRAY[4], b_string_array VARCHAR(100) ARRAY[4] \n"
- + " CONSTRAINT pk PRIMARY KEY (k, b_string_array DESC))" + tableDDLOptions );
- String[] s;
- Array array;
- conn = upsertValues(props, fullTableName);
- conn = upsertValues(props, fullTableName2);
- // CAll the update statistics query here
- stmt = conn.prepareStatement("UPDATE STATISTICS "+fullTableName);
- stmt.execute();
- stmt = conn.prepareStatement("UPDATE STATISTICS "+fullTableName2);
- stmt.execute();
- stmt = upsertStmt(conn, fullTableName);
- stmt.setString(1, "z");
- s = new String[] { "xyz", "def", "ghi", "jkll", null, null, "xxx" };
- array = conn.createArrayOf("VARCHAR", s);
- stmt.setArray(2, array);
- s = new String[] { "zya", "def", "ghi", "jkll", null, null, null, "xxx" };
- array = conn.createArrayOf("VARCHAR", s);
- stmt.setArray(3, array);
- stmt.execute();
- stmt = upsertStmt(conn, fullTableName2);
- stmt.setString(1, "z");
- s = new String[] { "xyz", "def", "ghi", "jkll", null, null, "xxx" };
- array = conn.createArrayOf("VARCHAR", s);
- stmt.setArray(2, array);
- s = new String[] { "zya", "def", "ghi", "jkll", null, null, null, "xxx" };
- array = conn.createArrayOf("VARCHAR", s);
- stmt.setArray(3, array);
- stmt.execute();
- conn.close();
- conn = getConnection();
- // This analyze would not work
- stmt = conn.prepareStatement("UPDATE STATISTICS "+fullTableName2);
- stmt.execute();
- rs = conn.createStatement().executeQuery("SELECT k FROM "+fullTableName2);
- assertTrue(rs.next());
- conn.close();
- }
-
private Connection upsertValues(Properties props, String tableName) throws SQLException, IOException,
InterruptedException {
Connection conn;
@@ -527,7 +548,7 @@ public void testWithMultiCF() throws Exception {
conn.commit();
ResultSet rs;
- TestUtil.analyzeTable(conn, fullTableName);
+ collectStatistics(conn, fullTableName);
 List<KeyRange> keyRanges = getAllSplits(conn, fullTableName);
assertEquals(26, keyRanges.size());
rs = conn.createStatement().executeQuery("EXPLAIN SELECT * FROM " + fullTableName);
@@ -538,10 +559,7 @@ public void testWithMultiCF() throws Exception {
 List<HRegionLocation> regions = services.getAllTableRegions(Bytes.toBytes(physicalTableName));
assertEquals(1, regions.size());
- TestUtil.analyzeTable(conn, fullTableName);
- String query = "UPDATE STATISTICS " + fullTableName + " SET \""
- + QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB + "\"=" + Long.toString(1000);
- conn.createStatement().execute(query);
+ collectStatistics(conn, fullTableName, Long.toString(1000));
keyRanges = getAllSplits(conn, fullTableName);
boolean oneCellPerColFamliyStorageScheme = !mutable && columnEncoded;
boolean hasShadowCells = TransactionFactory.Provider.OMID.name().equals(transactionProvider);
@@ -582,7 +600,7 @@ public void testWithMultiCF() throws Exception {
// Disable stats
conn.createStatement().execute("ALTER TABLE " + fullTableName +
" SET " + PhoenixDatabaseMetaData.GUIDE_POSTS_WIDTH + "=0");
- TestUtil.analyzeTable(conn, fullTableName);
+ collectStatistics(conn, fullTableName);
// Assert that there are no more guideposts
rs = conn.createStatement().executeQuery("SELECT count(1) FROM " + PhoenixDatabaseMetaData.SYSTEM_STATS_NAME +
" WHERE " + PhoenixDatabaseMetaData.PHYSICAL_NAME + "='" + physicalTableName + "' AND " + PhoenixDatabaseMetaData.COLUMN_FAMILY + " IS NOT NULL");
@@ -595,7 +613,7 @@ public void testWithMultiCF() throws Exception {
}
@Test
- public void testRowCountAndByteCounts() throws SQLException {
+ public void testRowCountAndByteCounts() throws Exception {
Connection conn = getConnection();
String ddl = "CREATE TABLE " + fullTableName + " (t_id VARCHAR NOT NULL,\n" + "k1 INTEGER NOT NULL,\n"
+ "k2 INTEGER NOT NULL,\n" + "C3.k3 INTEGER,\n" + "C2.v1 VARCHAR,\n"
@@ -610,9 +628,7 @@ public void testRowCountAndByteCounts() throws SQLException {
}
conn.commit();
ResultSet rs;
- String query = "UPDATE STATISTICS " + fullTableName + " SET \""
- + QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB + "\"=" + Long.toString(20);
- conn.createStatement().execute(query);
+ collectStatistics(conn, fullTableName, Long.toString(20L));
Random r = new Random();
int count = 0;
boolean hasShadowCells = TransactionFactory.Provider.OMID.name().equals(transactionProvider);
@@ -643,11 +659,10 @@ public void testRowCountAndByteCounts() throws SQLException {
@Test
public void testRowCountWhenNumKVsExceedCompactionScannerThreshold() throws Exception {
- String tableName = generateUniqueName();
StringBuilder sb = new StringBuilder(200);
- sb.append("CREATE TABLE " + tableName + "(PK1 VARCHAR NOT NULL, ");
+ sb.append("CREATE TABLE " + fullTableName + "(PK1 VARCHAR NOT NULL, ");
int numRows = 10;
- try (Connection conn = DriverManager.getConnection(getUrl())) {
+ try (Connection conn = getConnection()) {
int compactionScannerKVThreshold =
conn.unwrap(PhoenixConnection.class).getQueryServices().getConfiguration()
.getInt(HConstants.COMPACTION_KV_MAX,
@@ -663,7 +678,7 @@ public void testRowCountWhenNumKVsExceedCompactionScannerThreshold() throws Exce
String ddl = sb.toString();
conn.createStatement().execute(ddl);
sb = new StringBuilder(200);
- sb.append("UPSERT INTO " + tableName + " VALUES (");
+ sb.append("UPSERT INTO " + fullTableName + " VALUES (");
for (int i = 1; i <= numKvColumns + 1; i++) {
sb.append("?");
if (i < numKvColumns + 1) {
@@ -685,15 +700,15 @@ public void testRowCountWhenNumKVsExceedCompactionScannerThreshold() throws Exce
stmt.executeUpdate();
}
conn.commit();
- conn.createStatement().execute("UPDATE STATISTICS " + tableName);
- String q = "SELECT SUM(GUIDE_POSTS_ROW_COUNT) FROM SYSTEM.STATS WHERE PHYSICAL_NAME = '" + tableName + "'";
+ collectStatistics(conn, fullTableName);
+ String q = "SELECT SUM(GUIDE_POSTS_ROW_COUNT) FROM SYSTEM.STATS WHERE PHYSICAL_NAME = '" + physicalTableName + "'";
ResultSet rs = conn.createStatement().executeQuery(q);
rs.next();
assertEquals("Number of expected rows in stats table after update stats didn't match!", numRows, rs.getInt(1));
- conn.createStatement().executeUpdate("DELETE FROM SYSTEM.STATS WHERE PHYSICAL_NAME = '" + tableName + "'");
+ conn.createStatement().executeUpdate("DELETE FROM SYSTEM.STATS WHERE PHYSICAL_NAME = '" + physicalTableName + "'");
conn.commit();
- TestUtil.doMajorCompaction(conn, tableName);
- q = "SELECT SUM(GUIDE_POSTS_ROW_COUNT) FROM SYSTEM.STATS WHERE PHYSICAL_NAME = '" + tableName + "'";
+ TestUtil.doMajorCompaction(conn, physicalTableName);
+ q = "SELECT SUM(GUIDE_POSTS_ROW_COUNT) FROM SYSTEM.STATS WHERE PHYSICAL_NAME = '" + physicalTableName + "'";
rs = conn.createStatement().executeQuery(q);
rs.next();
assertEquals("Number of expected rows in stats table after major compaction didn't match", numRows, rs.getInt(1));
@@ -721,52 +736,49 @@ private void verifyGuidePostGenerated(ConnectionQueryServices queryServices,
@Test
public void testEmptyGuidePostGeneratedWhenDataSizeLessThanGPWidth() throws Exception {
- String tableName = generateUniqueName();
- try (Connection conn = DriverManager.getConnection(getUrl())) {
+ try (Connection conn = getConnection()) {
long guidePostWidth = 20000000;
conn.createStatement()
- .execute("CREATE TABLE " + tableName
+ .execute("CREATE TABLE " + fullTableName
+ " ( k INTEGER, c1.a bigint,c2.b bigint CONSTRAINT pk PRIMARY KEY (k)) GUIDE_POSTS_WIDTH="
+ guidePostWidth + ", SALT_BUCKETS = 4");
- conn.createStatement().execute("upsert into " + tableName + " values (100,1,3)");
- conn.createStatement().execute("upsert into " + tableName + " values (101,2,4)");
+ conn.createStatement().execute("upsert into " + fullTableName + " values (100,1,3)");
+ conn.createStatement().execute("upsert into " + fullTableName + " values (101,2,4)");
conn.commit();
- conn.createStatement().execute("UPDATE STATISTICS " + tableName);
+ collectStatistics(conn, fullTableName);
ConnectionQueryServices queryServices =
conn.unwrap(PhoenixConnection.class).getQueryServices();
- verifyGuidePostGenerated(queryServices, tableName, new String[] {"C1", "C2"}, guidePostWidth, true);
+ verifyGuidePostGenerated(queryServices, physicalTableName, new String[] {"C1", "C2"}, guidePostWidth, true);
}
}
@Test
public void testCollectingAllVersionsOfCells() throws Exception {
- String tableName = generateUniqueName();
- try (Connection conn = DriverManager.getConnection(getUrl())) {
+ try (Connection conn = getConnection()) {
long guidePostWidth = 70;
String ddl =
- "CREATE TABLE " + tableName + " (k INTEGER PRIMARY KEY, c1.a bigint, c2.b bigint)"
+ "CREATE TABLE " + fullTableName + " (k INTEGER PRIMARY KEY, c1.a bigint, c2.b bigint)"
+ " GUIDE_POSTS_WIDTH=" + guidePostWidth
+ ", USE_STATS_FOR_PARALLELIZATION=true" + ", VERSIONS=3";
conn.createStatement().execute(ddl);
- conn.createStatement().execute("upsert into " + tableName + " values (100,100,3)");
+ conn.createStatement().execute("upsert into " + fullTableName + " values (100,100,3)");
conn.commit();
- conn.createStatement().execute("UPDATE STATISTICS " + tableName);
+ collectStatistics(conn, fullTableName);
ConnectionQueryServices queryServices =
conn.unwrap(PhoenixConnection.class).getQueryServices();
// The table only has one row. All cells just has one version, and the data size of the row
// is less than the guide post width, so we generate empty guide post.
- verifyGuidePostGenerated(queryServices, tableName, new String[] {"C1", "C2"}, guidePostWidth, true);
-
+ verifyGuidePostGenerated(queryServices, physicalTableName, new String[] {"C1", "C2"}, guidePostWidth, true);
- conn.createStatement().execute("upsert into " + tableName + " values (100,101,4)");
+ conn.createStatement().execute("upsert into " + fullTableName + " values (100,101,4)");
conn.commit();
- conn.createStatement().execute("UPDATE STATISTICS " + tableName);
+ collectStatistics(conn, fullTableName);
// We updated the row. Now each cell has two versions, and the data size of the row
// is >= the guide post width, so we generate non-empty guide post.
- verifyGuidePostGenerated(queryServices, tableName, new String[] {"C1", "C2"}, guidePostWidth, false);
+ verifyGuidePostGenerated(queryServices, physicalTableName, new String[] {"C1", "C2"}, guidePostWidth, false);
}
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 26e338fd7c4..ca9c8506599 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -120,6 +120,7 @@
import org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker;
import org.apache.phoenix.schema.stats.StatisticsCollector;
import org.apache.phoenix.schema.stats.StatisticsCollectorFactory;
+import org.apache.phoenix.schema.stats.StatisticsScanner;
import org.apache.phoenix.schema.tuple.EncodedColumnQualiferCellsList;
import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
import org.apache.phoenix.schema.tuple.PositionBasedMultiKeyValueTuple;
@@ -998,12 +999,12 @@ public InternalScanner preCompact(final ObserverContext<RegionCoprocessorEnvironment> c,
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SnapshotScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SnapshotScanner.java
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SnapshotScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SnapshotScanner.java
- private List<Cell> values;
+ private List<Cell> values;
+ private StatisticsCollector statisticsCollector;
public SnapshotScanner(Configuration conf, FileSystem fs, Path rootDir,
HTableDescriptor htd, HRegionInfo hri, Scan scan) throws Throwable{
+ LOG.info("Creating SnapshotScanner for region: " + hri);
+
scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
values = new ArrayList<>();
this.region = HRegion.openHRegion(conf, fs, rootDir, hri, htd, null, null, null);
+ this.scan = scan;
RegionCoprocessorEnvironment snapshotEnv = getSnapshotContextEnvironment(conf);
- RegionScannerFactory regionScannerFactory;
- if (scan.getAttribute(BaseScannerRegionObserver.NON_AGGREGATE_QUERY) != null) {
- regionScannerFactory = new NonAggregateRegionScannerFactory(snapshotEnv);
+ // Collect statistics during scan if ANALYZE_TABLE attribute is set
+ if (ScanUtil.isAnalyzeTable(scan)) {
+ this.scanner = region.getScanner(scan);
+ PhoenixConnection connection = (PhoenixConnection) ConnectionUtil.getInputConnection(conf, new Properties());
+ String tableName = region.getTableDesc().getNameAsString();
+ TableName physicalTableName = SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, conf);
+ Table table = connection.getQueryServices().getTable(physicalTableName.getName());
+ StatisticsWriter statsWriter = StatisticsWriter.newWriter(connection, tableName, HConstants.LATEST_TIMESTAMP);
+ statisticsCollector = new DefaultStatisticsCollector(conf, region,
+ tableName, null, null, null, statsWriter, table);
+ } else if (scan.getAttribute(BaseScannerRegionObserver.NON_AGGREGATE_QUERY) != null) {
+ RegionScannerFactory regionScannerFactory = new NonAggregateRegionScannerFactory(snapshotEnv);
+ this.scanner = regionScannerFactory.getRegionScanner(scan, region.getScanner(scan));
+ statisticsCollector = new NoOpStatisticsCollector();
} else {
/* future work : Snapshot M/R jobs for aggregate queries*/
throw new UnsupportedOperationException("Snapshot M/R jobs not available for aggregate queries");
}
- this.scanner = regionScannerFactory.getRegionScanner(scan, region.getScanner(scan));
+ statisticsCollector.init();
region.startRegionOperation();
}
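
ScanUtil.isAnalyzeTable(scan) gates the stats-collection branch above. Judging by how the ANALYZE_TABLE attribute is set in MetaDataClient further down, the check is presumably just an attribute lookup; a minimal assumed sketch, not code from this patch:

```java
import org.apache.hadoop.hbase.client.Scan;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;

// Hedged sketch: the real check lives in org.apache.phoenix.util.ScanUtil.
public final class ScanUtilSketch {
    public static boolean isAnalyzeTable(Scan scan) {
        // The client marks ANALYZE scans via this attribute (see MetaDataClient).
        return scan.getAttribute(BaseScannerRegionObserver.ANALYZE_TABLE) != null;
    }
}
```
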
@@ -74,6 +111,7 @@ public SnapshotScanner(Configuration conf, FileSystem fs, Path rootDir,
public Result next() throws IOException {
values.clear();
scanner.nextRaw(values);
+ statisticsCollector.collectStatistics(values);
if (values.isEmpty()) {
//we are done
return null;
@@ -86,6 +124,7 @@ public Result next() throws IOException {
public void close() {
if (this.scanner != null) {
try {
+ statisticsCollector.updateStatistics(region, scan);
this.scanner.close();
this.scanner = null;
} catch (IOException e) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableSnapshotResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableSnapshotResultIterator.java
index c3d75f726f0..c58773420a3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableSnapshotResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableSnapshotResultIterator.java
@@ -18,6 +18,8 @@
package org.apache.phoenix.iterate;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -40,8 +42,18 @@
import java.util.List;
import java.util.UUID;
+/**
+ * Iterator that scans over an HBase snapshot based on the input HBase Scan object.
+ * This iterator is generated by Phoenix during query plan scan generation,
+ * hence it includes the scan attributes and custom filters.
+ * It restores the HBase snapshot and determines the valid regions that intersect
+ * the input Scan boundaries, launching a SnapshotScanner for each of them.
+ * The restored snapshot is deleted when the iterator is closed.
+ */
public class TableSnapshotResultIterator implements ResultIterator {
+ private static final Log LOG = LogFactory.getLog(TableSnapshotResultIterator.class);
+
private final Scan scan;
private ResultIterator scanIterator;
private Configuration configuration;
@@ -89,6 +101,7 @@ private void init() throws IOException {
}
Collections.sort(this.regions);
+ LOG.info("Initialization complete with " + regions.size() + " valid regions");
}
/**
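
The javadoc above describes a restore-scan-delete lifecycle. For orientation, a rough sketch of the restore step using HBase's snapshot helper; this is an assumption about the underlying mechanism, not code from this patch, and the paths are placeholders:

```java
import java.util.UUID;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
import org.apache.hadoop.hbase.util.FSUtils;

public class SnapshotRestoreSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Path rootDir = FSUtils.getRootDir(conf);
        FileSystem fs = rootDir.getFileSystem(conf);
        // Scratch dir the snapshot is materialized into.
        Path restoreDir = new Path("/tmp/phoenix-snapshot-restore/" + UUID.randomUUID());
        // Copies/links snapshot files under restoreDir so regions can be
        // opened and scanned directly, bypassing the region servers.
        RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, "my_snapshot");
        // ... open regions under restoreDir and scan (see SnapshotScanner above) ...
        fs.delete(restoreDir, true); // cleanup, mirroring the iterator's close()
    }
}
```
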
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
index 6093edd0cea..c815119df70 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
@@ -47,8 +47,11 @@
import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.mapreduce.util.ConnectionUtil;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.MRJobType;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.SchemaType;
import org.apache.phoenix.query.HBaseFactoryProvider;
import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.schema.stats.StatisticsUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import com.google.common.base.Preconditions;
@@ -78,8 +81,6 @@ public RecordReader<NullWritable,T> createRecordReader(InputSplit split, TaskAtt
 final Class<T> inputClass = (Class<T>) PhoenixConfigurationUtil.getInputClass(configuration);
return new PhoenixRecordReader(inputClass , configuration, queryPlan);
}
-
-
@Override
 public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
@@ -163,8 +164,7 @@ private List<InputSplit> generateSplits(final QueryPlan qplan, final List<KeyRange> splits)
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
 final List<Scan> scans = pSplit.getScans();
try {
+ LOG.info("Generating iterators for " + scans.size() + " scans in keyrange: " + pSplit.getKeyRange());
 List<PeekingResultIterator> iterators = Lists.newArrayListWithExpectedSize(scans.size());
StatementContext ctx = queryPlan.getContext();
ReadMetricQueue readMetrics = ctx.getReadMetricsQueue();
@@ -133,6 +135,7 @@ public void initialize(InputSplit split, TaskAttemptContext context) throws IOEx
final TableSnapshotResultIterator tableSnapshotResultIterator = new TableSnapshotResultIterator(configuration, scan,
scanMetricsHolder);
peekingResultIterator = LookAheadResultIterator.wrap(tableSnapshotResultIterator);
+ LOG.info("Adding TableSnapshotResultIterator for scan: " + scan);
} else {
final TableResultIterator tableResultIterator =
new TableResultIterator(
@@ -140,8 +143,8 @@ public void initialize(InputSplit split, TaskAttemptContext context) throws IOEx
scanMetricsHolder, renewScannerLeaseThreshold, queryPlan,
MapReduceParallelScanGrouper.getInstance());
peekingResultIterator = LookAheadResultIterator.wrap(tableResultIterator);
+ LOG.info("Adding TableResultIterator for scan: " + scan);
}
-
iterators.add(peekingResultIterator);
}
ResultIterator iterator = queryPlan.useRoundRobinIterator() ? RoundRobinResultIterator.newIterator(iterators, queryPlan) : ConcatResultIterator.newIterator(iterators);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
index b0ea17ba228..83e66077bdb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
@@ -152,9 +152,21 @@ public final class PhoenixConfigurationUtil {
public static final String MAPREDUCE_TENANT_ID = "phoenix.mapreduce.tenantid";
+ public static final String MAPREDUCE_JOB_TYPE = "phoenix.mapreduce.jobtype";
+
+ /**
+ * Determines the type of Phoenix MapReduce job:
+ * 1. QUERY runs arbitrary queries without aggregates
+ * 2. UPDATE_STATS collects statistics for the table
+ */
+ public enum MRJobType {
+ QUERY,
+ UPDATE_STATS
+ }
+
public enum SchemaType {
TABLE,
- QUERY;
+ QUERY
}
private PhoenixConfigurationUtil(){
@@ -217,7 +229,12 @@ public static void setSchemaType(Configuration configuration, final SchemaType s
Preconditions.checkNotNull(configuration);
configuration.set(SCHEMA_TYPE, schemaType.name());
}
-
+
+ public static void setMRJobType(Configuration configuration, final MRJobType mrJobType) {
+ Preconditions.checkNotNull(configuration);
+ configuration.set(MAPREDUCE_JOB_TYPE, mrJobType.name());
+ }
+
public static void setPhysicalTableName(final Configuration configuration, final String tableName) {
Preconditions.checkNotNull(configuration);
Preconditions.checkNotNull(tableName);
@@ -285,7 +302,13 @@ public static SchemaType getSchemaType(final Configuration configuration) {
Preconditions.checkNotNull(schemaTp);
return SchemaType.valueOf(schemaTp);
}
-
+
+ public static MRJobType getMRJobType(final Configuration configuration, String defaultMRJobType) {
+ final String mrJobType = configuration.get(MAPREDUCE_JOB_TYPE, defaultMRJobType);
+ Preconditions.checkNotNull(mrJobType);
+ return MRJobType.valueOf(mrJobType);
+ }
+
 public static List<ColumnInfo> getUpsertColumnMetadataList(final Configuration configuration) throws SQLException {
 Preconditions.checkNotNull(configuration);
 List<ColumnInfo> columnMetadataList = null;
@@ -372,7 +395,7 @@ private static List<String> getSelectColumnList(
}
return selectColumnList;
}
-
+
public static String getSelectStatement(final Configuration configuration) throws SQLException {
Preconditions.checkNotNull(configuration);
String selectStmt = configuration.get(SELECT_STATEMENT);
@@ -388,7 +411,8 @@ public static String getSelectStatement(final Configuration configuration) throw
configuration.set(SELECT_STATEMENT, selectStmt);
return selectStmt;
}
-
+
+
public static long getBatchSize(final Configuration configuration) throws SQLException {
Preconditions.checkNotNull(configuration);
long batchSize = configuration.getLong(UPSERT_BATCH_SIZE, DEFAULT_UPSERT_BATCH_SIZE);
@@ -618,6 +642,11 @@ public static void setScrutinyExecuteTimestamp(Configuration configuration, long
configuration.setLong(SCRUTINY_EXECUTE_TIMESTAMP, ts);
}
+ public static void setSplitByStats(final Configuration configuration, boolean value) {
+ Preconditions.checkNotNull(configuration);
+ configuration.setBoolean(MAPREDUCE_SPLIT_BY_STATS, value);
+ }
+
public static String getDisableIndexes(Configuration configuration) {
Preconditions.checkNotNull(configuration);
return configuration.get(DISABLED_INDEXES);
@@ -673,4 +702,5 @@ public static void setTenantId(Configuration configuration, String tenantId){
Preconditions.checkNotNull(configuration);
configuration.set(MAPREDUCE_TENANT_ID, tenantId);
}
+
}
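
A small usage sketch of the new MRJobType plumbing, grounded in the setter/getter added above:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.MRJobType;

public class MRJobTypeExample {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Mark this MR job as a statistics-collection job.
        PhoenixConfigurationUtil.setMRJobType(conf, MRJobType.UPDATE_STATS);
        // Readers fall back to the supplied default when the property is absent.
        MRJobType type = PhoenixConfigurationUtil.getMRJobType(conf, MRJobType.QUERY.name());
        System.out.println("Job type: " + type);
    }
}
```
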
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
index 3462177cb6e..ecede806bae 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
@@ -80,7 +80,7 @@ public static void setInput(final Job job, final Class<? extends DBWritable> inp
 public static void setInput(final Job job, final Class<? extends DBWritable> inputClass, final String snapshotName, String tableName,
Path restoreDir, final String conditions, final String... fieldNames) throws
IOException {
- final Configuration configuration = setSnapshotInput(job, inputClass, snapshotName, tableName, restoreDir);
+ final Configuration configuration = setSnapshotInput(job, inputClass, snapshotName, tableName, restoreDir, SchemaType.QUERY);
if(conditions != null) {
PhoenixConfigurationUtil.setInputTableConditions(configuration, conditions);
}
@@ -99,13 +99,18 @@ public static void setInput(final Job job, final Class<? extends DBWritable> inp
 public static void setInput(final Job job, final Class<? extends DBWritable> inputClass, final String snapshotName, String tableName,
Path restoreDir, String inputQuery) throws
IOException {
- final Configuration configuration = setSnapshotInput(job, inputClass, snapshotName, tableName, restoreDir);
+ final Configuration configuration = setSnapshotInput(job, inputClass, snapshotName, tableName, restoreDir, SchemaType.QUERY);
if(inputQuery != null) {
PhoenixConfigurationUtil.setInputQuery(configuration, inputQuery);
}
}
+ public static void setInput(final Job job, final Class<? extends DBWritable> inputClass, final String snapshotName, String tableName,
+ Path restoreDir) {
+ setSnapshotInput(job, inputClass, snapshotName, tableName, restoreDir, SchemaType.QUERY);
+ }
+
/**
*
* @param job
@@ -115,7 +120,7 @@ public static void setInput(final Job job, final Class<? extends DBWritable> inp
* @param restoreDir a temporary dir to copy the snapshot files into
*/
 private static Configuration setSnapshotInput(Job job, Class<? extends DBWritable> inputClass, String snapshotName,
- String tableName, Path restoreDir) {
+ String tableName, Path restoreDir, SchemaType schemaType) {
job.setInputFormatClass(PhoenixInputFormat.class);
final Configuration configuration = job.getConfiguration();
PhoenixConfigurationUtil.setInputClass(configuration, inputClass);
@@ -123,7 +128,7 @@ private static Configuration setSnapshotInput(Job job, Class<? extends DBWritabl
PhoenixConfigurationUtil.setInputTableName(configuration, tableName);
PhoenixConfigurationUtil.setRestoreDirKey(configuration, new Path(restoreDir, UUID.randomUUID().toString()).toString());
- PhoenixConfigurationUtil.setSchemaType(configuration, SchemaType.QUERY);
+ PhoenixConfigurationUtil.setSchemaType(configuration, schemaType);
return configuration;
}
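
For context, a hedged sketch of wiring a snapshot-backed MR job through the new setInput overload; the writable class, snapshot, table, and path are hypothetical:

```java
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;

public class SnapshotInputJobSketch {

    /** Minimal hypothetical record type; real jobs map columns to fields here. */
    public static class MyWritable implements DBWritable {
        @Override public void write(PreparedStatement stmt) throws SQLException { }
        @Override public void readFields(ResultSet rs) throws SQLException { }
    }

    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        Job job = Job.getInstance(conf, "phoenix-snapshot-scan");
        PhoenixMapReduceUtil.setInput(job, MyWritable.class,
                "my_snapshot",                 // snapshot to read
                "MY_TABLE",                    // Phoenix table the snapshot came from
                new Path("/tmp/restore-dir")); // scratch dir for the restored snapshot
        // ... configure mapper/output and submit as usual ...
    }
}
```
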
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 67a492868d7..7cf2e212a52 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -218,6 +218,7 @@
import org.apache.phoenix.schema.PTable.QualifierEncodingScheme.QualifierOutOfRangeException;
import org.apache.phoenix.schema.PTable.ViewType;
import org.apache.phoenix.schema.stats.GuidePostsKey;
+import org.apache.phoenix.schema.stats.StatisticsUtil;
import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.schema.types.PDate;
import org.apache.phoenix.schema.types.PInteger;
@@ -1317,21 +1318,9 @@ public TransactionFactory.Provider getTransactionProvider() {
TableRef tableRef = new TableRef(null, nonTxnLogicalTable, clientTimeStamp, false);
MutationPlan plan = compiler.compile(Collections.singletonList(tableRef), null, cfs, null, clientTimeStamp);
Scan scan = plan.getContext().getScan();
- scan.setCacheBlocks(false);
- scan.setMaxVersions();
- scan.setAttribute(ANALYZE_TABLE, TRUE_BYTES);
+ StatisticsUtil.setScanAttributes(scan, statsProps);
boolean runUpdateStatsAsync = props.getBoolean(QueryServices.RUN_UPDATE_STATS_ASYNC, DEFAULT_RUN_UPDATE_STATS_ASYNC);
scan.setAttribute(RUN_UPDATE_STATS_ASYNC_ATTRIB, runUpdateStatsAsync ? TRUE_BYTES : FALSE_BYTES);
- if (statsProps != null) {
- Object gp_width = statsProps.get(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB);
- if (gp_width != null) {
- scan.setAttribute(BaseScannerRegionObserver.GUIDEPOST_WIDTH_BYTES, PLong.INSTANCE.toBytes(gp_width));
- }
- Object gp_per_region = statsProps.get(QueryServices.STATS_GUIDEPOST_PER_REGION_ATTRIB);
- if (gp_per_region != null) {
- scan.setAttribute(BaseScannerRegionObserver.GUIDEPOST_PER_REGION, PInteger.INSTANCE.toBytes(gp_per_region));
- }
- }
MutationState mutationState = plan.execute();
rowCount = mutationState.getUpdateCount();
}
@@ -1722,6 +1711,7 @@ public MutationState createIndex(CreateIndexStatement statement, byte[][] splits
}
PrimaryKeyConstraint pk = FACTORY.primaryKey(null, allPkColumns);
tableProps.put(MetaDataUtil.DATA_TABLE_NAME_PROP_NAME, dataTable.getName().getString());
+
CreateTableStatement tableStatement = FACTORY.createTable(indexTableName, statement.getProps(), columnDefs, pk, statement.getSplitNodes(), PTableType.INDEX, statement.ifNotExists(), null, null, statement.getBindCount(), null);
table = createTableInternal(tableStatement, splits, dataTable, null, null, MetaDataUtil.getViewIndexIdDataType(),null, null, allocateIndexId, statement.getIndexType(), asyncCreatedDate, tableProps, commonFamilyProps);
break;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/DefaultStatisticsCollector.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/DefaultStatisticsCollector.java
index 788e2dd555b..f9e1de276bc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/DefaultStatisticsCollector.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/DefaultStatisticsCollector.java
@@ -32,13 +32,11 @@
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.util.Pair;
@@ -49,7 +47,6 @@
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.PName;
import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.schema.PTable.IndexType;
import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.types.PInteger;
@@ -59,47 +56,39 @@
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.SchemaUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
/**
* A default implementation of the Statistics tracker that helps to collect stats like min key, max key and guideposts.
*/
-class DefaultStatisticsCollector implements StatisticsCollector {
- private static final Logger logger = LoggerFactory.getLogger(DefaultStatisticsCollector.class);
- private final Map<ImmutableBytesPtr, Pair<Long, GuidePostsInfoBuilder>> guidePostsInfoWriterMap = Maps.newHashMap();
+public class DefaultStatisticsCollector implements StatisticsCollector {
+
+ private static final Log LOG = LogFactory.getLog(DefaultStatisticsCollector.class);
+
+ final Map<ImmutableBytesPtr, Pair<Long, GuidePostsInfoBuilder>> guidePostsInfoWriterMap = Maps.newHashMap();
+ private final Table htable;
private StatisticsWriter statsWriter;
- private final Pair<Long, GuidePostsInfoBuilder> cachedGuidePosts;
- private final byte[] guidePostWidthBytes;
- private final byte[] guidePostPerRegionBytes;
+ final Pair<Long, GuidePostsInfoBuilder> cachedGuidePosts;
+ final byte[] guidePostWidthBytes;
+ final byte[] guidePostPerRegionBytes;
// Where to look for GUIDE_POSTS_WIDTH in SYSTEM.CATALOG
- private final byte[] ptableKey;
- private final RegionCoprocessorEnvironment env;
+ final byte[] ptableKey;
+
private long guidePostDepth;
private long maxTimeStamp = MetaDataProtocol.MIN_TABLE_TIMESTAMP;
- private static final Log LOG = LogFactory.getLog(DefaultStatisticsCollector.class);
private ImmutableBytesWritable currentRow;
- private final long clientTimeStamp;
private final String tableName;
private final boolean isViewIndexTable;
+ private final Region region;
+ private final Configuration configuration;
- DefaultStatisticsCollector(RegionCoprocessorEnvironment env, String tableName, long clientTimeStamp, byte[] family,
- byte[] gp_width_bytes, byte[] gp_per_region_bytes) throws IOException {
- this.env = env;
+ public DefaultStatisticsCollector(Configuration configuration, Region region, String tableName, byte[] family,
+ byte[] gp_width_bytes, byte[] gp_per_region_bytes, StatisticsWriter statsWriter, Table htable) {
+ this.configuration = configuration;
+ this.region = region;
this.guidePostWidthBytes = gp_width_bytes;
this.guidePostPerRegionBytes = gp_per_region_bytes;
- // Provides a means of clients controlling their timestamps to not use current time
- // when background tasks are updating stats. Instead we track the max timestamp of
- // the cells and use that.
- boolean useCurrentTime = env.getConfiguration().getBoolean(
- QueryServices.STATS_USE_CURRENT_TIME_ATTRIB,
- QueryServicesOptions.DEFAULT_STATS_USE_CURRENT_TIME);
- if (!useCurrentTime) {
- clientTimeStamp = DefaultStatisticsCollector.NO_TIMESTAMP;
- }
String pName = tableName;
// For view index, get GUIDE_POST_WIDTH from data physical table
// since there's no row representing those in SYSTEM.CATALOG.
@@ -110,7 +99,6 @@ class DefaultStatisticsCollector implements StatisticsCollector {
isViewIndexTable = false;
}
ptableKey = SchemaUtil.getTableKeyFromFullName(pName);
- this.clientTimeStamp = clientTimeStamp;
this.tableName = tableName;
// in a compaction we know the one family ahead of time
if (family != null) {
@@ -120,93 +108,125 @@ class DefaultStatisticsCollector implements StatisticsCollector {
} else {
cachedGuidePosts = null;
}
+
+ this.statsWriter = statsWriter;
+ this.htable = htable;
}
-
- private void initGuidepostDepth() throws IOException, ClassNotFoundException, SQLException {
- // First check is if guidepost info set on statement itself
+
+ @Override
+ public void init() throws IOException {
+ try {
+ initGuidepostDepth();
+ } catch (SQLException e) {
+ throw new IOException(e);
+ }
+ LOG.info("Initialization complete for " +
+ this.getClass() + " statistics collector for table " + tableName);
+ }
+
+ /**
+ * Determine the guide post width (GPW) to use for statistics collection on the table.
+ * The order of priority, from highest to lowest, is as follows:
+ * 1. Value provided in the UPDATE STATISTICS SQL statement (N/A for MR jobs)
+ * 2. GPW column in SYSTEM.CATALOG for the table, if not null
+ * 3. Value from the global configuration parameters in hbase-site.xml
+ *
+ * A GPW of 0 disables stats collection. If stats were previously collected, this task
+ * attempts to delete the entries from the SYSTEM.STATS table. Not reading '0' from
+ * SYSTEM.CATALOG means falling back to the global value, which is defaulted via
+ * DEFAULT_STATS_GUIDEPOST_PER_REGION.
+ */
+ private void initGuidepostDepth() throws IOException, SQLException {
if (guidePostPerRegionBytes != null || guidePostWidthBytes != null) {
- int guidepostPerRegion = 0;
- long guidepostWidth = QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES;
- if (guidePostPerRegionBytes != null) {
- guidepostPerRegion = PInteger.INSTANCE.getCodec().decodeInt(guidePostPerRegionBytes, 0, SortOrder.getDefault());
- }
- if (guidePostWidthBytes != null) {
- guidepostWidth = PLong.INSTANCE.getCodec().decodeInt(guidePostWidthBytes, 0, SortOrder.getDefault());
- }
- this.guidePostDepth = StatisticsUtil.getGuidePostDepth(guidepostPerRegion, guidepostWidth,
- env.getRegion().getTableDesc());
+ getGuidePostDepthFromStatement();
+ LOG.info("Guide post depth determined from SQL statement: " + guidePostDepth);
} else {
+ long guidepostWidth = getGuidePostDepthFromSystemCatalog();
+ if (guidepostWidth >= 0) {
+ this.guidePostDepth = guidepostWidth;
+ LOG.info("Guide post depth determined from SYSTEM.CATALOG: " + guidePostDepth);
+ } else {
+ this.guidePostDepth = StatisticsUtil.getGuidePostDepth(
+ configuration.getInt(
+ QueryServices.STATS_GUIDEPOST_PER_REGION_ATTRIB,
+ QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_PER_REGION),
+ configuration.getLong(
+ QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB,
+ QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES),
+ region.getTableDesc());
+ LOG.info("Guide post depth determined from global configuration: " + guidePostDepth);
+ }
+ }
+
+ }
+
+ private long getGuidePostDepthFromSystemCatalog() throws IOException, SQLException {
+ try {
long guidepostWidth = -1;
- HTableInterface htable = null;
- try {
- // Next check for GUIDE_POST_WIDTH on table
- htable = env.getTable(
- SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, env.getConfiguration()));
- Get get = new Get(ptableKey);
- get.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.GUIDE_POSTS_WIDTH_BYTES);
- Result result = htable.get(get);
- if (!result.isEmpty()) {
- Cell cell = result.listCells().get(0);
- guidepostWidth = PLong.INSTANCE.getCodec().decodeLong(cell.getValueArray(), cell.getValueOffset(), SortOrder.getDefault());
- } else if (!isViewIndexTable) {
- /*
- * The table we are collecting stats for is potentially a base table, or local
- * index or a global index. For view indexes, we rely on the the guide post
- * width column in the parent data table's metadata which we already tried
- * retrieving above.
- */
- try (Connection conn =
- QueryUtil.getConnectionOnServer(env.getConfiguration())) {
- PTable table = PhoenixRuntime.getTable(conn, tableName);
- if (table.getType() == PTableType.INDEX
- && table.getIndexType() == IndexType.GLOBAL) {
- /*
- * For global indexes, we need to get the parentName first and then
- * fetch guide post width configured for the parent table.
- */
- PName parentName = table.getParentName();
- byte[] parentKey =
- SchemaUtil.getTableKeyFromFullName(parentName.getString());
- get = new Get(parentKey);
- get.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+ Get get = new Get(ptableKey);
+ get.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.GUIDE_POSTS_WIDTH_BYTES);
+ Result result = htable.get(get);
+ if (!result.isEmpty()) {
+ Cell cell = result.listCells().get(0);
+ guidepostWidth = PLong.INSTANCE.getCodec().decodeLong(cell.getValueArray(), cell.getValueOffset(), SortOrder.getDefault());
+ } else if (!isViewIndexTable) {
+ /*
+ * The table we are collecting stats for is potentially a base table, or local
+ * index or a global index. For view indexes, we rely on the guide post
+ * width column in the parent data table's metadata which we already tried
+ * retrieving above.
+ */
+ try (Connection conn =
+ QueryUtil.getConnectionOnServer(configuration)) {
+ PTable table = PhoenixRuntime.getTable(conn, tableName);
+ if (table.getType() == PTableType.INDEX
+ && table.getIndexType() == PTable.IndexType.GLOBAL) {
+ /*
+ * For global indexes, we need to get the parentName first and then
+ * fetch guide post width configured for the parent table.
+ */
+ PName parentName = table.getParentName();
+ byte[] parentKey =
+ SchemaUtil.getTableKeyFromFullName(parentName.getString());
+ get = new Get(parentKey);
+ get.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
PhoenixDatabaseMetaData.GUIDE_POSTS_WIDTH_BYTES);
- result = htable.get(get);
- if (!result.isEmpty()) {
- Cell cell = result.listCells().get(0);
- guidepostWidth =
- PLong.INSTANCE.getCodec().decodeLong(cell.getValueArray(),
+ result = htable.get(get);
+ if (!result.isEmpty()) {
+ Cell cell = result.listCells().get(0);
+ guidepostWidth =
+ PLong.INSTANCE.getCodec().decodeLong(cell.getValueArray(),
cell.getValueOffset(), SortOrder.getDefault());
- }
}
}
-
- }
- } finally {
- if (htable != null) {
- try {
- htable.close();
- } catch (IOException e) {
- LOG.warn("Failed to close " + htable.getName(), e);
- }
+ } catch (ClassNotFoundException e) {
+ throw new IOException(e);
}
}
- if (guidepostWidth >= 0) {
- this.guidePostDepth = guidepostWidth;
- } else {
- // Last use global config value
- Configuration config = env.getConfiguration();
- this.guidePostDepth = StatisticsUtil.getGuidePostDepth(
- config.getInt(
- QueryServices.STATS_GUIDEPOST_PER_REGION_ATTRIB,
- QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_PER_REGION),
- config.getLong(
- QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB,
- QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES),
- env.getRegion().getTableDesc());
+ return guidepostWidth;
+ } finally {
+ if (htable != null) {
+ try {
+ htable.close();
+ } catch (IOException e) {
+ LOG.warn("Failed to close " + htable.getName(), e);
+ }
}
}
}
+ private void getGuidePostDepthFromStatement() {
+ int guidepostPerRegion = 0;
+ long guidepostWidth = QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES;
+ if (guidePostPerRegionBytes != null) {
+ guidepostPerRegion = PInteger.INSTANCE.getCodec().decodeInt(guidePostPerRegionBytes, 0, SortOrder.getDefault());
+ }
+ if (guidePostWidthBytes != null) {
+ guidepostWidth = PLong.INSTANCE.getCodec().decodeInt(guidePostWidthBytes, 0, SortOrder.getDefault());
+ }
+ this.guidePostDepth = StatisticsUtil.getGuidePostDepth(guidepostPerRegion, guidepostWidth,
+ region.getTableDesc());
+ }
+
@Override
public long getMaxTimeStamp() {
return maxTimeStamp;
@@ -220,68 +240,62 @@ public void close() throws IOException {
}
@Override
- public void updateStatistic(Region region, Scan scan) {
+ public void updateStatistics(Region region, Scan scan) {
try {
- ArrayList<Mutation> mutations = new ArrayList<Mutation>();
- writeStatistics(region, true, mutations, EnvironmentEdgeManager.currentTimeMillis(), scan);
- if (logger.isDebugEnabled()) {
- logger.debug("Committing new stats for the region " + region.getRegionInfo());
- }
+ List<Mutation> mutations = new ArrayList<Mutation>();
+ writeStatistics(region, true, mutations,
+ EnvironmentEdgeManager.currentTimeMillis(), scan);
commitStats(mutations);
} catch (IOException e) {
- logger.error("Unable to commit new stats", e);
+ LOG.error("Unable to update SYSTEM.STATS table.", e);
}
}
private void writeStatistics(final Region region, boolean delete, List<Mutation> mutations, long currentTime, Scan scan)
throws IOException {
- try {
- Set<ImmutableBytesPtr> fams = guidePostsInfoWriterMap.keySet();
- // Update the statistics table.
- // Delete statistics for a region if no guide posts are collected for that region during
- // UPDATE STATISTICS. This will not impact a stats collection of single column family during
- // compaction as guidePostsInfoWriterMap cannot be empty in this case.
- if (cachedGuidePosts == null) {
- // We're either collecting stats for the data table or the local index table, but not both
- // We can determine this based on the column families in the scan being prefixed with the
- // local index column family prefix. We always explicitly specify the local index column
- // families when we're collecting stats for a local index.
- boolean collectingForLocalIndex = scan != null && !scan.getFamilyMap().isEmpty() && MetaDataUtil.isLocalIndexFamily(scan.getFamilyMap().keySet().iterator().next());
- for (Store store : region.getStores()) {
- ImmutableBytesPtr cfKey = new ImmutableBytesPtr(store.getFamily().getName());
- boolean isLocalIndexStore = MetaDataUtil.isLocalIndexFamily(cfKey);
- if (isLocalIndexStore != collectingForLocalIndex) {
- continue;
- }
- if (!guidePostsInfoWriterMap.containsKey(cfKey)) {
- Pair<Long, GuidePostsInfoBuilder> emptyGps = new Pair<Long, GuidePostsInfoBuilder>(0l, new GuidePostsInfoBuilder());
- guidePostsInfoWriterMap.put(cfKey, emptyGps);
- }
- }
- }
- for (ImmutableBytesPtr fam : fams) {
- if (delete) {
- if (logger.isDebugEnabled()) {
- logger.debug("Deleting the stats for the region " + region.getRegionInfo());
- }
- statsWriter.deleteStatsForRegion(region, this, fam, mutations);
+ Set<ImmutableBytesPtr> fams = guidePostsInfoWriterMap.keySet();
+ // Update the statistics table.
+ // Delete statistics for a region if no guide posts are collected for that region during
+ // UPDATE STATISTICS. This will not impact a stats collection of single column family during
+ // compaction as guidePostsInfoWriterMap cannot be empty in this case.
+ if (cachedGuidePosts == null) {
+ // We're either collecting stats for the data table or the local index table, but not both
+ // We can determine this based on the column families in the scan being prefixed with the
+ // local index column family prefix. We always explicitly specify the local index column
+ // families when we're collecting stats for a local index.
+ boolean collectingForLocalIndex = scan != null &&
+ !scan.getFamilyMap().isEmpty() &&
+ MetaDataUtil.isLocalIndexFamily(scan.getFamilyMap().keySet().iterator().next());
+ for (Store store : region.getStores()) {
+ ImmutableBytesPtr cfKey = new ImmutableBytesPtr(store.getFamily().getName());
+ boolean isLocalIndexStore = MetaDataUtil.isLocalIndexFamily(cfKey);
+ if (isLocalIndexStore != collectingForLocalIndex) {
+ continue;
}
- if (logger.isDebugEnabled()) {
- logger.debug("Adding new stats for the region " + region.getRegionInfo());
- }
- // If we've disabled stats, don't write any, just delete them
- if (this.guidePostDepth > 0) {
- statsWriter.addStats(this, fam, mutations);
+ if (!guidePostsInfoWriterMap.containsKey(cfKey)) {
+ Pair<Long, GuidePostsInfoBuilder> emptyGps = new Pair<Long, GuidePostsInfoBuilder>(0l, new GuidePostsInfoBuilder());
+ guidePostsInfoWriterMap.put(cfKey, emptyGps);
}
}
- } catch (IOException e) {
- logger.error("Failed to update statistics table!", e);
- throw e;
+ }
+ for (ImmutableBytesPtr fam : fams) {
+ if (delete) {
+ statsWriter.deleteStatsForRegion(region, this, fam, mutations);
+ LOG.info("Generated " + mutations.size() + " mutations to delete existing stats");
+ }
+
+ // If we've disabled stats, don't write any, just delete them
+ if (this.guidePostDepth > 0) {
+ int oldSize = mutations.size();
+ statsWriter.addStats(this, fam, mutations, guidePostDepth);
+ LOG.info("Generated " + (mutations.size() - oldSize) + " mutations for new stats");
+ }
}
}
private void commitStats(List<Mutation> mutations) throws IOException {
statsWriter.commitStats(mutations, this);
+ LOG.info("Committed " + mutations.size() + " mutations for stats");
}
/**
@@ -346,31 +360,6 @@ public void collectStatistics(final List<Cell> results) {
}
}
- @Override
- public InternalScanner createCompactionScanner(RegionCoprocessorEnvironment env, Store store,
- InternalScanner s) throws IOException {
- // See if this is for Major compaction
- if (logger.isDebugEnabled()) {
- logger.debug("Compaction scanner created for stats");
- }
- ImmutableBytesPtr cfKey = new ImmutableBytesPtr(store.getFamily().getName());
- // Potentially perform a cross region server get in order to use the correct guide posts
- // width for the table being compacted.
- init();
- StatisticsScanner scanner = new StatisticsScanner(this, statsWriter, env, s, cfKey);
- return scanner;
- }
-
- @Override
- public void init() throws IOException {
- try {
- initGuidepostDepth();
- } catch (ClassNotFoundException | SQLException e) {
- throw new IOException("Unable to initialize the guide post depth", e);
- }
- this.statsWriter = StatisticsWriter.newWriter(env, tableName, clientTimeStamp, guidePostDepth);
- }
-
@Override
public GuidePostsInfo getGuidePosts(ImmutableBytesPtr fam) {
Pair<Long, GuidePostsInfoBuilder> pair = guidePostsInfoWriterMap.get(fam);
@@ -380,9 +369,14 @@ public GuidePostsInfo getGuidePosts(ImmutableBytesPtr fam) {
return null;
}
- @VisibleForTesting // Don't call this method anywhere else
+ @Override
public long getGuidePostDepth() {
return guidePostDepth;
}
+ @Override
+ public StatisticsWriter getStatisticsWriter() {
+ return statsWriter;
+ }
+
}
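
To summarize the fallback implemented by initGuidepostDepth() above, here is a simplified, hypothetical mirror of the resolution order (the real code also folds guideposts-per-region and the region's table descriptor into StatisticsUtil.getGuidePostDepth(); the class and method names below are illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;

public class GuidePostDepthResolution {
    static long resolve(Long fromStatement, long fromCatalog, Configuration conf) {
        if (fromStatement != null) {
            return fromStatement;   // 1. UPDATE STATISTICS statement override
        }
        if (fromCatalog >= 0) {
            return fromCatalog;     // 2. GUIDE_POSTS_WIDTH column in SYSTEM.CATALOG
        }
        return conf.getLong(        // 3. global hbase-site.xml value, else the default
                QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB,
                QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES);
    }
}
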
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/NoOpStatisticsCollector.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/NoOpStatisticsCollector.java
index 74d17101db5..65ef7173981 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/NoOpStatisticsCollector.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/NoOpStatisticsCollector.java
@@ -22,10 +22,7 @@
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.Region;
-import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
/**
@@ -45,7 +42,7 @@ public void close() throws IOException {
}
@Override
- public void updateStatistic(Region region, Scan scan) {
+ public void updateStatistics(Region region, Scan scan) {
// No-op
}
@@ -55,12 +52,6 @@ public void collectStatistics(List results) {
}
@Override
- public InternalScanner createCompactionScanner(RegionCoprocessorEnvironment env, Store store,
- InternalScanner delegate) throws IOException {
- return delegate;
- }
-
- @Override
public void init() {
// No-op
}
@@ -68,4 +59,14 @@ public void init() {
@Override public GuidePostsInfo getGuidePosts(ImmutableBytesPtr fam) {
return null;
}
+
+ @Override
+ public long getGuidePostDepth() {
+ return -1;
+ }
+
+ @Override
+ public StatisticsWriter getStatisticsWriter() {
+ return null;
+ }
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
index 60e83a820f2..df28a793ff5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java
@@ -23,10 +23,7 @@
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.Region;
-import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
/**
@@ -45,7 +42,7 @@ public interface StatisticsCollector extends Closeable {
/**
* Write the collected statistics for the given region over the scan provided.
*/
- void updateStatistic(Region region, Scan scan);
+ void updateStatistics(Region region, Scan scan);
/**
* Collect statistics for the given list of cells. This method can be called multiple times
@@ -54,12 +51,6 @@ public interface StatisticsCollector extends Closeable {
*/
void collectStatistics(List<Cell> results);
- /**
- * Wrap a compaction scanner with a scanner that will collect statistics using this instance.
- */
- InternalScanner createCompactionScanner(RegionCoprocessorEnvironment env, Store store,
- InternalScanner delegate) throws IOException;
-
/**
* Called before beginning the collection of statistics through {@link #collectStatistics(List)}
* @throws IOException
@@ -70,4 +61,14 @@ InternalScanner createCompactionScanner(RegionCoprocessorEnvironment env, Store
* Retrieve the calculated guide post info for the given column family.
*/
GuidePostsInfo getGuidePosts(ImmutableBytesPtr fam);
+
+ /**
+ * Retrieve the guide post depth used during stats collection
+ */
+ long getGuidePostDepth();
+
+ /**
+ * Retrieve the object that manages statistics persistence
+ */
+ StatisticsWriter getStatisticsWriter();
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollectorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollectorFactory.java
index 4e37e5cc57b..affa9955280 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollectorFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollectorFactory.java
@@ -23,7 +23,11 @@
import java.io.IOException;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.util.SchemaUtil;
/**
* Provides new {@link StatisticsCollector} instances based on configuration settings for a
@@ -48,8 +52,11 @@ public static StatisticsCollector createStatisticsCollector(
byte[] storeName, byte[] guidepostWidthBytes,
byte[] guidepostsPerRegionBytes) throws IOException {
if (statisticsEnabled(env)) {
- return new DefaultStatisticsCollector(env, tableName, clientTimeStamp, storeName,
- guidepostWidthBytes, guidepostsPerRegionBytes);
+ StatisticsWriter statsWriter = StatisticsWriter.newWriter(env, tableName, clientTimeStamp);
+ Table table = env.getTable(
+ SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, env.getConfiguration()));
+ return new DefaultStatisticsCollector(env.getConfiguration(), env.getRegion(), tableName,
+ storeName, guidepostWidthBytes, guidepostsPerRegionBytes, statsWriter, table);
} else {
return new NoOpStatisticsCollector();
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
index 2fb6f14aedc..08c21008efd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
@@ -158,7 +158,8 @@ public Void call() throws IOException {
LOG.debug("Adding new stats for the region " + regionInfo.getRegionNameAsString()
+ " as part of major compaction");
}
- getStatisticsWriter().addStats(tracker, family, mutations);
+ getStatisticsWriter().addStats(tracker, family,
+ mutations, tracker.getGuidePostDepth());
if (LOG.isDebugEnabled()) {
LOG.debug("Committing new stats for the region " + regionInfo.getRegionNameAsString()
+ " as part of major compaction");
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java
index 23b1fcc5650..57942b6c93f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java
@@ -16,9 +16,12 @@
* limitations under the License.
*/
package org.apache.phoenix.schema.stats;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.ANALYZE_TABLE;
+import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
import static org.apache.phoenix.util.SchemaUtil.getVarCharLength;
import java.io.IOException;
+import java.util.Map;
import java.util.Set;
import org.apache.hadoop.hbase.Cell;
@@ -32,10 +35,13 @@
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
import org.apache.phoenix.coprocessor.MetaDataProtocol;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.types.PInteger;
import org.apache.phoenix.schema.types.PLong;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.MetaDataUtil;
@@ -226,6 +232,21 @@ public static byte[] getGuidePostsInfoFromRowKey(byte[] tableNameBytes, byte[] f
public static boolean isStatsEnabled(TableName tableName) {
return !DISABLE_STATS.contains(tableName);
}
-
+
+ public static void setScanAttributes(Scan scan, Map<String, Object> statsProps) {
+ scan.setCacheBlocks(false);
+ scan.setMaxVersions();
+ scan.setAttribute(ANALYZE_TABLE, TRUE_BYTES);
+ if (statsProps != null) {
+ Object gp_width = statsProps.get(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB);
+ if (gp_width != null) {
+ scan.setAttribute(BaseScannerRegionObserver.GUIDEPOST_WIDTH_BYTES, PLong.INSTANCE.toBytes(gp_width));
+ }
+ Object gp_per_region = statsProps.get(QueryServices.STATS_GUIDEPOST_PER_REGION_ATTRIB);
+ if (gp_per_region != null) {
+ scan.setAttribute(BaseScannerRegionObserver.GUIDEPOST_PER_REGION, PInteger.INSTANCE.toBytes(gp_per_region));
+ }
+ }
+ }
}
\ No newline at end of file
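
A hypothetical caller sketch for the new setScanAttributes helper (the width value is illustrative; MetaDataClient above is the real caller in this patch):

import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.stats.StatisticsUtil;

public class AnalyzeScanExample {
    public static void main(String[] args) {
        Scan scan = new Scan();
        Map<String, Object> statsProps = new HashMap<String, Object>();
        // One-off guidepost width override, as UPDATE STATISTICS ... SET would supply.
        statsProps.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, 100000000L);
        // Marks the scan as an ANALYZE scan and propagates the override attributes.
        StatisticsUtil.setScanAttributes(scan, statsProps);
    }
}
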
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java
index ae077b94858..36b33d62018 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java
@@ -27,18 +27,21 @@
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.sql.Date;
+import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
@@ -51,8 +54,11 @@
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.coprocessor.MetaDataProtocol;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.types.PDate;
import org.apache.phoenix.schema.types.PLong;
import org.apache.phoenix.util.ByteUtil;
@@ -68,6 +74,19 @@
* Wrapper to access the statistics table SYSTEM.STATS using the HTable.
*/
public class StatisticsWriter implements Closeable {
+
+ public static StatisticsWriter newWriter(PhoenixConnection conn, String tableName, long clientTimeStamp)
+ throws SQLException {
+ Configuration configuration = conn.getQueryServices().getConfiguration();
+ long newClientTimeStamp = determineClientTimeStamp(configuration, clientTimeStamp);
+ TableName physicalTableName = SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES, configuration);
+ Table statsWriterTable = conn.getQueryServices().getTable(physicalTableName.getName());
+ Table statsReaderTable = conn.getQueryServices().getTable(physicalTableName.getName());
+ StatisticsWriter statsTable = new StatisticsWriter(statsReaderTable, statsWriterTable, tableName,
+ newClientTimeStamp);
+ return statsTable;
+ }
+
/**
* @param tableName
* TODO
@@ -77,35 +96,47 @@ public class StatisticsWriter implements Closeable {
* @throws IOException
* if the table cannot be created due to an underlying HTable creation error
*/
- public static StatisticsWriter newWriter(RegionCoprocessorEnvironment env, String tableName, long clientTimeStamp, long guidePostDepth)
+ public static StatisticsWriter newWriter(RegionCoprocessorEnvironment env, String tableName, long clientTimeStamp)
throws IOException {
- if (clientTimeStamp == HConstants.LATEST_TIMESTAMP) {
- clientTimeStamp = EnvironmentEdgeManager.currentTimeMillis();
- }
- HTableInterface statsWriterTable = env.getTable(
+ long newClientTimeStamp = determineClientTimeStamp(env.getConfiguration(), clientTimeStamp);
+ Table statsWriterTable = env.getTable(
SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES, env.getConfiguration()));
- HTableInterface statsReaderTable = ServerUtil.getHTableForCoprocessorScan(env, statsWriterTable);
+ Table statsReaderTable = ServerUtil.getHTableForCoprocessorScan(env, statsWriterTable);
StatisticsWriter statsTable = new StatisticsWriter(statsReaderTable, statsWriterTable, tableName,
- clientTimeStamp, guidePostDepth);
+ newClientTimeStamp);
return statsTable;
}
- private final HTableInterface statsWriterTable;
+ // Provides a means for clients to control their timestamps rather than using the
+ // current time while background tasks are updating stats. Instead we track the max
+ // timestamp of the cells and use that.
+ private static long determineClientTimeStamp(Configuration configuration, long clientTimeStamp) {
+ boolean useCurrentTime = configuration.getBoolean(
+ QueryServices.STATS_USE_CURRENT_TIME_ATTRIB,
+ QueryServicesOptions.DEFAULT_STATS_USE_CURRENT_TIME);
+ if (!useCurrentTime) {
+ clientTimeStamp = DefaultStatisticsCollector.NO_TIMESTAMP;
+ }
+ if (clientTimeStamp == HConstants.LATEST_TIMESTAMP) {
+ clientTimeStamp = EnvironmentEdgeManager.currentTimeMillis();
+ }
+ return clientTimeStamp;
+ }
+
+ private final Table statsWriterTable;
// In HBase 0.98.4 or above, the reader and writer will be the same.
// In pre HBase 0.98.4, there was a bug in using the HTable returned
// from a coprocessor for scans, so in that case it'll be different.
- private final HTableInterface statsReaderTable;
+ private final Table statsReaderTable;
private final byte[] tableName;
private final long clientTimeStamp;
- private final long guidePostDepth;
-
- private StatisticsWriter(HTableInterface statsReaderTable, HTableInterface statsWriterTable, String tableName,
- long clientTimeStamp, long guidePostDepth) {
+
+ private StatisticsWriter(Table statsReaderTable,
+ Table statsWriterTable, String tableName, long clientTimeStamp) {
this.statsReaderTable = statsReaderTable;
this.statsWriterTable = statsWriterTable;
this.tableName = Bytes.toBytes(tableName);
this.clientTimeStamp = clientTimeStamp;
- this.guidePostDepth = guidePostDepth;
}
/**
@@ -133,8 +164,8 @@ public void close() throws IOException {
* remaining list of stats to update
*/
@SuppressWarnings("deprecation")
- public void addStats(StatisticsCollector tracker, ImmutableBytesPtr cfKey, List<Mutation> mutations)
- throws IOException {
+ public void addStats(StatisticsCollector tracker, ImmutableBytesPtr cfKey,
+ List<Mutation> mutations, long guidePostDepth) throws IOException {
if (tracker == null) { return; }
boolean useMaxTimeStamp = clientTimeStamp == DefaultStatisticsCollector.NO_TIMESTAMP;
long timeStamp = clientTimeStamp;
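
For illustration, a standalone mirror of the timestamp policy in determineClientTimeStamp (a sketch under the assumption that HBase's EnvironmentEdgeManager is the one used in this class; the wrapper class and method names are hypothetical):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.stats.DefaultStatisticsCollector;

public class ClientTimeStampPolicy {
    static long determine(Configuration conf, long clientTimeStamp) {
        boolean useCurrentTime = conf.getBoolean(
                QueryServices.STATS_USE_CURRENT_TIME_ATTRIB,
                QueryServicesOptions.DEFAULT_STATS_USE_CURRENT_TIME);
        if (!useCurrentTime) {
            // Stats cells carry NO_TIMESTAMP, so the max cell timestamp
            // observed during collection is used instead.
            return DefaultStatisticsCollector.NO_TIMESTAMP;
        }
        return clientTimeStamp == HConstants.LATEST_TIMESTAMP
                ? EnvironmentEdgeManager.currentTimeMillis()
                : clientTimeStamp;
    }
}
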
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/UpdateStatisticsTool.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/UpdateStatisticsTool.java
new file mode 100644
index 00000000000..f84b8592911
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/UpdateStatisticsTool.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema.stats;
+
+import org.antlr.runtime.CharStream;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.PosixParser;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.metrics.Gauge;
+import org.apache.hadoop.hbase.metrics.impl.MetricRegistriesImpl;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
+import org.apache.hadoop.mapreduce.lib.db.DBInputFormat.NullDBWritable;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.htrace.SpanReceiver;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.MRJobType;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.SchemaType;
+import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;
+import org.apache.tephra.TransactionNotInProgressException;
+import org.apache.tephra.TransactionSystemClient;
+import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.twill.common.Cancellable;
+import org.apache.twill.discovery.DiscoveryServiceClient;
+import org.apache.twill.discovery.ZKDiscoveryService;
+import org.apache.twill.zookeeper.ZKClient;
+import org.joda.time.Chronology;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.UUID;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
+
+/**
+ * Tool to collect table-level statistics from an HBase snapshot
+ */
+public class UpdateStatisticsTool extends Configured implements Tool {
+
+ private static final Logger LOG = LoggerFactory.getLogger(UpdateStatisticsTool.class);
+
+ private static final Option TABLE_NAME_OPTION = new Option("t", "table", true,
+ "Phoenix Table Name");
+ private static final Option SNAPSHOT_NAME_OPTION = new Option("s", "snapshot", true,
+ "HBase Snapshot Name");
+ private static final Option RESTORE_DIR_OPTION = new Option("d", "restore-dir", true,
+ "Restore Directory for HBase snapshot");
+ private static final Option RUN_FOREGROUND_OPTION =
+ new Option("runfg", "run-foreground", false,
+ "If specified, runs UpdateStatisticsTool in Foreground. Default - Runs the build in background.");
+ private static final Option HELP_OPTION = new Option("h", "help", false, "Help");
+
+ private Configuration conf;
+ private String tableName;
+ private String snapshotName;
+ private Path restoreDir;
+ private boolean isForeground;
+
+ @Override
+ public int run(String[] args) throws Exception {
+ parseArgs(args);
+ Job job = configureJob(conf, tableName, snapshotName, restoreDir);
+ TableMapReduceUtil.initCredentials(job);
+ return runJob(job, isForeground);
+ }
+
+ private void parseArgs(String[] args) {
+ CommandLine cmdLine = null;
+ try {
+ cmdLine = parseOptions(args);
+ } catch (IllegalStateException e) {
+ printHelpAndExit(e.getMessage(), getOptions());
+ }
+
+ conf = HBaseConfiguration.create();
+ tableName = cmdLine.getOptionValue(TABLE_NAME_OPTION.getOpt());
+ snapshotName = cmdLine.getOptionValue(SNAPSHOT_NAME_OPTION.getOpt());
+ String restoreDirOptionValue = cmdLine.getOptionValue(RESTORE_DIR_OPTION.getOpt());
+ if (restoreDirOptionValue == null) {
+ restoreDirOptionValue = conf.get(FS_DEFAULT_NAME_KEY) + "/tmp";
+ }
+ restoreDir = new Path(restoreDirOptionValue);
+ isForeground = cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt());
+ }
+
+ Job configureJob(Configuration conf, String tableName,
+ String snapshotName, Path restoreDir) throws Exception {
+ Job job = Job.getInstance(conf, "Update statistics for " + tableName);
+ PhoenixMapReduceUtil.setInput(job, NullDBWritable.class,
+ snapshotName, tableName, restoreDir);
+
+ PhoenixConfigurationUtil.setMRJobType(job.getConfiguration(), MRJobType.UPDATE_STATS);
+ // DO NOT allow mapper splits using statistics since they may result in many smaller chunks
+ PhoenixConfigurationUtil.setSplitByStats(job.getConfiguration(), false);
+
+ job.setJarByClass(UpdateStatisticsTool.class);
+ job.setMapperClass(TableSnapshotMapper.class);
+ job.setMapOutputKeyClass(NullWritable.class);
+ job.setMapOutputValueClass(NullWritable.class);
+ job.setOutputFormatClass(NullOutputFormat.class);
+ job.setNumReduceTasks(0);
+ TableMapReduceUtil.addDependencyJars(job);
+ TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), PhoenixConnection.class, Chronology.class,
+ CharStream.class, TransactionSystemClient.class, TransactionNotInProgressException.class,
+ ZKClient.class, DiscoveryServiceClient.class, ZKDiscoveryService.class,
+ Cancellable.class, TTransportException.class, SpanReceiver.class, TransactionProcessor.class, Gauge.class, MetricRegistriesImpl.class);
+ LOG.info("UpdateStatisticsTool running for: " + tableName
+ + " on snapshot: " + snapshotName + " with restore dir: " + restoreDir);
+
+ return job;
+ }
+
+ int runJob(Job job, boolean isForeground) throws Exception {
+ if (isForeground) {
+ LOG.info("Running UpdateStatisticsTool in Foreground. " +
+ "Runs full table scans. This may take a long time!.");
+ return (job.waitForCompletion(true)) ? 0 : 1;
+ } else {
+ LOG.info("Running UpdateStatisticsTool in Background - Submit async and exit");
+ job.submit();
+ return 0;
+ }
+ }
+
+ private void printHelpAndExit(String errorMessage, Options options) {
+ System.err.println(errorMessage);
+ printHelpAndExit(options, 1);
+ }
+
+ private void printHelpAndExit(Options options, int exitCode) {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp("help", options);
+ System.exit(exitCode);
+ }
+
+ /**
+ * Parses the commandline arguments, throws IllegalStateException if mandatory arguments are
+ * missing.
+ * @param args supplied command line arguments
+ * @return the parsed command line
+ */
+ private CommandLine parseOptions(String[] args) {
+
+ final Options options = getOptions();
+
+ CommandLineParser parser = new PosixParser();
+ CommandLine cmdLine = null;
+ try {
+ cmdLine = parser.parse(options, args);
+ } catch (ParseException e) {
+ printHelpAndExit("Error parsing command line options: " + e.getMessage(), options);
+ }
+
+ if (cmdLine.hasOption(HELP_OPTION.getOpt())) {
+ printHelpAndExit(options, 0);
+ }
+
+ if (!cmdLine.hasOption(TABLE_NAME_OPTION.getOpt())) {
+ throw new IllegalStateException(TABLE_NAME_OPTION.getLongOpt() + " is a mandatory "
+ + "parameter");
+ }
+
+ if (!cmdLine.hasOption(SNAPSHOT_NAME_OPTION.getOpt())) {
+ throw new IllegalStateException(SNAPSHOT_NAME_OPTION.getLongOpt() + " is a mandatory "
+ + "parameter");
+ }
+
+ return cmdLine;
+ }
+
+ private Options getOptions() {
+ final Options options = new Options();
+ options.addOption(TABLE_NAME_OPTION);
+ options.addOption(SNAPSHOT_NAME_OPTION);
+ options.addOption(HELP_OPTION);
+ options.addOption(RESTORE_DIR_OPTION);
+ options.addOption(RUN_FOREGROUND_OPTION);
+ return options;
+ }
+
+ public static class TableSnapshotMapper
+ extends Mapper<NullWritable, NullDBWritable, NullWritable, NullWritable> {
+
+ @Override
+ protected void map(NullWritable key, NullDBWritable value,
+ Context context) {
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ ToolRunner.run(new UpdateStatisticsTool(), args);
+ }
+}
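
A hypothetical invocation sketch for the tool (the table, snapshot, and restore directory names are illustrative):

import org.apache.hadoop.util.ToolRunner;
import org.apache.phoenix.schema.stats.UpdateStatisticsTool;

public class RunUpdateStats {
    public static void main(String[] args) throws Exception {
        // -runfg waits for completion; omit it to submit async and exit immediately.
        int rc = ToolRunner.run(new UpdateStatisticsTool(), new String[] {
                "-t", "MY_TABLE",
                "-s", "MY_TABLE_SNAPSHOT",
                "-d", "hdfs:///tmp/stats-restore",
                "-runfg" });
        System.exit(rc);
    }
}
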
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
index 09701c52646..83ea7863fe8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
@@ -44,6 +44,7 @@
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.ipc.controller.InterRegionServerIndexRpcControllerFactory;
@@ -218,11 +219,12 @@ private static HTableInterface getTableFromSingletonPool(RegionCoprocessorEnviro
}
}
- public static HTableInterface getHTableForCoprocessorScan (RegionCoprocessorEnvironment env, HTableInterface writerTable) throws IOException {
+ public static Table getHTableForCoprocessorScan (RegionCoprocessorEnvironment env,
+ Table writerTable) throws IOException {
if (coprocessorScanWorks(env)) {
return writerTable;
}
- return getTableFromSingletonPool(env, writerTable.getTableName());
+ return getTableFromSingletonPool(env, writerTable.getName().getName());
}
public static HTableInterface getHTableForCoprocessorScan (RegionCoprocessorEnvironment env, byte[] tableName) throws IOException {
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java
index 0c4c0047e1f..cb5efa146a1 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java
@@ -25,6 +25,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.MRJobType;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.SchemaType;
import org.apache.phoenix.query.BaseConnectionlessQueryTest;
import org.apache.phoenix.util.PropertiesUtil;
@@ -269,4 +270,20 @@ public void testOutputClusterOverride() throws Exception {
assertEquals(zkQuorumOverride3, OVERRIDE_CLUSTER_QUORUM);
}
+
+ @Test
+ public void testMrJobTypeOverride() throws Exception {
+ final Job job = Job.getInstance();
+ Configuration configuration = job.getConfiguration();
+ MRJobType mrJobType = PhoenixConfigurationUtil.getMRJobType(configuration,
+ MRJobType.QUERY.name());
+ assertEquals(MRJobType.QUERY.name(), mrJobType.name());
+
+ PhoenixConfigurationUtil.setMRJobType(configuration, MRJobType.UPDATE_STATS);
+ mrJobType = PhoenixConfigurationUtil.getMRJobType(configuration,
+ MRJobType.QUERY.name());
+ assertEquals(MRJobType.UPDATE_STATS.name(), mrJobType.name());
+
+ }
+
}
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtil.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtil.java
index d5b870dfedf..7e0203f4d3c 100644
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtil.java
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtil.java
@@ -57,12 +57,12 @@ public static ResourceSchema getResourceSchema(final Configuration configuration
try {
List<ColumnInfo> columns = null;
final SchemaType schemaType = PhoenixConfigurationUtil.getSchemaType(configuration);
- if(SchemaType.QUERY.equals(schemaType)) {
+ if(schemaType == SchemaType.QUERY) {
final String sqlQuery = PhoenixConfigurationUtil.getSelectStatement(configuration);
Preconditions.checkNotNull(sqlQuery, "No Sql Query exists within the configuration");
final SqlQueryToColumnInfoFunction function = new SqlQueryToColumnInfoFunction(configuration);
columns = function.apply(sqlQuery);
- } else {
+ } else if (schemaType == SchemaType.TABLE) {
columns = dependencies.getSelectColumnMetadataList(configuration);
}
ResourceFieldSchema fields[] = new ResourceFieldSchema[columns.size()];
diff --git a/pom.xml b/pom.xml
index fed58a5ca31..6d663d9ed6a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -608,6 +608,16 @@
      <artifactId>hbase-common</artifactId>
      <version>${hbase.version}</version>
    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-metrics-api</artifactId>
+      <version>${hbase.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-metrics</artifactId>
+      <version>${hbase.version}</version>
+    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-common</artifactId>