
[Ingest] Move pipeline configuration to the cluster state #15906

Merged

4 participants
@martijnvg
Member

commented Jan 11, 2016

Move the pipeline configuration from the dedicated index to the cluster state.

@martijnvg martijnvg force-pushed the martijnvg:ingest_move_config_to_cs branch Jan 11, 2016

@talevy

View changes

core/src/main/java/org/elasticsearch/ingest/PipelineStore.java Outdated
}

public Map<String, Processor.Factory> getProcessorFactoryRegistry() {
return processorFactoryRegistry;
}

public List<PipelineDefinition> getReference(String... ids) {
ensureReady();
// Using map of maps instead of

@talevy

talevy Jan 12, 2016

Contributor

left over?

@martijnvg martijnvg force-pushed the martijnvg:ingest_move_config_to_cs branch Jan 12, 2016

@javanna

View changes

core/src/main/java/org/elasticsearch/action/ingest/PutPipelineResponse.java Outdated

import java.io.IOException;

public class PutPipelineResponse extends AcknowledgedResponse {

@javanna

javanna Jan 12, 2016

Member

it is too bad that DeletePipelineResponse and PutPipelineResponse are exactly the same. Shall we instead have a single WritePipelineResponse and use it in both places?

@martijnvg

martijnvg Jan 12, 2016

Author Member

+1 makes sense.
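
A minimal sketch of the suggestion above: one response type shared by the put and delete actions. `AcknowledgedResponseSketch` is a simplified stand-in for `AcknowledgedResponse`, and `PipelineWriteActions` is a hypothetical stub; neither is the code that was merged.

```java
// Stand-in for org.elasticsearch.action.support.master.AcknowledgedResponse,
// reduced to the single flag that matters here (an assumption for this sketch).
abstract class AcknowledgedResponseSketch {
    private final boolean acknowledged;

    protected AcknowledgedResponseSketch(boolean acknowledged) {
        this.acknowledged = acknowledged;
    }

    public final boolean isAcknowledged() {
        return acknowledged;
    }
}

// One response type for both write operations, replacing two identical classes.
final class WritePipelineResponse extends AcknowledgedResponseSketch {
    WritePipelineResponse(boolean acknowledged) {
        super(acknowledged);
    }
}

// Hypothetical action stubs: both write paths return the same response type.
final class PipelineWriteActions {
    static WritePipelineResponse put(String id) {
        return new WritePipelineResponse(true);
    }

    static WritePipelineResponse delete(String id) {
        return new WritePipelineResponse(true);
    }
}
```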


@javanna

View changes

core/src/main/java/org/elasticsearch/ingest/core/Pipeline.java Outdated
@@ -77,6 +80,11 @@ public String getDescription() {
return compoundProcessor.getOnFailureProcessors();
}

public Map<String, Object> toMap() {
Map<String, Object> result = new HashMap<>();
return result;

@javanna

javanna Jan 12, 2016

Member

where is this used?

@martijnvg

martijnvg Jan 13, 2016

Author Member

leftover from trying to make pipeline/processors know how to serialise themselves :)

@javanna

View changes

core/src/main/java/org/elasticsearch/ingest/PipelineStore.java Outdated
try {
@SuppressWarnings("unchecked")
Map<String, Object> pipelineConfigCopy = (Map<String, Object>) deepCopy(entry.getValue());
pipelines.put(id, constructPipeline(id, pipelineConfigCopy));

@javanna

javanna Jan 12, 2016

Member

given where it ended up being called, I think that removing properties from config is not that useful anymore here? maybe we should have two versions of the method, one to validate and one called here so we avoid the deep copy? in theory the pipeline should be correct at this point right? no need to validate it twice probably

@martijnvg

martijnvg Jan 13, 2016

Author Member

I'll try this out.

@martijnvg

martijnvg Jan 13, 2016

Author Member

Looked at it, and it gets messy if we have to pass this all the way down to ConfigurationUtils.

Maybe instead we should not store the config as a map of maps, but as a bytes reference. After the pipeline has been created we don't use it any more.

@javanna

javanna Jan 13, 2016

Member

with BytesReference though, what would we pass to the Pipeline.Factory / ConfigurationUtils? Maybe have this PipelineConfiguration class, which also allows retrieving its config in map format to hand it over to ConfigurationUtils?

@martijnvg

martijnvg Jan 13, 2016

Author Member

right, that is what I have now :)

@javanna

javanna Jan 13, 2016

Member

ok let me have a look then ;)
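
The idea discussed in this thread can be sketched roughly as follows: keep the configuration as opaque bytes and decode a fresh map on each request, so the consumer may remove properties without a defensive deep copy. This is only an illustration. `PipelineConfigurationSketch` is a hypothetical name, a `byte[]` stands in for `BytesReference`, and Java serialization stands in for the JSON encoding.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

final class PipelineConfigurationSketch {
    private final String id;
    private final byte[] config; // stand-in for BytesReference

    private PipelineConfigurationSketch(String id, byte[] config) {
        this.id = id;
        this.config = config;
    }

    // Encodes the source map once; values must be Serializable in this sketch.
    static PipelineConfigurationSketch of(String id, Map<String, Object> source) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(new HashMap<>(source));
            }
            return new PipelineConfigurationSketch(id, bytes.toByteArray());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    String getId() {
        return id;
    }

    // Every call decodes a new, independent map, so callers can mutate the
    // result freely without affecting the stored configuration.
    @SuppressWarnings("unchecked")
    Map<String, Object> getConfigAsMap() {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(config))) {
            return (Map<String, Object>) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

The trade-off is a decode on every call in exchange for not having to thread a "validate only" flag down to the code that consumes the map.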

@javanna

View changes

core/src/main/java/org/elasticsearch/ingest/IngestMetadata.java Outdated
public final static String TYPE = "ingest";
public final static IngestMetadata PROTO = new IngestMetadata();

private final Map<String, Map<String, Object>> pipelines;

@javanna

javanna Jan 12, 2016

Member

I see why we have maps of maps, but if we have to do it this way, maybe we could have a PipelineConfiguration class that holds the configuration as a map and is Writeable and ToXContent etc. ? similar to the previous PipelineDefinition but this one would hold the configuration.

@martijnvg

martijnvg Jan 13, 2016

Author Member

+1 I'll add PipelineConfiguration as a static inner class of IngestMetadata

@javanna

View changes

core/src/main/java/org/elasticsearch/ingest/IngestMetadata.java Outdated

private final Map<String, Map<String, Object>> pipelines;

public IngestMetadata() {

@javanna

javanna Jan 12, 2016

Member

this one can be private

@javanna

View changes

core/src/main/java/org/elasticsearch/ingest/PipelineStore.java Outdated
*/
// Returning map of maps instead of pipeline, because pipeline / processor interface doesn't
// know how to serialize itself.
public Map<String, Map<String, Object>> getPipelines(String... ids) {

@javanna

javanna Jan 12, 2016

Member

if we add PipelineConfiguration as suggested above, it could be returned by the get pipeline api and things would be a bit nicer?

@javanna

Member

commented Jan 12, 2016

I left some comments, would love @s1monw to have a look too and see if we can improve this further.

@@ -154,18 +115,58 @@ public void close() throws IOException {
IOUtils.close(closeables);
}

@Override
public void clusterChanged(ClusterChangedEvent event) {

@s1monw

s1monw Jan 13, 2016

Contributor

this needs to be tested explicitly in the PipelineStoreTests

@s1monw

View changes

core/src/main/java/org/elasticsearch/ingest/PipelineStore.java Outdated
client.delete(deleteRequest, handleWriteResponseAndReloadPipelines(listener));
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
IngestMetadata currentIngestMetadata = currentState.metaData().custom(IngestMetadata.TYPE);

@s1monw

s1monw Jan 13, 2016

Contributor

would be nice to test this without a ClusterService, can we make all the execute bodies pkg private methods we can call and have dedicated unit tests and assert the content etc?

@martijnvg

martijnvg Jan 13, 2016

Author Member

+1 good point, then we can drop the current test
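
One way to read the suggestion above, as a sketch under simplifying assumptions: the body of `execute` becomes a package-private function over plain data, and the update task only delegates to it. Here a `Map<String, String>` stands in for the ingest metadata and a `UnaryOperator` stands in for the cluster state update task; `PipelineStoreSketch`, `innerPut` and `putTask` are hypothetical names.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

final class PipelineStoreSketch {

    // Package-private and free of side effects: takes the current pipelines and
    // returns the updated ones, so a unit test needs no ClusterService.
    static Map<String, String> innerPut(Map<String, String> current, String id, String config) {
        Map<String, String> updated = new HashMap<>(current);
        updated.put(id, config);
        return Collections.unmodifiableMap(updated);
    }

    // Stand-in for the update task: its only job is to delegate to innerPut.
    static UnaryOperator<Map<String, String>> putTask(String id, String config) {
        return current -> innerPut(current, id, config);
    }
}
```

A test can then call `innerPut` directly and assert on the returned map, leaving the task wiring to an integration test.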

@martijnvg

Member Author

commented Jan 13, 2016

@javanna @s1monw I've updated the PR.

@s1monw

Contributor

commented Jan 13, 2016

thanks for the tests @martijnvg

/**
* An iterator that easily helps to consume all hits from a scroll search.
*/
public final class SearchScrollIterator implements Iterator<SearchHit> {

@javanna

javanna Jan 13, 2016

Member

wasn't the plan to keep this one as part of the java api? Seemed like a nice to have, too bad you wrote it for nothing!

@martijnvg

martijnvg Jan 13, 2016

Author Member

agreed it is a nice utility, but if it isn't used then it feels weird to have. Unless we make it a java client utility, but then it needs to live in a different location, it needs to be documented too, and we should add it via a different PR outside this branch.

@javanna

View changes

core/src/main/java/org/elasticsearch/ingest/IngestMetadata.java Outdated
}

/**
* Encapsulates a pipeline's id and configuration as a map of maps

@javanna

javanna Jan 13, 2016

Member

comment is outdated (we don't have map of maps anymore)

@javanna

View changes

core/src/main/java/org/elasticsearch/ingest/PipelineStore.java Outdated
public List<PipelineDefinition> getReference(String... ids) {
ensureReady();
/**
* @return a pipeline as maps of maps by id. Multiple pipelines will be returned if multiple ids are provided.

@javanna

javanna Jan 13, 2016

Member

can you update the comment saying that it returns the pipeline configuration?

@javanna

View changes

core/src/main/java/org/elasticsearch/ingest/PipelineStore.java Outdated
* @return a pipeline as maps of maps by id. Multiple pipelines will be returned if multiple ids are provided.
*/
// Returning PipelineConfiguration instead of Pipeline, because Pipeline and Processor interface don't
// know how to serialize themselves.

@javanna

javanna Jan 13, 2016

Member

Can we clarify why we do this? isn't it more because processors might have dependencies that are not available statically, thus we keep their configuration in the cluster state so that we can delay their construction?

@martijnvg

martijnvg Jan 13, 2016

Author Member

right, this is just a note about serialization. At the top of the file I mention the reasons why pipelines can't be part of the cluster state.
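
To illustrate the point about delayed construction, here is a rough sketch: only the raw configuration travels around, and each node turns it into live processors using factories it registered locally. All names (`ProcessorSketch`, `PipelineBuilderSketch`, the `"type"` key) are assumptions made for this example and do not mirror the actual ingest classes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical, heavily reduced processor contract.
interface ProcessorSketch {
    String apply(String value);
}

final class PipelineBuilderSketch {
    // Factories are registered at node startup and may capture node-local
    // dependencies that are not available where the configuration is stored.
    private final Map<String, Function<Map<String, Object>, ProcessorSketch>> factories = new HashMap<>();

    void register(String type, Function<Map<String, Object>, ProcessorSketch> factory) {
        factories.put(type, factory);
    }

    // Turns stored configuration into live processors only when asked to.
    List<ProcessorSketch> build(List<Map<String, Object>> processorConfigs) {
        List<ProcessorSketch> processors = new ArrayList<>();
        for (Map<String, Object> config : processorConfigs) {
            String type = (String) config.get("type");
            Function<Map<String, Object>, ProcessorSketch> factory = factories.get(type);
            if (factory == null) {
                throw new IllegalArgumentException("no processor factory registered for type [" + type + "]");
            }
            processors.add(factory.apply(config));
        }
        return processors;
    }
}
```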

@javanna

View changes

core/src/main/java/org/elasticsearch/ingest/core/Pipeline.java Outdated
@@ -20,9 +20,12 @@

package org.elasticsearch.ingest.core;

import org.elasticsearch.common.collect.HppcMaps;

@javanna

javanna Jan 13, 2016

Member

leftover?

@javanna

View changes

core/src/main/java/org/elasticsearch/action/ingest/GetPipelineResponse.java Outdated
this.pipelines = pipelines;
}

public List<PipelineDefinition> pipelines() {
public Set<PipelineConfiguration> pipelines() {

@javanna

javanna Jan 13, 2016

Member

why do we need a Set here?

@martijnvg

martijnvg Jan 13, 2016

Author Member

I was thinking about the case where duplicate ids are requested, but let's return a list. It would make the unit tests for this class a bit easier too.

@javanna

javanna Jan 13, 2016

Member

yes and remove equals and hashcode from PipelineConfiguration too I think

@javanna

View changes

core/src/main/java/org/elasticsearch/ingest/IngestMetadata.java Outdated
*/
// We can't use Pipeline class directly in cluster state, because we don't have the processor factories around when
// IngestMetadata is registered as custom metadata.
public final static class PipelineConfiguration implements ToXContent {

@javanna

javanna Jan 13, 2016

Member

Doesn't PipelineConfiguration deserve its own class file under o.e.ingest ? It's returned by the java api too.

@javanna

javanna Jan 13, 2016

Member

also implement Writeable?

@martijnvg

martijnvg Jan 13, 2016

Author Member

but then we also need to implement readFrom(). This class has a constructor that accepts a StreamInput, which allows its fields to be final.

@javanna

javanna Jan 13, 2016

Member

Writeable#readFrom returns a new instance of the object, so it allows final fields, but it requires a PROTO instance of the object to call readFrom against. I wish there was an interface to declare writeTo only, though we don't have it at the moment.
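
The prototype pattern described here can be sketched as follows. This is a simplified illustration rather than the Elasticsearch interface: `WriteableSketch` uses `java.io.DataInput`/`DataOutput` in place of `StreamInput`/`StreamOutput`, and `PipelineRef` is a hypothetical class.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Simplified stand-in for the Writeable contract discussed above.
interface WriteableSketch<T> {
    T readFrom(DataInput in) throws IOException;

    void writeTo(DataOutput out) throws IOException;
}

final class PipelineRef implements WriteableSketch<PipelineRef> {
    // Prototype whose only purpose is to provide an instance to call readFrom on.
    static final PipelineRef PROTO = new PipelineRef("", "");

    private final String id;
    private final String config;

    PipelineRef(String id, String config) {
        this.id = id;
        this.config = config;
    }

    // Builds a new instance instead of mutating this one, so fields stay final.
    @Override
    public PipelineRef readFrom(DataInput in) throws IOException {
        return new PipelineRef(in.readUTF(), in.readUTF());
    }

    @Override
    public void writeTo(DataOutput out) throws IOException {
        out.writeUTF(id);
        out.writeUTF(config);
    }

    String getId() {
        return id;
    }

    String getConfig() {
        return config;
    }

    // Helper for this sketch: serialize an instance and read it back via PROTO.
    static PipelineRef roundTrip(PipelineRef original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            original.writeTo(new DataOutputStream(bytes));
            return PROTO.readFrom(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A constructor that takes the input stream directly achieves the same immutability without a prototype, which is the alternative mentioned earlier in this thread.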

@javanna

View changes

core/src/main/java/org/elasticsearch/ingest/IngestMetadata.java Outdated
}
break;
case START_OBJECT:
config = XContentFactory.jsonBuilder().startObject().value(parser.map()).endObject().bytes();

@javanna

javanna Jan 13, 2016

Member

nit: wondering if we can avoid parsing the map and rewriting it directly in json format. not sure there's a better way to do this.

@martijnvg

martijnvg Jan 13, 2016

Author Member

I think we can use the XContentHelper.copyCurrentEvent(...) method here to avoid creating the map.

@javanna

View changes

core/src/main/java/org/elasticsearch/ingest/IngestMetadata.java Outdated
return id;
}

public BytesReference getConfig() {

@javanna

javanna Jan 13, 2016

Member

where is this used?

@martijnvg

martijnvg Jan 13, 2016

Author Member

it is unused, I'll remove it

@javanna

javanna Jan 13, 2016

Member

maybe rename getConfigAsMap to getConfig then?

@martijnvg

Member Author

commented Jan 13, 2016

@javanna I've updated the PR. I think it is close to getting merged.

@javanna

View changes

core/src/main/java/org/elasticsearch/ingest/PipelineStore.java Outdated
Map<String, PipelineConfiguration> pipelines = currentIngestMetadata.getPipelines();
if (pipelines.containsKey(request.id()) == false) {
// nothing to delete...
return currentState;

@javanna

javanna Jan 13, 2016

Member

shall we instead return an error / 404 when the pipeline is not there? I think that would be appropriate for a CRUD api?

@javanna

javanna Jan 13, 2016

Member

I double checked and this has changed with this PR, we used to return 404 before.

@martijnvg

martijnvg Jan 13, 2016

Author Member

true, before we relied on the delete api, I'll throw an exception when the pipeline is missing.
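
A minimal sketch of the agreed behaviour, assuming a plain map in place of the ingest metadata: deleting a missing pipeline raises an error instead of silently returning the unchanged state. `PipelineMissingException` and `PipelineDeleteSketch` are hypothetical names; how the exception maps to a 404 is left to the REST layer and is not shown.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical exception type; a REST layer could translate it to a 404.
final class PipelineMissingException extends RuntimeException {
    PipelineMissingException(String id) {
        super("pipeline [" + id + "] is missing");
    }
}

final class PipelineDeleteSketch {

    // Fails loudly when the id is unknown rather than returning the input unchanged.
    static Map<String, String> innerDelete(Map<String, String> current, String id) {
        if (current.containsKey(id) == false) {
            throw new PipelineMissingException(id);
        }
        Map<String, String> updated = new HashMap<>(current);
        updated.remove(id);
        return updated;
    }
}
```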

@javanna

View changes

core/src/main/java/org/elasticsearch/ingest/IngestBootstrapper.java Outdated
}

// for testing:
IngestBootstrapper(Settings settings, ThreadPool threadPool, ClusterService clusterService,
PipelineStore pipelineStore, PipelineExecutionService pipelineExecutionService) {
IngestBootstrapper(Settings settings, PipelineStore pipelineStore,

@javanna

javanna Jan 13, 2016

Member

I think this constructor can go away

@javanna

Member

commented Jan 13, 2016

LGTM

@martijnvg martijnvg force-pushed the martijnvg:ingest_move_config_to_cs branch Jan 13, 2016

@martijnvg martijnvg force-pushed the martijnvg:ingest_move_config_to_cs branch 4 times, most recently Jan 13, 2016

@martijnvg martijnvg force-pushed the martijnvg:ingest_move_config_to_cs branch to f388334 Jan 13, 2016

@martijnvg martijnvg merged commit f388334 into elastic:feature/ingest Jan 13, 2016

1 check passed

CLA Commit author is a member of Elasticsearch
Details