Skip to content

Commit

Permalink
Merge ab02cdd into 73b0315
Browse files Browse the repository at this point in the history
  • Loading branch information
alejandrohall committed Oct 15, 2019
2 parents 73b0315 + ab02cdd commit e2dc07f
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 75 deletions.
58 changes: 25 additions & 33 deletions cartoframes/data/enrichment/enrichment_service.py
Expand Up @@ -8,7 +8,8 @@
from ...exceptions import EnrichmentException
from ...auth import get_default_credentials
from ...utils.geom_utils import _compute_geometry_from_geom
from ..observatory.variable import Variable
from ..observatory import Variable
from ..observatory import Dataset as DatasetCatalog


_ENRICHMENT_ID = 'enrichment_id'
Expand All @@ -27,7 +28,11 @@ def enrich(query_function, **kwargs):

queries = _enrichment_queries(user_dataset, tablename, query_function, **kwargs)

return _execute_enrichment(bq_client, queries, data_copy, kwargs['data_geom_column'])
data_enriched = _execute_enrichment(bq_client, queries, data_copy, kwargs['data_geom_column'])

data_enriched[kwargs['data_geom_column']] = _compute_geometry_from_geom(data_enriched[kwargs['data_geom_column']])

return data_enriched


def _get_credentials(credentials=None):
Expand Down Expand Up @@ -58,11 +63,11 @@ def _upload_dataframe(bq_client, user_dataset, data_copy, data_geom_column):


def _enrichment_queries(user_dataset, tablename, query_function, **kwargs):
is_polygon_enrichment = 'agg_operators' in kwargs
variables = __process_variables(kwargs['variables'], is_polygon_enrichment)

variables = __process_variables(kwargs['variables'])

table_to_geotable, table_to_variables, table_to_project, table_to_dataset =\
__process_enrichment_variables(variables, user_dataset)
table_to_geotable, table_to_variables,\
table_to_project, table_to_dataset = __process_enrichment_variables(variables, user_dataset)

filters_str = __process_filters(kwargs['filters'])

Expand Down Expand Up @@ -117,7 +122,7 @@ def __copy_data_and_generate_enrichment_id(data, enrichment_id_column, geometry_
return data_copy


def __process_variables(variables):
def __process_variables(variables, is_polygon_enrichment):

variables_result = list()
if isinstance(variables, Variable):
Expand All @@ -134,6 +139,9 @@ def __process_variables(variables):
else:
raise EnrichmentException('Variable(s) to enrich should be an instance of Variable / CatalogList / str / list')

if is_polygon_enrichment:
variables_result = [variable for variable in variables_result if variable.agg_method is not None]

return variables_result


Expand Down Expand Up @@ -162,14 +170,6 @@ def __process_agg_operators(agg_operators, variables):
return agg_operators_result


def __get_tables_and_variables(variables, user_dataset):

table_to_geotable, table_to_variables, table_to_project, table_to_dataset =\
__process_enrichment_variables(variables, user_dataset)

return table_to_geotable, table_to_variables, table_to_project, table_to_dataset


def __process_enrichment_variables(variables, user_dataset):
table_to_geotable = dict()
table_to_variables = defaultdict(list)
Expand All @@ -181,11 +181,12 @@ def __process_enrichment_variables(variables, user_dataset):
dataset_name = variable.schema_name
table_name = variable.dataset_name
variable_name = variable.column_name
dataset_geotable, geotable = __get_properties_geotable(variable)

if project_name != _PUBLIC_PROJECT:
table_name = '{dataset}_{table}'.format(dataset=dataset_name,
table=table_name,
user_dataset=user_dataset)
table_name = 'view_{dataset}_{table}'.format(dataset=dataset_name,
table=table_name,
user_dataset=user_dataset)

if table_name not in table_to_dataset:
if project_name != _PUBLIC_PROJECT:
Expand All @@ -194,13 +195,9 @@ def __process_enrichment_variables(variables, user_dataset):
table_to_dataset[table_name] = _PUBLIC_DATASET

if table_name not in table_to_geotable:
geotable = __get_name_geotable_from_datatable(table_name)

if project_name != _PUBLIC_PROJECT:
geotable = '{dataset}_{geotable}'.format(dataset=dataset_name,
geotable=geotable,
user_dataset=user_dataset)

geotable = 'view_{dataset}_{geotable}'.format(dataset=dataset_geotable,
geotable=geotable)
table_to_geotable[table_name] = geotable

if table_name not in table_to_project:
Expand All @@ -214,15 +211,10 @@ def __process_enrichment_variables(variables, user_dataset):
return table_to_geotable, table_to_variables, table_to_project, table_to_dataset


def __get_name_geotable_from_datatable(datatable):

datatable_split = datatable.split('_')
def __get_properties_geotable(variable):

if len(datatable_split) == 8:
geo_information = datatable_split[3:6]
elif len(datatable_split) == 7:
geo_information = datatable_split[2:5]
geography_id = DatasetCatalog.get(variable.dataset).geography

geotable = 'geography_{geo_information_joined}'.format(geo_information_joined='_'.join(geo_information))
_, geo_dataset, geo_table = geography_id.split('.')

return geotable
return geo_dataset, geo_table
16 changes: 8 additions & 8 deletions cartoframes/data/enrichment/points_enrichment.py
Expand Up @@ -86,7 +86,8 @@ def enrich_points(data, variables, data_geom_column='geometry', filters=dict(),


def _prepare_sql(enrichment_id, filters_processed, table_to_geotable, table_to_variables,
table_to_project, table_to_dataset, user_dataset, working_project, data_table, **kwargs):
table_to_project, table_to_dataset, user_dataset, working_project,
data_table, **kwargs):

sqls = list()

Expand All @@ -95,20 +96,19 @@ def _prepare_sql(enrichment_id, filters_processed, table_to_geotable, table_to_v
sql = '''
SELECT data_table.{enrichment_id},
{variables},
ST_Area(enrichment_geo_table.geom) AS {variables_underscored}_area,
NULL AS {variables_underscored}_population
ST_Area(enrichment_geo_table.geom) AS {enrichment_table}_area
FROM `{project}.{dataset}.{enrichment_table}` enrichment_table
JOIN `{project}.{dataset}.{enrichment_geo_table}` enrichment_geo_table
ON enrichment_table.geoid = enrichment_geo_table.geoid
JOIN `{working_project}.{user_dataset}.{data_table}` data_table
ON ST_Within(data_table.{data_geom_column}, enrichment_geo_table.geom)
{filters};
'''.format(enrichment_id=enrichment_id, variables=', '.join(variables),
variables_underscored='_'.join(variables), enrichment_table=table,
enrichment_geo_table=table_to_geotable[table], user_dataset=user_dataset,
working_project=working_project, data_table=data_table,
'''.format(enrichment_id=enrichment_id, variables_underscored='_'.join(variables),
enrichment_table=table, enrichment_geo_table=table_to_geotable[table],
user_dataset=user_dataset, working_project=working_project, data_table=data_table,
data_geom_column=kwargs['data_geom_column'], filters=filters_processed,
project=table_to_project[table], dataset=table_to_dataset[table])
project=table_to_project[table], dataset=table_to_dataset[table],
variables=', '.join(['enrichment_table.{}'.format(variable) for variable in variables]))

sqls.append(sql)

Expand Down
22 changes: 12 additions & 10 deletions cartoframes/data/enrichment/polygons_enrichment.py
Expand Up @@ -133,31 +133,33 @@ def enrich_polygons(data, variables, agg_operators, data_geom_column='geometry',


def _prepare_sql(enrichment_id, filters_processed, table_to_geotable, table_to_variables,
table_to_project, table_to_dataset, user_dataset, working_project, data_table, **kwargs):
table_to_project, table_to_dataset, user_dataset, working_project,
data_table, **kwargs):

grouper = 'group by data_table.{enrichment_id}'.format(enrichment_id=enrichment_id)

sqls = list()

for table, variables in table_to_variables.items():
agg_operators = kwargs.get('agg_operators')

if 'agg_operators' in kwargs:
if agg_operators is not None:

if isinstance(kwargs['agg_operators'], str):
agg_operators = {variable: kwargs['agg_operators'] for variable in variables}
else:
agg_operators = kwargs['agg_operators']
if isinstance(agg_operators, str):
agg_operators = {variable: agg_operators for variable in variables}

variables_sql = ['{operator}({variable} * \
variables_sql = ['{operator}(enrichment_table.{variable} * \
(ST_Area(ST_Intersection(enrichment_geo_table.geom, data_table.{data_geom_column}))\
/ ST_area(data_table.{data_geom_column}))) as {variable}'.format(variable=variable,
data_geom_column=kwargs['data_geom_column'],
operator=agg_operators[variable]) for variable in variables]

else:
variables_sql = variables + ['ST_Area(ST_Intersection(geo_table.geom, data_table.{data_geom_column}))\
/ ST_area(data_table.{data_geom_column}) AS measures_proportion'.format(
data_geom_column=kwargs['data_geom_column'])]
variables_sql = ['enrichment_table.{}'.format(variable) for variable in variables] +\
['ST_Area(ST_Intersection(enrichment_geo_table.geom, data_table.{data_geom_column}))\
/ ST_area(data_table.{data_geom_column}) AS measures_proportion'.format(
data_geom_column=kwargs['data_geom_column'])]

grouper = ''

sql = '''
Expand Down
45 changes: 21 additions & 24 deletions test/data/enrichment/test_service.py
Expand Up @@ -76,13 +76,12 @@ def test_enrichment_query_by_points_one_variable(self):
queries = _enrichment_queries(user_dataset, tablename, query_function, **kwargs)

expected_queries = ['''SELECT data_table.enrichment_id,
CRMCYBURG,
ST_Area(enrichment_geo_table.geom) AS CRMCYBURG_area,
NULL AS CRMCYBURG_population
enrichment_table.CRMCYBURG,
ST_Area(enrichment_geo_table.geom) AS view_ags_demographics_crimerisk_usa_blockgroup_2015_yearly_2018_area
FROM `carto-do-customers.{user_dataset}\
.ags_demographics_crimerisk_usa_blockgroup_2015_yearly_2018` enrichment_table
.view_ags_demographics_crimerisk_usa_blockgroup_2015_yearly_2018` enrichment_table
JOIN `carto-do-customers.{user_dataset}\
.ags_geography_usa_blockgroup_2015` enrichment_geo_table
.view_ags_geography_usa_blockgroup_2015` enrichment_geo_table
ON enrichment_table.geoid = enrichment_geo_table.geoid
JOIN `carto-do-customers.{user_dataset}.{tablename}` data_table
ON ST_Within(data_table.{geometry_column}, enrichment_geo_table.geom)
Expand Down Expand Up @@ -111,25 +110,23 @@ def test_enrichment_query_by_points_two_variables(self):
queries = _enrichment_queries(user_dataset, tablename, query_function, **kwargs)

expected_queries = ['''SELECT data_table.enrichment_id,
CRMCYBURG,
ST_Area(enrichment_geo_table.geom) AS CRMCYBURG_area,
NULL AS CRMCYBURG_population
enrichment_table.CRMCYBURG,
ST_Area(enrichment_geo_table.geom) AS view_ags_demographics_crimerisk_usa_blockgroup_2015_yearly_2018_area
FROM `carto-do-customers.{user_dataset}\
.ags_demographics_crimerisk_usa_blockgroup_2015_yearly_2018` enrichment_table
.view_ags_demographics_crimerisk_usa_blockgroup_2015_yearly_2018` enrichment_table
JOIN `carto-do-customers.{user_dataset}\
.ags_geography_usa_blockgroup_2015` enrichment_geo_table
.view_ags_geography_usa_blockgroup_2015` enrichment_geo_table
ON enrichment_table.geoid = enrichment_geo_table.geoid
JOIN `carto-do-customers.{user_dataset}.{tablename}` data_table
ON ST_Within(data_table.{geometry_column}, enrichment_geo_table.geom)
WHERE a='b';''', '''
SELECT data_table.enrichment_id,
ticket_size_score,
ST_Area(enrichment_geo_table.geom) AS ticket_size_score_area,
NULL AS ticket_size_score_population
enrichment_table.ticket_size_score,
ST_Area(enrichment_geo_table.geom) AS view_mastercard_financial_mrli_usa_blockgroup_2019_monthly_2019_area
FROM `carto-do-customers.{user_dataset}\
.mastercard_financial_mrli_usa_blockgroup_2019_monthly_2019` enrichment_table
.view_mastercard_financial_mrli_usa_blockgroup_2019_monthly_2019` enrichment_table
JOIN `carto-do-customers.{user_dataset}\
.mastercard_geography_usa_blockgroup_2019` enrichment_geo_table
.view_mastercard_geography_usa_blockgroup_2019` enrichment_geo_table
ON enrichment_table.geoid = enrichment_geo_table.geoid
JOIN `carto-do-customers.{user_dataset}.{tablename}` data_table
ON ST_Within(data_table.{geometry_column}, enrichment_geo_table.geom)
Expand Down Expand Up @@ -158,12 +155,12 @@ def test_enrichment_query_by_polygons_one_variable(self):

queries = _enrichment_queries(user_dataset, tablename, query_function, **kwargs)

expected_queries = ['''SELECT data_table.enrichment_id, avg(CRMCYBURG *\
expected_queries = ['''SELECT data_table.enrichment_id, avg(enrichment_table.CRMCYBURG *\
(ST_Area(ST_Intersection(enrichment_geo_table.geom, data_table.{geometry_column}))\
/ ST_area(data_table.{geometry_column}))) as CRMCYBURG
FROM `carto-do-customers.{user_dataset}.ags_demographics_crimerisk_usa_blockgroup_2015_yearly_2018`\
FROM `carto-do-customers.{user_dataset}.view_ags_demographics_crimerisk_usa_blockgroup_2015_yearly_2018`\
enrichment_table
JOIN `carto-do-customers.{user_dataset}.ags_geography_usa_blockgroup_2015` enrichment_geo_table
JOIN `carto-do-customers.{user_dataset}.view_ags_geography_usa_blockgroup_2015` enrichment_geo_table
ON enrichment_table.geoid = enrichment_geo_table.geoid
JOIN `carto-do-customers.{user_dataset}.{tablename}` data_table
ON ST_Intersects(data_table.{geometry_column}, enrichment_geo_table.geom)
Expand Down Expand Up @@ -194,23 +191,23 @@ def test_enrichment_query_by_polygons_two_variables(self):

queries = _enrichment_queries(user_dataset, tablename, query_function, **kwargs)

expected_queries = ['''SELECT data_table.enrichment_id, avg(CRMCYBURG *\
expected_queries = ['''SELECT data_table.enrichment_id, avg(enrichment_table.CRMCYBURG *\
(ST_Area(ST_Intersection(enrichment_geo_table.geom, data_table.{geometry_column}))\
/ ST_area(data_table.{geometry_column}))) as CRMCYBURG
FROM `carto-do-customers.{user_dataset}.ags_demographics_crimerisk_usa_blockgroup_2015_yearly_2018`\
FROM `carto-do-customers.{user_dataset}.view_ags_demographics_crimerisk_usa_blockgroup_2015_yearly_2018`\
enrichment_table
JOIN `carto-do-customers.{user_dataset}.ags_geography_usa_blockgroup_2015` enrichment_geo_table
JOIN `carto-do-customers.{user_dataset}.view_ags_geography_usa_blockgroup_2015` enrichment_geo_table
ON enrichment_table.geoid = enrichment_geo_table.geoid
JOIN `carto-do-customers.{user_dataset}.{tablename}` data_table
ON ST_Intersects(data_table.{geometry_column}, enrichment_geo_table.geom)
WHERE a='b'
group by data_table.enrichment_id;''', '''
SELECT data_table.enrichment_id, avg(ticket_size_score *\
SELECT data_table.enrichment_id, avg(enrichment_table.ticket_size_score *\
(ST_Area(ST_Intersection(enrichment_geo_table.geom, data_table.{geometry_column}))\
/ ST_area(data_table.{geometry_column}))) as ticket_size_score
FROM `carto-do-customers.{user_dataset}.mastercard_financial_mrli_usa_blockgroup_2019_monthly_2019`\
FROM `carto-do-customers.{user_dataset}.view_mastercard_financial_mrli_usa_blockgroup_2019_monthly_2019`\
enrichment_table
JOIN `carto-do-customers.{user_dataset}.mastercard_geography_usa_blockgroup_2019` enrichment_geo_table
JOIN `carto-do-customers.{user_dataset}.view_mastercard_geography_usa_blockgroup_2019` enrichment_geo_table
ON enrichment_table.geoid = enrichment_geo_table.geoid
JOIN `carto-do-customers.{user_dataset}.{tablename}` data_table
ON ST_Intersects(data_table.{geometry_column}, enrichment_geo_table.geom)
Expand Down

0 comments on commit e2dc07f

Please sign in to comment.