Skip to content

Commit

Permalink
Merge pull request #1676 from CartoDB/chore/ch93723/optimize-upload-t…
Browse files Browse the repository at this point in the history
…o-carto

Upload table using to_carto in chunks
  • Loading branch information
Jesus89 committed Aug 24, 2020
2 parents fbcec7e + 728db60 commit 40ca547
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 24 deletions.
26 changes: 23 additions & 3 deletions cartoframes/io/carto.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
"""Functions to interact with the CARTO platform"""
import math

from pandas import DataFrame
from geopandas import GeoDataFrame

from carto.exceptions import CartoException

from .managers.context_manager import ContextManager
from .managers.context_manager import ContextManager, _compute_copy_data, get_dataframe_columns_info
from ..utils.geom_utils import check_crs, has_geometry, set_geometry
from ..utils.logger import log
from ..utils.utils import is_valid_str, is_sql_query
Expand All @@ -15,6 +16,9 @@
GEOM_COLUMN_NAME = 'the_geom'
IF_EXISTS_OPTIONS = ['fail', 'replace', 'append']

MAX_UPLOAD_SIZE_BYTES = 2000000000 # 2GB
SAMPLE_ROWS_NUMBER = 100


@send_metrics('data_downloaded')
def read_carto(source, credentials=None, limit=None, retry_times=3, schema=None, index_col=None, decode_geom=True,
Expand Down Expand Up @@ -70,7 +74,7 @@ def read_carto(source, credentials=None, limit=None, retry_times=3, schema=None,

@send_metrics('data_uploaded')
def to_carto(dataframe, table_name, credentials=None, if_exists='fail', geom_col=None, index=False, index_label=None,
cartodbfy=True, log_enabled=True):
cartodbfy=True, log_enabled=True, retry_times=3, max_upload_size=MAX_UPLOAD_SIZE_BYTES):
"""Upload a DataFrame to CARTO. The geometry's CRS must be WGS 84 (EPSG:4326) so you can use it on CARTO.
Args:
Expand All @@ -85,6 +89,8 @@ def to_carto(dataframe, table_name, credentials=None, if_exists='fail', geom_col
uses the name of the index from the dataframe.
cartodbfy (bool, optional): convert the table to CARTO format. Default True. More info
`here <https://carto.com/developers/sql-api/guides/creating-tables/#create-tables>`.
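retry_times (int, optional):
Number of times to retry the upload in case it fails. Default is 3.
max_upload_size (int, optional):
Maximum estimated size, in bytes, of each chunk uploaded to CARTO. Dataframes whose
estimated CSV size exceeds this value are split and uploaded in several chunks.
Default is 2000000000 (2GB).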
Returns:
string: the table name normalized.
Expand Down Expand Up @@ -130,7 +136,14 @@ def to_carto(dataframe, table_name, credentials=None, if_exists='fail', geom_col
elif isinstance(dataframe, GeoDataFrame):
log.warning('Geometry column not found in the GeoDataFrame.')

table_name = context_manager.copy_from(gdf, table_name, if_exists, cartodbfy)
chunk_count = math.ceil(estimate_csv_size(gdf) / max_upload_size)
chunk_row_size = int(math.ceil(len(gdf) / chunk_count))
chunked_gdf = [gdf[i:i + chunk_row_size] for i in range(0, gdf.shape[0], chunk_row_size)]

for i, chunk in enumerate(chunked_gdf):
if i > 0:
if_exists = 'append'
table_name = context_manager.copy_from(chunk, table_name, if_exists, cartodbfy, retry_times)

if log_enabled:
log.info('Success! Data uploaded to table "{}" correctly'.format(table_name))
Expand Down Expand Up @@ -364,3 +377,10 @@ def update_privacy_table(table_name, privacy, credentials=None, log_enabled=True

if log_enabled:
log.info('Success! Table "{}" privacy updated correctly'.format(table_name))


def estimate_csv_size(gdf):
    """Estimate the size of the COPY payload that would be generated for a dataframe.

    Up to ``SAMPLE_ROWS_NUMBER`` random rows are serialized with
    ``_compute_copy_data`` and the summed length of the produced items is
    extrapolated linearly to the full number of rows.

    Args:
        gdf: dataframe to measure. It must support ``len()`` and ``sample(n=...)``.

    Returns:
        int or float: estimated total size, in the unit reported by ``len()`` on
        the items yielded by ``_compute_copy_data`` (presumably bytes; verify
        against that helper). ``0`` is returned for an empty dataframe.
    """
    total_rows = len(gdf)
    if total_rows == 0:
        # Nothing to serialize; this also avoids dividing by a zero-sized sample below.
        return 0

    sample_rows = min(SAMPLE_ROWS_NUMBER, total_rows)
    columns = get_dataframe_columns_info(gdf)
    sample_size = sum(len(row) for row in _compute_copy_data(gdf.sample(n=sample_rows), columns))

    # Linear extrapolation from the sample to the whole dataframe.
    return sample_size * total_rows / sample_rows
46 changes: 29 additions & 17 deletions cartoframes/io/managers/context_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,27 @@
DEFAULT_RETRY_TIMES = 3


def retry_copy(func):
    """Decorate a COPY operation so that it is retried when it gets rate-limited.

    The wrapped callable is invoked again every time it raises
    ``CartoRateLimitException``, sleeping ``err.retry_after`` seconds between
    attempts, until the allowed number of attempts is exhausted.

    The number of attempts is taken from the ``retry_times`` argument of the
    call. It is honoured whether it is passed as a keyword or positionally
    (resolved through the signature of ``func``); ``DEFAULT_RETRY_TIMES`` is
    used when the call does not provide it. Values lower than 1 result in a
    single attempt.

    Args:
        func (callable): function or method that performs the COPY call.

    Returns:
        callable: wrapper with the same signature and metadata as ``func``.

    Raises:
        CartoRateLimitException: re-raised once every attempt has been rate-limited.
    """
    import functools
    import inspect

    try:
        signature = inspect.signature(func)
    except (TypeError, ValueError):
        # Some callables expose no signature; only the keyword lookup is possible then.
        signature = None

    def resolve_retry_times(args, kwargs):
        """Return the ``retry_times`` value supplied to a call, or the default."""
        if 'retry_times' in kwargs:
            return kwargs['retry_times']

        if signature is not None:
            try:
                bound = signature.bind_partial(*args, **kwargs)
            except TypeError:
                # Let the real call report the invalid arguments.
                bound = None
            if bound is not None and 'retry_times' in bound.arguments:
                return bound.arguments['retry_times']

        return DEFAULT_RETRY_TIMES

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Always try at least once, even if retry_times is zero or negative.
        attempts_left = max(resolve_retry_times(args, kwargs), 1)

        while True:
            try:
                return func(*args, **kwargs)
            except CartoRateLimitException as err:
                attempts_left -= 1

                if attempts_left <= 0:
                    warn(('Read call was rate-limited. '
                          'This usually happens when there are multiple queries being read at the same time.'))
                    raise

                warn('Read call rate limited. Waiting {s} seconds'.format(s=err.retry_after))
                time.sleep(err.retry_after)
                warn('Retrying...')

    return wrapper


class ContextManager:

def __init__(self, credentials):
Expand All @@ -44,7 +65,7 @@ def copy_to(self, source, schema, limit=None, retry_times=DEFAULT_RETRY_TIMES):
copy_query = self._get_copy_query(query, columns, limit)
return self._copy_to(copy_query, columns, retry_times)

def copy_from(self, gdf, table_name, if_exists='fail', cartodbfy=True):
def copy_from(self, gdf, table_name, if_exists='fail', cartodbfy=True, retry_times=DEFAULT_RETRY_TIMES):
schema = self.get_schema()
table_name = self.normalize_table_name(table_name)
df_columns = get_dataframe_columns_info(gdf)
Expand All @@ -71,7 +92,7 @@ def copy_from(self, gdf, table_name, if_exists='fail', cartodbfy=True):
else:
self._create_table_from_columns(table_name, schema, df_columns, cartodbfy)

self._copy_from(gdf, table_name, df_columns)
self._copy_from(gdf, table_name, df_columns, retry_times)
return table_name

def create_table_from_query(self, query, table_name, if_exists, cartodbfy=True):
Expand Down Expand Up @@ -299,23 +320,12 @@ def _get_copy_query(self, query, columns, limit):

return query

def _copy_to(self, query, columns, retry_times):
@retry_copy
def _copy_to(self, query, columns, retry_times=DEFAULT_RETRY_TIMES):
log.debug('COPY TO')
copy_query = 'COPY ({0}) TO stdout WITH (FORMAT csv, HEADER true, NULL \'{1}\')'.format(query, PG_NULL)

try:
raw_result = self.copy_client.copyto_stream(copy_query)
except CartoRateLimitException as err:
if retry_times > 0:
retry_times -= 1
warn('Read call rate limited. Waiting {s} seconds'.format(s=err.retry_after))
time.sleep(err.retry_after)
warn('Retrying...')
return self._copy_to(query, columns, retry_times)
else:
warn(('Read call was rate-limited. '
'This usually happens when there are multiple queries being read at the same time.'))
raise err
raw_result = self.copy_client.copyto_stream(copy_query)

converters = obtain_converters(columns)
parse_dates = date_columns_names(columns)
Expand All @@ -327,14 +337,16 @@ def _copy_to(self, query, columns, retry_times):

return df

def _copy_from(self, dataframe, table_name, columns):
@retry_copy
def _copy_from(self, dataframe, table_name, columns, retry_times=DEFAULT_RETRY_TIMES):
    """Stream the rows of a dataframe into an existing table with a COPY FROM query.

    Args:
        dataframe: data to upload; it is serialized by ``_compute_copy_data``.
        table_name (str): name of the destination table.
        columns: column descriptors; their ``dbname`` attribute provides the
            column list of the COPY statement.
        retry_times (int, optional): not used in the body; accepted so that the
            ``retry_copy`` decorator can receive it.
    """
    log.debug('COPY FROM')

    column_names = ','.join(column.dbname for column in columns)
    copy_template = """
        COPY {table_name}({columns}) FROM stdin WITH (FORMAT csv, DELIMITER '|', NULL '{null}');
    """
    copy_query = copy_template.format(table_name=table_name, null=PG_NULL, columns=column_names).strip()

    # The payload is built on every invocation, so each retry sends freshly computed data.
    payload = _compute_copy_data(dataframe, columns)
    self.copy_client.copyfrom(copy_query, payload)

def _rename_table(self, table_name, new_table_name):
Expand Down
32 changes: 32 additions & 0 deletions tests/unit/io/test_carto.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import pytest

import random

from pandas import Index
from geopandas import GeoDataFrame
from shapely.geometry import Point
from shapely.geometry.base import BaseGeometry
from shapely import wkt

from cartoframes.auth import Credentials
from cartoframes.io.managers.context_manager import ContextManager
Expand Down Expand Up @@ -246,6 +249,35 @@ def test_to_carto(mocker):
assert norm_table_name == table_name


def test_to_carto_chunks(mocker):
    """Check that a dataframe larger than max_upload_size is uploaded in several chunks.

    Only the first chunk must honour the requested ``if_exists`` mode; the
    remaining chunks must be appended to the table created by the first one.
    """
    # Given
    table_name = '__table_name__'
    cm_mock = mocker.patch.object(ContextManager, 'copy_from')
    cm_mock.return_value = table_name

    size = 4000  # About 1MB (1150000 bytes)
    rng = random.Random(0)  # fixed seed so the generated data is reproducible
    gdf = GeoDataFrame([
        ['Calle Gran Vía 46',
         round(rng.uniform(10, 100), 2),
         round(rng.uniform(100, 1000), 2),
         round(rng.uniform(1000, 10000), 2),
         'POLYGON((-3.68831 40.42478, -3.68841 40.42478, -3.68841 40.42488, -3.68831 40.42478))']
        for _ in range(size)],
        columns=['address', 'value1', 'value2', 'value3', 'polygon']
    )
    gdf.set_geometry(gdf['polygon'].apply(wkt.loads), inplace=True)

    # When
    norm_table_name = to_carto(gdf, table_name, CREDENTIALS, max_upload_size=100000)

    # Then
    # 12 chunks as max_upload_size is 100000 bytes and we are uploading 1150000 bytes
    assert cm_mock.call_count == 12

    # Inspect every call, not just the last one: positional args are
    # (chunk, table_name, if_exists, cartodbfy, retry_times).
    if_exists_values = [call[0][2] for call in cm_mock.call_args_list]
    assert if_exists_values == ['fail'] + ['append'] * 11
    for call in cm_mock.call_args_list:
        assert call[0][1] == table_name
        assert call[0][3] is True

    assert norm_table_name == table_name


def test_to_carto_wrong_dataframe(mocker):
# When
with pytest.raises(ValueError) as e:
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/io/test_crs.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,4 @@ def test_transform_crs_to_carto(mocker):
to_carto(gdf, 'table_name', CREDENTIALS)

# Then
cm_mock.assert_called_once_with(mocker.ANY, 'table_name', 'fail', True)
cm_mock.assert_called_once_with(mocker.ANY, 'table_name', 'fail', True, 3)
23 changes: 21 additions & 2 deletions tests/unit/utils/managers/test_context_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@

from carto.datasets import DatasetManager
from carto.sql import SQLClient, BatchSQLClient, CopySQLClient
from carto.exceptions import CartoRateLimitException

from pandas import DataFrame
from geopandas import GeoDataFrame
from cartoframes.auth import Credentials
from cartoframes.io.managers.context_manager import ContextManager
from cartoframes.io.managers.context_manager import ContextManager, DEFAULT_RETRY_TIMES, retry_copy
from cartoframes.utils.columns import ColumnInfo


Expand Down Expand Up @@ -54,7 +55,7 @@ def test_copy_from(self, mocker):
cm.copy_from(df, 'TABLE NAME')

# Then
mock.assert_called_once_with(df, 'table_name', columns)
mock.assert_called_once_with(df, 'table_name', columns, DEFAULT_RETRY_TIMES)

def test_copy_from_exists_fail(self, mocker):
# Given
Expand Down Expand Up @@ -239,3 +240,21 @@ def test_list_tables_empty(self, mocker):

# Then
assert DataFrame(columns=['tables']).equals(tables)

def test_retry_copy_decorator(self):
    """A function that is always rate-limited must end up raising the rate-limit error."""

    class ResponseMock:
        """Minimal stand-in for the HTTP response used to build the exception."""

        def __init__(self):
            self.text = 'My text'
            self.headers = {
                'Carto-Rate-Limit-Limit': 1,
                'Carto-Rate-Limit-Remaining': 1,
                'Retry-After': 1,
                'Carto-Rate-Limit-Reset': 1
            }

    @retry_copy
    def test_function():
        raise CartoRateLimitException(ResponseMock())

    with pytest.raises(CartoRateLimitException):
        test_function()
2 changes: 1 addition & 1 deletion tests/unit/viz/test_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@
"type": "Feature",
"geometry": {
"type": "GeometryCollection",
"coordinates": None
"geometries": []
},
"properties": {}
}
Expand Down

0 comments on commit 40ca547

Please sign in to comment.