Skip to content

Commit

Permalink
Merge 0545d64 into 36fff18
Browse files Browse the repository at this point in the history
  • Loading branch information
jgoizueta committed Oct 8, 2019
2 parents 36fff18 + 0545d64 commit c837747
Show file tree
Hide file tree
Showing 5 changed files with 487 additions and 414 deletions.
4 changes: 2 additions & 2 deletions cartoframes/data/services/__init__.py
@@ -1,9 +1,9 @@
from __future__ import absolute_import

from .geocode import Geocode
from .geocoding import Geocoding
from .isolines import Isolines

__all__ = [
'Geocode',
'Geocoding',
'Isolines'
]
Expand Up @@ -13,6 +13,7 @@

HASH_COLUMN = 'carto_geocode_hash'
BATCH_SIZE = 200
DEFAULT_STATUS = {'gc_status_rel': 'relevance'}


QUOTA_SERVICE = 'hires_geocoder'
Expand Down Expand Up @@ -134,7 +135,7 @@ def _posterior_summary_query(table):
)


def _geocode_query(table, street, city, state, country, metadata):
def _geocode_query(table, street, city, state, country, status):
hash_expression = _hash_expr(street, city, state, country)
query = """
SELECT * FROM {table} WHERE {needs_geocoding}
Expand All @@ -160,15 +161,13 @@ def _geocode_query(table, street, city, state, country, metadata):
batch_size=BATCH_SIZE
)

metadata_assignment = ''
if metadata:
metadata_assignment = '{col} = _g.metadata,'.format(col=metadata)
status_assignment, status_columns = _status_assignment(status)

return """
query = """
UPDATE {table}
SET
the_geom = _g.the_geom,
{metadata_assignment}
{status_assignment}
{hash_column} = {hash_expression}
FROM (SELECT * FROM {geocode_expression}) _g
WHERE _g.cartodb_id = {table}.cartodb_id
Expand All @@ -177,9 +176,54 @@ def _geocode_query(table, street, city, state, country, metadata):
hash_column=HASH_COLUMN,
hash_expression=hash_expression,
geocode_expression=geocode_expression,
metadata_assignment=metadata_assignment
status_assignment=status_assignment
)

return (query, status_columns)


STATUS_FIELDS = {
'relevance': ('numeric', "(_g.metadata->>'relevance')::numeric"),
'precision': ('text', "_g.metadata->>'precision'"),
'match_types': ('text', "cdb_dataservices_client.cdb_jsonb_array_casttext(_g.metadata->>'match_types')"),
'*': ('jsonb', "_g.metadata")
}
STATUS_FIELDS_KEYS = sorted(STATUS_FIELDS.keys())


def _status_column(column_name, field):
column_type, value = STATUS_FIELDS[field]
return (column_name, column_type, value)


def _column_assignment(column_name, value):
return '{} = {},'.format(column_name, value)


def _status_assignment(status):
status_assignment = ''
status_columns = []
if isinstance(status, dict):
# new style: define status assignments with dictionary
# {'column_name': 'status_field', ...}
# allows to assign individual status attributes to columns
invalid_fields = _list_difference(status.values(), STATUS_FIELDS_KEYS)
if any(invalid_fields):
raise ValueError("Invalid status fields {} valid keys are: {}".format(
invalid_fields,
STATUS_FIELDS_KEYS))
columns = [_status_column(name, field) for name, field in list(status.items())]
status_assignments = [_column_assignment(name, value) for name, _, value in columns]
status_columns = [(name, type_) for name, type_, _ in columns]
status_assignment = ''.join(status_assignments)
elif status:
# old style: column name
# stores all status as json in a single column
name, type_, value = _status_column(status, '*')
status_columns = [(name, type_)]
status_assignment = _column_assignment(name, value)
return (status_assignment, status_columns)


def _hash_as_big_int(text):
# Calculate a positive bigint hash, hence the 63-bit mask.
Expand All @@ -189,11 +233,12 @@ def _hash_as_big_int(text):
def _set_pre_summary_info(summary, output):
logging.debug(summary)
output['total_rows'] = sum(summary.values())
output['required_quota'] = sum([summary[s] for s in ['new_geocoded', 'new_nongeocoded', 'changed_geocoded', 'changed_nongeocoded']])
output['required_quota'] = sum(
[summary[s] for s in ['new_geocoded', 'new_nongeocoded', 'changed_geocoded', 'changed_nongeocoded']])
output['previously_geocoded'] = summary.get('previously_geocoded', 0)
output['previously_failed'] = summary.get('previously_nongeocoded', 0)
output['records_with_geometry'] = sum([summary[s] for s in ['new_geocoded', 'changed_geocoded', 'previously_geocoded']])
# output['records_without_geometry'] = sum([summary[s] for s in ['new_nongeocoded', 'changed_nongeocoded', 'previously_nongeocoded']])
output['records_with_geometry'] = sum(
summary[s] for s in ['new_geocoded', 'changed_geocoded', 'previously_geocoded'])


def _set_post_summary_info(summary, result, output):
Expand All @@ -203,7 +248,8 @@ def _set_post_summary_info(summary, result, output):
output['final_records_with_geometry'] = geom_count
# output['final_records_without_geometry'] = null_geom_count
output['geocoded_increment'] = output['final_records_with_geometry'] - output['records_with_geometry']
output['successfully_geocoded'] = output['geocoded_increment'] + sum([summary[s] for s in ['new_geocoded', 'changed_geocoded']])
new_or_changed = sum([summary[s] for s in ['new_geocoded', 'changed_geocoded']])
output['successfully_geocoded'] = output['geocoded_increment'] + new_or_changed
output['failed_geocodings'] = output['required_quota'] - output['successfully_geocoded']


Expand Down Expand Up @@ -238,7 +284,10 @@ def _column_or_value_arg(arg, valid_columns=None):
if any(invalid_keys):
invalid_keys_list = ', '.join(list(invalid_keys))
valid_keys_list = ', '.join(VALID_GEOCODE_KEYS)
raise ValueError("Invalid key for argument {} valid keys are: {}".format(invalid_keys_list, valid_keys_list))
raise ValueError("Invalid key for argument {} valid keys are: {}".format(
invalid_keys_list,
valid_keys_list)
)
if len(arg.keys()) != 1:
valid_keys_list = ', '.join(VALID_GEOCODE_KEYS)
raise ValueError("Exactly one key of {} must be present in argument".format(valid_keys_list))
Expand All @@ -256,8 +305,8 @@ def _column_or_value_arg(arg, valid_columns=None):
return arg


class Geocode(Service):
"""Geocode using CARTO data services.
class Geocoding(Service):
"""Geocoding using CARTO data services.
This requires a CARTO account with and API key that allows for using geocoding services;
(through explicit argument in constructor or via the default credentials).
Use of these methods will incur in geocoding credit consumption for the provided account.
Expand All @@ -268,10 +317,10 @@ class Geocode(Service):
.. code::
from data.services import Geocode
from data.services import Geocoding
from cartoframes.auth import set_default_credentials
set_default_credentials('YOUR_USER_NAME', 'YOUR_API_KEY')
gc = Geocode()
gc = Geocoding()
_, info = gc.geocode(dataset, street='address', dry_run=True)
print(info['required_quota'])
Expand All @@ -280,13 +329,13 @@ class Geocode(Service):
.. code::
import pandas
from data.services import Geocode
from data.services import Geocoding
from cartoframes.data import Dataset
from cartoframes.auth import set_default_credentials
set_default_credentials('YOUR_USER_NAME', 'YOUR_API_KEY')
dataframe = pandas.DataFrame([['Gran Vía 46', 'Madrid'], ['Ebro 1', 'Sevilla']], columns=['address','city'])
gc = Geocode()
gc = Geocoding()
geocoded_dataframe, info = gc.geocode(dataframe, street='address', city='city', country={'value': 'Spain'})
print(geocoded_dataframe)
Expand All @@ -295,13 +344,13 @@ class Geocode(Service):
.. code::
import pandas
from data.services import Geocode
from data.services import Geocoding
from cartoframes.data import Dataset
from cartoframes.auth import set_default_credentials
set_default_credentials('YOUR_USER_NAME', 'YOUR_API_KEY')
dataset = Dataset('YOUR_TABLE_NAME')
gc = Geocode()
gc = Geocoding()
geocoded_dataset, info = gc.geocode(dataset, street='address', city='city', country={'value': 'Spain'})
print(geocoded_dataset.download())
Expand All @@ -310,25 +359,25 @@ class Geocode(Service):
.. code::
import pandas
from data.services import Geocode
from data.services import Geocoding
from cartoframes.data import Dataset
from cartoframes.auth import set_default_credentials
set_default_credentials('YOUR_USER_NAME', 'YOUR_API_KEY')
df = pandas.DataFrame([['Gran Vía 46', 'Madrid'], ['Ebro 1', 'Sevilla']], columns=['address','city'])
gc = Geocode()
df, info = gc.geocode(df, street='address', city='city', country={'value': 'Spain'}, metadata='meta')
gc = Geocoding()
df = gc.geocode(df, street='address', city='city', country={'value': 'Spain'}, status=['relevance']).data
# show rows with relevance greater than 0.7:
print(df[df.apply(lambda x: json.loads(x['meta'])['relevance']>0.7, axis=1)])
print(df[df['carto_geocode_relevance']>0.7, axis=1)])
"""

def __init__(self, credentials=None):
super(Geocode, self).__init__(credentials=credentials, quota_service=QUOTA_SERVICE)
super(Geocoding, self).__init__(credentials=credentials, quota_service=QUOTA_SERVICE)

def geocode(self, dataset, street,
city=None, state=None, country=None,
metadata=None,
status=DEFAULT_STATUS,
table_name=None, if_exists=Dataset.FAIL,
dry_run=False):
"""Geocode a dataset
Expand All @@ -348,8 +397,13 @@ def geocode(self, dataset, street,
with the name of a column containing the addresses' country names or
a `value` key with a literal country value value, e.g. 'US'.
It also accepts a string, in which case `column` is implied.
metadata (str, optional): name of a column where metadata (in JSON format)
will be stored (see https://carto.com/developers/data-services-api/reference/)
status (dict, optional): dictionary that defines a mapping from geocoding state
attributes ('relevance', 'precision', 'match_types') to column names.
(See https://carto.com/developers/data-services-api/reference/)
Columns will be added to the result data for the requested attributes.
By default a column ``gc_status_rel`` will be created for the geocoding
_relevance_. The special attribute '*' refers to all the status
attributes as a JSON object.
table_name (str, optional): the geocoding results will be placed in a new
CARTO table with this name.
if_exists (str, optional): Behavior for creating new datasets, only applicable
Expand All @@ -359,8 +413,13 @@ def geocode(self, dataset, street,
check the needed quota)
Returns:
Result: (Dataset, info_dict)
A named-tuple ``(data, metadata)`` containing either a ``data`` Dataset or DataFrame
(same type as the input) and a ``metadata`` dictionary with global information
about the geocoding process
The data contains a ``the_geom`` column with point locations for the geocoded addresses
and also a ``carto_geocode_hash`` that, if preserved, can avoid re-geocoding
unchanged data in future calls to geocode.
"""

input_dataframe = None
Expand All @@ -377,7 +436,7 @@ def geocode(self, dataset, street,

input_table_name, is_temporary = self._table_for_geocoding(dataset, table_name, if_exists)

result_info = self._geocode(input_table_name, street, city, state, country, metadata, dry_run)
result_info = self._geocode(input_table_name, street, city, state, country, status, dry_run)

if dry_run:
result_dataset = dataset
Expand Down Expand Up @@ -408,7 +467,7 @@ def geocode(self, dataset, street,
def _table_for_geocoding(self, dataset, table_name, if_exists):
temporary_table = False
input_dataset = dataset
if input_dataset.is_remote() and input_dataset.table_name: # FIXME: more robust to check first for query (hasattr(input_dataset, 'query'))
if input_dataset.is_remote() and input_dataset.table_name:
# input dataset is a table
if table_name:
# Copy input dataset into a new table
Expand Down Expand Up @@ -445,7 +504,7 @@ def _cleanup_geocoded_table(self, input_table_name, is_temporary):
# receiving geocoding results instead of storing in a table, etc.
# But that would make transition to using AFW harder.

def _geocode(self, table_name, street, city=None, state=None, country=None, metadata=None, dry_run=False):
def _geocode(self, table_name, street, city=None, state=None, country=None, status=None, dry_run=False):
# Internal Geocoding implementation.
# Geocode a table's rows not already geocoded in a dataset'

Expand All @@ -454,7 +513,7 @@ def _geocode(self, table_name, street, city=None, state=None, country=None, meta
logging.info('city = "%s"' % city)
logging.info('state = "%s"' % state)
logging.info('country = "%s"' % country)
logging.info('metadata = "%s"' % metadata)
logging.info('status = "%s"' % status)
logging.info('dry_run = "%s"' % dry_run)

output = {}
Expand Down Expand Up @@ -488,20 +547,17 @@ def _geocode(self, table_name, street, city=None, state=None, country=None, meta
output['error'] = 'The table is already being geocoded'
output['aborted'] = aborted = True
else:
# Create column to store input search hash
add_columns = [(HASH_COLUMN, 'text')]
sql, add_columns = _geocode_query(table_name, street, city, state, country, status)

if metadata:
# Create column to store result metadata
add_columns.append((metadata, 'jsonb'))
add_columns += [(HASH_COLUMN, 'text')]

logging.info("Adding columns {} if needed".format(', '.join([c[0] for c in add_columns])))
alter_sql = "ALTER TABLE {table} {add_columns};".format(
table=table_name,
add_columns=','.join(['ADD COLUMN IF NOT EXISTS {} {}'.format(name, type) for name, type in add_columns]))
add_columns=','.join([
'ADD COLUMN IF NOT EXISTS {} {}'.format(name, type) for name, type in add_columns]))
self._execute_query(alter_sql)

sql = _geocode_query(table_name, street, city, state, country, metadata)
logging.debug("Executing query: %s" % sql)
result = None
try:
Expand Down

0 comments on commit c837747

Please sign in to comment.