Terms Lookup by Query/Filter (aka. Join Filter) #3278

Closed
wants to merge 6 commits into
base: master
from

Conversation

@mattweber
Contributor

mattweber commented Jul 1, 2013

This PR adds support for generating a terms filter based on the field values
of documents matching a specified lookup query/filter. The value of the
configurable "path" field is collected from the field data cache for each
document matching the lookup query/filter and is then used to filter the main
query. This can also be called a join filter.

This PR abstracts the TermsLookup functionality in order to support multiple
lookup methods. The existing functionality is moved into FieldTermsLookup and
the new query based lookup is in QueryTermsLookup. All existing caching
functionality works with the new query based lookup for increased performance.

During testing I found that one of the performance bottlenecks was
generating the Lucene TermsFilter on large sets of terms (probably because
it sorts the terms). I have created a FieldDataTermsFilter that uses the
field data cache to look up the value of the field being filtered and compare it to
the set of gathered terms. This significantly increased performance at the
cost of higher memory usage. Currently a TermsFilter is used when the number
of filtering terms is less than 1024 and the FieldDataTermsFilter is used
for everything else. This should eventually be configurable, or we need to
perform some tests to find the optimal value.
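The selection heuristic described above can be sketched roughly as follows (a simplified illustration, not the PR's actual code; the class, enum, and method names are made up for this example):

```java
// Sketch of the filter-selection heuristic: small term sets use Lucene's
// sorting TermsFilter, large ones use the fielddata-backed filter.
public class TermsFilterSelector {

    // Hard-coded cutoff from the PR description; not a tuned constant.
    static final int TERMS_FILTER_MAX = 1024;

    enum FilterKind { TERMS_FILTER, FIELD_DATA_TERMS_FILTER }

    static FilterKind choose(int numTerms) {
        // TermsFilter sorts its terms, which gets expensive for large sets;
        // FieldDataTermsFilter trades memory (fielddata) for speed instead.
        return numTerms < TERMS_FILTER_MAX
                ? FilterKind.TERMS_FILTER
                : FilterKind.FIELD_DATA_TERMS_FILTER;
    }
}
```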

Examples:

Replicate a has_child query by joining the child's "pid" field to the
parent's "id" field for each child that has the tag "something".

curl -XPOST 'http://localhost:9200/parentIndex/_search' -d '{
    "query": {
        "constant_score": {
            "filter": {
                "terms": {
                    "id": {
                        "index": "childIndex",
                        "type": "childType",
                        "path": "pid",
                        "query": {
                            "term": {
                                "tag": "something"
                            }
                        }
                    }
                }
            }
        }
    }
}'

Lookup companies that offer products or services mentioning elasticsearch.
Notice that products and services are kept in their own indices.

curl -XPOST 'http://localhost:9200/companies/_search' -d '{
    "query": {
        "constant_score": {
            "filter": {
                "terms": {
                    "company_id": {
                        "indices": ["products", "services"],
                        "path": "company_id",
                        "filter": {
                            "term": {
                                "description": "elasticsearch"
                            }
                        }
                    }
                }
            }
        }
    }
}'
@mattweber

Contributor

mattweber commented Jul 2, 2013

Just updated PR with significant improvements to lookup and filter on numeric fields. Performing the lookup on a numeric field is now ~2x faster than the same lookup on a string field.

@ghost ghost assigned martijnvg Jul 3, 2013

@damienalexandre

Contributor

damienalexandre commented Jul 10, 2013

I love this functionality, and I already have a use case for it! 👍

@mattweber

Contributor

mattweber commented Jul 27, 2013

Updated PR to be current with the latest master changes and fixed a bug when executing across multiple nodes. All commits have been squashed.

Has anyone had a chance to review this PR?

@mattweber

Contributor

mattweber commented Aug 15, 2013

Updated to work with latest changes in master.

@martijnvg

Member

martijnvg commented Sep 11, 2013

Sorry for getting involved so late... This feature is really cool! (The equivalent of a subquery.)

The main concern I have with this feature is that the number of values being sent over the wire between nodes is unbounded. Each time the terms filter with a query is parsed, a query is executed on all shards if the routing option isn't used. If a query matches millions of values (which I think is a common scenario), then all these values need to be sent over the wire. This transfer of values occurs for each search request with a terms filter, multiplied by the number of shards the search request targets.

I'm also wondering about the lookup cache here. If one document changes in the index that the terms lookup query is executed on, then the cache entry for that query needs to be trashed. Thinking about it a bit more, the lookup cache doesn't make much sense when a terms lookup query is used, since it is really hard to find out what has changed from the perspective of the terms lookup query.

I'm also wondering: if the terms lookup query were only executed on shards that are locally available on the node executing the terms filter, would this feature still be useful? We could, for example, make the routing option required.

@mattweber

Contributor

mattweber commented Oct 8, 2013

Hey @martijnvg,

The main concern I have with this feature is that the number of values being sent over the wire between nodes is unbounded. Each time the terms filter with a query is parsed, a query is executed on all shards if the routing option isn't used.

I have the same concern about the amount of data being sent, but I guess that could be documented as a known limitation. I imagine people will find it acceptable given the benefits a query like this provides.

The request per shard (since it happens during parsing) is a problem. I actually didn't realize that the JSON was parsed per shard. I would really like to gather the terms once and then send them to the shards. Maybe by creating a new phase for this? Not sure. A while back we talked about field collapsing and you mentioned that to do it correctly some things would need to be moved around internally (the phases, if I remember correctly). Would that change make implementing this feature any easier/better?

I'm also wondering about the lookup cache here

Yeah, I did this to try to piggyback on the terms lookup functionality/caching. I don't think caching the lookup makes much sense unless we can do it per segment, but then we still have the cost of sending over the transport. My original idea was to have this as a new JoinQuery and JoinFilter that only caches the resulting terms filter. I would probably move to that vs. being part of the lookup if we were to move forward on this.

Also wondering: if the terms lookup query were only executed on shards that are locally available on the node executing the terms filter, would this feature still be useful?

I don't think so. You might as well use the parent/child functionality if that were the case.

At any rate, the move from trove to hppc (088e05b) caused some issues with my current patch because hppc's ObjectOpenHashSet does not implement the standard Collection interface. Before I spend time getting everything fixed up, let's figure out the best approach to move this forward.

Thanks for taking the time to look at this!

@martijnvg

Member

martijnvg commented Oct 11, 2013

I have the same concern about the amount of data being sent, but I guess that could be documented as a known limitation. I imagine people will find it acceptable given the benefits a query like this provides.

Perhaps there can be an option that controls the number of terms being fetched per shard? Something like shard_size in the terms facet. We could set a sensible default value, and it could be set to unlimited, but then whoever does that is aware that it can generate a lot of traffic between nodes.

About the request per shard: we can do something smart and have a helper service that bundles the shard-level field value retrieval into one request. I don't think there is a need to introduce an additional distributed phase here. (It is possible via an additional phase, but it's not necessary and would make this feature much bigger than it should be.)

About hppc: in what case do you need to bridge to the JCF world?
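The proposed option might look something like this in the request (a sketch only: `shard_size` was never added to the terms filter; the rest mirrors the request format from the PR description):

```
curl -XPOST 'http://localhost:9200/companies/_search' -d '{
    "query": {
        "constant_score": {
            "filter": {
                "terms": {
                    "company_id": {
                        "indices": ["products", "services"],
                        "path": "company_id",
                        "shard_size": 10000,
                        "filter": { "term": { "description": "elasticsearch" } }
                    }
                }
            }
        }
    }
}'
```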

@mattweber

Contributor

mattweber commented Oct 11, 2013

Yeah, I think having a shard_size makes perfect sense, especially if there is a way to set it to unlimited.

That helper service sounds great - is there anything like that being used somewhere in the code that I can have a look at? I prefer that over the new distributed phase for sure!

I had relied on JCF by abstracting TermsLookup to have a getTerms method that returns a collection of terms. The existing field-based implementation returns a list of strings; my query implementation returned a THashSet of either BytesRef or Number, depending on the field type being looked up. Hppc doesn't implement the Collection interface, so I can't just swap Trove for Hppc easily. This wouldn't be a problem if I hadn't tried to make this part of the terms lookup. What do you think about pulling this back out into a JoinFilter and JoinQuery?

@martijnvg

Member

martijnvg commented Oct 13, 2013

I don't know of a good example right now, but it should be something simple. It should just keep the query result around for the duration of the query phase (maybe as a ThreadLocal?), so that unnecessary requests are avoided, and drop the results after the query phase has completed.

I think it is best to just completely move over to Hppc. TermsLookup#getTerms should return an Iterator instead of a Collection. Both implementations would return a simple wrapper that delegates to the actual implementation. For QueryTermsLookup you can just make TermsByQueryAction work with ObjectOpenHashSet and wrap the result in an anonymous Iterator impl. For FieldTermsLookup you can keep using XContentMapValues#extractRawValues as-is and just return the list's iterator.

Looking at XContentMapValues#extractRawValues usages, I think it could be moved to use Hppc natively instead of JCF, but that is unrelated to this change.
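The Iterator-returning wrapper suggested above might look roughly like this (a sketch; `ValueCursor` is a made-up stand-in for hppc's ObjectCursor so the example compiles without the hppc dependency):

```java
import java.util.Arrays;
import java.util.Iterator;

// Sketch: adapt a cursor-style iterator (what an hppc set exposes) into a
// plain value iterator, so TermsLookup#getTerms could return Iterator<T>.
public class CursorIteratorSketch {

    // Stand-in for com.carrotsearch.hppc.cursors.ObjectCursor<T>.
    static class ValueCursor<T> {
        final T value;
        ValueCursor(T value) { this.value = value; }
    }

    // Wraps the cursor iterator; each next() unwraps the cursor's value.
    static <T> Iterator<T> unwrap(final Iterator<ValueCursor<T>> cursors) {
        return new Iterator<T>() {
            @Override public boolean hasNext() { return cursors.hasNext(); }
            @Override public T next() { return cursors.next().value; }
            @Override public void remove() { throw new UnsupportedOperationException(); }
        };
    }
}
```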

@mattweber

Contributor

mattweber commented Oct 14, 2013

Sounds good, let me see what I can do. I am going to work on Hppc fix first as that should be pretty easy. Thanks.

@martijnvg

Member

martijnvg commented Oct 14, 2013

@mattweber great that you can work on this! Let's move this forward and get this in.

@mattweber

Contributor

mattweber commented Oct 15, 2013

@martijnvg Got this updated for hppc but I am stuck trying to figure out how to execute only a single TermsByQueryAction or re-using the response. The filter parser runs in multiple threads (one per shard) with nothing tying it back to the original TransportSearchTypeAction that actually submitted the search request to the threadpool and ultimately merges responses. A ThreadLocal won't work because we will have X threads parsing the filter and thus triggering a lookup. I need to figure out a way to tie each individual shard request to the original query request...

@martijnvg

Member

martijnvg commented Oct 15, 2013

Yes, a ThreadLocal won't help then... Maybe for now keep the TermsByQueryAction as-is. The IndicesTermsFilterCache actually does the caching at the node level, so that should prevent unnecessary requests if a node holds multiple shards. Only the app integrating with ES would need to clear the cache when the data matched by the lookup query has changed. Perhaps the app can use the percolate API for this.

@mattweber

Contributor

mattweber commented Oct 15, 2013

PR updated with latest changes from master and the switch to hppc.

@mattweber

Contributor

mattweber commented Oct 15, 2013

@s1monw The tests I am writing for this keep failing. Just wondering if this is because I am running them on a Mac with the new randomized testing? It doesn't appear to be anything related to my actual test.

https://gist.github.com/mattweber/28e400ec2d33090fd5e0

@s1monw

Contributor

s1monw commented Oct 15, 2013

It tells you that you forgot to release the searcher you acquired:

Caused by: java.lang.RuntimeException: Unreleased Searcher, source [termsByQuery]
    at org.elasticsearch.test.engine.MockRobinEngine$AssertingSearcher.<init>(MockRobinEngine.java:104)
    at org.elasticsearch.test.engine.MockRobinEngine.newSearcher(MockRobinEngine.java:92)
    at org.elasticsearch.index.engine.robin.RobinEngine.acquireSearcher(RobinEngine.java:689)
    at org.elasticsearch.index.shard.service.InternalIndexShard.acquireSearcher(InternalIndexShard.java:609)
    at org.elasticsearch.index.shard.service.InternalIndexShard.acquireSearcher(InternalIndexShard.java:603)
    at org.elasticsearch.action.terms.TransportTermsByQueryAction.shardOperation(TransportTermsByQueryAction.java:183)
    at org.elasticsearch.action.terms.TransportTermsByQueryAction.shardOperation(TransportTermsByQueryAction.java:73)
    at org.elasticsearch.action.support.broadcast.TransportBroadcastOperationAction$AsyncBroadcastAction$2.run(TransportBroadcastOperationAction.java:225)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    ... 1 more

it even tells you where this happened: TransportTermsByQueryAction.java:183 - test framework awesomeness :)

@mattweber

Contributor

mattweber commented Oct 15, 2013

I do release it...

} finally {
    // this will also release the index searcher
    context.release();
    SearchContext.removeCurrent();
}

It looks like something with the MockDirectory. I have had this test complete successfully, but most of the time it fails. If I delete all the TEST cluster data manually and run it, it tends to pass.

@s1monw

Contributor

s1monw commented Oct 15, 2013

The mock dir failure is a side-effect of this not being released properly!

This seems spooky:

FieldMapper fieldMapper = context.smartNameFieldMapper(request.field());
if (fieldMapper == null) {
    throw new SearchContextException(context, "field not found");
}

Seems like you don't release it if there is no field mapper? Maybe you don't create the mapping properly and the mapping is not available on the shard? Do you use dynamic mapping?

@mattweber

Contributor

mattweber commented Oct 15, 2013

I use dynamic mapping, but this is only executing over a single shard, so it should have the mapping once documents are indexed. Let me make sure the context gets released even if there is a fieldMapper exception. What gets me is that the test passes sometimes...

@mattweber

Contributor

mattweber commented Oct 15, 2013

Still getting random failures... I can see that on tests that fail there is this:

[2013-10-15 11:09:54,077][INFO ][org.elasticsearch.plugins] [transport_client_node_0] loaded [], sites []

On tests that pass, that is not there.

@s1monw

Contributor

s1monw commented Oct 15, 2013

I pulled that PR in and made some modifications to get a clearer error message. I don't see any pending searchers with that one anymore, and it fails every time with that seed. Check this out: https://gist.github.com/s1monw/6998524

This should give you a better idea of why you are seeing the NullPointerExceptions all the time.

with this cmd it fails consistently for me:

 mvn test -Dtests.seed=6181805A6A5BDD1 -Dtests.class=org.elasticsearch.test.integration.action.terms.SimpleTermsByQueryActionTests -Dtests.prefix=tests -Dfile.encoding=UTF-8 -Duser.timezone=America/Los_Angeles -Dtests.cluster_seed=132D5734748D2F80
@mattweber

Contributor

mattweber commented Oct 15, 2013

Thanks Simon, I think I found the problem in the serialization code. It only happens when using a different transport. Randomized testing did its job!

@mattweber

Contributor

mattweber commented Oct 15, 2013

Ohh I see you fixed the serialization as well in that gist. Thanks!

@mattweber

Contributor

mattweber commented Oct 16, 2013

Pushed up the latest changes. The bug in the tests was due to missing serialization of the field variable in the TermsByQueryRequest which would only get triggered using a Transport client. I would have never caught this if it wasn't for the randomized testing! Thanks for the help and awesome test framework @s1monw.

While fixing this, I removed some of the unnecessary multi-shard and multi-node tests since that is all handled by the randomized tests. I updated my tests to pick a random number of shards, documents to index, and range query.

BTW, I force pushed the latest update to this PR so you should do a new checkout.

@martijnvg

src/main/java/org/elasticsearch/action/terms/ResponseTerms.java Outdated
import java.util.Iterator;
import java.util.List;

public abstract class ResponseTerms implements Streamable {

@martijnvg

martijnvg Oct 18, 2013

Member

I have been thinking about this abstraction and maybe we don't need it? We only care about the term value and we don't do anything with it. I think it is only there because of field data, right? Maybe we can use only BytesRef, and when there is a join on a number-based field, convert to BytesRef pre-join and convert it back to a number-based representation post-join? This could reduce the amount of code here and in FieldDataTermsFilter.

@mattweber

mattweber Oct 18, 2013

Contributor

That is actually how I originally implemented it, but I found performance considerably worse for numeric fields. This is because getting the BytesRef from the fielddata does a new BytesRef(Long.toString(val)). These string conversions killed performance to the point where joins on numeric fields were 2x as slow as on string fields. Now that I stick with the primitive types, joins on numeric fields are ~2x faster than on string fields. I figured the more complex code was worth the performance increase.
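The trade-off described here can be illustrated with a toy comparison (not the PR's code; plain java.util sets stand in for the hppc/Trove sets, and the class and method names are made up):

```java
import java.util.HashSet;
import java.util.Set;

// Toy illustration of the bytes-only vs. typed designs: a bytes-only lookup
// must stringify every numeric fielddata value before the membership test,
// while the typed design compares the primitive value directly.
public class NumericJoinSketch {

    // Bytes-only design: allocates a String per document checked.
    static boolean matchesViaStrings(long fieldValue, Set<String> terms) {
        return terms.contains(Long.toString(fieldValue));
    }

    // Typed design: no per-document String/BytesRef conversion.
    static boolean matchesViaLongs(long fieldValue, Set<Long> terms) {
        return terms.contains(fieldValue);
    }
}
```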

@martijnvg

martijnvg Oct 19, 2013

Member

Makes sense, I was just wondering if this could be done with less code :)

@martijnvg

src/main/java/org/elasticsearch/action/terms/ResponseTerms.java Outdated

@Override
public void merge(ResponseTerms other) {
    if (other.getType() != Type.BYTES) {

@martijnvg

martijnvg Oct 18, 2013

Member

// In theory this shouldn't happen, right? If so maybe we can change it into an assert?

@mattweber

mattweber Oct 18, 2013

Contributor

Fixed for all 3 response types.

@martijnvg

src/main/java/org/elasticsearch/action/terms/ResponseTerms.java Outdated
for (int i = 0; i < size; i++) {
    BytesRef term = in.readBytesRef();
    terms.add(term);
    sizeInBytes += term.length;

@martijnvg

martijnvg Oct 18, 2013

Member

Also include the 2 ints in BytesRef?

@mattweber

mattweber Oct 18, 2013

Contributor

Fixed. Thanks for pointing this out, I forgot about those.

@martijnvg

src/main/java/org/elasticsearch/action/terms/ResponseTerms.java Outdated
public void writeTo(StreamOutput out) throws IOException {
    out.writeVInt(getType().ordinal());
    out.writeVInt(terms.size());
    for (Iterator<ObjectCursor<BytesRef>> termIter = terms.iterator(); termIter.hasNext(); ) {

@martijnvg

martijnvg Oct 18, 2013

Member

// Using direct buffers is the fastest way to iterate in Hppc, see: http://labs.carrotsearch.com/hppc-api-and-code-examples.html#example-code

            boolean[] allocated = terms.allocated;
            Object[] keys = terms.keys;
            for (int i = 0; i < allocated.length; i++) {
                if (allocated[i]) {
                    BytesRef key = (BytesRef) keys[i];
                    out.writeBytesRef(key);
                }
            }

@mattweber

mattweber Oct 18, 2013

Contributor

Fixed for all 3 response types.

@martijnvg

src/main/java/org/elasticsearch/action/terms/TermsByQueryResponse.java Outdated
import java.util.List;

/**
* The response of the count action.

@martijnvg

martijnvg Oct 18, 2013

Member

count javadocs :)

@mattweber

mattweber Oct 18, 2013

Contributor

Fixed here and in some other files. Have to love copy/paste :)

@hiro4848


hiro4848 commented Jul 12, 2015

Is there any update on this PR? (timeline, roadmap, etc)

@kerwin


kerwin commented Nov 17, 2015

+1

1 similar comment
@thihy


thihy commented Nov 30, 2015

+1

@pwain


pwain commented Dec 17, 2015

+1

@mattweber did you ever get any traction with moving this feature into a plugin? I'd be very interested in trying it out in order to avoid my current solution for joins of passing large lists of identifiers in terms filters.

@yehosef


yehosef commented Jan 6, 2016

@mrkamel

Contributor

mrkamel commented Jan 15, 2016

+1

@pwain


pwain commented Jan 18, 2016

Thanks for the heads-up regarding Siren @yehosef . I took it for a quick spin and it seems to work well! At the very least it cuts out a round trip between the cluster and the client for single join scenarios.

@dakrone

Member

dakrone commented Apr 6, 2016

@mattweber now that a lot of the query refactoring work has been done, is there still interest in pursuing this pull request? (it may need to be rewritten with all the changes in master lately)

@jezell


jezell commented May 26, 2016

Please. I'm sure a lot of people would love to see this added. Siren works well, but it would be very nice to have this out of the box.

@vsizov


vsizov commented Jul 29, 2016

Yes, I would love to see it out of the box as well.

@elasticmachine


elasticmachine commented Jul 29, 2016

Can one of the admins verify this patch?

@dakrone

Member

dakrone commented Aug 8, 2016

Closing this as we didn't receive any feedback from the author

@dakrone dakrone closed this Aug 8, 2016

@JDvorak


JDvorak commented Aug 8, 2016

noooooo


@brusic

Contributor

brusic commented Aug 8, 2016

Either way, I want to thank @mattweber for this PR and trying. I am sure this type of functionality will eventually be part of Elasticsearch.

@gcampbell-epiq


gcampbell-epiq commented Mar 27, 2017

+1
There's lots of interest. Reopen?
#23713

@clintongormley

Member

clintongormley commented Mar 28, 2017

@gcampbell-epiq not until somebody comes up with a way to do this that is horizontally scalable

@yehosef


yehosef commented Mar 29, 2017

@clintongormley - I'm not sure if this is a viable option or totally related, but here is a way I had been considering adding joins to ES that is horizontally scalable (I think...). It's a different approach than this PR - you can decide if it's helpful/relevant/crazy.

You have the ability to insert with a query (sort of - the reindex API) and to update with a query. If you had the ability to do an UPSERT with a query, you could relatively easily do large joins.

Let's say I have an index ("sales") with products users have bought, and I want to find people that bought product x and product y (not in the same purchase). You can do an upsert-by-query of the mem_ids that bought product x, with the mem_id as the doc_id in a new index (call it "tmp_join_sales_123"). The doc JSON would look like {product_x:true}. You would then do the same thing for product y - the data would be {product_y:true}.

The data would be in one of three forms: {product_x:true}, {product_y:true}, or {product_x:true,product_y:true}.

The join query would then be:
GET tmp_join_sales_123/_search?q=product_x:true%20AND%20product_y:true

It is a little more indirect because you are actually creating a temp index for the join, but I think that's what saves it for large data sets. It seems to me the general approach would be very powerful. The main functionality that's missing is UPSERT by query.

This obviously doesn't need the two indices to be the same, just that they have a common element - which is the basis of any join. If the join conditions are complex, you could serialize or hash them. If you need extra data in the query results other than the mem_id, you could add it to the data both queries write to the tmp index.
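The upsert-by-query step described above might look like this (a sketch only: the `_upsert_by_query` endpoint is hypothetical and does not exist in Elasticsearch, and the index and field names are illustrative):

```
# Hypothetical: collect mem_ids that bought product x into the temp index.
curl -XPOST 'http://localhost:9200/sales/_upsert_by_query' -d '{
    "query": { "term": { "product": "x" } },
    "dest": { "index": "tmp_join_sales_123", "id_field": "mem_id" },
    "doc": { "product_x": true }
}'

# Repeat for product y with a doc of {"product_y": true}, then run the join:
curl -XPOST 'http://localhost:9200/tmp_join_sales_123/_search' -d '{
    "query": { "bool": { "must": [
        { "term": { "product_x": true } },
        { "term": { "product_y": true } }
    ] } }
}'
```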

@clintongormley

Member

clintongormley commented Mar 30, 2017

@yehosef yes, this is similar to an approach we have been considering (/cc @costin ) but is quite different to what has been proposed in this PR

@yehosef


yehosef commented Mar 30, 2017

@clintongormley - great to hear. It seems to me that it shouldn't be too complicated to implement, but I'm not sure.

There's a different product I'm playing with now (under NDA - can't say what) that can do broad joins in elasticsearch, but I think having this capability internal would probably perform better for big data sets.
