Skip to content

Commit

Permalink
Incorrect behavior when the same peer participates in more than one t…
Browse files Browse the repository at this point in the history
…orrent #67
  • Loading branch information
atomashpolskiy committed Feb 18, 2018
1 parent 666fa48 commit 39d5d67
Show file tree
Hide file tree
Showing 10 changed files with 351 additions and 145 deletions.
1 change: 1 addition & 0 deletions RELEASE-NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ For the latest information visit project web site: http://atomashpolskiy.github.
* Randomized rarest-first selector behaves like a sequential selector when peers are seeds #53
* Empty files should not prevent successful verification of torrent's data
* NPE in DefaultChannelPipeline when there are unprocessed leftovers from MSE handshake #57
* Incorrect behavior when the same peer participates in more than one torrent #67

## 1.6

Expand Down
5 changes: 5 additions & 0 deletions UPGRADE.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# UPGRADE INSTRUCTIONS

## 1.7

* `bt.net.IPeerConnectionPool.getConnection` now requires two parameters to uniquely identify the connection: `Peer` and `TorrentId`
* `bt.net.IMessageDispatcher.addMessageConsumer` and `bt.net.IMessageDispatcher.addMessageSupplier` now require an additional parameter: `TorrentId`

## 1.5

* `bt.BaseClientBuilder#runtime(BtRuntime)` is now protected instead of public. Use a factory method `bt.Bt#client(BtRuntime)` to attach the newly created client to a shared runtime.
Expand Down
6 changes: 4 additions & 2 deletions bt-core/src/main/java/bt/metainfo/TorrentId.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,13 @@ public int hashCode() {

@Override
public boolean equals(Object obj) {

if (obj == this) {
return true;
}
if (obj == null || !TorrentId.class.isAssignableFrom(obj.getClass())) {
return false;
}
return (obj == this) || Arrays.equals(torrentId, ((TorrentId) obj).getBytes());
return Arrays.equals(torrentId, ((TorrentId) obj).getBytes());
}

@Override
Expand Down
61 changes: 61 additions & 0 deletions bt-core/src/main/java/bt/net/ConnectionKey.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright (c) 2016—2018 Andrei Tomashpolskiy and individual contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package bt.net;

import bt.metainfo.TorrentId;

import java.util.Objects;

class ConnectionKey {
private final Peer peer;
private final TorrentId torrentId;

public ConnectionKey(Peer peer, TorrentId torrentId) {
Objects.requireNonNull(peer);
Objects.requireNonNull(torrentId);
this.peer = peer;
this.torrentId = torrentId;
}

public Peer getPeer() {
return peer;
}

public TorrentId getTorrentId() {
return torrentId;
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
ConnectionKey that = (ConnectionKey) obj;
return peer.equals(that.peer) && torrentId.equals(that.torrentId);

}

@Override
public int hashCode() {
int result = peer.hashCode();
result = 31 * result + torrentId.hashCode();
return result;
}
}
18 changes: 10 additions & 8 deletions bt-core/src/main/java/bt/net/ConnectionSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class ConnectionSource implements IConnectionSource {
private final ExecutorService connectionExecutor;
private final Config config;

private final Map<Peer, CompletableFuture<ConnectionResult>> pendingConnections;
private final Map<ConnectionKey, CompletableFuture<ConnectionResult>> pendingConnections;
// TODO: weak map
private final ConcurrentMap<Peer, Long> unreachablePeers;

Expand Down Expand Up @@ -83,7 +83,9 @@ public ConnectionResult getConnection(Peer peer, TorrentId torrentId) {

@Override
public CompletableFuture<ConnectionResult> getConnectionAsync(Peer peer, TorrentId torrentId) {
CompletableFuture<ConnectionResult> connection = getExistingOrPendingConnection(peer);
ConnectionKey key = new ConnectionKey(peer, torrentId);

CompletableFuture<ConnectionResult> connection = getExistingOrPendingConnection(key);
if (connection != null) {
if (connection.isDone() && LOGGER.isDebugEnabled()) {
LOGGER.debug("Returning existing connection for peer: {}. Torrent: {}", peer, torrentId);
Expand Down Expand Up @@ -116,7 +118,7 @@ public CompletableFuture<ConnectionResult> getConnectionAsync(Peer peer, Torrent
}

synchronized (pendingConnections) {
connection = getExistingOrPendingConnection(peer);
connection = getExistingOrPendingConnection(key);
if (connection != null) {
if (connection.isDone() && LOGGER.isDebugEnabled()) {
LOGGER.debug("Returning existing connection for peer: {}. Torrent: {}", peer, torrentId);
Expand All @@ -140,7 +142,7 @@ public CompletableFuture<ConnectionResult> getConnectionAsync(Peer peer, Torrent
}
} finally {
synchronized (pendingConnections) {
pendingConnections.remove(peer);
pendingConnections.remove(key);
}
}
}, connectionExecutor).whenComplete((acquiredConnection, throwable) -> {
Expand All @@ -157,18 +159,18 @@ public CompletableFuture<ConnectionResult> getConnectionAsync(Peer peer, Torrent
}
});

pendingConnections.put(peer, connection);
pendingConnections.put(key, connection);
return connection;
}
}

private CompletableFuture<ConnectionResult> getExistingOrPendingConnection(Peer peer) {
PeerConnection existingConnection = connectionPool.getConnection(peer);
private CompletableFuture<ConnectionResult> getExistingOrPendingConnection(ConnectionKey key) {
PeerConnection existingConnection = connectionPool.getConnection(key);
if (existingConnection != null) {
return CompletableFuture.completedFuture(ConnectionResult.success(existingConnection));
}

CompletableFuture<ConnectionResult> pendingConnection = pendingConnections.get(peer);
CompletableFuture<ConnectionResult> pendingConnection = pendingConnections.get(key);
if (pendingConnection != null) {
return pendingConnection;
}
Expand Down
15 changes: 9 additions & 6 deletions bt-core/src/main/java/bt/net/IMessageDispatcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package bt.net;

import bt.metainfo.TorrentId;
import bt.protocol.Message;

import java.util.function.Consumer;
Expand All @@ -29,20 +30,22 @@
public interface IMessageDispatcher {

/**
* Add a message consumer to receive messages from a remote peer.
* Add a message consumer to receive messages from a remote peer for a given torrent.
*
* @param torrentId Torrent ID
* @param sender Remote peer, whose messages should be relayed to the consumer
* @param messageConsumer Message consumer
* @since 1.0
* @since 1.7
*/
void addMessageConsumer(Peer sender, Consumer<Message> messageConsumer);
void addMessageConsumer(TorrentId torrentId, Peer sender, Consumer<Message> messageConsumer);

/**
* Add a message supplier to send messages to a remote peer.
* Add a message supplier to send messages to a remote peer for a given torrent.
*
* @param torrentId Torrent ID
* @param recipient Remote peer, to whom the supplied messages should be sent
* @param messageSupplier Message supplier
* @since 1.0
* @since 1.7
*/
void addMessageSupplier(Peer recipient, Supplier<Message> messageSupplier);
void addMessageSupplier(TorrentId torrentId, Peer recipient, Supplier<Message> messageSupplier);
}
12 changes: 9 additions & 3 deletions bt-core/src/main/java/bt/net/IPeerConnectionPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,16 @@
public interface IPeerConnectionPool {

/**
* @return Connection for a given peer, if exists; null otherwise
* @since 1.0
* @return Connection for given peer and torrent, if exists; null otherwise
* @since 1.7
*/
PeerConnection getConnection(Peer peer);
PeerConnection getConnection(Peer peer, TorrentId torrentId);

/**
* @return Connection for given connection key, if exists; null otherwise
* @since 1.7
*/
PeerConnection getConnection(ConnectionKey key);

/**
* Visit connections for a given torrent ID.
Expand Down

0 comments on commit 39d5d67

Please sign in to comment.