In [1]:
import os
os.chdir("..")

In [2]:
"""

This script iterates over a specified collection of nxml articles and extracts a
specified set of data from them.

You can specify the url for the Mongo server, as well as the name of the
database and collection.

You *must* also specify one or more of the extractor functions from the
pubcrawler.extractors module. But specify them by just their name; this package
adds the correct suffix automatically. This should be fixed in a later version,
but it was the only good way to allow an argument from the command line.

You can also specify a -skip_field. You don't have to do this, but it's best to,
because this is what's used to report progress (because of ugly multiprocess
stuff, and because python's Queue.qsize() method is not implemented on macOS).

You can also specify a limit, as well as the number of worker processes you
want.

"""

import multiprocessing as mp
import time
import sys
import pymongo
# from annotator.keyword_annotator import KeywordAnnotator
# from annotator.geoname_annotator import GeonameAnnotator
import pubcrawler.extractors as ex

In [3]:
def chunk_slices(length, by):
    items = list(range(0, length + 1, by))
    if length % by != 0:
        items.append(length)
    slices = [slice(items[i], items[i+1]) for i in range(0, len(items)-1)]
    return(slices)

def worker(url, db, collection, to_extract, query, index_queue):
    articles = pymongo.MongoClient()[db][collection]
    cursor = articles.find(query, modifiers={"$snapshot": True})
    print("Cursor count: {}".format(cursor.count()))
    for i in iter(index_queue.get, 'STOP'):
        print("Trying article {}.".format(i))
        try:
            article = cursor[i]
        except IndexError:
            print("Failed lookup for article{}.".format(i))
            continue
        to_write = ex.combine_extracted_info(article, to_extract)
        articles.update_one({'_id': article['_id']}, {'$set': to_write})

In [4]:
import argparse
parser = argparse.ArgumentParser()
parser.add_argument(
    "-u", "--mongo_url", default="localhost", dest = "u"
)
parser.add_argument(
    "-d", "--mongo_db", default="pmc", dest = "d"
)
parser.add_argument(
    "-c", "--mongo_collection", default="articlesubset", dest = "c"
)
parser.add_argument(
    "-x", "-extract", action="append", default=None, dest = "x"
)
parser.add_argument(
    "-s", "-skip_field", default=None, dest = "s"
)
parser.add_argument(
    "-w", "-workers", default=4, dest = "w"
)
parser.add_argument(
    "-l", "-limit", default=None, dest = "l"
)
args = parser.parse_args(["-x", "extract_meta", "-s", "meta", "-w", "8", "-c", "articles"])
print(args)

Namespace(c='articles', d='pmc', l=None, s='meta', u='localhost', w='8', x=['extract_meta'])


In [5]:
if args.x is not None:
    extractor_funs = [eval(x) for x in ['ex.' + x for x in args.x]]
else:
    print("Please specify at least one extractor function", file=sys.stderr)
    sys.exit(1)

if args.s is not None:
    query = {args.s: {'$exists': False}}
else:
    query = {}

print("Making connection.")
articles = pymongo.MongoClient(args.u)[args.d][args.c]

Making connection.


In [249]:
print("About to count.")
total_for_query = articles.count(query)
num_to_annotate = args.l if args.l is not None else total_for_query
num_workers = int(args.w)
print("Total for query is {}.".format(total_for_query))

Making connection.
About to count.


KeyboardInterrupt: 

In [25]:
t1 = time.time()
cursor = articles.find(query, ["_id"], limit=4000, no_cursor_timeout=True)

queue = mp.Queue()

for i in cursor:
    queue.put(i)
time.time() - t1

Traceback (most recent call last):
  File "/usr/local/Cellar/python3/3.5.2/Frameworks/Python.framework/Versions/3.5/lib/python3.5/multiprocessing/queues.py", line 247, in _feed
    send_bytes(obj)
  File "/usr/local/Cellar/python3/3.5.2/Frameworks/Python.framework/Versions/3.5/lib/python3.5/multiprocessing/connection.py", line 200, in send_bytes
    self._send_bytes(m[offset:offset + size])
  File "/usr/local/Cellar/python3/3.5.2/Frameworks/Python.framework/Versions/3.5/lib/python3.5/multiprocessing/connection.py", line 404, in _send_bytes
    self._send(header + buf)
  File "/usr/local/Cellar/python3/3.5.2/Frameworks/Python.framework/Versions/3.5/lib/python3.5/multiprocessing/connection.py", line 368, in _send
    n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe


1.4686439037322998

In [29]:
queue.get()

{'_id': '20_Century_Br_Hist_2015_Sep_27_26(3)_450-476'}

In [176]:
queue = mp.Queue()
for i in range(num_to_annotate):
    queue.put(i)
for w in range(num_workers):
    queue.put('STOP')

# # Chunking, which we don't do any more.
# queue = mp.Queue()
# for i in chunk_slices(num_to_annotate, by = 100):
#     queue.put(i)
# for w in range(num_workers):
#     queue.put('STOP')

worker_args = (
    args.u,
    args.d,
    args.c,
    extractor_funs,
    query,
    queue,
)

print("About to start.")

for w in range(num_workers):
    mp.Process(target=worker, args=worker_args).start()

# while not queue.empty():
#     print("Still going...")
#     # total_for_query_now = articles.count(query)
#     # done = total_for_query - total_for_query_now
#     # left = num_to_annotate - done
#     # print("Annotated {} out of {} articles ({:.2%}). {} remaining.".format(done,
#         # num_to_annotate, done / num_to_annotate, left))
#     time.sleep(5)

About to start.
Cursor count: 10000
Cursor count: 10000
Cursor count: 10000
Trying article 0.
Trying article 1.
Trying article 2.
Cursor count: 10000
Trying article 3.
Cursor count: 10000
Trying article 4.
Cursor count: 10000
Trying article 6.
Cursor count: 10000
Trying article 7.
Trying article 5.
Cursor count: 9999
Trying article 8.
Trying article 9.
Trying article 10.
Trying article 11.
Trying article 12.
Trying article 13.
Trying article 14.
Trying article 15.
Trying article 16.
Trying article 17.
Trying article 18.
Trying article 19.
Trying article 20.
Trying article 22.
Trying article 21.
Trying article 23.
Trying article 24.
Trying article 25.
Trying article 26.
Trying article 27.
Trying article 29.
Trying article 28.
Trying article 31.
Trying article 30.
Trying article 32.
Trying article 33.
Trying article 34.
Trying article 35.
Trying article 37.
Trying article 36.
Trying article 38.
Trying article 39.
Trying article 42.
Trying article 40.
Trying article 41.
Trying article 43.

Process Process-7:
Process Process-1:
Process Process-3:
Process Process-4:
Process Process-8:
Process Process-5:
Traceback (most recent call last):
Process Process-6:
  File "/usr/local/Cellar/python3/3.5.2/Frameworks/Python.framework/Versions/3.5/lib/python3.5/multiprocessing/process.py", line 249, in _bootstrap
    self.run()
Traceback (most recent call last):
  File "/usr/local/Cellar/python3/3.5.2/Frameworks/Python.framework/Versions/3.5/lib/python3.5/multiprocessing/process.py", line 249, in _bootstrap
    self.run()
  File "/usr/local/Cellar/python3/3.5.2/Frameworks/Python.framework/Versions/3.5/lib/python3.5/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
Traceback (most recent call last):
  File "/usr/local/Cellar/python3/3.5.2/Frameworks/Python.framework/Versions/3.5/lib/python3.5/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "<ipython-input-4-3e404bb8dc9d>", line 19, in worker
    

In [147]:
test.drop()
test = pymongo.MongoClient()["test"]["test"]

In [148]:
for i in range(1000):
    test.insert_one({"_id": i})

In [149]:
cursor1 = test.find()

In [150]:
query = {"meta": {'$exists': False}}

In [151]:
cursor1 = test.find(query)

In [152]:
x = range(test.find(query).count())
x

range(0, 1000)

In [153]:
queue = mp.Queue()
for i in x:
    queue.put(i)

In [154]:
# cursor1 = test.find(query, modifiers={"$snapshot": True})

In [166]:
i = queue.get()
print("Trying article {}.".format(i))
try:
    article = cursor1[i]
    print(article)
except IndexError:
    print("Failed lookup for article{}.".format(i))
to_write = {"meta": "bar"}
test.update_one({'_id': article['_id']}, {'$set': to_write})

Trying article 11.
{'_id': 22}


<pymongo.results.UpdateResult at 0x109d76438>

In [106]:
i = queue.get()
print("Trying article {}.".format(i))
try:
    article = cursor2.next()
    print(article)
except IndexError:
    print("Failed lookup for article{}.".format(i))
to_write = {"meta": "bar"}
test.update_one({'_id': article['_id']}, {'$set': to_write})

Trying article 18.
{'meta': 'bar', '_id': 5}


<pymongo.results.UpdateResult at 0x109d24b40>