In [None]:
from copy import deepcopy
import json
import os
from typing import List
import uuid
import re
import pandas as pd
from icecream import ic

## Utilidades

In [None]:
def plural(palavra: str) -> str:
    """
    Passa uma palavra em português para o plural. Não funciona sempre,
    devido à quantidade de exceções que precisam ser programadas à mão.
    """
    palavra = palavra.strip()
    if len(palavra.split(" ")) > 1:
        return palavra
    if not palavra:
        return palavra
    invariaveis = [r"x$"]
    for p in invariaveis:
        if re.search(p, palavra):
            return palavra
    substituicoes = {
        r"ão": r"õe",
        r"r$": r"re",
        r"z$": r"ze",
        r"s$": r"se",
        r"(?<=[aeou])l": r"i",
        r"il": r"ei",
        r"m$": r"n",
    }
    for p, s in substituicoes.items():
        palavra = re.sub(p, s, palavra)
    return palavra + "s" if palavra[-1] != "s" else palavra

def sanitize(palavra: str) -> str:
    """Substitui caracteres acentuados."""
    substituicoes = {
        "a": ["á", "â", "ã", "à"],
        "c": ["ç"],
        "e": ["é", "ê"],
        "i": ["í"],
        "o": ["ó", "ô", "õ"],
        "u": ["ú", "ü"],
    }
    for substituta, letras in substituicoes.items():
        for letra in letras:
            palavra = palavra.replace(letra, substituta)
    return palavra

In [None]:
def drop_duplicates(ls: list) -> list:
    """Remove elementos duplicados ou vazios."""
    return list(filter(None, set(ls)))

def flatten(ls: list) -> list:
    """Reduz em 1 a dimensão de uma lista."""
    return [item for sublist in ls for item in sublist]

def remove(ls: list, to_remove: list) -> list:
    """Remove de uma lista quaisquer elementos que estejam em outra."""
    return [item for item in ls if item not in to_remove]

def is_flat(x) -> bool:
    """Retorna verdadeiro se a entrada não for um dicionário nem uma lista."""
    return not (isinstance(x, list) or isinstance(x, dict))

def is_flat_list(x) -> bool:
    """Retorna verdadeiro se a lista for plana."""
    return all(is_flat(y) for y in x)

def mix_list(a, b) -> list:
    """
    Realiza um outer join entre duas listas planas ou que contêm dicionários.
    """
    if is_flat_list(a) and is_flat_list(b):
        return drop_duplicates(a + b)
    elif is_flat_list(a) or is_flat_list(b):
        raise ValueError(a, b)

    a_dict = to_dict(a)
    b_dict = to_dict(b)
    mixed_dicts = mix_dict(a_dict, b_dict)
    return list(mixed_dicts.values())

def to_dict(ls: List[dict]) -> dict:
    """
    Transforma uma lista de dicionários em um dicionário de dicionários, 
    inferindo como chave para cada dict algum valor dele que seja presente
    em todos os dicts, único em cada um e plano.
    """
    if len(ls) == 1:
        return {"placeholder_key": ls[0]}
    d = ls[0]
    global_keys = []
    hardcoded_precedence = {"conditions": 0, "title": 1}
    for key in d:
        # se o valor dessa chave não for plano
        if not is_flat(d[key]):
            # print(key, "is not flat")
            continue
        # se nem todos dicts tiverem essa chave
        if not all(key in d_ for d_ in ls):
            # print(key, "is not in every dict")
            continue
        # se nem todos os dicts tiverem um valor para essa chave
        if not all(bool(d_[key]) for d_ in ls):
            # print(key, "doesnt have a value in all dicts")
            continue
        # se cada dict não tiver um valor único para essa chave
        if not len(set(d_[key] for d_ in ls)) == len(ls):
            # print(key, "doesnt have all unique values")
            continue
        global_keys.append(key)

    if not global_keys:
        raise ValueError(f"Could not find a global key for {ls}")
    global_keys.sort(key=lambda x: hardcoded_precedence.get(x, 999))
    global_key = global_keys[0]
    return {d_[global_key]: d_ for d_ in ls}

def mix_dict(a: dict, b: dict) -> dict:
    """
    Retorna um dicionário contendo as chaves e valores de ambos dicionários
    de entrada, dando preferência para os valores de b.
    """
    out = deepcopy(b)
    for k, v in a.items():
        if k not in out:
            out[k] = v
        elif isinstance(v, list):
            if not isinstance(b[k], list):
                raise ValueError(f"a é uma lista em {k}, mas b é {type(b[k])}")
            out[k] = mix_list(v, b[k])
        elif isinstance(v, dict):
            if not isinstance(b[k], dict):
                raise ValueError(f"a é um dict em {k}, mas b é {type(b[k])}")
            out[k] = mix_dict(v, b[k])
        else:
            out[k] = v
    return out

## I/O

In [None]:
def load_questions() -> pd.DataFrame:
    df = pd.read_excel("results/Perguntas.xlsx", sheet_name="finais")
    df = df.dropna(subset=["Resposta"])
    subs = {
        "Pergunta": "pergunta",
        "Resposta": "resposta",
        "Intenção": "intent",
        "Rótulos": "rótulos",
        "Modificador": "modificador",
        "Substantivo": "substantivo",
        "Recipiente": "recipiente",
        "Elocuções": "examples",
    }
    df = df[list(subs.keys())]
    df = df.rename(columns=subs)
    df = df.fillna("")
    return df

def load_skill(file_path: str) -> dict:
    with open(file_path, "r", encoding="utf-8") as f:
        sk = json.load(f)
    return sk

def save_skill(file_path: str, to_save: dict):
    root, extension = os.path.splitext(file_path)
    new_skill_path = f"{root}2{extension}"
    with open(new_skill_path, "w", encoding="utf-8") as f:
        json.dump(to_save, f, ensure_ascii=False)

## Intenções

In [None]:
def get_intents(df: pd.DataFrame) -> dict:
    subset = ["intent", "pergunta", "examples"]
    records = df[subset].to_dict(orient="records")
    intents = [
        {
            "intent": record["intent"],
            "examples": get_examples(record),
            "description": "",
        }
        for record in records
    ]
    return intents

def get_examples(record: dict) -> List:
    # a própria pergunta é um exemplo
    out = [{"text": record["pergunta"]}]
    # tudo que está em Elocuções é exemplo também
    if record["examples"]:
        out += [{"text": exemplo} for exemplo in record["examples"].split("--")]
    return out

## Entidades

In [None]:
def get_entity_values(series):
    records = series.drop_duplicates().to_list()
    records = [r.split("-") for r in records]
    records = flatten(records)
    records = drop_duplicates(records)
    values = [
        {"type": "synonyms", "value": record, "synonyms": []} for record in records
    ]
    return values

def get_entities(df):
    subset = ["rótulos", "modificador", "substantivo", "recipiente"]
    entities = [
        {"entity": col, "values": get_entity_values(df[col]), "fuzzy_match": True}
        for col in subset
    ]
    return entities

## Nós de diálogo

In [None]:
def get_dialog_nodes(df: pd.DataFrame) -> dict:
    records = df.to_dict(orient="records")
    nodes = [
        {
            "type": "standard",
            "title": get_titulo(record),
            "output": {
                "generic": [
                    {
                        "values": [{"text": record["resposta"]}],
                        "response_type": "text",
                        "selection_policy": "sequential",
                    }
                ]
            },
            "context": {"contexto": sanitize(record["rótulos"])},
            "conditions": get_condition_string(record),
            "dialog_node": f"node_{uuid.uuid4().hex[:16]}"
        }
        for record in records
    ]
    nodes.sort(key=lambda x: x["conditions"])
    return nodes

def get_titulo(js: dict) -> str:
    """
    Retorna um título para o nó baseado no modificador, substantivo, recipiente
    e rótulos.
    :param js: nó
    :return: título
    """
    modificador = js["modificador"]
    substantivo = js["substantivo"].replace("-", " ")
    recipiente = js["recipiente"]
    
    contextos = get_contextos(js["rótulos"].split("-"))
    contextos = list(filter(lambda x: x.count("-") == 0, contextos))
    trechos = []
    if contextos:
        trechos.append(f"{'/'.join(contextos)}:")
    
    if modificador in ["efeito"]:
        trechos.append(modificador)
        if substantivo:
            trechos.append(f"de {substantivo}")
        recipiente = recipiente or contextos[0]
        trechos.append(f"em {recipiente}")
    elif modificador in ["definição"]:
        trechos.append(modificador)
        if substantivo and recipiente:
            raise ValueError(f"Perguntas do tipo 'definição' não podem ter substantivo e recipiente, "
                             "apenas um deles! Pergunta: {js['pergunta']}")
        elif substantivo or recipiente:
            trechos.append(f"de {substantivo or recipiente}")
    elif modificador in ["maiores", "menores"]:
        substituir = {"produção": "produtores"}
        if recipiente in substituir:
            recipiente = substituir[recipiente]
        substantivo = plural(substantivo)
        trechos.append(f"{modificador} {substantivo} {recipiente}?")
    elif modificador in ["diferença"]:
        recipiente = recipiente or contextos[0]
        trechos.append(f"{modificador} entre {substantivo} e {recipiente}?")
    elif modificador in ["existe", "quantidade"]:
        recipiente = recipiente or "no Brasil"
        trechos.append(modificador)
        if substantivo:
            trechos.append(substantivo)
        trechos.append(f"{recipiente}?")
    elif modificador in ["listar"]:
        substituir = {"extinção": "em extinção", "aaz": "na Amazônia Azul", "brasil": "no Brasil"}
        if recipiente in substituir:
            recipiente = substituir[recipiente]
        substantivo = plural(substantivo)
        trechos.append(f"exemplos de {substantivo} {recipiente}")
    elif modificador in ["pertence"]:
        if substantivo and recipiente:
            trechos.append(f"{substantivo} é um {recipiente}?")
        elif substantivo:
            trechos.append(f"{substantivo} é um {contextos[0]}?")
        elif recipiente:
            trechos.append(f"{contextos[0]} é um {recipiente}?")
        else:
            raise ValueError(f"Perguntas do tipo 'pertence' precisam de substantivo ou de recipiente! Pergunta: {js['pergunta']}")
    else:
        trechos.append(modificador)
        if substantivo:
            trechos.append(substantivo)
        if recipiente:
            trechos.append(recipiente)
    trechos[0] = trechos[0].capitalize()
    titulo = " ".join(trechos)
    titulo = titulo.strip()
    return titulo

def get_contextos(rotulos):
    """
    Devolve os contextos de uma pergunta baseado em seus rótulos,
    sendo que há uma lista de rótulos que não definem contexto.
    """
    rotulos_nao_contextuais = [
        "fauna",
        "flora",
        "outras",
        "física",
        "símbolo",
        "turismo",
        "engenharia",
        "saúde",
        "geologia",
    ]
    contextos = [contexto for contexto in rotulos if all(rot not in contexto for rot in rotulos_nao_contextuais)]
    return contextos

def get_condition_string(js):
    modificador = js["modificador"]
    substantivo = js["substantivo"]
    recipiente = js["recipiente"]
    rotulos = js["rótulos"].split("-") + [js["rótulos"]]
    rotulos = drop_duplicates(rotulos)
    contextos = get_contextos(rotulos)

    if contextos:
        conds_adicionais = [get_condition(modificador, substantivo, recipiente, contexto) for contexto in contextos]
    else:
        conds_adicionais = [get_condition(modificador, substantivo, recipiente)]

    conds = [f"#{js['intent']}"] + flatten(conds_adicionais)
    cond_str = " || ".join(conds)
    return cond_str

def get_condition(modificador, substantivo, recipiente, contexto=None) -> list:
    trechos = [f"@modificador:{modificador}"]
    if substantivo:
        trechos.append(f"&& @substantivo:{substantivo}")
    if recipiente:
        trechos.append(f"&& @recipiente:{recipiente}")
    cond = " ".join(trechos)
    if contexto:
        contexto = sanitize(contexto)
        out = [cond + f' && $contexto=="{contexto}"', cond + f" && @rótulos:{contexto}"]
    else:
        out = [cond]
    return out
                    
def apply_previous_siblings(nodes: List[dict]) -> List[dict]:
    for i, node in enumerate(nodes):
        if i != 0:
            prev = nodes[i-1]
            node["previous_sibling"] = prev["dialog_node"]
    return nodes

def fix_previous_siblings(original_nodes: List[dict]) -> List[dict]:
    """
    Conserta previous_sibling para nós que foram copiados da interface, mas
    referenciam algum nó que não existe mais, e também para o primeiro nó 
    gerado a partir da planilha, que deve referenciar um dos nós já existentes.
    """
    nodes = deepcopy(original_nodes)
    dialog_nodes = [node["dialog_node"] for node in nodes]
    siblings = [] # valor de 'dialog_node' de um nó que é previous_sibling de outro
    parents = [] # valor de 'dialog_node' de um nó que é parent de outro
    has_sibling = [] # nós que possuem sibling
    has_parent = [] # nós que possuem parent
    invalid_sibling = [] # nós cujo sibling é inválido
    invalid_parent = [] # nós cujo parent é inválido
    for node in nodes:
        if "previous_sibling" in node or "parent" in node:
            if "previous_sibling" in node:
                has_sibling.append(node)
                if node["previous_sibling"] not in dialog_nodes:
                    invalid_sibling.append(node)
                else:
                    siblings.append(node["previous_sibling"])
            if "parent" in node:
                has_parent.append(node)
                if node["parent"] not in dialog_nodes:
                    invalid_parent.append(node)
                else:
                    parents.append(node["parent"])

    no_upstream = [n for n in nodes if n not in has_sibling and n not in has_parent]
    no_upstream = list(filter(lambda x: "conditions" in x and x["conditions"].count("welcome") < 1, no_upstream))
    no_downstream = [n for n in nodes if n["dialog_node"] not in siblings and n not in has_parent]
    no_downstream = list(filter(lambda x: x["conditions"] != "anything_else", no_downstream))
    # ic(no_downstream) # pode ser usado como sibling para alguém que não tem
    # ic(invalid_sibling) # tem um sibling inválido
    # ic(no_upstream) # não tem sibling nem parent
    if any(invalid_parent):
        raise ValueError(f"Alguns nós possuem pais inválidos! {invalid_parent}")
    if len(invalid_sibling) + len(no_upstream) != len(no_downstream):
        raise ValueError("Há mais nós sem irmãos que nós sem filhos!")

    for node in no_downstream:
        new_sibling = node["dialog_node"]
        if re.search(r"node_._", node["dialog_node"]):
            node_to_fix = no_upstream[0]
        else:
            node_to_fix = invalid_sibling[0]
        for node_ in nodes:
            if node_["dialog_node"] == node_to_fix["dialog_node"]:
                node_["previous_sibling"] = new_sibling
    return nodes

## Skill

In [None]:
def mix_skills(base, **kwargs):
    new = deepcopy(base)
    for k, v in kwargs.items():
        new[k] = v
    return new

## Rodar

In [None]:
questions = load_questions()
questions[~(questions["examples"]=="")]

In [None]:
skill_path = "results/skill-Amazônia-Azul.json"
skill = load_skill(skill_path)

In [None]:
new_intents = get_intents(questions)
print("Intents obtained!")
mixed_intents = mix_list(skill["intents"], new_intents)
print("Intents mixed!")

In [None]:
new_entities = get_entities(questions)
print("Entities obtained!")
mixed_entities = mix_list(skill["entities"], new_entities)
print("Entities mixed!")

In [None]:
new_nodes = get_dialog_nodes(questions)
print("Nodes obtained!")
new_nodes = apply_previous_siblings(new_nodes)
print("Previous siblings applied!")
mixed_nodes = mix_list(skill["dialog_nodes"], new_nodes)
print("Nodes mixed!")
mixed_nodes = fix_previous_siblings(mixed_nodes)
print("Nodes' siblings fixed!")
mixed_nodes

In [None]:
mixed_skill = mix_skills(skill, intents=mixed_intents, entities=mixed_entities, dialog_nodes=mixed_nodes)
save_skill(skill_path, mixed_skill)

## Comparar

In [None]:
nodes_dict = to_dict(skill["dialog_nodes"])
mixed_nodes_dict = to_dict(mixed_nodes)
diff = [node for node in mixed_nodes_dict.keys() if node not in nodes_dict.keys()]
diff = [mixed_nodes_dict[node] for node in diff]
diff