From 840dfe68895fabdc8ff5458b3f114a678d0dd080 Mon Sep 17 00:00:00 2001 From: "U-CEB\\YKozlov" Date: Thu, 20 Oct 2016 11:34:55 +0300 Subject: [PATCH] SOLR-9668 Support cursor paging in SolrEntityProcessor --- .../dataimport/SolrEntityProcessor.java | 140 +++++++++++++++++- .../TestSolrEntityProcessorEndToEnd.java | 31 +++- 2 files changed, 167 insertions(+), 4 deletions(-) mode change 100644 => 100755 solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/SolrEntityProcessor.java mode change 100644 => 100755 solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSolrEntityProcessorEndToEnd.java diff --git a/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/SolrEntityProcessor.java b/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/SolrEntityProcessor.java old mode 100644 new mode 100755 index 5e62731879e6..ad1545620a55 --- a/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/SolrEntityProcessor.java +++ b/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/SolrEntityProcessor.java @@ -28,6 +28,7 @@ import org.apache.solr.common.SolrDocument; import org.apache.solr.common.SolrDocumentList; import org.apache.solr.common.params.CommonParams; +import org.apache.solr.common.params.CursorMarkParams; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,7 +74,8 @@ public class SolrEntityProcessor extends EntityProcessorBase { private String[] fields; private String requestHandler;// 'qt' param private int timeout = TIMEOUT_SECS; - + private String sortBy; + @Override public void destroy() { try { @@ -127,6 +129,7 @@ protected void firstInit(Context context) { } catch (MalformedURLException e) { throw new DataImportHandlerException(DataImportHandlerException.SEVERE, e); } + } @Override @@ -134,12 +137,18 @@ public Map nextRow() { buildIterator(); return getNext(); } - + + private void buildIterator(){ + sortBy = context.getResolvedEntityAttribute(CommonParams.SORT); + if(sortBy != null) buildCursorIterator(); + else buildBasicIterator(); + } + /** * The following method changes the rowIterator mutable field. It requires * external synchronization. */ - private void buildIterator() { + private void buildBasicIterator() { if (rowIterator != null) { SolrDocumentListIterator documentListIterator = (SolrDocumentListIterator) rowIterator; if (!documentListIterator.hasNext() && documentListIterator.hasMoreRows()) { @@ -158,6 +167,26 @@ private void buildIterator() { } } + private void buildCursorIterator() { + if (rowIterator != null) { + SolrDocumentCursorIterator documentListIterator = (SolrDocumentCursorIterator) rowIterator; + if (!documentListIterator.hasNext() && documentListIterator.hasMoreRows()) { + String cursorMark = documentListIterator.getNextCursorMark(); + QueryResponse response = doQuery(cursorMark); + if (response != null) { + rowIterator = new SolrDocumentCursorIterator(cursorMark, response); + } + } + } else { + String cursorMark = "*"; + QueryResponse response = doQuery(cursorMark); + if (response != null) { + rowIterator = new SolrDocumentCursorIterator(cursorMark, response); + } + return; + } + } + protected SolrDocumentList doQuery(int start) { this.queryString = context.getResolvedEntityAttribute(QUERY); if (this.queryString == null) { @@ -213,6 +242,64 @@ protected SolrDocumentList doQuery(int start) { return response == null ? null : response.getResults(); } + protected QueryResponse doQuery(String cursorMark) { + this.queryString = context.getResolvedEntityAttribute(QUERY); + if (this.queryString == null) { + throw new DataImportHandlerException( + DataImportHandlerException.SEVERE, + "SolrEntityProcessor: parameter 'query' is required" + ); + } + + String rowsP = context.getResolvedEntityAttribute(CommonParams.ROWS); + if (rowsP != null) { + rows = Integer.parseInt(rowsP); + } + + String fqAsString = context.getResolvedEntityAttribute(CommonParams.FQ); + if (fqAsString != null) { + this.filterQueries = fqAsString.split(","); + } + + String fieldsAsString = context.getResolvedEntityAttribute(CommonParams.FL); + if (fieldsAsString != null) { + this.fields = fieldsAsString.split(","); + } + this.requestHandler = context.getResolvedEntityAttribute(CommonParams.QT); + String timeoutAsString = context.getResolvedEntityAttribute(TIMEOUT); + if (timeoutAsString != null) { + this.timeout = Integer.parseInt(timeoutAsString); + } + + SolrQuery solrQuery = new SolrQuery(queryString); + solrQuery.setRows(rows); + solrQuery.set(CursorMarkParams.CURSOR_MARK_PARAM, cursorMark); + if (fields != null) { + for (String field : fields) { + solrQuery.addField(field); + } + } + solrQuery.setRequestHandler(requestHandler); + solrQuery.setFilterQueries(filterQueries); + solrQuery.set(CommonParams.SORT, sortBy); + + QueryResponse response = null; + try { + response = solrClient.query(solrQuery); + } catch (SolrServerException | IOException e) { + if (ABORT.equals(onError)) { + wrapAndThrow(SEVERE, e); + } else if (SKIP.equals(onError)) { + wrapAndThrow(DataImportHandlerException.SKIP_ROW, e); + } + } + + return response; + } + + /** + * iterate using basic paging + */ private static class SolrDocumentListIterator implements Iterator> { private final int start; @@ -266,4 +353,51 @@ public void remove() { } } + /** + * iterate using cursor paging + */ + private static class SolrDocumentCursorIterator implements Iterator> { + + private final String currentCursorMark; + private final String nextCursorMark; + private final Iterator solrDocumentIterator; + + public SolrDocumentCursorIterator(String currentCursorMark, QueryResponse response) { + this.currentCursorMark = currentCursorMark; + this.solrDocumentIterator = response.getResults().iterator(); + this.nextCursorMark = response.getNextCursorMark(); + } + + @Override + public boolean hasNext() { + return solrDocumentIterator.hasNext(); + } + + @Override + public Map next() { + SolrDocument solrDocument = solrDocumentIterator.next(); + + HashMap map = new HashMap<>(); + Collection fields = solrDocument.getFieldNames(); + for (String field : fields) { + Object fieldValue = solrDocument.getFieldValue(field); + map.put(field, fieldValue); + } + return map; + } + + public boolean hasMoreRows() { + return !currentCursorMark.equals(nextCursorMark); + } + + public String getNextCursorMark(){ + return nextCursorMark; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + } + } diff --git a/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSolrEntityProcessorEndToEnd.java b/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSolrEntityProcessorEndToEnd.java old mode 100644 new mode 100755 index 8ef94c02c76a..2d956fce64f0 --- a/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSolrEntityProcessorEndToEnd.java +++ b/solr/contrib/dataimporthandler/src/test/org/apache/solr/handler/dataimport/TestSolrEntityProcessorEndToEnd.java @@ -260,7 +260,36 @@ public void testFullImportBadConfig() { assertQ(req("*:*"), "//result[@numFound='0']"); } - + + public void testFullImportCursorPaging() { + assertQ(req("*:*"), "//result[@numFound='0']"); + + try { + addDocumentsToSolr(generateSolrDocuments(7)); + runFullImport(generateDIHConfig("query='*:*' sort='id asc' rows='2'", false)); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + fail(e.getMessage()); + } + + assertQ(req("*:*"), "//result[@numFound='7']"); + assertQ(req("id:3"), "//result[@numFound='1']", "//result/doc/arr[@name='desc'][.='Description3']"); + } + + public void testFullImportCursorPagingInvalidSort() { + assertQ(req("*:*"), "//result[@numFound='0']"); + try { + addDocumentsToSolr(generateSolrDocuments(7)); + // will throw 'Cursor functionality requires a sort containing a uniqueKey field tie breaker' + runFullImport(generateDIHConfig("query='*:*' sort='date asc' rows='2'", false)); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + fail(e.getMessage()); + } + + assertQ(req("*:*"), "//result[@numFound='0']"); + } + private static List> generateSolrDocuments(int num) { List> docList = new ArrayList<>(); for (int i = 1; i <= num; i++) {