Skip to content

Commit

Permalink
Merge 8131792 into 37ce2e3
Browse files Browse the repository at this point in the history
  • Loading branch information
andreeapad committed Dec 21, 2022
2 parents 37ce2e3 + 8131792 commit c412e6b
Show file tree
Hide file tree
Showing 10 changed files with 91 additions and 62 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [3.5.0] - TBD
### Fixed
- Throw exception if `cleanupDelay` can't be parsed instead of returning the default value.
### Changed
- Records that have reached 10 cleanup attempts are no longer returned for cleanup.
- Added additional checks for the number of levels in table and partition paths so that invalid paths are not scheduled for deletion.

## [3.4.14] - 2022-11-25
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,22 +29,21 @@

import com.expediagroup.beekeeper.core.model.HousekeepingMetadata;

public interface HousekeepingMetadataRepository extends PagingAndSortingRepository<HousekeepingMetadata, Long>,
JpaSpecificationExecutor<HousekeepingMetadata> {
public interface HousekeepingMetadataRepository
extends PagingAndSortingRepository<HousekeepingMetadata, Long>, JpaSpecificationExecutor<HousekeepingMetadata> {

@Query(value = "from HousekeepingMetadata t where t.cleanupTimestamp <= :instant "
+ "and (t.housekeepingStatus = 'SCHEDULED' or t.housekeepingStatus = 'FAILED') "
+ "and t.modifiedTimestamp <= :instant order by t.modifiedTimestamp")
+ "and t.modifiedTimestamp <= :instant and t.cleanupAttempts < 10 order by t.modifiedTimestamp")
Slice<HousekeepingMetadata> findRecordsForCleanupByModifiedTimestamp(
@Param("instant") LocalDateTime instant,
Pageable pageable);

/**
* Returns the record that matches the inputs given, if there is one.
*
* @implNote To get the record for a partitioned table both the input value and the value of the partitionName of the current
* record must be NULL.
*
* @implNote To get the record for a partitioned table both the input value and the value of the partitionName of the
* current record must be NULL.
* @param databaseName
* @param tableName
* @param partitionName
Expand All @@ -58,7 +57,8 @@ Slice<HousekeepingMetadata> findRecordsForCleanupByModifiedTimestamp(
+ "and (t.housekeepingStatus = 'SCHEDULED' or t.housekeepingStatus = 'FAILED')")
Optional<HousekeepingMetadata> findRecordForCleanupByDbTableAndPartitionName(
@Param("databaseName") String databaseName,
@Param("tableName") String tableName, @Param("partitionName") String partitionName);
@Param("tableName") String tableName,
@Param("partitionName") String partitionName);

/**
* Returns the maximum value for the cleanupTimestamp for a database and table name pair.
Expand All @@ -76,7 +76,8 @@ LocalDateTime findMaximumCleanupTimestampForDbAndTable(
@Param("tableName") String tableName);

/**
* This method returns the count of all records for a database and table name pair where the partitionName is not null.
* This method returns the count of all records for a database and table name pair where the partitionName is not
* null.
*
* @param databaseName
* @param tableName
Expand Down Expand Up @@ -122,7 +123,8 @@ Long countRecordsForDryRunWherePartitionIsNotNullOrExpired(
+ "and t.tableName = :tableName "
+ "and t.partitionName is not NULL "
+ "and (t.housekeepingStatus = 'SCHEDULED' or t.housekeepingStatus = 'FAILED')")
void deleteScheduledOrFailedPartitionRecordsForTable(@Param("databaseName") String databaseName,
void deleteScheduledOrFailedPartitionRecordsForTable(
@Param("databaseName") String databaseName,
@Param("tableName") String tableName);

/**
Expand All @@ -134,7 +136,8 @@ void deleteScheduledOrFailedPartitionRecordsForTable(@Param("databaseName") Stri
List<HousekeepingMetadata> findActiveTables();

/**
* This method deletes the rows which have "DELETED" or "DISABLED" status and are older than the specified {@code instant}.
* This method deletes the rows which have "DELETED" or "DISABLED" status and are older than the specified
* {@code instant}.
*
* @param instant
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,8 @@ public interface HousekeepingPathRepository

@Query(value = "from HousekeepingPath p where p.cleanupTimestamp <= :instant "
+ "and (p.housekeepingStatus = 'SCHEDULED' or p.housekeepingStatus = 'FAILED') "
+ "and p.modifiedTimestamp <= :instant")
Slice<HousekeepingPath> findRecordsForCleanupByModifiedTimestamp(
@Param("instant") LocalDateTime instant,
Pageable pageable);
+ "and p.modifiedTimestamp <= :instant and p.cleanupAttempts < 10")
Slice<HousekeepingPath> findRecordsForCleanup(@Param("instant") LocalDateTime instant, Pageable pageable);

@Modifying
@Query(value = "delete from HousekeepingPath p where p.cleanupTimestamp < :instant "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,17 @@ public void timezone() {
assertThat(savedTable.getCreationTimestamp().getHour()).isEqualTo(utcHour);
}

@Test
public void findRecordsForCleanupByModifiedTimestampMaxCleanupAttemptsReached() {
  // Store a record whose cleanup attempts counter is already at 10.
  HousekeepingMetadata exhaustedRecord = createPartitionedEntityHousekeepingTable();
  exhaustedRecord.setCleanupAttempts(10);
  housekeepingMetadataRepository.save(exhaustedRecord);

  // The cleanup lookup is expected to return nothing for that record.
  PageRequest firstPage = PageRequest.of(PAGE, PAGE_SIZE);
  Slice<HousekeepingMetadata> cleanupCandidates = housekeepingMetadataRepository
      .findRecordsForCleanupByModifiedTimestamp(CLEANUP_TIMESTAMP, firstPage);
  assertThat(cleanupCandidates.getContent().size()).isEqualTo(0);
}

@Test
public void findRecordsForCleanupByModifiedTimestamp() {
HousekeepingMetadata table = createPartitionedEntityHousekeepingTable();
Expand Down Expand Up @@ -225,8 +236,8 @@ public void findRecordForCleanupByDatabaseAndTable() {
HousekeepingMetadata table = createPartitionedEntityHousekeepingTable();
housekeepingMetadataRepository.save(table);

Optional<HousekeepingMetadata> result = housekeepingMetadataRepository.findRecordForCleanupByDbTableAndPartitionName(
DATABASE_NAME, TABLE_NAME, PARTITION_NAME);
Optional<HousekeepingMetadata> result = housekeepingMetadataRepository
.findRecordForCleanupByDbTableAndPartitionName(DATABASE_NAME, TABLE_NAME, PARTITION_NAME);

assertTrue(result.isPresent());
compare(result.get(), table);
Expand All @@ -238,8 +249,8 @@ public void findRecordForCleanupByDatabaseAndTableForNullCase() {
table.setPartitionName(null);
housekeepingMetadataRepository.save(table);

Optional<HousekeepingMetadata> result = housekeepingMetadataRepository.findRecordForCleanupByDbTableAndPartitionName(
DATABASE_NAME, TABLE_NAME, null);
Optional<HousekeepingMetadata> result = housekeepingMetadataRepository
.findRecordForCleanupByDbTableAndPartitionName(DATABASE_NAME, TABLE_NAME, null);

assertTrue(result.isPresent());
compare(result.get(), table);
Expand All @@ -250,8 +261,8 @@ public void findRecordForCleanupByDatabaseAndTableZeroResults() {
HousekeepingMetadata table = createPartitionedEntityHousekeepingTable(DELETED);
housekeepingMetadataRepository.save(table);

Optional<HousekeepingMetadata> result = housekeepingMetadataRepository.findRecordForCleanupByDbTableAndPartitionName(
DATABASE_NAME, TABLE_NAME, PARTITION_NAME);
Optional<HousekeepingMetadata> result = housekeepingMetadataRepository
.findRecordForCleanupByDbTableAndPartitionName(DATABASE_NAME, TABLE_NAME, PARTITION_NAME);

assertTrue(result.isEmpty());
}
Expand Down Expand Up @@ -329,8 +340,8 @@ public void dryRunCountPartitionsForPartitionedTable() {
HousekeepingMetadata housekeepingTable = createPartitionedEntityHousekeepingTable();
housekeepingMetadataRepository.save(housekeepingTable);

long result = housekeepingMetadataRepository.countRecordsForDryRunWherePartitionIsNotNullOrExpired(
CLEANUP_TIMESTAMP, DATABASE_NAME, TABLE_NAME);
long result = housekeepingMetadataRepository
.countRecordsForDryRunWherePartitionIsNotNullOrExpired(CLEANUP_TIMESTAMP, DATABASE_NAME, TABLE_NAME);

assertEquals(1L, result);
}
Expand All @@ -340,8 +351,8 @@ public void dryRunCountPartitionsForPartitionedTableEmpty() {
HousekeepingMetadata housekeepingTable = createPartitionedEntityHousekeepingTable(DELETED);
housekeepingMetadataRepository.save(housekeepingTable);

long result = housekeepingMetadataRepository.countRecordsForDryRunWherePartitionIsNotNullOrExpired(
CLEANUP_TIMESTAMP, DATABASE_NAME, TABLE_NAME);
long result = housekeepingMetadataRepository
.countRecordsForDryRunWherePartitionIsNotNullOrExpired(CLEANUP_TIMESTAMP, DATABASE_NAME, TABLE_NAME);

assertEquals(0L, result);
}
Expand All @@ -351,8 +362,8 @@ public void dryRunCountPartitionsForUnpartitionedTable() {
HousekeepingMetadata housekeepingTable = createUnpartitionedEntityHousekeepingTable();
housekeepingMetadataRepository.save(housekeepingTable);

long result = housekeepingMetadataRepository.countRecordsForDryRunWherePartitionIsNotNullOrExpired(
CLEANUP_TIMESTAMP, DATABASE_NAME, TABLE_NAME);
long result = housekeepingMetadataRepository
.countRecordsForDryRunWherePartitionIsNotNullOrExpired(CLEANUP_TIMESTAMP, DATABASE_NAME, TABLE_NAME);

assertEquals(0L, result);
}
Expand Down Expand Up @@ -407,7 +418,7 @@ public void findActiveTables() {
housekeepingMetadataRepository.save(table3);
HousekeepingMetadata partition = createPartitionedEntityHousekeepingTable();
housekeepingMetadataRepository.save(partition);

List<HousekeepingMetadata> result = housekeepingMetadataRepository.findActiveTables();
assertThat(result.size()).isEqualTo(3);
assertThat(result.get(0).getTableName()).isEqualTo("tbl1");
Expand Down Expand Up @@ -484,8 +495,7 @@ private HousekeepingMetadata createEntityHouseKeepingTable(
return createEntityHouseKeepingTable(databaseName, tableName, partitionName, CREATION_TIMESTAMP, SCHEDULED);
}

private HousekeepingMetadata createPartitionedEntityHousekeepingTable(
HousekeepingStatus status) {
private HousekeepingMetadata createPartitionedEntityHousekeepingTable(HousekeepingStatus status) {
return createEntityHouseKeepingTable(DATABASE_NAME, TABLE_NAME, PARTITION_NAME, CREATION_TIMESTAMP, status);
}

Expand All @@ -501,7 +511,8 @@ private HousekeepingMetadata createEntityHouseKeepingTable(
String partitionName,
LocalDateTime creationDate,
HousekeepingStatus status) {
return HousekeepingMetadata.builder()
return HousekeepingMetadata
.builder()
.path(PATH)
.databaseName(databaseName)
.tableName(tableName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,28 +148,39 @@ public void checkDuplicatePathThrowsException() {
}

@Test
void findRecordsForCleanupByModifiedTimestamp() {
void findRecordsForCleanup() {
HousekeepingPath path = createEntityHousekeepingPath();
housekeepingPathRepository.save(path);

Slice<HousekeepingPath> result = housekeepingPathRepository
.findRecordsForCleanupByModifiedTimestamp(CLEANUP_TIMESTAMP, PageRequest.of(PAGE, PAGE_SIZE));
.findRecordsForCleanup(CLEANUP_TIMESTAMP, PageRequest.of(PAGE, PAGE_SIZE));
assertThat(result.getContent().get(0).getPath()).isEqualTo("path");
}

@Test
void findRecordsForCleanupByModifiedTimestampZeroResults() {
public void findRecordsForCleanupMaxCleanupAttemptsReached() {
HousekeepingPath path = createEntityHousekeepingPath();
path.setCleanupAttempts(10);
housekeepingPathRepository.save(path);

Slice<HousekeepingPath> result = housekeepingPathRepository
.findRecordsForCleanup(CLEANUP_TIMESTAMP, PageRequest.of(PAGE, PAGE_SIZE));
assertThat(result.getContent().size()).isEqualTo(0);
}

@Test
void findRecordsForCleanupZeroResults() {
HousekeepingPath path = createEntityHousekeepingPath();
path.setHousekeepingStatus(DELETED);
housekeepingPathRepository.save(path);

Slice<HousekeepingPath> result = housekeepingPathRepository
.findRecordsForCleanupByModifiedTimestamp(LocalDateTime.now(), PageRequest.of(PAGE, PAGE_SIZE));
.findRecordsForCleanup(LocalDateTime.now(), PageRequest.of(PAGE, PAGE_SIZE));
assertThat(result.getContent().size()).isEqualTo(0);
}

@Test
void findRecordsForCleanupByModifiedTimestampMixedPathStatus() {
void findRecordsForCleanupMixedPathStatus() {
HousekeepingPath housekeepingPath1 = createEntityHousekeepingPath();
housekeepingPathRepository.save(housekeepingPath1);

Expand All @@ -184,12 +195,12 @@ void findRecordsForCleanupByModifiedTimestampMixedPathStatus() {
housekeepingPathRepository.save(housekeepingPath3);

Slice<HousekeepingPath> result = housekeepingPathRepository
.findRecordsForCleanupByModifiedTimestamp(CLEANUP_TIMESTAMP, PageRequest.of(PAGE, PAGE_SIZE));
.findRecordsForCleanup(CLEANUP_TIMESTAMP, PageRequest.of(PAGE, PAGE_SIZE));
assertThat(result.getContent().size()).isEqualTo(2);
}

@Test
void findRecordsForCleanupByModifiedTimestampRespectsOrder() {
void findRecordsForCleanupRespectsOrder() {
String path1 = "path1";
String path2 = "path2";

Expand All @@ -202,7 +213,7 @@ void findRecordsForCleanupByModifiedTimestampRespectsOrder() {
housekeepingPathRepository.save(housekeepingPath2);

List<HousekeepingPath> result = housekeepingPathRepository
.findRecordsForCleanupByModifiedTimestamp(CLEANUP_TIMESTAMP, PageRequest.of(PAGE, PAGE_SIZE))
.findRecordsForCleanup(CLEANUP_TIMESTAMP, PageRequest.of(PAGE, PAGE_SIZE))
.getContent();
assertThat(result.get(0).getPath()).isEqualTo(path1);
assertThat(result.get(1).getPath()).isEqualTo(path2);
Expand Down Expand Up @@ -259,9 +270,12 @@ private HousekeepingPath createEntityHousekeepingPath() {
return createEntityHousekeepingPath("path", CREATION_TIMESTAMP, SCHEDULED);
}

private HousekeepingPath createEntityHousekeepingPath(String path, LocalDateTime creationDate,
private HousekeepingPath createEntityHousekeepingPath(
String path,
LocalDateTime creationDate,
HousekeepingStatus status) {
return HousekeepingPath.builder()
return HousekeepingPath
.builder()
.path(path)
.databaseName("database")
.tableName("table")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,6 @@ public UnreferencedPathHandler(

@Override
public Slice<HousekeepingPath> findRecordsToClean(LocalDateTime instant, Pageable pageable) {
return housekeepingPathRepository.findRecordsForCleanupByModifiedTimestamp(instant, pageable);
return housekeepingPathRepository.findRecordsForCleanup(instant, pageable);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,10 @@
@ExtendWith(MockitoExtension.class)
public class UnreferencedPathHandlerTest {

@Mock private HousekeepingPathRepository housekeepingPathRepository;
@Mock private S3PathCleaner s3PathCleaner;
@Mock
private HousekeepingPathRepository housekeepingPathRepository;
@Mock
private S3PathCleaner s3PathCleaner;
private LifecycleEventType lifecycleEventType = UNREFERENCED;

private UnreferencedPathHandler handler;
Expand All @@ -63,6 +65,6 @@ public void verifyHousekeepingPathFetch() {
LocalDateTime now = LocalDateTime.now();
Pageable emptyPageable = PageRequest.of(0, 1);
handler.findRecordsToClean(now, emptyPageable);
verify(housekeepingPathRepository).findRecordsForCleanupByModifiedTimestamp(now, emptyPageable);
verify(housekeepingPathRepository).findRecordsForCleanup(now, emptyPageable);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,15 @@ public Duration extractCleanupDelay(ListenerEvent listenerEvent) {
String tableCleanupDelay = listenerEvent.getTableParameters().get(propertyKey);
try {
Duration value = tableCleanupDelay == null ? defaultValue : Duration.parse(tableCleanupDelay);
log.info("Using value '{}' for key {} for table {}.{}.", value, propertyKey, listenerEvent.getDbName(),
log
.info("Using value '{}' for key {} for table {}.{}.", value, propertyKey, listenerEvent.getDbName(),
listenerEvent.getTableName());
return value;
} catch (DateTimeParseException | NullPointerException e) {
log.warn(
"Overridden delay value '{}' for key '{}' cannot be parsed to a Duration for table '{}.{}'. Using default setting {}.",
tableCleanupDelay, propertyKey, listenerEvent.getDbName(), listenerEvent.getTableName(), defaultValue);
return defaultValue;
} catch (DateTimeParseException e) {
throw new BeekeeperException(String
.format("Cleanup delay value '%s' for key '%s' cannot be parsed to a Duration for table '%s.%s'.",
tableCleanupDelay, propertyKey, listenerEvent.getDbName(), listenerEvent.getTableName()),
e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,28 +37,22 @@ public class MessageEventHandler {
private final HousekeepingEntityGenerator generator;
private final List<ListenerEventFilter> filters;

public MessageEventHandler(
HousekeepingEntityGenerator generator,
List<ListenerEventFilter> filters
) {
public MessageEventHandler(HousekeepingEntityGenerator generator, List<ListenerEventFilter> filters) {
this.generator = generator;
this.filters = filters;
this.lifecycleEventType = generator.getLifecycleEventType();
}

/**
 * Turns an incoming message into housekeeping entities unless one of the configured filters
 * rejects the event it carries.
 *
 * @param event message whose listener event is to be processed
 * @return an empty list when the event is filtered out, otherwise the entities produced by
 *         {@code generateHousekeepingEntities}
 */
public List<HousekeepingEntity> handleMessage(MessageEvent event) {
  ListenerEvent incoming = event.getEvent();
  if (!shouldFilterMessage(incoming)) {
    return generateHousekeepingEntities(incoming);
  }
  // Filtered events produce no housekeeping work.
  return Collections.emptyList();
}

private boolean shouldFilterMessage(ListenerEvent listenerEvent) {
return filters.stream()
.anyMatch(filter -> filter.isFiltered(listenerEvent, lifecycleEventType));
return filters.stream().anyMatch(filter -> filter.isFiltered(listenerEvent, lifecycleEventType));
}

private List<HousekeepingEntity> generateHousekeepingEntities(ListenerEvent listenerEvent) {
Expand Down

0 comments on commit c412e6b

Please sign in to comment.