Skip to content

Commit

Permalink
change worker to rerun all generators when data changes
Browse files Browse the repository at this point in the history
  • Loading branch information
aspriddell committed Oct 16, 2023
1 parent d3070f6 commit e521854
Showing 1 changed file with 13 additions and 34 deletions.
47 changes: 13 additions & 34 deletions DragonFruit.OnionFruit.Web.Worker/Worker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ private async Task PerformUpdate()
{
using var scope = _ssf.CreateScope();

var dataSourceUpdated = false;
var sourceInstances = new Dictionary<Type, IDataSource>();
var sourcesTypesToUse = new HashSet<Type>();

var redis = scope.ServiceProvider.GetRequiredService<IConnectionMultiplexer>().GetDatabase();
var nextVersion = DateTimeOffset.UtcNow.ToUnixTimeSeconds();
Expand All @@ -63,59 +63,38 @@ private async Task PerformUpdate()

_stopwatch.Restart();

// populate list with the sources that have been updated since last check
// create instances of all source types and set switch if data source has been updated
foreach (var sourceType in _descriptors.SelectMany(x => x.SourceTypes).Distinct())
{
var source = (IDataSource)ActivatorUtilities.CreateInstance(scope.ServiceProvider, sourceType);

if (await source.HasDataChanged(lastVersion).ConfigureAwait(false))
{
sourcesTypesToUse.Add(sourceType);
}

sourceInstances[sourceType] = source;
dataSourceUpdated |= await source.HasDataChanged(lastVersion).ConfigureAwait(false);
}

if (!sourcesTypesToUse.Any())
if (!dataSourceUpdated)
{
_stopwatch.Stop();
_logger.LogInformation("No sources have been updated. Generator execution skipped (after {ts}).", _stopwatch.Elapsed);
return;
}

// add any source from full list if a generator that needs an updated source also needs one of the old sources
var generatorsToUse = _descriptors.Where(x => sourcesTypesToUse.Overlaps(x.SourceTypes)).ToList();

foreach (var sourceType in generatorsToUse.SelectMany(x => x.SourceTypes))
{
// hashset doesn't have addrange...
sourcesTypesToUse.Add(sourceType);
}

try
{
_logger.LogInformation("Collecting data for {sources}", string.Join(", ", sourcesTypesToUse.Select(x => x.Name)));
await Task.WhenAll(sourcesTypesToUse.Select(x => sourceInstances[x].CollectData())).ConfigureAwait(false);
}
catch (Exception e)
{
_logger.LogError(e, "Failed to collect data for one or more sources: {message}", e.Message);
return;
}
// fetch all data sources
await Task.WhenAll(sourceInstances.Select(x => x.Value.CollectData())).ConfigureAwait(false);

// file sink used to store static-generated assets for uploading to s3 or saving to a local path
using var fileSink = new FileSink();

foreach (var generator in generatorsToUse)
foreach (var generatorDescriptor in _descriptors)
{
IDisposable disposableGeneratorInstance = null;

try
{
_logger.LogInformation("Running generator for {name}...", generator.OutputFormat.Name);
_logger.LogInformation("Running generator for {name}...", generatorDescriptor.OutputFormat.Name);

var instanceSources = generator.SourceTypes.Select(x => (object)sourceInstances[x]).ToArray();
var generatorInstance = (IDatabaseGenerator)ActivatorUtilities.CreateInstance(scope.ServiceProvider, generator.OutputFormat, instanceSources);
var instanceSources = generatorDescriptor.SourceTypes.Select(x => (object)sourceInstances[x]).ToArray();
var generatorInstance = (IDatabaseGenerator)ActivatorUtilities.CreateInstance(scope.ServiceProvider, generatorDescriptor.OutputFormat, instanceSources);

disposableGeneratorInstance = generatorInstance as IDisposable;

Expand All @@ -124,7 +103,7 @@ private async Task PerformUpdate()
}
catch (Exception e)
{
_logger.LogError(e, "Database Generator {x} has failed: {err}", generator.OutputFormat.Name, e.Message);
_logger.LogError(e, "Database Generator {x} has failed: {err}", generatorDescriptor.OutputFormat.Name, e.Message);
}
finally
{
Expand Down Expand Up @@ -187,7 +166,7 @@ private IReadOnlyCollection<GeneratorDescriptor> GetDescriptors(IConfiguration c
return listing;
}

private IReadOnlyCollection<IDataExporter> GetExporters(IConfiguration config)
private static IReadOnlyCollection<IDataExporter> GetExporters(IConfiguration config)
{
var exporters = new List<IDataExporter>();

Expand Down Expand Up @@ -216,7 +195,7 @@ private IReadOnlyCollection<IDataExporter> GetExporters(IConfiguration config)
Task IHostedService.StartAsync(CancellationToken cancellationToken)
{
_workerTimer?.Dispose();
_workerTimer = new Timer(_ => PerformUpdate(), null, TimeSpan.Zero, TimeSpan.FromHours(6));
_workerTimer = new Timer(_ => PerformUpdate(), null, TimeSpan.Zero, TimeSpan.FromHours(12));

Check warning on line 198 in DragonFruit.OnionFruit.Web.Worker/Worker.cs

View workflow job for this annotation

GitHub Actions / docker-linux

Because this call is not awaited, execution of the current method continues before the call is completed. Consider applying the 'await' operator to the result of the call.

return Task.CompletedTask;
}
Expand Down

0 comments on commit e521854

Please sign in to comment.