AMBARI-24731 - Infra Manager: scheduled cleanup of old jobs (#4)
kasakrisz committed Oct 12, 2018
1 parent 4cf7eb9 commit 3e941c5e3442ef7a5c7cbdc61b987e4654224827
Showing 47 changed files with 1,581 additions and 408 deletions.
@@ -15,3 +15,4 @@
derby.log
pass.txt
out
job-repository.db
@@ -45,9 +45,9 @@
      <version>${solr.version}</version>
    </dependency>
    <dependency>
      <groupId>com.amazonaws</groupId>
      <artifactId>aws-java-sdk-s3</artifactId>
      <version>1.11.5</version>
      <groupId>io.minio</groupId>
      <artifactId>minio</artifactId>
      <version>5.0.1</version>
    </dependency>
    <dependency>
      <groupId>commons-io</groupId>
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.ambari.infra;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

import io.minio.MinioClient;
import io.minio.Result;
import io.minio.messages.Item;

/**
 * Thin wrapper around MinioClient used by the integration tests: it uploads,
 * lists, and deletes objects in a single bucket on the fake S3 server,
 * wrapping checked exceptions in unchecked ones.
 */
public class S3Client {
  private final MinioClient s3client;
  private final String bucket;

  public S3Client(String host, int port, String bucket) {
    try {
      s3client = new MinioClient(String.format("http://%s:%d", host, port), "remote-identity", "remote-credential");
      this.bucket = bucket;
    }
    catch (Exception ex) {
      throw new RuntimeException(ex);
    }
  }

  public void putObject(String key, InputStream inputStream, long length) {
    try {
      s3client.putObject(bucket, key, inputStream, length, "application/octet-stream");
    }
    catch (Exception ex) {
      throw new RuntimeException(ex);
    }
  }

  public void putObject(String key, byte[] bytes) {
    // Wrap the byte array in a stream and delegate to the streaming overload.
    try (ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes)) {
      putObject(key, inputStream, bytes.length);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public List<String> listObjectKeys() {
    try {
      List<String> keys = new ArrayList<>();
      for (Result<Item> item : s3client.listObjects(bucket)) {
        keys.add(item.get().objectName());
      }
      return keys;
    }
    catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  public List<String> listObjectKeys(String text) {
    try {
      List<String> keys = new ArrayList<>();
      for (Result<Item> item : s3client.listObjects(bucket)) {
        String objectName = item.get().objectName();
        if (objectName.contains(text))
          keys.add(objectName);
      }
      return keys;
    }
    catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  public void deleteObject(String key) {
    try {
      s3client.removeObject(bucket, key);
    }
    catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
}
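
For context, a minimal usage sketch of this wrapper as the steps below exercise it; the host, port, and bucket values are illustrative only, not taken from this commit (the tests pass dockerHost, FAKE_S3_PORT, and S3_BUCKET_NAME instead):

  // Hypothetical values for illustration only.
  S3Client client = new S3Client("localhost", 4569, "testbucket");
  client.putObject("solr_archive_audit_logs_-_2014-03-09.json.gz", "anything".getBytes());
  client.listObjectKeys("audit").forEach(System.out::println);
  client.deleteObject("solr_archive_audit_logs_-_2014-03-09.json.gz");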
@@ -55,7 +55,7 @@ public Solr() {

  public Solr(String configSetPath) {
    this.configSetPath = configSetPath;
    this.solrClient = new LBHttpSolrClient.Builder().withBaseSolrUrls(String.format("http://%s:%d/solr/%s_shard1_replica1",
    this.solrClient = new LBHttpSolrClient.Builder().withBaseSolrUrls(String.format("http://%s:%d/solr/%s_shard1_replica_n1",
        getDockerHost(),
        SOLR_PORT,
        AUDIT_LOGS_COLLECTION)).build();
@@ -29,8 +29,10 @@
import java.net.URL;
import java.time.OffsetDateTime;
import java.util.Date;
import java.util.List;

import org.apache.ambari.infra.InfraClient;
import org.apache.ambari.infra.S3Client;
import org.apache.ambari.infra.Solr;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
@@ -44,11 +46,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.ListObjectsRequest;
import com.amazonaws.services.s3.model.ObjectListing;

public abstract class AbstractInfraSteps {
  private static final Logger LOG = LoggerFactory.getLogger(AbstractInfraSteps.class);

@@ -59,7 +56,7 @@ public abstract class AbstractInfraSteps {
  private String ambariFolder;
  private String shellScriptLocation;
  private String dockerHost;
  private AmazonS3Client s3client;
  private S3Client s3client;
  private int documentId = 0;
  private Solr solr;

@@ -71,7 +68,7 @@ public Solr getSolr() {
    return solr;
  }

  public AmazonS3Client getS3client() {
  public S3Client getS3client() {
    return s3client;
  }

@@ -86,8 +83,11 @@ public void initDockerContainer() throws Exception {
    URL location = AbstractInfraSteps.class.getProtectionDomain().getCodeSource().getLocation();
    ambariFolder = new File(location.toURI()).getParentFile().getParentFile().getParentFile().getParent();

    LOG.info("Clean local data folder {}", getLocalDataFolder());
    FileUtils.cleanDirectory(new File(getLocalDataFolder()));
    String localDataFolder = getLocalDataFolder();
    if (new File(localDataFolder).exists()) {
      LOG.info("Clean local data folder {}", localDataFolder);
      FileUtils.cleanDirectory(new File(localDataFolder));
    }

    shellScriptLocation = ambariFolder + "/ambari-infra/ambari-infra-manager/docker/infra-manager-docker-compose.sh";
    LOG.info("Create new docker container for testing Ambari Infra Manager ...");
@@ -102,9 +102,7 @@ public void initDockerContainer() throws Exception {
    solr.createSolrCollection(HADOOP_LOGS_COLLECTION);

    LOG.info("Initializing s3 client");
    s3client = new AmazonS3Client(new BasicAWSCredentials("remote-identity", "remote-credential"));
    s3client.setEndpoint(String.format("http://%s:%d", dockerHost, FAKE_S3_PORT));
    s3client.createBucket(S3_BUCKET_NAME);
    s3client = new S3Client(dockerHost, FAKE_S3_PORT, S3_BUCKET_NAME);

    checkInfraManagerReachable();
  }
@@ -155,10 +153,9 @@ protected void addDocument(OffsetDateTime logtime) {
  @AfterStories
  public void shutdownContainers() throws Exception {
    Thread.sleep(2000); // sync with s3 server
    ListObjectsRequest listObjectsRequest = new ListObjectsRequest().withBucketName(S3_BUCKET_NAME);
    ObjectListing objectListing = getS3client().listObjects(listObjectsRequest);
    LOG.info("Found {} files on s3.", objectListing.getObjectSummaries().size());
    objectListing.getObjectSummaries().forEach(s3ObjectSummary -> LOG.info("Found file on s3 with key {}", s3ObjectSummary.getKey()));
    List<String> objectKeys = getS3client().listObjectKeys();
    LOG.info("Found {} files on s3.", objectKeys.size());
    objectKeys.forEach(objectKey -> LOG.info("Found file on s3 with key {}", objectKey));

    LOG.info("Listing files on hdfs.");
    try (FileSystem fileSystem = getHdfs()) {
@@ -23,22 +23,22 @@
import static org.apache.ambari.infra.TestUtil.doWithin;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.hasProperty;
import static org.hamcrest.core.IsCollectionContaining.hasItem;
import static org.junit.Assert.assertThat;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.ambari.infra.InfraClient;
import org.apache.ambari.infra.JobExecutionInfo;
import org.apache.ambari.infra.S3Client;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
@@ -50,11 +50,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.ListObjectsRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.ObjectMetadata;

public class ExportJobsSteps extends AbstractInfraSteps {
  private static final Logger LOG = LoggerFactory.getLogger(ExportJobsSteps.class);

@@ -80,9 +75,7 @@ public void addDocuments(long count, OffsetDateTime startLogtime, OffsetDateTime

@Given("a file on s3 with key $key")
public void addFileToS3(String key) throws Exception {
try (ByteArrayInputStream inputStream = new ByteArrayInputStream("anything".getBytes())) {
getS3client().putObject(S3_BUCKET_NAME, key, inputStream, new ObjectMetadata());
}
getS3client().putObject(key, "anything".getBytes());
}

@When("start $jobName job")
@@ -113,10 +106,8 @@ public void restartJob(String jobName, int waitSec) {

@When("stop job $jobName after at least $count file exists in s3 with filename containing text $text within $waitSec seconds")
public void stopJob(String jobName, int count, String text, int waitSec) throws Exception {
AmazonS3Client s3Client = getS3client();
ListObjectsRequest listObjectsRequest = new ListObjectsRequest().withBucketName(S3_BUCKET_NAME);
doWithin(waitSec, "check uploaded files to s3", () -> s3Client.doesBucketExist(S3_BUCKET_NAME)
&& fileCountOnS3(text, s3Client, listObjectsRequest) > count);
S3Client s3Client = getS3client();
doWithin(waitSec, "check uploaded files to s3", () -> s3Client.listObjectKeys(text).size() > count);

try (InfraClient httpClient = getInfraClient()) {
httpClient.stopJob(launchedJobs.get(jobName).getExecutionId());
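
Note: doWithin comes from TestUtil and its implementation is not part of this diff; presumably it polls the supplied condition until it holds or the timeout elapses. A minimal sketch under that assumption:

  // Hypothetical reconstruction of TestUtil.doWithin (predicate is a
  // java.util.function.BooleanSupplier); the real helper is not shown in this commit.
  public static void doWithin(int waitSec, String actionName, BooleanSupplier predicate) {
    long deadline = System.currentTimeMillis() + waitSec * 1000L;
    while (!predicate.getAsBoolean()) {
      if (System.currentTimeMillis() > deadline)
        throw new AssertionError("Timed out: " + actionName);
      try {
        Thread.sleep(500); // poll twice per second
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
      }
    }
  }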
@@ -125,40 +116,29 @@ public void stopJob(String jobName, int count, String text, int waitSec) throws

@When("delete file with key $key from s3")
public void deleteFileFromS3(String key) {
getS3client().deleteObject(S3_BUCKET_NAME, key);
getS3client().deleteObject(key);
}

@Then("Check filenames contains the text $text on s3 server after $waitSec seconds")
public void checkS3After(String text, int waitSec) {
AmazonS3Client s3Client = getS3client();
ListObjectsRequest listObjectsRequest = new ListObjectsRequest().withBucketName(S3_BUCKET_NAME);
doWithin(waitSec, "check uploaded files to s3", () -> s3Client.doesBucketExist(S3_BUCKET_NAME)
&& !s3Client.listObjects(listObjectsRequest).getObjectSummaries().isEmpty());
S3Client s3Client = getS3client();
doWithin(waitSec, "check uploaded files to s3", () -> !s3Client.listObjectKeys().isEmpty());

ObjectListing objectListing = s3Client.listObjects(listObjectsRequest);
assertThat(objectListing.getObjectSummaries(), hasItem(hasProperty("key", containsString(text))));
List<String> objectKeys = s3Client.listObjectKeys(text);
assertThat(objectKeys, hasItem(containsString(text)));
}

@Then("Check $count files exists on s3 server with filenames containing the text $text after $waitSec seconds")
public void checkNumberOfFilesOnS3(long count, String text, int waitSec) {
AmazonS3Client s3Client = getS3client();
ListObjectsRequest listObjectsRequest = new ListObjectsRequest().withBucketName(S3_BUCKET_NAME);
doWithin(waitSec, "check uploaded files to s3", () -> s3Client.doesBucketExist(S3_BUCKET_NAME)
&& fileCountOnS3(text, s3Client, listObjectsRequest) == count);
}

private long fileCountOnS3(String text, AmazonS3Client s3Client, ListObjectsRequest listObjectsRequest) {
return s3Client.listObjects(listObjectsRequest).getObjectSummaries().stream()
.filter(s3ObjectSummary -> s3ObjectSummary.getKey().contains(text))
.count();
S3Client s3Client = getS3client();
doWithin(waitSec, "check uploaded files to s3", () -> s3Client.listObjectKeys(text).size() == count);
}

@Then("Less than $count files exists on s3 server with filenames containing the text $text after $waitSec seconds")
public void checkLessThanFileExistsOnS3(long count, String text, int waitSec) {
AmazonS3Client s3Client = getS3client();
ListObjectsRequest listObjectsRequest = new ListObjectsRequest().withBucketName(S3_BUCKET_NAME);
doWithin(waitSec, "check uploaded files to s3", () -> s3Client.doesBucketExist(S3_BUCKET_NAME) && between(
fileCountOnS3(text, s3Client, listObjectsRequest), 1L, count - 1L));
S3Client s3Client = getS3client();
doWithin(waitSec, "check uploaded files to s3", () -> between(
s3Client.listObjectKeys(text).size(), 1L, count - 1L));
}

private boolean between(long count, long from, long to) {
@@ -167,10 +147,9 @@ private boolean between(long count, long from, long to) {

@Then("No file exists on s3 server with filenames containing the text $text")
public void fileNotExistOnS3(String text) {
AmazonS3Client s3Client = getS3client();
ListObjectsRequest listObjectsRequest = new ListObjectsRequest().withBucketName(S3_BUCKET_NAME);
assertThat(s3Client.listObjects(listObjectsRequest).getObjectSummaries().stream()
.anyMatch(s3ObjectSummary -> s3ObjectSummary.getKey().contains(text)), is(false));
S3Client s3Client = getS3client();
assertThat(s3Client.listObjectKeys().stream()
.anyMatch(objectKey -> objectKey.contains(text)), is(false));
}

@Then("solr contains $count documents between $startLogtime and $endLogtime")
@@ -59,9 +59,9 @@ And solr does not contain documents between 2014-02-04T05:00:00.000Z and 2014-02

Scenario: Launch Archiving job. Initiate stop and check that part of the data is archived. After restart all data must be extracted.

Given 200 documents in solr with logtime from 2014-03-09T05:00:00.000Z to 2014-03-09T20:00:00.000Z
Given 500 documents in solr with logtime from 2014-03-09T05:00:00.000Z to 2014-03-09T20:00:00.000Z
When start archive_audit_logs job with parameters writeBlockSize=20,start=2014-03-09T05:00:00.000Z,end=2014-03-09T20:00:00.000Z after 2 seconds
And stop job archive_audit_logs after at least 1 file exists in s3 with filename containing text solr_archive_audit_logs_-_2014-03-09 within 10 seconds
Then Less than 10 files exists on s3 server with filenames containing the text solr_archive_audit_logs_-_2014-03-09 after 20 seconds
Then Less than 20 files exists on s3 server with filenames containing the text solr_archive_audit_logs_-_2014-03-09 after 20 seconds
When restart archive_audit_logs job within 10 seconds
Then Check 10 files exists on s3 server with filenames containing the text solr_archive_audit_logs_-_2014-03-09 after 20 seconds
Then Check 25 files exists on s3 server with filenames containing the text solr_archive_audit_logs_-_2014-03-09 after 20 seconds
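
The arithmetic behind the updated expectations: with writeBlockSize=20, 500 documents produce 500 / 20 = 25 archive files in total (previously 200 / 20 = 10), so a run stopped partway should leave fewer files on s3, and a completed run after restart exactly 25.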
@@ -2,4 +2,5 @@ out/*
*.pid
Profile
.env
test-out
test-out
test.db
@@ -45,14 +45,15 @@ services:
- "-z"
- ${ZOOKEEPER_CONNECTION_STRING}
volumes:
- $AMBARI_LOCATION/ambari-logsearch/ambari-logsearch-server/src/main/configsets:/opt/solr/configsets
- $AMBARI_INFRA_LOCATION/ambari-infra-manager/docker/configsets:/opt/solr/configsets
fakes3:
image: localstack/localstack
hostname: fakes3
ports:
- "4569:4569"
environment:
- SERVICES=s3:4569
- DEBUG=s3
networks:
infra-network:
aliases:
@@ -96,8 +97,8 @@ services:
      ZK_CONNECT_STRING: ${ZOOKEEPER_CONNECTION_STRING}
      DISPLAY: $DOCKERIP:0
    volumes:
      - $AMBARI_LOCATION/ambari-infra/ambari-infra-manager/target/package:/root/ambari-infra-manager
      - $AMBARI_LOCATION/ambari-infra/ambari-infra-manager/docker/test-out:/root/archive
      - $AMBARI_INFRA_LOCATION/ambari-infra-manager/target/package:/root/ambari-infra-manager
      - $AMBARI_INFRA_LOCATION/ambari-infra-manager/docker/test-out:/root/archive
networks:
  infra-network:
    driver: bridge
@@ -61,13 +61,13 @@ function check_env_file() {

function setup_env() {
  pushd $sdir/../../
  local AMBARI_LOCATION=$(pwd)
  local AMBARI_INFRA_LOCATION=$(pwd)
  popd
  local docker_ip=$(get_docker_ip)
  cat << EOF > $sdir/.env
DOCKERIP=$docker_ip
MAVEN_REPOSITORY_LOCATION=$HOME/.m2
AMBARI_LOCATION=$AMBARI_LOCATION
AMBARI_INFRA_LOCATION=$AMBARI_INFRA_LOCATION
ZOOKEEPER_VERSION=3.4.10
ZOOKEEPER_CONNECTION_STRING=zookeeper:2181
