Skip to content

Commit

Permalink
Merge pull request #1414 from CartoDB/enrichment-agg-custom-array
Browse files Browse the repository at this point in the history
Support several aggregations & filters by variable
  • Loading branch information
alasarr committed Jan 8, 2020
2 parents 0553f8a + 5898460 commit 86a7e7e
Show file tree
Hide file tree
Showing 8 changed files with 307 additions and 55 deletions.
40 changes: 30 additions & 10 deletions cartoframes/data/observatory/enrichment/enrichment.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,11 @@ def enrich_points(self, dataframe, variables, geom_col=None, filters={}):
variables is 50.
geom_col (str, optional): string indicating the geometry column name in the source `DataFrame`.
filters (dict, optional): dictionary to filter results by variable values. As a key it receives the
variable id, and as value receives a SQL operator, for example: {variable1.id: "> 30"}. It works by
variable id, and as value receives a SQL operator, for example: `{variable1.id: "> 30"}`. It works by
appending the filter SQL operators to the `WHERE` clause of the resulting enrichment SQL with the `AND`
operator (in the example: `WHERE {variable1.column_name} > 30`). The variables used to filter results
should exists in `variables` property list.
operator (in the example: `WHERE {variable1.column_name} > 30`). If you want to filter the same
variable several times you can use a list as a dict value: `{variable1.id: ["> 30", "< 100"]}`. The
variables used to filter results should exist in `variables` property list.
Returns:
A geopandas.GeoDataFrame enriched with the variables passed as argument.
Expand Down Expand Up @@ -108,10 +109,11 @@ def enrich_polygons(self, dataframe, variables, geom_col=None, filters={}, aggre
variables is 50.
geom_col (str, optional): string indicating the geometry column name in the source `DataFrame`.
filters (dict, optional): dictionary to filter results by variable values. As a key it receives the
variable id, and as value receives a SQL operator, for example: {variable1.id: "> 30"}. It works by
variable id, and as value receives a SQL operator, for example: `{variable1.id: "> 30"}`. It works by
appending the filter SQL operators to the `WHERE` clause of the resulting enrichment SQL with the `AND`
operator (in the example: `WHERE {variable1.column_name} > 30`). The variables used to filter results
should exists in `variables` property list.
operator (in the example: `WHERE {variable1.column_name} > 30`). If you want to filter the same
variable several times you can use a list as a dict value: `{variable1.id: ["> 30", "< 100"]}`. The
variables used to filter results should exist in `variables` property list.
aggregation (None, str, list, optional): sets the data aggregation. The polygons in the source `DataFrame`
can intersect with one or more polygons from the Data Observatory. With this method you can select how
to aggregate the resulting data.
Expand All @@ -133,7 +135,8 @@ def enrich_polygons(self, dataframe, variables, geom_col=None, filters={}, aggre
aggregation method to use.
- dictionary: if you want to overwrite some default aggregation methods from your selected
variables, use a dict as :py:attr:`Variable.id`: aggregation method pairs, for example:
`{variable1.id: 'SUM', variable3.id: 'AVG'}`.
`{variable1.id: 'SUM', variable3.id: 'AVG'}`. Or if you want to use several aggregation methods for one
variable, you can use a list as a dict value: `{variable1.id: ['SUM', 'AVG'], variable3.id: 'AVG'}`.
Returns:
A geopandas.GeoDataFrame enriched with the variables passed as argument.
Expand Down Expand Up @@ -163,8 +166,8 @@ def enrich_polygons(self, dataframe, variables, geom_col=None, filters={}, aggre
>>> df = pandas.read_csv('path/to/local/csv')
>>> all_variables = Catalog().country('usa').category('demographics').datasets[0].variables
>>> variables = all_variables[:2]
>>> gdf_enrich = Enrichment().enrich_polygons(df, variables, geom_col='the_geom')
>>> variables = [all_variables[0].id, all_variables[1].id]
>>> cdf_enrich = Enrichment().enrich_polygons(df, variables, geom_col='the_geom')
Enrich a polygons dataframe with filters:
Expand Down Expand Up @@ -192,7 +195,10 @@ def enrich_polygons(self, dataframe, variables, geom_col=None, filters={}, aggre
>>> df = pandas.read_csv('path/to/local/csv')
>>> all_variables = Catalog().country('usa').category('demographics').datasets[0].variables
>>> variables = all_variables[:3]
>>> variable1 = all_variables[0]  # variable1.agg_method is 'AVG' but you want 'SUM'
>>> variable2 = all_variables[1]  # variable2.agg_method is 'AVG' and it is what you want
>>> variable3 = all_variables[2]  # variable3.agg_method is 'SUM' but you want 'AVG'
>>> variables = [variable1, variable2, variable3]
>>> aggregation = {
... variable1.id: 'SUM',
... variable3.id: 'AVG'
Expand All @@ -203,6 +209,20 @@ def enrich_polygons(self, dataframe, variables, geom_col=None, filters={}, aggre
... aggregation=aggregation,
... geom_col='the_geom')
Enrich a polygons dataframe using several aggregation methods for a variable:
>>> df = pandas.read_csv('path/to/local/csv')
>>> all_variables = Catalog().country('usa').category('demographics').datasets[0].variables
>>> variable1 = all_variables[0]  # variable1.agg_method is 'AVG' but you want 'SUM' and 'AVG'
>>> variable2 = all_variables[1]  # variable2.agg_method is 'AVG' and it is what you want
>>> variable3 = all_variables[2]  # variable3.agg_method is 'SUM' but you want 'AVG'
>>> variables = [variable1, variable2, variable3]
>>> aggregation = {
... variable1.id: ['SUM', 'AVG'],
... variable3.id: 'AVG'
... }
>>> cdf_enrich = Enrichment().enrich_polygons(df, variables, aggregation=aggregation)
Enrich a polygons dataframe without aggregating variables (because you want to do it yourself, for example,
in case you want to use your custom function for aggregating the data):
Expand Down
100 changes: 70 additions & 30 deletions cartoframes/data/observatory/enrichment/enrichment_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,9 @@ def _get_tables_metadata(self, variables, filters):
table_name = self.__get_enrichment_table_by_variable(variable)
tables_metadata[table_name]['variables'].append(variable)

if variable.id in filters:
tables_metadata[table_name]['filters'].append(
_build_where_condition(
variable.column_name,
filters[variable.id]
)
)
variable_filters = _build_where_conditions_by_variable(variable, filters)
if variable_filters:
tables_metadata[table_name]['filters'] = variable_filters

if 'geo_table' not in tables_metadata[table_name].keys():
tables_metadata[table_name]['geo_table'] = self.__get_geo_table(variable)
Expand Down Expand Up @@ -246,35 +242,49 @@ def _build_polygons_query(self, metadata, enrichment_table, temp_table_name, agg


def _build_polygons_query_variables_with_aggregation(variables, aggregation):
return ', '.join([
_build_polygons_query_variable_with_aggregation(
variable,
aggregation
) for variable in variables])
sql = []
for variable in variables:
variable_aggregation = _get_aggregation(variable, aggregation)
if isinstance(variable_aggregation, list):
for agg in variable_aggregation:
sql.append(_build_polygons_column_with_aggregation(variable, agg, True))
else:
sql.append(_build_polygons_column_with_aggregation(variable, variable_aggregation))

return ', '.join(sql)


def _build_polygons_query_variable_with_aggregation(variable, aggregation):
variable_agg = _get_aggregation(variable, aggregation)
def _build_polygons_column_with_aggregation(variable, aggregation, column_sufix=False):
column_name = _get_polygons_agg_column_name(variable.column_name, aggregation, column_sufix)

if (variable_agg == 'sum'):
if (aggregation == 'sum'):
return """
{aggregation}(
enrichment_table.{column} * (
ST_AREA(ST_INTERSECTION(enrichment_geo_table.geom, data_table.{geo_column}))
/
ST_AREA(data_table.{geo_column})
NULLIF(ST_AREA(enrichment_geo_table.geom), 0)
)
) AS {column}
) AS {column_name}
""".format(
column=variable.column_name,
column_name=column_name,
geo_column=_GEOM_COLUMN,
aggregation=variable_agg)
aggregation=aggregation)
else:
return """
{aggregation}(enrichment_table.{column}) AS {column}
{aggregation}(enrichment_table.{column}) AS {column_name}
""".format(
column=variable.column_name,
aggregation=variable_agg)
column_name=column_name,
aggregation=aggregation)


def _get_polygons_agg_column_name(column, aggregation, column_sufix):
    """Return the output alias for an aggregated enrichment column.

    When `column_sufix` is truthy the alias is the aggregation method, an
    underscore and the column name; otherwise the column name is returned
    unchanged.
    """
    return '{}_{}'.format(aggregation, column) if column_sufix else column


def _build_polygons_query_variables_without_aggregation(variables):
Expand All @@ -291,6 +301,30 @@ def _build_polygons_query_variables_without_aggregation(variables):
geom_column=_GEOM_COLUMN)


def _build_where_conditions_by_variable(variable, filters):
    """Build the SQL `WHERE` conditions that `filters` defines for `variable`.

    Args:
        variable: object exposing `id` (the key looked up in `filters`) and
            `column_name` (the column each condition is applied to).
        filters (dict): maps a variable id to a single SQL condition (for
            example `"> 30"`) or to a list/tuple of them (for example
            `["> 30", "< 100"]`).

    Returns:
        list: one condition string per filter, in the order given. Empty when
        `filters` has no entry for the variable.
    """
    if variable.id not in filters:
        # Always hand back a list so callers can test truthiness or extend with it.
        return []

    variable_filter = filters[variable.id]
    if not isinstance(variable_filter, (list, tuple)):
        # A lone condition is treated as a one-element sequence.
        variable_filter = [variable_filter]

    return [_build_where_condition(variable.column_name, f) for f in variable_filter]


def _build_where_condition(column, condition):
    """Return a SQL condition applying `condition` to `column` of `enrichment_table`."""
    template = "enrichment_table.{} {}"
    return template.format(column, condition)

Expand Down Expand Up @@ -385,16 +419,22 @@ def _is_subscribed(dataset, geography, credentials):


def _get_aggregation(variable, aggregation):
if aggregation is AGGREGATION_NONE:
aggregation_method = None
elif aggregation == AGGREGATION_DEFAULT:
aggregation_method = variable.agg_method
elif isinstance(aggregation, str):
aggregation_method = aggregation
elif isinstance(aggregation, dict):
if isinstance(aggregation, dict):
aggregation_method = aggregation.get(variable.id, variable.agg_method)

if isinstance(aggregation_method, list):
return [_get_aggregation(variable, agg) for agg in aggregation_method]

return _get_aggregation(variable, aggregation_method)
else:
raise ValueError('The `aggregation` parameter is invalid.')
if aggregation is AGGREGATION_NONE:
aggregation_method = None
elif aggregation == AGGREGATION_DEFAULT:
aggregation_method = variable.agg_method
elif isinstance(aggregation, str):
aggregation_method = aggregation
else:
raise ValueError('The `aggregation` parameter is invalid.')

if aggregation_method is not None:
return aggregation_method.lower()
if aggregation_method is not None:
return aggregation_method.lower()
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{
"type": "FeatureCollection",
"crs": { "type": "name", "properties": { "name": "urn:ogc:def:crs:OGC:1.3:CRS84" } },
"features": [
{ "type": "Feature", "properties": { "geoid": "17031814200,17031815300,17031815702,17031560300,17031814300,17031815500,17031815701,17031814400,17031820700,17031814500,17031815400,17031815800,17031820800,17031815200,17031560200", "one_car": 15, "poverty": 732.3984 }, "geometry": { "type": "Polygon", "coordinates": [ [ [ -87.83843994140625, 41.832990869731084 ], [ -87.750205993652344, 41.807149141688363 ], [ -87.750205993652344, 41.834269892288312 ], [ -87.83843994140625, 41.832990869731084 ] ] ] } }
{ "type": "Feature", "properties": { "poverty": 4497.199539839602, "one_car": 15, "geoid": "17031814200,17031815300,17031815701,17031820800,17031820700,17031815800,17031814400,17031815702,17031815400,17031560200,17031814500,17031560300,17031814300,17031815500,17031815200" }, "geometry": { "type": "Polygon", "coordinates": [ [ [ -87.83843994140625, 41.832990869731084 ], [ -87.750205993652344, 41.807149141688363 ], [ -87.750205993652344, 41.834269892288312 ], [ -87.83843994140625, 41.832990869731084 ] ] ] } }
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"type": "FeatureCollection",
"crs": { "type": "name", "properties": { "name": "urn:ogc:def:crs:OGC:1.3:CRS84" } },
"features": [
{ "type": "Feature", "properties": { "poverty": 509.72196529886583, "one_car": 416.77415309292218 }, "geometry": { "type": "Polygon", "coordinates": [ [ [ -87.83843994140625, 41.832990869731084 ], [ -87.750205993652344, 41.807149141688363 ], [ -87.750205993652344, 41.834269892288312 ], [ -87.83843994140625, 41.832990869731084 ] ] ] } }
{ "type": "Feature", "properties": { "poverty": 3694.5163418376524, "one_car": 3034.585708376479 }, "geometry": { "type": "Polygon", "coordinates": [ [ [ -87.83843994140625, 41.832990869731084 ], [ -87.750205993652344, 41.807149141688363 ], [ -87.750205993652344, 41.834269892288312 ], [ -87.83843994140625, 41.832990869731084 ] ] ] } }
]
}

0 comments on commit 86a7e7e

Please sign in to comment.