From e25e84156083d9de365458ebdce52a260bc2d486 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Wed, 26 Apr 2017 13:58:00 -0400 Subject: [PATCH 1/2] NIFI-3717: Added HortonworksSchemaRegistry --- nifi-assembly/pom.xml | 5 + .../nifi-hwx-schema-registry-nar/pom.xml | 39 +++ .../src/main/resources/META-INF/LICENSE | 233 +++++++++++++ .../src/main/resources/META-INF/NOTICE | 115 +++++++ .../nifi-hwx-schema-registry-service/pom.xml | 195 +++++++++++ .../HortonworksSchemaRegistry.java | 323 ++++++++++++++++++ ...g.apache.nifi.controller.ControllerService | 12 + .../nifi-hwx-schema-registry-bundle/pom.xml | 31 ++ .../record/SchemaIdentifier.java | 51 +-- .../record/StandardSchemaIdentifier.java | 69 ++++ .../nifi-standard-services/pom.xml | 1 + pom.xml | 6 + 12 files changed, 1032 insertions(+), 48 deletions(-) create mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-nar/pom.xml create mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-nar/src/main/resources/META-INF/LICENSE create mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-nar/src/main/resources/META-INF/NOTICE create mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/pom.xml create mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java create mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService create mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/pom.xml create mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/StandardSchemaIdentifier.java diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml index c43b13d095e5..a009acd5e09d 100755 --- a/nifi-assembly/pom.xml +++ b/nifi-assembly/pom.xml @@ -479,6 +479,11 @@ nifi-cdc-mysql-nar nar + + org.apache.nifi + nifi-hwx-schema-registry-nar + nar + diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-nar/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-nar/pom.xml new file mode 100644 index 000000000000..46d479d60683 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-nar/pom.xml @@ -0,0 +1,39 @@ + + + + 4.0.0 + + + org.apache.nifi + nifi-hwx-schema-registry-bundle + 1.2.0-SNAPSHOT + + + nifi-hwx-schema-registry-nar + nar + + + org.apache.nifi + nifi-standard-services-api-nar + nar + + + org.apache.nifi + nifi-hwx-schema-registry-service + 1.2.0-SNAPSHOT + + + + diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-nar/src/main/resources/META-INF/LICENSE b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-nar/src/main/resources/META-INF/LICENSE new file mode 100644 index 000000000000..ca0bdb3a868c --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-nar/src/main/resources/META-INF/LICENSE @@ -0,0 +1,233 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + + The binary distribution of this product bundles 'ParaNamer' + which is available under a BSD style license. + + Copyright (c) 2006 Paul Hammant & ThoughtWorks Inc + All rights reserved. + + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions + are met: + 1. Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + 2. Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. + 3. Neither the name of the copyright holders nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF + THE POSSIBILITY OF SUCH DAMAGE. \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-nar/src/main/resources/META-INF/NOTICE new file mode 100644 index 000000000000..bc2b201797e2 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-nar/src/main/resources/META-INF/NOTICE @@ -0,0 +1,115 @@ +nifi-hwx-schema-registry-nar +Copyright 2017 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). + +=========================================== +Apache Software License v2 +=========================================== + +The following binary components are provided under the Apache Software License v2 + + (ASLv2) Apache Commons IO + The following NOTICE information applies: + Apache Commons IO + Copyright 2002-2016 The Apache Software Foundation + + (ASLv2) Apache Commons Lang + The following NOTICE information applies: + Apache Commons Lang + Copyright 2001-2015 The Apache Software Foundation + + (ASLv2) Apache Avro + The following NOTICE information applies: + Apache Avro + Copyright 2009-2013 The Apache Software Foundation + + (ASLv2) Apache Commons Compress + The following NOTICE information applies: + Apache Commons Compress + Copyright 2002-2014 The Apache Software Foundation + + The files in the package org.apache.commons.compress.archivers.sevenz + were derived from the LZMA SDK, version 9.20 (C/ and CPP/7zip/), + which has been placed in the public domain: + + "LZMA SDK is placed in the public domain." (http://www.7-zip.org/sdk.html) + + (ASLv2) Apache Commons Lang + The following NOTICE information applies: + Apache Commons Lang + Copyright 2001-2015 The Apache Software Foundation + + This product includes software from the Spring Framework, + under the Apache License 2.0 (see: StringUtils.containsWhitespace()) + + (ASLv2) Guava + The following NOTICE information applies: + Guava + Copyright 2015 The Guava Authors + + (ASLv2) Jackson JSON processor + The following NOTICE information applies: + # Jackson JSON processor + + Jackson is a high-performance, Free/Open Source JSON processing library. + It was originally written by Tatu Saloranta (tatu.saloranta@iki.fi), and has + been in development since 2007. + It is currently developed by a community of developers, as well as supported + commercially by FasterXML.com. + + ## Licensing + + Jackson core and extension components may licensed under different licenses. + To find the details that apply to this artifact see the accompanying LICENSE file. + For more information, including possible other licensing options, contact + FasterXML.com (http://fasterxml.com). + + ## Credits + + A list of contributors may be found from CREDITS file, which is included + in some artifacts (usually source distributions); but is always available + from the source code management (SCM) system project uses. + + (ASLv2) Snappy Java + The following NOTICE information applies: + This product includes software developed by Google + Snappy: http://code.google.com/p/snappy/ (New BSD License) + + This product includes software developed by Apache + PureJavaCrc32C from apache-hadoop-common http://hadoop.apache.org/ + (Apache 2.0 license) + + This library containd statically linked libstdc++. This inclusion is allowed by + "GCC RUntime Library Exception" + http://gcc.gnu.org/onlinedocs/libstdc++/manual/license.html + + + +************************ +Common Development and Distribution License 1.1 +************************ + +The following binary components are provided under the Common Development and Distribution License 1.1. See project link for details. + + (CDDL 1.1) (GPL2 w/ CPE) ServiceLocator Default Implementation (org.glassfish.hk2:hk2-locator:jar:2.4.0-b25 - https://hk2.java.net/hk2-locator) + (CDDL 1.1) (GPL2 w/ CPE) HK2 API module (org.glassfish.hk2:hk2-api:jar:2.4.0-b25 - https://hk2.java.net/hk2-api) + (CDDL 1.1) (GPL2 w/ CPE) HK2 Implementation Utilities (org.glassfish.hk2:hk2-utils:jar:2.4.0-b25 - https://hk2.java.net/hk2-utils) + (CDDL 1.1) (GPL2 w/ CPE) javax.annotation API (javax.annotation:javax.annotation-api:jar:1.2 - http://jcp.org/en/jsr/detail?id=250) + (CDDL 1.1) (GPL2 w/ CPE) javax.inject:1 as OSGi bundle (org.glassfish.hk2.external:javax.inject:jar:2.4.0-b25 - https://hk2.java.net/external/javax.inject) + (CDDL 1.1) (GPL2 w/ CPE) javax.ws.rs-api (javax.ws.rs:javax.ws.rs-api:jar:2.0.1 - http://jax-rs-spec.java.net) + (CDDL 1.1) (GPL2 w/ CPE) jersey-client (com.sun.jersey:jersey-client:jar:1.19 - https://jersey.java.net/jersey-client/) + (CDDL 1.1) (GPL2 w/ CPE) jersey-core-common (org.glassfish.jersey.core:jersey-common:jar:2.19 - https://jersey.java.net/jersey-common/) + (CDDL 1.1) (GPL2 w/ CPE) jersey-repackaged-guava (org.glassfish.jersey.bundles.repackaged:jersey-guava:bundle:2.19 - https://jersey.java.net/project/project/jersey-guava/) + (CDDL 1.1) (GPL2 w/ CPE) jersey-multipart (com.sun.jersey:jersey-multipart:jar:1.19 - https://jersey.java.net/jersey-multipart/) + (CDDL 1.1) (GPL2 w/ CPE) MIME Streaming Extension (org.jvnet.mimepull:mimepull:jar:1.9.3 - http://mimepull.java.net) + + +***************** +Public Domain +***************** + +The following binary components are provided to the 'Public Domain'. See project link for details. + + (Public Domain) XZ for Java (org.tukaani:xz:jar:1.5 - http://tukaani.org/xz/java.html diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/pom.xml new file mode 100644 index 000000000000..e85773d7be28 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/pom.xml @@ -0,0 +1,195 @@ + + + + 4.0.0 + + + org.apache.nifi + nifi-hwx-schema-registry-bundle + 1.2.0-SNAPSHOT + + + nifi-hwx-schema-registry-service + jar + + + + org.apache.nifi + nifi-api + + + org.apache.nifi + nifi-utils + + + + org.apache.nifi + nifi-schema-registry-service-api + + + + org.apache.avro + avro + + + + + com.hortonworks.registries + schema-registry-client + 0.2.0 + + + org.apache.avro + avro + + + org.apache.kafka + kafka-clients + + + org.jboss.logging + jboss-logging + + + org.apache.directory.server + apacheds-i18n + + + org.apache.directory.server + apacheds-kerberos-codec + + + org.apache.zookeeper + zookeeper + + + javax.servlet.jsp + jsp-api + + + org.javassist + javassist + + + org.tukaani + xz + + + com.thoughtworks.paranamer + paranamer + + + org.hibernate + hibernate-validator + + + org.apache.hadoop + hadoop-annotations + + + org.fusesource.leveldbjni + leveldbjni-all + + + io.netty + netty-all + + + org.apache.hadoop + hadoop-auth + + + org.apache.hadoop + hadoop-client + + + org.apache.hadoop + hadoop-common + + + org.glassfish.hk2 + osgi-resource-locator + + + com.github.fge + btf + + + javax.mail + mailapi + + + com.googlecode.libphonenumber + libphonenumber + + + org.yaml + snakeyaml + + + com.github.fge + msg-simple + + + org.xerial.snappy + snappy-java + + + org.glassfish.hk2.external + aopalliance-repackaged + + + joda-time + joda-time + + + com.fasterxml.jackson.dataformat + jackson-dataformat-yaml + + + org.mozilla + rhino + + + com.github.fge + uri-template + + + com.github.fge + jackson-coreutils + + + com.github.fge + json-schema-core + + + com.github.fge + json-schema-validator + + + org.apache.commons + commons-compress + + + + + + junit + junit + test + + + diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java new file mode 100644 index 000000000000..218b310f107a --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java @@ -0,0 +1,323 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.schemaregistry.hortonworks; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.stream.Collectors; + +import org.apache.avro.LogicalType; +import org.apache.avro.Schema; +import org.apache.avro.Schema.Field; +import org.apache.avro.Schema.Type; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.schema.access.SchemaField; +import org.apache.nifi.schemaregistry.services.SchemaRegistry; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.SchemaIdentifier; +import org.apache.nifi.util.Tuple; + +import com.hortonworks.registries.schemaregistry.SchemaMetadata; +import com.hortonworks.registries.schemaregistry.SchemaMetadataInfo; +import com.hortonworks.registries.schemaregistry.SchemaVersionInfo; +import com.hortonworks.registries.schemaregistry.SchemaVersionKey; +import com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient; +import com.hortonworks.registries.schemaregistry.errors.SchemaNotFoundException; + +@Tags({"schema", "registry", "avro", "hortonworks", "hwx"}) +@CapabilityDescription("Provides a Schema Registry Service that interacts with a Hortonworks Schema Registry") +public class HortonworksSchemaRegistry extends AbstractControllerService implements SchemaRegistry { + private static final Set schemaFields = EnumSet.of(SchemaField.SCHEMA_NAME, SchemaField.SCHEMA_TEXT, + SchemaField.SCHEMA_TEXT_FORMAT, SchemaField.SCHEMA_IDENTIFIER, SchemaField.SCHEMA_VERSION); + private final ConcurrentMap, RecordSchema> schemaNameToSchemaMap = new ConcurrentHashMap<>(); + + private static final String LOGICAL_TYPE_DATE = "date"; + private static final String LOGICAL_TYPE_TIME_MILLIS = "time-millis"; + private static final String LOGICAL_TYPE_TIME_MICROS = "time-micros"; + private static final String LOGICAL_TYPE_TIMESTAMP_MILLIS = "timestamp-millis"; + private static final String LOGICAL_TYPE_TIMESTAMP_MICROS = "timestamp-micros"; + + + static final PropertyDescriptor URL = new PropertyDescriptor.Builder() + .name("url") + .displayName("Schema Registry URL") + .description("URL of the schema registry that this Controller Service should connect to, including version. For example, http://localhost:9090/api/v1") + .addValidator(StandardValidators.URL_VALIDATOR) + .expressionLanguageSupported(true) + .required(true) + .build(); + + + private static final List propertyDescriptors = Collections.singletonList(URL); + private volatile SchemaRegistryClient schemaRegistryClient; + private volatile boolean initialized; + private volatile Map schemaRegistryConfig; + + public HortonworksSchemaRegistry() { + } + + + @OnEnabled + public void enable(final ConfigurationContext context) throws InitializationException { + schemaRegistryConfig = new HashMap<>(); + + // The below properties may or may not need to be exposed to the end + // user. We just need to watch usage patterns to see if sensible default + // can satisfy NiFi requirements + String urlValue = context.getProperty(URL).evaluateAttributeExpressions().getValue(); + if (urlValue == null || urlValue.trim().length() == 0){ + throw new IllegalArgumentException("'Schema Registry URL' must not be nul or empty."); + } + + schemaRegistryConfig.put(SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name(), urlValue); + schemaRegistryConfig.put(SchemaRegistryClient.Configuration.CLASSLOADER_CACHE_SIZE.name(), 10L); + schemaRegistryConfig.put(SchemaRegistryClient.Configuration.CLASSLOADER_CACHE_EXPIRY_INTERVAL_SECS.name(), 5000L); + schemaRegistryConfig.put(SchemaRegistryClient.Configuration.SCHEMA_VERSION_CACHE_SIZE.name(), 1000L); + schemaRegistryConfig.put(SchemaRegistryClient.Configuration.SCHEMA_VERSION_CACHE_EXPIRY_INTERVAL_SECS.name(), 60 * 60 * 1000L); + } + + + + @OnDisabled + public void close() { + if (schemaRegistryClient != null) { + schemaRegistryClient.close(); + } + + initialized = false; + } + + + @Override + protected List getSupportedPropertyDescriptors() { + return propertyDescriptors; + } + + + private synchronized SchemaRegistryClient getClient() { + if (!initialized) { + schemaRegistryClient = new SchemaRegistryClient(schemaRegistryConfig); + initialized = true; + } + + return schemaRegistryClient; + } + + + @Override + public String retrieveSchemaText(final String schemaName) throws org.apache.nifi.schema.access.SchemaNotFoundException { + try { + final SchemaVersionInfo latest = getClient().getLatestSchemaVersionInfo(schemaName); + return latest.getSchemaText(); + } catch (final SchemaNotFoundException e) { + throw new org.apache.nifi.schema.access.SchemaNotFoundException(e); + } + } + + + @Override + public RecordSchema retrieveSchema(final String schemaName) throws org.apache.nifi.schema.access.SchemaNotFoundException { + try { + final SchemaRegistryClient client = getClient(); + final SchemaMetadataInfo metadataInfo = client.getSchemaMetadataInfo(schemaName); + final Long schemaId = metadataInfo.getId(); + + final SchemaVersionInfo versionInfo = client.getLatestSchemaVersionInfo(schemaName); + final Integer version = versionInfo.getVersion(); + final String schemaText = versionInfo.getSchemaText(); + + final SchemaIdentifier schemaIdentifier = (schemaId == null || version == null) ? SchemaIdentifier.ofName(schemaName) : SchemaIdentifier.of(schemaName, schemaId, version); + + final Tuple tuple = new Tuple<>(schemaIdentifier, schemaText); + return schemaNameToSchemaMap.computeIfAbsent(tuple, t -> { + final Schema schema = new Schema.Parser().parse(schemaText); + return createRecordSchema(schema, schemaText, schemaIdentifier); + }); + } catch (final SchemaNotFoundException e) { + throw new org.apache.nifi.schema.access.SchemaNotFoundException(e); + } + } + + + @Override + public String retrieveSchemaText(final long schemaId, final int version) throws IOException, org.apache.nifi.schema.access.SchemaNotFoundException { + try { + final SchemaRegistryClient client = getClient(); + final SchemaMetadata metadata = client.getSchemaMetadataInfo(schemaId).getSchemaMetadata(); + final String schemaName = metadata.getName(); + final SchemaVersionKey schemaVersionKey = new SchemaVersionKey(schemaName, version); + final SchemaVersionInfo versionInfo = client.getSchemaVersionInfo(schemaVersionKey); + return versionInfo.getSchemaText(); + } catch (final SchemaNotFoundException e) { + throw new org.apache.nifi.schema.access.SchemaNotFoundException(e); + } + } + + @Override + public RecordSchema retrieveSchema(final long schemaId, final int version) throws IOException, org.apache.nifi.schema.access.SchemaNotFoundException { + try { + final SchemaRegistryClient client = getClient(); + final SchemaMetadata metadata = client.getSchemaMetadataInfo(schemaId).getSchemaMetadata(); + final String schemaName = metadata.getName(); + final SchemaVersionKey schemaVersionKey = new SchemaVersionKey(schemaName, version); + final SchemaVersionInfo versionInfo = client.getSchemaVersionInfo(schemaVersionKey); + final String schemaText = versionInfo.getSchemaText(); + + final SchemaIdentifier schemaIdentifier = SchemaIdentifier.of(schemaName, schemaId, version); + final Tuple tuple = new Tuple<>(schemaIdentifier, schemaText); + return schemaNameToSchemaMap.computeIfAbsent(tuple, t -> { + final Schema schema = new Schema.Parser().parse(schemaText); + return createRecordSchema(schema, schemaText, schemaIdentifier); + }); + } catch (final SchemaNotFoundException e) { + throw new org.apache.nifi.schema.access.SchemaNotFoundException(e); + } + } + + + /** + * Converts an Avro Schema to a RecordSchema + * + * @param avroSchema the Avro Schema to convert + * @param text the textual representation of the schema + * @param schemaName the name of the schema + * @return the Corresponding Record Schema + */ + private RecordSchema createRecordSchema(final Schema avroSchema, final String text, final SchemaIdentifier schemaId) { + final List recordFields = new ArrayList<>(avroSchema.getFields().size()); + for (final Field field : avroSchema.getFields()) { + final String fieldName = field.name(); + final DataType dataType = determineDataType(field.schema()); + + recordFields.add(new RecordField(fieldName, dataType, field.defaultVal(), field.aliases())); + } + + final RecordSchema recordSchema = new SimpleRecordSchema(recordFields, text, "avro", schemaId); + return recordSchema; + } + + /** + * Returns a DataType for the given Avro Schema + * + * @param avroSchema the Avro Schema to convert + * @return a Data Type that corresponds to the given Avro Schema + */ + private DataType determineDataType(final Schema avroSchema) { + final Type avroType = avroSchema.getType(); + + final LogicalType logicalType = avroSchema.getLogicalType(); + if (logicalType != null) { + final String logicalTypeName = logicalType.getName(); + switch (logicalTypeName) { + case LOGICAL_TYPE_DATE: + return RecordFieldType.DATE.getDataType(); + case LOGICAL_TYPE_TIME_MILLIS: + case LOGICAL_TYPE_TIME_MICROS: + return RecordFieldType.TIME.getDataType(); + case LOGICAL_TYPE_TIMESTAMP_MILLIS: + case LOGICAL_TYPE_TIMESTAMP_MICROS: + return RecordFieldType.TIMESTAMP.getDataType(); + } + } + + switch (avroType) { + case ARRAY: + return RecordFieldType.ARRAY.getArrayDataType(determineDataType(avroSchema.getElementType())); + case BYTES: + case FIXED: + return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType()); + case BOOLEAN: + return RecordFieldType.BOOLEAN.getDataType(); + case DOUBLE: + return RecordFieldType.DOUBLE.getDataType(); + case ENUM: + case STRING: + return RecordFieldType.STRING.getDataType(); + case FLOAT: + return RecordFieldType.FLOAT.getDataType(); + case INT: + return RecordFieldType.INT.getDataType(); + case LONG: + return RecordFieldType.LONG.getDataType(); + case RECORD: { + final List avroFields = avroSchema.getFields(); + final List recordFields = new ArrayList<>(avroFields.size()); + + for (final Field field : avroFields) { + final String fieldName = field.name(); + final Schema fieldSchema = field.schema(); + final DataType fieldType = determineDataType(fieldSchema); + recordFields.add(new RecordField(fieldName, fieldType, field.defaultVal(), field.aliases())); + } + + final RecordSchema recordSchema = new SimpleRecordSchema(recordFields, avroSchema.toString(), "avro", SchemaIdentifier.EMPTY); + return RecordFieldType.RECORD.getRecordDataType(recordSchema); + } + case NULL: + return RecordFieldType.STRING.getDataType(); + case MAP: + final Schema valueSchema = avroSchema.getValueType(); + final DataType valueType = determineDataType(valueSchema); + return RecordFieldType.MAP.getMapDataType(valueType); + case UNION: { + final List nonNullSubSchemas = avroSchema.getTypes().stream() + .filter(s -> s.getType() != Type.NULL) + .collect(Collectors.toList()); + + if (nonNullSubSchemas.size() == 1) { + return determineDataType(nonNullSubSchemas.get(0)); + } + + final List possibleChildTypes = new ArrayList<>(nonNullSubSchemas.size()); + for (final Schema subSchema : nonNullSubSchemas) { + final DataType childDataType = determineDataType(subSchema); + possibleChildTypes.add(childDataType); + } + + return RecordFieldType.CHOICE.getChoiceDataType(possibleChildTypes); + } + } + + return null; + } + + + @Override + public Set getSuppliedSchemaFields() { + return schemaFields; + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService new file mode 100644 index 000000000000..1a7506161f21 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService @@ -0,0 +1,12 @@ +# Copyright 2016 Hortoworks, Inc. All rights reserved. +# +# Hortonworks, Inc. licenses this file to you under the Apache License, +# Version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software distributed +# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES +# OR CONDITIONS OF ANY KIND, either express or implied. See the License for +# the specific language governing permissions and limitations under the License. +# See the associated NOTICE file for additional information regarding copyright +# ownership. +org.apache.nifi.schemaregistry.hortonworks.HortonworksSchemaRegistry \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/pom.xml new file mode 100644 index 000000000000..2493e9c66420 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/pom.xml @@ -0,0 +1,31 @@ + + + 4.0.0 + + org.apache.nifi + nifi-standard-services + 1.2.0-SNAPSHOT + + + nifi-hwx-schema-registry-bundle + pom + + + nifi-hwx-schema-registry-service + nifi-hwx-schema-registry-nar + + + diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java index b7119525ec29..d7f56647b243 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java @@ -39,58 +39,13 @@ public interface SchemaIdentifier { OptionalInt getVersion(); - public static SchemaIdentifier EMPTY = new SchemaIdentifier() { - @Override - public Optional getName() { - return Optional.empty(); - } - - @Override - public OptionalLong getIdentifier() { - return OptionalLong.empty(); - } - - @Override - public OptionalInt getVersion() { - return OptionalInt.empty(); - } - }; + public static SchemaIdentifier EMPTY = new StandardSchemaIdentifier(null, null, null); public static SchemaIdentifier ofName(final String name) { - return new SchemaIdentifier() { - @Override - public Optional getName() { - return Optional.ofNullable(name); - } - - @Override - public OptionalLong getIdentifier() { - return OptionalLong.empty(); - } - - @Override - public OptionalInt getVersion() { - return OptionalInt.empty(); - } - }; + return new StandardSchemaIdentifier(name, null, null); } public static SchemaIdentifier of(final String name, final long identifier, final int version) { - return new SchemaIdentifier() { - @Override - public Optional getName() { - return Optional.ofNullable(name); - } - - @Override - public OptionalLong getIdentifier() { - return OptionalLong.of(identifier); - } - - @Override - public OptionalInt getVersion() { - return OptionalInt.of(version); - } - }; + return new StandardSchemaIdentifier(name, identifier, version); } } \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/StandardSchemaIdentifier.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/StandardSchemaIdentifier.java new file mode 100644 index 000000000000..86db284f197a --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/record/StandardSchemaIdentifier.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.serialization.record; + +import java.util.Optional; +import java.util.OptionalInt; +import java.util.OptionalLong; + +public class StandardSchemaIdentifier implements SchemaIdentifier { + private final Optional name; + private final OptionalLong identifier; + private final OptionalInt version; + + StandardSchemaIdentifier(final String name, final Long identifier, final Integer version) { + this.name = Optional.ofNullable(name); + this.identifier = identifier == null ? OptionalLong.empty() : OptionalLong.of(identifier);; + this.version = version == null ? OptionalInt.empty() : OptionalInt.of(version);; + } + + @Override + public Optional getName() { + return name; + } + + @Override + public OptionalLong getIdentifier() { + return identifier; + } + + @Override + public OptionalInt getVersion() { + return version; + } + + @Override + public int hashCode() { + return 31 + 41 * getName().hashCode() + 41 * getIdentifier().hashCode() + 41 * getVersion().hashCode(); + } + + @Override + public boolean equals(final Object obj) { + if (obj == this) { + return true; + } + if (obj == null) { + return false; + } + if (!(obj instanceof SchemaIdentifier)) { + return false; + } + final SchemaIdentifier other = (SchemaIdentifier) obj; + return getName().equals(other.getName()) && getIdentifier().equals(other.getIdentifier()) && getVersion().equals(other.getVersion()); + } +} diff --git a/nifi-nar-bundles/nifi-standard-services/pom.xml b/nifi-nar-bundles/nifi-standard-services/pom.xml index 4fac7d2a9145..126f2444137c 100644 --- a/nifi-nar-bundles/nifi-standard-services/pom.xml +++ b/nifi-nar-bundles/nifi-standard-services/pom.xml @@ -38,5 +38,6 @@ nifi-schema-registry-service-api nifi-record-serialization-service-api nifi-record-serialization-services-bundle + nifi-hwx-schema-registry-bundle diff --git a/pom.xml b/pom.xml index 58fa379ddea5..7c4f041714bd 100644 --- a/pom.xml +++ b/pom.xml @@ -1390,6 +1390,12 @@ 1.2.0-SNAPSHOT nar + + org.apache.nifi + nifi-hwx-schema-registry-nar + 1.2.0-SNAPSHOT + nar + org.apache.nifi nifi-properties From a94e70ef6658da28a23fa183a51510f058a00ebc Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Thu, 27 Apr 2017 13:56:58 -0400 Subject: [PATCH 2/2] NIFI-3717: Fixed licensing and contrib-check failures --- .../nifi-hwx-schema-registry-service/pom.xml | 28 +++++----- .../HortonworksSchemaRegistry.java | 51 ++++++++++++++++--- ...g.apache.nifi.controller.ControllerService | 26 ++++++---- 3 files changed, 76 insertions(+), 29 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/pom.xml index e85773d7be28..b17f93b30c0a 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/pom.xml +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/pom.xml @@ -1,17 +1,21 @@ - + 4.0.0 diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java index 218b310f107a..793acac1cecd 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java @@ -59,7 +59,7 @@ import com.hortonworks.registries.schemaregistry.errors.SchemaNotFoundException; @Tags({"schema", "registry", "avro", "hortonworks", "hwx"}) -@CapabilityDescription("Provides a Schema Registry Service that interacts with a Hortonworks Schema Registry") +@CapabilityDescription("Provides a Schema Registry Service that interacts with a Hortonworks Schema Registry, available at https://github.com/hortonworks/registry") public class HortonworksSchemaRegistry extends AbstractControllerService implements SchemaRegistry { private static final Set schemaFields = EnumSet.of(SchemaField.SCHEMA_NAME, SchemaField.SCHEMA_TEXT, SchemaField.SCHEMA_TEXT_FORMAT, SchemaField.SCHEMA_IDENTIFIER, SchemaField.SCHEMA_VERSION); @@ -100,7 +100,7 @@ public void enable(final ConfigurationContext context) throws InitializationExce // can satisfy NiFi requirements String urlValue = context.getProperty(URL).evaluateAttributeExpressions().getValue(); if (urlValue == null || urlValue.trim().length() == 0){ - throw new IllegalArgumentException("'Schema Registry URL' must not be nul or empty."); + throw new IllegalArgumentException("'Schema Registry URL' must not be nul or empty."); } schemaRegistryConfig.put(SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name(), urlValue); @@ -142,6 +142,10 @@ private synchronized SchemaRegistryClient getClient() { public String retrieveSchemaText(final String schemaName) throws org.apache.nifi.schema.access.SchemaNotFoundException { try { final SchemaVersionInfo latest = getClient().getLatestSchemaVersionInfo(schemaName); + if (latest == null) { + throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with name '" + schemaName + "'"); + } + return latest.getSchemaText(); } catch (final SchemaNotFoundException e) { throw new org.apache.nifi.schema.access.SchemaNotFoundException(e); @@ -154,12 +158,27 @@ public RecordSchema retrieveSchema(final String schemaName) throws org.apache.ni try { final SchemaRegistryClient client = getClient(); final SchemaMetadataInfo metadataInfo = client.getSchemaMetadataInfo(schemaName); + if (metadataInfo == null) { + throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with name '" + schemaName + "'"); + } + final Long schemaId = metadataInfo.getId(); + if (schemaId == null) { + throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with name '" + schemaName + "'"); + } + final SchemaVersionInfo versionInfo = client.getLatestSchemaVersionInfo(schemaName); + if (versionInfo == null) { + throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with name '" + schemaName + "'"); + } + final Integer version = versionInfo.getVersion(); - final String schemaText = versionInfo.getSchemaText(); + if (version == null) { + throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with name '" + schemaName + "'"); + } + final String schemaText = versionInfo.getSchemaText(); final SchemaIdentifier schemaIdentifier = (schemaId == null || version == null) ? SchemaIdentifier.ofName(schemaName) : SchemaIdentifier.of(schemaName, schemaId, version); final Tuple tuple = new Tuple<>(schemaIdentifier, schemaText); @@ -177,10 +196,20 @@ public RecordSchema retrieveSchema(final String schemaName) throws org.apache.ni public String retrieveSchemaText(final long schemaId, final int version) throws IOException, org.apache.nifi.schema.access.SchemaNotFoundException { try { final SchemaRegistryClient client = getClient(); - final SchemaMetadata metadata = client.getSchemaMetadataInfo(schemaId).getSchemaMetadata(); + final SchemaMetadataInfo info = client.getSchemaMetadataInfo(schemaId); + if (info == null) { + throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with ID '" + schemaId + "' and version '" + version + "'"); + } + + final SchemaMetadata metadata = info.getSchemaMetadata(); final String schemaName = metadata.getName(); + final SchemaVersionKey schemaVersionKey = new SchemaVersionKey(schemaName, version); final SchemaVersionInfo versionInfo = client.getSchemaVersionInfo(schemaVersionKey); + if (versionInfo == null) { + throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with ID '" + schemaId + "' and version '" + version + "'"); + } + return versionInfo.getSchemaText(); } catch (final SchemaNotFoundException e) { throw new org.apache.nifi.schema.access.SchemaNotFoundException(e); @@ -191,10 +220,20 @@ public String retrieveSchemaText(final long schemaId, final int version) throws public RecordSchema retrieveSchema(final long schemaId, final int version) throws IOException, org.apache.nifi.schema.access.SchemaNotFoundException { try { final SchemaRegistryClient client = getClient(); - final SchemaMetadata metadata = client.getSchemaMetadataInfo(schemaId).getSchemaMetadata(); + final SchemaMetadataInfo info = client.getSchemaMetadataInfo(schemaId); + if (info == null) { + throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with ID '" + schemaId + "' and version '" + version + "'"); + } + + final SchemaMetadata metadata = info.getSchemaMetadata(); final String schemaName = metadata.getName(); + final SchemaVersionKey schemaVersionKey = new SchemaVersionKey(schemaName, version); final SchemaVersionInfo versionInfo = client.getSchemaVersionInfo(schemaVersionKey); + if (versionInfo == null) { + throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with ID '" + schemaId + "' and version '" + version + "'"); + } + final String schemaText = versionInfo.getSchemaText(); final SchemaIdentifier schemaIdentifier = SchemaIdentifier.of(schemaName, schemaId, version); @@ -214,7 +253,7 @@ public RecordSchema retrieveSchema(final long schemaId, final int version) throw * * @param avroSchema the Avro Schema to convert * @param text the textual representation of the schema - * @param schemaName the name of the schema + * @param schemaId the id of the schema * @return the Corresponding Record Schema */ private RecordSchema createRecordSchema(final Schema avroSchema, final String text, final SchemaIdentifier schemaId) { diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService index 1a7506161f21..26171c816816 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService @@ -1,12 +1,16 @@ -# Copyright 2016 Hortoworks, Inc. All rights reserved. -# -# Hortonworks, Inc. licenses this file to you under the Apache License, -# Version 2.0 (the "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 -# Unless required by applicable law or agreed to in writing, software distributed -# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES -# OR CONDITIONS OF ANY KIND, either express or implied. See the License for -# the specific language governing permissions and limitations under the License. -# See the associated NOTICE file for additional information regarding copyright -# ownership. +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + org.apache.nifi.schemaregistry.hortonworks.HortonworksSchemaRegistry \ No newline at end of file