Skip to content

Commit

Permalink
Merge pull request #1016 from CartoDB/feature/first_stage_enrichment_…
Browse files Browse the repository at this point in the history
…polygons

Feature/first stage enrichment polygons and points
  • Loading branch information
oleurud committed Sep 24, 2019
2 parents affad11 + f9da75f commit 7d21fe8
Show file tree
Hide file tree
Showing 12 changed files with 5,675 additions and 1 deletion.
44 changes: 44 additions & 0 deletions cartoframes/data/clients/bigquery_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import datetime
import pytz

from ..enrichment import fake_auth
from google.cloud import bigquery

# TODO: decorator to authenticate


class BigQueryClient(object):

def __init__(self, credentials):
token = fake_auth.auth(credentials)

self.credentials = credentials
self.client = bigquery.Client().from_service_account_json(token) # Change auth method when token received

def upload_dataframe(self, dataframe, schema, tablename, project, dataset, ttl_days=None):
dataset_ref = self.client.dataset(dataset, project=project)
table_ref = dataset_ref.table(tablename)

schema_wrapped = [bigquery.SchemaField(column, dtype) for column, dtype in schema.items()]

job_config = bigquery.LoadJobConfig()
job_config.schema = schema_wrapped

job = self.client.load_table_from_dataframe(dataframe, table_ref, job_config=job_config)
job.result()

if ttl_days:
table = self.client.get_table(table_ref)
expiration = datetime.datetime.now(pytz.utc) + datetime.timedelta(days=ttl_days)
table.expires = expiration
self.client.update_table(table, ["expires"])

def query(self, query, **kwargs):
response = self.client.query(query, **kwargs)

return response

def delete_table(self, tablename, project, dataset):
dataset_ref = self.client.dataset(dataset, project=project)
table_ref = dataset_ref.table(tablename)
self.client.delete_table(table_ref)
3 changes: 3 additions & 0 deletions cartoframes/data/enrichment/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .points_enrichment import enrich_points
from .polygons_enrichment import enrich_polygons
__all__ = ['enrich_points', 'enrich_polygons']
72 changes: 72 additions & 0 deletions cartoframes/data/enrichment/enrichment_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import pandas as pd
import geopandas as gpd

from ..dataset.dataset import Dataset
from collections import defaultdict
from ...exceptions import EnrichmentException


def copy_data_and_generate_enrichment_id(data, enrichment_id_column, geometry_column):

if isinstance(data, Dataset):
data = data.dataframe

data_copy = data.copy()
data_copy[enrichment_id_column] = range(data_copy.shape[0])

if isinstance(data_copy, gpd.GeoDataFrame):
data_copy[geometry_column] = data_copy[geometry_column].apply(lambda geometry: geometry.wkt)

return data_copy


def process_filters(filters_dict):
filters = ''
# TODO: Add data table ref in fields of filters
if filters_dict:
filters_list = list()

for key, value in filters_dict.items():
filters_list.append('='.join(["{}".format(key), "'{}'".format(value)]))

filters = ' AND '.join(filters_list)
filters = 'WHERE {filters}'.format(filters=filters)

return filters


def get_tables_and_variables(variables):

if isinstance(variables, pd.Series):
variables_id = [variables['id']]
elif isinstance(variables, pd.DataFrame):
variables_id = variables['id'].tolist()
else:
raise EnrichmentException('Variable(s) to enrich should be an instance of Series or DataFrame')

table_to_variables = __process_enrichment_variables(variables_id)
table_data_enrichment = list(table_to_variables.keys()).pop()
table_geo_enrichment = __get_name_geotable_from_datatable(table_data_enrichment)
variables_list = list(table_to_variables.values()).pop()

return table_data_enrichment, table_geo_enrichment, variables_list


def __process_enrichment_variables(variables):
table_to_variables = defaultdict(list)

for variable in variables:
variable_split = variable.split('.')
table, variable = variable_split[-2], variable_split[-1]

table_to_variables[table].append(variable)

return table_to_variables


def __get_name_geotable_from_datatable(datatable):
datatable_split = datatable.split('_')
geo_information = datatable_split[2:5]
geotable = 'geography_{geo_information_joined}'.format(geo_information_joined='_'.join(geo_information))

return geotable
9 changes: 9 additions & 0 deletions cartoframes/data/enrichment/fake_auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import os


def auth(credentials):

if not credentials:
raise ValueError('You should set credentials')

return os.environ['GOOGLE_APPLICATION_CREDENTIALS']
67 changes: 67 additions & 0 deletions cartoframes/data/enrichment/points_enrichment.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import uuid

from ..clients import bigquery_client
from ...auth import get_default_credentials
from .enrichment_utils import copy_data_and_generate_enrichment_id, process_filters, get_tables_and_variables

_ENRICHMENT_ID = 'enrichment_id'
_WORKING_PROJECT = 'carto-do-customers'

# TODO: process column name in metadata, remove spaces and points


def enrich_points(data, variables, data_geom_column='geometry', filters=dict(), credentials=None):

credentials = credentials or get_default_credentials()
bq_client = bigquery_client.BigQueryClient(credentials)

user_dataset = credentials.username.replace('-', '_')

data_copy = copy_data_and_generate_enrichment_id(data, _ENRICHMENT_ID, data_geom_column)

data_geometry_id_copy = data_copy[[data_geom_column, _ENRICHMENT_ID]]
schema = {data_geom_column: 'GEOGRAPHY', _ENRICHMENT_ID: 'INTEGER'}

id_tablename = uuid.uuid4().hex
data_tablename = 'temp_{id}'.format(id=id_tablename)

bq_client.upload_dataframe(data_geometry_id_copy, schema, data_tablename,
project=_WORKING_PROJECT, dataset=user_dataset, ttl_days=1)

table_data_enrichment, table_geo_enrichment, variables_list = get_tables_and_variables(variables)

filters_str = process_filters(filters)

sql = __prepare_sql(_ENRICHMENT_ID, variables_list, table_data_enrichment, table_geo_enrichment,
user_dataset, _WORKING_PROJECT, user_dataset,
data_tablename, data_geom_column, filters_str)

data_geometry_id_enriched = bq_client.query(sql).to_dataframe()

data_copy = data_copy.merge(data_geometry_id_enriched, on=_ENRICHMENT_ID, how='left')\
.drop(_ENRICHMENT_ID, axis=1)

return data_copy


def __prepare_sql(enrichment_id, variables, enrichment_table, enrichment_geo_table, user_dataset,
working_project, working_dataset, data_table, data_geom_column, filters):

sql = '''
SELECT data_table.{enrichment_id},
{variables},
ST_Area(enrichment_geo_table.geom) AS area,
NULL AS population
FROM `carto-do-customers.{user_dataset}.{enrichment_table}` enrichment_table
JOIN `carto-do-customers.{user_dataset}.{enrichment_geo_table}` enrichment_geo_table
ON enrichment_table.geoid = enrichment_geo_table.geoid
JOIN `{working_project}.{working_dataset}.{data_table}` data_table
ON ST_Within(data_table.{data_geom_column}, enrichment_geo_table.geom)
{filters};
'''.format(enrichment_id=enrichment_id, variables=', '.join(variables),
enrichment_table=enrichment_table, enrichment_geo_table=enrichment_geo_table,
user_dataset=user_dataset, working_project=working_project,
working_dataset=working_dataset, data_table=data_table,
data_geom_column=data_geom_column, filters=filters)

return sql
83 changes: 83 additions & 0 deletions cartoframes/data/enrichment/polygons_enrichment.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import uuid

from ..clients import bigquery_client
from ...auth import get_default_credentials
from .enrichment_utils import copy_data_and_generate_enrichment_id, process_filters, get_tables_and_variables


_ENRICHMENT_ID = 'enrichment_id'
_WORKING_PROJECT = 'carto-do-customers'

# TODO: process column name in metadata, remove spaces and points


def enrich_polygons(data, variables, agg_operators, data_geom_column='geometry', filters=dict(), credentials=None):

credentials = credentials or get_default_credentials()
bq_client = bigquery_client.BigQueryClient(credentials)

user_dataset = credentials.username.replace('-', '_')

data_copy = copy_data_and_generate_enrichment_id(data, _ENRICHMENT_ID, data_geom_column)

data_geometry_id_copy = data_copy[[data_geom_column, _ENRICHMENT_ID]]
schema = {data_geom_column: 'GEOGRAPHY', _ENRICHMENT_ID: 'INTEGER'}

id_tablename = uuid.uuid4().hex
data_tablename = 'temp_{id}'.format(id=id_tablename)

bq_client.upload_dataframe(data_geometry_id_copy, schema, data_tablename,
project=_WORKING_PROJECT, dataset=user_dataset, ttl_days=1)

table_data_enrichment, table_geo_enrichment, variables_list = get_tables_and_variables(variables)

filters_str = process_filters(filters)

sql = __prepare_sql(_ENRICHMENT_ID, variables_list, agg_operators, table_data_enrichment,
table_geo_enrichment, user_dataset, _WORKING_PROJECT,
user_dataset, data_tablename, data_geom_column, filters_str)

data_geometry_id_enriched = bq_client.query(sql).to_dataframe()

data_copy = data_copy.merge(data_geometry_id_enriched, on=_ENRICHMENT_ID, how='left')\
.drop(_ENRICHMENT_ID, axis=1)

return data_copy


def __prepare_sql(enrichment_id, variables, agg_operators, enrichment_table, enrichment_geo_table, user_dataset,
working_project, working_dataset, data_table, data_geom_column, filters):

grouper = 'group by data_table.{enrichment_id}'.format(enrichment_id=enrichment_id)

if agg_operators:
variables_sql = ['{operator}({variable} * \
(ST_Area(ST_Intersection(enrichment_geo_table.geom, data_table.{data_geom_column}))\
/ ST_area(data_table.{data_geom_column}))) as {variable}'.format(variable=variable,
data_geom_column=data_geom_column, operator=agg_operators[variable]) for variable in variables]

if isinstance(agg_operators, str):
agg_operators = {variable: agg_operators for variable in variables}

elif agg_operators is None:
variables_sql = variables + ['ST_Area(ST_Intersection(geo_table.geom, data_table.{data_geom_column}))\
/ ST_area(data_table.{data_geom_column}) AS measures_proportion'.format(
data_geom_column=data_geom_column)]
grouper = ''

sql = '''
SELECT data_table.{enrichment_id}, {variables}
FROM `carto-do-customers.{user_dataset}.{enrichment_table}` enrichment_table
JOIN `carto-do-customers.{user_dataset}.{enrichment_geo_table}` enrichment_geo_table
ON enrichment_table.geoid = enrichment_geo_table.geoid
JOIN `{working_project}.{working_dataset}.{data_table}` data_table
ON ST_Intersects(data_table.{data_geom_column}, enrichment_geo_table.geom)
{filters}
{grouper};
'''.format(enrichment_id=enrichment_id, variables=', '.join(variables_sql),
enrichment_table=enrichment_table, enrichment_geo_table=enrichment_geo_table,
user_dataset=user_dataset, working_project=working_project,
working_dataset=working_dataset, data_table=data_table,
data_geom_column=data_geom_column, filters=filters, grouper=grouper)

return sql
8 changes: 8 additions & 0 deletions cartoframes/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,11 @@ class DiscoveryException(Exception):
"""
def __init__(self, message):
super(DiscoveryException, self).__init__(message)


class EnrichmentException(Exception):
"""
This exception is raised when a problem is encountered while using enrichment functions.
"""
def __init__(self, message):
super(DiscoveryException, self).__init__(message)
Loading

0 comments on commit 7d21fe8

Please sign in to comment.