Upload Chunking (#42)
* add default_upload_chunk_size setting

* implement upload chunking

* cleanup

* add logging middleware in debug mode
meksor committed Feb 6, 2024
1 parent 5d06552 commit c6fd47c
Showing 8 changed files with 277 additions and 247 deletions.
2 changes: 1 addition & 1 deletion ixmp4/conf/settings.py
@@ -34,7 +34,7 @@ class Settings(BaseSettings):
     managed: bool = True
     max_page_size: int = 10_000
     default_page_size: int = 5_000
-
+    default_upload_chunk_size: int = 10_000
     model_config = SettingsConfigDict(env_prefix="ixmp4_", extra="allow")
 
     def __init__(self, *args, **kwargs) -> None:
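
For reference, a minimal sketch (not part of the commit) of how the new setting could be overridden. It assumes that, via pydantic-settings and the env_prefix="ixmp4_" shown above, the value is read from IXMP4_DEFAULT_UPLOAD_CHUNK_SIZE (matching is case-insensitive), and that the shared settings object in ixmp4.conf can be created in your environment without further configuration:

import os

# Assumption: must be set before ixmp4 is imported, since the shared
# settings object is presumably instantiated when ixmp4.conf is imported.
os.environ["IXMP4_DEFAULT_UPLOAD_CHUNK_SIZE"] = "5000"

from ixmp4.conf import settings  # same import used by ixmp4/data/api/base.py below

print(settings.default_upload_chunk_size)  # expected: 5000
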
39 changes: 35 additions & 4 deletions ixmp4/data/api/base.py
@@ -8,6 +8,7 @@
 from pydantic import ConfigDict, Field, GetCoreSchemaHandler
 from pydantic_core import CoreSchema, core_schema
 
+from ixmp4.conf import settings
 from ixmp4.core.exceptions import (
     ImproperlyConfigured,
     IxmpError,
@@ -128,6 +129,12 @@ def _request(
         res = self.client.request(method, path, params=params, json=json, **kwargs)
 
         if res.status_code >= 400:
+            if res.status_code == 413:
+                raise ImproperlyConfigured(
+                    "Received status code 413 (Payload Too Large). "
+                    "Consider decreasing `IXMP4_DEFAULT_UPLOAD_CHUNK_SIZE` "
+                    f"(current: {settings.default_upload_chunk_size})."
+                )
             raise self.get_remote_exception(res, res.status_code)
         else:
             try:
@@ -278,8 +285,23 @@ def enumerate(
         return self.list(*args, **kwargs)
 
 
-class BulkUpserter(BaseRepository[ModelType]):
-    def bulk_upsert(self, df: pd.DataFrame, **kwargs) -> None:
+class BulkOperator(BaseRepository[ModelType]):
+    def yield_chunks(self, df: pd.DataFrame, chunk_size: int):
+        for _, chunk in df.groupby(df.index // chunk_size):
+            yield chunk
+
+
+class BulkUpserter(BulkOperator[ModelType]):
+    def bulk_upsert(
+        self,
+        df: pd.DataFrame,
+        chunk_size: int = settings.default_upload_chunk_size,
+        **kwargs,
+    ):
+        for chunk in self.yield_chunks(df, chunk_size):
+            self.bulk_upsert_chunk(chunk, **kwargs)
+
+    def bulk_upsert_chunk(self, df: pd.DataFrame, **kwargs) -> None:
         dict_ = df_to_dict(df)
         json_ = DataFrame(**dict_).model_dump_json()
         self._request(
@@ -290,8 +312,17 @@ def bulk_upsert(self, df: pd.DataFrame, **kwargs) -> None:
         )
 
 
-class BulkDeleter(BaseRepository[ModelType]):
-    def bulk_delete(self, df: pd.DataFrame, **kwargs) -> None:
+class BulkDeleter(BulkOperator[ModelType]):
+    def bulk_delete(
+        self,
+        df: pd.DataFrame,
+        chunk_size: int = settings.default_upload_chunk_size,
+        **kwargs,
+    ):
+        for chunk in self.yield_chunks(df, chunk_size):
+            self.bulk_delete_chunk(chunk, **kwargs)
+
+    def bulk_delete_chunk(self, df: pd.DataFrame, **kwargs) -> None:
         dict_ = df_to_dict(df)
         json_ = DataFrame(**dict_).model_dump_json()
         self._request(
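
To illustrate the chunking idiom used by BulkOperator.yield_chunks above, here is a small standalone pandas sketch (not part of the commit). The 25-row frame and chunk size of 10 are made up, and the split assumes the default 0..n-1 RangeIndex, since df.index // chunk_size relies on integer division of the index labels:

import pandas as pd

df = pd.DataFrame({"value": range(25)})  # default RangeIndex 0..24
chunk_size = 10

# Same grouping expression as yield_chunks: index labels 0-9 map to
# group 0, 10-19 to group 1, 20-24 to group 2.
for key, chunk in df.groupby(df.index // chunk_size):
    print(key, len(chunk))
# 0 10
# 1 10
# 2 5

With the default of 10_000, a bulk upsert or delete of n rows is therefore sent as ceil(n / 10_000) separate requests instead of one large request.
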
6 changes: 5 additions & 1 deletion ixmp4/server/rest/__init__.py
@@ -16,10 +16,15 @@
 from .iamc import scenario as iamc_scenario
 from .iamc import unit as iamc_unit
 from .iamc import variable as iamc_variable
+from .middleware import RequestSizeLoggerMiddleware, RequestTimeLoggerMiddleware
 from .optimization import indexset, scalar
 
 v1 = FastAPI()
 
+if settings.mode == "debug":
+    v1.add_middleware(RequestSizeLoggerMiddleware)
+    v1.add_middleware(RequestTimeLoggerMiddleware)
+
 v1.add_middleware(
     CORSMiddleware,
     allow_origins=["*"],
@@ -28,7 +33,6 @@
     allow_headers=["*"],
 )
 
-
 v1.include_router(datapoint.router, prefix="/iamc")
 v1.include_router(docs.router)
 v1.include_router(iamc_model.router, prefix="/iamc")
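
A hypothetical sketch (not part of the commit) of turning the new logging middleware on. It assumes that mode is an ordinary Settings field, so "debug" can be selected through the same ixmp4_ env prefix as the other settings, and that this happens before the app module is imported, since the middleware is registered at import time:

import os

# Assumed env var name, following env_prefix="ixmp4_".
os.environ["IXMP4_MODE"] = "debug"

from ixmp4.server.rest import v1  # FastAPI app defined in this file

# Starlette keeps registered middleware in `user_middleware`; in debug mode
# the two logger middlewares should appear here alongside CORSMiddleware.
print([m.cls.__name__ for m in v1.user_middleware])
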
1 change: 0 additions & 1 deletion ixmp4/server/rest/iamc/timeseries.py
@@ -30,7 +30,6 @@ def query(
     pagination: Pagination = Depends(),
     backend: Backend = Depends(deps.get_backend),
 ):
-    print(filter)
     return EnumerationOutput(
         results=backend.iamc.timeseries.paginate(
             _filter=filter,
22 changes: 22 additions & 0 deletions ixmp4/server/rest/middleware.py
@@ -0,0 +1,22 @@
+import logging
+import time
+
+from starlette.middleware.base import BaseHTTPMiddleware
+
+logger = logging.getLogger(__name__)
+
+
+class RequestTimeLoggerMiddleware(BaseHTTPMiddleware):
+    async def dispatch(self, request, call_next):
+        start_time = time.time()
+        response = await call_next(request)
+        process_time = time.time() - start_time
+        logger.debug(f"Request process time: {process_time} seconds.")
+        return response
+
+
+class RequestSizeLoggerMiddleware(BaseHTTPMiddleware):
+    async def dispatch(self, request, call_next):
+        body = await request.body()
+        logger.debug(f"Request body size: {len(body)} bytes.")
+        return await call_next(request)
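
The two middlewares log at DEBUG level through the standard logging module, so their messages only show up if the logging configuration lets DEBUG records through. A minimal sketch (not part of the commit) of a configuration that would surface them:

import logging

# "ixmp4.server.rest.middleware" is the logger name implied by
# logging.getLogger(__name__) in the new file above.
logging.basicConfig(level=logging.DEBUG)
logging.getLogger("ixmp4.server.rest.middleware").setLevel(logging.DEBUG)
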
1 change: 0 additions & 1 deletion ixmp4/server/rest/optimization/indexset.py
@@ -32,7 +32,6 @@ def query(
     pagination: Pagination = Depends(),
     backend: Backend = Depends(deps.get_backend),
 ):
-    print(filter)
     return EnumerationOutput(
         results=backend.optimization.indexsets.paginate(
             _filter=filter,