Upload Chunking #42

Merged · 4 commits · Feb 6, 2024
2 changes: 1 addition & 1 deletion ixmp4/conf/settings.py
@@ -34,7 +34,7 @@ class Settings(BaseSettings):
     managed: bool = True
     max_page_size: int = 10_000
     default_page_size: int = 5_000
-
+    default_upload_chunk_size: int = 10_000
     model_config = SettingsConfigDict(env_prefix="ixmp4_", extra="allow")

     def __init__(self, *args, **kwargs) -> None:
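Since Settings keeps env_prefix="ixmp4_", the new default should be overridable through an IXMP4_DEFAULT_UPLOAD_CHUNK_SIZE environment variable rather than a code change (the 413 handler added below refers to exactly that variable). A minimal standalone sketch of that assumption, using a trimmed-down Settings class rather than ixmp4's own:

# Minimal sketch, assuming pydantic-settings resolves the "ixmp4_" env prefix as configured above.
import os

from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    default_upload_chunk_size: int = 10_000

    model_config = SettingsConfigDict(env_prefix="ixmp4_", extra="allow")


os.environ["IXMP4_DEFAULT_UPLOAD_CHUNK_SIZE"] = "2500"
print(Settings().default_upload_chunk_size)  # -> 2500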
39 changes: 35 additions & 4 deletions ixmp4/data/api/base.py
@@ -8,6 +8,7 @@
 from pydantic import ConfigDict, Field, GetCoreSchemaHandler
 from pydantic_core import CoreSchema, core_schema

+from ixmp4.conf import settings
 from ixmp4.core.exceptions import (
     ImproperlyConfigured,
     IxmpError,
@@ -128,6 +129,12 @@ def _request(
         res = self.client.request(method, path, params=params, json=json, **kwargs)

         if res.status_code >= 400:
+            if res.status_code == 413:
+                raise ImproperlyConfigured(
+                    "Received status code 413 (Payload Too Large). "
+                    "Consider decreasing `IXMP4_DEFAULT_UPLOAD_CHUNK_SIZE` "
+                    f"(current: {settings.default_upload_chunk_size})."
+                )
             raise self.get_remote_exception(res, res.status_code)
         else:
             try:
@@ -278,8 +285,23 @@ def enumerate(
         return self.list(*args, **kwargs)


-class BulkUpserter(BaseRepository[ModelType]):
-    def bulk_upsert(self, df: pd.DataFrame, **kwargs) -> None:
+class BulkOperator(BaseRepository[ModelType]):
+    def yield_chunks(self, df: pd.DataFrame, chunk_size: int):
+        for _, chunk in df.groupby(df.index // chunk_size):
+            yield chunk
+
+
+class BulkUpserter(BulkOperator[ModelType]):
+    def bulk_upsert(
+        self,
+        df: pd.DataFrame,
+        chunk_size: int = settings.default_upload_chunk_size,
+        **kwargs,
+    ):
+        for chunk in self.yield_chunks(df, chunk_size):
+            self.bulk_upsert_chunk(chunk, **kwargs)
+
+    def bulk_upsert_chunk(self, df: pd.DataFrame, **kwargs) -> None:
         dict_ = df_to_dict(df)
         json_ = DataFrame(**dict_).model_dump_json()
         self._request(
@@ -290,8 +312,17 @@ def bulk_upsert(self, df: pd.DataFrame, **kwargs) -> None:
         )


-class BulkDeleter(BaseRepository[ModelType]):
-    def bulk_delete(self, df: pd.DataFrame, **kwargs) -> None:
+class BulkDeleter(BulkOperator[ModelType]):
+    def bulk_delete(
+        self,
+        df: pd.DataFrame,
+        chunk_size: int = settings.default_upload_chunk_size,
+        **kwargs,
+    ):
+        for chunk in self.yield_chunks(df, chunk_size):
+            self.bulk_delete_chunk(chunk, **kwargs)
+
+    def bulk_delete_chunk(self, df: pd.DataFrame, **kwargs) -> None:
         dict_ = df_to_dict(df)
         json_ = DataFrame(**dict_).model_dump_json()
         self._request(
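The chunking itself is a plain integer-division groupby on the DataFrame index. A standalone sketch of what yield_chunks produces, assuming the frames passed in carry a default RangeIndex (the toy frame here is illustrative, not ixmp4 data):

# Illustrative sketch of the groupby-based chunking used by BulkOperator.yield_chunks;
# with a default RangeIndex (0..n-1), integer division on the index labels groups
# consecutive rows into blocks of at most chunk_size rows.
import pandas as pd

df = pd.DataFrame({"value": range(10)})
chunk_size = 4

for i, chunk in df.groupby(df.index // chunk_size):
    print(i, len(chunk))  # prints: 0 4, 1 4, 2 2

Each chunk is then serialized and sent as its own request, which is what keeps individual payloads under the server's size limit.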
6 changes: 5 additions & 1 deletion ixmp4/server/rest/__init__.py
@@ -16,10 +16,15 @@
 from .iamc import scenario as iamc_scenario
 from .iamc import unit as iamc_unit
 from .iamc import variable as iamc_variable
+from .middleware import RequestSizeLoggerMiddleware, RequestTimeLoggerMiddleware
 from .optimization import indexset, scalar

 v1 = FastAPI()

+if settings.mode == "debug":
+    v1.add_middleware(RequestSizeLoggerMiddleware)
+    v1.add_middleware(RequestTimeLoggerMiddleware)
+
 v1.add_middleware(
     CORSMiddleware,
     allow_origins=["*"],
@@ -28,7 +33,6 @@
     allow_headers=["*"],
 )

-
 v1.include_router(datapoint.router, prefix="/iamc")
 v1.include_router(docs.router)
 v1.include_router(iamc_model.router, prefix="/iamc")
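The logging middlewares are only attached when settings.mode == "debug". Presumably (not shown in this diff) that mode can be toggled through the same ixmp4_ environment prefix before the server process starts:

# Assumption, not part of this PR: "mode" is a Settings field, so with
# env_prefix="ixmp4_" it should be switchable via IXMP4_MODE.
import os

os.environ["IXMP4_MODE"] = "debug"  # set before ixmp4's settings are loaded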
1 change: 0 additions & 1 deletion ixmp4/server/rest/iamc/timeseries.py
@@ -30,7 +30,6 @@ def query(
     pagination: Pagination = Depends(),
     backend: Backend = Depends(deps.get_backend),
 ):
-    print(filter)
     return EnumerationOutput(
         results=backend.iamc.timeseries.paginate(
             _filter=filter,
22 changes: 22 additions & 0 deletions ixmp4/server/rest/middleware.py
@@ -0,0 +1,22 @@
+import logging
+import time
+
+from starlette.middleware.base import BaseHTTPMiddleware
+
+logger = logging.getLogger(__name__)
+
+
+class RequestTimeLoggerMiddleware(BaseHTTPMiddleware):
+    async def dispatch(self, request, call_next):
+        start_time = time.time()
+        response = await call_next(request)
+        process_time = time.time() - start_time
+        logger.debug(f"Request process time: {process_time} seconds.")
+        return response
+
+
+class RequestSizeLoggerMiddleware(BaseHTTPMiddleware):
+    async def dispatch(self, request, call_next):
+        body = await request.body()
+        logger.debug(f"Request body size: {len(body)} bytes.")
+        return await call_next(request)
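Both middlewares log at DEBUG level only, so nothing shows up unless the logger is configured accordingly. A minimal sketch of wiring them into a bare FastAPI app outside ixmp4's own server setup (the app and /ping route here are hypothetical):

# Minimal sketch, assuming only FastAPI/Starlette plus the middleware module above;
# the app and /ping endpoint are hypothetical, not part of ixmp4.
import logging

from fastapi import FastAPI

from ixmp4.server.rest.middleware import (
    RequestSizeLoggerMiddleware,
    RequestTimeLoggerMiddleware,
)

logging.basicConfig(level=logging.DEBUG)  # make the logger.debug(...) calls visible

app = FastAPI()
app.add_middleware(RequestSizeLoggerMiddleware)
app.add_middleware(RequestTimeLoggerMiddleware)


@app.get("/ping")
def ping():
    return {"ok": True}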
1 change: 0 additions & 1 deletion ixmp4/server/rest/optimization/indexset.py
@@ -32,7 +32,6 @@ def query(
     pagination: Pagination = Depends(),
     backend: Backend = Depends(deps.get_backend),
 ):
-    print(filter)
     return EnumerationOutput(
         results=backend.optimization.indexsets.paginate(
             _filter=filter,