# Generador de Reglas Falco
Aplicación NLP que genera reglas Falco válidas mediante modelos de lenguaje, a partir de la descripción de una regla como entrada.

_Autor: Carel Sánchez_

_Licencia: 	``GPL-3.0-only``_

### Fase 0: Requisitos previos
Descarga los recursos necesarios y las rulesets de internet.

#### _Instalación de módulos_

In [None]:
!python -m pip install datasets pyyaml ipywidgets evaluate nltk accelerate rouge_score tiktoken
!python -m pip install transformers[torch] -U #Resuelve un problema de HF
!python -m pip install openai   #Para interactuar con GPT de OpenAI

#### _Importamos las librerías necesarias_

In [None]:
from torch import no_grad, cuda, from_numpy
from sklearn.metrics.pairwise import cosine_similarity
from tokenizers import Tokenizer
from datasets import load_from_disk, Dataset
from transformers import (
    AutoModel,
    AutoModelForSeq2SeqLM,
    AutoConfig,
    DataCollatorForSeq2Seq,
    TrainingArguments,
    Seq2SeqTrainingArguments,
    Trainer,
    Seq2SeqTrainer,
    AutoTokenizer,
    logging
)
from evaluate import load
import nltk
import numpy as np
import re
import yaml
nltk.download('punkt')

#### _Comprobamos especificaciones_

In [None]:
import psutil
from pathlib import Path

def get_total_vram():
    vram = !nvidia-smi.exe --query-gpu=memory.free --format=csv
    return str(round(int(vram[-1].split()[0])/1024,2))+" GB"

def get_total_ram():
    """Return the total system memory reported by psutil, formatted in GB."""
    total_bytes = psutil.virtual_memory().total
    gib = total_bytes / (2**30)
    return f"{round(gib, 2)} GB"

def get_total_disk():
    """Return the total size of the filesystem mounted at "/" in GB."""
    total_bytes = psutil.disk_usage("/").total
    gib = total_bytes / (2**30)
    return f"{round(gib, 2)} GB"

def get_used_vram():
    vram = !nvidia-smi.exe --query-gpu=memory.used --format=csv
    return str(round(int(vram[-1].split()[0])/1024,2))+" GB"

def get_used_ram():
    """Return the system memory psutil reports as used, formatted in GB."""
    used_bytes = psutil.virtual_memory().used
    gib = used_bytes / (2**30)
    return f"{round(gib, 2)} GB"

def get_used_disk():
    """Return the combined size, in GB, of every file under the working directory.

    The current directory is walked recursively; only entries for which
    ``is_file()`` is true contribute to the total.
    """
    total_bytes = 0
    for entry in Path('.').glob('**/*'):
        if entry.is_file():
            total_bytes += entry.stat().st_size
    return f"{round(total_bytes / 2**30, 2)} GB"

def print_specs():
    """Print the machine's disk size, RAM and GPU RAM under a SPECS header."""
    rows = (
        ("Disk Size:", get_total_disk),
        ("RAM:      ", get_total_ram),
        ("GPU RAM:  ", get_total_vram),
    )
    print("=== SPECS ===")
    for label, probe in rows:
        print(label, probe())

def print_usage():
    """Print the disk, RAM and VRAM currently in use under a USAGE header."""
    rows = (
        ("Used Disk:", get_used_disk),
        ("Used RAM: ", get_used_ram),
        ("Used VRAM:", get_used_vram),
    )
    print("=== USAGE ===")
    for label, probe in rows:
        print(label, probe())


# Show the machine's capacity and the current resource usage.
print_specs()
print_usage()

### Fase 1: Recopilación y creación de los Datasets

Descargamos las rulesets desde distintas fuentes, unificamos, y creamos el dataset

#### Descarga de las rulesets

In [None]:
import requests
import os

def download(uri:str, rule_source:list):
    """Download rule files from a base URL into ``files/rules``.

    Files that already exist locally are not fetched again. A file that
    cannot be downloaded (network error, timeout or HTTP error status) is
    reported and skipped, and nothing is written for it, so a later run can
    retry it.

    Args:
        uri: Base URL; each entry of ``rule_source`` is appended to it.
        rule_source: File names to fetch. The same name is used for the
            local copy under ``files/rules``.
    """
    os.makedirs("files/rules", exist_ok=True)

    for rule in rule_source:
        target = "files/rules/"+rule
        if os.path.exists(target):
            print(f"[!] The rule {rule} already exists!")
            continue

        print(f"[*] Getting {rule}...")
        try:
            # Build the URL from the `uri` argument (not from a global) and
            # fetch before opening the target, so a failure never leaves an
            # empty file behind.
            response = requests.get(uri+rule, timeout=30)
            response.raise_for_status()
        except requests.RequestException as error:
            print(f"[!] Could not download {rule}: {error}")
            continue

        with open(target,"wb") as f:
            f.write(response.text.encode('utf8'))

# Source 1: official Falco repository
uri_source = "https://raw.githubusercontent.com/falcosecurity/rules/main/rules/"
rule_source = ["falco_rules.yaml", "falco-deprecated_rules.yaml", "falco-incubating_rules.yaml", "falco-sandbox_rules.yaml"]
download(uri_source,rule_source)

# Source 2: cloud-native-security repository
uri_source = "https://raw.githubusercontent.com/falcosecurity-retire/cloud-native-security-hub/master/resources/falco/"
rule_source = ["admin.yaml", "apache.yaml", "consul.yaml", "elasticsearch.yaml", "etcd.yaml", "fim.yaml", "fluentd.yaml", "gke.yaml", "haproxy.yaml", "kubernetes.yaml", "mongo.yaml", "nginx.yaml", "php-fpm.yaml", "postgres.yaml", "redis.yaml", "rook.yaml", "ssh.yaml", "traefik.yaml"]
download(uri_source,rule_source)

# Source 3: falco_extended_rules repository
uri_source = "https://raw.githubusercontent.com/CloudDefenseAI/falco_extended_rules/master/rules/"
rule_source = [ "account_manipulation_in_ssh.yaml", "archive_and_compression_activity.yaml", "attempt_to_access_bash_history.yaml", "boot_or_logon_autostart_execution.yaml", "chown_chmod_operations.yaml", "create_account_add_user.yaml", "create_nodeport_service.yaml", "create_or_modify_system_process.yaml", "credentials_from_password_file.yaml", "custom_rules.yaml", "detect_data_destruction_activity.yaml", "detect_peripheral_device_enumeration.yaml", "detect_service_disable.yaml", "detect_suspicious_disk_activity.yaml", "disable_recovery_features.yaml", "enumerate_domain_trust.yaml", "execute_command_via_utility.yaml", "get_info_about_open_application_windows.yaml", "memory_maps_of_processes.yaml", "modify_authentication_process.yaml", "password_policy_discovery.yaml", "permission_group_members_discovery.yaml", "process_injection.yaml", "read_disk_block_command.yaml", "suspicious_info_gathering.yaml", "suspicious_network_spanning_command.yaml", "suspicious_time_Date.yaml", "system_location_retrieval.yaml", "system_service_discovery.yaml"]
download(uri_source,rule_source)

# Source 4: Mitre-Attack-Falco-AWS repository
uri_source = "https://raw.githubusercontent.com/n1g3ld0ugla5/Mitre-Attack-Falco-AWS/main/plugins/rules/"
rule_source = ["aws_cloudtrail_rules.yaml"]
download(uri_source,rule_source)

# Source 5: sysdigfalcorules repository by hidd3ncod3s
uri_source = "https://raw.githubusercontent.com/hidd3ncod3s/sysdigfalcorules/master/rules/"
rule_source = [ "BolTCMS.yaml", "CMS_MadeSimple.yaml", "Log1CMS.yaml", "MonstraCMS.yaml", "PolarBearCMS.yaml", "generic.yaml", "wordpress.yaml"]
download(uri_source,rule_source)

# Source 6: falco-rules-test repository
uri_source = "https://raw.githubusercontent.com/fsdaniel/falco-rules-test/main/"
rule_source = ["global_falco_rules.yaml"]
download(uri_source,rule_source)

#### Creación del Dataset

 Esto lo haremos usando la librería ``datasets``, de HuggingFace


In [None]:
from datasets import (Dataset, DatasetDict, logging, disable_progress_bars)
from os import listdir
import yaml

# Silence logging and progress bars
logging.set_verbosity_error()
disable_progress_bars()

# Extract the rules contained in a ruleset file
def retrieve_rules(path:str):
    """Return every rule entry found in a Falco ruleset file.

    The file is parsed with ``yaml.safe_load``. When the document is a
    mapping instead of a list, the actual ruleset is taken from the YAML text
    stored under ``rules[0].raw``.

    Args:
        path: Path of the YAML ruleset file.

    Returns:
        The list of mapping entries that have a ``rule`` key, in file order.
        Entries of other kinds, ``None`` entries and empty documents yield
        nothing.
    """
    # The downloader writes these files as UTF-8, so read them the same way
    # instead of relying on the platform's default encoding.
    with open(path, 'r', encoding='utf8') as f:
        ruleset = yaml.safe_load(f)

    # Unwrap the real ruleset when the document is a mapping (a list is expected)
    if isinstance(ruleset, dict):
        ruleset = yaml.safe_load(ruleset['rules'][0]['raw'])

    # An empty file parses to None
    if not ruleset:
        return []

    # Keep every rule, including one in first position; the 'rule' key test
    # already filters out whatever non-rule entry a file starts with.
    return [elem for elem in ruleset if isinstance(elem, dict) and 'rule' in elem]

# Generator for the supervised dataset
def gen():
    """Yield one record per complete rule found under ``files/rules``.

    Each record has the keys ``name``, ``description``, ``condition``,
    ``output`` and ``priority`` (upper-cased). Rule entries that do not
    carry all of the required fields are skipped instead of raising
    ``KeyError``. Prints the number of rule entries found in each file.
    """
    required_fields = ("desc", "condition", "output", "priority")
    for filepath in listdir("files/rules"):
        ruleset = retrieve_rules("files/rules/"+filepath)
        print(f"The ruleset has {len(ruleset)} rules.")
        for rule in ruleset:
            # Partial entries (for instance ones that only append to or
            # override another rule) cannot produce a supervised example.
            if any(field not in rule for field in required_fields):
                continue
            yield {
                "name":rule['rule'],
                "description":rule["desc"],
                "condition":rule["condition"],
                "output": rule["output"],
                "priority": rule["priority"].upper(),
            }

# Generator for the raw-text dataset (whole rules dumped back to YAML)
def gen_raw():
    """Yield ``{"text": ...}`` records holding each rule re-serialised as YAML.

    Every rule is dumped with its keys in their original order, stripped, and
    terminated by a single newline. Prints the rule count of each file.
    """
    rules_dir = "files/rules"
    for filename in listdir(rules_dir):
        rules = retrieve_rules(rules_dir + "/" + filename)
        print(f"The ruleset '{filename}' has {len(rules)} rules.")
        yield from (
            {"text": yaml.dump(entry, sort_keys=False).strip() + "\n"}
            for entry in rules
        )

# Generator for the unsupervised "description: condition" dataset
def gen_text():
    """Yield ``{"text": "<desc>: <condition>"}`` records, one line each.

    Newlines inside the description and the condition are turned into spaces
    (not dropped) so that tokens on adjacent lines are not fused together.
    Rule entries without a ``desc`` or ``condition`` field are skipped.
    """
    def flatten(text):
        # Collapse a multi-line field to one line and trim the ends, so the
        # ": " separator is not preceded by a stray space.
        return text.replace("\n", " ").strip()

    for filepath in listdir("files/rules"):
        ruleset = retrieve_rules("files/rules/"+filepath)
        for rule in ruleset:
            if "desc" not in rule or "condition" not in rule:
                continue
            yield {
                "text": flatten(rule["desc"]) + ": " + flatten(rule["condition"]),
            }

def gen_out():
    """Yield ``{"text": "<desc>: <output>"}`` records, one line each.

    Newlines inside the description and the output are turned into spaces
    (not dropped) so that tokens on adjacent lines are not fused together.
    Rule entries without a ``desc`` or ``output`` field are skipped.
    """
    def flatten(text):
        # Collapse a multi-line field to one line and trim the ends, so the
        # ": " separator is not preceded by a stray space.
        return text.replace("\n", " ").strip()

    for filepath in listdir("files/rules"):
        ruleset = retrieve_rules("files/rules/"+filepath)
        for rule in ruleset:
            if "desc" not in rule or "output" not in rule:
                continue
            yield {
                "text": flatten(rule["desc"]) + ": " + flatten(rule["output"]),
            }

def split(dataset):
    """Split a dataset into train, test and validation parts.

    20% of the rows are held out first and that hold-out is then halved, one
    half becoming ``test`` and the other ``validation``.

    Returns:
        A ``DatasetDict`` with the keys ``train``, ``test`` and ``validation``.
    """
    first_cut = dataset.train_test_split(test_size=0.2)
    second_cut = first_cut['test'].train_test_split(test_size=0.5)

    parts = {}
    parts['train'] = first_cut['train']
    parts['test'] = second_cut['test']
    parts['validation'] = second_cut['train']
    return DatasetDict(parts)

# Version 1: supervised dataset (one column per rule field)
dataset = Dataset.from_generator(gen)
dataset_split = split(dataset)

# Version 2: unsupervised datasets ("description: condition" text)
dataset_text = Dataset.from_generator(gen_text)
dataset_text_split = split(dataset_text)

# Same layout, with the rule output in place of the condition
dataset_out = Dataset.from_generator(gen_out)
dataset_out_split = split(dataset_out)

# Version 3: raw text (whole rules as YAML)
dataset_raw = Dataset.from_generator(gen_raw)
dataset_raw_split = split(dataset_raw)

# Print the datasets
print("dataset:", dataset,"----------")
print("dataset_text:", dataset_text,"----------")
print("dataset_out:", dataset_out,"----------")
print("dataset_raw:", dataset_raw,"----------")
print("dataset_split:", dataset_split,"----------")
print("dataset_text_split:", dataset_text_split,"----------")
print("dataset_out_split:", dataset_out_split,"----------")
print("dataset_raw_split:", dataset_raw_split,"----------")

# Save the datasets: first the unsplit ones...
dataset.save_to_disk("files/datasets/dataset")
dataset_text.save_to_disk("files/datasets/dataset_text")
dataset_out.save_to_disk("files/datasets/dataset_out")
dataset_raw.save_to_disk("files/datasets/dataset_raw")

# ...then the train/test/validation splits that the training cells load
dataset_split.save_to_disk("files/datasets/dataset_split")
dataset_text_split.save_to_disk("files/datasets/dataset_text_split")
dataset_out_split.save_to_disk("files/datasets/dataset_out_split")
dataset_raw_split.save_to_disk("files/datasets/dataset_raw_split")

print("All datasets have been saved")

### Fase 2: Afinamiento del Modelo

Afinamos la salida resultante del modelo a través de un entrenador usando un modelo como referencia.

#### Versión 1: Entrenamiento supervisado con Seq2Seq

In [None]:
#@title
from tokenizers import Tokenizer
from datasets import load_from_disk, Dataset
from transformers import (
    AutoConfig,
    AutoModelForSeq2SeqLM,
    DataCollatorForSeq2Seq,
    Seq2SeqTrainingArguments,
    Seq2SeqTrainer,
    AutoTokenizer
)
from evaluate import load
import nltk
import numpy as np
import yaml
nltk.download('punkt')

# Model fine-tuning function
def train_v1(dataset: Dataset, model_checkpoint: str, tokenizer_checkpoint: str, input_key: str, output_key: str, version = "v1"):
    """Fine-tune a sequence-to-sequence model to map one text column to another.

    Args:
        dataset: Dataset with ``"train"`` and ``"validation"`` splits whose
            rows contain the ``input_key`` and ``output_key`` columns.
        model_checkpoint: Name or path of the seq2seq model to fine-tune.
        tokenizer_checkpoint: Name or path of the tokenizer to use.
        input_key: Column holding the source text.
        output_key: Column holding the target text.
        version: Suffix for the output directory, which is named
            ``<model name>-rulegen-<version>``.

    Returns:
        The ``Seq2SeqTrainer`` after training has finished. The caller is
        responsible for calling ``save_model()`` on it.
    """
    print("Loading the model and Tokenizer...")
    # Seq2SeqTrainer is used further down (geared towards summarisation)
    model = AutoModelForSeq2SeqLM.from_pretrained(model_checkpoint)

    tokenizer = AutoTokenizer.from_pretrained(tokenizer_checkpoint)
    # Add a padding token when the tokenizer has none
    if tokenizer.pad_token is None:
        tokenizer.add_special_tokens({'pad_token': '[PAD]'})
        # The new token id needs a row in the embedding matrix, otherwise
        # looking it up is out of range.
        if len(tokenizer) > model.get_input_embeddings().weight.shape[0]:
            model.resize_token_embeddings(len(tokenizer))

    # The original T5 checkpoints expect a task prefix.
    # NOTE(review): a namespaced name such as "google-t5/t5-small" does not
    # match this list, so no prefix is added for it. That is kept as is
    # because the inference cell sends no prefix either; confirm the intent.
    if model_checkpoint in ("t5-small", "t5-base", "t5-large", "t5-3b", "t5-11b"):
        prefix = "summarize: "
    else:
        prefix = ""
    max_input_length = 1024
    max_target_length = 128

    # Tokenize the dataset
    print("Tokenizing the dataset...")
    def tokenize_function(examples):
        inputs = [prefix + doc for doc in examples[input_key]]
        model_inputs = tokenizer(inputs, max_length=max_input_length, truncation=True)

        # Tokenize the targets
        labels = tokenizer(text_target=examples[output_key],
                           max_length=max_target_length,
                           truncation=True)

        model_inputs["labels"] = labels["input_ids"]
        return model_inputs

    tokenized_dataset = dataset.map(tokenize_function, batched=True)

    # ROUGE is loaded once here instead of on every evaluation pass
    metric = load("rouge")

    def compute_metrics(eval_pred):
        # Adapted from the Hugging Face 'Summarization' example notebook
        predictions, labels = eval_pred

        # -100 marks ignored positions and cannot be decoded; swap it for the
        # pad token in both the predictions and the labels.
        predictions = np.where(predictions != -100, predictions, tokenizer.pad_token_id)
        labels = np.where(labels != -100, labels, tokenizer.pad_token_id)
        decoded_preds = tokenizer.batch_decode(predictions, skip_special_tokens=True)
        decoded_labels = tokenizer.batch_decode(labels, skip_special_tokens=True)

        # ROUGE expects a newline after each sentence
        decoded_preds = ["\n".join(nltk.sent_tokenize(pred.strip())) for pred in decoded_preds]
        decoded_labels = ["\n".join(nltk.sent_tokenize(label.strip())) for label in decoded_labels]

        result = metric.compute(
            predictions=decoded_preds,
            references=decoded_labels,
            use_stemmer=True,
            use_aggregator=True
            )

        # Express the scores as percentages
        result = {key: value * 100 for key, value in result.items()}

        # Add the mean length of the generated output (non-pad tokens)
        prediction_lens = [np.count_nonzero(pred != tokenizer.pad_token_id) for pred in predictions]
        result["gen_len"] = np.mean(prediction_lens)

        # Return the results rounded to four decimals
        return {k: round(v, 4) for k, v in result.items()}

    batch_size = 16                              # Batch size for training and evaluation
    model_name = model_checkpoint.split("/")[-1] # Model name without its namespace

    # Training arguments
    args = Seq2SeqTrainingArguments(
        f"{model_name}-rulegen"+"-"+version,   # Output directory / resulting model name
        eval_strategy = "epoch",
        learning_rate=2e-5,
        per_device_train_batch_size=batch_size,
        per_device_eval_batch_size=batch_size,
        weight_decay=0.01,
        save_total_limit=3,
        num_train_epochs=1,
        predict_with_generate=True,
        fp16=True,
        push_to_hub=False,
    )

    # Data collator
    data_collator = DataCollatorForSeq2Seq(tokenizer, model=model)

    trainer = Seq2SeqTrainer(
        model,
        args,
        train_dataset=tokenized_dataset["train"],
        eval_dataset=tokenized_dataset["validation"],
        data_collator=data_collator,
        tokenizer=tokenizer,
        compute_metrics=compute_metrics,
    )

    # Train the model
    print("Running the training process...")
    trainer.train()

    # Hand the trainer back to the caller
    return trainer

# Fine-tune the model on the supervised split (description -> condition)
dataset = load_from_disk("files/datasets/dataset_split")

model_checkpoint_v1 = "google-t5/t5-small" #@param  {type:"string"}
tokenizer_checkpoint_v1 = "google-t5/t5-small" #@param  {type:"string"}

print("Initiating training script...")
rule_tf = train_v1(dataset, model_checkpoint_v1, tokenizer_checkpoint_v1, "description", "condition", "v1-condition")
rule_tf.save_model()
print("Model has been fine-tuned and saved!")


_Obtenemos el consumo y liberamos memoria_

In [None]:
# Report resource usage, then drop the trainer and release cached GPU memory
print_usage()
del rule_tf
cuda.empty_cache()

#### Versión 2: Entrenamiento sin supervisión

In [None]:
#@title
from transformers import (
    AutoTokenizer,
    AutoModelForCausalLM,
    Trainer,
    TrainingArguments,
    DataCollatorForLanguageModeling
    )

from datasets import load_from_disk
import evaluate
import numpy as np


def train_v2(dataset, model_checkpoint: str, tokenizer_checkpoint: str, version = "v2"):
    """Fine-tune a causal language model on a plain-text dataset.

    The ``text`` column is tokenized, all token sequences are concatenated
    and then cut into fixed 128-token blocks that serve both as inputs and
    as labels.

    Args:
        dataset: Dataset with ``"train"`` and ``"validation"`` splits and a
            ``text`` column.
        model_checkpoint: Name or path of the causal LM to fine-tune.
        tokenizer_checkpoint: Name or path of the tokenizer to use.
        version: Suffix for the output directory, which is named
            ``<model name>-rulegen-<version>``.

    Returns:
        The ``Trainer`` after training has finished. The caller is
        responsible for calling ``save_model()`` on it.
    """

    print("Loading model and tokenizer...")
    # Load the tokenizer and the pretrained model
    tokenizer = AutoTokenizer.from_pretrained(tokenizer_checkpoint)
    model = AutoModelForCausalLM.from_pretrained(model_checkpoint)
    if tokenizer.pad_token is None:
        tokenizer.add_special_tokens({'pad_token': '[PAD]'})
        # The new token id needs a row in the embedding matrix, otherwise
        # looking it up is out of range.
        if len(tokenizer) > model.get_input_embeddings().weight.shape[0]:
            model.resize_token_embeddings(len(tokenizer))

    model.to('cuda') # Forces to use CUDA

    # Tokenize the data
    def tokenize_function(examples):
        max_input_length = 1024
        # No padding here: group_texts() below produces equal-length blocks.
        # Padding every example to max_input_length first would fill most
        # blocks with pad tokens and train the model to predict them.
        return tokenizer(examples['text'],
                         max_length=max_input_length,
                         truncation=True
                         )

    # Build the text blocks
    def group_texts(examples):
        block_size = 128        # Size of each block, in tokens
        # Concatenate every column of the batch
        concatenated_examples = {k: sum(examples[k], []) for k in examples.keys()}
        total_length = len(concatenated_examples[list(examples.keys())[0]])
        # Drop the tail that does not fill a whole block
        total_length = (total_length // block_size) * block_size
        # Cut into chunks of block_size
        result = {
            k: [t[i : i + block_size] for i in range(0, total_length, block_size)]
            for k, t in concatenated_examples.items()
        }
        # Labels are a copy of the inputs
        result["labels"] = result["input_ids"].copy()
        return result

    # Tokenize and build the text blocks
    print("Tokenizing and group-texting the dataset...")
    tokenized_dataset = dataset.map(
        tokenize_function,
        batched=True,
        num_proc = 4,
        remove_columns=['text']
    )
    lm_dataset = tokenized_dataset.map(group_texts, batched=True, num_proc=4)

    model_name = model_checkpoint.split("/")[-1]
    # Training configuration
    training_args = TrainingArguments(
        output_dir=f"{model_name}-rulegen"+"-"+version,
        overwrite_output_dir=True,
        eval_strategy="epoch",
        learning_rate=2e-5,
        weight_decay=0.01,
        fp16=True,
        use_cpu = False,
    )
    # Create the trainer
    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=lm_dataset['train'],
        eval_dataset=lm_dataset['validation'],
    )

    # Train the model
    print("Starting training process...")
    trainer.train()
    return trainer


### Fine-tune the model through training ###

# Dataset used to generate the condition
dataset_text_split = load_from_disk("files/datasets/dataset_text_split")
# Dataset used to generate the output
dataset_out_split = load_from_disk("files/datasets/dataset_out_split")
# Model to fine-tune
model_checkpoint = "gpt2" #@param  {type:"string"}
# Tokenizer to use
tokenizer_checkpoint = "sgugger/gpt2-like-tokenizer" #@param  {type:"string"}

# Train the model...
print("Initiating training script...")
rule_tf = train_v2(dataset_text_split, model_checkpoint, tokenizer_checkpoint, "v2-condition")
rule_tf.save_model()
print("Model has been fine-tuned and saved!")


_Evaluamos la perplejidad, el consumo, y liberamos memoria:_

In [None]:
import math
# Evaluate on the validation split; perplexity is exp(eval_loss)
eval_results = rule_tf.evaluate()
print_usage()
print(f"Perplexity: {math.exp(eval_results['eval_loss']):.2f}")
# Drop the trainer and release cached GPU memory
del rule_tf
cuda.empty_cache()

#### Versión 3: Solamente texto en crudo

In [None]:
# Same procedure as version 2,
# except that the whole raw rules are used as training text

#@title
dataset_raw_split = load_from_disk("files/datasets/dataset_raw_split")
model_checkpoint = "gpt2" #@param  {type:"string"}
tokenizer_checkpoint = "sgugger/gpt2-like-tokenizer" #@param  {type:"string"}
print("Initiating training script...")
rule_tf = train_v2(dataset_raw_split, model_checkpoint, tokenizer_checkpoint, "v3")
rule_tf.save_model()
print("Model has been fine-tuned and saved!")

_Evaluamos la perplejidad, el consumo, y liberamos memoria:_

In [None]:
import math
# Evaluate on the validation split; perplexity is exp(eval_loss)
eval_results = rule_tf.evaluate()
print_usage()
print(f"Perplexity: {math.exp(eval_results['eval_loss']):.2f}")
# Drop the trainer and release cached GPU memory
del rule_tf
cuda.empty_cache()

#### Generador de reglas por LLM

In [None]:
from openai import OpenAI

# The OpenAI API key is expected in the OPENAI_API_KEY environment variable
def gpt_generate_rule(description: str):
    """Ask an OpenAI chat model to write a Falco rule for a description.

    The request consists of a system prompt with the generation guidelines,
    one worked example (description and expected rule) and the user's
    description. Generation stops at the ``[END]`` marker.

    Args:
        description: Natural-language description of what the rule detects.

    Returns:
        A ``(rule_text, price)`` tuple: the content of the first completion
        choice, and the cost computed from the token usage reported in the
        response and the per-1000-token prices defined below.
    """
    import os

    # Read the key from the environment so that it is never stored in the
    # notebook; the placeholder is only a fallback for editing by hand.
    api_key = os.environ.get("OPENAI_API_KEY", "insert_your_api_key_here")
    client = OpenAI(api_key=api_key)

    # Model to use and its prices per 1000 tokens
    model = "gpt-4o"
    input_price = 0.005
    output_price = 0.015
    # Pre-defined prompt
    messages=[
        # Instructions given to the LLM
        {
        "role": "system",
        "content": [
            {
            "type": "text",
            "text": "You are a security engineer, and your task is to automate the generation of Falco rules to detect and prevent various attack techniques. You will receive a description of an attack technique, and you need to produce a corresponding Falco rule in YAML format. Guidelines for generating the Falco rule: \t\n- Rule Definition: \t\t\n        - Define a rule name that briefly describes the detection objective.\n \t- Provide a description (desc) of what the rule is detecting and why it's important.\n        - Mandatory fields are \"Rule\", \"Description\", \"Condition\", \"Output\" and \"Priority\", in that order. \"Rule\" field is the header of the Rule object, and contains the name of the rule, while the rest of fields are sub-fields of the Rule object\n - Condition: \n\t- Use appropriate conditions to detect the described attack technique. \n\t- Include relevant process names, arguments, and context (such as user IDs or working directories). \n\t- Ensure the conditions are specific enough to avoid false positives but general enough to catch the described attack. \n- Output: \n\t- Craft an output message that clearly states what was detected, including relevant details such as the command line and user name.  Use up to 50 words\n- Priority and Tags: \n\t\t- Set an appropriate priority level (info, warning, critical). Include relevant tags for categorization (e.g., [security, intrusion]).\n\nIt is expected the user will submit only a description for a specific rule.  Give only the Falco rule in YAML format. Do not comment it nor use Markdown. Once you finished, in a new line write [END]"
            }
        ]
        },
        # Example input
        {
        "role": "user",
        "content": [
            {
            "type": "text",
            "text": "Please generate a Falco Rule for the following description:\nA shell was used as the entrypoint/exec point into a container with an attached terminal. Parent process may have  legitimately already exited and be null (read container_entrypoint macro). Common when using \"kubectl exec\" in Kubernetes.  Correlate with k8saudit exec logs if possible to find user or serviceaccount token used (fuzzy correlation by namespace and pod name).  Rather than considering it a standalone rule, it may be best used as generic auditing rule while examining other triggered  rules in this container/tty."
            }
        ]
        },
        # Example output
        {
        "role": "assistant",
        "content": [
            {
            "type": "text",
            "text": "- rule: Terminal shell in container\r\n  desc: >\r\n    A shell was used as the entrypoint/exec point into a container with an attached terminal. Parent process may have \r\n    legitimately already exited and be null (read container_entrypoint macro). Common when using \"kubectl exec\" in Kubernetes. \r\n    Correlate with k8saudit exec logs if possible to find user or serviceaccount token used (fuzzy correlation by namespace and pod name). \r\n    Rather than considering it a standalone rule, it may be best used as generic auditing rule while examining other triggered \r\n    rules in this container/tty.\r\n  condition: >\r\n    spawned_process \r\n    and container\r\n    and shell_procs \r\n    and proc.tty != 0\r\n    and container_entrypoint\r\n    and not (never_true)\r\n  output: A shell was spawned in a container with an attached terminal (evt_type=%evt.type user=%user.name user_uid=%user.uid user_loginuid=%user.loginuid process=%proc.name proc_exepath=%proc.exepath parent=%proc.pname command=%proc.cmdline terminal=%proc.tty exe_flags=%evt.arg.flags %container.info)\r\n  priority: NOTICE\r\n  tags: [maturity_stable, container, shell, mitre_execution, T1059]"
            }
        ]
        },
        # Prompt carrying the description of the rule to generate
        {
        "role": "user",
        "content": [
            {
            "type": "text",
            "text": "Please generate a Falco Rule for the following description:\n"+description
            }
        ]
        },
        ]

    # Generation parameters
    temp = 1
    max_t=1500
    top_p=1
    freq_p=0
    pres_p=0
    stop=["[END]"]

    # Send the request to OpenAI and return the rule together with its cost
    response = client.chat.completions.create(model=model, messages=messages, temperature=temp, max_tokens=max_t, top_p=top_p, frequency_penalty=freq_p, presence_penalty=pres_p,stop=stop)
    prompt_tokens = response.usage.prompt_tokens
    completion_tokens = response.usage.completion_tokens
    price = prompt_tokens*(input_price/1000)+completion_tokens*(output_price/1000)
    return (response.choices[0].message.content, price)

#### Creación de otros modelos
Las versiones 1 y 2 requieren crear otros modelos para completar una regla Falco


In [None]:
#@title
import nltk
import numpy as np
import yaml
from datasets import load_from_disk, concatenate_datasets
from evaluate import load
from torch import from_numpy, nn
from transformers import (DataCollatorWithPadding,
                          AutoTokenizer,
                          Trainer,
                          TrainingArguments,
                          AutoModelForSequenceClassification)

nltk.download('punkt')

# Models that generate the rule output.
# Each one uses a different version of the training function
dataset_split = load_from_disk("files/datasets/dataset_split")
dataset_out_split = load_from_disk("files/datasets/dataset_out_split")
print("[*] Creating V1 output generator model...")
rule_out_v1 =train_v1(dataset_split,
                      model_checkpoint_v1,
                      tokenizer_checkpoint_v1,
                      "description",
                      "output",
                      "v1-output")
print("[*]  Creating V2 output generator model...")
rule_out_v2 = train_v2(dataset_out_split,
                       model_checkpoint,
                       tokenizer_checkpoint,
                       "v2-output")

rule_out_v1.save_model()
rule_out_v2.save_model()

# Drop both trainers and release cached GPU memory
del rule_out_v1, rule_out_v2
cuda.empty_cache()

# Model that classifies rules by priority
# A dedicated trainer is defined for it below

pr_model_checkpoint = "distilbert-base-uncased" #@param {type:"string"}
pr_tokenizer_checkpoint = "distilbert-base-uncased" #@param {type:"string"}

def train_priority(dataset):
    """Train a classifier that predicts a rule's priority from its description.

    The checkpoints are taken from the module-level ``pr_model_checkpoint``
    and ``pr_tokenizer_checkpoint``. Training uses a cross-entropy loss
    weighted per class to counter the imbalance between priorities.

    Args:
        dataset: Dataset with ``"train"``, ``"validation"`` and ``"test"``
            splits whose rows contain ``description`` and ``priority``
            columns. Every priority must be one of the eight upper-case
            names listed below, otherwise a ``KeyError`` is raised.

    Returns:
        The trainer after training has finished. The caller is responsible
        for calling ``save_model()`` on it.
    """
    # Label mappings, in both directions
    priority_label = {"EMERGENCY":0, "ALERT":1, "CRITICAL":2,
                   "ERROR":3, "WARNING":4, "NOTICE":5,
                   "INFO":6, "DEBUG":7}
    label_priority = {index: name for name, index in priority_label.items()}

    # Create the tokenizer and the model
    tokenizer = AutoTokenizer.from_pretrained(pr_tokenizer_checkpoint)
    model = AutoModelForSequenceClassification.from_pretrained(
        pr_model_checkpoint, num_labels=len(priority_label),
        id2label=label_priority, label2id=priority_label
        )
    if tokenizer.pad_token is None:
        tokenizer.add_special_tokens({'pad_token': '[PAD]'})
        # The new token id needs a row in the embedding matrix
        if len(tokenizer) > model.get_input_embeddings().weight.shape[0]:
            model.resize_token_embeddings(len(tokenizer))

    # Never truncate to more tokens than the tokenizer declares as its limit
    max_input_length = min(1024, tokenizer.model_max_length)

    def preprocess_function(examples):
        # Tokenize the inputs; padding is left to the data collator, which
        # pads each batch to its own longest sequence.
        model_inputs = tokenizer(examples['description'],
                                 max_length=max_input_length,
                                 truncation=True)
        # Turn the priority names into class ids
        model_inputs["labels"] = [priority_label[priority] for priority in examples['priority']]
        return model_inputs

    tokenized_dataset = dataset.map(
        preprocess_function,
        batched=True,
        num_proc = 4,
        remove_columns = dataset["train"].column_names
    )

    # Compute the class weights (to correct the classification bias)
    def get_weights(dataset):
        # Merge the splits into one dataframe and count the labels
        dataset = concatenate_datasets([dataset["train"],dataset["validation"],dataset["test"]])
        df = dataset.to_pandas()
        label_counts = df["labels"].value_counts(normalize=True)
        print(label_counts)
        # A label that never occurs gets a frequency of 0
        for i in range(len(priority_label)):
            if i not in label_counts.index:
                label_counts[i] = 0
        # Weight of a class = 1 - its relative frequency
        class_weights = (1-label_counts.sort_index()).values
        print("label weights:", class_weights)
        # Kept on the CPU here; it is moved to the logits' device when used
        return from_numpy(class_weights).float()

    class_weights = get_weights(tokenized_dataset)
    print(class_weights)

    # Accuracy is loaded once here instead of on every evaluation pass
    metric = load('accuracy')

    def compute_metrics(eval_pred):
        predictions, labels = eval_pred
        predictions = np.argmax(predictions, axis=1)
        return metric.compute(predictions=predictions, references=labels)

    model_name = pr_model_checkpoint.split("/")[-1]
    batch_size = 16
    training_args = TrainingArguments(
        output_dir= model_name+"-priority-classifier",
        learning_rate = 2e-5,
        per_device_train_batch_size = batch_size,
        per_device_eval_batch_size = batch_size,
        num_train_epochs = 2,
        weight_decay = 0.01,
        eval_strategy = "epoch",
        save_strategy = "epoch",
    )

    # Data collator
    data_collator = DataCollatorWithPadding(tokenizer=tokenizer)

    # Custom trainer that applies the class weights
    class WeightedLossTrainer(Trainer):
        # **kwargs absorbs extra keyword arguments (such as
        # num_items_in_batch) that newer Trainer versions pass in.
        def compute_loss(self, model, inputs, return_outputs=False, **kwargs):
            # Run the model and take the logits
            outputs = model(**inputs)
            logits = outputs.get("logits")
            # Take the labels
            labels = inputs.get("labels")
            # Weighted loss, with the weights on the same device as the logits
            loss_func = nn.CrossEntropyLoss(weight=class_weights.to(logits.device))
            loss = loss_func(logits, labels)
            return (loss, outputs) if return_outputs else loss

    trainer =  WeightedLossTrainer(
                model=model,
                args=training_args,
                train_dataset=tokenized_dataset["train"],
                eval_dataset=tokenized_dataset["validation"],
                tokenizer=tokenizer,
                data_collator=data_collator,
                compute_metrics=compute_metrics)

    trainer.train()
    return trainer

# Train the priority classifier on the supervised split and save it
print("[*] Creating priority classifier model...")
rule_priority = train_priority(dataset_split)
rule_priority.save_model()

print("[+] Model has been created and saved!")


_Evaluamos el consumo y liberamos memoria_

In [None]:
# Report resource usage, then drop the trainer and release cached GPU memory
print_usage()
del rule_priority
cuda.empty_cache()

### Fase 3: Prueba y validación

Una vez entrenados los modelos, generaremos una regla y validaremos si es una regla Falco válida y si se ajusta a la descripción de entrada.

#### Validación manual
Se genera un texto de prueba que puede ser evaluado a mano

In [None]:
#@title
from transformers import pipeline, logging

#Callamos las advertencias de transformers menos críticas
logging.set_verbosity_error()

# Descripción de una detección de la técnica T1098.002
# Esta descripción no está en el dataset de reglas.
# Obtenido de MITRE ATT&CK.
test = "Monitor for unusual Exchange and Office 365 "+\
        "email account permissions changes that may indicate "+\
        "excessively broad permissions (including memberships "+\
        "in privileged groups) being granted to compromised accounts."

########### VERSION 1 ###########

# Load the models: version 1 uses one text2text model for the condition and
# another one for the output, both sharing the same tokenizer checkpoint.
#@markdown #Variables de la Versión 1
v1_model_name = "t5-small-rulegen" #@param  {type:"string"}
condition_model_name = v1_model_name+"-v1-condition"
output_model_name = v1_model_name+"-v1-output"
v1_tokenizer_checkpoint = "google-t5/t5-small" #@param  {type:"string"}
condition_pipe =  pipeline("text2text-generation",
                           model=condition_model_name,
                           tokenizer=v1_tokenizer_checkpoint)
output_pipe =  pipeline("text2text-generation",
                        model=output_model_name,
                        tokenizer=v1_tokenizer_checkpoint)

# Print the raw pipeline results for the test description
print("-------------")
print("VERSION 1:")
print(condition_pipe(test))
print(output_pipe(test))
print("-------------")

########### VERSION 2 ###########
#@markdown ------
#@markdown #Variables de la Versiones 2 y 3
# Load the models: version 2 uses text-generation models (condition and
# output) and runs them on GPU device 0.
v2_model_name = "gpt2" #@param  {type:"string"}
condition_model_name = v2_model_name+"-rulegen-v2-condition"
output_model_name = v2_model_name+"-rulegen-v2-output"
v2_tokenizer_checkpoint = "sgugger/gpt2-like-tokenizer" #@param  {type:"string"}
condition_pipe =  pipeline("text-generation",
                           model=condition_model_name,
                           tokenizer=v2_tokenizer_checkpoint,
                           device = 0)
output_pipe =  pipeline("text-generation",
                        model=output_model_name,
                        tokenizer=v2_tokenizer_checkpoint,
                        device = 0)

# The prompt is the description followed by ":" as separator
print("-------------")
print("VERSION 2:")
print(condition_pipe(test+":"))
print(output_pipe(test+":"))
print("-------------")

########### VERSION 3 ###########
# Version 3 uses a single text-generation model that continues a rule
# skeleton; it reuses the model name and tokenizer chosen for version 2.
model_name = v2_model_name+"-rulegen-v3"
pipe = pipeline("text-generation",
                model=model_name,
                tokenizer=v2_tokenizer_checkpoint,
                device = 0)
# A stock t5-small summarizer produces a short text used as the rule name
nom_pipe = pipeline("summarization", model="t5-small")
name = nom_pipe(test, min_length = 5, max_length=10)[0]["summary_text"]
print("-------------")
print("VERSION 3:")
# The prompt holds the rule name and description and ends at "condition: "
print(pipe("rule: "+name+"\n  desc: "+test+"\n  condition: ", max_length = 1000))
print("-------------")

########### LLM GENERATOR ###########
# gpt_generate_rule (defined earlier in the notebook) returns the generated
# rule together with the price of the request.
rule, price = gpt_generate_rule(test)
print("-------------")
print("Rule generated by GPT-4o:\n"+rule)
print("-------------")
print("Price:",str(round(price,3))+"$")

########### PRIORITY CLASSIFIER ###########
#@markdown ------
#@markdown #Parámetros del modelo clasificador
#@markdown _Nota: el tokenizador es el mismo que el nombre del modelo_
# The base checkpoint name doubles as the tokenizer; the fine-tuned
# classifier is looked up under "<base>-priority-classifier".
pri_model_name = "distilbert-base-uncased" #@param {type:"string"}
model_name = pri_model_name+"-priority-classifier"
pipe = pipeline("text-classification",
                model=model_name,
                tokenizer=pri_model_name)
print("-------------")
print("Text classification model:")
print(pipe(test))
print("-------------")

#### Pruebas automáticas
Usaremos el dataset de prueba y mediremos la tasa de éxito, que es igual a la razón entre el número de reglas satisfactorias y el tamaño de la muestra.

##### Función de validación de sintaxis

In [None]:
import re

# Syntax check
def check_syntax(condition):
    """Heuristically check whether ``condition`` looks like a valid condition.

    The check is a lightweight token scan, not a full parser. It rejects the
    text when:

    * parentheses are not balanced,
    * ``glob`` is not followed by a double quote,
    * a unary function (``val``, ``tolower``, ``toupper``) is not followed
      by ``(`` or is never closed,
    * a parenthesised group starts with a comma, contains two consecutive
      commas, or is never closed.

    Binary operators with a missing operand are tolerated on purpose.

    Args:
        condition: Condition text to check. Any value that is not a ``str``
            (for example ``None`` coming from a rule without a condition)
            is reported as invalid instead of raising ``TypeError``.

    Returns:
        ``True`` when none of the checks fails, ``False`` otherwise.
    """
    # A missing or non-textual condition can never be valid
    if not isinstance(condition, str):
        return False

    # Tokenize: split on whitespace and on every character that is not a
    # word character, '@' or '#'. Such characters are kept as one-character
    # tokens; empty pieces are discarded.
    tokens = list(filter(None, re.split(r'\s|([^\w@#])', condition)))

    # Check that every parenthesis is matched
    def check_parentheses(tokens):
        depth = 0
        for token in tokens:
            if token == '(':
                depth += 1
            elif token == ')':
                if depth == 0:
                    return False
                depth -= 1
        return depth == 0

    if not check_parentheses(tokens):
        return False

    # Valid operators and functions.
    # NOTE(review): the tokenizer emits punctuation one character at a time,
    # so the two-character entries ("!=", "<=", ">=") are never matched as a
    # single token.
    bin_operators = {"=", "!=", "<=", ">=", ">",
                     "contains", "icontains", "startswith", "endswith",
                     "glob", "in", "and", "or", "intersects", "pmatch",
                     "exists", "bcontains", "bstartswith"}
    uni_operators = {"val", "tolower", "toupper"}
    i = 0
    n = len(tokens)
    while i < n:
        token = tokens[i]
        # Binary operators
        if token in bin_operators:
            if token == "glob":
                # glob must be followed by a quoted pattern
                if i + 1 >= n or tokens[i + 1] != '"':
                    return False
                i += 2
            elif (i == 0 or
                  i == n - 1 or
                  tokens[i - 1] in bin_operators or
                  tokens[i + 1] in bin_operators):
                pass  # Binary operators without a value next to them are allowed
            else:
                # Skip the right-hand operand
                i += 1
        # Unary functions: require "(" and jump to the first ")"
        elif token in uni_operators:
            if i + 1 >= n or tokens[i + 1] != '(':
                return False
            j = i + 2
            while j < n and tokens[j] != ')':
                j += 1
            if j >= n:
                return False
            i = j
        # Parenthesised groups: validate the commas up to the first ")"
        elif token == '(':
            j = i + 1
            while j < n and tokens[j] != ')':
                # "j == n - 1" also guards the tokens[j + 1] lookup
                if tokens[j] == ',' and (j == i + 1 or
                                         j == n - 1 or
                                         tokens[j + 1] == ',' or
                                         tokens[j - 1] == ','):
                    return False
                j += 1
            if j >= n:
                return False
            i = j

        i += 1
    return True

# Quick manual check of check_syntax on a sample condition
condition = 'tolower(val(name)) contains john and age >='
result = check_syntax(condition)
print(result)  # True means correct, False means invalid

##### Modelo calculador de afinidad semántica

_Creamos el modelo de semántica:_

In [None]:
# Using the version 1 approach for a summarization-like task, we train a
# model that summarizes a rule condition; the summary is later compared
# with the description to measure similarity.
from torch import no_grad, cuda
from transformers import AutoModel, logging
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from datasets import load_from_disk

# Silence the less critical transformers warnings
logging.set_verbosity_error()

# Build the semantics extractor model from the stored dataset splits
dataset = load_from_disk("files/datasets/dataset_split")

model_checkpoint = "google-t5/t5-small" #@param  {type:"string"}
tokenizer_checkpoint = "google-t5/t5-small" #@param  {type:"string"}

print("Initiating training script...")
# NOTE(review): the last three arguments are presumably the input column,
# the target column and the model name suffix - confirm against train_v1.
sem_extractor_trainer = train_v1(dataset,
                        model_checkpoint,
                        tokenizer_checkpoint,
                        "condition",
                        "description",
                        "sem-extractor")
sem_extractor_trainer.save_model()


_Evaluamos el consumo y liberamos memoria:_

In [None]:
# Report resource usage, then drop the trainer reference and release the
# GPU memory cached by torch.
print_usage()
del sem_extractor_trainer
cuda.empty_cache()

_Declaramos las funciones de similitud y de afinidad semántica:_

In [None]:
# Cosine similarity between two texts
def check_similarity(text1, text2):
    """Return the cosine similarity between the word counts of two texts.

    A ``CountVectorizer`` is fitted on the pair itself, both texts are
    turned into count vectors with it, and the similarity between the first
    and the second vector is returned.
    """
    pair = [text1, text2]

    # Learn the vocabulary from both texts, then count words in each one
    bag_of_words = CountVectorizer()
    bag_of_words.fit(pair)
    counts = bag_of_words.transform(pair)

    # Entry (0, 1) of the pairwise matrix compares text1 against text2
    pairwise = cosine_similarity(counts)
    return pairwise[0][1]

# Semantic affinity between a condition and a description
def check_semantic(condition: str, description: str):
    """Score how well ``condition`` matches ``description``.

    The fine-tuned semantics extractor (a text2text model stored under
    ``./<checkpoint>-rulegen-sem-extractor``) generates a text from the
    condition, and that text is compared with ``description`` through
    ``check_similarity``.

    Relies on the notebook-level ``model_checkpoint`` and
    ``tokenizer_checkpoint`` variables and runs the pipeline on GPU device 0.

    Args:
        condition: Rule condition to summarize.
        description: Reference description the summary is compared with.

    Returns:
        The value returned by ``check_similarity`` for the generated text
        and ``description``.
    """
    model_name = "./"+model_checkpoint.split("/")[-1]+"-rulegen-sem-extractor"

    # Load model and tokenizer through the pipeline only; a separate
    # AutoModel.from_pretrained call would load the weights a second time
    # just to be discarded.
    summarizer = pipeline("text2text-generation",
                        model=model_name,
                        tokenizer=tokenizer_checkpoint,
                        device = 0)

    # Generate the summary of the condition.
    # NOTE(review): the length bounds are derived from the number of
    # characters of the description, while the pipeline presumably counts
    # tokens - confirm this is intended.
    res = summarizer(condition,
                     min_length = int(len(description)*0.6),
                     max_length = len(description))[-1]["generated_text"]

    return check_similarity(res,description)



##### Ejecución de funciones

In [None]:
#@title
from datasets import load_from_disk
from transformers import pipeline, logging
import yaml

# Silence the less critical transformers warnings
logging.set_verbosity_error()

# Run the functions declared above to perform the tests.
# Load the test split of the dataset
test_dataset = load_from_disk("files/datasets/dataset_split")["test"]

# Stores the price of every LLM request
prices = []

############## VARIABLES ##############
v1_model_name = "t5-small-rulegen"
v1_tokenizer_checkpoint = "google-t5/t5-small"
v2_model_name = "gpt2"
v2_tokenizer_checkpoint = "sgugger/gpt2-like-tokenizer"

#######################################

# Rule generator
def _extract_condition(raw_rule):
    """Return the ``condition`` field of a YAML rule, or ``"None)"`` if unusable."""
    try:
        parsed = yaml.safe_load(raw_rule)
        entry = parsed[0] if isinstance(parsed, list) else parsed
        condition = entry['condition']
    except (yaml.YAMLError, TypeError, KeyError, IndexError):
        return "None)"
    # An empty or non-scalar condition would break the metric functions
    return condition if isinstance(condition, str) else "None)"


def generate_rule(description: str, version: str) -> str:
    """Generate the condition of a rule for ``description``.

    Args:
        description: Natural-language description of the detection.
        version: Generator to use: ``'v1'`` (text2text model), ``'v2'``
            (text-generation model prompted with ``description + ":"``),
            ``'v3'`` (text-generation model that writes a whole YAML rule)
            or ``'llm'`` (``gpt_generate_rule``).

    Returns:
        The generated condition. When the v3/llm output cannot be parsed, or
        the LLM request fails, the string ``"None)"`` is returned; its
        unbalanced parenthesis makes ``check_syntax`` reject it.

    Raises:
        ValueError: If ``version`` is not one of the supported values.

    Model names and tokenizers come from the notebook-level variables. The
    pipeline is created on GPU device 0 and released before returning.
    For ``'llm'`` the price of the request is appended to ``prices``.
    """
    if version not in ('v1', 'v2', 'v3', 'llm'):
        raise ValueError(f"Unknown version: {version!r}")

    if version == 'llm':
        try:
            output, price = gpt_generate_rule(description)
        except Exception:
            # Best effort: a failed request counts as an invalid rule
            return "None)"
        prices.append(price)
        print("Prices:",prices)
        return _extract_condition(output)

    if version == 'v3':
        model_name = "./"+v2_model_name.split("/")[-1]+"-rulegen-v3"
        pipe = pipeline("text-generation",
                        model=model_name,
                        tokenizer=v2_tokenizer_checkpoint,
                        device = 0)
        # Version 3 returns a complete rule
        generated_rule = pipe("desc: "+description+"\n  condition: ", max_length=1000)[-1]["generated_text"]
        # Report usage and free the memory
        print_usage()
        del pipe
        cuda.empty_cache()
        return _extract_condition(generated_rule)

    # Versions 1 and 2 generate the condition directly
    size = 20
    if version == 'v1':
        condition_model_name = "./"+v1_model_name.split("/")[-1]+"-v1-condition"
        condition_pipe =  pipeline("text2text-generation",
                                   model=condition_model_name,
                                   tokenizer=v1_tokenizer_checkpoint,
                                   device = 0)
        sep = ""
    else:
        condition_model_name = "./"+v2_model_name.split("/")[-1]+"-rulegen-v2-condition"
        condition_pipe =  pipeline("text-generation",
                                   model=condition_model_name,
                                   tokenizer=v2_tokenizer_checkpoint,
                                   device = 0)
        # Version 2 uses a separator that marks where the generation of
        # the condition starts; the prompt counts towards the length limit
        sep = ":"
        size += len(description+sep)

    prompt = description+sep
    condition = condition_pipe(prompt, max_length = size)[-1]["generated_text"]
    if sep != "":
        if condition.startswith(prompt):
            # Drop the echoed prompt only, so that a ":" inside the
            # generated condition does not truncate it
            condition = condition[len(prompt):]
        else:
            condition = condition.split(sep)[-1]

    # Report usage and free the memory
    print_usage()
    del condition_pipe
    cuda.empty_cache()

    return condition

# Run the tests: every generator is evaluated on the whole test split and
# the averaged scores are stored per version.
results = {
    'v1': {'syntax':None, 'similarity': None, 'semantic': None},
    'v2': {'syntax':None, 'similarity': None, 'semantic': None},
    'v3': {'syntax':None, 'similarity': None, 'semantic': None},
    'llm': {'syntax':None, 'similarity': None, 'semantic': None, 'avg_price':None}
}

dataset_size = len(test_dataset)

for version, scores in results.items():
    syntax_res = 0       # Number of rules whose syntax is correct
    semantic_res = []    # Semantic affinity of the rules
    similarity_res = []  # Similarity of the rules with the original ones
    for description, condition in zip(test_dataset['description'], test_dataset['condition']):
        generated_condition = generate_rule(description, version)
        if check_syntax(generated_condition):
            syntax_res += 1
        semantic_res.append(check_semantic(generated_condition, description))
        similarity_res.append(check_similarity(generated_condition, condition))

    # Syntax, semantic and similarity ratios; with an empty test split the
    # scores stay at None instead of dividing by zero
    if dataset_size:
        scores['syntax'] = round(syntax_res/dataset_size,4)
        scores['semantic'] = round(sum(semantic_res)/len(semantic_res),4)
        scores['similarity'] = round(sum(similarity_res)/len(similarity_res),4)
    # No price is recorded when every LLM request failed; keep None then
    # rather than losing all the results to a ZeroDivisionError
    if version == 'llm' and prices:
        scores['avg_price'] = round(sum(prices)/len(prices),3)

# Print the results
print("====================================")
print("ATTENTION: the rules have NOT been expanded")
for version, scores in results.items():
    print(f"{version}: {scores}")


### Aplicación final
Aquí definimos las funciones que permiten usar los modelos afinados y los modelos pre-entrenados mediante prompt.

In [None]:
#@title
import yaml
from transformers import pipeline
# Silence the less critical transformers warnings (currently disabled)
#logging.set_verbosity_error()

########### VARIABLES ###########
#@markdown ### Elegir versión e indicar descripción
version       = "llm" #@param ["v1","v2","v3","llm"]
description   = "Monitor for unusual Exchange and Office 365 email account permissions changes that may indicate excessively broad permissions (including memberships in privileged groups) being granted to compromised accounts." #@param {type: "string"}
#@markdown ----------------------
#@markdown ###Variables (rellenarlas todas. V2 y V3 usan las mismas)
# Base checkpoints and tokenizers; gen_rule derives the local directories
# of the fine-tuned models from these names.
v1_model_name = "t5-small" #@param {type: "string"}
v1_tokenizer  = "t5-small" #@param {type: "string"}
v2_model_name = "gpt2" #@param {type: "string"}
v2_tokenizer  = "sgugger/gpt2-like-tokenizer" #@param {type: "string"}
pr_model_name = "distilbert-base-uncased" #@param {type: "string"}
pr_tokenizer  = "distilbert-base-uncased" #@param {type: "string"}

#################################

# Rule generator function
def gen_rule(description: str, version: str) -> str:
    """Generate a complete rule for ``description`` with the chosen generator.

    Args:
        description: Natural-language description of the detection.
        version: ``"v1"``, ``"v2"``, ``"v3"`` or ``"llm"``.

    Returns:
        The generated rule as text. For v1 and v2 it is a YAML document with
        the keys ``rule``, ``desc``, ``condition``, ``output`` and
        ``priority``; for v3 it is the text generated by the model; for llm
        it is the rule returned by ``gpt_generate_rule``.

    Raises:
        ValueError: If the description is empty, the version is unknown, or
            one of the model/tokenizer variables required by the chosen
            version is empty.

    Model and tokenizer names are read from the notebook-level variables
    (``v1_model_name``, ``v1_tokenizer``, ``v2_model_name``,
    ``v2_tokenizer``, ``pr_model_name``, ``pr_tokenizer``).
    """
    def strip_prompt(generated: str, prompt: str) -> str:
        # Text-generation output echoes the prompt: remove it together with
        # its ":" separator, falling back to a split on the description
        if generated.startswith(prompt):
            return generated[len(prompt):]
        return generated.split(description)[-1]

    # Input validation: these errors must be raised, not just constructed
    if description == "":
        raise ValueError("The description is empty, please fulfill it.")
    if version not in ["v1","v2","v3","llm"]:
        raise ValueError("Wrong version specified")

    # LLM GENERATOR
    if version == 'llm':
        # gpt_generate_rule returns (rule, price); only the rule is text
        rule, _price = gpt_generate_rule(description)
        return rule

    if version == 'v1':
        required = (v1_model_name, pr_model_name, v1_tokenizer)
    elif version == 'v2':
        required = (v2_model_name, pr_model_name, v2_tokenizer)
    else:
        required = (v2_model_name, v2_tokenizer)
    if "" in required:
        raise ValueError("Some parameters are empty, please fulfill them.")

    # VERSION 1
    if version == 'v1':
        pipe_gen = pipeline("text2text-generation",
                            model = "./"+v1_model_name.split("/")[-1]+"-rulegen-v1-condition",
                            tokenizer = v1_tokenizer,
                            device = 0)
        pipe_out = pipeline("text2text-generation",
                            model = "./"+v1_model_name.split("/")[-1]+"-rulegen-v1-output",
                            tokenizer = v1_tokenizer,
                            device = 0)
        pipe_pri = pipeline("text-classification",
                            model = "./"+pr_model_name.split("/")[-1]+"-priority-classifier",
                            tokenizer = pr_tokenizer,
                            device = 0)
        # The base checkpoint summarizes the description into a rule name
        pipe_nom = pipeline("summarization",
                            model = v1_model_name,
                            device = 0)
        res = {"rule": pipe_nom(description,
                                min_length = 5,
                                max_length = 10)[-1]["summary_text"],
               "desc": description,
               "condition": pipe_gen(description, min_length = 50)[-1]["generated_text"],
               "output": pipe_out(description, min_length = 50)[-1]["generated_text"],
               "priority":pipe_pri(description)[-1]["label"],
               }
        print_usage()
        del pipe_gen, pipe_nom, pipe_out, pipe_pri
        cuda.empty_cache()
        return yaml.safe_dump(res, sort_keys=False).strip().replace("[]","")+"\n"

    # VERSION 2
    if version == 'v2':
        pipe_gen = pipeline("text-generation",
                            model = "./"+v2_model_name.split("/")[-1]+"-rulegen-v2-condition",
                            tokenizer = v2_tokenizer)
        pipe_out = pipeline("text-generation",
                            model = "./"+v2_model_name.split("/")[-1]+"-rulegen-v2-output",
                            tokenizer = v2_tokenizer)
        pipe_pri = pipeline("text-classification",
                            model = "./"+pr_model_name.split("/")[-1]+"-priority-classifier",
                            tokenizer = pr_tokenizer)
        pipe_nom = pipeline("summarization",
                            model = v2_model_name)
        prompt = description+":"
        res = {"rule": pipe_nom(description,
                                min_length = len(description),
                                max_length = len(description)+10)[-1]["summary_text"],
               "desc": description,
               "condition": strip_prompt(pipe_gen(prompt, min_length = 50)[-1]["generated_text"], prompt),
               "output": strip_prompt(pipe_out(prompt, min_length = 50)[-1]["generated_text"], prompt),
               "priority":pipe_pri(description)[-1]["label"],
               }
        print_usage()
        del pipe_gen, pipe_nom, pipe_out, pipe_pri
        cuda.empty_cache()
        return yaml.safe_dump(res, sort_keys=False).strip().replace("[]","")+"\n"

    # VERSION 3
    pipe = pipeline("text-generation",
                    model = "./"+v2_model_name.split("/")[-1]+"-rulegen-v3",
                    tokenizer = v2_tokenizer)
    # The base model proposes a short title used as the rule name
    pipe_nom = pipeline("text-generation",
                        model = v2_model_name)
    title_prompt = description+"title: "
    name = pipe_nom(title_prompt, max_new_tokens = 10)[0]["generated_text"].split(title_prompt)[-1]
    # Generate BEFORE releasing the pipelines. The prompt follows the rule
    # skeleton used for version 3 in the manual validation cell.
    rule = pipe("rule: "+name+"\n  desc: "+description+"\n  condition: ",
                max_length = 1000)[0]["generated_text"]
    print_usage()
    del pipe, pipe_nom
    cuda.empty_cache()
    return rule



# Generate a rule with the parameters chosen in the form and show it
print("The generated rule is:",gen_rule(description, version),sep="\n")

### Créditos

*©2023 Carel Sánchez*

*Algunos derechos reservados*

*Este documento se distribuye bajo la licencia "Atribución 4.0 Internacional" de Creative Commons*