
Inconsistent behaviour when using map_partitions with DataFrames as arguments #3973

avnovikov opened this issue Sep 11, 2018 · 2 comments

@avnovikov

When using map_partitions and passing a GeoDataFrame as an argument, I run into inconsistent behaviour of dask when the dask DataFrame is created from delayed objects. Passing the raw GeoDataFrame throws an error about unknown divisions, while first scattering it and only then passing it as an argument makes everything work as (un)expected. Sorry for the long description, but I spent quite a long time trying to figure out what went wrong after the dask upgrade (from 0.17).

import pandas as pd
import numpy as np

import geopandas as gpd
from dask import dataframe as dd
from distributed import Client
from shapely.geometry import Point
from dask import delayed

def find_fids(df, gdf_harbours):   #, gdf_anchorage, gdf_faculty, gdf_berths, gdf_buoy):
    print("This is partition with {} points".format(len(df.index)))
    m_df = df[['lat', 'lon', 'port_id', 'anch_id', 'fac_id', 'berth_id', 'boy_id']]
    m_df = m_df.reset_index()
    m_df.loc[:,'port_id'] = 1
    m_df = m_df.set_index('index')
    print("Finished partition")
    return m_df[['port_id', 'anch_id', 'fac_id', 'berth_id', 'boy_id']]

def rnp(length):
    data_pos = (np.random.rand(length,2)-0.5)*90
    data_id = -np.ones((length,5))
    data = np.concatenate((data_pos, data_id), axis=1)
    return pd.DataFrame(data = data, columns=['lat', 'lon', 'port_id', 'anch_id', 'fac_id', 'berth_id', 'boy_id'])

def get_fdf():
    lazy_dataframes=[]
    for i in range(10000):
        ndf = delayed(rnp)(1000)
        lazy_dataframes.append(ndf)
    ddf = dd.from_delayed(lazy_dataframes, meta=lazy_dataframes[0].compute())
    ldf = ddf.copy()
    ddf = ddf.append(ldf)
    return ddf

client = Client('127.0.0.1:8786', processes=True)

length = 10000
data_pos = (np.random.rand(length,2)-0.5)*90
z = list(map(Point, data_pos))
harbours = gpd.GeoDataFrame({'116': list(np.ones((length, 1))), 'geometry': z}, crs={'init': 'epsg:4326'})

sharbours = client.scatter(harbours)
ddf = client.persist(get_fdf())
ddf[['port_id', 'anch_id', 'fac_id', 'berth_id', 'boy_id']] = ddf.map_partitions(
            find_fids, sharbours, meta=pd.DataFrame({'port_id':[-1], 'anch_id':[-1], 'fac_id':[-1], 
                                                                'berth_id':[-1], 'boy_id':[-1]}))
df = ddf.compute()

This works fine. However, if the last part is instead

ddf = client.persist(get_fdf())
ddf[['port_id', 'anch_id', 'fac_id', 'berth_id', 'boy_id']] = ddf.map_partitions(
            find_fids, harbours, meta=pd.DataFrame({'port_id':[-1], 'anch_id':[-1], 'fac_id':[-1], 
                                                                'berth_id':[-1], 'boy_id':[-1]}))
df = ddf.compute()

then I receive the following error

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-30-5088b0035b38> in <module>()
      1 ddf[['port_id', 'anch_id', 'fac_id', 'berth_id', 'boy_id']] = ddf.map_partitions(
      2             find_fids, harbours, meta=pd.DataFrame({'port_id':[-1], 'anch_id':[-1], 'fac_id':[-1], 
----> 3                                                                 'berth_id':[-1], 'boy_id':[-1]}))
      4 df = ddf.compute()

/anaconda3/envs/mariquant/lib/python3.6/site-packages/dask/dataframe/core.py in map_partitions(self, func, *args, **kwargs)
    579         >>> ddf.map_partitions(func).clear_divisions()  # doctest: +SKIP
    580         """
--> 581         return map_partitions(func, self, *args, **kwargs)
    582 
    583     @insert_meta_param_description(pad=12)

/anaconda3/envs/mariquant/lib/python3.6/site-packages/dask/dataframe/core.py in map_partitions(func, *args, **kwargs)
   3614     from .multi import _maybe_align_partitions
   3615     args = _maybe_from_pandas(args)
-> 3616     args = _maybe_align_partitions(args)
   3617 
   3618     if meta is no_default:

/anaconda3/envs/mariquant/lib/python3.6/site-packages/dask/dataframe/multi.py in _maybe_align_partitions(args)
    145     divisions = dfs[0].divisions
    146     if not all(df.divisions == divisions for df in dfs):
--> 147         dfs2 = iter(align_partitions(*dfs)[0])
    148         return [a if not isinstance(a, _Frame) else next(dfs2) for a in args]
    149     return args

/anaconda3/envs/mariquant/lib/python3.6/site-packages/dask/dataframe/multi.py in align_partitions(*dfs)
    101         raise ValueError("dfs contains no DataFrame and Series")
    102     if not all(df.known_divisions for df in dfs1):
--> 103         raise ValueError("Not all divisions are known, can't align "
    104                          "partitions. Please use `set_index` "
    105                          "to set the index.")

ValueError: Not all divisions are known, can't align partitions. Please use `set_index` to set the index.
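
For context, a quick check (a minimal sketch, assuming the same ddf built with get_fdf() above): dataframes created with dd.from_delayed have unknown divisions unless divisions are passed explicitly, and that is what the alignment step rejects when a plain pandas/GeoPandas object is passed positionally. The scattered version presumably avoids this because a Future is not treated as a dataframe-like argument, so no alignment is attempted.

# Minimal check, assuming `ddf` is the persisted frame from get_fdf() above
print(ddf.known_divisions)  # False for dd.from_delayed without explicit divisions
print(ddf.divisions)        # a tuple of None values, one per partition boundary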

If I try to set the index by adding the following line

ddf = client.persist(ddf.reset_index().set_index('index'))

it takes forever, and when killed it gives the following trace

---------------------------------------------------------------------------
KeyboardInterrupt                         Traceback (most recent call last)
<ipython-input-12-2911203558a9> in <module>()
----> 1 ddf = client.persist(ddf.reset_index().set_index('index'))

/anaconda3/envs/mariquant/lib/python3.6/site-packages/distributed/client.py in persist(self, collections, optimize_graph, workers, allow_other_workers, resources, retries, priority, fifo_timeout, actors, **kwargs)
   2469                                          user_priority=priority,
   2470                                          fifo_timeout=fifo_timeout,
-> 2471                                          actors=actors)
   2472 
   2473         postpersists = [c.__dask_postpersist__() for c in collections]

/anaconda3/envs/mariquant/lib/python3.6/site-packages/distributed/client.py in _graph_to_futures(self, dsk, keys, restrictions, loose_restrictions, priority, user_priority, resources, retries, fifo_timeout, actors)
   2109                 dsk = dask.optimization.inline(dsk, keys=values)
   2110 
-> 2111             d = {k: unpack_remotedata(v, byte_keys=True) for k, v in dsk.items()}
   2112             extra_futures = set.union(*[v[1] for v in d.values()]) if d else set()
   2113             extra_keys = {tokey(future.key) for future in extra_futures}

/anaconda3/envs/mariquant/lib/python3.6/site-packages/distributed/client.py in <dictcomp>(.0)
   2109                 dsk = dask.optimization.inline(dsk, keys=values)
   2110 
-> 2111             d = {k: unpack_remotedata(v, byte_keys=True) for k, v in dsk.items()}
   2112             extra_futures = set.union(*[v[1] for v in d.values()]) if d else set()
   2113             extra_keys = {tokey(future.key) for future in extra_futures}

/anaconda3/envs/mariquant/lib/python3.6/site-packages/distributed/utils_comm.py in unpack_remotedata(o, byte_keys, myset)
    173     if myset is None:
    174         myset = set()
--> 175         out = unpack_remotedata(o, byte_keys, myset)
    176         return out, myset
    177 

/anaconda3/envs/mariquant/lib/python3.6/site-packages/distributed/utils_comm.py in unpack_remotedata(o, byte_keys, myset)
    181         if not o:
    182             return o
--> 183         outs = [unpack_remotedata(item, byte_keys, myset) for item in o]
    184         return type(o)(outs)
    185     elif typ is dict:

/anaconda3/envs/mariquant/lib/python3.6/site-packages/distributed/utils_comm.py in <listcomp>(.0)
    181         if not o:
    182             return o
--> 183         outs = [unpack_remotedata(item, byte_keys, myset) for item in o]
    184         return type(o)(outs)
    185     elif typ is dict:

/anaconda3/envs/mariquant/lib/python3.6/site-packages/distributed/utils_comm.py in unpack_remotedata(o, byte_keys, myset)
    181         if not o:
    182             return o
--> 183         outs = [unpack_remotedata(item, byte_keys, myset) for item in o]
    184         return type(o)(outs)
    185     elif typ is dict:

/anaconda3/envs/mariquant/lib/python3.6/site-packages/distributed/utils_comm.py in <listcomp>(.0)
    181         if not o:
    182             return o
--> 183         outs = [unpack_remotedata(item, byte_keys, myset) for item in o]
    184         return type(o)(outs)
    185     elif typ is dict:

/anaconda3/envs/mariquant/lib/python3.6/site-packages/distributed/utils_comm.py in unpack_remotedata(o, byte_keys, myset)
    181         if not o:
    182             return o
--> 183         outs = [unpack_remotedata(item, byte_keys, myset) for item in o]
    184         return type(o)(outs)
    185     elif typ is dict:

/anaconda3/envs/mariquant/lib/python3.6/site-packages/distributed/utils_comm.py in <listcomp>(.0)
    181         if not o:
    182             return o
--> 183         outs = [unpack_remotedata(item, byte_keys, myset) for item in o]
    184         return type(o)(outs)
    185     elif typ is dict:

/anaconda3/envs/mariquant/lib/python3.6/site-packages/distributed/utils_comm.py in unpack_remotedata(o, byte_keys, myset)
    181         if not o:
    182             return o
--> 183         outs = [unpack_remotedata(item, byte_keys, myset) for item in o]
    184         return type(o)(outs)
    185     elif typ is dict:

/anaconda3/envs/mariquant/lib/python3.6/site-packages/distributed/utils_comm.py in <listcomp>(.0)
    181         if not o:
    182             return o
--> 183         outs = [unpack_remotedata(item, byte_keys, myset) for item in o]
    184         return type(o)(outs)
    185     elif typ is dict:

/anaconda3/envs/mariquant/lib/python3.6/site-packages/distributed/utils_comm.py in unpack_remotedata(o, byte_keys, myset)
    181         if not o:
    182             return o
--> 183         outs = [unpack_remotedata(item, byte_keys, myset) for item in o]
    184         return type(o)(outs)
    185     elif typ is dict:

/anaconda3/envs/mariquant/lib/python3.6/site-packages/distributed/utils_comm.py in <listcomp>(.0)
    181         if not o:
    182             return o
--> 183         outs = [unpack_remotedata(item, byte_keys, myset) for item in o]
    184         return type(o)(outs)
    185     elif typ is dict:

/anaconda3/envs/mariquant/lib/python3.6/site-packages/distributed/utils_comm.py in unpack_remotedata(o, byte_keys, myset)
    176         return out, myset
    177 
--> 178     typ = type(o)
    179 
    180     if typ in collection_types:

If this is done without persist, i.e.

ddf = ddf.reset_index().set_index('index')

then it gets stuck on map_partitions with the unscattered GeoDataFrame, and when killed it produces the following trace:


---------------------------------------------------------------------------
KeyboardInterrupt                         Traceback (most recent call last)
<ipython-input-14-5088b0035b38> in <module>()
      2             find_fids, harbours, meta=pd.DataFrame({'port_id':[-1], 'anch_id':[-1], 'fac_id':[-1], 
      3                                                                 'berth_id':[-1], 'boy_id':[-1]}))
----> 4 df = ddf.compute()

/anaconda3/envs/mariquant/lib/python3.6/site-packages/dask/base.py in compute(self, **kwargs)
    154         dask.base.compute
    155         """
--> 156         (result,) = compute(self, traverse=False, **kwargs)
    157         return result
    158 

/anaconda3/envs/mariquant/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs)
    393     keys = [x.__dask_keys__() for x in collections]
    394     postcomputes = [x.__dask_postcompute__() for x in collections]
--> 395     results = schedule(dsk, keys, **kwargs)
    396     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    397 

/anaconda3/envs/mariquant/lib/python3.6/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, **kwargs)
   2202                                          fifo_timeout=fifo_timeout,
   2203                                          retries=retries,
-> 2204                                          user_priority=priority,
   2205                                          )
   2206         packed = pack_data(keys, futures)

/anaconda3/envs/mariquant/lib/python3.6/site-packages/distributed/client.py in _graph_to_futures(self, dsk, keys, restrictions, loose_restrictions, priority, user_priority, resources, retries, fifo_timeout, actors)
   2107                       and k not in keyset}
   2108             if values:
-> 2109                 dsk = dask.optimization.inline(dsk, keys=values)
   2110 
   2111             d = {k: unpack_remotedata(v, byte_keys=True) for k, v in dsk.items()}

/anaconda3/envs/mariquant/lib/python3.6/site-packages/dask/optimization.py in inline(dsk, keys, inline_constants, dependencies)
    249     if dependencies is None:
    250         dependencies = {k: get_dependencies(dsk, k)
--> 251                         for k in dsk}
    252 
    253     if inline_constants:

/anaconda3/envs/mariquant/lib/python3.6/site-packages/dask/optimization.py in <dictcomp>(.0)
    249     if dependencies is None:
    250         dependencies = {k: get_dependencies(dsk, k)
--> 251                         for k in dsk}
    252 
    253     if inline_constants:

/anaconda3/envs/mariquant/lib/python3.6/site-packages/dask/core.py in get_dependencies(dsk, key, task, as_list)
    199         new_work = []
    200         for w in work:
--> 201             typ = type(w)
    202             if typ is tuple and w and callable(w[0]):  # istask(w)
    203                 new_work += w[1:]

KeyboardInterrupt: 

If I run with the scattered GeoDataFrame instead, it also gets stuck on map_partitions, consumes all the memory (in the real-life example), and when killed produces the following trace:

---------------------------------------------------------------------------
KeyboardInterrupt                         Traceback (most recent call last)
<ipython-input-12-9df5c63458da> in <module>()
      2             find_fids, sharbours, meta=pd.DataFrame({'port_id':[-1], 'anch_id':[-1], 'fac_id':[-1], 
      3                                                                 'berth_id':[-1], 'boy_id':[-1]}))
----> 4 df = ddf.compute()

/anaconda3/envs/mariquant/lib/python3.6/site-packages/dask/base.py in compute(self, **kwargs)
    154         dask.base.compute
    155         """
--> 156         (result,) = compute(self, traverse=False, **kwargs)
    157         return result
    158 

/anaconda3/envs/mariquant/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs)
    393     keys = [x.__dask_keys__() for x in collections]
    394     postcomputes = [x.__dask_postcompute__() for x in collections]
--> 395     results = schedule(dsk, keys, **kwargs)
    396     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    397 

/anaconda3/envs/mariquant/lib/python3.6/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, **kwargs)
   2202                                          fifo_timeout=fifo_timeout,
   2203                                          retries=retries,
-> 2204                                          user_priority=priority,
   2205                                          )
   2206         packed = pack_data(keys, futures)

/anaconda3/envs/mariquant/lib/python3.6/site-packages/distributed/client.py in _graph_to_futures(self, dsk, keys, restrictions, loose_restrictions, priority, user_priority, resources, retries, fifo_timeout, actors)
   2109                 dsk = dask.optimization.inline(dsk, keys=values)
   2110 
-> 2111             d = {k: unpack_remotedata(v, byte_keys=True) for k, v in dsk.items()}
   2112             extra_futures = set.union(*[v[1] for v in d.values()]) if d else set()
   2113             extra_keys = {tokey(future.key) for future in extra_futures}

/anaconda3/envs/mariquant/lib/python3.6/site-packages/distributed/client.py in <dictcomp>(.0)
   2109                 dsk = dask.optimization.inline(dsk, keys=values)
   2110 
-> 2111             d = {k: unpack_remotedata(v, byte_keys=True) for k, v in dsk.items()}
   2112             extra_futures = set.union(*[v[1] for v in d.values()]) if d else set()
   2113             extra_keys = {tokey(future.key) for future in extra_futures}

/anaconda3/envs/mariquant/lib/python3.6/site-packages/distributed/utils_comm.py in unpack_remotedata(o, byte_keys, myset)
    173     if myset is None:
    174         myset = set()
--> 175         out = unpack_remotedata(o, byte_keys, myset)
    176         return out, myset
    177 

/anaconda3/envs/mariquant/lib/python3.6/site-packages/distributed/utils_comm.py in unpack_remotedata(o, byte_keys, myset)
    181         if not o:
    182             return o
--> 183         outs = [unpack_remotedata(item, byte_keys, myset) for item in o]
    184         return type(o)(outs)
    185     elif typ is dict:

/anaconda3/envs/mariquant/lib/python3.6/site-packages/distributed/utils_comm.py in <listcomp>(.0)
    181         if not o:
    182             return o
--> 183         outs = [unpack_remotedata(item, byte_keys, myset) for item in o]
    184         return type(o)(outs)
    185     elif typ is dict:

/anaconda3/envs/mariquant/lib/python3.6/site-packages/distributed/utils_comm.py in unpack_remotedata(o, byte_keys, myset)
    181         if not o:
    182             return o
--> 183         outs = [unpack_remotedata(item, byte_keys, myset) for item in o]
    184         return type(o)(outs)
    185     elif typ is dict:

/anaconda3/envs/mariquant/lib/python3.6/site-packages/distributed/utils_comm.py in <listcomp>(.0)
    181         if not o:
    182             return o
--> 183         outs = [unpack_remotedata(item, byte_keys, myset) for item in o]
    184         return type(o)(outs)
    185     elif typ is dict:

/anaconda3/envs/mariquant/lib/python3.6/site-packages/distributed/utils_comm.py in unpack_remotedata(o, byte_keys, myset)
    181         if not o:
    182             return o
--> 183         outs = [unpack_remotedata(item, byte_keys, myset) for item in o]
    184         return type(o)(outs)
    185     elif typ is dict:

/anaconda3/envs/mariquant/lib/python3.6/site-packages/distributed/utils_comm.py in <listcomp>(.0)
    181         if not o:
    182             return o
--> 183         outs = [unpack_remotedata(item, byte_keys, myset) for item in o]
    184         return type(o)(outs)
    185     elif typ is dict:

/anaconda3/envs/mariquant/lib/python3.6/site-packages/distributed/utils_comm.py in unpack_remotedata(o, byte_keys, myset)
    181         if not o:
    182             return o
--> 183         outs = [unpack_remotedata(item, byte_keys, myset) for item in o]
    184         return type(o)(outs)
    185     elif typ is dict:

/anaconda3/envs/mariquant/lib/python3.6/site-packages/distributed/utils_comm.py in <listcomp>(.0)
    181         if not o:
    182             return o
--> 183         outs = [unpack_remotedata(item, byte_keys, myset) for item in o]
    184         return type(o)(outs)
    185     elif typ is dict:

/anaconda3/envs/mariquant/lib/python3.6/site-packages/distributed/utils_comm.py in unpack_remotedata(o, byte_keys, myset)
    181         if not o:
    182             return o
--> 183         outs = [unpack_remotedata(item, byte_keys, myset) for item in o]
    184         return type(o)(outs)
    185     elif typ is dict:

/anaconda3/envs/mariquant/lib/python3.6/site-packages/distributed/utils_comm.py in <listcomp>(.0)
    181         if not o:
    182             return o
--> 183         outs = [unpack_remotedata(item, byte_keys, myset) for item in o]
    184         return type(o)(outs)
    185     elif typ is dict:

/anaconda3/envs/mariquant/lib/python3.6/site-packages/distributed/utils_comm.py in unpack_remotedata(o, byte_keys, myset)
    176         return out, myset
    177 
--> 178     typ = type(o)
    179 
    180     if typ in collection_types:

KeyboardInterrupt: 
@TomAugspurger
Member

Is this caused by the same issue as #3972 (comment)?

@avnovikov
Author

avnovikov commented Sep 11, 2018

Seems not, as

  1. passing a GeoDataFrame as an argument to map_partitions was working fine at least in 0.17.x. It seems to have worked for quite a while, as there are numerous examples of map_partitions with GeoDataFrames (not necessarily as arguments) literally all over the net,
  2. passing the GeoDataFrame when the dask DataFrame is built with from_pandas (in the toy example) still works, as sketched below,
  3. map_partitions with the scattered GeoDataFrame works in the toy example, and I almost managed to make it work in the real-life case.
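
A minimal sketch of the from_pandas variant mentioned in point 2, assuming the same rnp, find_fids and harbours defined in the original example (with from_pandas the divisions are known, so dask can align the GeoDataFrame argument instead of raising; note it may repartition that argument to match the divisions):

# Hedged sketch of the from_pandas variant from point 2; rnp, find_fids,
# harbours and the meta frame are the objects defined in the original example.
pdf = rnp(10000)                            # a single in-memory pandas frame
ddf2 = dd.from_pandas(pdf, npartitions=10)  # divisions are known here
out = ddf2.map_partitions(
    find_fids, harbours,
    meta=pd.DataFrame({'port_id': [-1], 'anch_id': [-1], 'fac_id': [-1],
                       'berth_id': [-1], 'boy_id': [-1]}))
df2 = out.compute()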

Just for the cross-reference: my answer to your comment on (probably) another issue is at #3972 (comment).
