From 4e70373c410eda83bf3b2f0ef2b4add7512d7b44 Mon Sep 17 00:00:00 2001 From: Koji Kawamura Date: Fri, 30 Mar 2018 13:17:19 +0900 Subject: [PATCH 1/6] NIFIREG-162: Support Git backed PersistenceProvider - Added GitPersistenceProvider which manages Flow snapshot versions by Git commits - Updated FlowSnapshotContext to provide author - Added new serialization data model version 2 which stores Flow snapshots as JSON files --- nifi-registry-assembly/LICENSE | 70 +++- nifi-registry-assembly/NOTICE | 4 + .../main/asciidoc/administration-guide.adoc | 166 +++++++- nifi-registry-framework/pom.xml | 43 ++ .../flow/StandardFlowSnapshotContext.java | 14 + .../registry/provider/flow/git/Bucket.java | 79 ++++ .../nifi/registry/provider/flow/git/Flow.java | 101 +++++ .../provider/flow/git/GitFlowMetaData.java | 384 ++++++++++++++++++ .../flow/git/GitFlowPersistenceProvider.java | 256 ++++++++++++ .../VersionedProcessGroupSerializer.java | 130 +++--- .../serialization/VersionedSerializer.java | 65 +++ .../jackson/JacksonSerializer.java | 127 ++++++ ...acksonVersionedProcessGroupSerializer.java | 33 ++ .../jackson/ObjectMapperProvider.java | 43 ++ .../jackson/SerializationContainer.java | 50 +++ .../serialization/jaxb/JAXBSerializer.java | 53 ++- ...nifi.registry.flow.FlowPersistenceProvider | 3 +- .../git/TestGitFlowPersistenceProvider.java | 287 +++++++++++++ .../TestVersionedProcessGroupSerializer.java | 70 ++++ ...stJAXBVersionedProcessGroupSerializer.java | 12 +- .../serialization/json/no-version.snapshot | 5 + .../json/non-integer-version.snapshot | 6 + .../resources/serialization/ver1.snapshot | Bin 0 -> 4421 bytes .../resources/serialization/ver2.snapshot | 97 +++++ .../resources/serialization/ver3.snapshot | 6 + .../registry/flow/FlowSnapshotContext.java | 5 + .../src/main/resources/conf/providers.xml | 9 + .../web/mapper/NiFiRegistryJsonProvider.java | 16 +- 28 files changed, 2063 insertions(+), 71 deletions(-) create mode 100644 
nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/Bucket.java create mode 100644 nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/Flow.java create mode 100644 nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/GitFlowMetaData.java create mode 100644 nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/GitFlowPersistenceProvider.java create mode 100644 nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/VersionedSerializer.java create mode 100644 nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jackson/JacksonSerializer.java create mode 100644 nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jackson/JacksonVersionedProcessGroupSerializer.java create mode 100644 nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jackson/ObjectMapperProvider.java create mode 100644 nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jackson/SerializationContainer.java create mode 100644 nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/flow/git/TestGitFlowPersistenceProvider.java create mode 100644 nifi-registry-framework/src/test/resources/serialization/json/no-version.snapshot create mode 100644 nifi-registry-framework/src/test/resources/serialization/json/non-integer-version.snapshot create mode 100644 nifi-registry-framework/src/test/resources/serialization/ver1.snapshot create mode 100644 nifi-registry-framework/src/test/resources/serialization/ver2.snapshot create mode 100644 nifi-registry-framework/src/test/resources/serialization/ver3.snapshot diff --git a/nifi-registry-assembly/LICENSE b/nifi-registry-assembly/LICENSE index 8898774e5..7f4007a29 100644 --- a/nifi-registry-assembly/LICENSE +++ b/nifi-registry-assembly/LICENSE @@ -1299,4 +1299,72 @@ This product bundles 'jQuery' which is available under an 
MIT license. AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN - THE SOFTWARE. \ No newline at end of file + THE SOFTWARE. + +This product bundles 'JSch' which is available under a 3-Clause BSD style license. +For details see http://www.jcraft.com/jsch/LICENSE.txt + + Copyright (c) 2002-2015 Atsuhiko Yamanaka, JCraft,Inc. + All rights reserved. + + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions are met: + + 1. Redistributions of source code must retain the above copyright notice, + this list of conditions and the following disclaimer. + + 2. Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in + the documentation and/or other materials provided with the distribution. + + 3. The names of the authors may not be used to endorse or promote products + derived from this software without specific prior written permission. + + THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, + INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND + FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL JCRAFT, + INC. OR ANY CONTRIBUTORS TO THIS SOFTWARE BE LIABLE FOR ANY DIRECT, INDIRECT, + INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, + OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, + EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+ +This product bundles 'JGit' which is available under a Eclipse Distribution License - v 1.0 license. +For details see http://www.eclipse.org/org/documents/edl-v10.php + + Copyright (c) 2007, Eclipse Foundation, Inc. and its licensors. + + All rights reserved. + + Redistribution and use in source and binary forms, with or + without modification, are permitted provided that the following + conditions are met: + + - Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + + - Redistributions in binary form must reproduce the above + copyright notice, this list of conditions and the following + disclaimer in the documentation and/or other materials provided + with the distribution. + + - Neither the name of the Eclipse Foundation, Inc. nor the + names of its contributors may be used to endorse or promote + products derived from this software without specific prior + written permission. + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND + CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, + INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES + OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR + CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT + NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER + CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, + STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
diff --git a/nifi-registry-assembly/NOTICE b/nifi-registry-assembly/NOTICE
index 125b5e6f4..d5fe07d32 100644
--- a/nifi-registry-assembly/NOTICE
+++ b/nifi-registry-assembly/NOTICE
@@ -242,6 +242,10 @@ The following binary components are provided under the Apache Software License v
       may be obtained from: http://www.oracle.com/webfolder/technetwork/jsc/xml/ns/javaee/index.html
 
+  (ASLv2) SnakeYAML
+    The following NOTICE information applies:
+      Copyright (c) 2008, http://www.snakeyaml.org
+
 ************************
 Common Development and Distribution License 1.1
 ************************
diff --git a/nifi-registry-docs/src/main/asciidoc/administration-guide.adoc b/nifi-registry-docs/src/main/asciidoc/administration-guide.adoc
index 93864af2d..5ea41717b 100644
--- a/nifi-registry-docs/src/main/asciidoc/administration-guide.adoc
+++ b/nifi-registry-docs/src/main/asciidoc/administration-guide.adoc
@@ -849,7 +849,7 @@ Security Configuration section of this Administrator's Guide.
 === Providers Properties
 These properties pertain to flow persistence providers. NiFi Registry uses a pluggable flow persistence provider to store the
-content of the flows saved to the registry. NiFi Registry provides the `FileSystemFlowPersistenceProvider`.
+content of the flows saved to the registry. For further details on persistence providers, refer to <>.
 
 |====
 |*Property*|*Description*
@@ -895,3 +895,167 @@ Providing 2 total locations, including `nifi.registry.extension.dir.1`.
 Example: `/etc/http-nifi-registry.keytab`
 |nifi.registry.kerberos.spengo.authentication.expiration|The expiration duration of a successful Kerberos user authentication, if used. The default value is `12 hours`.
 |====
+
+== Persistence Providers
+
+NiFi Registry uses a pluggable flow persistence provider to store the content of the flows saved to the registry. NiFi Registry provides `<>` and `<>`.
+
+Each persistence provider has its own configuration parameters, which can be configured in an XML file specified in <>.
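The provider configured in this XML file is identified by its qualified class name, which NiFi Registry instantiates at startup. As a rough, hypothetical sketch of what such wiring amounts to (this is NOT NiFi Registry's actual provider factory; the `ProviderLoader` class and the stand-in class name below are invented for illustration):

```java
// Minimal sketch: instantiate a provider from a qualified class name,
// the way a providers.xml-style configuration supplies it.
// NOT NiFi Registry's real factory code; illustration only.
public class ProviderLoader {

    static Object createProvider(String qualifiedClassName) throws Exception {
        // Assumes the configured class has a public no-arg constructor.
        Class<?> clazz = Class.forName(qualifiedClassName);
        return clazz.getDeclaredConstructor().newInstance();
    }

    public static void main(String[] args) throws Exception {
        // Stand-in class name; a real configuration would name a
        // FlowPersistenceProvider implementation.
        Object provider = createProvider("java.util.ArrayList");
        System.out.println(provider.getClass().getName());
    }
}
```

A misspelled class name in the configuration would surface here as a `ClassNotFoundException` at startup, which is presumably why the qualified class name is called out explicitly in each provider section below.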
+
+The XML configuration file looks like the following. It has a `flowPersistenceProvider` element in which the qualified class name of a persistence provider implementation and its configuration properties are defined. See the following sections for the available configuration properties of each provider.
+
+.Example providers.xml
+[source,xml]
+....
+<providers>
+    <flowPersistenceProvider>
+        <class>persistence-provider-qualified-class-name</class>
+        <property name="property-1">property-value-1</property>
+        <property name="property-2">property-value-2</property>
+        <property name="property-n">property-value-n</property>
+    </flowPersistenceProvider>
+</providers>
+....
+
+
+=== FileSystemFlowPersistenceProvider
+
+FileSystemFlowPersistenceProvider simply stores serialized Flow contents into `{bucket-id}/{flow-id}/{version}` directories.
+
+Example of persisted files:
+....
+Flow Storage Directory/
+├── {bucket-id}/
+│   └── {flow-id}/
+│       ├── {version}/{version}.snapshot
+└── d1beba88-32e9-45d1-bfe9-057cc41f7ce8/
+    └── 219cf539-427f-43be-9294-0644fb07ca63/
+        ├── 1/1.snapshot
+        └── 2/2.snapshot
+....
+
+Qualified class name: `org.apache.nifi.registry.provider.flow.FileSystemFlowPersistenceProvider`
+
+|====
+|*Property*|*Description*
+|Flow Storage Directory|REQUIRED: File system path for a directory where flow contents files are persisted to. If the directory does not exist when NiFi Registry starts, it will be created. If the directory exists, it must be readable and writable from NiFi Registry.
+|====
+
+
+=== GitFlowPersistenceProvider
+
+GitFlowPersistenceProvider stores flow contents under a Git directory.
+
+In contrast to FileSystemFlowPersistenceProvider, this provider uses human-friendly Bucket and Flow names so that those files can be accessed by external tools. However, modifying stored files outside of NiFi Registry is NOT supported. Persisted files are only read when NiFi Registry starts up.
+
+Buckets are represented as directories, and Flow contents are stored as files in the Bucket directory they belong to. Flow snapshot histories are managed as Git commits, meaning only the latest versions of Buckets and Flows exist in the Git directory.
Old versions are retrieved from Git commit histories.
+
+.Example persisted files
+....
+Flow Storage Directory/
+├── .git/
+├── Bucket A/
+│   ├── bucket.yml
+│   ├── Flow 1.snapshot
+│   └── Flow 2.snapshot
+└── Bucket B/
+    ├── bucket.yml
+    └── Flow 4.snapshot
+....
+
+Each Bucket directory contains a YAML file named `bucket.yml`. The file maps NiFi Registry Bucket and Flow IDs to the actual directory and file names. When NiFi Registry starts, this provider reads through the Git commit histories and looks up these `bucket.yml` files to restore Buckets and Flows for each snapshot version.
+
+.Example bucket.yml
+[source,yml]
+....
+layoutVer: 1
+bucketId: d1beba88-32e9-45d1-bfe9-057cc41f7ce8
+flows:
+  219cf539-427f-43be-9294-0644fb07ca63: {ver: 7, file: Flow 1.snapshot}
+  22cccb6c-3011-4493-a996-611f8f112969: {ver: 3, file: Flow 2.snapshot}
+....
+
+Qualified class name: `org.apache.nifi.registry.provider.flow.git.GitFlowPersistenceProvider`
+
+|====
+|*Property*|*Description*
+|Flow Storage Directory|REQUIRED: File system path for a directory where flow contents files are persisted to. The directory must exist when NiFi Registry starts. It must also be initialized as a Git directory. See <> for details.
+|Remote To Push|When a new flow snapshot is created, this persistence provider updates files in the specified Git directory, then creates a commit in the local repository. If `Remote To Push` is defined, it also pushes to the specified remote repository, e.g. 'origin'. To define a more detailed remote spec such as branch names, use a `Refspec`. See https://git-scm.com/book/en/v2/Git-Internals-The-Refspec
+|Remote Access User|This user name is used to make push requests to the remote repository when `Remote To Push` is enabled and the remote repository is accessed via HTTP. If SSH is used, user authentication is done with SSH keys.
+|Remote Access Password|Used with `Remote Access User`.
+|====
+
+==== Initialize Git directory
+
+In order to use GitFlowPersistenceProvider, you need to prepare a Git directory on the local file system. You can do so by initializing a directory with the `git init` command, or by cloning an existing Git project from a remote Git repository with the `git clone` command.
+
+- Git init command
+https://git-scm.com/docs/git-init
+- Git clone command
+https://git-scm.com/docs/git-clone
+
+
+==== Git user configuration
+
+Git identifies a user by username and email address. This persistence provider uses the NiFi Registry username when it creates Git commits. However, since NiFi Registry users do not provide an email address, a preconfigured Git user email address is used.
+
+You can configure the Git user name and email address with the `git config` command.
+
+- Git config command
+https://git-scm.com/docs/git-config
+
+
+==== Git user authentication
+
+By default, this persistence provider only creates commits in the local repository. No user authentication is needed to do so. However, if `Remote To Push` is configured, user authentication to the remote Git repository is required.
+
+If the remote repository is accessed via HTTP, then the username and password for authentication can be configured in the providers XML configuration file.
+
+When SSH is used, SSH keys are used to identify a Git user. In order to pick the right key for a remote server, the SSH configuration file `${USER_HOME}/.ssh/config` is used. The SSH configuration file can contain multiple `Host` entries, each specifying a key file used to log in to a remote Git server. The `Host` value must match the target remote Git server hostname.
+
+.Example SSH config file
+....
+Host git.example.com
+  HostName git.example.com
+  IdentityFile ~/.ssh/id_rsa
+
+Host github.com
+  HostName github.com
+  IdentityFile ~/.ssh/key-for-github
+
+Host bitbucket.org
+  HostName bitbucket.org
+  IdentityFile ~/.ssh/key-for-bitbucket
+....
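To illustrate how such `Host` entries map a server to a key file, here is a deliberately simplified, hypothetical lookup in Java. The `SshConfigLookup` class is invented for illustration; real SSH config matching (wildcards, first-match-wins semantics, `HostName` rewriting) is performed by OpenSSH/JSch, not by this sketch:

```java
import java.util.Arrays;
import java.util.List;

public class SshConfigLookup {

    // Simplified lookup: returns the IdentityFile configured for an exact host.
    // Real SSH config matching is far more involved; illustration only.
    static String identityFileFor(List<String> configLines, String host) {
        String currentHost = null;
        for (String raw : configLines) {
            String line = raw.trim();
            if (line.startsWith("Host ")) {
                // Start of a new Host block.
                currentHost = line.substring("Host ".length()).trim();
            } else if (line.startsWith("IdentityFile ") && host.equals(currentHost)) {
                return line.substring("IdentityFile ".length()).trim();
            }
        }
        return null; // no matching Host entry; ssh would fall back to defaults
    }

    public static void main(String[] args) {
        List<String> config = Arrays.asList(
                "Host github.com",
                "  HostName github.com",
                "  IdentityFile ~/.ssh/key-for-github");
        System.out.println(identityFileFor(config, "github.com"));
        // prints: ~/.ssh/key-for-github
    }
}
```

The point of the per-host entries is exactly this selection step: when the provider pushes to `github.com`, the key configured under that `Host` block is offered, not the default `id_rsa`.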
+
+=== Data model version of serialized Flow snapshots
+
+Serialized Flow snapshots saved by these persistence providers are versioned, so that the data format and schema can evolve over time. Data model version updates are performed automatically by NiFi Registry when it reads and stores each Flow content.
+
+Here is the data model version history:
+
+|====
+|*Data model version*|*Since NiFi Registry*|*Description*
+|2|0.2|JSON formatted text file. The root object contains a header and the Flow content object.
+|1|0.1|Binary format with header bytes at the beginning, followed by the Flow content represented as XML.
+|====
+
+=== Migrating stored files between different Persistence Providers
+
+If you need to migrate existing Flow snapshot files, for example when switching from `FileSystemFlowPersistenceProvider` to `GitFlowPersistenceProvider`, follow the steps described below.
+
+NOTE: The `export-flow-snapshots` tool for NiFi Registry is implemented as an additional mode to the existing tool in the `nifi-toolkit`. The following sections
+assume you have downloaded the binary for the nifi-toolkit.
+
+. Ensure NiFi Registry is running with the current Persistence Provider to migrate from (e.g. FileSystemFlowPersistenceProvider)
+. Create a new `providers.xml` with the new Persistence Provider configuration (e.g. GitFlowPersistenceProvider)
+. Execute `registry export-flow-snapshots` from `cli.sh` or `cli.bat` provided by NiFi Toolkit. Specify the URL of the currently running NiFi Registry and the providers.xml file:
+
+ registry export-flow-snapshots --baseUrl http://localhost:18080 --input /some-directory/providers.xml --verbose
+
+. The existing Flow snapshot files are then created in the directory managed by the new Persistence Provider specified in providers.xml.
+. Stop the running NiFi Registry and update the providers.xml it uses to switch to the new Persistence Provider.
+. Start NiFi Registry.
Now the new Persistence Provider can provide existing Flow snapshots as well as support storing new snapshots. \ No newline at end of file diff --git a/nifi-registry-framework/pom.xml b/nifi-registry-framework/pom.xml index 7fadf6aae..64957af59 100644 --- a/nifi-registry-framework/pom.xml +++ b/nifi-registry-framework/pom.xml @@ -133,6 +133,19 @@ 1.8 + + org.apache.rat + apache-rat-plugin + + + src/test/resources/serialization/json/no-version.snapshot + src/test/resources/serialization/json/non-integer-version.snapshot + src/test/resources/serialization/ver1.snapshot + src/test/resources/serialization/ver2.snapshot + src/test/resources/serialization/ver3.snapshot + + + @@ -262,6 +275,36 @@ h2 1.4.196 + + org.eclipse.jgit + org.eclipse.jgit + 4.11.0.201803080745-r + + + com.jcraft + jsch + 0.1.54 + + + org.yaml + snakeyaml + 1.20 + + + com.fasterxml.jackson.core + jackson-databind + ${jackson.version} + + + com.fasterxml.jackson.core + jackson-core + ${jackson.version} + + + com.fasterxml.jackson.module + jackson-module-jaxb-annotations + ${jackson.version} + org.springframework.boot diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/StandardFlowSnapshotContext.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/StandardFlowSnapshotContext.java index db60783bb..513f37fa2 100644 --- a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/StandardFlowSnapshotContext.java +++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/StandardFlowSnapshotContext.java @@ -34,6 +34,7 @@ public class StandardFlowSnapshotContext implements FlowSnapshotContext { private final int version; private final String comments; private final long snapshotTimestamp; + private final String author; private StandardFlowSnapshotContext(final Builder builder) { this.bucketId = builder.bucketId; @@ -43,6 +44,7 @@ private StandardFlowSnapshotContext(final Builder builder) { 
this.version = builder.version; this.comments = builder.comments; this.snapshotTimestamp = builder.snapshotTimestamp; + this.author = builder.author; Validate.notBlank(bucketId); Validate.notBlank(bucketName); @@ -87,6 +89,11 @@ public long getSnapshotTimestamp() { return snapshotTimestamp; } + @Override + public String getAuthor() { + return author; + } + /** * Builder for creating instances of StandardFlowSnapshotContext. */ @@ -99,6 +106,7 @@ public static class Builder { private int version; private String comments; private long snapshotTimestamp; + private String author; public Builder() { @@ -112,6 +120,7 @@ public Builder(final Bucket bucket, final VersionedFlow versionedFlow, final Ver version(snapshotMetadata.getVersion()); comments(snapshotMetadata.getComments()); snapshotTimestamp(snapshotMetadata.getTimestamp()); + author(snapshotMetadata.getAuthor()); } public Builder bucketId(final String bucketId) { @@ -149,6 +158,11 @@ public Builder snapshotTimestamp(final long snapshotTimestamp) { return this; } + public Builder author(final String author) { + this.author = author; + return this; + } + public StandardFlowSnapshotContext build() { return new StandardFlowSnapshotContext(this); } diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/Bucket.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/Bucket.java new file mode 100644 index 000000000..3d2bdda87 --- /dev/null +++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/Bucket.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.provider.flow.git; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +class Bucket { + private final String bucketId; + private String bucketName; + + /** + * Flow ID to Flow. + */ + private Map flows = new HashMap<>(); + + public Bucket(String bucketId) { + this.bucketId = bucketId; + } + + public String getBucketId() { + return bucketId; + } + + public String getBucketName() { + return bucketName; + } + + public void setBucketName(String bucketName) { + this.bucketName = bucketName; + } + + public Flow getFlowOrCreate(String flowId) { + return this.flows.computeIfAbsent(flowId, k -> new Flow(flowId)); + } + + public Optional getFlow(String flowId) { + return Optional.ofNullable(flows.get(flowId)); + } + + public void removeFlow(String flowId) { + flows.remove(flowId); + } + + public boolean isEmpty() { + return flows.isEmpty(); + } + + /** + * Serialize the latest version of this Bucket meta data. 
+ * @return serialized bucket + */ + Map serialize() { + final Map map = new HashMap<>(); + + map.put(GitFlowMetaData.LAYOUT_VERSION, GitFlowMetaData.CURRENT_LAYOUT_VERSION); + map.put(GitFlowMetaData.BUCKET_ID, bucketId); + map.put(GitFlowMetaData.FLOWS, + flows.keySet().stream().collect(Collectors.toMap(k -> k, k -> flows.get(k).serialize()))); + + return map; + } +} diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/Flow.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/Flow.java new file mode 100644 index 000000000..37078ad3e --- /dev/null +++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/Flow.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.provider.flow.git; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +class Flow { + /** + * The ID of a Flow. It never changes. + */ + private final String flowId; + + /** + * A version to a Flow pointer. 
+ */ + private final Map versions = new HashMap<>(); + + public Flow(String flowId) { + this.flowId = flowId; + } + + public boolean hasVersion(int version) { + return versions.containsKey(version); + } + + public FlowPointer getFlowVersion(int version) { + return versions.get(version); + } + + public void putVersion(int version, FlowPointer pointer) { + versions.put(version, pointer); + } + + public static class FlowPointer { + private String gitRev; + private String objectId; + private final String fileName; + + public FlowPointer(String fileName) { + this.fileName = fileName; + } + + public void setGitRev(String gitRev) { + this.gitRev = gitRev; + } + + public String getGitRev() { + return gitRev; + } + + public String getFileName() { + return fileName; + } + + public String getObjectId() { + return objectId; + } + + public void setObjectId(String objectId) { + this.objectId = objectId; + } + } + + /** + * Serialize the latest version of this Flow meta data. + * @return serialized flow + */ + Map serialize() { + final Map map = new HashMap<>(); + final Optional latestVerOpt = getLatestVersion(); + if (!latestVerOpt.isPresent()) { + throw new IllegalStateException("Flow version is not added yet, can not be serialized."); + } + final Integer latestVer = latestVerOpt.get(); + map.put(GitFlowMetaData.VER, latestVer); + map.put(GitFlowMetaData.FILE, versions.get(latestVer).fileName); + + return map; + } + + Optional getLatestVersion() { + return versions.keySet().stream().reduce(Integer::max); + } + +} diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/GitFlowMetaData.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/GitFlowMetaData.java new file mode 100644 index 000000000..44447b2ce --- /dev/null +++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/GitFlowMetaData.java @@ -0,0 +1,384 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one 
or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.provider.flow.git; + +import org.eclipse.jgit.api.Git; +import org.eclipse.jgit.api.PushCommand; +import org.eclipse.jgit.api.Status; +import org.eclipse.jgit.api.errors.GitAPIException; +import org.eclipse.jgit.api.errors.NoHeadException; +import org.eclipse.jgit.lib.ObjectId; +import org.eclipse.jgit.lib.Repository; +import org.eclipse.jgit.lib.UserConfig; +import org.eclipse.jgit.revwalk.RevCommit; +import org.eclipse.jgit.revwalk.RevTree; +import org.eclipse.jgit.storage.file.FileRepositoryBuilder; +import org.eclipse.jgit.transport.CredentialsProvider; +import org.eclipse.jgit.transport.PushResult; +import org.eclipse.jgit.transport.RemoteConfig; +import org.eclipse.jgit.transport.UsernamePasswordCredentialsProvider; +import org.eclipse.jgit.treewalk.TreeWalk; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.yaml.snakeyaml.Yaml; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStreamWriter; +import java.io.Writer; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import 
java.util.stream.Collectors; + +import static java.lang.String.format; +import static org.apache.commons.lang3.StringUtils.isEmpty; + +class GitFlowMetaData { + + static final int CURRENT_LAYOUT_VERSION = 1; + + static final String LAYOUT_VERSION = "layoutVer"; + static final String BUCKET_ID = "bucketId"; + static final String FLOWS = "flows"; + static final String VER = "ver"; + static final String FILE = "file"; + static final String BUCKET_FILENAME = "bucket.yml"; + + private static final Logger logger = LoggerFactory.getLogger(GitFlowMetaData.class); + + private Repository gitRepo; + private String remoteToPush; + private CredentialsProvider credentialsProvider; + + /** + * Bucket ID to Bucket. + */ + private Map buckets = new HashMap<>(); + + public void setRemoteToPush(String remoteToPush) { + this.remoteToPush = remoteToPush; + } + + public void setRemoteCredential(String userName, String password) { + this.credentialsProvider = new UsernamePasswordCredentialsProvider(userName, password); + } + + /** + * Open a Git repository using the specified directory. + * @param gitProjectRootDir a root directory of a Git project + * @return created Repository + * @throws IOException thrown when the specified directory does not exist, + * does not have read/write privilege or not containing .git directory + */ + private Repository openRepository(final File gitProjectRootDir) throws IOException { + + // Instead of using FileUtils.ensureDirectoryExistAndCanReadAndWrite, check availability manually here. + // Because the util will try to create a dir if not exist. + // The git dir should be initialized and configured by users. 
+ if (!gitProjectRootDir.isDirectory()) { + throw new IOException(format("'%s' is not a directory or does not exist.", gitProjectRootDir)); + } + + if (!(gitProjectRootDir.canRead() && gitProjectRootDir.canWrite())) { + throw new IOException(format("Directory '%s' does not have read/write privilege.", gitProjectRootDir)); + } + + // Search .git dir but avoid searching parent directories. + final FileRepositoryBuilder builder = new FileRepositoryBuilder() + .readEnvironment() + .setMustExist(true) + .addCeilingDirectory(gitProjectRootDir) + .findGitDir(gitProjectRootDir); + + if (builder.getGitDir() == null) { + throw new IOException(format("Directory '%s' does not contain a .git directory." + + " Please init and configure the directory with 'git init' command before using it from NiFi Registry.", + gitProjectRootDir)); + } + + return builder.build(); + } + + @SuppressWarnings("unchecked") + public void loadGitRepository(File gitProjectRootDir) throws IOException, GitAPIException { + gitRepo = openRepository(gitProjectRootDir); + + try (final Git git = new Git(gitRepo)) { + + // Check if remote exists. + if (!isEmpty(remoteToPush)) { + final List remotes = git.remoteList().call(); + final boolean isRemoteExist = remotes.stream().anyMatch(remote -> remote.getName().equals(remoteToPush)); + if (!isRemoteExist) { + final List remoteNames = remotes.stream().map(RemoteConfig::getName).collect(Collectors.toList()); + throw new IllegalArgumentException( + format("The configured remote '%s' to push does not exist. 
Available remotes are %s", remoteToPush, remoteNames)); + } + } + + boolean isLatestCommit = true; + try { + for (RevCommit commit : git.log().call()) { + final String shortCommitId = commit.getId().abbreviate(7).name(); + logger.debug("Processing a commit: {}", shortCommitId); + final RevTree tree = commit.getTree(); + + try (final TreeWalk treeWalk = new TreeWalk(gitRepo)) { + treeWalk.addTree(tree); + + // Path -> ObjectId + final Map bucketObjectIds = new HashMap<>(); + final Map flowSnapshotObjectIds = new HashMap<>(); + while (treeWalk.next()) { + if (treeWalk.isSubtree()) { + treeWalk.enterSubtree(); + } else { + final String pathString = treeWalk.getPathString(); + // TODO: what is this nth?? When does it get grater than 0? Tree count seems to be always 1.. + if (pathString.endsWith("/" + BUCKET_FILENAME)) { + bucketObjectIds.put(pathString, treeWalk.getObjectId(0)); + } else if (pathString.endsWith(GitFlowPersistenceProvider.SNAPSHOT_EXTENSION)) { + flowSnapshotObjectIds.put(pathString, treeWalk.getObjectId(0)); + } + } + } + + if (bucketObjectIds.isEmpty()) { + // No bucket.yml means at this point, all flows are deleted. No need to scan older commits because those are already deleted. + logger.debug("Tree at commit {} does not contain any " + BUCKET_FILENAME + ". Stop loading commits here.", shortCommitId); + return; + } + + loadBuckets(gitRepo, commit, isLatestCommit, bucketObjectIds, flowSnapshotObjectIds); + isLatestCommit = false; + } + } + } catch (NoHeadException e) { + logger.debug("'{}' does not have any commit yet. 
Starting with empty buckets.", gitProjectRootDir); + } + } + } + + @SuppressWarnings("unchecked") + private void loadBuckets(Repository gitRepo, RevCommit commit, boolean isLatestCommit, Map bucketObjectIds, Map flowSnapshotObjectIds) throws IOException { + final Yaml yaml = new Yaml(); + for (String bucketFilePath : bucketObjectIds.keySet()) { + final ObjectId bucketObjectId = bucketObjectIds.get(bucketFilePath); + final Map bucketMeta; + try (InputStream bucketIn = gitRepo.newObjectReader().open(bucketObjectId).openStream()) { + bucketMeta = yaml.load(bucketIn); + } + + if (!validateRequiredValue(bucketMeta, bucketFilePath, LAYOUT_VERSION, BUCKET_ID, FLOWS)) { + continue; + } + + int layoutVersion = (int) bucketMeta.get(LAYOUT_VERSION); + if (layoutVersion > CURRENT_LAYOUT_VERSION) { + logger.warn("{} has unsupported {} {}. This Registry can only support {} or lower. Skipping it.", + bucketFilePath, LAYOUT_VERSION, layoutVersion, CURRENT_LAYOUT_VERSION); + continue; + } + + final String bucketId = (String) bucketMeta.get(BUCKET_ID); + + final Bucket bucket; + if (isLatestCommit) { + // If this is the latest commit, then create one. + bucket = getBucketOrCreate(bucketId); + } else { + // Otherwise non-existing bucket means it's already deleted. + final Optional bucketOpt = getBucket(bucketId); + if (bucketOpt.isPresent()) { + bucket = bucketOpt.get(); + } else { + logger.debug("Bucket {} does not exist any longer. It may have been deleted.", bucketId); + continue; + } + } + + // E.g. DirA/DirB/DirC/bucket.yml -> DirC will be the bucket name. + final String[] pathNames = bucketFilePath.split("/"); + final String bucketName = pathNames[pathNames.length - 2]; + + // Since commits are read in LIFO order, avoid old commits overriding the latest bucket name. 
+ if (isEmpty(bucket.getBucketName())) { + bucket.setBucketName(bucketName); + } + + final Map<String, Object> flows = (Map<String, Object>) bucketMeta.get(FLOWS); + loadFlows(commit, isLatestCommit, bucket, bucketFilePath, flows, flowSnapshotObjectIds); + } + } + + @SuppressWarnings("unchecked") + private void loadFlows(RevCommit commit, boolean isLatestCommit, Bucket bucket, String bucketFilePath, Map<String, Object> flows, Map<String, ObjectId> flowSnapshotObjectIds) { + for (String flowId : flows.keySet()) { + final Map<String, Object> flowMeta = (Map<String, Object>) flows.get(flowId); + + if (!validateRequiredValue(flowMeta, bucketFilePath + ":" + flowId, VER, FILE)) { + continue; + } + + final Flow flow; + if (isLatestCommit) { + // If this is the latest commit, then create one. + flow = bucket.getFlowOrCreate(flowId); + } else { + // Otherwise a non-existing flow means it's already deleted. + final Optional<Flow> flowOpt = bucket.getFlow(flowId); + if (flowOpt.isPresent()) { + flow = flowOpt.get(); + } else { + logger.debug("Flow {} does not exist in bucket {}:{} any longer. It may have been deleted.", flowId, bucket.getBucketName(), bucket.getBucketId()); + continue; + } + } + + final int version = (int) flowMeta.get(VER); + final String flowSnapshotFilename = (String) flowMeta.get(FILE); + + // Since commits are read in LIFO order, avoid old commits overriding the latest pointer. + if (!flow.hasVersion(version)) { + final Flow.FlowPointer pointer = new Flow.FlowPointer(flowSnapshotFilename); + final File flowSnapshotFile = new File(new File(bucketFilePath).getParent(), flowSnapshotFilename); + final ObjectId objectId = flowSnapshotObjectIds.get(flowSnapshotFile.getPath()); + if (objectId == null) { + logger.warn("Git object id for Flow {} version {} with path {} in bucket {}:{} was not found. 
Ignoring this entry.", + flowId, version, flowSnapshotFile.getPath(), bucket.getBucketName(), bucket.getBucketId()); + continue; + } + pointer.setGitRev(commit.getName()); + pointer.setObjectId(objectId.getName()); + flow.putVersion(version, pointer); + } + } + } + + private boolean validateRequiredValue(final Map map, String nameOfMap, Object ... keys) { + for (Object key : keys) { + if (!map.containsKey(key)) { + logger.warn("{} does not have {}. Skipping it.", nameOfMap, key); + return false; + } + } + return true; + } + + public Bucket getBucketOrCreate(String bucketId) { + return buckets.computeIfAbsent(bucketId, k -> new Bucket(bucketId)); + } + + public Optional getBucket(String bucketId) { + return Optional.ofNullable(buckets.get(bucketId)); + } + + + void saveBucket(final Bucket bucket, final File bucketDir) throws IOException { + final Yaml yaml = new Yaml(); + final Map serializedBucket = bucket.serialize(); + final File bucketFile = new File(bucketDir, GitFlowMetaData.BUCKET_FILENAME); + + try (final Writer writer = new OutputStreamWriter( + new FileOutputStream(bucketFile), StandardCharsets.UTF_8)) { + yaml.dump(serializedBucket, writer); + } + } + + boolean isGitDirectoryClean() throws GitAPIException { + final Status status = new Git(gitRepo).status().call(); + return status.isClean() && !status.hasUncommittedChanges(); + } + + /** + * Create a Git commit. + * @param author The name of a user who created the snapshot, it will be used as the author name. + * If not specified, the one in Git config is used. + * @param message Commit message. + * @param bucket A bucket to commit. + * @param flowPointer A flow pointer for the flow snapshot which is updated. + * After a commit is created, new commit rev id and flow snapshot file object id are set to this pointer. + * It can be null if none of flow content is modified. 
+ */ + void commit(String author, String message, Bucket bucket, Flow.FlowPointer flowPointer) throws GitAPIException, IOException { + try (final Git git = new Git(gitRepo)) { + // Execute add command for newly added files (if any). + git.add().addFilepattern(".").call(); + + // Execute add command again for deleted files (if any). + git.add().addFilepattern(".").setUpdate(true).call(); + + final UserConfig userConfig = gitRepo.getConfig().get(UserConfig.KEY); + final String authorName = isEmpty(author) ? userConfig.getAuthorName() : author; + final String authorEmail = userConfig.getAuthorEmail(); + final RevCommit commit = git.commit() + .setAuthor(authorName, authorEmail) + .setMessage(message) + .call(); + + if (flowPointer != null) { + final RevTree tree = commit.getTree(); + final String flowSnapshotPath = new File(bucket.getBucketName(), flowPointer.getFileName()).getPath(); + try (final TreeWalk treeWalk = new TreeWalk(gitRepo)) { + treeWalk.addTree(tree); + + while (treeWalk.next()) { + if (treeWalk.isSubtree()) { + treeWalk.enterSubtree(); + } else { + final String pathString = treeWalk.getPathString(); + if (pathString.equals(flowSnapshotPath)) { + // Capture updated object id. + final String flowSnapshotObjectId = treeWalk.getObjectId(0).getName(); + flowPointer.setObjectId(flowSnapshotObjectId); + break; + } + } + } + } + + flowPointer.setGitRev(commit.getName()); + } + + // Push if necessary. 
+ if (!isEmpty(remoteToPush)) { + logger.debug("Pushing to {}...", remoteToPush); + final PushCommand pushCommand = new Git(gitRepo).push().setRemote(remoteToPush); + if (credentialsProvider != null) { + pushCommand.setCredentialsProvider(credentialsProvider); + } + + final Iterable pushResults = pushCommand.call(); + for (PushResult pushResult : pushResults) { + logger.debug(pushResult.getMessages()); + } + } + + } + } + + byte[] getContent(String objectId) throws IOException { + final ObjectId flowSnapshotObjectId = gitRepo.resolve(objectId); + return gitRepo.newObjectReader().open(flowSnapshotObjectId).getBytes(); + } + +} diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/GitFlowPersistenceProvider.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/GitFlowPersistenceProvider.java new file mode 100644 index 000000000..1cc3b0631 --- /dev/null +++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/GitFlowPersistenceProvider.java @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.registry.provider.flow.git; + +import org.apache.nifi.registry.flow.FlowPersistenceException; +import org.apache.nifi.registry.flow.FlowPersistenceProvider; +import org.apache.nifi.registry.flow.FlowSnapshotContext; +import org.apache.nifi.registry.provider.ProviderConfigurationContext; +import org.apache.nifi.registry.provider.ProviderCreationException; +import org.apache.nifi.registry.util.FileUtils; +import org.eclipse.jgit.api.errors.GitAPIException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.Map; +import java.util.Optional; + +import static java.lang.String.format; +import static org.apache.commons.lang3.StringUtils.isEmpty; + +public class GitFlowPersistenceProvider implements FlowPersistenceProvider { + + private static final Logger logger = LoggerFactory.getLogger(GitFlowMetaData.class); + static final String FLOW_STORAGE_DIR_PROP = "Flow Storage Directory"; + private static final String REMOTE_TO_PUSH = "Remote To Push"; + private static final String REMOTE_ACCESS_USER = "Remote Access User"; + private static final String REMOTE_ACCESS_PASSWORD = "Remote Access Password"; + static final String SNAPSHOT_EXTENSION = ".snapshot"; + + private File flowStorageDir; + private GitFlowMetaData flowMetaData; + + @Override + public void onConfigured(ProviderConfigurationContext configurationContext) throws ProviderCreationException { + flowMetaData = new GitFlowMetaData(); + + final Map props = configurationContext.getProperties(); + if (!props.containsKey(FLOW_STORAGE_DIR_PROP)) { + throw new ProviderCreationException("The property " + FLOW_STORAGE_DIR_PROP + " must be provided"); + } + + final String flowStorageDirValue = props.get(FLOW_STORAGE_DIR_PROP); + if (isEmpty(flowStorageDirValue)) { + throw new ProviderCreationException("The property " + FLOW_STORAGE_DIR_PROP + " cannot be null 
or blank"); + } + + flowMetaData.setRemoteToPush(props.get(REMOTE_TO_PUSH)); + + final String remoteUser = props.get(REMOTE_ACCESS_USER); + final String remotePassword = props.get(REMOTE_ACCESS_PASSWORD); + if (!isEmpty(remoteUser) && isEmpty(remotePassword)) { + throw new ProviderCreationException(format("The property %s is specified but %s is not." + + " %s is required for username password authentication.", + REMOTE_ACCESS_USER, REMOTE_ACCESS_PASSWORD, REMOTE_ACCESS_PASSWORD)); + } + if (!isEmpty(remotePassword)) { + flowMetaData.setRemoteCredential(remoteUser, remotePassword); + } + + try { + flowStorageDir = new File(flowStorageDirValue); + flowMetaData.loadGitRepository(flowStorageDir); + logger.info("Configured GitFlowPersistenceProvider with Flow Storage Directory {}", + new Object[] {flowStorageDir.getAbsolutePath()}); + } catch (IOException|GitAPIException e) { + throw new ProviderCreationException("Failed to load a git repository " + flowStorageDir, e); + } + } + + @Override + public void saveFlowContent(FlowSnapshotContext context, byte[] content) throws FlowPersistenceException { + + try { + // Check if working dir is clean, any uncommitted file? 
+ if (!flowMetaData.isGitDirectoryClean()) { + throw new FlowPersistenceException(format("Git directory %s is not clean" + + " or has uncommitted changes, resolve those changes first to save flow contents.", + flowStorageDir)); + } + } catch (GitAPIException e) { + throw new FlowPersistenceException(format("Failed to get Git status for directory %s due to %s", + flowStorageDir, e)); + } + + final String bucketId = context.getBucketId(); + final Bucket bucket = flowMetaData.getBucketOrCreate(bucketId); + final String currentBucketName = bucket.getBucketName(); + final String bucketName = context.getBucketName(); + final boolean isBucketNameChanged = !bucketName.equals(currentBucketName); + bucket.setBucketName(bucketName); + + final Flow flow = bucket.getFlowOrCreate(context.getFlowId()); + final String flowSnapshotFilename = context.getFlowName() + SNAPSHOT_EXTENSION; + + final Optional currentFlowSnapshotFilename = flow + .getLatestVersion().map(flow::getFlowVersion).map(Flow.FlowPointer::getFileName); + + // Add new version. + final Flow.FlowPointer flowPointer = new Flow.FlowPointer(flowSnapshotFilename); + flow.putVersion(context.getVersion(), flowPointer); + + final File bucketDir = new File(flowStorageDir, bucket.getBucketName()); + final File flowSnippetFile = new File(bucketDir, flowSnapshotFilename); + + final File currentBucketDir = isEmpty(currentBucketName) ? 
null : new File(flowStorageDir, currentBucketName); + if (currentBucketDir != null && currentBucketDir.isDirectory()) { + if (isBucketNameChanged) { + logger.debug("Detected bucket name change from {} to {}, moving it.", currentBucketName, bucketName); + if (!currentBucketDir.renameTo(bucketDir)) { + throw new FlowPersistenceException(format("Failed to move existing bucket %s to %s.", currentBucketDir, bucketDir)); + } + } + } else { + if (!bucketDir.mkdirs()) { + throw new FlowPersistenceException(format("Failed to create new bucket dir %s.", bucketDir)); + } + } + + + try { + if (currentFlowSnapshotFilename.isPresent() && !flowSnapshotFilename.equals(currentFlowSnapshotFilename.get())) { + // Delete old file if flow name has been changed. + final File latestFlowSnapshotFile = new File(bucketDir, currentFlowSnapshotFilename.get()); + logger.debug("Detected flow name change from {} to {}, deleting the old snapshot file.", + currentFlowSnapshotFilename.get(), flowSnapshotFilename); + latestFlowSnapshotFile.delete(); + } + + // Save the content. + try (final OutputStream os = new FileOutputStream(flowSnippetFile)) { + os.write(content); + os.flush(); + } + + // Write a bucket file. + flowMetaData.saveBucket(bucket, bucketDir); + + // Create a Git Commit. + flowMetaData.commit(context.getAuthor(), context.getComments(), bucket, flowPointer); + + } catch (IOException|GitAPIException e) { + throw new FlowPersistenceException("Failed to persist flow.", e); + } + + // TODO: What if user rebased commits? Version number to Commit ID mapping will be broken. 
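For orientation, saveFlowContent above lays files out in the git working tree before committing. A sketch of the resulting layout, inferred from the code in this patch (the bucket and flow names are hypothetical):

```
<Flow Storage Directory>/        # the configured "Flow Storage Directory", a git working tree
├── .git/
└── My Bucket/                   # directory named after the bucket; renamed when the bucket name changes
    ├── bucket.yml               # bucket metadata written by saveBucket()
    └── My Flow.snapshot         # <flow name> + SNAPSHOT_EXTENSION; the latest serialized flow content
```

Older flow versions are not kept as extra files; they remain reachable through the git history via each FlowPointer's commit rev and object id.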
+ } + + @Override + public byte[] getFlowContent(String bucketId, String flowId, int version) throws FlowPersistenceException { + + final Bucket bucket = getBucketOrFail(bucketId); + final Flow flow = getFlowOrFail(bucket, flowId); + if (!flow.hasVersion(version)) { + throw new FlowPersistenceException(format("Flow ID %s version %d was not found in bucket %s:%s.", + flowId, version, bucket.getBucketName(), bucketId)); + } + + final Flow.FlowPointer flowPointer = flow.getFlowVersion(version); + try { + return flowMetaData.getContent(flowPointer.getObjectId()); + } catch (IOException e) { + throw new FlowPersistenceException(format("Failed to get content of Flow ID %s version %d in bucket %s:%s due to %s.", + flowId, version, bucket.getBucketName(), bucketId, e), e); + } + } + + // TODO: Need to add userId argument? + @Override + public void deleteAllFlowContent(String bucketId, String flowId) throws FlowPersistenceException { + final Bucket bucket = getBucketOrFail(bucketId); + final Flow flow = getFlowOrFail(bucket, flowId); + final Optional latestVersionOpt = flow.getLatestVersion(); + if (!latestVersionOpt.isPresent()) { + throw new IllegalStateException("Flow version is not added yet, can not be deleted."); + } + + final Integer latestVersion = latestVersionOpt.get(); + final Flow.FlowPointer flowPointer = flow.getFlowVersion(latestVersion); + + // Delete the flow snapshot. + final File bucketDir = new File(flowStorageDir, bucket.getBucketName()); + final File flowSnapshotFile = new File(bucketDir, flowPointer.getFileName()); + if (flowSnapshotFile.exists()) { + if (!flowSnapshotFile.delete()) { + throw new FlowPersistenceException(format("Failed to delete flow content for %s:%s in bucket %s:%s", + flowPointer.getFileName(), flowId, bucket.getBucketName(), bucketId)); + } + } + + bucket.removeFlow(flowId); + + try { + + if (bucket.isEmpty()) { + // delete bucket dir if this is the last flow. 
+ FileUtils.deleteFile(bucketDir, true); + } else { + // Write a bucket file. + flowMetaData.saveBucket(bucket, bucketDir); + } + + // Create a Git Commit. + final String commitMessage = format("Deleted flow %s:%s in bucket %s:%s.", + flowPointer.getFileName(), flowId, bucket.getBucketName(), bucketId); + flowMetaData.commit(null, commitMessage, bucket, null); + + } catch (IOException|GitAPIException e) { + throw new FlowPersistenceException(format("Failed to delete flow %s:%s in bucket %s:%s due to %s", + flowPointer.getFileName(), flowId, bucket.getBucketName(), bucketId, e), e); + } + + } + + private Bucket getBucketOrFail(String bucketId) throws FlowPersistenceException { + final Optional bucketOpt = flowMetaData.getBucket(bucketId); + if (!bucketOpt.isPresent()) { + throw new FlowPersistenceException(format("Bucket ID %s was not found.", bucketId)); + } + + return bucketOpt.get(); + } + + private Flow getFlowOrFail(Bucket bucket, String flowId) throws FlowPersistenceException { + final Optional flowOpt = bucket.getFlow(flowId); + if (!flowOpt.isPresent()) { + throw new FlowPersistenceException(format("Flow ID %s was not found in bucket %s:%s.", + flowId, bucket.getBucketName(), bucket.getBucketId())); + } + + return flowOpt.get(); + } + + @Override + public void deleteFlowContent(String bucketId, String flowId, int version) throws FlowPersistenceException { + // TODO: Do nothing? This signature is not used. Actually there's nothing to do to the old versions as those exist in old commits even if this method is called. 
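The bucket.yml file parsed by loadBuckets/loadFlows (via SnakeYAML) and rewritten by saveBucket has roughly the following shape. This is a hypothetical sketch: the key names below are stand-ins for the GitFlowMetaData constants, whose actual string values are defined outside this hunk.

```yaml
# Hypothetical bucket.yml; key names are placeholders for the GitFlowMetaData constants.
LAYOUT_VERSION: 1              # int; the bucket is skipped if greater than CURRENT_LAYOUT_VERSION
BUCKET_ID: "f23a9b10-..."      # the bucket's identifier
FLOWS:
  "6cf3a48e-...":              # flow id
    VER: 3                     # int; latest version number of this flow
    FILE: "My Flow.snapshot"   # snapshot filename relative to the bucket directory
```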
+ } + +} diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/VersionedProcessGroupSerializer.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/VersionedProcessGroupSerializer.java index ae667b483..7322ce71c 100644 --- a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/VersionedProcessGroupSerializer.java +++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/VersionedProcessGroupSerializer.java @@ -17,87 +17,121 @@ package org.apache.nifi.registry.serialization; import org.apache.nifi.registry.flow.VersionedProcessGroup; +import org.apache.nifi.registry.serialization.jackson.JacksonVersionedProcessGroupSerializer; import org.apache.nifi.registry.serialization.jaxb.JAXBVersionedProcessGroupSerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; +import java.io.BufferedInputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; /** - * A serializer for VersionedFlowSnapshots that maps a "version" of the data model to a serializer. The version - * will be written to a header at the beginning of the OutputStream, and then the object and the OutputStream will - * be passed on to the real serializer for the given version. Similarly, when deserializing, the header will first be - * read from the InputStream to determine the version, and then the InputStream will be passed to the deserializer - * for the given version. + *

+ * A serializer for VersionedProcessGroup that maps a "version" of the data model to a serializer.
+ *
+ * <p>
+ * When serializing, the serializer associated with the {@link #CURRENT_DATA_MODEL_VERSION} is used.
+ * The version will be written as a header at the beginning of the OutputStream, followed by the content.
+ * </p>
+ *
+ * <p>
+ * When deserializing, each registered serializer will be asked to read a data model version number from the input stream,
+ * in descending version order, until a version number is read successfully.
+ * The serializer associated with the read data model version is then used to deserialize the content back to the target object.
+ * If no serializer can read the version, or no serializer is registered for the read version, a SerializationException is thrown.
+ * </p>
+ *
+ * <p>
+ * The current data model version is 2. Data model version history:
+ * </p>
+ * <ul>
+ *   <li>version 2: serialized by {@link JacksonVersionedProcessGroupSerializer}</li>
+ *   <li>version 1: serialized by {@link JAXBVersionedProcessGroupSerializer}</li>
+ * </ul>
*/ @Service public class VersionedProcessGroupSerializer implements Serializer { - static final String MAGIC_HEADER = "Flows"; - static final byte[] MAGIC_HEADER_BYTES = MAGIC_HEADER.getBytes(StandardCharsets.UTF_8); + private static final Logger logger = LoggerFactory.getLogger(VersionedProcessGroupSerializer.class); - static final Integer CURRENT_VERSION = 1; + static final Integer CURRENT_DATA_MODEL_VERSION = 2; - private final Map> serializersByVersion; + private final Map> serializersByVersion; + private final VersionedSerializer defaultSerializer; + private final List descendingVersions; + public static final int MAX_HEADER_BYTES = 1024; public VersionedProcessGroupSerializer() { - final Map> tempSerializers = new HashMap<>(); - tempSerializers.put(CURRENT_VERSION, new JAXBVersionedProcessGroupSerializer()); + + final Map> tempSerializers = new HashMap<>(); + tempSerializers.put(2, new JacksonVersionedProcessGroupSerializer()); + tempSerializers.put(1, new JAXBVersionedProcessGroupSerializer()); + this.serializersByVersion = Collections.unmodifiableMap(tempSerializers); + this.defaultSerializer = tempSerializers.get(CURRENT_DATA_MODEL_VERSION); + + final List sortedVersions = new ArrayList<>(serializersByVersion.keySet()); + sortedVersions.sort(Collections.reverseOrder(Integer::compareTo)); + this.descendingVersions = sortedVersions; } @Override public void serialize(final VersionedProcessGroup versionedProcessGroup, final OutputStream out) throws SerializationException { - final ByteBuffer byteBuffer = ByteBuffer.allocate(9); - byteBuffer.put(MAGIC_HEADER_BYTES); - byteBuffer.putInt(CURRENT_VERSION); - - try { - out.write(byteBuffer.array()); - } catch (final IOException e) { - throw new SerializationException("Unable to write header while serializing process group", e); - } - - final Serializer serializer = serializersByVersion.get(CURRENT_VERSION); - if (serializer == null) { - throw new SerializationException("No process group serializer for version " + 
CURRENT_VERSION); - } - serializer.serialize(versionedProcessGroup, out); + defaultSerializer.serialize(CURRENT_DATA_MODEL_VERSION, versionedProcessGroup, out); } @Override public VersionedProcessGroup deserialize(final InputStream input) throws SerializationException { - final int headerLength = 9; - final byte[] buffer = new byte[headerLength]; - - int bytesRead = -1; - try { - bytesRead = input.read(buffer, 0, headerLength); - } catch (final IOException e) { - throw new SerializationException("Unable to read header while deserializing process group", e); - } - if (bytesRead < headerLength) { - throw new SerializationException("Unable to read header while deserializing process group, expected" - + headerLength + " bytes, but found " + bytesRead); + final InputStream markSupportedInput = input.markSupported() ? input : new BufferedInputStream(input); + + // Mark the beginning of the stream. + markSupportedInput.mark(MAX_HEADER_BYTES); + + // Applying each serializer + for (int serializerVersion : descendingVersions) { + final VersionedSerializer serializer = serializersByVersion.get(serializerVersion); + + // Serializer version will not be the data model version always. + // E.g. higher version of serializer can read the old data model version number if it has the same header structure, + // but it does not mean the serializer is compatible with the old format. + final int version; + try { + version = serializer.readDataModelVersion(markSupportedInput); + if (!serializersByVersion.containsKey(version)) { + throw new SerializationException(String.format( + "Version %d was returned by %s, but no serializer is registered for that version.", version, serializer)); + } + } catch (SerializationException e) { + logger.debug("Deserialization failed with {}", serializer, e); + continue; + } finally { + // Either when continue with the next serializer, or proceed deserialization with the corresponding serializer, + // reset the stream position. 
+ try { + markSupportedInput.reset(); + } catch (IOException resetException) { + // Should not happen. + logger.error("Unable to reset the input stream.", resetException); + } + } + + return serializersByVersion.get(version).deserialize(markSupportedInput); } - final ByteBuffer bb = ByteBuffer.wrap(buffer); - final int version = bb.getInt(MAGIC_HEADER_BYTES.length); - - final Serializer serializer = serializersByVersion.get(Integer.valueOf(version)); - if (serializer == null) { - throw new SerializationException("No process group serializer for version " + version); - } + throw new SerializationException("Unable to find a process group serializer compatible with the input."); - return serializer.deserialize(input); } } diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/VersionedSerializer.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/VersionedSerializer.java new file mode 100644 index 000000000..b3c626f78 --- /dev/null +++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/VersionedSerializer.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.registry.serialization; + +import java.io.InputStream; +import java.io.OutputStream; + +/** + * Serializes and de-serializes objects. + * This interface is designed to provide backward compatibility to different versioned serialization formats. + * So that serialized data model and format can evolve overtime. + */ +public interface VersionedSerializer { + + /** + * Serialize the given object into the target output stream with the specified version format. + * Implementation classes are responsible to serialize the version to the head of the serialized content, + * so that it can be retrieved by {@link #readDataModelVersion(InputStream)} method efficiently + * without reading the entire byte array. + * + * @param dataModelVersion the data model version + * @param t the object to serialize + * @param out the target output stream + * @throws SerializationException thrown when serialization failed + */ + void serialize(int dataModelVersion, T t, OutputStream out) throws SerializationException; + + /** + * Read data model version from the given InputStream. + *

+ * Even if an implementation serializer is able to read a version, it does not necessarily mean
+ * that same serializer's {@link #deserialize(InputStream)} method will be called.
+ * For example, when the header structure has not changed, a newer serializer may be able to
+ * read an older data model version, but deserialization should still be done with the older serializer.
+ *

+ * @param input the input stream to read version from + * @return the read data model version + * @throws SerializationException thrown when reading version failed + */ + int readDataModelVersion(InputStream input) throws SerializationException; + + /** + * Deserializes the given InputStream back to an object of the given type. + * + * @param input the InputStream to deserialize, + * the position of input is reset to the the beginning of the stream when this method is called + * @return the deserialized object + * @throws SerializationException thrown when deserialization failed + */ + T deserialize(InputStream input) throws SerializationException; +} diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jackson/JacksonSerializer.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jackson/JacksonSerializer.java new file mode 100644 index 000000000..4098c77bb --- /dev/null +++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jackson/JacksonSerializer.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.registry.serialization.jackson; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.nifi.registry.serialization.SerializationException; +import org.apache.nifi.registry.serialization.VersionedSerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.HashMap; + +import static org.apache.nifi.registry.serialization.VersionedProcessGroupSerializer.MAX_HEADER_BYTES; + +/** + * A Serializer that uses Jackson for serializing/deserializing. + */ +public abstract class JacksonSerializer implements VersionedSerializer { + + private static final Logger logger = LoggerFactory.getLogger(JacksonSerializer.class); + + private static final String JSON_HEADER = "\"header\""; + private static final String DATA_MODEL_VERSION = "dataModelVersion"; + + private final ObjectMapper objectMapper = ObjectMapperProvider.getMapper(); + + @Override + public void serialize(int dataModelVersion, T t, OutputStream out) throws SerializationException { + if (t == null) { + throw new IllegalArgumentException("The object to serialize cannot be null"); + } + + if (out == null) { + throw new IllegalArgumentException("OutputStream cannot be null"); + } + + final SerializationContainer container = new SerializationContainer<>(); + container.setHeader(Collections.singletonMap(DATA_MODEL_VERSION, String.valueOf(dataModelVersion))); + container.setContent(t); + + try { + objectMapper.writerWithDefaultPrettyPrinter().writeValue(out, container); + } catch (IOException e) { + throw new SerializationException("Unable to serialize object", e); + } + } + + @Override + public T deserialize(InputStream input) throws SerializationException { + final TypeReference> typeRef = getDeserializeTypeRef(); + try { + final 
SerializationContainer<T> container = objectMapper.readValue(input, typeRef); + return container.getContent(); + } catch (IOException e) { + throw new SerializationException("Unable to deserialize object", e); + } + } + + abstract TypeReference<SerializationContainer<T>> getDeserializeTypeRef() throws SerializationException; + + @Override + public int readDataModelVersion(InputStream input) throws SerializationException { + final byte[] headerBytes = new byte[MAX_HEADER_BYTES]; + final int readHeaderBytes; + try { + readHeaderBytes = input.read(headerBytes); + } catch (IOException e) { + throw new SerializationException("Could not read additional bytes to parse as serialization version 2 or later. " + + e.getMessage(), e); + } + + // Seek '"header"'. + final String headerStr = new String(headerBytes, 0, readHeaderBytes, StandardCharsets.UTF_8); + final int headerIndex = headerStr.indexOf(JSON_HEADER); + if (headerIndex < 0) { + throw new SerializationException(String.format("Could not find %s in the first %d bytes", + JSON_HEADER, readHeaderBytes)); + } + + final int headerStart = headerStr.indexOf("{", headerIndex); + if (headerStart < 0) { + throw new SerializationException(String.format("Could not find '{' starting header object in the first %d bytes.", readHeaderBytes)); + } + + final int headerEnd = headerStr.indexOf("}", headerStart); + if (headerEnd < 0) { + throw new SerializationException(String.format("Could not find '}' ending header object in the first %d bytes.", readHeaderBytes)); + } + + final String headerObjectStr = headerStr.substring(headerStart, headerEnd + 1); + logger.debug("headerObjectStr={}", headerObjectStr); + + try { + final TypeReference<HashMap<String, String>> typeRef = new TypeReference<HashMap<String, String>>() {}; + final HashMap<String, String> header = objectMapper.readValue(headerObjectStr, typeRef); + if (!header.containsKey(DATA_MODEL_VERSION)) { + throw new SerializationException("Missing " + DATA_MODEL_VERSION); + } + + return Integer.parseInt(header.get(DATA_MODEL_VERSION)); + } catch (IOException e) { + throw new
SerializationException(String.format("Failed to parse header string '%s' due to %s", headerObjectStr, e), e); + } catch (NumberFormatException e) { + throw new SerializationException(String.format("Failed to parse version string due to %s", e.getMessage()), e); + } + } +} diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jackson/JacksonVersionedProcessGroupSerializer.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jackson/JacksonVersionedProcessGroupSerializer.java new file mode 100644 index 000000000..21bdecc3f --- /dev/null +++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jackson/JacksonVersionedProcessGroupSerializer.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.serialization.jackson; + +import com.fasterxml.jackson.core.type.TypeReference; +import org.apache.nifi.registry.flow.VersionedProcessGroup; +import org.apache.nifi.registry.serialization.SerializationException; + +/** + * A Jackson serializer for VersionedProcessGroups.
+ */ +public class JacksonVersionedProcessGroupSerializer extends JacksonSerializer<VersionedProcessGroup> { + + + @Override + TypeReference<SerializationContainer<VersionedProcessGroup>> getDeserializeTypeRef() throws SerializationException { + return new TypeReference<SerializationContainer<VersionedProcessGroup>>() {}; + } +} diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jackson/ObjectMapperProvider.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jackson/ObjectMapperProvider.java new file mode 100644 index 000000000..080d258fd --- /dev/null +++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jackson/ObjectMapperProvider.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.serialization.jackson; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.MapperFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector; + +/** + * Provides a singleton ObjectMapper.
+ */ +public abstract class ObjectMapperProvider { + + private static final ObjectMapper mapper = new ObjectMapper(); + + static { + mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL); + mapper.setDefaultPropertyInclusion(JsonInclude.Value.construct(JsonInclude.Include.NON_NULL, JsonInclude.Include.NON_NULL)); + mapper.setAnnotationIntrospector(new JaxbAnnotationIntrospector(mapper.getTypeFactory())); + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + mapper.configure(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY, true); + } + + public static ObjectMapper getMapper() { + return mapper; + } +} diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jackson/SerializationContainer.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jackson/SerializationContainer.java new file mode 100644 index 000000000..8c4d47401 --- /dev/null +++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jackson/SerializationContainer.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.registry.serialization.jackson; + +import io.swagger.annotations.ApiModelProperty; + +import javax.xml.bind.annotation.XmlRootElement; +import javax.xml.bind.annotation.XmlType; +import java.util.Map; + +@XmlRootElement +@XmlType(propOrder = {"header", "content"}) +public class SerializationContainer<T> { + + private Map<String, String> header; + private T content; + + @ApiModelProperty("The serialization headers") + public Map<String, String> getHeader() { + return header; + } + + public void setHeader(Map<String, String> header) { + this.header = header; + } + + @ApiModelProperty("The serialized content") + public T getContent() { + return content; + } + + public void setContent(T content) { + this.content = content; + } +} diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jaxb/JAXBSerializer.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jaxb/JAXBSerializer.java index 515de10ae..5290fb5ad 100644 --- a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jaxb/JAXBSerializer.java +++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jaxb/JAXBSerializer.java @@ -17,19 +17,25 @@ package org.apache.nifi.registry.serialization.jaxb; import org.apache.nifi.registry.serialization.SerializationException; -import org.apache.nifi.registry.serialization.Serializer; +import org.apache.nifi.registry.serialization.VersionedSerializer; import javax.xml.bind.JAXBContext; import javax.xml.bind.JAXBException; import javax.xml.bind.Marshaller; import javax.xml.bind.Unmarshaller; +import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; /** * A Serializer that uses JAXB for serializing/deserializing.
*/ -public class JAXBSerializer<T> implements Serializer<T> { +public class JAXBSerializer<T> implements VersionedSerializer<T> { + + private static final String MAGIC_HEADER = "Flows"; + private static final byte[] MAGIC_HEADER_BYTES = MAGIC_HEADER.getBytes(StandardCharsets.UTF_8); private final JAXBContext jaxbContext; @@ -45,7 +51,7 @@ public JAXBSerializer(final Class<T> clazz) { } @Override - public void serialize(final T t, final OutputStream out) throws SerializationException { + public void serialize(final int dataModelVersion, final T t, final OutputStream out) throws SerializationException { if (t == null) { throw new IllegalArgumentException("The object to serialize cannot be null"); } @@ -54,6 +60,16 @@ public void serialize(final T t, final OutputStream out) throws SerializationExc throw new IllegalArgumentException("OutputStream cannot be null"); } + final ByteBuffer byteBuffer = ByteBuffer.allocate(9); + byteBuffer.put(MAGIC_HEADER_BYTES); + byteBuffer.putInt(dataModelVersion); + + try { + out.write(byteBuffer.array()); + } catch (final IOException e) { + throw new SerializationException("Unable to write header while serializing process group", e); + } + try { final Marshaller marshaller = jaxbContext.createMarshaller(); marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, true); @@ -70,6 +86,8 @@ public T deserialize(final InputStream input) throws SerializationException { } try { + // Consume the header bytes.
+ readDataModelVersion(input); final Unmarshaller unmarshaller = jaxbContext.createUnmarshaller(); return (T) unmarshaller.unmarshal(input); } catch (JAXBException e) { @@ -77,4 +95,33 @@ public T deserialize(final InputStream input) throws SerializationException { } } + @Override + public int readDataModelVersion(InputStream input) throws SerializationException { + final int headerLength = 9; + final byte[] buffer = new byte[headerLength]; + + int bytesRead = -1; + try { + bytesRead = input.read(buffer, 0, headerLength); + } catch (final IOException e) { + throw new SerializationException("Unable to read header while deserializing process group", e); + } + + if (bytesRead < headerLength) { + throw new SerializationException("Unable to read header while deserializing process group, expected " + + headerLength + " bytes, but found " + bytesRead); + } + + final ByteBuffer bb = ByteBuffer.wrap(buffer); + final byte[] magicHeaderBytes = new byte[MAGIC_HEADER_BYTES.length]; + bb.get(magicHeaderBytes); + for (int i = 0; i < MAGIC_HEADER_BYTES.length; i++) { + if (MAGIC_HEADER_BYTES[i] != magicHeaderBytes[i]) { + throw new SerializationException("Unable to read header while deserializing process group." + + " Header byte sequence does not match"); + } + } + + return bb.getInt(MAGIC_HEADER_BYTES.length); + } } diff --git a/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.flow.FlowPersistenceProvider b/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.flow.FlowPersistenceProvider index 6d4fdfce8..e456fa20f 100644 --- a/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.flow.FlowPersistenceProvider +++ b/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.flow.FlowPersistenceProvider @@ -12,4 +12,5 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # limitations under the License. -org.apache.nifi.registry.provider.flow.FileSystemFlowPersistenceProvider \ No newline at end of file +org.apache.nifi.registry.provider.flow.FileSystemFlowPersistenceProvider +org.apache.nifi.registry.provider.flow.git.GitFlowPersistenceProvider \ No newline at end of file diff --git a/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/flow/git/TestGitFlowPersistenceProvider.java b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/flow/git/TestGitFlowPersistenceProvider.java new file mode 100644 index 000000000..c8591ac22 --- /dev/null +++ b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/flow/git/TestGitFlowPersistenceProvider.java @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.registry.provider.flow.git; + +import org.apache.nifi.registry.flow.FlowPersistenceException; +import org.apache.nifi.registry.provider.ProviderConfigurationContext; +import org.apache.nifi.registry.provider.ProviderCreationException; +import org.apache.nifi.registry.provider.StandardProviderConfigurationContext; +import org.apache.nifi.registry.provider.flow.StandardFlowSnapshotContext; +import org.apache.nifi.registry.util.FileUtils; +import org.eclipse.jgit.api.Git; +import org.eclipse.jgit.api.errors.GitAPIException; +import org.eclipse.jgit.revwalk.RevCommit; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +public class TestGitFlowPersistenceProvider { + + private static final Logger logger = LoggerFactory.getLogger(TestGitFlowPersistenceProvider.class); + + private void assertCreationFailure(final Map<String, String> properties, final Consumer<Exception> assertion) { + final GitFlowPersistenceProvider persistenceProvider = new GitFlowPersistenceProvider(); + + try { + final ProviderConfigurationContext configurationContext = new StandardProviderConfigurationContext(properties); + persistenceProvider.onConfigured(configurationContext); + fail("Should fail"); + } catch (ProviderCreationException e) { + assertion.accept(e); + } + } + + @Test + public void testNoFlowStorageDirSpecified() { + final Map<String, String> properties = new HashMap<>(); + assertCreationFailure(properties, + e -> assertEquals("The property Flow Storage Directory must be provided", e.getMessage())); + } + + @Test + public void testLoadNonExistingDir() { + final Map<String, String> properties = new HashMap<>(); +
properties.put(GitFlowPersistenceProvider.FLOW_STORAGE_DIR_PROP, "target/non-existing"); + assertCreationFailure(properties, + e -> assertEquals("'target/non-existing' is not a directory or does not exist.", e.getCause().getMessage())); + } + + @Test + public void testLoadNonGitDir() { + final Map<String, String> properties = new HashMap<>(); + properties.put(GitFlowPersistenceProvider.FLOW_STORAGE_DIR_PROP, "target"); + assertCreationFailure(properties, + e -> assertEquals("Directory 'target' does not contain a .git directory." + " Please init and configure the directory with 'git init' command before using it from NiFi Registry.", + e.getCause().getMessage())); + } + + @FunctionalInterface + private interface GitConsumer { + void accept(Git git) throws GitAPIException; + } + + private void assertProvider(final Map<String, String> properties, final GitConsumer gitConsumer, final Consumer<GitFlowPersistenceProvider> assertion, boolean deleteDir) + throws IOException, GitAPIException { + + final File gitDir = new File(properties.get(GitFlowPersistenceProvider.FLOW_STORAGE_DIR_PROP)); + try { + FileUtils.ensureDirectoryExistAndCanReadAndWrite(gitDir); + + try (final Git git = Git.init().setDirectory(gitDir).call()) { + logger.debug("Initialized a git repository {}", git); + gitConsumer.accept(git); + } + + final GitFlowPersistenceProvider persistenceProvider = new GitFlowPersistenceProvider(); + + final ProviderConfigurationContext configurationContext = new StandardProviderConfigurationContext(properties); + persistenceProvider.onConfigured(configurationContext); + assertion.accept(persistenceProvider); + + } finally { + if (deleteDir) { + FileUtils.deleteFile(gitDir, true); + } + } + } + + @Test + public void testLoadEmptyGitDir() throws GitAPIException, IOException { + final Map<String, String> properties = new HashMap<>(); + properties.put(GitFlowPersistenceProvider.FLOW_STORAGE_DIR_PROP, "target/empty-git"); + + assertProvider(properties, g -> {}, p -> { + try { + p.getFlowContent("bucket-id-A", "flow-id-1", 1); + } catch
(FlowPersistenceException e) { + assertEquals("Bucket ID bucket-id-A was not found.", e.getMessage()); + } + }, true); + } + + @Test + public void testLoadCommitHistories() throws GitAPIException, IOException { + final Map<String, String> properties = new HashMap<>(); + properties.put(GitFlowPersistenceProvider.FLOW_STORAGE_DIR_PROP, "target/repo-with-histories"); + + assertProvider(properties, g -> {}, p -> { + // Create some Flows and keep the directory. + final StandardFlowSnapshotContext.Builder contextBuilder = new StandardFlowSnapshotContext.Builder() + .bucketId("bucket-id-A") + .bucketName("BucketA") + .flowId("flow-id-1") + .flowName("Flow1") + .author("unit-test-user") + .comments("Initial commit.") + .snapshotTimestamp(new Date().getTime()) + .version(1); + + final byte[] flow1Ver1 = "Flow1 ver.1".getBytes(StandardCharsets.UTF_8); + p.saveFlowContent(contextBuilder.build(), flow1Ver1); + + contextBuilder.comments("2nd commit.").version(2); + final byte[] flow1Ver2 = "Flow1 ver.2".getBytes(StandardCharsets.UTF_8); + p.saveFlowContent(contextBuilder.build(), flow1Ver2); + + // Rename flow. + contextBuilder.flowName("FlowOne").comments("3rd commit.").version(3); + final byte[] flow1Ver3 = "FlowOne ver.3".getBytes(StandardCharsets.UTF_8); + p.saveFlowContent(contextBuilder.build(), flow1Ver3); + + // Adding another flow. + contextBuilder.flowId("flow-id-2").flowName("FlowTwo").comments("4th commit.").version(1); + final byte[] flow2Ver1 = "FlowTwo ver.1".getBytes(StandardCharsets.UTF_8); + p.saveFlowContent(contextBuilder.build(), flow2Ver1); + + // Rename bucket. + contextBuilder.bucketName("New name for Bucket A").comments("5th commit.").version(2); + final byte[] flow2Ver2 = "FlowTwo ver.2".getBytes(StandardCharsets.UTF_8); + p.saveFlowContent(contextBuilder.build(), flow2Ver2); + + + }, false); + + assertProvider(properties, g -> { + // Assert commit.
+ final AtomicInteger commitCount = new AtomicInteger(0); + final String[] commitMessages = { + "5th commit.", + "4th commit.", + "3rd commit.", + "2nd commit.", + "Initial commit." + }; + for (RevCommit commit : g.log().call()) { + assertEquals("unit-test-user", commit.getAuthorIdent().getName()); + final int commitIndex = commitCount.getAndIncrement(); + assertEquals(commitMessages[commitIndex], commit.getShortMessage()); + } + assertEquals(commitMessages.length, commitCount.get()); + }, p -> { + // Should be able to load flow from commit histories. + final byte[] flow1Ver1 = p.getFlowContent("bucket-id-A", "flow-id-1", 1); + assertEquals("Flow1 ver.1", new String(flow1Ver1, StandardCharsets.UTF_8)); + + final byte[] flow1Ver2 = p.getFlowContent("bucket-id-A", "flow-id-1", 2); + assertEquals("Flow1 ver.2", new String(flow1Ver2, StandardCharsets.UTF_8)); + + // Even if the name of flow has been changed, it can be retrieved by the same flow id. + final byte[] flow1Ver3 = p.getFlowContent("bucket-id-A", "flow-id-1", 3); + assertEquals("FlowOne ver.3", new String(flow1Ver3, StandardCharsets.UTF_8)); + + final byte[] flow2Ver1 = p.getFlowContent("bucket-id-A", "flow-id-2", 1); + assertEquals("FlowTwo ver.1", new String(flow2Ver1, StandardCharsets.UTF_8)); + + // Even if the name of bucket has been changed, it can be retrieved by the same flow id. + final byte[] flow2Ver2 = p.getFlowContent("bucket-id-A", "flow-id-2", 2); + assertEquals("FlowTwo ver.2", new String(flow2Ver2, StandardCharsets.UTF_8)); + + // Delete the 2nd flow. + p.deleteAllFlowContent("bucket-id-A", "flow-id-2"); + + }, false); + + assertProvider(properties, g -> { + // Assert commit. + final AtomicInteger commitCount = new AtomicInteger(0); + final String[] commitMessages = { + "Deleted flow FlowTwo.snapshot:flow-id-2 in bucket New name for Bucket A:bucket-id-A.", + "5th commit.", + "4th commit.", + "3rd commit.", + "2nd commit.", + "Initial commit." 
+ }; + for (RevCommit commit : g.log().call()) { + // TODO: We don't support author for delete operations yet. The author is a global configured author here. + // assertEquals("unit-test-user", commit.getAuthorIdent().getName()); + final int commitIndex = commitCount.getAndIncrement(); + assertEquals(commitMessages[commitIndex], commit.getShortMessage()); + } + assertEquals(commitMessages.length, commitCount.get()); + }, p -> { + // Should be able to load flow from commit histories. + final byte[] flow1Ver1 = p.getFlowContent("bucket-id-A", "flow-id-1", 1); + assertEquals("Flow1 ver.1", new String(flow1Ver1, StandardCharsets.UTF_8)); + + final byte[] flow1Ver2 = p.getFlowContent("bucket-id-A", "flow-id-1", 2); + assertEquals("Flow1 ver.2", new String(flow1Ver2, StandardCharsets.UTF_8)); + + // Even if the name of flow has been changed, it can be retrieved by the same flow id. + final byte[] flow1Ver3 = p.getFlowContent("bucket-id-A", "flow-id-1", 3); + assertEquals("FlowOne ver.3", new String(flow1Ver3, StandardCharsets.UTF_8)); + + // The 2nd flow has been deleted, and should not exist. + try { + p.getFlowContent("bucket-id-A", "flow-id-2", 1); + } catch (FlowPersistenceException e) { + assertEquals("Flow ID flow-id-2 was not found in bucket New name for Bucket A:bucket-id-A.", e.getMessage()); + } + + try { + p.getFlowContent("bucket-id-A", "flow-id-2", 2); + } catch (FlowPersistenceException e) { + assertEquals("Flow ID flow-id-2 was not found in bucket New name for Bucket A:bucket-id-A.", e.getMessage()); + } + + // Delete the 1st flow, too. + p.deleteAllFlowContent("bucket-id-A", "flow-id-1"); + + }, false); + + assertProvider(properties, g -> { + // Assert commit. 
+ final AtomicInteger commitCount = new AtomicInteger(0); + final String[] commitMessages = { + "Deleted flow FlowOne.snapshot:flow-id-1 in bucket New name for Bucket A:bucket-id-A.", + "Deleted flow FlowTwo.snapshot:flow-id-2 in bucket New name for Bucket A:bucket-id-A.", + "5th commit.", + "4th commit.", + "3rd commit.", + "2nd commit.", + "Initial commit." + }; + for (RevCommit commit : g.log().call()) { + // TODO: We don't support author for delete operations yet. The author is a global configured author here. + // assertEquals("unit-test-user", commit.getAuthorIdent().getName()); + final int commitIndex = commitCount.getAndIncrement(); + assertEquals(commitMessages[commitIndex], commit.getShortMessage()); + } + assertEquals(commitMessages.length, commitCount.get()); + }, p -> { + // The 1st flow has been deleted, and should not exist. Moreover, the bucket A has been deleted since there's no flow. + try { + p.getFlowContent("bucket-id-A", "flow-id-1", 1); + } catch (FlowPersistenceException e) { + assertEquals("Bucket ID bucket-id-A was not found.", e.getMessage()); + } + }, true); + } +} diff --git a/nifi-registry-framework/src/test/java/org/apache/nifi/registry/serialization/TestVersionedProcessGroupSerializer.java b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/serialization/TestVersionedProcessGroupSerializer.java index 75c76b7ed..584e2f7dd 100644 --- a/nifi-registry-framework/src/test/java/org/apache/nifi/registry/serialization/TestVersionedProcessGroupSerializer.java +++ b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/serialization/TestVersionedProcessGroupSerializer.java @@ -23,6 +23,11 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; public class TestVersionedProcessGroupSerializer { @@ -57,4 +62,69 @@ public void 
testSerializeDeserializeFlowSnapshot() throws SerializationException Assert.assertEquals(processor1.getName(), deserializedProcessor1.getName()); } + + @Test + public void testDeserializeJsonNonIntegerVersion() throws IOException { + final String file = "/serialization/json/non-integer-version.snapshot"; + final VersionedProcessGroupSerializer serializer = new VersionedProcessGroupSerializer(); + try (final InputStream is = this.getClass().getResourceAsStream(file)) { + try { + serializer.deserialize(is); + fail("Should fail"); + } catch (SerializationException e) { + assertEquals("Unable to find a process group serializer compatible with the input.", e.getMessage()); + } + } + } + + @Test + public void testDeserializeJsonNoVersion() throws IOException { + final String file = "/serialization/json/no-version.snapshot"; + final VersionedProcessGroupSerializer serializer = new VersionedProcessGroupSerializer(); + try (final InputStream is = this.getClass().getResourceAsStream(file)) { + try { + serializer.deserialize(is); + fail("Should fail"); + } catch (SerializationException e) { + assertEquals("Unable to find a process group serializer compatible with the input.", e.getMessage()); + } + } + } + + @Test + public void testDeserializeVer1() throws IOException { + final String file = "/serialization/ver1.snapshot"; + final VersionedProcessGroupSerializer serializer = new VersionedProcessGroupSerializer(); + final VersionedProcessGroup processGroup; + try (final InputStream is = this.getClass().getResourceAsStream(file)) { + processGroup = serializer.deserialize(is); + } + System.out.println("processGroup=" + processGroup); + } + + @Test + public void testDeserializeVer2() throws IOException { + final String file = "/serialization/ver2.snapshot"; + final VersionedProcessGroupSerializer serializer = new VersionedProcessGroupSerializer(); + final VersionedProcessGroup processGroup; + try (final InputStream is = this.getClass().getResourceAsStream(file)) { + processGroup =
serializer.deserialize(is); + } + System.out.println("processGroup=" + processGroup); + } + + @Test + public void testDeserializeVer3() throws IOException { + final String file = "/serialization/ver3.snapshot"; + final VersionedProcessGroupSerializer serializer = new VersionedProcessGroupSerializer(); + try (final InputStream is = this.getClass().getResourceAsStream(file)) { + try { + serializer.deserialize(is); + fail("Should fail"); + } catch (SerializationException e) { + assertEquals("Unable to find a process group serializer compatible with the input.", e.getMessage()); + } + } + } + } diff --git a/nifi-registry-framework/src/test/java/org/apache/nifi/registry/serialization/jaxb/TestJAXBVersionedProcessGroupSerializer.java b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/serialization/jaxb/TestJAXBVersionedProcessGroupSerializer.java index 1ea9e7c25..916e0531d 100644 --- a/nifi-registry-framework/src/test/java/org/apache/nifi/registry/serialization/jaxb/TestJAXBVersionedProcessGroupSerializer.java +++ b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/serialization/jaxb/TestJAXBVersionedProcessGroupSerializer.java @@ -19,7 +19,7 @@ import org.apache.nifi.registry.flow.VersionedProcessGroup; import org.apache.nifi.registry.flow.VersionedProcessor; import org.apache.nifi.registry.serialization.SerializationException; -import org.apache.nifi.registry.serialization.Serializer; +import org.apache.nifi.registry.serialization.VersionedSerializer; import org.junit.Assert; import org.junit.Test; @@ -31,7 +31,7 @@ public class TestJAXBVersionedProcessGroupSerializer { @Test public void testSerializeDeserializeFlowSnapshot() throws SerializationException { - final Serializer<VersionedProcessGroup> serializer = new JAXBVersionedProcessGroupSerializer(); + final VersionedSerializer<VersionedProcessGroup> serializer = new JAXBVersionedProcessGroupSerializer(); final VersionedProcessGroup processGroup1 = new VersionedProcessGroup(); processGroup1.setIdentifier("pg1"); @@ -45,12 +45,18 @@
public void testSerializeDeserializeFlowSnapshot() throws SerializationException processGroup1.getProcessors().add(processor1); final ByteArrayOutputStream out = new ByteArrayOutputStream(); - serializer.serialize(processGroup1, out); + serializer.serialize(1, processGroup1, out); final String snapshotStr = new String(out.toByteArray(), StandardCharsets.UTF_8); //System.out.println(snapshotStr); final ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray()); + in.mark(1024); + final int version = serializer.readDataModelVersion(in); + + Assert.assertEquals(1, version); + + in.reset(); final VersionedProcessGroup deserializedProcessGroup1 = serializer.deserialize(in); Assert.assertEquals(processGroup1.getIdentifier(), deserializedProcessGroup1.getIdentifier()); diff --git a/nifi-registry-framework/src/test/resources/serialization/json/no-version.snapshot b/nifi-registry-framework/src/test/resources/serialization/json/no-version.snapshot new file mode 100644 index 000000000..ce1901f5c --- /dev/null +++ b/nifi-registry-framework/src/test/resources/serialization/json/no-version.snapshot @@ -0,0 +1,5 @@ +{ + "header": { + }, + "content": {} +} \ No newline at end of file diff --git a/nifi-registry-framework/src/test/resources/serialization/json/non-integer-version.snapshot b/nifi-registry-framework/src/test/resources/serialization/json/non-integer-version.snapshot new file mode 100644 index 000000000..33d4da35a --- /dev/null +++ b/nifi-registry-framework/src/test/resources/serialization/json/non-integer-version.snapshot @@ -0,0 +1,6 @@ +{ + "header": { + "dataModelVersion": "One" + }, + "content": {} +} \ No newline at end of file diff --git a/nifi-registry-framework/src/test/resources/serialization/ver1.snapshot b/nifi-registry-framework/src/test/resources/serialization/ver1.snapshot new file mode 100644 index 0000000000000000000000000000000000000000..7c1ab49b60b229521ca7aab81f28b69bdb8ad632 GIT binary patch literal 4421 
zcmd6rTW{hx6o9+0^D83Wa}Q}ExgK@sWh-@B1a$Xt<8!gUb9~E9n*U;FXJ@~mubXA6t%0&6&n^bGVGT6MqC8Hr#l_%hcB2OajTIz| zNt$PHF{pqIzTz)vYXb3@<`FP5HR;>V!Cy%4+ z>Ga3#tM+mY_teyl$eXQ(?~Ja-~@==h|I@2G3Q^;+R^y6ml9qtws>QL%KOH4Lio+17N7jTDrK?Sozj^&0 z3u%3Or+D8;5xtxckf-xq#`lyZf0dx^W+^*9%sELJC-E+kuSh|(o18AmE9soUhHIP> zlNVi2X^E9s)LV50ETYM(=q$-H)5j{{4TaaKwWcWJgsoCiJ&1j1w5wumDymp*>Njpy zWH!oq>E?U^qf?;kBm%t6#xa$Cr*eVb8&|tE?mE8vMhr5sSFW3gt>D1_X^DCj`iax- zAiQ*}QdJzO;(o(pB>x+1r5m3M^kTbB>+`uDm8{5@dLifAvDL1hp2~M5U1{~yRU7(E zKMwrAj_-4&;nZcnAD;?X^`y%CEk6#t+KBm3_<{sD5)zhD3W literal 0 HcmV?d00001 diff --git a/nifi-registry-framework/src/test/resources/serialization/ver2.snapshot b/nifi-registry-framework/src/test/resources/serialization/ver2.snapshot new file mode 100644 index 000000000..7f4dfc558 --- /dev/null +++ b/nifi-registry-framework/src/test/resources/serialization/ver2.snapshot @@ -0,0 +1,97 @@ +{ + "header": { + "dataModelVersion": 2 + }, + "content": { + "identifier": "a2c80883-171c-316d-ba25-24df2c352693", + "name": "Flow1", + "comments": "", + "position": { + "x": 1549.249149182042, + "y": 764.2426186568309 + }, + "processGroups": [], + "remoteProcessGroups": [], + "processors": [ + { + "identifier": "92fe4513-21c0-34f6-a916-2874f46ae864", + "name": "GenerateFlowFile", + "comments": "", + "position": { + "x": 488.99999411591034, + "y": 114.00000359389122 + }, + "bundle": { + "group": "org.apache.nifi", + "artifact": "nifi-standard-nar", + "version": "1.6.0-SNAPSHOT" + }, + "style": {}, + "type": "org.apache.nifi.processors.standard.GenerateFlowFile", + "properties": { + "character-set": "UTF-8", + "File Size": "0B", + "Batch Size": "1", + "Unique FlowFiles": "false", + "Data Format": "Text" + }, + "propertyDescriptors": { + "character-set": { + "name": "character-set", + "displayName": "Character Set", + "identifiesControllerService": false, + "sensitive": false + }, + "File Size": { + "name": "File Size", + "displayName": "File Size", + "identifiesControllerService": false, + "sensitive": 
false + }, + "generate-ff-custom-text": { + "name": "generate-ff-custom-text", + "displayName": "Custom Text", + "identifiesControllerService": false, + "sensitive": false + }, + "Batch Size": { + "name": "Batch Size", + "displayName": "Batch Size", + "identifiesControllerService": false, + "sensitive": false + }, + "Unique FlowFiles": { + "name": "Unique FlowFiles", + "displayName": "Unique FlowFiles", + "identifiesControllerService": false, + "sensitive": false + }, + "Data Format": { + "name": "Data Format", + "displayName": "Data Format", + "identifiesControllerService": false, + "sensitive": false + } + }, + "schedulingPeriod": "0 sec", + "schedulingStrategy": "TIMER_DRIVEN", + "executionNode": "ALL", + "penaltyDuration": "30 sec", + "yieldDuration": "1 sec", + "bulletinLevel": "WARN", + "runDurationMillis": 0, + "concurrentlySchedulableTaskCount": 1, + "componentType": "PROCESSOR", + "groupIdentifier": "a2c80883-171c-316d-ba25-24df2c352693" + } + ], + "inputPorts": [], + "outputPorts": [], + "connections": [], + "labels": [], + "funnels": [], + "controllerServices": [], + "variables": {}, + "componentType": "PROCESS_GROUP" + } +} \ No newline at end of file diff --git a/nifi-registry-framework/src/test/resources/serialization/ver3.snapshot b/nifi-registry-framework/src/test/resources/serialization/ver3.snapshot new file mode 100644 index 000000000..574fe56e0 --- /dev/null +++ b/nifi-registry-framework/src/test/resources/serialization/ver3.snapshot @@ -0,0 +1,6 @@ +{ + "header": { + "dataModelVersion": 3 + }, + "content": {} +} \ No newline at end of file diff --git a/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/flow/FlowSnapshotContext.java b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/flow/FlowSnapshotContext.java index c5e06f560..c4bdd4650 100644 --- a/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/flow/FlowSnapshotContext.java +++ 
b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/flow/FlowSnapshotContext.java @@ -56,4 +56,9 @@ public interface FlowSnapshotContext { */ long getSnapshotTimestamp(); + /** + * @return the name of the user who created the snapshot + */ + String getAuthor(); + } diff --git a/nifi-registry-resources/src/main/resources/conf/providers.xml b/nifi-registry-resources/src/main/resources/conf/providers.xml index 40bf01297..720bee2c0 100644 --- a/nifi-registry-resources/src/main/resources/conf/providers.xml +++ b/nifi-registry-resources/src/main/resources/conf/providers.xml @@ -20,4 +20,13 @@ ./flow_storage +
\ No newline at end of file diff --git a/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/mapper/NiFiRegistryJsonProvider.java b/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/mapper/NiFiRegistryJsonProvider.java index a2174ba31..10a044aca 100644 --- a/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/mapper/NiFiRegistryJsonProvider.java +++ b/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/mapper/NiFiRegistryJsonProvider.java @@ -16,10 +16,7 @@ */ package org.apache.nifi.registry.web.mapper; -import com.fasterxml.jackson.annotation.JsonInclude; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector; +import org.apache.nifi.registry.serialization.jackson.ObjectMapperProvider; import org.glassfish.jersey.jackson.internal.jackson.jaxrs.json.JacksonJaxbJsonProvider; import org.springframework.stereotype.Component; @@ -32,17 +29,8 @@ @Produces(MediaType.APPLICATION_JSON) public class NiFiRegistryJsonProvider extends JacksonJaxbJsonProvider { - private static final ObjectMapper mapper = new ObjectMapper(); - - static { - mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL); - mapper.setPropertyInclusion(JsonInclude.Value.construct(JsonInclude.Include.NON_NULL, JsonInclude.Include.NON_NULL)); - mapper.setAnnotationIntrospector(new JaxbAnnotationIntrospector(mapper.getTypeFactory())); - mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - } - public NiFiRegistryJsonProvider() { super(); - setMapper(mapper); + setMapper(ObjectMapperProvider.getMapper()); } } From 90ca2ac5386aaeabaeaec59b76ca6fdd0e7d1471 Mon Sep 17 00:00:00 2001 From: Koji Kawamura Date: Fri, 27 Apr 2018 15:17:02 +0900 Subject: [PATCH 2/6] Added filename sanitizing. 
Also, changed git/Bucket.java to have bucketDirName instead of raw bucketName because it cannot be restored when loaded from Git, and it is only used for logging. --- .../registry/provider/flow/git/Bucket.java | 18 +++++++--- .../nifi/registry/provider/flow/git/Flow.java | 4 +++ .../provider/flow/git/GitFlowMetaData.java | 16 ++++----- .../flow/git/GitFlowPersistenceProvider.java | 31 +++++++++-------- .../git/TestGitFlowPersistenceProvider.java | 4 +-- .../apache/nifi/registry/util/FileUtils.java | 34 +++++++++++++++++++ 6 files changed, 77 insertions(+), 30 deletions(-) diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/Bucket.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/Bucket.java index 3d2bdda87..3595d847c 100644 --- a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/Bucket.java +++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/Bucket.java @@ -23,7 +23,7 @@ class Bucket { private final String bucketId; - private String bucketName; + private String bucketDirName; /** * Flow ID to Flow. @@ -38,12 +38,20 @@ public String getBucketId() { return bucketId; } - public String getBucketName() { - return bucketName; + /** + * Returns the directory name of this bucket. + * @return can be different from the original bucket name if it contained characters that were sanitized. + */ + public String getBucketDirName() { + return bucketDirName; } - public void setBucketName(String bucketName) { - this.bucketName = bucketName; + /** + * Sets the name of the bucket directory. + * @param bucketDirName The directory name must be sanitized; use {@link org.apache.nifi.registry.util.FileUtils#sanitizeFilename(String)} to do so.
+ */ + public void setBucketDirName(String bucketDirName) { + this.bucketDirName = bucketDirName; } public Flow getFlowOrCreate(String flowId) { diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/Flow.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/Flow.java index 37078ad3e..1bc7f3f55 100644 --- a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/Flow.java +++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/Flow.java @@ -52,6 +52,10 @@ public static class FlowPointer { private String objectId; private final String fileName; + /** + * Creates a new FlowPointer instance. + * @param fileName The filename must be sanitized; use {@link org.apache.nifi.registry.util.FileUtils#sanitizeFilename(String)} to do so. + */ public FlowPointer(String fileName) { this.fileName = fileName; } diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/GitFlowMetaData.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/GitFlowMetaData.java index 44447b2ce..ea57ba3a4 100644 --- a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/GitFlowMetaData.java +++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/GitFlowMetaData.java @@ -216,13 +216,12 @@ private void loadBuckets(Repository gitRepo, RevCommit commit, boolean isLatestC } } - // E.g. DirA/DirB/DirC/bucket.yml -> DirC will be the bucket name. - final String[] pathNames = bucketFilePath.split("/"); - final String bucketName = pathNames[pathNames.length - 2]; + // Since the bucket name is restored from the pathname, it can be different from the original bucket name if it was sanitized.
+ final String bucketDirName = bucketFilePath.substring(0, bucketFilePath.lastIndexOf("/")); // Since commits are read in LIFO order, avoid old commits overriding the latest bucket name. - if (isEmpty(bucket.getBucketName())) { - bucket.setBucketName(bucketName); + if (isEmpty(bucket.getBucketDirName())) { + bucket.setBucketDirName(bucketDirName); } final Map flows = (Map) bucketMeta.get(FLOWS); @@ -249,7 +248,7 @@ private void loadFlows(RevCommit commit, boolean isLatestCommit, Bucket bucket, if (flowOpt.isPresent()) { flow = flowOpt.get(); } else { - logger.debug("Flow {} does not exist in bucket {}:{} any longer. It may have been deleted.", flowId, bucket.getBucketName(), bucket.getBucketId()); + logger.debug("Flow {} does not exist in bucket {}:{} any longer. It may have been deleted.", flowId, bucket.getBucketDirName(), bucket.getBucketId()); continue; } } @@ -264,7 +263,7 @@ private void loadFlows(RevCommit commit, boolean isLatestCommit, Bucket bucket, final ObjectId objectId = flowSnapshotObjectIds.get(flowSnapshotFile.getPath()); if (objectId == null) { logger.warn("Git object id for Flow {} version {} with path {} in bucket {}:{} was not found. 
Ignoring this entry.", - flowId, version, flowSnapshotFile.getPath(), bucket.getBucketName(), bucket.getBucketId()); + flowId, version, flowSnapshotFile.getPath(), bucket.getBucketDirName(), bucket.getBucketId()); continue; } pointer.setGitRev(commit.getName()); @@ -337,7 +336,8 @@ void commit(String author, String message, Bucket bucket, Flow.FlowPointer flowP if (flowPointer != null) { final RevTree tree = commit.getTree(); - final String flowSnapshotPath = new File(bucket.getBucketName(), flowPointer.getFileName()).getPath(); + final String bucketDirName = bucket.getBucketDirName(); + final String flowSnapshotPath = new File(bucketDirName, flowPointer.getFileName()).getPath(); try (final TreeWalk treeWalk = new TreeWalk(gitRepo)) { treeWalk.addTree(tree); diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/GitFlowPersistenceProvider.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/GitFlowPersistenceProvider.java index 1cc3b0631..b674c96d0 100644 --- a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/GitFlowPersistenceProvider.java +++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/GitFlowPersistenceProvider.java @@ -35,6 +35,7 @@ import static java.lang.String.format; import static org.apache.commons.lang3.StringUtils.isEmpty; +import static org.apache.nifi.registry.util.FileUtils.sanitizeFilename; public class GitFlowPersistenceProvider implements FlowPersistenceProvider { @@ -102,13 +103,13 @@ public void saveFlowContent(FlowSnapshotContext context, byte[] content) throws final String bucketId = context.getBucketId(); final Bucket bucket = flowMetaData.getBucketOrCreate(bucketId); - final String currentBucketName = bucket.getBucketName(); - final String bucketName = context.getBucketName(); - final boolean isBucketNameChanged = !bucketName.equals(currentBucketName); - bucket.setBucketName(bucketName); + final 
String currentBucketDirName = bucket.getBucketDirName(); + final String bucketDirName = sanitizeFilename(context.getBucketName()); + final boolean isBucketNameChanged = !bucketDirName.equals(currentBucketDirName); + bucket.setBucketDirName(bucketDirName); final Flow flow = bucket.getFlowOrCreate(context.getFlowId()); - final String flowSnapshotFilename = context.getFlowName() + SNAPSHOT_EXTENSION; + final String flowSnapshotFilename = sanitizeFilename(context.getFlowName()) + SNAPSHOT_EXTENSION; final Optional currentFlowSnapshotFilename = flow .getLatestVersion().map(flow::getFlowVersion).map(Flow.FlowPointer::getFileName); @@ -117,13 +118,13 @@ public void saveFlowContent(FlowSnapshotContext context, byte[] content) throws final Flow.FlowPointer flowPointer = new Flow.FlowPointer(flowSnapshotFilename); flow.putVersion(context.getVersion(), flowPointer); - final File bucketDir = new File(flowStorageDir, bucket.getBucketName()); + final File bucketDir = new File(flowStorageDir, bucketDirName); final File flowSnippetFile = new File(bucketDir, flowSnapshotFilename); - final File currentBucketDir = isEmpty(currentBucketName) ? null : new File(flowStorageDir, currentBucketName); + final File currentBucketDir = isEmpty(currentBucketDirName) ? 
null : new File(flowStorageDir, currentBucketDirName); if (currentBucketDir != null && currentBucketDir.isDirectory()) { if (isBucketNameChanged) { - logger.debug("Detected bucket name change from {} to {}, moving it.", currentBucketName, bucketName); + logger.debug("Detected bucket name change from {} to {}, moving it.", currentBucketDirName, bucketDirName); if (!currentBucketDir.renameTo(bucketDir)) { throw new FlowPersistenceException(format("Failed to move existing bucket %s to %s.", currentBucketDir, bucketDir)); } @@ -170,7 +171,7 @@ public byte[] getFlowContent(String bucketId, String flowId, int version) throws final Flow flow = getFlowOrFail(bucket, flowId); if (!flow.hasVersion(version)) { throw new FlowPersistenceException(format("Flow ID %s version %d was not found in bucket %s:%s.", - flowId, version, bucket.getBucketName(), bucketId)); + flowId, version, bucket.getBucketDirName(), bucketId)); } final Flow.FlowPointer flowPointer = flow.getFlowVersion(version); @@ -178,7 +179,7 @@ public byte[] getFlowContent(String bucketId, String flowId, int version) throws return flowMetaData.getContent(flowPointer.getObjectId()); } catch (IOException e) { throw new FlowPersistenceException(format("Failed to get content of Flow ID %s version %d in bucket %s:%s due to %s.", - flowId, version, bucket.getBucketName(), bucketId, e), e); + flowId, version, bucket.getBucketDirName(), bucketId, e), e); } } @@ -196,12 +197,12 @@ public void deleteAllFlowContent(String bucketId, String flowId) throws FlowPers final Flow.FlowPointer flowPointer = flow.getFlowVersion(latestVersion); // Delete the flow snapshot. 
- final File bucketDir = new File(flowStorageDir, bucket.getBucketName()); + final File bucketDir = new File(flowStorageDir, bucket.getBucketDirName()); final File flowSnapshotFile = new File(bucketDir, flowPointer.getFileName()); if (flowSnapshotFile.exists()) { if (!flowSnapshotFile.delete()) { throw new FlowPersistenceException(format("Failed to delete flow content for %s:%s in bucket %s:%s", - flowPointer.getFileName(), flowId, bucket.getBucketName(), bucketId)); + flowPointer.getFileName(), flowId, bucket.getBucketDirName(), bucketId)); } } @@ -219,12 +220,12 @@ public void deleteAllFlowContent(String bucketId, String flowId) throws FlowPers // Create a Git Commit. final String commitMessage = format("Deleted flow %s:%s in bucket %s:%s.", - flowPointer.getFileName(), flowId, bucket.getBucketName(), bucketId); + flowPointer.getFileName(), flowId, bucket.getBucketDirName(), bucketId); flowMetaData.commit(null, commitMessage, bucket, null); } catch (IOException|GitAPIException e) { throw new FlowPersistenceException(format("Failed to delete flow %s:%s in bucket %s:%s due to %s", - flowPointer.getFileName(), flowId, bucket.getBucketName(), bucketId, e), e); + flowPointer.getFileName(), flowId, bucket.getBucketDirName(), bucketId, e), e); } } @@ -242,7 +243,7 @@ private Flow getFlowOrFail(Bucket bucket, String flowId) throws FlowPersistenceE final Optional flowOpt = bucket.getFlow(flowId); if (!flowOpt.isPresent()) { throw new FlowPersistenceException(format("Flow ID %s was not found in bucket %s:%s.", - flowId, bucket.getBucketName(), bucket.getBucketId())); + flowId, bucket.getBucketDirName(), bucket.getBucketId())); } return flowOpt.get(); diff --git a/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/flow/git/TestGitFlowPersistenceProvider.java b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/flow/git/TestGitFlowPersistenceProvider.java index c8591ac22..3b5bb232e 100644 --- 
a/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/flow/git/TestGitFlowPersistenceProvider.java +++ b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/flow/git/TestGitFlowPersistenceProvider.java @@ -135,9 +135,9 @@ public void testLoadCommitHistories() throws GitAPIException, IOException { // Create some Flows and keep the directory. final StandardFlowSnapshotContext.Builder contextBuilder = new StandardFlowSnapshotContext.Builder() .bucketId("bucket-id-A") - .bucketName("BucketA") + .bucketName("C'est/Bucket A/です。") .flowId("flow-id-1") - .flowName("Flow1") + .flowName("テスト_用/フロー#1\\[contains invalid chars]") .author("unit-test-user") .comments("Initial commit.") .snapshotTimestamp(new Date().getTime()) diff --git a/nifi-registry-utils/src/main/java/org/apache/nifi/registry/util/FileUtils.java b/nifi-registry-utils/src/main/java/org/apache/nifi/registry/util/FileUtils.java index b7476b947..54a6585b4 100644 --- a/nifi-registry-utils/src/main/java/org/apache/nifi/registry/util/FileUtils.java +++ b/nifi-registry-utils/src/main/java/org/apache/nifi/registry/util/FileUtils.java @@ -389,4 +389,38 @@ public static void sleepQuietly(final long millis) { /* do nothing */ } } + + + // The invalid character list is copied from this Stackoverflow page. + // https://stackoverflow.com/questions/1155107/is-there-a-cross-platform-java-method-to-remove-filename-special-chars + private final static int[] INVALID_CHARS = {34, 60, 62, 124, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, + 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 58, 42, 63, 92, 47}; + + static { + Arrays.sort(INVALID_CHARS); + } + + /** + * Replaces characters that are invalid in a file system name within the given filename string with an underscore '_'. + * Be careful not to pass a file path, as this method replaces path delimiter characters (i.e. forward/back slashes).
+ * @param filename The filename to clean + * @return sanitized filename + */ + public static String sanitizeFilename(String filename) { + if (filename == null || filename.isEmpty()) { + return filename; + } + int codePointCount = filename.codePointCount(0, filename.length()); + + final StringBuilder cleanName = new StringBuilder(); + for (int i = 0; i < codePointCount; i++) { + int c = filename.codePointAt(i); + if (Arrays.binarySearch(INVALID_CHARS, c) < 0) { + cleanName.appendCodePoint(c); + } else { + cleanName.append('_'); + } + } + return cleanName.toString(); + } } From e9b529093698d15026255e44dc6359baca7362f8 Mon Sep 17 00:00:00 2001 From: Koji Kawamura Date: Fri, 27 Apr 2018 19:49:09 +0900 Subject: [PATCH 3/6] Push asynchronously. --- .../provider/flow/git/GitFlowMetaData.java | 64 ++++++++++++++++--- .../flow/git/GitFlowPersistenceProvider.java | 1 + 2 files changed, 56 insertions(+), 9 deletions(-) diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/GitFlowMetaData.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/GitFlowMetaData.java index ea57ba3a4..3d9801000 100644 --- a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/GitFlowMetaData.java +++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/GitFlowMetaData.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.registry.provider.flow.git; +import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.eclipse.jgit.api.Git; import org.eclipse.jgit.api.PushCommand; import org.eclipse.jgit.api.Status; @@ -47,6 +48,12 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import 
java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static java.lang.String.format; @@ -69,6 +76,8 @@ class GitFlowMetaData { private String remoteToPush; private CredentialsProvider credentialsProvider; + private final BlockingQueue pushQueue = new ArrayBlockingQueue<>(1); + /** * Bucket ID to Bucket. */ @@ -175,9 +184,51 @@ public void loadGitRepository(File gitProjectRootDir) throws IOException, GitAPI } catch (NoHeadException e) { logger.debug("'{}' does not have any commit yet. Starting with empty buckets.", gitProjectRootDir); } + } } + void startPushThread() { + // If successfully loaded, start pushing thread if necessary. + if (isEmpty(remoteToPush)) { + return; + } + + final ThreadFactory threadFactory = new BasicThreadFactory.Builder() + .daemon(true).namingPattern(getClass().getSimpleName() + " Push thread").build(); + + // Use scheduled fixed delay to control the minimum interval between push activities. + // The necessity of executing push is controlled by offering messages to the pushQueue. + // If multiple commits are made within this time window, those are pushed by a single push execution. 
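The comments above describe the coalescing design of this patch: each commit offers a timestamp into a one-slot queue, and a fixed-delay worker thread drains that slot and performs a single push for however many commits accumulated. A minimal standalone sketch of just the queue behavior (independent of JGit; the class and method names are illustrative, not part of the patch):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class PushCoalescingSketch {

    // Capacity-1 queue: at most one push request can be pending;
    // additional offers while a request is pending are simply dropped.
    static final BlockingQueue<Long> pushQueue = new ArrayBlockingQueue<>(1);

    // Called after each commit; returns true only when a new push request was registered.
    static boolean requestPush(long timestamp) {
        return pushQueue.offer(timestamp);
    }

    public static void main(String[] args) throws InterruptedException {
        // Three commits arrive before the scheduled push worker wakes up...
        System.out.println(requestPush(1L)); // true  -> a push is now pending
        System.out.println(requestPush(2L)); // false -> coalesced into the pending push
        System.out.println(requestPush(3L)); // false -> still coalesced

        // ...the worker then takes the single pending request and pushes once,
        // covering all three commits.
        long pending = pushQueue.take();
        System.out.println("single push for request offered at " + pending); // 1
    }
}
```

This is why `scheduleWithFixedDelay` plus `offer` gives a minimum interval between pushes without ever losing a push: a dropped offer means an equivalent push is already queued.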
+ final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(threadFactory); + executorService.scheduleWithFixedDelay(() -> { + + final Long offeredTimestamp; + try { + offeredTimestamp = pushQueue.take(); + } catch (InterruptedException e) { + logger.warn("Waiting for push request has been interrupted due to {}", e.getMessage(), e); + return; + } + + logger.debug("Took a push request sent at {} to {}...", offeredTimestamp, remoteToPush); + final PushCommand pushCommand = new Git(gitRepo).push().setRemote(remoteToPush); + if (credentialsProvider != null) { + pushCommand.setCredentialsProvider(credentialsProvider); + } + + try { + final Iterable pushResults = pushCommand.call(); + for (PushResult pushResult : pushResults) { + logger.debug(pushResult.getMessages()); + } + } catch (GitAPIException e) { + logger.error(format("Failed to push commits to %s due to %s", remoteToPush, e), e); + } + + }, 10, 10, TimeUnit.SECONDS); + } + @SuppressWarnings("unchecked") private void loadBuckets(Repository gitRepo, RevCommit commit, boolean isLatestCommit, Map bucketObjectIds, Map flowSnapshotObjectIds) throws IOException { final Yaml yaml = new Yaml(); @@ -361,15 +412,10 @@ void commit(String author, String message, Bucket bucket, Flow.FlowPointer flowP // Push if necessary. if (!isEmpty(remoteToPush)) { - logger.debug("Pushing to {}...", remoteToPush); - final PushCommand pushCommand = new Git(gitRepo).push().setRemote(remoteToPush); - if (credentialsProvider != null) { - pushCommand.setCredentialsProvider(credentialsProvider); - } - - final Iterable pushResults = pushCommand.call(); - for (PushResult pushResult : pushResults) { - logger.debug(pushResult.getMessages()); + // Use different thread since it takes longer. 
+ final long offeredTimestamp = System.currentTimeMillis(); + if (pushQueue.offer(offeredTimestamp)) { + logger.debug("New push request is offered at {}.", offeredTimestamp); } } diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/GitFlowPersistenceProvider.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/GitFlowPersistenceProvider.java index b674c96d0..f642632b7 100644 --- a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/GitFlowPersistenceProvider.java +++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/GitFlowPersistenceProvider.java @@ -79,6 +79,7 @@ public void onConfigured(ProviderConfigurationContext configurationContext) thro try { flowStorageDir = new File(flowStorageDirValue); flowMetaData.loadGitRepository(flowStorageDir); + flowMetaData.startPushThread(); logger.info("Configured GitFlowPersistenceProvider with Flow Storage Directory {}", new Object[] {flowStorageDir.getAbsolutePath()}); } catch (IOException|GitAPIException e) { From 766f335744d98c214592c5493de16909ad6ba468 Mon Sep 17 00:00:00 2001 From: Koji Kawamura Date: Tue, 8 May 2018 12:10:56 +0900 Subject: [PATCH 4/6] Added white space(32) to the invalid character list --- .../git/TestGitFlowPersistenceProvider.java | 10 +++--- .../apache/nifi/registry/util/FileUtils.java | 4 +-- .../nifi/registry/util/TestFileUtils.java | 31 +++++++++++++++++++ 3 files changed, 38 insertions(+), 7 deletions(-) create mode 100644 nifi-registry-utils/src/test/java/org/apache/nifi/registry/util/TestFileUtils.java diff --git a/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/flow/git/TestGitFlowPersistenceProvider.java b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/flow/git/TestGitFlowPersistenceProvider.java index 3b5bb232e..970dd4892 100644 --- 
a/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/flow/git/TestGitFlowPersistenceProvider.java +++ b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/flow/git/TestGitFlowPersistenceProvider.java @@ -212,7 +212,7 @@ public void testLoadCommitHistories() throws GitAPIException, IOException { // Assert commit. final AtomicInteger commitCount = new AtomicInteger(0); final String[] commitMessages = { - "Deleted flow FlowTwo.snapshot:flow-id-2 in bucket New name for Bucket A:bucket-id-A.", + "Deleted flow FlowTwo.snapshot:flow-id-2 in bucket New_name_for_Bucket_A:bucket-id-A.", "5th commit.", "4th commit.", "3rd commit.", @@ -242,13 +242,13 @@ public void testLoadCommitHistories() throws GitAPIException, IOException { try { p.getFlowContent("bucket-id-A", "flow-id-2", 1); } catch (FlowPersistenceException e) { - assertEquals("Flow ID flow-id-2 was not found in bucket New name for Bucket A:bucket-id-A.", e.getMessage()); + assertEquals("Flow ID flow-id-2 was not found in bucket New_name_for_Bucket_A:bucket-id-A.", e.getMessage()); } try { p.getFlowContent("bucket-id-A", "flow-id-2", 2); } catch (FlowPersistenceException e) { - assertEquals("Flow ID flow-id-2 was not found in bucket New name for Bucket A:bucket-id-A.", e.getMessage()); + assertEquals("Flow ID flow-id-2 was not found in bucket New_name_for_Bucket_A:bucket-id-A.", e.getMessage()); } // Delete the 1st flow, too. @@ -260,8 +260,8 @@ public void testLoadCommitHistories() throws GitAPIException, IOException { // Assert commit. 
final AtomicInteger commitCount = new AtomicInteger(0); final String[] commitMessages = { - "Deleted flow FlowOne.snapshot:flow-id-1 in bucket New name for Bucket A:bucket-id-A.", - "Deleted flow FlowTwo.snapshot:flow-id-2 in bucket New name for Bucket A:bucket-id-A.", + "Deleted flow FlowOne.snapshot:flow-id-1 in bucket New_name_for_Bucket_A:bucket-id-A.", + "Deleted flow FlowTwo.snapshot:flow-id-2 in bucket New_name_for_Bucket_A:bucket-id-A.", "5th commit.", "4th commit.", "3rd commit.", diff --git a/nifi-registry-utils/src/main/java/org/apache/nifi/registry/util/FileUtils.java b/nifi-registry-utils/src/main/java/org/apache/nifi/registry/util/FileUtils.java index 54a6585b4..5abaf7e66 100644 --- a/nifi-registry-utils/src/main/java/org/apache/nifi/registry/util/FileUtils.java +++ b/nifi-registry-utils/src/main/java/org/apache/nifi/registry/util/FileUtils.java @@ -391,10 +391,10 @@ public static void sleepQuietly(final long millis) { } - // The invalid character list is copied from this Stackoverflow page. + // The invalid character list is derived from this Stackoverflow page. 
// https://stackoverflow.com/questions/1155107/is-there-a-cross-platform-java-method-to-remove-filename-special-chars private final static int[] INVALID_CHARS = {34, 60, 62, 124, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, - 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 58, 42, 63, 92, 47}; + 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 58, 42, 63, 92, 47, 32}; static { Arrays.sort(INVALID_CHARS); diff --git a/nifi-registry-utils/src/test/java/org/apache/nifi/registry/util/TestFileUtils.java b/nifi-registry-utils/src/test/java/org/apache/nifi/registry/util/TestFileUtils.java new file mode 100644 index 000000000..d4bc9631b --- /dev/null +++ b/nifi-registry-utils/src/test/java/org/apache/nifi/registry/util/TestFileUtils.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.registry.util; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class TestFileUtils { + @Test + public void testSanitizeFilename() { + String filename = "This / is / a test"; + final String sanitizedFilename = FileUtils.sanitizeFilename(filename); + assertEquals("This___is___a_test", sanitizedFilename); + } +} \ No newline at end of file From 92b28ca58ed7dc9e06f4aadbc2f4cf80494293f9 Mon Sep 17 00:00:00 2001 From: Koji Kawamura Date: Tue, 8 May 2018 14:44:18 +0900 Subject: [PATCH 5/6] Git user as commit author and append NiFi Registry user to commit message --- .../provider/flow/git/GitFlowMetaData.java | 12 +++------ .../git/TestGitFlowPersistenceProvider.java | 25 +++++++++++-------- 2 files changed, 18 insertions(+), 19 deletions(-) diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/GitFlowMetaData.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/GitFlowMetaData.java index 3d9801000..4faf00745 100644 --- a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/GitFlowMetaData.java +++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/GitFlowMetaData.java @@ -24,7 +24,6 @@ import org.eclipse.jgit.api.errors.NoHeadException; import org.eclipse.jgit.lib.ObjectId; import org.eclipse.jgit.lib.Repository; -import org.eclipse.jgit.lib.UserConfig; import org.eclipse.jgit.revwalk.RevCommit; import org.eclipse.jgit.revwalk.RevTree; import org.eclipse.jgit.storage.file.FileRepositoryBuilder; @@ -361,8 +360,7 @@ boolean isGitDirectoryClean() throws GitAPIException { /** * Create a Git commit. - * @param author The name of a user who created the snapshot, it will be used as the author name. - * If not specified, the one in Git config is used. + * @param author The name of a NiFi Registry user who created the snapshot. It will be added to the commit message. 
* @param message Commit message. * @param bucket A bucket to commit. * @param flowPointer A flow pointer for the flow snapshot which is updated. @@ -377,12 +375,10 @@ void commit(String author, String message, Bucket bucket, Flow.FlowPointer flowP // Execute add command again for deleted files (if any). git.add().addFilepattern(".").setUpdate(true).call(); - final UserConfig userConfig = gitRepo.getConfig().get(UserConfig.KEY); - final String authorName = isEmpty(author) ? userConfig.getAuthorName() : author; - final String authorEmail = userConfig.getAuthorEmail(); + final String commitMessage = isEmpty(author) ? message + : format("%s\n\nBy NiFi Registry user: %s", message, author); final RevCommit commit = git.commit() - .setAuthor(authorName, authorEmail) - .setMessage(message) + .setMessage(commitMessage) .call(); if (flowPointer != null) { diff --git a/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/flow/git/TestGitFlowPersistenceProvider.java b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/flow/git/TestGitFlowPersistenceProvider.java index 970dd4892..45351abc1 100644 --- a/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/flow/git/TestGitFlowPersistenceProvider.java +++ b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/flow/git/TestGitFlowPersistenceProvider.java @@ -24,6 +24,7 @@ import org.apache.nifi.registry.util.FileUtils; import org.eclipse.jgit.api.Git; import org.eclipse.jgit.api.errors.GitAPIException; +import org.eclipse.jgit.lib.StoredConfig; import org.eclipse.jgit.revwalk.RevCommit; import org.junit.Test; import org.slf4j.Logger; @@ -96,6 +97,10 @@ private void assertProvider(final Map properties, final GitConsu try (final Git git = Git.init().setDirectory(gitDir).call()) { logger.debug("Initiated a git repository {}", git); + final StoredConfig config = git.getRepository().getConfig(); + config.setString("user", null, "name", "git-user"); + 
config.setString("user", null, "email", "git-user@example.com"); + config.save(); gitConsumer.accept(git); } @@ -172,16 +177,16 @@ public void testLoadCommitHistories() throws GitAPIException, IOException { // Assert commit. final AtomicInteger commitCount = new AtomicInteger(0); final String[] commitMessages = { - "5th commit.", - "4th commit.", - "3rd commit.", - "2nd commit.", - "Initial commit." + "5th commit.\n\nBy NiFi Registry user: unit-test-user", + "4th commit.\n\nBy NiFi Registry user: unit-test-user", + "3rd commit.\n\nBy NiFi Registry user: unit-test-user", + "2nd commit.\n\nBy NiFi Registry user: unit-test-user", + "Initial commit.\n\nBy NiFi Registry user: unit-test-user" }; for (RevCommit commit : g.log().call()) { - assertEquals("unit-test-user", commit.getAuthorIdent().getName()); + assertEquals("git-user", commit.getAuthorIdent().getName()); final int commitIndex = commitCount.getAndIncrement(); - assertEquals(commitMessages[commitIndex], commit.getShortMessage()); + assertEquals(commitMessages[commitIndex], commit.getFullMessage()); } assertEquals(commitMessages.length, commitCount.get()); }, p -> { @@ -220,8 +225,7 @@ public void testLoadCommitHistories() throws GitAPIException, IOException { "Initial commit." }; for (RevCommit commit : g.log().call()) { - // TODO: We don't support author for delete operations yet. The author is a global configured author here. - // assertEquals("unit-test-user", commit.getAuthorIdent().getName()); + assertEquals("git-user", commit.getAuthorIdent().getName()); final int commitIndex = commitCount.getAndIncrement(); assertEquals(commitMessages[commitIndex], commit.getShortMessage()); } @@ -269,8 +273,7 @@ public void testLoadCommitHistories() throws GitAPIException, IOException { "Initial commit." }; for (RevCommit commit : g.log().call()) { - // TODO: We don't support author for delete operations yet. The author is a global configured author here. 
- // assertEquals("unit-test-user", commit.getAuthorIdent().getName()); + assertEquals("git-user", commit.getAuthorIdent().getName()); final int commitIndex = commitCount.getAndIncrement(); assertEquals(commitMessages[commitIndex], commit.getShortMessage()); } From 0a3ad431374a42d9920a7147063a44130184e2bd Mon Sep 17 00:00:00 2001 From: Koji Kawamura Date: Tue, 8 May 2018 15:35:50 +0900 Subject: [PATCH 6/6] Updated documents. --- .../main/asciidoc/administration-guide.adoc | 60 +++++++++++-------- 1 file changed, 34 insertions(+), 26 deletions(-) diff --git a/nifi-registry-docs/src/main/asciidoc/administration-guide.adoc b/nifi-registry-docs/src/main/asciidoc/administration-guide.adoc index 5ea41717b..5dad4bc25 100644 --- a/nifi-registry-docs/src/main/asciidoc/administration-guide.adoc +++ b/nifi-registry-docs/src/main/asciidoc/administration-guide.adoc @@ -902,7 +902,7 @@ NiFi Registry uses a pluggable flow persistence provider to store the content of Each persistence provider has its own configuration parameters, those can be configured in a XML file specified in <>. -The XML configuration file looks like below. It has a `flowPersistenceProvider` element in which qualified class name of a persistence provider implementation and its configuration properties are defined. See following sections for available configurations for each providers. +The XML configuration file looks like the example below. It has a `flowPersistenceProvider` element in which the qualified class name of a persistence provider implementation and its configuration properties are defined. See the following sections for the available configurations for each provider. .Example providers.xml [source,xml] @@ -957,13 +957,13 @@ Buckets are represented as directories and Flow contents are stored as files in ....
Flow Storage Directory/ ├── .git/ -├── Bucket A/ +├── Bucket_A/ │ ├── bucket.yml -│ ├── Flow 1.snapshot -│ └── Flow 2.snapshot -└── Bucket B/ +│ ├── Flow_1.snapshot +│ └── Flow_2.snapshot +└── Bucket_B/ ├── bucket.yml - └── Flow 4.snapshot + └── Flow_4.snapshot .... Each Bucket directory contains a YAML file named `bucket.yml`. The file manages links from NiFi Registry Bucket and Flow IDs to actual directory and file names. When NiFi Registry starts, this provider reads through Git commit histories and lookup these `bucket.yml` files to restore Buckets and Flows for each snapshot version. @@ -974,8 +974,8 @@ Each Bucket directory contains a YAML file named `bucket.yml`. The file manages layoutVer: 1 bucketId: d1beba88-32e9-45d1-bfe9-057cc41f7ce8 flows: - 219cf539-427f-43be-9294-0644fb07ca63: {ver: 7, file: Flow 1.snapshot} - 22cccb6c-3011-4493-a996-611f8f112969: {ver: 3, file: Flow 2.snapshot} + 219cf539-427f-43be-9294-0644fb07ca63: {ver: 7, file: Flow_1.snapshot} + 22cccb6c-3011-4493-a996-611f8f112969: {ver: 3, file: Flow_2.snapshot} .... Qualified class name: `org.apache.nifi.registry.provider.flow.git.GitFlowPersistenceProvider` @@ -1000,7 +1000,19 @@ https://git-scm.com/docs/git-clone ==== Git user configuration -Git distinguishes a user by its username and email address. This persistence provider uses NiFi Registry username when it creates Git commits. However since NiFi Registry users do not provide email address, preconfigured Git user email address is used. +This persistence provider uses the preconfigured Git user name and email address when it creates Git commits. The NiFi Registry user name is appended to the commit message. + +.Example commit +.... +commit 774d4bd125f2b1200f0a5ee1f1e9fedc6a415e83 +Author: git-user +Date: Tue May 8 14:30:31 2018 +0900 + + Commit message. + + By NiFi Registry user: nifi-registry-user-1 +.... + You can configure the Git user name and email address with the `git config` command.
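For reference, a minimal sketch of the `git config` step described in the documentation above. The `git-user` identity matches the example commit; the throwaway repository is illustrative only, since in practice these commands are run inside the configured Flow Storage Directory.

```shell
# Demonstration of the Git user configuration described above.
# In practice, run the `git config` commands inside the Flow Storage
# Directory; a throwaway repository is used here for illustration.
demo="$(mktemp -d)"
cd "$demo"
git init -q .

# Set the identity that will appear as the commit author.
git config user.name "git-user"
git config user.email "git-user@example.com"

# Verify the configured values.
git config user.name
git config user.email
```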
@@ -1031,6 +1043,19 @@ Host bitbucket.org IdentityFile ~/.ssh/key-for-bitbucket .... +=== Switching from another Persistence Provider + +Switching to a different Persistence Provider requires resetting NiFi Registry. +For example, to switch from FileSystemFlowPersistenceProvider to GitFlowPersistenceProvider, follow these steps: + +. Stop version control on all ProcessGroups in NiFi +. Stop NiFi Registry +. Move the H2 database directory (specified as `nifi.registry.db.directory` in nifi-registry.properties) and the FileSystemFlowPersistenceProvider 'Flow Storage Directory' to a backup location +. Configure the GitFlowPersistenceProvider in providers.xml +. Start NiFi Registry +. Recreate any buckets +. Start version control on all ProcessGroups again + === Data model version of serialized Flow snapshots Serialized Flow snapshots saved by these persistence providers have versions, so that the data format and schema can evolve over time. Data model version update is done automatically by NiFi Registry when it reads and stores each Flow content. @@ -1042,20 +1067,3 @@ Here is the data model version histories: |2|0.2|JSON formatted text file. The root object contains header and Flow content object. |1|0.1|Binary format having header bytes at the beginning followed by Flow content represented as XML. |==== - -=== Migrating stored files between different Persistence Provider - -If you need to migrate existing Flow snapshot files, for example, switching from `FileSystemFlowPersistenceProvider` to `GitFlowPersistenceProvider`, then follow the steps described below. - -NOTE: The `export-flow-snapshots` tool for NiFi Registry is implemented as an additional mode to the existing tool in the `nifi-toolkit`. The following sections -assume you have downloaded the binary for the nifi-toolkit. - -. Ensure NiFi Registry is running with the current Persistence Provider to migrate (e.g. FileSystemFlowPersistenceProvider) -. 
Create a new `providers.xml` with the new Persistence Provider configuration (e.g. GitFlowPersistenceProvider) -. Execute `registry export-flow-snapshots` from `cli.sh` or `cli.bat` provided by NiFi Toolkit. Specify the URL of currently running NiFi Registry and the providers.xml file: - - registry export-flow-snapshots --baseUrl http://localhost:18080 --input /some-directory/providers.xml --verbose - -. Then, existing Flow snapshot files are created in the directory managed by the new Persistence Provider specified by the providers.xml. -. Stop running NiFi Registry, update the provers.xml that the NiFi Registry is using to switch to the new Persistence Provider. -. Start NiFi Registry. Now the new Persistence Provider can provide existing Flow snapshots as well as support storing new snapshots. \ No newline at end of file
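Patch 5/6 above takes the commit author from the repository's Git configuration and appends the NiFi Registry user as a second paragraph of the commit message. The resulting commit shape can be sketched with plain git commands; the repository path, `git-user` identity, and `nifi-registry-user-1` name are illustrative, mirroring the documentation's example commit rather than the provider's actual code path.

```shell
# Reproduce the commit shape GitFlowPersistenceProvider creates:
# the author comes from the repository's Git config, and the NiFi
# Registry user is appended as a second message paragraph.
repo="$(mktemp -d)"
cd "$repo"
git init -q .
git config user.name "git-user"
git config user.email "git-user@example.com"

echo 'snapshot' > Flow_1.snapshot
git add Flow_1.snapshot
# Each -m flag adds a separate paragraph, giving the blank line
# between the message and the "By NiFi Registry user: ..." trailer.
git commit -q -m "Commit message." -m "By NiFi Registry user: nifi-registry-user-1"

git log -1 --format='%an'   # author name from Git config: git-user
git log -1 --format='%B'    # full two-paragraph commit message
```

Note that `getShortMessage()` in the tests above returns only the first paragraph (the subject), while `getFullMessage()` includes the appended Registry user line, which is why the assertions in patch 5/6 switch to `getFullMessage()`.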