Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
5 changed files
with
147 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
from .points_enrichment import enrich_points |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
from . import fake_auth | ||
|
||
from google.cloud import bigquery | ||
|
||
# GCP project that hosts the temporary workspace used by the enrichment flow.
_WORKING_PROJECT = 'cartodb-on-gcp-datascience'
# BigQuery dataset (inside _WORKING_PROJECT) where the user's geometries are
# staged while an enrichment query runs; tables here are created and deleted
# per enrichment call.
_TEMP_DATASET_ENRICHMENT = 'enrichment_temp'
|
||
# TODO: decorator to authenticate | ||
|
||
|
||
class BigQueryClient(object):
    """Thin wrapper over ``google.cloud.bigquery`` for the enrichment flow.

    Handles authentication, dataframe uploads into the temporary enrichment
    dataset, query execution and temp-table cleanup.
    """

    def __init__(self, credentials):
        """Authenticate and build the underlying BigQuery client.

        Args:
            credentials: CARTO credentials; currently only validated by
                ``fake_auth.auth``, which returns a service-account json path.
        """
        token = fake_auth.auth(credentials)

        self.credentials = credentials
        # `from_service_account_json` is a classmethod: call it on the class
        # instead of on a throwaway `bigquery.Client()` instance, which would
        # needlessly resolve default credentials (and can fail without ADC).
        # Change auth method when token received.
        self.client = bigquery.Client.from_service_account_json(token)

    def upload_dataframe(self, dataframe, data_id_column, data_geom_column, tablename):
        """Load `dataframe` into the temp enrichment dataset as `tablename`.

        Only the id (INTEGER) and geometry (GEOGRAPHY) columns are declared
        in the load schema. Blocks until the load job completes.
        """
        dataset_ref = self.client.dataset(_TEMP_DATASET_ENRICHMENT, project=_WORKING_PROJECT)
        table_ref = dataset_ref.table(tablename)

        job_config = bigquery.LoadJobConfig()
        job_config.schema = [
            bigquery.SchemaField(data_id_column, 'INTEGER'),
            bigquery.SchemaField(data_geom_column, "GEOGRAPHY"),
        ]

        job = self.client.load_table_from_dataframe(dataframe, table_ref, job_config=job_config)
        job.result()  # wait for completion so callers can query the table immediately

    def query(self, query, **kwargs):
        """Run `query` and return the (lazy) BigQuery query job."""
        response = self.client.query(query, **kwargs)

        return response

    def delete_table(self, tablename):
        """Drop `tablename` from the temp enrichment dataset."""
        dataset_ref = self.client.dataset(_TEMP_DATASET_ENRICHMENT, project=_WORKING_PROJECT)
        table_ref = dataset_ref.table(tablename)
        self.client.delete_table(table_ref)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
import os | ||
|
||
|
||
def auth(credentials):
    """Placeholder authentication used until real token exchange exists.

    Validates that some credentials were supplied and hands back the
    service-account json path from the environment.

    Raises:
        ValueError: if `credentials` is falsy.
        KeyError: if GOOGLE_APPLICATION_CREDENTIALS is not set.
    """
    if credentials:
        return os.environ['GOOGLE_APPLICATION_CREDENTIALS']

    raise ValueError('You should set credentials')
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,95 @@ | ||
import uuid | ||
|
||
from . import bigquery_client | ||
from ...auth import get_default_credentials | ||
from collections import defaultdict | ||
|
||
|
||
_ENRICHMENT_ID = 'enrichment_id' | ||
|
||
# TODO: process column name in metadata, remove spaces and points | ||
|
||
|
||
def enrich_points(data, variables, data_geom_column='geometry', filters=None):
    """Enrich a points dataframe with Data Observatory variables.

    Uploads the geometries to a temporary BigQuery table, spatially joins
    them (ST_Within) against the requested enrichment tables and merges the
    resulting columns back onto a copy of `data`.

    Args:
        data: dataframe with one point geometry per row.
        variables: variable metadata; its 'id' column holds fully-qualified,
            dot-separated variable ids.
        data_geom_column: name of the geometry column in `data`.
        filters: optional mapping of column -> value turned into a WHERE
            clause on the enrichment query.

    Returns:
        A copy of `data` with the enrichment columns (plus `area` and a NULL
        `population` placeholder) merged in.
    """
    # A `filters=dict()` default would be one shared dict across all calls
    # (mutable-default pitfall); normalize None to a fresh dict instead.
    if filters is None:
        filters = dict()

    credentials = get_default_credentials()

    bq_client = bigquery_client.BigQueryClient(credentials)

    # Copy dataframe and generate id to join to original data later
    data_copy = data.copy()
    data_copy[_ENRICHMENT_ID] = range(data_copy.shape[0])
    data_geometry_id_copy = data_copy[[_ENRICHMENT_ID, data_geom_column]]

    # Random table name so concurrent enrichments do not collide.
    data_tablename = uuid.uuid4().hex

    bq_client.upload_dataframe(data_geometry_id_copy, _ENRICHMENT_ID, data_geom_column, data_tablename)

    try:
        variables_id = variables['id'].tolist()
        table_to_variables = __process_enrichment_variables(variables_id)
        # NOTE: only a single enrichment table is supported for now; pop the
        # last (table, variables) pair from the grouped mapping.
        table_data_enrichment = list(table_to_variables.keys()).pop()
        table_geo_enrichment = __get_name_geotable_from_datatable(table_data_enrichment)
        variables_list = list(table_to_variables.values()).pop()

        filters_str = __process_filters(filters)

        sql = '''
            SELECT data_table.{enrichment_id},
                   {variables},
                   ST_Area(enrichment_geo_table.geom) AS area,
                   NULL AS population
            FROM `carto-do-customers.{user_workspace}.{enrichment_table}` enrichment_table
            JOIN `carto-do-customers.{user_workspace}.{enrichment_geo_table}` enrichment_geo_table
              ON enrichment_table.geoid = enrichment_geo_table.geoid
            JOIN `{working_project}.{working_dataset}.{data_table}` data_table
              ON ST_Within(data_table.{data_geom_column}, enrichment_geo_table.geom)
            {filters};
        '''.format(enrichment_id=_ENRICHMENT_ID, variables=', '.join(variables_list),
                   enrichment_table=table_data_enrichment, enrichment_geo_table=table_geo_enrichment,
                   user_workspace=credentials.username.replace('-', '_'),
                   working_project=bigquery_client._WORKING_PROJECT,
                   working_dataset=bigquery_client._TEMP_DATASET_ENRICHMENT,
                   data_table=data_tablename,
                   data_geom_column=data_geom_column, filters=filters_str)

        data_geometry_id_augmentated = bq_client.query(sql).to_dataframe()

        data_augmentated = data_copy.merge(data_geometry_id_augmentated, on=_ENRICHMENT_ID, how='left')\
            .drop(_ENRICHMENT_ID, axis=1)
    finally:
        # Always drop the temp table, even when the query or merge fails,
        # so aborted runs do not leak tables into the temp dataset.
        bq_client.delete_table(data_tablename)

    return data_augmentated
|
||
|
||
def __process_enrichment_variables(variables):
    """Group fully-qualified variable ids by their enrichment table.

    Each id is dot-separated; the second-to-last token names the table and
    the last token names the variable. Returns a defaultdict mapping
    table name -> list of variable names, in input order.
    """
    grouped = defaultdict(list)

    for qualified_id in variables:
        tokens = qualified_id.split('.')
        grouped[tokens[-2]].append(tokens[-1])

    return grouped
|
||
|
||
def __get_name_geotable_from_datatable(datatable):
    """Derive the companion geography table name from a data table name.

    Underscore-separated tokens 2..4 of the data table name carry the
    geography information; they are re-joined under a 'geography_' prefix.
    """
    geo_information = datatable.split('_')[2:5]

    return 'geography_{geo_information_joined}'.format(
        geo_information_joined='_'.join(geo_information))
|
||
|
||
def __process_filters(filters_dict):
    """Build a SQL WHERE clause from a column -> value mapping.

    Returns '' for an empty/falsy mapping; otherwise a clause of the form
    "WHERE col='val' AND col2='val2'" (insertion order of the dict).
    NOTE(review): values are interpolated directly into the SQL text —
    do not feed untrusted input here (injection risk).
    """
    # TODO: Add data table ref in fields of filters
    if not filters_dict:
        return ''

    conditions = ["{}='{}'".format(column, value)
                  for column, value in filters_dict.items()]

    return 'WHERE {filters}'.format(filters=' AND '.join(conditions))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,2 +1,4 @@ | ||
-e git+https://github.com/CartoDB/carto-python.git#egg=carto | ||
. | ||
google-cloud-bigquery | ||
pyarrow |