Skip to content

Commit

Permalink
parquet: use temp files in the current folder
Browse files (browse the repository at this point in the history)
  • Loading branch information
lukeshingles committed May 15, 2024
1 parent 5f1356d commit 7e2ff81
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 20 deletions.
12 changes: 7 additions & 5 deletions artistools/estimators/estimators.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import contextlib
import math
import sys
import tempfile
import time
import typing as t
from collections import namedtuple
Expand Down Expand Up @@ -227,9 +228,8 @@ def get_rankbatch_parquetfile(
batch_mpiranks: t.Sequence[int],
batchindex: int,
) -> Path:
parquetfilename = f"estimbatch{batchindex:02d}_{batch_mpiranks[0]:04d}_{batch_mpiranks[-1]:04d}.out.parquet"
parquetfilepath = folderpath / f"{parquetfilename}.tmp"
parquetfilepathpartial = folderpath / f"{parquetfilename}.partial.tmp"
parquetfilename = f"estimbatch{batchindex:02d}_{batch_mpiranks[0]:04d}_{batch_mpiranks[-1]:04d}.out.parquet.tmp"
parquetfilepath = folderpath / parquetfilename

if not parquetfilepath.exists():
print(f" generating {parquetfilepath.relative_to(modelpath.parent)}...")
Expand Down Expand Up @@ -267,8 +267,10 @@ def get_rankbatch_parquetfile(
time_start = time.perf_counter()

assert pldf_batch is not None
pldf_batch.write_parquet(parquetfilepathpartial, compression="zstd", statistics=True, compression_level=8)
parquetfilepathpartial.rename(parquetfilepath)
tempparquetfilepath = Path(tempfile.mkstemp(dir=folderpath, prefix=f"{parquetfilename}.", suffix=".partial")[1])
pldf_batch.write_parquet(tempparquetfilepath, compression="zstd", statistics=True, compression_level=8)
tempparquetfilepath.unlink() if parquetfilepath.exists() else tempparquetfilepath.rename(parquetfilepath)

Check warning on line 272 in artistools/estimators/estimators.py

View workflow job for this annotation

GitHub Actions / Lint (ruff, mypy, pylint)

W0106

Expression "tempparquetfilepath.unlink() if parquetfilepath.exists() else tempparquetfilepath.rename(parquetfilepath)" is assigned to nothing

print(f"took {time.perf_counter() - time_start:.1f} s.")

filesize = parquetfilepath.stat().st_size / 1024 / 1024
Expand Down
29 changes: 18 additions & 11 deletions artistools/inputmodel/inputmodel_misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import gc
import math
import os
import tempfile
import time
import typing as t
from collections import defaultdict
Expand Down Expand Up @@ -335,8 +336,8 @@ def get_modeldata_polars(
parquetfilepath = at.stripallsuffixes(Path(textfilepath)).with_suffix(".txt.parquet")

if parquetfilepath.exists() and Path(textfilepath).stat().st_mtime > parquetfilepath.stat().st_mtime:
print(f"{textfilepath} has been modified after {parquetfilepath}. Deleting out of date parquet file.")
parquetfilepath.unlink()
msg = f"ERROR: {textfilepath} has been modified after {parquetfilepath}. Delete the out of date parquet file to regenerate."
raise AssertionError(msg)

dfmodel: pl.LazyFrame | None | pl.DataFrame = None
if not getheadersonly and parquetfilepath.is_file():
Expand All @@ -345,7 +346,7 @@ def get_modeldata_polars(
try:
dfmodel = pl.scan_parquet(parquetfilepath)
except pl.exceptions.ComputeError:
print(f"Problem reading {parquetfilepath}. Will regenerate and overwite from text source.")
print(f"Problem reading {parquetfilepath}. Will regenerate and overwrite from text source.")
dfmodel = None

if dfmodel is not None:
Expand All @@ -367,9 +368,11 @@ def get_modeldata_polars(
mebibyte = 1024 * 1024
if isinstance(dfmodel, pl.DataFrame) and textfilepath.stat().st_size > 5 * mebibyte and not getheadersonly:
print(f"Saving {parquetfilepath}")
partialparquetfilepath = at.stripallsuffixes(Path(textfilepath)).with_suffix(".txt.parquet.partial.tmp")
dfmodel.write_parquet(partialparquetfilepath, compression="zstd")
partialparquetfilepath.rename(parquetfilepath)
partialparquetfilepath = Path(
tempfile.mkstemp(dir=modelpath, prefix=f"{parquetfilepath.name}.", suffix=".tmp")[1]
)
dfmodel.write_parquet(partialparquetfilepath, compression="zstd", statistics=True)
partialparquetfilepath.unlink() if parquetfilepath.exists() else partialparquetfilepath.rename(parquetfilepath)

Check warning on line 375 in artistools/inputmodel/inputmodel_misc.py

View workflow job for this annotation

GitHub Actions / Lint (ruff, mypy, pylint)

W0106

Expression "partialparquetfilepath.unlink() if parquetfilepath.exists() else partialparquetfilepath.rename(parquetfilepath)" is assigned to nothing
print(" Done.")
del dfmodel
gc.collect()
Expand Down Expand Up @@ -947,8 +950,8 @@ def get_initelemabundances_polars(

parquetfilepath = at.stripallsuffixes(Path(textfilepath)).with_suffix(".txt.parquet")
if parquetfilepath.exists() and Path(textfilepath).stat().st_mtime > parquetfilepath.stat().st_mtime:
print(f"{textfilepath} has been modified after {parquetfilepath}. Deleting out of date parquet file.")
parquetfilepath.unlink()
msg = f"ERROR: {textfilepath} has been modified after {parquetfilepath}. Delete the out of date parquet file to regenerate."
raise AssertionError(msg)

if parquetfilepath.is_file():
if not printwarningsonly:
Expand Down Expand Up @@ -981,9 +984,13 @@ def get_initelemabundances_polars(

if textfilepath.stat().st_size > 5 * 1024 * 1024:
print(f"Saving {parquetfilepath}")
partialparquetfilepath = at.stripallsuffixes(Path(textfilepath)).with_suffix(".txt.parquet.partial.tmp")
abundancedata.write_parquet(partialparquetfilepath, compression="zstd")
partialparquetfilepath.rename(parquetfilepath)
partialparquetfilepath = Path(
tempfile.mkstemp(dir=modelpath, prefix=f"{parquetfilepath.name}.", suffix=".tmp")[1]
)
abundancedata.write_parquet(partialparquetfilepath, compression="zstd", statistics=True)
partialparquetfilepath.unlink() if parquetfilepath.exists() else partialparquetfilepath.rename(
parquetfilepath
)

Check warning on line 993 in artistools/inputmodel/inputmodel_misc.py

View workflow job for this annotation

GitHub Actions / Lint (ruff, mypy, pylint)

W0106

Expression "partialparquetfilepath.unlink() if parquetfilepath.exists() else partialparquetfilepath.rename(parquetfilepath)" is assigned to nothing
print(" Done.")
del abundancedata
gc.collect()
Expand Down
10 changes: 6 additions & 4 deletions artistools/packets/packets.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import calendar
import math
import tempfile
import time
import typing as t
from functools import lru_cache
Expand Down Expand Up @@ -443,7 +444,6 @@ def get_rankbatch_parquetfile(

parquetfilename = f"{strpacket}batch{batchindex:02d}_{batch_mpiranks[0]:04d}_{batch_mpiranks[-1]:04d}.out.parquet"
parquetfilepath = packetdir / f"{parquetfilename}.tmp"
parquetfilepathpartial = packetdir / f"{parquetfilename}.partial.tmp"

# time when the schema for the parquet files last change (e.g. new computed columns added or data types changed)
time_parquetschemachange = (2024, 4, 23, 9, 0, 0)
Expand All @@ -465,7 +465,8 @@ def get_rankbatch_parquetfile(
if parquet_mtime > last_textfile_mtime and parquet_mtime > t_lastschemachange:
conversion_needed = False
else:
print(f" outdated file: {parquetfilepath}. Will overwrite")
msg = f"ERROR: outdated file: {parquetfilepath}. Delete it to regenerate."
raise AssertionError(msg)

if conversion_needed:
time_start_load = time.perf_counter()
Expand Down Expand Up @@ -518,8 +519,9 @@ def get_rankbatch_parquetfile(
f" took {time.perf_counter() - time_start_load:.1f} seconds. Writing parquet file...", end="", flush=True
)
time_start_write = time.perf_counter()
pldf_batch.sink_parquet(parquetfilepathpartial, compression="zstd", statistics=True, compression_level=8)
parquetfilepathpartial.rename(parquetfilepath)
tempparquetfilepath = Path(tempfile.mkstemp(dir=packetdir, prefix=f"{parquetfilename}.", suffix=".tmp")[1])
pldf_batch.sink_parquet(tempparquetfilepath, compression="zstd", statistics=True, compression_level=8)
tempparquetfilepath.unlink() if parquetfilepath.exists() else tempparquetfilepath.rename(parquetfilepath)

Check warning on line 524 in artistools/packets/packets.py

View workflow job for this annotation

GitHub Actions / Lint (ruff, mypy, pylint)

W0106

Expression "tempparquetfilepath.unlink() if parquetfilepath.exists() else tempparquetfilepath.rename(parquetfilepath)" is assigned to nothing
print(f"took {time.perf_counter() - time_start_write:.1f} seconds")
else:
print(f" scanning {parquetfilepath.relative_to(modelpath)}")
Expand Down

0 comments on commit 7e2ff81

Please sign in to comment.