Skip to content

Commit

Permalink
Merge 3155cf8 into a66a09f
Browse files Browse the repository at this point in the history
  • Loading branch information
alejandrohall committed Oct 23, 2019
2 parents a66a09f + 3155cf8 commit 66cee9b
Show file tree
Hide file tree
Showing 11 changed files with 164 additions and 230 deletions.
76 changes: 38 additions & 38 deletions cartoframes/data/enrichment/enrichment_service.py
Expand Up @@ -8,7 +8,8 @@
from ...exceptions import EnrichmentException
from ...auth import get_default_credentials
from ...utils.geom_utils import _compute_geometry_from_geom
from ..observatory.variable import Variable
from ..observatory import Variable
from ..observatory import CatalogDataset


_ENRICHMENT_ID = 'enrichment_id'
Expand All @@ -27,7 +28,11 @@ def enrich(query_function, **kwargs):

queries = _enrichment_queries(user_dataset, tablename, query_function, **kwargs)

return _execute_enrichment(bq_client, queries, data_copy, kwargs['data_geom_column'])
data_enriched = _execute_enrichment(bq_client, queries, data_copy, kwargs['data_geom_column'])

data_enriched[kwargs['data_geom_column']] = _compute_geometry_from_geom(data_enriched[kwargs['data_geom_column']])

return data_enriched


def _get_credentials(credentials=None):
Expand Down Expand Up @@ -58,11 +63,11 @@ def _upload_dataframe(bq_client, user_dataset, data_copy, data_geom_column):


def _enrichment_queries(user_dataset, tablename, query_function, **kwargs):
is_polygon_enrichment = 'agg_operators' in kwargs
variables = __process_variables(kwargs['variables'], is_polygon_enrichment)

variables = __process_variables(kwargs['variables'])

table_to_geotable, table_to_variables, table_to_project, table_to_dataset =\
__process_enrichment_variables(variables, user_dataset)
table_to_geotable, table_to_variables,\
table_to_project, table_to_dataset = __process_enrichment_variables(variables, user_dataset)

filters_str = __process_filters(kwargs['filters'])

Expand Down Expand Up @@ -117,8 +122,7 @@ def __copy_data_and_generate_enrichment_id(data, enrichment_id_column, geometry_
return data_copy


def __process_variables(variables):

def __process_variables(variables, is_polygon_enrichment):
variables_result = list()
if isinstance(variables, Variable):
variables_result = [variables]
Expand All @@ -128,12 +132,15 @@ def __process_variables(variables):
first_element = variables[0]

if isinstance(first_element, str):
variables_result = [Variable.get(variable) for variable in variables]
variables_result = Variable.get_list(variables)
else:
variables_result = variables
else:
raise EnrichmentException('Variable(s) to enrich should be an instance of Variable / CatalogList / str / list')

if is_polygon_enrichment:
variables_result = [variable for variable in variables_result if variable.agg_method is not None]

return variables_result


Expand All @@ -153,21 +160,22 @@ def __process_filters(filters_dict):


def __process_agg_operators(agg_operators, variables):
agg_operators_result = agg_operators.copy()

for variable in variables:
if variable.column_name not in agg_operators_result:
agg_operators_result[variable.column_name] = variable.agg_method

return agg_operators_result
if isinstance(agg_operators, str):
agg_operators_result = dict()

for variable in variables:
agg_operators_result[variable.column_name] = agg_operators

def __get_tables_and_variables(variables, user_dataset):
elif isinstance(agg_operators, dict):
agg_operators_result = agg_operators.copy()

table_to_geotable, table_to_variables, table_to_project, table_to_dataset =\
__process_enrichment_variables(variables, user_dataset)
for variable in variables:
if variable.column_name not in agg_operators_result:
agg_operators_result[variable.column_name] = variable.agg_method
else:
raise EnrichmentException('agg_operators param must be a string or a dict')

return table_to_geotable, table_to_variables, table_to_project, table_to_dataset
return agg_operators_result


def __process_enrichment_variables(variables, user_dataset):
Expand All @@ -181,11 +189,12 @@ def __process_enrichment_variables(variables, user_dataset):
dataset_name = variable.schema_name
table_name = variable.dataset_name
variable_name = variable.column_name
dataset_geotable, geotable = __get_properties_geotable(variable)

if project_name != _PUBLIC_PROJECT:
table_name = '{dataset}_{table}'.format(dataset=dataset_name,
table=table_name,
user_dataset=user_dataset)
table_name = 'view_{dataset}_{table}'.format(dataset=dataset_name,
table=table_name,
user_dataset=user_dataset)

if table_name not in table_to_dataset:
if project_name != _PUBLIC_PROJECT:
Expand All @@ -194,13 +203,9 @@ def __process_enrichment_variables(variables, user_dataset):
table_to_dataset[table_name] = _PUBLIC_DATASET

if table_name not in table_to_geotable:
geotable = __get_name_geotable_from_datatable(table_name)

if project_name != _PUBLIC_PROJECT:
geotable = '{dataset}_{geotable}'.format(dataset=dataset_name,
geotable=geotable,
user_dataset=user_dataset)

geotable = 'view_{dataset}_{geotable}'.format(dataset=dataset_geotable,
geotable=geotable)
table_to_geotable[table_name] = geotable

if table_name not in table_to_project:
Expand All @@ -214,15 +219,10 @@ def __process_enrichment_variables(variables, user_dataset):
return table_to_geotable, table_to_variables, table_to_project, table_to_dataset


def __get_name_geotable_from_datatable(datatable):

datatable_split = datatable.split('_')
def __get_properties_geotable(variable):

if len(datatable_split) == 8:
geo_information = datatable_split[3:6]
elif len(datatable_split) == 7:
geo_information = datatable_split[2:5]
geography_id = CatalogDataset.get(variable.dataset).geography

geotable = 'geography_{geo_information_joined}'.format(geo_information_joined='_'.join(geo_information))
_, geo_dataset, geo_table = geography_id.split('.')

return geotable
return geo_dataset, geo_table
74 changes: 0 additions & 74 deletions cartoframes/data/enrichment/enrichment_utils.py

This file was deleted.

56 changes: 28 additions & 28 deletions cartoframes/data/enrichment/points_enrichment.py
Expand Up @@ -12,6 +12,28 @@ def enrich_points(data, variables, data_geom_column='geometry', filters=dict(),
your points with our geographies. Extra columns such as area and population will be provided
with the aim of normalizing these columns.
Args:
data (:py:class:`Dataset <cartoframes.data.Dataset>`, DataFrame, GeoDataFrame):
a Dataset, DataFrame or GeoDataFrame object to be enriched.
variables (:py:class:`Variable <cartoframes.data.observatory.Catalog>`, CatalogList, list, str):
variable(s), discovered through Catalog, for enriching the `data` argument.
data_geom_column (str): string indicating the 4326 geometry column in `data`.
filters (dict, optional): dictionary whose keys are the names of the columns to filter
and whose values are the values to filter by (e.g. `{'do_date': '2019-09-01'}`).
Multiple filters are applied together using the `AND` operator.
credentials (:py:class:`Credentials <cartoframes.auth.Credentials>`, optional):
credentials of the user account. If not provided,
the default credentials (if set with :py:meth:`set_default_credentials
<cartoframes.auth.set_default_credentials>`) will be
used.
Returns:
A dataframe like the provided one, but with the variables to enrich appended to it.
Note that if the geometry of the `data` you provide intersects with more than one geometry
in the enrichment dataset, the number of rows of the returned dataframe could be different
from the number of rows of the `data` argument.
Examples:
Enrich a points dataset with Catalog classes:
Expand Down Expand Up @@ -55,28 +77,6 @@ def enrich_points(data, variables, data_geom_column='geometry', filters=dict(),
variables = Catalog().country('usa').category('demographics').datasets[0].variables
filters = {'do_date': '2019-09-01'}
dataset_enrich = enrichment.enrich_points(dataset, variables, filters)
Args:
data (:py:class:`Dataset <cartoframes.data.Dataset>`, DataFrame, GeoDataFrame):
a Dataset, DataFrame or GeoDataFrame object to be enriched.
variables (:py:class:`Variable <cartoframes.data.observatory.Catalog>`, CatalogList, list, str):
variable(s), discovered through Catalog, for enriching the `data` argument.
data_geom_column (str): string indicating the 4326 geometry column in `data`.
filters (dict, optional): dictionary whose keys are the names of the columns to filter
and whose values are the values to filter by (e.g. `{'do_date': '2019-09-01'}`).
Multiple filters are applied together using the `AND` operator.
credentials (:py:class:`Credentials <cartoframes.auth.Credentials>`, optional):
credentials of the user account. If not provided,
the default credentials (if set with :py:meth:`set_default_credentials
<cartoframes.auth.set_default_credentials>`) will be
used.
Returns:
A dataframe like the provided one, but with the variables to enrich appended to it.
Note that if the geometry of the `data` you provide intersects with more than one geometry
in the enrichment dataset, the number of rows of the returned dataframe could be different
from the number of rows of the `data` argument.
"""

data_enriched = enrich(_prepare_sql, data=data, variables=variables, data_geom_column=data_geom_column,
Expand All @@ -86,7 +86,8 @@ def enrich_points(data, variables, data_geom_column='geometry', filters=dict(),


def _prepare_sql(enrichment_id, filters_processed, table_to_geotable, table_to_variables,
table_to_project, table_to_dataset, user_dataset, working_project, data_table, **kwargs):
table_to_project, table_to_dataset, user_dataset, working_project,
data_table, **kwargs):

sqls = list()

Expand All @@ -95,20 +96,19 @@ def _prepare_sql(enrichment_id, filters_processed, table_to_geotable, table_to_v
sql = '''
SELECT data_table.{enrichment_id},
{variables},
ST_Area(enrichment_geo_table.geom) AS {variables_underscored}_area,
NULL AS {variables_underscored}_population
ST_Area(enrichment_geo_table.geom) AS {enrichment_table}_area
FROM `{project}.{dataset}.{enrichment_table}` enrichment_table
JOIN `{project}.{dataset}.{enrichment_geo_table}` enrichment_geo_table
ON enrichment_table.geoid = enrichment_geo_table.geoid
JOIN `{working_project}.{user_dataset}.{data_table}` data_table
ON ST_Within(data_table.{data_geom_column}, enrichment_geo_table.geom)
{filters};
'''.format(enrichment_id=enrichment_id, variables=', '.join(variables),
variables_underscored='_'.join(variables), enrichment_table=table,
'''.format(enrichment_id=enrichment_id, enrichment_table=table,
enrichment_geo_table=table_to_geotable[table], user_dataset=user_dataset,
working_project=working_project, data_table=data_table,
data_geom_column=kwargs['data_geom_column'], filters=filters_processed,
project=table_to_project[table], dataset=table_to_dataset[table])
project=table_to_project[table], dataset=table_to_dataset[table],
variables=', '.join(['enrichment_table.{}'.format(variable) for variable in variables]))

sqls.append(sql)

Expand Down

0 comments on commit 66cee9b

Please sign in to comment.