Skip to content

Commit

Permalink
Switch from embedded index to Elasticsearch server
Browse files Browse the repository at this point in the history
Update client libs to 5.6.3

See #67
  • Loading branch information
fsteeg committed Apr 27, 2018
1 parent ed31c46 commit cfc9031
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 48 deletions.
10 changes: 9 additions & 1 deletion .travis.yml
@@ -1,4 +1,12 @@
language: scala
scala: 2.12.2
jdk: oraclejdk8

dist: trusty
env:
- ES_VERSION=5.6.3 ES_DOWNLOAD_URL=https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-${ES_VERSION}.tar.gz
install:
- wget ${ES_DOWNLOAD_URL}
- tar -xzf elasticsearch-${ES_VERSION}.tar.gz
- ./elasticsearch-${ES_VERSION}/bin/elasticsearch &
script:
- wget -q --waitretry=1 --retry-connrefused -T 10 -O - http://127.0.0.1:9200
14 changes: 7 additions & 7 deletions app/controllers/HomeController.java
Expand Up @@ -100,10 +100,10 @@ public Result index() {
}

/**
* An action that renders an HTML page with a welcome message. The
* configuration in the <code>routes</code> file means that this method will
* be called when the application receives a <code>GET</code> request with a
* path of <code>/</code>.
* An action that renders an HTML page with a welcome message. The configuration
* in the <code>routes</code> file means that this method will be called when
* the application receives a <code>GET</code> request with a path of
* <code>/</code>.
*/
public Result api() {
String format = "json";
Expand Down Expand Up @@ -156,7 +156,7 @@ public Result authority(String id, String format) {
private List<String> creatorOf(String id) {
String q = String.format("firstAuthor:\"%s\" OR firstComposer:\"%s\"", id, id);
SearchResponse response = index.query(q, "", 0, 1000);
Stream<String> ids = Arrays.asList(response.getHits().hits()).stream()
Stream<String> ids = Arrays.asList(response.getHits().getHits()).stream()
.map(hit -> AuthorityResource.DNB_PREFIX + hit.getId());
return ids.collect(Collectors.toList());
}
Expand Down Expand Up @@ -252,7 +252,7 @@ public static String currentUri() {
}

private static String returnAsJson(String q, SearchResponse queryResponse) {
List<Map<String, Object>> hits = Arrays.asList(queryResponse.getHits().hits()).stream()
List<Map<String, Object>> hits = Arrays.asList(queryResponse.getHits().getHits()).stream()
.map(hit -> hit.getSource()).collect(Collectors.toList());
ObjectNode object = Json.newObject();
object.put("@context", "http://" + request().host() + routes.HomeController.context());
Expand All @@ -264,7 +264,7 @@ private static String returnAsJson(String q, SearchResponse queryResponse) {
for (String a : AGGREGATIONS) {
Aggregation aggregation = queryResponse.getAggregations().get(a);
Terms terms = (Terms) aggregation;
Stream<Bucket> stream = terms.getBuckets().stream()
Stream<? extends Bucket> stream = terms.getBuckets().stream()
.filter(b -> !b.getKeyAsString().equals("AuthorityResource"));
Stream<Map<String, Object>> buckets = stream.map((Bucket b) -> ImmutableMap.of(//
"key", b.getKeyAsString(), "doc_count", b.getDocCount()));
Expand Down
2 changes: 1 addition & 1 deletion app/modules/AuthoritiesModule.java
Expand Up @@ -8,6 +8,6 @@

public class AuthoritiesModule extends Module {
public Seq<Binding<?>> bindings(Environment environment, Configuration configuration) {
return seq(bind(IndexComponent.class).to(EmbeddedIndex.class));
return seq(bind(IndexComponent.class).to(ElasticsearchServer.class));
}
}
66 changes: 32 additions & 34 deletions app/modules/IndexComponent.java
@@ -1,25 +1,24 @@
package modules;

import static controllers.HomeController.CONFIG;
import static controllers.HomeController.config;
import static org.elasticsearch.node.NodeBuilder.nodeBuilder;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.text.Normalizer;
import java.text.Normalizer.Form;
import java.util.Arrays;
import java.util.Collection;
import java.util.stream.Collectors;

import javax.inject.Inject;

import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
Expand All @@ -28,14 +27,17 @@
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.BoostingQueryBuilder;
import org.elasticsearch.index.query.MatchQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.internal.InternalSettingsPreparer;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.index.query.QueryStringQueryBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand All @@ -54,30 +56,25 @@ public default SearchResponse query(String q) {
}
}

class EmbeddedIndex implements IndexComponent {
class ElasticsearchServer implements IndexComponent {

private static final String INDEX_TYPE = "authority";
private static final Settings SETTINGS = Settings.builder()
.put("cluster.name", HomeController.config("index.cluster")).build();

private static class ConfigurableNode extends Node {
public ConfigurableNode(Settings settings, Collection<Class<? extends Plugin>> classpathPlugins) {
super(InternalSettingsPreparer.prepareEnvironment(settings, null), Version.CURRENT, classpathPlugins);
}
}

private Settings clientSettings = Settings.settingsBuilder().put("path.home", config("index.home"))
.put("http.port", config("index.port.http")).put("transport.tcp.port", config("index.port.tcp"))
.put("script.default_lang", "native").build();

private Node node = new ConfigurableNode(nodeBuilder().settings(clientSettings).local(true).getSettings().build(),
Arrays.asList(/* BundlePlugin.class */)).start();

public final Client client = node.client();
private final TransportClient client;

@Inject
public EmbeddedIndex(ApplicationLifecycle lifecycle) {
public ElasticsearchServer(ApplicationLifecycle lifecycle) {
client = new PreBuiltTransportClient(SETTINGS);
CONFIG.getStringList("index.hosts").forEach((host) -> {
try {
client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), 9300));
} catch (UnknownHostException e) {
e.printStackTrace();
}
});
startup();
lifecycle.addStopHook(() -> {
node.close();
client.close();
return null;
});
Expand All @@ -101,8 +98,8 @@ private void startup() {
indexData(client, pathToJson, indexName);
}
} else {
Logger.info("Index exists. Delete the '" + config("index.home") + "/data' directory to reindex from "
+ pathToJson);
Logger.info("Index {} exists. Delete index or change index name in config to reindex from {}",
indexName, pathToJson);
}
if (new File(pathToUpdates).exists()) {
Logger.info("Indexing updates from " + pathToUpdates);
Expand All @@ -111,7 +108,7 @@ private void startup() {
} catch (IOException e) {
e.printStackTrace();
}
Logger.info("Using Elasticsearch index settings: {}", clientSettings.getAsMap());
Logger.info("Using Elasticsearch index settings: {}", SETTINGS.getAsMap());
}

private static void deleteIndex(final Client client, final String index) {
Expand All @@ -131,7 +128,7 @@ static void createEmptyIndex(final Client aClient, final String aIndexName, fina
CreateIndexRequestBuilder cirb = aClient.admin().indices().prepareCreate(aIndexName);
if (aPathToIndexSettings != null) {
String settingsMappings = Files.lines(Paths.get(aPathToIndexSettings)).collect(Collectors.joining());
cirb.setSource(settingsMappings);
cirb.setSource(settingsMappings, XContentType.JSON);
}
cirb.execute().actionGet();//
aClient.admin().indices().refresh(new RefreshRequest()).actionGet();
Expand Down Expand Up @@ -169,7 +166,8 @@ private static void readData(final BufferedReader br, final Client client, final
} else {
Form nfc = Normalizer.Form.NFC;
data = Normalizer.isNormalized(line, nfc) ? line : Normalizer.normalize(line, nfc);
bulkRequest.add(client.prepareIndex(aIndex, INDEX_TYPE, id).setSource(data));
bulkRequest
.add(client.prepareIndex(aIndex, config("index.type"), id).setSource(data, XContentType.JSON));
}
currentLine++;
if (pendingIndexRequests == 1000) {
Expand Down Expand Up @@ -198,10 +196,10 @@ private static void executeBulk(int pendingIndexRequests) {

@Override
public SearchResponse query(String q, String filter, int from, int size) {
BoostingQueryBuilder boostQuery = QueryBuilders.boostingQuery().negativeBoost(0.1f)
.negative(QueryBuilders.matchQuery("type", "UndifferentiatedPerson"))
.positive(QueryBuilders.queryStringQuery(q).field("_all").field("preferredName", 0.5f)
.field("variantName", 0.1f).field("gndIdentifier", 0.5f));
QueryStringQueryBuilder positive = QueryBuilders.queryStringQuery(q).field("_all").field("preferredName", 2f)
.field("variantName", 1f).field("gndIdentifier", 2f);
MatchQueryBuilder negative = QueryBuilders.matchQuery("type", "UndifferentiatedPerson");
BoostingQueryBuilder boostQuery = QueryBuilders.boostingQuery(positive, negative).negativeBoost(0.1f);
BoolQueryBuilder query = QueryBuilders.boolQuery().must(boostQuery);
if (!filter.isEmpty()) {
query = query.filter(QueryBuilders.queryStringQuery(filter));
Expand Down
4 changes: 3 additions & 1 deletion build.sbt
Expand Up @@ -20,7 +20,9 @@ libraryDependencies += "org.apache.jena" % "apache-jena-libs" % "3.7.0"

libraryDependencies += "org.culturegraph" % "metafacture-core" % "4.0.0"

libraryDependencies += "org.elasticsearch" % "elasticsearch" % "2.4.5"
libraryDependencies += "org.elasticsearch" % "elasticsearch" % "5.6.3"

libraryDependencies += "org.elasticsearch.client" % "transport" % "5.6.3"

libraryDependencies += "org.dspace" % "oclc-harvester2" % "0.1.12"

Expand Down
7 changes: 3 additions & 4 deletions conf/application.conf
Expand Up @@ -23,11 +23,10 @@ entityfacts {
}

index {
home: "."
name: "authorities"
hosts: ["localhost"] # ["weywot3.hbz-nrw.de", "weywot4.hbz-nrw.de", "weywot5.hbz-nrw.de"]
cluster: "elasticsearch" # "weywot"
name: "gnd-20180426-1400"
type: "authority"
port.http: "7111"
port.tcp: "7122"
settings: "conf/index-settings.json"
content: "application/json; charset=utf-8"
}
Expand Down

0 comments on commit cfc9031

Please sign in to comment.