Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ElasticSearch data source #22

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ apply plugin: 'com.dictanova.jcasgen'
apply plugin: 'com.github.tkruse.groovysh'
apply plugin: 'groovy'

targetCompatibility = 1.7
sourceCompatibility = 1.7

mainClassName = 'cz.brmlab.yodaqa.YodaQA_Interactive'

ext.dkproVersion = '1.7.0'
Expand Down Expand Up @@ -104,10 +107,9 @@ task(biocrftrain, dependsOn: 'classes', type: JavaExec) {

repositories {
mavenCentral()
maven {
url 'http://zoidberg.ukp.informatik.tu-darmstadt.de/artifactory/repo/'
url 'http://ailao.eu/maven/'
}
maven { url 'http://zoidberg.ukp.informatik.tu-darmstadt.de/artifactory/repo/' }
maven { url 'http://ailao.eu/maven/' }
maven { url 'http://repo.spring.io/libs-release-remote/' }
}

dependencies {
Expand All @@ -127,8 +129,11 @@ dependencies {
exclude group: "com.io7m.xom", module: "xom" // this dependency breaks XMI serialization by using utf16, c.f. UIMA-3818
}
compile "de.tudarmstadt.ukp.dkpro.core:de.tudarmstadt.ukp.dkpro.core.api.parameter-asl:$dkproVersion"
compile 'org.apache.solr:solr-solrj:3.6.0'
compile 'org.apache.solr:solr-core:3.6.0'
// compile 'org.apache.solr:solr-solrj:3.6.0'
// compile 'org.apache.solr:solr-core:3.6.0'
compile 'org.apache.solr:solr-solrj:4.3.0'
compile 'org.apache.solr:solr-core:4.3.0'
compile 'org.elasticsearch:elasticsearch:1.5.0'
compile 'org.apache.httpcomponents:httpmime:4.2.1'
compile 'org.apache.httpcomponents:httpclient:4.2.1'
compile 'org.slf4j:slf4j-api:1.7.7'
Expand Down
11 changes: 10 additions & 1 deletion src/main/java/cz/brmlab/yodaqa/pipeline/YodaQA.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package cz.brmlab.yodaqa.pipeline;

import cz.brmlab.yodaqa.pipeline.esdoc.EsDocAnswerProducer;
import org.apache.uima.analysis_engine.AnalysisEngineDescription;
import org.apache.uima.cas.CAS;
import org.apache.uima.fit.factory.AggregateBuilder;
Expand Down Expand Up @@ -55,6 +56,9 @@ public class YodaQA /* XXX: extends AggregateBuilder ? */ {

//SolrNamedSource.register("guten", "data/guten", null);
SolrNamedSource.register("enwiki", "collection1", "http://enwiki.ailao.eu:8983/solr/");

// TODO EsNamedSource ?

} catch (Exception e) {
e.printStackTrace();
System.err.println("*** Exception caught during SolrNamedSource initialization. ***");
Expand Down Expand Up @@ -137,7 +141,7 @@ protected static boolean buildPipeline(AggregateBuilder builder) throws Resource

AnalysisEngineDescription answerCASMerger = AnalysisEngineFactory.createEngineDescription(
AnswerCASMerger.class,
AnswerCASMerger.PARAM_ISLAST_BARRIER, 7,
AnswerCASMerger.PARAM_ISLAST_BARRIER, 8, //7
AnswerCASMerger.PARAM_PHASE, 0,
ParallelEngineFactory.PARAM_NO_MULTIPROCESSING, 1);
builder.add(answerCASMerger);
Expand Down Expand Up @@ -296,8 +300,13 @@ public static AnalysisEngineDescription createAnswerProducerDescription() throws
/* Full-text search: */
/* XXX: These aggregates have "Solr" in name but do not
* necessarily use just Solr, e.g. Bing. */

AnalysisEngineDescription esDoc = EsDocAnswerProducer.createEngineDescription();
builder.add(esDoc);

AnalysisEngineDescription solrFull = SolrFullAnswerProducer.createEngineDescription();
builder.add(solrFull); /* This one is worth 3 isLasts. */

AnalysisEngineDescription solrDoc = SolrDocAnswerProducer.createEngineDescription();
builder.add(solrDoc);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package cz.brmlab.yodaqa.pipeline.esdoc;

import cz.brmlab.yodaqa.provider.OpenNlpNamedEntities;
import de.tudarmstadt.ukp.dkpro.core.languagetool.LanguageToolLemmatizer;
import de.tudarmstadt.ukp.dkpro.core.languagetool.LanguageToolSegmenter;
import de.tudarmstadt.ukp.dkpro.core.stanfordnlp.StanfordParser;
import org.apache.uima.analysis_engine.AnalysisEngineDescription;
import org.apache.uima.cas.CAS;
import org.apache.uima.fit.factory.AggregateBuilder;
import org.apache.uima.fit.factory.AnalysisEngineFactory;
import org.apache.uima.fit.factory.FlowControllerFactory;
import org.apache.uima.flow.impl.FixedFlowController;
import org.apache.uima.resource.ResourceInitializationException;

public class EsDocAnswerProducer {

	/**
	 * Assemble the answer-producing aggregate: an ElasticSearch primary
	 * search (a CAS multiplier emitting one CAS per document hit) followed
	 * by the NLP preprocessing chain run on the "Answer" view.
	 *
	 * The NLP mix mirrors what Passage analysis uses; only the minimal
	 * preprocessing expected by AnswerAnalysis is performed here.
	 *
	 * @return the aggregate engine description, marked as outputting new CASes
	 * @throws ResourceInitializationException on engine setup failure
	 */
	public static AnalysisEngineDescription createEngineDescription() throws ResourceInitializationException {
		AggregateBuilder agg = new AggregateBuilder();

		/* Primary search: produces the per-document candidate CASes. */
		agg.add(AnalysisEngineFactory.createEngineDescription(EsDocPrimarySearch.class));

		/* Tokenization on the Answer view. */
		AnalysisEngineDescription segmenter =
			AnalysisEngineFactory.createEngineDescription(LanguageToolSegmenter.class);
		agg.add(segmenter, CAS.NAME_DEFAULT_SOFA, "Answer");

		/* POS tags, constituents, dependencies.  StanfordParser is O(N^2)
		 * in sentence length, so very long sentences are capped to keep
		 * RAM usage and runtime reasonable. */
		AnalysisEngineDescription parser =
			AnalysisEngineFactory.createEngineDescription(
				StanfordParser.class,
				StanfordParser.PARAM_MAX_TOKENS, 50,
				StanfordParser.PARAM_WRITE_POS, true);
		agg.add(parser, CAS.NAME_DEFAULT_SOFA, "Answer");

		/* Lemmata. */
		AnalysisEngineDescription lemmatizer =
			AnalysisEngineFactory.createEngineDescription(LanguageToolLemmatizer.class);
		agg.add(lemmatizer, CAS.NAME_DEFAULT_SOFA, "Answer");

		/* Named entities. */
		agg.add(OpenNlpNamedEntities.createEngineDescription(),
			CAS.NAME_DEFAULT_SOFA, "Answer");

		/* Drop each input CAS after the multiplier has emitted its children. */
		agg.setFlowControllerDescription(
			FlowControllerFactory.createFlowControllerDescription(
				FixedFlowController.class,
				FixedFlowController.PARAM_ACTION_AFTER_CAS_MULTIPLIER, "drop"));

		AnalysisEngineDescription desc = agg.createAggregateDescription();
		desc.getAnalysisEngineMetaData().getOperationalProperties().setOutputsNewCASes(true);
		desc.getAnalysisEngineMetaData().setName("cz.brmlab.yodaqa.pipeline.esdoc.EsDocAnswerProducer");
		return desc;
	}
}
246 changes: 246 additions & 0 deletions src/main/java/cz/brmlab/yodaqa/pipeline/esdoc/EsDocPrimarySearch.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,246 @@
package cz.brmlab.yodaqa.pipeline.esdoc;

import cz.brmlab.yodaqa.analysis.ansscore.AF;
import cz.brmlab.yodaqa.analysis.ansscore.AnswerFV;
import cz.brmlab.yodaqa.flow.dashboard.AnswerIDGenerator;
import cz.brmlab.yodaqa.flow.dashboard.AnswerSource;
import cz.brmlab.yodaqa.flow.dashboard.AnswerSourceAguAbstract;
import cz.brmlab.yodaqa.flow.dashboard.QuestionDashboard;
import cz.brmlab.yodaqa.flow.dashboard.snippet.AnsweringDocTitle;
import cz.brmlab.yodaqa.flow.dashboard.snippet.SnippetIDGenerator;
import cz.brmlab.yodaqa.model.Question.Clue;
import cz.brmlab.yodaqa.model.Question.CluePhrase;
import cz.brmlab.yodaqa.model.CandidateAnswer.AnswerInfo;
import cz.brmlab.yodaqa.model.CandidateAnswer.AnswerResource;
import cz.brmlab.yodaqa.model.SearchResult.ResultInfo;
import org.apache.uima.UimaContext;
import org.apache.uima.analysis_engine.AnalysisEngineProcessException;
import org.apache.uima.cas.AbstractCas;
import org.apache.uima.fit.component.JCasMultiplier_ImplBase;
import org.apache.uima.fit.descriptor.ConfigurationParameter;
import org.apache.uima.fit.util.FSCollectionFactory;
import org.apache.uima.fit.util.JCasUtil;
import org.apache.uima.jcas.JCas;
import org.apache.uima.jcas.cas.IntegerArray;
import org.apache.uima.resource.ResourceInitializationException;
import org.apache.uima.util.CasCopier;
import org.apache.uima.util.Level;
import org.apache.uima.util.Logger;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.node.Node;
import org.elasticsearch.search.SearchHit;

import java.util.*;

import static org.elasticsearch.node.NodeBuilder.nodeBuilder;

/**
 * A CAS multiplier that queries an ElasticSearch index using the question
 * clues and emits one CAS per document hit.  The document title becomes
 * the candidate answer text (on the "Answer" view), while the question
 * analysis is copied to the "Question" view of each generated CAS.
 */
public class EsDocPrimarySearch extends JCasMultiplier_ImplBase {

	private Logger logger;

	/* The question CAS of the most recently process()ed question;
	 * consulted by next() when populating the per-hit CASes.
	 * (BUGFIX: previously this was never assigned, so the first
	 * next() call NPEd inside copyQuestion().) */
	protected JCas questionView;

	/* Transport-level connection to the ElasticSearch cluster;
	 * opened in initialize(), closed in destroy(). */
	protected Client esClient;

	/* Hits of the last search still to be turned into CASes by next(). */
	protected Iterator<SearchHit> results = Collections.emptyIterator();

	@ConfigurationParameter(name = "es.cluster.name", mandatory = false, defaultValue = "elasticsearch_szednik")
	protected String esClusterName;

	@ConfigurationParameter(name = "es.index", mandatory = false, defaultValue = "zen")
	protected String esIndex;

	@ConfigurationParameter(name = "es.type", mandatory = false, defaultValue = "abstract")
	protected String esType;

	/* Maximum number of documents fetched per question. */
	@ConfigurationParameter(name = "hitlist-size", mandatory = false, defaultValue = "20")
	protected int hitListSize;

	@Override
	public void initialize(UimaContext context) throws ResourceInitializationException {
		super.initialize(context);
		logger = context.getLogger();

		try {
			logger.log(Level.INFO, "connecting to elasticsearch : " + esClusterName);

			Settings settings = ImmutableSettings.settingsBuilder()
					.put("cluster.name", esClusterName).build();

			/* NOTE(review): the host/port are hard-coded; presumably this
			 * should become a configuration parameter like the others. */
			esClient = new TransportClient(settings)
					.addTransportAddress(new InetSocketTransportAddress("localhost", 9300));

			logger.log(Level.INFO, "connected to elasticsearch");

		} catch (Exception e) {
			logger.log(Level.SEVERE, e.getMessage());
			throw new ResourceInitializationException(e);
		}
	}

	@Override
	public void destroy() {
		super.destroy();
		if (esClient != null) {
			esClient.close();
		}
	}

	@Override
	public void process(JCas aJCas) throws AnalysisEngineProcessException {
		/* Remember the question CAS; next() needs it to fill in the
		 * Question view of each generated CAS. */
		questionView = aJCas;

		Collection<Clue> clues = JCasUtil.select(aJCas, Clue.class);
		String[] terms = cluesToTerms(clues);

		logger.log(Level.INFO, "querying elasticsearch for: " + Arrays.toString(terms));

		SearchResponse response = esClient
				.prepareSearch(esIndex)
				.setTypes(esType)
				.setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
				.setQuery(QueryBuilders.termsQuery("multi", terms))
				.setFrom(0).setSize(hitListSize).setExplain(true)
				.execute()
				.actionGet();

		logger.log(Level.INFO, "elasticsearch response status: " + response.status().name());

		results = response.getHits().iterator();
	}

	@Override
	public boolean hasNext() throws AnalysisEngineProcessException {
		return results.hasNext();
	}

	/**
	 * Produce the next per-hit CAS: Question view holds a copy of the
	 * question analysis, Answer view holds the document title.
	 */
	@Override
	public AbstractCas next() throws AnalysisEngineProcessException {
		if (!results.hasNext()) {
			return null;
		}

		SearchHit hit = results.next();

		JCas jcas = getEmptyJCas();
		try {
			jcas.createView("Question");
			JCas canQuestionView = jcas.getView("Question");
			copyQuestion(questionView, canQuestionView);

			jcas.createView("Answer");
			JCas canAnswerView = jcas.getView("Answer");

			if (hit != null) {
				documentToAnswer(canAnswerView, hit, questionView);
			} else {
				emptyAnswer(canAnswerView);
			}
		} catch (Exception e) {
			/* Release the CAS so it is not leaked on failure. */
			jcas.release();
			throw new AnalysisEngineProcessException(e);
		}
		return jcas;
	}

	static protected ResultInfo emptyResultInfo(JCas jcas) {
		ResultInfo ri = new ResultInfo(jcas);
		ri.setDocumentTitle("");
		ri.setOrigin("cz.brmlab.yodaqa.pipeline.esdoc.EsDocPrimarySearch");
		return ri;
	}

	static protected AnswerInfo emptyAnswerInfo(JCas jcas) {
		return new AnswerInfo(jcas);
	}

	/** Fill an Answer view with a blank placeholder answer. */
	protected void emptyAnswer(JCas jcas) {
		jcas.setDocumentText("");
		jcas.setDocumentLanguage("en");
		emptyAnswerInfo(jcas);
		emptyResultInfo(jcas);
	}

	/**
	 * Null-safe extraction of a named hit field's value as a string.
	 * Returns "" when the field is absent or its value is null.
	 * (BUGFIX: the original called field(...).getValue().toString()
	 * unconditionally, NPEing on missing fields; its subsequent
	 * null-check on the result was dead code.)
	 */
	protected static String fieldText(SearchHit doc, String name) {
		if (doc.field(name) == null)
			return "";
		Object value = doc.field(name).getValue();
		return value == null ? "" : value.toString();
	}

	/**
	 * Populate the Answer view from a document hit: document title as
	 * the answer text, plus ResultInfo/AnswerInfo annotations and
	 * dashboard bookkeeping (answer source + doc-title snippet).
	 */
	protected void documentToAnswer(JCas jcas, SearchHit doc, JCas questionView) throws AnalysisEngineProcessException {
		String id = doc.getId();
		String title = fieldText(doc, "title").trim();
		String uri = fieldText(doc, "uri");
		float score = doc.getScore();

		logger.log(Level.FINER, "FOUND: " + id + " " + title);

		/* Strip a trailing parenthesized qualifier, e.g. "Foo (film)". */
		jcas.setDocumentText(title.replaceAll("\\s+\\([^)]*\\)\\s*$", ""));
		jcas.setDocumentLanguage("en");

		AnswerSource ac = new AnswerSourceAguAbstract(AnswerSourceAguAbstract.ORIGIN_DOCUMENT, title, id);
		int sourceID = QuestionDashboard.getInstance().get(questionView).storeAnswerSource(ac);

		AnsweringDocTitle adt = new AnsweringDocTitle(SnippetIDGenerator.getInstance().generateID(), sourceID);
		QuestionDashboard.getInstance().get(questionView).addSnippet(adt);

		ResultInfo ri = new ResultInfo(jcas);
		ri.setDocumentId(id);
		ri.setDocumentTitle(title);
		ri.setRelevance(score);
		ri.setSource(esClusterName);
		ri.setSourceID(sourceID);
		ri.setOrigin("cz.brmlab.yodaqa.pipeline.esdoc.EsDocPrimarySearch");
		ri.addToIndexes();

		AnswerFV fv = new AnswerFV();
		fv.setFeature(AF.Occurences, 1.0);
		fv.setFeature(AF.ResultLogScore, Math.log(1 + ri.getRelevance()));
		fv.setFeature(AF.OriginDocTitle, 1.0);

		AnswerResource ar = new AnswerResource(jcas);
		ar.setIri(uri);
		ar.addToIndexes();
		ArrayList<AnswerResource> ars = new ArrayList<>();
		ars.add(ar);

		AnswerInfo ai = new AnswerInfo(jcas);
		ai.setFeatures(fv.toFSArray(jcas));
		ai.setResources(FSCollectionFactory.createFSArray(jcas, ars));
		ai.setSnippetIDs(new IntegerArray(jcas, 1));
		ai.setSnippetIDs(0, adt.getSnippetID());
		ai.setAnswerID(AnswerIDGenerator.getInstance().generateID());
		ai.addToIndexes();
	}

	/** Deep-copy the question view into the destination CAS. */
	protected static void copyQuestion(JCas src, JCas dest) throws Exception {
		CasCopier copier = new CasCopier(src.getCas(), dest.getCas());
		copier.copyCasView(src.getCas(), dest.getCas(), true);
	}

	/**
	 * Turn question clues into query terms.  CluePhrase clues are skipped;
	 * only the remaining clue labels are used.
	 */
	protected static String[] cluesToTerms(Collection<Clue> clues) {
		List<String> terms = new ArrayList<>(clues.size());
		for (Clue clue : clues) {
			if (clue instanceof CluePhrase)
				continue;
			terms.add(clue.getLabel());
		}
		return terms.toArray(new String[terms.size()]);
	}
}