Upgrade to Elasticsearch 5.3 #451

Merged
merged 37 commits from es5.3 into master on Apr 5, 2017

Conversation

@jnioche
Member

jnioche commented Apr 3, 2017

The API in Elasticsearch 5.x has changed quite a bit compared to 2.x. This PR ports the code to 5.3; the functionality is roughly the same as before, except:

  • ElasticsearchSpout has been removed: it required a lot of memory, had terrible performance, and there was no way of guaranteeing a good diversity of URLs
  • SamplerAggregationSpout has been removed: sampling is now triggered by configuration within the AggregationSpout
  • Added CollapsingSpout
  • Refactored the code so that more is done in AbstractSpout, and unified the config names
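For context, the sampling configuration lives in es-conf.yaml. A minimal sketch, assuming the es.status.sample.size key added by this PR's commits; the boolean toggle key and the values shown here are illustrative, not confirmed by this thread:

```yaml
# Illustrative es-conf.yaml fragment: sampling is now configured on the
# AggregationSpout rather than handled by a dedicated SamplerAggregationSpout.
# es.status.sample.size appears in this PR's commits; the other key is assumed.
es.status.sample: true
es.status.sample.size: 10
```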

Note: in order to use this PR in distributed mode, you'll have to use Storm 1.1.0 (see #450 ) or upgrade its logging dependencies by hand. This should not be necessary if you run it in local mode.

I will soon publish a comparison of the performance and functionalities of ES 5.3 in a blog post.

BTW, I am keeping the intermediate commits, as some come from master and they contain useful information in isolation.

Fixes #221


dstraub commented on b8d4c43 Jan 13, 2017

With this version, we run into the following error:

5870 [Thread-14-spout-executor[8 8]] INFO  com.digitalpebble.stormcrawler.elasticsearch.persistence.ElasticSearchSpout - Populating buffer with nextFetchDate <= 2017-01-13T18:51:10.710+0100
5929 [Thread-14-spout-executor[8 8]] ERROR org.apache.storm.util - Async loop died!
org.elasticsearch.action.search.SearchPhaseExecutionException: all shards failed
	at org.elasticsearch.action.search.AbstractSearchAsyncAction.onFirstPhaseResult(AbstractSearchAsyncAction.java:204) ~[elasticsearch-5.1.2.jar:5.1.2]
	at org.elasticsearch.action.search.AbstractSearchAsyncAction$1.onFailure(AbstractSearchAsyncAction.java:139) ~[elasticsearch-5.1.2.jar:5.1.2]
	at org.elasticsearch.action.ActionListenerResponseHandler.handleException(ActionListenerResponseHandler.java:51) ~[elasticsearch-5.1.2.jar:5.1.2]
	at org.elasticsearch.transport.TransportService$ContextRestoreResponseHandler.handleException(TransportService.java:984) ~[elasticsearch-5.1.2.jar:5.1.2]
	at org.elasticsearch.transport.TransportService$DirectResponseChannel.processException(TransportService.java:1091) ~[elasticsearch-5.1.2.jar:5.1.2]
	at org.elasticsearch.transport.TransportService$DirectResponseChannel.sendResponse(TransportService.java:1069) ~[elasticsearch-5.1.2.jar:5.1.2]
	at org.elasticsearch.transport.TransportService$6.onFailure(TransportService.java:588) ~[elasticsearch-5.1.2.jar:5.1.2]
	at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.onFailure(ThreadContext.java:512) ~[elasticsearch-5.1.2.jar:5.1.2]
	at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:39) ~[elasticsearch-5.1.2.jar:5.1.2]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[?:1.8.0_60]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[?:1.8.0_60]
	at java.lang.Thread.run(Thread.java:745) [?:1.8.0_60]
Caused by: org.elasticsearch.transport.RemoteTransportException: [y643DXZ][127.0.0.1:9300][indices:data/read/search[phase/dfs]]
Caused by: org.elasticsearch.ElasticsearchParseException: failed to parse date field [Fri Jan 13 18:51:10 CET 2017] with format [dateOptionalTime]
	at org.elasticsearch.common.joda.DateMathParser.parseDateTime(DateMathParser.java:213) ~[elasticsearch-5.1.2.jar:5.1.2]
	at org.elasticsearch.common.joda.DateMathParser.parse(DateMathParser.java:66) ~[elasticsearch-5.1.2.jar:5.1.2]
	at org.elasticsearch.index.mapper.DateFieldMapper$DateFieldType.parseToMilliseconds(DateFieldMapper.java:297) ~[elasticsearch-5.1.2.jar:5.1.2]
	at org.elasticsearch.index.mapper.DateFieldMapper$DateFieldType.isFieldWithinQuery(DateFieldMapper.java:337) ~[elasticsearch-5.1.2.jar:5.1.2]
	at org.elasticsearch.index.query.RangeQueryBuilder.getRelation(RangeQueryBuilder.java:419) ~[elasticsearch-5.1.2.jar:5.1.2]
	at org.elasticsearch.index.query.RangeQueryBuilder.doRewrite(RangeQueryBuilder.java:426) ~[elasticsearch-5.1.2.jar:5.1.2]
	at org.elasticsearch.index.query.AbstractQueryBuilder.rewrite(AbstractQueryBuilder.java:264) ~[elasticsearch-5.1.2.jar:5.1.2]
	at org.elasticsearch.search.builder.SearchSourceBuilder.rewrite(SearchSourceBuilder.java:885) ~[elasticsearch-5.1.2.jar:5.1.2]
	at org.elasticsearch.search.internal.ShardSearchLocalRequest.rewrite(ShardSearchLocalRequest.java:206) ~[elasticsearch-5.1.2.jar:5.1.2]
	at org.elasticsearch.search.internal.ShardSearchTransportRequest.rewrite(ShardSearchTransportRequest.java:162) ~[elasticsearch-5.1.2.jar:5.1.2]
	at org.elasticsearch.search.SearchService.createSearchContext(SearchService.java:598) ~[elasticsearch-5.1.2.jar:5.1.2]
	at org.elasticsearch.search.SearchService.createContext(SearchService.java:547) ~[elasticsearch-5.1.2.jar:5.1.2]
	at org.elasticsearch.search.SearchService.createAndPutContext(SearchService.java:529) ~[elasticsearch-5.1.2.jar:5.1.2]
	at org.elasticsearch.search.SearchService.executeDfsPhase(SearchService.java:233) ~[elasticsearch-5.1.2.jar:5.1.2]
	at org.elasticsearch.action.search.SearchTransportService$5.messageReceived(SearchTransportService.java:291) ~[elasticsearch-5.1.2.jar:5.1.2]
	at org.elasticsearch.action.search.SearchTransportService$5.messageReceived(SearchTransportService.java:288) ~[elasticsearch-5.1.2.jar:5.1.2]
	at org.elasticsearch.transport.RequestHandlerRegistry.processMessageReceived(RequestHandlerRegistry.java:69) ~[elasticsearch-5.1.2.jar:5.1.2]
	at org.elasticsearch.transport.TransportService$6.doRun(TransportService.java:577) ~[elasticsearch-5.1.2.jar:5.1.2]
	at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:527) ~[elasticsearch-5.1.2.jar:5.1.2]
	at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37) ~[elasticsearch-5.1.2.jar:5.1.2]

I found the following difference between Elasticsearch 2.4.1 and 5.1.2:

In 2.4.1, the object in the RangeQuery was treated as a BytesRef after transport; in org.elasticsearch.index.mapper.DateFieldMapper.parseToMilliseconds the value is an instance of BytesRef, so everything works:

public long parseToMilliseconds(Object value, boolean roundUp,
                @Nullable DateTimeZone zone, @Nullable DateMathParser forcedDateParser, QueryRewriteContext context) {
            DateMathParser dateParser = dateMathParser();
            if (forcedDateParser != null) {
                dateParser = forcedDateParser;
            }

            String strValue;
            if (value instanceof BytesRef) {
                strValue = ((BytesRef) value).utf8ToString();
            } else {
                strValue = value.toString();
            }
            return dateParser.parse(strValue, context::nowInMillis, roundUp, zone);
        }

In 5.1.2, the value here is an instance of Date, so the ugly Date.toString() output is used, which throws the error :(

As a workaround, I use the following in the ElasticSearchSpout:

        LOG.info("Populating buffer with nextFetchDate <= {}", lastDate);

        QueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery("nextFetchDate")
                .lte(String.format("%1$tY-%1$tm-%1$tdT%1$tH:%1$tM:%1$tS.%1$tL", lastDate));

and the crawler works fine.
I'm not sure whether this error is caused by Elasticsearch itself or whether some settings are missing.
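A small self-contained sketch of the same idea using java.time instead of String.format: produce an ISO-8601 string with an explicit UTC designator (which the %1$t... pattern above does not include) that Elasticsearch's default dateOptionalTime format can parse. The class and method names are mine, not part of the PR:

```java
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.util.Date;

public class IsoDateDemo {
    // Convert a java.util.Date to an ISO-8601 UTC string such as
    // "2017-01-13T17:51:10.710Z", which "dateOptionalTime" accepts.
    static String toIso(Date date) {
        return DateTimeFormatter.ISO_INSTANT
                .format(Instant.ofEpochMilli(date.getTime()));
    }

    public static void main(String[] args) {
        // 2017-01-13T18:51:10.710+0100 from the log above, as epoch millis
        Date lastDate = new Date(1484329870710L);
        System.out.println(toIso(lastDate)); // prints 2017-01-13T17:51:10.710Z
    }
}
```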



jnioche Jan 16, 2017


thanks @dstraub, looks like a problem with Elasticsearch, but we could take care of generating a clean string and/or specify the format explicitly in the mapping. Since this is used by the AggregationSpout as well, it would be worth moving it to the abstract class.

BTW did you change anything to the pom files? I haven't managed to get past the dependency problem with the logging.

Thanks!



dstraub Jan 16, 2017

I resolved the logging jar hell in my client pom:

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.7</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>2.7</version>
        </dependency>

        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>${storm-core.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.logging.log4j</groupId>
                    <artifactId>log4j-slf4j-impl</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>log4j-over-slf4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>



jnioche Jan 16, 2017


Does it work in deployed mode or did you have to force the versions used by Storm itself?



dstraub Jan 16, 2017

We run the crawler only in local mode with an uberjar, without the storm command.
Here is the full pom of our application.
pom.xml.txt



jnioche Jan 16, 2017


Thanks @dstraub, any particular reason you run it in local mode?

I'll see if I can find a solution to the dependency problem so that it works in deployed mode.

Note to self: here is how the dependency issue is solved by elasticsearch-hadoop, and the ES 5.x doc.



dstraub Jan 16, 2017

We run our crawler in a Docker container on DC/OS/Mesos.



jnioche Feb 20, 2017


@dstraub I have committed your fix for the date formatting, thanks!

I have been struggling to get Storm to work with the version of log4j required by the ES client, which is needed to run it in deployed mode, but I haven't given up yet.


jnioche added some commits Jan 16, 2017

Update es-conf.yaml
  es.status.sample.size added
NPE in CollapsingSpout + MetricsConsumer can handle collections + CollapsingSpout reports individual query times to metrics
Merge branch 'master' into es5.3
Conflicts:
	external/elasticsearch/ES_IndexInit.sh
	external/elasticsearch/src/main/java/com/digitalpebble/stormcrawler/elasticsearch/ESCrawlTopology.java
	external/elasticsearch/src/main/java/com/digitalpebble/stormcrawler/elasticsearch/persistence/ElasticSearchSpout.java
	external/elasticsearch/src/main/java/com/digitalpebble/stormcrawler/elasticsearch/persistence/SamplerAggregationSpout.java
	external/elasticsearch/src/main/java/com/digitalpebble/stormcrawler/elasticsearch/persistence/StatusUpdaterBolt.java

@jnioche jnioche added this to the 1.5 milestone Apr 3, 2017

@jnioche jnioche merged commit 2371c9d into master Apr 5, 2017

2 checks passed

continuous-integration/travis-ci/pr The Travis CI build passed
continuous-integration/travis-ci/push The Travis CI build passed

@jnioche jnioche deleted the es5.3 branch Apr 5, 2017


noerw May 15, 2018

Hi Julien,
is there a reason for this method to be private?
I'd like to override it to add additional metadata to the status stream.



jnioche May 15, 2018


Hi @noerw the current version of the code is quite different and ElasticsearchSpout has been removed. You should be able to define what metadata to store in the StatusUpdaterBolt.
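A hedged sketch of what that configuration might look like; metadata.persist is the key StormCrawler uses to list which metadata to store with the status, but verify the exact key name and values against the current documentation:

```yaml
# Illustrative: metadata keys to persist alongside each URL's status.
# "my.custom.key" is a made-up example; "depth" is a common default key.
metadata.persist:
  - depth
  - my.custom.key
```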

