
How to use set_geometry from Dask DF and export as geopackage? #16

Closed

Guts opened this issue Aug 21, 2020 · 6 comments · Fixed by #17



Guts commented Aug 21, 2020

First of all, thanks for this work 👍.

Then, I'm trying to convert a Dask DataFrame into a GeoPandas one, setting the geometry from two columns (x, y), and I think there is a problem in the README instructions:

import dask.dataframe as dd
import dask_geopandas

df = dd.read_csv('...')

df = df.set_geometry(
    dask_geopandas.points_from_xy(df, 'latitude', 'longitude')
)

In that case, df is a Dask DataFrame and doesn't have a set_geometry attribute/method, raising: AttributeError: 'DataFrame' object has no attribute 'set_geometry'.

Digging into the source code, I finally came up with:

geodf = dask_geopandas.from_dask_dataframe(df)

geodf = geodf.set_geometry(
    dask_geopandas.points_from_xy(
        df=geodf,
        x=geodf.x,
        y=geodf.y,
    )
)

But then, I'm not able to export it as a GeoPackage. Am I missing something?

@jorisvandenbossche (Member)

Thanks for trying it out, and for the feedback!

In that case, df is a Dask DataFrame and doesn't have a set_geometry attribute/method, raising: AttributeError: 'DataFrame' object has no attribute 'set_geometry'.

Hmm, yes, it seems we haven't yet added this set_geometry method to the Dask DataFrame class. What GeoPandas does is "monkeypatch" the normal pandas DataFrame to add a set_geometry that returns a GeoDataFrame (so the above example would work with plain pandas/geopandas). We didn't do that here yet. We should probably do it for consistency (although monkeypatching is also not the nicest solution).
But in any case, we should update the example, as it is not working right now. What you did instead is the correct solution at the moment.
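For illustration, the monkeypatching idea amounts to something like this (a minimal sketch, not the actual geopandas code; _set_geometry is a name made up here):

import pandas as pd
import geopandas

def _set_geometry(self, col, **kwargs):
    # Promote the plain pandas DataFrame to a GeoDataFrame, then set its geometry
    return geopandas.GeoDataFrame(self).set_geometry(col, **kwargs)

# After this, every pandas DataFrame gains a set_geometry method:
pd.DataFrame.set_geometry = _set_geometry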

But then, I'm not able to export it as a GeoPackage. Am I missing something?

No, that's not yet implemented; only writing to Parquet files is currently supported.
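So, assuming the dask-geopandas GeoDataFrame exposes a to_parquet like dask.dataframe does (a sketch, untested):

geodf.to_parquet("output_parquet/")  # writes one Parquet part file per partition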
I have a POC for reading GIS file formats (https://github.com/jsignell/dask-geopandas/issues/11 / https://nbviewer.jupyter.org/gist/jorisvandenbossche/c94bfc9626e622fda7285ed88a4d771a), but didn't try writing files yet. The main thing I am unsure about (and would need to research / experiment with) is how to let the different partitions write to a single file without concurrency / locking issues (as I suppose you will want a single GeoPackage file, and not a directory of GeoPackage files?)
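One idea to sidestep that (just a sketch of the locking approach, not something dask-geopandas implements; write_partition is a hypothetical helper, and mode="a" needs the fiona fix mentioned below):

from dask.utils import SerializableLock

gpkg_lock = SerializableLock()

def write_partition(part, path):
    # part: a geopandas.GeoDataFrame partition; the lock ensures only one
    # partition appends to the GeoPackage at a time
    with gpkg_lock:
        part.to_file(path, driver="GPKG", mode="a")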


Guts commented Aug 21, 2020

Thanks for your quick reply.

Thanks for your explanation. Until now, I used to chunk huge files (mostly CSV) and stream them into files (mode="a") or a database.
GPKG being a SQLite wrapper, if I were to implement a complete workflow, I would try to come up with something like this:

import dask.dataframe as dd
import dask_geopandas

chunked_df = dd.read_csv(
    "really_huge_geocsv.csv",
    usecols=list(columns_definition.keys()),  # columns_definition: your {column: dtype} mapping
    dtype=columns_definition,
)

chunked_geodf = dask_geopandas.from_dask_dataframe(chunked_df)

# hypothetical to_gpkg API: append each partition to one GeoPackage layer
for chunk in chunked_geodf.partitions:
    chunk.to_gpkg(
        layername="Table",
        if_exists="append",
        geometry_type="point",
        x=chunked_df.long,
        y=chunked_df.lat,
        crs=32640,
    )

Still too new to these tools to really help you. Maybe using PyGEOS abilities? There is a small GPKG writer using it: https://github.com/brendan-ward/pgpkg but I haven't tried it myself yet.


jorisvandenbossche commented Aug 21, 2020

Thanks for that link to pgpkg! I wasn't aware of it.
Regarding pygeos, I certainly recommend installing it (if installed, geopandas / dask_geopandas will use it automatically under the hood), as it makes things much faster / parallelizable.
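If you want to verify that the pygeos backend is actually picked up (geopandas >= 0.8):

import geopandas

print(geopandas.options.use_pygeos)  # True when the pygeos backend is active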

I was just experimenting with a to_file for dask_geopandas:

import dask
from dask.delayed import delayed, tokenize


@delayed
def _extra_deps(func, *args, extras=None, **kwargs):
    # `extras` only forces a dependency on a previous task; it is not used
    return func(*args, **kwargs)


def to_file(df, path, driver="GPKG", parallel=False, compute=True, **kwargs):
    """
    Write to a single file.

    Parameters
    ----------
    df : dask_geopandas.GeoDataFrame
    path : str
        Filename.
    parallel : bool, default False
        When True, have each partition append itself to the file concurrently.
        This can result in rows being written in a different order than the
        source DataFrame's. When False, append each partition in sequence.
    compute : bool, default True
        When True, call dask.compute and perform the write; otherwise, return
        a Dask object (or a list of per-partition objects when parallel=True).
    """
    # based on dask.dataframe's to_sql
    def make_meta(meta):
        return meta.to_file(path, driver=driver, mode="w", **kwargs)

    make_meta = delayed(make_meta)
    meta_task = make_meta(df._meta)

    # Partitions should always append to the empty file created from `meta` above
    worker_kwargs = dict(kwargs, driver=driver, mode="a")

    if parallel:
        # Perform the meta write, then one task per partition, all concurrent:
        result = [
            _extra_deps(
                d.to_file,
                path,
                extras=meta_task,
                **worker_kwargs,
                dask_key_name="to_file-%s" % tokenize(d, **worker_kwargs)
            )
            for d in df.to_delayed()
        ]
    else:
        # Chain the meta write and each partition's write
        result = []
        last = meta_task
        for d in df.to_delayed():
            result.append(
                _extra_deps(
                    d.to_file,
                    path,
                    extras=last,
                    **worker_kwargs,
                    dask_key_name="to_file-%s" % tokenize(d, **worker_kwargs)
                )
            )
            last = result[-1]
    result = delayed(result)

    if compute:
        dask.compute(result, scheduler="processes")
    else:
        return result

And then you can use it like this:

to_file(gdf, "test.gpkg")
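And, per the signature above, compute=False lets you build the graph first and trigger the write explicitly:

import dask

task = to_file(gdf, "test.gpkg", compute=False)  # build the task graph only
dask.compute(task, scheduler="processes")        # then perform the write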

@jorisvandenbossche (Member)

Ah, and what I forgot to mention is that you need to change this one line in your fiona install: https://github.com/Toblerity/Fiona/pull/858/files (to enable "append" mode for GPKG), because that fix is not yet released


Guts commented Aug 21, 2020

Thanks for that link to pgpkg! I wasn't aware of it.

You're welcome!

Regarding pygeos, I certainly recommend installing it (if installed, geopandas / dask_geopandas will use it automatically under the hood), as it makes things much faster / parallelizable.

Yes, it's well documented in the GeoPandas docs. Great tip!

I just was experimenting with a to_file for dask_geopandas:

to_file(gdf, "test.gpkg")

Nice! I'll give it a try if that helps.

Ah, and what I forgot to mention is that you need to change this one line in your fiona install: https://github.com/Toblerity/Fiona/pull/858/files (to enable "append" mode for GPKG), because that fix is not yet released

Too bad a new version hasn't been released; it's not always possible to install from GitHub in a professional context.

@mziminski

I took the to_file snippet above verbatim and changed only two lines to target ESRI Shapefiles (see the comments):

def to_file(df, path, driver="GPKG", parallel=False, compute=True, **kwargs):  # replaced "GPKG" with "ESRI Shapefile"

and the call at the end:

to_file(gdf, "test.gpkg")  # replaced .gpkg with .shp

I understand this code is most definitely still experimental, so I tried to modify it slightly to work with ESRI Shapefiles (see the commented lines above), but I got these errors for both the .gpkg and .shp versions:

---------------------------------------------------------------------------
BrokenProcessPool                         Traceback (most recent call last)
~/../some_dir/test.py in <module>
     183 
     184 
---> 185 to_file(gdf, 'test.gpkg')
     186 # to_file(gdf, 'test.shp')

~/../some_dir/test.py in to_file(df, path, driver, parallel, compute, **kwargs)
     178 
     179     if compute:
---> 180         dask.compute(result, scheduler="processes")
     181     else:
     182         return result

~/opt/anaconda3/envs/geo-tools/lib/python3.7/site-packages/dask/base.py in compute(*args, **kwargs)
    566         postcomputes.append(x.__dask_postcompute__())
    567 
--> 568     results = schedule(dsk, keys, **kwargs)
    569     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    570 

~/opt/anaconda3/envs/geo-tools/lib/python3.7/site-packages/dask/multiprocessing.py in get(dsk, keys, num_workers, func_loads, func_dumps, optimize_graph, pool, chunksize, **kwargs)
    228             raise_exception=reraise,
    229             chunksize=chunksize,
--> 230             **kwargs
    231         )
    232     finally:

~/opt/anaconda3/envs/geo-tools/lib/python3.7/site-packages/dask/local.py in get_async(submit, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, chunksize, **kwargs)
    501             while state["waiting"] or state["ready"] or state["running"]:
    502                 fire_tasks(chunksize)
--> 503                 for key, res_info, failed in queue_get(queue).result():
    504                     if failed:
    505                         exc, tb = loads(res_info)

~/opt/anaconda3/envs/geo-tools/lib/python3.7/concurrent/futures/_base.py in result(self, timeout)
    426                 raise CancelledError()
    427             elif self._state == FINISHED:
--> 428                 return self.__get_result()
    429 
    430             self._condition.wait(timeout)

~/opt/anaconda3/envs/geo-tools/lib/python3.7/concurrent/futures/_base.py in __get_result(self)
    382     def __get_result(self):
    383         if self._exception:
--> 384             raise self._exception
    385         else:
    386             return self._result

BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.

I just started looking into Dask recently, so it's still new to me and I don't really understand this error message. Is there an "easy" fix/workaround to make the to_file function work with ESRI Shapefiles?

Please let me know.
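For reference, BrokenProcessPool typically masks the worker's original exception; one way to surface the real traceback (a debugging sketch using the to_file above) is to rerun the write on Dask's single-threaded scheduler:

import dask

task = to_file(gdf, "test.shp", driver="ESRI Shapefile", compute=False)
dask.compute(task, scheduler="single-threaded")  # surfaces the original error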
