Skip to content

Commit

Permalink
Fixed piece receiving timeout.
Browse files Browse the repository at this point in the history
Optimize endgame mode.
  • Loading branch information
ckovorodkin committed Feb 17, 2018
1 parent 559e06d commit 6252697
Show file tree
Hide file tree
Showing 12 changed files with 104 additions and 80 deletions.
15 changes: 12 additions & 3 deletions bt-cli/src/main/java/bt/cli/SessionStatePrinter.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ public class SessionStatePrinter {
private static final String TORRENT_INFO = "Downloading %s (%,d B)";
private static final String DURATION_INFO ="Elapsed time: %s\t\tRemaining time: %s";
private static final String RATE_FORMAT = "%4.1f %s/s";
private static final String SESSION_INFO = "Peers: %2d\t\tDown: " + RATE_FORMAT + "\t\tUp: " + RATE_FORMAT + "\t\t";
private static final String SESSION_INFO =
"Peers: %2d/%d\t\tDown: " + RATE_FORMAT + "\t\tUp: " + RATE_FORMAT + "\t\t";

private static final String WHITESPACES = "\u0020\u0020\u0020\u0020\u0020\u0020\u0020\u0020\u0020\u0020\u0020";

Expand Down Expand Up @@ -191,8 +192,16 @@ public void print(TorrentSessionState sessionState) {
Rate downRate = new Rate(downloaded - this.downloaded);
Rate upRate = new Rate(uploaded - this.uploaded);
int peerCount = sessionState.getConnectedPeers().size();
String sessionInfo = String.format(SESSION_INFO, peerCount, downRate.getQuantity(), downRate.getMeasureUnit(),
upRate.getQuantity(), upRate.getMeasureUnit());
int activePeerCount = sessionState.getActivePeers().size();
String sessionInfo = String.format(
SESSION_INFO,
activePeerCount,
peerCount,
downRate.getQuantity(),
downRate.getMeasureUnit(),
upRate.getQuantity(),
upRate.getMeasureUnit()
);
graphics.putString(0, 3, sessionInfo);

double completePercents = getCompletePercentage(sessionState.getPiecesTotal(), sessionState.getPiecesRemaining());
Expand Down
2 changes: 1 addition & 1 deletion bt-core/src/main/java/bt/runtime/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public Config() {
this.shutdownHookTimeout = Duration.ofSeconds(30);
this.numOfHashingThreads = 1; // do not parallelize by default
this.maxConcurrentlyActivePeerConnectionsPerTorrent = 20;
this.maxPieceReceivingTime = Duration.ofSeconds(30);
this.maxPieceReceivingTime = Duration.ofSeconds(10);
this.maxMessageProcessingInterval = Duration.ofMillis(100);
this.unreachablePeerBanDuration = Duration.ofMinutes(30);
this.maxPendingConnectionRequests = 50;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,4 +95,9 @@ public long getUploaded() {
public Set<Peer> getConnectedPeers() {
return Collections.unmodifiableSet(worker.getPeers());
}

@Override
public Set<Peer> getActivePeers() {
return Collections.unmodifiableSet(worker.getActivePeers());
}
}
5 changes: 5 additions & 0 deletions bt-core/src/main/java/bt/torrent/TorrentSessionState.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,9 @@ public interface TorrentSessionState {
* @since 1.0
*/
Set<Peer> getConnectedPeers();

/**
* @since 1.7
*/
Set<Peer> getActivePeers();
}
12 changes: 7 additions & 5 deletions bt-core/src/main/java/bt/torrent/messaging/Assignment.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@

import java.time.Duration;

import static java.lang.Math.max;

class Assignment {

enum Status { ACTIVE, DONE, TIMEOUT };

private Peer peer;
private Integer piece;
private int piece;
private ConnectionState connectionState;

private final Duration limit;
Expand All @@ -36,7 +38,7 @@ enum Status { ACTIVE, DONE, TIMEOUT };
private boolean aborted;
private boolean finished;

Assignment(Peer peer, Integer piece, Duration limit) {
Assignment(Peer peer, int piece, Duration limit) {
this.peer = peer;
this.piece = piece;
this.limit = limit;
Expand All @@ -46,15 +48,15 @@ Peer getPeer() {
return peer;
}

Integer getPiece() {
int getPiece() {
return piece;
}

Status getStatus() {
if (finished) {
if (finished || aborted) {
return Status.DONE;
} else if (started > 0) {
long duration = System.currentTimeMillis() - started;
long duration = System.currentTimeMillis() - max(started, checked);
if (duration > limit.toMillis()) {
return Status.TIMEOUT;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ private boolean isEndgame() {
return localBitfield.getPiecesRemaining() <= assignedPieces.cardinality();
}

private Assignment assign(Peer peer, Integer piece) {
private Assignment assign(Peer peer, int piece) {
Assignment assignment = new Assignment(peer, piece, config.getMaxPieceReceivingTime());
assignments.put(peer, assignment);
assignedPieces.set(piece);
Expand Down
20 changes: 13 additions & 7 deletions bt-core/src/main/java/bt/torrent/messaging/ConnectionState.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

import static bt.torrent.messaging.Mapper.buildKey;

/**
* Contains basic information about a connection's state.
*
Expand All @@ -47,9 +49,9 @@ public class ConnectionState {
private Optional<Boolean> shouldChoke;
private long lastChoked;

private Set<Object> cancelledPeerRequests;
private Set<Object> pendingRequests;
private Map<Object, CompletableFuture<BlockWrite>> pendingWrites;
private Set<Mapper.Key> cancelledPeerRequests;
private Set<Mapper.Key> pendingRequests;
private Map<Mapper.Key, CompletableFuture<BlockWrite>> pendingWrites;

private Queue<Request> requestQueue;
private boolean initializedRequestQueue;
Expand Down Expand Up @@ -189,7 +191,7 @@ public void incrementUploaded(long uploaded) {
* @return Set of block request keys
* @since 1.0
*/
public Set<Object> getCancelledPeerRequests() {
public Set<Mapper.Key> getCancelledPeerRequests() {
return cancelledPeerRequests;
}

Expand All @@ -199,7 +201,7 @@ public Set<Object> getCancelledPeerRequests() {
* @since 1.0
*/
public void onCancel(Cancel cancel) {
cancelledPeerRequests.add(Mapper.mapper().buildKey(
cancelledPeerRequests.add(buildKey(
cancel.getPieceIndex(), cancel.getOffset(), cancel.getLength()));
}

Expand All @@ -210,7 +212,7 @@ public void onCancel(Cancel cancel) {
* @return Set of block request keys
* @since 1.0
*/
public Set<Object> getPendingRequests() {
public Set<Mapper.Key> getPendingRequests() {
return pendingRequests;
}

Expand All @@ -221,7 +223,7 @@ public Set<Object> getPendingRequests() {
* @return Pending block writes, mapped by keys of corresponding requests.
* @since 1.0
*/
public Map<Object, CompletableFuture<BlockWrite>> getPendingWrites() {
public Map<Mapper.Key, CompletableFuture<BlockWrite>> getPendingWrites() {
return pendingWrites;
}

Expand All @@ -246,6 +248,10 @@ Optional<Assignment> getCurrentAssignment() {
}

void setCurrentAssignment(Assignment assignment) {
assert !this.assignment.isPresent();
assert requestQueue.isEmpty();
assert pendingRequests.isEmpty();
assert !initializedRequestQueue;
this.assignment = Optional.of(assignment);
}

Expand Down
15 changes: 2 additions & 13 deletions bt-core/src/main/java/bt/torrent/messaging/Mapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,13 @@
package bt.torrent.messaging;

import java.util.Arrays;
import java.util.Optional;

/**
* Creates unique keys, that can be tested for equality with each other.
*
* @since 1.0
*/
public class Mapper {

private static final Mapper instance = new Mapper();

public static Mapper mapper() {
return instance;
}
public final class Mapper {

private Mapper() {}

Expand All @@ -39,14 +32,10 @@ private Mapper() {}
*
* @since 1.0
*/
public Object buildKey(int pieceIndex, int offset, int length) {
public static Key buildKey(int pieceIndex, int offset, int length) {
return new Key(pieceIndex, offset, length);
}

static Optional<Key> decodeKey(Object object) {
return (object instanceof Key) ? Optional.of((Key) object) : Optional.empty();
}

static class Key {

private final int[] key;
Expand Down
22 changes: 12 additions & 10 deletions bt-core/src/main/java/bt/torrent/messaging/PieceConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@

package bt.torrent.messaging;

import bt.data.Bitfield;
import bt.net.Peer;
import bt.protocol.Have;
import bt.protocol.Message;
import bt.protocol.Piece;
import bt.data.Bitfield;
import bt.torrent.annotation.Consumes;
import bt.torrent.annotation.Produces;
import bt.torrent.data.BlockWrite;
Expand All @@ -33,6 +33,8 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

import static bt.torrent.messaging.Mapper.buildKey;

/**
* Consumes blocks, received from the remote peer.
*
Expand Down Expand Up @@ -62,6 +64,13 @@ public void consume(Piece piece, MessageContext context) {
return;
}

if (connectionState.getCurrentAssignment().isPresent()) {
Assignment assignment = connectionState.getCurrentAssignment().get();
if (piece.getPieceIndex() == assignment.getPiece()) {
assignment.check();
}
}

// discard blocks for pieces that have already been verified
if (bitfield.isVerified(piece.getPieceIndex())) {
if (LOGGER.isTraceEnabled()) {
Expand Down Expand Up @@ -99,7 +108,7 @@ public void consume(Piece piece, MessageContext context) {
}

private boolean checkBlockIsExpected(Peer peer, ConnectionState connectionState, Piece piece) {
Object key = Mapper.mapper().buildKey(piece.getPieceIndex(), piece.getOffset(), piece.getBlock().length);
Mapper.Key key = buildKey(piece.getPieceIndex(), piece.getOffset(), piece.getBlock().length);
boolean expected = connectionState.getPendingRequests().remove(key);
if (!expected && LOGGER.isTraceEnabled()) {
LOGGER.trace("Discarding unexpected block {} from peer: {}", piece, peer);
Expand All @@ -114,16 +123,9 @@ private CompletableFuture<BlockWrite> addBlock(Peer peer, ConnectionState connec
byte[] block = piece.getBlock();

connectionState.incrementDownloaded(block.length);
if (connectionState.getCurrentAssignment().isPresent()) {
Assignment assignment = connectionState.getCurrentAssignment().get();
if (pieceIndex == assignment.getPiece()) {
assignment.check();
}
}

CompletableFuture<BlockWrite> future = dataWorker.addBlock(peer, pieceIndex, offset, block);
connectionState.getPendingWrites().put(
Mapper.mapper().buildKey(pieceIndex, offset, block.length), future);
connectionState.getPendingWrites().put(buildKey(pieceIndex, offset, block.length), future);
return future;
}

Expand Down
69 changes: 33 additions & 36 deletions bt-core/src/main/java/bt/torrent/messaging/RequestProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,25 @@
package bt.torrent.messaging;

import bt.BtException;
import bt.data.Bitfield;
import bt.data.ChunkDescriptor;
import bt.data.DataDescriptor;
import bt.net.Peer;
import bt.protocol.Cancel;
import bt.protocol.InvalidMessageException;
import bt.protocol.Message;
import bt.protocol.Request;
import bt.data.Bitfield;
import bt.torrent.annotation.Produces;
import bt.torrent.data.BlockWrite;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import static bt.torrent.messaging.Mapper.buildKey;

/**
* Produces block requests to the remote peer.
Expand Down Expand Up @@ -89,47 +88,45 @@ public void produce(Consumer<Message> messageConsumer, MessageContext context) {
Queue<Request> requestQueue = connectionState.getRequestQueue();
while (!requestQueue.isEmpty() && connectionState.getPendingRequests().size() <= MAX_PENDING_REQUESTS) {
Request request = requestQueue.poll();
Object key = Mapper.mapper().buildKey(request.getPieceIndex(), request.getOffset(), request.getLength());
messageConsumer.accept(request);
connectionState.getPendingRequests().add(key);
ChunkDescriptor chunk = chunks.get(request.getPieceIndex());
assert request.getOffset() % chunk.blockSize() == 0;
final int blockIndex = (int) (request.getOffset() / chunk.blockSize());
if (!chunk.isPresent(blockIndex)) {
Mapper.Key key = buildKey(request.getPieceIndex(), request.getOffset(), request.getLength());
messageConsumer.accept(request);
connectionState.getPendingRequests().add(key);
} else {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Rejecting request to remote peer because the chunk block is already present: " +
"piece index {" + request.getPieceIndex() + "}, offset {" + request.getOffset()
+ "}, length {" + request.getLength() + "}");
}
}
}

if (requestQueue.isEmpty() //br
&& connectionState.getPendingRequests().isEmpty() //br
&& connectionState.getPendingWrites().isEmpty()) {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Aborted downloading piece #{}. All queries are empty.", currentPiece);
}
assignment.abort();
}
}

private void resetConnection(ConnectionState connectionState, Consumer<Message> messageConsumer) {
connectionState.getRequestQueue().clear();
connectionState.setInitializedRequestQueue(false);
connectionState.getPendingRequests().forEach(r -> {
Mapper.decodeKey(r).ifPresent(key -> {
messageConsumer.accept(new Cancel(key.getPieceIndex(), key.getOffset(), key.getLength()));
});
});
connectionState.getRequestQueue().clear();
connectionState.getPendingRequests().forEach(key -> //br
messageConsumer.accept(new Cancel(key.getPieceIndex(), key.getOffset(), key.getLength())));
connectionState.getPendingRequests().clear();
}

private void initializeRequestQueue(ConnectionState connectionState, int pieceIndex) {
List<Request> requests = buildRequests(pieceIndex).stream()
.filter(request -> {
Object key = Mapper.mapper().buildKey(
request.getPieceIndex(), request.getOffset(), request.getLength());
if (connectionState.getPendingRequests().contains(key)) {
return false;
}

CompletableFuture<BlockWrite> future = connectionState.getPendingWrites().get(key);
if (future == null) {
return true;
} else if (!future.isDone()) {
return false;
}

boolean failed = future.isDone() && future.getNow(null).getError().isPresent();
if (failed) {
connectionState.getPendingWrites().remove(key);
}
return failed;

}).collect(Collectors.toList());

assert connectionState.getRequestQueue().isEmpty();
assert connectionState.getPendingRequests().isEmpty();
assert connectionState.getPendingWrites().isEmpty();
List<Request> requests = buildRequests(pieceIndex);
Collections.shuffle(requests);
connectionState.getRequestQueue().addAll(requests);
connectionState.setInitializedRequestQueue(true);
Expand Down
Loading

0 comments on commit 6252697

Please sign in to comment.