Cannot flush non-initialized write operation #1357

Closed
1 task
supernova-start opened this issue Sep 20, 2019 · 5 comments · Fixed by #1901

Comments

@supernova-start

What kind of issue is this?

  • [x] Bug report. If you’ve found a bug, please provide a code snippet or test to reproduce it below.
    The easier it is to track down the bug, the faster it is solved.
  • Feature Request. Start by telling us what problem you’re trying to solve.
    Often a solution already exists! Don’t send pull requests to implement new features without
    first getting our support. Sometimes we leave features out on purpose to keep the project small.

Issue description

Description

When using the Storm integration, if no data arrives for some time, the Storm supervisor process crashes.

Steps to reproduce

Code:

Test/code snippet

Stack trace:
org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Cannot flush non-initialized write operation
at org.elasticsearch.hadoop.util.Assert.isTrue(Assert.java:60) ~[mystormdemo-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
at org.elasticsearch.hadoop.rest.RestRepository.flush(RestRepository.java:202) ~[mystormdemo-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
at org.elasticsearch.storm.EsBolt.flushNoAck(EsBolt.java:193) ~[mystormdemo-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
at org.elasticsearch.storm.EsBolt.flush(EsBolt.java:155) ~[mystormdemo-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
at org.elasticsearch.storm.EsBolt.cleanup(EsBolt.java:200) ~[mystormdemo-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
at org.apache.storm.executor.ExecutorShutdown.shutdown(ExecutorShutdown.java:121) ~[storm-client-2.0.0.jar:2.0.0]
at org.apache.storm.daemon.worker.Worker.shutdown(Worker.java:456) ~[storm-client-2.0.0.jar:2.0.0]
at org.apache.storm.ProcessSimulator.killProcess(ProcessSimulator.java:62) ~[storm-server-2.0.0.jar:2.0.0]
at org.apache.storm.daemon.supervisor.LocalContainer.kill(LocalContainer.java:66) ~[storm-server-2.0.0.jar:2.0.0]
at org.apache.storm.daemon.supervisor.Slot.killContainerFor(Slot.java:269) ~[storm-server-2.0.0.jar:2.0.0]
at org.apache.storm.daemon.supervisor.Slot.handleRunning(Slot.java:724) ~[storm-server-2.0.0.jar:2.0.0]
at org.apache.storm.daemon.supervisor.Slot.stateMachineStep(Slot.java:218) ~[storm-server-2.0.0.jar:2.0.0]
at org.apache.storm.daemon.supervisor.Slot.run(Slot.java:931) [storm-server-2.0.0.jar:2.0.0]

Stack trace goes here

Version Info

OS: Ubuntu
JVM : 12.0.2
Hadoop/Spark:
ES-Hadoop :
ES :

Feature description

After checking the code, we found that the cause is the lazyInitWriting function in the org.elasticsearch.hadoop.rest.RestRepository class: it is never called if Storm has no data flowing.

@DaveyDevOps

We saw a similar issue working with Storm 1.1.0 and Elastic 7.3

Thanks to @kiranyagna for the explanation.

In our case, it happens when Storm generates a tick tuple before any document has been indexed. The tick tuple triggers a flush without writeToIndex ever having been called, so the following check fails:

Assert.isTrue(writeInitialized, "Cannot flush non-initialized write operation");
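
A minimal sketch of that failure mode, for illustration only. `LazyWriter` is a hypothetical stand-in and not the real RestRepository; it only assumes what the thread describes: a `writeInitialized` flag that is set on the first write and checked on flush.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of the flush-before-write problem; not es-hadoop code. */
public class FlushBeforeWriteSketch {

    /** Hypothetical writer whose state is initialized lazily on the first write. */
    static class LazyWriter {
        private boolean writeInitialized = false;
        private final List<String> buffer = new ArrayList<>();

        // Plays the role of writeToIndex: the first document initializes the writer.
        void writeToIndex(String doc) {
            writeInitialized = true;
            buffer.add(doc);
        }

        // Strict check, as in the assertion quoted above: flushing first throws.
        int flush() {
            if (!writeInitialized) {
                throw new IllegalArgumentException("Cannot flush non-initialized write operation");
            }
            int flushed = buffer.size();
            buffer.clear();
            return flushed;
        }
    }

    /** Returns true if a flush with no prior write fails, as a tick tuple arriving first would cause. */
    static boolean tickBeforeFirstDocumentFails() {
        LazyWriter writer = new LazyWriter();
        try {
            writer.flush(); // what a tick-tuple flush does before any document arrives
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("tick before first document fails: " + tickBeforeFirstDocumentFails());
        LazyWriter writer = new LazyWriter();
        writer.writeToIndex("{\"msg\":\"hello\"}");
        System.out.println("documents flushed after one write: " + writer.flush());
    }
}
```

The same flush call succeeds once a single document has been written, which matches the report that the crash only shows up when no data has flowed yet.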

I traced the root cause of this problem to a setting in the YAML file: es.storm.bolt.tick.tuple.flush: true
After disabling tick tuples and this flush setting in the YAML file, the topology started successfully and no longer threw this exception.

@DaveyDevOps

public BulkResponse tryFlush() was changed as part of
Add "Dead Letter Handlers" for bulk write failures #1095

The problem we currently face is that when a tick tuple arrives before any document to index, the bolt tries to flush. But the writeInitialized flag is not set to true until a document arrives, so the flush throws the "Cannot flush non-initialized write operation" exception. We can’t simply stop generating tick tuples, because they are needed to flush ES docs frequently instead of waiting for es.storm.bolt.flush.entries.size to kick in.
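
For reference, the workaround and its cost can be sketched as a plain settings map. The two `es.storm.bolt.*` keys are the ones named in this thread; the values, the use of string values, and the `boltSettings` helper are assumptions for illustration, and how the map is handed to the bolt is left out.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative settings for the workaround discussed above; values are examples only. */
public class TickFlushWorkaroundSketch {

    /** Hypothetical helper that builds the bolt settings as string key/value pairs. */
    static Map<String, String> boltSettings() {
        Map<String, String> conf = new HashMap<>();
        // Workaround from this thread: do not flush when a tick tuple arrives.
        conf.put("es.storm.bolt.tick.tuple.flush", "false");
        // With tick flushing off, buffered docs wait until this many entries
        // have accumulated. 500 is an arbitrary example value.
        conf.put("es.storm.bolt.flush.entries.size", "500");
        return conf;
    }

    public static void main(String[] args) {
        boltSettings().forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

This avoids the exception but, as noted above, documents on a slow stream may then sit in the buffer until the entry count is reached.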

@supernova-start
Author

supernova-start commented Sep 27, 2019

I think the Assert call is not appropriate here. These are my changes to the org.elasticsearch.hadoop.rest.RestRepository file. It works better now.
 public BulkResponse tryFlush() {
-    Assert.isTrue(writeInitialized, "Cannot flush non-initialized write operation");
+    if (!writeInitialized) {
+        return BulkResponse.complete();
+    }
     return bulkProcessor.tryFlush();
 }

 public void flush() {
-    Assert.isTrue(writeInitialized, "Cannot flush non-initialized write operation");
+    if (!writeInitialized) {
+        return;
+    }
     bulkProcessor.flush();
 }

Is this change correct?

@yeongchuin

Hi, any update on this issue?

@kiranyagna

kiranyagna commented Jul 9, 2020 via email

masseyke added a commit that referenced this issue Feb 22, 2022
This commit logs a warning instead of throwing an exception if an attempt to flush before writing is made (which our
Storm implementation can do).
Closes #1357
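
The behavior that commit message describes might look roughly like the sketch below. This is not the code from the actual commit: `LenientWriter`, its fields, the warning text, and the use of `java.util.logging` are all assumptions chosen to keep the example self-contained.

```java
import java.util.logging.Logger;

/** Sketch of "warn instead of throw" on flush-before-write; not the real fix. */
public class LenientFlushSketch {

    private static final Logger LOG = Logger.getLogger(LenientFlushSketch.class.getName());

    /** Hypothetical writer that tolerates a flush before the first write. */
    static class LenientWriter {
        private boolean writeInitialized = false;
        private int buffered = 0;
        private int warnings = 0; // counted only so the behavior is easy to observe

        void write(String doc) {
            writeInitialized = true;
            buffered++;
        }

        /** Flushes buffered documents and returns how many were flushed. */
        int flush() {
            if (!writeInitialized) {
                // Nothing has been written yet (for example, a tick tuple arrived first).
                LOG.warning("Flush requested before any write; nothing to flush");
                warnings++;
                return 0;
            }
            int flushed = buffered;
            buffered = 0;
            return flushed;
        }

        int warningCount() {
            return warnings;
        }
    }

    public static void main(String[] args) {
        LenientWriter writer = new LenientWriter();
        System.out.println("flushed before any write: " + writer.flush());
        writer.write("{\"msg\":\"hello\"}");
        System.out.println("flushed after one write: " + writer.flush());
    }
}
```

Compared with silently returning, logging keeps an early flush visible in the worker logs while still letting the topology keep running.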