In [1]:
import header

In [3]:
import ipywidgets as widgets
from IPython.display import display
import os
import zipfile
from datetime import datetime
from src.ai.openai.model.training import Model
from src import gs
from src.utils import j_loads, j_dumps

# Инициализация модели
model = Model()

# Виджеты
file_upload = widgets.FileUpload(accept='.txt', multiple=True)
dataset_button = widgets.Button(description='Select Dataset')
train_button = widgets.Button(description='Train Model')
test_button = widgets.Button(description='Test Model')
save_button = widgets.Button(description='Save Job ID')
output = widgets.Output()

def on_upload_change(change):
    with output:
        output.clear_output()
        if file_upload.value:
            files = [file for file in file_upload.value.values()]
            display(files)

file_upload.observe(on_upload_change, names='value')

def on_dataset_button_clicked(b):
    with output:
        output.clear_output()
        path_to_dir = './dataset/'  # Путь к директории для набора данных
        dataset = select_dataset_and_archive(path_to_dir, positive=True)
        if dataset:
            print(f"Dataset selected and archived: {len(dataset)} items")
        else:
            print("Failed to select or archive dataset")

def on_train_button_clicked(b):
    with output:
        output.clear_output()
        dataset = select_dataset_and_archive('./dataset/', positive=True)
        if dataset:
            job_id = model.train(dataset, positive=True)
            if job_id:
                print(f"Training started, Job ID: {job_id}")
            else:
                print("Failed to start training")
        else:
            print("Failed to select dataset")

def on_test_button_clicked(b):
    with output:
        output.clear_output()
        test_data = [
            {"name": "Test 1", "text": "Sample text for testing", "label": "positive"}
        ]  # Замените на актуальные данные для тестирования
        predictions = model.test(test_data)
        if predictions:
            for entry, prediction in zip(test_data, predictions):
                print(f"Page: {entry['name']}, Prediction: {prediction}, Actual: {entry['label']}")
        else:
            print("Failed to test the model")

def on_save_button_clicked(b):
    with output:
        output.clear_output()
        job_id = model.current_job_id
        if job_id:
            model.save_job_id(job_id, "Training job description")
            print(f"Job ID {job_id} saved successfully")
        else:
            print("No Job ID to save")

def select_dataset_and_archive(path_to_dir: str, positive: bool = True):
    """Select a dataset for training the model and archive the files."""
    dataset = []
    for filename in os.listdir(path_to_dir):
        if filename.endswith('.txt'):
            with open(os.path.join(path_to_dir, filename), 'r', encoding='utf-8') as file:
                text = file.read()
                dataset.append({"text": text, "label": "positive" if positive else "negative", "name": filename})
    # Архивируем файлы
    archive_files(path_to_dir)
    return dataset

def archive_files(directory: str):
    """Archive all files in the specified directory with the current date in the filename."""
    current_date = datetime.now().strftime('%Y-%m-%d')
    archive_filename = f"{current_date}_files.zip"
    with zipfile.ZipFile(archive_filename, 'w') as zipf:
        for filename in os.listdir(directory):
            if filename.endswith('.txt'):
                zipf.write(os.path.join(directory, filename), filename)
    print("Files archived successfully.")

def create_vector_store(client, name: str, file_paths: list[str] = None) -> str | None:
    """Create a vector store and optionally add files to it."""
    try:
        if file_paths:
            # Создание векторного хранилища с файлами
            file_ids = [upload_file(client, path) for path in file_paths]
            response = client.beta.vector_stores.create(
                name=name,
                file_ids=file_ids
            )
        else:
            # Создание векторного хранилища без файлов
            response = client.beta.vector_stores.create(name=name)
        
        print(f"Vector store created: {response}")
        return response.id
    except Exception as ex:
        print(f"An error occurred while creating the vector store: {ex}")
        return

def upload_file(client, file_path: str) -> str | None:
    """Upload a file and return its ID."""
    try:
        with open(file_path, "rb") as file:
            response = client.beta.files.create(file=file)
            file_id = response.id
            print(f"File uploaded: {file_id}")
            return file_id
    except Exception as ex:
        print(f"An error occurred while uploading file {file_path}: {ex}")
        return

def upload_files_to_vector_store(client, vector_store_id: str, file_paths: List[str]):
    """Upload files to an existing vector store."""
    try:
        file_ids = [upload_file(client, path) for path in file_paths]
        response = client.beta.vector_stores.update(
            vector_store_id=vector_store_id,
            file_ids=file_ids
        )
        print(f"Files added to vector store: {response}")
    except Exception as ex:
        print(f"An error occurred while uploading files: {ex}")

def update_assistant_with_file_search(client, assistant_id: str, vector_store_id: str):
    """Update the assistant to include the file_search tool and link the vector store."""
    try:
        assistant = client.beta.assistants.get(assistant_id)
        updated_tools = assistant.get('tools', []) + [{"type": "file_search"}]
        updated_assistant = client.beta.assistants.update(
            assistant_id,
            tool_resources={"file_search": {"vector_store_ids": [vector_store_id]}}
        )
        print(f"Assistant updated successfully: {updated_assistant}")
    except Exception as ex:
        print(f"An error occurred while updating the assistant: {ex}")

# Отображение виджетов
display(file_upload)
display(dataset_button)
display(train_button)
display(test_button)
display(save_button)
display(output)

# Привязка обработчиков событий
dataset_button.on_click(on_dataset_button_clicked)
train_button.on_click(on_train_button_clicked)
test_button.on_click(on_test_button_clicked)
save_button.on_click(on_save_button_clicked)


NameError: name 'List' is not defined