Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upload table using to_carto in chunks #1676

Merged
merged 13 commits into from
Aug 24, 2020
21 changes: 18 additions & 3 deletions cartoframes/io/carto.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
"""Functions to interact with the CARTO platform"""
import math

from pandas import DataFrame
from geopandas import GeoDataFrame

from carto.exceptions import CartoException

from .managers.context_manager import ContextManager
from .managers.context_manager import ContextManager, _compute_copy_data, get_dataframe_columns_info
from ..utils.geom_utils import check_crs, has_geometry, set_geometry
from ..utils.logger import log
from ..utils.utils import is_valid_str, is_sql_query
Expand All @@ -15,6 +16,8 @@
GEOM_COLUMN_NAME = 'the_geom'
IF_EXISTS_OPTIONS = ['fail', 'replace', 'append']

MAX_UPLOAD_SIZE_BYTES = 2000000000 # 2GB


@send_metrics('data_downloaded')
def read_carto(source, credentials=None, limit=None, retry_times=3, schema=None, index_col=None, decode_geom=True,
Expand Down Expand Up @@ -70,7 +73,7 @@ def read_carto(source, credentials=None, limit=None, retry_times=3, schema=None,

@send_metrics('data_uploaded')
def to_carto(dataframe, table_name, credentials=None, if_exists='fail', geom_col=None, index=False, index_label=None,
cartodbfy=True, log_enabled=True):
cartodbfy=True, log_enabled=True, retry_times=3, max_upload_size=MAX_UPLOAD_SIZE_BYTES):
Jesus89 marked this conversation as resolved.
Show resolved Hide resolved
"""Upload a DataFrame to CARTO. The geometry's CRS must be WGS 84 (EPSG:4326) so you can use it on CARTO.

Args:
Expand All @@ -93,6 +96,11 @@ def to_carto(dataframe, table_name, credentials=None, if_exists='fail', geom_col
ValueError: if the dataframe or table name provided are wrong or the if_exists param is not valid.

"""
def estimate_csv_size(gdf):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would move estimate_csv_size to utils.utils, also compute_copy_data because we are using it in different places.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't moved it to utils.utils because that would mean that utils.utils would depend on io.managers.context_manager and as that file already depends on utils.utils that would create a circular dependency

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I'll move it to utils.columns then or at least out of the to_carto function.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The utils.columns file has the same problem with the circular dependencies.

I have tried extracting the method and all of it's dependencies to utils.utils or utils.columns but in that case I find circular dependencies between utils.utils and utils.columns

Finally I have extracted the estimate_csv_size method to the carto.py file but outside the to_carto method

n = min(100, len(gdf))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would update 100 when we have the information of which value is enough for the precision we want for the estimation

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but spoiler, that looks like a good number

return len(''.join([x.decode("utf-8") for x in
_compute_copy_data(gdf.sample(n=n), get_dataframe_columns_info(gdf))])) * len(gdf) / n
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would extract this get_dataframe_columns_info(gdf) to a variable columns_info before the loop.

Also, we could try something like sum([len(x) for x in ...]) to avoid the decode and join so it will be a bit faster.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice catch, on it!


if not isinstance(dataframe, DataFrame):
raise ValueError('Wrong dataframe. You should provide a valid DataFrame instance.')

Expand Down Expand Up @@ -130,7 +138,14 @@ def to_carto(dataframe, table_name, credentials=None, if_exists='fail', geom_col
elif isinstance(dataframe, GeoDataFrame):
log.warning('Geometry column not found in the GeoDataFrame.')

table_name = context_manager.copy_from(gdf, table_name, if_exists, cartodbfy)
chunk_count = int(math.ceil(estimate_csv_size(gdf) / max_upload_size))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can remove the int( here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Totally!

chunk_row_size = int(math.ceil(len(gdf) / chunk_count))
chunked_gdf = [gdf[i:i + chunk_row_size] for i in range(0, gdf.shape[0], chunk_row_size)]

for i, chunk in enumerate(chunked_gdf):
if i > 0:
if_exists = 'append'
table_name = context_manager.copy_from(chunk, table_name, if_exists, cartodbfy, retry_times)

if log_enabled:
log.info('Success! Data uploaded to table "{}" correctly'.format(table_name))
Expand Down
46 changes: 29 additions & 17 deletions cartoframes/io/managers/context_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,27 @@
DEFAULT_RETRY_TIMES = 3


def retry_copy(func):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice decorator!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🙇

def wrapper(*args, **kwargs):
m_retry_times = kwargs.get('retry_times', DEFAULT_RETRY_TIMES)
while m_retry_times >= 1:
try:
return func(*args, **kwargs)
except CartoRateLimitException as err:
m_retry_times -= 1

if m_retry_times <= 0:
warn(('Read call was rate-limited. '
'This usually happens when there are multiple queries being read at the same time.'))
raise err

warn('Read call rate limited. Waiting {s} seconds'.format(s=err.retry_after))
time.sleep(err.retry_after)
warn('Retrying...')
return func(*args, **kwargs)
return wrapper


class ContextManager:

def __init__(self, credentials):
Expand All @@ -44,7 +65,7 @@ def copy_to(self, source, schema, limit=None, retry_times=DEFAULT_RETRY_TIMES):
copy_query = self._get_copy_query(query, columns, limit)
return self._copy_to(copy_query, columns, retry_times)

def copy_from(self, gdf, table_name, if_exists='fail', cartodbfy=True):
def copy_from(self, gdf, table_name, if_exists='fail', cartodbfy=True, retry_times=DEFAULT_RETRY_TIMES):
schema = self.get_schema()
table_name = self.normalize_table_name(table_name)
df_columns = get_dataframe_columns_info(gdf)
Expand All @@ -71,7 +92,7 @@ def copy_from(self, gdf, table_name, if_exists='fail', cartodbfy=True):
else:
self._create_table_from_columns(table_name, schema, df_columns, cartodbfy)

self._copy_from(gdf, table_name, df_columns)
self._copy_from(gdf, table_name, df_columns, retry_times)
return table_name

def create_table_from_query(self, query, table_name, if_exists, cartodbfy=True):
Expand Down Expand Up @@ -299,23 +320,12 @@ def _get_copy_query(self, query, columns, limit):

return query

def _copy_to(self, query, columns, retry_times):
@retry_copy
def _copy_to(self, query, columns, retry_times=DEFAULT_RETRY_TIMES):
log.debug('COPY TO')
copy_query = 'COPY ({0}) TO stdout WITH (FORMAT csv, HEADER true, NULL \'{1}\')'.format(query, PG_NULL)

try:
raw_result = self.copy_client.copyto_stream(copy_query)
except CartoRateLimitException as err:
if retry_times > 0:
retry_times -= 1
warn('Read call rate limited. Waiting {s} seconds'.format(s=err.retry_after))
time.sleep(err.retry_after)
warn('Retrying...')
return self._copy_to(query, columns, retry_times)
else:
warn(('Read call was rate-limited. '
'This usually happens when there are multiple queries being read at the same time.'))
raise err
raw_result = self.copy_client.copyto_stream(copy_query)

converters = obtain_converters(columns)
parse_dates = date_columns_names(columns)
Expand All @@ -327,14 +337,16 @@ def _copy_to(self, query, columns, retry_times):

return df

def _copy_from(self, dataframe, table_name, columns):
@retry_copy
def _copy_from(self, dataframe, table_name, columns, retry_times=DEFAULT_RETRY_TIMES):
log.debug('COPY FROM')
query = """
COPY {table_name}({columns}) FROM stdin WITH (FORMAT csv, DELIMITER '|', NULL '{null}');
""".format(
table_name=table_name, null=PG_NULL,
columns=','.join(column.dbname for column in columns)).strip()
data = _compute_copy_data(dataframe, columns)

self.copy_client.copyfrom(query, data)

def _rename_table(self, table_name, new_table_name):
Expand Down
32 changes: 32 additions & 0 deletions tests/unit/io/test_carto.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import pytest

import random

from pandas import Index
from geopandas import GeoDataFrame
from shapely.geometry import Point
from shapely.geometry.base import BaseGeometry
from shapely import wkt

from cartoframes.auth import Credentials
from cartoframes.io.managers.context_manager import ContextManager
Expand Down Expand Up @@ -246,6 +249,35 @@ def test_to_carto(mocker):
assert norm_table_name == table_name


def test_to_carto_chunks(mocker):
# Given
table_name = '__table_name__'
cm_mock = mocker.patch.object(ContextManager, 'copy_from')
cm_mock.return_value = table_name

size = 4000 # About 1MB
gdf = GeoDataFrame([
['Calle Gran Vía 46',
round(random.uniform(10, 100), 2),
round(random.uniform(100, 1000), 2),
round(random.uniform(1000, 10000), 2),
'POLYGON((-3.68831 40.42478, -3.68841 40.42478, -3.68841 40.42488, -3.68831 40.42478))']
for _ in range(size)],
columns=['address', 'value1', 'value2', 'value3', 'polygon']
)
gdf.set_geometry(gdf['polygon'].apply(wkt.loads), inplace=True)

# When
norm_table_name = to_carto(gdf, table_name, CREDENTIALS, max_upload_size=100000)

# Then
assert cm_mock.call_count == 12
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this mean that there are 12 chunks? Could you add a note with the info about where this comes from (max_size and size)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added note 👍

assert cm_mock.call_args[0][1] == table_name
assert cm_mock.call_args[0][2] in ['fail', 'append']
assert cm_mock.call_args[0][3] is True
assert norm_table_name == table_name


def test_to_carto_wrong_dataframe(mocker):
# When
with pytest.raises(ValueError) as e:
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/io/test_crs.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,4 @@ def test_transform_crs_to_carto(mocker):
to_carto(gdf, 'table_name', CREDENTIALS)

# Then
cm_mock.assert_called_once_with(mocker.ANY, 'table_name', 'fail', True)
cm_mock.assert_called_once_with(mocker.ANY, 'table_name', 'fail', True, 3)
23 changes: 21 additions & 2 deletions tests/unit/utils/managers/test_context_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@

from carto.datasets import DatasetManager
from carto.sql import SQLClient, BatchSQLClient, CopySQLClient
from carto.exceptions import CartoRateLimitException

from pandas import DataFrame
from geopandas import GeoDataFrame
from cartoframes.auth import Credentials
from cartoframes.io.managers.context_manager import ContextManager
from cartoframes.io.managers.context_manager import ContextManager, DEFAULT_RETRY_TIMES, retry_copy
from cartoframes.utils.columns import ColumnInfo


Expand Down Expand Up @@ -54,7 +55,7 @@ def test_copy_from(self, mocker):
cm.copy_from(df, 'TABLE NAME')

# Then
mock.assert_called_once_with(df, 'table_name', columns)
mock.assert_called_once_with(df, 'table_name', columns, DEFAULT_RETRY_TIMES)

def test_copy_from_exists_fail(self, mocker):
# Given
Expand Down Expand Up @@ -239,3 +240,21 @@ def test_list_tables_empty(self, mocker):

# Then
assert DataFrame(columns=['tables']).equals(tables)

def test_retry_copy_decorator(self):
@retry_copy
def test_function():
class ResponseMock:
def __init__(self):
self.text = 'My text'
self.headers = {
'Carto-Rate-Limit-Limit': 1,
'Carto-Rate-Limit-Remaining': 1,
'Retry-After': 1,
'Carto-Rate-Limit-Reset': 1
}
response_mock = ResponseMock()
raise CartoRateLimitException(response_mock)

with pytest.raises(CartoRateLimitException):
test_function()
2 changes: 1 addition & 1 deletion tests/unit/viz/test_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@
"type": "Feature",
"geometry": {
"type": "GeometryCollection",
"coordinates": None
"geometries": []
},
"properties": {}
}
Expand Down