Skip to content

Commit

Permalink
Working in a more robust implementation of fetchMagnet, still with a …
Browse files Browse the repository at this point in the history
…multithreading errors.
  • Loading branch information
aldenml committed Feb 2, 2016
1 parent 1f82ca1 commit fc00072
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 88 deletions.
55 changes: 28 additions & 27 deletions src/main/java/com/frostwire/jlibtorrent/Downloader.java
Expand Up @@ -17,6 +17,8 @@
*/
public final class Downloader {

private static final Logger LOG = Logger.getLogger(Downloader.class);

private static final int[] LISTENER_TYPES = new int[]{AlertType.METADATA_RECEIVED.getSwig()};

private final Session s;
Expand Down Expand Up @@ -60,8 +62,6 @@ public void download(TorrentInfo ti, File saveDir) {
}

/**
* This method is not thread safe.
*
* @param uri
* @param timeout in milliseconds
* @return
Expand All @@ -77,11 +77,10 @@ public byte[] fetchMagnet(String uri, long timeout) {

final sha1_hash info_hash = p.getInfo_hash();

torrent_handle th = s.getSwig().find_torrent(info_hash);

boolean add = true;

if (th != null && th.is_valid()) {
torrent_handle th = null;//s.getSwig().find_torrent(info_hash);
/*if (th != null && th.is_valid()) {
// we have a download with the same info-hash, let's see if we have the torrent info
torrent_info ti = th.get_torrent_copy();
if (ti != null && ti.is_valid()) {
Expand All @@ -90,9 +89,10 @@ public byte[] fetchMagnet(String uri, long timeout) {
} else {
add = false;
}
}
}*/

final CountDownLatch signal = new CountDownLatch(1);
final byte[][] data = new byte[1][];

This comment has been minimized.

Copy link
@gubatron

gubatron Feb 2, 2016

Collaborator

why not just a byte[] ? experimenting for something coming?


AlertListener l = new AlertListener() {
@Override
Expand All @@ -102,13 +102,24 @@ public int[] types() {

@Override
public void alert(Alert<?> alert) {
if (!(alert instanceof MetadataReceivedAlert)) {
return;
}

MetadataReceivedAlert mr = (MetadataReceivedAlert) alert;

if (mr.getSwig().getHandle().info_hash().op_eq(info_hash)) {
try {
if (!(alert instanceof MetadataReceivedAlert)) {
return;
}

MetadataReceivedAlert mr = (MetadataReceivedAlert) alert;
torrent_handle th = mr.getHandle().getSwig();

if (th.info_hash().op_eq(info_hash)) {
try {
data[0] = new TorrentInfo(th.get_torrent_copy()).bencode();
} catch (Throwable e) {
LOG.error("Error encoding torrent info", e);
}
signal.countDown();
}
} catch (Throwable e) {
LOG.error("Error in fetch magnet internal listener", e);
signal.countDown();
}
}
Expand All @@ -117,7 +128,8 @@ public void alert(Alert<?> alert) {
s.addListener(l);

if (add) {
p.setName("fetchMagnet - " + uri);
p.setName("fetch_magnet:" + uri);
p.setSave_path("fetch_magnet/" + uri);

long flags = p.get_flags();
flags &= ~add_torrent_params.flags_t.flag_auto_managed.swigValue();
Expand All @@ -135,21 +147,10 @@ public void alert(Alert<?> alert) {

s.removeListener(l);

try {
th = s.getSwig().find_torrent(info_hash);
if (th != null && th.is_valid()) {
// we have a download with the same info-hash, let's see if we have the torrent info
torrent_info ti = th.get_torrent_copy();
if (ti != null && ti.is_valid()) {
// ok. we have it, ready to return the data
return new TorrentInfo(th.get_torrent_copy()).bencode();
}
}

} finally {
if (add && th != null && th.is_valid()) {
s.getSwig().remove_torrent(th);
}

return null;
return data[0];
}
}
61 changes: 0 additions & 61 deletions src/main/java/com/frostwire/jlibtorrent/demo/GetMagnet2.java

This file was deleted.

97 changes: 97 additions & 0 deletions src/test/java/com/frostwire/jlibtorrent/demo/GetMagnet2.java
@@ -0,0 +1,97 @@
package com.frostwire.jlibtorrent.demo;

import com.frostwire.jlibtorrent.AlertListener;
import com.frostwire.jlibtorrent.Downloader;
import com.frostwire.jlibtorrent.Entry;
import com.frostwire.jlibtorrent.Session;
import com.frostwire.jlibtorrent.alerts.Alert;
import com.frostwire.jlibtorrent.alerts.AlertType;
import com.frostwire.jlibtorrent.alerts.DhtStatsAlert;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
* @author gubatron
* @author aldenml
*/
public final class GetMagnet2 {

public static void main(String[] args) throws Throwable {

final String uri = "magnet:?xt=urn:btih:a83cc13bf4a07e85b938dcf06aa707955687ca7c";

final Session s = new Session();

final CountDownLatch signal = new CountDownLatch(1);

// the session stats are posted about once per second.
AlertListener l = new AlertListener() {
@Override
public int[] types() {
return new int[]{AlertType.SESSION_STATS.getSwig(), AlertType.DHT_STATS.getSwig()};
}

@Override
public void alert(Alert<?> alert) {
if (alert.getType().equals(AlertType.SESSION_STATS)) {
s.postDHTStats();
}

if (alert.getType().equals(AlertType.DHT_STATS)) {

long nodes = ((DhtStatsAlert) alert).totalNodes();
// wait for at least 10 nodes in the DHT.
if (nodes >= 10) {
System.out.println("DHT contains " + nodes + " nodes");
signal.countDown();
}
}
}
};

s.addListener(l);
s.postDHTStats();

final Downloader d = new Downloader(s);

System.out.println("Waiting for nodes in DHT (10 seconds)...");
boolean r = signal.await(10, TimeUnit.SECONDS);
if (!r) {
System.out.println("DHT bootstrap timeout");
System.exit(0);
}

// no more trigger of DHT stats
s.removeListener(l);

System.out.println("Fetching the magnet uri (multi thread), please wait...");

final AtomicInteger counter = new AtomicInteger(0);
for (int i = 0; i < 3; i++) {
final int index = i;
Thread t = new Thread() {
@Override
public void run() {
byte[] data = d.fetchMagnet(uri, 30000);

int count = counter.incrementAndGet();
if (data != null) {
System.out.println("Success fetching magnet: " + index + "/" + count);
} else {
System.out.println("Failed to retrieve the magnet: " + index + "/" + count);
}
}
};

t.start();
//t.join();
}

System.out.println("Press ENTER to exit");
System.in.read();

s.abort();
}
}

0 comments on commit fc00072

Please sign in to comment.