# Concurrency

In [1]:
import os
import random

In [None]:
# create many files that we can open later
from os import makedirs
from os.path import join
from random import random

# save data to a file
def save_file(filepath, data):
    """Write the string *data* to *filepath*, replacing any previous content.

    The file is opened in text mode and closed before the function returns.
    """
    with open(filepath, mode='w') as out_file:
        out_file.write(data)

# generate a line of mock data of 10 random data points
def generate_line(n_points=10):
    """Return one CSV line of *n_points* random floats in [0.0, 1.0).

    Args:
        n_points: number of comma-separated values to generate (default 10).

    Returns:
        The values joined with ',' and no trailing newline; '' when
        n_points is 0.
    """
    return ','.join(str(random()) for _ in range(n_points))

# generate file data of 10K lines each with 10 data points
def generate_file_data():
    """Build the text of one mock CSV file.

    Returns 10,000 lines produced by generate_line(), separated by '\n'
    (there is no newline after the last line).
    """
    return '\n'.join(generate_line() for _ in range(10000))

# generate 10K files in a directory
def generate_all_files(path='../../tmp', n_files=10000):
    """Create *n_files* mock CSV files named data-XXXX.csv inside *path*.

    The directory (and any missing parents) is created if needed, and files
    with the same names are overwritten. A progress line is printed per file.

    Args:
        path: target directory (default '../../tmp').
        n_files: number of files to write (default 10000).
    """
    # create the output directory; no error if it already exists
    makedirs(path, exist_ok=True)
    for i in range(n_files):
        # build the contents of one file
        data = generate_file_data()
        # zero-padded index, e.g. data-0042.csv
        filepath = join(path, f'data-{i:04d}.csv')
        save_file(filepath, data)
        print(f'.saved {filepath}')

# entry point, generate all of the files; guarded like the other cells so
# importing this code does not write 10,000 files as a side effect
if __name__ == '__main__':
    generate_all_files()

In [None]:
# load many files sequentially
from os import listdir
from os.path import join

# open a file and return the contents
def load_file(filepath):
    """Read and return the entire text content of *filepath*."""
    with open(filepath, 'r') as source:
        contents = source.read()
    return contents

# load all files in a directory into memory
def load_all_files(path='../../tmp'):
    """Read every entry of *path* one at a time, printing progress for each.

    The loaded contents are not kept; the function only performs the reads.
    Prints 'Done' once every entry has been read.
    """
    for name in listdir(path):
        filepath = join(path, name)
        # read the file; the returned text is discarded
        load_file(filepath)
        print(f'.loaded {filepath}')
    print('Done')


# entry point
if __name__ == '__main__':
    load_all_files()

In [None]:
# load many files concurrently with threads
from os import listdir
from os.path import join
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed

# open a file and return the contents
def load_file(filepath):
    """Return a ``(contents, filepath)`` tuple for the text file *filepath*.

    Echoing the path back lets callers identify results that complete out
    of order.
    """
    with open(filepath, 'r') as source:
        text = source.read()
    return text, filepath

# load all files in a directory into memory
def main(path='../../tmp'):
    """Read every file in *path* using a pool of 10 threads.

    One task per file is submitted. Progress is printed in completion order
    (not directory order), followed by 'Done'.
    """
    paths = [join(path, name) for name in listdir(path)]
    with ThreadPoolExecutor(10) as pool:
        pending = [pool.submit(load_file, filepath) for filepath in paths]
        for done in as_completed(pending):
            # each task yields (contents, filepath); contents are not kept
            _, loaded_path = done.result()
            print(f'.loaded {loaded_path}')
    print('Done')


# entry point
if __name__ == '__main__':
    main()

In [None]:
# load many files concurrently with threads in batch
from os import listdir
from os.path import join
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed

# open a file and return the contents
def load_files(filepaths):
    """Read each file in *filepaths*, in order.

    Returns:
        A tuple ``(contents, filepaths)``. ``contents`` lists the file texts
        in the same order as *filepaths*, and *filepaths* is the very object
        that was passed in.
    """
    def _read(filepath):
        with open(filepath, 'r') as source:
            return source.read()

    return [_read(filepath) for filepath in filepaths], filepaths

# load all files in a directory into memory
def main(path='../../tmp'):
    """Read every file in *path* with 100 threads, each loading a batch of files.

    The directory listing is split into contiguous chunks, at most one per
    worker, and each chunk is submitted as one load_files() task. Progress is
    printed per file as each batch completes, followed by 'Done'.
    """
    paths = [join(path, filepath) for filepath in listdir(path)]
    n_workers = 100
    # ceil(len(paths) / n_workers) keeps the chunk count <= n_workers; the
    # floor of 1 matters because range() rejects a step of 0, which the old
    # round()-based value produced for an empty or small (< ~51 files) directory
    chunksize = max(1, -(-len(paths) // n_workers))
    with ThreadPoolExecutor(n_workers) as executor:
        futures = [executor.submit(load_files, paths[i:i + chunksize])
                   for i in range(0, len(paths), chunksize)]
        for future in as_completed(futures):
            # the file contents are discarded; only the paths are reported
            _, filepaths = future.result()
            for filepath in filepaths:
                print(f'.loaded {filepath}')
    print('Done')


# entry point
if __name__ == '__main__':
    main()

In [None]:
# load many files concurrently with processes
from os import listdir
from os.path import join
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import as_completed

# open a file and return the contents
def load_file(filepath):
    """Return ``(contents, filepath)`` for one text file.

    Must stay at module level: ProcessPoolExecutor pickles the callable by
    reference. The path is echoed back so results arriving out of order via
    as_completed() can be identified.
    """
    with open(filepath) as stream:
        contents = stream.read()
    return (contents, filepath)

# load all files in a directory into memory
def main(path='../../tmp'):
    """Read every file in *path* using a pool of 4 worker processes.

    One task per file is submitted; progress is printed as each task
    finishes, followed by 'Done'.
    """
    names = listdir(path)
    paths = [join(path, name) for name in names]
    with ProcessPoolExecutor(4) as executor:
        jobs = [executor.submit(load_file, filepath) for filepath in paths]
        for job in as_completed(jobs):
            # the contents travel back from the worker but are not kept
            _, filepath = job.result()
            print(f'.loaded {filepath}')
    print('Done')


# entry point
if __name__ == '__main__':
    main()

In [None]:
# load many files concurrently with processes in batch
from os import listdir
from os.path import join
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import as_completed

# open a file and return the contents
def load_files(filepaths):
    """Read the files in *filepaths* sequentially.

    Returns:
        ``(data_list, filepaths)``. ``data_list`` holds each file's text in
        input order, and *filepaths* is returned unchanged so the caller can
        report which batch finished.
    """
    def read_one(filepath):
        with open(filepath, 'r') as source:
            return source.read()

    data_list = list(map(read_one, filepaths))
    return (data_list, filepaths)

# load all files in a directory into memory
def main(path='../../tmp'):
    """Read every file in *path* with 4 processes, 20 files per task.

    The listing is cut into contiguous batches of up to 20 paths and each
    batch is one load_files() task. Progress is printed per file as each
    batch completes, followed by 'Done'.
    """
    paths = [join(path, name) for name in listdir(path)]
    # fixed batch size: each task loads up to this many files
    chunksize = 20
    with ProcessPoolExecutor(4) as executor:
        futures = []
        for start in range(0, len(paths), chunksize):
            batch = paths[start:start + chunksize]
            futures.append(executor.submit(load_files, batch))
        for future in as_completed(futures):
            # only the paths are reported; the loaded texts are dropped
            _, filepaths = future.result()
            for filepath in filepaths:
                print(f'.loaded {filepath}')
    print('Done')


# entry point
if __name__ == '__main__':
    main()

In [None]:
# load many files concurrently with processes and threads in batch
from os import listdir
from os.path import join
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed

# load a file and return the contents
def load_file(filepath):
    """Read and return the entire text content of *filepath*."""
    with open(filepath, 'r') as handle:
        # return the data (not just read it) so load_files() collects the
        # file contents instead of a list of None values
        return handle.read()

# return the contents of many files
def load_files(filepaths):
    """Load *filepaths* concurrently using one thread per file.

    Args:
        filepaths: list of paths to read.

    Returns:
        ``(data_list, filepaths)``. ``data_list[k]`` is what
        ``load_file(filepaths[k])`` returned; results are gathered in input
        order, not completion order.
    """
    # ThreadPoolExecutor raises ValueError for max_workers=0, so an empty
    # batch still gets one (idle) worker
    with ThreadPoolExecutor(max(1, len(filepaths))) as exe:
        futures = [exe.submit(load_file, name) for name in filepaths]
        data_list = [future.result() for future in futures]
    return (data_list, filepaths)

# load all files in a directory into memory
def main(path='../../tmp'):
    """Read every file in *path* using 8 processes, each loading its batch
    of files with a thread pool (see load_files).

    The listing is split into at most 8 contiguous chunks, one task per
    chunk. Progress is printed per file as each chunk completes, then 'Done'.
    """
    paths = [join(path, filepath) for filepath in listdir(path)]
    n_workers = 8
    # ceil(len(paths) / n_workers), but never 0: range() rejects a step of 0,
    # which the old round()-based value produced for an empty directory or
    # one with only a handful of files
    chunksize = max(1, -(-len(paths) // n_workers))
    with ProcessPoolExecutor(n_workers) as executor:
        futures = list()
        for i in range(0, len(paths), chunksize):
            filepaths = paths[i:(i + chunksize)]
            futures.append(executor.submit(load_files, filepaths))
        for future in as_completed(futures):
            # the contents are discarded; only the paths are reported
            _, filepaths = future.result()
            for filepath in filepaths:
                print(f'.loaded {filepath}')
    print('Done')


# entry point
if __name__ == '__main__':
    main()

In [None]:
# load many files concurrently with asyncio
from os import listdir
from os.path import join
import asyncio
import aiofiles

# open a file and return the contents
async def load_file(filepath):
    """Asynchronously read *filepath* with aiofiles; return ``(text, filepath)``."""
    async with aiofiles.open(filepath, 'r') as source:
        text = await source.read()
    return (text, filepath)

# load all files in a directory into memory
async def main(path='../../tmp'):
    """Read every file in *path* concurrently with one coroutine per file.

    Progress is printed in completion order, followed by 'Done'. The
    number of simultaneously open files is not limited here.
    """
    paths = [join(path, name) for name in listdir(path)]
    coros = [load_file(filepath) for filepath in paths]
    for next_done in asyncio.as_completed(coros):
        # each coroutine yields (contents, filepath); contents are not kept
        _, loaded_path = await next_done
        print(f'.loaded {loaded_path}')
    print('Done')


# entry point
if __name__ == '__main__':
    asyncio.run(main())

In [None]:
# load many files concurrently with asyncio
from os import listdir
from os.path import join
import asyncio
import aiofiles

# open a file and return the contents
async def load_file(filepath, semaphore):
    """Read *filepath* with aiofiles while holding *semaphore*.

    The semaphore bounds how many files are open at once. The file is closed
    and the semaphore released before ``(text, filepath)`` is returned.
    """
    async with semaphore, aiofiles.open(filepath, 'r') as source:
        text = await source.read()
    return (text, filepath)

# load all files in a directory into memory
async def main(path='../../tmp'):
    """Read every file in *path* concurrently, at most 100 open at a time.

    A shared asyncio.Semaphore(100) is passed to each load_file() coroutine.
    Progress is printed in completion order, followed by 'Done'.
    """
    paths = [join(path, name) for name in listdir(path)]
    # caps the number of coroutines inside load_file's critical section
    limiter = asyncio.Semaphore(100)
    coros = [load_file(filepath, limiter) for filepath in paths]
    for next_done in asyncio.as_completed(coros):
        _, loaded_path = await next_done
        print(f'.loaded {loaded_path}')
    print('Done')


# entry point
if __name__ == '__main__':
    asyncio.run(main())

In [None]:
# load many files concurrently with asyncio in batch
from os import listdir
from os.path import join
import asyncio
import aiofiles

# load and return the contents of a list of file paths
async def load_files(filepaths):
    """Read each path in *filepaths* one after another with aiofiles.

    Returns ``(data_list, filepaths)`` with the texts in input order and the
    original *filepaths* object.
    """
    async def _read(filepath):
        async with aiofiles.open(filepath, 'r') as source:
            return await source.read()

    data_list = [await _read(filepath) for filepath in filepaths]
    return (data_list, filepaths)

# load all files in a directory into memory
async def main(path='../../tmp'):
    """Read every file in *path* with asyncio, one coroutine per batch of 10.

    Each batch is read sequentially by load_files(); batches run
    concurrently, and progress is printed per file as each batch completes,
    followed by 'Done'.
    """
    paths = [join(path, filepath) for filepath in listdir(path)]
    chunksize = 10
    tasks = list()
    for i in range(0, len(paths), chunksize):
        filepaths = paths[i:(i + chunksize)]
        # use this cell's batch loader (load_files, which takes a list of
        # paths), not the single-file load_file defined in earlier cells
        tasks.append(load_files(filepaths))
    for task in asyncio.as_completed(tasks):
        _, filepaths = await task
        for filepath in filepaths:
            print(f'.loaded {filepath}')
    print('Done')


# entry point
if __name__ == '__main__':
    asyncio.run(main())

In [None]:
# split the directory listing into num_workers interleaved groups:
# group k holds paths[k], paths[k + num_workers], paths[k + 2*num_workers], ...
path = '../../tmp'
paths = listdir(path)
num_workers = 100
# a single comprehension builds every group; no thread pool is needed just
# to partition the names, and no work is submitted in this cell
filepaths = [paths[i::num_workers] for i in range(num_workers)]

In [3]:
path='../../tmp'
paths = os.listdir(path)
# number of interleaved groups to split the file names into
num_chunks = 30
# group k holds paths[k], paths[k + num_chunks], ... (a strided split, so
# group sizes differ by at most one); no thread pool is created here
filepaths = [paths[i::num_chunks] for i in range(num_chunks)]

In [12]:
path='../../tmp'
paths = os.listdir(path)
# maximum number of file names per chunk
chunk_size = 3000
# contiguous chunks of chunk_size names; the last chunk holds the remainder
# (10000 files -> 3000, 3000, 3000, 1000); no thread pool is created here
filepaths = [paths[i:i+chunk_size] for i in range(0, len(paths), chunk_size)]

In [16]:
# show how many file names ended up in each chunk, one size per line
for size in map(len, filepaths):
    print(size)

3000
3000
3000
1000


In [15]:
# every unordered pair of chunks, each chunk also paired with itself:
# [[c0, c0], [c0, c1], ..., [c1, c1], [c1, c2], ...]
chunk_pairs = [[first, second]
               for start, first in enumerate(filepaths)
               for second in filepaths[start:]]

In [17]:
# every chunk pair should contain exactly two chunks
for size in map(len, chunk_pairs):
    print(size)

2
2
2
2
2
2
2
2
2
2


In [20]:
# 220,000 mock ids split into contiguous chunks of at most 40,000 ids
pmids = list(range(220_000))
chunk_size = 40_000
chunks = [pmids[start:start + chunk_size]
          for start in range(0, len(pmids), chunk_size)]
# every unordered pair of chunks, each chunk also paired with itself
chunk_pairs = [[left, right]
               for k, left in enumerate(chunks)
               for right in chunks[k:]]

In [21]:
# number of unordered pairs of distinct ids: C(N, 2) = N * (N - 1) / 2.
# Gives the same count as a double loop over all i < j, without ~24 billion
# Python-level iterations.
n = len(pmids) * (len(pmids) - 1) // 2
print(n)

24199890000


In [23]:
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed

def make_pairs(chunk_pair):
    """Count the pairs (i, j) with i in chunk_pair[0], j in chunk_pair[1], i < j.

    Args:
        chunk_pair: a sequence whose first two items are the chunks to pair;
            the second chunk is iterated once per element of the first, so it
            must be re-iterable (e.g. a list).

    Returns:
        The number of such pairs (0 if either chunk is empty).
    """
    first, second = chunk_pair[0], chunk_pair[1]
    # O(len(first) * len(second)) comparisons; no per-pair list is built
    # because only the count is returned
    return sum(1 for i in first for j in second if i < j)

# load all files in a directory into memory
def main(chunk_pairs):
    """Count id pairs across all chunk pairs using a thread pool.

    One make_pairs() task is submitted per chunk pair, and the per-task
    counts are summed as they complete.

    Args:
        chunk_pairs: list of [chunk_a, chunk_b] items accepted by make_pairs().

    Returns:
        The total count, which is also printed as 'Done <total>'.
    """
    total_pairs = 0
    # ThreadPoolExecutor raises ValueError for max_workers=0, so keep at least
    # one worker when chunk_pairs is empty.
    # NOTE: make_pairs is pure-Python CPU work, so on a standard (GIL) build
    # these threads largely run one at a time; a ProcessPoolExecutor would be
    # needed for real parallel speed-up.
    with ThreadPoolExecutor(max(1, len(chunk_pairs))) as executor:
        futures = [executor.submit(make_pairs, p) for p in chunk_pairs]
        for future in as_completed(futures):
            total_pairs += future.result()
    print('Done', total_pairs)
    return total_pairs


# entry point
if __name__ == '__main__':
    main(chunk_pairs)

Done 24199890000
