<a href="https://colab.research.google.com/github/arielcintra/smart_bot_boy/blob/main/smart_boy_bot.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [10]:
!pip install Flask sentence-transformers pymongo Werkzeug python-docx openpyxl PyPDF2 requests beautifulsoup4 lxml Pillow torch pytesseract

Collecting pytesseract
  Downloading pytesseract-0.3.13-py3-none-any.whl.metadata (11 kB)
Downloading pytesseract-0.3.13-py3-none-any.whl (14 kB)
Installing collected packages: pytesseract
Successfully installed pytesseract-0.3.13


In [38]:
import os
import json
from flask import Flask, request, render_template, jsonify
from sentence_transformers import SentenceTransformer, util
from pymongo import MongoClient
from werkzeug.utils import secure_filename
import docx
import openpyxl
import csv
import PyPDF2
import requests
from bs4 import BeautifulSoup
from PIL import Image
from abc import ABC, abstractmethod
from pytesseract import image_to_string

In [39]:
class BaseTextExtractor(ABC):
    @abstractmethod
    def extract_text(self, file):
        pass

In [40]:
class TxtTextExtractor(BaseTextExtractor):
    def extract_text(self, file):
        return file.read().decode("utf-8")

In [41]:
class DocxTextExtractor(BaseTextExtractor):
    def extract_text(self, file):
        doc = docx.Document(file)
        return "\n".join(para.text for para in doc.paragraphs)

In [42]:
class XlsxTextExtractor(BaseTextExtractor):
    def extract_text(self, file):
        workbook = openpyxl.load_workbook(file)
        sheet = workbook.active
        return "\n".join(" ".join(str(cell) for cell in row) for row in sheet.iter_rows(values_only=True))

In [44]:
class CsvTextExtractor(BaseTextExtractor):
    def extract_text(self, file):
        reader = csv.reader(file.read().decode('utf-8').splitlines())
        return "\n".join(" ".join(row) for row in reader)

In [45]:
class PdfTextExtractor(BaseTextExtractor):
    def extract_text(self, file):
        reader = PyPDF2.PdfReader(file)
        return "\n".join(page.extract_text() for page in reader.pages)

In [46]:
class ImageTextExtractor(BaseTextExtractor):
    def extract_text(self, file):
        try:
            from pytesseract import image_to_string
            image = Image.open(file)
            return image_to_string(image)
        except ImportError:
            return "OCR library (pytesseract) not installed. Cannot extract text from images."

In [47]:
class TextExtractorFactory:
    @staticmethod
    def get_extractor(extension):
        extractors = {
            'txt': TxtTextExtractor(),
            'docx': DocxTextExtractor(),
            'xlsx': XlsxTextExtractor(),
            'csv': CsvTextExtractor(),
            'pdf': PdfTextExtractor(),
            'jpg': ImageTextExtractor(),
            'jpeg': ImageTextExtractor(),
            'png': ImageTextExtractor(),
        }
        return extractors.get(extension)

In [48]:
class DocumentRepository:
    def __init__(self):
        self.client = MongoClient("mongodb://localhost:27017/")
        self.db = self.client['chatbot_db']
        self.collection = self.db['documents']

    def insert_document(self, text, embedding):
        self.collection.insert_one({
            "text": text,
            "embedding": embedding.tolist()
        })

    def find_all_documents(self):
        return self.collection.find()

In [49]:
model = SentenceTransformer('paraphrase-MiniLM-L6-v2')
repository = DocumentRepository()

In [50]:
class DocumentService:
    @staticmethod
    def process_file(file):
        extension = file.filename.rsplit('.', 1)[1].lower()
        extractor = TextExtractorFactory.get_extractor(extension)
        if extractor:
            text = extractor.extract_text(file)
            embedding = model.encode(text, convert_to_tensor=True)
            repository.insert_document(text, embedding)
            return text
        return "Unsupported file format."

    @staticmethod
    def extract_text_from_link(url):
        try:
            response = requests.get(url)
            if response.status_code == 200:
                soup = BeautifulSoup(response.content, 'html.parser')
                paragraphs = soup.find_all('p')
                text = "\n".join([para.get_text() for para in paragraphs])
                return text
            else:
                return "Failed to retrieve the webpage."
        except Exception as e:
            return f"An error occurred: {e}"

    @staticmethod
    def process_link(url):
        text = extract_text_from_link(url)
        if text:
            embedding = model.encode(text, convert_to_tensor=True)
            repository.insert_document(text, embedding)
            return "Link content processed and stored successfully!"
        return "No content retrieved from the URL."

    @staticmethod
    def search_answer(question):
        question_embedding = model.encode(question, convert_to_tensor=True)
        documents = repository.find_all_documents()
        similarities = []

        for doc in documents:
            doc_embedding = doc['embedding']
            similarity = util.pytorch_cos_sim(question_embedding, model.encode([doc_embedding], convert_to_tensor=True))[0][0]
            similarities.append((doc['text'], similarity))

        similarities.sort(key=lambda x: x[1], reverse=True)
        return similarities[0][0] if similarities else "No relevant information found."

In [52]:
app = Flask(__name__)

@app.route('/')
def index():
    return render_template('index.html')

@app.route('/upload', methods=['POST'])
def upload_file():
    if 'file' not in request.files:
        return "No file uploaded", 400

    file = request.files['file']
    if file.filename == '':
        return "No file selected", 400

    text = DocumentService.process_file(file)
    return f"Extracted Text:\n{text}"

@app.route("/upload_link", methods=["POST"])
def upload_link():
    link = request.form.get("url")
    if link:
        response = DocumentService.process_link(link)
        return response, 200
    return "Invalid URL", 400

@app.route("/api/messages", methods=["POST"])
def messages():
    user_message = request.json.get("text", "")
    if user_message:
        answer = DocumentService.search_answer(user_message)
        return jsonify({"text": answer}), 200
    return jsonify({"text": "No message received."}), 400

In [None]:
if __name__ == "__main__":
    app.run(debug=True, port=3978)

 * Serving Flask app '__main__'
 * Debug mode: on


 * Running on http://127.0.0.1:3978
INFO:werkzeug:[33mPress CTRL+C to quit[0m
INFO:werkzeug: * Restarting with stat
