Commit

Fix large exports
closes pulp#5375

Co-authored-by: Tobias Grigo <56518487+hstct@users.noreply.github.com>
quba42 and hstct committed May 16, 2024
1 parent 865e5c9 commit ce0c796
Showing 3 changed files with 42 additions and 31 deletions.
1 change: 1 addition & 0 deletions CHANGES/5375.bugfix
@@ -0,0 +1 @@
+Added Pulp side batching to fix large exports that were failing due to changes in psycopg.
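
Note: "Pulp side batching" in this entry means fetching query results in fixed-size slices at explicit offsets, rather than streaming the whole result set through QuerySet.iterator(). A minimal plain-Python sketch of the pattern (batched, fetch, and the in-memory data are illustrative stand-ins, not names from the codebase):

    # Sketch of offset-based batching, assuming fetch(offset, limit) returns at
    # most `limit` rows. A Django queryset slice qs[offset : offset + limit]
    # behaves this way, issuing one LIMIT/OFFSET query per call.
    def batched(fetch, batch_size):
        offset = 0
        while True:
            rows = fetch(offset, batch_size)
            if not rows:
                return
            yield rows
            offset += batch_size

    data = list(range(10))  # in-memory stand-in for a table
    batches = list(batched(lambda off, lim: data[off : off + lim], 4))
    assert batches == [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]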
68 changes: 39 additions & 29 deletions pulpcore/app/importexport.py
@@ -9,6 +9,7 @@
 from django.db.models.query import QuerySet
 
 from pulpcore.app.apps import get_plugin_config
+from pulpcore.app.models.content import Artifact
 from pulpcore.app.models.progress import ProgressReport
 from pulpcore.app.models.repository import Repository
 from pulpcore.app.modelresource import (
@@ -54,21 +55,18 @@ def process_batch(batch):
             # Strip "[" and "]" as we are writing the dataset in batch
             temp_file.write(dataset.json.lstrip("[").rstrip("]"))
 
-        batch = []
-        needs_comma = False
-        for item in resource.queryset.iterator(chunk_size=EXPORT_BATCH_SIZE):
-            batch.append(item)
-            if needs_comma:
-                # Write "," if not last loop
-                temp_file.write(", ")
-                needs_comma = False
-
-            if len(batch) >= EXPORT_BATCH_SIZE:
-                process_batch(batch)
-                batch.clear()
-                needs_comma = True
-
-        if batch:
-            process_batch(batch)
+        offset = 0
+        first_loop = True
+        while True:
+            batch = list(resource.queryset[offset : offset + EXPORT_BATCH_SIZE])
+            if not batch:
+                break
+            offset += EXPORT_BATCH_SIZE
+
+            if not first_loop:
+                temp_file.write(", ")
+            else:
+                first_loop = False
+            process_batch(batch)
 
         temp_file.write("]")
@@ -102,36 +100,48 @@ def export_versions(export, version_info):
     export.tarfile.addfile(info, io.BytesIO(version_json))
 
 
-def export_artifacts(export, artifacts):
+def export_artifacts(export, artifact_pks):
     """
     Export a set of Artifacts, ArtifactResources, and RepositoryResources
 
     Args:
         export (django.db.models.PulpExport): export instance that's doing the export
-        artifacts (django.db.models.Artifacts): List of artifacts in all repos being exported
+        artifact_pks (django.db.models.Artifacts): List of artifact_pks in all repos being exported
 
     Raises:
         ValidationError: When path is not in the ALLOWED_EXPORT_PATHS setting
     """
-    data = dict(message="Exporting Artifacts", code="export.artifacts", total=len(artifacts))
+    artifacts = Artifact.objects.filter(pk__in=artifact_pks)
+    data = dict(message="Exporting Artifacts", code="export.artifacts", total=len(artifact_pks))
     with ProgressReport(**data) as pb:
         pb.BATCH_INTERVAL = 5000
 
+        batch = True
+        offset = 0
         if settings.DEFAULT_FILE_STORAGE != "pulpcore.app.models.storage.FileSystem":
             with tempfile.TemporaryDirectory(dir=".") as temp_dir:
-                for artifact in pb.iter(artifacts.only("file").iterator()):
-                    with tempfile.NamedTemporaryFile(dir=temp_dir) as temp_file:
-                        # TODO: this looks like a memory usage threat
-                        # TODO: it's also probably horrificaly slow, going one-by-one over the net
-                        # TODO: probably we could skip the temp file entirely and add
-                        # artifact.file.read() directly to the tarfile with tarfile.addfile()
-                        temp_file.write(artifact.file.read())
-                        temp_file.flush()
-                        artifact.file.close()
-                        export.tarfile.add(temp_file.name, artifact.file.name)
+                while batch:
+                    batch = list(artifacts.only("file")[offset : offset + EXPORT_BATCH_SIZE])
+                    offset += EXPORT_BATCH_SIZE
+
+                    for artifact in pb.iter(batch):
+                        with tempfile.NamedTemporaryFile(dir=temp_dir) as temp_file:
+                            # TODO: this looks like a memory usage threat
+                            # TODO: it's probably very slow, going one-by-one over the net
+                            # TODO: probably we could skip the temp file entirely and add
+                            # artifact.file.read() directly to the tarfile with
+                            # tarfile.addfile()
+                            temp_file.write(artifact.file.read())
+                            temp_file.flush()
+                            artifact.file.close()
+                            export.tarfile.add(temp_file.name, artifact.file.name)
         else:
-            for artifact in pb.iter(artifacts.only("file").iterator()):
-                export.tarfile.add(artifact.file.path, artifact.file.name)
+            while batch:
+                batch = list(artifacts.only("file")[offset : offset + EXPORT_BATCH_SIZE])
+                offset += EXPORT_BATCH_SIZE
+                for artifact in pb.iter(batch):
+                    export.tarfile.add(artifact.file.path, artifact.file.name)
 
     resource = ArtifactResource()
     resource.queryset = artifacts
4 changes: 2 additions & 2 deletions pulpcore/app/tasks/export.py
@@ -25,7 +25,7 @@
     RepositoryVersion,
     Task,
 )
-from pulpcore.app.models.content import Artifact, ContentArtifact
+from pulpcore.app.models.content import ContentArtifact
 from pulpcore.app.serializers import PulpExportSerializer
 
 from pulpcore.app.util import compute_file_hash, Crc32Hasher
@@ -509,7 +509,7 @@ def _do_export(pulp_exporter, tar, the_export):
 
     # Export the top-level entities (artifacts and repositories)
    # Note: we've already handled "what about incrementals" when building the 'artifacts' list
-    export_artifacts(the_export, Artifact.objects.filter(pk__in=artifact_pks))
+    export_artifacts(the_export, artifact_pks)
     del artifact_pks
 
     # Export the repository-version data, per-version
