
Commit

Merge da807d6 into cc818ce
alejandrohall committed Sep 27, 2019
2 parents cc818ce + da807d6 commit cf17e1d
Showing 8 changed files with 221 additions and 121 deletions.
38 changes: 32 additions & 6 deletions cartoframes/data/clients/bigquery_client.py
@@ -1,20 +1,44 @@
import datetime
import pytz

from ..enrichment import fake_auth
from google.cloud import bigquery
from google.oauth2.credentials import Credentials as GoogleCredentials
from google.auth.exceptions import RefreshError

# TODO: decorator to authenticate
from carto.exceptions import CartoException

from ...auth import get_default_credentials


def refresh_client(func):
def wrapper(self, *args, **kwargs):
try:
return func(self, *args, **kwargs)
except RefreshError:
self._init_client()
try:
return func(self, *args, **kwargs)
except RefreshError:
raise CartoException('Something went wrong accessing data. '
'Please, try again in a few seconds or contact support for help.')
return wrapper


class BigQueryClient(object):

def __init__(self, credentials):
token = fake_auth.auth(credentials)
def __init__(self, project, credentials):
self._project = project
self._credentials = credentials or get_default_credentials()
self.client = self._init_client()

def _init_client(self):
google_credentials = GoogleCredentials(self._credentials.get_do_token())

self.credentials = credentials
self.client = bigquery.Client().from_service_account_json(token) # Change auth method when token received
return bigquery.Client(
project=self._project,
credentials=google_credentials)

@refresh_client
def upload_dataframe(self, dataframe, schema, tablename, project, dataset, ttl_days=None):
dataset_ref = self.client.dataset(dataset, project=project)
table_ref = dataset_ref.table(tablename)
@@ -33,11 +57,13 @@ def upload_dataframe(self, dataframe, schema, tablename, project, dataset, ttl_d
table.expires = expiration
self.client.update_table(table, ["expires"])

@refresh_client
def query(self, query, **kwargs):
response = self.client.query(query, **kwargs)

return response

@refresh_client
def delete_table(self, tablename, project, dataset):
dataset_ref = self.client.dataset(dataset, project=project)
table_ref = dataset_ref.table(tablename)
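
The retry-on-expired-token flow above is the core of the change to this client. Below is a minimal, self-contained sketch of that pattern (not part of this commit): RefreshError and CartoException are replaced by local stand-ins, and FakeBigQueryClient is an invented class, so the snippet runs without google-cloud-bigquery or a CARTO account.

class RefreshError(Exception):
    """Stand-in for google.auth.exceptions.RefreshError."""


class CartoException(Exception):
    """Stand-in for carto.exceptions.CartoException."""


def refresh_client(func):
    # Same shape as the decorator added in bigquery_client.py: re-initialise the
    # client and retry once, then give up with a user-facing error.
    def wrapper(self, *args, **kwargs):
        try:
            return func(self, *args, **kwargs)
        except RefreshError:
            self._init_client()
            try:
                return func(self, *args, **kwargs)
            except RefreshError:
                raise CartoException('Something went wrong accessing data. '
                                     'Please, try again in a few seconds or '
                                     'contact support for help.')
    return wrapper


class FakeBigQueryClient(object):
    """Invented client whose credentials can 'expire' between calls."""

    def __init__(self):
        self._init_client()

    def _init_client(self):
        self._token_valid = True

    @refresh_client
    def query(self, sql):
        if not self._token_valid:
            raise RefreshError('token expired')
        return 'rows for: {}'.format(sql)


client = FakeBigQueryClient()
client._token_valid = False          # simulate an expired token
print(client.query('SELECT 1'))      # re-initialised and retried -> 'rows for: SELECT 1'
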
73 changes: 69 additions & 4 deletions cartoframes/data/enrichment/enrichment_utils.py
@@ -1,12 +1,77 @@
import pandas as pd
import geopandas as gpd
import uuid
from collections import defaultdict

from ..dataset.dataset import Dataset
from collections import defaultdict
from ...utils.geom_utils import wkt_to_geojson, geojson_to_wkt
from ...exceptions import EnrichmentException
from ...auth import get_default_credentials
from ..clients import bigquery_client

_ENRICHMENT_ID = 'enrichment_id'
_WORKING_PROJECT = 'carto-do-customers'


def enrich(query_function, **kwargs):
credentials = __get_credentials(kwargs['credentials'])
user_dataset = credentials.username.replace('-', '_')
bq_client = __get_bigquery_client(_WORKING_PROJECT, credentials)

data_copy = __prepare_data(kwargs['data'], kwargs['data_geom_column'])
tablename = __upload_dataframe(bq_client, user_dataset, data_copy, kwargs['data_geom_column'])

query = __enrichment_query(user_dataset, tablename, query_function,
kwargs['variables'], kwargs['filters'], **kwargs)

return __execute_enrichment(bq_client, query, data_copy, kwargs['data_geom_column'])


def __get_credentials(credentials=None):
return credentials or get_default_credentials()


def __get_bigquery_client(project, credentials):
return bigquery_client.BigQueryClient(project, credentials)


def __prepare_data(data, data_geom_column):
data_copy = __copy_data_and_generate_enrichment_id(data, _ENRICHMENT_ID, data_geom_column)
data_copy[data_geom_column] = data_copy[data_geom_column].apply(wkt_to_geojson)
return data_copy


def __upload_dataframe(bq_client, user_dataset, data_copy, data_geom_column):
data_geometry_id_copy = data_copy[[data_geom_column, _ENRICHMENT_ID]]
schema = {data_geom_column: 'GEOGRAPHY', _ENRICHMENT_ID: 'INTEGER'}

id_tablename = uuid.uuid4().hex
data_tablename = 'temp_{id}'.format(id=id_tablename)

bq_client.upload_dataframe(data_geometry_id_copy, schema, data_tablename,
project=_WORKING_PROJECT, dataset=user_dataset, ttl_days=1)

return data_tablename


def __enrichment_query(user_dataset, tablename, query_function, variables, filters, **kwargs):
table_data_enrichment, table_geo_enrichment, variables_list = __get_tables_and_variables(variables)
filters_str = __process_filters(filters)

return query_function(_ENRICHMENT_ID, filters_str, variables_list, table_data_enrichment,
table_geo_enrichment, user_dataset, _WORKING_PROJECT, tablename, **kwargs)


def __execute_enrichment(bq_client, query, data_copy, data_geom_column):
df_enriched = bq_client.query(query).to_dataframe()

data_copy = data_copy.merge(df_enriched, on=_ENRICHMENT_ID, how='left').drop(_ENRICHMENT_ID, axis=1)
data_copy[data_geom_column] = data_copy[data_geom_column].apply(geojson_to_wkt)

return data_copy


def copy_data_and_generate_enrichment_id(data, enrichment_id_column, geometry_column):
def __copy_data_and_generate_enrichment_id(data, enrichment_id_column, geometry_column):

if isinstance(data, Dataset):
data = data.dataframe
@@ -20,7 +85,7 @@ def copy_data_and_generate_enrichment_id(data, enrichment_id_column, geometry_co
return data_copy


def process_filters(filters_dict):
def __process_filters(filters_dict):
filters = ''
# TODO: Add data table ref in fields of filters
if filters_dict:
@@ -35,7 +100,7 @@ def process_filters(filters_dict):
return filters


def get_tables_and_variables(variables):
def __get_tables_and_variables(variables):

if isinstance(variables, pd.Series):
variables_id = [variables['id']]
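
For context, enrich() is the new shared orchestrator: each geometry-specific module only has to supply a query_function that turns the pre-processed pieces into SQL, in the argument order used by __enrichment_query. A minimal sketch of such a function follows (not part of this commit; the table names, variable id and call values are invented for illustration):

# Hypothetical query_function with the signature __enrichment_query expects:
# (enrichment id column, processed filter string, processed variable list,
#  data enrichment table, geo enrichment table, user dataset, working project,
#  uploaded temp table, plus the original keyword arguments).
def build_count_query(enrichment_id, filters, variables, enrichment_table,
                      enrichment_geo_table, user_dataset, working_project,
                      data_table, **kwargs):
    return '''
        SELECT data_table.{enrichment_id}, {variables}
        FROM `{project}.{dataset}.{table}` enrichment_table
        JOIN `{project}.{dataset}.{geo_table}` enrichment_geo_table
          ON enrichment_table.geoid = enrichment_geo_table.geoid
        JOIN `{project}.{dataset}.{data_table}` data_table
          ON ST_Intersects(data_table.{geom}, enrichment_geo_table.geom)
        {filters};
    '''.format(enrichment_id=enrichment_id, variables=', '.join(variables),
               project=working_project, dataset=user_dataset,
               table=enrichment_table, geo_table=enrichment_geo_table,
               data_table=data_table, geom=kwargs['data_geom_column'],
               filters=filters)


# Illustrative values only; real ids come from the Data Observatory metadata.
print(build_count_query('enrichment_id', '', ['total_pop'],
                        'view_acs_demographics', 'view_geographies',
                        'user_dataset', 'carto-do-customers', 'temp_abc123',
                        data_geom_column='geometry'))
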
9 changes: 0 additions & 9 deletions cartoframes/data/enrichment/fake_auth.py

This file was deleted.

57 changes: 12 additions & 45 deletions cartoframes/data/enrichment/points_enrichment.py
@@ -1,67 +1,34 @@
import uuid

from ..clients import bigquery_client
from ...auth import get_default_credentials
from .enrichment_utils import copy_data_and_generate_enrichment_id, process_filters, get_tables_and_variables

_ENRICHMENT_ID = 'enrichment_id'
_WORKING_PROJECT = 'carto-do-customers'
from .enrichment_utils import enrich

# TODO: process column name in metadata, remove spaces and points


def enrich_points(data, variables, data_geom_column='geometry', filters=dict(), credentials=None):

credentials = credentials or get_default_credentials()
bq_client = bigquery_client.BigQueryClient(credentials)

user_dataset = credentials.username.replace('-', '_')

data_copy = copy_data_and_generate_enrichment_id(data, _ENRICHMENT_ID, data_geom_column)

data_geometry_id_copy = data_copy[[data_geom_column, _ENRICHMENT_ID]]
schema = {data_geom_column: 'GEOGRAPHY', _ENRICHMENT_ID: 'INTEGER'}

id_tablename = uuid.uuid4().hex
data_tablename = 'temp_{id}'.format(id=id_tablename)

bq_client.upload_dataframe(data_geometry_id_copy, schema, data_tablename,
project=_WORKING_PROJECT, dataset=user_dataset, ttl_days=1)

table_data_enrichment, table_geo_enrichment, variables_list = get_tables_and_variables(variables)

filters_str = process_filters(filters)

sql = __prepare_sql(_ENRICHMENT_ID, variables_list, table_data_enrichment, table_geo_enrichment,
user_dataset, _WORKING_PROJECT, user_dataset,
data_tablename, data_geom_column, filters_str)

data_geometry_id_enriched = bq_client.query(sql).to_dataframe()

data_copy = data_copy.merge(data_geometry_id_enriched, on=_ENRICHMENT_ID, how='left')\
.drop(_ENRICHMENT_ID, axis=1)
data_enriched = enrich(__prepare_sql, data=data, variables=variables, data_geom_column=data_geom_column,
filters=filters, credentials=credentials)

return data_copy
return data_enriched


def __prepare_sql(enrichment_id, variables, enrichment_table, enrichment_geo_table, user_dataset,
working_project, working_dataset, data_table, data_geom_column, filters):
def __prepare_sql(enrichment_id, filters_processed, variables_processed, enrichment_table,
enrichment_geo_table, user_dataset, working_project, data_table, **kwargs):

sql = '''
SELECT data_table.{enrichment_id},
{variables},
ST_Area(enrichment_geo_table.geom) AS area,
NULL AS population
FROM `carto-do-customers.{user_dataset}.{enrichment_table}` enrichment_table
JOIN `carto-do-customers.{user_dataset}.{enrichment_geo_table}` enrichment_geo_table
FROM `{working_project}.{user_dataset}.{enrichment_table}` enrichment_table
JOIN `{working_project}.{user_dataset}.{enrichment_geo_table}` enrichment_geo_table
ON enrichment_table.geoid = enrichment_geo_table.geoid
JOIN `{working_project}.{working_dataset}.{data_table}` data_table
JOIN `{working_project}.{user_dataset}.{data_table}` data_table
ON ST_Within(data_table.{data_geom_column}, enrichment_geo_table.geom)
{filters};
'''.format(enrichment_id=enrichment_id, variables=', '.join(variables),
'''.format(enrichment_id=enrichment_id, variables=', '.join(variables_processed),
enrichment_table=enrichment_table, enrichment_geo_table=enrichment_geo_table,
user_dataset=user_dataset, working_project=working_project,
working_dataset=working_dataset, data_table=data_table,
data_geom_column=data_geom_column, filters=filters)
data_table=data_table, data_geom_column=kwargs['data_geom_column'],
filters=filters_processed)

return sql
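
A hedged usage sketch of the slimmed-down enrich_points (not part of this commit): the Data Observatory variable id, username and api key are placeholders, and running it requires cartoframes with Data Observatory access on the account behind the credentials. The variable is passed as a pandas Series with an 'id' field, matching the isinstance check in __get_tables_and_variables.

import pandas as pd

from cartoframes.auth import Credentials
from cartoframes.data.enrichment.points_enrichment import enrich_points

# Input points with geometries as WKT strings, which the enrichment pipeline
# converts to GeoJSON before uploading the temporary table to BigQuery.
points = pd.DataFrame({
    'name': ['store_a', 'store_b'],
    'geometry': ['POINT (-73.99 40.73)', 'POINT (-73.95 40.78)'],
})

# Hypothetical catalog-style variable; real ids come from the DO metadata.
variable = pd.Series({'id': 'carto-do.provider.dataset.total_pop'})

enriched = enrich_points(
    points,
    variables=variable,
    data_geom_column='geometry',
    credentials=Credentials(username='my-user', api_key='my-api-key'),
)
print(enriched.head())
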
71 changes: 19 additions & 52 deletions cartoframes/data/enrichment/polygons_enrichment.py
@@ -1,83 +1,50 @@
import uuid

from ..clients import bigquery_client
from ...auth import get_default_credentials
from .enrichment_utils import copy_data_and_generate_enrichment_id, process_filters, get_tables_and_variables


_ENRICHMENT_ID = 'enrichment_id'
_WORKING_PROJECT = 'carto-do-customers'
from .enrichment_utils import enrich

# TODO: process column name in metadata, remove spaces and points


def enrich_polygons(data, variables, agg_operators, data_geom_column='geometry', filters=dict(), credentials=None):

credentials = credentials or get_default_credentials()
bq_client = bigquery_client.BigQueryClient(credentials)

user_dataset = credentials.username.replace('-', '_')

data_copy = copy_data_and_generate_enrichment_id(data, _ENRICHMENT_ID, data_geom_column)

data_geometry_id_copy = data_copy[[data_geom_column, _ENRICHMENT_ID]]
schema = {data_geom_column: 'GEOGRAPHY', _ENRICHMENT_ID: 'INTEGER'}

id_tablename = uuid.uuid4().hex
data_tablename = 'temp_{id}'.format(id=id_tablename)

bq_client.upload_dataframe(data_geometry_id_copy, schema, data_tablename,
project=_WORKING_PROJECT, dataset=user_dataset, ttl_days=1)

table_data_enrichment, table_geo_enrichment, variables_list = get_tables_and_variables(variables)

filters_str = process_filters(filters)

sql = __prepare_sql(_ENRICHMENT_ID, variables_list, agg_operators, table_data_enrichment,
table_geo_enrichment, user_dataset, _WORKING_PROJECT,
user_dataset, data_tablename, data_geom_column, filters_str)

data_geometry_id_enriched = bq_client.query(sql).to_dataframe()

data_copy = data_copy.merge(data_geometry_id_enriched, on=_ENRICHMENT_ID, how='left')\
.drop(_ENRICHMENT_ID, axis=1)
data_enriched = enrich(__prepare_sql, data=data, variables=variables, agg_operators=agg_operators,
data_geom_column=data_geom_column, filters=filters, credentials=credentials)

return data_copy
return data_enriched


def __prepare_sql(enrichment_id, variables, agg_operators, enrichment_table, enrichment_geo_table, user_dataset,
working_project, working_dataset, data_table, data_geom_column, filters):
def __prepare_sql(enrichment_id, filters_processed, variables_processed, enrichment_table,
enrichment_geo_table, user_dataset, working_project, data_table, **kwargs):

grouper = 'group by data_table.{enrichment_id}'.format(enrichment_id=enrichment_id)

if agg_operators:
if kwargs['agg_operators']:
variables_sql = ['{operator}({variable} * \
(ST_Area(ST_Intersection(enrichment_geo_table.geom, data_table.{data_geom_column}))\
/ ST_area(data_table.{data_geom_column}))) as {variable}'.format(variable=variable,
data_geom_column=data_geom_column, operator=agg_operators[variable]) for variable in variables]
data_geom_column=kwargs['data_geom_column'],
operator=kwargs['agg_operators'][variable]) for variable in variables_processed]

if isinstance(agg_operators, str):
agg_operators = {variable: agg_operators for variable in variables}
if isinstance(kwargs['agg_operators'], str):
agg_operators = {variable: kwargs['agg_operators'] for variable in variables_processed}

elif agg_operators is None:
variables_sql = variables + ['ST_Area(ST_Intersection(geo_table.geom, data_table.{data_geom_column}))\
/ ST_area(data_table.{data_geom_column}) AS measures_proportion'.format(
data_geom_column=data_geom_column)]
variables_sql = variables_processed + ['ST_Area(ST_Intersection(geo_table.geom, data_table.{data_geom_column}))\
/ ST_area(data_table.{data_geom_column}) AS measures_proportion'.format(
data_geom_column=kwargs['data_geom_column'])]
grouper = ''

sql = '''
SELECT data_table.{enrichment_id}, {variables}
FROM `carto-do-customers.{user_dataset}.{enrichment_table}` enrichment_table
JOIN `carto-do-customers.{user_dataset}.{enrichment_geo_table}` enrichment_geo_table
FROM `{working_project}.{user_dataset}.{enrichment_table}` enrichment_table
JOIN `{working_project}.{user_dataset}.{enrichment_geo_table}` enrichment_geo_table
ON enrichment_table.geoid = enrichment_geo_table.geoid
JOIN `{working_project}.{working_dataset}.{data_table}` data_table
JOIN `{working_project}.{user_dataset}.{data_table}` data_table
ON ST_Intersects(data_table.{data_geom_column}, enrichment_geo_table.geom)
{filters}
{grouper};
'''.format(enrichment_id=enrichment_id, variables=', '.join(variables_sql),
enrichment_table=enrichment_table, enrichment_geo_table=enrichment_geo_table,
user_dataset=user_dataset, working_project=working_project,
working_dataset=working_dataset, data_table=data_table,
data_geom_column=data_geom_column, filters=filters, grouper=grouper)
data_table=data_table, data_geom_column=kwargs['data_geom_column'],
filters=filters_processed, grouper=grouper)

return sql
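
The polygon query weights each enrichment variable by the fraction of the input polygon covered by the enrichment cell before applying the operator from agg_operators. A toy arithmetic illustration of that weighting (invented numbers, not part of this commit):

# Equivalent of SUM(variable * ST_Area(ST_Intersection(...)) / ST_Area(data_geom))
# for a single input polygon overlapping two enrichment cells.
data_polygon_area = 10.0  # area of the polygon being enriched

# (population of enrichment cell, area of its overlap with the input polygon)
overlaps = [
    (1000.0, 4.0),   # 40% of the input polygon falls in this cell
    (500.0, 6.0),    # the remaining 60% falls in this one
]

weighted_population = sum(value * (overlap / data_polygon_area)
                          for value, overlap in overlaps)
print(weighted_population)  # 700.0 -> what agg_operators={'population': 'SUM'} would yield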