Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

Already on GitHub? Sign in to your account

Support equi-joins #20

jtaylor-sfdc opened this Issue Jan 30, 2013 · 15 comments


None yet
2 participants

jtaylor-sfdc commented Jan 30, 2013

Support joins, starting with hash joins. Some work on this has already been done. See HashCache* and HashJoiningRegionObserver.

@ghost ghost assigned maryannxue May 31, 2013


jtaylor-sfdc commented Jun 1, 2013

Here's my initial thinking on a equi-inner-join implementation.
First, my assumptions:

  1. Only support ANSI SQL join syntax for now, where the join criteria is in the ON clause.
  2. Only support equals in the ON clause. We'll need to support multiple clauses (AND separated only) for composite keys, and we might be able to support arbitrary filters pretty easily (that'd be extra credit :-)).
  3. Since we don't have stats yet, the onus would be on the user to order the tables being joined in the order they'll be processed (from smallest rows to most rows).

For the implementation, at a high level, we'd want to support a new implementation of QueryPlan called JoinPlan. The JoinPlan would nest two QueryPlans, the LHS and RHS, with a HashCache being created and sent at execution time for the LHS. For example, A JOIN B JOIN C would cause HashCache(A) to be sent to all region servers, the scan for B to be done (where the the GroupedAggregateRegionObserver or ScanRegionObserver would do the join), the HashCache(A joined to B) sent to all region servers, and the scan for C would be done (again with the region observer doing the join). The key of the HashCache would be formed by the concatenated expression evaluation of the expressions in the ON clause for the table being cached, kind of the same as the GROUP BY key formed in GroupedAggregateRegionObserver#getKey. The region observer would be passed the parallel expressions for the other side of the expression in the ON clause. For each row scanned, we'd evaluate the expressions and look up (using an ImmutableBytesPtr key) the result in our HashCache to find the corresponding result. The List of KeyValues from the hash cache would be concatenated with the List of Key Values from the scan and the row key from each would be concatenated together and returned as the result.

For the client side changes:

  1. Implement FromCompiler.FromClauseVisitor#visit(JoinTableNode) to capture all of the tables involved. There'd need to be some refactoring of the code in visit(NamedTableNode), but the code would probably be very similar. You'd need to maintain the ParseNode from the ON clause with each TableRef (maybe have a derived JoinTableRef). The code that resolves the columns should (hopefully) work when there are multiple tables in the ColumnResolver.
  2. Recurse over the list of TableRefs from the ColumnResolver and create nested JoinPlans. The QueryPlan contained in the JoinPlan will be created as they are now, with the following modifications:
    • The current TableRef would be passed down to the WhereCompiler and only the ParseNodes that involve that TableRef would be used to form the where expression. The formation of the RowKeyExpression (the expression that looks up a column in the row key) would need to change. With each join, we'd concatenate the row key from both tables, so the ExpressionCompiler used by WhereCompiler would need to have an offset passed to it so that when a RowKeyExpression is formed, it would use the correct offset to find the column value. We'd need to do some extra checking for expressions that compared columns from different tables (this should only occur in the ON clause) and an OR between expressions in different tables (not sure how to handle this - for example WHERE a.col1 < 5 OR b.col2 < 6. Maybe post join or just disallow initially? We could extract the ParseNodes as we go, and whatever is left over could be executed in a post filter on the client side)
    • Same thing for the SELECT expressions - we'd want to isolate only the expressions involved in the current TableRef and we'd need to have an offset for the RowKeyExpressions. We'd need to handle cases where the SELECT expression is an aggregation - we'd basically want to let the column be projected in the scan (so that it gets returned when the scan is run), but not do anything else. We'd probably want to use a different mechanism than the existing ProjectionCompiler for this - that would be used only once for the final scan.
    • If the query has an GROUP BY, ORDER BY and/or LIMIT, I think we'd need to push these into the last QueryPlan. It's the last scan that would handle TopN and aggregation.
    • Use a new visitor (derived from ExpressionCompiler) to parse the ON expression to extract out the list of expressions that form the join key.
    • Create the JoinPlan using a) the join key expressions, b) the ScanPlan or AggregatePlan as the LHS and c) the recursion over the QueryPlan for the rest of the TableRefs.
  3. On the execution side of things, the JoinPlan would use a new implementation of Scanner, a JoinScanner that would use HashCacheClient.addHashCache to execute the LHS Scanner, put the results in a HashCache, and send then to each region server. Then it would execute the RHS Scanner, serializing the ON expressions for the RHS. Note that HashCacheClient needs to be modified - it was implemented well before we had a general expression evaluation ability. Conceptually, it will do what it's doing now, though.
  4. I don't think we need HashJoiningRegionObserver. I think we can just do the join in ScanRegionObserver and GroupingAggregateRegionObserver instead, based on what gets serialized to these region observers. Otherwise, we'd end up having chained coprocessors and I'm not sure how well this will function.
  5. At execution time, in PhoenixStatement.ExecutableSelectStatement#executeQuery, it would execute the query as it does now - the JoinScanner returned by it and the ResultIterator returned by JoinScanner would abstract away the join being done.

jtaylor-sfdc commented Jun 1, 2013

For outer joins, it'd be similar, but we couldn't push the where clauses down to the scans (not sure if you can sometimes push them or not). I think we should start with inner joins and that'll be the bulk of the work.


jtaylor-sfdc commented Jun 4, 2013

Updated the above write-up slightly. I think we'll be better off if the JoinPlan nests two plans, the LHS plan and the RHS plan. The nested plan may itself be another JoinPlan or a simple ScanPlan or AggregatePlan.


jtaylor-sfdc commented Jun 21, 2013

Here's a rundown on the server side of the existing/partial hash join implementation and what needs to change.

GlobalCache. We instantiate a GlobalCache singleton on each region server. This GlobaCache has a map of child TenantCache instances (see below). A different, better mechanism for holding onto an object on the region server was surfaced by @lhofhansl that we should switch to - maybe he can remind us what this is?

TenantCacheImpl. This is what's holding on to the HashCache, potentially a list of them if multiple joins are being processed by the same tenant. Don't get confused by the tenant stuff. By default, there's a single global tenant being held onto by the GlobalCache. You can have multiple tenants if the client sets a "TenantId" connection property. It's just a way of potentially rolling up resource usage on the server side to an "owner" so that a single owner doesn't consume too many resources.

AgeOutHashCache. The server-side implementation of HashCache. It builds up a map from the row key to the Result when it deserializes the hash cache sent from the client. It's basically hard coded right now to do a join between a PK and FK (in either direction). Instead, we want to serialize through the LHS expressions from the ON clause (along with the hash cache) and evaluate them against each Result in the HashCache to form a key (exactly like the key that's formed in GroupedAggregateRegionObserver#getKey - just refactor that code). This will be the key of the Map. Use an ImmutableBytesPtr as the key - it's like an ImmutableBytesWritable, but it caches the hash code for better performance. The value should be a Tuple instead of a Result. The cache gets aged out after a configurable amount of time. We should either just switch to using the Guava Map for caching (they have age-out options), or tie into the HBase mechanism for a "lease". This mechanism was surfaced by @lhofhansl in HBase awhile back - maybe another reminder would be good.

HashJoiningRegionObserver. Take a look below at the prior revision. This is the coprocessor that handles a hash join. I think it's probably better to get rid of this coprocessor and put the logic in a class that's usable by both ScanRegionObserver and GroupedAggregateRegionObserver. Otherwise we'll have multiple coprocessors firing, for example if there's a top N query that does a join or a group by with a join which will get difficult to manage. The changes necessary here are similar to the changes to AgeOutHashCache. You'll want to send the RHS of the ON expression through the Scan attributes and use those to build up a key by evaluating them against each Tuple returned by the scan. You'll then use this key to look for a match in the HashCache, similar to what's being done now. If you find a match, then the two Tuples need to be unioned, including the row keys. The SchemaUtil.joinColumns (which isn't in the code base anymore), didn't do much more than concatenate the list of KeyValues. I think you'll need to devise a scheme where similarly named columns from the LHS and RHS don't conflict. If you pass along the alias used on the RHS and LHS, I think you can use these as the column family name (and ignore the current column family name). This should handle this case. I think the logic for an outer join is the same as in this code.

package com.salesforce.phoenix.join;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.Bytes;
import com.google.common.collect.Lists;
import com.salesforce.phoenix.cache.*;
import com.salesforce.phoenix.coprocessor.BaseRegionScanner;
import com.salesforce.phoenix.coprocessor.BaseScannerRegionObserver;
import com.salesforce.phoenix.query.QueryConstants.JoinType;
import com.salesforce.phoenix.util.*;
 * Prototype for region observer that performs a hash join between two tables.
 * The client sends over the rows in a serialized format and the coprocessor
 * deserialized into a Map and caches it on the region server.  The map is then
 * used to resolve the foreign key reference and the rows are then joined together.
 * TODO: Scan rows locally on region server instead of returning to client
 * if we can know that all both tables rows are on the same region server.
 * @author jtaylor
 * @since 0.1
public class HashJoiningRegionObserver extends BaseScannerRegionObserver  {
    public static final String JOIN_COUNT = "JoinCount";
    public static final String JOIN_IDS = "JoinIds";
    public static final String JOIN_THROUGH_FAMILIES = "JoinFamilies";
    public static final String JOIN_THROUGH_QUALIFIERS = "JoinQualifiers";
    public static final String JOIN_THROUGH_POSITIONS = "JoinPositions";
    public static final String JOIN_TYPES = "JoinTypes";
    private HashCache getHashCache(Configuration config, ImmutableBytesWritable tenantId, ImmutableBytesWritable joinId) throws IOException {
        TenantCache tenantCache = GlobalCache.getTenantCache(config, tenantId);
        HashCache hashCache = tenantCache.getHashCache(joinId);
        if (hashCache == null) {
            throw new DoNotRetryIOException("Unable to find hash cache for tenantId=" + tenantId + ",joinId=" + joinId);
        return hashCache;
    protected RegionScanner doPostScannerOpen(final ObserverContext c, final Scan scan,
            final RegionScanner s) throws IOException {
         * Support snowflake join.  The JOIN_COUNT is the number of dimension
         * tables being joined against.  Each one of them is cached inside of
         * the OrgCache.  The join column families, join column names, and join
         * column IDs are passed through in parallel arrays through attributes
         * on the Scan object.
        byte[] joinCountBytes = scan.getAttribute(JOIN_COUNT);
        if (joinCountBytes == null) return s;
        final int joinCount = (int)Bytes.bytesToVint(joinCountBytes);
        byte[] joinThroughFamsBytes = scan.getAttribute(JOIN_THROUGH_FAMILIES);
        if (joinThroughFamsBytes == null) return s;
        final byte[][] joinThroughFams = ByteUtil.toByteArrays(joinThroughFamsBytes, joinCount);
        byte[] joinThroughColsBytes = scan.getAttribute(JOIN_THROUGH_QUALIFIERS);
        if (joinThroughColsBytes == null) return s;
        final byte[][] joinThroughCols = ByteUtil.toByteArrays(joinThroughColsBytes, joinCount);
        byte[] joinThroughPossBytes = scan.getAttribute(JOIN_THROUGH_POSITIONS);
        if (joinThroughPossBytes == null) return s;
        final int[] joinThroughPoss = ByteUtil.deserializeIntArray(joinThroughPossBytes, joinCount);
        byte[] joinTypesBytes = scan.getAttribute(JOIN_TYPES);
        if (joinTypesBytes == null) return s;
        final int[] joinTypes = ByteUtil.deserializeIntArray(joinTypesBytes, joinCount);
        byte[] joinIdsBytes = scan.getAttribute(JOIN_IDS);
        if (joinIdsBytes == null) return s;
        final byte[][] joinIds = ByteUtil.toByteArrays(joinIdsBytes, joinCount);
        final ImmutableBytesWritable fkValue = new ImmutableBytesWritable();
        if (joinThroughFams.length != joinCount
            || joinThroughCols.length != joinCount 
            || joinIds.length != joinCount) {
            throw new IllegalStateException("Expected join column family, join column name, and join id arrays to be of the same length");
        ImmutableBytesWritable tenantId = ScanUtil.getTenantId(scan);
        Configuration config = c.getEnvironment().getConfiguration();
        final HashCache[] hashCaches = new HashCache[joinCount];
        for (int i = 0; i < joinCount; i++) {
            hashCaches[i] = getHashCache(config, tenantId, new ImmutableBytesWritable(joinIds[i]));
        return new BaseRegionScanner() {
            public HRegionInfo getRegionInfo() {
                return s.getRegionInfo();
            public boolean isFilterDone() {
                return s.isFilterDone();
            public void close() throws IOException {
            public boolean next(List joinedResults) throws IOException {
                try {
                    boolean hasMore;
                    do {
                        // Results are potentially returned even when the return value of s.next is false
                        // since this is an indication of whether or not there are more values after the
                        // ones returned
                        List results = Lists.newArrayList();
                        hasMore = s.next(results) && !s.isFilterDone();
                        if (!results.isEmpty()) {
                            Result result = new Result(results);
                            ImmutableBytesWritable key = ResultUtil.getKey(result);
                            for (int i = 0; i < joinCount; i++) {
                                Result hashCacheRow = null;
                                byte[] joinThroughCol = joinThroughCols[i];
                                byte[] joinThroughFam = joinThroughFams[i];
                                int joinThroughPos = joinThroughPoss[i];
                                HashCache hashCache = hashCaches[i];
                                if (joinThroughCol == null) { // Join using key of lhs table
                                    hashCacheRow = hashCache.get(new ImmutableBytesWritable(key.get(),key.getOffset(), key.getLength()));
                                } else { // else join through fk
                                    if (SchemaUtil.getValue(result, joinThroughFam, joinThroughCol, joinThroughPos, fkValue)) {
                                        hashCacheRow = hashCache.get(fkValue);
                                if (hashCacheRow != null) {
                                    SchemaUtil.joinColumns(hashCacheRow.list(), joinedResults);
                                } else if (JoinType.values()[joinTypes[i]] == JoinType.LEFT_OUTER) {
                                    // If left outer join nothing to do
                                } else { // If not left outer join and no match found then filter this row
                            if (joinedResults.isEmpty()) {
                                continue; // Unable to find joined row for one or more tables, so just continue
                            } else {
                                break; // Found row for all joined tables, so return result back to client
                    } while (hasMore);
                    return hasMore;
                } catch (IOException e) {
                    throw e;
                } catch (Throwable t) {
                    throw new DoNotRetryIOException(t.getMessage(),t);
            public boolean next(List result, int limit) throws IOException {
                // TODO: not sure how this limit comes into play.  This would limit
                // the number of columns being returned, since we're always returning
                // a single row here.
                return next(result);
            public boolean reseek(byte[] row) throws IOException {
                throw new DoNotRetryIOException("Unsupported");

maryannxue commented Jun 25, 2013

Is the implementation in HashJoiningRegionObserver#doPostScanner() meant for star-join, by having multiple join tables? or for now we just support pipelined join as described in the first comment, like ((A join B) join C) join D ?


jtaylor-sfdc commented Jun 25, 2013

Yes, the multiple join tables in HashJoiningRegionObserver#doPostScanner() are meant to support star joins for queries like below. This saves us from having to push the result of each join back to the region server.

    S.Country AS Countries,
FROM Fact_Sales F
INNER JOIN Dim_Date D    ON F.Date_Id = D.Id
INNER JOIN Dim_Store S   ON F.Store_Id = S.Id
INNER JOIN Dim_Product P ON F.Product_Id = P.Id
    D.YEAR = 1997
AND P.Product_Category = 'tv'

We can stage this, though, and only do the pipelining for v1.


maryannxue commented Jun 25, 2013

Yes. think it's just the complexity in the compiler to decide between the two join schemes.
will try to support it on the runtime side first.


maryannxue commented Jul 2, 2013

How can we do KeyValueColumnExpression evaluation on a joined result? like to evaluate f1:c1 from table A and f1:c1 from table B in (A join B)?
Do we need to do projection somewhere, either on the serverside or the clientside?


jtaylor-sfdc commented Jul 3, 2013

This would look something like this:
When we compile the expression(s) in the ON clause, we'll use a subclass of ExpressionCompiler specifically for joins (OnClauseExpressionCompiler). We'd only support the '=' operator currently, as opposed to the complete set. It's somewhat tricky, in that we'd want to support expressions in the ON of the form = AND = . As we compile each top level expression separated by AND, we'd want to put them into three buckets:

  1. The LHS refers to columns from A and the RHS refers to columns from B (or visa versa) and they're a ComparisonParseNode that uses CompareOp.EQUALS. In this case, we have the form we expect and can tease apart the LHS expression and the RHS expression. If we have anything other than CompareOp.EQUALS, then we have to throw an SQLFeatureNotSupportedException. Note that this rule only applies to a top level ComparisonParseNode or ComparisonParseNodes separated by an AND. It would still be ok to have a ComparisonParseNode within say a CaseParseNode (i.e. CASE statement).
  2. The RHS refers to a constant expression (note you should tweak RHSLiteralStatementRewriter.normalize to traverse the ON expression as well to ensure that constants appear only on the RHS). In this case, we can push this to a filter for the LHS. For an inner join, it doesn't matter if you include filters in the WHERE clause versus the ON clause, but for an outer join it makes a big difference. You can't push down filters for an outer join that are in the WHERE clause, but you can push them down if they're in the ON clause. If we have this case, then we can support any expressions, not just ComparisonParseNode that uses CompareOp.EQUALS.

Our ExpressionCompiler currently adds cf/cq to the Scan as compiles in ExpressionCompiler.visit(ColumnParseNode node), but instead I think we'll want to delay doing this and keep a List on ExpressionCompiler instead (this captures the TableRef as well). For our OnClauseExpressionCompiler, we can use this to detect between case (1) and (2) above, and we'll end up with two QueryPlans. For the default case, we can project into the Scan as we do now from QueryCompiler.

jtaylor-sfdc added a commit that referenced this issue Jul 27, 2013

Merge pull request #339 from maryannxue/master
issue #20 : Runtime part implementation

maryannxue commented Jul 31, 2013

With all presumptions mentioned above, like we only support AND-connected = conditions in the ON clause and conditions in the where clause will be compiled as either table filters or as post-scan checks, we decided to compile multiple join queries in a slightly different way, given the below situations:

  1. A lot of the queries are written in a way where the major table is put as the first table followed by all other auxiliary tables in the successive join clauses, and sometimes with left joins. E.g.

FROM awards
LEFT JOIN employee_info ON (awards.eid = employee_info.eid)
LEFT JOIN award_info ON (awards.aid = award_info.aid)
WHERE awards.region = 'GER';

  1. For a LEFT JOIN, it should always be the RHS to be hashed and the LHS to be scanned; and vice versa for RIGHT JOINs.
  2. For a sub-query containing one or more joins appearing on either side of a JOIN operator, it should always be compiled as a hashed result. E.g.
    1. “A join B on A.p = B.p” as in “select … from A join B on A.p = B.p join C on B.q = C.q;”
    2. “B join C on B.q = C.q” as in “select … from A join (B join C on B.q = C.q) on A.p = B.p;”

So in the first example, we can't actually make table hashed in their order of appearance, for we wouldn’t be able to do a LEFT JOIN with the LHS being hashed. However, what we intend to do with the query here is most like a star join, scanning the table awards and joining against the rest two small tables.

Here, we simply define our star join as multiple LEFT or INNER joins with conditions in the ON clause only referred to the current joining table and the first table.
In the above three examples, the first can be seen as a typical star join; the second is ruled out since the condition “B.q = C.q” deals with another table B other than the first table A; the third can also been viewed as a “single” star join by looking at the sub-query in the parentheses “(B join C on B.q = C.q)” as a whole.

The query generation process for multiple (single) joins goes like:
compile_as_scan() is similar to the current query compilation process for SELECT statements, which can be a scan plan or an aggregation plan, only that a JOIN attribute may be added to the Scan object.
compile_as_hash() goes through the following recursive process, generates a “hash” operation in the end, and returns a hash ID.
Note that compile_as_scan() throws exceptions when the input query has JOINs.

  1. If this is a simple query (no join):
  2. Else if this query / sub-query is a star join:
    for (t[i] : all first-level join tables) { // join nodes in parentheses are deeper-level joins
    hash_ids[i] = compile_as_hash(t[i]);
    compile_as_scan(the_first_table, hash_ids);
  3. Otherwise, take the rightmost first-level join node (JOIN operators being left-associative):
    a) If RIGHT join:
    hash_ids[0] = compile_as_hash(lhs);
    compile_as_scan(rhs, hash_ids); // will throw exception if rhs is a sub-query with joins.
    b) Else if LEFT join:
    hash_ids[0] = compile_as_hash(rhs);
    compile_as_scan(lhs, hash_ids); // will throw exception if lhs contains another join.
    c) Otherwise, i.e. INNER join:
    Mark lhs and rhs as “must be hashed” if containing joins;
    If either side “must be hashed”, compile_as_hash() for that side and compile_as_scan() for the other, as illustrated above;
    Else if neither side “must be hashed”, compile_as_hash() for the sub-query side (no join), and if no sub-query, compile_as_hash() for rhs, then compile_as_scan() for the other side;
    Otherwise, i.e. both should be hashed, exception!

For group-by in the main query, we do join earlier than grouping. But for sub-queries containing groupby, we may have to specify for “compile_as_scan()” if the groupby should be performed before or after the join operation, though in the process mentioned above we prefer putting such sub-queries into the hash.

Some cases we now leave out from hash join implementation may also be doable. So for later improvements, we can try rewriting those join queries, maybe like re-grouping sub-queries.


jtaylor-sfdc commented Aug 1, 2013

Great write-up, @maryannxue. When you talk about "sub-queries", are you talking about the traditional sub-query, but expressed (or re-written) as a join?

One comment on "we only support AND-connected = conditions in the ON clause...", I think this is true only for conditions that define the join key (i.e. involve multiple tables). We should be able to allow arbitrary filters in the ON clause which would be parsed out and pushed as predicates to the individual table scans. For inner joins this doesn't make a difference, but for outer joins it can. With outer joins, you're forced to evaluate where clause filters after the join (so you can't push them down). But if those filters are included in the ON clause, you can always push them down. These filters would be ok to include OR conditions too. We'd just require AND conditions between the join keys.


maryannxue commented Aug 1, 2013

@jtaylor-sfdc The "sub-queries" I mentioned above can be traditional sub-queries or JOIN clause in parentheses (which will most probably be compiled to the hashed side), or the LHS of the rightmost top-level JOIN node, e.g., "select ... from A join B on A.p = B.p" as in "select * from A join B on A.p = B.p join C on B.q = C.q;", as we consider JOIN left-associative.

Yes, thanks for pointing out the condition handling! Just to confirm, we only support OR conditions as table filters if they are enclosed in a top-level AND condition in the ON clause, right? like "on A.p = B.p AND (A.a = 'xx' OR A.a = 'yy')". And for those OR filter conditions dealing with more than one table, we also defer the evaluation to after join stage.


jtaylor-sfdc commented Aug 1, 2013

Yes, on all counts


maryannxue commented Aug 2, 2013

With a runtime enhancement to support iterative join key evaluation on the previously joined result, for example:
A --> hashjoin B, hashjoin C, the evaluation of keys used for "hashjoin C" will be evaluated based on the result of (A hashjoin B),
then we can extend our star join criteria, and will be able to handle left joins with join conditions containing fields from other tables than only the current joining table and the first table, like "select * from A left join B on A.p = B.p left join C on B.q = C.q".

jtaylor-sfdc added a commit that referenced this issue Oct 3, 2013

Merge pull request #430 from maryannxue/master
Issue #20 - hashjoin implementation

jtaylor-sfdc added a commit that referenced this issue Oct 17, 2013

Merge pull request #482 from maryannxue/master
Issue #20 - hashjoin implementation improvements

jtaylor-sfdc added a commit that referenced this issue Oct 28, 2013

Merge pull request #501 from maryannxue/master
Issue #20 - Hash join implementations

jtaylor-sfdc commented Oct 30, 2013

Implemented by @maryannxue and pulled into the master branch. Fantastic job!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment