Skip to content

Commit

Permalink
Merge 5cd90bb into 6f48f72
Browse files Browse the repository at this point in the history
  • Loading branch information
alejandrohall authored Sep 27, 2019
2 parents 6f48f72 + 5cd90bb commit 83df2c2
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 99 deletions.
63 changes: 63 additions & 0 deletions cartoframes/data/enrichment/enrichment_utils.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,56 @@
import pandas as pd
import geopandas as gpd
import geojson
import uuid

from shapely.geometry import shape
from shapely.wkt import loads
from ..dataset.dataset import Dataset
from collections import defaultdict
from ...exceptions import EnrichmentException
from ...auth import get_default_credentials
from ..clients import bigquery_client

_ENRICHMENT_ID = 'enrichment_id'
_WORKING_PROJECT = 'carto-do-customers'


def enrich(preparation_sql_function, **kwargs):

credentials = kwargs['credentials'] or get_default_credentials()
bq_client = bigquery_client.BigQueryClient(credentials)

user_dataset = credentials.username.replace('-', '_')

data_copy = copy_data_and_generate_enrichment_id(kwargs['data'], _ENRICHMENT_ID, kwargs['data_geom_column'])

data_copy[kwargs['data_geom_column']] = data_copy[kwargs['data_geom_column']].apply(wkt_to_geojson)

data_geometry_id_copy = data_copy[[kwargs['data_geom_column'], _ENRICHMENT_ID]]
schema = {kwargs['data_geom_column']: 'GEOGRAPHY', _ENRICHMENT_ID: 'INTEGER'}

id_tablename = uuid.uuid4().hex
data_tablename = 'temp_{id}'.format(id=id_tablename)

bq_client.upload_dataframe(data_geometry_id_copy, schema, data_tablename,
project=_WORKING_PROJECT, dataset=user_dataset, ttl_days=1)

table_data_enrichment, table_geo_enrichment, variables_list = get_tables_and_variables(kwargs['variables'])

filters_str = process_filters(kwargs['filters'])

sql = preparation_sql_function(_ENRICHMENT_ID, filters_str, variables_list, table_data_enrichment,
table_geo_enrichment, user_dataset, _WORKING_PROJECT, data_tablename,
**kwargs)

data_geometry_id_enriched = bq_client.query(sql).to_dataframe()

data_copy = data_copy.merge(data_geometry_id_enriched, on=_ENRICHMENT_ID, how='left')\
.drop(_ENRICHMENT_ID, axis=1)

data_copy[kwargs['data_geom_column']] = data_copy[kwargs['data_geom_column']].apply(geojson_to_wkt)

return data_copy


def copy_data_and_generate_enrichment_id(data, enrichment_id_column, geometry_column):
Expand All @@ -20,6 +67,22 @@ def copy_data_and_generate_enrichment_id(data, enrichment_id_column, geometry_co
return data_copy


def wkt_to_geojson(wkt):
shapely_geom = loads(wkt)
geojson_geometry = geojson.Feature(geometry=shapely_geom, properties={})

return str(geojson_geometry.geometry)


def geojson_to_wkt(geojson_str):
geojson_geom = geojson.loads(geojson_str)
wkt_geometry = shape(geojson_geom)

shapely_geom = loads(wkt_geometry.wkt)

return shapely_geom


def process_filters(filters_dict):
filters = ''
# TODO: Add data table ref in fields of filters
Expand Down
57 changes: 12 additions & 45 deletions cartoframes/data/enrichment/points_enrichment.py
Original file line number Diff line number Diff line change
@@ -1,67 +1,34 @@
import uuid

from ..clients import bigquery_client
from ...auth import get_default_credentials
from .enrichment_utils import copy_data_and_generate_enrichment_id, process_filters, get_tables_and_variables

_ENRICHMENT_ID = 'enrichment_id'
_WORKING_PROJECT = 'carto-do-customers'
from .enrichment_utils import enrich

# TODO: process column name in metadata, remove spaces and points


def enrich_points(data, variables, data_geom_column='geometry', filters=dict(), credentials=None):

credentials = credentials or get_default_credentials()
bq_client = bigquery_client.BigQueryClient(credentials)

user_dataset = credentials.username.replace('-', '_')

data_copy = copy_data_and_generate_enrichment_id(data, _ENRICHMENT_ID, data_geom_column)

data_geometry_id_copy = data_copy[[data_geom_column, _ENRICHMENT_ID]]
schema = {data_geom_column: 'GEOGRAPHY', _ENRICHMENT_ID: 'INTEGER'}

id_tablename = uuid.uuid4().hex
data_tablename = 'temp_{id}'.format(id=id_tablename)

bq_client.upload_dataframe(data_geometry_id_copy, schema, data_tablename,
project=_WORKING_PROJECT, dataset=user_dataset, ttl_days=1)

table_data_enrichment, table_geo_enrichment, variables_list = get_tables_and_variables(variables)

filters_str = process_filters(filters)

sql = __prepare_sql(_ENRICHMENT_ID, variables_list, table_data_enrichment, table_geo_enrichment,
user_dataset, _WORKING_PROJECT, user_dataset,
data_tablename, data_geom_column, filters_str)

data_geometry_id_enriched = bq_client.query(sql).to_dataframe()

data_copy = data_copy.merge(data_geometry_id_enriched, on=_ENRICHMENT_ID, how='left')\
.drop(_ENRICHMENT_ID, axis=1)
data_enriched = enrich(__prepare_sql, data=data, variables=variables, data_geom_column=data_geom_column,
filters=filters, credentials=credentials)

return data_copy
return data_enriched


def __prepare_sql(enrichment_id, variables, enrichment_table, enrichment_geo_table, user_dataset,
working_project, working_dataset, data_table, data_geom_column, filters):
def __prepare_sql(enrichment_id, filters_processed, variables_processed, enrichment_table,
enrichment_geo_table, user_dataset, working_project, data_table, **kwargs):

sql = '''
SELECT data_table.{enrichment_id},
{variables},
ST_Area(enrichment_geo_table.geom) AS area,
NULL AS population
FROM `carto-do-customers.{user_dataset}.{enrichment_table}` enrichment_table
JOIN `carto-do-customers.{user_dataset}.{enrichment_geo_table}` enrichment_geo_table
FROM `{working_project}.{user_dataset}.{enrichment_table}` enrichment_table
JOIN `{working_project}.{user_dataset}.{enrichment_geo_table}` enrichment_geo_table
ON enrichment_table.geoid = enrichment_geo_table.geoid
JOIN `{working_project}.{working_dataset}.{data_table}` data_table
JOIN `{working_project}.{user_dataset}.{data_table}` data_table
ON ST_Within(data_table.{data_geom_column}, enrichment_geo_table.geom)
{filters};
'''.format(enrichment_id=enrichment_id, variables=', '.join(variables),
'''.format(enrichment_id=enrichment_id, variables=', '.join(variables_processed),
enrichment_table=enrichment_table, enrichment_geo_table=enrichment_geo_table,
user_dataset=user_dataset, working_project=working_project,
working_dataset=working_dataset, data_table=data_table,
data_geom_column=data_geom_column, filters=filters)
data_table=data_table, data_geom_column=kwargs['data_geom_column'],
filters=filters_processed)

return sql
72 changes: 19 additions & 53 deletions cartoframes/data/enrichment/polygons_enrichment.py
Original file line number Diff line number Diff line change
@@ -1,83 +1,49 @@
import uuid

from ..clients import bigquery_client
from ...auth import get_default_credentials
from .enrichment_utils import copy_data_and_generate_enrichment_id, process_filters, get_tables_and_variables


_ENRICHMENT_ID = 'enrichment_id'
_WORKING_PROJECT = 'carto-do-customers'
from .enrichment_utils import enrich

# TODO: process column name in metadata, remove spaces and points


def enrich_polygons(data, variables, agg_operators, data_geom_column='geometry', filters=dict(), credentials=None):

credentials = credentials or get_default_credentials()
bq_client = bigquery_client.BigQueryClient(credentials)

user_dataset = credentials.username.replace('-', '_')

data_copy = copy_data_and_generate_enrichment_id(data, _ENRICHMENT_ID, data_geom_column)

data_geometry_id_copy = data_copy[[data_geom_column, _ENRICHMENT_ID]]
schema = {data_geom_column: 'GEOGRAPHY', _ENRICHMENT_ID: 'INTEGER'}

id_tablename = uuid.uuid4().hex
data_tablename = 'temp_{id}'.format(id=id_tablename)

bq_client.upload_dataframe(data_geometry_id_copy, schema, data_tablename,
project=_WORKING_PROJECT, dataset=user_dataset, ttl_days=1)

table_data_enrichment, table_geo_enrichment, variables_list = get_tables_and_variables(variables)

filters_str = process_filters(filters)

sql = __prepare_sql(_ENRICHMENT_ID, variables_list, agg_operators, table_data_enrichment,
table_geo_enrichment, user_dataset, _WORKING_PROJECT,
user_dataset, data_tablename, data_geom_column, filters_str)

data_geometry_id_enriched = bq_client.query(sql).to_dataframe()

data_copy = data_copy.merge(data_geometry_id_enriched, on=_ENRICHMENT_ID, how='left')\
.drop(_ENRICHMENT_ID, axis=1)
data_enriched = enrich(__prepare_sql, data=data, variables=variables, agg_operators=agg_operators,
data_geom_column=data_geom_column, filters=filters, credentials=credentials)

return data_copy
return data_enriched


def __prepare_sql(enrichment_id, variables, agg_operators, enrichment_table, enrichment_geo_table, user_dataset,
working_project, working_dataset, data_table, data_geom_column, filters):
def __prepare_sql(enrichment_id, filters_processed, variables_processed, enrichment_table,
enrichment_geo_table, user_dataset, working_project, data_table, **kwargs):

grouper = 'group by data_table.{enrichment_id}'.format(enrichment_id=enrichment_id)

if agg_operators:
if kwargs['agg_operators']:
variables_sql = ['{operator}({variable} * \
(ST_Area(ST_Intersection(enrichment_geo_table.geom, data_table.{data_geom_column}))\
/ ST_area(data_table.{data_geom_column}))) as {variable}'.format(variable=variable,
data_geom_column=data_geom_column, operator=agg_operators[variable]) for variable in variables]
data_geom_column=kwargs['data_geom_column'],
operator=kwargs['agg_operators'][variable]) for variable in variables_processed]

if isinstance(agg_operators, str):
agg_operators = {variable: agg_operators for variable in variables}
if isinstance(kwargs['agg_operators'], str):
agg_operators = {variable: kwargs['agg_operators'] for variable in variables_processed}

elif agg_operators is None:
variables_sql = variables + ['ST_Area(ST_Intersection(geo_table.geom, data_table.{data_geom_column}))\
/ ST_area(data_table.{data_geom_column}) AS measures_proportion'.format(
data_geom_column=data_geom_column)]
variables_sql = variables_processed + ['ST_Area(ST_Intersection(geo_table.geom, data_table.{data_geom_column}))\
/ ST_area(data_table.{data_geom_column}) AS measures_proportion'.format(
data_geom_column=kwargs['data_geom_column'])]
grouper = ''

sql = '''
SELECT data_table.{enrichment_id}, {variables}
FROM `carto-do-customers.{user_dataset}.{enrichment_table}` enrichment_table
JOIN `carto-do-customers.{user_dataset}.{enrichment_geo_table}` enrichment_geo_table
FROM `{working_project}.{user_dataset}.{enrichment_table}` enrichment_table
JOIN `{working_project}.{user_dataset}.{enrichment_geo_table}` enrichment_geo_table
ON enrichment_table.geoid = enrichment_geo_table.geoid
JOIN `{working_project}.{working_dataset}.{data_table}` data_table
JOIN `{working_project}.{user_dataset}.{data_table}` data_table
ON ST_Intersects(data_table.{data_geom_column}, enrichment_geo_table.geom)
{filters}
{grouper};
'''.format(enrichment_id=enrichment_id, variables=', '.join(variables_sql),
enrichment_table=enrichment_table, enrichment_geo_table=enrichment_geo_table,
user_dataset=user_dataset, working_project=working_project,
working_dataset=working_dataset, data_table=data_table,
data_geom_column=data_geom_column, filters=filters, grouper=grouper)
user_dataset=user_dataset, working_project=working_project, data_table=data_table,
data_geom_column=kwargs['data_geom_column'], filters=filters_processed, grouper=grouper)

return sql
3 changes: 2 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ def walk_subpkg(name):
'unidecode>=1.1.0,<2.0',
'webcolors>=1.9.1,<2.0',
'pyarrow>=0.14.1,<1.0',
'google-cloud-bigquery>=1.19.0,<2.0'
'google-cloud-bigquery>=1.19.0,<2.0',
'geojson>=2.5.0,<3.0'
]

PACKAGE_DATA = {
Expand Down

0 comments on commit 83df2c2

Please sign in to comment.