
Document Count in ES Different from Number of Entries Pushed #283

Closed

mparker4 opened this issue Sep 26, 2014 · 31 comments

@mparker4

We are seeing an issue where the number of documents successfully created/indexed in ES differs (sometimes more, which is very odd, sometimes less) from the number of documents we give ES to index.

Specifically, we are using Spark to grab correctly formatted JSON from a repository and simply write it to a new index in ES. The discrepancy only seems to occur when a large number of documents is being inserted (specifically, we tested with 929,660) into too few shards (specifically 1). We also did a test at a larger scale (roughly 9 million documents and over 100 shards), which also gave incorrect document counts. We have ruled out this being a Spark issue, and there are no apparent errors or exceptions in the logs (ES or Spark/ES-Hadoop), even when logging is raised to debug. We do notice that ES throttles the input. The fact that this only happens when large numbers of documents are created/indexed, and that it seems to go away when more shards are added to balance the load, points to a load issue. I believe this to be an issue with ES core, but I was interested in seeing whether anyone else here had seen it, since es-hadoop can create a load issue more easily than the normal API. We have tried es-hadoop 2.1.0-beta and the 2.0.1 release with the same results.

I can provide more details as needed, but since there is nothing in the logs, there are not many other observables I can describe. It is very similar to http://elasticsearch-users.115913.n3.nabble.com/missing-documents-after-bulk-indexing-td4056100.html, except that I am not seeing any exceptions. Please let me know if anyone has seen this issue or has any resolution.
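
For reference, the write path looks roughly like the sketch below; this is a minimal reconstruction rather than our actual code, and the host, input path, and RDD name are placeholders. It just pushes an RDD of pre-formatted JSON strings into the freshly created index using the es-hadoop Spark support:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.elasticsearch.spark._

    val conf = new SparkConf()
      .setAppName("json-to-es")
      .set("es.nodes", "es-host:9200")   // placeholder ES HTTP endpoint

    val sc = new SparkContext(conf)

    // each element is already a complete JSON document, so nothing is
    // converted on the Spark side; the index/type is created fresh before the job
    val jsonDocs = sc.textFile("hdfs:///path/to/json")
    jsonDocs.saveJsonToEs("twitter-json/twitter")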

@costin
Member

costin commented Sep 26, 2014

@mparker4 This seems to be related to #268. To confirm - is this an issue in ES itself or in the metrics reported by es-hadoop?
Can you provide some information on your environment - in particular the ES, Spark, and Java versions and build numbers?

Thanks,

@costin
Member

costin commented Sep 26, 2014

Also, can you post the relevant logs for debugging? Cheers

@mparker4
Author

@costin Thanks for responding. This is an issue with the actual number of indexed documents in a fresh ES index (0 documents, created just before the indexing job begins) differing from the number of documents that Spark gives ES to create/index. So Spark gives ES 929,660 JSON documents to index, but sometimes only roughly 924,000 are indexed. I have seen the actual number of documents in ES fluctuate between missing up to 5,000 and missing as few as 3. On other runs it has had too many documents, up to almost double - still with a fresh index and the same number of input documents. There is no discernible pattern, and the same documents are used for every run. My last run: ES reports 1,409,124 documents, but Spark submitted only 929,660 for indexing. The index contained 0 documents to start, nothing else is writing to it, and it has 1 shard and 0 replicas.

The number of shards and replicas does seem to matter - the trend is that with a higher number of shards, a lower number of replicas, and a lower total number of documents being indexed, we see exactly the correct document count, but that is very difficult for me to verify or extrapolate in any significant way. I have also tried both the new native RDD support for Spark in the beta release and the old OutputFormat-based approach inside Spark, in both the beta and the 2.0.1 releases.

Although I don't think it is related to #268, as a side question, how can I see those metrics when using Spark? From the documentation, it looks as if the metrics use the Hadoop counter infrastructure, which to my knowledge would be independent of the Spark job. Is it possible to see these when using the Spark API?

ES version: 1.3.0 (9 data nodes, 3 masters, 1 load balancer)
Spark version: 1.0.0 (1 master, 9 workers)
Java version: 1.8.0_11

Spark and ES are running on the same nodes (Spark on a subset of the ES nodes). Working on posting some logs.

Thanks again for your help and response.

@mparker4
Author

Just as another data point--

Fresh index with 100 shards, 0 replicas: ES reports 43,172,074, Spark gave ES 43,172,131
= 57 missing documents

Again I have seen the number of missing documents fluctuate from the thousands to only 3.

LOGS:
There are hundreds of thousands of lines of log files for this past test. After a quick skim (of some of the workers) and some pattern matching, copied below is the only error or irregularity I saw, and it seems to occur after everything in ES has completed. It's due to a recent change in our Spark configs. I can fix it and rerun, but I don't think this is causing any issues.

14/09/26 18:01:50 DEBUG RestRepository: Sending batch of [1047125] bytes/[341] entries
14/09/26 18:01:50 DEBUG RestRepository: Sending batch of [1045597] bytes/[344] entries
14/09/26 18:01:50 DEBUG RestRepository: Sending batch of [1047059] bytes/[333] entries
14/09/26 18:01:51 DEBUG RestRepository: Sending batch of [1048354] bytes/[344] entries
14/09/26 18:01:51 DEBUG RestRepository: Sending batch of [1045574] bytes/[343] entries
14/09/26 18:01:51 DEBUG RestRepository: Closing repository and connection to Elasticsearch ...
14/09/26 18:01:51 DEBUG RestRepository: Sending batch of [600530] bytes/[191] entries
14/09/26 18:01:51 DEBUG RestRepository: Refreshing index [twitter-json/twitter]
14/09/26 18:01:51 INFO Executor: Serialized size of result for 9 is 539
14/09/26 18:01:51 INFO Executor: Sending result for 9 directly to driver
14/09/26 18:01:51 INFO Executor: Finished task ID 9
14/09/26 18:01:52 ERROR CoarseGrainedExecutorBackend: Driver Disassociated [akka.tcp://sparkExecutor@worker_node_ip:54954] -> [akka.tcp://spark@master_node_ip:37514] disassociated! Shutting down.

@costin
Member

costin commented Sep 26, 2014

Can you upgrade to ES 1.3.2? How are you checking the number of documents in ES? Do you issue any refresh/flush beforehand?


@mparker4
Author

Yes, we have a separate cluster set up with ES 1.3.2. I'm running a job there now.

Using ES HQ, which is a front end for ES administration. Also verified with curl -XGET ES_IP:9200/twitter-json/_search (where twitter-json is our index) and looking at the number of hits.

Yes to flushing and refreshing. No change in the number.
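
As an additional cross-check (a sketch using the native Spark support in the 2.1 beta, assuming the same SparkContext and index as above; not something we have wired into the job), the index can also be read back through es-hadoop and counted on the Spark side:

    import org.elasticsearch.spark._

    // count what es-hadoop itself can read back from the index
    val readBack = sc.esRDD("twitter-json/twitter").count()
    println(s"documents visible via es-hadoop: $readBack")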

@mparker4
Author

In the ES 1.3.2 cluster, which has only 2 nodes while I am pumping in data from 9 Spark nodes, I am seeing even larger numbers of missing documents. If this is truly a load issue, I would expect that.

Last run: ES 1.3.2, 2 nodes (both masters and data), 37 shards, 0 replicas
Documents in ES: 852,527
Documents passed from spark: 929,660
Missing Documents: 77,133

This is the largest number of missing documents I have seen.
No irregularities in the logs.

@mparker4
Author

Another run on the ES 1.3.2 cluster, shards 37, replicas 0:
in ES: 929,210
from Spark: 929,660
Missing: 450

I had forgotten that I had upped the batch size from the default before the previous run (the run with ~77,000 missing documents), so I'm guessing that has something to do with that large number of missing documents. This run is back at the default batch size. I'm shrinking it to 100 for the next run.

@mparker4
Author

Another run on the ES 1.3.2 cluster, shards 37, replicas 0, batch size 100 entries:
in ES: 929,660
from Spark: 929,660
Missing: 0

So this is good and bad. From my observations, it now appears that the number of shards, the number of replicas, and the batch size all affect whether the proper number of documents is created/indexed. To my mind this points to a load issue. Is there a reason I would be the only one seeing this? Configs?

This is very difficult to scale and tune appropriately, as my number of input documents will change all the time (even though I've kept it constant for these runs) and the size of my cluster may change (nodes die, nodes get added, etc.). I also might or might not want replicas in different situations. Any advice on this would be great.
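
For reference, these are the knobs I have been adjusting between runs; a sketch with illustrative values only, using the es-hadoop bulk settings (the batch size applies per task, so the effective load scales with the number of concurrent Spark tasks):

    // illustrative values; passed per write rather than cluster-wide
    val writeCfg = Map(
      "es.batch.size.entries"      -> "100",  // docs per bulk request (per task)
      "es.batch.size.bytes"        -> "1mb",  // size cap per bulk request
      "es.batch.write.retry.count" -> "3",    // retries when bulk entries are rejected
      "es.batch.write.retry.wait"  -> "10s"   // wait between retries
    )
    jsonDocs.saveJsonToEs("twitter-json/twitter", writeCfg)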

Thank you for all your help.

Just so you know, I will be away from this issue until Monday or Tuesday of next week, so I apologize if I do not respond until then. Thanks again.

@mparker4
Author

mparker4 commented Oct 3, 2014

Update: We upgraded to ES 1.3.4 and are still seeing the too-few-documents issue. The too-many-documents issue may still be a problem for the framework, but it currently looks like we have a successful workaround: manually passing in the id for each document via a path-based mapping and a guaranteed unique id. Could this be the same issue as #167, which was closed? It looks similar.
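
In es-hadoop terms the workaround amounts to pointing the connector at the unique id field; a sketch, with a hypothetical field name:

    // "uuid" stands in for our guaranteed-unique id field; with an explicit id,
    // a re-sent document overwrites the existing one instead of creating a new doc
    jsonDocs.saveJsonToEs("twitter-json/twitter", Map("es.mapping.id" -> "uuid"))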

Last run: ES 1.3.4, shards 50, replicas 0, batch size 1000:
in ES: 43,166,511
from spark: 43,172,131
missing: 5620

It looks like the internal code for RestClient and RestRepository uses stats objects. Is there any easy way to get these through the Spark API, or would I have to modify the code?

Thanks!

@b0c1

b0c1 commented Oct 10, 2014

Hi guys!
I have the same problem. I have 320 JSON documents. If I use elastic4s, all documents are added; if I use elasticsearch-hadoop (in Spark), 25 documents are missing (always the same documents).

@costin
Member

costin commented Oct 10, 2014

@b0c1 Interesting. Can you share the docs? Perhaps remove sensitive content and strip them down as much as possible while still allowing the bug to be reproduced?

@b0c1

b0c1 commented Oct 10, 2014

If you give me an email address, I can send you the messages to test. I always get the same result.

The error doesn't occur if I send only the excluded messages, but it does if I send all of them...

@costin
Member

costin commented Oct 11, 2014

Upload the data to Dropbox/S3/gist and send the link to my email (you can find it in the commit log) or as a private message on Twitter.

Cheers,


@b0c1

b0c1 commented Oct 11, 2014

Hehe, if you follow me, I can send you the message on Twitter...

@b0c1

b0c1 commented Oct 14, 2014

I sent it on Hangouts and IRC :)

@costin
Member

costin commented Oct 14, 2014

Sorry - currently travelling; hope to get to it by next week. I'll keep you posted.


@costin
Member

costin commented Dec 11, 2014

@b0c1 Sorry for the looong delay. I picked up your message on IRC and am inspecting the log, trying to find what the issue is.

@bzz

bzz commented Dec 22, 2014

I'm experiencing the same issue with Spark 1.1.1, ES 1.4.1, and elasticsearch-spark_2.10:2.1.0.Beta3.
On every re-run of

    // imports assumed for this snippet (elasticsearch-spark native RDD support)
    import org.elasticsearch.hadoop.cfg.ConfigurationOptions
    import org.elasticsearch.spark._

    out.count // 34190
    out.saveToEs(Map(
      ConfigurationOptions.ES_NODES -> esAddrs,
      ConfigurationOptions.ES_RESOURCE -> ("test-{date}/stats")
    ))

the number of documents written to 3 indexes (6 shards, 1 replica) is about 171,300 (it varies every time by roughly +/-100).

BTW it's not clear to me why an RDD of 32190 elements results in ~171,000 docs.

UPD: with 10 shards and 0 replicas I get the same number of documents in ES on every re-run, BUT it is still exactly a factor of 5 larger than the original number of elements in the RDD, and the distribution between indexes changes every time (the same index does not keep the same number of docs, but the SUM of docs is stable between re-runs).

costin added a commit that referenced this issue Jan 7, 2015
Fix bug in JSON extraction that caused the last matching node to win despite a previous match

relates #283
@costin
Member

costin commented Jan 7, 2015

@b0c1 Thanks for the sample dataset - I've managed to reproduce the bug and fix it. I've pushed out the new binaries - can you please try them out and report back?

@bzz Can you please try the latest binaries as well and see whether they fix your problem? Can you also try several scenarios: remove the patterns from the index name and index everything under the same index to see whether that makes any difference, and add a path id (as suggested by @mparker4). Also, between your runs, can you spot which documents are missing - are there any shards that stand out (for example, some with the same number of documents)?
Do you have any sample documents that I could try? Maybe reduce the number of documents to 10K or fewer and see whether it's still reproducible.
Anything special about your cluster - number of nodes vs. machines, etc.?

P.S. The exact factor of 5 makes me think of the default sharding in Elasticsearch. How many Spark tasks do you have, by the way? If it helps, give me a ping on IRC; maybe that will help get to the root of the problem.

@mparker4 Hopefully you're still around - can you please try out the latest 2.1 build and report back? It looks like you were using the id mapping, and the latest build fixes that (so you can use it as an alternative to path ids in Elasticsearch).
As for the internal stats being exposed in Spark, it looks like the library provides a metrics package but I haven't seen much use of it. If there is, we could easily expose the internal es-hadoop stats there, just as we do with Map/Reduce in Hadoop.
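
In the meantime, a manual sanity check is to count on both sides of the write yourself. A rough sketch (not an es-hadoop API; `jsonDocs` stands for whatever RDD is being written), with the caveat that accumulator updates inside transformations can be inflated by task retries:

    import org.elasticsearch.spark._

    val submitted = sc.accumulator(0L)

    // tag each record on the way out, then compare with what is searchable
    jsonDocs.map { doc => submitted += 1L; doc }
            .saveJsonToEs("twitter-json/twitter")

    val indexed = sc.esRDD("twitter-json/twitter").count()
    println(s"submitted=${submitted.value} indexed=$indexed")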

@costin
Member

costin commented Jan 7, 2015

@mparker4 By the way, there is a chance that under load you might have hit these Elasticsearch bugs, namely elastic/elasticsearch#9125 and elastic/elasticsearch#7729. These are fixed in ES master.

costin added a commit that referenced this issue Jan 9, 2015
refine the internals of JSON parsing
preserve type of headers instead of always converting them to strings
improve the quoting of values by treating numbers and booleans properly

relates to #283
@bzz

bzz commented Jan 12, 2015

@costin thanks for taking care of this; I'm working on providing a reproducible environment for you.

The latest code gives the same x5 number of documents written, BUT this happens only when a custom mapping is provided (and applied correctly, via the template).

Funnily enough, exactly the same thing happens if I switch to the TransportClient with the bulk API, indexing the same RDD from the Spark driver after .collect().
Even funnier, the x5 effect persists even when I do NOT use bulk but index every document individually.

Without the template (so no mapping set) it works well - the number of documents is the same as the number of RDD elements.

@costin
Member

costin commented Jan 12, 2015

@bzz So it looks like you are using Spark and Elasticsearch without es-hadoop, correct (since you mention the TransportClient)? Do you use es-hadoop for anything (like reading the data once it is indexed) or not really?
I'm just trying to understand your environment.

The consistent x5 might indicate over-indexing - that is, the data being indexed multiple times (potentially by each Spark task). The custom mapping might trigger that if it contains a different id strategy, causing documents that would otherwise be merged to be added instead. Just a guess...
You might also be hitting the two ES bugs mentioned above.
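
To illustrate what I mean by over-indexing - purely hypothetical code, not your job, with `rdd` standing for the RDD of documents and `bulkIndex` standing in for however they actually get sent (TransportClient bulk, etc.):

    // stand-in for the actual send; assume it indexes `docs` into ES
    def bulkIndex(docs: Seq[String]): Unit = { /* send docs */ }

    val allDocs = rdd.collect().toSeq          // entire dataset on the driver

    // BROKEN: the closure captures `allDocs`, so each of the N partitions
    // re-indexes the complete dataset -> N x allDocs.size documents
    rdd.foreachPartition { _ => bulkIndex(allDocs) }

    // Correct: each partition indexes only its own slice
    rdd.foreachPartition { part => bulkIndex(part.toSeq) }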

@costin
Member

costin commented Jan 12, 2015

@b0c1 @mparker4 hey guys - can you please check out the latest changes?

@bzz

bzz commented Jan 13, 2015

@costin thanks for helping

Do you use es-hadoop for anything (like reading the data once it is indexed) or not really?

I use es-hadoop with the Scala native API (since it became available) to write to a single-node ES 1.4.2 cluster, but as a result got x5 documents written:

  • so I switched to using the TransportClient + bulk API instead of es-hadoop, to check whether it was an es-hadoop issue, but still got x5 docs. Then you mentioned the known ES bulk API bugs, which I was not aware of,
  • so I switched to the TransportClient + indexing each document separately, to check whether I was hitting any of those bugs, and still got x5 documents.

Only when I got rid of the template with the custom mappings (which has nothing like a 'different id strategy', just nested objects) did I start getting the correct number of documents written (any of the techniques above works now).

I guess there is strong evidence now that my case, although very strange, is not an es-hadoop issue, so I had better move further investigation to the Elasticsearch mailing list.

@costin
Member

costin commented Jan 26, 2015

@bzz Any update on this front? Do you use aliases by any chance?

@bzz

bzz commented Feb 2, 2015

No aliases, and I still see the number of documents reported by ES at x5 the rdd.count.
At the same time, when reading from those indexes using es-hadoop, esEsRdd.count on the Spark side shows the original number..

@costin
Member

costin commented Feb 2, 2015

The x5 makes me think that this has something to do with the number of shards for the target index. es-hadoop does the count per shard, while in your case you might be indexing all the documents in each task, so you end up doing 5 times the work, hence the multiplier.

@mparker4
Author

@costin Sorry I haven't been paying attention here. We had to go an entirely different route in our architecture based on this and some other requirements, so we're not using es-hadoop at the moment, but we will probably look into it again in the future (when other priorities settle down). Unfortunately it is difficult for me to test the issue now, seeing as our architecture has changed so much, but I will put it on the back burner to try. If the problems are fixed for bzz and others, feel free to close this if you would like. I can reopen it at a later date if anything arises. Thanks for all the help.

@costin
Member

costin commented Apr 28, 2015

@bzz @b0c1 @mparker4 Thanks for the updates, I'll close the issue for now. If the issue occurs in the future, please open a new one, potentially linking to this one.

Cheers,

@costin costin closed this as completed Apr 28, 2015
@swatisaini

Hi @costin,

I'm seeing a similar issue: there is a mismatch in document count when reading data from an Elasticsearch index using PySpark code.
The actual document count in Elasticsearch is 991,121 (1.08 GB), but when we read the data into Spark the count changes on every attempt.
Number of shards: 5
Replicas: 0
Observations:
1st try: 991,121
2nd try: 991,147
3rd try: 991,095
Version info:
Spark: 2.0.2
Scala: 2.11
Python: 2.7
Elasticsearch: 5.1.1
elasticsearch-spark connector: 20_2.11-5.1.1
