Skip to content

Commit

Permalink
fix(elasticsearch): Set vertx dependencies as provided
Browse files Browse the repository at this point in the history
  • Loading branch information
brasseld authored and NicolasGeraud committed May 28, 2019
1 parent 93945ca commit eb5fb31
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 57 deletions.
Expand Up @@ -112,17 +112,17 @@ public void index(Reportable reportable) {
}

protected Buffer transform(Reportable reportable) {
if (reportable instanceof Metrics) {
return getSource((Metrics) reportable, pipelineConfiguration.getPipeline());
} else if (reportable instanceof EndpointStatus) {
return getSource((EndpointStatus) reportable);
} else if (reportable instanceof Monitor) {
return getSource((Monitor) reportable);
} else if (reportable instanceof Log) {
return getSource((Log) reportable);
}

return null;
if (reportable instanceof Metrics) {
return getSource((Metrics) reportable, pipelineConfiguration.getPipeline());
} else if (reportable instanceof EndpointStatus) {
return getSource((EndpointStatus) reportable);
} else if (reportable instanceof Monitor) {
return getSource((Monitor) reportable);
} else if (reportable instanceof Log) {
return getSource((Log) reportable);
}

return null;
}

/**
Expand Down
Expand Up @@ -16,21 +16,14 @@
package io.gravitee.reporter.elasticsearch.indexer;

import io.gravitee.elasticsearch.client.Client;
import io.gravitee.elasticsearch.model.bulk.BulkResponse;
import io.gravitee.reporter.api.Reportable;
import io.gravitee.reporter.elasticsearch.config.ReporterConfiguration;
import io.reactivex.observers.DisposableSingleObserver;
import io.reactivex.processors.PublishProcessor;
import io.reactivex.schedulers.Schedulers;
import io.vertx.core.buffer.Buffer;
import io.vertx.reactivex.RxHelper;
import io.vertx.reactivex.core.Vertx;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.beans.factory.annotation.Autowired;

import javax.annotation.PostConstruct;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
Expand Down Expand Up @@ -68,44 +61,7 @@ public void initialize() {
TimeUnit.SECONDS,
configuration.getBulkActions())
.filter(payload -> !payload.isEmpty())
.subscribe(new Subscriber<List<Buffer>>() {
private Subscription subscription;

@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}

@Override
public void onNext(List<Buffer> items) {
client.bulk(items)
.subscribeOn(RxHelper.scheduler(vertx.getDelegate()))
.subscribe(new DisposableSingleObserver<BulkResponse>() {
@Override
public void onSuccess(BulkResponse bulkResponse) {
dispose();
}

@Override
public void onError(Throwable t) {
logger.error("Unexpected error while indexing data", t);
}
});

subscription.request(1);
}

@Override
public void onError(Throwable t) {
logger.error("Unexpected error while indexing data", t);
}

@Override
public void onComplete() {
// Nothing to do here
}
});
.subscribe(new DocumentBulkProcessor(client, vertx));
}

@Override
Expand Down
@@ -0,0 +1,88 @@
/**
* Copyright (C) 2015 The Gravitee team (http://gravitee.io)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.gravitee.reporter.elasticsearch.indexer;

import io.gravitee.elasticsearch.client.Client;
import io.gravitee.elasticsearch.model.bulk.BulkResponse;
import io.reactivex.observers.DisposableSingleObserver;
import io.vertx.core.buffer.Buffer;
import io.vertx.reactivex.RxHelper;
import io.vertx.reactivex.core.Vertx;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

/**
*
* @author David BRASSELY (david.brassely at graviteesource.com)
* @author GraviteeSource Team
*/
class DocumentBulkProcessor implements Subscriber<List<Buffer>> {

/**
* Logger
*/
private final Logger logger = LoggerFactory.getLogger(DocumentBulkProcessor.class);

private Subscription subscription;

private final Client client;

private final Vertx vertx;

DocumentBulkProcessor(Client client, Vertx vertx) {
this.client = client;
this.vertx = vertx;
}

@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}

@Override
public void onNext(List<Buffer> items) {
client.bulk(items)
.subscribeOn(RxHelper.scheduler(vertx.getDelegate()))
.subscribe(new DisposableSingleObserver<BulkResponse>() {
@Override
public void onSuccess(BulkResponse bulkResponse) {
dispose();
}

@Override
public void onError(Throwable t) {
logger.error("Unexpected error while indexing data", t);
}
});

subscription.request(1);
}

@Override
public void onError(Throwable t) {
logger.error("Unexpected error while indexing data", t);
}

@Override
public void onComplete() {
// Nothing to do here
}
}
2 changes: 1 addition & 1 deletion pom.xml
Expand Up @@ -110,7 +110,7 @@
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
<version>${vertx.version}</version>
<scope>compile</scope>
<scope>provided</scope>
</dependency>

<!-- Spring dependencies -->
Expand Down

0 comments on commit eb5fb31

Please sign in to comment.