Skip to content

Commit

Permalink
Merge pull request #33 from Microsoft/ConcurrencyFixes
Browse files Browse the repository at this point in the history
Rare concurrency bug fixes
  • Loading branch information
jongoldDB committed Jan 19, 2019
2 parents c970c13 + 9290b5a commit f35bbf8
Showing 1 changed file with 57 additions and 35 deletions.
92 changes: 57 additions & 35 deletions Ambrosia/Ambrosia/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,14 @@ internal static void AmbrosiaSerialize(this ConcurrentDictionary<string, OutputC
writeToStream.WriteInt(keyEncoding.Length);
writeToStream.Write(keyEncoding, 0, keyEncoding.Length);
writeToStream.WriteLongFixed(entry.Value.LastSeqNoFromLocalService);
var trimTo = entry.Value.TrimTo;
var replayableTrimTo = entry.Value.ReplayableTrimTo;
// Lock to ensure atomic update of both variables due to race in InputControlListenerAsync
long trimTo;
long replayableTrimTo;
lock (entry.Value._trimLock)
{
trimTo = entry.Value.TrimTo;
replayableTrimTo = entry.Value.ReplayableTrimTo;
}
writeToStream.WriteLongFixed(trimTo);
writeToStream.WriteLongFixed(replayableTrimTo);
entry.Value.BufferedOutput.Serialize(writeToStream);
Expand Down Expand Up @@ -884,14 +890,16 @@ internal class OutputConnectionRecord
public long _sendsEnqueued;
public AmbrosiaRuntime MyAmbrosia { get; set; }
public bool WillResetConnection { get; set; }
public bool ResettingConnection { get; set; }
public bool ConnectingAfterRestart { get; set; }
// The latest trim location on the other side. An associated trim message MAY have already been sent
public long RemoteTrim { get; set; }
// The latest replayable trim location on the other side. An associated trim message MAY have already been sent
public long RemoteTrimReplayable { get; set; }
// The seq no of the last RPC sent to the receiver
public long LastSeqSentToReceiver;
internal volatile bool ResettingConnection;
internal object _trimLock = new object();
internal object _remoteTrimLock = new object();

public OutputConnectionRecord(AmbrosiaRuntime inAmbrosia)
{
Expand Down Expand Up @@ -1139,8 +1147,12 @@ private void SendInputWatermarks(ConcurrentDictionary<string, LongPair> uncommit
outputs[kv.Key] = outputConnectionRecord;
Console.WriteLine("Adding output:{0}", kv.Key);
}
outputConnectionRecord.RemoteTrim = Math.Max(kv.Value.First, outputConnectionRecord.RemoteTrim);
outputConnectionRecord.RemoteTrimReplayable = Math.Max(kv.Value.Second, outputConnectionRecord.RemoteTrimReplayable);
// Must lock to atomically update due to race with ToControlStreamAsync
lock (outputConnectionRecord._remoteTrimLock)
{
outputConnectionRecord.RemoteTrim = Math.Max(kv.Value.First, outputConnectionRecord.RemoteTrim);
outputConnectionRecord.RemoteTrimReplayable = Math.Max(kv.Value.Second, outputConnectionRecord.RemoteTrimReplayable);
}
if (outputConnectionRecord.ControlWorkQ.IsEmpty)
{
outputConnectionRecord.ControlWorkQ.Enqueue(-2);
Expand Down Expand Up @@ -1451,7 +1463,8 @@ public async Task<long> AddRow(FlexReadBuffer copyFromFlexBuffer,
string outputToUpdate,
long newSeqNo,
long newReplayableSeqNo,
ConcurrentDictionary<string, OutputConnectionRecord> outputs)
ConcurrentDictionary<string, OutputConnectionRecord> outputs,
InputConnectionRecord associatedInputConnectionRecord)
{
var copyFromBuffer = copyFromFlexBuffer.Buffer;
var length = copyFromFlexBuffer.Length;
Expand Down Expand Up @@ -1489,6 +1502,10 @@ public async Task<long> AddRow(FlexReadBuffer copyFromFlexBuffer,
// Check if the compare and swap succeeded, otherwise try again
if (origVal == localStatus)
{
// We are now preventing recovery until addrow finishes and all resulting commits have completed. We can safely update
// LastProcessedID and LastProcessedReplayableID
associatedInputConnectionRecord.LastProcessedID = newSeqNo;
associatedInputConnectionRecord.LastProcessedReplayableID = newReplayableSeqNo;
if (sealing)
{
// This call successfully sealed the buffer. Remember we still have an extra
Expand Down Expand Up @@ -2946,6 +2963,7 @@ private async Task ToControlStreamAsync(Stream writeToStream,
var sizeBytes = inputFlexBuffer.LengthLength;
// Get the seqNo of the replay/filter point
var lastRemoteTrim = StreamCommunicator.ReadBufferedLong(inputFlexBuffer.Buffer, sizeBytes + 1);
long lastRemoteTrimReplayable;

// This code dequeues output producing tasks and runs them
long currentTrim = -1;
Expand All @@ -2968,8 +2986,13 @@ private async Task ToControlStreamAsync(Stream writeToStream,
if (lastRemoteTrim < outputConnectionRecord.RemoteTrim)
{
// This is a send watermark
lastRemoteTrim = outputConnectionRecord.RemoteTrim;
var lastRemoteTrimReplayable = outputConnectionRecord.RemoteTrimReplayable;
// Must lock to atomically read due to races with CheckpointAsync and SendInputWatermarks
lock (outputConnectionRecord._remoteTrimLock)
{

lastRemoteTrim = outputConnectionRecord.RemoteTrim;
lastRemoteTrimReplayable = outputConnectionRecord.RemoteTrimReplayable;
}
watermarkStream.Position = 0;
var watermarkLength = 1 + StreamCommunicator.LongSize(lastRemoteTrim) + StreamCommunicator.LongSize(lastRemoteTrimReplayable);
watermarkStream.WriteInt(watermarkLength);
Expand Down Expand Up @@ -3107,7 +3130,7 @@ private async Task InputDataListenerAsync(InputConnectionRecord inputRecord,
while (true)
{
await FlexReadBuffer.DeserializeAsync(inputRecord.DataConnectionStream, inputFlexBuffer, ct);
await ProcessInputMessage(inputRecord, inputName, inputFlexBuffer);
await ProcessInputMessageAsync(inputRecord, inputName, inputFlexBuffer);
}
}

Expand Down Expand Up @@ -3136,8 +3159,12 @@ private async Task InputControlListenerAsync(InputConnectionRecord inputRecord,
// Check to make sure this is progress, otherwise, can ignore
if (commitSeqNo > outputConnectionRecord.TrimTo && !outputConnectionRecord.WillResetConnection && !outputConnectionRecord.ConnectingAfterRestart)
{
outputConnectionRecord.TrimTo = Math.Max(outputConnectionRecord.TrimTo, commitSeqNo);
outputConnectionRecord.ReplayableTrimTo = Math.Max(outputConnectionRecord.ReplayableTrimTo, replayableCommitSeqNo);
// Lock to ensure atomic update of both variables due to race in AmbrosiaSerialize
lock (outputConnectionRecord._trimLock)
{
outputConnectionRecord.TrimTo = Math.Max(outputConnectionRecord.TrimTo, commitSeqNo);
outputConnectionRecord.ReplayableTrimTo = Math.Max(outputConnectionRecord.ReplayableTrimTo, replayableCommitSeqNo);
}
if (outputConnectionRecord.ControlWorkQ.IsEmpty)
{
outputConnectionRecord.ControlWorkQ.Enqueue(-2);
Expand All @@ -3156,24 +3183,25 @@ private async Task InputControlListenerAsync(InputConnectionRecord inputRecord,
}
}

private async Task ProcessInputMessage(InputConnectionRecord inputRecord,
string inputName,
FlexReadBuffer inputFlexBuffer)
private async Task ProcessInputMessageAsync(InputConnectionRecord inputRecord,
string inputName,
FlexReadBuffer inputFlexBuffer)
{
var sizeBytes = inputFlexBuffer.LengthLength;
switch (inputFlexBuffer.Buffer[sizeBytes])
{
case RPCByte:
var methodID = inputFlexBuffer.Buffer.ReadBufferedInt(sizeBytes + 2);
long newFileSize;
if (inputFlexBuffer.Buffer[sizeBytes + 2 + StreamCommunicator.IntSize(methodID)] != (byte)RpcTypes.RpcType.Impulse)
{
inputRecord.LastProcessedReplayableID++;
newFileSize = await _committer.AddRow(inputFlexBuffer, inputName, inputRecord.LastProcessedID + 1, inputRecord.LastProcessedReplayableID + 1, _outputs, inputRecord);
}
else
{
newFileSize = await _committer.AddRow(inputFlexBuffer, inputName, inputRecord.LastProcessedID + 1, inputRecord.LastProcessedReplayableID, _outputs, inputRecord);
}
inputRecord.LastProcessedID++;
var newFileSize = await _committer.AddRow(inputFlexBuffer, inputName, inputRecord.LastProcessedID, inputRecord.LastProcessedReplayableID, _outputs);
inputFlexBuffer.ResetBuffer();
//!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! Uncomment for testing
//Console.WriteLine("Received {0}", inputRecord.LastProcessedID);
if (_newLogTriggerSize > 0 && newFileSize >= _newLogTriggerSize)
{
// Make sure only one input thread is moving to the next log file. Won't break the system if we don't do this, but could result in
Expand All @@ -3191,13 +3219,9 @@ private async Task ProcessInputMessage(InputConnectionRecord inputRecord,
var memStream = new MemoryStream(inputFlexBuffer.Buffer, restOfBatchOffset, inputFlexBuffer.Length - restOfBatchOffset);
var numRPCs = memStream.ReadInt();
var numReplayableRPCs = memStream.ReadInt();
inputRecord.LastProcessedID += numRPCs;
inputRecord.LastProcessedReplayableID += numReplayableRPCs;
newFileSize = await _committer.AddRow(inputFlexBuffer, inputName, inputRecord.LastProcessedID, inputRecord.LastProcessedReplayableID, _outputs);
newFileSize = await _committer.AddRow(inputFlexBuffer, inputName, inputRecord.LastProcessedID + numRPCs, inputRecord.LastProcessedReplayableID + numReplayableRPCs, _outputs, inputRecord);
inputFlexBuffer.ResetBuffer();
memStream.Dispose();
//!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! Uncomment for testing
//Console.WriteLine("Received {0}", inputRecord.LastProcessedID);
if (_newLogTriggerSize > 0 && newFileSize >= _newLogTriggerSize)
{
// Move to next log if checkpoints aren't manual, and we've hit the trigger size
Expand All @@ -3209,13 +3233,9 @@ private async Task ProcessInputMessage(InputConnectionRecord inputRecord,
restOfBatchOffset = inputFlexBuffer.LengthLength + 1;
memStream = new MemoryStream(inputFlexBuffer.Buffer, restOfBatchOffset, inputFlexBuffer.Length - restOfBatchOffset);
numRPCs = memStream.ReadInt();
inputRecord.LastProcessedID += numRPCs;
inputRecord.LastProcessedReplayableID += numRPCs;
newFileSize = await _committer.AddRow(inputFlexBuffer, inputName, inputRecord.LastProcessedID, inputRecord.LastProcessedReplayableID, _outputs);
newFileSize = await _committer.AddRow(inputFlexBuffer, inputName, inputRecord.LastProcessedID + numRPCs, inputRecord.LastProcessedReplayableID + numRPCs, _outputs, inputRecord);
inputFlexBuffer.ResetBuffer();
memStream.Dispose();
//!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! Uncomment for testing
//Console.WriteLine("Received {0}", inputRecord.LastProcessedID);
if (_newLogTriggerSize > 0 && newFileSize >= _newLogTriggerSize)
{
// Make sure only one input thread is moving to the next log file. Won't break the system if we don't do this, but could result in
Expand All @@ -3235,8 +3255,7 @@ private async Task ProcessInputMessage(InputConnectionRecord inputRecord,
GetSystemTimePreciseAsFileTime(out time);
memStream.WriteLongFixed(time);
// Treat as RPC
inputRecord.LastProcessedID++;
await _committer.AddRow(inputFlexBuffer, inputName, inputRecord.LastProcessedID, inputRecord.LastProcessedReplayableID, _outputs);
await _committer.AddRow(inputFlexBuffer, inputName, inputRecord.LastProcessedID + 1, inputRecord.LastProcessedReplayableID + 1, _outputs, inputRecord);
inputFlexBuffer.ResetBuffer();
memStream.Dispose();
break;
Expand All @@ -3247,8 +3266,7 @@ private async Task ProcessInputMessage(InputConnectionRecord inputRecord,
GetSystemTimePreciseAsFileTime(out time);
memStream.WriteLongFixed(time);
// Treat as RPC
inputRecord.LastProcessedID++;
await _committer.AddRow(inputFlexBuffer, inputName, inputRecord.LastProcessedID, inputRecord.LastProcessedReplayableID, _outputs);
await _committer.AddRow(inputFlexBuffer, inputName, inputRecord.LastProcessedID + 1, inputRecord.LastProcessedReplayableID + 1, _outputs, inputRecord);
inputFlexBuffer.ResetBuffer();
memStream.Dispose();
break;
Expand Down Expand Up @@ -3337,8 +3355,12 @@ public async Task CheckpointAsync()
outputConnectionRecord = new OutputConnectionRecord(this);
_outputs[kv.Key] = outputConnectionRecord;
}
outputConnectionRecord.RemoteTrim = Math.Max(kv.Value.LastProcessedID, outputConnectionRecord.RemoteTrim);
outputConnectionRecord.RemoteTrimReplayable = Math.Max(kv.Value.LastProcessedReplayableID, outputConnectionRecord.RemoteTrimReplayable);
// Must lock to atomically update due to race with ToControlStreamAsync
lock (outputConnectionRecord._remoteTrimLock)
{
outputConnectionRecord.RemoteTrim = Math.Max(kv.Value.LastProcessedID, outputConnectionRecord.RemoteTrim);
outputConnectionRecord.RemoteTrimReplayable = Math.Max(kv.Value.LastProcessedReplayableID, outputConnectionRecord.RemoteTrimReplayable);
}
if (outputConnectionRecord.ControlWorkQ.IsEmpty)
{
outputConnectionRecord.ControlWorkQ.Enqueue(-2);
Expand Down

0 comments on commit f35bbf8

Please sign in to comment.