

### **Install modules**



In [None]:

%%capture
# Install txtai from the GitHub repository
!pip install git+https://github.com/neuml/txtai
!pip install datasets
# Install txtai again from GitHub, this time with the "pipeline" extra
!pip install git+https://github.com/neuml/txtai#egg=txtai[pipeline]
# NOTE(review): `datasets` is already installed by an earlier line in this cell
!pip install transformers datasets
# Readers for Office formats, plus pandas
!pip install python-docx python-pptx pandas
!pip install docx2txt
# NOTE(review): `python-docx` is already installed by an earlier line in this cell
!pip install PyPDF2 textract python-docx




# Download the txtai v6.2.0 test data archive and unpack it in the working directory
# NOTE(review): nothing in the visible part of this notebook reads the unpacked files
!wget -N https://github.com/neuml/txtai/releases/download/v6.2.0/tests.tar.gz
!tar -xvzf tests.tar.gz

# Import NLTK and download its 'punkt' resource
import nltk
nltk.download('punkt')


Create the Textractor pipeline used to read **files**


In [None]:
%%capture
from txtai.pipeline import Textractor
# Create a Textractor pipeline instance with its default settings
# NOTE(review): `textractor` is not referenced anywhere else in the visible
# notebook — confirm whether it is still needed
textractor = Textractor()

In [None]:
from google.colab import files
from txtai.embeddings import Embeddings
from docx import Document
from pptx import Presentation
from PyPDF2 import PdfReader
import io
import os
import ipywidgets as widgets
from IPython.display import display, clear_output

# Mount Google Drive at /content/drive
from google.colab import drive
drive.mount('/content/drive')

# Directory to save uploaded files in Google Drive.
# exist_ok=True creates the directory when it is missing and does nothing when
# it is already there, so no separate existence check is needed.
upload_dir = '/content/drive/MyDrive/UploadedFiles'
os.makedirs(upload_dir, exist_ok=True)

# Initialize embeddings model.
# The search handler further down reads each result as a dict with "score"
# and "text" keys; presumably that relies on "content": True — confirm
# against the txtai documentation before changing this configuration.
embeddings = Embeddings({"path": "sentence-transformers/paraphrase-MiniLM-L3-v2", "content": True})

# Function to upload multiple files and save them in Google Drive
def upload_files():
    """Run the Colab upload call and save every returned file into ``upload_dir``.

    Returns:
        The mapping returned by ``files.upload()`` (file name -> raw content),
        unchanged.
    """
    uploaded = files.upload()
    for name, payload in uploaded.items():
        destination = os.path.join(upload_dir, name)
        # Content is written in binary mode exactly as received.
        with open(destination, 'wb') as handle:
            handle.write(payload)
    return uploaded

# Function to read content from lines and format it as a list of dictionaries
def read_content(lines):
    """Turn an iterable of strings into ``{"text": ...}`` records.

    Each line is stripped of surrounding whitespace; lines that are empty
    after stripping are dropped.

    Args:
        lines: Iterable of strings.

    Returns:
        A list of dicts, one per non-blank line, each with a single "text" key.
    """
    records = []
    for raw in lines:
        cleaned = raw.strip()
        if cleaned:
            records.append({"text": cleaned})
    return records

# Function to stream data for indexing
def stream_from_content(data, field, limit):
    """Yield ``(index, value, None)`` tuples for at most ``limit`` items.

    Args:
        data: Iterable of mappings.
        field: Key whose value is emitted for each item.
        limit: Maximum number of tuples to yield. A value of zero or less
            yields nothing.

    Yields:
        ``(index, item[field], None)`` where ``index`` counts up from 0.
    """
    # Check the limit before the first yield; checking only afterwards would
    # emit one item even when the caller asked for none.
    if limit <= 0:
        return

    for index, item in enumerate(data):
        yield (index, item[field], None)
        # Stop right after the last permitted item so that no extra element
        # is pulled from `data`.
        if index + 1 >= limit:
            break

# Function to process plain text content paragraph by paragraph
def process_text_content(content):
    """Split the bytes of a plain-text file into paragraph records.

    Paragraphs are separated by a blank line. Each paragraph is stripped of
    surrounding whitespace and empty paragraphs are dropped.

    Args:
        content: Raw file content as UTF-8 encoded bytes.

    Returns:
        A list of ``{"text": paragraph}`` dicts.

    Raises:
        UnicodeDecodeError: If ``content`` is not valid UTF-8.
    """
    # "utf-8-sig" drops a leading byte-order mark if present; otherwise the
    # BOM would end up inside the first paragraph, since strip() keeps it.
    text = content.decode("utf-8-sig")
    # Normalize Windows line endings so that a blank line in a CRLF file is
    # seen as "\n\n" and actually separates paragraphs.
    text = text.replace("\r\n", "\n")
    paragraphs = text.split("\n\n")
    return [{"text": para.strip()} for para in paragraphs if para.strip()]

# Function to process CSV content line by line
def process_csv_content(content):
    """Split the bytes of a CSV file into one record per non-blank line.

    Lines are not parsed into columns: each record holds the raw line text,
    stripped of surrounding whitespace.

    Args:
        content: Raw file content as UTF-8 encoded bytes.

    Returns:
        A list of ``{"text": line}`` dicts.

    Raises:
        UnicodeDecodeError: If ``content`` is not valid UTF-8.
    """
    # "utf-8-sig" drops a leading byte-order mark if present; otherwise the
    # BOM would be kept at the start of the first record.
    text = content.decode("utf-8-sig")
    lines = text.splitlines()
    return [{"text": line.strip()} for line in lines if line.strip()]

# Function to process PDF content paragraph by paragraph
def process_pdf_content(content):
    """Extract paragraph records from the bytes of a PDF file.

    Every page's extracted text is split on blank lines; each piece is
    stripped and kept only if non-empty. Pages with no extractable text
    contribute nothing.

    Args:
        content: Raw PDF file content as bytes.

    Returns:
        A list of ``{"text": paragraph}`` dicts in page order.
    """
    reader = PdfReader(io.BytesIO(content))
    records = []
    for page in reader.pages:
        page_text = page.extract_text()
        if not page_text:
            continue
        for chunk in page_text.split("\n\n"):
            chunk = chunk.strip()
            if chunk:
                records.append({"text": chunk})
    return records

# Function to process DOCX content paragraph by paragraph
def process_docx_content(content):
    """Extract text records from the bytes of a DOCX file.

    Non-empty paragraph texts come first, followed by the non-empty text of
    every cell of every table. All texts are stripped of surrounding
    whitespace.

    Args:
        content: Raw DOCX file content as bytes.

    Returns:
        A list of ``{"text": ...}`` dicts.
    """
    document = Document(io.BytesIO(content))

    # Body paragraphs
    records = []
    for paragraph in document.paragraphs:
        paragraph_text = paragraph.text.strip()
        if paragraph_text:
            records.append({"text": paragraph_text})

    # Table cells, walked table by table and row by row
    for table in document.tables:
        for row in table.rows:
            for cell in row.cells:
                cell_text = cell.text.strip()
                if cell_text:
                    records.append({"text": cell_text})

    return records

# Function to process PPTX content paragraph by paragraph
def process_pptx_content(content):
    """Extract paragraph records from the bytes of a PPTX file.

    For each shape on each slide that exposes a ``text`` attribute, the text
    is stripped, split on blank lines, and every non-empty piece becomes a
    record.

    Args:
        content: Raw PPTX file content as bytes.

    Returns:
        A list of ``{"text": paragraph}`` dicts in slide/shape order.
    """
    deck = Presentation(io.BytesIO(content))
    records = []
    for slide in deck.slides:
        for shape in slide.shapes:
            # Shapes without a text attribute are skipped.
            if not hasattr(shape, "text"):
                continue
            shape_text = shape.text.strip()
            if not shape_text:
                continue
            for piece in shape_text.split("\n\n"):
                piece = piece.strip()
                if piece:
                    records.append({"text": piece})
    return records

# Function to process file content based on file type
def process_file_content(file_name, content):
    """Dispatch file content to the matching extractor based on its extension.

    Supported extensions are .txt, .csv, .pdf, .docx and .pptx, matched
    case-insensitively.

    Args:
        file_name: Name of the file; only its ending is inspected.
        content: Raw file content as bytes.

    Returns:
        The list of ``{"text": ...}`` dicts produced by the matching
        extractor, or an empty list when the extension is not supported.
    """
    # Match on a lower-cased copy so that names such as "REPORT.PDF" or
    # "Notes.Txt" are routed like their lower-case equivalents instead of
    # being silently skipped.
    lowered = file_name.lower()

    # Handle plain text file
    if lowered.endswith(".txt"):
        return process_text_content(content)

    # Handle CSV file
    if lowered.endswith(".csv"):
        return process_csv_content(content)

    # Handle PDF file
    if lowered.endswith(".pdf"):
        return process_pdf_content(content)

    # Handle DOCX file
    if lowered.endswith(".docx"):
        return process_docx_content(content)

    # Handle PPTX file
    if lowered.endswith(".pptx"):
        return process_pptx_content(content)

    # Unsupported file type: nothing to extract
    return []

# Function to process saved files from Google Drive
def process_saved_files():
    """Read every regular file in ``upload_dir`` and extract its text records.

    Files are visited in sorted name order so that the resulting record
    order is the same on every run.

    Returns:
        A single list with the ``{"text": ...}`` dicts of all files, in the
        order the files were visited.
    """
    custom_data = []
    # os.listdir() returns names in arbitrary order; sorting makes the record
    # order (and any positional ids derived from it) reproducible.
    for file_name in sorted(os.listdir(upload_dir)):
        file_path = os.path.join(upload_dir, file_name)
        if not os.path.isfile(file_path):
            # Skip sub-directories and anything else that is not a file.
            continue
        with open(file_path, 'rb') as f:
            content = f.read()
        custom_data.extend(process_file_content(file_name, content))
    return custom_data

# Function to format search results nicely
def format_search_results(results):
    """Render search results as a human-readable multi-line string.

    Args:
        results: Iterable of mappings with a numeric "score" and a "text"
            entry.

    Returns:
        One string with, for each result (numbered from 1), a "Result N:"
        line, a "Score:" line with four decimals, and the full text. Returns
        an empty string when there are no results.
    """
    lines = []
    for position, hit in enumerate(results, start=1):
        lines.extend((
            f"Result {position}:",
            f"Score: {hit['score']:.4f}",
            f"Text:\n{hit['text']}\n",
        ))
    return "\n".join(lines)

# Mapping of every file uploaded so far (file name -> content)
uploaded_files = {}

# Upload button and the output area the click handlers print into
upload_button = widgets.Button(description="Upload Files")
output = widgets.Output()

# Click handler for the upload button
def on_upload_button_click(b):
    """Run the upload flow and record the uploaded files.

    Args:
        b: The button instance passed by the widget callback (unused).
    """
    # `uploaded_files` is only mutated, never rebound, so no `global`
    # declaration is required here.
    with output:
        clear_output()
        newly_uploaded = upload_files()
        uploaded_files.update(newly_uploaded)
        print("Files uploaded successfully.")

upload_button.on_click(on_upload_button_click)

# Submit button
submit_button = widgets.Button(description="Process Files")

# Function to handle submit button click
def on_submit_button_click(b):
    """Extract text from all saved files, index it, and list what was indexed.

    Args:
        b: The button instance passed by the widget callback (unused).
    """
    # Do all the work inside the output widget's context so that whatever is
    # printed while reading or indexing is shown in the same place as the
    # final listing.
    with output:
        clear_output()
        all_custom_data = process_saved_files()

        # Nothing was extracted (no files, or only unsupported/empty ones):
        # say so instead of indexing an empty stream.
        if not all_custom_data:
            print("No content found to index.")
            return

        # Index the data
        embeddings.index(stream_from_content(all_custom_data, "text", len(all_custom_data)))

        # Display all content
        print("All content:")
        for idx, item in enumerate(all_custom_data, start=1):
            print(f"Document {idx}:\n{item['text']}\n")

submit_button.on_click(on_submit_button_click)

# Search button
search_button = widgets.Button(description="Search")

# Text box for user to input search query
search_box = widgets.Text(placeholder="Enter search query here")

# Function to handle search button click
def on_search_button_click(b):
    """Run the query from the search box and show the formatted results.

    Args:
        b: The button instance passed by the widget callback (unused).
    """
    # A query made only of whitespace is treated the same as an empty one.
    query = search_box.value.strip()

    with output:
        clear_output()

        if not query:
            print("No search query entered.")
            return

        # The search runs inside the output context so that a failure message
        # is printed into the widget where the user can see it.
        try:
            hits = embeddings.search(query, limit=50)
            results = [{"score": hit["score"], "text": hit["text"]} for hit in hits]
        except Exception as e:
            # Top-level UI boundary: report the problem instead of raising
            # out of the widget callback.
            print(f"An error occurred during search: {e}")
            return

        # Format and print search results
        print("Search results:")
        print(format_search_results(results))

search_button.on_click(on_search_button_click)

# Display the widgets in this order: upload button, process button, search box,
# search button, and finally the shared output area the click handlers print into
display(upload_button, submit_button, search_box, search_button, output)


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


1_Pooling/config.json:   0%|          | 0.00/190 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/629 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/69.6M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/314 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

Button(description='Upload Files', style=ButtonStyle())

Button(description='Process Files', style=ButtonStyle())

Text(value='', placeholder='Enter search query here')

Button(description='Search', style=ButtonStyle())

Output()