Store Dependencies as parquet file (#372)
* Use pyarrow for save/load/dtypes in Dependencies

* Fix dtype mapping

* Fix expected str representation output

* Add pyarrow as dependency

* Add parquet format to save()/load()

* Add tests for parquet files

* Fix docstring of Dependencies.save()

* Publish dependency table as parquet file

* Fix cache handling for docs/publish.rst

* Compare dependency tables instead of MD5 sums

* Store always as parquet in cache

* Fix skipping of old audb caches

* Add LEGACY to old dependency cache file name

* Use pickle in cache

* Remove debug print statement

* Mention correct dependency file in docs

* Add docstring to test

* Fix comment for errors test

* Simplify dependency file loading code

* Only convert dtype if needed during loading

* Add test for backward compatibility

* Remove unneeded line
hagenw committed May 3, 2024
1 parent 3b053fa commit 4acf650
Showing 10 changed files with 236 additions and 112 deletions.
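The diffs below introduce Parquet, written and read via pyarrow, as the storage format for the dependency table. A minimal sketch of that round trip, assuming pandas and pyarrow are installed (the file name, columns, and values are illustrative, not the actual audb schema):

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as parquet

# Illustrative schema; the real dependency table has more columns
schema = pa.schema(
    [
        ("file", pa.string()),
        ("checksum", pa.string()),
        ("duration", pa.float64()),
    ]
)
df = pd.DataFrame(
    {
        "file": ["db.yaml", "audio/001.wav"],
        "checksum": ["7c9f3b2a", "0f4d1e8c"],
        "duration": [0.0, 1.5],
    }
)

# Writing with an explicit schema keeps column order and types deterministic
table = pa.Table.from_pandas(df, schema=schema, preserve_index=False)
parquet.write_table(table, "deps.parquet")

# Reading restores the same values and dtypes
restored = parquet.read_table("deps.parquet").to_pandas()
assert restored.equals(df)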
35 changes: 20 additions & 15 deletions audb/core/api.py
@@ -190,18 +190,15 @@ def cached(
flavor_id_paths = audeer.list_dir_names(version_path)

# Skip old audb cache (e.g. 1 as flavor)
files = audeer.list_file_names(version_path)
deps_path = os.path.join(version_path, define.DEPENDENCIES_FILE)
deps_path_cached = os.path.join(
version_path,
define.CACHED_DEPENDENCIES_FILE,
)
if deps_path not in files and deps_path_cached not in files:
files = audeer.list_file_names(version_path, basenames=True)
if (
define.DEPENDENCIES_FILE not in files
and define.LEGACY_DEPENDENCIES_FILE not in files
and define.CACHED_DEPENDENCIES_FILE not in files
):
# Skip all cache entries
# that don't contain a db.csv or db.pkl file
# that don't contain a dependency file
# as those stem from audb<1.0.0.
# We only look for db.csv
# as we switched to db.pkl with audb>=1.0.5
continue # pragma: no cover

for flavor_id_path in flavor_id_paths:
@@ -265,15 +262,15 @@ def dependencies(
version,
cache_root=cache_root,
)
deps_path = os.path.join(db_root, define.CACHED_DEPENDENCIES_FILE)
cached_path = os.path.join(db_root, define.CACHED_DEPENDENCIES_FILE)

deps = Dependencies()

with FolderLock(db_root):
try:
deps.load(deps_path)
deps.load(cached_path)
except (AttributeError, FileNotFoundError, ValueError, EOFError):
# If loading pickled cached file fails, load again from backend
# If loading cached file fails, load again from backend
backend = utils.lookup_backend(name, version)
with tempfile.TemporaryDirectory() as tmp_root:
archive = backend.join("/", name, define.DB + ".zip")
@@ -283,8 +280,16 @@
version,
verbose=verbose,
)
deps.load(os.path.join(tmp_root, define.DEPENDENCIES_FILE))
deps.save(deps_path)
# Load parquet or csv from tmp dir
# and store as pickle in cache
deps_path = os.path.join(tmp_root, define.DEPENDENCIES_FILE)
legacy_path = os.path.join(tmp_root, define.LEGACY_DEPENDENCIES_FILE)
if os.path.exists(deps_path):
deps.load(deps_path)
else:
deps.load(legacy_path)
# Store as pickle in cache
deps.save(cached_path)

return deps

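In short: the cache keeps a pickled copy of the dependency table (db.pkl), while the backend archive now ships db.parquet, or the legacy db.csv for databases published with older audb versions. A minimal sketch of that fallback, reusing the file names from the diff above; the helper function is hypothetical and deps is assumed to offer load()/save() like the Dependencies class:

import os

def refresh_dependency_cache(deps, tmp_root: str, cached_path: str) -> None:
    """Prefer db.parquet, fall back to legacy db.csv, then cache as pickle."""
    deps_path = os.path.join(tmp_root, "db.parquet")
    legacy_path = os.path.join(tmp_root, "db.csv")
    deps.load(deps_path if os.path.exists(deps_path) else legacy_path)
    deps.save(cached_path)  # the cached copy is always a pickle (db.pkl)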
23 changes: 12 additions & 11 deletions audb/core/define.py
@@ -10,8 +10,9 @@
HEADER_FILE = f"{DB}.yaml"

# Dependencies
DEPENDENCIES_FILE = f"{DB}.csv"
DEPENDENCIES_FILE = f"{DB}.parquet"
CACHED_DEPENDENCIES_FILE = f"{DB}.pkl"
LEGACY_DEPENDENCIES_FILE = f"{DB}.csv"

# Cache lock
CACHED_VERSIONS_TIMEOUT = 10 # Timeout to acquire access to cached versions
@@ -48,16 +49,16 @@ class DependField:
}

DEPEND_FIELD_DTYPES = {
DependField.ARCHIVE: "string",
DependField.BIT_DEPTH: "int32",
DependField.CHANNELS: "int32",
DependField.CHECKSUM: "string",
DependField.DURATION: "float64",
DependField.FORMAT: "string",
DependField.REMOVED: "int32",
DependField.SAMPLING_RATE: "int32",
DependField.TYPE: "int32",
DependField.VERSION: "string",
DependField.ARCHIVE: "string[pyarrow]",
DependField.BIT_DEPTH: "int32[pyarrow]",
DependField.CHANNELS: "int32[pyarrow]",
DependField.CHECKSUM: "string[pyarrow]",
DependField.DURATION: "float64[pyarrow]",
DependField.FORMAT: "string[pyarrow]",
DependField.REMOVED: "int32[pyarrow]",
DependField.SAMPLING_RATE: "int32[pyarrow]",
DependField.TYPE: "int32[pyarrow]",
DependField.VERSION: "string[pyarrow]",
}

DEPEND_INDEX_DTYPE = "object"
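The dtype strings above now request Arrow-backed pandas extension types instead of NumPy-backed ones. A small sketch of what these dtypes look like in practice, assuming a recent pandas with pyarrow installed (values are made up):

import pandas as pd

# String and integer columns backed by Arrow buffers
archive = pd.Series(["a1b2c3", "d4e5f6"], dtype="string[pyarrow]")
channels = pd.Series([1, 2], dtype="int32[pyarrow]")

df = pd.DataFrame({"archive": archive, "channels": channels})
print(df.dtypes)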
137 changes: 111 additions & 26 deletions audb/core/dependencies.py
@@ -4,6 +4,9 @@
import typing

import pandas as pd
import pyarrow as pa
import pyarrow.csv as csv
import pyarrow.parquet as parquet

import audeer

@@ -59,6 +62,23 @@ def __init__(self):
):
data[name] = pd.Series(dtype=dtype)
self._df = pd.DataFrame(data)
# pyarrow schema
# used for reading and writing files
self._schema = pa.schema(
[
("file", pa.string()),
("archive", pa.string()),
("bit_depth", pa.int32()),
("channels", pa.int32()),
("checksum", pa.string()),
("duration", pa.float64()),
("format", pa.string()),
("removed", pa.int32()),
("sampling_rate", pa.int32()),
("type", pa.int32()),
("version", pa.string()),
]
)

def __call__(self) -> pd.DataFrame:
r"""Return dependencies as a table.
@@ -297,19 +317,22 @@ def load(self, path: str):
Args:
path: path to file.
File extension can be ``csv`` or ``pkl``
File extension can be ``csv``,
``pkl``,
or ``parquet``
Raises:
ValueError: if file extension is not ``csv`` or ``pkl``
ValueError: if file extension is not one of
``csv``, ``pkl``, ``parquet``
FileNotFoundError: if ``path`` does not exist
"""
self._df = pd.DataFrame(columns=define.DEPEND_FIELD_NAMES.values())
path = audeer.path(path)
extension = audeer.file_extension(path)
if extension not in ["csv", "pkl"]:
if extension not in ["csv", "pkl", "parquet"]:
raise ValueError(
f"File extension of 'path' has to be 'csv' or 'pkl' "
f"File extension of 'path' has to be 'csv', 'pkl', or 'parquet' "
f"not '{extension}'"
)
if not os.path.exists(path):
@@ -320,29 +343,27 @@
)
if extension == "pkl":
self._df = pd.read_pickle(path)
# Correct dtype of index
# to make backward compatible
# with old pickle files in cache
# that might use `string` as dtype
if self._df.index.dtype != define.DEPEND_INDEX_DTYPE:
self._df.index = self._df.index.astype(define.DEPEND_INDEX_DTYPE)

elif extension == "csv":
# Data type of dependency columns
dtype_mapping = {
name: dtype
for name, dtype in zip(
define.DEPEND_FIELD_NAMES.values(),
define.DEPEND_FIELD_DTYPES.values(),
)
}
# Data type of index
index = 0
self._df = pd.read_csv(
table = csv.read_csv(
path,
index_col=index,
na_filter=False,
dtype=dtype_mapping,
read_options=csv.ReadOptions(
column_names=self._schema.names,
skip_rows=1,
),
convert_options=csv.ConvertOptions(column_types=self._schema),
)
self._df.index.name = None
# Set dtype of index for both CSV and PKL
# to make backward compatible
# with old pickle files in cache
# that might use `string` as dtype
self._df.index = self._df.index.astype(define.DEPEND_INDEX_DTYPE)
self._df = self._table_to_dataframe(table)

elif extension == "parquet":
table = parquet.read_table(path)
self._df = self._table_to_dataframe(table)

def removed(
self,
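For the legacy CSV the header row is skipped and column names come from the schema, because the first column (the file index) is unnamed in the CSV header. A standalone sketch of that reading pattern with a reduced, illustrative schema:

import pyarrow as pa
import pyarrow.csv as csv

schema = pa.schema([("file", pa.string()), ("checksum", pa.string())])

# Legacy-style CSV: the index column has an empty name in the header
with open("db.csv", "w") as fh:
    fh.write(",checksum\n")
    fh.write("audio/001.wav,0f4d1e8c\n")

table = csv.read_csv(
    "db.csv",
    read_options=csv.ReadOptions(column_names=schema.names, skip_rows=1),
    convert_options=csv.ConvertOptions(column_types=schema),
)
print(table.column_names)  # ['file', 'checksum']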
@@ -379,17 +400,25 @@ def save(self, path: str):
Args:
path: path to file.
File extension can be ``csv`` or ``pkl``
File extension can be ``csv``, ``pkl``, or ``parquet``
"""
path = audeer.path(path)
if path.endswith("csv"):
self._df.to_csv(path)
table = self._dataframe_to_table(self._df)
csv.write_csv(
table,
path,
write_options=csv.WriteOptions(quoting_style="none"),
)
elif path.endswith("pkl"):
self._df.to_pickle(
path,
protocol=4, # supported by Python >= 3.4
)
elif path.endswith("parquet"):
table = self._dataframe_to_table(self._df, file_column=True)
parquet.write_table(table, path)

def type(
self,
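Saving mirrors loading: the same pyarrow table is written either as CSV (the quoting_style choice presumably keeps the output close to the previously written CSV files) or as Parquet. A reduced sketch with an illustrative table; quoting_style requires a reasonably recent pyarrow:

import pyarrow as pa
import pyarrow.csv as csv
import pyarrow.parquet as parquet

table = pa.table({"file": ["audio/001.wav"], "checksum": ["0f4d1e8c"]})

# CSV output without quoted values
csv.write_csv(
    table,
    "db.csv",
    write_options=csv.WriteOptions(quoting_style="none"),
)

# Parquet output, the new canonical dependency file format
parquet.write_table(table, "db.parquet")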
@@ -539,6 +568,35 @@ def _column_loc(
values = values.tolist()
return values

def _dataframe_to_table(
self,
df: pd.DataFrame,
*,
file_column: bool = False,
) -> pa.Table:
r"""Convert pandas dataframe to pyarrow table.
Args:
df: dependency table as pandas dataframe
file_column: if ``False``
the ``"file"`` column
is renamed to ``""``
Returns:
dependency table as pyarrow table
"""
table = pa.Table.from_pandas(
df.reset_index().rename(columns={"index": "file"}),
preserve_index=False,
schema=self._schema,
)
if not file_column:
columns = table.column_names
columns = ["" if c == "file" else c for c in columns]
table = table.rename_columns(columns)
return table
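The file_column switch exists because the legacy CSV stores the file index as an unnamed first column, while Parquet keeps an explicit "file" column. A tiny standalone sketch of the rename step:

import pyarrow as pa

table = pa.table({"file": ["audio/001.wav"], "checksum": ["0f4d1e8c"]})

# Give the index column an empty name, as expected by the legacy CSV layout
columns = ["" if c == "file" else c for c in table.column_names]
table = table.rename_columns(columns)
print(table.column_names)  # ['', 'checksum']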

def _drop(self, files: typing.Sequence[str]):
r"""Drop files from table.
@@ -563,6 +621,33 @@ def _remove(self, file: str):
"""
self._df.at[file, "removed"] = 1

def _table_to_dataframe(self, table: pa.Table) -> pd.DataFrame:
r"""Convert pyarrow table to pandas dataframe.
Args:
table: dependency table as pyarrow table
Returns:
dependency table as pandas dataframe
"""
df = table.to_pandas(
deduplicate_objects=False,
# Convert to pyarrow dtypes,
# but ensure we use pd.StringDtype("pyarrow")
# and not pd.ArrowDtype(pa.string())
# see https://pandas.pydata.org/docs/user_guide/pyarrow.html
types_mapper={
pa.string(): pd.StringDtype("pyarrow"),
pa.int32(): pd.ArrowDtype(pa.int32()),
pa.float64(): pd.ArrowDtype(pa.float64()),
}.get, # we have to provide a callable, not a dict
)
df.set_index("file", inplace=True)
df.index.name = None
df.index = df.index.astype(define.DEPEND_INDEX_DTYPE)
return df

def _update_media(
self,
values: typing.Sequence[
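When converting back to pandas, the types_mapper callable decides which pandas dtype each Arrow type maps to: strings become pd.StringDtype("pyarrow"), numeric columns pd.ArrowDtype. A reduced sketch of that mapping (illustrative table):

import pandas as pd
import pyarrow as pa

table = pa.table(
    {
        "file": ["audio/001.wav"],
        "duration": pa.array([1.5], type=pa.float64()),
    }
)

df = table.to_pandas(
    # Map Arrow types to pandas extension dtypes;
    # unmapped types fall back to the default conversion
    types_mapper={
        pa.string(): pd.StringDtype("pyarrow"),
        pa.float64(): pd.ArrowDtype(pa.float64()),
    }.get,
)
print(df.dtypes)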
54 changes: 25 additions & 29 deletions audb/core/publish.py
@@ -615,18 +615,20 @@ def publish(
previous_version = None

# load database and dependencies
deps_path = os.path.join(db_root, define.DEPENDENCIES_FILE)
deps = Dependencies()
if os.path.exists(deps_path):
deps.load(deps_path)
for deps_file in [define.DEPENDENCIES_FILE, define.LEGACY_DEPENDENCIES_FILE]:
deps_path = os.path.join(db_root, deps_file)
if os.path.exists(deps_path):
deps.load(deps_path)
break

# check if database folder depends on the right version

# dependencies shouldn't be there
if previous_version is None and len(deps) > 0:
raise RuntimeError(
f"You did not set a dependency to a previous version, "
f"but you have a '{define.DEPENDENCIES_FILE}' file present "
f"but you have a '{deps_file}' file present "
f"in {db_root}."
)

@@ -644,32 +646,25 @@

# dependencies do not match version
if previous_version is not None and len(deps) > 0:
with tempfile.TemporaryDirectory() as tmp_dir:
previous_deps_path = os.path.join(
tmp_dir,
define.DEPENDENCIES_FILE,
)
previous_deps = dependencies(
db.name,
version=previous_version,
cache_root=cache_root,
verbose=verbose,
previous_deps = dependencies(
db.name,
version=previous_version,
cache_root=cache_root,
verbose=verbose,
)
if not deps().equals(previous_deps()):
raise RuntimeError(
f"You want to depend on '{previous_version}' "
f"of {db.name}, "
f"but the dependency file '{deps_file}' "
f"in {db_root} "
f"does not match the dependency file "
f"for the requested version in the repository. "
f"Did you forget to call "
f"'audb.load_to({db_root}, {db.name}, "
f"version='{previous_version}') "
f"or did you modify the file manually?"
)
previous_deps.save(previous_deps_path)
if audeer.md5(deps_path) != audeer.md5(previous_deps_path):
raise RuntimeError(
f"You want to depend on '{previous_version}' "
f"of {db.name}, "
f"but the MD5 sum of your "
f"'{define.DEPENDENCIES_FILE}' file "
f"in {db_root} "
f"does not match the MD5 sum of the corresponding file "
f"for the requested version in the repository. "
f"Did you forgot to call "
f"'audb.load_to({db_root}, {db.name}, "
f"version='{previous_version}') "
f"or modified the file manually?"
)

# load database with table data
db = audformat.Database.load(
@@ -753,6 +748,7 @@ def publish(
)

# publish dependencies and header
deps_path = os.path.join(db_root, define.DEPENDENCIES_FILE)
deps.save(deps_path)
archive_file = backend.join("/", db.name, define.DB + ".zip")
backend.put_archive(
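The publish check above now compares the loaded dependency tables themselves instead of MD5 sums of the files, which makes the check independent of whether the local file is the new Parquet or the legacy CSV format. A minimal sketch of the idea with made-up data:

import pandas as pd

local = pd.DataFrame({"checksum": ["0f4d1e8c"]}, index=["audio/001.wav"])
remote = pd.DataFrame({"checksum": ["0f4d1e8c"]}, index=["audio/001.wav"])

# Same content, no error, regardless of the file format either table came from
if not local.equals(remote):
    raise RuntimeError("Dependency table does not match the published version")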
