In [79]:
from datacentric import config
from google.cloud import aiplatform
from google.cloud.aiplatform.metadata.schema.system import artifact_schema
from packaging.version import Version

# Build the project configuration object and initialise the Vertex AI SDK
# with the project id and location it exposes. Every later cell that talks
# to Vertex AI relies on this initialisation having run first.
cfg = config.new()
aiplatform.init(project=cfg.project_id, location=cfg.location)

In [80]:
# Clean-up cell: delete every artifact returned by an unfiltered listing.
# NOTE(review): `Artifact.list()` is called without a filter, so this removes
# all artifacts it returns, not only the datasets created in this notebook —
# confirm that is intended before re-running.
artifacts = aiplatform.Artifact.list()
for artifact in artifacts:
    artifact.delete()

Deleting Artifact : projects/714561025480/locations/us-central1/metadataStores/default/artifacts/ea03a801-992c-4bb3-9518-ac4dcf986acf
Artifact deleted. . Resource name: projects/714561025480/locations/us-central1/metadataStores/default/artifacts/ea03a801-992c-4bb3-9518-ac4dcf986acf
Deleting Artifact resource: projects/714561025480/locations/us-central1/metadataStores/default/artifacts/ea03a801-992c-4bb3-9518-ac4dcf986acf
Delete Artifact backing LRO: projects/714561025480/locations/us-central1/metadataStores/default/artifacts/ea03a801-992c-4bb3-9518-ac4dcf986acf/operations/3183547304460156928
Artifact resource projects/714561025480/locations/us-central1/metadataStores/default/artifacts/ea03a801-992c-4bb3-9518-ac4dcf986acf deleted.
Deleting Artifact : projects/714561025480/locations/us-central1/metadataStores/default/artifacts/5ba9f46f-54c0-4d6b-955f-1ea5affd3d56
Artifact deleted. . Resource name: projects/714561025480/locations/us-central1/metadataStores/default/artifacts/5ba9f46f-54c0-

In [81]:
from typing import Optional, List
import json
from google.cloud import storage
from packaging.version import Version

class DatasetCatalogue:
    """Catalogue of dataset artifacts kept in Vertex AI ML Metadata.

    A dataset is a ``system.Dataset`` artifact whose ``display_name`` is the
    dataset name and whose metadata holds a ``version`` string and a
    ``status`` string (``"ACTIVE"`` when created through :meth:`new`).
    """

    def new(self, name: str, gcs_uri: str, version: "Version"):
        """Register a new dataset version and return it as a ``DatasetService``.

        Args:
            name: Dataset name, stored as the artifact's display name.
            gcs_uri: URI of the data, stored as the artifact's ``uri``.
            version: Version of the dataset being registered.

        Raises:
            ValueError: If an artifact with the same name and version exists.
        """
        # Work on raw artifacts: wrapping each one in a DatasetService would
        # open a storage client and fetch a bucket per artifact, and would
        # fail on artifacts that carry no "version" metadata.
        for artifact in self._list(name=name):
            if artifact.display_name == name and self._version_of(artifact) == version:
                raise ValueError(f"Dataset with name {name} and version {version} already exists.")
        return DatasetService(aiplatform.Artifact.create(
            schema_title="system.Dataset",
            display_name=name,
            uri=gcs_uri,
            metadata={
                "version": str(version),
                "status": "ACTIVE"
            }
        ))

    def get(self, name: str, active_only: bool = False, sort_by_semver: bool = False):
        """Return one dataset with the given name as a ``DatasetService``.

        Args:
            name: Dataset name to look up.
            active_only: Only consider artifacts whose status is ``"ACTIVE"``.
            sort_by_semver: Return the highest version; otherwise the first
                artifact in listing order is returned.

        Raises:
            IndexError: If no artifact matches.
        """
        artifacts = self._list(name=name, active_only=active_only)
        if not artifacts:
            qualifier = "active " if active_only else ""
            raise IndexError(f"No {qualifier}dataset named {name!r} found.")
        if sort_by_semver:
            artifacts = self._sort_by_semver_version(artifacts=artifacts)
        return DatasetService(artifacts[0])

    @staticmethod
    def _name_filter(name: Optional[str]) -> Optional[str]:
        """Build the listing filter for a display name (``None`` means no filter)."""
        if name is None:
            return None
        # The value is quoted so names containing spaces or punctuation are
        # matched as a single string literal.
        return f'display_name="{name}"'

    @staticmethod
    def _version_of(artifact) -> Optional["Version"]:
        """Parse the artifact's ``version`` metadata; ``None`` when it is absent."""
        raw_version = (artifact.metadata or {}).get("version")
        return Version(raw_version) if raw_version else None

    def _list(self, name: Optional[str] = None, active_only: bool = False):
        """List raw dataset artifacts, optionally restricted by name and status."""
        artifacts = artifact_schema.Dataset.list(filter=self._name_filter(name))
        if active_only:
            artifacts = self._filter_active(artifacts)
        return artifacts

    def list(self, name: Optional[str] = None, active_only: bool = False):
        """List matching datasets, each wrapped in a ``DatasetService``."""
        return [DatasetService(artifact) for artifact in self._list(name=name, active_only=active_only)]

    @property
    def available_datasets(self):
        """Set of distinct dataset names present in the catalogue."""
        # Only names are needed, so read them straight from the artifacts.
        return {artifact.display_name for artifact in self._list()}

    def _sort_by_semver_version(self, artifacts: List["artifact_schema.Dataset"]):
        """Return the artifacts ordered from highest to lowest version.

        Artifacts without ``version`` metadata are placed after all versioned
        ones instead of aborting the sort.
        """
        def sort_key(artifact):
            version = self._version_of(artifact)
            has_version = version is not None
            return (has_version, version if has_version else Version("0"))

        return sorted(artifacts, key=sort_key, reverse=True)

    def _filter_active(self, artifacts: List["artifact_schema.Dataset"]):
        """Keep only artifacts whose ``status`` metadata equals ``"ACTIVE"``."""
        return [artifact for artifact in artifacts if artifact.metadata.get("status") == "ACTIVE"]



class DatasetService:
    """Wraps one dataset artifact and reads the JSONL objects behind its URI.

    The artifact URI is interpreted as ``gs://<bucket>/<glob pattern>``; the
    pattern is passed to the bucket listing as ``match_glob``.
    """

    def __init__(self, artifact: "artifact_schema.Dataset"):
        """Parse the artifact URI and open the bucket it points to.

        Raises:
            ValueError: If the URI is empty or has no object path after the bucket.
        """
        self.artifact = artifact
        # Parse first so a malformed URI fails before any client is created.
        self.bucket_name, self.glob_pattern = self._split_gcs_uri(artifact.uri)
        self.client = storage.Client()
        self.bucket = self.client.get_bucket(self.bucket_name)

    @staticmethod
    def _split_gcs_uri(uri):
        """Split ``gs://bucket/path`` into ``(bucket, path)``.

        Only a leading ``gs://`` is stripped; the same text appearing later in
        the path is left untouched. A URI without the scheme is still accepted.
        """
        scheme = "gs://"
        if not uri:
            raise ValueError(f"Dataset artifact has no usable URI: {uri!r}.")
        remainder = uri[len(scheme):] if uri.startswith(scheme) else uri
        bucket_name, _, pattern = remainder.partition("/")
        if not bucket_name or not pattern:
            raise ValueError(f"URI {uri!r} must look like 'gs://<bucket>/<object pattern>'.")
        return bucket_name, pattern

    def _get_blobs(self):
        """List the blobs in the bucket that match the glob pattern."""
        return list(self.bucket.list_blobs(match_glob=self.glob_pattern))

    def _load_blob_data(self, blob):
        """Download one blob and parse it as JSON Lines (one JSON value per line)."""
        # download_as_bytes is the non-deprecated name for download_as_string.
        text = blob.download_as_bytes().decode('utf-8')
        # Split on "\n" only (other separators may legally appear inside JSON
        # strings) and skip lines that are empty or whitespace-only.
        return [json.loads(line) for line in text.split('\n') if line.strip()]

    @property
    def name(self):
        """Dataset name (the artifact's display name)."""
        return self.artifact.display_name

    @property
    def version(self):
        """Dataset version parsed from the ``version`` metadata entry.

        ``Version`` is applied to whatever ``metadata.get("version")`` returns,
        so artifacts without that entry are not supported by this property.
        """
        return Version(self.artifact.metadata.get("version"))

    @property
    def status(self):
        """Value of the ``status`` metadata entry, or ``None`` if absent."""
        return self.artifact.metadata.get("status")

    def make_inactive(self):
        """Set the artifact's ``status`` metadata to ``"INACTIVE"``."""
        # Update a copy so the other metadata entries are sent along unchanged.
        new_metadata = self.artifact.metadata.copy()
        new_metadata['status'] = "INACTIVE"
        self.artifact.update(metadata=new_metadata)

    def load(self):
        """Return the records of every matching blob as one flat list."""
        records = []
        for blob in self._get_blobs():
            records.extend(self._load_blob_data(blob))
        return records

# Walk-through of the catalogue API:
#   1. register version 3.0.1 of the dataset,
#   2. fetch the highest active version and read its records,
#   3. mark that version inactive,
#   4. register the next patch version.
catalogue = DatasetCatalogue()
dataset_artifact = catalogue.new(
    name="question_answering",
    gcs_uri="gs://ruinard_datacentric/assets/data/*.jsonl",
    version=Version("3.0.1"),
)
print("newly created dataset name: ", dataset_artifact.name)

print("Available datasets: ", catalogue.available_datasets)
dataset = catalogue.get(
    name=dataset_artifact.name,
    active_only=True,
    sort_by_semver=True,
)
dataset.load()

# Status before and after retiring the version that was just fetched.
print(f"dataset version: {dataset.version} --- dataset status: {dataset.status}")
dataset.make_inactive()
print(f"dataset version: {dataset.version} --- dataset status: {dataset.status}")

# Publish the successor: same major/minor, patch component incremented by one.
current_version = dataset.version
next_version = Version(
    f"{current_version.major}.{current_version.minor}.{current_version.micro + 1}"
)
dataset = catalogue.new(
    name="question_answering",
    gcs_uri="gs://ruinard_datacentric/assets/data/*.jsonl",
    version=next_version,
)
print(f"dataset version: {dataset.version} --- dataset status: {dataset.status}")

newly created dataset name:  question_answering
Available datasets:  {'question_answering'}
dataset version: 3.0.1 --- dataset status: ACTIVE
dataset version: 3.0.1 --- dataset status: INACTIVE
dataset version: 3.0.2 --- dataset status: ACTIVE


In [4]:
DATASET_URI = "gs://ruinard_datacentric/assets/data/*.jsonl"
display_name = "vertex_metadata_managed_question_answering_dataset"

# Metadata entry shared by every artifact created in this cell.
schema_metadata = {"custom_schema": "gs://my_bucket/my_schema/schema.yaml"}

# Ten inactive patch versions 1.0.0 .. 1.0.9 ...
for inactive_version in range(10):
    dataset_artifact = aiplatform.Artifact.create(
        schema_title="system.Dataset",
        display_name=display_name,
        uri=DATASET_URI,
        metadata={
            **schema_metadata,
            "version": f"1.0.{inactive_version}",
            "status": "INACTIVE",
        },
    )

# ... followed by a single active 2.0.0 ...
dataset_artifact = aiplatform.Artifact.create(
    schema_title="system.Dataset",
    display_name=display_name,
    uri=DATASET_URI,
    metadata={**schema_metadata, "version": "2.0.0", "status": "ACTIVE"},
)

# ... and one artifact built through the typed schema class, which carries
# neither a version nor a status entry.
dataset = artifact_schema.Dataset(
    display_name=display_name,
    uri=DATASET_URI,
    metadata=dict(schema_metadata),
).create()

In [48]:
 artifact_schema.Dataset.list()

[<google.cloud.aiplatform.metadata.schema.system.artifact_schema.Dataset object at 0x7f93ecdf22f0> 
 resource name: projects/714561025480/locations/us-central1/metadataStores/default/artifacts/49757494-9957-44a0-acc0-d962a151d07b
 uri: gs://ruinard_datacentric/assets/data/*.jsonl
 schema_title:system.Dataset,
 <google.cloud.aiplatform.metadata.schema.system.artifact_schema.Dataset object at 0x7f93ecd8bdc0> 
 resource name: projects/714561025480/locations/us-central1/metadataStores/default/artifacts/416c46fd-26f3-4fed-8186-fe273c2eb930
 uri: gs://ruinard_datacentric/assets/data/*.jsonl
 schema_title:system.Dataset,
 <google.cloud.aiplatform.metadata.schema.system.artifact_schema.Dataset object at 0x7f93ec340f40> 
 resource name: projects/714561025480/locations/us-central1/metadataStores/default/artifacts/756232a8-3604-4c54-ab59-4a78a1669e27
 uri: gs://ruinard_datacentric/assets/data/*.jsonl
 schema_title:system.Dataset,
 <google.cloud.aiplatform.metadata.schema.system.artifact_schema.Da

In [47]:
aiplatform.Artifact.list(filter='schema_title="system.Dataset"')

[<google.cloud.aiplatform.metadata.artifact.Artifact object at 0x7f93ecd426e0> 
 resource name: projects/714561025480/locations/us-central1/metadataStores/default/artifacts/49757494-9957-44a0-acc0-d962a151d07b
 uri: gs://ruinard_datacentric/assets/data/*.jsonl
 schema_title:system.Dataset,
 <google.cloud.aiplatform.metadata.artifact.Artifact object at 0x7f93ecdf05b0> 
 resource name: projects/714561025480/locations/us-central1/metadataStores/default/artifacts/416c46fd-26f3-4fed-8186-fe273c2eb930
 uri: gs://ruinard_datacentric/assets/data/*.jsonl
 schema_title:system.Dataset,
 <google.cloud.aiplatform.metadata.artifact.Artifact object at 0x7f93ecdf0370> 
 resource name: projects/714561025480/locations/us-central1/metadataStores/default/artifacts/756232a8-3604-4c54-ab59-4a78a1669e27
 uri: gs://ruinard_datacentric/assets/data/*.jsonl
 schema_title:system.Dataset,
 <google.cloud.aiplatform.metadata.artifact.Artifact object at 0x7f93ecdf33d0> 
 resource name: projects/714561025480/locations

In [44]:
dataset.to

<google.cloud.aiplatform.metadata.schema.system.artifact_schema.Dataset object at 0x7f93f8219720> 
resource name: projects/714561025480/locations/us-central1/metadataStores/default/artifacts/756232a8-3604-4c54-ab59-4a78a1669e27
uri: gs://ruinard_datacentric/assets/data/*.jsonl
schema_title:system.Dataset

In [5]:
# dataset_artifact.update(
#     metadata={"custom_schema": "gs://my_bucket/my_schema/schema.yaml", "version": "1.0.0", "status": "INACTIVE"}
# )

In [6]:
# dataset_artifact.to_dict()

In [16]:
from packaging.version import Version
# Find the artifact with the highest version among those named `display_name`.
# The filter lives under its own name so the builtin `filter` is not shadowed
# for the rest of the notebook; the value is quoted as a string literal.
display_name_filter = f'display_name="{display_name}"'
artifacts = aiplatform.Artifact.list(filter=display_name_filter)

# Artifacts without a "version" metadata entry are ignored.
versioned_artifacts = [
    (Version(candidate.metadata.get('version')), candidate)
    for candidate in artifacts
    if candidate.metadata.get('version')
]
# Compare on the version only: without `key`, two artifacts sharing a version
# would make `max` compare the Artifact objects themselves and raise TypeError.
max_version, artifact_of_interest = max(
    versioned_artifacts,
    key=lambda pair: pair[0],
    default=(Version('0.0.0'), None),
)

# `artifact_of_interest` is None when nothing matched, so guard the metadata lookup.
max_version, artifact_of_interest, (artifact_of_interest.metadata if artifact_of_interest is not None else None)

(<Version('2.0.0')>,
 <google.cloud.aiplatform.metadata.artifact.Artifact object at 0x7f93f83c3160> 
 resource name: projects/714561025480/locations/us-central1/metadataStores/default/artifacts/416c46fd-26f3-4fed-8186-fe273c2eb930
 uri: gs://ruinard_datacentric/assets/data/*.jsonl
 schema_title:system.Dataset,
 {'version': '2.0.0',
  'status': 'INACTIVE',
  'custom_schema': 'gs://my_bucket/my_schema/schema.yaml'})

In [24]:
# Retire the current latest version, then publish its successor.
if artifact_of_interest is not None:
    # Update a copy of the metadata so the remaining entries are preserved.
    new_metadata = artifact_of_interest.metadata.copy()
    new_metadata['status'] = "INACTIVE"
    artifact_of_interest.update(new_metadata)

# Increment the patch component. Rebuilding the version from `max_version`
# unchanged would register a second artifact with the very same version.
new_version = Version(f'{max_version.major}.{max_version.minor}.{max_version.micro + 1}')
dataset_artifact = aiplatform.Artifact.create(
    schema_title="system.Dataset",
    display_name=display_name,
    uri=DATASET_URI,
    metadata={
        "custom_schema": "gs://my_bucket/my_schema/schema.yaml",
        "version": str(new_version),
        "status": "ACTIVE",
    },
)

In [25]:
dataset_artifact

<google.cloud.aiplatform.metadata.artifact.Artifact object at 0x7f93f827c130> 
resource name: projects/714561025480/locations/us-central1/metadataStores/default/artifacts/49757494-9957-44a0-acc0-d962a151d07b
uri: gs://ruinard_datacentric/assets/data/*.jsonl
schema_title:system.Dataset

In [39]:
aiplatform.Artifact.list()

[<google.cloud.aiplatform.metadata.artifact.Artifact object at 0x7f93f8cab2b0> 
 resource name: projects/714561025480/locations/us-central1/metadataStores/default/artifacts/49757494-9957-44a0-acc0-d962a151d07b
 uri: gs://ruinard_datacentric/assets/data/*.jsonl
 schema_title:system.Dataset,
 <google.cloud.aiplatform.metadata.artifact.Artifact object at 0x7f93f8cabc70> 
 resource name: projects/714561025480/locations/us-central1/metadataStores/default/artifacts/416c46fd-26f3-4fed-8186-fe273c2eb930
 uri: gs://ruinard_datacentric/assets/data/*.jsonl
 schema_title:system.Dataset,
 <google.cloud.aiplatform.metadata.artifact.Artifact object at 0x7f93f8b744f0> 
 resource name: projects/714561025480/locations/us-central1/metadataStores/default/artifacts/756232a8-3604-4c54-ab59-4a78a1669e27
 uri: gs://ruinard_datacentric/assets/data/*.jsonl
 schema_title:system.Dataset,
 <google.cloud.aiplatform.metadata.artifact.Artifact object at 0x7f93f8b74700> 
 resource name: projects/714561025480/locations

In [49]:
datasets = aiplatform.Artifact.list(filter='schema_title="system.Dataset"')


'vertex_metadata_managed_question_answering_dataset'

ValueError: Dataset with name question_answering and version 3.0.1 already exists.

In [100]:
artifact.metadata

{'custom_schema': 'gs://my_bucket/my_schema/schema.yaml'}

In [None]:
dataset = artifact_schema.Dataset(
    display_name=display_name, uri=DATASET_URI, metadata={"custom_schema": "gs://my_bucket/my_schema/schema.yaml"}
    ).create()

In [None]:
# Create a `system.Dataset` artifact with version 1.0.0 and status "LIVE".
# NOTE(review): the other cells in this notebook use "ACTIVE"/"INACTIVE" for
# `status`, and `DatasetCatalogue._filter_active` only matches "ACTIVE", so an
# artifact created here is skipped by `active_only` lookups — confirm whether
# "LIVE" is intentional.
dataset_artifact = aiplatform.Artifact.create(
    schema_title="system.Dataset", display_name=display_name, uri=DATASET_URI, metadata={"custom_schema": "gs://my_bucket/my_schema/schema.yaml", "version": "1.0.0", "status": "LIVE"}
)

In [62]:
artifact_schema.gca_artifact.Artifact.State

<enum 'State'>

In [68]:
# Experiment: create a dataset artifact, then try to point it at a new URI.
dataset = artifact_schema.Dataset(
    display_name=display_name, uri=DATASET_URI, metadata={"custom_schema": "gs://my_bucket/my_schema/schema.yaml"}
    ).create()
updated_uri = "gs://ruinard_datacentric/assets/data/some_new_uri/*.jsonl"
# NOTE: the assignment below fails with "AttributeError: can't set attribute
# 'uri'" (see this cell's recorded output), so `sync_resource()` is never
# reached and the artifact keeps its original URI.
dataset.uri = updated_uri
dataset.sync_resource()

AttributeError: can't set attribute 'uri'

In [67]:
dataset.uri

'gs://ruinard_datacentric/assets/data/*.jsonl'

In [52]:
dataset.lineage_console_uri

'https://console.cloud.google.com/vertex-ai/locations/us-central1/metadata-stores/default/artifacts/958bb996-beab-4c73-a3d2-ff8d90765e2c?project=llmops-data-engine'

In [45]:
dataset_artifact.to_dict()

{'name': 'projects/714561025480/locations/us-central1/metadataStores/default/artifacts/7a267116-5221-4405-b159-e49c5ba1a8db',
 'displayName': 'vertex_metadata_managed_question_answering_dataset',
 'uri': 'gs://ruinard_datacentric/assets/data/*.jsonl',
 'etag': '1734598186000',
 'createTime': '2024-12-19T08:49:46Z',
 'updateTime': '2024-12-19T08:49:46Z',
 'state': 'LIVE',
 'schemaTitle': 'system.Dataset',
 'schemaVersion': '0.0.1',
 'metadata': {'custom_schema': 'gs://my_bucket/my_schema/schema.yaml'}}

In [37]:
dataset_artifact = aiplatform.Artifact('projects/714561025480/locations/us-central1/metadataStores/default/artifacts/c186113d-2ea0-4e31-9ea3-28bb55063412')
dataset_artifact.to_dict()

{'name': 'projects/714561025480/locations/us-central1/metadataStores/default/artifacts/c186113d-2ea0-4e31-9ea3-28bb55063412',
 'displayName': 'vertex_metadata_managed_question_answering_dataset',
 'uri': 'gs://ruinard_datacentric/assets/data/*.jsonl',
 'etag': '1734598053726',
 'createTime': '2024-12-19T08:47:33.726Z',
 'updateTime': '2024-12-19T08:47:33.726Z',
 'state': 'LIVE',
 'schemaTitle': 'google.VertexDataset',
 'schemaVersion': '0.0.1',
 'metadata': {'custom_schema': 'gs://my_bucket/my_schema/schema.yaml'}}

In [38]:
from google.cloud.aiplatform.metadata.schema.system import artifact_schema

In [41]:
ds = artifact_schema.Dataset(artifact_id='projects/714561025480/locations/us-central1/metadataStores/default/artifacts/c186113d-2ea0-4e31-9ea3-28bb55063412')

ds


<google.cloud.aiplatform.metadata.schema.system.artifact_schema.Dataset object at 0x7f7c111c2ce0>
schema_title: system.Dataset

In [30]:
dataset_artifact.

<bound method _Resource.update of <google.cloud.aiplatform.metadata.artifact.Artifact object at 0x7f7c111c24d0> 
resource name: projects/714561025480/locations/us-central1/metadataStores/default/artifacts/54ff5787-8922-4921-887a-615dcceeaa6e
uri: gs://ruinard_datacentric/assets/data/*.jsonl
schema_title:system.Dataset>

In [26]:
ds

SyntaxError: invalid syntax (1233047620.py, line 1)

In [41]:
ds = aiplatform.Artifact.list(filter=f"display_name={display_name}")[0]

In [42]:
# Split the URI on its first three "/" and keep everything from the third
# piece onward: for "gs://<bucket>/<path>" that is the bucket name and the
# object path/pattern. The unpacking requires exactly two pieces to remain.
bucket_name, blob_path = ds.uri.split('/', 3)[2:]
# Display both parts as the cell result.
bucket_name, blob_path

('ruinard_datacentric', 'assets/data/*.jsonl')

In [33]:
# Clean-up cell: delete every TabularDataset returned by an unfiltered listing.
# NOTE(review): no filter is applied, so all listed tabular datasets are
# removed — confirm that is intended before re-running. The loop also rebinds
# `ds`, which other cells use for a dataset artifact.
datasets = aiplatform.TabularDataset.list()
for ds in datasets:
    ds.delete()

Deleting TabularDataset : projects/714561025480/locations/us-central1/datasets/8411576763544502272
TabularDataset deleted. . Resource name: projects/714561025480/locations/us-central1/datasets/8411576763544502272
Deleting TabularDataset resource: projects/714561025480/locations/us-central1/datasets/8411576763544502272
Delete TabularDataset backing LRO: projects/714561025480/locations/us-central1/operations/5081005306838777856
TabularDataset resource projects/714561025480/locations/us-central1/datasets/8411576763544502272 deleted.
Deleting TabularDataset : projects/714561025480/locations/us-central1/datasets/2646969240510267392
TabularDataset deleted. . Resource name: projects/714561025480/locations/us-central1/datasets/2646969240510267392
Deleting TabularDataset resource: projects/714561025480/locations/us-central1/datasets/2646969240510267392
Delete TabularDataset backing LRO: projects/714561025480/locations/us-central1/operations/7649605206811869184
TabularDataset resource projects/7

In [45]:
data

[{'question': 'What is the capital of France?', 'answer': 'Paris'},
 {'question': 'What is the largest planet in our solar system?',
  'answer': 'Jupiter'},
 {'question': "Who wrote 'To Kill a Mockingbird'?", 'answer': 'Harper Lee'},
 {'question': 'What is the boiling point of water?',
  'answer': '100 degrees Celsius'},
 {'question': 'What is the currency of Japan?', 'answer': 'Yen'},
 {'question': 'What is the capital of France?'},
 {'question': 'What is the largest planet in our solar system?'},
 {'question': "Who wrote 'To Kill a Mockingbird'?"},
 {'question': 'What is the boiling point of water?'},
 {'question': 'What is the currency of Japan?'}]

In [None]:
aiplatform.Artifact.list()