WikiCorpus processes lock-up when run from command line #1320

Closed
jli05 opened this issue May 14, 2017 · 6 comments
Labels: bug, difficulty easy, good first issue

Comments

jli05 commented May 14, 2017

When we run the following function from within IPython, it runs successfully with multiple processes enabled via the processes argument when constructing WikiCorpus. However, when we put this function in a .py file and invoke it with python3 xxx.py, it never processes more than 200 documents; when we press Ctrl-C, the traceback suggests a semaphore lock-up in the worker processes.

import gzip

def dump_bow(corpus, partition_size=10000, limit=200, output_prefix='dump'):
    ''' Dump Bag-of-Words from a gensim WikiCorpus

    Iterate through the documents in the wiki dump and write the
    Bag-of-Words of the documents to a series of .txt.gz files.
    Each line in the uncompressed file represents one document, with
    only lower-case words separated by spaces.

    PARAMETERS
    -----------
    corpus: gensim.corpora.WikiCorpus
        The Wikidump corpus.
    partition_size: int
        Number of documents in each .txt.gz dump file.
    limit: int
        The total number of documents to dump.
    output_prefix: str
        Prefix of the dump files.
    '''
    def write_buffer(buf, output_prefix, partition_id):
        ''' Dump current buffer of Bag-of-Words '''
        fname = '{}-{:06d}.txt.gz'.format(output_prefix, partition_id)
        with gzip.open(fname, 'wt') as partition_file:
            partition_file.write(buf)

    count_documents = 0
    partition_id = 0
    buf = ''

    for bow in corpus.get_texts():
        text = ' '.join([byte_array.decode('utf-8') for byte_array in bow])
        buf += text + '\n'
        count_documents += 1

        if count_documents % 100 == 0:
            print('Processed {} documents.'.format(count_documents))

        if count_documents % partition_size == 0:
            write_buffer(buf, output_prefix, partition_id)
            buf = ''
            partition_id += 1

        if limit is not None and count_documents >= limit:
            break

    if buf:
        write_buffer(buf, output_prefix, partition_id)
Traceback (most recent call last):
  File "/usr/lib/python3.5/multiprocessing/process.py", line 252, in _bootstrap
    util._exit_function()
  File "/usr/lib/python3.5/multiprocessing/util.py", line 314, in _exit_function
    _run_finalizers()
  File "/usr/lib/python3.5/multiprocessing/util.py", line 254, in _run_finalizers
    finalizer()
  File "/usr/lib/python3.5/multiprocessing/util.py", line 186, in __call__
    res = self._callback(*self._args, **self._kwargs)
  File "/usr/lib/python3.5/multiprocessing/queues.py", line 198, in _finalize_join
    thread.join()
  File "/usr/lib/python3.5/threading.py", line 1054, in join
    self._wait_for_tstate_lock()
  File "/usr/lib/python3.5/threading.py", line 1070, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
KeyboardInterrupt
Traceback (most recent call last):
  File "/usr/lib/python3.5/multiprocessing/process.py", line 249, in _bootstrap
    self.run()
  File "/usr/lib/python3.5/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.5/multiprocessing/pool.py", line 108, in worker
    task = get()
  File "/usr/lib/python3.5/multiprocessing/queues.py", line 342, in get
    with self._rlock:
  File "/usr/lib/python3.5/multiprocessing/synchronize.py", line 96, in __enter__
    return self._semlock.__enter__()
KeyboardInterrupt
jli05 closed this as completed May 14, 2017

tmylk commented May 15, 2017

@jli05 May I kindly ask what the resolution of this issue was, in case it comes up again in the future?

jli05 commented May 17, 2017

There remains a problem with WikiCorpus multi-processing.

If we run with -p larger than -l, the hang happens at the exit of the function dump_bow().

Could this be fixed?

python3 dump_bow.py -p 50 -l 20 enwiki-20170420-pages-articles.xml.bz2 wiki_dictionary.txt 
Processing 20 documents in the corpus...
Dumped 20 documents.
^CError in atexit._run_exitfuncs:
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/util.py", line 254, in _run_finalizers
    finalizer()
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/util.py", line 186, in __call__
    res = self._callback(*self._args, **self._kwargs)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/pool.py", line 557, in _terminate_pool
    task_handler.join()
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/threading.py", line 1056, in join
    self._wait_for_tstate_lock()
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/threading.py", line 1072, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
KeyboardInterrupt

The code of dump_bow.py is as follows:

''' Dump Bag-of-Words from a gensim WikiCorpus '''
import gzip
from argparse import ArgumentParser
from gensim.corpora.dictionary import Dictionary
from gensim.corpora.wikicorpus import WikiCorpus

def dump_bow(corpus, partition_size=50, limit=200, output_prefix='dump'):
    ''' Dump Bag-of-Words from a gensim WikiCorpus

    Iterate through the documents in the wiki dump and write the
    Bag-of-Words of the documents to a series of .txt.gz files.
    Each line in the uncompressed file represents one document, with
    only lower-case words separated by spaces.

    PARAMETERS
    -----------
    corpus: gensim.corpora.WikiCorpus
        The Wikidump corpus.
    partition_size: int
        Number of documents in each .txt.gz dump file.
    limit: int or None
        The total number of documents to dump, or None for all
        the documents in the corpus.
    output_prefix: str
        Prefix of the dump files.
    '''
    def write_buffer(buf, output_prefix, partition_id):
        ''' Dump current buffer of Bag-of-Words '''
        fname = '{}-{:06d}.txt.gz'.format(output_prefix, partition_id)
        with gzip.open(fname, 'wt') as partition_file:
            partition_file.write(buf)

    if limit is not None:
        print('Processing {} documents in the corpus...'.format(limit))
    else:
        print('Processing all the documents in the corpus...')

    assert partition_size >= 1
    assert limit is None or limit >= 1
    # gensim 2.0 requires this otherwise the multi-processing locks up
    # assert limit is None or partition_size <= limit

    count_documents = 0
    partition_id = 0
    buf = ''

    for bow in corpus.get_texts():
        text = ' '.join([byte_array.decode('utf-8') for byte_array in bow])
        buf += text + '\n'
        count_documents += 1

        if count_documents % 200 == 0:
            print('Processed {} documents.'.format(count_documents))

        if count_documents % partition_size == 0:
            write_buffer(buf, output_prefix, partition_id)
            buf = ''
            partition_id += 1

        if limit is not None and count_documents >= limit:
            break

    if buf:
        write_buffer(buf, output_prefix, partition_id)
    print('Dumped {} documents.'.format(count_documents))

def main():
    ''' Parse arguments and run '''
    parser = ArgumentParser(description='Dump bag-of-words in .txt.gz files')

    parser.add_argument('wikidump', type=str,
                        help='xxx-pages-articles.xml.bz2 wiki dump file')
    parser.add_argument('dictionary', type=str,
                        help='gensim dictionary .txt file')

    parser.add_argument('-j', '--jobs', type=int, default=2,
                        help='Number of parallel jobs, default: 2')
    parser.add_argument('-p', '--partition-size', type=int,
                        help='Number of documents in each .txt.gz file')
    parser.add_argument('-l', '--limit', type=int,
                        help=('Total number of documents to dump, '
                              'or all documents when not specified'))
    parser.add_argument('-o', '--output-prefix', type=str, default='dump',
                        help='Prefix of dump .txt.gz files, default: dump')

    args = parser.parse_args()

    wiki_dictionary = Dictionary.load_from_text(args.dictionary)
    wiki = WikiCorpus(args.wikidump, processes=args.jobs,
                      dictionary=wiki_dictionary)

    dump_bow(wiki, args.partition_size, args.limit,
             output_prefix=args.output_prefix)

if __name__ == '__main__':
    main()

jli05 reopened this May 17, 2017
menshikh-iv added the bug, difficulty easy, and test before incubator labels Oct 2, 2017
menshikh-iv added the good first issue and needs domain knowledge labels and removed the test before incubator label Oct 16, 2017

xelez commented Oct 22, 2017

Tried to reproduce on the current develop branch with Python 3.6 and couldn't.

WikiCorpus.get_texts() is a generator function, so the problem may be that the generator isn't closed and garbage-collected correctly after the break in the loop (although it's not clear why).

@jli05 can you do something like this and see if it fixes the hanging?

def dump_bow(corpus, partition_size=50, limit=200, output_prefix='dump'):
    ...
    texts = corpus.get_texts()  # save the generator to a separate variable
    for bow in texts:
        ...

    texts.close() # explicitly close the generator
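
An equivalent way to guarantee the close() call is contextlib.closing. Below is a minimal, self-contained sketch; the documents() generator is only a stand-in for corpus.get_texts(), and the names are illustrative rather than gensim API:

from contextlib import closing

def documents():
    ''' Stand-in generator; in the real code this would be corpus.get_texts(). '''
    for i in range(1000):
        yield ['word{}'.format(i)]

# closing() calls texts.close() when the with-block is left, so the
# generator's cleanup runs even if we break out of the loop early.
with closing(documents()) as texts:
    for count, bow in enumerate(texts, start=1):
        if count >= 20:
            break  # early exit, analogous to the limit argument above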

@jli05
Copy link
Author

jli05 commented Oct 22, 2017

It was too long ago and I'd have to re-download the data if I want to do what you ask. So I trust what you say.

xelez commented Oct 24, 2017

    # gensim 2.0 requires this otherwise the multi-processing locks up

I could reproduce it on 2.0.0.

Then I ran git diff 2.0.0 wikicorpus.py to find out why.

Before e06c7c3#diff-eece52d95c280dabe57c803c95d6bb96,
the exception handling was incorrect and pool.terminate() wasn't always run, so the workers didn't stop and the Python main process kept waiting for them.
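
For anyone hitting this on an older version, the pattern behind that fix is to terminate the pool in a finally block, so the workers are shut down even when the caller stops iterating early. A minimal sketch of that pattern (an illustration with a stand-in square() task, not the actual gensim code):

import multiprocessing

def square(x):
    return x * x

def stream_results(n, processes=2):
    ''' Generator that fans work out to a pool. The finally block guarantees
    pool.terminate() runs when the generator is closed, e.g. when the consumer
    breaks out of its loop and the generator object is garbage-collected. '''
    pool = multiprocessing.Pool(processes)
    try:
        for result in pool.imap(square, range(n)):
            yield result
    finally:
        # Without this, the workers may not stop and the main process can
        # keep waiting for them, as described above.
        pool.terminate()

if __name__ == '__main__':
    for value in stream_results(1000):
        if value > 100:
            break  # early exit, analogous to the -l limit above
    print('done')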

So, this is already fixed now. What do you think, @menshikh-iv?

menshikh-iv commented

@xelez thanks for the investigation.
This works correctly in the current version, so I am closing this issue.
