This Jupyter notebook, together with the "Conversation Script" and "Assistant Creator" notebooks, covers the entire process of creating a fully functioning chatbot. This notebook sets up and saves the resources that the full chatbot uses, so run it first. After running it, run the "Assistant Creator" notebook and then the "Conversation Script" notebook.

# Contents

**1 - Setup**

- 1.1 Imports
- 1.2 Open AI
- 1.3 Directories

**2 - Data Retrieval**

- 2.1 URLs
- 2.2 Documents
- 2.3 Image Extraction

**3 - Data Processing**

- 3.1 Image Description Creation
- 3.2 Chunking
- 3.3 Vector Store Creation
- 3.4 Question Types

# 1 - Setup

This section details the process of setting up the modules, OpenAI functionality and directories we'll need.

## 1.1 Imports

These are our imports. You'll see these modules used throughout the notebook, so here is what each one is used for:

`from openai import OpenAI` sets up a client that can communicate with OpenAI's services. These services are not limited to chatbots: we also use them to create vector stores (more on this later) and to upload and change files.

`import os` lets us access and modify files and folders.

`from PyPDF2 import PdfReader` lets us read the text within our pdf files so that our chatbot can use it as a resource.

`import json` lets us read and create .json files, which store the data we process for later use.

`import requests` lets us make requests to external urls, both the websites we download and OpenAI.

`from PIL import Image` lets us open and save images given image data.

`import base64` encodes images in base64, the format OpenAI requires for any images we upload.

`import fitz` (installed as PyMuPDF) lets us retrieve image data from a pdf.

`import io` lets us wrap the raw image bytes that fitz provides in a file-like object that PIL can read.

`import shutil` lets us operate on whole directories, for example deleting a folder and its contents.

`import urllib.parse` lets us break urls into their parts so that we can extract names for the files we save.

`from urllib.parse import urljoin` lets us combine a base url with a relative link to form a full url.

`from bs4 import BeautifulSoup` lets us extract text and images from the html files and websites we want to save.

`from langchain.text_splitter import RecursiveCharacterTextSplitter` lets us chunk text recursively.

`from langchain_text_splitters import CharacterTextSplitter` lets us split text by characters.

In [None]:
from openai import OpenAI
import os
from PyPDF2 import PdfReader
import json
import requests
from PIL import Image
import base64
import fitz
import io
import pickle
import shutil
import urllib.parse
from urllib.parse import urljoin
from bs4 import BeautifulSoup
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_text_splitters import CharacterTextSplitter

If running the cell above raises an error because a module is not installed, remove the # from the relevant command below to install it.

In [None]:
# os, json, base64, io, pickle, shutil and urllib are part of Python's standard library and do not need installing
#pip install openai
#pip install PyPDF2
#pip install requests
# the PIL module is installed as Pillow
#pip install Pillow
# the fitz module is installed as PyMuPDF
#pip install PyMuPDF
#pip install bs4
#pip install langchain
#pip install langchain-text-splitters

## 1.2 Open AI

`api_key` is essentially a password provided by OpenAI. It gives us access to OpenAI's services whenever we use them.

`client = OpenAI(api_key=api_key)` sets up a client that can communicate with OpenAI's services. We create it once here so we do not have to write out "OpenAI(api_key=api_key)" every time we want to communicate with OpenAI.

In [None]:
api_key = ""

client = OpenAI(api_key=api_key)
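
Typing the key directly into the notebook makes it easy to leak when the notebook is shared. One possible alternative, sketched below, reads the key from an environment variable. The variable name `OPENAI_API_KEY` and the helper `load_api_key` are assumptions for illustration and are not part of the original notebook.

```python
import os

def load_api_key(env_var="OPENAI_API_KEY"):
    """Return the API key stored in the given environment variable.

    Raises a clear error instead of silently passing an empty key on.
    """
    key = os.environ.get(env_var, "").strip()
    if not key:
        raise RuntimeError(f"Set the {env_var} environment variable before running this notebook.")
    return key

# Usage (commented out so the sketch runs without a key being set):
# api_key = load_api_key()
# client = OpenAI(api_key=api_key)
```

If you adopt something like this, the same variable would need to be set wherever the other two notebooks are run.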

## 1.3 Directories

We set up any directories for files or websites that we will use later

`store_name =` this is a general purpose name that we will use when creating files, this allows us to make sure we are retrieving the documents we want later on.

`data_directory =` this is the file directory where we'll store and retrieve most kinds of data.

`document_directory =` this is the file directory where we store and retrieve our documents from.

`image_directory =` this is the file directory where we'll store and retrieve any images.

`question_directory =` this is where we retrieve the .txt files that contain example questions we want to match user queries against.

`question_data_directory =` this is where we store any processing done to the example questions.

`urls =` specify here any urls you want the assistant to have access to.

`urls_with_subdomains =` this list specifies any urls whose sub-pages (links that begin with the same address) should all be added to the `urls` list.

Make sure that the values you specify here are the same as the ones you use in the Assistant Creator and Conversation Script notebooks.

In [None]:
#automatically assigns the directories 
store_name = "Labs Dutchess"

this_directory = os.getcwd()

directories = os.listdir(this_directory)

directories = [os.path.join(this_directory, entry) for entry in directories if not os.path.isfile(os.path.join(this_directory, entry))] 

for directory in directories:
    if "Data Base" in os.path.basename(directory):
        data_directory = directory
    elif "Documents" in os.path.basename(directory):
        document_directory = directory
    elif "Output Images" in os.path.basename(directory):
        image_directory = directory
    elif "Question_Types" in os.path.basename(directory):
        question_directory = directory
    elif "Question Data Store" in os.path.basename(directory):
        question_data_directory= directory
    

print(f"data_directory = {data_directory}")
print(f"document_directory = {document_directory}")
print(f"image_directory = {image_directory}")
print(f"question_data_directory = {question_data_directory}")
print(f"question_directory = {question_directory}")

#urls to use
urls = []

urls_with_subdomains = []

# 2 - Data Retrieval

In this next section we go over the retrieval of information from our information sources and ensure they are ready for processing. We'll download the text information from the urls we've specified and we'll store any images from all of our documents and websites.

## 2.1 URLs

We need to retrieve all the information from the relevant urls and store it in our `document_directory`. To do this we first create a function called `get_all_links`, which takes a base url and returns a list of every link on that page whose address starts with the base url (these are what this notebook calls subdomains). We then loop through our list of `urls_with_subdomains` and add each of these links to the `urls` list.

In [None]:
def get_all_links(base_url):
    response = requests.get(base_url) # Send a request to the base URL
    soup = BeautifulSoup(response.content, 'html.parser') # Parse the content with BeautifulSoup
    links = soup.find_all('a', href=True)  # Extract all the anchor tags
    full_links = []  # Initialize a list to store full URLs

    for link in links:
        href = link.get('href') # gets the end part of the url
        full_url = urljoin(base_url, href)  # Create full URL
        if full_url.startswith(base_url): # checks that the url starts with the base url 
            full_links.append(full_url) # adds that url to the list
    
    return full_links

In [None]:
for base_url in urls_with_subdomains: # loops through all the urls in the urls_with_subdomains list
    all_links = get_all_links(base_url) # retrieves all of the subdomain links

    all_links = list(set(all_links)) # gets rid of any duplicates

    for link in all_links: # loops through all the subdomains and adds them to the urls list
        urls.append(link)

print(urls)
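
To see which links survive the filter without making any web requests, the resolve-and-filter step can be tried on a hand-written list of `href` values. This is a minimal sketch: the helper `filter_links` and the example.org addresses are made up for illustration, and unlike the `set` call above it removes duplicates while keeping the original order.

```python
from urllib.parse import urljoin

def filter_links(base_url, hrefs):
    """Resolve each href against base_url and keep only links that start with base_url."""
    kept = []
    for href in hrefs:
        full_url = urljoin(base_url, href)  # relative links become absolute
        if full_url.startswith(base_url) and full_url not in kept:
            kept.append(full_url)  # order-preserving de-duplication
    return kept

example_hrefs = [
    "notes/week1.html",                # relative link under the base url
    "/course/notes/week2.html",        # root-relative link under the base url
    "https://other.example.org/page",  # different site, filtered out
    "notes/week1.html",                # duplicate, kept only once
]
links = filter_links("https://example.org/course/", example_hrefs)
print(links)
```

Note that the check is a plain string prefix test, so a base url given without a trailing slash would also match addresses that merely begin with the same characters.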

After we have done this, we loop through every url in the `urls` list and save its content as an HTML file inside our `document_directory`.

In [None]:
for url in urls:
    response = requests.get(url)  # Send a GET request to the website

    if response.status_code == 200:  # Check if the request was successful
        soup = BeautifulSoup(response.content, 'html.parser')  # Parse the content with BeautifulSoup
        html_content = response.text  # Get the content of the response

        # Extract the page title for the filename (falls back to 'no_title' when the title tag is missing or empty)
        page_title = soup.title.string if soup.title and soup.title.string else 'no_title'
        page_title = page_title.replace(" ", "_").replace("/", "_").replace(":", "_")

        url_name = f"{page_title}.html"  # Use the title as the filename
        output_file_path = os.path.join(document_directory, url_name)  # Make the directory for the file

        # Write the HTML content to the file
        with open(output_file_path, 'w', encoding='utf-8') as file:  # Save it as a HTML
            file.write(html_content)

        print(f'{url} downloaded and saved to {output_file_path}')
    else:
        print(f"Website {url} download unsuccessful")
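
Page titles can contain other characters that some file systems reject (for example `?`, `*` or `|`), and two pages with the same title will overwrite each other. A more defensive way of building the file name might look like the sketch below. The helper `title_to_filename` is hypothetical, and because it collapses runs of replaced characters into a single underscore, the names it produces differ from the ones the cell above generates.

```python
import re

def title_to_filename(title, extension=".html", fallback="no_title"):
    """Turn a page title into a file name that is safe on common file systems."""
    cleaned = (title or "").strip()
    # Replace whitespace and characters that are invalid in file names on common systems
    cleaned = re.sub(r'[\s<>:"/\\|?*]+', "_", cleaned)
    cleaned = cleaned.strip("_")
    return (cleaned or fallback) + extension

name_from_title = title_to_filename("First order ODE: Computational Physics 2021/2022")
name_from_missing = title_to_filename(None)
print(name_from_title)    # First_order_ODE_Computational_Physics_2021_2022.html
print(name_from_missing)  # no_title.html
```

Any file names listed elsewhere in the notebooks would need to match whichever naming scheme is actually used.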

## 2.2 Documents

Now that all the files we want are saved in our `document_directory`, we create a function, `get_all_files_in_folder`, that returns a list of the files inside a given folder. We then run it on our `document_directory` to get the list of files that make up our knowledge base, and print the list to check that it is correct.

In [None]:
def get_all_files_in_folder(folder_path):
    try:
        entries = os.listdir(folder_path) # Makes a list of all entries in a given directory
        
        files = [os.path.join(folder_path, entry) for entry in entries if os.path.isfile(os.path.join(folder_path, entry))] # combines the entries with the folder directory so that we have their full file path
        
        return files
    except FileNotFoundError:
        print("The folder path does not exist.")
        return [] # return an empty list so that code looping over the result still works

In [None]:
files = get_all_files_in_folder(document_directory)
for file in files:
    print(f"{os.path.basename(file)}")

This next step is not necessary but can be very useful for user convenience. We can set up a list of aliases for our files. An alias is the name shown to the user when a document is quoted as the reference for information. When setting up the aliases, use the following format:

`allias = [["document_name.pdf", "document_allias"], ["document2_name.html", "document2_allias"]]`

so to use some real file names we might do:

`allias = [["Creating Simple Functions.py", "Creating Simple Functions"], ["First_order_ODE____Computational_Physics_2021_2022.html", "First Order ODE's Computational Physics Lecture Notes"]]`

These do not need to be in any order, and you do not need to create an alias for every file.

If a file has no alias, or you leave the list empty, the reference quoted to the user will be the file name. To compare the results:

Without aliases:

**Main Text** All information has been sourced from Creating Simple Functions.py, First_order_ODE____Computational_Physics_2021_2022.html.

With aliases:

**Main Text** All information has been sourced from Creating Simple Functions, First Order ODE's Computational Physics Lecture Notes.

In [None]:
allias = [
    ["NewtonsRings.pdf", "Newtons rings lab script"],
    ["CoulombsLaw.pdf", "Coulombs law lab script"],
    ["Radioactivity.pdf", "Radioactivity lab script"],
    ["PrismSpectrometer.pdf", "prism spectrometer lab script"],
    ["PrecisionInterferometer.pdf", "Precision Interferometer lab script"],
    ["Semiconductors.pdf", "Semiconductors lab script"],
    ["Radioactivity_.pdf", "Radioactivity lab script"],
    ["Xray_Properties.pdf", "X ray properties lab script"],
    ["projectiles.pdf", "projectiles lab script"],
    ["Circuits2.pdf", "Circuits lab script 2"],
    ["LRC.pdf", "LRC lab script"],
    ["BiotSavart.pdf", "Biot Savart law lab script"],
    ["SpeedofLight.pdf", "Speed of light lab script"],
    ["Enterprise.pdf", "Enterprise lab script"],
    ["BifilarPendulum.pdf", "Bifilar pendulum lab script"],
    ["LabsTimetable.pdf", "Labs timetable"],
    ["Xray_Diffraction.pdf", "X-ray lab script"],
    ["Level1_Discovery_RoomAllocation_24.pdf", "Level 1 room allocations epiphany"],
    ["DC_Circuits.pdf", "DC circuits lab script"],
    ["LabsTimetable.pdf", "Labs Timetable"],
    ["Errorslectures6.pdf", "Arin Errors lecture 6"],
    ["PhysicsTimetable.pdf", "Physics Timetable"],
    ["Module_Information.pdf", "Module Information"],
    ["How_to_give_a_great_scientific_talk.html", "How to give a scientific talk website"],
    ["ErrorsLectures5.pdf", "Arin Errors lecture 5"],
    ["Errorslectures4.pdf", "Arin Errors lecture 4"],
    ["Errorslectures3.pdf", "Arin Errors lecture 3"],
    ["Errorslectures2.pdf", "Arin Errors lecture 2"],
    ["Errorslectures1.pdf", "Arin Errors lecture 1"],
    ["AssessmentPolicy.pdf", "department of physics assessment policy"],
    ["Summative_Laboratory_Notebook_Assessment.pdf", "Lab-book-keeping assessment info"],
    ["DiscoverySkillsIntroductoryLecture.pdf", "discovery skills intro lecture"],
    ["Create_a_presentation_in_PowerPoint_-_Microsoft_Support.html", "presentation advice website"],
    ["How_to_give_a_dynamic_scientific_presentation.html", "giving a good presentation website"],
    ["StaffContact.pdf", "Staff contact information page on ULTRA"],
    ["LabBook.pdf", "Pippa lecture on lab book records"],
    ["LabReports.pdf", "Pippa lecture on lab reports"],
    ["LabReports2.pdf", "Pippa lecture on lab reports 2"],
    ["LabsTimetable.pdf", "Lab Timetable"],
    ["HH.txt", "Hughes and Hase"],
    ["quantum_2.pdf", "Theoretical physics Quantum lecture notes"],
    ["quantum_3.pdf", "Theoretical Physics 2 quantum lecture notes"],
    ["quantum_1.pdf", "FoP Quantum lecture notes"],
]

allias_name = f"allias_{store_name}.json" # creates the file name for the aliases
allias_path = os.path.join(data_directory, allias_name) # creates the file path for the aliases

with open(allias_path, "w") as file: # saves the aliases as a .json file
    json.dump(allias, file)
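
How the saved list is consumed is handled in the other notebooks and is not shown here. As a rough illustration of the fallback behaviour described above, a lookup could work like the sketch below. The helper `display_name` is hypothetical, and the choice that a later duplicate entry overrides an earlier one is an assumption.

```python
import os

def display_name(file_path, alias_pairs):
    """Return the alias for a file if one is defined, otherwise its file name."""
    lookup = {name: alias for name, alias in alias_pairs}  # later duplicates win
    file_name = os.path.basename(file_path)
    return lookup.get(file_name, file_name)

example_alias = [["Creating Simple Functions.py", "Creating Simple Functions"]]
with_alias = display_name("Documents/Creating Simple Functions.py", example_alias)
without_alias = display_name("Documents/LRC.pdf", example_alias)
print(with_alias)     # Creating Simple Functions
print(without_alias)  # LRC.pdf
```

Because each file name can map to only one alias in a lookup like this, listing the same file more than once means only one of its aliases can ever be shown.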

## 2.3 Image Extraction

Next we go over how images are extracted from the documents and then stored for later processing. 

To start with we make a function called `extract_images_from_pdf` that takes a pdf file path and extracts the data for every image in that pdf. For each image this data is a dictionary holding the page number, the index of the image on that page, the image's file type (.png, .jpeg etc.) and the image itself. The function adds each dictionary to a list and returns the list, giving us all the images in the pdf.

In [None]:
def extract_images_from_pdf(pdf_file):
    if not os.path.isfile(pdf_file): # checks if the file path to the pdf exists
        print(f"Error: The file '{pdf_file}' does not exist.")
        return None
    
    pdf_document = fitz.open(pdf_file) # opens the file using the fitz module
    image_list = [] # create a list for the image data to be stored in

    for page_num in range(len(pdf_document)): # loops through every single page in the document
        page = pdf_document.load_page(page_num) # loads a single page
        page_images = page.get_images(full=True) # grabs all the images from a single page

        # loops through all the images on a single page; enumerate() gives us a counter (img_idx) alongside each item (img_info) from page_images
        for img_idx, img_info in enumerate(page_images):
            base_image = pdf_document.extract_image(img_info[0]) # this extracts information from the current image and holds it in base_image
            
            image_data = { # a dictionary that we can store information about our image in
                "page_number": page_num + 1, # stores what page the image is on
                "image_index": img_idx + 1, # stores whether it is the first, second, third etc image on that page
                "image_extension": base_image["ext"], # stores the image extension (the file type of that image)
                "image_data": base_image["image"] # stores the image_data itself, what the image is
            }
            image_list.append(image_data) # we then store that image in our image list and the loop continues onto the next image

    pdf_document.close() # we close the document
    return image_list # output the image list with all of the data we could need

We then create a function called `save_image`. Its job is to take the image data we extracted and store it as an image file in a given output directory under a given file name.

In [None]:
def save_image(image_data, output_dir, filename):
    image_bytes = image_data["image_data"] # retrieve the image information from the image data
    image_ext = image_data["image_extension"] #retrieve the image extension from the image data
    
    img = Image.open(io.BytesIO(image_bytes)) # Load image from the raw image information

    if not os.path.exists(output_dir): # if the output directory does not exist, we create a directory so it does
        os.makedirs(output_dir)

    image_filename = f"{filename}_page{image_data['page_number']}_image{image_data['image_index']}.{image_ext}" # this specifies the name of the saved image, we specify the page number and index on that page
    # within the name, we use the filename to ensure one image from one file does not overwrite another from another file if they have images on the same page, and we store it using the extension we found
    image_path = os.path.join(output_dir, image_filename) # creates a path to where the image should be stored by combining the output directory and the name we just made
    img.save(image_path) # saves the image

    print(f"Image saved: {image_path}") # lets us know the image has been saved

We define another function, `save_images_from_urls`, whose job is to download images from the urls we have defined. It takes a list of urls and saves all the images found on them to a given output directory. The function automatically filters out any image whose width or height is 40 pixels or less, as these are usually logos or other images we would not want to show the user. To change this, modify the line that reads `if width > 40 and height > 40`, replacing the two 40s with the dimensions above which an image should be saved.

In [None]:
def save_images_from_urls(url_list, output_dir):  
    allowed_extensions = ('.png', '.jpg', '.jpeg') # specifies which file types we want

    for url in url_list: # loops through each url
        response = requests.get(url) # Send a request to the website

        if response.status_code == 200: # Check if the request was successful

            html_content = response.text # Get the content of the response

            soup = BeautifulSoup(html_content, 'html.parser') # Parse the HTML content with BeautifulSoup (converts it to a BeautifulSoup object which are easier to work with)

            img_tags = soup.find_all('img') # Find all image tags

            for img in img_tags: # loops through all images in the images tags
                img_url = img.get('src') # 'src' tag contains the url of the image
                
                if img_url:
                    if not img_url.startswith('http'): # Handles relative URLs
                        img_url = urllib.parse.urljoin(url, img_url)
                    
                    img_name = os.path.basename(urllib.parse.urlparse(img_url).path) # Extract the image name from the URL

                    
                    if img_name.lower().endswith(allowed_extensions): # Check if the image has an allowed extension
                        try:
                            img_response = requests.get(img_url) # send a request to the image url
                            img_response.raise_for_status()  # Check if the request was successful
                            
                            img_data = io.BytesIO(img_response.content) # use io.bytesIO to extract the image data from the response
                            img_object = Image.open(img_data) # Use PIL to check the image size
                            width, height = img_object.size

                            
                            if width > 40 and height > 40: # Ignore images with 40px or less in width or height
                                img_path = os.path.join(output_dir, img_name) # Define the full path for saving the image

                                
                                with open(img_path, 'wb') as img_file: # Save the image content to the path we specified
                                    img_file.write(img_response.content)

                                print(f'Image downloaded and saved to {img_path}')

                            # error handling and reasons for ignoring certain images:
                            else:
                                print(f'Image {img_name} ignored due to small size ({width}x{height}px).')
                        except requests.exceptions.RequestException as e:
                            print(f'Failed to download image {img_url}: {e}')
                        except Image.UnidentifiedImageError:
                            print("Error: Cannot identify image file. Please check the file format and data.")
                    else:
                        print(f'Image {img_name} skipped due to unsupported file type.')
        else:
            print(f'Failed to download the website. Status code: {response.status_code}')
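
If you expect to adjust the size threshold often, one option is to pull the check into a small function with the limits as parameters rather than editing the numbers inside the loop. The sketch below is only an illustration; `is_large_enough` and its parameter names are not part of the notebook.

```python
def is_large_enough(width, height, min_width=40, min_height=40):
    """Return True when an image exceeds both minimum dimensions."""
    return width > min_width and height > min_height

print(is_large_enough(41, 200))  # True
print(is_large_enough(40, 200))  # False: exactly 40px wide is still filtered out
print(is_large_enough(120, 90, min_width=100, min_height=100))  # False: too short
```

The condition in the download loop would then become a call to this function with `width` and `height`.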

To ensure we do not end up with duplicate images or non images in our image directory we can use the `delete_files_in_directory` function to clean our image directory before we begin saving images.

In [None]:
def delete_files_in_directory(directory_path): 
    if not os.path.exists(directory_path): # Check if the directory exists
        print(f"The directory {directory_path} does not exist.")
        return
    for filename in os.listdir(directory_path): # this loops through all files and directories in the specified directory
        file_path = os.path.join(directory_path, filename)

        try:
            if os.path.isfile(file_path) or os.path.islink(file_path): # check if it is a file and then delete it
                os.unlink(file_path)
                print(f"Deleted file: {file_path}")
            elif os.path.isdir(file_path): # we check if it is a directory then delete it
                shutil.rmtree(file_path)
                print(f"Deleted directory and its contents: {file_path}")
        except Exception as e: # lets us know if any deletion has been unsuccessful and prints the reason why
            print(f"Failed to delete {file_path}. Reason: {e}")

Now we call all the functions we have made. We start by deleting all files within the image directory using the `delete_files_in_directory` function (note that you can delete this part of the following box without removing much functionality but not running it may cause the process to break in the future). We then run our `save_images_from_urls` function and loop over all the files in the document directory to find our pdfs, these then have their image data extracted using the `extract_images_from_pdf` function and these image data are saved using the `save_image` function. At this point, if you so wish, you can manually drag and drop images into the image directory to be processed later on, however should you ever run this part of the script again they will be deleted by the `delete_files_in_directory` function and you will have to manually drop them in again.

In [None]:
delete_files_in_directory(image_directory) # deletes all files in the image directory to ensure it is clean, this can be removed if you do not want it to do this however this may cause issues in the future
#such as multiple instances of the same image or non image files within the directory which will cause the rest of the image processing to break.

save_images_from_urls(urls, image_directory) # saves all the images from the urls we specified

for file in files: # loops through all the files in the files list 
    document_name = os.path.basename(file) # grabs the name of the file to be used to name the images
    if file.endswith('.pdf'):
        extracted_images = extract_images_from_pdf(file) # extracts all the images in the file and stores them in a list
    
        if extracted_images: # only runs the following lines if images were extracted
            for image_data in extracted_images: # loops over each image
                save_image(image_data, image_directory, document_name) # saves the image to our image directory, giving it a unique name to prevent overwrites

print("all done")

# 3 - Data Processing

Now we move on to the actual processing of the data. In this section we send the images to GPT-4o to have descriptions written about them, and we split the text from the documents into "chunks" of a given length. We also upload the entire set of documents to OpenAI to be used as a knowledge base.

## 3.1 Image Description Creation

Next we'll create descriptions of each of the images we have saved by sending the images to GPT-4o. We create descriptions, rather than having our final chatbot choose between the images themselves, for efficiency: chatbots are far slower at interpreting images than text, so it is much quicker for the chatbot to decide which of several short descriptions is most relevant to a user query. We are essentially preprocessing the images by storing each one as a short descriptive piece of text. The chatbot picks the description most relevant to the user query, and we then show the user the image associated with that description.

To send our images to OpenAI for a description we need to encode them in base64, as this is the format OpenAI requires for image interpretation. We therefore create the `encode_image` function.

In [None]:
def encode_image(image_path):
    with open(image_path, "rb") as image_file: # open the image
        return base64.b64encode(image_file.read()).decode('utf-8') # encode the image
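
The request cell later in this section labels every image as `image/jpeg` in its data URL, even when the file is a .png. The source does not say whether this causes problems, but a variant that builds the data URL with a type matching the file extension could look like the sketch below. The helper `image_to_data_url` is hypothetical, and the demo file contains placeholder bytes rather than a real image.

```python
import base64
import mimetypes
import os
import tempfile

def image_to_data_url(image_path):
    """Encode an image file as a base64 data URL with a MIME type guessed from its extension."""
    mime_type, _ = mimetypes.guess_type(image_path)
    if mime_type is None or not mime_type.startswith("image/"):
        raise ValueError(f"Cannot determine an image type for {image_path}")
    with open(image_path, "rb") as image_file:
        encoded = base64.b64encode(image_file.read()).decode("utf-8")
    return f"data:{mime_type};base64,{encoded}"

# Demo with a temporary file holding placeholder bytes
with tempfile.TemporaryDirectory() as tmp:
    demo_path = os.path.join(tmp, "demo.png")
    with open(demo_path, "wb") as f:
        f.write(b"abc")
    demo_url = image_to_data_url(demo_path)
print(demo_url)  # data:image/png;base64,YWJj
```

The returned string could be used directly as the `"url"` value in the request payload.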

We then send our images to GPT-4o to be interpreted and collect the descriptions it returns. This normally takes a while, so don't worry if it does. The descriptions are stored in a list we aptly name `descriptions`. Each time an image is processed a message is printed to say so, and once all images have been processed a final "all done" message is printed.

In [None]:
headers = { # headers specify certain pieces of information we need to give OpenAI 
    "Content-Type": "application/json", # content-type tells OpenAI that the body of our request is JSON
    "Authorization": f"Bearer {api_key}" # this gives OpenAI our api key (our password)
}

descriptions = [] # create a list for the descriptions to be stored in
i = 1
for filename in os.listdir(image_directory): # loops through every file in the image directory
    if filename.endswith(".jpeg") or filename.endswith(".jpg") or filename.endswith(".png"): # checks that the file is an image
        image_path = os.path.join(image_directory, filename) # grabs the path of the image by combining the directory and the image file name
        
        base64_image = encode_image(image_path) #encodes the image in base 64
        
        payload = { # this is our "payload" it's what we're actually sending to OpenAI
            "model": "gpt-4o", # specifies the model we want to use
            "messages": [ # contains the main content of what we want to send
                {
                    "role": "user", # specifies that this is a message from a user
                    "content": [ # what is included in our message, what gpt-4o will see
                        {
                            "type": "text", # specifies what the following input is
                            "text": f"use a short but scientific description to describe what is in this image, the image name is {filename}, it should be no longer than 10 words?" # we tell gpt-4o what we want it to do with our image
                        },
                        {
                            "type": "image_url", # specifies what the following input is
                            "image_url": {
                                "url": f"data:image/jpeg;base64,{base64_image}" # gives gpt-4o the image to be described
                            }
                        }
                    ]
                }
            ],
            "max_tokens": 300 # sets a limit on the number of tokens the model may generate per response; tokens equate to processing, which equates to money, so limiting this keeps cost and processing time down
            # note that a larger max_tokens is necessary for more complex tasks
        }
        
        response = requests.post("https://api.openai.com/v1/chat/completions", headers=headers, json=payload) # we post the payload to chat/completions which is essentially just like putting in our message
        # through a browser and we store the response
        
        if response.status_code == 200: # checks if the response was successful
            response_json = response.json() # parses the JSON body of the response into a Python dictionary
            text_content = response_json['choices'][0]['message']['content'] # takes just the message so we get just the content of the description
            descriptions.append(text_content) # add the description to our list of descriptions
            print(f"Description for {filename} done, {i} of {len(os.listdir(image_directory))} completed")
            i += 1
        else:
            print(f"Request for {filename} failed with status code: {response.status_code}")

print("all done")

We want to store as much information about these images as possible so they are easier to retrieve later on, so we save the image names and the descriptions as two .json files in our `data_directory`.

In [None]:
image_names = [] # creates an image name list

for names in os.listdir(image_directory): # loops over the image names and add them to the list
    image_names.append(names) 

image_names_list = f"{store_name}_image_names.json" # create a file name for our .json file
image_names_path = os.path.join(data_directory, image_names_list) # create the path for where the .json file will be stored

with open(image_names_path, "w") as file: # saves the .json file with the list of image names
    json.dump(image_names, file)

In [None]:
descriptions_name = f"{store_name}_descriptions.json" # create a file name for our.json file
descriptions_path = os.path.join(data_directory, descriptions_name) # create the path for where the .json file will be stored

with open(descriptions_path, "w") as file: # saves the .json file with the list of descriptions
    json.dump(descriptions, file)
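
One thing to watch: `image_names` contains every entry in the image directory, while `descriptions` only gains an entry when a request succeeds for a .jpeg, .jpg or .png file. If the two lists are later matched by position (an assumption, since that code lives in the other notebooks), a single failed request or stray file would shift every pairing after it. A possible alternative is to store each description against its file name, as sketched below. The function names and the example descriptions are made up for illustration.

```python
import json
import os
import tempfile

def save_image_descriptions(described, output_path):
    """Save {image file name: description} so each description stays attached to its image."""
    with open(output_path, "w", encoding="utf-8") as file:
        json.dump(described, file, indent=2)

def load_image_descriptions(path):
    """Load the {image file name: description} mapping back from disk."""
    with open(path, "r", encoding="utf-8") as file:
        return json.load(file)

# Made-up example values; in the notebook these would be filled in inside the request loop
described = {
    "LRC.pdf_page2_image1.png": "Circuit diagram of a series LRC circuit",
    "Radioactivity.pdf_page1_image1.jpeg": "Geiger counter facing a shielded source",
}

with tempfile.TemporaryDirectory() as tmp:
    demo_path = os.path.join(tmp, "image_descriptions.json")
    save_image_descriptions(described, demo_path)
    loaded = load_image_descriptions(demo_path)
print(loaded == described)  # True
```

Adopting this would mean adapting the notebooks that read the two separate .json files.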

## 3.2 Chunking

Next we'll perform what is known as chunking. This splits the text of our documents into several chunks, so that the chatbot can later retrieve just the relevant chunks.

We start by defining several functions that extract text from different file types so that each type is handled appropriately.

In [None]:
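def extract_text_from_pdf(file_path): # extracts the text from every page of a pdf
    text = ""
    document = fitz.open(file_path)
    for page_num in range(len(document)): # loops over the page numbers
        page = document.load_page(page_num)
        text += page.get_text() # adds the text of this page to the running total
    document.close() # releases the file once we are done with it
    return text

def extract_text_from_html(file_path): # extracts the visible text from an html file
    with open(file_path, 'r', encoding='utf-8') as file:
        soup = BeautifulSoup(file, 'html.parser')
        text = soup.get_text() # strips the tags and keeps only the text
    return text

def extract_text(file_path): # picks the right extractor based on the file extension
    if file_path.endswith('.pdf'):
        return extract_text_from_pdf(file_path)
    elif file_path.endswith('.html') or file_path.endswith('.htm'):
        return extract_text_from_html(file_path)
    else: # anything else is read as plain text
        with open(file_path, 'r', encoding='utf-8') as file:
            return file.read()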


### Recursive Chunking

Now that we can extract the text from the relevant files, we can split that text into smaller pieces so that we feed a smaller amount of data to Dutchess. If we simply passed the whole text, our model wouldn't know where to look. Instead we set a maximum chunk size and let the splitter make each chunk as large as that size permits, while only splitting at specific pieces of text (separators) tried in order. With the default separators of `RecursiveCharacterTextSplitter`, which is what we use here, it first looks for a paragraph break, then a line break, then a space between words, and only as a last resort splits between individual characters. This method of chunking performed best when evaluated with RAGAS and should therefore allow the most accurate information to be fed to our bot.

In [None]:
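def recursive_chunking(file_path, chunk_size, chunk_overlap): # splits one file into chunks of at most chunk_size characters
    relevant_chunks = [] # creates empty list for the chunks we keep

    text = extract_text(file_path) # gets the text of the file

    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size = chunk_size, # maximum number of characters in a chunk
        chunk_overlap = chunk_overlap, # maximum number of characters shared by neighbouring chunks
        length_function = len, # measures the length of a chunk in characters
    )
    chunks = text_splitter.split_text(text)

    for chunk in chunks:
        if chunk != ".": # skips chunks that consist of nothing but a full stop
            relevant_chunks.append(chunk)
    return relevant_chunks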
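
To make the fallback order more concrete, here is a minimal pure-Python sketch of the idea. It is not LangChain's implementation: `toy_recursive_split` is a hypothetical helper written for illustration, its separator list mirrors what I understand to be the splitter's defaults (paragraph break, line break, space, single characters), it assumes that list ends with the empty string, and it ignores `chunk_overlap` entirely.

```python
def toy_recursive_split(text, max_size, separators=("\n\n", "\n", " ", "")):
    """Split text into pieces of at most max_size characters.

    Tries the coarsest separator first and only falls back to a finer one
    for pieces that are still too long. Illustrative helper, not a library API.
    """
    if len(text) <= max_size:
        return [text] if text else []
    sep, finer = separators[0], separators[1:]
    if sep == "":
        # Last resort: cut at fixed character positions.
        return [text[i:i + max_size] for i in range(0, len(text), max_size)]

    chunks, current = [], ""
    for piece in text.split(sep):
        candidate = piece if not current else current + sep + piece
        if len(candidate) <= max_size:
            current = candidate  # keep growing the current chunk
            continue
        if current:
            chunks.append(current)  # the current chunk is as large as it can get
            current = ""
        if len(piece) <= max_size:
            current = piece
        else:
            # This piece alone is too long: retry it with finer separators.
            chunks.extend(toy_recursive_split(piece, max_size, finer))
    if current:
        chunks.append(current)
    return chunks


sample = "First paragraph here.\n\nSecond one. It has two sentences."
toy_chunks = toy_recursive_split(sample, max_size=25)
print(toy_chunks)
```

In this sketch the first paragraph fits in one chunk, while the second is too long and gets split between words instead. The separator at each split point is dropped.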

Next we define a function called `process_files` that uses `recursive_chunking` to store the text of all our documents as a single list of chunks. It takes a list of file paths so that it knows which documents to process, and prefixes each chunk with the name of the file it came from.

In [None]:
def process_files(file_paths, chunk_size, chunk_overlap): # defines the function, the maximum chunk size and the overlap between chunks
    chunks = [] # creates empty list for chunks to be stored in 
    for file_path in file_paths: # loops over the file paths we were given
        relevant_chunks = recursive_chunking(file_path, chunk_size=chunk_size,chunk_overlap=chunk_overlap)
        for relevant_chunk in relevant_chunks:
            chunks.append(f"{os.path.basename(file_path)}: {relevant_chunk}") # labels each chunk with its file name
    return chunks # returns the list

We then use this function to create chunks out of the documents in our document folder, and save the list of chunks in a .json file in our `data_directory`.

In [None]:
if files != []:   
    chunks = process_files(files, chunk_size=2000, chunk_overlap=200) # runs process_files for the files we have in our document directory
else:
    chunks = "empty" # placeholder used when there are no documents

chunks_name = f"chunks_{store_name}.json" # creates a file name for the .json
chunks_path = os.path.join(data_directory, chunks_name) # creates the path for where the .json file will be stored

with open(chunks_path, "w") as file: # saves the .json file with the chunks
    json.dump(chunks, file)

We now want to create an embedding of these chunks. Embedding a piece of text converts it into a vector. This vector is very long (1,536 numbers for the model used here) and can be used to test the similarity of two pieces of text. For example, if I were to embed the words "apple" and "orange" and measure the cosine distance between them (one minus the cosine of the angle between the two vectors, so 0 means the vectors point in the same direction and larger values mean the texts are less alike), I might find that the output is 0.1. If I used "window" instead of "orange" I might get 0.5. Interestingly, if I used "phone" instead of "orange" I might get 0.05, because "apple" has become associated with "phone". Embedding therefore allows us to compare two pieces of text and see how similar they are semantically. We use this in our chatbot to see which chunks are most similar to the user query, and so which ones to give explicitly to the chatbot.
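
As a rough illustration of the comparison described above, the sketch below computes cosine similarity (1 for vectors pointing the same way) and the corresponding cosine distance (1 minus the similarity, so 0 is closest) in plain Python, then ranks some texts against a query. The three-dimensional vectors are made up for the example and are not real embeddings, and `rank_chunks` is a hypothetical helper rather than part of this notebook.

```python
import math


def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


def cosine_distance(a, b):
    """1 - cosine similarity: 0 means the vectors point in the same direction."""
    return 1.0 - cosine_similarity(a, b)


def rank_chunks(query_vector, chunk_vectors, chunk_texts, top_k=2):
    """Return the top_k texts whose vectors are closest to the query vector."""
    pairs = sorted(
        zip(chunk_texts, chunk_vectors),
        key=lambda pair: cosine_distance(query_vector, pair[1]),
    )
    return [text for text, _ in pairs[:top_k]]


# Made-up toy vectors, purely for illustration.
toy_vectors = {
    "apple": [0.9, 0.1, 0.3],
    "orange": [0.8, 0.2, 0.1],
    "window": [0.1, 0.9, 0.2],
}

closest = rank_chunks(
    toy_vectors["apple"],
    [toy_vectors["orange"], toy_vectors["window"]],
    ["orange", "window"],
    top_k=1,
)
print(closest)
```

With these toy numbers "orange" ends up closer to "apple" than "window" does, which is the kind of ordering we rely on when picking chunks for a query.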

Once we have created the embedding we can store it in a pickle file. Pickle can serialise almost any Python object, including the response object returned by the embeddings call, which is why we use it here: all the data of the embedding is conserved. `json.dump` cannot write that object directly, so storing it as a .json file would mean extracting and keeping only part of the information, which we don't want to do.
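
The difference between the two formats can be seen in a small sketch. `fake_response` below is a stand-in built from `SimpleNamespace` and is only assumed to resemble the shape of an embeddings response; it is not the real OpenAI type, and an in-memory buffer is used instead of a file on disk.

```python
import io
import json
import pickle
from types import SimpleNamespace

# Stand-in object with attributes, loosely shaped like an embeddings response.
fake_response = SimpleNamespace(
    model="toy-model",
    data=[SimpleNamespace(index=0, embedding=[0.1, 0.2])],
)

# json only understands dicts, lists, strings, numbers, booleans and None,
# so it refuses an arbitrary object.
try:
    json.dumps(fake_response)
    json_ok = True
except TypeError:
    json_ok = False

# pickle stores the object as-is and gives back an equal object.
buffer = io.BytesIO()
pickle.dump(fake_response, buffer)
buffer.seek(0)
restored = pickle.load(buffer)

print(json_ok, restored.data[0].embedding)
```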

In [None]:
db = client.embeddings.create( # creates an embedding
  model="text-embedding-ada-002", # picks a model to use for embedding, this is a general purpose one for text but there are others for other purposes
  input=chunks, # selects what list we want to use for our embedding
  encoding_format="float" # selects what format the embedding is in, the other option is base64
)

In [None]:
data_path = os.path.join(data_directory, f"{store_name}database.pkl") # creates the file path for the pickle file

with open(data_path, 'wb') as f: # saves the pickle file with the embedding
    pickle.dump(db, f)

## 3.3 Vector Store Creation

So far we have created a series of chunks that make up the entirety of the text. The reason we've done this is so that we can give the chatbot the parts of our source documents that are most relevant to the user query. This is faster than the alternative and provides more specificity to user queries. However, we should still set up the alternative, as the two can work in tandem. The alternative is to use what are known as vector stores. These are stored externally by OpenAI and need to be retrieved by our chatbot when it wants to use the knowledge, a process known as a file search (this is one reason why it is slower, as we are accessing files outside of our local space).

The reason we want to use vector stores as well as the chunking method is that, although our chunking method does well with queries concerning the content of the text, it might not do as well with queries about the structure of the text. As an example, we might set up a knowledge base that contains a textbook about Tudor history. If we receive a user query along the lines of "when was King Henry VIII king?", our chunking method will retrieve this information far more quickly than the file search system, because the information is stored locally and it can quickly find which parts of the text are most similar to the user query. However, if we were to ask "in which chapter could I find more about this?" as a follow-up question, our chunking method would break down. This is because it would retrieve any chunks that mention chapters, which isn't useful. Our file search system, on the other hand, has access to the entire document and is able to find the information relevant to the query.

To summarise, the chunking method is good at creating specificity in the chatbot, so it can give highly technical answers, but it finds it harder to deal with user requests that concern the structure of the text itself. The file search is the opposite: it struggles more with specificity because it searches the whole text, but for the same reason it can deal with structure questions effectively. By using both we get the best of both worlds.

The following outlines how to set up a vector store and upload files to it. If the cell below prints its status as "failed", check the OpenAI dashboard before you attempt to retry, as sometimes the upload has still worked.

In [None]:
if files != []: 
  file_streams = [open(path, "rb") for path in files] # opens each file in binary mode so that it can be uploaded

  vector_store = client.beta.vector_stores.create(name=store_name) # creates a vector store with the name we specified

  file_batch = client.beta.vector_stores.file_batches.upload_and_poll( # uploads the files and waits for them to be processed
    vector_store_id=vector_store.id, # designates which vector store to upload the files to 
    files=file_streams # which files to upload
  )

  for stream in file_streams: # closes the files now that the upload has finished
    stream.close()

  print(file_batch.status) # tells us whether the upload was successful
  print(file_batch.file_counts) # lets us know more details about the upload

When we create our chatbot we'll need to know which vector store to use, so we store the vector store id in a .json file.

In [None]:
if files != []:
    vector_name = f"vector_store_id_{store_name}.json" # creates the file name for the vector store id
    vector_path = os.path.join(data_directory, vector_name) # creates the file path for the vector store id

    with open(vector_path, "w") as file: # saves the vector store id as a .json file
        json.dump(vector_store.id, file)
else:
    print("No files in the document folder, so no vector store needs to be created")

## 3.4 Question Types

Within Dutchess we want to send different queries to different assistants using different prompts, so that each type of query has its own response behaviour. To do this we use a series of example queries stored as .txt files in the `question_directory`. We first get their file paths:

In [None]:
questions = []
question_categories = []

question_types = get_all_files_in_folder(question_directory)
print(question_types)

We then split the text of these files wherever it is separated by a ? or a ., and we also store a list of the "question categories", which are just the names of the .txt files.

In [None]:
# Using regex to define multiple separators
question_splitter = CharacterTextSplitter(
    separator=r"[?.]",  # Regex to split at either ? or .
    chunk_size=35,  # This defines the chunk size
    chunk_overlap=5,  # This defines the overlap size
    length_function=len,  # This defines the function to calculate length
    is_separator_regex=True  # This tells the splitter that the separator is a regex
)

for file in question_types:
    with open(file) as f:
        question_text = f.read()
    questions_chunks = question_splitter.split_text(question_text)
    for question in questions_chunks:
        question_categories.append(os.path.basename(file))
        questions.append(question)

print(questions)
print(question_categories)
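
The pairing of each question with its category can be sketched without LangChain. The helpers below are hypothetical and only show the regex split and the labelling. As far as I understand, `CharacterTextSplitter` additionally merges small pieces up to `chunk_size` and applies the overlap, which this sketch does not reproduce. The file names and example sentences are made up.

```python
import re


def split_questions(text):
    """Split on ? or . and drop empty pieces (the separators are discarded)."""
    pieces = re.split(r"[?.]", text)
    return [piece.strip() for piece in pieces if piece.strip()]


def label_questions(files):
    """files maps a (made-up) file name to the text it contains."""
    questions, categories = [], []
    for name, text in files.items():
        for question in split_questions(text):
            questions.append(question)
            categories.append(name)  # same index as the question it labels
    return questions, categories


example_files = {
    "greeting.txt": "Hello there. How are you?",
    "technical.txt": "What is chunking?",
}
example_questions, example_categories = label_questions(example_files)
print(example_questions)
print(example_categories)
```

Keeping the two lists aligned by index is what later lets us look up the category of whichever stored question is closest to a new query.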

So that we can compare these questions against new user queries, we embed the stored questions.

In [None]:
question_embedding = client.embeddings.create(
          model="text-embedding-ada-002",
          input=questions,
          encoding_format="float"
        )
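
One way the stored embeddings could later be used to route a query is to pick the category of the most similar example question. The sketch below assumes that approach; `classify_query` is a hypothetical helper, and the two-dimensional vectors and category names are invented for illustration rather than taken from real embeddings.

```python
import math


def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b)))


def classify_query(query_vector, question_vectors, question_categories):
    """Return the category of the stored question most similar to the query."""
    best_index = max(
        range(len(question_vectors)),
        key=lambda i: cosine_similarity(query_vector, question_vectors[i]),
    )
    return question_categories[best_index]


# Invented example data: one vector per stored question, aligned with its category.
stored_vectors = [[1.0, 0.1], [0.9, 0.2], [0.1, 1.0]]
stored_categories = ["greeting.txt", "greeting.txt", "technical.txt"]

predicted = classify_query([0.2, 0.9], stored_vectors, stored_categories)
print(predicted)
```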

We then store all of this data so that we can use it in Dutchess.

In [None]:
questions_name = "questions.json" # creates the file name for the list of questions
questions_path = os.path.join(question_data_directory, questions_name) # creates the file path for the list of questions

with open(questions_path, "w") as file: # saves the list of questions as a .json file
    json.dump(questions, file)

question_categories_name = "questions_categories.json" # creates the file name for the list of question categories
question_categories_path = os.path.join(question_data_directory, question_categories_name) # creates the file path for the list of question categories

with open(question_categories_path, "w") as file: # saves the list of question categories as a .json file
    json.dump(question_categories, file)

question_embedding_path = os.path.join(question_data_directory, "question_embeddings.pkl") # creates the file path for the pickle file

with open(question_embedding_path, 'wb') as f: # saves the pickle file with the question embedding
    pickle.dump(question_embedding, f)