
Asyncio rewrite of magneticod #76

Merged: 22 commits, Jun 2, 2017

Commits (changes shown from all commits)
8da8d20
First crack at porting to asyncio.
richardkiss May 14, 2017
eb8a2c7
Fix conditional.
richardkiss May 14, 2017
2f68ac3
Improve naming.
richardkiss May 14, 2017
4aea5df
Reduce leakage in bittorrent.py.
richardkiss May 14, 2017
f3ae493
More clean-up in error cases.
richardkiss May 14, 2017
635fbe8
More clean-up and simplification.
richardkiss May 14, 2017
4515fa8
More cleanup. Only hit bootstrap if it seems necessary.
richardkiss May 14, 2017
d04634b
Improve dht shutdown. Notice writing pauses.
richardkiss May 14, 2017
73d97d8
Better cancel all outstanding tasks.
richardkiss May 14, 2017
4b4c312
SybilNode now supports pause_writing.
richardkiss May 14, 2017
f38a796
Add connection_lost. Properly handle shutdown of watch_q.
richardkiss May 14, 2017
71f55f0
Reduce noise.
richardkiss May 14, 2017
35c6176
Move neighbour task work to tick task.
richardkiss May 15, 2017
e6098ff
Use uvloop if available.
richardkiss May 15, 2017
3e4eba7
Do explicit look-up of bootstrap nodes, and query all responses.
richardkiss May 15, 2017
29b99a3
ModuleNotFoundError (new in 3.6) => ImportError.
richardkiss May 17, 2017
9b1bbfc
Properly clean up fetch_metadata tasks.
richardkiss May 17, 2017
8df4015
Be a little smarter with task clean-up.
richardkiss May 24, 2017
4dc11b0
Tidy up clean-up. Simplify fetch_metadata.
richardkiss May 24, 2017
5d37737
Add some resource debug logging.
richardkiss May 27, 2017
d7ead95
Refactor create_tasks out of main.
richardkiss May 27, 2017
0e389aa
Query DB when checking if an infohash is new or not.
richardkiss May 27, 2017
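
Taken together, the commits above replace magneticod's hand-rolled selectors event loop with asyncio: SybilNode crawls the DHT and publishes (info_hash, metadata) pairs on a queue exposed by its metadata_q() method, a watch_q() coroutine drains that queue into the database, and shutdown works by cancelling the outstanding tasks. The following is a minimal, self-contained sketch of that producer/consumer shape; ToyNode and its fake payloads are stand-ins invented here for illustration, not the PR's actual SybilNode or database API.

import asyncio
import logging


class ToyNode:
    """Publishes (info_hash, metadata) pairs on a queue, like SybilNode.metadata_q()."""

    def __init__(self):
        self._metadata_q = asyncio.Queue()

    def metadata_q(self):
        return self._metadata_q

    async def launch(self):
        # The real node crawls the DHT; here we just emit fake results forever.
        i = 0
        while True:
            await asyncio.sleep(0.5)
            await self._metadata_q.put((bytes([i % 256]) * 20, b"fake metadata"))
            i += 1


async def watch_q(q):
    # Consumer in the style of the PR's watch_q(): drain the queue, handle each item.
    while True:
        info_hash, metadata = await q.get()
        logging.info("got metadata for %s (%d bytes)", info_hash.hex(), len(metadata))


async def main():
    logging.basicConfig(level=logging.INFO)
    node = ToyNode()
    producer = asyncio.ensure_future(node.launch())
    consumer = asyncio.ensure_future(watch_q(node.metadata_q()))
    try:
        await asyncio.sleep(2)  # stand-in for running forever until Ctrl-C
    finally:
        # Mirror the PR's shutdown style: cancel outstanding tasks, then let them unwind.
        for task in (producer, consumer):
            task.cancel()
        await asyncio.gather(producer, consumer, return_exceptions=True)


if __name__ == "__main__":
    asyncio.get_event_loop().run_until_complete(main())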
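Commit e6098ff's uvloop opt-in (visible in the first hunk below) follows the usual pattern: install uvloop's event-loop policy before the first event loop is requested, and fall back to the stdlib loop when the package is missing. A standalone sketch of the same pattern, independent of magneticod:

import asyncio

try:
    import uvloop  # optional dependency: pip install uvloop
    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
except ImportError:
    pass  # the stdlib event loop is used instead


async def probe():
    # Prints "Loop" under uvloop, "_UnixSelectorEventLoop" (or similar) otherwise.
    print(type(asyncio.get_event_loop()).__name__)


asyncio.get_event_loop().run_until_complete(probe())

As in create_tasks() below, the policy is installed before the loop is first requested.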
161 changes: 40 additions & 121 deletions magneticod/magneticod/__main__.py
@@ -13,50 +13,38 @@
 # You should have received a copy of the GNU Affero General Public License along with this program. If not, see
 # <http://www.gnu.org/licenses/>.
 import argparse
-import collections
-import functools
+import asyncio
 import logging
 import ipaddress
-import selectors
 import textwrap
 import urllib.parse
-import itertools
+import os
 import sys
-import time
 import typing
 
 import appdirs
 import humanfriendly
 
-from .constants import TICK_INTERVAL, MAX_ACTIVE_PEERS_PER_INFO_HASH, DEFAULT_MAX_METADATA_SIZE
+from .constants import DEFAULT_MAX_METADATA_SIZE
 from . import __version__
-from . import bittorrent
 from . import dht
 from . import persistence
 
 
-# Global variables are bad bla bla bla, BUT these variables are used so many times that I think it is justified; else
-# the signatures of many functions are literally cluttered.
-#
-# If you are using a global variable, please always indicate that at the VERY BEGINNING of the function instead of right
-# before using the variable for the first time.
-selector = selectors.DefaultSelector()
-database = None  # type: persistence.Database
-node = None
-peers = collections.defaultdict(list)  # type: typing.DefaultDict[dht.InfoHash, typing.List[bittorrent.DisposablePeer]]
-# info hashes whose metadata is valid & complete (OR complete but deemed to be corrupt) so do NOT download them again:
-complete_info_hashes = set()
-
-
-def main():
-    global complete_info_hashes, database, node, peers, selector
-
-    arguments = parse_cmdline_arguments()
+def create_tasks():
+    arguments = parse_cmdline_arguments(sys.argv[1:])
 
     logging.basicConfig(level=arguments.loglevel, format="%(asctime)s %(levelname)-8s %(message)s")
     logging.info("magneticod v%d.%d.%d started", *__version__)
 
+    # use uvloop if it's installed
+    try:
+        import uvloop
+        asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
+        logging.info("using uvloop")
+    except ImportError:
+        pass
+
     # noinspection PyBroadException
     try:
         path = arguments.database_file
@@ -67,106 +55,25 @@ def main():
 
-    complete_info_hashes = database.get_complete_info_hashes()
-
-    node = dht.SybilNode(arguments.node_addr)
-
-    node.when_peer_found = lambda info_hash, peer_address: on_peer_found(info_hash=info_hash,
-                                                                         peer_address=peer_address,
-                                                                         max_metadata_size=arguments.max_metadata_size)
-
-    selector.register(node, selectors.EVENT_READ)
-
-    try:
-        loop()
-    except KeyboardInterrupt:
-        logging.critical("Keyboard interrupt received! Exiting gracefully...")
-        pass
-    finally:
-        database.close()
-        selector.close()
-        node.shutdown()
-        for peer in itertools.chain.from_iterable(peers.values()):
-            peer.shutdown()
-
-    return 0
-
-
-def on_peer_found(info_hash: dht.InfoHash, peer_address, max_metadata_size: int=DEFAULT_MAX_METADATA_SIZE) -> None:
-    global selector, peers, complete_info_hashes
-
-    if len(peers[info_hash]) > MAX_ACTIVE_PEERS_PER_INFO_HASH or info_hash in complete_info_hashes:
-        return
-
-    try:
-        peer = bittorrent.DisposablePeer(info_hash, peer_address, max_metadata_size)
-    except ConnectionError:
-        return
-
-    selector.register(peer, selectors.EVENT_READ | selectors.EVENT_WRITE)
-    peer.when_metadata_found = on_metadata_found
-    peer.when_error = functools.partial(on_peer_error, peer, info_hash)
-    peers[info_hash].append(peer)
-
-
-def on_metadata_found(info_hash: dht.InfoHash, metadata: bytes) -> None:
-    global complete_info_hashes, database, peers, selector
-
-    succeeded = database.add_metadata(info_hash, metadata)
-    if not succeeded:
-        logging.info("Corrupt metadata for %s! Ignoring.", info_hash.hex())
-
-    # When we fetch the metadata of an info hash completely, shut down all other peers who are trying to do the same.
-    for peer in peers[info_hash]:
-        selector.unregister(peer)
-        peer.shutdown()
-    del peers[info_hash]
+    loop = asyncio.get_event_loop()
+    node = dht.SybilNode(arguments.node_addr, database.is_infohash_new, arguments.max_metadata_size)
+    loop.create_task(node.launch(loop))
+    watch_q_task = loop.create_task(watch_q(database, node.metadata_q()))
+    watch_q_task.add_done_callback(lambda x: clean_up(loop, database, node))
+    return watch_q_task
 
-    complete_info_hashes.add(info_hash)
 
+def clean_up(loop, database, node):
+    database.close()
+    loop.run_until_complete(node.shutdown())
 
-def on_peer_error(peer: bittorrent.DisposablePeer, info_hash: dht.InfoHash) -> None:
-    global peers, selector
-    peer.shutdown()
-    peers[info_hash].remove(peer)
-    selector.unregister(peer)
-
-
-# TODO:
-# Consider whether time.monotonic() is a good choice. Maybe we should use CLOCK_MONOTONIC_RAW as its not affected by NTP
-# adjustments, and all we need is how many seconds passed since a certain point in time.
-def loop() -> None:
-    global selector, node, peers
-
-    t0 = time.monotonic()
+async def watch_q(database, q):
     while True:
-        keys_and_events = selector.select(timeout=TICK_INTERVAL)
-
-        # Check if it is time to tick
-        delta = time.monotonic() - t0
-        if delta >= TICK_INTERVAL:
-            if not (delta < 2 * TICK_INTERVAL):
-                logging.warning("Belated TICK! (Δ = %d)", delta)
-
-            node.on_tick()
-            for peer_list in peers.values():
-                for peer in peer_list:
-                    peer.on_tick()
-
-            t0 = time.monotonic()
-
-        for key, events in keys_and_events:
-            if events & selectors.EVENT_READ:
-                key.fileobj.on_receivable()
-            if events & selectors.EVENT_WRITE:
-                key.fileobj.on_sendable()
-
-        # Check for entities that would like to write to their socket
-        keymap = selector.get_map()
-        for fd in keymap:
-            fileobj = keymap[fd].fileobj
-            if fileobj.would_send():
-                selector.modify(fileobj, selectors.EVENT_READ | selectors.EVENT_WRITE)
-            else:
-                selector.modify(fileobj, selectors.EVENT_READ)
+        info_hash, metadata = await q.get()
+        succeeded = database.add_metadata(info_hash, metadata)
+        if not succeeded:
+            logging.info("Corrupt metadata for %s! Ignoring.", info_hash.hex())
 
 
 def parse_ip_port(netloc) -> typing.Optional[typing.Tuple[str, int]]:
@@ -196,7 +103,7 @@ def parse_size(value: str) -> int:
         raise argparse.ArgumentTypeError("Invalid argument. {}".format(e))
 
 
-def parse_cmdline_arguments() -> typing.Optional[argparse.Namespace]:
+def parse_cmdline_arguments(args) -> typing.Optional[argparse.Namespace]:
     parser = argparse.ArgumentParser(
         description="Autonomous BitTorrent DHT crawler and metadata fetcher.",
         epilog=textwrap.dedent("""\
@@ -240,7 +147,19 @@ def parse_cmdline_arguments() -> typing.Optional[argparse.Namespace]:
         action="store_const", dest="loglevel", const=logging.DEBUG, default=logging.INFO,
         help="Print debugging information in addition to normal processing.",
     )
-    return parser.parse_args(sys.argv[1:])
+    return parser.parse_args(args)
 
 
+def main():
+    main_task = create_tasks()
+    try:
+        asyncio.get_event_loop().run_forever()
+    except KeyboardInterrupt:
+        logging.critical("Keyboard interrupt received! Exiting gracefully...")
+    finally:
+        main_task.cancel()
+
+    return 0
+
+
 if __name__ == "__main__":
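
A side benefit of the refactor: parse_cmdline_arguments() now receives the argument list as a parameter instead of reading sys.argv itself, so the parser can be exercised in isolation. A minimal sketch of such a test, assuming the option with dest="loglevel" is spelled "--debug" (its flag line is collapsed out of the hunk above) and that the remaining options all have defaults:

import logging

from magneticod.__main__ import parse_cmdline_arguments


def test_debug_flag_sets_loglevel():
    # "--debug" is an assumed spelling; only the add_argument kwargs
    # (dest="loglevel", const=logging.DEBUG) are visible in the diff.
    arguments = parse_cmdline_arguments(["--debug"])
    assert arguments.loglevel == logging.DEBUG


def test_default_loglevel_is_info():
    # default=logging.INFO per the hunk above.
    arguments = parse_cmdline_arguments([])
    assert arguments.loglevel == logging.INFO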