### <font color='#4285f4'>Overview</font>

This notebook imports metadata from an Oracle database running outside of Google Cloud into Dataplex Universal Catalog. It shows how metadata from external systems can be integrated into the catalog and made available for search.

Approach:
1. The Aspect Types, Entry Types, and Entry Group referenced in the metadata import file are first created in the project via the Python APIs.
2. A JSON import request file is downloaded from Cloud Storage to the notebook instance.
3. The metadata import API is invoked via curl, passing in the import request file. The import process loads the metadata import file for the Oracle database from Cloud Storage into Dataplex Universal Catalog. Note that the import process is asynchronous and takes approximately 8 minutes to run.

Author:
* Daniel Holgate

In [None]:
# Architecture Diagram
from IPython.display import Image
Image(url='https://storage.googleapis.com/data-analytics-golden-demo/colab-diagrams/BigQuery-Data-Governance-Automated-Data-Governance.png', width=1000)

### <font color='#4285f4'>Video Walkthrough</font>

TBD


### <font color='#4285f4'>License</font>

```
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
```

### <font color='#4285f4'>Pip installs</font>

In [None]:
# PIP Installs (if necessary)
import sys

!{sys.executable} -m pip install google-cloud-dataplex

### <font color='#4285f4'>Initialize</font>

In [None]:
from PIL import Image
from IPython.display import HTML
import IPython.display
import google.auth
import requests
import json
import uuid
import base64
import os
import cv2
import random
import time
import datetime

import re
import argparse
from collections import Counter
from typing import List
from google.cloud import dataplex_v1

import logging
from tenacity import retry, wait_exponential, stop_after_attempt, before_sleep_log, retry_if_exception

In [None]:
# Set these (run this cell to verify the output)
dataplex_location = "${dataplex_location}"
governed_data_code_bucket = "${governed_data_code_bucket}"

# Get the current date and time
now = datetime.datetime.now()

# Format the date and time as desired
formatted_date = now.strftime("%Y-%m-%d-%H-%M")

# Get some values using gcloud
project_id = os.environ["GOOGLE_CLOUD_PROJECT"]
user = !(gcloud auth list --filter=status:ACTIVE --format="value(account)")

if len(user) != 1:
  raise RuntimeError(f"user is not set: {user}")
user = user[0]

print(f"project_id = {project_id}")
print(f"user = {user}")
print(f"dataplex_location = {dataplex_location}")
print(f"governed_data_code_bucket = {governed_data_code_bucket}")

### <font color='#4285f4'>Helper Methods</font>

#### restAPIHelper
Calls the Google Cloud REST API using the current user's credentials.

In [None]:
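def restAPIHelper(url: str, http_verb: str, request_body: dict) -> dict:
  """Calls the Google Cloud REST API, passing in the current user's credentials.

  request_body is sent as JSON (ignored for GET and DELETE); the parsed JSON response is returned.
  """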

  import google.auth.transport.requests
  import requests
  import google.auth
  import json

  # Get an access token based upon the current user
  creds, project = google.auth.default()
  auth_req = google.auth.transport.requests.Request()
  creds.refresh(auth_req)
  access_token=creds.token

  headers = {
    "Content-Type" : "application/json",
    "Authorization" : "Bearer " + access_token
  }

  if http_verb == "GET":
    response = requests.get(url, headers=headers)
  elif http_verb == "POST":
    response = requests.post(url, json=request_body, headers=headers)
  elif http_verb == "PUT":
    response = requests.put(url, json=request_body, headers=headers)
  elif http_verb == "PATCH":
    response = requests.patch(url, json=request_body, headers=headers)
  elif http_verb == "DELETE":
    response = requests.delete(url, headers=headers)
  else:
    raise RuntimeError(f"Unknown HTTP verb: {http_verb}")

  if response.status_code == 200:
    return json.loads(response.content)
  else:
    error = f"Error restAPIHelper -> Status: '{response.status_code}' Text: '{response.text}'"
    raise RuntimeError(error)
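
As a usage illustration, the helper can be pointed at any Dataplex REST endpoint. The sketch below only builds the URL for listing entry groups and makes no network call. The function name `entry_groups_url` and the placeholder values are hypothetical and not part of this notebook.

```python
def entry_groups_url(project_id: str, location: str) -> str:
    """Build the REST URL that lists entry groups in a project and location."""
    return (
        "https://dataplex.googleapis.com/v1/"
        f"projects/{project_id}/locations/{location}/entryGroups"
    )


# Example with placeholder values (hypothetical project and location).
print(entry_groups_url("my-project", "us-central1"))
```

Passing the result to `restAPIHelper(url, "GET", None)` should return the parsed JSON response, assuming the current user has permission to list entry groups.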

### <font color='#4285f4'>Create Dataplex Universal Catalog metadata hierarchy</font>

Create the required Dataplex metadata hierarchy (entry types, aspect types, and entry group) before loading the metadata import file.

In [None]:
def create_entry_group(
    project_id: str, location: str, entry_group_id: str
) -> dataplex_v1.EntryGroup:
    """Create Entry Group identified by entry_group_id, located in project_id, location """

    with dataplex_v1.CatalogServiceClient() as client:
        # The resource name of the Entry Group location
        parent = f"projects/{project_id}/locations/{location}"
        entry_group = dataplex_v1.EntryGroup(
            description=f"Entry group {entry_group_id} description"
        )
        create_operation = client.create_entry_group(
            parent=parent, entry_group=entry_group, entry_group_id=entry_group_id
        )
        # Wait for the long-running operation (up to 60 seconds) before reporting success.
        result = create_operation.result(60)
        print(f"Created Entry Group: projects/{project_id}/locations/{location}/entryGroups/{entry_group_id}")
        return result

In [None]:
def create_entry_type(
    project_id: str, location: str, entry_type_id: str
) -> dataplex_v1.EntryType:
    """Create Entry Type identified by entry_type_id, located in project_id, location """

    print(f"Creating Entry Type {entry_type_id}")

    typeAliases = []

    # To improve search and discoverability, create some common aliases for the oracle records
    if "-table" in entry_type_id:
        typeAliases.append("TABLE")

    if "-view" in entry_type_id:
        typeAliases.append("VIEW")

    if "-database" in entry_type_id:
        typeAliases.append("DATABASE")

    with dataplex_v1.CatalogServiceClient() as client:
        parent = f"projects/{project_id}/locations/{location}"
        entry_type = dataplex_v1.EntryType(
            description="description of the entry type",
            type_aliases=typeAliases,
            required_aspects=[],
        )
        create_operation = client.create_entry_type(
            parent=parent, entry_type=entry_type, entry_type_id=entry_type_id
        )
        return create_operation.result(60)
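
The alias selection in `create_entry_type` can also be expressed as a small pure function, which makes it easy to check without calling Dataplex. This is a minimal sketch; the names `_ALIAS_BY_MARKER` and `type_aliases_for` are hypothetical and only mirror the three substring checks above.

```python
from typing import List

# Marker-to-alias mapping mirroring the substring checks in create_entry_type.
_ALIAS_BY_MARKER = {
    "-table": "TABLE",
    "-view": "VIEW",
    "-database": "DATABASE",
}


def type_aliases_for(entry_type_id: str) -> List[str]:
    """Return the aliases whose marker appears in the entry type ID."""
    return [
        alias
        for marker, alias in _ALIAS_BY_MARKER.items()
        if marker in entry_type_id
    ]


for entry_type_id in ["oracle-table", "oracle-view", "oracle-schema"]:
    print(entry_type_id, type_aliases_for(entry_type_id))
```

Entry types without a matching marker, such as `oracle-schema`, get an empty alias list, which matches the behavior of the original function.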

In [None]:
# Creates Dataplex Aspect Types

def create_aspect_type(
    project_id: str, location: str, aspect_type_id: str,
) -> dataplex_v1.AspectType:
    """Create Aspect Type identified by aspect_type_id, located in project_id, location """

    with dataplex_v1.CatalogServiceClient() as client:
        # The resource name of the Aspect Type location
        parent = f"projects/{project_id}/locations/{location}"

        # Define the Aspect Type resource.
        # It requires a metadata_template (a JSON schema) to define the
        # properties of the Aspect.

        aspect_field = dataplex_v1.AspectType.MetadataTemplate(
            name="name_of_the_field",
            # Metadata Template is a recursive structure:
            # primitive types such as "string" or "integer" indicate a leaf node,
            # complex types such as "record" or "array" require a nested Metadata Template.
            type="string",
            index=1,
            annotations=dataplex_v1.AspectType.MetadataTemplate.Annotations(
                description="description of the field"
            ),
            constraints=dataplex_v1.AspectType.MetadataTemplate.Constraints(
                # Specifies whether the field is required in the Aspect Type.
                required=False
            ),
        )

        aspect_fields = [aspect_field]

        aspect_type = dataplex_v1.AspectType(
            description=f"description of aspect type {aspect_type_id}",
            metadata_template=dataplex_v1.AspectType.MetadataTemplate(
                name="name_of_the_template",
                type="record",
                # Aspect Type fields, that themselves are Metadata Templates.
                record_fields=aspect_fields,
            ),
        )

        create_operation = client.create_aspect_type(
            parent=parent,
            aspect_type=aspect_type,
            aspect_type_id=aspect_type_id
        )

        # Dataplex operations are long-running, so we wait for the result.
        # The timeout is set to 30 seconds.
        print("Waiting for creation operation to complete...")
        try:
            response = create_operation.result(timeout=30)
        except Exception as e:
            print(f"Error during Aspect Type creation: {e}")
            raise

        print(
            f"Successfully created Aspect Type: projects/{project_id}/locations/{location}/aspectTypes/{aspect_type_id}"
        )
        return response

In [None]:
# Create required entry types, aspect types, and entry group for the metadata import file

# Oracle Dataplex metadata types found in the file
entry_types = ['oracle-instance','oracle-database','oracle-schema','oracle-table','oracle-view']
aspect_types = ['oracle-instance','oracle-database','oracle-schema','oracle-table','oracle-view']
entry_group_id = "oracle"

successCount = 0

# create the entry group
try:
  create_entry_group(project_id, dataplex_location, entry_group_id)
  successCount += 1
except Exception as e:
  print(f"Exception creating entry group {entry_group_id}: {e}")

# Create entry types
for et in entry_types:
  try:
    create_entry_type(project_id, dataplex_location, et)
    successCount += 1
  except Exception as e:
    print(f"Exception creating entry type {et}: {e}")

# Create aspect types
for at in aspect_types:
  try:
    create_aspect_type(project_id, "global", at)
    successCount += 1
  except Exception as e:
    print(f"Exception creating aspect type {at}: {e}")

if successCount == len(entry_types) + len(aspect_types) + 1:
  print("All dataplex metadata hierarchy types created successfully")
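
Re-running the cell above will report an exception for every resource that already exists. One way to make re-runs easier to read is to classify each outcome as created, already existing, or failed. The sketch below is a hypothetical helper (`run_creations` is not part of this notebook) and takes the "already exists" exception types as a parameter so that it stays independent of any client library.

```python
from typing import Callable, Iterable, List, Tuple, Type


def run_creations(
    tasks: Iterable[Tuple[str, Callable[[], object]]],
    already_exists_errors: Tuple[Type[BaseException], ...] = (),
) -> Tuple[List[str], List[str], List[str]]:
    """Run each creation task and sort its label into created / existing / failed."""
    created: List[str] = []
    existing: List[str] = []
    failed: List[str] = []
    for label, task in tasks:
        try:
            task()
            created.append(label)
        except already_exists_errors:
            # The resource was created by an earlier run; treat as success.
            existing.append(label)
        except Exception as exc:
            print(f"Exception creating {label}: {exc}")
            failed.append(label)
    return created, existing, failed
```

In this notebook the tasks could be built as `("entry type oracle-table", lambda: create_entry_type(project_id, dataplex_location, "oracle-table"))`, with `already_exists_errors=(google.api_core.exceptions.AlreadyExists,)`. That assumes the Dataplex client surfaces duplicates as `AlreadyExists`, which is worth confirming against the actual error printed on a second run.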

### <font color='#4285f4'>Import the metadata Import file into Dataplex Universal Catalog</font>

https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.metadataJobs/create

Replace the following:
- `project_id`: your Google Cloud project ID
- `dataplex_location`: the target location for the metadata in the Dataplex catalog

```
curl -X POST -H 'Content-Type: application/json; charset=utf-8' -H "Authorization: Bearer $(gcloud -q auth print-access-token)" \
-d @metadata_import_request.json \
"https://dataplex.googleapis.com/v1/projects/${project_id}/locations/${dataplex_location}/metadataJobs?metadataJobId=a001"
```

In [None]:
json_request_file_url = f"{governed_data_code_bucket}/oracle_exports/metadata_import_request.json"
print(f"Downloading the import request file from {json_request_file_url}")

!gsutil cp "$json_request_file_url" .
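
The contents of `metadata_import_request.json` are not shown in this notebook. As a rough sketch, an import request is assumed to follow the `metadataJobs.create` request body: a job of type `IMPORT` with an import spec that points at the Cloud Storage folder and lists the entry group, entry types, and aspect types in scope. The function name `build_import_request`, the sync-mode values, and all resource names below are illustrative assumptions, not the values used by the downloaded file.

```python
import json
from typing import Dict, List


def build_import_request(
    source_storage_uri: str,
    entry_group: str,
    entry_types: List[str],
    aspect_types: List[str],
) -> Dict:
    """Assemble a metadata import job body (assumed shape, see lead-in)."""
    return {
        "type": "IMPORT",
        "import_spec": {
            "source_storage_uri": source_storage_uri,
            # Assumed sync modes; check the downloaded file for the real values.
            "entry_sync_mode": "FULL",
            "aspect_sync_mode": "INCREMENTAL",
            "scope": {
                "entry_groups": [entry_group],
                "entry_types": entry_types,
                "aspect_types": aspect_types,
            },
        },
    }


# Placeholder resource names for illustration only.
request = build_import_request(
    "gs://my-bucket/oracle_exports/",
    "projects/my-project/locations/us-central1/entryGroups/oracle",
    ["projects/my-project/locations/us-central1/entryTypes/oracle-table"],
    ["projects/my-project/locations/global/aspectTypes/oracle-table"],
)
print(json.dumps(request, indent=2))
```

Comparing this shape with the downloaded file (for example with `!cat metadata_import_request.json`) is a quick way to confirm which entry group and types the import expects to exist.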

In [None]:
%%bash
BATCH_ID=$( date '+%d%H%M%S' )
echo $BATCH_ID
curl -X POST -H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json; charset=utf-8" \
-d @./metadata_import_request.json \
"https://dataplex.googleapis.com/v1/projects/${project_id}/locations/global/metadataJobs?metadataJobId=a${BATCH_ID}"

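Because the import runs asynchronously, the curl call returns before the metadata is loaded. A minimal polling sketch is shown here; it assumes the job resource returned by a GET on the metadata job exposes its state under `status.state`, with terminal values such as `SUCCEEDED` and `FAILED`. The function `wait_for_metadata_job` and its parameters are hypothetical, and the job lookup is injected as a callable so the loop itself has no network dependency.

```python
import time
from typing import Callable, Dict

# Assumed terminal states of a metadata job.
TERMINAL_STATES = {"SUCCEEDED", "SUCCEEDED_WITH_ERRORS", "FAILED", "CANCELED"}


def wait_for_metadata_job(
    fetch_job: Callable[[], Dict],
    poll_seconds: float = 30.0,
    max_polls: int = 40,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll fetch_job() until the job reaches a terminal state and return it."""
    for _ in range(max_polls):
        job = fetch_job()
        state = job.get("status", {}).get("state", "STATE_UNSPECIFIED")
        print(f"Metadata job state: {state}")
        if state in TERMINAL_STATES:
            return state
        sleep(poll_seconds)
    raise TimeoutError(f"Job did not finish after {max_polls} polls")
```

In this notebook, `fetch_job` could be `lambda: restAPIHelper(job_url, "GET", None)`, where `job_url` is the `metadataJobs` URL from the curl call with the job ID (the `a${BATCH_ID}` value echoed by the bash cell) appended as a path segment instead of the query parameter. The defaults of 30 seconds and 40 polls allow roughly 20 minutes, which leaves headroom over the approximately 8 minute run time.
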
### <font color='#4285f4'>Clean Up</font>

In [None]:
# Placeholder

### <font color='#4285f4'>Reference Links</font>


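- [Dataplex REST reference: projects.locations.metadataJobs.create](https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.metadataJobs/create)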