Delay in updates to status index #340

Closed
jnioche opened this issue Sep 23, 2016 · 7 comments

Comments

@jnioche
Contributor

jnioche commented Sep 23, 2016

Sebastian observed that his WARC files contained 10-15% duplicates when crawling with ES. The most likely explanation is a delay between the moment the corresponding tuple is acked (which happens a lot sooner) and the moment the update is committed to the ES index and reflected in the search results. The issue is more likely on small crawls, where the diversity of URLs is low.

One way around it is to set es.status.min.delay.queries, i.e. the minimal amount of time allowed between two queries to ES, to a larger value than the default of 2 secs. By setting it to 60 secs, Seb saw a drop to 0.5-1% duplicates. This means that it can take more than a minute between the moment a tuple is acked and when the update is reflected in the search results.

Idea: we could remove the tuples from the beingProcessed hash only after some additional time; this way we wouldn't refetch the same URL too soon. The various ES spouts overlap in code, so we could create an abstract class and share the functionality there.
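
To make the idea concrete, here is a minimal sketch of a tracker that keeps a URL marked as being processed for a while after its ack. The class `DelayedReleaseTracker`, its method names and the 30 s hold time are all hypothetical and not part of StormCrawler; the sketch is single-threaded and takes the clock as a parameter so it is easy to test.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper (not StormCrawler code): URLs the spout must not emit again yet. */
class DelayedReleaseTracker {
    // Deadline used for URLs that were emitted but not acked yet.
    private static final long IN_FLIGHT = Long.MAX_VALUE;

    private final long holdMillis;
    // URL -> time (in ms) from which the URL may be emitted again.
    private final Map<String, Long> releaseAt = new HashMap<>();

    DelayedReleaseTracker(long holdMillis) {
        this.holdMillis = holdMillis;
    }

    /** Called when the spout emits a URL. */
    void emitted(String url) {
        releaseAt.put(url, IN_FLIGHT);
    }

    /** Called on ack: hold the URL for holdMillis instead of dropping it at once. */
    void acked(String url, long nowMillis) {
        releaseAt.put(url, nowMillis + holdMillis);
    }

    /** True if a URL returned by a status query should be skipped for now. */
    boolean isBeingProcessed(String url, long nowMillis) {
        Long deadline = releaseAt.get(url);
        if (deadline == null) {
            return false;
        }
        if (nowMillis >= deadline) {
            releaseAt.remove(url); // hold time is over, forget the URL
            return false;
        }
        return true;
    }

    /** Drops every entry whose hold time is over, to bound memory use. */
    void purgeExpired(long nowMillis) {
        releaseAt.values().removeIf(deadline -> nowMillis >= deadline);
    }

    public static void main(String[] args) {
        DelayedReleaseTracker tracker = new DelayedReleaseTracker(30_000L);
        tracker.emitted("http://example.com/page");
        tracker.acked("http://example.com/page", 1_000L);
        // A query result arriving 80 ms after the ack is still filtered out.
        System.out.println(tracker.isBeingProcessed("http://example.com/page", 1_080L));
        // Once the hold time has passed, the URL is eligible again.
        System.out.println(tracker.isBeingProcessed("http://example.com/page", 31_000L));
    }
}
```

The hold time would have to exceed the delay before an update becomes visible in search results, otherwise the stale entry can still come back from a query.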

@sebastian-nagel
Contributor

The URLs fetched twice have in common that they are fetched shortly before ES is queried for new URLs:

...
2016-09-22 11:37:24.751 c.d.s.e.p.StatusUpdaterBolt [INFO] Bulk response 1, waitAck 0, acked 1
2016-09-22 11:37:25.128 c.d.s.b.FetcherBolt [INFO] [Fetcher #5] Fetched https://www.theguardian.com/football/2016/sep/21/swansea-city-manchester-city-efl-cup-match-report with status 200 in msec 32
2016-09-22 11:37:25.520 c.d.s.b.FetcherBolt [INFO] [Fetcher #5] Fetched http://www.elwatan.com/hebdo/arts-et-lettres/parler-et-lire-02-07-2016-324272_159.php with status 200 in msec 210
2016-09-22 11:37:25.658 c.d.s.e.p.StatusUpdaterBolt [INFO] Bulk response 1, waitAck 0, acked 1
2016-09-22 11:37:25.892 c.d.s.e.p.StatusUpdaterBolt [INFO] Bulk response 1, waitAck 0, acked 1
2016-09-22 11:37:25.972 c.d.s.e.p.AggregationSpout [INFO] [spout #1]  Populating buffer with nextFetchDate <= 2016-09-22T11:37:25.972+0000
2016-09-22 11:37:25.972 c.d.s.e.p.AggregationSpout [INFO] [spout #6]  Populating buffer with nextFetchDate <= 2016-09-22T11:37:25.972+0000
2016-09-22 11:37:25.977 c.d.s.e.p.AggregationSpout [INFO] [spout #1]  ES query returned 0 hits from 0 buckets in 4 msec with 0 already being processed
2016-09-22 11:37:25.982 c.d.s.e.p.AggregationSpout [INFO] [spout #1]  Not enough time elapsed since 2016-09-22T11:37:25.972+0000 - sleeping for 59990
2016-09-22 11:37:25.982 c.d.s.e.p.AggregationSpout [INFO] [spout #3]  Populating buffer with nextFetchDate <= 2016-09-22T11:37:25.982+0000
2016-09-22 11:37:25.984 c.d.s.e.p.AggregationSpout [INFO] [spout #7]  Populating buffer with nextFetchDate <= 2016-09-22T11:37:25.984+0000
2016-09-22 11:37:25.985 c.d.s.e.p.AggregationSpout [INFO] [spout #8]  Populating buffer with nextFetchDate <= 2016-09-22T11:37:25.985+0000
2016-09-22 11:37:25.985 c.d.s.e.p.AggregationSpout [INFO] [spout #6]  ES query returned 90 hits from 1 buckets in 10 msec with 41 already being processed
2016-09-22 11:37:25.987 c.d.s.e.p.AggregationSpout [INFO] [spout #7]  ES query returned 0 hits from 0 buckets in 2 msec with 0 already being processed
...
2016-09-22 11:38:34.652 c.d.s.b.FetcherBolt [INFO] [Fetcher #5] Fetched https://www.theguardian.com/football/2016/sep/21/swansea-city-manchester-city-efl-cup-match-report with status 200 in msec 46
...
2016-09-22 11:39:04.788 c.d.s.b.FetcherBolt [INFO] [Fetcher #5] Fetched http://www.elwatan.com/hebdo/arts-et-lettres/parler-et-lire-02-07-2016-324272_159.php with status 200 in msec 201
...

@jnioche
Contributor Author

jnioche commented Sep 23, 2016

Thanks @sebastian-nagel, this explains why the min.delay.queries param does not work for all URLs.

> This means that it can take more than a minute between the moment a tuple is acked and when the update is reflected in the search results.

that was actually incorrect (and reassuring)

@sebastian-nagel
Contributor

Couldn't it be that the document from El Watan is fetched at 11:37:25.520, acked at 11:37:25.892, and 80 ms later, at 11:37:25.972, spout 6 asks for new URLs and gets 90 back at 11:37:25.985? That is just one possibility, and 80 ms is quite short. Once the URL is queued in Fetcher, it will be fetched again, right? But this may happen later. Attached the full log: worker.log.zip
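
For reference, the gaps in that timeline can be checked with a few lines of Java. The timestamps are copied from the log excerpt above; the class and method names are made up for this example, and treating the 11:37:25.892 bulk response as the ack of the El Watan URL is the assumption stated in this comment, not something the log proves.

```java
import java.time.Duration;
import java.time.LocalTime;

/** Hypothetical helper: elapsed milliseconds between two log timestamps of the same day. */
class ElWatanTimeline {
    static long millisBetween(String from, String to) {
        return Duration.between(LocalTime.parse(from), LocalTime.parse(to)).toMillis();
    }

    public static void main(String[] args) {
        String fetched = "11:37:25.520";   // first fetch of the El Watan URL
        String acked = "11:37:25.892";     // bulk response, assumed to ack that URL
        String queried = "11:37:25.972";   // spout #6 starts populating its buffer
        String answered = "11:37:25.985";  // ES query returns 90 hits
        String refetched = "11:39:04.788"; // second fetch of the same URL

        System.out.println(millisBetween(fetched, acked));     // 372
        System.out.println(millisBetween(acked, queried));     // 80
        System.out.println(millisBetween(queried, answered));  // 13
        System.out.println(millisBetween(fetched, refetched)); // 99268
    }
}
```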

@jnioche
Contributor Author

jnioche commented Sep 24, 2016

Yes, we are saying the same thing. The min.delay.queries param doesn't help with URLs at the end of the buffer, as the time is counted from when the previous request to ES completed.

> Once the URL is queued in Fetcher, it will be fetched again, right?

Yes, once it's in, it's in! The same applies to URLs that sit in the Fetcher internal queues for too long and get failed() because of a timeout. The spout releases them, and chances are that they get returned again by the ES query and go straight back to the Fetcher queue.
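
A rough sketch of why the throttle alone cannot protect those URLs: the wait depends only on the time of the previous query, not on when an individual URL was acked. `QueryThrottle` and `remainingWaitMillis` are hypothetical names, not the spout's actual code. The sketch takes a single reference timestamp and does not settle whether that is the start or the completion of the previous request; the log message quotes the time the buffer was populated.

```java
/** Hypothetical illustration of a minimum delay between two status queries. */
class QueryThrottle {
    /** Milliseconds still to wait before the next query; 0 if a query is allowed now. */
    static long remainingWaitMillis(long lastQueryMillis, long nowMillis, long minDelayMillis) {
        long elapsed = nowMillis - lastQueryMillis;
        return Math.max(0L, minDelayMillis - elapsed);
    }

    public static void main(String[] args) {
        long minDelay = 60_000L;  // min.delay.queries set to 60 secs
        long lastQuery = 25_972L; // 11:37:25.972, in ms within the minute
        long now = 25_982L;       // 11:37:25.982

        // Same value as "sleeping for 59990" in the log for spout #1.
        System.out.println(remainingWaitMillis(lastQuery, now, minDelay)); // 59990

        // A URL acked at 11:37:25.892 gets no protection from this check: the
        // query at 25.972 ran only 80 ms after its ack, regardless of minDelay.
    }
}
```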

sebastian-nagel added a commit to sebastian-nagel/storm-crawler that referenced this issue Sep 26, 2016
…after acked:

- acked elements are kept in a cache with configurable size and duration
- fix for apache#340 to avoid that URLs are fetched a second time
@sebastian-nagel
Contributor

First patch which keeps the items longer to avoid duplicates

  • for now, only AggregationSpout is changed
  • needs testing at scale
  • bundled shared methods and variables in AbstractElasticSearchSpout
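
As an illustration of what a cache with configurable size and duration implies, here is a stdlib-only sketch. It is not the code from the patch, and `RecentlyAckedCache` and its methods are invented for this example. It mainly shows the trade-off of the size limit: when the cache is full, the oldest URL is released before its duration has elapsed.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for a cache of recently acked URLs, bounded by size and age. */
class RecentlyAckedCache {
    private final long ttlMillis;
    // Insertion-ordered map: URL -> time (in ms) at which the entry expires.
    private final LinkedHashMap<String, Long> expiry;

    RecentlyAckedCache(final int maxSize, long ttlMillis) {
        this.ttlMillis = ttlMillis;
        this.expiry = new LinkedHashMap<String, Long>() {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Long> eldest) {
                // Evict the oldest entry as soon as the size limit is exceeded.
                return size() > maxSize;
            }
        };
    }

    /** Remembers an acked URL for ttlMillis, unless the size limit evicts it earlier. */
    void put(String url, long nowMillis) {
        expiry.remove(url); // re-insert so the entry becomes the newest one
        expiry.put(url, nowMillis + ttlMillis);
    }

    /** True while the URL is still cached and not expired. */
    boolean contains(String url, long nowMillis) {
        Long deadline = expiry.get(url);
        if (deadline == null) {
            return false;
        }
        if (nowMillis >= deadline) {
            expiry.remove(url);
            return false;
        }
        return true;
    }

    int size() {
        return expiry.size();
    }
}
```

With a limit of 2 entries, acking a third URL drops the first one immediately, so the size has to be chosen with the ack rate and the duration in mind.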

@sebastian-nagel
Contributor

... and this patch causes the worker to fail sometimes while placing items in the deletion queue/cache:

2016-09-26 16:45:37.765 o.a.s.util [ERROR] Async loop died!
java.lang.AssertionError
        at com.google.common.cache.LocalCache$Segment.evictEntries(LocalCache.java:2670) ~[stormjar.jar:?]
        at com.google.common.cache.LocalCache$Segment.put(LocalCache.java:2887) ~[stormjar.jar:?]
        at com.google.common.cache.LocalCache.put(LocalCache.java:4149) ~[stormjar.jar:?]
        at com.google.common.cache.LocalCache$LocalManualCache.put(LocalCache.java:4754) ~[stormjar.jar:?]
        at com.digitalpebble.stormcrawler.elasticsearch.persistence.AbstractElasticSearchSpout$InProcessMap.remove(AbstractElasticSearchSpout.java:84) ~[stormjar.jar:?]
        at com.digitalpebble.stormcrawler.elasticsearch.persistence.AggregationSpout.ack(AggregationSpout.java:398) ~[stormjar.jar:?]
        at org.apache.storm.daemon.executor$ack_spout_msg.invoke(executor.clj:445) ~[storm-core-1.0.1.jar:1.0.1]
...
2016-09-26 16:44:41.827 o.a.s.util [ERROR] Halting process: ("Worker died")

@jnioche
Contributor Author

jnioche commented Oct 5, 2016

Let's first put things in an abstract class (#347) then look at this delay cache business
