Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 46 additions & 18 deletions src/Exceptionless.Core/Jobs/CleanupOrphanedDataJob.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ namespace Exceptionless.Core.Jobs;
[Job(Description = "Deletes orphaned data.", IsContinuous = false)]
public class CleanupOrphanedDataJob : JobWithLockBase, IHealthCheck
{
private static readonly TimeSpan OrphanedEventLookback = TimeSpan.FromDays(3);
private readonly ExceptionlessElasticConfiguration _config;
private readonly IElasticClient _elasticClient;
private readonly IStackRepository _stackRepository;
Expand Down Expand Up @@ -60,9 +61,13 @@ protected override async Task<JobResult> RunInternalAsync(JobContext context)

public async Task DeleteOrphanedEventsByStackAsync(JobContext context)
{
var orphanedEventCutoffUtc = _timeProvider.GetUtcNow().UtcDateTime.Subtract(OrphanedEventLookback);

// get approximate number of unique stack ids
var stackCardinality = await _elasticClient.SearchAsync<PersistentEvent>(s => s.Size(0).Aggregations(a => a
.Cardinality("cardinality_stack_id", c => c.Field(f => f.StackId).PrecisionThreshold(40000))));
var stackCardinality = await _elasticClient.SearchAsync<PersistentEvent>(s => s
.Size(0)
.Query(q => q.DateRange(r => r.Field(f => f.CreatedUtc).GreaterThanOrEquals(orphanedEventCutoffUtc)))
.Aggregations(a => a.Cardinality("cardinality_stack_id", c => c.Field(f => f.StackId).PrecisionThreshold(40000))));

double? uniqueStackIdCount = stackCardinality.Aggregations.Cardinality("cardinality_stack_id")?.Value;
if (!uniqueStackIdCount.HasValue || uniqueStackIdCount.Value <= 0)
Expand All @@ -79,8 +84,10 @@ public async Task DeleteOrphanedEventsByStackAsync(JobContext context)
{
await RenewLockAsync(context);

var stackIdTerms = await _elasticClient.SearchAsync<PersistentEvent>(s => s.Size(0).Aggregations(a => a
.Terms("terms_stack_id", c => c.Field(f => f.StackId).Include(batchNumber, buckets).Size(batchSize * 2))));
var stackIdTerms = await _elasticClient.SearchAsync<PersistentEvent>(s => s
.Size(0)
.Query(q => q.DateRange(r => r.Field(f => f.CreatedUtc).GreaterThanOrEquals(orphanedEventCutoffUtc)))
.Aggregations(a => a.Terms("terms_stack_id", c => c.Field(f => f.StackId).Include(batchNumber, buckets).Size(batchSize * 2))));

string[] stackIds = stackIdTerms.Aggregations.Terms("terms_stack_id").Buckets.Select(b => b.Key).ToArray();
if (stackIds.Length == 0)
Expand All @@ -100,17 +107,24 @@ public async Task DeleteOrphanedEventsByStackAsync(JobContext context)

totalOrphanedEventCount += missingStackIds.Length;
_logger.LogInformation("{BatchNumber}/{BatchCount}: Found {OrphanedEventCount} orphaned events from missing stacks {MissingStackIds} out of {StackIdCount}", batchNumber, buckets, missingStackIds.Length, missingStackIds, stackIds.Length);
await _elasticClient.DeleteByQueryAsync<PersistentEvent>(r => r.Query(q => q.Terms(t => t.Field(f => f.StackId).Terms(missingStackIds))));
await _elasticClient.DeleteByQueryAsync<PersistentEvent>(r => r.Query(q => q.Bool(b => b
.Filter(
f => f.Terms(t => t.Field(e => e.StackId).Terms(missingStackIds)),
f => f.DateRange(d => d.Field(e => e.CreatedUtc).GreaterThanOrEquals(orphanedEventCutoffUtc))))));
}

_logger.LogInformation("Found {OrphanedEventCount} orphaned events from missing stacks out of {StackIdCount}", totalOrphanedEventCount, totalStackIds);
_logger.LogInformation("Found {OrphanedEventCount} orphaned events from missing stacks out of {StackIdCount} since {OrphanedEventCutoffUtc}", totalOrphanedEventCount, totalStackIds, orphanedEventCutoffUtc);
}

public async Task DeleteOrphanedEventsByProjectAsync(JobContext context)
{
var orphanedEventCutoffUtc = _timeProvider.GetUtcNow().UtcDateTime.Subtract(OrphanedEventLookback);

// get approximate number of unique project ids
var projectCardinality = await _elasticClient.SearchAsync<PersistentEvent>(s => s.Size(0).Aggregations(a => a
.Cardinality("cardinality_project_id", c => c.Field(f => f.ProjectId).PrecisionThreshold(40000))));
var projectCardinality = await _elasticClient.SearchAsync<PersistentEvent>(s => s
.Size(0)
.Query(q => q.DateRange(r => r.Field(f => f.CreatedUtc).GreaterThanOrEquals(orphanedEventCutoffUtc)))
.Aggregations(a => a.Cardinality("cardinality_project_id", c => c.Field(f => f.ProjectId).PrecisionThreshold(40000))));

double? uniqueProjectIdCount = projectCardinality.Aggregations.Cardinality("cardinality_project_id")?.Value;
if (!uniqueProjectIdCount.HasValue || uniqueProjectIdCount.Value <= 0)
Expand All @@ -127,8 +141,10 @@ public async Task DeleteOrphanedEventsByProjectAsync(JobContext context)
{
await RenewLockAsync(context);

var projectIdTerms = await _elasticClient.SearchAsync<PersistentEvent>(s => s.Size(0).Aggregations(a => a
.Terms("terms_project_id", c => c.Field(f => f.ProjectId).Include(batchNumber, buckets).Size(batchSize * 2))));
var projectIdTerms = await _elasticClient.SearchAsync<PersistentEvent>(s => s
.Size(0)
.Query(q => q.DateRange(r => r.Field(f => f.CreatedUtc).GreaterThanOrEquals(orphanedEventCutoffUtc)))
.Aggregations(a => a.Terms("terms_project_id", c => c.Field(f => f.ProjectId).Include(batchNumber, buckets).Size(batchSize * 2))));

string[] projectIds = projectIdTerms.Aggregations.Terms("terms_project_id").Buckets.Select(b => b.Key).ToArray();
if (projectIds.Length == 0)
Expand All @@ -146,17 +162,24 @@ public async Task DeleteOrphanedEventsByProjectAsync(JobContext context)
}

_logger.LogInformation("{BatchNumber}/{BatchCount}: Found {OrphanedEventCount} orphaned events from missing projects {MissingProjectIds} out of {ProjectIdCount}", batchNumber, buckets, missingProjectIds.Length, missingProjectIds, projectIds.Length);
await _elasticClient.DeleteByQueryAsync<PersistentEvent>(r => r.Query(q => q.Terms(t => t.Field(f => f.ProjectId).Terms(missingProjectIds))));
await _elasticClient.DeleteByQueryAsync<PersistentEvent>(r => r.Query(q => q.Bool(b => b
.Filter(
f => f.Terms(t => t.Field(e => e.ProjectId).Terms(missingProjectIds)),
f => f.DateRange(d => d.Field(e => e.CreatedUtc).GreaterThanOrEquals(orphanedEventCutoffUtc))))));
}

_logger.LogInformation("Found {OrphanedEventCount} orphaned events from missing projects out of {ProjectIdCount}", totalOrphanedEventCount, totalProjectIds);
_logger.LogInformation("Found {OrphanedEventCount} orphaned events from missing projects out of {ProjectIdCount} since {OrphanedEventCutoffUtc}", totalOrphanedEventCount, totalProjectIds, orphanedEventCutoffUtc);
}

public async Task DeleteOrphanedEventsByOrganizationAsync(JobContext context)
{
var orphanedEventCutoffUtc = _timeProvider.GetUtcNow().UtcDateTime.Subtract(OrphanedEventLookback);

// get approximate number of unique organization ids
var organizationCardinality = await _elasticClient.SearchAsync<PersistentEvent>(s => s.Size(0).Aggregations(a => a
.Cardinality("cardinality_organization_id", c => c.Field(f => f.OrganizationId).PrecisionThreshold(40000))));
var organizationCardinality = await _elasticClient.SearchAsync<PersistentEvent>(s => s
.Size(0)
.Query(q => q.DateRange(r => r.Field(f => f.CreatedUtc).GreaterThanOrEquals(orphanedEventCutoffUtc)))
.Aggregations(a => a.Cardinality("cardinality_organization_id", c => c.Field(f => f.OrganizationId).PrecisionThreshold(40000))));

double? uniqueOrganizationIdCount = organizationCardinality.Aggregations.Cardinality("cardinality_organization_id")?.Value;
if (!uniqueOrganizationIdCount.HasValue || uniqueOrganizationIdCount.Value <= 0)
Expand All @@ -173,8 +196,10 @@ public async Task DeleteOrphanedEventsByOrganizationAsync(JobContext context)
{
await RenewLockAsync(context);

var organizationIdTerms = await _elasticClient.SearchAsync<PersistentEvent>(s => s.Size(0).Aggregations(a => a
.Terms("terms_organization_id", c => c.Field(f => f.OrganizationId).Include(batchNumber, buckets).Size(batchSize * 2))));
var organizationIdTerms = await _elasticClient.SearchAsync<PersistentEvent>(s => s
.Size(0)
.Query(q => q.DateRange(r => r.Field(f => f.CreatedUtc).GreaterThanOrEquals(orphanedEventCutoffUtc)))
.Aggregations(a => a.Terms("terms_organization_id", c => c.Field(f => f.OrganizationId).Include(batchNumber, buckets).Size(batchSize * 2))));

string[] organizationIds = organizationIdTerms.Aggregations.Terms("terms_organization_id").Buckets.Select(b => b.Key).ToArray();
if (organizationIds.Length == 0)
Expand All @@ -192,10 +217,13 @@ public async Task DeleteOrphanedEventsByOrganizationAsync(JobContext context)
}

_logger.LogInformation("{BatchNumber}/{BatchCount}: Found {OrphanedEventCount} orphaned events from missing organizations {MissingOrganizationIds} out of {OrganizationIdCount}", batchNumber, buckets, missingOrganizationIds.Length, missingOrganizationIds, organizationIds.Length);
await _elasticClient.DeleteByQueryAsync<PersistentEvent>(r => r.Query(q => q.Terms(t => t.Field(f => f.OrganizationId).Terms(missingOrganizationIds))));
await _elasticClient.DeleteByQueryAsync<PersistentEvent>(r => r.Query(q => q.Bool(b => b
.Filter(
f => f.Terms(t => t.Field(e => e.OrganizationId).Terms(missingOrganizationIds)),
f => f.DateRange(d => d.Field(e => e.CreatedUtc).GreaterThanOrEquals(orphanedEventCutoffUtc))))));
}

_logger.LogInformation("Found {OrphanedEventCount} orphaned events from missing organizations out of {OrganizationIdCount}", totalOrphanedEventCount, totalOrganizationIds);
_logger.LogInformation("Found {OrphanedEventCount} orphaned events from missing organizations out of {OrganizationIdCount} since {OrphanedEventCutoffUtc}", totalOrphanedEventCount, totalOrganizationIds, orphanedEventCutoffUtc);
}

public async Task FixDuplicateStacks(JobContext context)
Expand Down
28 changes: 28 additions & 0 deletions tests/Exceptionless.Tests/Jobs/CleanupDataJobTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,34 @@ public async Task CanDeleteOrphanedEventsByStack()
Assert.Equal(5000, eventCount);
}

[Fact]
public async Task CanDeleteOnlyRecentOrphanedEventsByStack()
{
    // Arrange: persist an organization, project and stack so that one event has a stack that really exists.
    var org = await _organizationRepository.AddAsync(
        _organizationData.GenerateSampleOrganization(_billingManager, _plans),
        o => o.ImmediateConsistency());
    var proj = await _projectRepository.AddAsync(
        _projectData.GenerateSampleProject(),
        o => o.ImmediateConsistency());
    var existingStack = await _stackRepository.AddAsync(
        _stackData.GenerateSampleStack(),
        o => o.ImmediateConsistency());

    var validEvent = _eventData.GenerateEvent(org.Id, proj.Id, existingStack.Id);
    var recentOrphan = _eventData.GenerateEvent(org.Id, proj.Id);
    var staleOrphan = _eventData.GenerateEvent(org.Id, proj.Id);

    // Both orphans reference the same stack id, which is never saved to the stack repository.
    string orphanStackId = ObjectId.GenerateNewId().ToString();

    // One orphan is created "now"; the other is back-dated four days.
    recentOrphan.StackId = orphanStackId;
    recentOrphan.CreatedUtc = TimeProvider.GetUtcNow().UtcDateTime;

    staleOrphan.StackId = orphanStackId;
    staleOrphan.CreatedUtc = TimeProvider.GetUtcNow().SubtractDays(4).UtcDateTime;

    await _eventRepository.AddAsync(
        [validEvent, recentOrphan, staleOrphan],
        o => o.ImmediateConsistency());

    // Act
    await GetService<CleanupOrphanedDataJob>().RunAsync(TestCancellationToken);

    // Assert: only the recent orphan is removed; the valid event and the back-dated orphan remain.
    var remaining = await _eventRepository.GetAllAsync(o => o.PageLimit(10).ImmediateConsistency());
    var remainingDocuments = remaining.Documents;

    Assert.Equal(2, remaining.Total);
    Assert.Contains(remainingDocuments, e => e.Id == validEvent.Id);
    Assert.Contains(remainingDocuments, e => e.Id == staleOrphan.Id);
    Assert.DoesNotContain(remainingDocuments, e => e.Id == recentOrphan.Id);
}

[Fact]
public async Task RemoveProjectsAsync_SoftDeletedProjectWithEvents_IncrementsDeletedUsage()
{
Expand Down