Skip to content
This repository has been archived by the owner. It is now read-only.
Permalink
Browse files
AMBARI-24878 - Infra Manager: kerberos support (#14)
  • Loading branch information
kasakrisz committed Nov 15, 2018
1 parent e59866b commit d309efe50de7d1bddb6b33a9bad0bf452b8f784a
Showing 7 changed files with 200 additions and 44 deletions.
@@ -18,10 +18,16 @@
*/
package org.apache.ambari.infra;

import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import static org.apache.commons.lang.StringUtils.isBlank;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.io.IOUtils;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.methods.CloseableHttpResponse;
@@ -36,15 +42,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;

import static org.apache.commons.lang.StringUtils.isBlank;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;

// TODO: use swagger
public class InfraClient implements AutoCloseable {
@@ -96,6 +97,12 @@ public JobExecutionInfo startJob(String jobName, String parameters) {
try {
String responseText = execute(new HttpPost(uriBuilder.build())).getBody();
Map<String, Object> responseContent = new ObjectMapper().readValue(responseText, new TypeReference<HashMap<String,Object>>() {});
if (!responseContent.containsKey("jobId"))
throw new NullPointerException("jobId is not found in start job responseContent");
if (!responseContent.containsKey("jobExecutionData"))
throw new NullPointerException("jobExecutionData is not found in start job responseContent");
if (!((Map)responseContent.get("jobExecutionData")).containsKey("id"))
throw new NullPointerException("id is not found in jobExecutionData");
return new JobExecutionInfo(responseContent.get("jobId").toString(), ((Map)responseContent.get("jobExecutionData")).get("id").toString());
} catch (URISyntaxException | JsonParseException | JsonMappingException e) {
throw new RuntimeException(e);
@@ -54,6 +54,8 @@ public class ArchivingParameters implements Validatable {
@JsonSerialize(converter = FsPermissionToStringConverter.class)
@JsonDeserialize(converter = StringToFsPermissionConverter.class)
private FsPermission hdfsFilePermission;
private String hdfsKerberosPrincipal;
private String hdfsKerberosKeytabPath;
private String start;
private String end;
@JsonSerialize(converter = DurationToStringConverter.class)
@@ -172,6 +174,22 @@ public void setHdfsFilePermission(FsPermission hdfsFilePermission) {
this.hdfsFilePermission = hdfsFilePermission;
}

public String getHdfsKerberosPrincipal() {
return hdfsKerberosPrincipal;
}

public void setHdfsKerberosPrincipal(String hdfsKerberosPrincipal) {
this.hdfsKerberosPrincipal = hdfsKerberosPrincipal;
}

public String getHdfsKerberosKeytabPath() {
return hdfsKerberosKeytabPath;
}

public void setHdfsKerberosKeytabPath(String hdfsKerberosKeytabPath) {
this.hdfsKerberosKeytabPath = hdfsKerberosKeytabPath;
}

public Optional<S3Properties> s3Properties() {
if (isBlank(s3BucketName))
return Optional.empty();
@@ -183,6 +201,18 @@ public Optional<S3Properties> s3Properties() {
s3Endpoint));
}

public Optional<HdfsProperties> hdfsProperties() {
if (isBlank(hdfsDestinationDirectory))
return Optional.empty();

return Optional.of(new HdfsProperties(
hdfsEndpoint,
hdfsDestinationDirectory,
hdfsFilePermission,
hdfsKerberosPrincipal,
hdfsKerberosKeytabPath));
}

public String getStart() {
return start;
}
@@ -234,12 +264,9 @@ public void validate() {
break;

case HDFS:
if (isBlank(hdfsEndpoint))
throw new IllegalArgumentException(String.format(
"The property hdfsEndpoint can not be null or empty string when destination is set to %s!", HDFS.name()));
if (isBlank(hdfsDestinationDirectory))
throw new IllegalArgumentException(String.format(
"The property hdfsDestinationDirectory can not be null or empty string when destination is set to %s!", HDFS.name()));
hdfsProperties()
.orElseThrow(() -> new IllegalArgumentException("HDFS related properties must be set if the destination is " + HDFS.name()))
.validate();
}

requireNonNull(solr, "No solr query was specified for archiving job!");
@@ -32,7 +32,6 @@
import org.apache.ambari.infra.job.JobContextRepository;
import org.apache.ambari.infra.job.JobScheduler;
import org.apache.ambari.infra.job.ObjectSource;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
@@ -103,8 +102,8 @@ public DocumentExporter documentExporter(DocumentItemReader documentItemReader,
break;
case HDFS:
org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
conf.set("fs.defaultFS", parameters.getHdfsEndpoint());
fileAction.add(new HdfsUploader(conf, new Path(parameters.getHdfsDestinationDirectory()), parameters.getHdfsFilePermission()));
fileAction.add(new HdfsUploader(conf,
parameters.hdfsProperties().orElseThrow(() -> new IllegalStateException("HDFS properties are not provided!"))));
break;
case LOCAL:
baseDir = new File(parameters.getLocalDestinationDirectory());
@@ -23,7 +23,6 @@
import static org.apache.commons.lang.StringUtils.isBlank;

import java.time.Duration;
import java.util.Optional;

import org.apache.ambari.infra.job.JobProperties;
import org.apache.ambari.infra.json.DurationToStringConverter;
@@ -40,6 +39,7 @@ public class DocumentArchivingProperties extends JobProperties<ArchivingParamete
private String fileNameSuffixDateFormat;
private Duration ttl;
private SolrProperties solr;

private String s3AccessFile;
private String s3KeyPrefix;
private String s3BucketName;
@@ -48,6 +48,8 @@ public class DocumentArchivingProperties extends JobProperties<ArchivingParamete
private String hdfsEndpoint;
private String hdfsDestinationDirectory;
private FsPermission hdfsFilePermission;
private String hdfsKerberosPrincipal;
private String hdfsKerberosKeytabPath;

public int getReadBlockSize() {
return readBlockSize;
@@ -145,17 +147,6 @@ public void setS3Endpoint(String s3Endpoint) {
this.s3Endpoint = s3Endpoint;
}

public Optional<S3Properties> s3Properties() {
if (isBlank(s3BucketName))
return Optional.empty();

return Optional.of(new S3Properties(
s3AccessFile,
s3KeyPrefix,
s3BucketName,
s3Endpoint));
}

public String getHdfsEndpoint() {
return hdfsEndpoint;
}
@@ -180,6 +171,22 @@ public void setHdfsDestinationDirectory(String hdfsDestinationDirectory) {
this.hdfsDestinationDirectory = hdfsDestinationDirectory;
}

public String getHdfsKerberosPrincipal() {
return hdfsKerberosPrincipal;
}

public void setHdfsKerberosPrincipal(String hdfsKerberosPrincipal) {
this.hdfsKerberosPrincipal = hdfsKerberosPrincipal;
}

public String getHdfsKerberosKeytabPath() {
return hdfsKerberosKeytabPath;
}

public void setHdfsKerberosKeytabPath(String hdfsKerberosKeytabPath) {
this.hdfsKerberosKeytabPath = hdfsKerberosKeytabPath;
}

private int getIntJobParameter(JobParameters jobParameters, String parameterName, int defaultValue) {
String valueText = jobParameters.getString(parameterName);
if (isBlank(valueText))
@@ -203,6 +210,8 @@ public ArchivingParameters merge(JobParameters jobParameters) {
archivingParameters.setHdfsEndpoint(jobParameters.getString("hdfsEndpoint", hdfsEndpoint));
archivingParameters.setHdfsDestinationDirectory(jobParameters.getString("hdfsDestinationDirectory", hdfsDestinationDirectory));
archivingParameters.setHdfsFilePermission(toFsPermission(jobParameters.getString("hdfsFilePermission", FsPermissionToStringConverter.toString(hdfsFilePermission))));
archivingParameters.setHdfsKerberosPrincipal(jobParameters.getString("hdfsKerberosPrincipal", hdfsKerberosPrincipal));
archivingParameters.setHdfsKerberosKeytabPath(jobParameters.getString("hdfsKerberosKeytabPath", hdfsKerberosKeytabPath));
archivingParameters.setSolr(solr.merge(jobParameters));
archivingParameters.setStart(jobParameters.getString("start"));
archivingParameters.setEnd(jobParameters.getString("end"));
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.ambari.infra.job.archive;

import static org.apache.commons.lang.StringUtils.isBlank;
import static org.apache.commons.lang.StringUtils.isNotBlank;

import org.apache.hadoop.fs.permission.FsPermission;

public class HdfsProperties {
private static final String DEFAULT_FILE_PERMISSION = "640";

private final String hdfsEndpoint;
private final String hdfsDestinationDirectory;
private final FsPermission hdfsFilePermission;
private final String hdfsKerberosPrincipal;
private final String hdfsKerberosKeytabPath;

public HdfsProperties(String hdfsEndpoint, String hdfsDestinationDirectory, FsPermission hdfsFilePermission, String hdfsKerberosPrincipal, String hdfsKerberosKeytabPath) {
this.hdfsEndpoint = hdfsEndpoint;
this.hdfsDestinationDirectory = hdfsDestinationDirectory;
this.hdfsFilePermission = hdfsFilePermission == null ? new FsPermission(DEFAULT_FILE_PERMISSION) : hdfsFilePermission;
this.hdfsKerberosPrincipal = hdfsKerberosPrincipal;
this.hdfsKerberosKeytabPath = hdfsKerberosKeytabPath;
}

public String getHdfsEndpoint() {
return hdfsEndpoint;
}

public String getHdfsDestinationDirectory() {
return hdfsDestinationDirectory;
}

public FsPermission getHdfsFilePermission() {
return hdfsFilePermission;
}

public String getHdfsKerberosPrincipal() {
return hdfsKerberosPrincipal;
}

public String getHdfsKerberosKeytabPath() {
return hdfsKerberosKeytabPath;
}

@Override
public String toString() {
return "HdfsProperties{" +
"hdfsEndpoint='" + hdfsEndpoint + '\'' +
", hdfsDestinationDirectory='" + hdfsDestinationDirectory + '\'' +
", hdfsFilePermission=" + hdfsFilePermission +
", hdfsKerberosPrincipal='" + hdfsKerberosPrincipal + '\'' +
", hdfsKerberosKeytabPath='" + hdfsKerberosKeytabPath + '\'' +
'}';
}

public void validate() {
if (isBlank(hdfsDestinationDirectory))
throw new IllegalArgumentException("The property hdfsDestinationDirectory can not be null or empty string!");

if (isNotBlank(hdfsKerberosPrincipal) && isBlank(hdfsKerberosKeytabPath))
throw new IllegalArgumentException("The property hdfsKerberosPrincipal is specified but hdfsKerberosKeytabPath is blank!");

if (isBlank(hdfsKerberosPrincipal) && isNotBlank(hdfsKerberosKeytabPath))
throw new IllegalArgumentException("The property hdfsKerberosKeytabPath is specified but hdfsKerberosPrincipal is blank!");
}
}
@@ -18,38 +18,68 @@
*/
package org.apache.ambari.infra.job.archive;

import static org.apache.commons.lang3.StringUtils.isNotBlank;

import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.ClassPathResource;

public class HdfsUploader extends AbstractFileAction {
private static final Logger LOG = LoggerFactory.getLogger(HdfsUploader.class);

private static final String DEFAULT_FILE_PERMISSION = "640";
private final Configuration configuration;
private final Path destinationDirectory;
private final FsPermission fsPermission;
private final HdfsProperties properties;

public HdfsUploader(Configuration configuration, Path destinationDirectory, FsPermission fsPermission) {
this.destinationDirectory = destinationDirectory;
public HdfsUploader(Configuration configuration, HdfsProperties properties) {
this.properties = properties;
this.configuration = configuration;
this.fsPermission = fsPermission == null ? new FsPermission(DEFAULT_FILE_PERMISSION) : fsPermission;

if (new ClassPathResource("core-site.xml").exists()) {
LOG.info("Hdfs core-site.xml is found in the classpath.");
}
else {
LOG.warn("Hdfs core-site.xml is not found in the classpath. Using defaults.");
}
if (new ClassPathResource("hdfs-site.xml").exists()) {
LOG.info("Hdfs hdfs-site.xml is found in the classpath.");
}
else {
LOG.warn("Hdfs hdfs-site.xml is not found in the classpath. Using defaults.");
}
if (isNotBlank(properties.getHdfsEndpoint())) {
LOG.info("Hdfs endpoint is defined in Infra Manager properties. Setting fs.defaultFS to {}", properties.getHdfsEndpoint());
this.configuration.set("fs.defaultFS", properties.getHdfsEndpoint());
}

UserGroupInformation.setConfiguration(configuration);
}

@Override
protected File onPerform(File inputFile) {
try {
if ("kerberos".equalsIgnoreCase(configuration.get("hadoop.security.authentication")))
UserGroupInformation.loginUserFromKeytab(properties.getHdfsKerberosPrincipal(), properties.getHdfsKerberosKeytabPath());
} catch (IOException e) {
throw new UncheckedIOException(e);
}

try (FileSystem fileSystem = FileSystem.get(configuration)) {
Path destination = new Path(destinationDirectory, inputFile.getName());

Path destination = new Path(properties.getHdfsDestinationDirectory(), inputFile.getName());
if (fileSystem.exists(destination)) {
throw new UnsupportedOperationException(String.format("File '%s' already exists!", destination));
}

fileSystem.copyFromLocalFile(new Path(inputFile.getAbsolutePath()), destination);
fileSystem.setPermission(destination, fsPermission);
fileSystem.setPermission(destination, properties.getHdfsFilePermission());

return inputFile;
}
@@ -31,7 +31,7 @@
<deb.python.ver>python (&gt;= 2.6)</deb.python.ver>
<deb.architecture>amd64</deb.architecture>
<deb.dependency.list>${deb.python.ver}</deb.dependency.list>
<hadoop.version>3.0.0</hadoop.version>
<hadoop.version>3.1.1</hadoop.version>
<surefire.argLine>-Xmx1024m -Xms512m</surefire.argLine>
<zookeeper.version>3.4.6.2.3.0.0-2557</zookeeper.version>
<ambari-metrics.version>2.7.0.0.0</ambari-metrics.version>

0 comments on commit d309efe

Please sign in to comment.