Skip to content

Commit

Permalink
Merge branch 'main' into feature/fix-cleanup-delay
Browse files Browse the repository at this point in the history
  • Loading branch information
andreeapad committed Dec 21, 2022
2 parents d8ea3fe + 37ce2e3 commit 8131792
Show file tree
Hide file tree
Showing 20 changed files with 442 additions and 152 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Throw exception if `cleanupDelay` can't be parsed instead of returning the default value.
### Changed
- Don't return records for cleanup after 10 attempts have been reached.
- Added additional checks for the number of levels in table and partition paths so that invalid paths are not scheduled for deletion.

## [3.4.14] - 2022-11-25
### Fixed
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (C) 2019-2021 Expedia, Inc.
* Copyright (C) 2019-2022 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (C) 2019-2021 Expedia, Inc.
* Copyright (C) 2019-2022 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,5 +19,6 @@ public enum HousekeepingStatus {
SCHEDULED,
FAILED,
DELETED,
DISABLED
DISABLED,
SKIPPED
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/**
* Copyright (C) 2019-2022 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.expediagroup.beekeeper.core.validation;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class S3PathValidator {

private static final Logger log = LoggerFactory.getLogger(S3PathValidator.class);

/**
* The minimum number of "/" chars for partition location: s3://basePath/table/partition = 4
*/
public static boolean validPartitionPath(String location) {
boolean valid = getNumSlashes(location) >= 4;
if (!valid) {
log.warn("Partition \"{}\" doesn't have the correct number of levels in the path", location);
}
return valid;
}

/**
* The minimum number of "/" chars for table location: s3://basePath/table = 3
*/
public static boolean validTablePath(String location) {
boolean valid = getNumSlashes(location) >= 3;
if (!valid) {
log.warn("Table \"{}\" doesn't have the correct number of levels in the path", location);
}
return valid;
}

private static int getNumSlashes(String location) {
return StringUtils.countMatches(location, "/");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/**
* Copyright (C) 2019-2022 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.expediagroup.beekeeper.core.validation;

import static org.assertj.core.api.Assertions.assertThat;

import org.junit.jupiter.api.Test;

public class S3PathValidatorTest {

@Test
void validTablePath() {
assertThat(S3PathValidator.validTablePath("s3://bucket/table")).isTrue();
assertThat(S3PathValidator.validTablePath("s3://bucket/table/1")).isTrue();
}

@Test
void invalidTablePath() {
assertThat(S3PathValidator.validTablePath("s3://bucket")).isFalse();
}

@Test
void validPartitionPath() {
assertThat(S3PathValidator.validPartitionPath("s3://bucket/table/partition")).isTrue();
assertThat(S3PathValidator.validPartitionPath("s3://bucket/table/partition/1")).isTrue();
}

@Test
void invalidPartitionPath() {
assertThat(S3PathValidator.validPartitionPath("s3://bucket/table")).isFalse();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ public class BeekeeperExpiredMetadataSchedulerApiaryIntegrationTest extends Beek
private static final String PARTITION_B_VALUES = "[ \"2020-01-01\", \"1\" ]";
private static final String PARTITION_A_NAME = "event_date=2020-01-01/event_hour=0";
private static final String PARTITION_B_NAME = "event_date=2020-01-01/event_hour=1";
private static final String LOCATION_A = "s3://location-a";
private static final String LOCATION_B = "s3://location-b";
private static final String LOCATION_A = "s3://bucket/table1/partition";
private static final String LOCATION_B = "s3://bucket/table2/partition";

@Container
private static final LocalStackContainer SQS_CONTAINER = ContainerTestUtils.awsContainer(SQS);
Expand Down Expand Up @@ -148,7 +148,7 @@ public void expiredMetadataAddPartitionEvent() throws SQLException, IOException,
amazonSQS.sendMessage(sendMessageRequest(addPartitionSqsMessage.getFormattedString()));

// creating entry for table
insertExpiredMetadata("s3://location", null);
insertExpiredMetadata("s3://bucket/table1", null);

await().atMost(60, TimeUnit.SECONDS).until(() -> getExpiredMetadataRowCount() == 2);

Expand All @@ -168,7 +168,7 @@ public void expiredMetadataMultipleAddPartitionEvents() throws SQLException, IOE
amazonSQS.sendMessage(sendMessageRequest(addPartitionSqsMessage2.getFormattedString()));

// creating entry for table
insertExpiredMetadata("s3://location", null);
insertExpiredMetadata("s3://bucket/table1", null);

await().atMost(60, TimeUnit.SECONDS).until(() -> getExpiredMetadataRowCount() == 3);

Expand Down Expand Up @@ -216,16 +216,16 @@ public void expiredMetadataMultipleAlterPartitionTableEvents() throws SQLExcepti
public void healthCheck() {
CloseableHttpClient client = HttpClientBuilder.create().build();
HttpGet request = new HttpGet(HEALTHCHECK_URI);
await().atMost(TIMEOUT, TimeUnit.SECONDS)
await()
.atMost(TIMEOUT, TimeUnit.SECONDS)
.until(() -> client.execute(request).getStatusLine().getStatusCode() == 200);
}

@Test
public void prometheus() {
CloseableHttpClient client = HttpClientBuilder.create().build();
HttpGet request = new HttpGet(PROMETHEUS_URI);
await().atMost(30, TimeUnit.SECONDS)
.until(() -> client.execute(request).getStatusLine().getStatusCode() == 200);
await().atMost(30, TimeUnit.SECONDS).until(() -> client.execute(request).getStatusLine().getStatusCode() == 200);
}

private SendMessageRequest sendMessageRequest(String payload) {
Expand All @@ -237,7 +237,9 @@ private void assertExpiredMetadata(HousekeepingMetadata actual, String expectedP
assertMetrics();
}

public void assertHousekeepingMetadata(HousekeepingMetadata actual, String expectedPath,
public void assertHousekeepingMetadata(
HousekeepingMetadata actual,
String expectedPath,
String expectedPartitionName) {
assertThat(actual.getPath()).isEqualTo(expectedPath);
assertThat(actual.getDatabaseName()).isEqualTo(DATABASE_NAME_VALUE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,86 +111,86 @@ public void stop() throws InterruptedException {

@Test
public void unreferencedAlterTableEvent() throws SQLException, IOException, URISyntaxException {
AlterTableSqsMessage alterTableSqsMessage = new AlterTableSqsMessage("s3://tableLocation", "s3://oldTableLocation",
AlterTableSqsMessage alterTableSqsMessage = new AlterTableSqsMessage("s3://bucket/tableLocation", "s3://bucket/oldTableLocation",
true, true);
amazonSQS.sendMessage(sendMessageRequest(alterTableSqsMessage.getFormattedString()));
await().atMost(TIMEOUT, TimeUnit.SECONDS).until(() -> getUnreferencedPathsRowCount() == 1);

List<HousekeepingPath> unreferencedPaths = getUnreferencedPaths();
assertUnreferencedPath(unreferencedPaths.get(0), "s3://oldTableLocation");
assertUnreferencedPath(unreferencedPaths.get(0), "s3://bucket/oldTableLocation");
}

@Test
public void unreferencedMultipleAlterTableEvents() throws SQLException, IOException, URISyntaxException {
AlterTableSqsMessage alterTableSqsMessage = new AlterTableSqsMessage("s3://tableLocation", "s3://oldTableLocation",
AlterTableSqsMessage alterTableSqsMessage = new AlterTableSqsMessage("s3://bucket/tableLocation", "s3://bucket/oldTableLocation",
true, true);
amazonSQS.sendMessage(sendMessageRequest(alterTableSqsMessage.getFormattedString()));
alterTableSqsMessage.setTableLocation("s3://tableLocation2");
alterTableSqsMessage.setOldTableLocation("s3://tableLocation");
alterTableSqsMessage.setTableLocation("s3://bucket/tableLocation2");
alterTableSqsMessage.setOldTableLocation("s3://bucket/tableLocation");
amazonSQS.sendMessage(sendMessageRequest(alterTableSqsMessage.getFormattedString()));
await().atMost(TIMEOUT, TimeUnit.SECONDS).until(() -> getUnreferencedPathsRowCount() == 2);

List<HousekeepingPath> unreferencedPaths = getUnreferencedPaths();
assertUnreferencedPath(unreferencedPaths.get(0), "s3://oldTableLocation");
assertUnreferencedPath(unreferencedPaths.get(1), "s3://tableLocation");
assertUnreferencedPath(unreferencedPaths.get(0), "s3://bucket/oldTableLocation");
assertUnreferencedPath(unreferencedPaths.get(1), "s3://bucket/tableLocation");
}

@Test
public void unreferencedAlterPartitionEvent() throws SQLException, IOException, URISyntaxException {
AlterPartitionSqsMessage alterPartitionSqsMessage = new AlterPartitionSqsMessage("s3://expiredTableLocation",
"s3://partitionLocation", "s3://unreferencedPartitionLocation", true, true);
AlterPartitionSqsMessage alterPartitionSqsMessage = new AlterPartitionSqsMessage("s3://bucket/table/expiredTableLocation",
"s3://bucket/table/partitionLocation", "s3://bucket/table/unreferencedPartitionLocation", true, true);
amazonSQS.sendMessage(sendMessageRequest(alterPartitionSqsMessage.getFormattedString()));
alterPartitionSqsMessage.setTableLocation("s3://expiredTableLocation2");
alterPartitionSqsMessage.setPartitionLocation("s3://partitionLocation2");
alterPartitionSqsMessage.setOldPartitionLocation("s3://partitionLocation");
alterPartitionSqsMessage.setTableLocation("s3://bucket/table/expiredTableLocation2");
alterPartitionSqsMessage.setPartitionLocation("s3://bucket/table/partitionLocation2");
alterPartitionSqsMessage.setOldPartitionLocation("s3://bucket/table/partitionLocation");
amazonSQS.sendMessage(sendMessageRequest(alterPartitionSqsMessage.getFormattedString()));
await().atMost(TIMEOUT, TimeUnit.SECONDS).until(() -> getUnreferencedPathsRowCount() == 2);

List<HousekeepingPath> unreferencedPaths = getUnreferencedPaths();
assertUnreferencedPath(unreferencedPaths.get(0), "s3://partitionLocation");
assertUnreferencedPath(unreferencedPaths.get(1), "s3://unreferencedPartitionLocation");
assertUnreferencedPath(unreferencedPaths.get(0), "s3://bucket/table/partitionLocation");
assertUnreferencedPath(unreferencedPaths.get(1), "s3://bucket/table/unreferencedPartitionLocation");
}

@Test
public void unreferencedMultipleAlterPartitionEvent() throws IOException, SQLException, URISyntaxException {
List.of(
new AlterPartitionSqsMessage("s3://expiredTableLocation", "s3://partitionLocation",
"s3://unreferencedPartitionLocation", true, true),
new AlterPartitionSqsMessage("s3://expiredTableLocation2", "s3://partitionLocation2",
"s3://partitionLocation", true, true)
new AlterPartitionSqsMessage("s3://bucket/table/expiredTableLocation", "s3://bucket/table/partitionLocation",
"s3://bucket/table/unreferencedPartitionLocation", true, true),
new AlterPartitionSqsMessage("s3://bucket/table/expiredTableLocation2", "s3://bucket/table/partitionLocation2",
"s3://bucket/table/partitionLocation", true, true)
).forEach(msg -> amazonSQS.sendMessage(sendMessageRequest(msg.getFormattedString())));

await().atMost(TIMEOUT, TimeUnit.SECONDS).until(() -> getUnreferencedPathsRowCount() == 2);

List<HousekeepingPath> unreferencedPaths = getUnreferencedPaths();
assertUnreferencedPath(unreferencedPaths.get(0), "s3://partitionLocation");
assertUnreferencedPath(unreferencedPaths.get(1), "s3://unreferencedPartitionLocation");
assertUnreferencedPath(unreferencedPaths.get(0), "s3://bucket/table/partitionLocation");
assertUnreferencedPath(unreferencedPaths.get(1), "s3://bucket/table/unreferencedPartitionLocation");
}

@Test
public void unreferencedDropPartitionEvent() throws SQLException, IOException, URISyntaxException {
DropPartitionSqsMessage dropPartitionSqsMessage = new DropPartitionSqsMessage("s3://partitionLocation", true, true);
DropPartitionSqsMessage dropPartitionSqsMessage = new DropPartitionSqsMessage("s3://bucket/table/partitionLocation", true, true);
amazonSQS.sendMessage(sendMessageRequest(dropPartitionSqsMessage.getFormattedString()));
dropPartitionSqsMessage.setPartitionLocation("s3://partitionLocation2");
dropPartitionSqsMessage.setPartitionLocation("s3://bucket/table/partitionLocation2");
amazonSQS.sendMessage(sendMessageRequest(dropPartitionSqsMessage.getFormattedString()));
await().atMost(TIMEOUT, TimeUnit.SECONDS).until(() -> getUnreferencedPathsRowCount() == 2);

List<HousekeepingPath> unreferencedPaths = getUnreferencedPaths();
assertUnreferencedPath(unreferencedPaths.get(0), "s3://partitionLocation");
assertUnreferencedPath(unreferencedPaths.get(1), "s3://partitionLocation2");
assertUnreferencedPath(unreferencedPaths.get(0), "s3://bucket/table/partitionLocation");
assertUnreferencedPath(unreferencedPaths.get(1), "s3://bucket/table/partitionLocation2");
}

@Test
public void unreferencedDropTableEvent() throws SQLException, IOException, URISyntaxException {
DropTableSqsMessage dropTableSqsMessage = new DropTableSqsMessage("s3://tableLocation", true, true);
DropTableSqsMessage dropTableSqsMessage = new DropTableSqsMessage("s3://bucket/tableLocation", true, true);
amazonSQS.sendMessage(sendMessageRequest(dropTableSqsMessage.getFormattedString()));
dropTableSqsMessage.setTableLocation("s3://tableLocation2");
dropTableSqsMessage.setTableLocation("s3://bucket/tableLocation2");
amazonSQS.sendMessage(sendMessageRequest(dropTableSqsMessage.getFormattedString()));
await().atMost(TIMEOUT, TimeUnit.SECONDS).until(() -> getUnreferencedPathsRowCount() == 2);

List<HousekeepingPath> unreferencedPaths = getUnreferencedPaths();
assertUnreferencedPath(unreferencedPaths.get(0), "s3://tableLocation");
assertUnreferencedPath(unreferencedPaths.get(1), "s3://tableLocation2");
assertUnreferencedPath(unreferencedPaths.get(0), "s3://bucket/tableLocation");
assertUnreferencedPath(unreferencedPaths.get(1), "s3://bucket/tableLocation2");
}

@Test
Expand Down

0 comments on commit 8131792

Please sign in to comment.