In [None]:
import concurrent
import concurrent.futures
import os
import shutil
import subprocess
import sys
import time
import traceback

In [None]:
def subprocess_check(*args, **kwargs):
    """Run a command, raise if it fails, and return its combined output.

    If called with a single string argument, the command is run through the
    shell. Otherwise ``args``/``kwargs`` are passed straight to
    ``subprocess.Popen``. stdout and stderr are always captured.

    Returns:
        stderr followed by stdout, decoded as UTF-8. When both are non-empty
        and stderr does not end in a newline, one is inserted between them.

    Raises:
        Exception: if the command exits with a non-zero return code. The
            message includes the return code and the captured stderr/stdout.
    """
    if len(args) == 1 and isinstance(args[0], str):
        kwargs['shell'] = True
    p = subprocess.Popen(*args,
                         stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE,
                         **kwargs)
    # communicate() waits for the process to exit and sets p.returncode.
    out, err = p.communicate()
    ret = p.returncode
    if ret != 0:
        # Decode leniently so a stray non-UTF-8 byte cannot mask the real failure.
        raise Exception(
            ('Call to subprocess_check failed with return code {ret}\n'
             'Standard error:\n{err}\n'
             'Standard out:\n{out}').format(
                 ret=ret,
                 err=err.decode('utf-8', 'replace'),
                 out=out.decode('utf-8', 'replace')))
    # Slice and compare against bytes: on Python 3, err[-1] is an int, so the
    # old `err[-1] != '\n'` was always true and `err += '\n'` raised TypeError.
    if err and out and err[-1:] != b'\n':
        err += b'\n'
    return (err + out).decode('utf-8')

In [None]:
def download_file(url, filename):
    """Download ``url`` to ``filename`` unless ``filename`` already exists.

    The body is written to ``filename + '.tmp'`` and then renamed into place,
    so an interrupted download never leaves a partial file at ``filename``.
    Missing parent directories are created.
    """
    try:
        from urllib2 import urlopen  # Python 2
    except ImportError:
        from urllib.request import urlopen  # Python 3
    if os.path.exists(filename):
        sys.stdout.write('%s already downloaded\n' % filename)
        return
    dirname = os.path.dirname(filename)
    # dirname is '' for a bare filename, and os.makedirs('') would raise.
    if dirname and not os.path.exists(dirname):
        os.makedirs(dirname)
    sys.stdout.write('Downloading %s to %s\n' % (url, filename))
    conn = urlopen(url)
    try:
        # Read until EOF instead of trusting a single read(): apparently
        # partial reads were seen from www2.census.gov (7/2017).
        data = b''.join(iter(lambda: conn.read(1 << 16), b''))
    finally:
        conn.close()
    with open(filename + '.tmp', 'wb') as f:
        f.write(data)
    os.rename(filename + '.tmp', filename)
    sys.stdout.write('Done, wrote %d bytes to %s\n' % (len(data), filename))

def unzip_file(filename):
    """Extract a zip archive into a sibling directory named after it.

    ``foo.zip`` is extracted into ``foo/``. The archive is unpacked into
    ``foo.tmp/`` first and renamed to ``foo/`` only after ``unzip`` exits
    successfully, so an existing ``foo/`` is treated as finished and skipped.

    Returns:
        The path of the extraction directory.
    """
    target_dir = os.path.splitext(filename)[0]
    if not os.path.exists(target_dir):
        staging_dir = target_dir + '.tmp'
        # Throw away leftovers from any earlier interrupted extraction.
        shutil.rmtree(staging_dir, ignore_errors=True)
        sys.stdout.write('Unzipping %s into %s\n' % (filename, staging_dir))
        subprocess_check(['unzip', filename, '-d', staging_dir])
        os.rename(staging_dir, target_dir)
        print('Success, created %s' % target_dir)
    else:
        sys.stdout.write('%s already unzipped\n' % (filename))
    return target_dir
        
def gunzip_file(filename):
    """Decompress a ``.gz`` file next to itself, e.g. ``data.csv.gz`` -> ``data.csv``.

    Does nothing if the destination already exists. Output goes to
    ``<dest>.tmp`` first and is renamed into place only after ``gunzip``
    succeeds, so an existing destination is the result of a completed run.

    Returns:
        The path of the decompressed file.

    Raises:
        subprocess.CalledProcessError: if ``gunzip`` exits non-zero.
        OSError: if ``gunzip`` cannot be executed.
    """
    dest = os.path.splitext(filename)[0]
    if os.path.exists(dest):
        sys.stdout.write('%s already unzipped\n' % (filename))
        return dest
    tmp = dest + '.tmp'
    sys.stdout.write('gunzipping %s\n' % (filename))
    # Pass argv as a list and redirect stdout from Python rather than building
    # a shell string: the old '%s' quoting broke (and allowed shell injection)
    # for any path containing a single quote.
    try:
        with open(tmp, 'wb') as out:
            subprocess.check_call(['gunzip', '-c', '--', filename], stdout=out)
    except Exception:
        # Don't leave a partial .tmp file behind on failure.
        if os.path.exists(tmp):
            os.remove(tmp)
        raise
    os.rename(tmp, dest)
    sys.stdout.write('Success, created %s\n' % (dest))
    return dest

In [None]:
# Executor subclasses whose shutdown() waits for every submitted job, reports
# each job that raised, and then raises if any of them failed.


class _CollectingExecutorMixin(object):
    """Record every submitted future and surface worker exceptions in shutdown().

    List this class before a ``concurrent.futures`` executor class in the
    bases, so its methods wrap that executor's ``submit`` and ``shutdown``.
    """

    def __init__(self, max_workers=None):
        super(_CollectingExecutorMixin, self).__init__(max_workers=max_workers)
        self.futures = []  # every Future returned by submit(), in submission order

    def submit(self, fn, *args, **kwargs):
        """Submit ``fn(*args, **kwargs)`` and remember the returned future."""
        future = super(_CollectingExecutorMixin, self).submit(fn, *args, **kwargs)
        self.futures.append(future)
        return future

    def get_futures(self):
        """Return the list of futures submitted so far (the live list, not a copy)."""
        return self.futures

    def shutdown(self, wait=True, **kwargs):
        """Wait for all submitted jobs, shut the executor down, and return results.

        Accepts the base ``shutdown`` arguments and forwards them to the base
        class. This makes ``with`` blocks work, because ``Executor.__exit__``
        calls ``shutdown(wait=True)``. When ``wait`` is false, it behaves
        exactly like the base executor and returns None. Otherwise, each job
        that raises has its traceback written to stderr as it completes, and
        the remaining jobs are still waited for.

        Returns:
            The jobs' results, in submission order.

        Raises:
            Exception: if any job raised, after all jobs have finished.
        """
        if not wait:
            # Caller explicitly asked not to block: defer to the base executor.
            super(_CollectingExecutorMixin, self).shutdown(wait=wait, **kwargs)
            return None
        name = type(self).__name__
        exception_count = 0
        try:
            for completed in concurrent.futures.as_completed(self.futures):
                try:
                    completed.result()
                except Exception:
                    exception_count += 1
                    sys.stderr.write(
                        ('Exception caught in %s.shutdown.  Continuing until all are finished.\n' % name) +
                        'Exception follows:\n' +
                        traceback.format_exc())
        finally:
            # Release the workers even if waiting was interrupted.
            super(_CollectingExecutorMixin, self).shutdown(wait=wait, **kwargs)
        if exception_count:
            raise Exception('%s failed: %d of %d raised exception' % (name, exception_count, len(self.futures)))
        print('%s succeeded: all %d jobs completed' % (name, len(self.futures)))
        # Every future finished without raising, so result() returns immediately.
        return [future.result() for future in self.futures]


class SimpleThreadPoolExecutor(_CollectingExecutorMixin, concurrent.futures.ThreadPoolExecutor):
    """ThreadPoolExecutor whose shutdown() raises if any submitted job raised."""


class SimpleProcessPoolExecutor(_CollectingExecutorMixin, concurrent.futures.ProcessPoolExecutor):
    """ProcessPoolExecutor whose shutdown() raises if any submitted job raised."""

In [None]:
class Stopwatch:
    """Context manager that reports how long its body took to run.

    On exit, whether the body finished normally or raised, writes
    "<name> took <seconds> seconds" to stdout. Exceptions are not suppressed.

    Example:
        with Stopwatch('Sleeping for half a second') as sw:
            time.sleep(0.5)
        sw.elapsed  # about 0.5
    """

    def __init__(self, name):
        self.name = name      # label used in the timing message
        self.start = None     # time.time() at __enter__
        self.elapsed = None   # seconds spent in the body, set by __exit__

    def __enter__(self):
        self.start = time.time()
        # Return self so `with Stopwatch(...) as sw:` can read sw.elapsed afterwards.
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        self.elapsed = time.time() - self.start
        sys.stdout.write('%s took %.1f seconds\n' % (self.name, self.elapsed))
        return False  # never swallow an exception raised in the body

#with Stopwatch('Sleeping for half a second'):
#    time.sleep(0.5)