-
Notifications
You must be signed in to change notification settings - Fork 63
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
COPY TO for data obs queries - Part 2 use fetch #592
Changes from 11 commits
404450a
997dcfc
4bbb566
0b224c7
c12413d
a478deb
aca6acf
3299732
2727093
ca6f2d8
151ff7e
9c73e21
03a5133
6da1e4e
04469b2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -387,22 +387,6 @@ def delete(self, table_name): | |
err=err)) | ||
return None | ||
|
||
def _table_exists(self, table_name): | ||
"""Checks to see if table exists""" | ||
try: | ||
self.sql_client.send( | ||
'EXPLAIN SELECT * FROM "{table_name}"'.format( | ||
table_name=table_name), | ||
do_post=False) | ||
raise NameError( | ||
'Table `{table_name}` already exists. ' | ||
'Run with `overwrite=True` if you wish to replace the ' | ||
'table.'.format(table_name=table_name)) | ||
except CartoException as err: | ||
# If table doesn't exist, we get an error from the SQL API | ||
self._debug_print(err=err) | ||
return False | ||
|
||
def _check_import(self, import_id): | ||
"""Check the status of an Import API job""" | ||
|
||
|
@@ -1099,19 +1083,7 @@ def data_boundaries(self, boundary=None, region=None, decode_geom=False, | |
boundaries in `region` (or the world if `region` is ``None``) | ||
""" | ||
# TODO: create a function out of this? | ||
if (isinstance(region, collections.Iterable) | ||
and not isinstance(region, str)): | ||
if len(region) != 4: | ||
raise ValueError( | ||
'`region` should be a list of the geographic bounds of a ' | ||
'region in the following order: western longitude, ' | ||
'southern latitude, eastern longitude, and northern ' | ||
'latitude. For example, Switerland fits in ' | ||
'``[5.9559111595,45.8179931641,10.4920501709,' | ||
'47.808380127]``.') | ||
bounds = ('ST_MakeEnvelope({0}, {1}, {2}, {3}, 4326)').format( | ||
*region) | ||
elif isinstance(region, str): | ||
if isinstance(region, str): | ||
# see if it's a table | ||
try: | ||
geom_type = self._geom_type(region) | ||
|
@@ -1126,7 +1098,17 @@ def data_boundaries(self, boundary=None, region=None, decode_geom=False, | |
regionsearch = '"geom_tags"::text ilike \'%{}%\''.format( | ||
get_countrytag(region)) | ||
bounds = 'ST_MakeEnvelope(-180.0, -85.0, 180.0, 85.0, 4326)' | ||
|
||
elif isinstance(region, collections.Iterable): | ||
if len(region) != 4: | ||
raise ValueError( | ||
'`region` should be a list of the geographic bounds of a ' | ||
'region in the following order: western longitude, ' | ||
'southern latitude, eastern longitude, and northern ' | ||
'latitude. For example, Switerland fits in ' | ||
'``[5.9559111595,45.8179931641,10.4920501709,' | ||
'47.808380127]``.') | ||
bounds = ('ST_MakeEnvelope({0}, {1}, {2}, {3}, 4326)').format( | ||
*region) | ||
elif region is None: | ||
bounds = 'ST_MakeEnvelope(-180.0, -85.0, 180.0, 85.0, 4326)' | ||
else: | ||
|
@@ -1149,9 +1131,8 @@ def data_boundaries(self, boundary=None, region=None, decode_geom=False, | |
'{filters}')).format( | ||
bounds=bounds, | ||
timespan=utils.pgquote(timespan), | ||
filters='WHERE {}'.format(filters) if filters else '' | ||
) | ||
return self.query(query) | ||
filters='WHERE {}'.format(filters) if filters else '') | ||
return self.fetch(query, decode_geom=True) | ||
|
||
query = utils.minify_sql(( | ||
'SELECT the_geom, geom_refs', | ||
|
@@ -1162,7 +1143,7 @@ def data_boundaries(self, boundary=None, region=None, decode_geom=False, | |
bounds=bounds, | ||
boundary=utils.pgquote(boundary), | ||
time=utils.pgquote(timespan)) | ||
return self.query(query, decode_geom=decode_geom) | ||
return self.fetch(query, decode_geom=decode_geom) | ||
|
||
def data_discovery(self, region, keywords=None, regex=None, time=None, | ||
boundaries=None, include_quantiles=False): | ||
|
@@ -1425,8 +1406,7 @@ def data_discovery(self, region, keywords=None, regex=None, time=None, | |
numers=numers, | ||
quantiles=quantiles).strip() | ||
self._debug_print(query=query) | ||
resp = self.sql_client.send(query) | ||
return pd.DataFrame(resp['rows']) | ||
return self.fetch(query, decode_geom=True) | ||
|
||
def data(self, table_name, metadata, persist_as=None, how='the_geom'): | ||
"""Get an augmented CARTO dataset with `Data Observatory | ||
|
@@ -1536,8 +1516,7 @@ def data(self, table_name, metadata, persist_as=None, how='the_geom'): | |
' numeric, timespan_rownum numeric)', | ||
)).format(table_name=table_name, | ||
meta=json.dumps(metadata).replace('\'', '\'\'')) | ||
resp = self.sql_client.send(query) | ||
_meta = pd.DataFrame(resp['rows']) | ||
_meta = self.fetch(query) | ||
|
||
if _meta.shape[0] == 0: | ||
raise ValueError('There are no valid metadata entries. Check ' | ||
|
@@ -1550,30 +1529,40 @@ def data(self, table_name, metadata, persist_as=None, how='the_geom'): | |
'combine resulting DataFrames using ' | ||
'`pandas.concat`') | ||
|
||
tablecols = self.sql_client.send( | ||
'SELECT * FROM {table_name} LIMIT 0'.format(table_name=table_name), | ||
**DEFAULT_SQL_ARGS | ||
)['fields'].keys() | ||
# get column names except the_geom_webmercator | ||
dataset = Dataset(self, table_name) | ||
table_columns = dataset.get_table_column_names(exclude=['the_geom_webmercator']) | ||
|
||
names = {} | ||
for suggested in _meta['suggested_name']: | ||
if suggested in tablecols: | ||
names[suggested] = utils.unique_colname(suggested, tablecols) | ||
if suggested in table_columns: | ||
names[suggested] = utils.unique_colname(suggested, table_columns) | ||
warn( | ||
'{s0} was augmented as {s1} because of name ' | ||
'collision'.format(s0=suggested, s1=names[suggested]) | ||
) | ||
else: | ||
names[suggested] = suggested | ||
|
||
# drop description columns to lighten the query | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ❤️ |
||
# FIXME https://github.com/CartoDB/cartoframes/issues/593 | ||
meta_columns = _meta.columns.values | ||
drop_columns = [] | ||
for meta_column in meta_columns: | ||
if meta_column.endswith('_description'): | ||
drop_columns.append(meta_column) | ||
|
||
if len(drop_columns) > 0: | ||
_meta.drop(drop_columns, axis=1, inplace=True) | ||
|
||
cols = ', '.join( | ||
'(data->{n}->>\'value\')::{pgtype} AS {col}'.format( | ||
n=row[0], | ||
pgtype=row[1]['numer_type'], | ||
col=names[row[1]['suggested_name']]) | ||
for row in _meta.iterrows()) | ||
query = utils.minify_sql(( | ||
'SELECT t.*, {cols}', | ||
'SELECT {table_cols}, {cols}', | ||
' FROM OBS_GetData(', | ||
' (SELECT array_agg({how})', | ||
' FROM "{tablename}"),', | ||
|
@@ -1585,9 +1574,16 @@ def data(self, table_name, metadata, persist_as=None, how='the_geom'): | |
tablename=table_name, | ||
rowid='cartodb_id' if how == 'the_geom' else how, | ||
cols=cols, | ||
table_cols=','.join('t.{}'.format(c) for c in table_columns), | ||
meta=_meta.to_json(orient='records').replace('\'', '\'\'')) | ||
return self.query(query, | ||
table_name=persist_as) | ||
|
||
if persist_as: | ||
dataset = Dataset.from_query(self, query, persist_as) | ||
result = dataset.download(decode_geom=True) | ||
else: | ||
result = self.fetch(query, decode_geom=True) | ||
|
||
return result | ||
|
||
def _auth_send(self, relative_path, http_method, **kwargs): | ||
self._debug_print(relative_path=relative_path, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,6 +27,18 @@ def __init__(self, carto_context, table_name, schema='public', df=None): | |
self.df = df | ||
warn('Table will be named `{}`'.format(table_name)) | ||
|
||
@staticmethod | ||
def from_query(cart_context, query, table_name): | ||
dataset = Dataset(cart_context, table_name) | ||
dataset.cc.batch_sql_client \ | ||
.create_and_wait_for_completion( | ||
'''BEGIN; {drop}; {create}; {cartodbfy}; COMMIT;''' | ||
.format(drop=dataset._drop_table_query(), | ||
create=dataset._create_table_from_query(query), | ||
cartodbfy=dataset._cartodbfy_query())) | ||
|
||
return dataset | ||
|
||
def upload(self, with_lonlat=None, if_exists='fail'): | ||
if self.df is None: | ||
raise ValueError('You have to create a `Dataset` with a pandas DataFrame in order to upload it to CARTO') | ||
|
@@ -46,7 +58,7 @@ def upload(self, with_lonlat=None, if_exists='fail'): | |
return self | ||
|
||
def download(self, limit=None, decode_geom=False, retry_times=DEFAULT_RETRY_TIMES): | ||
table_columns = self._get_table_columns() | ||
table_columns = self.get_table_columns() | ||
query = self._get_read_query(table_columns, limit) | ||
|
||
return self.cc.fetch(query, decode_geom=decode_geom) | ||
|
@@ -123,6 +135,10 @@ def _rows(self, df, cols, with_lonlat, geom_col): | |
def _drop_table_query(self): | ||
return '''DROP TABLE IF EXISTS {table_name}'''.format(table_name=self.table_name) | ||
|
||
def _create_table_from_query(self, query): | ||
create_query = '''CREATE TABLE {table_name} AS ({query})'''.format(table_name=self.table_name, query=query) | ||
return create_query | ||
|
||
def _create_table_query(self, with_lonlat=None): | ||
if with_lonlat is None: | ||
geom_type = _get_geom_col_type(self.df) | ||
|
@@ -158,11 +174,21 @@ def _get_read_query(self, table_columns, limit=None): | |
|
||
return query | ||
|
||
def _get_table_columns(self): | ||
def get_table_columns(self): | ||
"""Get column names and types from a table""" | ||
query = 'SELECT * FROM "{schema}"."{table}" limit 0'.format(table=self.table_name, schema=self.schema) | ||
return get_columns(self.cc, query) | ||
|
||
def get_table_column_names(self, exclude=None): | ||
"""Get column names and types from a table""" | ||
query = 'SELECT * FROM "{schema}"."{table}" limit 0'.format(table=self.table_name, schema=self.schema) | ||
columns = get_columns(self.cc, query).keys() | ||
|
||
if exclude and isinstance(exclude, list): | ||
columns = list(set(columns) - set(exclude)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should ensure that >>> exclude = 'the_geom_webmercator'
>>> set(exclude)
{'_', 'a', 'b', 'c', 'e', 'g', 'h', 'm', 'o', 'r', 't', 'w'} There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yep, we are already checking that it is an instance of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ha, I missed the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. np 😅 |
||
|
||
return columns | ||
|
||
|
||
def get_columns(context, query): | ||
"""Get column names and types from a query""" | ||
|
@@ -314,7 +340,7 @@ def _decode_geom(ewkb): | |
def postprocess_dataframe(df, table_columns, decode_geom=False): | ||
"""Clean a DataFrame with a dataset from CARTO: | ||
- use cartodb_id as DataFrame index | ||
- process date columns | ||
- process date and bool columns | ||
- (optionally) decode geom as a `Shapely <https://github.com/Toblerity/Shapely>`__ object | ||
|
||
Args: | ||
|
@@ -334,8 +360,10 @@ def postprocess_dataframe(df, table_columns, decode_geom=False): | |
for column_name in table_columns: | ||
if table_columns[column_name]['type'] == 'date': | ||
df[column_name] = pd.to_datetime(df[column_name], errors='ignore') | ||
elif table_columns[column_name]['type'] == 'boolean': | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good catch, I wasn't aware that bools weren't properly converted. Maybe the second bullet item in the docstring should be expanded to include |
||
df[column_name] = df[column_name].eq('t') | ||
|
||
if decode_geom: | ||
if decode_geom and 'the_geom' in df.columns: | ||
df['geometry'] = df.the_geom.apply(_decode_geom) | ||
|
||
return df |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FYI, the
DEFAULT_SQL_ARGS
are the way to get the cartoframes calls to show up in logs so we can monitor performance, etc. They're fit in so that each method get registered once despite having multiple calls.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
mmm is this still true? I see DEFAULT_SQL_ARGS just having a
do_post
attribute.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good question. Are POST request args searchable in kibana?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@oleurud ^^
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The POST arguments are not saved by default. The only things we can check in Kibana about a POST request to SQL API are the queries
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Two options:
I'd do the user agent thing, which should not be hard to do and it makes it searchable everywhere: Kibana, Rollbar, etc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just in case it will be solved with a user agent header, I had created this issue the handle it #601
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added the user-agent thing here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also (CartoDB/carto-python#111).
Nice, well solved 👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👏