Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
import com.google.common.collect.Maps;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
Expand All @@ -30,6 +32,7 @@
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.ScanInfoUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.coprocessor.IndexRebuildRegionScanner;
Expand Down Expand Up @@ -137,6 +140,7 @@ public static synchronized void setup() throws Exception {
serverProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB,
QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
serverProps.put(QueryServices.INDEX_REBUILD_PAGE_SIZE_IN_ROWS, Long.toString(8));
serverProps.put(ScanInfoUtil.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, Long.toString(3600));
Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(2);
clientProps.put(QueryServices.USE_STATS_FOR_PARALLELIZATION, Boolean.toString(true));
clientProps.put(QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB, Long.toString(5));
Expand Down Expand Up @@ -676,6 +680,186 @@ public void testDisableOutputLogging() throws Exception {
}
}

// Verifies that an ASYNC view index created AFTER the data was mutated is
// rebuilt correctly by IndexTool when the updatable view filters on a
// non-leading PK column (KEY_PREFIX). The mutations include a DELETE and a
// set-to-NULL upsert, so the rebuild must replay delete markers and all cell
// versions, not just the latest puts.
@Test
public void testUpdatablePKFilterViewIndexRebuild() throws Exception {
// The raw put/delete accounting below only applies to mutable tables.
if (!mutable) {
return;
}
String schemaName = generateUniqueName();
String dataTableName = generateUniqueName();
String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
String view1Name = generateUniqueName();
String view1FullName = SchemaUtil.getTableName(schemaName, view1Name);
String view2Name = generateUniqueName();
String view2FullName = SchemaUtil.getTableName(schemaName, view2Name);
String indexTableName = generateUniqueName();
String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);

try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
// Create Table and Views. Note the view is on a non leading PK data table column
// VERSIONS=1 makes the raw-cell counts below deterministic; the second view
// (on KEY_PREFIX 'ccc') exists to ensure the rebuild does not pick up rows
// belonging to a sibling view.
String createTable =
"CREATE TABLE IF NOT EXISTS " + dataTableFullName + " (\n"
+ " ORGANIZATION_ID VARCHAR NOT NULL,\n"
+ " KEY_PREFIX CHAR(3) NOT NULL,\n" + " CREATED_BY VARCHAR,\n"
+ " CONSTRAINT PK PRIMARY KEY (\n" + " ORGANIZATION_ID,\n"
+ " KEY_PREFIX\n" + " )\n"
+ ") VERSIONS=1, COLUMN_ENCODED_BYTES=0";
conn.createStatement().execute(createTable);
String createView1 =
"CREATE VIEW IF NOT EXISTS " + view1FullName + " (\n"
+ " VIEW_COLA VARCHAR NOT NULL,\n"
+ " VIEW_COLB CHAR(1) CONSTRAINT PKVIEW PRIMARY KEY (\n"
+ " VIEW_COLA\n" + " )) AS \n" + " SELECT * FROM " + dataTableFullName
+ " WHERE KEY_PREFIX = 'aaa'";
conn.createStatement().execute(createView1);
String createView2 =
"CREATE VIEW IF NOT EXISTS " + view2FullName + " (\n"
+ " VIEW_COL1 VARCHAR NOT NULL,\n"
+ " VIEW_COL2 CHAR(1) CONSTRAINT PKVIEW PRIMARY KEY (\n"
+ " VIEW_COL1\n" + " )) AS \n" + " SELECT * FROM " + dataTableFullName
+ " WHERE KEY_PREFIX = 'ccc'";
conn.createStatement().execute(createView2);

// We want to verify if deletes and set null result in expected rebuild of view index
conn.createStatement().execute("UPSERT INTO " + view1FullName
+ "(ORGANIZATION_ID, VIEW_COLA, VIEW_COLB) VALUES('ORG1', 'A', 'G')");
conn.createStatement().execute("UPSERT INTO " + view1FullName
+ "(ORGANIZATION_ID, VIEW_COLA, VIEW_COLB) VALUES('ORG1', 'C', 'I')");
conn.createStatement().execute("UPSERT INTO " + view1FullName
+ "(ORGANIZATION_ID, VIEW_COLA, VIEW_COLB) VALUES('ORG1', 'D', 'J')");

conn.createStatement().execute("UPSERT INTO " + view2FullName
+ "(ORGANIZATION_ID, VIEW_COL1, VIEW_COL2) VALUES('ORG2', 'B', 'H')");
conn.commit();
// Delete row 'C' and null out VIEW_COLB for row 'D' in separate commits so
// the rebuild has to reconcile delete markers and overwritten versions.
conn.createStatement().execute("DELETE FROM " + view1FullName
+ " WHERE ORGANIZATION_ID = 'ORG1' AND VIEW_COLA = 'C'");
conn.createStatement().execute("UPSERT INTO " + view1FullName
+ "(ORGANIZATION_ID, VIEW_COLA, VIEW_COLB) VALUES('ORG1', 'D', NULL)");
conn.commit();

// ASYNC: no index rows are written until IndexTool runs below.
String createViewIndex =
"CREATE INDEX IF NOT EXISTS " + indexTableName + " ON " + view1FullName
+ " (VIEW_COLB) ASYNC";
conn.createStatement().execute(createViewIndex);
conn.commit();
// Rebuild using index tool
IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, view1Name, indexTableName);
// Only view1's surviving rows ('A' and 'D') should be visible in the index.
ResultSet rs =
conn.createStatement()
.executeQuery("SELECT COUNT(*) FROM " + indexTableFullName);
rs.next();
assertEquals(2, rs.getInt(1));

// _IDX_<table> is the physical table backing the view index; a raw scan
// there is expected to show 4 put cells and 2 delete markers after the
// rebuild replays all versions. NOTE(review): the exact 4/2 breakdown is
// asserted empirically here — confirm against the raw-cell layout if the
// rebuild write path changes.
Pair<Integer, Integer> putsAndDeletes =
countPutsAndDeletes("_IDX_" + dataTableFullName);
assertEquals(4, (int) putsAndDeletes.getFirst());
assertEquals(2, (int) putsAndDeletes.getSecond());
}
}

// Same scenario as testUpdatablePKFilterViewIndexRebuild, but the updatable
// views filter on a NON-PK data table column (CREATED_BY). Because the filter
// column is not part of the row key, the upserts must supply KEY_PREFIX
// explicitly; the rebuild must still replay the delete and the set-to-NULL
// mutation correctly.
@Test
public void testUpdatableNonPkFilterViewIndexRebuild() throws Exception {
// The raw put/delete accounting below only applies to mutable tables.
if (!mutable) {
return;
}
String schemaName = generateUniqueName();
String dataTableName = generateUniqueName();
String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
String view1Name = generateUniqueName();
String view1FullName = SchemaUtil.getTableName(schemaName, view1Name);
String view2Name = generateUniqueName();
String view2FullName = SchemaUtil.getTableName(schemaName, view2Name);
String indexTableName = generateUniqueName();
String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);

try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
// Create Table and Views. Note the view is on a non PK data table column
String createTable =
"CREATE TABLE IF NOT EXISTS " + dataTableFullName + " (\n"
+ " ORGANIZATION_ID VARCHAR NOT NULL,\n"
+ " KEY_PREFIX CHAR(3) NOT NULL,\n" + " CREATED_BY VARCHAR,\n"
+ " CONSTRAINT PK PRIMARY KEY (\n" + " ORGANIZATION_ID,\n"
+ " KEY_PREFIX\n" + " )\n"
+ ") VERSIONS=1, COLUMN_ENCODED_BYTES=0";
conn.createStatement().execute(createTable);
String createView1 =
"CREATE VIEW IF NOT EXISTS " + view1FullName + " (\n"
+ " VIEW_COLA VARCHAR NOT NULL,\n"
+ " VIEW_COLB CHAR(1) CONSTRAINT PKVIEW PRIMARY KEY (\n"
+ " VIEW_COLA\n" + " )) AS \n" + " SELECT * FROM " + dataTableFullName
+ " WHERE CREATED_BY = 'foo'";
conn.createStatement().execute(createView1);
String createView2 =
"CREATE VIEW IF NOT EXISTS " + view2FullName + " (\n"
+ " VIEW_COL1 VARCHAR NOT NULL,\n"
+ " VIEW_COL2 CHAR(1) CONSTRAINT PKVIEW PRIMARY KEY (\n"
+ " VIEW_COL1\n" + " )) AS \n" + " SELECT * FROM " + dataTableFullName
+ " WHERE CREATED_BY = 'bar'";
conn.createStatement().execute(createView2);

// We want to verify if deletes and set null result in expected rebuild of view index
// Unlike the PK-filter test, KEY_PREFIX must be supplied because it is not
// fixed by the view's WHERE clause.
conn.createStatement().execute("UPSERT INTO " + view1FullName
+ "(ORGANIZATION_ID, KEY_PREFIX, VIEW_COLA, VIEW_COLB) VALUES('ORG1', 'aaa', 'A', 'G')");
conn.createStatement().execute("UPSERT INTO " + view1FullName
+ "(ORGANIZATION_ID, KEY_PREFIX, VIEW_COLA, VIEW_COLB) VALUES('ORG1', 'ccc', 'C', 'I')");
conn.createStatement().execute("UPSERT INTO " + view1FullName
+ "(ORGANIZATION_ID, KEY_PREFIX, VIEW_COLA, VIEW_COLB) VALUES('ORG1', 'ddd', 'D', 'J')");

conn.createStatement().execute("UPSERT INTO " + view2FullName
+ "(ORGANIZATION_ID, KEY_PREFIX, VIEW_COL1, VIEW_COL2) VALUES('ORG2', 'bbb', 'B', 'H')");
conn.commit();
// Delete row 'C' and null out VIEW_COLB for row 'D' in a second commit so
// the rebuild has to reconcile delete markers and overwritten versions.
conn.createStatement().execute("DELETE FROM " + view1FullName
+ " WHERE ORGANIZATION_ID = 'ORG1' AND VIEW_COLA = 'C'");
conn.createStatement().execute("UPSERT INTO " + view1FullName
+ "(ORGANIZATION_ID, KEY_PREFIX, VIEW_COLA, VIEW_COLB) VALUES('ORG1', 'ddd', 'D', NULL)");
conn.commit();

// ASYNC: no index rows are written until IndexTool runs below.
String createViewIndex =
"CREATE INDEX IF NOT EXISTS " + indexTableName + " ON " + view1FullName
+ " (VIEW_COLB) ASYNC";
conn.createStatement().execute(createViewIndex);
conn.commit();
// Rebuild using index tool
IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, view1Name, indexTableName);
// Only view1's surviving rows ('A' and 'D') should be visible in the index.
ResultSet rs =
conn.createStatement()
.executeQuery("SELECT COUNT(*) FROM " + indexTableFullName);
rs.next();
assertEquals(2, rs.getInt(1));

// Same expected raw-cell counts as the PK-filter variant: 4 puts and 2
// delete markers in the shared _IDX_ physical table after the rebuild.
// NOTE(review): counts asserted empirically — confirm if the write path changes.
Pair<Integer, Integer> putsAndDeletes =
countPutsAndDeletes("_IDX_" + dataTableFullName);
assertEquals(4, (int) putsAndDeletes.getFirst());
assertEquals(2, (int) putsAndDeletes.getSecond());
}
}

/**
 * Counts the Put cells and DeleteFamily markers currently stored in the given
 * physical HBase table. Uses a raw scan so delete markers and all cell
 * versions (not just the latest) are returned.
 *
 * @param tableName physical HBase table name to scan
 * @return a Pair of (number of Put cells, number of DeleteFamily markers);
 *         other delete marker types (Delete, DeleteColumn) are not counted
 * @throws Exception if connecting to or scanning the table fails
 */
private Pair<Integer, Integer> countPutsAndDeletes(String tableName) throws Exception {
    int numPuts = 0;
    int numDeletes = 0;
    // Fix: the Table and ResultScanner were previously never closed (resource
    // leak); close all three resources via try-with-resources.
    try (org.apache.hadoop.hbase.client.Connection hcon =
            ConnectionFactory.createConnection(config);
            Table htable = hcon.getTable(TableName.valueOf(tableName))) {
        Scan scan = new Scan();
        // Raw scans surface delete markers and not-yet-compacted versions.
        scan.setRaw(true);
        try (ResultScanner scanner = htable.getScanner(scan)) {
            for (Result result = scanner.next(); result != null; result = scanner.next()) {
                for (Cell cell : result.rawCells()) {
                    if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.Put) {
                        numPuts++;
                    } else if (KeyValue.Type
                            .codeToType(cell.getTypeByte()) == KeyValue.Type.DeleteFamily) {
                        numDeletes++;
                    }
                }
            }
        }
    }
    return new Pair<Integer, Integer>(numPuts, numDeletes);
}

public void deleteAllRows(Connection conn, TableName tableName) throws SQLException,
IOException, InterruptedException {
Scan scan = new Scan();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.ipc.controller.InterRegionServerIndexRpcControllerFactory;
Expand Down Expand Up @@ -87,6 +88,7 @@
import org.apache.phoenix.expression.aggregator.Aggregator;
import org.apache.phoenix.expression.aggregator.Aggregators;
import org.apache.phoenix.expression.aggregator.ServerAggregators;
import org.apache.phoenix.filter.AllVersionsIndexRebuildFilter;
import org.apache.phoenix.hbase.index.Indexer;
import org.apache.phoenix.hbase.index.ValueGetter;
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
Expand Down Expand Up @@ -1081,7 +1083,15 @@ private RegionScanner rebuildIndices(final RegionScanner innerScanner, final Reg
rawScan.setRaw(true);
rawScan.setMaxVersions();
rawScan.getFamilyMap().clear();
rawScan.setFilter(null);
// For rebuilds we use count (*) as query for regular tables which ends up setting the FKOF on scan
// This filter doesn't give us all columns and skips to the next row as soon as it finds 1 col
// For rebuilds we need all columns and all versions
if (scan.getFilter() instanceof FirstKeyOnlyFilter) {
Copy link
Copy Markdown
Contributor

@gjacoby126 gjacoby126 Jun 5, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a comment to explain why FirstKeyOnlyFilter is a special case. If the rebuild index scan is explicitly asking for only the first keyvalue, why do we avoid using the AllVersions filter which also only gives the first keyvalue?

And if there is a reason not to use the FirstKeyOnlyFilter, are we still OK with using the AllVersionsIndexRebuildFilter if the Scan's filter it will delegate to is a composite filter which contains a FirstKeyOnlyFilter?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The AllVersions filter does not only give the first key value; its purpose is to make sure all versions of a column are returned (when matched by the underlying supplied filter), instead of just one. Usually the filters used in normal queries (which also end up being used for rebuild, since we use select count(*)) return only one version of a column; in a rebuild we want to return all versions, hence this change.

rawScan.setFilter(null);
} else if (scan.getFilter() != null) {
// Override the filter so that we get all versions
rawScan.setFilter(new AllVersionsIndexRebuildFilter(scan.getFilter()));
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to do any special filter logic down at line 1099 in the else block if the Scan was raw in the first place?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAIK we get raw scan here in case of old design and partial rebuild (Correct me if i'm wrong here @kadirozde ). I didn't want to mess with the old design and hence only made the changes for new.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we get raw scan only for the old design partial rebuilds (i.e., auto-rebuilds).

rawScan.setCacheBlocks(false);
for (byte[] family : scan.getFamilyMap().keySet()) {
rawScan.addFamily(family);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.filter;

import java.io.IOException;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.filter.Filter;

/**
* This filter overrides the behavior of delegate so that we do not jump to the next
* column as soon as we find a value for a column but rather include all versions which is
* needed for rebuilds.
*/
public class AllVersionsIndexRebuildFilter extends DelegateFilter {

public AllVersionsIndexRebuildFilter(Filter originalFilter) {
super(originalFilter);
}

@Override
public ReturnCode filterKeyValue(Cell v) throws IOException {
ReturnCode delegateCode = super.filterKeyValue(v);
if (delegateCode == ReturnCode.INCLUDE_AND_NEXT_COL) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add a comment here why we convert it to INCLUDE? Why are we not happy with NEXT_COL?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gokceni NEXT_COL skips this column and goes to the next one. What we want to do is when the underlying filter says yes to a column, we want to say yes too, but instead of jumping to the next col since we got a value, we want to get all versions.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I agree, comment in the code would be helpful here. @abhishek-chouhan

return ReturnCode.INCLUDE;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is simulating the effects of a FirstKeyOnlyFilter? Comment would be good

} else {
return delegateCode;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.filter;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterBase;

/**
 * A pass-through {@link FilterBase} that forwards every filter callback to a
 * wrapped delegate {@link Filter}. Subclasses (e.g. AllVersionsIndexRebuildFilter)
 * override individual methods to adjust the delegate's behavior while
 * inheriting the rest unchanged.
 *
 * NOTE(review): the delegate is not null-checked here; a null delegate will
 * surface as an NPE on first callback.
 */
public class DelegateFilter extends FilterBase {

// The wrapped filter that all callbacks are forwarded to.
protected Filter delegate = null;

public DelegateFilter(Filter delegate) {
this.delegate = delegate;
}

@Override
public void reset() throws IOException {
delegate.reset();
}

@Override
public boolean filterRowKey(byte[] buffer, int offset, int length) throws IOException {
return delegate.filterRowKey(buffer, offset, length);
}

@Override
public boolean filterAllRemaining() throws IOException {
return delegate.filterAllRemaining();
}

@Override
public ReturnCode filterKeyValue(Cell v) throws IOException {
return delegate.filterKeyValue(v);
}

@Override
public Cell transformCell(Cell v) throws IOException {
return delegate.transformCell(v);
}

@Override
public KeyValue transform(KeyValue currentKV) throws IOException {
return delegate.transform(currentKV);
}

@Override
public void filterRowCells(List<Cell> kvs) throws IOException {
delegate.filterRowCells(kvs);
}

@Override
public boolean hasFilterRow() {
return delegate.hasFilterRow();
}

@Override
public boolean filterRow() throws IOException {
return delegate.filterRow();
}

@Override
public KeyValue getNextKeyHint(KeyValue currentKV) throws IOException {
return delegate.getNextKeyHint(currentKV);
}

@Override
public Cell getNextCellHint(Cell currentKV) throws IOException {
return delegate.getNextCellHint(currentKV);
}

@Override
public boolean isFamilyEssential(byte[] name) throws IOException {
return delegate.isFamilyEssential(name);
}

// Serializes the delegate's state, not this wrapper's; deserialization of
// the wrapper itself is not implemented here.
@Override
public byte[] toByteArray() throws IOException {
return delegate.toByteArray();
}
}