Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions dataplex/quickstart/noxfile_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Default TEST_CONFIG_OVERRIDE for python repos.

# You can copy this file into your directory, then it will be imported from
# the noxfile.py.

# The source of truth:
# https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/noxfile_config.py

TEST_CONFIG_OVERRIDE = {
# You can opt out from the test for specific Python versions.
"ignored_versions": ["2.7", "3.7", "3.9", "3.10", "3.11"],
# Old samples are opted out of enforcing Python type hints
# All new samples should feature them
"enforce_type_hints": True,
# An envvar key for determining the project id to use. Change it
# to 'BUILD_SPECIFIC_GCLOUD_PROJECT' if you want to opt in using a
# build specific Cloud project. You can also use your own string
# to use your own Cloud project.
"gcloud_project_env": "GOOGLE_CLOUD_PROJECT",
# 'gcloud_project_env': 'BUILD_SPECIFIC_GCLOUD_PROJECT',
# If you need to use a specific version of pip,
# change pip_version_override to the string representation
# of the version number, for example, "20.2.4"
"pip_version_override": None,
# A dictionary you want to inject into your test. Don't put any
# secrets here. These values will override predefined values.
"envs": {},
}
202 changes: 202 additions & 0 deletions dataplex/quickstart/quickstart.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# [START dataplex_quickstart]
import time

from google.cloud import dataplex_v1
from google.protobuf import struct_pb2


# Method to demonstrate lifecycle of different Dataplex resources and their interactions.
# Method creates Aspect Type, Entry Type, Entry Group and Entry, retrieves Entry
# and cleans up created resources.
def quickstart(
project_id: str,
location: str,
aspect_type_id: str,
entry_type_id: str,
entry_group_id: str,
entry_id: str,
) -> None:
# Initialize client that will be used to send requests across threads. This
# client only needs to be created once, and can be reused for multiple requests.
# After completing all of your requests, call the "__exit__()" method to safely
# clean up any remaining background resources. Alternatively, use the client as
# a context manager.
with dataplex_v1.CatalogServiceClient() as client:
# 0) Prepare variables used in following steps
global_parent = f"projects/{project_id}/locations/global"
specific_location_parent = f"projects/{project_id}/locations/{location}"

# 1) Create Aspect Type that will be attached to Entry Type
aspect_field = dataplex_v1.AspectType.MetadataTemplate(
# The name must follow regex ^(([a-zA-Z]{1})([\\w\\-_]{0,62}))$
# That means name must only contain alphanumeric character or dashes or underscores,
# start with an alphabet, and must be less than 63 characters.
name="example_field",
# Metadata Template is recursive structure,
# primitive types such as "string" or "integer" indicate leaf node,
# complex types such as "record" or "array" would require nested Metadata Template
type="string",
index=1,
annotations=dataplex_v1.AspectType.MetadataTemplate.Annotations(
description="example field to be filled during entry creation"
),
constraints=dataplex_v1.AspectType.MetadataTemplate.Constraints(
# Specifies if field will be required in Aspect Type.
required=True
),
)
aspect_type = dataplex_v1.AspectType(
description="aspect type for dataplex quickstart",
metadata_template=dataplex_v1.AspectType.MetadataTemplate(
name="example_template",
type="record",
# Aspect Type fields, that themselves are Metadata Templates.
record_fields=[aspect_field],
),
)
aspect_type_create_operation = client.create_aspect_type(
# Aspect Type is created in "global" location to highlight, that resources from
# "global" region can be attached to Entry created in specific location
parent=global_parent,
aspect_type=aspect_type,
aspect_type_id=aspect_type_id,
)
created_aspect_type = aspect_type_create_operation.result(60)
print(f"Step 1: Created aspect type -> {created_aspect_type.name}")

# 2) Create Entry Type, of which type Entry will be created
entry_type = dataplex_v1.EntryType(
description="entry type for dataplex quickstart",
required_aspects=[
dataplex_v1.EntryType.AspectInfo(
# Aspect Type created in step 1
type=f"projects/{project_id}/locations/global/aspectTypes/{aspect_type_id}"
)
],
)
entry_type_create_operation = client.create_entry_type(
# Entry Type is created in "global" location to highlight, that resources from
# "global" region can be attached to Entry created in specific location
parent=global_parent,
entry_type=entry_type,
entry_type_id=entry_type_id,
)
created_entry_type = entry_type_create_operation.result(60)
print(f"Step 2: Created entry type -> {created_entry_type.name}")

# 3) Create Entry Group in which Entry will be located
entry_group = dataplex_v1.EntryGroup(
description="entry group for dataplex quickstart"
)
entry_group_create_operation = client.create_entry_group(
# Entry Group is created for specific location
parent=specific_location_parent,
entry_group=entry_group,
entry_group_id=entry_group_id,
)
created_entry_group = entry_group_create_operation.result(60)
print(f"Step 3: Created entry group -> {created_entry_group.name}")

# 4) Create Entry
# Wait 10 second to allow previously created resources to propagate
time.sleep(10)
aspect_key = f"{project_id}.global.{aspect_type_id}"
entry = dataplex_v1.Entry(
# Entry is an instance of Entry Type created in step 2
entry_type=f"projects/{project_id}/locations/global/entryTypes/{entry_type_id}",
entry_source=dataplex_v1.EntrySource(
description="entry for dataplex quickstart"
),
aspects={
# Attach Aspect that is an instance of Aspect Type created in step 1
aspect_key: dataplex_v1.Aspect(
aspect_type=f"projects/{project_id}/locations/global/aspectTypes/{aspect_type_id}",
data=struct_pb2.Struct(
fields={
"example_field": struct_pb2.Value(
string_value="example value for the field"
),
}
),
)
},
)
created_entry = client.create_entry(
# Entry is created in specific location, but it is still possible to link it with
# resources (Aspect Type and Entry Type) from "global" location
parent=f"projects/{project_id}/locations/{location}/entryGroups/{entry_group_id}",
entry=entry,
entry_id=entry_id,
)
print(f"Step 4: Created entry -> {created_entry.name}")

# 5) Retrieve created Entry
get_entry_request = dataplex_v1.GetEntryRequest(
name=f"projects/{project_id}/locations/{location}/entryGroups/{entry_group_id}/entries/{entry_id}",
view=dataplex_v1.EntryView.FULL,
)
retrieved_entry = client.get_entry(request=get_entry_request)
print(f"Step 5: Retrieved entry -> {retrieved_entry.name}")
for retrieved_aspect in retrieved_entry.aspects.values():
print("Retrieved aspect for entry:")
print(f" * aspect type -> {retrieved_aspect.aspect_type}")
print(f" * aspect field value -> {retrieved_aspect.data['example_field']}")

# 6) Use Search capabilities to find Entry
# Wait 30 second to allow resources to propagate to Search
print("Step 6: Waiting for resources to propagate to Search...")
time.sleep(30)
search_entries_request = dataplex_v1.SearchEntriesRequest(
name=global_parent, query="name:dataplex-quickstart-entry"
)
results = client.search_entries(search_entries_request)
search_entries_response = results._response
entries_from_search = [
result.dataplex_entry for result in search_entries_response.results
]
print("Entries found in Search:")
# Please note in output that Entry Group and Entry Type are also represented as Entries
for entry_from_search in entries_from_search:
print(f" * {entry_from_search.name}")

# 7) Clean created resources
client.delete_entry_group(
name=f"projects/{project_id}/locations/{location}/entryGroups/{entry_group_id}"
)
client.delete_entry_type(
name=f"projects/{project_id}/locations/global/entryTypes/{entry_type_id}"
)
client.delete_aspect_type(
name=f"projects/{project_id}/locations/global/aspectTypes/{aspect_type_id}"
)
print("Step 7: Successfully cleaned up resources")


if __name__ == "__main__":
# TODO(developer): Replace these variables before running the sample.
project_id = "MY_PROJECT_ID"
# Available locations: https://cloud.google.com/dataplex/docs/locations
location = "MY_LOCATION"
# Variables below can be replaced with custom values or defaults can be kept
aspect_type_id = "dataplex-quickstart-aspect-type"
entry_type_id = "dataplex-quickstart-entry-type"
entry_group_id = "dataplex-quickstart-entry-group"
entry_id = "dataplex-quickstart-entry"

quickstart(
project_id, location, aspect_type_id, entry_type_id, entry_group_id, entry_id
)
# [END dataplex_quickstart]
92 changes: 92 additions & 0 deletions dataplex/quickstart/quickstart_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os

import uuid

from google.api_core.exceptions import NotFound
from google.api_core.retry import Retry
from google.cloud import dataplex_v1

import pytest

import quickstart

ID = str(uuid.uuid4()).split("-")[0]
PROJECT_ID = os.getenv("GOOGLE_CLOUD_PROJECT")
LOCATION = "us-central1"
ASPECT_TYPE_ID = f"quickstart-aspect-type-{ID}"
ENTRY_TYPE_ID = f"quickstart-entry-type-{ID}"
ENTRY_GROUP_ID = f"quickstart-entry-group-{ID}"
ENTRY_ID = f"quickstart-entry-{ID}"


@Retry()
def test_quickstart(capsys: pytest.CaptureFixture) -> None:
expected_logs = [
f"Step 1: Created aspect type -> projects/{PROJECT_ID}/locations/global/aspectTypes/{ASPECT_TYPE_ID}",
f"Step 2: Created entry type -> projects/{PROJECT_ID}/locations/global/entryTypes/{ENTRY_TYPE_ID}",
(
f"Step 3: Created entry group -> projects/{PROJECT_ID}/locations/{LOCATION}"
f"/entryGroups/{ENTRY_GROUP_ID}"
),
(
f"Step 4: Created entry -> projects/{PROJECT_ID}/locations/{LOCATION}"
f"/entryGroups/{ENTRY_GROUP_ID}/entries/{ENTRY_ID}"
),
(
f"Step 5: Retrieved entry -> projects/{PROJECT_ID}/locations/{LOCATION}"
f"/entryGroups/{ENTRY_GROUP_ID}/entries/{ENTRY_ID}"
),
# Step 6 - result from Search
"Entries found in Search:",
"Step 7: Successfully cleaned up resources",
]

quickstart.quickstart(
PROJECT_ID, LOCATION, ASPECT_TYPE_ID, ENTRY_TYPE_ID, ENTRY_GROUP_ID, ENTRY_ID
)
out, _ = capsys.readouterr()

for expected_log in expected_logs:
assert expected_log in out


@pytest.fixture(autouse=True, scope="session")
def setup_and_teardown_aspect_type() -> None:
# No set-up
yield
force_clean_resources()


def force_clean_resources() -> None:
with dataplex_v1.CatalogServiceClient() as client:
try:
client.delete_entry_group(
name=f"projects/{PROJECT_ID}/locations/{LOCATION}/entryGroups/{ENTRY_GROUP_ID}"
)
except NotFound:
pass # no resource to delete
try:
client.delete_entry_type(
name=f"projects/{PROJECT_ID}/locations/global/entryTypes/{ENTRY_TYPE_ID}"
)
except NotFound:
pass # no resource to delete
try:
client.delete_aspect_type(
name=f"projects/{PROJECT_ID}/locations/global/aspectTypes/{ASPECT_TYPE_ID}"
)
except NotFound:
pass # no resource to delete