In [2]:
import requests

# Configuration
BASE_URL = "http://127.0.0.1:8000"  # FastAPI server URL

def test_create_domain(timeout=10):
    """Create the "finance" domain and return the id reported by the server.

    POSTs a fixed payload to ``{BASE_URL}/domains`` and asserts a 201 answer.

    Args:
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The ``domainId`` field of the JSON response body.

    Raises:
        AssertionError: If the response status code is not 201.
        requests.exceptions.Timeout: If the server does not answer in time.
    """
    url = f"{BASE_URL}/domains"
    payload = {
        "domainName": "finance",
        "description": "Dom√≠nio de Finan√ßas"
    }
    # requests has no default timeout; without one a stuck server blocks forever.
    response = requests.post(url, json=payload, timeout=timeout)
    assert response.status_code == 201, f"Erro: {response.status_code} - {response.text}"
    print("‚úÖ test_create_domain passou!")
    return response.json()["domainId"]

def test_list_domains(timeout=10):
    """Fetch the domain listing and return the decoded JSON body.

    GETs ``{BASE_URL}/domains`` and asserts a 200 answer.

    Args:
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The decoded JSON response body.

    Raises:
        AssertionError: If the response status code is not 200.
        requests.exceptions.Timeout: If the server does not answer in time.
    """
    url = f"{BASE_URL}/domains"
    # requests has no default timeout; without one a stuck server blocks forever.
    response = requests.get(url, timeout=timeout)
    assert response.status_code == 200, f"Erro: {response.status_code} - {response.text}"
    print("‚úÖ test_list_domains passou!")
    return response.json()

def test_get_domain(domain_id, timeout=10):
    """Fetch a single domain and return the decoded JSON body.

    GETs ``{BASE_URL}/domains/{domain_id}`` and asserts a 200 answer.

    Args:
        domain_id: Identifier placed in the URL path.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The decoded JSON response body.

    Raises:
        AssertionError: If the response status code is not 200.
        requests.exceptions.Timeout: If the server does not answer in time.
    """
    url = f"{BASE_URL}/domains/{domain_id}"
    # requests has no default timeout; without one a stuck server blocks forever.
    response = requests.get(url, timeout=timeout)
    assert response.status_code == 200, f"Erro: {response.status_code} - {response.text}"
    print("‚úÖ test_get_domain passou!")
    return response.json()

def test_create_subdomain(domain_id, timeout=10):
    """Create the "billing" subdomain under a domain and return its id.

    POSTs a fixed payload to ``{BASE_URL}/domains/{domain_id}/subdomains`` and
    asserts a 201 answer.

    Args:
        domain_id: Identifier of the parent domain, placed in the URL path.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The ``subDomainId`` field of the JSON response body.

    Raises:
        AssertionError: If the response status code is not 201.
        requests.exceptions.Timeout: If the server does not answer in time.
    """
    url = f"{BASE_URL}/domains/{domain_id}/subdomains"
    payload = {
        "subDomainName": "billing",
        "description": "Subdom√≠nio de Cobran√ßa"
    }
    # requests has no default timeout; without one a stuck server blocks forever.
    response = requests.post(url, json=payload, timeout=timeout)
    assert response.status_code == 201, f"Erro: {response.status_code} - {response.text}"
    print("‚úÖ test_create_subdomain passou!")
    return response.json()["subDomainId"]

def test_list_subdomains(domain_id, timeout=10):
    """Fetch the subdomain listing of a domain and return the decoded JSON body.

    GETs ``{BASE_URL}/domains/{domain_id}/subdomains`` and asserts a 200 answer.

    Args:
        domain_id: Identifier of the parent domain, placed in the URL path.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The decoded JSON response body.

    Raises:
        AssertionError: If the response status code is not 200.
        requests.exceptions.Timeout: If the server does not answer in time.
    """
    url = f"{BASE_URL}/domains/{domain_id}/subdomains"
    # requests has no default timeout; without one a stuck server blocks forever.
    response = requests.get(url, timeout=timeout)
    assert response.status_code == 200, f"Erro: {response.status_code} - {response.text}"
    print("‚úÖ test_list_subdomains passou!")
    return response.json()

def test_create_catalog(subdomain_id, domain_id="finance", timeout=10):
    """Create the "finance_catalog" catalog under a subdomain and return its id.

    POSTs a fixed payload to
    ``{BASE_URL}/domains/{domain_id}/subdomains/{subdomain_id}/catalogs`` and
    asserts a 201 answer.

    Args:
        subdomain_id: Identifier of the parent subdomain, placed in the URL path.
        domain_id: Domain segment of the URL path. Defaults to ``"finance"``,
            the value that used to be hard-coded, so existing one-argument
            calls build the same URL as before. Callers that hold the id of
            the created domain can now pass it explicitly.
            NOTE(review): whether the server expects the domain id or the
            domain name in this segment is not visible here; confirm.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The ``catalogId`` field of the JSON response body.

    Raises:
        AssertionError: If the response status code is not 201.
        requests.exceptions.Timeout: If the server does not answer in time.
    """
    url = f"{BASE_URL}/domains/{domain_id}/subdomains/{subdomain_id}/catalogs"
    payload = {
        "catalogName": "finance_catalog",
        "description": "Cat√°logo para dados financeiros"
    }
    # requests has no default timeout; without one a stuck server blocks forever.
    response = requests.post(url, json=payload, timeout=timeout)
    assert response.status_code == 201, f"Erro: {response.status_code} - {response.text}"
    print("‚úÖ test_create_catalog passou!")
    return response.json()["catalogId"]

def test_create_schema(catalog_id, timeout=10):
    """Create the "finance_schema" schema under a catalog and return its id.

    POSTs a fixed payload to ``{BASE_URL}/catalogs/{catalog_id}/schemas`` and
    asserts a 201 answer.

    Args:
        catalog_id: Identifier of the parent catalog, placed in the URL path.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The ``schemaId`` field of the JSON response body.

    Raises:
        AssertionError: If the response status code is not 201.
        requests.exceptions.Timeout: If the server does not answer in time.
    """
    url = f"{BASE_URL}/catalogs/{catalog_id}/schemas"
    payload = {
        "schemaName": "finance_schema",
        "description": "Schema para tabelas financeiras"
    }
    # requests has no default timeout; without one a stuck server blocks forever.
    response = requests.post(url, json=payload, timeout=timeout)
    assert response.status_code == 201, f"Erro: {response.status_code} - {response.text}"
    print("‚úÖ test_create_schema passou!")
    return response.json()["schemaId"]

def test_create_table(schema_id, timeout=10):
    """Create the "transactions" table under a schema and return its id.

    POSTs a fixed payload (name, location and format) to
    ``{BASE_URL}/schemas/{schema_id}/tables`` and asserts a 201 answer.

    Args:
        schema_id: Identifier of the parent schema, placed in the URL path.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The ``tableId`` field of the JSON response body.

    Raises:
        AssertionError: If the response status code is not 201.
        requests.exceptions.Timeout: If the server does not answer in time.
    """
    url = f"{BASE_URL}/schemas/{schema_id}/tables"
    payload = {
        "tableName": "transactions",
        "location": "s3://data-lake/transactions",
        "format": "iceberg"
    }
    # requests has no default timeout; without one a stuck server blocks forever.
    response = requests.post(url, json=payload, timeout=timeout)
    assert response.status_code == 201, f"Erro: {response.status_code} - {response.text}"
    print("‚úÖ test_create_table passou!")
    return response.json()["tableId"]

def run_tests():
    """Run every API check in order, passing each created id on to the next step."""
    # Domain level: create one, then exercise the list and single-item reads.
    new_domain = test_create_domain()
    test_list_domains()
    test_get_domain(new_domain)

    # Subdomain level: create one under the new domain, then list them.
    new_subdomain = test_create_subdomain(new_domain)
    test_list_subdomains(new_domain)

    # Catalog -> schema -> table: each creation needs the id of its parent.
    new_catalog = test_create_catalog(new_subdomain)
    new_schema = test_create_schema(new_catalog)
    test_create_table(new_schema)

    print("\n‚úÖ Todos os testes passaram!")

# Run the full test sequence only when this module is the entry point
# (i.e. executed directly rather than imported).
if __name__ == "__main__":
    run_tests()


‚úÖ test_create_domain passou!
‚úÖ test_list_domains passou!
‚úÖ test_get_domain passou!
‚úÖ test_create_subdomain passou!
‚úÖ test_list_subdomains passou!
‚úÖ test_create_catalog passou!
‚úÖ test_create_schema passou!
‚úÖ test_create_table passou!

‚úÖ Todos os testes passaram!


In [4]:
import requests

# Configuration
BASE_URL = "http://127.0.0.1:8000"  # FastAPI server URL

def _fetch_json(url, timeout):
    """GET ``url`` and return its decoded JSON body, raising on HTTP error statuses."""
    response = requests.get(url, timeout=timeout)
    # Surface the real HTTP failure here instead of a confusing KeyError/TypeError
    # later, when an error payload would be iterated as if it were a resource list.
    response.raise_for_status()
    return response.json()


def test_get_all_resources(timeout=10):
    """Fetch and print the whole resource tree exposed by the API.

    Walks domains -> subdomains -> catalogs -> schemas -> tables, issuing one
    GET per parent resource and printing one indented line per item found.

    Args:
        timeout: Seconds to wait for each individual request before giving up.

    Raises:
        requests.exceptions.HTTPError: If any listing request returns a 4xx/5xx
            status.
        requests.exceptions.Timeout: If the server does not answer in time.
    """
    print("\n--- Listando Todos os Recursos ---\n")

    # Domains
    domains = _fetch_json(f"{BASE_URL}/domains", timeout)
    print("üåê Dom√≠nios:")
    for domain in domains:
        print(f"  - ID: {domain['domainId']}, Nome: {domain['domainName']}, Descri√ß√£o: {domain.get('description')}")

        # Subdomains of this domain
        subdomains = _fetch_json(f"{BASE_URL}/domains/{domain['domainId']}/subdomains", timeout)
        print("    üìÇ Subdom√≠nios:")
        for subdomain in subdomains:
            print(f"      - ID: {subdomain['subDomainId']}, Nome: {subdomain['subDomainName']}, Descri√ß√£o: {subdomain.get('description')}")

            # Catalogs of this subdomain
            catalogs = _fetch_json(
                f"{BASE_URL}/domains/{domain['domainId']}/subdomains/{subdomain['subDomainId']}/catalogs",
                timeout,
            )
            print("        üìö Cat√°logos:")
            for catalog in catalogs:
                print(f"          - ID: {catalog['catalogId']}, Nome: {catalog['catalogName']}, Descri√ß√£o: {catalog.get('description')}")

                # Schemas of this catalog
                schemas = _fetch_json(f"{BASE_URL}/catalogs/{catalog['catalogId']}/schemas", timeout)
                print("            üóÇ Schemas:")
                for schema in schemas:
                    print(f"              - ID: {schema['schemaId']}, Nome: {schema['schemaName']}, Descri√ß√£o: {schema.get('description')}")

                    # Tables of this schema
                    tables = _fetch_json(f"{BASE_URL}/schemas/{schema['schemaId']}/tables", timeout)
                    print("                üìä Tabelas:")
                    for table in tables:
                        print(f"                  - ID: {table['tableId']}, Nome: {table['tableName']}, Localiza√ß√£o: {table.get('location')}, Formato: {table.get('format')}")
    print("\n--- Fim da Listagem ---")

if __name__ == "__main__":
    # Run the listing of all resources
    test_get_all_resources()



--- Listando Todos os Recursos ---

üåê Dom√≠nios:
  - ID: dom-dea4a9, Nome: finance, Descri√ß√£o: Dom√≠nio de Finan√ßas
    üìÇ Subdom√≠nios:
      - ID: sd-15def8, Nome: billing, Descri√ß√£o: Subdom√≠nio de Cobran√ßa
        üìö Cat√°logos:
          - ID: cat-a5c553, Nome: finance_catalog, Descri√ß√£o: Cat√°logo para dados financeiros
            üóÇ Schemas:
              - ID: sch-70d13a, Nome: finance_schema, Descri√ß√£o: Schema para tabelas financeiras
                üìä Tabelas:
                  - ID: tbl-89c031, Nome: transactions, Localiza√ß√£o: s3://data-lake/transactions, Formato: iceberg

--- Fim da Listagem ---
