Support registering internal groups/dataset in HDF5. #687

Merged Mar 21, 2024 · 8 commits
Changes from 6 commits
30 changes: 30 additions & 0 deletions tiled/_tests/test_directory_walker.py
@@ -340,3 +340,33 @@ def test_unknown_mimetype(tmpdir):
        )
    ],
)


def test_one_asset_two_data_sources(tmpdir):
    catalog = in_memory(writable_storage=tmpdir)
    with Context.from_app(build_app(catalog)) as context:
        client = from_context(context)
        asset = Asset(
            data_uri=ensure_uri(tmpdir / "test.csv"),
            is_directory=False,
            parameter="data_uris",
            num=0,
        )
        with fail_with_status_code(415):
            for key in ["x", "y"]:
                client.new(
                    key=key,
                    structure_family="array",
                    metadata={},
                    specs=[],
                    data_sources=[
                        DataSource(
                            structure_family="array",
                            mimetype="text/csv",
                            structure=None,
                            parameters={},
                            management=Management.external,
                            assets=[asset],
                        )
                    ],
                )
29 changes: 29 additions & 0 deletions tiled/adapters/hdf5.py
@@ -15,6 +15,35 @@
INLINED_DEPTH = int(os.getenv("TILED_HDF5_INLINED_CONTENTS_MAX_DEPTH", "7"))


def hdf5_lookup(
    data_uri,
    *,
    structure=None,
    metadata=None,
    swmr=SWMR_DEFAULT,
    libver="latest",
    specs=None,
    access_policy=None,
    path=None,
):
    path = path or []
    adapter = HDF5Adapter.from_uri(
        data_uri,
        structure=structure,
        metadata=metadata,
        swmr=swmr,
        libver=libver,
        specs=specs,
        access_policy=access_policy,
    )
    for segment in path:
        adapter = adapter.get(segment)
        if adapter is None:
            raise KeyError(segment)
    # TODO What to do with metadata, specs?
    return adapter


def from_dataset(dataset):
    return ArrayAdapter.from_array(dataset, metadata=getattr(dataset, "attrs", {}))

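A rough sketch (not part of this diff) of how the new hdf5_lookup entry point could be used to address a dataset nested inside an HDF5 group. The file name, group layout, and the assumption that HDF5Adapter.from_uri accepts a file:// URI here are illustrative only.

# Illustrative sketch: create a small HDF5 file, then resolve an internal path.
from pathlib import Path

import h5py

from tiled.adapters.hdf5 import hdf5_lookup  # module added in this PR

h5_path = Path("example.h5")  # hypothetical file
with h5py.File(h5_path, "w") as file:
    file.create_dataset("group/inner/x", data=[1, 2, 3])

# Each segment of path= is resolved with adapter.get(segment); a missing
# segment raises KeyError. The final segment yields an ArrayAdapter here.
adapter = hdf5_lookup(h5_path.absolute().as_uri(), path=["group", "inner", "x"])
print(adapter.read())  # expected: [1 2 3]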
87 changes: 70 additions & 17 deletions tiled/catalog/adapter.py
@@ -565,6 +565,15 @@ async def create_node(
        specs=None,
        data_sources=None,
    ):
        # The only way to do "insert if does not exist" i.e. ON CONFLICT
        # is to invoke dialect-specific insert.
        if self.context.engine.dialect.name == "sqlite":
            from sqlalchemy.dialects.sqlite import insert
        elif self.context.engine.dialect.name == "postgresql":
            from sqlalchemy.dialects.postgresql import insert
        else:
            assert False  # future-proofing

        key = key or self.context.key_maker()
        data_sources = data_sources or []
        node = orm.Node(
@@ -631,14 +640,6 @@ async def create_node(
                    structure_family, data_source.structure
                )
                structure_id = compute_structure_id(structure)
                # The only way to do "insert if does not exist" i.e. ON CONFLICT
                # is to invoke dialect-specific insert.
                if self.context.engine.dialect.name == "sqlite":
                    from sqlalchemy.dialects.sqlite import insert
                elif self.context.engine.dialect.name == "postgresql":
                    from sqlalchemy.dialects.postgresql import insert
                else:
                    assert False  # future-proofing
                statement = (
                    insert(orm.Structure).values(
                        id=structure_id,
@@ -653,22 +654,32 @@
                    parameters=data_source.parameters,
                    structure_id=structure_id,
                )
                db.add(data_source_orm)
                node.data_sources.append(data_source_orm)
                # await db.flush(data_source_orm)
                await db.flush()  # Get data_source_orm.id.
                for asset in data_source.assets:
                    asset_orm = orm.Asset(
                        data_uri=asset.data_uri,
                        is_directory=asset.is_directory,
                    # Find an asset_id if it exists, otherwise create a new one
                    statement = select(orm.Asset.id).where(
                        orm.Asset.data_uri == asset.data_uri
                    )
                    result = await db.execute(statement)
                    if result := result.fetchone():
                        (asset_id,) = result
                    else:
                        statement = insert(orm.Asset).values(
                            data_uri=asset.data_uri,
                            is_directory=asset.is_directory,
                        )
                        result = await db.execute(statement)
                        (asset_id,) = result.inserted_primary_key
                    assoc_orm = orm.DataSourceAssetAssociation(
                        asset=asset_orm,
                        asset_id=asset_id,
                        data_source_id=data_source_orm.id,
                        parameter=asset.parameter,
                        num=asset.num,
                    )
                    data_source_orm.asset_associations.append(assoc_orm)
                    db.add(node)
                    db.add(assoc_orm)
        await db.commit()
        await db.refresh(node)
        # Load with DataSources each DataSource's Structure.
        refreshed_node = (
            await db.execute(
@@ -677,14 +688,56 @@
                    .options(
                        selectinload(orm.Node.data_sources).selectinload(
                            orm.DataSource.structure
                        )
                        ),
                    )
                )
            ).scalar()
        return key, type(self)(
            self.context, refreshed_node, access_policy=self.access_policy
        )

    async def put_data_source(self, data_source):
        # Obtain and hash the canonical (RFC 8785) representation of
        # the JSON structure.
        structure = _prepare_structure(
            data_source.structure_family, data_source.structure
        )
        structure_id = compute_structure_id(structure)
        # The only way to do "insert if does not exist" i.e. ON CONFLICT
        # is to invoke dialect-specific insert.
        if self.context.engine.dialect.name == "sqlite":
            from sqlalchemy.dialects.sqlite import insert
        elif self.context.engine.dialect.name == "postgresql":
            from sqlalchemy.dialects.postgresql import insert
        else:
            assert False  # future-proofing
        statement = (
            insert(orm.Structure).values(
                id=structure_id,
                structure=structure,
            )
        ).on_conflict_do_nothing(index_elements=["id"])
        async with self.context.session() as db:
            await db.execute(statement)
            values = dict(
                structure_family=data_source.structure_family,
                mimetype=data_source.mimetype,
                management=data_source.management,
                parameters=data_source.parameters,
                structure_id=structure_id,
            )
            result = await db.execute(
                update(orm.DataSource)
                .where(orm.DataSource.id == data_source.id)
                .values(**values)
            )
            if result.rowcount == 0:
                raise HTTPException(
                    status_code=404,
                    detail=f"No data_source {data_source.id} on this node.",
                )
            await db.commit()

    # async def patch_node(datasources=None):
    # ...

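The "insert if it does not exist" comment above is about a SQLAlchemy quirk: the ON CONFLICT helpers live only on the dialect-specific insert constructs, not on the generic one, hence the conditional imports. A standalone illustration (table and values made up, not part of this diff):

# Only the dialect flavors of insert() expose ON CONFLICT helpers.
import sqlalchemy
from sqlalchemy.dialects.sqlite import insert as sqlite_insert

metadata = sqlalchemy.MetaData()
structures = sqlalchemy.Table(
    "structures",
    metadata,
    sqlalchemy.Column("id", sqlalchemy.String, primary_key=True),
    sqlalchemy.Column("structure", sqlalchemy.JSON),
)

statement = (
    sqlite_insert(structures)
    .values(id="abc123", structure={"shape": [3]})
    .on_conflict_do_nothing(index_elements=["id"])  # no-op if "abc123" already exists
)
# The generic construct has no equivalent method:
print(hasattr(sqlalchemy.insert(structures), "on_conflict_do_nothing"))  # False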
2 changes: 1 addition & 1 deletion tiled/mimetypes.py
@@ -30,7 +30,7 @@
        ).ExcelAdapter.from_uri,
        "application/x-hdf5": lambda: importlib.import_module(
            "..adapters.hdf5", __name__
        ).HDF5Adapter.from_uri,
        ).hdf5_lookup,
        "application/x-netcdf": lambda: importlib.import_module(
            "..adapters.netcdf", __name__
        ).read_netcdf,
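With "application/x-hdf5" now mapped to hdf5_lookup, a registered data source can point at an internal group or dataset by carrying a "path" parameter. A hedged sketch of what such a registration call might look like, modeled on the new test above; the import path, URI, asset parameter name, and the assumption that data_source.parameters are forwarded as keyword arguments to hdf5_lookup are not confirmed by this diff.

# Hypothetical registration of a dataset living at /group/inner/x inside an
# existing HDF5 file, reusing the client.new(...) pattern from the test above.
from tiled.structures.data_source import Asset, DataSource, Management  # import path assumed

client.new(  # `client` as obtained from from_context(context) in the test
    key="x",
    structure_family="array",
    metadata={},
    specs=[],
    data_sources=[
        DataSource(
            structure_family="array",
            mimetype="application/x-hdf5",
            structure=None,
            parameters={"path": ["group", "inner", "x"]},  # internal HDF5 path
            management=Management.external,
            assets=[
                Asset(
                    data_uri="file://localhost/data/example.h5",  # hypothetical
                    is_directory=False,
                    parameter="data_uri",  # parameter name assumed
                    num=None,
                )
            ],
        )
    ],
)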
14 changes: 14 additions & 0 deletions tiled/server/router.py
@@ -1167,6 +1167,20 @@ async def _create_node(
    return json_or_msgpack(request, response_data)


@router.put("/data_source/{path:path}")
async def put_data_source(
    request: Request,
    path: str,
    data_source: int,
    body: schemas.PutDataSourceRequest,
    settings: BaseSettings = Depends(get_settings),
    entry=SecureEntry(scopes=["write:metadata", "register"]),
):
    await entry.put_data_source(
        data_source=body.data_source,
    )


@router.delete("/metadata/{path:path}")
async def delete(
    request: Request,
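A rough sketch of exercising the new PUT /data_source/{path} route directly over HTTP. The base URL, API prefix, node path, and payload values are illustrative and authentication is omitted; note that the route, as written, also declares a required data_source query parameter alongside the body.

# Hypothetical HTTP call updating data source id 1 on the node at "raw/example".
import httpx

payload = {
    "data_source": {
        "id": 1,
        "structure_family": "array",
        "mimetype": "application/x-hdf5",
        "parameters": {"path": ["group", "inner", "x"]},
        "management": "external",
        "assets": [],
        "structure": None,
    }
}
response = httpx.put(
    "http://localhost:8000/api/v1/data_source/raw/example",  # base URL assumed
    params={"data_source": 1},  # query parameter required by the signature above
    json=payload,
)
response.raise_for_status()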
4 changes: 4 additions & 0 deletions tiled/server/schemas.py
@@ -401,6 +401,10 @@ def specs_uniqueness_validator(cls, v):
        return v


class PutDataSourceRequest(pydantic.BaseModel):
    data_source: DataSource


class PostMetadataResponse(pydantic.BaseModel, Generic[ResourceLinksT]):
    id: str
    links: Union[ArrayLinks, DataFrameLinks, SparseLinks]