Skip to content

Commit

Permalink
FileTarget - WindowsMultiProcessFileAppender should also use ArchiveM…
Browse files Browse the repository at this point in the history
…utex. BaseMutexFileAppender now handles the archive mutex (Test)
  • Loading branch information
snakefoot committed Jan 7, 2017
1 parent 2373624 commit f036ab9
Show file tree
Hide file tree
Showing 2 changed files with 150 additions and 57 deletions.
58 changes: 50 additions & 8 deletions src/NLog/Targets/FileTarget.cs
Expand Up @@ -1330,7 +1330,36 @@ private void ArchiveFile(string fileName, string archiveFileName)
}
else
{
File.Move(fileName, archiveFileName);
try
{
File.Move(fileName, archiveFileName);
}
catch (System.IO.IOException ex)
{
if (KeepFileOpen && !ConcurrentWrites)
throw; // No need to retry, when only single process access

if (!string.Equals(Path.GetPathRoot(fileName), Path.GetPathRoot(archiveFileName), StringComparison.OrdinalIgnoreCase))
throw; // No need to retry when different paths, as open file handles has to be closed first

if (!PlatformDetector.SupportsSharableMutex)
throw; // No need to retry when not having archive mutex to protect us

// It is possible to move a file within the same path-root with open file-handles
// But if another process is actively writing to the file, then it might fail.
// We are already holding the archive-mutex, so lets retry if things are stable
InternalLogger.Warn(ex, "Archiving failed. Checking for retry move of {0} to {1}.", fileName, archiveFileName);
if (!File.Exists(fileName) || File.Exists(archiveFileName))
throw;

Thread.Sleep(50);

if (!File.Exists(fileName) || File.Exists(archiveFileName))
throw;

InternalLogger.Info("Archiving retrying move of {0} to {1}.", fileName, archiveFileName);
File.Move(fileName, archiveFileName);
}
}
}
}
Expand Down Expand Up @@ -1878,13 +1907,24 @@ private void TryArchiveFile(string fileName, LogEventInfo ev, int upcomingWriteS
#if SupportsMutex
// Acquire the mutex from the file-appender, before closing the file-apppender (remember not to close the Mutex)
archiveMutex = this.fileAppenderCache.GetArchiveMutex(archiveFile);
if (archiveMutex == null && archiveFile != fileName)
if (archiveMutex == null)
{
if (!KeepFileOpen || ConcurrentWrites)
{
archiveMutex = this.fileAppenderCache.GetArchiveMutex(fileName);
if (archiveMutex != null)
InternalLogger.Info("Archive mutex fallback: {0} -> {1}", archiveFile, fileName);
if (fileName == archiveFile && string.IsNullOrEmpty(previousLogFileName))
{
InternalLogger.Info("Archive mutex needed before first write. Creating appender to acquire mutex for: {0}", archiveFile);
if (this.fileAppenderCache.AllocateAppender(fileName) != null)
{
archiveMutex = this.fileAppenderCache.GetArchiveMutex(fileName);
}
}
if (archiveMutex == null && archiveFile != fileName)
{
archiveMutex = this.fileAppenderCache.GetArchiveMutex(fileName);
if (archiveMutex != null)
InternalLogger.Info("Archive mutex fallback: {0} -> {1}", archiveFile, fileName);
}
}
}
#endif
Expand All @@ -1893,8 +1933,10 @@ private void TryArchiveFile(string fileName, LogEventInfo ev, int upcomingWriteS
this.fileAppenderCache.InvalidateAppendersForInvalidFiles();
#endif
// Close possible stale file handles, before doing extra check
if (archiveFile != fileName)
if (!string.IsNullOrEmpty(fileName) && fileName != archiveFile)
this.fileAppenderCache.InvalidateAppender(fileName);
if (!string.IsNullOrEmpty(previousLogFileName) && previousLogFileName != archiveFile && previousLogFileName != fileName)
this.fileAppenderCache.InvalidateAppender(previousLogFileName);
this.fileAppenderCache.InvalidateAppender(archiveFile);
}
else
Expand Down Expand Up @@ -1958,9 +2000,9 @@ private void TryArchiveFile(string fileName, LogEventInfo ev, int upcomingWriteS
finally
{
#if SupportsMutex
if (archiveFile != validatedArchiveFile)
if (!string.IsNullOrEmpty(validatedArchiveFile) && archiveFile != validatedArchiveFile)
{
// Hard to get the right archive-mutex now, as we have closed all file-appenders
// Hard to get the right archive-mutex the second time, as we have closed all file-appenders
if (archiveMutex != null)
InternalLogger.Info("Archive mutex held for wrong file: {0} != {1}", archiveFile, validatedArchiveFile);
}
Expand Down
149 changes: 100 additions & 49 deletions tests/NLog.UnitTests/Targets/ConcurrentFileTargetTests.cs
Expand Up @@ -52,20 +52,28 @@ private void ConfigureSharedFile(string mode, string fileName)
var modes = mode.Split('|');

FileTarget ft = new FileTarget();
ft.FileName = "${basedir}/" + fileName;
ft.FileName = fileName;
ft.Layout = "${message}";
ft.KeepFileOpen = true;
ft.OpenFileCacheTimeout = 10;
ft.OpenFileCacheSize = 1;
ft.LineEnding = LineEndingMode.LF;
ft.ForceMutexConcurrentWrites = modes.Length == 2 && modes[1] == "mutex" ? true : false;
ft.KeepFileOpen = Array.IndexOf(modes, "retry") >= 0 ? false : true;
ft.ForceMutexConcurrentWrites = Array.IndexOf(modes, "mutex") >= 0 ? true : false;
ft.ArchiveAboveSize = Array.IndexOf(modes, "archive") >= 0 ? 50 : -1;
if (ft.ArchiveAboveSize > 0)
{
string archivePath = Path.Combine(Path.GetDirectoryName(fileName), "Archive");
ft.ArchiveFileName = Path.Combine(archivePath, "{####}_" + Path.GetFileName(fileName));
ft.MaxArchiveFiles = 10000;
}

var name = "ConfigureSharedFile_" + mode.Replace('|', '_') + "-wrapper";

switch (modes[0])
{
case "async":
SimpleConfigurator.ConfigureForTargetLogging(new AsyncTargetWrapper(ft, 100, AsyncTargetWrapperOverflowAction.Grow) { Name = name }, LogLevel.Debug);
SimpleConfigurator.ConfigureForTargetLogging(new AsyncTargetWrapper(ft, 100, AsyncTargetWrapperOverflowAction.Grow) { Name = name, TimeToSleepBetweenBatches = 10 }, LogLevel.Debug);
break;

case "buffered":
Expand All @@ -80,15 +88,17 @@ private void ConfigureSharedFile(string mode, string fileName)
SimpleConfigurator.ConfigureForTargetLogging(ft, LogLevel.Debug);
break;
}

if (ft.ArchiveAboveSize > 0)
LogManager.ThrowExceptions = true; // Any exception during archive, means something is not good
}

public void Process(string processIndex, string numProcessesString, string numLogsString, string mode)
public void Process(string processIndex, string fileName, string numLogsString, string mode)
{
Thread.CurrentThread.Name = processIndex;

int numProcesses = Convert.ToInt32(numProcessesString);
int numLogs = Convert.ToInt32(numLogsString);
string fileName = MakeFileName(numProcesses, numLogs, mode);
int idxProcess = Convert.ToInt32(processIndex);

ConfigureSharedFile(mode, fileName);

Expand All @@ -104,7 +114,7 @@ public void Process(string processIndex, string numProcessesString, string numLo
logger.Debug(format, i);
}

LogManager.Configuration = null;
LogManager.Configuration = null; // Flush + Close
}

private string MakeFileName(int numProcesses, int numLogs, string mode)
Expand All @@ -115,63 +125,104 @@ private string MakeFileName(int numProcesses, int numLogs, string mode)

private void DoConcurrentTest(int numProcesses, int numLogs, string mode)
{
string logFile = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, MakeFileName(numProcesses, numLogs, mode));
string tempPath = Path.Combine(Path.GetTempPath(), Guid.NewGuid().ToString());
string archivePath = Path.Combine(tempPath, "Archive");

if (File.Exists(logFile))
File.Delete(logFile);
try
{
Directory.CreateDirectory(tempPath);
Directory.CreateDirectory(archivePath);

Process[] processes = new Process[numProcesses];
string logFile = Path.Combine(tempPath, MakeFileName(numProcesses, numLogs, mode));
if (File.Exists(logFile))
File.Delete(logFile);

for (int i = 0; i < numProcesses; ++i)
{
processes[i] = ProcessRunner.SpawnMethod(
this.GetType(),
"Process",
i.ToString(),
numProcesses.ToString(),
numLogs.ToString(),
mode);
}
Process[] processes = new Process[numProcesses];

// In case we'd like to capture stdout, we would need to drain it continuously.
// StandardOutput.ReadToEnd() wont work, since the other processes console only has limited buffer.
for (int i = 0; i < numProcesses; ++i)
{
processes[i].WaitForExit();
Assert.Equal(0, processes[i].ExitCode);
processes[i].Dispose();
processes[i] = null;
}
for (int i = 0; i < numProcesses; ++i)
{
processes[i] = ProcessRunner.SpawnMethod(
this.GetType(),
"Process",
i.ToString(),
logFile,
numLogs.ToString(),
mode);
}

// In case we'd like to capture stdout, we would need to drain it continuously.
// StandardOutput.ReadToEnd() wont work, since the other processes console only has limited buffer.
for (int i = 0; i < numProcesses; ++i)
{
processes[i].WaitForExit();
Assert.Equal(0, processes[i].ExitCode);
processes[i].Dispose();
processes[i] = null;
}

int[] maxNumber = new int[numProcesses];
var files = new System.Collections.Generic.List<string>(Directory.GetFiles(archivePath));
files.Add(logFile);

Console.WriteLine("Verifying output file {0}", logFile);
using (StreamReader sr = File.OpenText(logFile))
{
string line;
bool verifyFileSize = files.Count > 1;

while ((line = sr.ReadLine()) != null)
int[] maxNumber = new int[numProcesses];
Console.WriteLine("Verifying output file {0}", logFile);
foreach (var file in files)
{
string[] tokens = line.Split(' ');
Assert.Equal(2, tokens.Length);
try
{
int thread = Convert.ToInt32(tokens[0]);
int number = Convert.ToInt32(tokens[1]);
Assert.True(thread >= 0);
Assert.True(thread < numProcesses);
Assert.Equal(maxNumber[thread], number);
maxNumber[thread]++;
}
catch (Exception ex)
using (StreamReader sr = File.OpenText(file))
{
throw new InvalidOperationException("Error when parsing line '" + line + "'", ex);
string line;

while ((line = sr.ReadLine()) != null)
{
string[] tokens = line.Split(' ');
Assert.Equal(2, tokens.Length);
try
{
int thread = Convert.ToInt32(tokens[0]);
int number = Convert.ToInt32(tokens[1]);
Assert.True(thread >= 0);
Assert.True(thread < numProcesses);
Assert.Equal(maxNumber[thread], number);
maxNumber[thread]++;
}
catch (Exception ex)
{
throw new InvalidOperationException(string.Format("Error when parsing line '{0}' in file {1}", line, file), ex);
}
}

if (verifyFileSize)
{
// MONO Doesn't work well with archive-mutex, so ignore exceptions coming from archive race-conditions
if (sr.BaseStream.Length > 75)
throw new InvalidOperationException(string.Format("Error when reading file {0}, size {1} is too large", file, sr.BaseStream.Length));
}
}
}
}
finally
{
try
{
if (Directory.Exists(archivePath))
Directory.Delete(archivePath, true);
if (Directory.Exists(tempPath))
Directory.Delete(tempPath, true);
}
catch
{
}
}
}

[Theory]
#if !MONO
// MONO Doesn't work well with global mutex, and it is needed for succesful concurrent archive operations
[InlineData(2, 500, "none|archive")]
[InlineData(2, 500, "none|mutex|archive")]
[InlineData(2, 500, "none|retry|archive")]
#endif
[InlineData(2, 10000, "none")]
[InlineData(5, 4000, "none")]
[InlineData(10, 2000, "none")]
Expand Down

0 comments on commit f036ab9

Please sign in to comment.