In [1]:
import header
import ipywidgets as widgets
from IPython.display import display
import os
import zipfile
from datetime import datetime
from src.ai.openai.model.training import Model
from src.settings import gs
from src.utils import j_loads, j_dumps
from src.utils import pprint
# Initialize the model
model = Model()

Enter KeePass master password:  ········


In [2]:
# Widgets
file_upload = widgets.FileUpload(accept='.txt', multiple=True)
dataset_button = widgets.Button(description='Select Dataset')
train_button = widgets.Button(description='Train Model')
test_button = widgets.Button(description='Test Model')
save_button = widgets.Button(description='Save Job ID')
create_vector_store_button = widgets.Button(description='Create Vector Store')
upload_files_button = widgets.Button(description='Upload Files to Vector Store')
update_assistant_button = widgets.Button(description='Update Assistant')
output = widgets.Output()

def on_upload_change(change):
    with output:
        output.clear_output()
        if file_upload.value:
            files = [file for file in file_upload.value.values()]
            display(files)

file_upload.observe(on_upload_change, names='value')

def on_dataset_button_clicked(b):
    with output:
        output.clear_output()
        path_to_dir = './dataset/'  # Path to directory with dataset
        dataset = select_dataset_and_archive(path_to_dir, positive=True)
        if dataset:
            print(f"Dataset selected and archived: {len(dataset)} items")
        else:
            print("Failed to select or archive dataset")

def on_train_button_clicked(b):
    with output:
        output.clear_output()
        dataset = select_dataset_and_archive('./dataset/', positive=True)
        if dataset:
            job_id = model.train(dataset, positive=True)
            if job_id:
                print(f"Training started, Job ID: {job_id}")
            else:
                print("Failed to start training")
        else:
            print("Failed to select dataset")

def on_test_button_clicked(b):
    with output:
        output.clear_output()
        test_data = [
            {"name": "Test 1", "text": "Sample text for testing", "label": "positive"}
        ]  # Replace with real test data
        predictions = model.test(test_data)
        if predictions:
            for entry, prediction in zip(test_data, predictions):
                print(f"Page: {entry['name']}, Prediction: {prediction}, Actual: {entry['label']}")
        else:
            print("Failed to test the model")

def on_save_button_clicked(b):
    with output:
        output.clear_output()
        job_id = model.current_job_id
        if job_id:
            model.save_job_id(job_id, "Training job description")
            print(f"Job ID {job_id} saved successfully")
        else:
            print("No Job ID to save")

def on_create_vector_store_button_clicked(b):
    with output:
        output.clear_output()
        client = ...  # Initialize your API client
        name = 'New Vector Store'  # Name of the vector store
        file_paths = []  # Add file paths if needed
        vector_store_id = model.create_vector_store(client, name, file_paths)
        if vector_store_id:
            print(f"Vector store created, ID: {vector_store_id}")
        else:
            print("Failed to create vector store")

def on_upload_files_button_clicked(b):
    with output:
        output.clear_output()
        client = ...  # Initialize your API client
        vector_store_id = 'your_vector_store_id'  # Replace with your vector store ID
        file_paths = []  # Add file paths
        model.upload_files_to_vector_store(client, vector_store_id, file_paths)
        print("Files uploaded to vector store")

def on_update_assistant_button_clicked(b):
    with output:
        output.clear_output()
        client = ...  # Initialize your API client
        assistant_id = model.assistant_id  # Use the assistant ID from the model
        vector_store_id = 'your_vector_store_id'  # Replace with your vector store ID
        model.update_assistant_with_file_search(client, assistant_id, vector_store_id)
        print("Assistant updated successfully")

def select_dataset_and_archive(path_to_dir: str, positive: bool = True) -> list[dict[str, str]]:
    """Select a dataset for training the model and archive the files.

    @param path_to_dir: Path to the directory containing the dataset files.
    @param positive: Boolean flag indicating if the label should be positive (True) or negative (False).
    @return: List of dictionaries containing 'text', 'label', and 'name' of each file.

    Example:
        dataset = select_dataset_and_archive('./dataset/', positive=True)
    """
    dataset = []
    for filename in os.listdir(path_to_dir):
        if filename.endswith('.txt'):
            with open(os.path.join(path_to_dir, filename), 'r', encoding='utf-8') as file:
                text = file.read()
                dataset.append({"text": text, "label": "positive" if positive else "negative", "name": filename})
    # Archive files
    archive_files(path_to_dir)
    return dataset

def archive_files(directory: str):
    """Archive all files in the specified directory with the current date in the filename.

    @param directory: Directory containing the files to archive.
    """
    current_date = datetime.now().strftime('%Y-%m-%d')
    archive_filename = f"{current_date}_files.zip"
    with zipfile.ZipFile(archive_filename, 'w') as zipf:
        for filename in os.listdir(directory):
            if filename.endswith('.txt'):
                zipf.write(os.path.join(directory, filename), filename)
    print("Files archived successfully.")

# Display widgets
display(file_upload)
display(dataset_button)
display(train_button)
display(test_button)
display(save_button)
display(create_vector_store_button)
display(upload_files_button)
display(update_assistant_button)
display(output)

# Bind event handlers
dataset_button.on_click(on_dataset_button_clicked)
train_button.on_click(on_train_button_clicked)
test_button.on_click(on_test_button_clicked)
save_button.on_click(on_save_button_clicked)
create_vector_store_button.on_click(on_create_vector_store_button_clicked)
upload_files_button.on_click(on_upload_files_button_clicked)
update_assistant_button.on_click(on_update_assistant_button_clicked)


FileUpload(value=(), accept='.txt', description='Upload', multiple=True)

Button(description='Select Dataset', style=ButtonStyle())

Button(description='Train Model', style=ButtonStyle())

Button(description='Test Model', style=ButtonStyle())

Button(description='Save Job ID', style=ButtonStyle())

Button(description='Create Vector Store', style=ButtonStyle())

Button(description='Upload Files to Vector Store', style=ButtonStyle())

Button(description='Update Assistant', style=ButtonStyle())

Output()

In [3]:
thread = model.client.beta.threads.create()

In [4]:
message = model.client.beta.threads.messages.create(
  thread_id=thread.id,
  role="user",
  content="Привет?"
)

In [5]:
pprint(message)

Message(id='msg_uKjiCQ4dkYhpAq8PK9QzFsc4', assistant_id=None, attachments=[], completed_at=None, content=[TextContentBlock(text=Text(annotations=[], value='Привет?'), type='text')], created_at=1721727924, incomplete_at=None, incomplete_details=None, metadata={}, object='thread.message', role='user', run_id=None, status=None, thread_id='thread_ugfttTOeQeVUeqHLachlB9j2')
Class: Message
['BaseModel']
Bases: None
Methods:
_calculate_keys()
_check_frozen()
_copy_and_set_values()
_get_value()
_iter()
construct()
copy()
dict()
from_orm()
json()
model_construct()
model_copy()
model_dump()
model_dump_json()
model_json_schema()
model_parametrized_name()
model_post_init()
model_rebuild()
model_validate()
model_validate_json()
model_validate_strings()
parse_file()
parse_obj()
parse_raw()
schema()
schema_json()
to_dict()
to_json()
update_forward_refs()
validate()
Properties:
_abc_impl = <_abc._abc_data object at 0x0000021C596D1180>
assistant_id = None
attachments = []
completed_at = None
content = 

In [6]:
th = model.client.beta.threads.retrieve(thread.id)

In [7]:
pprint(th)

Thread(id='thread_ugfttTOeQeVUeqHLachlB9j2', created_at=1721727924, metadata={}, object='thread', tool_resources=ToolResources(code_interpreter=ToolResourcesCodeInterpreter(file_ids=[]), file_search=None))
Class: Thread
['BaseModel']
Bases: None
Methods:
_calculate_keys()
_check_frozen()
_copy_and_set_values()
_get_value()
_iter()
construct()
copy()
dict()
from_orm()
json()
model_construct()
model_copy()
model_dump()
model_dump_json()
model_json_schema()
model_parametrized_name()
model_post_init()
model_rebuild()
model_validate()
model_validate_json()
model_validate_strings()
parse_file()
parse_obj()
parse_raw()
schema()
schema_json()
to_dict()
to_json()
update_forward_refs()
validate()
Properties:
_abc_impl = <_abc._abc_data object at 0x0000021C58D42880>
created_at = 1721727924
id = thread_ugfttTOeQeVUeqHLachlB9j2
metadata = {}
model_computed_fields = {}
model_config = {'extra': 'allow', 'defer_build': True}
model_extra = {}
model_fields = {'id': FieldInfo(annotation=str, required=Tru

In [8]:
with model.client.beta.threads.runs.stream(
  thread_id=model.thread.id,
  assistant_id=model.assistant.id
) as stream:
    response:str = ''
    for text in stream.text_deltas:
        response += text
    pprint(response)
    pprint('-------------')



It seems you've uploaded multiple files. How would you like to proceed with these files? Would you like to extract specific information, analyze them, or apply any specific transformations to the content?
-------------


In [9]:
pprint(model.client.beta.threads)

<openai.resources.beta.threads.threads.Threads object at 0x0000021C716B3920>
Class: Threads
['SyncAPIResource']
Bases: None
Methods:
_delete()
_get()
_get_api_list()
_patch()
_post()
_put()
_sleep()
create()
create_and_run()
create_and_run_poll()
create_and_run_stream()
delete()
retrieve()
update()
Properties:
_client = <openai.OpenAI object at 0x0000021C57AC0BC0>
messages = <openai.resources.beta.threads.messages.Messages object at 0x0000021C7146E630>
runs = <openai.resources.beta.threads.runs.runs.Runs object at 0x0000021C71E1EBD0>
with_raw_response = <openai.resources.beta.threads.threads.ThreadsWithRawResponse object at 0x0000021C71EAB2C0>
with_streaming_response = <openai.resources.beta.threads.threads.ThreadsWithStreamingResponse object at 0x0000021C71E0B620>


In [10]:
# Import additional required modules
import ipywidgets as widgets
from IPython.display import display
import os
import zipfile
from datetime import datetime

# Existing code and widget setup

# Widgets for existing functionalities
file_upload = widgets.FileUpload(accept='.txt', multiple=True)
dataset_button = widgets.Button(description='Select Dataset')
train_button = widgets.Button(description='Train Model')
test_button = widgets.Button(description='Test Model')
save_button = widgets.Button(description='Save Job ID')
create_vector_store_button = widgets.Button(description='Create Vector Store')
upload_files_button = widgets.Button(description='Upload Files to Vector Store')
update_assistant_button = widgets.Button(description='Update Assistant')
output = widgets.Output()

# New widgets for message input and sending
message_input = widgets.Textarea(
    description='Message:',
    placeholder='Type your message here',
    layout=widgets.Layout(width='100%', height='100px')
)
send_button = widgets.Button(description='Send')

# Display widgets
display(file_upload)
display(dataset_button)
display(train_button)
display(test_button)
display(save_button)
display(create_vector_store_button)
display(upload_files_button)
display(update_assistant_button)
display(message_input)
display(send_button)
display(output)

# Existing function definitions
def on_upload_change(change):
    with output:
        output.clear_output()
        if file_upload.value:
            files = [file for file in file_upload.value.values()]
            display(files)

file_upload.observe(on_upload_change, names='value')

def on_dataset_button_clicked(b):
    with output:
        output.clear_output()
        path_to_dir = './dataset/'  # Path to directory with dataset
        dataset = select_dataset_and_archive(path_to_dir, positive=True)
        if dataset:
            print(f"Dataset selected and archived: {len(dataset)} items")
        else:
            print("Failed to select or archive dataset")

def on_train_button_clicked(b):
    with output:
        output.clear_output()
        dataset = select_dataset_and_archive('./dataset/', positive=True)
        if dataset:
            job_id = model.train(dataset, positive=True)
            if job_id:
                print(f"Training started, Job ID: {job_id}")
            else:
                print("Failed to start training")
        else:
            print("Failed to select dataset")

def on_test_button_clicked(b):
    with output:
        output.clear_output()
        test_data = [
            {"name": "Test 1", "text": "Sample text for testing", "label": "positive"}
        ]  # Replace with real test data
        predictions = model.test(test_data)
        if predictions:
            for entry, prediction in zip(test_data, predictions):
                print(f"Page: {entry['name']}, Prediction: {prediction}, Actual: {entry['label']}")
        else:
            print("Failed to test the model")

def on_save_button_clicked(b):
    with output:
        output.clear_output()
        job_id = model.current_job_id
        if job_id:
            model.save_job_id(job_id, "Training job description")
            print(f"Job ID {job_id} saved successfully")
        else:
            print("No Job ID to save")

def on_create_vector_store_button_clicked(b):
    with output:
        output.clear_output()
        client = ...  # Initialize your API client
        name = 'New Vector Store'  # Name of the vector store
        file_paths = []  # Add file paths if needed
        vector_store_id = model.create_vector_store(client, name, file_paths)
        if vector_store_id:
            print(f"Vector store created, ID: {vector_store_id}")
        else:
            print("Failed to create vector store")

def on_upload_files_button_clicked(b):
    with output:
        output.clear_output()
        client = ...  # Initialize your API client
        vector_store_id = 'your_vector_store_id'  # Replace with your vector store ID
        file_paths = []  # Add file paths
        model.upload_files_to_vector_store(client, vector_store_id, file_paths)
        print("Files uploaded to vector store")

def on_update_assistant_button_clicked(b):
    with output:
        output.clear_output()
        client = ...  # Initialize your API client
        assistant_id = model.assistant_id  # Use the assistant ID from the model
        vector_store_id = 'your_vector_store_id'  # Replace with your vector store ID
        model.update_assistant_with_file_search(client, assistant_id, vector_store_id)
        print("Assistant updated successfully")

# New functions for message sending
def on_send_button_clicked(b):
    with output:
        output.clear_output()
        message = message_input.value
        if message:
            response = model.send_message(message)
            print(f"Response from model: {response}")
        else:
            print("No message to send")

# Bind event handlers to buttons
dataset_button.on_click(on_dataset_button_clicked)
train_button.on_click(on_train_button_clicked)
test_button.on_click(on_test_button_clicked)
save_button.on_click(on_save_button_clicked)
create_vector_store_button.on_click(on_create_vector_store_button_clicked)
upload_files_button.on_click(on_upload_files_button_clicked)
update_assistant_button.on_click(on_update_assistant_button_clicked)
send_button.on_click(on_send_button_clicked)

# Function to select dataset and archive
def select_dataset_and_archive(path_to_dir: str, positive: bool = True) -> list[dict[str, str]]:
    """Select a dataset for training the model and archive the files.

    @param path_to_dir: Path to the directory containing the dataset files.
    @param positive: Boolean flag indicating if the label should be positive (True) or negative (False).
    @return: List of dictionaries containing 'text', 'label', and 'name' of each file.

    Example:
        dataset = select_dataset_and_archive('./dataset/', positive=True)
    """
    dataset = []
    for filename in os.listdir(path_to_dir):
        if filename.endswith('.txt'):
            with open(os.path.join(path_to_dir, filename), 'r', encoding='utf-8') as file:
                text = file.read()
                dataset.append({"text": text, "label": "positive" if positive else "negative", "name": filename})
    # Archive files
    archive_files(path_to_dir)
    return dataset

def archive_files(directory: str):
    """Archive all files in the specified directory with the current date in the filename.

    @param directory: Directory containing the files to archive.
    """
    current_date = datetime.now().strftime('%Y-%m-%d')
    archive_filename = f"{current_date}_files.zip"
    with zipfile.ZipFile(archive_filename, 'w') as zipf:
        for filename in os.listdir(directory):
            if filename.endswith('.txt'):
                zipf.write(os.path.join(directory, filename), filename)
    print("Files archived successfully.")


FileUpload(value=(), accept='.txt', description='Upload', multiple=True)

Button(description='Select Dataset', style=ButtonStyle())

Button(description='Train Model', style=ButtonStyle())

Button(description='Test Model', style=ButtonStyle())

Button(description='Save Job ID', style=ButtonStyle())

Button(description='Create Vector Store', style=ButtonStyle())

Button(description='Upload Files to Vector Store', style=ButtonStyle())

Button(description='Update Assistant', style=ButtonStyle())

Textarea(value='', description='Message:', layout=Layout(height='100px', width='100%'), placeholder='Type your…

Button(description='Send', style=ButtonStyle())

Output()

In [11]:
import ipywidgets as widgets
from IPython.display import display, clear_output

# Widgets
file_upload = widgets.FileUpload(accept='.txt', multiple=True)
dataset_button = widgets.Button(description='Select Dataset')
train_button = widgets.Button(description='Train Model')
test_button = widgets.Button(description='Test Model')
save_button = widgets.Button(description='Save Job ID')
create_vector_store_button = widgets.Button(description='Create Vector Store')
upload_files_button = widgets.Button(description='Upload Files to Vector Store')
update_assistant_button = widgets.Button(description='Update Assistant')
message_input = widgets.Textarea(description='Message:', layout=widgets.Layout(width='100%', height='100px'))
send_message_button = widgets.Button(description='Send Message')
output = widgets.Output()

# Display widgets
display(file_upload)
display(dataset_button)
display(train_button)
display(test_button)
display(save_button)
display(create_vector_store_button)
display(upload_files_button)
display(update_assistant_button)
display(message_input)
display(send_message_button)
display(output)

def on_upload_change(change):
    with output:
        clear_output()
        if file_upload.value:
            files = [file for file in file_upload.value.values()]
            display(files)

file_upload.observe(on_upload_change, names='value')

def on_dataset_button_clicked(b):
    with output:
        clear_output()
        path_to_dir = './dataset/'  # Path to directory with dataset
        dataset = select_dataset_and_archive(path_to_dir, positive=True)
        if dataset:
            print(f"Dataset selected and archived: {len(dataset)} items")
        else:
            print("Failed to select or archive dataset")

def on_train_button_clicked(b):
    with output:
        clear_output()
        dataset = select_dataset_and_archive('./dataset/', positive=True)
        if dataset:
            job_id = model.train(dataset, positive=True)
            if job_id:
                print(f"Training started, Job ID: {job_id}")
            else:
                print("Failed to start training")
        else:
            print("Failed to select dataset")

def on_test_button_clicked(b):
    with output:
        clear_output()
        test_data = [
            {"name": "Test 1", "text": "Sample text for testing", "label": "positive"}
        ]  # Replace with real test data
        predictions = model.test(test_data)
        if predictions:
            for entry, prediction in zip(test_data, predictions):
                print(f"Page: {entry['name']}, Prediction: {prediction}, Actual: {entry['label']}")
        else:
            print("Failed to test the model")

def on_save_button_clicked(b):
    with output:
        clear_output()
        job_id = model.current_job_id
        if job_id:
            model.save_job_id(job_id, "Training job description")
            print(f"Job ID {job_id} saved successfully")
        else:
            print("No Job ID to save")

def on_create_vector_store_button_clicked(b):
    with output:
        clear_output()
        client = ...  # Initialize your API client
        name = 'New Vector Store'  # Name of the vector store
        file_paths = []  # Add file paths if needed
        vector_store_id = model.create_vector_store(client, name, file_paths)
        if vector_store_id:
            print(f"Vector store created with ID: {vector_store_id}")
        else:
            print("Failed to create vector store")

def on_upload_files_button_clicked(b):
    with output:
        clear_output()
        client = ...  # Initialize your API client
        vector_store_id = 'your-vector-store-id'
        file_paths = []  # Add file paths to upload
        response = model.upload_files_to_vector_store(client, vector_store_id, file_paths)
        if response:
            print(f"Files uploaded successfully: {response}")
        else:
            print("Failed to upload files")

def on_update_assistant_button_clicked(b):
    with output:
        clear_output()
        assistant_id = 'your-assistant-id'`
        description = 'Updated assistant description'L
        model.update_assistant(assistant_id, description)
        print(f"Assistant {assistant_id} updated successfully")
chat_log:SimpleNamespace = SimpleNamespace('user'=[],'')
def on_send_message_button_clicked(b):
    with output:
        clear_output()
        message = message_input.value
        if message:
            response = model.send_message(message)
            print(f"""Response from assistant: {response}""")
        else:
            print("No message to send")

# Attach event handlers
dataset_button.on_click(on_dataset_button_clicked)
train_button.on_click(on_train_button_clicked)
test_button.on_click(on_test_button_clicked)
save_button.on_click(on_save_button_clicked)
create_vector_store_button.on_click(on_create_vector_store_button_clicked)
upload_files_button.on_click(on_upload_files_button_clicked)
update_assistant_button.on_click(on_update_assistant_button_clicked)
send_message_button.on_click(on_send_message_button_clicked)


SyntaxError: expression cannot contain assignment, perhaps you meant "=="? (1776197103.py, line 116)

In [12]:
from pathlib import Path
import ipywidgets as widgets
from IPython.display import display, clear_output
from src.logger import logger
from src.utils import j_loads, j_loads_ns, j_dumps
from types import SimpleNamespace

# Widgets
file_upload = widgets.FileUpload(accept='.txt', multiple=True)
dataset_button = widgets.Button(description='Select Dataset')
train_button = widgets.Button(description='Train Model')
test_button = widgets.Button(description='Test Model')
save_button = widgets.Button(description='Save Job ID')
create_vector_store_button = widgets.Button(description='Create Vector Store')
upload_files_button = widgets.Button(description='Upload Files to Vector Store')
update_assistant_button = widgets.Button(description='Update Assistant')
message_input = widgets.Textarea(description='Message:', layout=widgets.Layout(width='100%', height='100px'))
send_message_button = widgets.Button(description='Send Message')
output = widgets.Output()

# Display widgets
display(file_upload)
display(dataset_button)
display(train_button)
display(test_button)
display(save_button)
display(create_vector_store_button)
display(upload_files_button)
display(update_assistant_button)
display(message_input)
display(send_message_button)
display(output)

def on_upload_change(change):
    """! Handle file upload changes """
    with output:
        clear_output()
        if file_upload.value:
            files = [file for file in file_upload.value.values()]
            display(files)

file_upload.observe(on_upload_change, names='value')

def on_dataset_button_clicked(b):
    """! Handle dataset selection button click """
    with output:
        clear_output()
        path_to_dir = Path('./dataset/')
        dataset = select_dataset_and_archive(path_to_dir, positive=True)
        if dataset:
            print(f"Dataset selected and archived: {len(dataset)} items")
        else:
            print("Failed to select or archive dataset")

def on_train_button_clicked(b):
    """! Handle train button click """
    with output:
        clear_output()
        dataset = select_dataset_and_archive(Path('./dataset/'), positive=True)
        if dataset:
            job_id = model.train(dataset, positive=True)
            if job_id:
                print(f"Training started, Job ID: {job_id}")
            else:
                print("Failed to start training")
        else:
            print("Failed to select dataset")

def on_test_button_clicked(b):
    """! Handle test button click """
    with output:
        clear_output()
        test_data = [
            {"name": "Test 1", "text": "Sample text for testing", "label": "positive"}
        ]
        predictions = model.test(test_data)
        if predictions:
            for entry, prediction in zip(test_data, predictions):
                print(f"Page: {entry['name']}, Prediction: {prediction}, Actual: {entry['label']}")
        else:
            print("Failed to test the model")

def on_save_button_clicked(b):
    """! Handle save button click """
    with output:
        clear_output()
        job_id = model.current_job_id
        if job_id:
            model.save_job_id(job_id, "Training job description")
            print(f"Job ID {job_id} saved successfully")
        else:
            print("No Job ID to save")

def on_create_vector_store_button_clicked(b):
    """! Handle create vector store button click """
    with output:
        clear_output()
        client = ...  # Initialize your API client
        name = 'New Vector Store'
        file_paths = []  # Add file paths if needed
        vector_store_id = model.create_vector_store(client, name, file_paths)
        if vector_store_id:
            print(f"Vector store created with ID: {vector_store_id}")
        else:
            print("Failed to create vector store")

def on_upload_files_button_clicked(b):
    """! Handle upload files to vector store button click """
    with output:
        clear_output()
        client = ...  # Initialize your API client
        vector_store_id = 'your-vector-store-id'
        file_paths = []  # Add file paths to upload
        response = model.upload_files_to_vector_store(client, vector_store_id, file_paths)
        if response:
            print(f"Files uploaded successfully: {response}")
        else:
            print("Failed to upload files")

def on_update_assistant_button_clicked(b):
    """! Handle update assistant button click """
    with output:
        clear_output()
        assistant_id = 'your-assistant-id'
        description = 'Updated assistant description'
        model.update_assistant(assistant_id, description)
        print(f"Assistant {assistant_id} updated successfully")

def on_send_message_button_clicked(b):
    """! Handle send message button click """
    with output:
        clear_output()
        message = message_input.value
        if message:
            response = model.send_message(message)
            print(f"Response from assistant: {response}")
        else:
            print("No message to send")

# Attach event handlers
dataset_button.on_click(on_dataset_button_clicked)
train_button.on_click(on_train_button_clicked)
test_button.on_click(on_test_button_clicked)
save_button.on_click(on_save_button_clicked)
create_vector_store_button.on_click(on_create_vector_store_button_clicked)
upload_files_button.on_click(on_upload_files_button_clicked)
update_assistant_button.on_click(on_update_assistant_button_clicked)
send_message_button.on_click(on_send_message_button_clicked)


FileUpload(value=(), accept='.txt', description='Upload', multiple=True)

Button(description='Select Dataset', style=ButtonStyle())

Button(description='Train Model', style=ButtonStyle())

Button(description='Test Model', style=ButtonStyle())

Button(description='Save Job ID', style=ButtonStyle())

Button(description='Create Vector Store', style=ButtonStyle())

Button(description='Upload Files to Vector Store', style=ButtonStyle())

Button(description='Update Assistant', style=ButtonStyle())

Textarea(value='', description='Message:', layout=Layout(height='100px', width='100%'))

Button(description='Send Message', style=ButtonStyle())

Output()

In [15]:
from pathlib import Path
import ipywidgets as widgets
from IPython.display import display, clear_output
from src.logger import logger
from src.utils import j_loads, j_loads_ns, j_dumps
from types import SimpleNamespace

# Widgets
file_upload = widgets.FileUpload(accept='.txt', multiple=True)
dataset_button = widgets.Button(description='Select Dataset')
train_button = widgets.Button(description='Train Model')
test_button = widgets.Button(description='Test Model')
save_button = widgets.Button(description='Save Job ID')
create_vector_store_button = widgets.Button(description='Create Vector Store')
upload_files_button = widgets.Button(description='Upload Files to Vector Store')
update_assistant_button = widgets.Button(description='Update Assistant')
message_input = widgets.Textarea(description='Message:', layout=widgets.Layout(width='100%', height='100px'))
send_message_button = widgets.Button(description='Send Message')
output = widgets.Output()

# Display widgets
display(file_upload)
display(dataset_button)
display(train_button)
display(test_button)
display(save_button)
display(create_vector_store_button)
display(upload_files_button)
display(update_assistant_button)
display(message_input)
display(send_message_button)
display(output)

def on_upload_change(change):
    """! Handle file upload changes """
    with output:
        clear_output()
        if file_upload.value:
            files = [file for file in file_upload.value.values()]
            display(files)

file_upload.observe(on_upload_change, names='value')

def on_dataset_button_clicked(b):
    """! Handle dataset selection button click """
    with output:
        clear_output()
        path_to_dir = Path('./dataset/')
        dataset = select_dataset_and_archive(path_to_dir, positive=True)
        if dataset:
            print(f"Dataset selected and archived: {len(dataset)} items")
        else:
            print("Failed to select or archive dataset")

def on_train_button_clicked(b):
    """! Handle train button click """
    with output:
        clear_output()
        dataset = select_dataset_and_archive(Path('./dataset/'), positive=True)
        if dataset:
            job_id = model.train(dataset, positive=True)
            if job_id:
                print(f"Training started, Job ID: {job_id}")
            else:
                print("Failed to start training")
        else:
            print("Failed to select dataset")

def on_test_button_clicked(b):
    """! Handle test button click """
    with output:
        clear_output()
        test_data = [
            {"name": "Test 1", "text": "Sample text for testing", "label": "positive"}
        ]
        predictions = model.test(test_data)
        if predictions:
            for entry, prediction in zip(test_data, predictions):
                print(f"Page: {entry['name']}, Prediction: {prediction}, Actual: {entry['label']}")
        else:
            print("Failed to test the model")

def on_save_button_clicked(b):
    """! Handle save button click """
    with output:
        clear_output()
        job_id = model.current_job_id
        if job_id:
            model.save_job_id(job_id, "Training job description")
            print(f"Job ID {job_id} saved successfully")
        else:
            print("No Job ID to save")

def on_create_vector_store_button_clicked(b):
    """! Handle create vector store button click """
    with output:
        clear_output()
        client = ...  # Initialize your API client
        name = 'New Vector Store'
        file_paths = []  # Add file paths if needed
        vector_store_id = model.create_vector_store(client, name, file_paths)
        if vector_store_id:
            print(f"Vector store created with ID: {vector_store_id}")
        else:
            print("Failed to create vector store")

def on_upload_files_button_clicked(b):
    """! Handle upload files to vector store button click """
    with output:
        clear_output()
        client = ...  # Initialize your API client
        vector_store_id = 'your-vector-store-id'
        file_paths = []  # Add file paths to upload
        response = model.upload_files_to_vector_store(client, vector_store_id, file_paths)
        if response:
            print(f"Files uploaded successfully: {response}")
        else:
            print("Failed to upload files")

def on_update_assistant_button_clicked(b):
    """! Handle update assistant button click """
    with output:
        clear_output()
        assistant_id = 'your-assistant-id'
        description = 'Updated assistant description'
        model.update_assistant(assistant_id, description)
        print(f"Assistant {assistant_id} updated successfully")

def on_send_message_button_clicked(b):
    """! Handle send message button click """
    with output:
        clear_output()
        message = message_input.value
        if message:
            response = model.send_message(message)
            if response:
                new_input = widgets.Textarea(value=response, layout=widgets.Layout(width='100%', height='100px'))
                display(new_input)
            else:
                print("Failed to get a response from the model")
        else:
            print("No message to send")

# Attach event handlers
dataset_button.on_click(on_dataset_button_clicked)
train_button.on_click(on_train_button_clicked)
test_button.on_click(on_test_button_clicked)
save_button.on_click(on_save_button_clicked)
create_vector_store_button.on_click(on_create_vector_store_button_clicked)
upload_files_button.on_click(on_upload_files_button_clicked)
update_assistant_button.on_click(on_update_assistant_button_clicked)
send_message_button.on_click(on_send_message_button_clicked)


FileUpload(value=(), accept='.txt', description='Upload', multiple=True)

Button(description='Select Dataset', style=ButtonStyle())

Button(description='Train Model', style=ButtonStyle())

Button(description='Test Model', style=ButtonStyle())

Button(description='Save Job ID', style=ButtonStyle())

Button(description='Create Vector Store', style=ButtonStyle())

Button(description='Upload Files to Vector Store', style=ButtonStyle())

Button(description='Update Assistant', style=ButtonStyle())

Textarea(value='', description='Message:', layout=Layout(height='100px', width='100%'))

Button(description='Send Message', style=ButtonStyle())

Output()