In [9]:
# Файл: prepare_logs_to_jsonl.py

import json
import re
from transformers import AutoTokenizer
from tqdm import tqdm

# --- Configuration ---
# Hugging Face model identifier; only its tokenizer is loaded in this script.
model_name = "iiiorg/piiranha-v1-detect-personal-information"
# Source file with one tagged log record per line (read by load_logs).
input_file = "synthetic_logs.csv"
# Destination JSONL file: one {"tokens", "ner_labels"} object per log record.
output_jsonl_file = "piiranha_formatted_logs.jsonl"

# --- Personal-data tags ---
# Maps the inline markup tag name found in the logs (e.g. <login>...</login>)
# to the entity name used as the suffix of the B-/I- labels. Tags that are not
# listed here are still stripped from the text but produce no entity.
personal_tags = {
    "client_ip": "IP",
    "username": "USERNAME",
    "accountName": "ACCOUNT",
    "telephone": "PHONE",
    "creditCard": "CREDIT_CARD",
    "user_id": "USERID",
    "accountID": "ACCOUNTID",
    "login": "LOGIN",
    "givenName": "NAME",
}

# Loaded once at module level and shared by tag_text_with_labels.
# NOTE(review): from_pretrained presumably fetches the tokenizer files on first
# use and reads them from a local cache afterwards - confirm for offline runs.
tokenizer = AutoTokenizer.from_pretrained(model_name)

def tag_text_with_labels(text: str, tag_dict: dict, tokenizer_override=None):
    """Strip inline ``<tag>value</tag>`` markup and BIO-label the tokens.

    Every ``<tag>value</tag>`` pair is replaced by its bare ``value``. Pairs
    whose tag name is a key of ``tag_dict`` additionally become entities, and
    the tokens covering them are labelled ``B-<label>`` / ``I-<label>``; all
    other tokens are labelled ``O``.

    Args:
        text: Log line containing inline markup.
        tag_dict: Maps a markup tag name to the entity label suffix.
        tokenizer_override: Optional tokenizer to use instead of the
            module-level ``tokenizer``. It must be callable as
            ``tok(text, return_offsets_mapping=True, add_special_tokens=False)``,
            return a mapping with ``"input_ids"`` and ``"offset_mapping"``,
            and provide ``convert_ids_to_tokens``.

    Returns:
        A ``(tokens, labels)`` pair of equally long lists.
    """
    active_tokenizer = tokenizer if tokenizer_override is None else tokenizer_override

    entities = []
    cleaned_parts = []
    clean_len = 0  # length of the cleaned text assembled so far
    last_end = 0

    for match in re.finditer(r"<(.*?)>(.*?)</\1>", text):
        tag, value = match.group(1), match.group(2)

        # Copy the untouched text between the previous match and this one.
        prefix = text[last_end:match.start()]
        cleaned_parts.append(prefix)
        clean_len += len(prefix)

        # The value begins exactly where the cleaned text currently ends.
        # Deriving the span from match.start(2) would leave the current
        # opening tag uncompensated and shift the span to the right.
        if tag in tag_dict:
            entities.append(
                {
                    "start": clean_len,
                    "end": clean_len + len(value),
                    "label": tag_dict[tag],
                }
            )

        cleaned_parts.append(value)
        clean_len += len(value)
        last_end = match.end()

    cleaned_parts.append(text[last_end:])
    clean_text = "".join(cleaned_parts)

    # Tokenization
    encoding = active_tokenizer(
        clean_text, return_offsets_mapping=True, add_special_tokens=False
    )
    tokens = active_tokenizer.convert_ids_to_tokens(encoding["input_ids"])
    offsets = encoding["offset_mapping"]

    # BIO labelling: any token that overlaps an entity span belongs to it, and
    # the first such token opens the entity. Overlap (rather than exact start
    # equality / full containment) keeps the B- tag even when a token's offsets
    # do not line up exactly with the entity boundary.
    labels = ["O"] * len(offsets)
    for ent in entities:
        is_first = True
        for i, (tok_start, tok_end) in enumerate(offsets):
            if tok_end <= tok_start:
                continue  # zero-width token carries no text
            if tok_start < ent["end"] and tok_end > ent["start"]:
                prefix_tag = "B-" if is_first else "I-"
                labels[i] = f"{prefix_tag}{ent['label']}"
                is_first = False

    return tokens, labels

# --- Reading logs from a file ---
def load_logs(file_path):
    """Return the log records stored in ``file_path``, one per line.

    Blank lines are dropped, as is any line that begins (case-insensitively)
    with ``log_text``, i.e. the column header. Each remaining line is stripped
    of surrounding whitespace and then of surrounding double quotes.
    """
    records = []
    with open(file_path, "r", encoding="utf-8") as handle:
        for raw_line in handle:
            trimmed = raw_line.strip()
            if not trimmed:
                continue
            # The header test deliberately looks at the raw, unstripped line.
            if raw_line.lower().startswith("log_text"):
                continue
            records.append(trimmed.strip('"'))
    return records

# --- Main process ---
# Read every log record, convert it to tokens plus BIO labels, and write one
# JSON object per line to the output file.
logs = load_logs(input_file)

with open(output_jsonl_file, "w", encoding="utf-8") as out_file:
    for log in tqdm(logs, desc="Обработка логов"):
        tokens, labels = tag_text_with_labels(log, personal_tags)
        record = {"tokens": tokens, "ner_labels": labels}
        # ensure_ascii=False keeps non-ASCII characters readable in the file.
        print(json.dumps(record, ensure_ascii=False), file=out_file)

print(f"✅ Файл сохранён: {output_jsonl_file}")


Обработка логов: 100%|██████████| 2417/2417 [00:01<00:00, 1946.20it/s]

✅ Файл сохранён: piiranha_formatted_logs.jsonl



