Skip to content
This repository has been archived by the owner on May 18, 2020. It is now read-only.

Commit

Permalink
Support for Elasticsearch 2.x and 5.x (#9)
Browse files Browse the repository at this point in the history
* Support for Elasticsearch 2.x and 5.x

Besides:
- got rid of elasticsearch dependency
- few changes in API
- plugins are now installed using elasticsearch-plugin/plugin tool
- cluster name and tcp transport port are not set by default
- configurable startup timeout & higher timeout in tests
- possibility to set ES_JAVA_OPTS variable (that let me fix build
  problem on Travis)
- updated README
- added methods returning port numbers used by elasticsearch
  • Loading branch information
gaczm committed Nov 15, 2016
1 parent 308f950 commit 56be6c1
Show file tree
Hide file tree
Showing 42 changed files with 966 additions and 485 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
@@ -1,5 +1,5 @@
---
language: java
script: ./gradlew clean check
script: ./gradlew clean check --info
jdk:
- oraclejdk8
43 changes: 29 additions & 14 deletions README.md
Expand Up @@ -3,18 +3,18 @@
![build status](https://travis-ci.org/allegro/embedded-elasticsearch.svg?branch=master)
![Maven Central](https://maven-badges.herokuapp.com/maven-central/pl.allegro.tech/embedded-elasticsearch/badge.svg)

Small utility for creating integration tests that uses Elasticsearch. Instead of using `Node` it downloads elastic search in specified version and starts it in seprate process. It also allows you to install required plugins which is not possible when using `NodeBuilder`. Utility was tested with 2.x version of Elasticsearch.
Small utility for creating integration tests that uses Elasticsearch. Instead of using `Node` it downloads elastic search in specified version and starts it in seprate process. It also allows you to install required plugins which is not possible when using `NodeBuilder`. Utility was tested with 2.x and 5.x versions of Elasticsearch.

## Introduction

All you need to do to use this tool is create `EmbeddedElastic` instance. To do so, use provided builder:

```
final embeddedElastic = EmbeddedElastic.builder()
.withElasticVersion("2.2.0")
.withPortNumber(9300)
.withClusterName("my_cluster")
.withSetting("http.port", 9201)
.withElasticVersion("5.0.0")
.withSetting(PopularProperties.TRANSPORT_TCP_PORT, 9350)
.withSetting(PopularProperties.CLUSTER_NAME, "my_cluster")
.withPlugin("analysis-stempel")
.withIndex("cars", IndexSettings.builder()
.withType("car", getSystemResourceAsStream("car-mapping.json"))
.build())
Expand All @@ -41,13 +41,13 @@ And that's all, you can connect to your embedded-elastic instance on specified p
| ------------- | ------------- |
| `withElasticVersion(String version)` | version of Elasticsearch; based on that version download url to official Elasticsearch repository will be created |
| `withDownloadUrl(URL downloadUrl)` | if you prefer to download Elasticsearch from different location than official repositories you can do that using this method |
| `withPortNumber(int portNumber)` | port number on which Elasticsearch will be started |
| `withClusterName(String clusterName)` | cluster name for created Elasticsearch instance |
| `withSetting(String name, Object value` | setting name and value as in elasticsearch.yml file |
| `withMapping(InputStream mapping)`, `withMapping(String mapping)` | JSON with mapping of your index |
| `withSettings(InputStream settings)`, `withSettings(String settings)` | JSON with settings of your index |
| `withPlugin(String name, URL urlToDownload)` | plugin that should be installed into Elasticsearch; use multiple times for multiple plugins |
| `withSetting(String key, Object value)` | setting name and value as in elasticsearch.yml file |
| `withPlugin(String expression)` | plugin that should be installed into Elasticsearch; treat expression as argument to `./elasticsearch-plugin install <expression>` command; use multiple times for multiple plugins |
| `withIndex(String indexName, IndexSettings indexSettings)` | specify index that should be created and managed by EmbeddedElastic |
| `withStartTimeout(long value, TimeUnit unit)` | specify timeout you give Elasticsearch to start |
| `withEsJavaOpts(String javaOpts)` | value of `ES_JAVA_OPTS` variable to be set for Elasticsearch process |
| `getTransportTcpPort()` | get transport tcp port number used by Elasticsearch instance |
| `getHttpPort()` | get http port number used by Elasticsearch instance |

Available `IndexSettings.Builder` options

Expand All @@ -69,7 +69,6 @@ Available `IndexSettings.Builder` options
| `createIndex(String indexName)`, `createIndices()` | creates index with name specified during EmbeddedElastic creation; note that this index is created during EmbeddedElastic startup, you will need this method only if you deleted your index using `deleteIndex` method |
| `recreateIndex(String indexName)`, `recreateIndices()` | combination of `deleteIndex` and `createIndex` |
| `refreshIndices()` | refresh index; useful when you make changes in different thread, and want to check results instantly in tests |
| `createClient()` | create transport client withc default settings |

## Example
If you want to see example, look at this spec: `pl.allegro.tech.search.embeddedelasticsearch.EmbeddedElasticSpec`
Expand All @@ -80,7 +79,7 @@ To start using embedded-elasticsearch in your project add it as a test dependenc
Gradle:

```
testCompile 'pl.allegro.tech:embedded-elasticsearch:1.0.0'
testCompile 'pl.allegro.tech:embedded-elasticsearch:2.0.0'
```

Maven:
Expand All @@ -89,11 +88,27 @@ Maven:
<dependency>
<groupId>pl.allegro.tech</groupId>
<artifactId>embedded-elasticsearch</artifactId>
<version>1.0.0</version>
<version>2.0.0</version>
<scope>testCompile</scope>
</dependency>
```

## Known problems
If you build your project on Travis, you may have problems with OOM errors when using default settings. You can change Elasticsearch memory settings using `withEsJavaOpts` method. Example (from spec `pl.allegro.tech.embeddedelasticsearch.EmbeddedElasticSpec`):

```
static EmbeddedElastic embeddedElastic = EmbeddedElastic.builder()
.withElasticVersion(ELASTIC_VERSION)
.withSetting(TRANSPORT_TCP_PORT, TRANSPORT_TCP_PORT_VALUE)
.withSetting(CLUSTER_NAME, CLUSTER_NAME_VALUE)
.withEsJavaOpts("-Xms128m -Xmx512m")
.withIndex(CARS_INDEX_NAME, CARS_INDEX)
.withIndex(BOOKS_INDEX_NAME, BOOKS_INDEX)
.withStartTimeout(1, MINUTES)
.build()
.start()
```

## License

*embedded-elasticsearch* is published under [Apache License 2.0](http://www.apache.org/licenses/LICENSE-2.0).
28 changes: 13 additions & 15 deletions build.gradle
Expand Up @@ -28,22 +28,20 @@ scmVersion {
project.version = scmVersion.version
project.group = 'pl.allegro.tech'

dependencies {
compile group: 'org.elasticsearch', name: 'elasticsearch', version: '2.2.0'
compile group: 'org.slf4j', name: 'slf4j-api', version: '1.7.21'
compile group: 'commons-io', name: 'commons-io', version: '2.5'
compile group: 'net.lingala.zip4j', name: 'zip4j', version: '1.3.2'
compile group: 'org.apache.commons', name: 'commons-lang3', version: '3.4'

testCompile group: 'org.codehaus.groovy', name: 'groovy', version: '2.4.6'
testCompile group: 'org.spockframework', name: 'spock-core', version: '1.0-groovy-2.4'
testCompile group: 'ch.qos.logback', name: 'logback-classic', version: '1.1.7'
testCompile group: 'org.skyscreamer', name: 'jsonassert', version: '1.3.0'
}
subprojects {
apply plugin: 'groovy'

repositories {
mavenCentral()
}

test {
testLogging {
exceptionFormat = 'full'
sourceCompatibility = 1.8

test {
testLogging {
events "passed", "skipped", "failed"
exceptionFormat = 'full'
}
}
}

Expand Down
14 changes: 14 additions & 0 deletions core/build.gradle
@@ -0,0 +1,14 @@
dependencies {
compile group: 'org.slf4j', name: 'slf4j-api', version: '1.7.21'
compile group: 'commons-io', name: 'commons-io', version: '2.5'
compile group: 'net.lingala.zip4j', name: 'zip4j', version: '1.3.2'
compile group: 'org.apache.commons', name: 'commons-lang3', version: '3.4'
compile group: 'org.apache.httpcomponents', name: 'httpclient', version: '4.5.2'
compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.6.2'
compile group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-yaml', version: '2.6.2'

testCompile group: 'org.codehaus.groovy', name: 'groovy', version: '2.4.6'
testCompile group: 'org.spockframework', name: 'spock-core', version: '1.0-groovy-2.4'
testCompile group: 'ch.qos.logback', name: 'logback-classic', version: '1.1.7'
testCompile group: 'org.skyscreamer', name: 'jsonassert', version: '1.3.0'
}
@@ -1,30 +1,39 @@
package pl.allegro.tech.embeddedelasticsearch;

import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.RandomStringUtils;

import java.net.MalformedURLException;
import java.net.URL;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static com.google.common.base.Throwables.propagate;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.StringUtils;

class ElasticDownloadUrlUtils {


private static final String ELASTIC_2x_DOWNLOAD_URL = "https://download.elasticsearch.org/elasticsearch/release/org/elasticsearch/distribution/zip/elasticsearch/{VERSION}/elasticsearch-{VERSION}.zip";
private static final String ELASTIC_5x_DOWNLOAD_URL = "https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-{VERSION}.zip";

static URL urlFromVersion(String elasticVersion) {
assertVersionValid(elasticVersion);
try {
return new URL("https://download.elasticsearch.org/elasticsearch/release/org/elasticsearch/distribution/zip/elasticsearch/" + elasticVersion + "/elasticsearch-" + elasticVersion + ".zip");
if (is2xVersion(elasticVersion)) {
return new URL(StringUtils.replace(ELASTIC_2x_DOWNLOAD_URL, "{VERSION}", elasticVersion));
}
if (is5xVersion(elasticVersion)) {
return new URL(StringUtils.replace(ELASTIC_5x_DOWNLOAD_URL, "{VERSION}", elasticVersion));
}
throw new IllegalArgumentException("Invalid version: " + elasticVersion);
} catch (MalformedURLException e) {
throw propagate(e);
throw new RuntimeException(e);
}
}

private static void assertVersionValid(String elasticVersion) {
if (!Pattern.matches("[a-zA-Z0-9\\-_\\.]+", elasticVersion)) {
throw new IllegalArgumentException("Invalid version: " + elasticVersion);
}
private static boolean is2xVersion(String elasticVersion) {
return elasticVersion.startsWith("2.");
}

private static boolean is5xVersion(String elasticVersion) {
return elasticVersion.startsWith("5.");
}

static String versionFromUrl(URL url) {
Expand Down
@@ -0,0 +1,157 @@
package pl.allegro.tech.embeddedelasticsearch;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.io.IOUtils;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpHead;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.StringEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.stream.Collectors.joining;
import static java.util.stream.Collectors.toList;
import static org.apache.http.entity.ContentType.APPLICATION_JSON;
import static pl.allegro.tech.embeddedelasticsearch.HttpStatusCodes.OK;

class ElasticRestClient {

private static final Logger logger = LoggerFactory.getLogger(ElasticRestClient.class);

private int elasticsearchHttpPort;
private final HttpClient httpClient;
private final IndicesDescription indicesDescription;

ElasticRestClient(int elasticsearchHttpPort, HttpClient httpClient, IndicesDescription indicesDescription) {
this.elasticsearchHttpPort = elasticsearchHttpPort;
this.httpClient = httpClient;
this.indicesDescription = indicesDescription;
}

void createIndices() {
indicesDescription.getIndicesNames().forEach(this::createIndex);
}

void createIndex(String indexName) {
if (!indexExists(indexName)) {
HttpPut request = new HttpPut(url("/" + indexName));
request.setEntity(new StringEntity(indicesDescription.getIndexSettings(indexName).toJson().toString(), APPLICATION_JSON));
CloseableHttpResponse response = httpClient.execute(request);
if (response.getStatusLine().getStatusCode() != 200) {
String responseBody = readBodySafely(response);
throw new RuntimeException("Call to elasticsearch resulted in error:\n" + responseBody);
}
waitForClusterYellow();
}
}

private boolean indexExists(String indexName) {
HttpHead request = new HttpHead(url("/" + indexName));
CloseableHttpResponse response = httpClient.execute(request);
return response.getStatusLine().getStatusCode() == OK;
}

private void waitForClusterYellow() {
HttpGet request = new HttpGet(url("/_cluster/health?wait_for_status=yellow&timeout=60s"));
CloseableHttpResponse response = httpClient.execute(request);
assertOk(response, "Cluster does not reached yellow status in specified timeout");
}

void deleteIndices() {
indicesDescription.getIndicesNames().forEach(this::deleteIndex);
}

void deleteIndex(String indexName) {
if (indexExists(indexName)) {
HttpDelete request = new HttpDelete(url("/" + indexName));
assertOk(httpClient.execute(request), "Delete request resulted in error");
waitForClusterYellow();
} else {
logger.warn("Index: {} does not exists so cannot be removed", indexName);
}
}

void indexWithIds(String indexName, String indexType, Collection<DocumentWithId> idJsonMap) {
String bulkRequestBody = idJsonMap.stream()
.flatMap(json -> Stream.of(indexMetadataJsonWithId(json.getId()), json.getDocument()))
.map((jsonNodes) -> jsonNodes.replace('\n', ' ').replace('\r', ' '))
.collect(joining("\n")) + "\n";

HttpPost request = new HttpPost(url("/" + indexName + "/" + indexType + "/_bulk"));
request.setEntity(new StringEntity(bulkRequestBody, UTF_8));
CloseableHttpResponse response = httpClient.execute(request);
assertOk(response, "Request finished with error");
refresh();
}

private String indexMetadataJsonWithId(String id) {
return "{ \"index\": { \"_id\": \"" + id + "\"} }";
}

void refresh() {
HttpPost request = new HttpPost(url("/_refresh"));
httpClient.execute(request);
}

private String url(String path) {
return "http://localhost:" + elasticsearchHttpPort + path;
}

private void assertOk(CloseableHttpResponse response, String message) {
if (response.getStatusLine().getStatusCode() != OK) {
throw new IllegalStateException(message + "\nResponse body:\n" + readBodySafely(response));
}
}

private String readBodySafely(CloseableHttpResponse response) {
try {
return IOUtils.toString(response.getEntity().getContent(), UTF_8);
} catch (IOException e) {
logger.error("Error during reading response body", e);
return "";
}
}

List<String> fetchAllDocuments(String... indices) {
if (indices.length == 0) {
return searchForDocuments(Optional.empty()).collect(toList());
} else {
return Stream.of(indices)
.flatMap((index) -> searchForDocuments(Optional.of(index)))
.collect(toList());
}
}

private Stream<String> searchForDocuments(Optional<String> indexMaybe) {
String searchCommand = indexMaybe
.map(index -> "/" + index + "/_search")
.orElse("/_search");
HttpGet request = new HttpGet(url(searchCommand));

CloseableHttpResponse response = httpClient.execute(request);
assertOk(response, "Error during search (" + searchCommand + ")");
String body = readBodySafely(response);

ObjectMapper objectMapper = new ObjectMapper();
try {
JsonNode jsonNode = objectMapper.readTree(body);
return StreamSupport.stream(jsonNode.get("hits").get("hits").spliterator(), false)
.map(hitNode -> hitNode.get("_source"))
.map(JsonNode::toString);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

0 comments on commit 56be6c1

Please sign in to comment.