diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index 4dd674303..d87030f1b 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -40,6 +40,35 @@ CARTOframes documentation is located inline in the functions, classes, and metho
 
 **Tip:** A convenient, easy way of proposing changes in documentation is by using the GitHub editor directly on the web. You can easily create a branch with your changes and make a PR from there.
 
+## Releases
+
+To release a new version of cartoframes, create a new branch off of `master` called `vX.Y.Z_release`, where `X.Y.Z` should be replaced with the specific version to be released (e.g., 0.10.1). After this branch is created, update the following files:
+
+1. ``cartoframes/__version__.py`` should have the newest version number in the ``__version__`` variable
+2. NEWS.rst should be updated with all of the changes shipped in this version, including the release number and the date of the release. Looking at merged pull requests sorted by last updated is a good way to ensure features are not missed.
+3. README.rst should be updated so that the mybinder tag at the top of the file points to the release number/tag
+
+Ensure that documentation builds correctly by building this branch in readthedocs. If it does not, this is a good time to fix the documentation before publishing. You need to be added as a contributor on readthedocs to be able to configure builds.
+
+After the tests pass, merge into `master`. Next, we publish a release to [PyPI](https://pypi.org/project/cartoframes/) and [GitHub](https://github.com/CartoDB/cartoframes/releases).
+
+### Documentation (readthedocs)
+
+This step needs to be completed before any release, but it is listed here as a reminder that documentation should not be ignored. Docs are built with [ReadTheDocs](https://cartoframes.readthedocs.io/en/stable/) automatically from any tagged release and a few select branches. ``master`` is the docs build for ``latest``. Once docs are working from `master` in the previous step, ensure that the version shows up in the default docs page: https://cartoframes.readthedocs.io/en/stable/
+
+### PyPI release
+
+Run `make publish` in the base cartoframes directory. For a new release to be published on PyPI you need to be added as an author on the [PyPI cartoframes project](https://pypi.org/project/cartoframes/). Also make sure that [`twine`](https://pypi.org/project/twine/) is installed.
+
+
+### GitHub release
+
+1. Make sure `master` is fresh from the `vX.Y.Z_release` merge
+2. Title the release `vX.Y.Z Release`
+3. Add the latest entry from NEWS.rst
+4. Add the dist files from `make dist` (``cartoframes-X.Y.Z-py2.py3-none-any.whl`` and ``cartoframes-X.Y.Z.tar.gz``)
+5. Select pre-release (for now)
+
 ## Submitting contributions
 
 You will need to sign a Contributor License Agreement (CLA) before making a submission. [Learn more here](https://carto.com/contributions).
diff --git a/README.rst b/README.rst
index ee9788eb7..ee4475bdb 100644
--- a/README.rst
+++ b/README.rst
@@ -6,8 +6,8 @@ CARTOframes
    :target: https://travis-ci.org/CartoDB/cartoframes
 .. image:: https://coveralls.io/repos/github/CartoDB/cartoframes/badge.svg?branch=master
    :target: https://coveralls.io/github/CartoDB/cartoframes?branch=master
-.. image:: https://mybinder.org/badge.svg
-   :target: https://mybinder.org/v2/gh/CartoDB/cartoframes/master?filepath=examples
+.. image:: https://mybinder.org/badge_logo.svg
+   :target: https://mybinder.org/v2/gh/cartodb/cartoframes/v0.9.2?filepath=examples
 
 A Python package for integrating `CARTO <https://carto.com/>`__ maps, analysis, and data services into data science workflows.
 
diff --git a/cartoframes/columns.py b/cartoframes/columns.py
index c35a2c94c..a89fb0c2e 100644
--- a/cartoframes/columns.py
+++ b/cartoframes/columns.py
@@ -7,6 +7,9 @@
 
 class Column(object):
+    DATETIME_DTYPES = ['datetime64[D]', 'datetime64[ns]', 'datetime64[ns, UTC]']
+    SUPPORTED_GEOM_COL_NAMES = ['geom', 'the_geom', 'geometry']
+    RESERVED_COLUMN_NAMES = SUPPORTED_GEOM_COL_NAMES + ['the_geom_webmercator', 'cartodb_id']
     MAX_LENGTH = 63
     MAX_COLLISION_LENGTH = MAX_LENGTH - 4
     RESERVED_WORDS = ('ALL', 'ANALYSE', 'ANALYZE', 'AND', 'ANY', 'ARRAY', 'AS', 'ASC', 'ASYMMETRIC', 'AUTHORIZATION',
@@ -21,20 +24,27 @@ class Column(object):
                       'TO', 'TRAILING', 'TRUE', 'UNION', 'UNIQUE', 'USER', 'USING', 'VERBOSE', 'WHEN', 'WHERE',
                       'XMIN', 'XMAX', 'FORMAT', 'CONTROLLER', 'ACTION', )
 
-    def __init__(self, name):
+    @staticmethod
+    def from_sql_api_fields(sql_api_fields):
+        return [Column(column, normalize=False, pgtype=sql_api_fields[column]['type']) for column in sql_api_fields]
+
+    def __init__(self, name, normalize=True, pgtype=None):
         if not name:
             raise ValueError('Column name cannot be null or empty')
 
         self.name = str(name)
-        self.normalize()
+        self.pgtype = pgtype
+        self.dtype = pg2dtypes(pgtype)
+        if normalize:
+            self.normalize()
 
-    def normalize(self, forbidden_columns=None):
+    def normalize(self, forbidden_column_names=None):
         self._sanitize()
         self.name = self._truncate()
 
-        if forbidden_columns:
+        if forbidden_column_names:
             i = 1
-            while self.name in forbidden_columns:
+            while self.name in forbidden_column_names:
                 self.name = '{}_{}'.format(self._truncate(length=Column.MAX_COLLISION_LENGTH), str(i))
                 i += 1
 
@@ -105,14 +115,48 @@ def normalize_names(column_names):
     """
     result = []
     for column_name in column_names:
-        column = Column(column_name).normalize(forbidden_columns=result)
+        column = Column(column_name).normalize(forbidden_column_names=result)
         result.append(column.name)
 
     return result
 
 
-def normalize_name(name):
-    if name is None:
+def normalize_name(column_name):
+    if column_name is None:
         return None
 
-    return normalize_names([name])[0]
+    return normalize_names([column_name])[0]
+
+
+def dtypes(columns, exclude_dates=False, exclude_the_geom=False):
+    return {x.name: x.dtype if not x.name == 'cartodb_id' else 'int64'
+            for x in columns if not (exclude_dates is True and x.dtype in Column.DATETIME_DTYPES)
+            and not (exclude_the_geom is True and x.name in Column.SUPPORTED_GEOM_COL_NAMES)}
+
+
+def date_columns_names(columns):
+    return [x.name for x in columns if x.dtype in Column.DATETIME_DTYPES]
+
+
+def pg2dtypes(pgtype):
+    """Returns equivalent dtype for input `pgtype`."""
+    mapping = {
+        'bigint': 'float64',
+        'boolean': 'bool',
+        'date': 'datetime64[D]',
+        'double precision': 'float64',
+        'geometry': 'object',
+        'int': 'int64',
+        'integer': 'float64',
+        'number': 'float64',
+        'numeric': 'float64',
+        'real': 'float64',
+        'smallint': 'float64',
+        'string': 'object',
+        'timestamp': 'datetime64[ns]',
+        'timestamptz': 'datetime64[ns]',
+        'timestamp with time zone': 'datetime64[ns]',
+        'timestamp without time zone': 'datetime64[ns]',
+        'USER-DEFINED': 'object',
+    }
+    return mapping.get(str(pgtype), 'object')
diff --git a/cartoframes/context.py b/cartoframes/context.py
index 6df80e9d9..9012ea075 100644
--- a/cartoframes/context.py
+++ b/cartoframes/context.py
@@ -7,9 +7,7 @@
 import os
 import random
 import sys
-import time
 import collections
-import binascii as ba
 from warnings import warn
 
 import requests
@@ -32,7 +30,8 @@
                     get_map_template, top_basemap_layer_url)
 from .analysis import Table
 from .__version__ import __version__
-from .dataset import Dataset, recursive_read, postprocess_dataframe, get_columns
+from .columns import dtypes, date_columns_names
+from .dataset import Dataset, recursive_read, _decode_geom, get_columns
 
 if sys.version_info >= (3, 0):
     from urllib.parse import urlparse, urlencode
@@ -210,7 +209,8 @@ def _is_org_user(self):
         return res['rows'][0]['unnest'] != 'public'
 
     def read(self, table_name, limit=None, decode_geom=False, shared_user=None, retry_times=3):
-        """Read a table from CARTO into a pandas DataFrames.
+        """Read a table from CARTO into a pandas DataFrame. Column types are inferred from database types; to
+        avoid problems with integer columns with NA or null values, integers are automatically retrieved as float64.
 
         Args:
             table_name (str): Name of table in user's CARTO account.
@@ -291,6 +291,9 @@ def write(self, df, table_name, temp_dir=CACHE_DIR, overwrite=False,
 
             cc.map(layers=Layer('life_expectancy',
                                 color='both_sexes_life_expectancy'))
 
+        .. warning:: datetime64[ns] columns will lose precision when a DataFrame is sent to CARTO,
+            because PostgreSQL timestamps have microsecond resolution while pandas uses nanoseconds
+
         Args:
             df (pandas.DataFrame): DataFrame to write to ``table_name`` in user
                 CARTO account
@@ -507,34 +510,120 @@ def fetch(self, query, decode_geom=False):
         """
         copy_query = 'COPY ({query}) TO stdout WITH (FORMAT csv, HEADER true)'.format(query=query)
 
-        query_columns = get_columns(self, query)
         result = recursive_read(self, copy_query)
-        df = pd.read_csv(result)
 
-        return postprocess_dataframe(df, query_columns, decode_geom)
+        query_columns = get_columns(self, query)
+        df_types = dtypes(query_columns, exclude_dates=True, exclude_the_geom=True)
+        date_column_names = date_columns_names(query_columns)
+
+        df = pd.read_csv(result, dtype=df_types,
+                         parse_dates=date_column_names,
+                         true_values=['t'],
+                         false_values=['f'],
+                         index_col='cartodb_id' if 'cartodb_id' in df_types else False,
+                         converters={'the_geom': lambda x: _decode_geom(x) if decode_geom else x})
+
+        if decode_geom:
+            df.rename({'the_geom': 'geometry'}, axis='columns', inplace=True)
+
+        return df
+
+    def execute(self, query):
+        """Runs an arbitrary query against a CARTO account.
+
+        This method is especially useful for queries that do not return any data and just
+        perform a database operation, like:
 
-    def query(self, query, table_name=None, decode_geom=False):
-        """Pull the result from an arbitrary SQL query from a CARTO account
-        into a pandas DataFrame. Can also be used to perform database
-        operations (creating/dropping tables, adding columns, updates, etc.).
+        - INSERT, UPDATE, DROP, CREATE, ALTER, stored procedures, etc.
+
+        Queries are run using a `Batch SQL API job
+        <https://carto.com/developers/sql-api/guides/batch-queries/>`__
+        in the user's account.
+
+        The execution of the queries is asynchronous, but this method automatically
+        waits for their completion (or failure). The `job_id` of the Batch SQL API job
+        will be printed. If there is any issue, you can contact the CARTO support team
+        specifying that `job_id`.
+
+        Args:
+            query (str): An SQL query to run against the CARTO user database.
+
+        Returns:
+            None
+
+        Raises:
+            CartoException: If the query fails to execute
+
+        Examples:
+
+            Drops `my_table`
+
+            .. code:: python
+
+                cc.execute(
+                    '''
+                    DROP TABLE my_table
+                    '''
+                )
+
+            Updates the column `my_column` in the table `my_table`
+
+            .. code:: python
+
+                cc.execute(
+                    '''
+                    UPDATE my_table SET my_column = 1
+                    '''
+                )
+
+        """
+        self.batch_sql_client.create_and_wait_for_completion(query)
+
+    def query(self, query, table_name=None, decode_geom=False, is_select=None):
+        """Pull the result of an arbitrary SQL SELECT query from a CARTO account
+        into a pandas DataFrame. This is the default behavior when `is_select=True`.
+
+        Can also be used to perform database operations (creating/dropping tables,
+        adding columns, updates, etc.). In this case, you have to explicitly
+        specify `is_select=False`.
+
+        This method is a helper for the `CartoContext.fetch` and `CartoContext.execute`
+        methods. We strongly encourage you to use either of those methods, depending on
+        the type of query you want to run. If you want to get the results of a `SELECT`
+        query into a pandas DataFrame, use `CartoContext.fetch`. For any other query that
+        performs an operation on the CARTO database, use `CartoContext.execute`.
 
         Args:
             query (str): Query to run against CARTO user database. This data
                 will then be converted into a pandas DataFrame.
-            table_name (str, optional): If set, this will create a new
-                table in the user's CARTO account that is the result of the
-                query. Defaults to None (no table created).
+            table_name (str, optional): If set (and `is_select=True`), this will create a new
+                table in the user's CARTO account that is the result of the SELECT
+                query provided. Defaults to None (no table created).
             decode_geom (bool, optional): Decodes CARTO's geometries into a
                 `Shapely <https://pypi.python.org/pypi/Shapely>`__
                 object that can be used, for example, in `GeoPandas
-                <http://geopandas.org/>`__.
+                <http://geopandas.org/>`__. It only works for SELECT queries when `is_select=True`.
+            is_select (bool, optional): This argument has to be set depending on the query
+                performed: True for SELECT queries, False for any other query. For a
+                SELECT SQL query (`is_select=True`), the result will be stored in a
+                pandas DataFrame. For an arbitrary SQL query (`is_select=False`), a
+                database operation (UPDATE, DROP, INSERT, etc.) will be performed.
+                By default `is_select=None`, which means the method will return a DataFrame
+                if the `query` starts with a `SELECT` clause and otherwise will just execute
+                the query and return `None`.
 
         Returns:
-            pandas.DataFrame: DataFrame representation of query supplied.
+            pandas.DataFrame: When `is_select=True` and the query is actually a SELECT
+            query, this method returns a pandas DataFrame representation of the query
+            supplied; otherwise it returns None.
             Pandas data types are inferred from PostgreSQL data types.
             In the case of PostgreSQL date types, dates are attempted to be
             converted, but on failure a data type 'object' is used.
 
+        Raises:
+            CartoException: If there's any error when executing the query
+
         Examples:
 
             Query a table in CARTO and write a new table that is the result of a
             query.
This query gets the 10 highest values from a table and @@ -572,75 +661,40 @@ def query(self, query, table_name=None, decode_geom=False): decode_geom=True ) - """ - self._debug_print(query=query) - if table_name: - # TODO: replace the following error catching with Import API - # checking once Import API sql/table_name collision_strategy=skip - # bug is fixed ref: support/1127 - try: - self.sql_client.send(''' - CREATE TABLE {0} AS SELECT 1; - DROP TABLE {0}; - '''.format(table_name)) - resp = self._auth_send( - 'api/v1/imports', 'POST', - params=dict(table_name=table_name), - json=dict(sql=query), - # collision_strategy='', - headers={'Content-Type': 'application/json'}) - except CartoException as err: - raise CartoException( - 'Cannot create table `{0}`: {1}'.format(table_name, err)) - - while True: - import_job = self._check_import(resp['item_queue_id']) - self._debug_print(import_job=import_job) - final_table_name = self._handle_import(import_job, table_name) - if import_job['state'] == 'complete': - - print('Table successfully written to CARTO: ' - '{table_url}'.format( - table_url=utils.join_url(self.creds.base_url(), - 'dataset', - final_table_name))) - break - time.sleep(1.0) - - select_res = self.sql_client.send( - 'SELECT * FROM {table_name}'.format( - table_name=final_table_name), - skipfields='the_geom_webmercator', - **DEFAULT_SQL_ARGS) - else: - select_res = self.sql_client.send( - query, - skipfields='the_geom_webmercator', - **DEFAULT_SQL_ARGS) - if 'error' in select_res: - raise CartoException(str(select_res['error'])) + Drops `my_table` - self._debug_print(select_res=select_res) + .. code:: python + + cc.query( + ''' + DROP TABLE my_table + ''' + ) - fields = select_res['fields'] - if select_res['total_rows'] == 0: - return pd.DataFrame(columns=set(fields.keys()) - {'cartodb_id'}) + Updates the column `my_column` in the table `my_table` - df = pd.DataFrame(data=select_res['rows']) - for field in fields: - if fields[field]['type'] == 'date': - df[field] = pd.to_datetime(df[field], errors='ignore') + .. 
code:: python - self._debug_print(columns=df.columns, - dtypes=df.dtypes) + cc.query( + ''' + UPDATE my_table SET my_column = 1 + ''' + ) - if 'cartodb_id' in fields: - df.set_index('cartodb_id', inplace=True) + """ + dataframe = None - if decode_geom: - df['geometry'] = df.the_geom.apply(_decode_geom) + is_select_query = is_select or (is_select is None and query.strip().lower().startswith('select')) + if is_select_query: + if table_name: + dataset = Dataset.create_from_query(self, query, table_name) + dataframe = dataset.download(decode_geom=decode_geom) + else: + dataframe = self.fetch(query, decode_geom=decode_geom) + else: + self.execute(query) - return df + return dataframe @utils.temp_ignore_warnings def map(self, layers=None, interactive=True, @@ -1543,7 +1597,6 @@ def data(self, table_name, metadata, persist_as=None, how='the_geom'): names[suggested] = suggested # drop description columns to lighten the query - # FIXME https://github.com/CartoDB/cartoframes/issues/593 meta_columns = _meta.columns.values drop_columns = [] for meta_column in meta_columns: @@ -1575,15 +1628,7 @@ def data(self, table_name, metadata, persist_as=None, how='the_geom'): table_cols=','.join('t.{}'.format(c) for c in table_columns), meta=_meta.to_json(orient='records').replace('\'', '\'\'')) - if persist_as: - dataset = Dataset.from_query(query, context=self) - dataset.table_name = persist_as - dataset.upload() - result = dataset.download(decode_geom=True) - else: - result = self.fetch(query, decode_geom=True) - - return result + return self.query(query, table_name=persist_as, decode_geom=False, is_select=True) def _auth_send(self, relative_path, http_method, **kwargs): self._debug_print(relative_path=relative_path, @@ -1627,13 +1672,14 @@ def _send_map_template(self, layers, has_zoom): 'api/v1/map/named', 'POST', headers={'Content-Type': 'application/json'}, data=get_map_template(layers, has_zoom=has_zoom)) - # TODO: remove this after testing if 'errors' in resp: resp = self._auth_send( 'api/v1/map/named/{}'.format(map_name), 'PUT', headers={'Content-Type': 'application/json'}, data=get_map_template(layers, has_zoom=has_zoom)) + if 'errors' in resp: + raise CartoException(resp) self._map_templates[map_name] = True return map_name @@ -1709,65 +1755,3 @@ def _debug_print(self, **kwargs): str_value[-50:]) print('{key}: {value}'.format(key=key, value=str_value)) - - -# TODO: move all of the below to the utils module -def _add_encoded_geom(df, geom_col): - """Add encoded geometry to DataFrame""" - # None if not a GeoDataFrame - is_geopandas = getattr(df, '_geometry_column_name', None) - if is_geopandas is None and geom_col is None: - warn('`encode_geom` works best with Geopandas ' - '(http://geopandas.org/) and/or shapely ' - '(https://pypi.python.org/pypi/Shapely).') - geom_col = 'geometry' if 'geometry' in df.columns else None - if geom_col is None: - raise KeyError('Geometries were requested to be encoded ' - 'but a geometry column was not found in the ' - 'DataFrame.'.format(geom_col=geom_col)) - elif is_geopandas and geom_col: - warn('Geometry column of the input DataFrame does not ' - 'match the geometry column supplied. 
Using user-supplied '
-             'column...\n'
-             '\tGeopandas geometry column: {}\n'
-             '\tSupplied `geom_col`: {}'.format(is_geopandas,
-                                                geom_col))
-    elif is_geopandas and geom_col is None:
-        geom_col = is_geopandas
-    # updates in place
-    df['the_geom'] = df[geom_col].apply(_encode_geom)
-    return None
-
-
-def _encode_decode_decorator(func):
-    """decorator for encoding and decoding geoms"""
-    def wrapper(*args):
-        """error catching"""
-        try:
-            processed_geom = func(*args)
-            return processed_geom
-        except ImportError as err:
-            raise ImportError('The Python package `shapely` needs to be '
-                              'installed to encode or decode geometries. '
-                              '({})'.format(err))
-    return wrapper
-
-
-@_encode_decode_decorator
-def _encode_geom(geom):
-    """Encode geometries into hex-encoded wkb
-    """
-    from shapely import wkb
-    if geom:
-        return ba.hexlify(wkb.dumps(geom)).decode()
-    return None
-
-
-@_encode_decode_decorator
-def _decode_geom(ewkb):
-    """Decode encoded wkb into a shapely geometry
-    """
-    from shapely import wkb
-    if ewkb:
-        return wkb.loads(ba.unhexlify(ewkb))
-    return None
diff --git a/cartoframes/dataset.py b/cartoframes/dataset.py
index f1e7705e2..9f4d99f81 100644
--- a/cartoframes/dataset.py
+++ b/cartoframes/dataset.py
@@ -4,7 +4,7 @@
 import time
 
 from tqdm import tqdm
-from .columns import normalize_names, normalize_name
+from .columns import Column, normalize_names, normalize_name
 from carto.exceptions import CartoException, CartoRateLimitException
 from .geojson import load_geojson
@@ -22,9 +22,6 @@ def set_default_context(context):
 
 class Dataset(object):
-    SUPPORTED_GEOM_COL_NAMES = ['geom', 'the_geom', 'geometry']
-    RESERVED_COLUMN_NAMES = SUPPORTED_GEOM_COL_NAMES + ['the_geom_webmercator', 'cartodb_id']
-
     FAIL = 'fail'
     REPLACE = 'replace'
     APPEND = 'append'
@@ -53,10 +50,16 @@ def __init__(self, table_name=None, schema='public', query=None, df=None, gdf=No
         self.state = state
         self.cc = context or default_context
 
-        if df is not None:
-            self.normalized_column_names = _normalize_column_names(df)
-        elif gdf is not None:
-            self.normalized_column_names = _normalize_column_names(gdf)
+        self.normalized_column_names = None
+        if self.df is not None:
+            _save_index_as_column(self.df)
+            self.normalized_column_names = _normalize_column_names(self.df)
+        elif self.gdf is not None:
+            _save_index_as_column(self.gdf)
+            self.normalized_column_names = _normalize_column_names(self.gdf)
+
+        if self.table_name != table_name:
+            warn('Table will be named `{}`'.format(self.table_name))
 
     @classmethod
     def from_table(cls, table_name, schema='public', context=None):
@@ -83,6 +86,18 @@ def from_geojson(cls, geojson, table_name=None, schema='public', context=None):
         return cls(
             gdf=load_geojson(geojson), table_name=table_name, schema=schema, context=context, state=cls.STATE_LOCAL)
 
+    @staticmethod
+    def create_from_query(context, query, table_name):
+        dataset = Dataset.from_table(table_name=table_name, context=context)
+        dataset.cc.batch_sql_client \
+               .create_and_wait_for_completion(
+                   '''BEGIN; {drop}; {create}; {cartodbfy}; COMMIT;'''
+                   .format(drop=dataset._drop_table_query(),
+                           create=dataset._create_table_from_query(query),
+                           cartodbfy=dataset._cartodbfy_query()))
+
+        return dataset
+
     def upload(self, with_lonlat=None, if_exists='fail'):
         if self.query and self.table_name is not None and not self.exists():
             self.cc.batch_sql_client.create_and_wait_for_completion(
@@ -155,15 +170,17 @@
         self.cc.copy_client.copyfrom(
             """COPY {table_name}({columns},the_geom) FROM stdin WITH (FORMAT csv, DELIMITER '|');""".format(table_name=self.table_name, columns=columns),
-            self._rows(self.df, self.df.columns, with_lonlat, geom_col)
+            self._rows(self.df, [c for c in self.df.columns if c != 'cartodb_id'], with_lonlat, geom_col)
         )
 
     def _rows(self, df, cols, with_lonlat, geom_col):
         for i, row in df.iterrows():
             csv_row = ''
             the_geom_val = None
+            lng_val = None
+            lat_val = None
             for col in cols:
-                if with_lonlat and col in Dataset.SUPPORTED_GEOM_COL_NAMES:
+                if with_lonlat and col in Column.SUPPORTED_GEOM_COL_NAMES:
                     continue
                 val = row[col]
                 if pd.isnull(val) or val is None:
@@ -182,7 +199,7 @@
                 geom = _decode_geom(the_geom_val)
                 if geom:
                     csv_row += 'SRID=4326;{geom}'.format(geom=geom.wkt)
-            if with_lonlat is not None:
+            if with_lonlat is not None and lng_val is not None and lat_val is not None:
                 csv_row += 'SRID=4326;POINT({lng} {lat})'.format(lng=lng_val, lat=lat_val)
 
             csv_row += '\n'
@@ -216,8 +233,7 @@ def _create_table_query(self, with_lonlat=None):
 
     def _get_read_query(self, table_columns, limit=None):
         """Create the read (COPY TO) query"""
-        query_columns = list(table_columns.keys())
-        query_columns.remove('the_geom_webmercator')
+        query_columns = [column.name for column in table_columns if column.name != 'the_geom_webmercator']
 
         query = 'SELECT {columns} FROM "{schema}"."{table_name}"'.format(
             table_name=self.table_name,
@@ -234,13 +250,28 @@
     def get_table_columns(self):
         """Get column names and types from a table"""
-        query = 'SELECT * FROM "{schema}"."{table}" limit 0'.format(table=self.table_name, schema=self.schema)
-        return get_columns(self.cc, query)
+        query = '''
+            SELECT column_name, data_type
+            FROM information_schema.columns
+            WHERE table_name = '{table}' AND table_schema = '{schema}'
+        '''.format(table=self.table_name, schema=self.schema)
+
+        try:
+            table_info = self.cc.sql_client.send(query)
+            return [Column(c['column_name'], pgtype=c['data_type']) for c in table_info['rows']]
+        except CartoException as e:
+            # this may happen when using the default_public API key
+            if str(e) == 'Access denied':
+                query = '''
+                    SELECT *
+                    FROM "{schema}"."{table}" LIMIT 0
+                '''.format(table=self.table_name, schema=self.schema)
+                return get_columns(self.cc, query)
+            raise e
 
     def get_table_column_names(self, exclude=None):
         """Get column names and types from a table"""
-        query = 'SELECT * FROM "{schema}"."{table}" limit 0'.format(table=self.table_name, schema=self.schema)
-        columns = get_columns(self.cc, query).keys()
+        columns = [c.name for c in self.get_table_columns()]
 
         if exclude and isinstance(exclude, list):
             columns = list(set(columns) - set(exclude))
@@ -292,15 +322,6 @@ def _map_geom_type(self, geom_type):
         }[geom_type]
 
 
-def get_columns(context, query):
-    """Get column names and types from a query"""
-    table_info = context.sql_client.send(query)
-    if 'fields' in table_info:
-        return table_info['fields']
-
-    return None
-
-
 def recursive_read(context, query, retry_times=Dataset.DEFAULT_RETRY_TIMES):
     try:
         return context.copy_client.copyto_stream(query)
@@ -317,8 +338,22 @@ def recursive_read(context, query, retry_times=Dataset.DEFAULT_RETRY_TIMES):
         raise err
 
 
+def get_columns(context, query):
+    col_query = '''SELECT * FROM ({query}) _q LIMIT 0'''.format(query=query)
+    table_info = context.sql_client.send(col_query)
+    return Column.from_sql_api_fields(table_info['fields'])
+
+
+def _save_index_as_column(df):
+    index_name = df.index.name
+    if index_name is not None:
+        if index_name not in df.columns:
+            
df.reset_index(inplace=True) + df.set_index(index_name, drop=False, inplace=True) + + def _normalize_column_names(df): - column_names = [c for c in df.columns if c not in Dataset.RESERVED_COLUMN_NAMES] + column_names = [c for c in df.columns if c not in Column.RESERVED_COLUMN_NAMES] normalized_columns = normalize_names(column_names) column_tuples = [(norm, orig) for orig, norm in zip(column_names, normalized_columns)] @@ -340,9 +375,9 @@ def _dtypes2pg(dtype): """Returns equivalent PostgreSQL type for input `dtype`""" mapping = { 'float64': 'numeric', - 'int64': 'numeric', + 'int64': 'integer', 'float32': 'numeric', - 'int32': 'numeric', + 'int32': 'integer', 'object': 'text', 'bool': 'boolean', 'datetime64[ns]': 'timestamp', @@ -355,7 +390,7 @@ def _get_geom_col_name(df): geom_col = getattr(df, '_geometry_column_name', None) if geom_col is None: try: - geom_col = next(x for x in df.columns if x.lower() in Dataset.SUPPORTED_GEOM_COL_NAMES) + geom_col = next(x for x in df.columns if x.lower() in Column.SUPPORTED_GEOM_COL_NAMES) except StopIteration: pass @@ -425,35 +460,3 @@ def _decode_geom(ewkb): except Exception: pass return None - - -def postprocess_dataframe(df, table_columns, decode_geom=False): - """Clean a DataFrame with a dataset from CARTO: - - use cartodb_id as DataFrame index - - process date and bool columns - - (optionally) decode geom as a `Shapely `__ object - - Args: - df (pandas.DataFrame): DataFrame with a dataset from CARTO. - table_columns (dict): column names and types from a table. - decode_geom (bool, optional): Decodes CARTO's geometries into a - `Shapely `__ - object that can be used, for example, in `GeoPandas - `__. - - Returns: - pandas.DataFrame - """ - if 'cartodb_id' in df.columns: - df.set_index('cartodb_id', inplace=True) - - for column_name in table_columns: - if table_columns[column_name]['type'] == 'date': - df[column_name] = pd.to_datetime(df[column_name], errors='ignore') - elif table_columns[column_name]['type'] == 'boolean': - df[column_name] = df[column_name].eq('t') - - if decode_geom and 'the_geom' in df.columns: - df['geometry'] = df.the_geom.apply(_decode_geom) - - return df diff --git a/cartoframes/utils.py b/cartoframes/utils.py index 1908b643a..2b4c86949 100644 --- a/cartoframes/utils.py +++ b/cartoframes/utils.py @@ -93,32 +93,3 @@ def dtypes2pg(dtype): 'datetime64[ns]': 'timestamp', } return mapping.get(str(dtype), 'text') - - -# NOTE: this is not currently used anywhere -def pg2dtypes(pgtype): - """Returns equivalent dtype for input `pgtype`.""" - mapping = { - 'date': 'datetime64[ns]', - 'number': 'float64', - 'string': 'object', - 'boolean': 'bool', - 'geometry': 'object', - } - return mapping.get(str(pgtype), 'object') - - -def df2pg_schema(dataframe, pgcolnames): - """Print column names with PostgreSQL schema for the SELECT statement of - a SQL query""" - util_cols = set(('the_geom', 'the_geom_webmercator', 'cartodb_id')) - if set(dataframe.columns).issubset(util_cols): - return ', '.join(dataframe.columns) - schema = ', '.join([ - 'NULLIF("{col}", \'\')::{t} AS {col}'.format(col=c, - t=dtypes2pg(t)) - for c, t in zip(pgcolnames, dataframe.dtypes) - if c not in util_cols]) - if 'the_geom' in pgcolnames: - return '"the_geom", ' + schema - return schema diff --git a/docs/developer-center/guides/01-Quickstart.md b/docs/developer-center/guides/01-Quickstart.md index a9bb99212..89f83ee10 100644 --- a/docs/developer-center/guides/01-Quickstart.md +++ b/docs/developer-center/guides/01-Quickstart.md @@ -147,6 +147,6 @@ from cartoframes import 
Layer cc.map(layers=Layer('brooklyn_poverty_w_rates', color='poverty_per_pop')) ``` -![](../img/guides/01-brooklyn_poverty.png) +![](../../img/guides/01-brooklyn_poverty.png) Note: Legends are not yet implemented for stable releases of cartoframes. See [this pull request](https://github.com/CartoDB/cartoframes/pull/184) for more information. diff --git a/test/test_columns.py b/test/test_columns.py index 8129de241..a503edcc4 100644 --- a/test/test_columns.py +++ b/test/test_columns.py @@ -3,7 +3,7 @@ """Unit tests for cartoframes.columns""" import unittest -from cartoframes.columns import Column, normalize_names +from cartoframes.columns import Column, normalize_names, pg2dtypes class TestColumns(unittest.TestCase): @@ -68,3 +68,16 @@ def test_normalize_names(self): def test_normalize_names_unchanged(self): self.assertListEqual(normalize_names(self.cols_ans), self.cols_ans) + + def test_pg2dtypes(self): + results = { + 'date': 'datetime64[D]', + 'number': 'float64', + 'string': 'object', + 'boolean': 'bool', + 'geometry': 'object', + 'unknown_pgdata': 'object' + } + for i in results: + result = pg2dtypes(i) + self.assertEqual(result, results[i]) diff --git a/test/test_context.py b/test/test_context.py index c0eae9565..fa03cf969 100644 --- a/test/test_context.py +++ b/test/test_context.py @@ -5,7 +5,7 @@ import matplotlib matplotlib.use('agg') import matplotlib.pyplot as plt -except ImportError: +except RuntimeError: plt = None import unittest @@ -15,6 +15,7 @@ import random import warnings import requests +from datetime import datetime from carto.exceptions import CartoException from carto.auth import APIKeyAuthClient @@ -23,8 +24,10 @@ import IPython import cartoframes -from cartoframes.columns import normalize_name +from cartoframes.dataset import Dataset +from cartoframes.columns import Column, normalize_name from cartoframes.utils import dict_items + from utils import _UserUrlLoader WILL_SKIP = False @@ -225,6 +228,31 @@ def test_cartocontext_read(self): self.assertEqual(len(df), 0) self.assertIsInstance(df, pd.DataFrame) + def test_cartocontext_read_with_same_schema(self): + cc = cartoframes.CartoContext(base_url=self.baseurl, + api_key=self.apikey) + df = pd.DataFrame({'fips': ['01'], + 'cfips': ['0001'], + 'intval': [1], + 'floatval': [1.0], + 'boolval': [True], + 'textval': ['text'], + 'dateval': datetime.now() + }) + df['boolval'] = df['boolval'].astype(bool) + cc.write(df, self.test_write_table, overwrite=True) + read_df = cc.read(self.test_write_table) + + read_df.drop('the_geom', axis=1, inplace=True) + self.assertSetEqual(set(df.columns), set(read_df.columns)) + self.assertTupleEqual( + tuple('float64' if str(d) == 'int64' else str(d) for d in df.dtypes), + tuple(str(d) for d in read_df.dtypes), + msg='Should have same schema/types' + ) + self.assertEqual(read_df.index.name, 'cartodb_id') + self.assertEqual(read_df.index.dtype, 'int64') + @unittest.skipIf(WILL_SKIP, 'no carto credentials, skipping this test') def test_cartocontext_write(self): """context.CartoContext.write normal usage""" @@ -321,21 +349,20 @@ def test_cartocontext_write(self): # privacy = cc._get_privacy('i_am_not_a_table_in_this_account') # self.assertIsNone(privacy) - # FIXME in https://github.com/CartoDB/cartoframes/issues/580 - # @unittest.skipIf(WILL_SKIP, 'no carto credentials, skipping') - # def test_cartocontext_write_index(self): - # """context.CartoContext.write with non-default index""" - # cc = cartoframes.CartoContext(base_url=self.baseurl, - # api_key=self.apikey) - # df = pd.DataFrame({'vals': 
range(3), 'ids': list('abc')}, - # index=list('xyz')) - # df.index.name = 'named_index' - # dataset = cc.write(df, self.write_named_index) - # self.write_named_index = dataset.table_name + @unittest.skipIf(WILL_SKIP, 'no carto credentials, skipping') + def test_cartocontext_write_index(self): + """context.CartoContext.write with non-default index""" + cc = cartoframes.CartoContext(base_url=self.baseurl, + api_key=self.apikey) + df = pd.DataFrame({'vals': range(3), 'ids': list('abc')}, + index=list('xyz')) + df.index.name = 'named_index' + dataset = cc.write(df, self.write_named_index) + self.write_named_index = dataset.table_name - # df_index = cc.read(self.write_named_index)) - # self.assertSetEqual(set(('the_geom', 'vals', 'ids', 'named_index')), - # set(df_index.columns)) + df_index = cc.read(self.write_named_index) + self.assertSetEqual(set(('the_geom', 'vals', 'ids', 'named_index')), + set(df_index.columns)) @unittest.skipIf(WILL_SKIP, 'no carto credentials, skipping') def test_cartocontext_mixed_case(self): @@ -373,30 +400,6 @@ def test_cartocontext_delete_non_existent_table(self): msg='''The table `{}` doesn't exist'''.format(table_name)): cc.delete(table_name) - def test_cartocontext_send_dataframe(self): - """context.CartoContext._send_dataframe""" - pass - - def test_cartocontext_handle_import(self): - """context.CartoContext._handle_import""" - - cc = cartoframes.CartoContext(base_url=self.baseurl, - api_key=self.apikey) - import_failures = ( - dict(error_code=8001, state='failure'), - dict(error_code=6668, state='failure'), - dict(error_code=1234, state='failure'), - ) - - for import_job in import_failures: - with self.assertRaises(CartoException): - cc._handle_import(import_job, 'foo') - - diff_table_err = dict(state='complete', - table_name='bar') - with self.assertRaises(Exception): - cc._handle_import(diff_table_err, 'foo') - @unittest.skipIf(WILL_SKIP, 'no carto credentials, skipping') def test_cartoframes_sync(self): """context.CartoContext.sync""" @@ -409,7 +412,12 @@ def test_cartocontext_query(self): """context.CartoContext.query""" cc = cartoframes.CartoContext(base_url=self.baseurl, api_key=self.apikey) - cols = ('link', 'body', 'displayname', 'friendscount', 'postedtime', ) + columns = (Column('link', pgtype='string'), + Column('body', pgtype='string'), + Column('displayname', pgtype='string'), + Column('friendscount', pgtype='number'), + Column('postedtime', pgtype='date'), ) + cols = [col.name for col in columns] df = cc.query(''' SELECT {cols}, '02-06-1429'::date as invalid_df_date FROM tweets_obama @@ -474,7 +482,7 @@ def test_cartocontext_query(self): # should be specified length self.assertEqual(len(df), 100) # should have requested columns + utility columns from CARTO - self.assertSetEqual({'link', 'body', 'displayname', 'friendscount', + self.assertSetEqual({'body', 'displayname', 'link', 'friendscount', 'the_geom', }, set(df.columns), msg='Should have the columns requested') @@ -491,7 +499,7 @@ def test_cartocontext_query(self): ) SELECT ST_X(the_geom) as xval, ST_Y(the_geom) as yval FROM cte - ''') + ''', is_select=True) @unittest.skipIf(WILL_SKIP, 'no carto credentials, skipping') def test_cartocontext_fetch(self): @@ -499,7 +507,12 @@ def test_cartocontext_fetch(self): cc = cartoframes.CartoContext(base_url=self.baseurl, api_key=self.apikey) - cols = ('link', 'body', 'displayname', 'friendscount', 'postedtime', ) + columns = (Column('link', pgtype='string'), + Column('body', pgtype='string'), + Column('displayname', pgtype='string'), + 
Column('friendscount', pgtype='number'),
+                   Column('postedtime', pgtype='date'), )
+        cols = [col.name for col in columns]
         df = cc.fetch('''
             SELECT {cols}, '02-06-1429'::date as invalid_df_date
             FROM tweets_obama
@@ -573,7 +586,7 @@ def test_cartocontext_fetch_with_cte(self):
                          msg='Should have the columns requested')
 
         # should have expected schema
-        expected_dtypes = ('int64', 'float64')
+        expected_dtypes = ('float64', 'float64')
         self.assertTupleEqual(
             tuple(str(d) for d in df.dtypes),
             expected_dtypes,
@@ -599,13 +612,13 @@ def test_cartocontext_fetch_with_decode_geom(self):
                          'Should be a pandas DataFrame')
 
         # same column names
-        requested_cols = {'the_geom', 'i', 'geometry'}
+        requested_cols = {'geometry', 'i'}
         self.assertSetEqual(requested_cols,
                             set(df.columns),
                             msg='Should have the columns requested')
 
         # should have expected schema
-        expected_dtypes = ('object', 'int64', 'object')
+        expected_dtypes = ('object', 'float64')
         self.assertTupleEqual(
             tuple(str(d) for d in df.dtypes),
             expected_dtypes,
@@ -637,6 +650,35 @@ def test_cartocontext_fetch_with_exception(self):
             FROM cte
             ''')
 
+    @unittest.skipIf(WILL_SKIP, 'no carto credentials, skipping')
+    def test_cartocontext_execute(self):
+        """context.CartoContext.execute"""
+        cc = cartoframes.CartoContext(base_url=self.baseurl,
+                                      api_key=self.apikey)
+
+        df = pd.DataFrame({'vals': list('abcd'), 'ids': list('wxyz')})
+        df = df.astype({'vals': str, 'ids': str})
+        cc.write(df, self.test_write_table, overwrite=True)
+
+        self.assertEqual(Dataset.from_table(context=cc, table_name=self.test_write_table).exists(), True)
+
+        cc.execute('''
+            DROP TABLE {table_name}
+            '''.format(table_name=self.test_write_table))
+
+        self.assertEqual(Dataset.from_table(context=cc, table_name=self.test_write_table).exists(), False)
+
+    @unittest.skipIf(WILL_SKIP, 'no carto credentials, skipping')
+    def test_cartocontext_execute_wrong_query(self):
+        """context.CartoContext.execute"""
+        cc = cartoframes.CartoContext(base_url=self.baseurl,
+                                      api_key=self.apikey)
+
+        with self.assertRaises(CartoException):
+            cc.execute('''
+                DROPP TABLE {table_name}
+                '''.format(table_name=self.test_write_table))
+
     @unittest.skipIf(WILL_SKIP, 'no carto credentials, skipping this test')
     def test_cartocontext_map(self):
         """context.CartoContext.map normal usage"""
@@ -883,64 +925,6 @@ def test_cartocontext_check_query(self):
         with self.assertRaises(ValueError):
             cc._check_query(success_query, style_cols=fail_cols)
 
-    @unittest.skipIf(WILL_SKIP, 'no carto credentials, skipping this test')
-    def test_add_encoded_geom(self):
-        """context._add_encoded_geom"""
-        from cartoframes.context import _add_encoded_geom, _encode_geom
-        cc = cartoframes.CartoContext(base_url=self.baseurl,
-                                      api_key=self.apikey)
-
-        # encode_geom=True adds a column called 'geometry'
-        df = cc.read(self.test_read_table, limit=5,
-                     decode_geom=True)
-
-        # alter the geometry
-        df['geometry'] = df['geometry'].apply(lambda x: x.buffer(0.1))
-
-        # the_geom should reflect encoded 'geometry' column
-        _add_encoded_geom(df, 'geometry')
-
-        # geometry column should equal the_geom after function call
-        self.assertTrue(df['the_geom'].equals(df['geometry'].apply(_encode_geom)))
-
-        # don't specify geometry column (should exist since decode_geom==True)
-        df = cc.read(self.test_read_table, limit=5,
-                     decode_geom=True)
-        df['geometry'] = df['geometry'].apply(lambda x: x.buffer(0.2))
-
-        # the_geom should reflect encoded 'geometry' column
-        _add_encoded_geom(df, None)
-
-        # geometry column should equal the_geom after function call
-        self.assertTrue(df['the_geom'].equals(df['geometry'].apply(_encode_geom)))
-
-        df = cc.read(self.test_read_table, limit=5)
-
-        # raise error if 'geometry' column does not exist
-        with self.assertRaises(KeyError):
-            _add_encoded_geom(df, None)
-
-    def test_decode_geom(self):
-        """context._decode_geom"""
-        from cartoframes.context import _decode_geom
-        # Point (0, 0) without SRID
-        ewkb = '010100000000000000000000000000000000000000'
-        decoded_geom = _decode_geom(ewkb)
-        self.assertEqual(decoded_geom.wkt, 'POINT (0 0)')
-        self.assertIsNone(_decode_geom(None))
-
-    def test_encode_geom(self):
-        """context._encode_geom"""
-        from cartoframes.context import _encode_geom
-        from shapely import wkb
-        import binascii as ba
-        # Point (0 0) without SRID
-        ewkb = '010100000000000000000000000000000000000000'
-        geom = wkb.loads(ba.unhexlify(ewkb))
-        ewkb_resp = _encode_geom(geom)
-        self.assertEqual(ewkb_resp, ewkb)
-        self.assertIsNone(_encode_geom(None))
-
     def test_debug_print(self):
         """context._debug_print"""
         cc = cartoframes.CartoContext(base_url=self.baseurl,
@@ -1106,17 +1090,14 @@ def test_data(self):
         data = cc.data(self.test_data_table, meta)
         anscols = set(meta['suggested_name'])
         origcols = set(cc.read(self.test_data_table, limit=1, decode_geom=True).columns)
-        self.assertSetEqual(anscols, set(data.columns) - origcols)
+        self.assertSetEqual(anscols, set(data.columns) - origcols - {'the_geom', 'cartodb_id'})
 
         meta = [{'numer_id': 'us.census.acs.B19013001',
                  'geom_id': 'us.census.tiger.block_group',
                  'numer_timespan': '2011 - 2015'}, ]
         data = cc.data(self.test_data_table, meta)
         self.assertSetEqual(set(('median_income_2011_2015', )),
-                            set(data.columns) - origcols)
-
-        # with self.assertRaises(NotImplementedError):
-        #     cc.data(self.test_data_table, meta, how='geom_ref')
+                            set(data.columns) - origcols - {'the_geom', 'cartodb_id'})
 
         with self.assertRaises(ValueError, msg='no measures'):
             meta = cc.data_discovery('United States', keywords='not a measure')
@@ -1139,16 +1120,21 @@ def test_data_with_persist_as(self):
         data = cc.data(self.test_data_table, meta)
         anscols = set(meta['suggested_name'])
         origcols = set(cc.read(self.test_data_table, limit=1, decode_geom=True).columns)
-        self.assertSetEqual(anscols, set(data.columns) - origcols)
+        self.assertSetEqual(anscols, set(data.columns) - origcols - {'the_geom', 'cartodb_id'})
 
         meta = [{'numer_id': 'us.census.acs.B19013001',
                  'geom_id': 'us.census.tiger.block_group',
                  'numer_timespan': '2011 - 2015'}, ]
         data = cc.data(self.test_data_table, meta, persist_as=self.test_write_table)
         self.assertSetEqual(set(('median_income_2011_2015', )),
-                            set(data.columns) - origcols)
+                            set(data.columns) - origcols - {'the_geom', 'cartodb_id'})
+        self.assertEqual(data.index.name, 'cartodb_id')
+        self.assertEqual(data.index.dtype, 'int64')
+
+        df = cc.read(self.test_write_table, decode_geom=False)
-        df = cc.read(self.test_write_table, decode_geom=True)
+        self.assertEqual(df.index.name, 'cartodb_id')
+        self.assertEqual(df.index.dtype, 'int64')
 
         # same number of rows
         self.assertEqual(len(df), len(data),
diff --git a/test/test_dataset.py b/test/test_dataset.py
index bd7e00e49..b731bb4b4 100644
--- a/test/test_dataset.py
+++ b/test/test_dataset.py
@@ -10,7 +10,7 @@
 from carto.exceptions import CartoException
 from cartoframes.context import CartoContext
-from cartoframes import Dataset
+from cartoframes.dataset import Dataset, _decode_geom
 from cartoframes.columns import normalize_name
 from utils import _UserUrlLoader
@@ -258,6 +258,13 @@ def test_cartocontext_write_if_exists_replace(self):
         result = self.cc.sql_client.send('SELECT * FROM {} WHERE the_geom IS NOT NULL'.format(self.test_write_table))
         self.assertEqual(result['total_rows'], 2049)
 
+    def test_decode_geom(self):
+        # Point (0, 0) without SRID
+        ewkb = '010100000000000000000000000000000000000000'
+        decoded_geom = _decode_geom(ewkb)
+        self.assertEqual(decoded_geom.wkt, 'POINT (0 0)')
+        self.assertIsNone(_decode_geom(None))
+
     # FIXME does not work in python 2.7 (COPY gets stuck and blocks the table; fix after
     # https://github.com/CartoDB/CartoDB-SQL-API/issues/579 is fixed)
     # @unittest.skipIf(WILL_SKIP, 'no carto credentials, skipping this test')
diff --git a/test/test_utils.py b/test/test_utils.py
index 2b5ad0f4a..cceb852f7 100644
--- a/test/test_utils.py
+++ b/test/test_utils.py
@@ -137,44 +137,3 @@ def test_dtypes2pg(self):
         }
         for i in results:
             self.assertEqual(dtypes2pg(i), results[i])
-
-    def test_pg2dtypes(self):
-        """context._pg2dtypes"""
-        from cartoframes.utils import pg2dtypes
-        results = {
-            'date': 'datetime64[ns]',
-            'number': 'float64',
-            'string': 'object',
-            'boolean': 'bool',
-            'geometry': 'object',
-            'unknown_pgdata': 'object'
-        }
-        for i in results:
-            result = pg2dtypes(i)
-            self.assertEqual(result, results[i])
-
-    def test_df2pg_schema(self):
-        """utils.df2pg_schema"""
-        from cartoframes.utils import df2pg_schema
-        data = [{'id': 'a', 'val': 1.1, 'truth': True, 'idnum': 1},
-                {'id': 'b', 'val': 2.2, 'truth': True, 'idnum': 2},
-                {'id': 'c', 'val': 3.3, 'truth': False, 'idnum': 3}]
-        df = pd.DataFrame(data).astype({'id': 'object',
-                                        'val': float,
-                                        'truth': bool,
-                                        'idnum': int})
-        # specify order of columns
-        df = df[['id', 'val', 'truth', 'idnum']]
-        pgcols = ['id', 'val', 'truth', 'idnum']
-        ans = ('NULLIF("id", \'\')::text AS id, '
-               'NULLIF("val", \'\')::numeric AS val, '
-               'NULLIF("truth", \'\')::boolean AS truth, '
-               'NULLIF("idnum", \'\')::numeric AS idnum')
-
-        self.assertEqual(ans, df2pg_schema(df, pgcols))
-
-        # add the_geom
-        df['the_geom'] = 'Point(0 0)'
-        ans = '\"the_geom\", ' + ans
-        pgcols.append('the_geom')
-        self.assertEqual(ans, df2pg_schema(df, pgcols))
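---

A minimal usage sketch of the API surface this diff introduces (`CartoContext.fetch`, `CartoContext.execute`, the refactored `CartoContext.query` with `is_select`, and the column-type helpers in `cartoframes/columns.py`). The credentials, table, and column names below are illustrative placeholders, not part of the patch:

.. code:: python

    from cartoframes import CartoContext
    from cartoframes.columns import Column, dtypes

    # placeholder credentials -- replace with a real account
    cc = CartoContext(base_url='https://your-user.carto.com', api_key='your-api-key')

    # With is_select=None (the default), query() returns a DataFrame when the
    # statement starts with SELECT, and otherwise executes it and returns None.
    df = cc.query('SELECT * FROM my_table LIMIT 10')

    # fetch() is the explicit SELECT path; decode_geom=True decodes `the_geom`
    # into Shapely objects and renames the column to `geometry`.
    gdf_like = cc.fetch('SELECT cartodb_id, the_geom FROM my_table', decode_geom=True)

    # Non-SELECT statements run as a Batch SQL API job and return None.
    cc.query('UPDATE my_table SET my_column = 1', is_select=False)
    cc.execute('DROP TABLE my_table')

    # The dtype mapping fetch() relies on: cartodb_id stays int64, integer
    # columns are read as float64 so NA values survive, and date columns can
    # be excluded so pandas parses them separately via parse_dates.
    cols = [Column('cartodb_id', pgtype='int'),
            Column('value', pgtype='bigint'),
            Column('created_at', pgtype='timestamp')]
    dtypes(cols, exclude_dates=True)  # {'cartodb_id': 'int64', 'value': 'float64'}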