-
Notifications
You must be signed in to change notification settings - Fork 63
/
dataset.py
192 lines (147 loc) · 6.03 KB
/
dataset.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
from __future__ import absolute_import
import pandas as pd
import geopandas as gpd
from shapely import wkt
from cartoframes.data import Dataset as CFDataset
from .entity import CatalogEntity
from .repository.dataset_repo import get_dataset_repo
from .repository.geography_repo import get_geography_repo
from .repository.variable_repo import get_variable_repo
from .repository.variable_group_repo import get_variable_group_repo
from .repository.constants import DATASET_FILTER
from .summary import dataset_describe, head, tail, counts, fields_by_type, geom_coverage
from . import subscription_info
from . import subscriptions
from . import utils
# Entity-type tag handed to the subscription helpers together with the dataset id.
DATASET_TYPE = 'dataset'
class CatalogDataset(CatalogEntity):
    """Catalog entity backed by the dataset repository.

    Metadata is read from ``self.data`` and exposed through read-only
    properties; related entities are fetched lazily from their repositories.
    """

    entity_repo = get_dataset_repo()

    @property
    def variables(self):
        """Variables the variable repository returns for this dataset's id."""
        return get_variable_repo().get_all({DATASET_FILTER: self.id})

    @property
    def variables_groups(self):
        """Variable groups the repository returns for this dataset's id."""
        return get_variable_group_repo().get_all({DATASET_FILTER: self.id})

    @property
    def name(self):
        """The ``name`` value from the dataset metadata."""
        return self.data['name']

    @property
    def description(self):
        """The ``description`` value from the dataset metadata."""
        return self.data['description']

    @property
    def provider(self):
        """The ``provider_id`` value from the dataset metadata."""
        return self.data['provider_id']

    @property
    def category(self):
        """The ``category_id`` value from the dataset metadata."""
        return self.data['category_id']

    @property
    def data_source(self):
        """The ``data_source_id`` value from the dataset metadata."""
        return self.data['data_source_id']

    @property
    def country(self):
        """The ``country_id`` value from the dataset metadata."""
        return self.data['country_id']

    @property
    def language(self):
        """The ``lang`` value from the dataset metadata."""
        return self.data['lang']

    @property
    def geography(self):
        """The ``geography_id`` value from the dataset metadata."""
        return self.data['geography_id']

    @property
    def temporal_aggregation(self):
        """The ``temporal_aggregation`` value from the dataset metadata."""
        return self.data['temporal_aggregation']

    @property
    def time_coverage(self):
        """The ``time_coverage`` value from the dataset metadata."""
        return self.data['time_coverage']

    @property
    def update_frequency(self):
        """The ``update_frequency`` value from the dataset metadata."""
        return self.data['update_frequency']

    @property
    def version(self):
        """The ``version`` value from the dataset metadata."""
        return self.data['version']

    @property
    def is_public_data(self):
        """The ``is_public_data`` value from the dataset metadata."""
        return self.data['is_public_data']

    @property
    def summary(self):
        """The ``summary_json`` value from the dataset metadata."""
        return self.data['summary_json']

    def head(self):
        """Apply the summary ``head`` helper to this class and its ``summary_json``."""
        return head(self.__class__, self.data['summary_json'])

    def tail(self):
        """Apply the summary ``tail`` helper to this class and its ``summary_json``."""
        return tail(self.__class__, self.data['summary_json'])

    def counts(self):
        """Apply the summary ``counts`` helper to this dataset's ``summary_json``."""
        return counts(self.data['summary_json'])

    def fields_by_type(self):
        """Apply the summary ``fields_by_type`` helper to this dataset's ``summary_json``."""
        return fields_by_type(self.data['summary_json'])

    def geom_coverage(self):
        """Apply the summary ``geom_coverage`` helper to this dataset's ``geography_id``."""
        return geom_coverage(self.geography)

    def describe(self):
        """Apply the summary ``dataset_describe`` helper to this dataset's variables."""
        return dataset_describe(self.variables)

    @classmethod
    def get_all(cls, filters=None, credentials=None):
        """Return what the entity repository yields for the given filters.

        Args:
            filters (optional): forwarded unchanged to ``entity_repo.get_all``.
            credentials (optional): forwarded unchanged to ``entity_repo.get_all``.
        """
        return cls.entity_repo.get_all(filters, credentials)

    def download(self, credentials=None):
        """Download Dataset data.

        Args:
            credentials (:py:class:`Credentials <cartoframes.auth.Credentials>`, optional):
                credentials of CARTO user account. If not provided,
                a default credentials (if set with :py:meth:`set_default_credentials
                <cartoframes.auth.set_default_credentials>`) will be used.
        """
        return self._download(credentials)

    @classmethod
    def get_datasets_spatial_filtered(cls, filter_dataset):
        """Return the catalog datasets whose geography intersects ``filter_dataset``.

        Args:
            filter_dataset: a geopandas ``GeoDataFrame``, a CARTOframes
                ``Dataset`` or a WKT string describing the area of interest.

        Raises:
            TypeError: if ``filter_dataset`` is none of the supported types.
        """
        user_gdf = cls._get_user_geodataframe(filter_dataset)

        # TODO: check if the dataframe has a geometry column if not exception
        # Only the geometry column takes part in the join; dropping the rest saves memory.
        user_gdf = user_gdf[[user_gdf.geometry.name]]

        catalog_geographies_gdf = get_geography_repo().get_geographies_gdf()
        matched_geographies_ids = cls._join_geographies_geodataframes(catalog_geographies_gdf, user_gdf)

        # Get Dataset objects
        return get_dataset_repo().get_all({'geography_id': matched_geographies_ids})

    @staticmethod
    def _get_user_geodataframe(filter_dataset):
        """Normalise the supported filter inputs into a ``GeoDataFrame``.

        Args:
            filter_dataset: a geopandas ``GeoDataFrame`` (returned as is), a
                CARTOframes ``Dataset`` (downloaded with decoded geometries)
                or a WKT string (parsed into a single-row frame).

        Returns:
            geopandas.GeoDataFrame: the user geometries.

        Raises:
            TypeError: if ``filter_dataset`` is none of the supported types.
                Without this the method would return ``None`` and the caller
                would fail later with an unrelated ``AttributeError``.
        """
        if isinstance(filter_dataset, gpd.GeoDataFrame):
            # Geopandas dataframe
            return filter_dataset

        if isinstance(filter_dataset, CFDataset):
            # CARTOFrames Dataset
            user_df = filter_dataset.download(decode_geom=True)
            return gpd.GeoDataFrame(user_df, geometry='geometry')

        # NOTE(review): on Python 2 a ``unicode`` WKT is not a ``str`` and is
        # rejected below - confirm whether that interpreter is still supported.
        if isinstance(filter_dataset, str):
            # String WKT
            df = pd.DataFrame([{'geometry': filter_dataset}])
            df['geometry'] = df['geometry'].apply(wkt.loads)
            return gpd.GeoDataFrame(df)

        raise TypeError(
            'Unsupported filter_dataset type {!r}: expected a geopandas GeoDataFrame, '
            'a CARTOframes Dataset or a WKT string.'.format(type(filter_dataset).__name__))

    @staticmethod
    def _join_geographies_geodataframes(geographies_gdf1, geographies_gdf2):
        """Return the unique ``id`` values of rows in the first frame that intersect the second.

        Args:
            geographies_gdf1: left ``GeoDataFrame``; must carry an ``id`` column.
            geographies_gdf2: right ``GeoDataFrame`` to intersect against.
        """
        join_gdf = gpd.sjoin(geographies_gdf1, geographies_gdf2, how='inner', op='intersects')
        return join_gdf['id'].unique()

    def subscribe(self, credentials=None):
        """Subscribe to a Dataset.

        Shows the "already subscribed" message when this dataset's id is among
        the user's subscription ids, and the subscription form otherwise.

        Args:
            credentials (:py:class:`Credentials <cartoframes.auth.Credentials>`, optional):
                credentials of CARTO user account. If not provided,
                a default credentials (if set with :py:meth:`set_default_credentials
                <cartoframes.auth.set_default_credentials>`) will be used.
        """
        _credentials = self._get_credentials(credentials)
        _subscribed_ids = subscriptions.get_subscription_ids(_credentials)

        if self.id in _subscribed_ids:
            utils.display_existing_subscription_message(self.id, DATASET_TYPE)
        else:
            utils.display_subscription_form(self.id, DATASET_TYPE, _credentials)

    def subscription_info(self, credentials=None):
        """Get the subscription information of a Dataset.

        Args:
            credentials (:py:class:`Credentials <cartoframes.auth.Credentials>`, optional):
                credentials of CARTO user account. If not provided,
                a default credentials (if set with :py:meth:`set_default_credentials
                <cartoframes.auth.set_default_credentials>`) will be used.
        """
        _credentials = self._get_credentials(credentials)

        # Inside the method body ``subscription_info`` resolves to the imported
        # module (global scope), not to this method.
        return subscription_info.SubscriptionInfo(
            subscription_info.fetch_subscription_info(self.id, DATASET_TYPE, _credentials))