You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
publicclassSingleColumnValueFilterextendsFilterBase {
privatestaticfinalLogLOG = LogFactory.getLog(SingleColumnValueFilter.class);
protectedbyte [] columnFamily;
protectedbyte [] columnQualifier;
protectedCompareOpcompareOp;
protectedByteArrayComparablecomparator;
protectedbooleanfoundColumn = false;
protectedbooleanmatchedColumn = false;
protectedbooleanfilterIfMissing = false;
protectedbooleanlatestVersionOnly = true;
/** * Constructor for binary compare of the value of a single column. If the * column is found and the condition passes, all columns of the row will be * emitted. If the condition fails, the row will not be emitted. * <p> * Use the filterIfColumnMissing flag to set whether the rest of the columns * in a row will be emitted if the specified column to check is not found in * the row. * * @param family name of column family * @param qualifier name of column qualifier * @param compareOp operator * @param comparator Comparator to use. */publicSingleColumnValueFilter(finalbyte [] family, finalbyte [] qualifier,
finalCompareOpcompareOp, finalByteArrayComparablecomparator) {
this.columnFamily = family;
this.columnQualifier = qualifier;
this.compareOp = compareOp;
this.comparator = comparator;
}
@OverridepublicReturnCodefilterKeyValue(Cellc) {
if (this.matchedColumn) {
// We already found and matched the single column, all keys now passreturnReturnCode.INCLUDE;
} elseif (this.latestVersionOnly && this.foundColumn) {
// We found but did not match the single column, skip to next rowreturnReturnCode.NEXT_ROW;
}
if (!CellUtil.matchingColumn(c, this.columnFamily, this.columnQualifier)) {
returnReturnCode.INCLUDE;
}
foundColumn = true;
if (filterColumnValue(c.getValueArray(), c.getValueOffset(), c.getValueLength())) {
returnthis.latestVersionOnly? ReturnCode.NEXT_ROW: ReturnCode.INCLUDE;
}
this.matchedColumn = true;
returnReturnCode.INCLUDE;
}
privatebooleanfilterColumnValue(finalbyte [] data, finalintoffset,
finalintlength) {
intcompareResult = this.comparator.compareTo(data, offset, length);
switch (this.compareOp) {
caseLESS:
returncompareResult <= 0;
caseLESS_OR_EQUAL:
returncompareResult < 0;
caseEQUAL:
returncompareResult != 0;
caseNOT_EQUAL:
returncompareResult == 0;
caseGREATER_OR_EQUAL:
returncompareResult > 0;
caseGREATER:
returncompareResult >= 0;
default:
thrownewRuntimeException("Unknown Compare op " + compareOp.name());
}
}
publicbooleanfilterRow() {
// If column was found, return false if it was matched, true if it was not// If column not found, return true if we filter if missing, false if notreturnthis.foundColumn? !this.matchedColumn: this.filterIfMissing;
}
}
/** * This is a Filter wrapper class which is used in the server side. Some filter * related hooks can be defined in this wrapper. The only way to create a * FilterWrapper instance is passing a client side Filter instance through * {@link org.apache.hadoop.hbase.client.Scan#getFilter()}. * */finalpublicclassFilterWrapperextendsFilter {
Filterfilter = null;
publicFilterWrapper( Filterfilter ) {
if (null == filter) {
// ensure the filter instance is not nullthrownewNullPointerException("Cannot create FilterWrapper with null Filter");
}
this.filter = filter;
}
publicenumFilterRowRetCode {
NOT_CALLED,
INCLUDE, // corresponds to filter.filterRow() returning falseEXCLUDE// corresponds to filter.filterRow() returning true
}
publicFilterRowRetCodefilterRowCellsWithRet(List<Cell> kvs) throwsIOException {
this.filter.filterRowCells(kvs);
if (!kvs.isEmpty()) {
if (this.filter.filterRow()) {
kvs.clear();
returnFilterRowRetCode.EXCLUDE;
}
returnFilterRowRetCode.INCLUDE;
}
returnFilterRowRetCode.NOT_CALLED;
}
}
有这样一个场景,在HBase中需要分页查询,同时根据某一列的值进行过滤。
不同于RDBMS天然支持分页查询,HBase要进行分页必须由自己实现。据我了解的,目前有两种方案, 一是《HBase权威指南》中提到的用PageFilter加循环动态设置startRow实现,详细见这里。但这种方法效率比较低,且有冗余查询。因此京东研发了一种用额外的一张表来保存行序号的方案。 该种方案效率较高,但实现麻烦些,需要维护一张额外的表。
不管是方案也好,人也好,没有最好的,只有最适合的。
在我司的使用场景中,对于性能的要求并不高,所以采取了第一种方案。本来使用的美滋滋,但有一天需要在分页查询的同时根据某一列的值进行过滤。根据列值过滤,自然是用SingleColumnValueFilter(下文简称SCVFilter)。代码大致如下,只列出了本文主题相关的逻辑,
数据如下
在上面的代码中。向scan添加了两个filter:首先添加了PageFilter,限制这次查询数量为1,然后添加了一个SCVFilter,限制了只返回
isDeleted=false
的行。上面的代码,看上去无懈可击,但在运行时却没有查询到数据!
刚好最近在看HBase的代码,就在本地debug了下HBase服务端Filter相关的查询流程。
Filter流程
首先看下HBase Filter的流程,见图:
然后再看PageFilter的实现逻辑。
其实很简单,内部有一个计数器,每次调用filterRow的时候,计数器都会+1,如果计数器值大于pageSize,filterrow就会返回true,那之后的行就会被过滤掉。
再看SCVFilter的实现逻辑。
在HBase中,对于每一行的每一列都会调用到filterKeyValue,SCVFilter的该方法处理逻辑如下:
再看filterRow方法,该方法调用时机在filterKeyValue之后,对每一行只会调用一次。
SCVFilter中该方法逻辑很简单:
猜想:
是不是因为将PageFilter添加到SCVFilter的前面,当判断第一行的时候,调用PageFilter的filterRow,导致PageFilter的计数器+1,但是进行到SCVFilter的filterRow的时候,该行又被过滤掉了,在检验下一行时,因为PageFilter计数器已经达到了我们设定的pageSize,所以接下来的行都会被过滤掉,返回结果没有数据。
验证:
在FilterList中,先加入SCVFilter,再加入PageFilter
结果是我们期望的第2行的值。
结论
当要将PageFilter和其他Filter使用时,最好将PageFilter加入到FilterList的末尾,否则可能会出现结果个数小于你期望的数量。
(其实正常情况PageFilter返回的结果数量可能大于设定的值,因为服务器集群的PageFilter是隔离的。)
彩蛋
其实,在排查问题的过程中,并没有这样顺利,因为问题出在线上,所以我在本地查问题时自己造了一些测试数据,令人惊讶的是,就算我先加入SCVFilter,再加入PageFilter,返回的结果也是符合预期的。
测试数据如下:
当时在本地一直不能复现问题。很是苦恼,最后竟然发现使用SCVFilter查询的结果还和数据的列的顺序有关。
在服务端,HBase会对客户端传递过来的filter封装成FilterWrapper。
在查询数据时,在HRegion的nextInternal方法中,会调用FilterWrapper的filterRowCellsWithRet方法
FilterWrapper相关代码如下:
这里的kvs就是一行数据经过filterKeyValue后没被过滤的列。
可以看到当kvs不为empty时,filterRowCellsWithRet方法中会调用指定filter的filterRow方法,上面已经说过了,PageFilter的计数器就是在其filterRow方法中增加的。
而当kvs为empty时,PageFilter的计数器就不会增加了。再看我们的测试数据,因为行的第一列就是SCVFilter的目标列isDeleted。回顾上面SCVFilter的讲解我们知道,当一行的目标列的值不满足要求时,该行剩下的列都会直接被过滤掉!
对于测试数据第一行,走到filterRowCellsWithRet时kvs是empty的。导致PageFilter的计数器没有+1。还会继续遍历剩下的行。从而使得返回的结果看上去是正常的。
而出问题的数据,因为在列isDeleted之前还有列content,所以当一行的isDeleted不满足要求时,kvs也不会为empty。因为列content的值已经加入到kvs中了(这些数据要调用到SCVFilter的filterrow的时间会被过滤掉)。
感想
从实现上来看HBase的Filter的实现还是比较粗糙的。效率也比较感人,不考虑网络传输和客户端内存的消耗,基本上和你在客户端过滤差不多。
The text was updated successfully, but these errors were encountered: