Permalink
Browse files

Fixes and Tweaks, fixes and tweaks

- Added a DownloadManager test for switching away from slow uploaders
- Added ReportedPosition property to DownloadManager.Transfer
- Fixed Transfer.ExpectedDurationRemaining using wrong Position
- Adjusted TransferClient.EstimatedBandwidth
- Fixed possible null reference in DownloadManager.OnReceiveClientMapInfo
- Fixed remainingData computation in TryFindImprovableTransfer
- Throttled updating the exception form
- Moved LastSendPosition from Transfer to TransferClient
- Fixed WC3.Map constructor mis-setting _streamFactory
- Fixed WC3.Player not being disposed on disconnect
- Fixed flood of unexpected InvalidOperationException from WC3.Socket
- Fixed several bugs in DownloadManager
- Folded Bnet.Client.ProcessPacket into BeginHandlingPackets
- Removed DownloadManager handling of MapDataReceived
- Packet read errors are now logged as unexpected
  • Loading branch information...
1 parent 4289eff commit aede4dc2d1e54eda0131f0b5ef6c777b4463a86b @Strilanc committed Feb 2, 2010
View
@@ -371,19 +371,14 @@ Namespace Bnet
Contract.Requires(Me._state > ClientState.InitiatingConnection)
AsyncProduceConsumeUntilError(
producer:=AddressOf _socket.AsyncReadPacket,
- consumer:=AddressOf ProcessPacket,
- errorHandler:=Sub(exception) QueueDisconnect(expected:=False,
- reason:="Error receiving packet: {0}.".Frmt(exception.Message))
+ consumer:=AddressOf _packetHandler.HandlePacket,
+ errorHandler:=Sub(exception)
+ exception.RaiseAsUnexpected("Receiving packet")
+ QueueDisconnect(expected:=False,
+ reason:="Error receiving packet: {0}.".Frmt(exception.Message))
+ End Sub
)
End Sub
- Private Function ProcessPacket(ByVal packetData As IReadableList(Of Byte)) As ifuture
- Contract.Requires(packetData IsNot Nothing)
- Contract.Requires(packetData.Count >= 4)
- Dim result = Me._packetHandler.HandlePacket(packetData)
- result.Catch(Sub(exception) QueueDisconnect(expected:=False,
- reason:="Error handling packet {0}: {1}.".Frmt(CType(packetData(1), Protocol.PacketId), exception.Message)))
- Return result
- End Function
Private Function BeginLogOn(ByVal credentials As ClientCredentials) As IFuture
Contract.Requires(credentials IsNot Nothing)
View
@@ -7,7 +7,7 @@ implements the IBotComponent interface, and the Model/whatever does all the nitt
bot class has no special knowledge of any component, the Controllers have no internal knowledge of their View, and the Models
have no knowledge of their Controller, View, or the main bot.
----------------
-Concurrency via Futures:
+Concurrency via Futures and Call Queues:
Many classes are synchronized using message passing, not locks. Internally these classes use an inQueue (sometimes named 'ref') to
queue requests, and an outQueue (sometimes named 'eref') to queue outgoing events. Any 'public' function which is not thread-safe
@@ -163,12 +163,12 @@ Public Class DownloadManagerTest
Return _failFuture
End Get
End Property
- Public Sub InjectClientMapInfo(ByVal state As MapTransferState, ByVal position As UInt32)
- InjectReceivedPacket(MakeClientMapInfo(state, position))
- End Sub
- Public Sub InjectMapDataReceived(ByVal position As UInt32, ByVal senderPid As PID)
- InjectReceivedPacket(MakeMapFileDataReceived(senderPid, PID, position))
- End Sub
+ Public Function InjectClientMapInfo(ByVal state As MapTransferState, ByVal position As UInt32) As ifuture
+ Return InjectReceivedPacket(MakeClientMapInfo(state, position))
+ End Function
+ Public Function InjectMapDataReceived(ByVal position As UInt32, ByVal senderPid As PID) As ifuture
+ Return InjectReceivedPacket(MakeMapFileDataReceived(senderPid, PID, position))
+ End Function
End Class
<TestMethod()>
@@ -345,4 +345,73 @@ Public Class DownloadManagerTest
Assert.IsFalse(BlockOnFuture(uler1.FailFuture, 50.Milliseconds))
Assert.IsFalse(BlockOnFuture(uler2.FailFuture, 50.Milliseconds))
End Sub
+
+ <TestMethod()>
+ Public Sub PeerDownload_SlowSwitch()
+ Dim game = New TestGame()
+ Dim clock = New ManualClock()
+ Dim dm = New DownloadManager(clock, game, allowDownloads:=True, allowUploads:=False)
+ Dim dler = New TestPlayer(New PID(2), game.Logger, "dler")
+ Dim uler1 = New TestPlayer(New PID(3), game.Logger, "uler1")
+ Dim uler2 = New TestPlayer(New PID(4), game.Logger, "uler2")
+
+ 'Start initial transfer
+ WaitUntilFutureSucceeds(game.AddPlayer(dler))
+ WaitUntilFutureSucceeds(game.AddPlayer(uler1))
+ dler.InjectClientMapInfo(MapTransferState.Idle, 0)
+ uler1.InjectClientMapInfo(MapTransferState.Idle, game.Map.FileSize)
+ dler.InjectReceivedPacket(MakePeerConnectionInfo({uler1.PID}))
+ WaitUntilFutureSucceeds(uler1.InjectReceivedPacket(MakePeerConnectionInfo({dler.PID})))
+ uler1.ExpectNoPacket()
+ dler.ExpectNoPacket()
+ clock.Advance(DownloadManager.UpdatePeriod)
+
+ 'Check for transfer
+ dler.ExpectSentPacket(MakeSetDownloadSource(uler1.PID))
+ uler1.ExpectSentPacket(MakeSetUploadTarget(dler.PID, 0))
+ dler.ExpectNoPacket()
+ uler1.ExpectNoPacket()
+
+ 'Add second peer
+ WaitUntilFutureSucceeds(game.AddPlayer(uler2))
+ uler2.InjectClientMapInfo(MapTransferState.Idle, game.Map.FileSize)
+ uler2.InjectReceivedPacket(MakePeerConnectionInfo({dler.PID}))
+ dler.InjectReceivedPacket(MakePeerConnectionInfo({uler1.PID, uler2.PID}))
+
+ 'Go slowly
+ Dim dt = DownloadManager.MinSwitchPeriod
+ Dim i = 0UI
+ While dt >= DownloadManager.UpdatePeriod
+ clock.Advance(DownloadManager.UpdatePeriod)
+ dt -= DownloadManager.UpdatePeriod
+ i += 1UI
+ WaitUntilFutureSucceeds(dler.InjectClientMapInfo(MapTransferState.Downloading, i))
+ End While
+ dler.ExpectNoPacket()
+ uler1.ExpectNoPacket()
+ uler2.ExpectNoPacket()
+
+ 'Check for cancellation
+ clock.Advance(DownloadManager.UpdatePeriod)
+ dler.ExpectSentPacket(MakeOtherPlayerLeft(uler1.PID, PlayerLeaveType.Disconnect))
+ uler1.ExpectSentPacket(MakeOtherPlayerLeft(dler.PID, PlayerLeaveType.Disconnect))
+ dler.ExpectSentPacket(uler1.MakePacketOtherPlayerJoined())
+ uler1.ExpectSentPacket(dler.MakePacketOtherPlayerJoined())
+ dler.ExpectNoPacket()
+ uler1.ExpectNoPacket()
+ uler2.ExpectNoPacket()
+ WaitUntilFutureSucceeds(dler.InjectClientMapInfo(MapTransferState.Idle, i))
+ clock.Advance(DownloadManager.UpdatePeriod)
+
+ 'Check for new transfer
+ dler.ExpectSentPacket(MakeSetDownloadSource(uler2.PID))
+ uler2.ExpectSentPacket(MakeSetUploadTarget(dler.PID, i))
+ dler.ExpectNoPacket()
+ uler1.ExpectNoPacket()
+ uler2.ExpectNoPacket()
+
+ Assert.IsFalse(BlockOnFuture(dler.FailFuture, 50.Milliseconds))
+ Assert.IsFalse(BlockOnFuture(uler1.FailFuture, 50.Milliseconds))
+ Assert.IsFalse(BlockOnFuture(uler2.FailFuture, 50.Milliseconds))
+ End Sub
End Class
@@ -2,6 +2,7 @@
Public Class ExceptionForm
Private _exceptions As New List(Of Tuple(Of String, Exception))()
Private inQueue As ICallQueue = New InvokedCallQueue(Me)
+ Private ReadOnly _addThrottle As New Throttle(1.Seconds, New SystemClock())
Public Sub New()
InitializeComponent()
@@ -29,7 +30,7 @@ Public Class ExceptionForm
btnUpdate.Visible = True
lblBuffering.Visible = True
Else
- UpdateExceptionText()
+ _addThrottle.SetActionToRun(Sub() inQueue.QueueAction(Sub() UpdateExceptionText()))
End If
End If
RaiseEvent ExceptionCountChanged(Me)
@@ -108,7 +108,7 @@ Namespace WC3
Public Shared ReadOnly SwitchPenaltyPeriod As TimeSpan = 1.Seconds
Public Shared ReadOnly SwitchPenaltyFactor As Double = 1.2
Public Shared ReadOnly TypicalBandwidthPerSecond As Double = 1024
- Public Shared ReadOnly MaxBufferedMapSize As Integer = 64000
+ Public Shared ReadOnly MaxBufferedMapSize As Integer = Protocol.Packets.MaxFileDataSize * 8
Public Shared ReadOnly UpdatePeriod As TimeSpan = 1.Seconds
Private Class Transfer
@@ -120,7 +120,6 @@ Namespace WC3
Private ReadOnly _durationTimer As ITimer
Private ReadOnly _lastActivityTimer As ITimer
Private ReadOnly _startingPosition As UInt32
- Public Property LastSentPosition As UInt32
Private _totalProgress As UInt32
<ContractInvariantMethod()> Private Sub ObjectInvariant()
@@ -185,6 +184,11 @@ Namespace WC3
Return _totalProgress
End Get
End Property
+ Public ReadOnly Property ReportedPosition As UInt32
+ Get
+ Return StartingPosition + TotalProgress
+ End Get
+ End Property
#End Region
Public ReadOnly Property BandwidthPerSecond As Double
@@ -197,7 +201,7 @@ Namespace WC3
Public ReadOnly Property ExpectedDurationRemaining As TimeSpan
Get
- Return New TimeSpan(ticks:=CLng(TimeSpan.TicksPerSecond * ((_fileSize - LastSentPosition) / BandwidthPerSecond)))
+ Return New TimeSpan(ticks:=CLng(TimeSpan.TicksPerSecond * ((_fileSize - ReportedPosition) / BandwidthPerSecond)))
End Get
End Property
@@ -221,12 +225,14 @@ Namespace WC3
Private _transfer As Transfer
Private _lastTransferPartner As TransferClient
+ Public Property LastSendPosition As UInt32
Public Property ReportedPosition As UInt32
Private _hasReported As Boolean
Private _reportedState As Protocol.MapTransferState = Protocol.MapTransferState.Idle
Private _expectedState As Protocol.MapTransferState = Protocol.MapTransferState.Idle
Private _totalPastProgress As UInt64
Private _totalPastTransferTime As TimeSpan
+ Private _numTransfers As Integer = 0
Private ReadOnly _clock As IClock
Private _transferStartPosition As UInt32
Private ReadOnly _links As New List(Of TransferClient)
@@ -332,6 +338,7 @@ Namespace WC3
End Property
Public ReadOnly Property IsSteady As Boolean
Get
+ If _lastTransferPartner IsNot Nothing AndAlso _lastTransferPartner.Player Is Nothing Then Return True
Return HasReported AndAlso ExpectedState = ReportedState
End Get
End Property
@@ -342,7 +349,8 @@ Namespace WC3
Dim dt = TotalMeasurementTime.TotalSeconds
Dim dp = TotalProgress
If dt < 1 Then Return TypicalBandwidthPerSecond
- Return dp / dt
+ Dim expansionFactor = 1 + 1 / (1 + _numTransfers)
+ Return dp / dt * expansionFactor
End Get
End Property
Public Function FindBestAvailableUploader() As TransferClient
@@ -374,11 +382,14 @@ Namespace WC3
startingPosition:=downloader.ReportedPosition,
Clock:=downloader._clock,
FileSize:=downloader._map.FileSize)
+ downloader._numTransfers += 1
downloader._transfer = transfer
- uploader._transfer = transfer
downloader._expectedState = Protocol.MapTransferState.Downloading
- uploader._expectedState = Protocol.MapTransferState.Uploading
downloader._lastTransferPartner = uploader
+
+ uploader._numTransfers += 1
+ uploader._transfer = transfer
+ uploader._expectedState = Protocol.MapTransferState.Uploading
uploader._lastTransferPartner = downloader
End Sub
Public Sub ClearTransfer()
@@ -450,11 +461,10 @@ Namespace WC3
Contract.Requires(client IsNot Nothing)
Contract.Requires(client.Transfer IsNot Nothing)
Contract.Requires(client.Transfer.Uploader Is _defaultClient)
- Dim transfer = client.Transfer
- transfer.LastSentPosition = Math.Max(transfer.LastSentPosition, reportedPosition)
- While transfer.LastSentPosition < reportedPosition + MaxBufferedMapSize AndAlso transfer.LastSentPosition < FileSize
- _game.QueueSendMapPiece(client.Player, transfer.LastSentPosition)
- transfer.LastSentPosition += Protocol.Packets.MaxFileDataSize
+ client.LastSendPosition = Math.Max(client.LastSendPosition, reportedPosition)
+ While client.LastSendPosition < reportedPosition + MaxBufferedMapSize AndAlso client.LastSendPosition < FileSize
+ _game.QueueSendMapPiece(client.Player, client.LastSendPosition)
+ client.LastSendPosition += Protocol.Packets.MaxFileDataSize
End While
End Sub
@@ -468,10 +478,12 @@ Namespace WC3
If Not _playerClients.ContainsKey(player) Then Return latencyDescription
Dim client = _playerClients(player)
+ If Not client.HasReported Then Return latencyDescription
If client.Transfer Is Nothing Then
If client.IsSteady Then Return latencyDescription
Return If(client.ReportedHasFile, "(..ul)", "(..dl)")
Else
+ If client.Transfer.Uploader Is _defaultClient Then Return "(DL)"
If client.IsSteady Then Return If(client.ReportedHasFile, "(ul)", "(dl)")
Return If(client.ReportedHasFile, "(ul..)", "(dl..)")
End If
@@ -525,9 +537,6 @@ Namespace WC3
player.QueueAddPacketHandler(id:=Protocol.PacketId.ClientMapInfo,
jar:=Protocol.Packets.ClientMapInfo,
handler:=Function(pickle) QueueOnReceiveClientMapInfo(player, pickle)),
- player.QueueAddPacketHandler(id:=Protocol.PacketId.MapFileDataReceived,
- jar:=Protocol.Packets.MapFileDataReceived,
- handler:=Function(pickle) QueueOnReceiveMapFileDataReceived(player, pickle)),
player.QueueAddPacketHandler(id:=Protocol.PacketId.PeerConnectionInfo,
jar:=Protocol.Packets.PeerConnectionInfo,
handler:=Function(pickle) QueueOnReceivePeerConnectionInfo(player, pickle))
@@ -556,14 +565,6 @@ Namespace WC3
state:=CType(vals("transfer state"), Protocol.MapTransferState),
position:=CUInt(vals("total downloaded"))))
End Function
- Private Function QueueOnReceiveMapFileDataReceived(ByVal player As IPlayerDownloadAspect, ByVal pickle As IPickle(Of Dictionary(Of InvariantString, Object))) As ifuture
- Contract.Requires(player IsNot Nothing)
- Contract.Requires(pickle IsNot Nothing)
- Contract.Ensures(Contract.Result(Of ifuture)() IsNot Nothing)
- Dim vals = pickle.Value
- Return inQueue.QueueAction(Sub() OnReceiveMapFileDataReceived(player:=player,
- position:=CUInt(vals("total downloaded"))))
- End Function
Private Function QueueOnReceivePeerConnectionInfo(ByVal player As IPlayerDownloadAspect, ByVal pickle As IPickle(Of UInt16)) As ifuture
Contract.Requires(player IsNot Nothing)
Contract.Requires(pickle IsNot Nothing)
@@ -585,22 +586,6 @@ Namespace WC3
Where (flags And pidFlag) <> 0
Select peer)
End Sub
- Private Sub OnReceiveMapFileDataReceived(ByVal player As IPlayerDownloadAspect, ByVal position As UInt32)
- Contract.Requires(player IsNot Nothing)
- If Not _playerClients.ContainsKey(player) Then Return
-
- Dim client = _playerClients(player)
- Dim transfer = client.Transfer
- If position > _game.Map.FileSize Then
- Throw New IO.InvalidDataException("Moved download position past end of file at {0} to {1}.".Frmt(_game.Map.FileSize, position))
- ElseIf position < client.ReportedPosition Then
- Throw New IO.InvalidDataException("Moved download position backwards from {0} to {1}.".Frmt(client.ReportedPosition, position))
- End If
-
- If transfer IsNot Nothing AndAlso transfer.Uploader Is _defaultClient Then
- SendMapFileData(client, position)
- End If
- End Sub
Private Sub OnReceiveClientMapInfo(ByVal player As IPlayerDownloadAspect, ByVal state As Protocol.MapTransferState, ByVal position As UInt32)
Contract.Requires(player IsNot Nothing)
If Not _playerClients.ContainsKey(player) Then Return
@@ -626,15 +611,20 @@ Namespace WC3
Throw New IO.InvalidDataException("Downloads not allowed.")
End If
Else
- If client.ReportedState = Protocol.MapTransferState.Downloading Then
+ If client.Transfer IsNot Nothing Then
client.Transfer.Advance(position - client.ReportedPosition)
- End If
- If state = Protocol.MapTransferState.Idle AndAlso client.Transfer IsNot Nothing Then
- client.Transfer.Dispose()
+ If state = Protocol.MapTransferState.Idle Then
+ client.Transfer.Dispose()
+ Contract.Assume(client.Transfer Is Nothing)
+ End If
End If
client.ReportedPosition = position
client.ReportedState = state
+
+ If client.Transfer IsNot Nothing AndAlso client.Transfer.Uploader Is _defaultClient Then
+ SendMapFileData(client, position)
+ End If
End If
End Sub
#End Region
@@ -661,11 +651,12 @@ Namespace WC3
Where client.HasReported
Where Not client.ReportedHasFile
Where client.Transfer IsNot Nothing
+ Order By client.EstimatedBandwidthPerSecond Descending
Select t = client.Transfer
Where t.Duration > MinSwitchPeriod
Where t.ExpectedDurationRemaining > MinSwitchPeriod
Dim downloader = transfer.Downloader
- Dim remainingData = _game.Map.FileSize - transfer.LastSentPosition
+ Dim remainingData = _game.Map.FileSize - transfer.ReportedPosition
Dim curExpectedCost = transfer.ExpectedDurationRemaining.TotalSeconds
'expected cost of switching and downloading from another uploader
Oops, something went wrong.

0 comments on commit aede4dc

Please sign in to comment.