# ADE metadata with DuckDB
This script fetches metadata from the ADE Metadata Interface and loads it into a local DuckDB database.

## Pre-requisites
- Install the required libraries listed in requirements.txt
- Install and configure the ADE External API CLI, as described in README.md

## Imports and parameters

In [2]:
import duckdb
import subprocess
import os
import json

In [3]:
# Tenant-specific settings: replace these with your own tenant values.
# They must match the values the ADE External API CLI has been configured with,
# as described in the pre-requisites.
ade_tenant, ade_installation, ade_environment = "a9000000", "datahub", "dev"

# Distinguishes the "saas" and "saasdev" API hosts. In most cases, leave it unchanged.
saas_scope_identifier = "saasdev"
base_url = "https://external.services.{}.agiledataengine.com".format(saas_scope_identifier)

## Metadata interface queries

In [4]:
# GraphQL queries for the ADE Metadata Interface, one per metadata object.
# Each query fetches one page of up to 1000 rows; the literal "<pageMarker>"
# is replaced with the page number (0, 1, 2, ...) before the query is sent.
# Every query requests `hasNextPage`, which drives the paging loop, and the
# row fields under `content`. The root field is always named
# `<object>_list`, because the response is read back under that key.
package_list_query = "query{package_list(size: 1000, page: <pageMarker>)  { page hasNextPage pageSize content { ... on PACKAGE { id name version is_deleted type status status_updated status_updated_user description meta_created meta_created_user meta_updated meta_updated_user} } }  }"
entity_list_query = "query{entity_list(size: 1000, page: <pageMarker>)  { page hasNextPage pageSize content { ... on ENTITY { id package_id schema name logical_name type physical_type is_recreate_forced is_deleted, description zone source_id meta_created meta_created_user meta_updated meta_updated_user} } }  }"
entity_attribute_list_query = "query{entity_attribute_list(size: 1000, page: <pageMarker>)  { page hasNextPage pageSize content { ... on ENTITY_ATTRIBUTE { id entity_id position is_deleted is_nullable name data_type data_precision data_scale data_length description technical_attribute_type collation_value masking_policy compression_type meta_created meta_created_user meta_updated meta_updated_user } } }  }"
# NOTE(review): unlike most queries, this one does not request meta_created_user — confirm whether intentional.
entity_load_list = "query{entity_load_list(size: 1000, page: <pageMarker>)  { page hasNextPage pageSize content { ... on ENTITY_LOAD { id entity_id name type description is_enabled schedule_id meta_created meta_updated meta_updated_user } } }  }"
entity_load_mapping_list = "query{entity_load_mapping_list(size: 1000, page: <pageMarker>)  { page hasNextPage pageSize content { ... on ENTITY_LOAD_MAPPING { id entity_load_id source_entity_id name position type meta_created meta_created_user meta_updated meta_updated_user} } }  }"
entity_load_mapping_source_attribute_list = "query{entity_load_mapping_source_attribute_list(size: 1000, page: <pageMarker>)  { page hasNextPage pageSize content { ... on ENTITY_LOAD_MAPPING_SOURCE_ATTRIBUTE { id entity_load_mapping_target_attribute_id position source_entity_attribute_id meta_created meta_created_user meta_updated meta_updated_user} } }  }"
# NOTE(review): unlike most queries, this one does not request meta_created — confirm whether intentional.
entity_load_mapping_target_attribute_list = "query{entity_load_mapping_target_attribute_list(size: 1000, page: <pageMarker>)  { page hasNextPage pageSize content { ... on ENTITY_LOAD_MAPPING_TARGET_ATTRIBUTE { id entity_load_mapping_id target_entity_attribute_id transformation_formula_type transformation_formula meta_created_user meta_updated meta_updated_user} } }  }"
source_list = "query{source_list(size: 1000, page: <pageMarker>)  { page hasNextPage pageSize content { ... on SOURCE { id name is_deleted description } } }  }"
entity_load_step_list = "query{entity_load_step_list(size: 1000, page: <pageMarker>)  { page hasNextPage pageSize content { ... on ENTITY_LOAD_STEP { id entity_load_id position name type language text_step meta_created meta_created_user meta_updated meta_updated_user } } }  }"
entity_load_opt_list = "query{entity_load_opt_list(size: 1000, page: <pageMarker>)  { page hasNextPage pageSize content { ... on ENTITY_LOAD_OPT { id entity_load_id type name value meta_created meta_created_user meta_updated meta_updated_user } } }  }"
schedule_list = "query{schedule_list(size: 1000, page: <pageMarker>)  { page hasNextPage pageSize content { ... on SCHEDULE { id name cron_expr description } } }  }"
# Workflow objects are identified by name (no `id` field is requested);
# workflow_load_usage links workflow_name to workflow_load_name.
workflow_list = "query{workflow_list(size: 1000, page: <pageMarker>)  { page hasNextPage pageSize content { ... on WORKFLOW { name cron_expr is_custom generation_mode load_pool schedule_id description } } }  }"
workflow_load_list = "query{workflow_load_list(size: 1000, page: <pageMarker>)  { page hasNextPage pageSize content { ... on WORKFLOW_LOAD { name entity_load_id } } }  }"
workflow_load_usage_list = "query{workflow_load_usage_list(size: 1000, page: <pageMarker>)  { page hasNextPage pageSize content { ... on WORKFLOW_LOAD_USAGE { workflow_name workflow_load_name } } }  }"

## Fetch data from metadata interface

In [None]:
def run_cli_commands(file_path, ade_metadata_object, query):
    """Download every page of one metadata object with the ADE External API CLI.

    Each page is written by the CLI to
    ``<file_path>/<ade_metadata_object>/<ade_metadata_object>_<page>.json``,
    starting at page 0. Fetching continues while the response reports
    ``hasNextPage``. Page files left over from an earlier run are deleted
    first, so the directory only ever holds the pages of this run.

    Uses the module-level settings ``base_url``, ``ade_tenant``,
    ``ade_installation`` and ``ade_environment``.

    Args:
        file_path: Root export directory.
        ade_metadata_object: Metadata object name, e.g. ``"entity"``. The
            response is read under the ``<ade_metadata_object>_list`` key.
        query: GraphQL query containing a ``<pageMarker>`` placeholder that is
            replaced with the page number.

    Raises:
        subprocess.CalledProcessError: If the CLI exits with a non-zero status.
        RuntimeError: If a response lacks the expected ``<object>_list`` payload
            (e.g. the interface returned only GraphQL errors).
    """
    target_path = f"{file_path}/{ade_metadata_object}"
    os.makedirs(target_path, exist_ok=True)

    # Remove page files from an earlier run. If the object now has fewer pages
    # than before, the stale trailing pages would otherwise still be merged in
    # when all *.json files of this directory are loaded.
    prefix = f"{ade_metadata_object}_"
    for name in os.listdir(target_path):
        stem = name[len(prefix):-len(".json")]
        if name.startswith(prefix) and name.endswith(".json") and stem.isdecimal():
            os.remove(f"{target_path}/{name}")

    key = f"{ade_metadata_object}_list"
    page = 0
    while True:
        out_file = f"{target_path}/{ade_metadata_object}_{page}.json"
        exec_query = query.replace("<pageMarker>", str(page))
        print(exec_query)
        # Use the ADE External API CLI so the credentials configured for it are
        # reused. Arguments are passed as a list (no shell), so the query text
        # reaches the CLI verbatim regardless of quotes or spaces in it.
        # check=True stops here instead of silently reading a stale/missing file.
        subprocess.run(
            [
                "ade",
                "--base-url", base_url,
                "--tenant", ade_tenant,
                "--installation", ade_installation,
                "--out", out_file,
                "metadata-interface", "graphql",
                "--environment-name", ade_environment,
                "--query", exec_query,
            ],
            check=True,
        )

        # JSON is UTF-8 by specification; don't depend on the platform default.
        with open(out_file, "r", encoding="utf-8") as f:
            data = json.load(f)

        # `or {}` also covers an explicit `"data": null` in the response.
        listing = (data.get("data") or {}).get(key)
        if not isinstance(listing, dict):
            raise RuntimeError(
                f"Unexpected response in {out_file}: no '{key}' payload "
                f"(errors: {data.get('errors')})"
            )
        if not listing.get("hasNextPage"):
            break
        page += 1


# Root directory for the raw JSON pages returned by the metadata interface.
path = "metadata_export"

if not os.path.exists(path):
    os.makedirs(path)

# (metadata object, its paged GraphQL query), fetched in this order.
metadata_exports = (
    ("package", package_list_query),
    ("entity", entity_list_query),
    ("entity_attribute", entity_attribute_list_query),
    ("entity_load", entity_load_list),
    ("entity_load_mapping", entity_load_mapping_list),
    ("entity_load_mapping_source_attribute", entity_load_mapping_source_attribute_list),
    ("entity_load_mapping_target_attribute", entity_load_mapping_target_attribute_list),
    ("entity_load_opt", entity_load_opt_list),
    ("entity_load_step", entity_load_step_list),
    ("source", source_list),
    ("schedule", schedule_list),
    ("workflow", workflow_list),
    ("workflow_load", workflow_load_list),
    ("workflow_load_usage", workflow_load_usage_list),
)

for object_name, object_query in metadata_exports:
    run_cli_commands(path, object_name, object_query)


## Load data to DuckDB

In [6]:
def ctas_from_json(table_name, file_path):
    """Create (or replace) DuckDB table ``table_name`` from a JSON file.

    Columns and their types are inferred by DuckDB's ``read_json_auto``. The
    generated statement is printed before it is executed.

    Args:
        table_name: Target table name. It is inserted into the SQL as-is, not
            quoted, so it must be a valid (optionally qualified) identifier.
        file_path: Path to the JSON file. It is embedded as a SQL string
            literal; ``str()`` is applied, so ``pathlib.Path`` also works.
    """
    # Double single quotes so a path such as "O'Brien/export.json" cannot
    # terminate the SQL string literal early.
    escaped_path = str(file_path).replace("'", "''")
    load_data_sql = (
        f"create or replace table {table_name} as from read_json_auto('{escaped_path}')"
    )
    print(load_data_sql)
    duckdb.sql(load_data_sql)

def parse_and_load_content(file_path, ade_metadata_object):
    """Merge the downloaded pages of one metadata object and load them to DuckDB.

    Reads ``<file_path>/<ade_metadata_object>/<ade_metadata_object>_<page>.json``
    in ascending page order and concatenates the
    ``data.<ade_metadata_object>_list.content`` arrays. The result is written to
    ``<file_path>/<ade_metadata_object>.json``, and a DuckDB table named
    ``ade_metadata_object`` is (re)created from that file. A page without the
    expected payload contributes no rows.

    Args:
        file_path: Root export directory used when the pages were fetched.
        ade_metadata_object: Metadata object name, e.g. ``"entity"``.
    """
    source_path = f"{file_path}/{ade_metadata_object}"
    key = f"{ade_metadata_object}_list"
    prefix = f"{ade_metadata_object}_"
    suffix = ".json"

    # Only "<object>_<page>.json" files, sorted numerically so page 10 follows
    # page 9 (plain os.listdir order is arbitrary).
    pages = sorted(
        (int(name[len(prefix):-len(suffix)]), name)
        for name in os.listdir(source_path)
        if name.startswith(prefix)
        and name.endswith(suffix)
        and name[len(prefix):-len(suffix)].isdecimal()
    )

    out_data = []
    for _, file_name in pages:
        # JSON is UTF-8 by specification; don't depend on the platform default.
        with open(f"{source_path}/{file_name}", "r", encoding="utf-8") as f:
            data = json.load(f)
        # `or {}` / `or []` also cover explicit JSON nulls, e.g. "data": null.
        listing = (data.get("data") or {}).get(key) or {}
        out_data.extend(listing.get("content") or [])

    out_file = f"{file_path}/{ade_metadata_object}.json"
    with open(out_file, "w", encoding="utf-8") as f:
        json.dump(out_data, f, indent=4)

    ctas_from_json(ade_metadata_object, out_file)

In [7]:
# Merge the downloaded JSON pages of each metadata object and load them into
# DuckDB, one table per object.
for metadata_object in (
    "package",
    "entity",
    "entity_attribute",
    "entity_load",
    "entity_load_mapping",
    "entity_load_mapping_source_attribute",
    "entity_load_mapping_target_attribute",
    "entity_load_opt",
    "entity_load_step",
    "source",
    "schedule",
    "workflow",
    "workflow_load",
    "workflow_load_usage",
):
    parse_and_load_content(path, metadata_object)

create or replace table package as from read_json_auto('metadata_export/package.json')
create or replace table entity as from read_json_auto('metadata_export/entity.json')
create or replace table entity_attribute as from read_json_auto('metadata_export/entity_attribute.json')
create or replace table entity_load as from read_json_auto('metadata_export/entity_load.json')
create or replace table entity_load_mapping as from read_json_auto('metadata_export/entity_load_mapping.json')
create or replace table entity_load_mapping_source_attribute as from read_json_auto('metadata_export/entity_load_mapping_source_attribute.json')
create or replace table entity_load_mapping_target_attribute as from read_json_auto('metadata_export/entity_load_mapping_target_attribute.json')
create or replace table entity_load_opt as from read_json_auto('metadata_export/entity_load_opt.json')
create or replace table entity_load_step as from read_json_auto('metadata_export/entity_load_step.json')
create or replace t

## Example queries

In [None]:
# Schema and name of every entity that is not marked as deleted
entities_sql = """
select 
    schema,
    name
from entity
where is_deleted = false
"""

duckdb.sql(entities_sql)

In [None]:
# Number of entity attributes (distinct ids; deleted attributes are included)
attribute_count_sql = """
select 
    count(distinct id)
from entity_attribute
"""

duckdb.sql(attribute_count_sql)

In [10]:
# Print the columns (name, data type, position) of every table, one result per table
for (table_name,) in duckdb.sql("select table_name from information_schema.tables").fetchall():
    describe_sql = f"""
    select 
        table_name, 
        column_name,
        data_type,
        ordinal_position
    from information_schema.columns
    where table_name = '{table_name}'
    order by ordinal_position
    """

    print(duckdb.sql(describe_sql))

┌────────────┬────────────────────┬───────────┬──────────────────┐
│ table_name │    column_name     │ data_type │ ordinal_position │
│  varchar   │      varchar       │  varchar  │      int32       │
├────────────┼────────────────────┼───────────┼──────────────────┤
│ entity     │ id                 │ UUID      │                1 │
│ entity     │ package_id         │ UUID      │                2 │
│ entity     │ schema             │ VARCHAR   │                3 │
│ entity     │ name               │ VARCHAR   │                4 │
│ entity     │ logical_name       │ VARCHAR   │                5 │
│ entity     │ type               │ VARCHAR   │                6 │
│ entity     │ physical_type      │ VARCHAR   │                7 │
│ entity     │ is_recreate_forced │ BOOLEAN   │                8 │
│ entity     │ is_deleted         │ BOOLEAN   │                9 │
│ entity     │ zone               │ VARCHAR   │               10 │
│ entity     │ source_id          │ UUID      │               

In [None]:
# Entities with custom SQL: one row per load step that has step text (text_step).
# Output columns are aliased so the load's and the step's `name`/`type` are not
# ambiguous in the result.
sql = """
select 
    e.schema,
    e.name as entity_name,
    el.name as load_name,
    el.type as load_type,
    el.is_enabled,
    els.type as step_type,
    els.language,
    els.text_step
from entity e
join entity_load el on e.id = el.entity_id
join entity_load_step els on el.id = els.entity_load_id
where els.text_step is not null
order by 1, 2
"""

duckdb.sql(sql)

In [12]:
# Export workflows together with the entity loads they run to
# workflows_and_loads.csv (header row, ';' delimiter).
# workflow_load_usage maps workflow_name -> workflow_load_name, and
# workflow_load maps that load name -> entity_load_id.
sql = """
COPY (
    select 
        wlu.workflow_name,
        e.schema,
        e.name as entity_name,
        el.name as load_name
    from entity e
    join entity_load el on e.id = el.entity_id
    join workflow_load wfl on el.id = wfl.entity_load_id
    join workflow_load_usage wlu on wfl.name = wlu.workflow_load_name
    order by 1, 2, 3
) TO 'workflows_and_loads.csv' (HEADER, DELIMITER ';');
"""

duckdb.sql(sql)