Skip to content

Commit

Permalink
Merge 23e50ea into 6a43dba
Browse files Browse the repository at this point in the history
  • Loading branch information
alejandrohall committed Oct 22, 2019
2 parents 6a43dba + 23e50ea commit d4a869c
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 154 deletions.
76 changes: 38 additions & 38 deletions cartoframes/data/enrichment/enrichment_service.py
Expand Up @@ -8,7 +8,8 @@
from ...exceptions import EnrichmentException
from ...auth import get_default_credentials
from ...utils.geom_utils import _compute_geometry_from_geom
from ..observatory.variable import Variable
from ..observatory import Variable
from ..observatory import CatalogDataset


_ENRICHMENT_ID = 'enrichment_id'
Expand All @@ -27,7 +28,11 @@ def enrich(query_function, **kwargs):

queries = _enrichment_queries(user_dataset, tablename, query_function, **kwargs)

return _execute_enrichment(bq_client, queries, data_copy, kwargs['data_geom_column'])
data_enriched = _execute_enrichment(bq_client, queries, data_copy, kwargs['data_geom_column'])

data_enriched[kwargs['data_geom_column']] = _compute_geometry_from_geom(data_enriched[kwargs['data_geom_column']])

return data_enriched


def _get_credentials(credentials=None):
Expand Down Expand Up @@ -58,11 +63,11 @@ def _upload_dataframe(bq_client, user_dataset, data_copy, data_geom_column):


def _enrichment_queries(user_dataset, tablename, query_function, **kwargs):
is_polygon_enrichment = 'agg_operators' in kwargs
variables = __process_variables(kwargs['variables'], is_polygon_enrichment)

variables = __process_variables(kwargs['variables'])

table_to_geotable, table_to_variables, table_to_project, table_to_dataset =\
__process_enrichment_variables(variables, user_dataset)
table_to_geotable, table_to_variables,\
table_to_project, table_to_dataset = __process_enrichment_variables(variables, user_dataset)

filters_str = __process_filters(kwargs['filters'])

Expand Down Expand Up @@ -117,8 +122,7 @@ def __copy_data_and_generate_enrichment_id(data, enrichment_id_column, geometry_
return data_copy


def __process_variables(variables):

def __process_variables(variables, is_polygon_enrichment):
variables_result = list()
if isinstance(variables, Variable):
variables_result = [variables]
Expand All @@ -128,12 +132,15 @@ def __process_variables(variables):
first_element = variables[0]

if isinstance(first_element, str):
variables_result = [Variable.get(variable) for variable in variables]
variables_result = Variable.get_list(variables)
else:
variables_result = variables
else:
raise EnrichmentException('Variable(s) to enrich should be an instance of Variable / CatalogList / str / list')

if is_polygon_enrichment:
variables_result = [variable for variable in variables_result if variable.agg_method is not None]

return variables_result


Expand All @@ -153,21 +160,22 @@ def __process_filters(filters_dict):


def __process_agg_operators(agg_operators, variables):
agg_operators_result = agg_operators.copy()

for variable in variables:
if variable.column_name not in agg_operators_result:
agg_operators_result[variable.column_name] = variable.agg_method

return agg_operators_result
if isinstance(agg_operators, str):
agg_operators_result = dict()

for variable in variables:
agg_operators_result[variable.column_name] = agg_operators

def __get_tables_and_variables(variables, user_dataset):
elif isinstance(agg_operators, dict):
agg_operators_result = agg_operators.copy()

table_to_geotable, table_to_variables, table_to_project, table_to_dataset =\
__process_enrichment_variables(variables, user_dataset)
for variable in variables:
if variable.column_name not in agg_operators_result:
agg_operators_result[variable.column_name] = variable.agg_method
else:
raise EnrichmentException('agg_operators param must be a string or a dict')

return table_to_geotable, table_to_variables, table_to_project, table_to_dataset
return agg_operators_result


def __process_enrichment_variables(variables, user_dataset):
Expand All @@ -181,11 +189,12 @@ def __process_enrichment_variables(variables, user_dataset):
dataset_name = variable.schema_name
table_name = variable.dataset_name
variable_name = variable.column_name
dataset_geotable, geotable = __get_properties_geotable(variable)

if project_name != _PUBLIC_PROJECT:
table_name = '{dataset}_{table}'.format(dataset=dataset_name,
table=table_name,
user_dataset=user_dataset)
table_name = 'view_{dataset}_{table}'.format(dataset=dataset_name,
table=table_name,
user_dataset=user_dataset)

if table_name not in table_to_dataset:
if project_name != _PUBLIC_PROJECT:
Expand All @@ -194,13 +203,9 @@ def __process_enrichment_variables(variables, user_dataset):
table_to_dataset[table_name] = _PUBLIC_DATASET

if table_name not in table_to_geotable:
geotable = __get_name_geotable_from_datatable(table_name)

if project_name != _PUBLIC_PROJECT:
geotable = '{dataset}_{geotable}'.format(dataset=dataset_name,
geotable=geotable,
user_dataset=user_dataset)

geotable = 'view_{dataset}_{geotable}'.format(dataset=dataset_geotable,
geotable=geotable)
table_to_geotable[table_name] = geotable

if table_name not in table_to_project:
Expand All @@ -214,15 +219,10 @@ def __process_enrichment_variables(variables, user_dataset):
return table_to_geotable, table_to_variables, table_to_project, table_to_dataset


def __get_name_geotable_from_datatable(datatable):

datatable_split = datatable.split('_')
def __get_properties_geotable(variable):

if len(datatable_split) == 8:
geo_information = datatable_split[3:6]
elif len(datatable_split) == 7:
geo_information = datatable_split[2:5]
geography_id = CatalogDataset.get(variable.dataset).geography

geotable = 'geography_{geo_information_joined}'.format(geo_information_joined='_'.join(geo_information))
_, geo_dataset, geo_table = geography_id.split('.')

return geotable
return geo_dataset, geo_table
74 changes: 0 additions & 74 deletions cartoframes/data/enrichment/enrichment_utils.py

This file was deleted.

12 changes: 6 additions & 6 deletions cartoframes/data/enrichment/points_enrichment.py
Expand Up @@ -86,7 +86,8 @@ def enrich_points(data, variables, data_geom_column='geometry', filters=dict(),


def _prepare_sql(enrichment_id, filters_processed, table_to_geotable, table_to_variables,
table_to_project, table_to_dataset, user_dataset, working_project, data_table, **kwargs):
table_to_project, table_to_dataset, user_dataset, working_project,
data_table, **kwargs):

sqls = list()

Expand All @@ -95,20 +96,19 @@ def _prepare_sql(enrichment_id, filters_processed, table_to_geotable, table_to_v
sql = '''
SELECT data_table.{enrichment_id},
{variables},
ST_Area(enrichment_geo_table.geom) AS {variables_underscored}_area,
NULL AS {variables_underscored}_population
ST_Area(enrichment_geo_table.geom) AS {enrichment_table}_area
FROM `{project}.{dataset}.{enrichment_table}` enrichment_table
JOIN `{project}.{dataset}.{enrichment_geo_table}` enrichment_geo_table
ON enrichment_table.geoid = enrichment_geo_table.geoid
JOIN `{working_project}.{user_dataset}.{data_table}` data_table
ON ST_Within(data_table.{data_geom_column}, enrichment_geo_table.geom)
{filters};
'''.format(enrichment_id=enrichment_id, variables=', '.join(variables),
variables_underscored='_'.join(variables), enrichment_table=table,
'''.format(enrichment_id=enrichment_id, enrichment_table=table,
enrichment_geo_table=table_to_geotable[table], user_dataset=user_dataset,
working_project=working_project, data_table=data_table,
data_geom_column=kwargs['data_geom_column'], filters=filters_processed,
project=table_to_project[table], dataset=table_to_dataset[table])
project=table_to_project[table], dataset=table_to_dataset[table],
variables=', '.join(['enrichment_table.{}'.format(variable) for variable in variables]))

sqls.append(sql)

Expand Down
26 changes: 14 additions & 12 deletions cartoframes/data/enrichment/polygons_enrichment.py
Expand Up @@ -3,7 +3,7 @@
from .enrichment_service import enrich


def enrich_polygons(data, variables, agg_operators, data_geom_column='geometry',
def enrich_polygons(data, variables, agg_operators=dict(), data_geom_column='geometry',
filters=dict(), credentials=None):
"""enrich_polygons
Expand Down Expand Up @@ -61,7 +61,7 @@ def enrich_polygons(data, variables, agg_operators, data_geom_column='geometry',
filters = {'do_date': '2019-09-01'}
dataset_enrich = enrichment.enrich_polygons(dataset, variables, filters)
Enrich a polygons dataset with custom aggregation methods:
Expand Down Expand Up @@ -133,31 +133,33 @@ def enrich_polygons(data, variables, agg_operators, data_geom_column='geometry',


def _prepare_sql(enrichment_id, filters_processed, table_to_geotable, table_to_variables,
table_to_project, table_to_dataset, user_dataset, working_project, data_table, **kwargs):
table_to_project, table_to_dataset, user_dataset, working_project,
data_table, **kwargs):

grouper = 'group by data_table.{enrichment_id}'.format(enrichment_id=enrichment_id)

sqls = list()

for table, variables in table_to_variables.items():
agg_operators = kwargs.get('agg_operators')

if 'agg_operators' in kwargs:
if agg_operators is not None:

if isinstance(kwargs['agg_operators'], str):
agg_operators = {variable: kwargs['agg_operators'] for variable in variables}
else:
agg_operators = kwargs['agg_operators']
if isinstance(agg_operators, str):
agg_operators = {variable: agg_operators for variable in variables}

variables_sql = ['{operator}({variable} * \
variables_sql = ['{operator}(enrichment_table.{variable} * \
(ST_Area(ST_Intersection(enrichment_geo_table.geom, data_table.{data_geom_column}))\
/ ST_area(data_table.{data_geom_column}))) as {variable}'.format(variable=variable,
data_geom_column=kwargs['data_geom_column'],
operator=agg_operators[variable]) for variable in variables]

else:
variables_sql = variables + ['ST_Area(ST_Intersection(geo_table.geom, data_table.{data_geom_column}))\
/ ST_area(data_table.{data_geom_column}) AS measures_proportion'.format(
data_geom_column=kwargs['data_geom_column'])]
variables_sql = ['enrichment_table.{}'.format(variable) for variable in variables] +\
['ST_Area(ST_Intersection(enrichment_geo_table.geom, data_table.{data_geom_column}))\
/ ST_area(data_table.{data_geom_column}) AS measures_proportion'.format(
data_geom_column=kwargs['data_geom_column'])]

grouper = ''

sql = '''
Expand Down

0 comments on commit d4a869c

Please sign in to comment.