-
-
Notifications
You must be signed in to change notification settings - Fork 394
/
spatialpandas_dask.py
89 lines (71 loc) · 2.81 KB
/
spatialpandas_dask.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
import sys
import numpy as np
from .dask import DaskInterface
from .interface import Interface
from .spatialpandas import SpatialPandasInterface
class DaskSpatialPandasInterface(SpatialPandasInterface):
    """Data interface for spatialpandas geometry stored in dask collections.

    Most behavior is inherited from SpatialPandasInterface. ``base_interface``
    points at DaskInterface, which ``add_dimension`` delegates to. Operations
    that are not performed lazily here (``split``, ``dframe``) compute the
    underlying dask collection first.
    """

    base_interface = DaskInterface

    datatype = 'dask_spatialpandas'

    @classmethod
    def loaded(cls):
        """Return True if ``spatialpandas.dask`` is already imported."""
        # Inspecting sys.modules (instead of importing) means this check
        # never triggers the optional import itself.
        return 'spatialpandas.dask' in sys.modules

    @classmethod
    def data_types(cls):
        """Return the dask-backed spatialpandas types this interface accepts."""
        # Imported lazily so spatialpandas.dask is only loaded when used.
        from spatialpandas.dask import DaskGeoDataFrame, DaskGeoSeries
        return (DaskGeoDataFrame, DaskGeoSeries)

    @classmethod
    def series_type(cls):
        """Return the dask-backed geometry series type (DaskGeoSeries)."""
        from spatialpandas.dask import DaskGeoSeries
        return DaskGeoSeries

    @classmethod
    def frame_type(cls):
        """Return the dask-backed geometry frame type (DaskGeoDataFrame)."""
        from spatialpandas.dask import DaskGeoDataFrame
        return DaskGeoDataFrame

    @classmethod
    def init(cls, eltype, data, kdims, vdims):
        """Initialize data via the spatialpandas interface, then ensure dask.

        If the parent ``init`` returns data that is not an instance of
        ``frame_type()``, it is wrapped with ``dask.dataframe.from_pandas``
        using a single partition.

        Returns
        -------
        tuple
            ``(data, dims, params)`` from the parent ``init``, with ``data``
            possibly wrapped as described above.
        """
        import dask.dataframe as dd
        data, dims, params = super().init(eltype, data, kdims, vdims)
        if not isinstance(data, cls.frame_type()):
            data = dd.from_pandas(data, npartitions=1)
        return data, dims, params

    @classmethod
    def partition_values(cls, df, dataset, dimension, expanded, flat):
        """Return ``dimension`` values for a single partition ``df``.

        Used as the ``map_partitions`` callback in :meth:`values`. The
        partition is cloned into a plain ``'spatialpandas'`` dataset so the
        eager interface's ``values`` implementation can be reused.
        """
        ds = dataset.clone(df, datatype=['spatialpandas'])
        return ds.interface.values(ds, dimension, expanded, flat)

    @classmethod
    def values(cls, dataset, dimension, expanded=True, flat=True, compute=True, keep_index=False):
        """Return the values of ``dimension`` in ``dataset``.

        Parameters
        ----------
        dataset : Dataset
            Dataset whose ``data`` is the dask-backed collection.
        dimension : Dimension or str
            Dimension whose values are requested.
        expanded, flat : bool
            Forwarded to the spatialpandas ``values`` implementation.
        compute : bool
            If True, lazy (dask) results are computed before returning.
        keep_index : bool
            If True, the parent implementation is used instead of the
            per-partition path.
        """
        if compute and not keep_index:
            # Evaluate every partition with the eager spatialpandas interface;
            # ``meta`` describes the per-partition result as an empty array of
            # the dimension's dtype so dask knows the output type up front.
            dtype = cls.dtype(dataset, dimension)
            meta = np.array([], dtype=dtype.base)
            return dataset.data.map_partitions(
                cls.partition_values, meta=meta, dataset=dataset,
                dimension=dimension, expanded=expanded, flat=flat,
            ).compute()

        values = super().values(
            dataset, dimension, expanded, flat, compute, keep_index
        )
        # Reached only when compute is False or keep_index is True. When
        # computation was requested (the keep_index=True case), materialize
        # any lazy dask result so compute=True is honored on this path too.
        if compute and hasattr(values, 'compute'):
            return values.compute()
        return values

    @classmethod
    def split(cls, dataset, start, end, datatype, **kwargs):
        """Split the dataset by computing it and delegating to spatialpandas.

        The whole dask collection is computed and cloned as a
        ``'spatialpandas'`` dataset before splitting.
        """
        ds = dataset.clone(dataset.data.compute(), datatype=['spatialpandas'])
        return ds.interface.split(ds, start, end, datatype, **kwargs)

    @classmethod
    def iloc(cls, dataset, index):
        """Positional indexing; only selections with ``rows is None`` work.

        Parameters
        ----------
        index : tuple
            ``(rows, cols)`` pair.

        Raises
        ------
        NotImplementedError
            If a row selection is requested (``rows`` is not None).
        """
        rows, _cols = index
        if rows is not None:
            raise NotImplementedError(
                'Row-based iloc indexing is not supported for dask-backed '
                'spatialpandas data.'
            )
        return super().iloc(dataset, index)

    @classmethod
    def add_dimension(cls, dataset, dimension, dim_pos, values, vdim):
        """Add a dimension by delegating to ``base_interface`` (DaskInterface)."""
        return cls.base_interface.add_dimension(
            dataset, dimension, dim_pos, values, vdim
        )

    @classmethod
    def dframe(cls, dataset, dimensions):
        """Return a computed frame, restricted to ``dimensions`` if given.

        An empty/falsy ``dimensions`` selects all columns.
        """
        data = dataset.data[dimensions] if dimensions else dataset.data
        return data.compute()


Interface.register(DaskSpatialPandasInterface)