[FLINK-4491] Handle index.number_of_shards in the ES connector #2790
Conversation
import java.util.List;
import java.util.Map;

public class ElasticSearchHelper {
This class needs a Javadoc that explains what it can be used for.
Any other feedback on this? Could it be merged?
for (String indexName : mappingRequest.indices()) {
    // If the index does not exist, create it
    client.admin().indices().prepareCreate(indexName)
        .setSettings(Settings.builder().put("index.number_of_shards", DEFAULT_INDEX_SHARDS)
Use the Elasticsearch constant for "index.number_of_shards" => IndexMetaData.SETTING_NUMBER_OF_SHARDS
// If the index does not exist, create it
client.admin().indices().prepareCreate(indexName)
    .setSettings(Settings.builder().put("index.number_of_shards", DEFAULT_INDEX_SHARDS)
        .put("index.number_of_replicas", DEFAULT_INDEX_REPLICAS)).execute().actionGet();
Use the Elasticsearch constant for "index.number_of_replicas" => IndexMetaData.SETTING_NUMBER_OF_REPLICAS
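Applied to the snippet above, the two constant suggestions could look roughly like this. This is only a sketch, assuming the Elasticsearch 2.x Java client that the connector uses; the `DEFAULT_INDEX_SHARDS`/`DEFAULT_INDEX_REPLICAS` values are illustrative, not the PR's actual defaults:

```java
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.settings.Settings;

public class CreateIndexSketch {
    // Illustrative defaults; the PR defines its own DEFAULT_* constants.
    static final int DEFAULT_INDEX_SHARDS = 5;
    static final int DEFAULT_INDEX_REPLICAS = 1;

    static void createIndex(Client client, String indexName) {
        // IndexMetaData.SETTING_NUMBER_OF_SHARDS / SETTING_NUMBER_OF_REPLICAS are the
        // library-provided keys for "index.number_of_shards" / "index.number_of_replicas",
        // so the hard-coded strings can be dropped.
        client.admin().indices().prepareCreate(indexName)
            .setSettings(Settings.builder()
                .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, DEFAULT_INDEX_SHARDS)
                .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, DEFAULT_INDEX_REPLICAS))
            .execute().actionGet();
    }
}
```

This needs a live cluster (and the ES client jar) to actually run, so it is shown only as an API sketch.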
// This instructs the sink to emit after every element, otherwise they
// would be buffered
config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
config.put("cluster.name", "my-transport-client-cluster");
Try to see if "cluster.name" can be replaced by ClusterName.CLUSTER_NAME_SETTING
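If the Elasticsearch version on the classpath exposes it (the `Setting`-based `CLUSTER_NAME_SETTING` constant is from the newer 5.x client API; older versions use a different constant), the replacement could be sketched like this, reusing the config map from the snippet above:

```java
import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.cluster.ClusterName;

public class ClusterNameSketch {
    static Map<String, String> config = new HashMap<>();

    static void configure() {
        // CLUSTER_NAME_SETTING's key is "cluster.name", so the string
        // literal can be replaced by the library constant.
        config.put(ClusterName.CLUSTER_NAME_SETTING.getKey(), "my-transport-client-cluster");
    }
}
```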
Thanks for the pull request @ddolzan and sorry for the delayed review. To be honest, I am not sure if we should include the ElasticSearchHelper.
From my point of view, ElasticSearchHelper is very useful if you plan to use the Flink Elasticsearch Sink in a real production use case, because it greatly reduces the complexity of interfacing with ES.
I agree with Fabian here. The ESHelper does not use any Flink code at all, so the relation to Flink is not clear. A user of Hadoop would equally benefit from such a utility. I would expect that ES provides such tools to Java developers. Maybe there is a repository somewhere offering ES tools / abstractions?
} else {
    throw new RuntimeException("An error occurred in ElasticsearchSink.");
}
LOG.error("Some documents failed while indexing to Elasticsearch: " + failureThrowable.get());
I would suggest also adding a debug log statement that logs the full stack trace.
Also, in the other connectors we have a flag that allows the user to control whether an error should only be logged or should fail the connector. I would suggest adding that here as well.
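The debug-log suggestion could be sketched as follows, assuming an slf4j `Logger` like the one the sink already uses and the `failureThrowable` reference from the snippet above (class and method names here are illustrative):

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FailureLoggingSketch {
    private static final Logger LOG = LoggerFactory.getLogger(FailureLoggingSketch.class);

    static void logFailure(Throwable failure) {
        LOG.error("Some documents failed while indexing to Elasticsearch: " + failure);
        // Passing the Throwable as the last argument makes slf4j
        // log the full stack trace, not just the message.
        if (LOG.isDebugEnabled()) {
            LOG.debug("Full stack trace of the Elasticsearch indexing failure", failure);
        }
    }
}
```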
Do you mean to substitute the current line 263 with a LOG.debug + stack trace, to leave the current line as is, or to add another LOG.debug line where we log only the stack trace? Currently the detail of the error is logged within afterBulk(). Should we modify that part?
How do you suggest controlling the error behaviour? Is it OK to have something like
public static final String CONFIG_KEY_BULK_ERROR_POLICY = "bulk.error.policy";
that a user can set to "strict" or "lenient" in the userConfig object?
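The strict/lenient proposal above could be sketched like this. The key name comes from the discussion; the helper method, its default, and the class name are hypothetical, since the flag was never part of the merged connector:

```java
import java.util.HashMap;
import java.util.Map;

public class BulkErrorPolicy {
    // Key proposed in the discussion; not part of the merged connector.
    public static final String CONFIG_KEY_BULK_ERROR_POLICY = "bulk.error.policy";

    /** Returns true when the sink should fail hard, false when it should only log. */
    static boolean failOnError(Map<String, String> userConfig) {
        // Default to "strict": fail the connector on indexing errors.
        return !"lenient".equalsIgnoreCase(
            userConfig.getOrDefault(CONFIG_KEY_BULK_ERROR_POLICY, "strict"));
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        System.out.println(failOnError(config));  // no key set: strict, prints true
        config.put(CONFIG_KEY_BULK_ERROR_POLICY, "lenient");
        System.out.println(failOnError(config));  // lenient: prints false
    }
}
```

Defaulting to strict matches the behaviour of the snippet above, which throws a RuntimeException on failure unless told otherwise.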
Fine for us regarding the ES helper. Since it took some time to implement that functionality and there was a JIRA ticket for it, we thought it was a good idea to share that code with other users.
Index template and index mapping creation/configuration will be kept outside of Flink.
Thank you @ddolzan |
Implemented the Index Template and Index Mapping creation.
Number of shards and many other properties can be defined in the Index Template.
Usage
Before calling ElasticsearchSink, instantiate ElasticSearchHelper.
TemplateRequest example
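The PR does not show the exact TemplateRequest API, so the following is only a sketch of what the template-creation call it presumably wraps could look like with the plain Elasticsearch 2.x admin client (the template name, index pattern, and settings are illustrative):

```java
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;

public class TemplateSketch {
    static void putTemplate(Client client) {
        // An index template applies its settings to every index
        // whose name matches the pattern at creation time.
        client.admin().indices().preparePutTemplate("my-template")
            .setTemplate("logs-*")  // index name pattern the template applies to
            .setSettings(Settings.builder()
                .put("index.number_of_shards", 5)
                .put("index.number_of_replicas", 1))
            .execute().actionGet();
    }
}
```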
MappingRequest example
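Likewise, the MappingRequest API is not shown in the conversation; a rough sketch of the underlying put-mapping call with the Elasticsearch 2.x admin client might look like this (index name, type, and mapping JSON are illustrative):

```java
import org.elasticsearch.client.Client;

public class MappingSketch {
    static void putMapping(Client client) {
        // Minimal mapping JSON declaring a single field.
        String mappingJson =
            "{ \"properties\": { \"message\": { \"type\": \"string\" } } }";
        client.admin().indices().preparePutMapping("my-index")
            .setType("my-type")
            .setSource(mappingJson)
            .execute().actionGet();
    }
}
```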