Skip to content

Commit

Permalink
Merge pull request #1311 from CartoDB/v4-me-endpoint
Browse files Browse the repository at this point in the history
DO token endpoint
subscription open data fix
  • Loading branch information
oleurud committed Dec 12, 2019
2 parents bde9554 + 50b3665 commit 0302b3f
Show file tree
Hide file tree
Showing 14 changed files with 198 additions and 114 deletions.
13 changes: 5 additions & 8 deletions cartoframes/auth/credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,18 +210,15 @@ def delete(cls, config_file=None):
except OSError:
warnings.warn('No credential file found at {}.'.format(path_to_remove))

def get_do_token(self):
"""Returns the Data Observatory v2 token"""
def get_do_credentials(self):
"""Returns the Data Observatory v2 credentials"""

do_token_manager = DoTokenManager(self.get_api_key_auth_client())
token = do_token_manager.get()
if not token:
do_credentials = do_token_manager.get()
if not do_credentials:
raise CartoException('Authentication error: do you have permissions to access Data Observatory v2?')

return token.access_token

def get_do_user_dataset(self):
return self._username.replace('-', '_')
return do_credentials

def get_api_key_auth_client(self):
if not self._api_key_auth_client:
Expand Down
47 changes: 29 additions & 18 deletions cartoframes/data/clients/bigquery_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def wrapper(self, *args, **kwargs):
try:
return func(self, *args, **kwargs)
except RefreshError:
self.bq_client, self.gcs_client = self._init_clients()
self._init_clients()
try:
return func(self, *args, **kwargs)
except RefreshError:
Expand All @@ -32,31 +32,42 @@ def wrapper(self, *args, **kwargs):

class BigQueryClient(object):

def __init__(self, project, credentials):
self._project = project
def __init__(self, credentials):
self._credentials = credentials or get_default_credentials()
self._bucket = 'carto-do-{username}'.format(username=self._credentials.username)
self.bq_client, self.gcs_client = self._init_clients()
self.bq_client = None
self.gcs_client = None

self.bq_public_project = None
self.bq_project = None
self.bq_dataset = None
self.instant_licensing = None
self._gcs_bucket = None

self._init_clients()

def _init_clients(self):
google_credentials = GoogleCredentials(self._credentials.get_do_token())
do_credentials = self._credentials.get_do_credentials()
google_credentials = GoogleCredentials(do_credentials.access_token)

bq_client = bigquery.Client(
project=self._project,
self.bq_client = bigquery.Client(
project=do_credentials.gcp_execution_project,
credentials=google_credentials)

gcs_client = storage.Client(
project=self._project,
self.gcs_client = storage.Client(
project=do_credentials.bq_project,
credentials=google_credentials
)

return bq_client, gcs_client
self.bq_public_project = do_credentials.bq_public_project
self.bq_project = do_credentials.bq_project
self.bq_dataset = do_credentials.bq_dataset
self.instant_licensing = do_credentials.instant_licensing
self._gcs_bucket = do_credentials.gcs_bucket

@refresh_clients
def upload_dataframe(self, dataframe, schema, tablename, project, dataset):

def upload_dataframe(self, dataframe, schema, tablename):
# Upload file to Google Cloud Storage
bucket = self.gcs_client.bucket(self._bucket)
bucket = self.gcs_client.get_bucket(self._gcs_bucket)
blob = bucket.blob(tablename, chunk_size=_GCS_CHUNK_SIZE)
dataframe.to_csv(tablename, index=False, header=False)
try:
Expand All @@ -65,14 +76,14 @@ def upload_dataframe(self, dataframe, schema, tablename, project, dataset):
os.remove(tablename)

# Import from GCS To BigQuery
dataset_ref = self.bq_client.dataset(dataset, project=project)
dataset_ref = self.bq_client.dataset(self.bq_dataset, project=self.bq_project)
table_ref = dataset_ref.table(tablename)
schema_wrapped = [bigquery.SchemaField(column, dtype) for column, dtype in schema.items()]

job_config = bigquery.LoadJobConfig()
job_config.schema = schema_wrapped
job_config.source_format = bigquery.SourceFormat.CSV
uri = 'gs://{bucket}/{tablename}'.format(bucket=self._bucket, tablename=tablename)
uri = 'gs://{bucket}/{tablename}'.format(bucket=self._gcs_bucket, tablename=tablename)

job = self.bq_client.load_table_from_uri(
uri, table_ref, job_config=job_config
Expand All @@ -93,8 +104,8 @@ def get_table_column_names(self, project, dataset, table):
table_info = self.get_table(project, dataset, table)
return [field.name for field in table_info.schema]

def download_to_file(self, project, dataset, table, limit=None, offset=None,
file_path=None, fail_if_exists=False, progress_bar=True):
def download_to_file(self, project, dataset, table, file_path=None, limit=None, offset=None,
fail_if_exists=False, progress_bar=True):
if not file_path:
file_name = '{}.{}.{}.csv'.format(project, dataset, table)
file_path = os.path.join(_USER_CONFIG_DIR, file_name)
Expand Down
12 changes: 8 additions & 4 deletions cartoframes/data/observatory/catalog/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,14 +366,15 @@ def get_all(cls, filters=None, credentials=None):

return cls._entity_repo.get_all(filters, credentials)

def download(self, credentials=None):
def download(self, credentials=None, file_path=None):
"""Download dataset data as a local file. You need Data Observatory enabled in your CARTO
account, please contact us at support@carto.com for more information.
For premium geographies (those with `is_public_data` set to False), you need a subscription to the geography.
For premium datasets (those with `is_public_data` set to False), you need a subscription to the dataset.
Check the subscription guides for more information.
Args:
file_path (str, optional): the file path where the dataset will be saved
credentials (:py:class:`Credentials <cartoframes.auth.Credentials>`, optional):
credentials of CARTO user account. If not provided,
a default credentials (if set with :py:meth:`set_default_credentials
Expand All @@ -388,7 +389,7 @@ def download(self, credentials=None):
if not self._is_subscribed(credentials):
raise CartoException('You are not subscribed to this Dataset yet. Please, use the subscribe method first.')

return self._download(credentials)
return self._download(credentials, file_path)

@classmethod
def get_datasets_spatial_filtered(cls, filter_dataset):
Expand Down Expand Up @@ -487,7 +488,7 @@ def _is_subscribed(self, credentials=None):

datasets = Dataset.get_all({}, _credentials)

return self in datasets
return datasets is not None and self in datasets

def _get_summary_data(self):
data = self.data.get('summary_json')
Expand All @@ -497,3 +498,6 @@ def _get_summary_data(self):
else:
log.info('Summary information is not available')
return None

def __str__(self):
return "<Dataset.get('{}')>".format(self._get_print_id())
39 changes: 27 additions & 12 deletions cartoframes/data/observatory/catalog/entity.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
import pandas as pd
from warnings import warn

from google.api_core.exceptions import NotFound

from carto.exceptions import CartoException

from ...clients.bigquery_client import BigQueryClient
from ....auth import Credentials, defaults
from ....core.logger import log

try:
from abc import ABC
except ImportError:
from abc import ABCMeta
ABC = ABCMeta('ABC', (object,), {'__slots__': ()})

_WORKING_PROJECT = 'carto-do-customers'
_PLATFORM_BQ = 'bq'


Expand Down Expand Up @@ -115,24 +114,28 @@ def _get_print_id(self):

return self.id

def _download(self, credentials=None):
def _download(self, credentials=None, file_path=None):
if not self._is_available_in('bq'):
raise CartoException('{} is not ready for Download. Please, contact us for more information.'.format(self))

credentials = self._get_credentials(credentials)
user_dataset = credentials.get_do_user_dataset()
bq_client = _get_bigquery_client(_WORKING_PROJECT, credentials)
bq_client = _get_bigquery_client(credentials)

project, dataset, table = self.id.split('.')
view = 'view_{}_{}'.format(dataset.replace('-', '_'), table)
full_remote_table_name = self._get_remote_full_table_name(
bq_client.bq_project,
bq_client.bq_dataset,
bq_client.bq_public_project
)

project, dataset, table = full_remote_table_name.split('.')

try:
file_path = bq_client.download_to_file(_WORKING_PROJECT, user_dataset, view)
file_path = bq_client.download_to_file(project, dataset, table, file_path)
except NotFound:
raise CartoException('You have not purchased the dataset `{}` yet'.format(self.id))

warn('Data saved: {}.'.format(file_path))
warn("To read it you can do: `pandas.read_csv('{}')`.".format(file_path))
log.info('Data saved: {}.'.format(file_path))
log.info("To read it you can do: `pandas.read_csv('{}')`.".format(file_path))

return file_path

Expand All @@ -147,9 +150,21 @@ def _get_credentials(self, credentials=None):

return _credentials

def _get_remote_full_table_name(self, user_project, user_dataset, public_project):
project, dataset, table = self.id.split('.')

if project != public_project:
return '{project}.{dataset}.{table_name}'.format(
project=user_project,
dataset=user_dataset,
table_name='view_{}_{}'.format(dataset, table)
)
else:
return self.id


def _get_bigquery_client(project, credentials):
return BigQueryClient(project, credentials)
def _get_bigquery_client(credentials):
return BigQueryClient(credentials)


def is_slug_value(id_value):
Expand Down
9 changes: 5 additions & 4 deletions cartoframes/data/observatory/catalog/geography.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,14 +200,15 @@ def get_all(cls, filters=None, credentials=None):

return cls._entity_repo.get_all(filters, credentials)

def download(self, credentials=None):
"""Download Geography data as a pandas DataFrame locally. You need Data Observatory enabled in your CARTO
def download(self, credentials=None, file_path=None):
"""Download geography data as a local file. You need Data Observatory enabled in your CARTO
account, please contact us at support@carto.com for more information.
For premium geographies (those with `is_public_data` set to False), you need a subscription to the geography.
Check the subscription guides for more information.
Args:
file_path (str, optional): the file path where the geography will be saved
credentials (:py:class:`Credentials <cartoframes.auth.Credentials>`, optional):
credentials of CARTO user account. If not provided,
a default credentials (if set with :py:meth:`set_default_credentials
Expand All @@ -223,7 +224,7 @@ def download(self, credentials=None):
raise CartoException('You are not subscribed to this Geography yet. Please, use the subscribe method '
'first.')

return self._download(credentials)
return self._download(credentials, file_path)

def subscribe(self, credentials=None):
"""Subscribe to a Geography. You need Data Observatory enabled in your CARTO account, please contact us at
Expand Down Expand Up @@ -292,4 +293,4 @@ def _is_subscribed(self, credentials=None):

geographies = Geography.get_all({}, _credentials)

return self in geographies
return geographies is not None and self in geographies

0 comments on commit 0302b3f

Please sign in to comment.