## Test script

In [None]:
import os
import json
import glob
from dotenv import load_dotenv  

# --- Load Environment Variables ---
load_dotenv()  

from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import OpenMetadataConnection
from metadata.generated.schema.security.client.openMetadataJWTClientConfig import OpenMetadataJWTClientConfig
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.generated.schema.api.services.createDatabaseService import CreateDatabaseServiceRequest

from config import hostPort, TOKEN

In [None]:
# Connection settings for the OpenMetadata server.
# The JWT is read from the OM_JWT_TOKEN environment variable; when that is
# unset or empty, fall back to the TOKEN imported from config rather than
# sending a literal placeholder string that can never authenticate.
server_config = OpenMetadataConnection(
    hostPort=hostPort,
    authProvider="openmetadata",
    securityConfig=OpenMetadataJWTClientConfig(
        jwtToken=os.getenv("OM_JWT_TOKEN") or TOKEN,
    ),
)

# Client handle used by the following cells to create/update entities.
metadata = OpenMetadata(server_config)

def clean_connection_config(connection_dict):
    """
    Strip read-only 'supports...' flags from a service connection config.

    The API rejects these flags during service creation, so they are removed
    from the "config" entry before the connection is sent back.

    Args:
        connection_dict: Connection mapping whose settings live under the
            "config" key. It is modified in place.

    Returns:
        The same ``connection_dict`` object. Input that is not a dict, or
        whose "config" entry is not a dict (for example ``null`` in a JSON
        export), is returned untouched instead of raising.
    """
    if not isinstance(connection_dict, dict):
        return connection_dict

    config = connection_dict.get("config")
    if not isinstance(config, dict):
        return connection_dict

    # Collect the matching keys first: a dict must not change size while it
    # is being iterated.
    read_only_keys = [
        key for key in config
        if isinstance(key, str) and key.startswith("supports")
    ]
    for key in read_only_keys:
        del config[key]
    return connection_dict

def inject_secrets(service_name, connection_dict):
    """
    Replace a masked password with the real secret from the environment.

    The secret is read from the environment variable
    ``<SERVICE_NAME_UPPERCASE>_PASSWORD``, where hyphens and spaces in the
    service name are converted to underscores.

    Args:
        service_name: Name of the service; used to derive the variable name.
        connection_dict: Connection mapping. The password is expected at
            ``connection_dict["config"]["authType"]["password"]`` and is
            overwritten in place when a secret is found.

    Returns:
        The same ``connection_dict`` object. It is returned unchanged when
        there is no dict-valued "authType" holding a "password" key, or when
        the environment variable is unset or empty (a warning is printed in
        the latter case).
    """
    if not isinstance(connection_dict, dict):
        return connection_dict

    config = connection_dict.get("config")
    if not isinstance(config, dict):
        return connection_dict

    auth_type = config.get("authType")
    # Only a mapping can carry a "password" entry. A None value would make
    # the membership test raise, and a plain string would turn it into a
    # substring match followed by a failing item assignment.
    if not isinstance(auth_type, dict) or "password" not in auth_type:
        return connection_dict

    # Clean up the service name so it forms a usable variable name
    # (hyphens and spaces become underscores).
    safe_name = service_name.replace("-", "_").replace(" ", "_").upper()
    env_var_name = f"{safe_name}_PASSWORD"

    real_password = os.getenv(env_var_name)
    if real_password:
        auth_type["password"] = real_password
        print(f"  [Info] Successfully injected secret from '{env_var_name}'")
    else:
        print(f"  [Warning] No secret found for '{env_var_name}' in .env file. Masked password sent.")

    return connection_dict

In [None]:
# Locate every service definition (*.json) in the config folder.
directory_path = "./services_configs"
search_pattern = os.path.join(directory_path, "*.json")
# glob returns matches in arbitrary, filesystem-dependent order; sort them so
# the files are processed in the same order on every run.
json_files = sorted(glob.glob(search_pattern))


print(f"Found {len(json_files)} JSON files. Starting import...")



In [None]:

# Service definition imported by this cell.
file_path = "./services_configs/service_iceberg.json"

# Read as UTF-8 explicitly so parsing does not depend on the platform's
# default text encoding.
with open(file_path, "r", encoding="utf-8") as f:
    data = json.load(f)

# Pull out the fields needed to build the create request.
name = data.get("name")
service_type = data.get("serviceType")
description = data.get("description", "")
connection = data.get("connection", {})


# Drop the read-only flags first, then swap the masked password for the
# real secret taken from the environment.
clean_connection = clean_connection_config(connection)
secure_connection = inject_secrets(name, clean_connection)



In [None]:
# Assemble the create request from the values parsed out of the JSON file.
request = CreateDatabaseServiceRequest(
    connection=secure_connection,
    description=description,
    serviceType=service_type,
    name=name,
)

# Send the request through the client and report the resulting service's
# fully qualified name.
service_entity = metadata.create_or_update(request)
print(f"  [Success] Created/Updated service: {service_entity.fullyQualifiedName.root}")


## EXPLORE OPENMETADATA

In [None]:
#! pip install "openmetadata-ingestion~=1.11.9"

In [1]:
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
    OpenMetadataConnection,
)
from metadata.generated.schema.security.client.openMetadataJWTClientConfig import (
    OpenMetadataJWTClientConfig,
)

  import pkg_resources


In [2]:

from metadata.sdk import configure
from config import TOKEN, hostPort

# Configure the metadata.sdk client with the host and token from config.
configure(jwt_token=TOKEN, host=hostPort)


<metadata.sdk.client.OpenMetadata at 0x7769561d7dd0>

### SERVICES

##### Get Database Service

In [None]:
from metadata.sdk.entities import DatabaseServices

# List all with auto-pagination and dump each service to its own JSON file.
all_services = DatabaseServices.list_all(batch_size=100)
for service in all_services:
    # Build the name outside the f-string: reusing the enclosing quote
    # character inside an f-string expression is a SyntaxError before
    # Python 3.12.
    service_fqn = service.fullyQualifiedName.model_dump_json().strip('"')
    # Write as UTF-8 explicitly so the output does not depend on the
    # platform's default text encoding.
    with open(f"files/service_{service_fqn}.json", "w", encoding="utf-8") as f:
        f.write(service.model_dump_json(indent=4))

##### Create and Import Database Service

In [None]:
from metadata.generated.schema.api.services.createDatabaseService import CreateDatabaseServiceRequest


# Example request describing a Snowflake database service.
# NOTE(review): the "***" password looks like a masked placeholder — confirm
# a real secret is supplied before running this against a live server.
request = CreateDatabaseServiceRequest(
    serviceType="Snowflake",
    name="snowflake_prod",
    displayName="Snowflake Production",
    description="Production Snowflake data warehouse",
    connection={
        "config": {
            "account": "xy12345.us-east-1",
            "username": "ingestion_user",
            "password": "***",
            "warehouse": "COMPUTE_WH",
            "database": "ANALYTICS",
        },
    },
)

# Submit the request and show the fully qualified name of the result.
service = DatabaseServices.create(request)
print(f"Created: {service.fullyQualifiedName}")

### DATABASES

In [None]:
from metadata.sdk.entities import Databases

### TABLES

In [3]:
from metadata.sdk.entities import Tables

In [None]:
# Fetch every table (auto-paginated, 100 per batch) and print each one's
# fully qualified name.
all_tables = Tables.list_all(batch_size=100)
for table in all_tables:
    print(str(table.fullyQualifiedName))

In [5]:
def _export_table(table_name):
    """Retrieve one table by fully qualified name and dump it to tables/<name>.json."""
    table = Tables.retrieve_by_name(table_name)
    # Dots in the fully qualified name become underscores in the file name.
    # UTF-8 is set explicitly so the output does not depend on the
    # platform's default text encoding.
    with open(f"tables/{table_name.replace('.', '_')}.json", "w", encoding="utf-8") as f:
        f.write(table.model_dump_json(indent=4))


# Get by fully qualified name
layers = ["bronze", "silver"]
cdr_type = ["sms", "data", "voice"]
# Each CDR type has these two KPI tables in the gold layer.
gold_suffixes = ["daily_global_kpis", "daily_tower_kpis"]

# Bronze and silver: one table per CDR type in each layer.
for layer in layers:
    for cdr in cdr_type:
        _export_table(f"iceberg.iceberg.{layer}.{cdr}")

# Gold: the global KPI table first, then the tower KPI table, per CDR type.
for cdr in cdr_type:
    for suffix in gold_suffixes:
        _export_table(f"iceberg.iceberg.gold.{cdr}_{suffix}")


### PIPELINES

In [None]:
from metadata.sdk.entities import  Pipelines