Skip to content
This repository has been archived by the owner on May 14, 2021. It is now read-only.

MINIFI-366: Adds S3ConfigurationCache. #90

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
19 changes: 19 additions & 0 deletions minifi-c2/minifi-c2-assembly/NOTICE
Expand Up @@ -8,6 +8,9 @@ The Apache Software Foundation (http://www.apache.org/).
Apache Software License v2
===========================================

This product includes the following work from the Apache Kafka project:
S3OutputStream.java

The following binary components are provided under the Apache Software License v2

(ASLv2) Apache NiFi
Expand Down Expand Up @@ -67,6 +70,22 @@ The following binary components are provided under the Apache Software License v
Original source copyright:
Copyright (c) 2008 Alexander Beider & Stephen P. Morse.

(ASLv2) AWS Java SDK
The following NOTICE information applies:
Copyright 2010-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.

This product includes software developed by
Amazon Technologies, Inc (http://www.amazon.com/).

**********************
THIRD PARTY COMPONENTS
**********************
This software includes third party software subject to the following copyrights:
- XML parsing and utility functions from JetS3t - Copyright 2006-2009 James Murty.
- PKCS#1 PEM encoded private key parsing and utility functions from oauth.googlecode.com - Copyright 1998-2010 AOL Inc.

The licenses for these third party components are included in LICENSE.txt

(ASLv2) JsonPath
The following NOTICE information applies:
Copyright 2011 JsonPath authors
Expand Down
5 changes: 5 additions & 0 deletions minifi-c2/minifi-c2-assembly/pom.xml
Expand Up @@ -82,6 +82,11 @@ limitations under the License.
<artifactId>minifi-c2-cache-filesystem</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.nifi.minifi</groupId>
<artifactId>minifi-c2-cache-s3</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.nifi.minifi</groupId>
<artifactId>minifi-c2-provider-cache</artifactId>
Expand Down
Expand Up @@ -64,6 +64,35 @@
<value>\${class}.v\${version}</value>
</constructor-arg>
</bean>-->
<!--<bean class="org.apache.nifi.minifi.c2.provider.cache.CacheConfigurationProvider">
<constructor-arg>
<list>
<value>text/yml</value>
</list>
</constructor-arg>
<constructor-arg>
<bean class="org.apache.nifi.minifi.c2.cache.s3.S3ConfigurationCache">
<constructor-arg>
<value>bucket</value>
</constructor-arg>
<constructor-arg>
<value>prefix/</value>
</constructor-arg>
<constructor-arg>
<value>\${class}</value>
</constructor-arg>
<constructor-arg>
<value>access-key</value>
</constructor-arg>
<constructor-arg>
<value>secret-key</value>
</constructor-arg>
<constructor-arg>
<value>aws-region</value>
</constructor-arg>
</bean>
</constructor-arg>
</bean> -->
</list>
</constructor-arg>
<constructor-arg>
Expand Down
49 changes: 49 additions & 0 deletions minifi-c2/minifi-c2-cache/minifi-c2-cache-s3/pom.xml
@@ -0,0 +1,49 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>minifi-c2-cache</artifactId>
<groupId>org.apache.nifi.minifi</groupId>
<version>0.2.1-SNAPSHOT</version>
</parent>
<artifactId>minifi-c2-cache-s3</artifactId>
<packaging>jar</packaging>

<dependencies>
<dependency>
<groupId>org.apache.nifi.minifi</groupId>
<artifactId>minifi-c2-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-s3</artifactId>
<version>${aws.sdk.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
@@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.minifi.c2.cache.s3;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.iterable.S3Objects;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectSummary;

import java.io.IOException;
import java.util.Comparator;
import java.util.Objects;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.minifi.c2.api.ConfigurationProviderException;
import org.apache.nifi.minifi.c2.api.cache.ConfigurationCacheFileInfo;
import org.apache.nifi.minifi.c2.api.cache.WriteableConfiguration;
import org.apache.nifi.minifi.c2.api.util.Pair;

/**
 * {@link ConfigurationCacheFileInfo} implementation that looks up cached configurations
 * stored as objects in an S3 bucket.
 *
 * <p>Object keys are matched against the form {@code prefix + expectedFilename + version},
 * where {@code version} is an integer parsed with {@link Integer#parseInt(String)}.
 */
public class S3CacheFileInfoImpl implements ConfigurationCacheFileInfo {

  private final AmazonS3 s3;
  private final String bucket;
  private final String prefix;
  private final String expectedFilename;

  /**
   * Creates a new S3 cache file info.
   * @param s3 The {@link AmazonS3 client}.
   * @param bucket The S3 bucket.
   * @param prefix The S3 object prefix.
   * @param expectedFilename The filename (without the trailing version number) that
   *     object keys must start with once the prefix has been removed.
   */
  public S3CacheFileInfoImpl(AmazonS3 s3, String bucket, String prefix,
      String expectedFilename) {

    this.s3 = s3;
    this.bucket = bucket;
    this.prefix = prefix;
    this.expectedFilename = expectedFilename;

  }

  /**
   * Extracts the version number from an object key.
   *
   * @param objectKey The full S3 object key.
   * @return The version encoded at the end of the key, or {@code null} when the key is
   *     {@code null}, does not start with the prefix followed by the expected filename,
   *     or does not end in a parseable integer.
   */
  @Override
  public Integer getVersionIfMatch(String objectKey) {

    if (objectKey == null) {
      return null;
    }

    // getConfiguration() tolerates a null prefix, so this method must as well.
    final String keyPrefix = prefix == null ? "" : prefix;

    // Guard the substring below: a key outside the prefix cannot be a match and a key
    // shorter than the prefix would otherwise throw StringIndexOutOfBoundsException.
    if (!objectKey.startsWith(keyPrefix)) {
      return null;
    }

    final String filename = objectKey.substring(keyPrefix.length());
    final int expectedFilenameLength = expectedFilename.length();

    // The key must carry at least one character after the expected filename.
    if (!filename.startsWith(expectedFilename) || filename.length() == expectedFilenameLength) {
      return null;
    }

    try {
      return Integer.parseInt(filename.substring(expectedFilenameLength));
    } catch (NumberFormatException e) {
      return null;
    }
  }

  /**
   * Returns the configuration for the requested version.
   *
   * @param version The version to fetch, or {@code null} to fetch the highest version
   *     currently listed in the bucket.
   * @return The matching configuration.
   * @throws ConfigurationProviderException If no configuration is found or the cached
   *     configurations cannot be listed.
   */
  @Override
  public WriteableConfiguration getConfiguration(Integer version)
      throws ConfigurationProviderException {

    if (version == null) {

      try {
        // getCachedConfigurations() is sorted highest version first.
        return getCachedConfigurations().findFirst()
            .orElseThrow(() -> new ConfigurationProviderException("No configurations found."));
      } catch (IOException e) {
        throw new ConfigurationProviderException("Unable to get cached configurations.", e);
      }

    }

    // An empty prefix or a bare "/" means the object lives at the root of the bucket.
    final boolean atBucketRoot = StringUtils.isEmpty(prefix) || StringUtils.equals(prefix, "/");
    final String objectKey = atBucketRoot
        ? expectedFilename + version.toString()
        : prefix + expectedFilename + version.toString();

    // NOTE(review): the AWS SDK documentation suggests getObject throws for a missing key
    // rather than returning null - confirm; any such exception propagates unchecked here.
    final S3Object s3Object = s3.getObject(new GetObjectRequest(bucket, objectKey));

    if (s3Object == null) {
      throw new ConfigurationProviderException("No configurations found for object key.");
    }

    return new S3WritableConfiguration(s3, s3Object, Integer.toString(version));

  }

  /**
   * Lists every object under the prefix whose key matches the expected filename.
   *
   * @return The matching configurations, ordered from highest to lowest version.
   * @throws IOException Declared by the interface; not thrown directly by this method.
   */
  @Override
  public Stream<WriteableConfiguration> getCachedConfigurations() throws IOException {

    Iterable<S3ObjectSummary> objectSummaries = S3Objects.withPrefix(s3, bucket, prefix);
    Stream<S3ObjectSummary> objectStream = StreamSupport.stream(objectSummaries.spliterator(), false);

    return objectStream
        .map(this::toVersionedSummary)
        .filter(Objects::nonNull)
        // Explicitly typed lambda so that no unchecked cast from Object is needed.
        .sorted(Comparator.comparing(
            (Pair<Integer, S3ObjectSummary> pair) -> pair.getFirst()).reversed())
        .map(pair -> new S3WritableConfiguration(s3, pair.getSecond(),
            Integer.toString(pair.getFirst())));

  }

  /**
   * Pairs an object summary with the version parsed from its key.
   *
   * @param summary The object summary to inspect.
   * @return The (version, summary) pair, or {@code null} when the key does not match.
   */
  private Pair<Integer, S3ObjectSummary> toVersionedSummary(S3ObjectSummary summary) {
    Integer version = getVersionIfMatch(summary.getKey());
    if (version == null) {
      return null;
    }
    return new Pair<>(version, summary);
  }

}
@@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.minifi.c2.cache.s3;

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.minifi.c2.api.InvalidParameterException;
import org.apache.nifi.minifi.c2.api.cache.ConfigurationCache;
import org.apache.nifi.minifi.c2.api.cache.ConfigurationCacheFileInfo;

/**
 * {@link ConfigurationCache} implementation that resolves cached configurations from
 * an S3 bucket.
 *
 * <p>The path pattern may contain {@code ${name}} placeholders, which are substituted
 * with the request parameters of the same name.
 */
public class S3ConfigurationCache implements ConfigurationCache {

  private final AmazonS3 s3;
  private final String bucket;
  private final String prefix;
  private final String pathPattern;

  /**
   * Creates a new S3 configuration cache.
   * @param bucket The S3 bucket.
   * @param prefix The S3 object prefix.
   * @param pathPattern The path pattern.
   * @param accessKey The (optional) S3 access key.
   * @param secretKey The (optional) S3 secret key.
   * @param region The AWS region (e.g. us-east-1).
   * @throws IOException Thrown if the configuration cannot be read.
   */
  public S3ConfigurationCache(String bucket, String prefix, String pathPattern,
      String accessKey, String secretKey, String region) throws IOException {

    this.bucket = bucket;
    this.prefix = prefix;
    this.pathPattern = pathPattern;

    if (!StringUtils.isEmpty(accessKey)) {

      // Explicit static credentials were supplied.
      s3 = AmazonS3Client.builder()
          .withCredentials(new AWSStaticCredentialsProvider(
              new BasicAWSCredentials(accessKey, secretKey)))
          .withRegion(Regions.fromName(region))
          .build();

    } else {

      // No access key: build the client without configuring credentials explicitly.
      s3 = AmazonS3Client.builder()
          .withRegion(Regions.fromName(region))
          .build();
    }

  }

  /**
   * Resolves the path pattern against the request parameters and returns the file info
   * for the resulting object name.
   *
   * @param contentType The requested content type; {@code '/'} is replaced with
   *     {@code '.'} and the result is appended to the resolved path.
   * @param parameters The request parameters; each must have exactly one value.
   * @return The cache file info for the resolved path.
   * @throws InvalidParameterException If a parameter does not have exactly one value or
   *     the pattern still contains a {@code ${...}} placeholder after substitution.
   */
  @Override
  public ConfigurationCacheFileInfo getCacheFileInfo(String contentType,
      Map<String, List<String>> parameters) throws InvalidParameterException {

    String pathString = pathPattern;
    for (Map.Entry<String, List<String>> entry : parameters.entrySet()) {
      if (entry.getValue().size() != 1) {
        throw new InvalidParameterException("Multiple values for same parameter"
            + " are not supported by this provider.");
      }
      // Literal replacement: String.replaceAll would treat '$' and '\' in the
      // parameter value as group references/escapes and throw or corrupt the path.
      pathString = pathString.replace("${" + entry.getKey() + "}", entry.getValue().get(0));
    }
    pathString = pathString + "." + contentType.replace('/', '.');

    // Reject any placeholder that was not substituted; placeholders are looked for
    // within each '/'-separated segment of the path.
    for (String segment : pathString.split("/")) {
      int openBrace = segment.indexOf("${");
      if (openBrace >= 0) {
        int closeBrace = segment.indexOf("}", openBrace + 2);
        if (closeBrace >= 0) {
          throw new InvalidParameterException("Found unsubstituted variable "
              + segment.substring(openBrace + 2, closeBrace));
        }
      }
    }

    return new S3CacheFileInfoImpl(s3, bucket, prefix, pathString + ".v");
  }

}