Issue when reprojecting multiple CSV files #29

Open
bilelomrani1 opened this issue Oct 19, 2020 · 2 comments

Comments

@bilelomrani1

I have multiple CSV files opened with dask as follows:

import dask.dataframe as dd
import dask_geopandas

df = dd.read_csv('csv/*_timeseries.csv')
gdf = dask_geopandas.from_dask_dataframe(df)
gdf = gdf.set_geometry(
    dask_geopandas.points_from_xy(gdf, x='Longitude', y='Latitude')
).set_crs('epsg:4326').to_crs('epsg:3395')

When invoking gdf.compute(), the following exception is raised:

------------------------------------------------------------------------
ProjError                              Traceback (most recent call last)
<ipython-input-251-f87bb3768545> in <module>
----> 1 gdf.compute()

~/opt/miniconda3/envs/gis/lib/python3.8/site-packages/dask/base.py in compute(self, **kwargs)
    165         dask.base.compute
    166         """
--> 167         (result,) = compute(self, traverse=False, **kwargs)
    168         return result
    169 

~/opt/miniconda3/envs/gis/lib/python3.8/site-packages/dask/base.py in compute(*args, **kwargs)
    450         postcomputes.append(x.__dask_postcompute__())
    451 
--> 452     results = schedule(dsk, keys, **kwargs)
    453     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    454 

~/opt/miniconda3/envs/gis/lib/python3.8/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2723                     should_rejoin = False
   2724             try:
-> 2725                 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   2726             finally:
   2727                 for f in futures.values():

~/opt/miniconda3/envs/gis/lib/python3.8/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
   1984             else:
   1985                 local_worker = None
-> 1986             return self.sync(
   1987                 self._gather,
   1988                 futures,

~/opt/miniconda3/envs/gis/lib/python3.8/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    830             return future
    831         else:
--> 832             return sync(
    833                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    834             )

~/opt/miniconda3/envs/gis/lib/python3.8/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    338     if error[0]:
    339         typ, exc, tb = error[0]
--> 340         raise exc.with_traceback(tb)
    341     else:
    342         return result[0]

~/opt/miniconda3/envs/gis/lib/python3.8/site-packages/distributed/utils.py in f()
    322             if callback_timeout is not None:
    323                 future = asyncio.wait_for(future, callback_timeout)
--> 324             result[0] = yield future
    325         except Exception as exc:
    326             error[0] = sys.exc_info()

~/.local/lib/python3.8/site-packages/tornado/gen.py in run(self)
    733 
    734                     try:
--> 735                         value = future.result()
    736                     except Exception:
    737                         exc_info = sys.exc_info()

~/opt/miniconda3/envs/gis/lib/python3.8/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1849                             exc = CancelledError(key)
   1850                         else:
-> 1851                             raise exception.with_traceback(traceback)
   1852                         raise exc
   1853                     if errors == "skip":

~/opt/miniconda3/envs/gis/lib/python3.8/site-packages/dask/optimization.py in __call__()
    959         if not len(args) == len(self.inkeys):
    960             raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
--> 961         return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
    962 
    963     def __reduce__(self):

~/opt/miniconda3/envs/gis/lib/python3.8/site-packages/dask/core.py in get()
    149     for key in toposort(dsk):
    150         task = dsk[key]
--> 151         result = _execute_task(task, cache)
    152         cache[key] = result
    153     result = _execute_task(out, cache)

~/opt/miniconda3/envs/gis/lib/python3.8/site-packages/dask/core.py in _execute_task()
    119         # temporaries by their reference count and can execute certain
    120         # operations in-place.
--> 121         return func(*(_execute_task(a, cache) for a in args))
    122     elif not ishashable(arg):
    123         return arg

~/opt/miniconda3/envs/gis/lib/python3.8/site-packages/dask/utils.py in apply()
     27 def apply(func, args, kwargs=None):
     28     if kwargs:
---> 29         return func(*args, **kwargs)
     30     else:
     31         return func(*args)

~/opt/miniconda3/envs/gis/lib/python3.8/site-packages/dask/dataframe/core.py in apply_and_enforce()
   5296     func = kwargs.pop("_func")
   5297     meta = kwargs.pop("_meta")
-> 5298     df = func(*args, **kwargs)
   5299     if is_dataframe_like(df) or is_series_like(df) or is_index_like(df):
   5300         if not len(df):

~/opt/miniconda3/envs/gis/lib/python3.8/site-packages/dask/utils.py in __call__()
    891 
    892     def __call__(self, obj, *args, **kwargs):
--> 893         return getattr(obj, self.method)(*args, **kwargs)
    894 
    895     def __reduce__(self):

~/opt/miniconda3/envs/gis/lib/python3.8/site-packages/geopandas/geodataframe.py in to_crs()
    814         else:
    815             df = self.copy()
--> 816         geom = df.geometry.to_crs(crs=crs, epsg=epsg)
    817         df.geometry = geom
    818         df.crs = geom.crs

~/opt/miniconda3/envs/gis/lib/python3.8/site-packages/geopandas/geoseries.py in to_crs()
    541         transformer = Transformer.from_crs(self.crs, crs, always_xy=True)
    542 
--> 543         new_data = vectorized.transform(self.values.data, transformer.transform)
    544         return GeoSeries(
    545             GeometryArray(new_data), crs=crs, index=self.index, name=self.name

~/opt/miniconda3/envs/gis/lib/python3.8/site-packages/geopandas/_vectorized.py in transform()
    877     if compat.USE_PYGEOS:
    878         coords = pygeos.get_coordinates(data)
--> 879         new_coords = func(coords[:, 0], coords[:, 1])
    880         result = pygeos.set_coordinates(data.copy(), np.array(new_coords).T)
    881         return result

~/opt/miniconda3/envs/gis/lib/python3.8/site-packages/pyproj/transformer.py in transform()
    428             intime = None
    429         # call pj_transform.  inx,iny,inz buffers modified in place.
--> 430         self._transformer._transform(
    431             inx,
    432             iny,

pyproj/_transformer.pyx in pyproj._transformer._Transformer._transform()

ProjError: x, y, z, and time must be same size

The exception disappears when df is a single CSV file: df = dd.read_csv('csv/2019_timeseries.csv')

@felixlapalma

Hi, I tried to reproduce the issue with some generated data but could not. Am I missing something?

# Generate some test data
import os
import pandas as pd
import numpy as np

make_data = True
lat_lon_size = 100000
csvs_num = 10

if make_data:
    np.random.seed(1234)
    # Random coordinates: latitude in [-37, -27), longitude in [-64, -54)
    lat = np.random.random(lat_lon_size) * 10 - 37
    lon = np.random.random(lat_lon_size) * 10 - 64
    df_ = pd.DataFrame({'id': 1, 'Latitude': lat, 'Longitude': lon})
    # Output directory
    os.makedirs('./csv', exist_ok=True)

    # Write each CSV from a random fraction of the rows
    for i, f in enumerate(np.random.random(csvs_num)):
        df_.sample(frac=f).to_csv('./csv/test_{}.csv'.format(i), index=False)


# Code from the issue
import dask.dataframe as dd
import dask_geopandas

# Same as in the issue, with '*_timeseries.csv' replaced by 'test_*.csv'
df = dd.read_csv('csv/test_*.csv')
gdf = dask_geopandas.from_dask_dataframe(df)
gdf = gdf.set_geometry(
    dask_geopandas.points_from_xy(gdf, x='Longitude', y='Latitude')
).set_crs('epsg:4326').to_crs('epsg:3395')

gdf.compute()

This runs without errors.

@bilelomrani1
Author

Sorry for the delay. The snippet you provided indeed works on my machine. Now my code works fine on the latest version of dask-geopandas. Maybe the mistake was on my side in the first place or the latest version fixed the issue. Anyway thank you very much!
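
Since the cause was never pinned down in this thread, one thing worth ruling out for anyone who hits the same error is a difference between the input files themselves, because the failure only showed up when several CSVs were read together. The sketch below is only a guess at a useful check, not a confirmed diagnosis: it uses the standard library to list rows whose Longitude or Latitude is missing or not a finite number. The function name find_bad_coordinate_rows is made up for illustration; the column names and glob pattern are the ones from the original snippet.

```python
import csv
import glob
import math


def find_bad_coordinate_rows(pattern, x="Longitude", y="Latitude"):
    """Hypothetical helper: list (path, line_number, reason) for unusable rows.

    Assumption: malformed coordinate columns are a possible cause of the
    reprojection failure. The thread does not confirm this.
    """
    problems = []
    for path in sorted(glob.glob(pattern)):
        with open(path, newline="") as handle:
            reader = csv.DictReader(handle)
            # A file without the expected columns is reported once and skipped.
            missing = [c for c in (x, y) if c not in (reader.fieldnames or [])]
            if missing:
                problems.append((path, 1, "missing column(s): " + ", ".join(missing)))
                continue
            for row in reader:
                for column in (x, y):
                    try:
                        value = float(row[column])
                    except (TypeError, ValueError):
                        # Empty cell, short row, or text that is not a number.
                        problems.append((path, reader.line_num, column + " is not numeric"))
                        break
                    if not math.isfinite(value):
                        # Literal "nan" or "inf" values parse but are unusable.
                        problems.append((path, reader.line_num, column + " is not finite"))
                        break
    return problems


if __name__ == "__main__":
    for path, line_number, reason in find_bad_coordinate_rows("csv/*_timeseries.csv"):
        print(path, line_number, reason)
```

If this prints nothing, the coordinate columns are at least well-formed in every file, and upgrading dask-geopandas as described above is the next thing to try.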
