Skip to content

Commit

Permalink
first pass at chunking json using solrj
Browse files Browse the repository at this point in the history
  • Loading branch information
kshefchek committed Jun 6, 2017
1 parent e50e45f commit b9cd24d
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 5 deletions.
20 changes: 20 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,26 @@
<artifactId>fluent-hc</artifactId>
<version>4.5.2</version>
</dependency>
<dependency>
<groupId>org.apache.solr</groupId>
<artifactId>solr-solrj</artifactId>
<version>6.2.1</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.8.7</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.8.7</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.8.7</version>
</dependency>
</dependencies>

<build>
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/org/monarch/golr/GolrLoader.java
Original file line number Diff line number Diff line change
Expand Up @@ -431,8 +431,8 @@ private long serializeGolrQuery(GolrCypherQuery query, Result result, Writer wri
com.tinkerpop.blueprints.Graph evidenceGraph =
EvidenceGraphInfo.toGraph(pairGraph.graphBytes);
processor.addAssociations(evidenceGraph);
/*serializer.serialize(EVIDENCE_GRAPH,
processor.getEvidenceGraph(evidenceGraph, metaSourceQuery));*/
serializer.serialize(EVIDENCE_GRAPH,
processor.getEvidenceGraph(evidenceGraph, metaSourceQuery));

// TODO: Hackish to remove evidence but the resulting JSON is blooming out of control
// Don't emit evidence for ontology sources
Expand Down
73 changes: 70 additions & 3 deletions src/main/java/org/monarch/golr/GolrWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,29 @@
import java.io.File;
import java.io.FileWriter;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.net.ssl.SSLContext;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.core.JsonFactory;


import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.common.SolrInputDocument;
import org.apache.commons.io.FilenameUtils;
import org.apache.http.client.fluent.Executor;
import org.apache.http.client.fluent.Request;
Expand All @@ -32,6 +48,7 @@ public class GolrWorker implements Callable<Boolean> {
String solrJsonUrlSuffix;
Object solrLock;
boolean deleteJson;
final static int BATCH_SIZE = 100000;

public GolrWorker(Optional<String> solrServer, File outputFile, GolrLoader loader,
GolrCypherQuery query, String solrJsonUrlSuffix, Object solrLock, boolean deleteJson) {
Expand All @@ -44,6 +61,22 @@ public GolrWorker(Optional<String> solrServer, File outputFile, GolrLoader loade
this.deleteJson = deleteJson;
Thread.currentThread().setName("Golr processor - " + outputFile.getName());
}

// Shared, reusable mapper. ObjectMapper is expensive to construct and is
// thread-safe for convertValue(), so build it once instead of allocating a
// new instance for every array-valued field inside the loop below.
private static final ObjectMapper NODE_MAPPER = new ObjectMapper();

/**
 * Converts a Jackson JSON object node into a Solr input document.
 *
 * <p>Each JSON field becomes a Solr field of the same name. Array values are
 * converted to a Java {@code List} so Solr treats them as multi-valued
 * fields; all other values (including numbers and booleans) are indexed via
 * their text representation ({@link JsonNode#asText()}).
 *
 * @param node the JSON object to convert; must not be {@code null}
 * @return a {@link SolrInputDocument} containing one field per JSON entry
 */
public static SolrInputDocument mapNodeToSolrDocument(ObjectNode node) {
  SolrInputDocument doc = new SolrInputDocument();
  Iterator<Entry<String, JsonNode>> fieldIterator = node.fields();
  while (fieldIterator.hasNext()) {
    Map.Entry<String, JsonNode> entry = fieldIterator.next();
    if (entry.getValue().isArray()) {
      // Multi-valued field: convert the JSON array into a List so each
      // element is indexed as a separate value.
      doc.addField(entry.getKey(),
          NODE_MAPPER.convertValue(entry.getValue(), ArrayList.class));
    } else {
      // Scalar field: index the node's text form.
      doc.addField(entry.getKey(), entry.getValue().asText());
    }
  }
  return doc;
}

@Override
public Boolean call() throws Exception {
Expand All @@ -57,6 +90,37 @@ public Boolean call() throws Exception {
synchronized (solrLock) {
logger.info("Posting JSON " + outputFile.getName() + " to " + solrServer.get());
try {

SolrClient solr = new HttpSolrClient.Builder(solrServer.get()).build();

ObjectMapper mapper = new ObjectMapper();
JsonFactory factory = new JsonFactory();
JsonParser parser = mapper.getFactory().createParser(outputFile);
Collection<SolrInputDocument> docs = new ArrayList<SolrInputDocument>();
int documentCount = 0;
if(parser.nextToken() != JsonToken.START_ARRAY) {
throw new IllegalArgumentException("Expected an array");
}

while (parser.nextToken() == JsonToken.START_OBJECT) {
if (documentCount == BATCH_SIZE) {
solr.add(docs);
solr.commit();
documentCount = 0;
docs.clear();
}
ObjectNode jsonDoc = mapper.readTree(parser);
SolrInputDocument doc = mapNodeToSolrDocument(jsonDoc);
docs.add(doc);
documentCount++;
}

if (docs.size() > 0) {
solr.add(docs);
solr.commit();
}
solr.close();/*
// ignore ssl certs because letsencrypt is not supported by Oracle yet
SSLContext sslContext =
new SSLContextBuilder().loadTrustMaterial(null, new TrustStrategy() {
Expand All @@ -73,11 +137,14 @@ public boolean isTrusted(java.security.cert.X509Certificate[] arg0, String arg1)
Request request = Request.Post(new URI(
solrServer.get() + (solrServer.get().endsWith("/") ? "" : "/") + solrJsonUrlSuffix))
.bodyFile(outputFile, ContentType.APPLICATION_JSON);

Executor executor = Executor.newInstance(httpClient);
String result = executor.execute(request).returnContent().asString();

logger.info(result);
logger.info(result);*/

if (deleteJson) {
logger.info("Deleting JSON " + outputFile.getName());
outputFile.delete();
Expand Down

0 comments on commit b9cd24d

Please sign in to comment.