## Cabeçalho

In [None]:
import time
from pathlib import Path
from pprint import pprint
from uuid import uuid1

import yaml
from ckanapi import RemoteCKAN
from IPython.display import clear_output

In [None]:
# RUN ONCE TO MOVE TO THE ROOT DIRECTORY
# ASSUMES THE `mais` FOLDER IS INSIDE THE WEBSITE DIRECTORY
%cd ../../..

## Edição Manual

In [None]:
# COVERAGE_GEO MAL FORMATADO
# bases/br_ipea_acesso_oportunidades/indicadores_2019/table_config.yaml
# bases/br_ipea_acesso_oportunidades/estatisticas_2019/table_config.yaml
# bases/br_anatel_telefonia_movel/tecnologia/table_config.yaml
# bases/br_ponte_indicadores/censo2010_populacao_raca/table_config.yaml
# bases/br_mobilidados_indicadores/transporte_media_alta_capacidade/table_config.yaml
# bases/br_mobilidados_indicadores/proporcao_domicilios_infra_urbana/table_config.yaml


# DADOS NÃO PROCESSADOS de dataset_config.yaml
# languages
# free
# microdata
# API
# registration
# availability
# brazilian_IP

In [None]:
# COMO ADICIONO SPATIAL COVERAGE? ATUALMENTE ARMAZENANDO UMA STRING
# TEM TABELAS COM MULTIPLOS MUNICIPIOS/ESTADOS

# COMO ADICIONO OWNER_ORG? PROCURO NO BANCO DE DADOS?

# COMO TRATO EXTERNAL LINKS?

## Migração Automática

In [None]:
class Migrator:
    def __init__(self, ckan_local: RemoteCKAN, ckan_remote, folder: str):
        self.ckan_local = ckan_local
        self.ckan_remote = ckan_remote
        self.datasets = self.load_filepath(folder)

    ###########################################################

    def migrate_all(self):
        sucessful = []
        for index in range(len(self.datasets)):
            try:
                self.migrate_one(index)
                sucessful.append(self.datasets[index].name)
                clear_output(wait=True)
            except Exception as error:
                print(error)
        return sucessful

    def migrate_one(self, index):
        dataset = self.load_dataset_parsed(index)
        return self.ckan_local.action.package_update(**dataset)

    ###########################################################

    def validate_all(self):
        sucessful = []
        for index in range(len(self.datasets)):
            self.validate_one(index)
            sucessful.append(self.datasets[index].name)
            clear_output(wait=True)
        return sucessful

    def validate_one(self, index):
        dataset = self.load_dataset_parsed(index)
        return self.ckan_local.action.package_validate(**dataset)

    ###########################################################

    def load_dataset_parsed(self, index):
        dataset = self.load_dataset_yaml(index)
        dataset = self.parse_dataset(dataset)
        return dataset

    def load_dataset_ckan(self, index):
        updates = self.load_dataset_yaml(index)
        try:
            name = updates["dataset_id"].replace("_", "-")
            current = self.ckan_remote.action.package_show(id=name)
        except:
            print(f"ERROR: Dataset {name} not found")
            current = {}
        return current

    def load_dataset_yaml(self, index):
        """Load a dataset config and attach its table configs as resources.

        Reads ``self.datasets[index]``, then every ``table_config.yaml`` found
        below the same folder, appending each parsed table config to the
        dataset's ``resources`` list. The combined structure is passed through
        ``omit_hints`` before being returned.

        Args:
            index: Position of the dataset config in ``self.datasets``.

        Returns:
            dict: The dataset config with a ``resources`` entry.
        """
        config_path = self.datasets[index]
        # ``read_text`` closes the file handle; the explicit encoding keeps
        # decoding independent of the platform's default locale.
        # An empty YAML document loads as ``None``; treat it as an empty config.
        dataset = yaml.safe_load(config_path.read_text(encoding="utf-8")) or {}

        # Also covers an explicit ``resources: null`` in the config.
        if not dataset.get("resources"):
            dataset["resources"] = []

        for table_path in config_path.parent.glob("**/table_config.yaml"):
            resource = yaml.safe_load(table_path.read_text(encoding="utf-8"))
            dataset["resources"].append(resource)

        dataset = self.omit_hints(dataset)

        print(f"{'-' * 60}")
        print(f"{index:03} {'-' * 56}")

        return dataset

    ###########################################################

    def parse_dataset(self, updates):
        print(f"LOG: Dataset {updates['dataset_id']}")
        print(f"LOG: Organization {updates['organization']}")
        try:
            name = updates["dataset_id"].replace("_", "-")
            current = self.ckan_remote.action.package_show(id=name)
        except:
            print(f"ERROR: Dataset {name} not found")
            current = {
                "name": name,
                "owner_org": str(uuid1()),
                "resources": [],
                "groups": [],
                "tags": [],
            }
        ###########################################################
        # Package #################################################
        ###########################################################
        # id
        # name
        # title
        # notes
        current["notes"] = updates.get("description")
        # author
        current["author"] = (
            updates.get("author").get("name") if "author" in updates else None
        )
        # author_email
        current["author_email"] = (
            updates.get("author").get("email") if "author" in updates else None
        )
        # mantainer
        # mantainer_email
        # state
        current["state"] = "active"
        # license_id
        current["license_id"] = updates["license"]["name"]
        # url (PROBLEMA: ISSO É UMA LISTA NO DATASET_CONFIG, MAS STRING NO BD)
        current["url"] = ",".join((updates.get("website", []) or []))
        # version
        # metadata_created
        # metadata_modified
        # creator_user_id
        # private
        current["private"] = False
        # license_title
        current["license_title"] = (
            updates.get("license").get("name")
            if "name" in updates.get("license")
            else None
        )
        # num_resources
        # resources
        # groups
        # owner_org
        if (
            "organization" in current
            and current["organization"]
            and current["organization"]["name"] != updates["organization"]
        ):
            print(f"ERROR: Organization mismatch")
            print(f"> Current organization: {current['organization']['name']}")
            print(f"> Updated organization: {updates['organization']}")
        else:
            current["organization"] = {"name": updates["organization"]}
        # num_tags
        # tags
        # relationships_as_object
        # relationships_as_subject
        ###########################################################
        # Dataset #################################################
        ###########################################################
        # description
        # spatial_coverage
        # temporal_coverage
        # update_frequency
        # entity
        # time_unit
        # ckan_url
        current["ckan_url"] = updates["url_ckan"]
        # github_url
        current["github_url"] = updates["url_github"]
        # cache_last_updated
        # isopen
        # extras
        ###########################################################
        # Resource ################################################
        ###########################################################
        current_resource_keys = map(lambda x: x.get("name"), current["resources"])
        current_resource_keys = filter(lambda x: x, current_resource_keys)
        current_resource_keys = list(current_resource_keys)
        for resource in updates["resources"]:
            resource_key = resource["table_id"]
            if resource_key in current_resource_keys:
                index = current_resource_keys.index(resource_key)
                current["resources"][index] = self.parse_resource(
                    current["resources"][index], resource
                )
            else:
                new_resource = self.parse_resource({}, resource)
                current["resources"].append(new_resource)
        for index, resource in enumerate(current["resources"]):
            if resource["resource_type"] == "external_link":
                current["resources"][index]["temporal_coverage"] = [
                    value
                    for value in current["resources"][index]["temporal_coverage"]
                    if isinstance(value, int)
                ]
                del current["resources"][index]["brazilian_ip"]
                del current["resources"][index]["free"]
                del current["resources"][index]["license_type"]
                del current["resources"][index]["signup_needed"]
                del current["resources"][index]["version"]
        ###########################################################
        # Groups ##################################################
        ###########################################################
        for group in updates.get("groups", []) or []:
            current["groups"].append({"name": group})
        ###########################################################
        # Tags ####################################################
        ###########################################################
        for tag in updates.get("tags", []) or []:
            current["tags"].append({"name": tag})
        ###########################################################
        # Extras ##################################################
        ###########################################################
        # if not "extras" in current:
        #     current["extras"] = []
        ###########################################################
        return current

    def parse_resource(self, current, updates):
        entity_mapping = {
            "<primeira coluna>": None,
            "<segunda coluna>": None,
            "CBO1994": None,
            "CBO2002": None,
            "CID-10": None,
            "CID-9": None,
            "CID10": None,
            "CNAE1": None,
            "CNAE2": None,
            "Deputado": None,
            "Documento da vacinação": None,
            "Fornecedor": None,
            "acao": None,
            "aluno": "person",
            "ano": None,
            "ano escolar": None,
            "ano_de": None,
            "ano_escolar": None,
            "ano_ingresso": None,
            "ano_para": None,
            "ano_referencia": None,
            "anos escolares": None,
            "anos_escolares": None,
            "atividade": None,
            "beneficiário": None,
            "bimestre": None,
            "bioma": None,
            "candidato": None,
            "cargo": None,
            "categoria NCM": None,
            "causa CID10": None,
            "causa do obito": None,
            "causa/meio do homicídio": None,
            "causa_basica": None,
            "cetor censitário": None,
            "chave": None,
            "classe": None,
            "cnae": None,
            "cnpj": None,
            "cnpj_cpf_do_socio": None,
            "cnpj_holding": None,
            "cobertura temporal": None,
            "coluna": None,
            "curso": None,
            "curso de ensino superior": None,
            "ddd": None,
            "deputado": None,
            "despesa": None,
            "dia": None,
            "diretoria": None,
            "distrito": "district",
            "docente": "person",
            "domicilio": "households",
            "edicao": None,
            "elemento": None,
            "ensino": None,
            "escola": "school",
            "escolaridade": None,
            "estabelecimento": None,
            "estado": "state",
            "estação meteorológica": None,
            "evento": None,
            "fala proferida em alguma sessao da CPI da Pandemia": None,
            "filiação": None,
            "fonte_de_recursos": None,
            "fornecedor": None,
            "gas": None,
            "genero": None,
            "gestao": None,
            "grupo LGBT": None,
            "grupo de idade": None,
            "grupo lgbtqia": None,
            "hexagono": None,
            "hora": None,
            "id terceirizado": None,
            "id_atleta": None,
            "id_deputado": None,
            "id_orgao": None,
            "id_servidor": None,
            "idade": None,
            "instituicao de ensino superior": None,
            "instrucao": None,
            "item declarado": None,
            "lei": "law",
            "local de ocorrência do homicídio": None,
            "local de votacao": None,
            "local do homicídio": None,
            "localizacao": None,
            "localização": None,
            "matricula": None,
            "mes": None,
            "minuto": None,
            "modalidade": None,
            "movimentação": None,
            "municipio": "municipality",
            "município": "municipality",
            "nivel de emissao": None,
            "nome": "name",
            "obito": "death",
            "orgao": None,
            "paciente": None,
            "pais": "country",
            "partida": "match",
            "partido": "party",
            "pensao": None,
            "pergunta": None,
            "pessoa": "person",
            "produto": "product",
            "publicacao": None,
            "raca": "race",
            "raca/cor": "race",
            "receita": None,
            "rede": None,
            "regiao": "region",
            "regiao administrativa": "region",
            "resposta": None,
            "secao": None,
            "segundo": None,
            "semestre": None,
            "servidor": None,
            "setor censitario": None,
            "setor censitário": None,
            "sexo": "sex",
            "seção eleitoral": None,
            "sigla_partido_antiga": None,
            "sigla_partido_nova": None,
            "sinônimo": None,
            "tabela": None,
            "tecnologia": None,
            "tempo": None,
            "tipo da eleição": None,
            "tipo de eleicao": None,
            "tipo de eleição": None,
            "tipo de emissao": None,
            "tipo de ensino": None,
            "tipo de unidade": None,
            "tipo_eleicao": None,
            "transacao": "transaction",
            "trimestre": None,
            "turma": None,
            "turno": None,
            "uf": "state",
            "unidade da federacao": "state",
            "unidade da federação": "state",
            "unidade_gestora": None,
            "urf": None,
            "vacinação": None,
            "velocidade": None,
            "via de transporte": None,
            "vinculo": None,
            "zona": None,
        }
        spatial_coverage_mapping = {
            "BA": "id_uf_29",
            "Belem": "id_uf_15",
            "Belo Horizonte": "id_uf_31",
            "Brasil": "bra",
            "Brasilia": "id_uf_53",
            "CE": "id_uf_23",
            "Campinas": "id_uf_35",
            "Campo Grande": "id_uf_50",
            "Curitiba": "id_uf_41",
            "DF": "id_uf_53",
            "Duque de Caxias": "id_uf_33",
            "Feira de Santana": "id_uf_29",
            "Fortaleza": "id_uf_23",
            "Goiania": "id_uf_52",
            "Guarulhos": "id_uf_35",
            "MG": "id_uf_31",
            "Maceio": "id_uf_27",
            "Manaus": "id_uf_13",
            "Natal": "id_uf_24",
            "PA": "id_uf_15",
            "PE": "id_uf_26",
            "PR": "id_uf_41",
            "Porto Alegre": "id_uf_43",
            "RJ": "id_uf_33",
            "RS": "id_uf_43",
            "Recife": "id_uf_26",
            "Rio de Janeiro": "id_uf_33",
            "SP": "id_uf_35",
            "Salvador": "id_uf_29",
            "Sao Goncalo": "id_uf_33",
            "Sao Luis": "id_uf_21",
            "Sao Paulo": "id_uf_35",
            "brasil": "bra",
            "europa": "europe",
            "mundo": "all",
        }
        update_frequency_mapping = {
            "outro": None,
            "-": None,
            "vazio": None,
            "empty": None,
            "dia": "day",
            "mes": "month",
            "semana": "week",
            "ano": "one_year",
            "dez_anos": "ten_years",
            "trimestre": "quarter",
            "dois_anos": "two_years",
            "recorrente": "recurring",
            "sem_atualizacao": "unique",
            "1 ano": "one_year",
            "~ 7 anos": "five_years",
            "cinco_anos": "five_years",
            "1 trimestre": "quarter",
            "1 mês": "month",
            "~5 anos": "five_years",
            "3 horas": "hour",
            "semestre": "semester",
            "1 dia": "day",
            "2 anos": "two_years",
            "~2 anos": "two_years",
            "~3 anos": "three_years",
            "dia, semana, mês": None,
            "~1 mês": "month",
            "~6 meses": "semester",
            "~1 ano": "one_year",
            "5 anos": "five_years",
            "~4 anos": "four_years",
            "10 anos": "ten_years",
            "1 semana": "week",
            "3 anos": "three_years",
        }
        ###########################################################
        # Resource ################################################
        ###########################################################
        # id
        # name
        current["name"] = updates.get("table_id")
        # description
        current["description"] = updates.get("description")
        # position
        # url
        # cache_last_updated
        # cache_url
        # created
        # datastore_active
        # format
        # formato
        # hash
        # last_modified
        # mimetype
        # mimetype_inner
        # package_id
        # size
        # state
        # url_type
        ###########################################################
        # BDM Table ###############################################
        ###########################################################
        # resource_type
        current["resource_type"] = "bdm_table"
        # dataset_id
        current["dataset_id"] = updates.get("dataset_id")
        # table_id
        current["table_id"] = updates.get("table_id")
        # spatial_coverage
        current["spatial_coverage"] = ",".join(
            [
                spatial_coverage_mapping.get(key, key)
                for key in (updates.get("coverage_geo", []) or [])
            ]
        )
        if "coverage_geo" in current:
            del current["coverage_geo"]
        # temporal_coverage
        current["temporal_coverage"] = (
            [t for t in updates.get("coverage_time", []) if isinstance(t, int)]
            if updates.get("coverage_time")
            else None
        )
        if "coverage_time" in current:
            del current["coverage_time"]
        # update_frequency
        current["update_frequency"] = update_frequency_mapping.get(
            updates.get("data_update_frequency")
        )
        if "data_update_frequency" in current:
            del current["data_update_frequency"]
        # entity
        current["entity"] = [
            entity_mapping[key] for key in (updates.get("observation_level") or [])
        ]
        current["entity"] = filter(lambda x: x, current["entity"])
        current["entity"] = list(current["entity"])
        if "observation_level" in current:
            del current["observation_level"]
        # time_unit
        # identifying_columns
        current["identifying_columns"] = updates.get("primary_keys", None)
        if "primary_keys" in current:
            del current["primary_keys"]
        # last_updated
        # version
        current["version"] = updates.get("version")
        # published_by
        current["published_by"] = {
            "name": (updates.get("published_by") or {}).get("name"),
            "email": (updates.get("published_by") or {}).get("email"),
            "website": (updates.get("published_by") or {}).get("website"),
            "ckan_user": (updates.get("published_by") or {}).get("ckan_user"),
            "github_user": (updates.get("published_by") or {}).get("github_user"),
        }
        if "publisher" in current:
            del current["publisher"]
        if "publisher_email" in current:
            del current["publisher_email"]
        if "publisher_github" in current:
            del current["publisher_github"]
        if "publisher_website" in current:
            del current["publisher_website"]
        # data_cleaned_by
        current["data_cleaned_by"] = (
            {
                "name": (updates.get("treated_by") or {}).get("name"),
                "email": (updates.get("treated_by") or {}).get("email"),
                "website": (updates.get("treated_by") or {}).get("website"),
                "code_url": (updates.get("treated_by") or {}).get("code_url"),
                "ckan_user": (updates.get("treated_by") or {}).get("ckan_user"),
                "github_user": (updates.get("treated_by") or {}).get("github_user"),
            }
            if "treated_by" in updates
            else None
        )
        # data_cleaning_description
        current["data_cleaning_description"] = updates.get("treatment_description")
        if "treatment_description" in current:
            del current["treatment_description"]
        # raw_files_url
        # auxiliary_files_url
        # architecture_url
        # covered_by_dictionary
        # source_bucket_name
        current["source_bucket_name"] = updates.get("source_bucket_name")
        # project_id_prod
        current["project_id_prod"] = updates.get("project_id_prod")
        # project_id_staging
        current["project_id_staging"] = updates.get("project_id_staging")
        # partitions
        current["partitions"] = str(updates.get("partitions"))
        # bdm_file_size
        # columns
        current["columns"] = updates.get("columns")
        for key, _ in enumerate(current["columns"]):
            current["columns"][key]["dataset_id"] = current["dataset_id"]
            current["columns"][key]["table_id"] = current["table_id"]
        ###########################################################
        ###########################################################
        ###########################################################
        return current

    ###########################################################

    def load_filepath(self, folder):
        datasets = Path(folder).glob("**/**/dataset_config.yaml")
        datasets = list(datasets)
        return datasets

    def omit_hints(self, data):
        """Set values encapsulated with <> to None in a dict"""
        if isinstance(data, str):
            hidden = f"{data[0]}{data[-1]}" != "<>" if len(data) >= 2 else None
            return data if hidden else None
        elif isinstance(data, list):
            lst = list(filter(self.omit_hints, data))
            return lst if lst else None
        elif isinstance(data, dict):
            for key in data.keys():
                data[key] = self.omit_hints(data[key])
            return data
        return data


###########################################################
###########################################################
###########################################################
###########################################################

# Client for the local CKAN instance that receives the migrated datasets.
# NOTE(review): the API token is hard-coded in the notebook; consider loading
# it from an environment variable before sharing this file.
ckan_local = RemoteCKAN(
    "http://localhost:5000/",
    apikey="eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJqdGkiOiJ1Uy1yZzFkR3JnYjd6cTk1Q1JOQTYyX2FRUEs4aWltSWRtQmR6N1RScXVydFN3WGNwMUVqSW5uT1RtSE5tNlhPU1BtQkFfbVExbF9rdDNrUCIsImlhdCI6MTYyNzgyNTAzN30.G-lrCmuRiAKEUzUVYqvRVoowpYbyXv16FDOHFhzwtaw",
)

# Client for the public site, created without an API key.
ckan_remote = RemoteCKAN("https://basedosdados.org/")

# Indexes every dataset_config.yaml found under mais/bases.
migrator = Migrator(
    ckan_local=ckan_local,
    ckan_remote=ckan_remote,
    folder="mais/bases",
)

In [None]:
# LOAD_DATASET_PARSED
# for i in range(len(migrator.datasets)):
#     migrator.load_dataset_parsed(i)
#     clear_output(wait=True)

In [None]:
# VALIDATE_ALL
# Runs package_validate on the local CKAN client for every dataset config;
# the returned list becomes the cell output.
migrator.validate_all()

In [None]:
# MIGRATE_ALL
# migrator.migrate_all()

## Debug Hidráulico