Skip to content

Commit

Permalink
Remove _content serialization related deadcode
Browse files Browse the repository at this point in the history
  • Loading branch information
kartik4949 committed May 22, 2024
1 parent 6de239f commit d97f041
Show file tree
Hide file tree
Showing 14 changed files with 5 additions and 415 deletions.
33 changes: 0 additions & 33 deletions superduperdb/backends/base/artifacts.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,18 +51,6 @@ def _delete_bytes(self, file_id: str):
:param file_id: File id uses to identify artifact in store
"""

def delete(self, r: t.Dict):
"""Delete artifact from artifact store.
:param r: dictionary with mandatory fields
{'file_id'}
"""
if '_content' in r and 'file_id' in r['_content']:
return self._delete_bytes(r['_content']['file_id'])
for v in r.values():
if isinstance(v, dict):
self.delete(v)

@abstractmethod
def drop(self, force: bool = False):
"""
Expand Down Expand Up @@ -190,27 +178,6 @@ def load_artifact(self, r):
x = self._load_bytes(file_id)
return datatype.decode_data(x)

def save(self, r: t.Dict) -> t.Dict:
"""Save list of artifacts and replace the artifacts with file reference.
:param r: `dict` of artifacts.
"""
if isinstance(r, dict):
if '_content' in r and r['_content']['leaf_type'] in {
'artifact',
'file',
'lazy_artifact',
'lazy_file',
}:
self.save_artifact(r['_content'])
else:
for k in r:
self.save(r[k])
if isinstance(r, list):
for x in r:
self.save(x)
return r

@abstractmethod
def disconnect(self):
"""Disconnect the client."""
Expand Down
9 changes: 0 additions & 9 deletions superduperdb/backends/base/data_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,15 +94,6 @@ def get_table_or_collection(self, identifier):
"""
pass

def set_content_bytes(self, r, key, bytes_):
"""Set content bytes.
:param r: The row.
:param key: The key.
:param bytes_: The bytes.
"""
raise NotImplementedError

@abstractmethod
def drop(self, force: bool = False):
"""Drop the databackend.
Expand Down
1 change: 0 additions & 1 deletion superduperdb/backends/ibis/cdc/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ class IbisDatabaseListener(cdc.BaseDatabaseListener):
"""

DEFAULT_ID: str = 'id'
EXCLUSION_KEYS: t.Sequence[str] = [DEFAULT_ID]
IDENTITY_SEP: str = '/'
_scheduler: t.Optional[threading.Thread]

Expand Down
25 changes: 0 additions & 25 deletions superduperdb/backends/ibis/cursor.py

This file was deleted.

9 changes: 0 additions & 9 deletions superduperdb/backends/ibis/data_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,6 @@ def build_metadata(self):
"""Build metadata for the database."""
return SQLAlchemyMetadata(conn=self.conn.con, name='ibis')

# TODO this duplicates the method create_table_and_schema
def create_ibis_table(self, identifier: str, schema: Schema):
"""Create a table in the database.
:param identifier: The identifier of the table.
:param schema: The schema of the table.
"""
self.conn.create_table(identifier, schema=schema)

def insert(self, table_name, raw_documents):
"""Insert data into the database.
Expand Down
9 changes: 0 additions & 9 deletions superduperdb/backends/ibis/utils.py

This file was deleted.

1 change: 0 additions & 1 deletion superduperdb/backends/mongodb/cdc/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ class MongoDatabaseListener(cdc.BaseDatabaseListener):
"""

DEFAULT_ID: str = '_id'
EXCLUSION_KEYS: t.Sequence[str] = [DEFAULT_ID]
IDENTITY_SEP: str = '/'
_scheduler: t.Optional[threading.Thread]

Expand Down
27 changes: 0 additions & 27 deletions superduperdb/backends/mongodb/data_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
from superduperdb.base.enums import DBType
from superduperdb.components.datatype import DataType
from superduperdb.misc.colors import Colors
from superduperdb.misc.special_dicts import MongoStyleDict

from .query import MongoQuery

Expand Down Expand Up @@ -98,32 +97,6 @@ def get_table_or_collection(self, identifier):
"""
return self._db[identifier]

def set_content_bytes(self, r, key, bytes_):
"""Set the content bytes in the data backend.
:param r: dictionary containing information about the content
:param key: key to set
:param bytes_: content bytes
"""
if not isinstance(r, MongoStyleDict):
r = MongoStyleDict(r)
r[f'{key}._content.bytes'] = bytes_
return r

def exists(self, table_or_collection, id, key):
"""Check if a document exists in the data backend.
:param table_or_collection: table or collection identifier
:param id: document identifier
:param key: key to check
"""
return (
self.db[table_or_collection].find_one(
{'_id': id, f'{key}._content.bytes': {'$exists': 1}}
)
is not None
)

def unset_outputs(self, info: t.Dict):
"""Unset the output field in the data backend.
Expand Down
46 changes: 0 additions & 46 deletions superduperdb/backends/mongodb/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,33 +82,13 @@ def create_job(self, info: t.Dict) -> InsertOneResult:
"""
return self.job_collection.insert_one(info)

def get_parent_child_relations(self):
"""Get parent-child relations from the metadata store."""
c = self.parent_child_mappings.find()
return [(r['parent'], r['child']) for r in c]

def get_component_version_children(self, uuid: str):
"""Get the children of a component version.
:param uuid: unique identifier of component
"""
return self.parent_child_mappings.distinct('child', {'parent': uuid})

def get_job(self, identifier: str):
"""Get a job from the metadata store.
:param identifier: identifier of job
"""
return self.job_collection.find_one({'identifier': identifier})

def create_metadata(self, key: str, value: str):
"""Create metadata in the metadata store.
:param key: key to be created
:param value: value to be created
"""
return self.meta_collection.insert_one({'key': key, 'value': value})

def get_metadata(self, key: str):
"""Get metadata from the metadata store.
Expand Down Expand Up @@ -205,23 +185,6 @@ def show_component_versions(
'version', {'type_id': type_id, 'identifier': identifier}
)

def list_components_in_scope(self, scope: str):
"""List components in a scope.
:param scope: scope of components
"""
out = []
for r in self.component_collection.find({'parent': scope}):
out.append((r['type_id'], r['identifier']))
return out

def show_job(self, job_id: str):
"""Show a job in the metadata store.
:param job_id: identifier of job
"""
return self.job_collection.find_one({'identifier': job_id})

def show_jobs(
self,
component_identifier: t.Optional[str] = None,
Expand All @@ -245,15 +208,6 @@ def _get_component_uuid(self, type_id: str, identifier: str, version: int) -> st
{'uuid': 1},
)['uuid']

def component_has_parents(self, type_id: str, identifier: str) -> int:
"""Check if a component has parents.
:param type_id: type of component
:param identifier: identifier of component
"""
doc = {'child': {'$regex': f'^{type_id}/{identifier}/'}}
return self.parent_child_mappings.count_documents(doc)

def component_version_has_parents(
self, type_id: str, identifier: str, version: int
) -> int:
Expand Down
103 changes: 1 addition & 102 deletions superduperdb/backends/sqlalchemy/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,11 @@
from superduperdb import logging
from superduperdb.backends.base.metadata import MetaDataStore, NonExistentMetadataError
from superduperdb.backends.sqlalchemy.db_helper import get_db_config
from superduperdb.base.document import Document
from superduperdb.components.component import Component as _Component
from superduperdb.misc.colors import Colors

if t.TYPE_CHECKING:
from superduperdb.backends.base.query import Select
pass


class SQLAlchemyMetadata(MetaDataStore):
Expand Down Expand Up @@ -546,107 +545,7 @@ def write_output_to_job(self, identifier, msg, stream):
# Not supported currently
raise NotImplementedError

# --------------- METADATA -----------------

def create_metadata(self, key, value):
"""Create metadata with the given key and value.
:param key: The key to create
:param value: The value to create
"""
with self.session_context() as session:
stmt = insert(self.meta_table).values(key=key, value=value)
session.execute(stmt)

def get_metadata(self, key):
"""Get the metadata with the given key.
:param key: The key to retrieve
"""
with self.session_context() as session:
stmt = select(self.meta_table).where(self.meta_table.c.key == key).limit(1)
res = self.query_results(self.meta_table, stmt, session)
value = res[0]['value'] if res else None
return value

def update_metadata(self, key, value):
"""Update the metadata with the given key.
:param key: The key to update
:param value: The updated value
"""
with self.session_context() as session:
stmt = (
self.meta_table.update()
.where(self.meta_table.c.key == key)
.values({key: value})
)
session.execute(stmt)

# --------------- Query ID -----------------
def add_query(self, query: 'Select', model: str):
"""Add a query to the query table.
:param query: The query to add to the table.
:param model: The model to associate with the query.
"""
query_hash = str(hash(query))

with self.session_context() as session:
with self._lock:
row = {
'query': query.dict().encode(),
'model': model,
'query_id': query_hash,
}

stmt = insert(self.query_id_table).values(**row)
session.execute(stmt)

def get_query(self, query_hash: str):
"""Get the query from the query table corresponding to the query hash.
:param query_hash: The hash of the query to retrieve.
"""
try:
with self.session_context() as session:
stmt = (
select(self.query_id_table)
.where(self.query_id_table.c.query_id == str(query_hash))
.limit(1)
)
res = self.query_results(self.query_id_table, stmt, session)
out = res[0] if res else None
except AttributeError as e:
if 'NoneType' in str(e):
raise NonExistentMetadataError(
f'Query hash {query_hash} does not exist'
)
raise e

if out is None:
raise NonExistentMetadataError(f'Query hash {query_hash} does not exist')

def get_model_queries(self, model: str):
"""Get queries related to the given model.
:param model: The name of the model to retrieve queries for.
"""
with self.session_context() as session:
stmt = select(self.query_id_table).where(
self.query_id_table.c.model == model
)
queries = self.query_results(self.query_id_table, stmt, session)

unpacked_queries = []
for row in queries:
id = row['query_id']
serialized = row['query']
query = Document.decode(serialized)
unpacked_queries.append(
{'query_id': id, 'query': query, 'sql': query.repr_()}
)
return unpacked_queries

def disconnect(self):
"""Disconnect the client."""
Expand Down
Loading

0 comments on commit d97f041

Please sign in to comment.