Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 29 additions & 13 deletions FileProcessor/FileProcessingWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,20 +85,35 @@ public override async Task StartAsync(CancellationToken cancellationToken)
// TODO: Do we poll here for files incase they have been left from a previous run
var temporaryFileLocation = ConfigurationReader.GetValue("AppSettings", "TemporaryFileLocation");
this.LogInformation($"Starting up, TemporaryFileLocation is [{temporaryFileLocation}]");
Directory.CreateDirectory(temporaryFileLocation);

this.FileSystem.Directory.CreateDirectory(temporaryFileLocation);
this.LogInformation($"Created TemporaryFileLocation at [{temporaryFileLocation}]");
var fileProfiles = await this.FileProcessorManager.GetAllFileProfiles(cancellationToken);

foreach (FileProfile fileProfile in fileProfiles)
{
Directory.CreateDirectory($"{fileProfile.ListeningDirectory}//inprogress");
this.FileSystem.Directory.CreateDirectory($"{fileProfile.ListeningDirectory}//inprogress");
this.LogInformation($"Created in progress at [{ fileProfile.ListeningDirectory}//inprogress");
Directory.CreateDirectory(fileProfile.ProcessedDirectory);
this.FileSystem.Directory.CreateDirectory(fileProfile.ProcessedDirectory);
this.LogInformation($"Created ProcessedDirectory at [{fileProfile.ProcessedDirectory}]");
Directory.CreateDirectory(fileProfile.FailedDirectory);
this.FileSystem.Directory.CreateDirectory(fileProfile.FailedDirectory);
this.LogInformation($"Created FailedDirectory at [{fileProfile.FailedDirectory}]");
}

IDirectoryInfo inprogressDirectory = this.FileSystem.DirectoryInfo.FromDirectoryName(fileProfile.ListeningDirectory);
var inprogressList = inprogressDirectory.GetFiles();

if (inprogressList.Any())
{
this.LogInformation($"{inprogressList.Length} files in progress detected in [{ fileProfile.ListeningDirectory}//inprogress");

// Now move these to the listening directory
foreach (IFileInfo file in inprogressList)
{
file.MoveTo($"{fileProfile.ListeningDirectory}//{file.Name}");
}
}
}

await base.StartAsync(cancellationToken);
}

Expand Down Expand Up @@ -132,17 +147,18 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
try
{
List<Task> fileProcessingTasks = new List<Task>();
var fileProfiles = await this.FileProcessorManager.GetAllFileProfiles(stoppingToken);
List<FileProfile> fileProfiles = await this.FileProcessorManager.GetAllFileProfiles(stoppingToken);

foreach (FileProfile fileProfile in fileProfiles)
{
this.LogInformation($"About to look in {fileProfile.ListeningDirectory} for files");
var files = Directory.GetFiles(fileProfile.ListeningDirectory).Take(1).ToList(); // Only process 1 file per file profile concurrently

foreach (String file in files)
IDirectoryInfo listeningDirectory = this.FileSystem.DirectoryInfo.FromDirectoryName(fileProfile.ListeningDirectory);

List<IFileInfo> files = listeningDirectory.GetFiles().OrderBy(f => f.CreationTime).Take(1).ToList(); // Only process 1 file per file profile concurrently,

foreach (IFileInfo fileInfo in files)
{
this.LogInformation($"File {file} detected");

IFileInfo fileInfo = this.FileSystem.FileInfo.FromFileName(file);
this.LogInformation($"File {fileInfo.Name} detected");

String inProgressFolder = $"{fileProfile.ListeningDirectory}/inprogress/";
if (this.FileSystem.Directory.Exists(inProgressFolder) == false)
Expand All @@ -152,7 +168,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
String inProgressFilePath = $"{fileProfile.ListeningDirectory}/inprogress/{fileInfo.Name}";
fileInfo.MoveTo(inProgressFilePath, true);

var request = this.CreateProcessFileRequest(fileProfile, inProgressFilePath);
IRequest request = this.CreateProcessFileRequest(fileProfile, inProgressFilePath);
fileProcessingTasks.Add(this.Mediator.Send(request));
}
}
Expand Down