AMBARI-22702. Infra Manager: scheduled deleting of Infra Solr documents (Krisztian Kasa via oleewere)
kasakrisz authored and oleewere committed Jan 3, 2018
1 parent a8530df commit 42cc9a966c4e88f7d44aefcdb35f020de5d78896
Showing 43 changed files with 1,322 additions and 266 deletions.
@@ -25,7 +25,7 @@
import java.time.format.DateTimeFormatter;

public class OffsetDateTimeConverter implements ParameterConverters.ParameterConverter {
private static final DateTimeFormatter SOLR_DATETIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSX");
public static final DateTimeFormatter SOLR_DATETIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSX");

@Override
public boolean accept(Type type) {
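The formatter is widened from private to public so the new test steps can reuse the same Solr date format. A minimal sketch of what the pattern produces for a UTC timestamp (the class name below is illustrative; only the pattern comes from the patch):

import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class SolrDateFormatSketch {
    // Same pattern as SOLR_DATETIME_FORMATTER above.
    private static final DateTimeFormatter FORMATTER =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSX");

    public static void main(String[] args) {
        OffsetDateTime logtime = OffsetDateTime.of(2012, 10, 9, 5, 0, 0, 0, ZoneOffset.UTC);
        // Prints 2012-10-09T05:00:00.000Z, the form used in the scenario files and Solr queries below.
        System.out.println(FORMATTER.format(logtime));
    }
}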
@@ -46,7 +46,6 @@
import java.nio.charset.StandardCharsets;
import java.time.OffsetDateTime;
import java.util.Date;
import java.util.UUID;
import java.util.function.BooleanSupplier;

import static java.lang.System.currentTimeMillis;
@@ -64,6 +63,7 @@ public abstract class AbstractInfraSteps {
private String dockerHost;
private SolrClient solrClient;
private AmazonS3Client s3client;
private int documentId = 0;

public InfraClient getInfraClient() {
return new InfraClient(String.format("http://%s:%d/api/v1/jobs", dockerHost, INFRA_MANAGER_PORT));
@@ -189,7 +189,7 @@ protected void addDocument(OffsetDateTime logtime) throws SolrServerException, I
solrInputDocument.addField("action", "getfileinfo");
solrInputDocument.addField("log_message", "allowed=true\tugi=ambari-qa (auth:SIMPLE)\tip=/192.168.64.102\tcmd=getfileinfo\tsrc=/ats/active\tdst=null\tperm=null\tproto=rpc\tcallerContext=HIVE_QUERY_ID:ambari-qa_20160317200111_223b3079-4a2d-431c-920f-6ba37ed63e9f");
solrInputDocument.addField("logger_name", "FSNamesystem.audit");
solrInputDocument.addField("id", UUID.randomUUID().toString());
solrInputDocument.addField("id", Integer.toString(documentId++));
solrInputDocument.addField("authType", "SIMPLE");
solrInputDocument.addField("logfile_line_number", 1);
solrInputDocument.addField("cliIP", "/192.168.64.102");
@@ -23,6 +23,8 @@
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.ObjectMetadata;
import org.apache.ambari.infra.InfraClient;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServerException;
import org.jbehave.core.annotations.Alias;
import org.jbehave.core.annotations.Given;
import org.jbehave.core.annotations.Then;
@@ -31,9 +33,13 @@
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.time.OffsetDateTime;

import static org.apache.ambari.infra.OffsetDateTimeConverter.SOLR_DATETIME_FORMATTER;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.hasProperty;
import static org.hamcrest.core.IsCollectionContaining.hasItem;
@@ -44,8 +50,10 @@ public class ExportJobsSteps extends AbstractInfraSteps {

@Given("$count documents in solr")
public void addDocuments(int count) throws Exception {
for (int i = 0; i < count; ++i)
addDocument(OffsetDateTime.now().minusMinutes(i));
OffsetDateTime intervalEnd = OffsetDateTime.now();
for (int i = 0; i < count; ++i) {
addDocument(intervalEnd.minusMinutes(i % (count / 10)));
}
getSolrClient().commit();
}

@@ -84,7 +92,7 @@ public void deleteFileFromS3(String key) {
}

@Then("Check filenames contains the text $text on s3 server after $waitSec seconds")
public void checkS3After(String text, int waitSec) throws Exception {
public void checkS3After(String text, int waitSec) {
AmazonS3Client s3Client = getS3client();
ListObjectsRequest listObjectsRequest = new ListObjectsRequest().withBucketName(S3_BUCKET_NAME);
doWithin(waitSec, "check uploaded files to s3", () -> s3Client.doesBucketExist(S3_BUCKET_NAME)
@@ -103,4 +111,38 @@ public void checkNumberOfFilesOnS3(int count, String text, int waitSec) {
.filter(s3ObjectSummary -> s3ObjectSummary.getKey().contains(text))
.count() == count);
}

@Then("No file exists on s3 server with filenames containing the text $text")
public void fileNotExistOnS3(String text) {
AmazonS3Client s3Client = getS3client();
ListObjectsRequest listObjectsRequest = new ListObjectsRequest().withBucketName(S3_BUCKET_NAME);
assertThat(s3Client.listObjects(listObjectsRequest).getObjectSummaries().stream()
.anyMatch(s3ObjectSummary -> s3ObjectSummary.getKey().contains(text)), is(false));
}

@Then("solr contains $count documents between $startLogtime and $endLogtime")
public void documentCount(int count, OffsetDateTime startLogTime, OffsetDateTime endLogTime) throws Exception {
SolrQuery query = new SolrQuery();
query.setRows(count * 2);
query.setQuery(String.format("logtime:[\"%s\" TO \"%s\"]", SOLR_DATETIME_FORMATTER.format(startLogTime), SOLR_DATETIME_FORMATTER.format(endLogTime)));
assertThat(getSolrClient().query(query).getResults().size(), is(count));
}

@Then("solr does not contain documents between $startLogtime and $endLogtime after $waitSec seconds")
public void isSolrEmpty(OffsetDateTime startLogTime, OffsetDateTime endLogTime, int waitSec) {
SolrQuery query = new SolrQuery();
query.setRows(1);
query.setQuery(String.format("logtime:[\"%s\" TO \"%s\"]", SOLR_DATETIME_FORMATTER.format(startLogTime), SOLR_DATETIME_FORMATTER.format(endLogTime)));
doWithin(waitSec, "check solr is empty", () -> isSolrEmpty(query));
}

private boolean isSolrEmpty(SolrQuery query) {
try {
return getSolrClient().query(query).getResults().isEmpty();
} catch (SolrServerException e) {
throw new RuntimeException(e);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
}
@@ -1,23 +1,38 @@
Scenario: Export documents from solr and upload them to s3 using default configuration
Scenario: Exporting documents from solr and upload them to s3 using default configuration

Given 1000 documents in solr
When start export_audit_logs job
When start archive_audit_logs job
Then Check filenames contains the text audit_logs on s3 server after 20 seconds


Scenario: Exporting 10 documents using writeBlockSize=3 produces 4 files

Given 10 documents in solr with logtime from 2010-10-09T05:00:00.000Z to 2010-10-09T20:00:00.000Z
When start export_audit_logs job with parameters writeBlockSize=3,start=2010-10-09T00:00:00.000Z,end=2010-10-11T00:00:00.000Z
When start archive_audit_logs job with parameters writeBlockSize=3,start=2010-10-09T00:00:00.000Z,end=2010-10-11T00:00:00.000Z
Then Check 4 files exists on s3 server with filenames containing the text solr_archive_audit_logs_-_2010-10-09 after 20 seconds
And solr does not contain documents between 2010-10-09T05:00:00.000Z and 2010-10-09T20:00:00.000Z after 5 seconds

Scenario: Running the archiving job with a start value greater than the end value exports and deletes 0 documents

Scenario: Export job fails when part of the data is exported. After resolving the issue and restarting, the job exports the rest of the data.
Given 10 documents in solr with logtime from 2010-01-01T05:00:00.000Z to 2010-01-04T05:00:00.000Z
When start archive_audit_logs job with parameters writeBlockSize=3,start=2010-01-03T05:00:00.000Z,end=2010-01-02T05:00:00.000Z
Then No file exists on s3 server with filenames containing the text solr_archive_audit_logs_-_2010-01-0
And solr contains 10 documents between 2010-01-01T05:00:00.000Z and 2010-01-04T05:00:00.000Z

Scenario: Archiving job fails when part of the data is exported. After resolving the issue and restarting, the job exports the rest of the data.

Given 200 documents in solr with logtime from 2011-10-09T05:00:00.000Z to 2011-10-09T20:00:00.000Z
And a file on s3 with key solr_archive_audit_logs_-_2011-10-09T08:00:00.000Z.json.tar.gz
When start export_audit_logs job with parameters writeBlockSize=20,start=2010-11-09T00:00:00.000Z,end=2011-10-11T00:00:00.000Z
And a file on s3 with key solr_archive_audit_logs_-_2011-10-09T08-00-00.000Z.json.tar.gz
When start archive_audit_logs job with parameters writeBlockSize=20,start=2010-11-09T00:00:00.000Z,end=2011-10-11T00:00:00.000Z
Then Check 3 files exists on s3 server with filenames containing the text solr_archive_audit_logs_-_2011-10-09 after 20 seconds
When delete file with key solr_archive_audit_logs_-_2011-10-09T08:00:00.000Z.json.tar.gz from s3
And restart export_audit_logs job with parameters writeBlockSize=20,start=2010-11-09T00:00:00.000Z,end=2011-10-11T00:00:00.000Z
And solr does not contain documents between 2011-10-09T05:00:00.000Z and 2011-10-09T07:59:59.999Z after 5 seconds
When delete file with key solr_archive_audit_logs_-_2011-10-09T08-00-00.000Z.json.tar.gz from s3
And restart archive_audit_logs job with parameters writeBlockSize=20,start=2010-11-09T00:00:00.000Z,end=2011-10-11T00:00:00.000Z
Then Check 10 files exists on s3 server with filenames containing the text solr_archive_audit_logs_-_2011-10-09 after 20 seconds
And solr does not contain documents between 2011-10-09T05:00:00.000Z and 2011-10-09T20:00:00.000Z after 5 seconds

Scenario: After the deleting job deletes documents from solr, no document is found in the specified interval

Given 10 documents in solr with logtime from 2012-10-09T05:00:00.000Z to 2012-10-09T20:00:00.000Z
When start delete_audit_logs job with parameters start=2012-10-09T05:00:00.000Z,end=2012-10-09T20:00:00.000Z
Then solr does not contain documents between 2012-10-09T05:00:00.000Z and 2012-10-09T20:00:00.000Z after 5 seconds
@@ -81,6 +81,8 @@ function setup_profile() {
local AMBARI_LOCATION=$(pwd)
popd
cat << EOF > $sdir/Profile
AWS_ACCESS_KEY_ID=test
AWS_SECRET_ACCESS_KEY=test
EOF
}

@@ -428,6 +428,11 @@
<artifactId>aws-java-sdk-s3</artifactId>
<version>1.11.5</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-csv</artifactId>
<version>1.5</version>
</dependency>
</dependencies>

</project>
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.ambari.infra.job;

import org.apache.htrace.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.batch.core.JobParameters;

import java.io.IOException;
import java.io.UncheckedIOException;

public abstract class JobProperties<T extends JobProperties<T>> {
private final Class<T> clazz;

protected JobProperties(Class<T> clazz) {
this.clazz = clazz;
}

public T deepCopy() {
try {
ObjectMapper objectMapper = new ObjectMapper();
String json = objectMapper.writeValueAsString(this);
return objectMapper.readValue(json, clazz);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

public abstract void apply(JobParameters jobParameters);

public abstract void validate();
}
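A hypothetical subclass, sketching how the deepCopy/apply/validate contract is meant to be used; the class and field names here are assumptions for illustration, not classes from the patch (deepCopy relies on Jackson, so a no-arg constructor and getters/setters are required):

package org.apache.ambari.infra.job;

import org.springframework.batch.core.JobParameters;

public class ExampleDeleteProperties extends JobProperties<ExampleDeleteProperties> {
    private String start;
    private String end;

    public ExampleDeleteProperties() {
        super(ExampleDeleteProperties.class);
    }

    public String getStart() { return start; }
    public void setStart(String start) { this.start = start; }
    public String getEnd() { return end; }
    public void setEnd(String end) { this.end = end; }

    @Override
    public void apply(JobParameters jobParameters) {
        // Job parameters, when present, override the defaults in the deep copy.
        if (jobParameters.getString("start") != null) start = jobParameters.getString("start");
        if (jobParameters.getString("end") != null) end = jobParameters.getString("end");
    }

    @Override
    public void validate() {
        if (start == null || end == null)
            throw new IllegalArgumentException("start and end must be specified");
    }
}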
@@ -16,33 +16,32 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.ambari.infra.job.archive;
package org.apache.ambari.infra.job;

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;

public class DocumentExportJobListener implements JobExecutionListener {
public class JobPropertyMap<T extends JobProperties<T>> implements JobExecutionListener {

private final DocumentExportPropertyMap propertyMap;
private final PropertyMap<T> propertyMap;

public DocumentExportJobListener(DocumentExportPropertyMap propertyMap) {
public JobPropertyMap(PropertyMap<T> propertyMap) {
this.propertyMap = propertyMap;
}


@Override
public void beforeJob(JobExecution jobExecution) {
try {
String jobName = jobExecution.getJobInstance().getJobName();
DocumentExportProperties defaultProperties = propertyMap.getSolrDataExport().get(jobName);
T defaultProperties = propertyMap.getPropertyMap().get(jobName);
if (defaultProperties == null)
throw new UnsupportedOperationException("Properties not found for job " + jobName);

DocumentExportProperties properties = defaultProperties.deepCopy();
T properties = defaultProperties.deepCopy();
properties.apply(jobExecution.getJobParameters());
properties.validate();
jobExecution.getExecutionContext().put("exportProperties", properties);
jobExecution.getExecutionContext().put("jobProperties", properties);
}
catch (UnsupportedOperationException | IllegalArgumentException ex) {
jobExecution.stop();
@@ -53,6 +52,6 @@ public void beforeJob(JobExecution jobExecution) {

@Override
public void afterJob(JobExecution jobExecution) {
jobExecution.setExitStatus(new ExitStatus(ExitStatus.COMPLETED.getExitCode()));

}
}
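The listener stores the validated copy in the execution context under "jobProperties". A sketch, assuming a Spring Batch tasklet, of how a step could read it back; the tasklet and the ExampleDeleteProperties type are the illustrative ones from the sketch above, not code from the patch:

package org.apache.ambari.infra.job;

import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;

public class ExampleDeleteTasklet implements Tasklet {
    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
        // Read the copy that JobPropertyMap.beforeJob() validated and stored.
        ExampleDeleteProperties properties = (ExampleDeleteProperties) chunkContext
                .getStepContext()
                .getStepExecution()
                .getJobExecution()
                .getExecutionContext()
                .get("jobProperties");
        // ... act on properties.getStart() and properties.getEnd() here ...
        return RepeatStatus.FINISHED;
    }
}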
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.ambari.infra.job;

import java.util.Map;

public interface PropertyMap<T extends JobProperties<T>> {
Map<String, T> getPropertyMap();
}
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.ambari.infra.job;

import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.UncheckedIOException;

public abstract class SolrDAOBase {
private static final Logger LOG = LoggerFactory.getLogger(SolrDAOBase.class);

private final String zooKeeperConnectionString;
private final String defaultCollection;

protected SolrDAOBase(String zooKeeperConnectionString, String defaultCollection) {
this.zooKeeperConnectionString = zooKeeperConnectionString;
this.defaultCollection = defaultCollection;
}

protected void delete(String deleteQueryText) {
try (CloudSolrClient client = createClient()) {
try {
LOG.info("Executing solr delete by query {}", deleteQueryText);
client.deleteByQuery(deleteQueryText);
client.commit();
} catch (Exception e) {
try {
client.rollback();
} catch (SolrServerException e1) {
LOG.warn("Unable to rollback after solr delete operation failure.", e1);
}
throw new RuntimeException(e);
}
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

protected CloudSolrClient createClient() {
CloudSolrClient client = new CloudSolrClient.Builder().withZkHost(zooKeeperConnectionString).build();
client.setDefaultCollection(defaultCollection);
return client;
}
}
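A hedged sketch of a DAO built on SolrDAOBase that deletes documents in a logtime interval, the kind of operation the delete_audit_logs scenarios above exercise; the class and method names are assumptions, not the concrete DAO from the patch:

package org.apache.ambari.infra.job;

public class ExampleAuditLogDeleteDao extends SolrDAOBase {

    public ExampleAuditLogDeleteDao(String zooKeeperConnectionString, String collection) {
        super(zooKeeperConnectionString, collection);
    }

    public void deleteLogtimeRange(String start, String end) {
        // Solr range query on the logtime field; delete() above handles commit and rollback.
        delete(String.format("logtime:[\"%s\" TO \"%s\"]", start, end));
    }
}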
