[BUG] cannot create blazingcontext for async dask_cudf clients #1272

Open
lmeyerov opened this issue Dec 29, 2020 · 5 comments
Labels: ? - Needs Triage, bug

lmeyerov commented Dec 29, 2020

Describe the bug

Passing in an asynchronous=True dask_cudf client to BlazingContext() throws an exception

This is unfortunate as:

  • async support is good for software (apps, dashboards, ...): we're using dask_cudf + async clients to make the RAPIDS stack less of a bottleneck, so blocking bsql calls break this benefit

  • memory waste: for single-node (incl. multi-GPU), this means having to create two LocalCUDAClusters: async clients need async clusters, and sync clients need sync clusters (AFAICT!)

Steps/Code to reproduce bug

import pandas as pd, pytest
pytestmark = pytest.mark.asyncio

@pytest.mark.timeout(30)
def test_blazing_sync_client():
    import cudf
    from blazingsql import BlazingContext
    from dask.distributed import Client
    from dask_cuda import LocalCUDACluster
    with LocalCUDACluster(
            asynchronous = False,
            dashboard_address = None, #Otherwise > raise TemplateNotFound(template) jinja2.exceptions.TemplateNotFound: doc_js.js
            processes = True, #multigpu
        ) as cluster:
        with Client(address=cluster, asynchronous=False) as client:
            bc = BlazingContext(client)
            pdf = pd.DataFrame({'a': [0, 1, 2, 3],'b': [1, 2, 2, 3]})
            gdf = cudf.DataFrame.from_pandas(pdf)
            bc.create_table('bg_test_dot_product', gdf)
            dgdf2 = bc.sql('SELECT SUM(bg_test_dot_product.a) AS my_sum FROM bg_test_dot_product')
            gdf2 = dgdf2.compute()
            assert 6 == gdf2['my_sum'][0]


@pytest.mark.timeout(30)
async def test_blazing_async_client():
    import cudf
    from blazingsql import BlazingContext
    from dask.distributed import Client
    from dask_cuda import LocalCUDACluster
    async with await LocalCUDACluster(
            asynchronous = True,
            dashboard_address = None, #Otherwise > raise TemplateNotFound(template) jinja2.exceptions.TemplateNotFound: doc_js.js
            processes = True, #multigpu
        ) as cluster:
        async with await Client(address=cluster, asynchronous=True) as client:
            bc = BlazingContext(client)
            pdf = pd.DataFrame({'a': [0, 1, 2, 3],'b': [1, 2, 2, 3]})
            gdf = cudf.DataFrame.from_pandas(pdf)
            bc.create_table('bg_test_dot_product', gdf)
            dgdf2 = bc.sql('SELECT SUM(bg_test_dot_product.a) AS my_sum FROM bg_test_dot_product')
            gdf2 = dgdf2.compute()
            assert 6 == gdf2['my_sum'][0]

The second test fails with:


test/server/test_blazing.py:66: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
/conda/envs/rapids/lib/python3.7/site-packages/pyblazing/apiv2/context.py:1418: in __init__
    distributed_initialize_server_directory(self.dask_client, logging_dir_path)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

client = <Client: not connected>, dir_path = 'blazing_log'

    def distributed_initialize_server_directory(client, dir_path):
    
        # We are going to differentiate the two cases. When path is absolute,
        # we do the logging folder creation only once per host (server).
        # When path is relative, we have to group the workers according
        # to whether they have the same current working directory,
        # so, a unique folder will be created for each sub common cwd set.
    
        all_items = client.scheduler_info()["workers"].items()
    
        is_absolute_path = os.path.isabs(dir_path)
    
        import re
    
        if is_absolute_path:
            # Let's group the workers by host_name
            host_worker_dict = {}
            for worker, worker_info in all_items:
                host_name = re.findall(r"[0-9]+(?:\.[0-9]+){3}", worker)[0]
                if host_name not in host_worker_dict.keys():
                    host_worker_dict[host_name] = [worker]
                else:
                    host_worker_dict[host_name].append(worker)
    
            dask_futures = []
            for host_name, worker_list in host_worker_dict.items():
                dask_futures.append(
                    client.submit(
                        initialize_server_directory,
                        dir_path,
                        True,
                        workers=[worker_list[0]],
                        pure=False,
                    )
                )
    
            for connection in dask_futures:
                made_dir = connection.result()
                if not made_dir:
                    get_blazing_logger(is_dask=False).info("Directory already exists")
        else:
            # Let's get the current working directory of all workers
            dask_futures = []
            for worker, worker_info in all_items:
                dask_futures.append(
                    client.submit(get_current_directory_path, workers=[worker], pure=False)
                )
    
            current_working_dirs = client.gather(dask_futures)
    
            # Let's group the workers by host_name and by common cwd
            host_worker_dict = {}
>           for worker_key, cwd in zip(all_items, current_working_dirs):
E           TypeError: zip argument #2 must support iteration

/conda/envs/rapids/lib/python3.7/site-packages/pyblazing/apiv2/context.py:726: TypeError
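
My read of the traceback (a guess, not confirmed): with an asynchronous=True client, Client.gather returns a coroutine rather than a list, so the zip over current_working_dirs blows up. A minimal sketch of that behavior difference, assuming a small in-process cluster:

import asyncio
from dask.distributed import Client

async def main():
    # Assumption for illustration: an in-process async client/cluster
    async with Client(asynchronous=True, processes=False) as client:
        futures = [client.submit(lambda x: x + 1, i, pure=False) for i in range(3)]
        pending = client.gather(futures)   # on an async client this is a coroutine...
        results = await pending            # ...that must be awaited to get the list
        # zip(range(3), pending) would raise:
        #   TypeError: zip argument #2 must support iteration
        return results

asyncio.run(main())  # -> [1, 2, 3]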

Expected behavior

Both tests pass

Environment overview (please complete the following information)

Docker w/ CUDA 10.2 -> conda -> RAPIDS 0.17

----For BlazingSQL Developers----
Suspected source of the issue
Where and what are potential sources of the issue

Other design considerations
What components of the engine could be affected by this?


lmeyerov commented Dec 29, 2020

I'm wondering if there's a ~cheap workaround here, like:

  • async def proxiedLocalCUDAClusterClient() -> client:
    • returns an async dask client to an async dask cluster w/ 1 worker, in Process mode
    • side effect of creating a sync cuda cluster + sync cuda client on that worker
  • proxy stubs for conveniently calling BlazingContext(), create_table, sql, (anything else?) on the worker
    • dgdf = await client.run( bcProxy.sql(...) )

Or, if it's not hard, get the context creator to use an async client. (Though maybe a sync client is a deep assumption in the bsql Python layer, in which case the stub approach is safer? Rough sketch of the stub idea below.)
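
A rough sketch of what those stubs could look like (everything here is hypothetical -- the helper names, the module-level _worker_state dict, and the nested worker-side cluster are made up for illustration, not BlazingSQL API):

from dask.distributed import Client
from dask_cuda import LocalCUDACluster

_worker_state = {}  # lives in the single proxy worker's process

def _init_bc_on_worker():
    # Runs inside the proxy worker: sync cluster + sync client + BlazingContext,
    # which is exactly what BlazingContext expects today
    from blazingsql import BlazingContext
    cluster = LocalCUDACluster(asynchronous=False)
    client = Client(cluster, asynchronous=False)
    _worker_state["bc"] = BlazingContext(client)

def _sql_on_worker(query):
    # Runs inside the proxy worker: blocking bc.sql, returns a computed cudf.DataFrame
    return _worker_state["bc"].sql(query).compute()

# One-time setup from async app code, e.g.: await async_client.run(_init_bc_on_worker)

async def proxied_sql(async_client, query):
    # Called from async app code; only the proxy worker blocks
    future = async_client.submit(_sql_on_worker, query, pure=False)
    return await async_client.gather(future)

(As noted in the next comment, this still pays for two LocalCUDAClusters, so it's a sketch of the idea rather than a recommendation.)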


lmeyerov commented Dec 29, 2020

Thinking a bit more: the proxy isn't that great because it still has the issue of two LocalCUDAClusters.

For our use case, we're moving to starting a separate CUDA cluster process (same node) so multiple processes can hit it. I think the proxy solution will work then: async dask client -> sync in-worker dask client -> 'remote' cluster.

So we'll do sync for now, and once our cluster service is up, revisit some sort of async proxy. Our bsql calls are fairly thin -- async def run_query(query: str, table: Dict[str, Union[cudf.DataFrame, dask_cudf.DataFrame]]) -> Union[cudf.DataFrame, dask_cudf.DataFrame] -- so I'm optimistic.

lmeyerov commented

@felipeblazing The return_futures=True was just what I needed. AFAICT this makes the blazingsql calls non-blocking -- functional tests seem to pass, but I haven't benchmarked:

import logging
from typing import Dict, Union

import cudf
import dask
import dask.dataframe
import dask_cudf
from blazingsql import BlazingContext
from dask.distributed import Client

logger = logging.getLogger(__name__)


async def collect_bsql_futures(sync_client: Client, bc: BlazingContext, dask_futures : list, as_gdf = False) -> Union[cudf.DataFrame, dask_cudf.DataFrame]:
    """
        :param sync_client: dask.distributed.Client with asynchronous=False
        :param bc: BlazingContext with dask_cudf cluster
        :param dask_futures: Result of a bc.sql(..., return_futures=True) call
        :param as_gdf: Whether to return a dask_cudf.DataFrame or a cudf.DataFrame

        Async call of bsql, returning the original dgdf, or optionally, post-processed to a gdf

        May also need 'dask.config.set(scheduler=gpu_client_sync)' calls

        Examples:
            dgdf = await collect_bsql_futures(sync_client, bc, bc.sql(qry, return_futures=True))
            gdf = await collect_bsql_futures(sync_client, bc, bc.sql(qry, return_futures=True), as_gdf = True)
    """
    # NOTE: pop_worker_query_part (used below) is assumed to be available elsewhere
    # in the app; it is not defined in this snippet
    from pyblazing.apiv2.context import distributed_remove_orc_files_from_disk

    logger.info('@collect_bsql_futures')
    #dask.config.set(scheduler=client)
    try:
        meta_results : list = await sync_client.gather(dask_futures, asynchronous=True)
        #meta_results : list = sync_client.gather(dask_futures)
        logger.debug('meta :: %s = %s', type(meta_results), meta_results)
    except Exception as e:
        logger.error('exn running query, cleaning up old orc file cache', exc_info=True)
        try:
            #FIXME somehow get txn id and plug in here?
            distributed_remove_orc_files_from_disk(sync_client, bc.cache_dir_path)
            logger.debug('... Cleaned up')
        except Exception as e2:
            logger.error('Failed cleanup of failed bsql query', exc_info=True)
        raise e

    futures : list = []
    for query_partids, meta, worker_id in meta_results:
        logger.info('meta results meta list item: %s', meta)
        for query_partid in query_partids:
            futures.append(sync_client.submit(
                pop_worker_query_part,
                query_partid,
                workers=[worker_id],
                pure=False))
    logger.debug('collected futures: %s', futures)
    result : dask_cudf.DataFrame = dask.dataframe.from_delayed(futures, meta=meta)
    logger.info('from_delayed result :: %s = %s', type(result), result)
    if as_gdf:
        #gdf2 = result.compute()
        [gdf2] = await sync_client.gather([result.compute(compute=False)], asynchronous=True )
        logger.debug('gdf2::%s', type(gdf2))
        logger.info('////collect_bsql_futures gdf')
        return gdf2
    else:
        logger.info('////collect_bsql_futures dgdf')
        return result

Passing tests:

@pytest.mark.timeout(30)
def test_bsql_sync_warmup(bc, gpu_client_sync):
    assert True

@pytest.mark.timeout(30)
def test_bsql_async_warmup(gpu_client):
    assert True

@pytest.mark.timeout(30)
def test_bsql_default(bc, gpu_client_sync):
    dask.config.set(scheduler=gpu_client_sync)
    dgdf = bc.sql('SELECT SUM(bc_sample.a) AS my_sum FROM bc_sample')
    gdf = dgdf.compute()
    assert gdf['my_sum'][0] == 6 

@pytest.mark.timeout(30)
async def test_collect_bsql_futures_dgdf(bc, gpu_client_sync, gpu_client):
    logger.debug('@test_collect_bsql_futures_dgdf')
    dask.config.set(scheduler=gpu_client_sync)
    dask_futures = bc.sql('SELECT SUM(bc_sample.a) AS my_sum FROM bc_sample', return_futures=True)
    logger.info('got futures: %s', dask_futures)
    dgdf2 = await collect_bsql_futures(gpu_client_sync, bc, dask_futures)
    logger.info('collected dgdf2: %s', dgdf2)
    gdf2 = dgdf2.compute()
    assert gdf2['my_sum'][0] == 6

@pytest.mark.timeout(30)
async def test_collect_bsql_futures_gdf(bc, gpu_client_sync, gpu_cluster):
    from server.util.dask_cudf import dask_cudf_init_helper_sync, make_cluster, UserClient
    async with await UserClient(f'async_{__name__}', allow_reset=False, address=gpu_cluster) as gpu_client:
        dask.config.set(scheduler=gpu_client_sync)
        
        dask_futures = bc.sql('SELECT SUM(bc_sample.a) AS my_sum FROM bc_sample', return_futures=True)
        gdf2 = await collect_bsql_futures(gpu_client_sync, bc, dask_futures, as_gdf=True)        
        assert gdf2['my_sum'][0] == 6

wmalpica commented

@lmeyerov I don't think we will be able to support asynchronous=True dask_cudf clients for a while. It's very non-trivial.

Additionally, a recent PR #1289 has gotten rid of return_futures. Instead, we will very soon be implementing a more native way to run concurrent queries using the same BlazingContext:
#1290
Feel free to comment on the proposed APIs. We will be tackling this soon.


lmeyerov commented Jan 23, 2021

Thanks! For our use case, here's where we're currently at:

  • we're using a sync bsql client to a remote dask cluster
    • ... so async dask_cudf + sync bsql clients can now share GPU memory! just gotta publish, similar to the above
  • wrapping concurrent bsql calls in dask.distributed.Lock <- solved in RAPIDS 0.18 (rough sketch after this list)
  • bsql calls are still blocking :( not a deal breaker b/c we have 4+ Python processes per GPU. I'd think it wouldn't be so bad to solve for the remote dask worker case, but I didn't understand the sql entrypoint code well enough for the fix to be obvious to me.
    • interestingly, the dask.distributed.Lock at least gives async behavior for the queued-but-not-actively-processing bsql clients, b/c lock waits are async!
  • bsql cannot handle bigger-than-memory dask_cudf dataframes <- imo, more important than fixing the blocking issue, b/c we can at least work around sync via threads/processes, but not bigger-than-memory
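
A minimal sketch of that Lock wrapper (assumptions: bc is a shared BlazingContext, the default dask client is the asynchronous one, and the lock name is made up; the bc.sql call itself still blocks once the lock is held, per the bullet above):

from dask.distributed import Lock

async def run_query_serialized(bc, query: str):
    # Waiting on the cluster-wide named lock is async, so queued callers don't
    # block the event loop; only the holder blocks while bc.sql runs
    async with Lock("bsql-query-lock"):
        return bc.sql(query)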

FWIW, I'm about to get back to large dataset testing w/ bsql b/c of our big file loader feature + customer prove-outs. I'm guessing more important will be stuff like making sure groupbys work, and ideally, that we can get big datasets back as part of a pipeline :)
