Skip to content
Permalink
Browse files
AMBARI-24833. Cloud base path can be URI as well
  • Loading branch information
oleewere committed Dec 7, 2018
1 parent a84364f commit a739360576790a52f8f67969a3ce97f08874d13e
Showing 8 changed files with 465 additions and 356 deletions.
@@ -311,12 +311,12 @@ public class LogFeederProps implements LogFeederProperties {

@LogSearchPropertyDescription(
name = LogFeederConstants.CLOUD_STORAGE_BASE_PATH,
description = "Base path prefix for storing logs (cloud storage / hdfs)",
examples = {"/user/logsearch/mypath"},
description = "Base path prefix for storing logs (cloud storage / hdfs), could be an absolute path or URI. (if URI used, that will override the default.FS with HDFS client)",
examples = {"/user/logsearch/mypath", "s3a:///user/logsearch"},
defaultValue = "/apps/logsearch",
sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
)
@Value("${" + LogFeederConstants.CLOUD_STORAGE_BASE_PATH + ":}")
@Value("${" + LogFeederConstants.CLOUD_STORAGE_BASE_PATH + ":/apps/logsearch}")
private String cloudBasePath;

@LogSearchPropertyDescription(
@@ -18,10 +18,12 @@
*/
package org.apache.ambari.logfeeder.output.cloud;

import com.google.common.annotations.VisibleForTesting;
import org.apache.ambari.logfeeder.conf.LogFeederProps;
import org.apache.ambari.logfeeder.output.cloud.upload.UploadClient;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

@@ -90,9 +92,7 @@ void doUpload(int timeout) {
logger.debug("Not found any files to upload.");
} else {
for (File file : filesToUpload) {
String basePath = logFeederProps.getCloudBasePath();
String outputPath = String.format("%s/%s/%s/%s/%s", basePath, clusterName, hostName, file.getParentFile().getName(), file.getName())
.replaceAll("//", "/");
final String outputPath = generateOutputPath(logFeederProps.getCloudBasePath(), clusterName, hostName, file);
logger.info("Upload will start: input: {}, output: {}", file.getAbsolutePath(), outputPath);
Future<?> future = executorService.submit(() -> {
try {
@@ -114,4 +114,20 @@ void doUpload(int timeout) {
}
}

@VisibleForTesting
String generateOutputPath(String basePath, String clusterName, String hostName, File localFile) {
final String outputWithoutBasePath = Paths.get(clusterName, hostName, localFile.getParentFile().getName(), localFile.getName()).toString();
final String outputPath;
if (StringUtils.isNotEmpty(basePath)) {
if (!basePath.endsWith("/")){
outputPath = basePath + "/" + outputWithoutBasePath;
} else {
outputPath = basePath + outputWithoutBasePath;
}
} else {
outputPath = outputWithoutBasePath;
}
return outputPath;
}

}
@@ -64,7 +64,12 @@ public void init(LogFeederProps logFeederProps) {
configuration = new Configuration();
logger.info("Initialize HDFS client (cloud mode), using core-site.xml from the classpath.");
}
if (hasScheme(logFeederProps.getCloudBasePath())) {
logger.info("Use cloud base path ({}) as fs.defaultFS", logFeederProps.getCloudBasePath());
configuration.set(FS_DEFAULT_FS, logFeederProps.getCloudBasePath());
}
if (StringUtils.isNotBlank(logFeederProps.getCustomFs())) {
logger.info("Override fs.defaultFS with {}", logFeederProps.getCustomFs());
configuration.set(FS_DEFAULT_FS, logFeederProps.getCustomFs());
}
if (hdfsOutputConfig.isHdfsKerberos()) {
@@ -90,6 +95,10 @@ public void init(LogFeederProps logFeederProps) {
LogFeederHDFSUtil.overrideFileSystemConfigs(logFeederProps, configurationRef.get());
}

private boolean hasScheme(String path) {
return StringUtils.isNotBlank(path) && path.split(":/").length > 1;
}

@Override
public void upload(String source, String target) throws Exception {
final FileSystem fs = LogFeederHDFSUtil.buildFileSystem(configurationRef.get());
@@ -19,6 +19,7 @@
package org.apache.ambari.logfeeder.util;

import java.io.IOException;
import java.nio.file.Paths;
import java.util.Map;
import java.util.Properties;

@@ -39,9 +40,10 @@ private LogFeederHDFSUtil() {

public static void copyFromLocal(String sourceFilepath, String destFilePath, FileSystem fileSystem, boolean overwrite,
boolean delSrc, FsPermission fsPermission) throws Exception {
String fsUri = fileSystem.getUri().toString();
Path src = new Path(sourceFilepath);
Path dst = new Path(destFilePath);
logger.info("copying localfile := " + sourceFilepath + " to hdfsPath := " + destFilePath);
logger.info("Copying localfile '{}' to hdfsPath (FS base URI: {}) '{}'", sourceFilepath, fsUri, destFilePath);
fileSystem.copyFromLocalFile(delSrc, overwrite, src, dst);
if (fsPermission != null) {
fileSystem.setPermission(dst, fsPermission);
@@ -17,7 +17,7 @@
<configuration>
<property>
<name>fs.defaultFS</name>
<value>s3a://logfeeder</value>
<value>s3a://logfeeder/apps</value>
</property>
<property>
<name>fs.s3a.endpoint</name>
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.ambari.logfeeder.output.cloud;

import org.apache.ambari.logfeeder.conf.CloudStorageDestination;
import org.apache.ambari.logfeeder.conf.LogFeederProps;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.io.File;

public class CloudStorageUploaderTest {

private static final String CLUSTER_NAME = "cl";
private static final String HOSTNAME = "hostname";

private CloudStorageUploader underTest;

@Before
public void setUp() {
LogFeederProps logFeederProps = new LogFeederProps();
logFeederProps.setCloudStorageDestination(CloudStorageDestination.DEFAULT_FS);
underTest = new CloudStorageUploader("name", null, logFeederProps);
}

@Test
public void testGenerateOutputPath() {
// GIVEN
String basePath = "example";
// WHEN
String output = underTest.generateOutputPath(basePath, CLUSTER_NAME, HOSTNAME, new File("/my/path"));
// THEN
Assert.assertEquals("example/cl/hostname/my/path", output);
}

@Test
public void testGenerateOutputPathWithEmptyBasePath() {
// GIVEN
String basePath = "";
// WHEN
String output = underTest.generateOutputPath(basePath, CLUSTER_NAME, HOSTNAME, new File("/my/path"));
// THEN
Assert.assertEquals("cl/hostname/my/path", output);
}

@Test
public void testGenerateOutputPathWithSlashEndAndStart() {
// GIVEN
String basePath = "example/";
// WHEN
String output = underTest.generateOutputPath(basePath, CLUSTER_NAME, HOSTNAME, new File("/my/path"));
// THEN
Assert.assertEquals("example/cl/hostname/my/path", output);
}

@Test
public void testGenerateOutputPathWithScheme() {
// GIVEN
String basePath = "s3a://bucket/example";
// WHEN
String output = underTest.generateOutputPath(basePath, CLUSTER_NAME, HOSTNAME, new File("/my/path"));
// THEN
Assert.assertEquals("s3a://bucket/example/cl/hostname/my/path", output);
}
}

0 comments on commit a739360

Please sign in to comment.