diff --git a/camel-core/src/main/java/org/apache/camel/component/validator/ValidatorEndpoint.java b/camel-core/src/main/java/org/apache/camel/component/validator/ValidatorEndpoint.java index 1b0c28c59b2e8..6b04a8c15588f 100644 --- a/camel-core/src/main/java/org/apache/camel/component/validator/ValidatorEndpoint.java +++ b/camel-core/src/main/java/org/apache/camel/component/validator/ValidatorEndpoint.java @@ -16,6 +16,7 @@ */ package org.apache.camel.component.validator; +import java.io.IOException; import java.io.InputStream; import javax.xml.XMLConstants; import javax.xml.validation.SchemaFactory; @@ -26,9 +27,12 @@ import org.apache.camel.Consumer; import org.apache.camel.Processor; import org.apache.camel.Producer; +import org.apache.camel.api.management.ManagedOperation; +import org.apache.camel.api.management.ManagedResource; import org.apache.camel.converter.IOConverter; import org.apache.camel.impl.DefaultEndpoint; import org.apache.camel.processor.validation.DefaultValidationErrorHandler; +import org.apache.camel.processor.validation.SchemaReader; import org.apache.camel.processor.validation.ValidatingProcessor; import org.apache.camel.processor.validation.ValidatorErrorHandler; import org.apache.camel.spi.Metadata; @@ -43,6 +47,7 @@ /** * Validates the payload of a message using XML Schema and JAXP Validation. */ +@ManagedResource(description = "Managed ValidatorEndpoint") @UriEndpoint(scheme = "validator", title = "Validator", syntax = "validator:resourceUri", producerOnly = true, label = "core,validation") public class ValidatorEndpoint extends DefaultEndpoint { @@ -73,6 +78,14 @@ public class ValidatorEndpoint extends DefaultEndpoint { @UriParam(description = "To validate against a header instead of the message body.") private String headerName; + /** + * We need a one-to-one relation between endpoint and schema reader in order + * to be able to clear the cached schema in the schema reader. See method + * {@link #clearCachedSchema}. + */ + private final SchemaReader schemaReader = new SchemaReader(); + private volatile boolean schemaReaderConfigured; + public ValidatorEndpoint() { } @@ -81,28 +94,58 @@ public ValidatorEndpoint(String endpointUri, Component component, String resourc this.resourceUri = resourceUri; } + @ManagedOperation(description = "Clears the cached schema, forcing to re-load the schema on next request") + public void clearCachedSchema() throws Exception { + LOG.debug("{} rereading schema resource: {}", this, resourceUri); + byte[] bytes = readSchemaResource(); + schemaReader.setSchemaAsByteArray(bytes); + + schemaReader.setSchema(null); // will cause to reload the schema from + // the set byte-array on next request + } + @Override public Producer createProducer() throws Exception { - ValidatingProcessor validator = new ValidatingProcessor(); + if (!schemaReaderConfigured) { + if (resourceResolver != null) { + schemaReader.setResourceResolver(resourceResolver); + } else { + schemaReader.setResourceResolver(new DefaultLSResourceResolver(getCamelContext(), resourceUri)); + } + schemaReader.setSchemaLanguage(getSchemaLanguage()); + schemaReader.setSchemaFactory(getSchemaFactory()); + + byte[] bytes = readSchemaResource(); + schemaReader.setSchemaAsByteArray(bytes); + LOG.debug("{} using schema resource: {}", this, resourceUri); + + // force loading of schema at create time otherwise concurrent + // processing could cause thread safe issues for the + // javax.xml.validation.SchemaFactory + schemaReader.loadSchema(); + + // configure only once + schemaReaderConfigured = true; + } + + ValidatingProcessor validator = new ValidatingProcessor(schemaReader); + configureValidator(validator); + + return new ValidatorProducer(this, validator); + } + + protected byte[] readSchemaResource() throws IOException { InputStream is = ResourceHelper.resolveMandatoryResourceAsInputStream(getCamelContext(), resourceUri); byte[] bytes = null; try { bytes = IOConverter.toBytes(is); } finally { - // and make sure to close the input stream after the schema has been loaded + // and make sure to close the input stream after the schema has been + // loaded IOHelper.close(is); } - - validator.setSchemaAsByteArray(bytes); - LOG.debug("{} using schema resource: {}", this, resourceUri); - configureValidator(validator); - - // force loading of schema at create time otherwise concurrent - // processing could cause thread safe issues for the javax.xml.validation.SchemaFactory - validator.loadSchema(); - - return new ValidatorProducer(this, validator); + return bytes; } @Override @@ -116,13 +159,6 @@ public boolean isSingleton() { } protected void configureValidator(ValidatingProcessor validator) throws Exception { - if (resourceResolver != null) { - validator.setResourceResolver(resourceResolver); - } else { - validator.setResourceResolver(new DefaultLSResourceResolver(getCamelContext(), resourceUri)); - } - validator.setSchemaLanguage(getSchemaLanguage()); - validator.setSchemaFactory(getSchemaFactory()); validator.setErrorHandler(getErrorHandler()); validator.setUseDom(isUseDom()); validator.setUseSharedSchema(isUseSharedSchema()); diff --git a/camel-core/src/main/java/org/apache/camel/processor/validation/SchemaReader.java b/camel-core/src/main/java/org/apache/camel/processor/validation/SchemaReader.java new file mode 100644 index 0000000000000..0fdb9e17fe515 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/processor/validation/SchemaReader.java @@ -0,0 +1,181 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.validation; + +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.IOException; +import java.net.URL; + +import javax.xml.XMLConstants; +import javax.xml.transform.Source; +import javax.xml.transform.stream.StreamSource; +import javax.xml.validation.Schema; +import javax.xml.validation.SchemaFactory; + +import org.w3c.dom.ls.LSResourceResolver; +import org.xml.sax.SAXException; + +/** + * Reads the schema used in the processor {@link ValidatingProcessor}. Contains + * the method {@link clearCachedSchema()} to force re-reading the schema. + */ +public class SchemaReader { + + private String schemaLanguage = XMLConstants.W3C_XML_SCHEMA_NS_URI; + // must be volatile because is accessed from different threads see ValidatorEndpoint.clearCachedSchema + private volatile Schema schema; + private Source schemaSource; + // must be volatile because is accessed from different threads see ValidatorEndpoint.clearCachedSchema + private volatile SchemaFactory schemaFactory; + private URL schemaUrl; + private File schemaFile; + private volatile byte[] schemaAsByteArray; + private LSResourceResolver resourceResolver; + + public void loadSchema() throws Exception { + // force loading of schema + schema = createSchema(); + } + + // Properties + // ----------------------------------------------------------------------- + + public Schema getSchema() throws IOException, SAXException { + if (schema == null) { + synchronized (this) { + if (schema == null) { + schema = createSchema(); + } + } + } + return schema; + } + + public void setSchema(Schema schema) { + this.schema = schema; + } + + public String getSchemaLanguage() { + return schemaLanguage; + } + + public void setSchemaLanguage(String schemaLanguage) { + this.schemaLanguage = schemaLanguage; + } + + public Source getSchemaSource() throws IOException { + if (schemaSource == null) { + schemaSource = createSchemaSource(); + } + return schemaSource; + } + + public void setSchemaSource(Source schemaSource) { + this.schemaSource = schemaSource; + } + + public URL getSchemaUrl() { + return schemaUrl; + } + + public void setSchemaUrl(URL schemaUrl) { + this.schemaUrl = schemaUrl; + } + + public File getSchemaFile() { + return schemaFile; + } + + public void setSchemaFile(File schemaFile) { + this.schemaFile = schemaFile; + } + + public byte[] getSchemaAsByteArray() { + return schemaAsByteArray; + } + + public void setSchemaAsByteArray(byte[] schemaAsByteArray) { + this.schemaAsByteArray = schemaAsByteArray; + } + + public SchemaFactory getSchemaFactory() { + if (schemaFactory == null) { + synchronized (this) { + if (schemaFactory == null) { + schemaFactory = createSchemaFactory(); + } + } + } + return schemaFactory; + } + + public void setSchemaFactory(SchemaFactory schemaFactory) { + this.schemaFactory = schemaFactory; + } + + public LSResourceResolver getResourceResolver() { + return resourceResolver; + } + + public void setResourceResolver(LSResourceResolver resourceResolver) { + this.resourceResolver = resourceResolver; + } + + protected SchemaFactory createSchemaFactory() { + SchemaFactory factory = SchemaFactory.newInstance(schemaLanguage); + if (getResourceResolver() != null) { + factory.setResourceResolver(getResourceResolver()); + } + return factory; + } + + protected Source createSchemaSource() throws IOException { + throw new IllegalArgumentException("You must specify either a schema, schemaFile, schemaSource or schemaUrl property"); + } + + protected Schema createSchema() throws SAXException, IOException { + SchemaFactory factory = getSchemaFactory(); + + URL url = getSchemaUrl(); + if (url != null) { + synchronized (this) { + return factory.newSchema(url); + } + } + + File file = getSchemaFile(); + if (file != null) { + synchronized (this) { + return factory.newSchema(file); + } + } + + byte[] bytes = getSchemaAsByteArray(); + if (bytes != null) { + synchronized (this) { + return factory.newSchema(new StreamSource(new ByteArrayInputStream(schemaAsByteArray))); + } + } + + Source source = getSchemaSource(); + synchronized (this) { + return factory.newSchema(source); + } + } + +} diff --git a/camel-core/src/main/java/org/apache/camel/processor/validation/ValidatingProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/validation/ValidatingProcessor.java index 359ca0dcb8d2c..96b3db8605e93 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/validation/ValidatingProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/validation/ValidatingProcessor.java @@ -16,14 +16,12 @@ */ package org.apache.camel.processor.validation; -import java.io.ByteArrayInputStream; import java.io.File; import java.io.IOException; import java.io.InputStream; import java.net.URL; import java.util.Collections; -import javax.xml.XMLConstants; import javax.xml.parsers.ParserConfigurationException; import javax.xml.transform.Result; import javax.xml.transform.Source; @@ -61,22 +59,24 @@ */ public class ValidatingProcessor implements AsyncProcessor { private static final Logger LOG = LoggerFactory.getLogger(ValidatingProcessor.class); - private final XmlConverter converter = new XmlConverter(); - private String schemaLanguage = XMLConstants.W3C_XML_SCHEMA_NS_URI; - private volatile Schema schema; - private Source schemaSource; - private volatile SchemaFactory schemaFactory; - private URL schemaUrl; - private File schemaFile; - private byte[] schemaAsByteArray; + private final SchemaReader schemaReader; private ValidatorErrorHandler errorHandler = new DefaultValidationErrorHandler(); + private final XmlConverter converter = new XmlConverter(); private boolean useDom; private boolean useSharedSchema = true; - private LSResourceResolver resourceResolver; private boolean failOnNullBody = true; private boolean failOnNullHeader = true; private String headerName; + public ValidatingProcessor() { + schemaReader = new SchemaReader(); + } + + public ValidatingProcessor(SchemaReader schemaReader) { + // schema reader can be a singelton per schema, therefore make reuse, see ValidatorEndpoint and ValidatorProducer + this.schemaReader = schemaReader; + } + public void process(Exchange exchange) throws Exception { AsyncProcessorHelper.process(this, exchange); } @@ -182,84 +182,66 @@ private boolean shouldUseHeader() { } public void loadSchema() throws Exception { - // force loading of schema - schema = createSchema(); + schemaReader.loadSchema(); } // Properties // ----------------------------------------------------------------------- public Schema getSchema() throws IOException, SAXException { - if (schema == null) { - synchronized (this) { - if (schema == null) { - schema = createSchema(); - } - } - } - return schema; + return schemaReader.getSchema(); } public void setSchema(Schema schema) { - this.schema = schema; + schemaReader.setSchema(schema); } public String getSchemaLanguage() { - return schemaLanguage; + return schemaReader.getSchemaLanguage(); } public void setSchemaLanguage(String schemaLanguage) { - this.schemaLanguage = schemaLanguage; + schemaReader.setSchemaLanguage(schemaLanguage); } public Source getSchemaSource() throws IOException { - if (schemaSource == null) { - schemaSource = createSchemaSource(); - } - return schemaSource; + return schemaReader.getSchemaSource(); } public void setSchemaSource(Source schemaSource) { - this.schemaSource = schemaSource; + schemaReader.setSchemaSource(schemaSource); } public URL getSchemaUrl() { - return schemaUrl; + return schemaReader.getSchemaUrl(); } public void setSchemaUrl(URL schemaUrl) { - this.schemaUrl = schemaUrl; + schemaReader.setSchemaUrl(schemaUrl); } public File getSchemaFile() { - return schemaFile; + return schemaReader.getSchemaFile(); } public void setSchemaFile(File schemaFile) { - this.schemaFile = schemaFile; + schemaReader.setSchemaFile(schemaFile); } public byte[] getSchemaAsByteArray() { - return schemaAsByteArray; + return schemaReader.getSchemaAsByteArray(); } public void setSchemaAsByteArray(byte[] schemaAsByteArray) { - this.schemaAsByteArray = schemaAsByteArray; + schemaReader.setSchemaAsByteArray(schemaAsByteArray); } public SchemaFactory getSchemaFactory() { - if (schemaFactory == null) { - synchronized (this) { - if (schemaFactory == null) { - schemaFactory = createSchemaFactory(); - } - } - } - return schemaFactory; + return schemaReader.getSchemaFactory(); } public void setSchemaFactory(SchemaFactory schemaFactory) { - this.schemaFactory = schemaFactory; + schemaReader.setSchemaFactory(schemaFactory); } public ValidatorErrorHandler getErrorHandler() { @@ -294,11 +276,11 @@ public void setUseSharedSchema(boolean useSharedSchema) { } public LSResourceResolver getResourceResolver() { - return resourceResolver; + return schemaReader.getResourceResolver(); } public void setResourceResolver(LSResourceResolver resourceResolver) { - this.resourceResolver = resourceResolver; + schemaReader.setResourceResolver(resourceResolver); } public boolean isFailOnNullBody() { @@ -329,45 +311,15 @@ public void setHeaderName(String headerName) { // ----------------------------------------------------------------------- protected SchemaFactory createSchemaFactory() { - SchemaFactory factory = SchemaFactory.newInstance(schemaLanguage); - if (getResourceResolver() != null) { - factory.setResourceResolver(getResourceResolver()); - } - return factory; + return schemaReader.createSchemaFactory(); } protected Source createSchemaSource() throws IOException { - throw new IllegalArgumentException("You must specify either a schema, schemaFile, schemaSource or schemaUrl property"); + return schemaReader.createSchemaSource(); } protected Schema createSchema() throws SAXException, IOException { - SchemaFactory factory = getSchemaFactory(); - - URL url = getSchemaUrl(); - if (url != null) { - synchronized (this) { - return factory.newSchema(url); - } - } - - File file = getSchemaFile(); - if (file != null) { - synchronized (this) { - return factory.newSchema(file); - } - } - - byte[] bytes = getSchemaAsByteArray(); - if (bytes != null) { - synchronized (this) { - return factory.newSchema(new StreamSource(new ByteArrayInputStream(schemaAsByteArray))); - } - } - - Source source = getSchemaSource(); - synchronized (this) { - return factory.newSchema(source); - } + return schemaReader.createSchema(); } /** @@ -461,4 +413,4 @@ protected Source getSource(Exchange exchange, Object content) { return source; } -} \ No newline at end of file +} diff --git a/camel-core/src/test/java/org/apache/camel/component/validator/ValidatorEndpointClearCachedSchemaTest.java b/camel-core/src/test/java/org/apache/camel/component/validator/ValidatorEndpointClearCachedSchemaTest.java new file mode 100644 index 0000000000000..e1442080a685c --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/component/validator/ValidatorEndpointClearCachedSchemaTest.java @@ -0,0 +1,225 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.validator; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.CamelContext; +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.impl.DefaultClassResolver; +import org.apache.camel.impl.SimpleRegistry; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Tests whether the ValidatorEndpoint.clearCachedSchema() can be executed when + * several sender threads are running. + */ +public class ValidatorEndpointClearCachedSchemaTest extends ContextTestSupport { + + private static final Logger LOG = LoggerFactory.getLogger(ValidatorEndpointClearCachedSchemaTest.class); + + private SimpleRegistry simpleReg; + + private CamelContext context; + + @Test + public void testClearCachedSchema() throws Exception { + + MockEndpoint mock = getMockEndpoint("mock:result"); + + // send one message for start up to finish. + new Sender().run(); + + // send with 5 sender threads in parallel and call clear cache in + // between + ExecutorService senderPool = Executors.newFixedThreadPool(5); + ExecutorService executorClearCache = Executors.newFixedThreadPool(1); + for (int i = 0; i < 5; i++) { + senderPool.execute(new Sender()); + if (i == 2) { + /** + * The clear cache thread calls xsdEndpoint.clearCachedSchema + */ + executorClearCache.execute(new ClearCache()); + } + } + + senderPool.shutdown(); + executorClearCache.shutdown(); + + senderPool.awaitTermination(2, TimeUnit.SECONDS); + + List exchanges = mock.getExchanges(); + + assertNotNull(exchanges); + + // expect at least 5 correct sent messages, the messages sent before + // the clearCacheSchema method is called will fail with a validation + // error and will nor result in an exchange + assertTrue("Less then expected exchanges", exchanges.size() > 5); + + } + + @Override + protected CamelContext createCamelContext() throws Exception { + simpleReg = new SimpleRegistry(); + context = new DefaultCamelContext(simpleReg); + context.setClassResolver(new ClassResolverImpl()); + return context; + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start").to("validator:pd:somefile.xsd").convertBodyTo(String.class).to("log:after").to("mock:result"); + + } + }; + } + + private class Sender implements Runnable { + + private final String message = "" + // + "" + // + "MessageContent" + // + ""; + + private final byte[] messageBytes = message.getBytes(StandardCharsets.UTF_8); + + @Override + public void run() { + // send up to 5 messages + for (int j = 0; j < 5; j++) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + sendBody("direct:start", messageBytes); + } + } + + } + + private class ClearCache implements Runnable { + + @Override + public void run() { + try { + // start later after the first sender + // threads are running + Thread.sleep(200); + clearCachedSchema(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + } + + private void clearCachedSchema() throws Exception { + Collection endpoints = context.getEndpoints(); + for (Endpoint endpoint : endpoints) { + LOG.info("Endpoint URI: " + endpoint.getEndpointUri()); + if (endpoint.getEndpointUri().startsWith("validator:")) { + ValidatorEndpoint xsltEndpoint = (ValidatorEndpoint)endpoint; + xsltEndpoint.clearCachedSchema(); + LOG.info("schema cache cleared"); + } + } + } + + /** + * Class to simulate a change of the XSD document. During the first call of + * the resource a XSD is returned which does not fit to the XML document. In + * the second call a XSD fitting to the XML document is returned. + */ + static class ClassResolverImpl extends DefaultClassResolver { + + private final String xsdtemplate1 = "\n" + // + "" + + // + " " + // + " " + // + " " + // // + // wrong + // element + // name + // will + // cause + // the + // validation + // to + // fail + " " + // + " " + // + " " + // + " " + // + ""; // + + private final String xsdtemplate2 = xsdtemplate1.replace("\"Content\"", "\"MessageContent\""); // correct + // element + // name + // --> + // validation + // will + // be + // correct + + private byte[] xsd1 = xsdtemplate1.getBytes(StandardCharsets.UTF_8); + + private byte[] xsd2 = xsdtemplate2.getBytes(StandardCharsets.UTF_8); + + private volatile short counter; + + @Override + public InputStream loadResourceAsStream(String uri) { + if (uri.startsWith("pd:")) { + byte[] xsd; + if (counter == 0) { + xsd = xsd1; + LOG.info("resolved XSD1"); + } else { + xsd = xsd2; + LOG.info("resolved XSD2"); + } + counter++; + return new ByteArrayInputStream(xsd); + } else { + return super.loadResourceAsStream(uri); + } + } + + } + +}