Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upload table using to_carto in chunks #1676

Merged
merged 13 commits into from
Aug 24, 2020
26 changes: 23 additions & 3 deletions cartoframes/io/carto.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
"""Functions to interact with the CARTO platform"""
import math

from pandas import DataFrame
from geopandas import GeoDataFrame

from carto.exceptions import CartoException

from .managers.context_manager import ContextManager
from .managers.context_manager import ContextManager, _compute_copy_data, get_dataframe_columns_info
from ..utils.geom_utils import check_crs, has_geometry, set_geometry
from ..utils.logger import log
from ..utils.utils import is_valid_str, is_sql_query
Expand All @@ -15,6 +16,9 @@
GEOM_COLUMN_NAME = 'the_geom'
IF_EXISTS_OPTIONS = ['fail', 'replace', 'append']

MAX_UPLOAD_SIZE_BYTES = 2000000000 # 2GB
SAMPLE_ROWS_NUMBER = 100


@send_metrics('data_downloaded')
def read_carto(source, credentials=None, limit=None, retry_times=3, schema=None, index_col=None, decode_geom=True,
Expand Down Expand Up @@ -70,7 +74,7 @@ def read_carto(source, credentials=None, limit=None, retry_times=3, schema=None,

@send_metrics('data_uploaded')
def to_carto(dataframe, table_name, credentials=None, if_exists='fail', geom_col=None, index=False, index_label=None,
cartodbfy=True, log_enabled=True):
cartodbfy=True, log_enabled=True, retry_times=3, max_upload_size=MAX_UPLOAD_SIZE_BYTES):
Jesus89 marked this conversation as resolved.
Show resolved Hide resolved
"""Upload a DataFrame to CARTO. The geometry's CRS must be WGS 84 (EPSG:4326) so you can use it on CARTO.

Args:
Expand All @@ -85,6 +89,8 @@ def to_carto(dataframe, table_name, credentials=None, if_exists='fail', geom_col
uses the name of the index from the dataframe.
cartodbfy (bool, optional): convert the table to CARTO format. Default True. More info
`here <https://carto.com/developers/sql-api/guides/creating-tables/#create-tables>`.
retry_times (int, optional):
Number of time to retry the upload in case it fails. Default is 3.

Returns:
string: the table name normalized.
Expand Down Expand Up @@ -130,7 +136,14 @@ def to_carto(dataframe, table_name, credentials=None, if_exists='fail', geom_col
elif isinstance(dataframe, GeoDataFrame):
log.warning('Geometry column not found in the GeoDataFrame.')

table_name = context_manager.copy_from(gdf, table_name, if_exists, cartodbfy)
chunk_count = math.ceil(estimate_csv_size(gdf) / max_upload_size)
chunk_row_size = int(math.ceil(len(gdf) / chunk_count))
chunked_gdf = [gdf[i:i + chunk_row_size] for i in range(0, gdf.shape[0], chunk_row_size)]

for i, chunk in enumerate(chunked_gdf):
if i > 0:
if_exists = 'append'
table_name = context_manager.copy_from(chunk, table_name, if_exists, cartodbfy, retry_times)

if log_enabled:
log.info('Success! Data uploaded to table "{}" correctly'.format(table_name))
Expand Down Expand Up @@ -364,3 +377,10 @@ def update_privacy_table(table_name, privacy, credentials=None, log_enabled=True

if log_enabled:
log.info('Success! Table "{}" privacy updated correctly'.format(table_name))


def estimate_csv_size(gdf):
n = min(SAMPLE_ROWS_NUMBER, len(gdf))
columns = get_dataframe_columns_info(gdf)
return sum([len(x) for x in
_compute_copy_data(gdf.sample(n=n), columns)]) * len(gdf) / n
46 changes: 29 additions & 17 deletions cartoframes/io/managers/context_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,27 @@
DEFAULT_RETRY_TIMES = 3


def retry_copy(func):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice decorator!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🙇

def wrapper(*args, **kwargs):
m_retry_times = kwargs.get('retry_times', DEFAULT_RETRY_TIMES)
while m_retry_times >= 1:
try:
return func(*args, **kwargs)
except CartoRateLimitException as err:
m_retry_times -= 1

if m_retry_times <= 0:
warn(('Read call was rate-limited. '
'This usually happens when there are multiple queries being read at the same time.'))
raise err

warn('Read call rate limited. Waiting {s} seconds'.format(s=err.retry_after))
time.sleep(err.retry_after)
warn('Retrying...')
return func(*args, **kwargs)
return wrapper


class ContextManager:

def __init__(self, credentials):
Expand All @@ -44,7 +65,7 @@ def copy_to(self, source, schema, limit=None, retry_times=DEFAULT_RETRY_TIMES):
copy_query = self._get_copy_query(query, columns, limit)
return self._copy_to(copy_query, columns, retry_times)

def copy_from(self, gdf, table_name, if_exists='fail', cartodbfy=True):
def copy_from(self, gdf, table_name, if_exists='fail', cartodbfy=True, retry_times=DEFAULT_RETRY_TIMES):
schema = self.get_schema()
table_name = self.normalize_table_name(table_name)
df_columns = get_dataframe_columns_info(gdf)
Expand All @@ -71,7 +92,7 @@ def copy_from(self, gdf, table_name, if_exists='fail', cartodbfy=True):
else:
self._create_table_from_columns(table_name, schema, df_columns, cartodbfy)

self._copy_from(gdf, table_name, df_columns)
self._copy_from(gdf, table_name, df_columns, retry_times)
return table_name

def create_table_from_query(self, query, table_name, if_exists, cartodbfy=True):
Expand Down Expand Up @@ -299,23 +320,12 @@ def _get_copy_query(self, query, columns, limit):

return query

def _copy_to(self, query, columns, retry_times):
@retry_copy
def _copy_to(self, query, columns, retry_times=DEFAULT_RETRY_TIMES):
log.debug('COPY TO')
copy_query = 'COPY ({0}) TO stdout WITH (FORMAT csv, HEADER true, NULL \'{1}\')'.format(query, PG_NULL)

try:
raw_result = self.copy_client.copyto_stream(copy_query)
except CartoRateLimitException as err:
if retry_times > 0:
retry_times -= 1
warn('Read call rate limited. Waiting {s} seconds'.format(s=err.retry_after))
time.sleep(err.retry_after)
warn('Retrying...')
return self._copy_to(query, columns, retry_times)
else:
warn(('Read call was rate-limited. '
'This usually happens when there are multiple queries being read at the same time.'))
raise err
raw_result = self.copy_client.copyto_stream(copy_query)

converters = obtain_converters(columns)
parse_dates = date_columns_names(columns)
Expand All @@ -327,14 +337,16 @@ def _copy_to(self, query, columns, retry_times):

return df

def _copy_from(self, dataframe, table_name, columns):
@retry_copy
def _copy_from(self, dataframe, table_name, columns, retry_times=DEFAULT_RETRY_TIMES):
log.debug('COPY FROM')
query = """
COPY {table_name}({columns}) FROM stdin WITH (FORMAT csv, DELIMITER '|', NULL '{null}');
""".format(
table_name=table_name, null=PG_NULL,
columns=','.join(column.dbname for column in columns)).strip()
data = _compute_copy_data(dataframe, columns)

self.copy_client.copyfrom(query, data)

def _rename_table(self, table_name, new_table_name):
Expand Down
32 changes: 32 additions & 0 deletions tests/unit/io/test_carto.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import pytest

import random

from pandas import Index
from geopandas import GeoDataFrame
from shapely.geometry import Point
from shapely.geometry.base import BaseGeometry
from shapely import wkt

from cartoframes.auth import Credentials
from cartoframes.io.managers.context_manager import ContextManager
Expand Down Expand Up @@ -246,6 +249,35 @@ def test_to_carto(mocker):
assert norm_table_name == table_name


def test_to_carto_chunks(mocker):
# Given
table_name = '__table_name__'
cm_mock = mocker.patch.object(ContextManager, 'copy_from')
cm_mock.return_value = table_name

size = 4000 # About 1MB (1150000 bytes)
gdf = GeoDataFrame([
['Calle Gran Vía 46',
round(random.uniform(10, 100), 2),
round(random.uniform(100, 1000), 2),
round(random.uniform(1000, 10000), 2),
'POLYGON((-3.68831 40.42478, -3.68841 40.42478, -3.68841 40.42488, -3.68831 40.42478))']
for _ in range(size)],
columns=['address', 'value1', 'value2', 'value3', 'polygon']
)
gdf.set_geometry(gdf['polygon'].apply(wkt.loads), inplace=True)

# When
norm_table_name = to_carto(gdf, table_name, CREDENTIALS, max_upload_size=100000)

# Then
assert cm_mock.call_count == 12 # 12 chunks as max_upload_size is 100000 bytes and we are uploading 1150000 bytes
assert cm_mock.call_args[0][1] == table_name
assert cm_mock.call_args[0][2] in ['fail', 'append']
assert cm_mock.call_args[0][3] is True
assert norm_table_name == table_name


def test_to_carto_wrong_dataframe(mocker):
# When
with pytest.raises(ValueError) as e:
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/io/test_crs.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,4 @@ def test_transform_crs_to_carto(mocker):
to_carto(gdf, 'table_name', CREDENTIALS)

# Then
cm_mock.assert_called_once_with(mocker.ANY, 'table_name', 'fail', True)
cm_mock.assert_called_once_with(mocker.ANY, 'table_name', 'fail', True, 3)
23 changes: 21 additions & 2 deletions tests/unit/utils/managers/test_context_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@

from carto.datasets import DatasetManager
from carto.sql import SQLClient, BatchSQLClient, CopySQLClient
from carto.exceptions import CartoRateLimitException

from pandas import DataFrame
from geopandas import GeoDataFrame
from cartoframes.auth import Credentials
from cartoframes.io.managers.context_manager import ContextManager
from cartoframes.io.managers.context_manager import ContextManager, DEFAULT_RETRY_TIMES, retry_copy
from cartoframes.utils.columns import ColumnInfo


Expand Down Expand Up @@ -54,7 +55,7 @@ def test_copy_from(self, mocker):
cm.copy_from(df, 'TABLE NAME')

# Then
mock.assert_called_once_with(df, 'table_name', columns)
mock.assert_called_once_with(df, 'table_name', columns, DEFAULT_RETRY_TIMES)

def test_copy_from_exists_fail(self, mocker):
# Given
Expand Down Expand Up @@ -239,3 +240,21 @@ def test_list_tables_empty(self, mocker):

# Then
assert DataFrame(columns=['tables']).equals(tables)

def test_retry_copy_decorator(self):
@retry_copy
def test_function():
class ResponseMock:
def __init__(self):
self.text = 'My text'
self.headers = {
'Carto-Rate-Limit-Limit': 1,
'Carto-Rate-Limit-Remaining': 1,
'Retry-After': 1,
'Carto-Rate-Limit-Reset': 1
}
response_mock = ResponseMock()
raise CartoRateLimitException(response_mock)

with pytest.raises(CartoRateLimitException):
test_function()
2 changes: 1 addition & 1 deletion tests/unit/viz/test_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@
"type": "Feature",
"geometry": {
"type": "GeometryCollection",
"coordinates": None
"geometries": []
},
"properties": {}
}
Expand Down