In [7]:
pip install -q PyPDF2


Note: you may need to restart the kernel to use updated packages.


In [11]:
# Standard library
import base64
import os
import re
import shutil

# Third-party
import gradio as gr
from langchain.schema import Document
from langchain.text_splitter import CharacterTextSplitter, RecursiveCharacterTextSplitter
from langchain_community.document_loaders import TextLoader

# Disabled imports kept from the original cell. The Gmail / ChromaDB cells further
# down reference build, Credentials, BeautifulSoup, SentenceTransformer, chromadb
# and tiktoken, so the matching lines must be re-enabled before running those cells.
# import email
# from googleapiclient.discovery import build
# from google.oauth2.credentials import Credentials
# from bs4 import BeautifulSoup
# from sentence_transformers import SentenceTransformer
# import chromadb
# import tiktoken
# from chromadb.utils import embedding_functions

In [None]:
# Model name and vector-store directory name (neither is referenced by the visible cells).
MODEL = "gpt-4o-mini"
db_name = "vector_db"

# NOTE(review): load_dotenv is not imported in any visible cell -- presumably
# `from dotenv import load_dotenv` (python-dotenv); confirm and add it to the import cell.
load_dotenv(override=True)

# Keep an OPENAI_API_KEY that is already set; otherwise install the placeholder value.
os.environ.setdefault('OPENAI_API_KEY', 'your-key-if-not-using-env')

In [21]:
def chunk_doc(embeding_ready, file_path, chunk_size=1000, chunk_overlap=200):
    """Wrap extracted text in a Document and split it into overlapping chunks.

    Args:
        embeding_ready: Text extracted from the uploaded file.
        file_path: Path of the source file; only its base name is kept, as the
            ``doc_name`` metadata entry of every chunk.
        chunk_size: Target maximum number of characters per chunk.
        chunk_overlap: Number of characters shared between neighbouring chunks.

    Returns:
        list[Document]: The chunks, each carrying ``{"doc_name": <base name>}``.
    """
    source_name = os.path.basename(file_path)
    doc = Document(page_content=embeding_ready, metadata={"doc_name": source_name})

    # CharacterTextSplitter only cuts on its separator (blank lines by default), so
    # text without blank lines -- e.g. the single-line JSON produced for CSV files --
    # came back as one oversized chunk. The recursive splitter falls back to finer
    # separators until the pieces fit into chunk_size.
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
    )

    # split_documents copies the parent document's metadata onto every chunk, so
    # doc_name does not need to be re-assigned afterwards.
    return text_splitter.split_documents([doc])

In [24]:
# Path of the most recently checked upload. Written by get_file_from_user and
# read by process_user_file; stays None until a file has been checked.
user_file = None

def get_file_extension(file_path):
    """Return the lower-cased extension of ``file_path``, dot included (e.g. '.txt')."""
    _stem, suffix = os.path.splitext(file_path)
    return suffix.lower()

def get_file_size_kb(file_path):
    """Return the size of the file at ``file_path`` in KB, rounded to two decimal places."""
    return round(os.path.getsize(file_path) / 1024, 2)

def detect_file_type(extension):
    """Map a file extension (with leading dot) to a human-readable file type label.

    Extensions outside the known set yield 'Unknown file type'.
    """
    known_types = dict([
        ('.txt', 'Text file'),
        ('.pdf', 'PDF file'),
        ('.md', 'Markdown file'),
        ('.csv', 'CSV file'),
    ])
    try:
        return known_types[extension]
    except KeyError:
        return 'Unknown file type'

def get_file_from_user(file):
    """Remember the uploaded file and describe its type and size.

    Args:
        file: Value delivered by the upload widget: None when nothing is uploaded,
            otherwise either a path string or an object exposing the path as ``.name``.

    Returns:
        str: "No file uploaded." when there is no file, otherwise a two-line
        description with the detected file type and the size in KB.
    """
    global user_file

    if file is None:
        # Forget a previously checked file so a later "process" click cannot pick
        # up an upload the user has already removed.
        user_file = None
        return "No file uploaded."

    # NOTE(review): depending on the Gradio version / `type` setting the upload may
    # arrive as a plain path string instead of an object with `.name`; accept both.
    file_path = getattr(file, "name", file)

    extension = get_file_extension(file_path)
    file_type = detect_file_type(extension)
    file_size_kb = get_file_size_kb(file_path)

    # Publish the path only after it has been inspected successfully, so that
    # process_user_file never sees a path that could not even be stat'ed.
    user_file = file_path

    return f"File type: {file_type}\nFile size: {file_size_kb} KB"

def process_user_file():
    """Extract text from the previously checked file and split it into chunks.

    Reads the module-level ``user_file`` path set by get_file_from_user. Plain text
    and Markdown are read as UTF-8, CSV is loaded with pandas and serialised to
    JSON, PDF text is extracted page by page with PyPDF2. The text is then handed
    to chunk_doc.

    Returns:
        str: A status message for the UI -- either the reason the file was not
        processed, or a summary that includes the number of chunks created.
    """
    if user_file is None:
        return "No file to process. Please upload a file first."

    file_size_kb = get_file_size_kb(user_file)
    if file_size_kb > 10240:  # 10 MB = 10240 KB
        return f"File size: {file_size_kb} KB. Max file size is 10MB."

    extension = get_file_extension(user_file)
    file_type = detect_file_type(extension)

    if extension in ('.txt', '.md'):
        with open(user_file, 'r', encoding='utf-8') as f:
            embedding_ready = f.read()
        info = f"File ready for embedding. Type: {file_type}."
    elif extension == '.csv':
        # Imported here so the notebook still works without pandas for other file types.
        import pandas as pd
        embedding_ready = pd.read_csv(user_file).to_json()
        info = "CSV file converted to JSON for embedding."
    elif extension == '.pdf':
        try:
            from PyPDF2 import PdfReader
            reader = PdfReader(user_file)
            # `or ''` guards against a page for which extract_text() yields None;
            # join avoids rebuilding the string once per page.
            embedding_ready = ''.join(
                (page.extract_text() or '') + '\n' for page in reader.pages
            )
            info = "PDF text extracted for embedding."
        except Exception as e:
            return f"Error processing PDF: {str(e)}"
    else:
        return "Unsupported file type for embedding."

    chunks = chunk_doc(embedding_ready, user_file)
    # The result is shown in a text box, so report a readable summary instead of
    # a bare number (the per-type `info` message used to be built and discarded).
    return f"{info} Chunks created: {len(chunks)}."

# Minimal upload UI: check an uploaded file and split it into chunks.
with gr.Blocks() as ui:
    gr.Markdown("## Load your files and perform semantic search")

    # One row, two columns: the upload widget, then the read-only info box
    # together with the two action buttons.
    with gr.Row(equal_height=True):
        with gr.Column(elem_id="col2"):
            file_input = gr.File(label="Upload your file")
        with gr.Column():
            file_output = gr.Textbox(label="File info", interactive=False)
            file_btn = gr.Button("Check file")
            embed_btn = gr.Button("Add to VectorBD")
    
    # "Check file" stores the upload path and shows the file's type and size.
    file_btn.click(get_file_from_user, inputs=[file_input], outputs=[file_output])
    # This button takes no inputs: process_user_file reads the path remembered by
    # the previous step. In the visible code it only splits the file into chunks;
    # nothing is written to a vector store yet.
    embed_btn.click(process_user_file, outputs=[file_output])

# Start the local Gradio server for this interface.
ui.launch()



* Running on local URL:  http://127.0.0.1:7868
* To create a public link, set `share=True` in `launch()`.




Traceback (most recent call last):
  File "C:\Users\tromb\anaconda3\envs\projects\Lib\site-packages\gradio\queueing.py", line 625, in process_events
    response = await route_utils.call_process_api(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\tromb\anaconda3\envs\projects\Lib\site-packages\gradio\route_utils.py", line 322, in call_process_api
    output = await app.get_blocks().process_api(
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\tromb\anaconda3\envs\projects\Lib\site-packages\gradio\blocks.py", line 2218, in process_api
    result = await self.call_function(
             ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\tromb\anaconda3\envs\projects\Lib\site-packages\gradio\blocks.py", line 1729, in call_function
    prediction = await anyio.to_thread.run_sync(  # type: ignore
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\tromb\anaconda3\envs\projects\Lib\site-packages\anyio\to_thread.py", line 56, in run_

In [3]:
def clean_html(raw_html):
    """Parse ``raw_html`` with the "html.parser" backend and return its text content."""
    return BeautifulSoup(raw_html, "html.parser").get_text()

def remove_links(text):
    """Strip every http:// or https:// URL (up to the next whitespace) from ``text``."""
    return re.sub(r'https?://\S+', '', text)

def get_header(headers, name):
    """Return the value of the first header whose name matches ``name``.

    The comparison ignores case. ``headers`` is an iterable of dicts with
    'name' and 'value' keys; None is returned when nothing matches.
    """
    return next(
        (entry['value'] for entry in headers if entry['name'].lower() == name.lower()),
        None,
    )

def clean_email_body(text):
    """Delete invisible / formatting-only characters from ``text``.

    Removed code points: figure space (U+2007), zero-width space (U+200B),
    zero-width joiner (U+200D), directional marks and embedding controls
    (U+200E, U+200F, U+202A-U+202E), word joiner and invisible operators
    (U+2060-U+2064), byte-order mark (U+FEFF) and soft hyphen (U+00AD).
    """
    invisible_chars = r'[\u2007\u200b\u200d\u200e\u200f\u202a-\u202e\u2060\u2061\u2062\u2063\u2064\ufeff\u00ad]'
    return re.sub(invisible_chars, '', text)

def _decode_text_part(data):
    """Decode base64url body data, then strip invisible characters and links."""
    if isinstance(data, str):
        data = data.encode('ascii')
    # b64decode rejects input whose length is not a multiple of four, so restore
    # any padding that was stripped.
    padded = data + b'=' * (-len(data) % 4)
    # errors='replace' keeps a body in a non-UTF-8 charset from aborting the whole
    # fetch with a UnicodeDecodeError.
    text = base64.urlsafe_b64decode(padded).decode('utf-8', errors='replace')
    return remove_links(clean_email_body(text))


def get_message_body(payload):
    """Return the cleaned text of the first non-empty text/plain part of a message.

    Multipart payloads are searched depth-first in part order. Only text/plain
    parts are used; any other leaf type is ignored.

    Args:
        payload: Message payload dict (or one of its parts) with optional
            'parts', 'mimeType' and 'body' -> 'data' entries.

    Returns:
        str: The decoded body with invisible characters and links removed, or
        "" when no text/plain part with data exists.
    """
    if 'parts' in payload:
        for part in payload['parts']:
            # .get() so that a part lacking 'mimeType' or 'body' is skipped
            # instead of raising KeyError.
            if part.get('mimeType') == 'text/plain':
                data = part.get('body', {}).get('data')
                if data:
                    return _decode_text_part(data)
            else:
                # Descend into nested parts; the nested result is already cleaned.
                nested = get_message_body(part)
                if nested:
                    return nested
    elif payload.get('mimeType') == 'text/plain':
        data = payload.get('body', {}).get('data')
        if data:
            return _decode_text_part(data)
    return ""

def store_messages_in_dict(messages, service):
    """Download each listed message and reduce it to a flat dictionary.

    Args:
        messages: Iterable of message stubs, each with an 'id' key.
        service: Gmail API service object used to fetch the full message.

    Returns:
        list[dict]: One dict per message with the keys 'id', 'date', 'from',
        'title' and 'body' (the extracted body passed through clean_html).
        Empty when ``messages`` is empty; a notice is printed in that case.
    """
    if not messages:
        print('No emails found.')
        return []

    collected = []
    for message_ref in messages:
        full_message = (
            service.users()
            .messages()
            .get(userId='me', id=message_ref['id'], format='full')
            .execute()
        )
        payload = full_message.get('payload', {})
        headers = payload.get('headers', [])

        collected.append({
            'id': message_ref['id'],
            'date': get_header(headers, 'Date'),
            'from': get_header(headers, 'From'),
            'title': get_header(headers, 'Subject'),
            'body': clean_html(get_message_body(payload)),
        })
    return collected


In [4]:
# Name of the sentence-transformers model used to embed e-mail titles and body chunks.
model_name = "all-MiniLM-L6-v2"
# Default maximum number of tokens per body chunk (see chunk_text).
token_limit = 300
embedding_model = SentenceTransformer(model_name)
# tiktoken encoding used by chunk_text to split text into token windows.
# NOTE(review): this is a different tokenizer from the embedding model's own, so
# 300 tokens here need not correspond to the model's input limit -- confirm.
tokenizer = tiktoken.get_encoding("cl100k_base")

# Persistent Chroma client rooted at the relative path "chroma_db"; the e-mail
# collection is fetched, or created if it does not exist yet.
client = chromadb.PersistentClient(path="chroma_db")
collection = client.get_or_create_collection("emails_collection")

def chunk_text(text, max_tokens=token_limit):
    """Cut ``text`` into consecutive pieces of at most ``max_tokens`` tokens.

    The text is encoded with the module-level tokenizer, sliced into
    non-overlapping windows, and each window is decoded back to a string.
    """
    token_ids = tokenizer.encode(text)
    pieces = []
    for start in range(0, len(token_ids), max_tokens):
        window = token_ids[start:start + max_tokens]
        pieces.append(tokenizer.decode(window))
    return pieces

def embed_and_store_email(email):
    """Embed one e-mail (title plus body chunks) and add it to the collection.

    Args:
        email: Dict with an 'id' and optional 'from', 'date', 'title' and 'body'
            entries, as produced by store_messages_in_dict.

    Stored records: one with id "<id>_title" for the subject line and one with id
    "<id>_body_chunk_<n>" per body chunk. Every record carries the e-mail id,
    sender, date and its part name as metadata.
    """
    doc_id = email['id']

    # A missing header is reported as None by get_header. The encoder needs
    # strings, and metadata values are expected to be plain scalars, so absent
    # fields are stored as empty strings.
    metadata_base = {
        "email_id": doc_id,
        "from": email.get('from') or "",
        "date": email.get('date') or "",
    }
    title = email.get('title') or ""
    body_chunks = chunk_text(email.get('body') or "")

    part_names = ["title"] + [f"body_chunk_{idx}" for idx in range(len(body_chunks))]
    documents = [title] + body_chunks

    # Encode the title and all chunks in one batch and write them with a single
    # add() call, rather than one model call and one write per chunk.
    embeddings = embedding_model.encode(documents)
    collection.add(
        documents=documents,
        metadatas=[{**metadata_base, "part": part} for part in part_names],
        ids=[f"{doc_id}_{part}" for part in part_names],
        embeddings=[vector.tolist() for vector in embeddings],
    )

def process_and_store_emails(emails):
    """Embed and store every e-mail in ``emails``; return a status message.

    Returns "No emails to process." for an empty or missing list, otherwise a
    summary stating how many e-mails were stored.
    """
    if emails:
        for message in emails:
            embed_and_store_email(message)
        return f"Stored {len(emails)} emails in ChromaDB."

    return "No emails to process."

In [5]:
def get_emails_from_gmail(maxEmails, batch_size=100):
    """Fetch up to ``maxEmails`` messages from Gmail and store them in ChromaDB.

    Credentials are read from 'token.json' with the read-only Gmail scope. Message
    ids are listed page by page, then every message is downloaded, cleaned,
    embedded and stored.

    Args:
        maxEmails: Upper bound on the number of messages to fetch. May be an int,
            a float (it is truncated) or None.
        batch_size: Maximum number of message ids requested per list call.

    Returns:
        str: A status message suitable for display in the UI.
    """
    if maxEmails is None:
        # An emptied number field is delivered as None.
        return "Please enter the number of emails to fetch."

    # NOTE(review): the number widget may deliver a float (e.g. 10.0); the page
    # size sent to the API and the loop bound should be integers.
    maxEmails = int(maxEmails)
    batch_size = int(batch_size)

    creds = Credentials.from_authorized_user_file('token.json', ['https://www.googleapis.com/auth/gmail.readonly'])
    service = build('gmail', 'v1', credentials=creds)

    all_messages = []
    next_page_token = None

    while len(all_messages) < maxEmails:
        remaining = maxEmails - len(all_messages)
        this_batch_size = min(batch_size, remaining)  # the last batch may be smaller than batch_size

        results = service.users().messages().list(
            userId='me',
            maxResults=this_batch_size,
            pageToken=next_page_token
        ).execute()

        messages = results.get('messages', [])
        all_messages.extend(messages)

        next_page_token = results.get('nextPageToken')

        # Stop when the mailbox is exhausted; an empty page would otherwise keep
        # the loop requesting further pages without making progress.
        if not next_page_token or not messages:
            break

    emails_data = store_messages_in_dict(all_messages, service)
    process_and_store_emails(emails_data)
    return (f'Done for {len(all_messages)} records')

def save_credentials(file):
    """Copy an uploaded credentials.json into the current working directory.

    Args:
        file: None, a path string, or an object exposing the path as ``.name``.

    Returns:
        str: "No file to save." when nothing was uploaded, an error message when
        the file is not named credentials.json, otherwise the destination path.
    """
    if file is None:
        return "No file to save."

    source_path = getattr(file, "name", file)

    # os.path.basename understands the platform's separators; splitting on "/"
    # left the whole path intact on Windows, so the name check always failed there.
    filename = os.path.basename(source_path)
    if filename != "credentials.json":
        return "Incorrect file name. Expected: credentials.json"

    dest_path = os.path.join(os.getcwd(), "credentials.json")
    shutil.copy(source_path, dest_path)
    return f"File saved as {dest_path}"
    

In [12]:
def semantic_search(query, num_results=5):
    """Search the e-mail collection for records semantically similar to ``query``.

    Args:
        query: Free-text query; None, empty or whitespace-only input is rejected.
        num_results: Maximum number of hits to return (int or float; truncated).

    Returns:
        str: A formatted list of hits (distance, available metadata and a preview
        of at most 300 characters each), or a message explaining why there are no
        results. Any exception is reported as a message rather than raised.
    """
    try:
        if not query or not query.strip():
            return "Proszę wprowadzić zapytanie."

        # Nothing to search if the collection holds no records.
        collection_count = collection.count()
        if collection_count == 0:
            return "Baza danych jest pusta. Najpierw pobierz emaile z Gmail."

        # The slider may deliver a float; n_results must be an int between 1 and
        # the number of stored records.
        n_results = max(1, min(int(num_results), collection_count))

        # Embed the query with the same model that produced the stored vectors,
        # instead of relying on whatever embedding function the collection has.
        query_embedding = embedding_model.encode([query])[0].tolist()
        results = collection.query(
            query_embeddings=[query_embedding],
            n_results=n_results
        )

        if not results['documents'] or not results['documents'][0]:
            return "Nie znaleziono żadnych wyników dla tego zapytania."

        formatted_results = []
        documents = results['documents'][0]
        metadatas = results['metadatas'][0] if results['metadatas'] else [{}] * len(documents)
        distances = results['distances'][0] if results['distances'] else [0] * len(documents)

        # Hits farther away than this are treated as irrelevant.
        SIMILARITY_THRESHOLD = 1.2
        for i, (doc, metadata, distance) in enumerate(zip(documents, metadatas, distances)):
            if distance > SIMILARITY_THRESHOLD:
                if i == 0:  # even the best hit is a weak match
                    return f"Nie znaleziono podobnych emaili dla zapytania '{query}'. \nNajlepszy wynik miał odległość {distance:.3f}, co wskazuje na słabe dopasowanie."
                break  # remaining hits are weaker still
            result_text = f"**Wynik {i+1}** (odległość: {distance:.3f})\n"

            if metadata:
                if 'subject' in metadata:
                    result_text += f"**Temat:** {metadata['subject']}\n"
                # Records written by embed_and_store_email keep the sender under
                # 'from'; 'sender' is still honoured if present.
                sender_key = 'sender' if 'sender' in metadata else 'from'
                if sender_key in metadata:
                    result_text += f"**Nadawca:** {metadata[sender_key]}\n"
                if 'date' in metadata:
                    result_text += f"**Data:** {metadata['date']}\n"

            # Preview limited to 300 characters.
            content_preview = doc[:300] + "..." if len(doc) > 300 else doc
            result_text += f"**Treść:** {content_preview}\n"
            result_text += "-" * 50 + "\n"

            formatted_results.append(result_text)

        return "\n".join(formatted_results)

    except Exception as e:
        return f"Błąd podczas wyszukiwania: {str(e)}"

def get_collection_stats():
    """Return a one-line summary with the number of e-mails stored in the collection.

    Any exception is reported as a message rather than raised.
    """
    try:
        # embed_and_store_email writes exactly one "title" record per e-mail plus
        # one record per body chunk, so collection.count() over-reports the number
        # of e-mails. Counting only the title records gives the real figure.
        title_records = collection.get(where={"part": "title"}, include=["metadatas"])
        count = len(title_records['ids'])
        return f"Liczba emaili w bazie: {count}"
    except Exception as e:
        return f"Błąd przy pobieraniu statystyk: {str(e)}"


In [4]:
# Full UI: file check / chunking, Gmail import, collection statistics and search.
# Every callback wired below must already be defined in the kernel, so run the
# cells that define them before this one.
with gr.Blocks() as ui:
    gr.Markdown("## Load your files and perform semantic search")

    with gr.Row(equal_height=True):
        with gr.Column(elem_id="col2"):
            file_input = gr.File(label="Upload your file")  # document to inspect and chunk
        with gr.Column():
            file_output = gr.Textbox(label="File info", interactive=False)
            file_btn = gr.Button("Check file")
            embed_btn = gr.Button("Add to VectorBD")

    file_btn.click(get_file_from_user, inputs=[file_input], outputs=[file_output])
    # No inputs: process_user_file reads the path remembered by "Check file".
    embed_btn.click(process_user_file, outputs=[file_output])

    with gr.Row():
        # precision=0 makes the widget deliver an int; a float would otherwise
        # reach the Gmail fetch loop and its page-size arithmetic.
        email_input = gr.Number(
            label="Number of emails to fetch",
            value=10,
            precision=0,
            minimum=1,
        )
        email_output = gr.Textbox(label="Download status")  # shows the fetch result message

    email_btn = gr.Button("Fetch emails")
    email_btn.click(get_emails_from_gmail, inputs=[email_input], outputs=[email_output])

    gr.Markdown("### Semantic search")

    with gr.Row():
        with gr.Column():
            stats_output = gr.Textbox(label="Database statistics", interactive=False)
            stats_btn = gr.Button("Refresh statistics")
            stats_btn.click(get_collection_stats, outputs=[stats_output])

    with gr.Row():
        with gr.Column():
            search_query = gr.Textbox(
                label="Query",
                placeholder="E.g. 'meeting next week' or 'invoice for January'",
                lines=2
            )
            num_results = gr.Slider(
                label="Number of results",
                minimum=1,
                maximum=20,
                value=5,
                step=1
            )
            search_btn = gr.Button("Search", variant="primary")

        with gr.Column():
            search_results = gr.Textbox(
                label="Search results",
                lines=15,
                interactive=False
            )

    search_btn.click(
        semantic_search,
        inputs=[search_query, num_results],
        outputs=[search_results]
    )

# Start the local Gradio server for this interface.
ui.launch()


NameError: name 'get_emails_from_gmail' is not defined