From 8a5f7e54edc5640655edd19f15d22fada6ca9900 Mon Sep 17 00:00:00 2001 From: JohannesDaniel Date: Thu, 5 Oct 2017 22:57:53 +0200 Subject: [PATCH 1/3] NIFI-3248: Improvement of GetSolr Processor --- .../nifi-solr-processors/pom.xml | 19 + .../apache/nifi/processors/solr/GetSolr.java | 370 +++++++++++------- .../nifi/processors/solr/SolrProcessor.java | 2 +- .../nifi/processors/solr/TestGetSolr.java | 256 ++++++------ .../solr/TestPutSolrContentStream.java | 6 +- .../solr/testCollection/conf/schema.xml | 10 + .../src/test/resources/test-schema.avsc | 25 ++ .../testdata/test-csv-multiple-docs.csv | 4 +- .../testdata/test-custom-json-single-doc.json | 2 + .../test-solr-json-multiple-docs.json | 2 + .../testdata/test-xml-multiple-docs.xml | 2 + 11 files changed, 435 insertions(+), 263 deletions(-) create mode 100644 nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/test-schema.avsc diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/pom.xml b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/pom.xml index 1068a57dd32f..f35927ef4d16 100644 --- a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/pom.xml +++ b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/pom.xml @@ -26,6 +26,18 @@ jar + + org.apache.nifi + nifi-record-path + + + org.apache.nifi + nifi-record-serialization-service-api + + + org.apache.nifi + nifi-schema-registry-service-api + org.apache.solr solr-solrj @@ -61,6 +73,12 @@ provided + + org.apache.nifi + nifi-record-serialization-services + 1.5.0-SNAPSHOT + test + org.apache.nifi nifi-mock @@ -136,6 +154,7 @@ src/test/resources/testdata/test-xml-multiple-docs.xml src/test/resources/log4j.properties src/test/resources/jaas-client.conf + src/test/resources/test-schema.avsc diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java 
b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java index c7711352de55..ec6471da5987 100644 --- a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java +++ b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java @@ -18,35 +18,37 @@ */ package org.apache.nifi.processors.solr; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStream; import java.nio.charset.StandardCharsets; import java.text.SimpleDateFormat; + import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.Collections; -import java.util.Date; +import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Locale; -import java.util.Properties; +import java.util.Map; import java.util.Set; import java.util.TimeZone; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.io.IOUtils; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.Stateful; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.annotation.lifecycle.OnRemoved; -import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import 
org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.state.Scope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.logging.ComponentLog; @@ -57,7 +59,18 @@ import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.util.StopWatch; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.record.ListRecordSet; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.RecordSet; +import org.apache.nifi.util.StringUtils; + import org.apache.solr.client.solrj.SolrQuery; import org.apache.solr.client.solrj.SolrServerException; import org.apache.solr.client.solrj.request.QueryRequest; @@ -66,42 +79,64 @@ import org.apache.solr.common.SolrDocument; import org.apache.solr.common.SolrDocumentList; import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.params.CursorMarkParams; -@Tags({"Apache", "Solr", "Get", "Pull"}) +@Tags({"Apache", "Solr", "Get", "Pull", "Records"}) @InputRequirement(Requirement.INPUT_FORBIDDEN) -@CapabilityDescription("Queries Solr and outputs the results as a FlowFile") +@CapabilityDescription("Queries Solr and outputs the results as a FlowFile in the format of XML or using a Record Writer") +@Stateful(scopes = {Scope.LOCAL}, description = "Stores latest date of Date Field so that the same data will not be fetched multiple times.") public class GetSolr extends 
SolrProcessor { - public static final PropertyDescriptor SOLR_QUERY = new PropertyDescriptor - .Builder().name("Solr Query") - .description("A query to execute against Solr") + public static final String STATE_MANAGER_FILTER = "stateManager_filter"; + public static final String STATE_MANAGER_CURSOR_MARK = "stateManager_cursorMark"; + public static final AllowableValue MODE_XML = new AllowableValue("XML"); + public static final AllowableValue MODE_REC = new AllowableValue("Records"); + + public static final PropertyDescriptor RETURN_TYPE = new PropertyDescriptor + .Builder().name("Return Type") + .displayName("Return Type") + .description("Write Solr documents to FlowFiles as XML or using a Record Writer") .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .allowableValues(MODE_XML, MODE_REC) + .defaultValue(MODE_REC.getValue()) .build(); - public static final PropertyDescriptor RETURN_FIELDS = new PropertyDescriptor - .Builder().name("Return Fields") - .description("Comma-separated list of fields names to return") + public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor + .Builder().name("Record Writer") + .displayName("Record Writer") + .description("The Record Writer to use in order to write Solr documents to FlowFiles. 
Must be set if \"Records\" is used as return type.") + .identifiesControllerService(RecordSetWriterFactory.class) + .expressionLanguageSupported(false) .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); - public static final PropertyDescriptor SORT_CLAUSE = new PropertyDescriptor - .Builder().name("Sort Clause") - .description("A Solr sort clause, ex: field1 asc, field2 desc") + public static final PropertyDescriptor SOLR_QUERY = new PropertyDescriptor + .Builder().name("Solr Query") + .displayName("Solr Query") + .description("A query to execute against Solr") .required(false) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); public static final PropertyDescriptor DATE_FIELD = new PropertyDescriptor .Builder().name("Date Field") + .displayName("Date Field") .description("The name of a date field in Solr used to filter results") .required(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); + public static final PropertyDescriptor RETURN_FIELDS = new PropertyDescriptor + .Builder().name("Return Fields") + .displayName("Return Fields") + .description("Comma-separated list of field names to return") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor .Builder().name("Batch Size") + .displayName("Batch Size") .description("Number of rows per Solr query") .required(true) .addValidator(StandardValidators.INTEGER_VALIDATOR) @@ -113,22 +148,17 @@ public class GetSolr extends SolrProcessor { .description("The results of querying Solr") .build(); - static final String FILE_PREFIX = "conf/.getSolr-"; - static final String LAST_END_DATE = "LastEndDate"; - static final String LAST_END_DATE_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"; - static final String UNINITIALIZED_LAST_END_DATE_VALUE; + private final AtomicBoolean clearState = new AtomicBoolean(false); + private final AtomicBoolean 
dateFieldNotInSpecifiedFieldsList = new AtomicBoolean(false); + private volatile String id_field = null; + private static final SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", Locale.US); static { - SimpleDateFormat sdf = new SimpleDateFormat(LAST_END_DATE_PATTERN, Locale.US); - sdf.setTimeZone(TimeZone.getTimeZone("GMT")); - UNINITIALIZED_LAST_END_DATE_VALUE = sdf.format(new Date(1L)); + df.setTimeZone(TimeZone.getTimeZone("GMT")); } - final AtomicReference lastEndDatedRef = new AtomicReference<>(UNINITIALIZED_LAST_END_DATE_VALUE); - private Set relationships; private List descriptors; - private final Lock fileLock = new ReentrantLock(); @Override protected void init(final ProcessorInitializationContext context) { @@ -138,10 +168,11 @@ protected void init(final ProcessorInitializationContext context) { descriptors.add(SOLR_TYPE); descriptors.add(SOLR_LOCATION); descriptors.add(COLLECTION); + descriptors.add(RETURN_TYPE); + descriptors.add(RECORD_WRITER); descriptors.add(SOLR_QUERY); - descriptors.add(RETURN_FIELDS); - descriptors.add(SORT_CLAUSE); descriptors.add(DATE_FIELD); + descriptors.add(RETURN_FIELDS); descriptors.add(BATCH_SIZE); descriptors.add(JAAS_CLIENT_APP_NAME); descriptors.add(BASIC_USERNAME); @@ -172,157 +203,196 @@ public List getSupportedPropertyDescriptors() { @Override public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { - lastEndDatedRef.set(UNINITIALIZED_LAST_END_DATE_VALUE); + clearState.set(true); } - @OnStopped - public void onStopped() { - writeLastEndDate(); - } + @OnScheduled + public void onScheduled2(final ProcessContext context) throws IOException { + if (clearState.getAndSet(false)) { + context.getStateManager().clear(Scope.LOCAL); + final Map newStateMap = new HashMap(); - @OnRemoved - public void onRemoved() { - final File lastEndDateCache = new File(FILE_PREFIX + getIdentifier()); - if (lastEndDateCache.exists()) { - lastEndDateCache.delete(); - } - } + 
newStateMap.put(STATE_MANAGER_CURSOR_MARK, "*"); + newStateMap.put(STATE_MANAGER_FILTER, "*"); - @Override - public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { - final ComponentLog logger = getLogger(); - readLastEndDate(); - - final SimpleDateFormat sdf = new SimpleDateFormat(LAST_END_DATE_PATTERN, Locale.US); - sdf.setTimeZone(TimeZone.getTimeZone("GMT")); - final String currDate = sdf.format(new Date()); - - final boolean initialized = !UNINITIALIZED_LAST_END_DATE_VALUE.equals(lastEndDatedRef.get()); - - final String query = context.getProperty(SOLR_QUERY).getValue(); - final SolrQuery solrQuery = new SolrQuery(query); - solrQuery.setRows(context.getProperty(BATCH_SIZE).asInteger()); - - // if initialized then apply a filter to restrict results from the last end time til now - if (initialized) { - StringBuilder filterQuery = new StringBuilder(); - filterQuery.append(context.getProperty(DATE_FIELD).getValue()) - .append(":{").append(lastEndDatedRef.get()).append(" TO ") - .append(currDate).append("]"); - solrQuery.addFilterQuery(filterQuery.toString()); - logger.info("Applying filter query {}", new Object[]{filterQuery.toString()}); - } + context.getStateManager().setState(newStateMap, Scope.LOCAL); - final String returnFields = context.getProperty(RETURN_FIELDS).getValue(); - if (returnFields != null && !returnFields.trim().isEmpty()) { - for (String returnField : returnFields.trim().split("[,]")) { - solrQuery.addField(returnField.trim()); - } + id_field = null; } + } - final String fullSortClause = context.getProperty(SORT_CLAUSE).getValue(); - if (fullSortClause != null && !fullSortClause.trim().isEmpty()) { - for (String sortClause : fullSortClause.split("[,]")) { - String[] sortParts = sortClause.trim().split("[ ]"); - solrQuery.addSort(sortParts[0], SolrQuery.ORDER.valueOf(sortParts[1])); - } + @Override + protected final Collection customValidate(ValidationContext context) { + final Collection problems = new 
ArrayList<>(); + + if (context.getProperty(RETURN_TYPE).evaluateAttributeExpressions().getValue().equals(MODE_REC.getValue()) + && !context.getProperty(RECORD_WRITER).isSet()) { + problems.add(new ValidationResult.Builder() + .explanation("for parsing records a record writer has to be configured") + .valid(false) + .subject("Record writer check") + .build()); } + problems.addAll(super.customValidate(context)); + return problems; + } + private String getFieldNameOfUniqueKey() { + final SolrQuery solrQuery = new SolrQuery(); try { + solrQuery.setRequestHandler("/schema/uniquekey"); final QueryRequest req = new QueryRequest(solrQuery); if (isBasicAuthEnabled()) { req.setBasicAuthCredentials(getUsername(), getPassword()); } - // run the initial query and send out the first page of results - final StopWatch stopWatch = new StopWatch(true); - QueryResponse response = req.process(getSolrClient()); - stopWatch.stop(); - - long duration = stopWatch.getDuration(TimeUnit.MILLISECONDS); + return(req.process(getSolrClient()).getResponse().get("uniqueKey").toString()); + } catch (SolrServerException | IOException e) { + getLogger().error("Solr query to retrieve uniqueKey-field failed due to {}", new Object[]{solrQuery.toString(), e}, e); + throw new ProcessException(e); + } + } - final SolrDocumentList documentList = response.getResults(); - logger.info("Retrieved {} results from Solr for {} in {} ms", - new Object[] {documentList.getNumFound(), query, duration}); + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - if (documentList != null && documentList.getNumFound() > 0) { - FlowFile flowFile = session.create(); - flowFile = session.write(flowFile, new QueryResponseOutputStreamCallback(response)); - flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/xml"); - session.transfer(flowFile, REL_SUCCESS); + final ComponentLog logger = getLogger(); + final AtomicBoolean 
continuePaging = new AtomicBoolean(true); + final SolrQuery solrQuery = new SolrQuery(); - StringBuilder transitUri = new StringBuilder("solr://"); - transitUri.append(getSolrLocation()); - if (SOLR_TYPE_CLOUD.equals(context.getProperty(SOLR_TYPE).getValue())) { - transitUri.append("/").append(context.getProperty(COLLECTION).getValue()); - } + try { + if (id_field == null) { + id_field = getFieldNameOfUniqueKey(); + } - session.getProvenanceReporter().receive(flowFile, transitUri.toString(), duration); + final String dateField = context.getProperty(DATE_FIELD).getValue(); - // if initialized then page through the results and send out each page - if (initialized) { - int endRow = response.getResults().size(); - long totalResults = response.getResults().getNumFound(); + solrQuery.setQuery("*:*"); + final String query = context.getProperty(SOLR_QUERY).getValue(); + if (!StringUtils.isBlank(query) && !query.equals("*:*")) { + solrQuery.addFilterQuery(query); + } + final StringBuilder automatedFilterQuery = (new StringBuilder()) + .append(dateField) + .append(":[") + .append(context.getStateManager().getState(Scope.LOCAL).get(STATE_MANAGER_FILTER)) + .append(" TO *]"); + solrQuery.addFilterQuery(automatedFilterQuery.toString()); + + final List fieldList = new ArrayList(); + final String returnFields = context.getProperty(RETURN_FIELDS).getValue(); + if (!StringUtils.isBlank(returnFields)) { + fieldList.addAll(Arrays.asList(returnFields.trim().split("[,]"))); + if (!fieldList.contains(dateField)) { + fieldList.add(dateField); + dateFieldNotInSpecifiedFieldsList.set(true); + } + for (String returnField : fieldList) { + solrQuery.addField(returnField.trim()); + } + } - while (endRow < totalResults) { - solrQuery.setStart(endRow); + solrQuery.setParam(CursorMarkParams.CURSOR_MARK_PARAM, context.getStateManager().getState(Scope.LOCAL).get(STATE_MANAGER_CURSOR_MARK)); + solrQuery.setRows(context.getProperty(BATCH_SIZE).asInteger()); - stopWatch.start(); - response = 
getSolrClient().query(solrQuery); - stopWatch.stop(); + final StringBuilder sortClause = (new StringBuilder()) + .append(dateField) + .append(" asc,") + .append(id_field) + .append(" asc"); + solrQuery.setParam("sort", sortClause.toString()); - duration = stopWatch.getDuration(TimeUnit.MILLISECONDS); - logger.info("Retrieved results for {} in {} ms", new Object[]{query, duration}); + while (continuePaging.get()) { + final QueryRequest req = new QueryRequest(solrQuery); + if (isBasicAuthEnabled()) { + req.setBasicAuthCredentials(getUsername(), getPassword()); + } - flowFile = session.create(); + logger.debug(solrQuery.toQueryString()); + final QueryResponse response = req.process(getSolrClient()); + final SolrDocumentList documentList = response.getResults(); + + if (response.getResults().size() > 0) { + final SolrDocument lastSolrDocument = documentList.get(response.getResults().size()-1); + final String latestDateValue = df.format(lastSolrDocument.get(dateField)); + + solrQuery.setParam(CursorMarkParams.CURSOR_MARK_PARAM, response.getNextCursorMark()); + final Map updateStateManager = new HashMap(); + updateStateManager.putAll(context.getStateManager().getState(Scope.LOCAL).toMap()); + updateStateManager.put(STATE_MANAGER_CURSOR_MARK, response.getNextCursorMark()); + updateStateManager.put(STATE_MANAGER_FILTER, latestDateValue); + context.getStateManager().setState(updateStateManager, Scope.LOCAL); + + FlowFile flowFile = session.create(); + flowFile = session.putAttribute(flowFile, "solrQuery", solrQuery.toString()); + + if (context.getProperty(RETURN_TYPE).getValue().equals(MODE_XML.getValue())){ + if (dateFieldNotInSpecifiedFieldsList.get()) { + for (SolrDocument doc : response.getResults()) { + doc.removeFields(dateField); + } + } flowFile = session.write(flowFile, new QueryResponseOutputStreamCallback(response)); - session.transfer(flowFile, REL_SUCCESS); - session.getProvenanceReporter().receive(flowFile, transitUri.toString(), duration); - endRow += 
response.getResults().size(); + flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/xml"); + + } else { + final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class); + final RecordSchema schema = writerFactory.getSchema(null, null); + final RecordSet recordSet = solrDocumentsToRecordSet(response.getResults(), schema); + final StringBuffer mimeType = new StringBuffer(); + flowFile = session.write(flowFile, new OutputStreamCallback() { + @Override + public void process(final OutputStream out) throws IOException { + try { + final RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, out); + writer.write(recordSet); + writer.flush(); + mimeType.append(writer.getMimeType()); + } catch (SchemaNotFoundException e) { + throw new ProcessException("Could not parse Solr response", e); + } + } + }); + flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), mimeType.toString()); } + session.transfer(flowFile, REL_SUCCESS); } + continuePaging.set(response.getResults().size() == Integer.parseInt(context.getProperty(BATCH_SIZE).getValue())); } - - lastEndDatedRef.set(currDate); - writeLastEndDate(); - } catch (SolrServerException | IOException e) { + } catch(SolrServerException | SchemaNotFoundException | IOException e){ context.yield(); session.rollback(); - logger.error("Failed to execute query {} due to {}", new Object[]{query, e}, e); + logger.error("Failed to execute query {} due to {}", new Object[]{solrQuery.toString(), e}, e); throw new ProcessException(e); - } catch (final Throwable t) { + } catch( final Throwable t){ context.yield(); session.rollback(); - logger.error("Failed to execute query {} due to {}", new Object[]{query, t}, t); + logger.error("Failed to execute query {} due to {}", new Object[]{solrQuery.toString(), t}, t); throw t; } } - private void readLastEndDate() { - fileLock.lock(); - File lastEndDateCache = new 
File(FILE_PREFIX + getIdentifier()); - try (FileInputStream fis = new FileInputStream(lastEndDateCache)) { - Properties props = new Properties(); - props.load(fis); - lastEndDatedRef.set(props.getProperty(LAST_END_DATE)); - } catch (IOException swallow) { - } finally { - fileLock.unlock(); - } - } - - private void writeLastEndDate() { - fileLock.lock(); - File lastEndDateCache = new File(FILE_PREFIX + getIdentifier()); - try (FileOutputStream fos = new FileOutputStream(lastEndDateCache)) { - Properties props = new Properties(); - props.setProperty(LAST_END_DATE, lastEndDatedRef.get()); - props.store(fos, "GetSolr LastEndDate value"); - } catch (IOException e) { - getLogger().error("Failed to persist LastEndDate due to " + e, e); - } finally { - fileLock.unlock(); + /** + * Writes each SolrDocument to a record. + */ + private RecordSet solrDocumentsToRecordSet(final List docs, final RecordSchema schema) { + final List lr = new ArrayList(); + + for (SolrDocument doc : docs) { + final Map recordValues = new LinkedHashMap<>(); + for (RecordField field : schema.getFields()){ + final Object fieldValue = doc.getFieldValue(field.getFieldName()); + if (fieldValue != null) { + if (field.getDataType().getFieldType().equals(RecordFieldType.ARRAY)){ + recordValues.put(field.getFieldName(), ((List) fieldValue).toArray()); + } else { + recordValues.put(field.getFieldName(), fieldValue); + } + } + } + lr.add(new MapRecord(schema, recordValues)); } + return new ListRecordSet(schema, lr); } /** diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrProcessor.java b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrProcessor.java index 8c8d3126593c..920a879a1187 100644 --- a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrProcessor.java +++ 
b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrProcessor.java @@ -275,7 +275,7 @@ protected final boolean isBasicAuthEnabled() { } @Override - protected final Collection customValidate(ValidationContext context) { + protected Collection customValidate(ValidationContext context) { final List problems = new ArrayList<>(); if (SOLR_TYPE_CLOUD.equals(context.getProperty(SOLR_TYPE).getValue())) { diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestGetSolr.java b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestGetSolr.java index 4a4aa04582e3..395d876ec9c2 100644 --- a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestGetSolr.java +++ b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestGetSolr.java @@ -18,9 +18,14 @@ */ package org.apache.nifi.processors.solr; +import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.json.JsonRecordSetWriter; +import org.apache.nifi.schema.access.SchemaAccessUtils; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; + import org.apache.solr.client.solrj.SolrClient; import org.apache.solr.client.solrj.SolrServerException; import org.apache.solr.common.SolrInputDocument; @@ -29,19 +34,10 @@ import org.junit.Before; import org.junit.Test; -import java.io.File; -import java.io.FileOutputStream; import java.io.IOException; -import java.text.SimpleDateFormat; -import java.util.Arrays; -import java.util.Calendar; +import java.nio.file.Files; +import java.nio.file.Paths; import java.util.Date; -import java.util.GregorianCalendar; -import java.util.Locale; -import java.util.Properties; -import 
java.util.TimeZone; - -import static org.junit.Assert.assertTrue; public class TestGetSolr { @@ -51,13 +47,9 @@ public class TestGetSolr { @Before public void setup() { - // create the conf dir if it doesn't exist - File confDir = new File("conf"); - if (!confDir.exists()) { - confDir.mkdir(); - } try { + // create an EmbeddedSolrServer for the processor to use String relPath = getClass().getProtectionDomain().getCodeSource() .getLocation().getFile() + "../../target"; @@ -65,52 +57,30 @@ public void setup() { solrClient = EmbeddedSolrServerFactory.create(EmbeddedSolrServerFactory.DEFAULT_SOLR_HOME, DEFAULT_SOLR_CORE, relPath); - // create some test documents - SolrInputDocument doc1 = new SolrInputDocument(); - doc1.addField("first", "bob"); - doc1.addField("last", "smith"); - doc1.addField("created", new Date()); - - SolrInputDocument doc2 = new SolrInputDocument(); - doc2.addField("first", "alice"); - doc2.addField("last", "smith"); - doc2.addField("created", new Date()); - - SolrInputDocument doc3 = new SolrInputDocument(); - doc3.addField("first", "mike"); - doc3.addField("last", "smith"); - doc3.addField("created", new Date()); - - SolrInputDocument doc4 = new SolrInputDocument(); - doc4.addField("first", "john"); - doc4.addField("last", "smith"); - doc4.addField("created", new Date()); - - SolrInputDocument doc5 = new SolrInputDocument(); - doc5.addField("first", "joan"); - doc5.addField("last", "smith"); - doc5.addField("created", new Date()); - - // add the test data to the index - solrClient.add(Arrays.asList(doc1, doc2, doc3, doc4, doc5)); + for (int i = 0; i < 10; i++) { + SolrInputDocument doc = new SolrInputDocument(); + doc.addField("id", "doc" + i); + doc.addField("created", new Date()); + doc.addField("string_single", "single" + i + ".1"); + doc.addField("string_multi", "multi" + i + ".1"); + doc.addField("string_multi", "multi" + i + ".2"); + doc.addField("integer_single", i); + doc.addField("integer_multi", 1); + doc.addField("integer_multi", 2); 
+ doc.addField("integer_multi", 3); + doc.addField("double_single", 0.5 + i); + solrClient.add(doc); + + } solrClient.commit(); } catch (Exception e) { + e.printStackTrace(); Assert.fail(e.getMessage()); } } @After public void teardown() { - File confDir = new File("conf"); - assertTrue(confDir.exists()); - File[] files = confDir.listFiles(); - if (files.length > 0) { - for (File file : files) { - assertTrue("Failed to delete " + file.getName(), file.delete()); - } - } - assertTrue(confDir.delete()); - try { solrClient.close(); } catch (Exception e) { @@ -118,92 +88,160 @@ public void teardown() { } @Test - public void testMoreThanBatchSizeShouldProduceMultipleFlowFiles() throws IOException, SolrServerException { - final TestableProcessor proc = new TestableProcessor(solrClient); - final TestRunner runner = TestRunners.newTestRunner(proc); - - // setup a lastEndDate file to simulate picking up from a previous end date - SimpleDateFormat sdf = new SimpleDateFormat(GetSolr.LAST_END_DATE_PATTERN, Locale.US); - sdf.setTimeZone(TimeZone.getTimeZone("GMT")); - - Calendar cal = new GregorianCalendar(); - cal.add(Calendar.MINUTE, -30); - final String lastEndDate = sdf.format(cal.getTime()); - - File lastEndDateCache = new File(GetSolr.FILE_PREFIX + proc.getIdentifier()); - try (FileOutputStream fos = new FileOutputStream(lastEndDateCache)) { - Properties props = new Properties(); - props.setProperty(GetSolr.LAST_END_DATE, lastEndDate); - props.store(fos, "GetSolr LastEndDate value"); - } catch (IOException e) { - Assert.fail("Failed to setup last end date value: " + e.getMessage()); - } + public void testLessThanBatchSizeShouldProduceOneFlowFile() throws IOException, SolrServerException { + final org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor proc = new org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor(solrClient); - runner.setProperty(GetSolr.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_STANDARD.getValue()); + TestRunner runner = 
TestRunners.newTestRunner(proc); + runner.setProperty(GetSolr.RETURN_TYPE, GetSolr.MODE_XML.getValue()); + runner.setProperty(GetSolr.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_CLOUD.getValue()); runner.setProperty(GetSolr.SOLR_LOCATION, "http://localhost:8443/solr"); - runner.setProperty(GetSolr.SOLR_QUERY, "last:smith"); - runner.setProperty(GetSolr.RETURN_FIELDS, "first, last, created"); - runner.setProperty(GetSolr.SORT_CLAUSE, "created desc, first asc"); runner.setProperty(GetSolr.DATE_FIELD, "created"); - runner.setProperty(GetSolr.BATCH_SIZE, "2"); + runner.setProperty(GetSolr.BATCH_SIZE, "20"); + runner.setProperty(GetSolr.RETURN_FIELDS, "id,created"); + runner.setProperty(GetSolr.COLLECTION, "testCollection"); runner.run(); - runner.assertAllFlowFilesTransferred(GetSolr.REL_SUCCESS, 3); + runner.assertAllFlowFilesTransferred(GetSolr.REL_SUCCESS, 1); } @Test - public void testLessThanBatchSizeShouldProduceOneFlowFile() throws IOException, SolrServerException { - final TestableProcessor proc = new TestableProcessor(solrClient); + public void testNoResultsShouldProduceNoOutput() throws IOException, SolrServerException { + final org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor proc = new org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor(solrClient); TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(GetSolr.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_STANDARD.getValue()); + runner.setProperty(GetSolr.RETURN_TYPE, GetSolr.MODE_XML.getValue()); + runner.setProperty(GetSolr.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_CLOUD.getValue()); runner.setProperty(GetSolr.SOLR_LOCATION, "http://localhost:8443/solr"); - runner.setProperty(GetSolr.SOLR_QUERY, "last:smith"); - runner.setProperty(GetSolr.RETURN_FIELDS, "created"); - runner.setProperty(GetSolr.SORT_CLAUSE, "created desc"); + runner.setProperty(GetSolr.SOLR_QUERY, "integer_single:1000"); runner.setProperty(GetSolr.DATE_FIELD, "created"); - 
runner.setProperty(GetSolr.BATCH_SIZE, "10"); + runner.setProperty(GetSolr.BATCH_SIZE, "1"); + runner.setProperty(GetSolr.RETURN_FIELDS, "id,created"); + runner.setProperty(GetSolr.COLLECTION, "testCollection"); runner.run(); - runner.assertAllFlowFilesTransferred(GetSolr.REL_SUCCESS, 1); + runner.assertAllFlowFilesTransferred(GetSolr.REL_SUCCESS, 0); } @Test - public void testNoResultsShouldProduceNoOutput() throws IOException, SolrServerException { - final TestableProcessor proc = new TestableProcessor(solrClient); + public void testSolrModes() throws IOException, SolrServerException { + + } + + @Test(expected = java.lang.AssertionError.class) + public void testValidation() throws IOException, SolrServerException { + final org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor proc = new org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor(solrClient); TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(GetSolr.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_STANDARD.getValue()); + runner.setProperty(GetSolr.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_CLOUD.getValue()); runner.setProperty(GetSolr.SOLR_LOCATION, "http://localhost:8443/solr"); - runner.setProperty(GetSolr.SOLR_QUERY, "last:xyz"); - runner.setProperty(GetSolr.RETURN_FIELDS, "created"); - runner.setProperty(GetSolr.SORT_CLAUSE, "created desc"); runner.setProperty(GetSolr.DATE_FIELD, "created"); - runner.setProperty(GetSolr.BATCH_SIZE, "10"); + runner.setProperty(GetSolr.BATCH_SIZE, "2"); + runner.setProperty(GetSolr.COLLECTION, "testCollection"); + runner.setProperty(GetSolr.RETURN_TYPE, GetSolr.MODE_REC.getValue()); - runner.run(); - runner.assertAllFlowFilesTransferred(GetSolr.REL_SUCCESS, 0); + runner.run(1); } @Test - public void testOnRemovedRemovesState() throws IOException, SolrServerException { - final TestableProcessor proc = new TestableProcessor(solrClient); + public void testCompletenessDespiteUpdates() throws IOException, SolrServerException { + final 
org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor proc = new org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor(solrClient); TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(GetSolr.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_STANDARD.getValue()); + runner.setProperty(GetSolr.RETURN_TYPE, GetSolr.MODE_XML.getValue()); + runner.setProperty(GetSolr.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_CLOUD.getValue()); runner.setProperty(GetSolr.SOLR_LOCATION, "http://localhost:8443/solr"); - runner.setProperty(GetSolr.SOLR_QUERY, "last:smith"); - runner.setProperty(GetSolr.RETURN_FIELDS, "created"); - runner.setProperty(GetSolr.SORT_CLAUSE, "created desc"); runner.setProperty(GetSolr.DATE_FIELD, "created"); - runner.setProperty(GetSolr.BATCH_SIZE, "10"); + runner.setProperty(GetSolr.BATCH_SIZE, "1"); + runner.setProperty(GetSolr.RETURN_FIELDS, "id,created"); + runner.setProperty(GetSolr.COLLECTION, "testCollection"); + + runner.run(1,false, true); + runner.assertQueueEmpty(); + runner.assertAllFlowFilesTransferred(GetSolr.REL_SUCCESS, 10); + runner.clearTransferState(); + + SolrInputDocument doc0 = new SolrInputDocument(); + doc0.addField("id", "doc0"); + doc0.addField("created", new Date()); + SolrInputDocument doc1 = new SolrInputDocument(); + doc1.addField("id", "doc1"); + doc1.addField("created", new Date()); + + solrClient.add(doc0); + solrClient.add(doc1); + solrClient.commit(); + + runner.run(1,true, false); + runner.assertQueueEmpty(); + runner.assertAllFlowFilesTransferred(GetSolr.REL_SUCCESS, 2); + runner.assertAllFlowFilesContainAttribute(CoreAttributes.MIME_TYPE.key()); + } - runner.run(); + @Test + public void testCompletenessDespiteDeletions() throws IOException, SolrServerException { + final org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor proc = new org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor(solrClient); + + TestRunner runner = TestRunners.newTestRunner(proc); + 
runner.setProperty(GetSolr.RETURN_TYPE, GetSolr.MODE_XML.getValue()); + runner.setProperty(GetSolr.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_CLOUD.getValue()); + runner.setProperty(GetSolr.SOLR_LOCATION, "http://localhost:8443/solr"); + runner.setProperty(GetSolr.DATE_FIELD, "created"); + runner.setProperty(GetSolr.BATCH_SIZE, "1"); + runner.setProperty(GetSolr.RETURN_FIELDS, "id,created"); + runner.setProperty(GetSolr.COLLECTION, "testCollection"); + + runner.run(1,false, true); + runner.assertQueueEmpty(); + runner.assertAllFlowFilesTransferred(GetSolr.REL_SUCCESS, 10); + runner.clearTransferState(); + + SolrInputDocument doc10 = new SolrInputDocument(); + doc10.addField("id", "doc10"); + doc10.addField("created", new Date()); + SolrInputDocument doc11 = new SolrInputDocument(); + doc11.addField("id", "doc11"); + doc11.addField("created", new Date()); + + solrClient.add(doc10); + solrClient.add(doc11); + solrClient.deleteById("doc0"); + solrClient.deleteById("doc1"); + solrClient.deleteById("doc2"); + solrClient.commit(); + + runner.run(1,true, false); + runner.assertQueueEmpty(); + runner.assertAllFlowFilesTransferred(GetSolr.REL_SUCCESS, 2); + runner.assertAllFlowFilesContainAttribute(CoreAttributes.MIME_TYPE.key()); + } + + + @Test + public void testRecordWriter() throws IOException, SolrServerException, InitializationException { + final org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor proc = new org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor(solrClient); - File lastEndDateCache = new File(GetSolr.FILE_PREFIX + proc.getIdentifier()); - Assert.assertTrue("State file should exist, but doesn't", lastEndDateCache.exists()); - proc.onRemoved(); - Assert.assertFalse("State file should have been removed, but wasn't", lastEndDateCache.exists()); + TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(GetSolr.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_CLOUD.getValue()); + runner.setProperty(GetSolr.SOLR_LOCATION, 
"http://localhost:8443/solr"); + runner.setProperty(GetSolr.DATE_FIELD, "created"); + runner.setProperty(GetSolr.BATCH_SIZE, "2"); + runner.setProperty(GetSolr.COLLECTION, "testCollection"); + + final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/test-schema.avsc"))); + + final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter(); + runner.addControllerService("writer", jsonWriter); + runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); + runner.setProperty(jsonWriter, "Pretty Print JSON", "true"); + runner.setProperty(jsonWriter, "Schema Write Strategy", "full-schema-attribute"); + runner.enableControllerService(jsonWriter); + runner.setProperty(GetSolr.RECORD_WRITER, "writer"); + + runner.run(1,true, true); + runner.assertQueueEmpty(); + runner.assertAllFlowFilesTransferred(GetSolr.REL_SUCCESS, 5); + runner.assertAllFlowFilesContainAttribute(CoreAttributes.MIME_TYPE.key()); } // Override createSolrClient and return the passed in SolrClient diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrContentStream.java b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrContentStream.java index 19461fbbda75..18948c6f5ed9 100644 --- a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrContentStream.java +++ b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestPutSolrContentStream.java @@ -69,6 +69,7 @@ public class TestPutSolrContentStream { static final SolrDocument expectedDoc1 = new SolrDocument(); static { + expectedDoc1.addField("id", "1"); expectedDoc1.addField("first", "John"); expectedDoc1.addField("last", "Doe"); 
expectedDoc1.addField("grade", 8); @@ -79,6 +80,7 @@ public class TestPutSolrContentStream { static final SolrDocument expectedDoc2 = new SolrDocument(); static { + expectedDoc2.addField("id", "2"); expectedDoc2.addField("first", "John"); expectedDoc2.addField("last", "Doe"); expectedDoc2.addField("grade", 8); @@ -137,6 +139,7 @@ public void testUpdateWithCustomJson() throws IOException, SolrServerException { runner.setProperty("f.4", "subject:/exams/subject"); runner.setProperty("f.5", "test:/exams/test"); runner.setProperty("f.6", "marks:/exams/marks"); + runner.setProperty("f.7", "id:/exams/id"); try (FileInputStream fileIn = new FileInputStream(CUSTOM_JSON_SINGLE_DOC_FILE)) { runner.enqueue(fileIn); @@ -162,7 +165,7 @@ public void testUpdateWithCsv() throws IOException, SolrServerException { final TestRunner runner = createDefaultTestRunner(proc); runner.setProperty(PutSolrContentStream.CONTENT_STREAM_PATH, "/update/csv"); - runner.setProperty("fieldnames", "first,last,grade,subject,test,marks"); + runner.setProperty("fieldnames", "id,first,last,grade,subject,test,marks"); try (FileInputStream fileIn = new FileInputStream(CSV_MULTIPLE_DOCS_FILE)) { runner.enqueue(fileIn); @@ -219,6 +222,7 @@ public void testDeleteWithXml() throws IOException, SolrServerException { // add a document so there is something to delete SolrInputDocument doc = new SolrInputDocument(); + doc.addField("id", "1"); doc.addField("first", "bob"); doc.addField("last", "smith"); doc.addField("created", new Date()); diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/solr/testCollection/conf/schema.xml b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/solr/testCollection/conf/schema.xml index d2f7e8f5bab4..58b925684bf6 100644 --- a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/solr/testCollection/conf/schema.xml +++ 
b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/solr/testCollection/conf/schema.xml @@ -16,6 +16,16 @@ + + + + + + + + + id + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/test-schema.avsc b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/test-schema.avsc new file mode 100644 index 000000000000..f9146e05c275 --- /dev/null +++ b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/test-schema.avsc @@ -0,0 +1,25 @@ +{ + "name": "testschema", + "namespace": "nifi", + "type": "record", + "fields": [ + { "name": "id", "type": "string" }, + { "name": "string_single", "type": "string" }, + { "name": "string_multi", + "type": { + "type": "array", + "items": "string" + } + }, + { "name": "integer_single", "type": "int" }, + { "name": "integer_multi", + "type": { + "type": "array", + "items": "int" + } + }, + { "name": "double_single", "type": "double" }, + { "name": "created", "type": "long" }, + { "name": "val_null", "type": "string" } + ] +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testdata/test-csv-multiple-docs.csv b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testdata/test-csv-multiple-docs.csv index 5657a89ef9fd..a7e5d201d156 100644 --- a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testdata/test-csv-multiple-docs.csv +++ b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testdata/test-csv-multiple-docs.csv @@ -1,2 +1,2 @@ -John,Doe,8,Math,term1,90 -John,Doe,8,Biology,term1,86 +1,John,Doe,8,Math,term1,90 +2,John,Doe,8,Biology,term1,86 diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testdata/test-custom-json-single-doc.json 
b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testdata/test-custom-json-single-doc.json index 5cca8078ac28..7d1d8bc14043 100644 --- a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testdata/test-custom-json-single-doc.json +++ b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testdata/test-custom-json-single-doc.json @@ -4,10 +4,12 @@ "grade": 8, "exams": [ { + "id": "1", "subject": "Math", "test" : "term1", "marks":90}, { + "id": "2", "subject": "Biology", "test" : "term1", "marks":86} diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testdata/test-solr-json-multiple-docs.json b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testdata/test-solr-json-multiple-docs.json index cea939bc2d3e..20c1f958029b 100644 --- a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testdata/test-solr-json-multiple-docs.json +++ b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testdata/test-solr-json-multiple-docs.json @@ -1,5 +1,6 @@ [ { + "id": "1", "first": "John", "last": "Doe", "grade": 8, @@ -8,6 +9,7 @@ "marks": 90 }, { + "id": "2", "first": "John", "last": "Doe", "grade": 8, diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testdata/test-xml-multiple-docs.xml b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testdata/test-xml-multiple-docs.xml index 4622e0dd7843..6d8e71cb53d5 100644 --- a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testdata/test-xml-multiple-docs.xml +++ b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/resources/testdata/test-xml-multiple-docs.xml @@ -1,5 +1,6 @@ + 1 John Doe 8 @@ -8,6 +9,7 @@ 90 + 2 John Doe 8 From 4bf23db1f2f3282822d47e7b6a0887502f1d3995 Mon Sep 17 00:00:00 2001 From: JohannesDaniel Date: Wed, 18 Oct 2017 23:17:41 +0200 Subject: [PATCH 2/3] 
NIFI-3248: Additions after review --- .../apache/nifi/processors/solr/GetSolr.java | 48 +++++++++---- .../nifi/processors/solr/SolrProcessor.java | 2 +- .../nifi/processors/solr/TestGetSolr.java | 72 +++++++++++++++++++ 3 files changed, 109 insertions(+), 13 deletions(-) diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java index ec6471da5987..99ad7469b1d8 100644 --- a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java +++ b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java @@ -84,7 +84,7 @@ @Tags({"Apache", "Solr", "Get", "Pull", "Records"}) @InputRequirement(Requirement.INPUT_FORBIDDEN) @CapabilityDescription("Queries Solr and outputs the results as a FlowFile in the format of XML or using a Record Writer") -@Stateful(scopes = {Scope.LOCAL}, description = "Stores latest date of Date Field so that the same data will not be fetched multiple times.") +@Stateful(scopes = {Scope.CLUSTER}, description = "Stores latest date of Date Field so that the same data will not be fetched multiple times.") public class GetSolr extends SolrProcessor { public static final String STATE_MANAGER_FILTER = "stateManager_filter"; @@ -126,6 +126,14 @@ public class GetSolr extends SolrProcessor { .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); + public static final PropertyDescriptor DATE_FILTER = new PropertyDescriptor + .Builder().name("Initial Date Filter") + .displayName("Initial Date Filter") + .description("Date value to filter results. Documents with an earlier date will not be fetched. 
The format has to correspond to the date pattern of Solr 'YYYY-MM-DDThh:mm:ssZ'") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor RETURN_FIELDS = new PropertyDescriptor .Builder().name("Return Fields") .displayName("Return Fields") @@ -172,6 +180,7 @@ protected void init(final ProcessorInitializationContext context) { descriptors.add(RECORD_WRITER); descriptors.add(SOLR_QUERY); descriptors.add(DATE_FIELD); + descriptors.add(DATE_FILTER); descriptors.add(RETURN_FIELDS); descriptors.add(BATCH_SIZE); descriptors.add(JAAS_CLIENT_APP_NAME); @@ -201,28 +210,44 @@ public List getSupportedPropertyDescriptors() { return this.descriptors; } + final static Set propertyNamesForActivatingClearState = new HashSet(); + static { + propertyNamesForActivatingClearState.add(SOLR_TYPE.getName()); + propertyNamesForActivatingClearState.add(SOLR_LOCATION.getName()); + propertyNamesForActivatingClearState.add(COLLECTION.getName()); + propertyNamesForActivatingClearState.add(SOLR_QUERY.getName()); + propertyNamesForActivatingClearState.add(DATE_FIELD.getName()); + propertyNamesForActivatingClearState.add(RETURN_FIELDS.getName()); + } + @Override public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { - clearState.set(true); + if (propertyNamesForActivatingClearState.contains(descriptor.getName())) + clearState.set(true); } @OnScheduled - public void onScheduled2(final ProcessContext context) throws IOException { + public void clearState(final ProcessContext context) throws IOException { if (clearState.getAndSet(false)) { - context.getStateManager().clear(Scope.LOCAL); + context.getStateManager().clear(Scope.CLUSTER); final Map newStateMap = new HashMap(); newStateMap.put(STATE_MANAGER_CURSOR_MARK, "*"); - newStateMap.put(STATE_MANAGER_FILTER, "*"); - context.getStateManager().setState(newStateMap, Scope.LOCAL); + final String initialDate = 
context.getProperty(DATE_FILTER).getValue(); + if (StringUtils.isBlank(initialDate)) + newStateMap.put(STATE_MANAGER_FILTER, "*"); + else + newStateMap.put(STATE_MANAGER_FILTER, initialDate); + + context.getStateManager().setState(newStateMap, Scope.CLUSTER); id_field = null; } } @Override - protected final Collection customValidate(ValidationContext context) { + protected final Collection additionalCustomValidation(ValidationContext context) { final Collection problems = new ArrayList<>(); if (context.getProperty(RETURN_TYPE).evaluateAttributeExpressions().getValue().equals(MODE_REC.getValue()) @@ -233,7 +258,6 @@ protected final Collection customValidate(ValidationContext co .subject("Record writer check") .build()); } - problems.addAll(super.customValidate(context)); return problems; } @@ -275,7 +299,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session final StringBuilder automatedFilterQuery = (new StringBuilder()) .append(dateField) .append(":[") - .append(context.getStateManager().getState(Scope.LOCAL).get(STATE_MANAGER_FILTER)) + .append(context.getStateManager().getState(Scope.CLUSTER).get(STATE_MANAGER_FILTER)) .append(" TO *]"); solrQuery.addFilterQuery(automatedFilterQuery.toString()); @@ -292,7 +316,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session } } - solrQuery.setParam(CursorMarkParams.CURSOR_MARK_PARAM, context.getStateManager().getState(Scope.LOCAL).get(STATE_MANAGER_CURSOR_MARK)); + solrQuery.setParam(CursorMarkParams.CURSOR_MARK_PARAM, context.getStateManager().getState(Scope.CLUSTER).get(STATE_MANAGER_CURSOR_MARK)); solrQuery.setRows(context.getProperty(BATCH_SIZE).asInteger()); final StringBuilder sortClause = (new StringBuilder()) @@ -318,10 +342,10 @@ public void onTrigger(final ProcessContext context, final ProcessSession session solrQuery.setParam(CursorMarkParams.CURSOR_MARK_PARAM, response.getNextCursorMark()); final Map updateStateManager = new HashMap(); - 
updateStateManager.putAll(context.getStateManager().getState(Scope.LOCAL).toMap()); + updateStateManager.putAll(context.getStateManager().getState(Scope.CLUSTER).toMap()); updateStateManager.put(STATE_MANAGER_CURSOR_MARK, response.getNextCursorMark()); updateStateManager.put(STATE_MANAGER_FILTER, latestDateValue); - context.getStateManager().setState(updateStateManager, Scope.LOCAL); + context.getStateManager().setState(updateStateManager, Scope.CLUSTER); FlowFile flowFile = session.create(); flowFile = session.putAttribute(flowFile, "solrQuery", solrQuery.toString()); diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrProcessor.java b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrProcessor.java index 920a879a1187..cd29f7cd2500 100644 --- a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrProcessor.java +++ b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/SolrProcessor.java @@ -275,7 +275,7 @@ protected final boolean isBasicAuthEnabled() { } @Override - protected Collection customValidate(ValidationContext context) { + final protected Collection customValidate(ValidationContext context) { final List problems = new ArrayList<>(); if (SOLR_TYPE_CLOUD.equals(context.getProperty(SOLR_TYPE).getValue())) { diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestGetSolr.java b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestGetSolr.java index 395d876ec9c2..ca26aae82203 100644 --- a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestGetSolr.java +++ b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestGetSolr.java @@ 
-37,7 +37,10 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Paths; +import java.text.SimpleDateFormat; import java.util.Date; +import java.util.Locale; +import java.util.TimeZone; public class TestGetSolr { @@ -215,6 +218,74 @@ public void testCompletenessDespiteDeletions() throws IOException, SolrServerExc runner.assertAllFlowFilesContainAttribute(CoreAttributes.MIME_TYPE.key()); } + @Test + public void testInitialDateFilter() throws IOException, SolrServerException { + final SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", Locale.US); + df.setTimeZone(TimeZone.getTimeZone("GMT")); + final Date dateToFilter = new Date(); + + final org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor proc = new org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor(solrClient); + + TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(GetSolr.RETURN_TYPE, GetSolr.MODE_XML.getValue()); + runner.setProperty(GetSolr.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_CLOUD.getValue()); + runner.setProperty(GetSolr.SOLR_LOCATION, "http://localhost:8443/solr"); + runner.setProperty(GetSolr.DATE_FIELD, "created"); + runner.setProperty(GetSolr.DATE_FILTER, df.format(dateToFilter)); + runner.setProperty(GetSolr.BATCH_SIZE, "1"); + runner.setProperty(GetSolr.RETURN_FIELDS, "id,created"); + runner.setProperty(GetSolr.COLLECTION, "testCollection"); + + SolrInputDocument doc10 = new SolrInputDocument(); + doc10.addField("id", "doc10"); + doc10.addField("created", new Date()); + SolrInputDocument doc11 = new SolrInputDocument(); + doc11.addField("id", "doc11"); + doc11.addField("created", new Date()); + + solrClient.add(doc10); + solrClient.add(doc11); + solrClient.commit(); + + runner.run(1,true, true); + runner.assertQueueEmpty(); + runner.assertAllFlowFilesTransferred(GetSolr.REL_SUCCESS, 2); + runner.assertAllFlowFilesContainAttribute(CoreAttributes.MIME_TYPE.key()); + } + + @Test + public void 
testPropertyModified() throws IOException, SolrServerException { + final org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor proc = new org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor(solrClient); + + TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(GetSolr.RETURN_TYPE, GetSolr.MODE_XML.getValue()); + runner.setProperty(GetSolr.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_CLOUD.getValue()); + runner.setProperty(GetSolr.SOLR_LOCATION, "http://localhost:8443/solr"); + runner.setProperty(GetSolr.DATE_FIELD, "created"); + runner.setProperty(GetSolr.BATCH_SIZE, "1"); + runner.setProperty(GetSolr.RETURN_FIELDS, "id,created"); + runner.setProperty(GetSolr.COLLECTION, "testCollection"); + + runner.run(1,false, true); + runner.assertQueueEmpty(); + runner.assertAllFlowFilesTransferred(GetSolr.REL_SUCCESS, 10); + runner.clearTransferState(); + + // Change property contained in propertyNamesForActivatingClearState + runner.setProperty(GetSolr.RETURN_FIELDS, "id,created,string_multi"); + runner.run(1, false, true); + runner.assertQueueEmpty(); + runner.assertAllFlowFilesTransferred(GetSolr.REL_SUCCESS, 10); + runner.clearTransferState(); + + // Change property not contained in propertyNamesForActivatingClearState + runner.setProperty(GetSolr.BATCH_SIZE, "2"); + runner.run(1, true, true); + runner.assertQueueEmpty(); + runner.assertAllFlowFilesTransferred(GetSolr.REL_SUCCESS, 0); + runner.clearTransferState(); + } + @Test public void testRecordWriter() throws IOException, SolrServerException, InitializationException { @@ -244,6 +315,7 @@ public void testRecordWriter() throws IOException, SolrServerException, Initiali runner.assertAllFlowFilesContainAttribute(CoreAttributes.MIME_TYPE.key()); } + // Override createSolrClient and return the passed in SolrClient private class TestableProcessor extends GetSolr { private SolrClient solrClient; From eeb804f526327ed08df8f0d1d5ae391e193a6926 Mon Sep 17 00:00:00 2001 From: JohannesDaniel 
Date: Fri, 20 Oct 2017 12:36:20 +0200 Subject: [PATCH 3/3] NIFI-3248 Additions after review v2 --- .../apache/nifi/processors/solr/GetSolr.java | 54 +++++++++++------- .../additionalDetails.html | 56 +++++++++++++++++++ .../nifi/processors/solr/TestGetSolr.java | 35 +++++++++++- 3 files changed, 124 insertions(+), 21 deletions(-) mode change 100644 => 100755 nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java create mode 100755 nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/resources/docs/org.apache.nifi.processors.solr.GetSolr/additionalDetails.html mode change 100644 => 100755 nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestGetSolr.java diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java old mode 100644 new mode 100755 index 99ad7469b1d8..1871f1c8f20e --- a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java +++ b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java @@ -98,7 +98,7 @@ public class GetSolr extends SolrProcessor { .description("Write Solr documents to FlowFiles as XML or using a Record Writer") .required(true) .allowableValues(MODE_XML, MODE_REC) - .defaultValue(MODE_REC.getValue()) + .defaultValue(MODE_XML.getValue()) .build(); public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor @@ -218,6 +218,7 @@ public List getSupportedPropertyDescriptors() { propertyNamesForActivatingClearState.add(SOLR_QUERY.getName()); propertyNamesForActivatingClearState.add(DATE_FIELD.getName()); propertyNamesForActivatingClearState.add(RETURN_FIELDS.getName()); + 
propertyNamesForActivatingClearState.add(DATE_FILTER.getName()); } @Override @@ -228,22 +229,31 @@ public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, S @OnScheduled public void clearState(final ProcessContext context) throws IOException { - if (clearState.getAndSet(false)) { + if (clearState.getAndSet(false)) context.getStateManager().clear(Scope.CLUSTER); - final Map newStateMap = new HashMap(); - newStateMap.put(STATE_MANAGER_CURSOR_MARK, "*"); + final Map stateMap = new HashMap(); + stateMap.putAll(context.getStateManager().getState(Scope.CLUSTER).toMap()); + final AtomicBoolean stateMapHasChanged = new AtomicBoolean(false); + if (stateMap.get(STATE_MANAGER_CURSOR_MARK) == null) { + stateMap.put(STATE_MANAGER_CURSOR_MARK, "*"); + stateMapHasChanged.set(true); + } + + if (stateMap.get(STATE_MANAGER_FILTER) == null) { final String initialDate = context.getProperty(DATE_FILTER).getValue(); if (StringUtils.isBlank(initialDate)) - newStateMap.put(STATE_MANAGER_FILTER, "*"); + stateMap.put(STATE_MANAGER_FILTER, "*"); else - newStateMap.put(STATE_MANAGER_FILTER, initialDate); + stateMap.put(STATE_MANAGER_FILTER, initialDate); + stateMapHasChanged.set(true); + } - context.getStateManager().setState(newStateMap, Scope.CLUSTER); + if (stateMapHasChanged.get()) + context.getStateManager().setState(stateMap, Scope.CLUSTER); - id_field = null; - } + id_field = null; } @Override @@ -253,7 +263,7 @@ protected final Collection additionalCustomValidation(Validati if (context.getProperty(RETURN_TYPE).evaluateAttributeExpressions().getValue().equals(MODE_REC.getValue()) && !context.getProperty(RECORD_WRITER).isSet()) { problems.add(new ValidationResult.Builder() - .explanation("for parsing records a record writer has to be configured") + .explanation("for writing records a record writer has to be configured") .valid(false) .subject("Record writer check") .build()); @@ -291,6 +301,9 @@ public void onTrigger(final ProcessContext context, final 
ProcessSession session final String dateField = context.getProperty(DATE_FIELD).getValue(); + final Map stateMap = new HashMap(); + stateMap.putAll(context.getStateManager().getState(Scope.CLUSTER).toMap()); + solrQuery.setQuery("*:*"); final String query = context.getProperty(SOLR_QUERY).getValue(); if (!StringUtils.isBlank(query) && !query.equals("*:*")) { @@ -299,7 +312,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session final StringBuilder automatedFilterQuery = (new StringBuilder()) .append(dateField) .append(":[") - .append(context.getStateManager().getState(Scope.CLUSTER).get(STATE_MANAGER_FILTER)) + .append(stateMap.get(STATE_MANAGER_FILTER)) .append(" TO *]"); solrQuery.addFilterQuery(automatedFilterQuery.toString()); @@ -316,7 +329,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session } } - solrQuery.setParam(CursorMarkParams.CURSOR_MARK_PARAM, context.getStateManager().getState(Scope.CLUSTER).get(STATE_MANAGER_CURSOR_MARK)); + solrQuery.setParam(CursorMarkParams.CURSOR_MARK_PARAM, stateMap.get(STATE_MANAGER_CURSOR_MARK)); solrQuery.setRows(context.getProperty(BATCH_SIZE).asInteger()); final StringBuilder sortClause = (new StringBuilder()) @@ -339,13 +352,11 @@ public void onTrigger(final ProcessContext context, final ProcessSession session if (response.getResults().size() > 0) { final SolrDocument lastSolrDocument = documentList.get(response.getResults().size()-1); final String latestDateValue = df.format(lastSolrDocument.get(dateField)); + final String newCursorMark = response.getNextCursorMark(); - solrQuery.setParam(CursorMarkParams.CURSOR_MARK_PARAM, response.getNextCursorMark()); - final Map updateStateManager = new HashMap(); - updateStateManager.putAll(context.getStateManager().getState(Scope.CLUSTER).toMap()); - updateStateManager.put(STATE_MANAGER_CURSOR_MARK, response.getNextCursorMark()); - updateStateManager.put(STATE_MANAGER_FILTER, latestDateValue); - 
context.getStateManager().setState(updateStateManager, Scope.CLUSTER); + solrQuery.setParam(CursorMarkParams.CURSOR_MARK_PARAM, newCursorMark); + stateMap.put(STATE_MANAGER_CURSOR_MARK, newCursorMark); + stateMap.put(STATE_MANAGER_FILTER, latestDateValue); FlowFile flowFile = session.create(); flowFile = session.putAttribute(flowFile, "solrQuery", solrQuery.toString()); @@ -383,6 +394,7 @@ public void process(final OutputStream out) throws IOException { } continuePaging.set(response.getResults().size() == Integer.parseInt(context.getProperty(BATCH_SIZE).getValue())); } + context.getStateManager().setState(stateMap, Scope.CLUSTER); } catch(SolrServerException | SchemaNotFoundException | IOException e){ context.yield(); session.rollback(); @@ -431,14 +443,16 @@ public QueryResponseOutputStreamCallback(QueryResponse response) { @Override public void process(OutputStream out) throws IOException { + IOUtils.write("", out, StandardCharsets.UTF_8); for (SolrDocument doc : response.getResults()) { - String xml = ClientUtils.toXML(toSolrInputDocument(doc)); + final String xml = ClientUtils.toXML(toSolrInputDocument(doc)); IOUtils.write(xml, out, StandardCharsets.UTF_8); } + IOUtils.write("", out, StandardCharsets.UTF_8); } public SolrInputDocument toSolrInputDocument(SolrDocument d) { - SolrInputDocument doc = new SolrInputDocument(); + final SolrInputDocument doc = new SolrInputDocument(); for (String name : d.getFieldNames()) { doc.addField(name, d.getFieldValue(name)); diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/resources/docs/org.apache.nifi.processors.solr.GetSolr/additionalDetails.html b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/resources/docs/org.apache.nifi.processors.solr.GetSolr/additionalDetails.html new file mode 100755 index 000000000000..b38656302b95 --- /dev/null +++ 
b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/resources/docs/org.apache.nifi.processors.solr.GetSolr/additionalDetails.html @@ -0,0 +1,56 @@ + + + + + + GetSolr + + + + +

Usage Example

+

+ This processor pulls data from Solr collections. To be used with this processor, Solr collections + have to fulfill two requirements:

+
    +
  • The documents must include a date field containing the time when they were + indexed. Such a field can easily be added to documents at indexing time, + e.g. using Solr's UpdateRequestProcessor created by + 'TimestampUpdateProcessorFactory'.
  • +
  • The configuration of the Solr index (e.g. schema.xml or managed-schema) must + define a uniqueKey field.
  • +
+

+ Backward compatibility with configurations of the GetSolr processor used within releases + of NiFi prior to 1.5 can be achieved as follows: +

+
    +
  • Find the file conf/.getSolr* within the prior NiFi installation.
  • +
  • Open the file and copy the timestamp defined for 'LastEndDate'.
  • +
  • Insert the timestamp into the field 'Initial Date Filter'.
  • +
+

+ Note: The value of property 'Solr Query' is actually not added to parameter 'q' + but to parameter 'fq' for two reasons: +

+
    +
  • Improving performance by leveraging Solr's filter cache.
  • +
  • Scoring is not required for the purpose of this processor as the sorting + is fixed to 'DateField asc, IdField asc'.
  • +
+ + diff --git a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestGetSolr.java b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestGetSolr.java old mode 100644 new mode 100755 index ca26aae82203..cb3d9c53eb4c --- a/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestGetSolr.java +++ b/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestGetSolr.java @@ -18,6 +18,7 @@ */ package org.apache.nifi.processors.solr; +import org.apache.nifi.components.state.Scope; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.reporting.InitializationException; @@ -286,6 +287,39 @@ public void testPropertyModified() throws IOException, SolrServerException { runner.clearTransferState(); } + @Test + public void testStateCleared() throws IOException, SolrServerException { + final org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor proc = new org.apache.nifi.processors.solr.TestGetSolr.TestableProcessor(solrClient); + + TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(GetSolr.RETURN_TYPE, GetSolr.MODE_XML.getValue()); + runner.setProperty(GetSolr.SOLR_TYPE, PutSolrContentStream.SOLR_TYPE_CLOUD.getValue()); + runner.setProperty(GetSolr.SOLR_LOCATION, "http://localhost:8443/solr"); + runner.setProperty(GetSolr.DATE_FIELD, "created"); + runner.setProperty(GetSolr.BATCH_SIZE, "1"); + runner.setProperty(GetSolr.RETURN_FIELDS, "id,created"); + runner.setProperty(GetSolr.COLLECTION, "testCollection"); + + runner.run(1,false, true); + runner.assertQueueEmpty(); + runner.assertAllFlowFilesTransferred(GetSolr.REL_SUCCESS, 10); + runner.clearTransferState(); + + // run without clearing statemanager + runner.run(1,false, true); + runner.assertQueueEmpty(); + 
runner.assertAllFlowFilesTransferred(GetSolr.REL_SUCCESS, 0); + runner.clearTransferState(); + + // run with cleared statemanager + runner.getStateManager().clear(Scope.CLUSTER); + runner.run(1, true, true); + runner.assertQueueEmpty(); + runner.assertAllFlowFilesTransferred(GetSolr.REL_SUCCESS, 10); + runner.clearTransferState(); + + + } @Test public void testRecordWriter() throws IOException, SolrServerException, InitializationException { @@ -315,7 +349,6 @@ public void testRecordWriter() throws IOException, SolrServerException, Initiali runner.assertAllFlowFilesContainAttribute(CoreAttributes.MIME_TYPE.key()); } - // Override createSolrClient and return the passed in SolrClient private class TestableProcessor extends GetSolr { private SolrClient solrClient;