In [1]:
import random
import json
import os
from apache_atlas.utils.Constants import EndRelations, TypeNames
from apache_atlas.utils.API import API, HTTPMethod
from apache_atlas.client.ApacheAtlas import ApacheAtlasClient

atlas_section = ApacheAtlasClient(
    "http://10.100.100.61:21000",
    "admin",
    "admin"
)

### Criação das Tabelas

In [None]:
# Executado...

groups_sia ={
    #'AB': 'APAC de Cirurgia Bariátrica',
    'ABO': 'APAC de Acompanhamento Pós Cirurgia Bariátrica',
    'ACF': 'APAC de Confecção de Fístula',
    'AD': 'APAC de Laudos Diversos',
    'AM': 'APAC de Medicamentos',
    'AMP': 'APAC de Acompanhamento Multiprofissional',
    'AN': 'APAC de Nefrologia',
    'AQ': 'APAC de Quimioterapia',
    'AR': 'APAC de Radioterapia',
    'ATD': 'APAC de Tratamento Dialítico',
    'BI': 'Boletim de Produção Ambulatorial individualizado',
    'PA': 'Produção Ambulatorial',
    'PS': 'RAAS Psicossocial',
    'SAD': 'RAAS de Atenção Domiciliar'
}

for acronymus, name in groups_sia.items():
    atlas_section.entity.create_entity_dt_table({
       'name': f"{name} ({acronymus})",
       'acronymus': acronymus,
       'description': f'Tabela de {name} da SIA' 
    }, 'SIA')

### Criação de Colunas

In [None]:
# Executado...

import json
import os

pasta = 'dicionarios'

for arquivo in os.listdir(pasta):
    sigla = arquivo.split('-')[1].split('.')[0]
    caminho_dict = os.path.join(pasta, arquivo)

    atlas_section.entity.create_entity_columns(caminho_dict, sigla)

### Criação de Tabelas Anuais/Mensais

In [None]:
# Executado...

atlas_section.type.create_type([
   {
      "name": TypeNames.MONTLY_TABLE,
      "description": "Representa uma tabela em um mês específico",
      "superTypes": [TypeNames.ANUAL_TABLE],
      "attributeDefs": [
        {
          "name": "month",
          "typeName": "int",
          "isOptional": True,
          "isUnique": False,
          "isIndexable": False
        }
      ]
    },
])

atlas_section.type.create_type(relationshipDefs=[
    {
    "name": "rsh_table_to_columns_time",
    "relationshipCategory": "AGGREGATION",
    "propagateTags": "NONE",
    "endDef1": {
        "type": TypeNames.TABLE,
        "name": EndRelations.END_TABLE_TO_COLUMNS_TIME[0],
        "cardinality": "SET",
        "isContainer": True,
        "isLegacyAttribute": False
    },
    "endDef2": {
        "type": TypeNames.ANUAL_TABLE,
        "name": EndRelations.END_TABLE_TO_COLUMNS_TIME[1],
        "cardinality": "SINGLE",
        "isContainer": False,
        "isLegacyAttribute": False
    }
}, 
])


In [None]:
# Executado ...

url = atlas_section.search.SEARCH_BY_TYPENAME.format_path({
    'typeName': TypeNames.ANUAL_TABLE
})

response = atlas_section.request(url)

filters = list(filter(
    lambda x: x['attributes']['name'].startswith("DO"),
    response['entities']
)) 

table = atlas_section.search.search_table_by_acronymus("DO")

for i in filters:
    i['attributes'][EndRelations.END_TABLE_TO_COLUMNS_TIME[1]] = {
        'guid': table['guid']
    }

atlas_section.entity.create_multiple_entities(filters)

In [None]:
# Executado apenas para o SIA
# Deu problema no AM (AM Altura), AR sei lá oq

import os
import json

pasta = 'data'

for arquivo in os.listdir(pasta):
    if arquivo.endswith('sia.json'):
        caminho_arquivo = os.path.join(pasta, arquivo)

        with open(caminho_arquivo, 'r', encoding='utf-8') as f:
            dados_json = json.load(f)
        
        for acronymus, lineage in dados_json.items():
            atlas_section.lineage.create_lineage_table(lineage, acronymus)

In [None]:
# Executado...

atlas_section.entity.create_database_entity({
    'name': 'Database de Tabelas',
    'description': 'Database apenas para agrupar determinadas tabelas (Ignorar)',
    'acronymus': TypeNames.ACRONYMUS_DATABASE_DTC
}, "DataSUS")

atlas_section.entity.create_entity_dt_table({
    "name": "Tabela de Colunas",
    "description": "Tabela de referência para colunas/arquivos alterados por processos (Ignorar)",
    "acronymus": TypeNames.ACRONYMUS_TABLE_DTC
}, f"{TypeNames.ACRONYMUS_DATABASE_DTC}")

### Criação de DataSET Processing

In [2]:
atlas_section.type.create_type(types=[
    {
      "name": TypeNames.DATASET_PROCESSING_LINEAGE,
      "description": "Representa um cojunto de arquivos de tabelas que passará pro um sequência de processos",
      "superTypes": ["DataSet"],
      "attributeDefs": [
        {
          "name": "files_interval",
          "typeName": f"array<{TypeNames.TABLE_FILE}>",
          "isOptional": True,
          "isUnique": False,
          "isIndexable": False
        },
        {
          "name": "columns",
          "typeName": f"array<{TypeNames.TABLE_COLUMN}>",
          "isOptional": True,
          "isUnique": False,
          "isIndexable": False   
        },
        {
          "name": "table",
          "typeName": f"{TypeNames.TABLE}",
          "isOptional": True,
          "isUnique": False,
          "isIndexable": True
        },
        {
          "name": "id",
          "typeName": f"string",
          "isOptional": False,
          "isUnique": True,
          "isIndexable": True
        },
        {
            "name": "start_time",
            "typeName": "date",
            "isOptional": True,        
            "valuesMinCount": 1,            
            "isIndexable": False,      
          },
          {
            "name": "end_time",
            "typeName": "date",
            "isOptional": True,        
            "valuesMinCount": 1,            
            "isIndexable": False,      
          },
          {
            "name": "output_location",
            "typeName": "string",
            "isOptional": True,                    
            "isUnique": False,
            "isIndexable": True,      
          },
          {
            "name": "records_processed",
            "typeName": "int",
            "isOptional": True,                    
            "isUnique": False,
            "isIndexable": False,      
          },
          {
            "name": "execution_time",
            "typeName": "long",
            "isOptional": True,                    
            "isUnique": False,
            "isIndexable": False,      
          },
          {
            "name": "observation",
            "typeName": "string",
            "isOptional": True,                    
            "isUnique": False,
            "isIndexable": False,      
          },
      ]
    }
])

atlas_section.type.create_type(types=[
    {
      "name": TypeNames.DATASET_PROCESSING_LINEAGE_RESULT,
      "description": "Representa um resultado após um processo",
      "superTypes": ["DataSet"],
      "attributeDefs": [
        {
          "name": "files_interval",
          "typeName": f"array<{TypeNames.TABLE_FILE}>",
          "isOptional": True,
          "isUnique": False,
          "isIndexable": False
        },
        {
          "name": "columns",
          "typeName": f"array<{TypeNames.TABLE_COLUMN}>",
          "isOptional": True,
          "isUnique": False,
          "isIndexable": False   
        }
      ]
    }
])

http://10.100.100.61:21000/api/atlas/v2/types/typedefs
http://10.100.100.61:21000/api/atlas/v2/types/typedefs


{'enumDefs': [],
 'structDefs': [],
 'classificationDefs': [],
 'entityDefs': [{'category': 'ENTITY',
   'guid': '8320510e-8f8d-4248-b5cf-52ca74cab81e',
   'createdBy': 'admin',
   'updatedBy': 'admin',
   'createTime': 1730240709295,
   'updateTime': 1730240709295,
   'version': 1,
   'name': 'dt_result_data_batch',
   'description': 'Representa um resultado após um processo',
   'typeVersion': '1.0',
   'attributeDefs': [{'name': 'files_interval',
     'typeName': 'array<dt_table_file>',
     'isOptional': True,
     'cardinality': 'SINGLE',
     'valuesMinCount': 0,
     'valuesMaxCount': 1,
     'isUnique': False,
     'isIndexable': False,
     'includeInNotification': False,
     'searchWeight': -1},
    {'name': 'columns',
     'typeName': 'array<dt_table_column>',
     'isOptional': True,
     'cardinality': 'SINGLE',
     'valuesMinCount': 0,
     'valuesMaxCount': 1,
     'isUnique': False,
     'isIndexable': False,
     'includeInNotification': False,
     'searchWeight': -

In [3]:
atlas_section.type.create_type(types=[
    {
      "name": TypeNames.PROCESS_ETL_DATASET_PROCESS,
      "description": "Processo de alteração de coluna de um ETL Data Batch",
      "superTypes": ["Process"],
      "attributeDefs": [
        {
          "name": "etl_process",
          "typeName": f"{TypeNames.DATASET_PROCESSING_LINEAGE}",
          "isOptional": True,
          "isUnique": False,
          "isIndexable": False
        },
      ]
    }
])

http://10.100.100.61:21000/api/atlas/v2/types/typedefs


{'enumDefs': [],
 'structDefs': [],
 'classificationDefs': [],
 'entityDefs': [{'category': 'ENTITY',
   'guid': '8b103a56-5851-4bf8-9e15-da693b6a1a44',
   'createdBy': 'admin',
   'updatedBy': 'admin',
   'createTime': 1730240731610,
   'updateTime': 1730240731610,
   'version': 1,
   'name': 'dt_change_etl_column',
   'description': 'Processo de alteração de coluna de um ETL Data Batch',
   'typeVersion': '1.0',
   'attributeDefs': [{'name': 'etl_process',
     'typeName': 'dt_etl_data_batch',
     'isOptional': True,
     'cardinality': 'SINGLE',
     'valuesMinCount': 0,
     'valuesMaxCount': 1,
     'isUnique': False,
     'isIndexable': False,
     'includeInNotification': False,
     'searchWeight': -1}],
   'superTypes': ['Process'],
   'subTypes': [],
   'relationshipAttributeDefs': [{'name': 'outputs',
     'typeName': 'array<DataSet>',
     'isOptional': True,
     'cardinality': 'SET',
     'valuesMinCount': 0,
     'valuesMaxCount': 2147483647,
     'isUnique': False,
   

In [4]:
type = atlas_section.type.get_type_by_name(f'{TypeNames.PROCESS_CHANGE_COLUMN}')

type['attributeDefs'].append({
     "name": "processes",
     "typeName": f"array<{TypeNames.PROCESS}>",
     "isOptional": True,
     "isUnique": False,
     "isIndexable": False
})

atlas_section.type.put_type(types=[type])

http://10.100.100.61:21000/api/atlas/v2/types/entitydef/name/dt_column_change_process
http://10.100.100.61:21000/api/atlas/v2/types/typedefs


{'enumDefs': [],
 'structDefs': [],
 'classificationDefs': [],
 'entityDefs': [{'category': 'ENTITY',
   'guid': 'e2344b93-b71b-4935-9d03-2353fdb37595',
   'createdBy': 'admin',
   'updatedBy': 'admin',
   'createTime': 1724236716696,
   'updateTime': 1730240739763,
   'version': 3,
   'name': 'dt_column_change_process',
   'description': 'Descreve o processo de alteração estrutural das colunas de uma tabela, incluindo a adição e remoção de colunas.',
   'typeVersion': '1.0',
   'attributeDefs': [{'name': 'added_columns',
     'typeName': 'array<dt_table_column>',
     'isOptional': True,
     'cardinality': 'SINGLE',
     'valuesMinCount': 0,
     'valuesMaxCount': 1,
     'isUnique': False,
     'isIndexable': False,
     'includeInNotification': False,
     'searchWeight': -1},
    {'name': 'deleted_columns',
     'typeName': 'array<dt_table_column>',
     'isOptional': True,
     'cardinality': 'SINGLE',
     'valuesMinCount': 0,
     'valuesMaxCount': 1,
     'isUnique': False,
  

In [5]:
process_id = 'etl-bronze-silver-do-1996-2022-1'

process_attributes = {
    "name": "ETL Bronze-Silver de DO (1996-2022)",
    "description": "ETL de drops/rename de colunas e substituição de valores da Tabela de Declaração de Óbito dos anos de 1996 a 2022.",
    "output_location": "/silver/sim-parquet",
    "records_processed": 31279782,
    "execution_time": 2445036,
    "id_process": process_id,
    'observation': 'As colunas representam a interseção de todas as colunas de todo o intervalo de tempo temporal.'
}

interval = {
    'start_year': 1996,
    'end_year': 2022,
}

atlas_section.lineage.create_entity_lineage_by_interval_time_anual(interval, "DO", process_attributes)

http://10.100.100.61:21000/api/atlas/v2/search/attribute?typeName=dt_table&attrName=acronymus&attrValuePrefix=DO&limit=1&offset=0
http://10.100.100.61:21000/api/atlas/v2/search/dsl?query=dt_anual_table name like "DO*" and (year >= 1996 and year <= 2022)
http://10.100.100.61:21000/api/atlas/v2/entity/bulk?guid=0ab0a86b-f777-4a2c-a8ac-5f529fe803ba&guid=c230954f-f11a-46de-bdb3-9bcea34d0eeb&guid=c28a98c3-39af-48e5-9937-18aa84bd9421&guid=409a1bfb-b38a-4a92-a176-3e37ecb5d5e1&guid=81557f7c-aa3b-4ce0-b6b1-045c863846e1&guid=e7384a97-554c-4d71-82f2-f453cccb69f6&guid=0cba4c22-d915-4645-b939-617d06d00212&guid=4db9efb7-acb4-47fb-8320-b15eb44c192f&guid=41ca9119-4b92-402d-9a85-832ab45aa068&guid=50dad7d6-d06c-46f0-a002-e5f91cb8f7cf&guid=c2eb5bab-b72d-4da0-8e34-a93c311ebc2c&guid=9384d3fa-36d4-4e8b-b0ae-4524a032fa8b&guid=34498c5e-15e4-46c3-91cf-62eb410d568d&guid=737647c0-aa41-42d1-a410-6c97bc8bc92e&guid=1d58f934-a528-43bd-8b95-422f23ffbeb3&guid=8f2b35fd-c030-41ad-8b39-987cf4ee1566&guid=f9c7b5d9-a3fa-4d0

{'typeName': 'dt_etl_data_batch',
 'attributes': {'owner': '',
  'qualifiedName': 'dt_etl_data_batch.DO@etl-bronze-silver-do-1996-2022-1',
  'name': 'ETL Bronze-Silver de DO (1996-2022)',
  'description': 'ETL de drops/rename de colunas e substituição de valores da Tabela de Declaração de Óbito dos anos de 1996 a 2022.',
  'id': 'etl-bronze-silver-do-1996-2022-1'},
 'guid': '69870f5a-1f01-459a-ab0b-b0d02bcfcc04',
 'status': 'ACTIVE',
 'displayText': 'ETL Bronze-Silver de DO (1996-2022)',
 'classificationNames': [],
 'classifications': [],
 'meaningNames': [],
 'meanings': [],
 'isIncomplete': False,
 'labels': []}

In [None]:
atlas_section.process.create_process_drop_column_dataset(
    process_id,
    [
        "ORIGEM",
        "ESC2010",
        "SERIESCFAL",
        "OCUP",
        "LINHAA",
        "LINHAB",
        "LINHAC",
        "LINHAD",
        "LINHAII",
        "DTRECORIGA",
        "ESCMAEAGR1",
        "ESCFALAGR1",
        "STDOEPIDEM",
        "STDONOVA",
        "DIFDATA",
        "CONTADOR",
        "DTRECEBIM",
        "VERSAOSIST",
        "CAUSABAS_O",
        "ATESTANTE",
        "STCODIFICA",
        "CODIFICADO",
        "VERSAOSCB",
        "NUMEROLOTE",
        "TPPOS",
        "CODMUNRES",
        "CODMUNNATU",
        "CODMUNOCOR",
        "ATESTADO"
    ]
)

In [None]:
changed_columns = [
    {
        "name": "SEXO",
        "attributes_to_change": { 
            'domain': ', '.join(["Ignorado", "Homem", "Mulher"]),
            'type': 'Char (255)'
        }
    },
    {
        "name": "IDADE",
        "attributes_to_change": { 
            'domain': 'Idade Por Extenso',
            'type': 'Char (255)'
        }
    },
    {
        "name": "CAUSABAS",
        "attributes_to_change": { 
            'domain': "CID-10 Descrição",
            'type': 'Char (255)'
        }
    },
    {
        "name": "TIPOBITO",
        "attributes_to_change": { 
            'domain': ', '.join(["Fetal", "Não Fetal"]),
            'type': 'Char (255)'
        }
    },
    {
        "name": "RACACOR",
        "attributes_to_change": { 
            'domain': ', '.join(["Branca", "Preta", "Amarela", "Parda", "Indígena"]),
            'type': 'Char (255)'
        }
    },
    {
        "name": "ESTCIV",
        "attributes_to_change": { 
            'domain': ', '.join(["Solteito", "Casado", "Viúvo", "Separado Judicialmente", "Ignorado"]),
            'type': 'Char (255)'
        }
    },
    {
        "name": "ESC",
        "attributes_to_change": { 
            'domain': ', '.join(["Nenhuma", "1 a 3 anos", "4 a 7 anos", "8 a 11 anos", "12 a mais", "Ignorado"]),
            'type': 'Char (255)'
        }
    },
    {
        "name": "LOCOCOR",
        "attributes_to_change": { 
            'domain': ', '.join(["Hospital", "Outro estado saúde", "Domicílio", "Via Pública", "Outros", "Ignorado"]),
            'type': 'Char (255)'
        }
    },
    {
        "name": "GRAVIDEZ",
        "attributes_to_change": { 
            'domain': ', '.join(["Única", "Dupla", "Tripla e mais", "Ignorado"]),
            'type': 'Char (255)'
        }
    },
    {
        "name": "SEMAGESTAC",
        "attributes_to_change": { 
            'domain': ', '.join(["Menos de 22 semanas", "22 a 27 semanas", "28 a 31 semanas", "32 a 36 semanas", "37 a 41 semanas", "42 semanas e mais", "Ignorado"]),
            'type': 'Char (255)'
        }
    },
    {
        "name": "GESTACAO",
        "attributes_to_change": { 
            'domain': ', '.join(["Vaginal", "Cesáreo", "Ignorado"]),
            'type': 'Char (255)'
        }
    },
    {
        "name": "PARTO",
        "attributes_to_change": { 
            'domain': ', '.join(["Antes" , "Durante", "Depois", "Ignorado"]),
            'type': 'Char (255)'
        }
    },
    {
        "name": "OBITOPARTO",
        "attributes_to_change": { 
            'domain': ', '.join(["Antes" , "Durante", "Depois", "Ignorado"]),
            'type': 'Char (255)'
        }
    },
    {
        "name": "OBITOGRAV",
        "attributes_to_change": { 
            'domain': ', '.join(["Sim" , "Não", "Ignorado"]),
            'type': 'Char (255)'
        }
    },
    {
        "name": "OBITOPUERP",
        "attributes_to_change": { 
            'domain': ', '.join(["Sim até 42 dias" , "Sim, de 43 dias a 1 ano", "Não", "Ignorado"]),
            'type': 'Char (255)'
        }
    },
    {
        "name": "ASSISTMED",
        "attributes_to_change": { 
            'domain': ', '.join(["Sim" , "Não", "Ignorado"]),
            'type': 'Char (255)'
        }
    },
    {
        "name": "EXAME",
        "attributes_to_change": { 
            'domain': ', '.join(["Sim" , "Não", "Ignorado"]),
            'type': 'Char (255)'
        }
    },
    {
        "name": "CIRURGIA",
        "attributes_to_change": { 
            'domain': ', '.join(["Sim" , "Não", "Ignorado"]),
            'type': 'Char (255)'
        }
    },
    {
        "name": "NECROPSIA",
        "attributes_to_change": { 
            'domain': ', '.join(["Sim" , "Não", "Ignorado"]),
            'type': 'Char (255)'
        }
    }
]

atlas_section.process.create_process_alter_column_dataset(
    changed_columns,
    process_id,
    'DO', {
        'name': 'Grupo Transformação',
        'description': 'Replace de valores númericos por sua respectiva descrição'
    }
)

In [None]:
columns_renamed = {
    "DTOBITO": "Data do Óbito",
    "HORAOBITO": "Hora do Óbito",
    "DTNASC": "Data de Nascimento",
    "TIPOBITO": "Tipo de Óbito",
    "NATURAL": "Naturalidade",
    "IDADE": "Idade",
    "SEXO": "Sexo",
    "RACACOR": "Raça/Cor",
    "ESTCIV": "Estado Civil",
    "ESC": "Escolaridade",
    "LOCOCOR": "Local da Ocorrência",
    "CODESTAB": "Hospital",
    "ASSISTMED": "Assintência Médica",
    "NECROPSIA": "Necrópsia",
    "CAUSABAS": "Causa Básica",
    #"CAUSABAS_keep": "Categoria CID",
    "DTATESTADO": "Data do Atestado",
    "DTCADASTRO": "Data do Cadastro",
    "COMUNSVOIM": "Cidade do Paciente"
}

data = [
    { 
        'name': old_name,
        'attributes_to_change': {
            'name': new_name
        } 
    } for old_name, new_name in columns_renamed.items()
]

atlas_section.process.create_process_alter_column_dataset(
    data,
    process_id,
   'DO', {
       'name': "Renomeação de Colunas",
       'description': f'Renomeacão de nomes de colunas de DO: {" | ".join([f"{old_name} -> {new_name}"  for old_name, new_name in columns_renamed.items()]) }'
   }
)