In [None]:
#RISPONDE SEMPRE A FASE PRECEDENTE E SUCCESSIVA (DA UTILIZZARE)
#Non risponde a domande tipo:"quale è il primo passo?", "quale è la prima cosa da fare?", per chiedere una fase specifica va chiesta "(frase) fase 1"
#E' un chatbot RAG che utilizza un linguaggio LLM per comprendere meglio il contesto, la domanda. In questi contesti, dove l'assistente deve prendere informazioni da manuali sono
#più adatti i chatbot RAG, mentre gli LLM per assistenti che devono rispondere a domande generiche e quindi hanno bisogno di essere addestrati con più informazioni.


import requests
import re
import ipywidgets as widgets
from IPython.display import display
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

# Configura l'API di Falcon
API_URL = "https://api-inference.huggingface.co/models/tiiuae/falcon-7b-instruct"
headers = {"Authorization": "Bearer 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"}

def query_falcon(prompt):
    response = requests.post(API_URL, headers=headers, json={"inputs": prompt})
    if response.status_code == 200:
        return response.json()
    else:
        return {"error": "Request failed with status code " + str(response.status_code)}

def preprocess_text(text):
    return text.lower()

class ManualLoader:
    def __init__(self, manual_paths):
        self.manuals = {}
        self.load_manuals(manual_paths)

    def load_manuals(self, paths):
        for name, path in paths.items():
            with open(path, 'r', encoding='utf-8') as file:
                content = file.read()
                self.manuals[name] = self._split_into_chunks(content)

    def _split_into_chunks(self, text):
        chunks = [preprocess_text(chunk.strip()) for chunk in text.split('---') if chunk.strip()]
        return chunks

class AssemblyAssistantBot:
    def __init__(self, manuals):
        self.manual_loader = ManualLoader(manuals)
        self.vectorizer = TfidfVectorizer()
        self._fit_vectorizer()
        self.sections = {}
        self.current_phase = {}  # Memorizza la fase corrente per ciascun prodotto

    def _fit_vectorizer(self):
        all_chunks = []
        for chunks in self.manual_loader.manuals.values():
            all_chunks.extend(chunks)
        self.vectorizer.fit(all_chunks)

    def _associate_phases(self, product):
        chunks = self.manual_loader.manuals.get(product, [])
        self.sections[product] = {}

        for i, chunk in enumerate(chunks):
            phase_match = re.search(r'\b(\d+)\b', chunk)
            if phase_match:
                phase_number = phase_match.group().strip()
                if phase_number not in self.sections[product]:
                    self.sections[product][phase_number] = (i, chunk)

    def _get_current_phase(self, product):
        """Restituisce la fase corrente del prodotto."""
        return self.current_phase.get(product)

    def _set_current_phase(self, product, phase):
        """Imposta la fase corrente per il prodotto."""
        self.current_phase[product] = phase

    def _get_next_phase(self, product):
        """Trova la fase successiva."""
        current_phase = self._get_current_phase(product)
        if current_phase is None:
            return ["Non è stata identificata una fase corrente."]
        
        sorted_phases = sorted(self.sections[product].items(), key=lambda x: int(x[0]))
        for phase, (order, content) in sorted_phases:
            if int(phase) == current_phase + 1:
                self._set_current_phase(product, int(phase))  # Aggiorna la fase corrente
                return [content]
        
        return ["Non ci sono fasi successive disponibili."]

    def _get_previous_phase(self, product):
        """Trova la fase precedente."""
        current_phase = self._get_current_phase(product)
        if current_phase is None:
            return ["Non è stata identificata una fase corrente."]
        
        sorted_phases = sorted(self.sections[product].items(), key=lambda x: int(x[0]))
        for phase, (order, content) in sorted_phases:
            if int(phase) == current_phase - 1:
                self._set_current_phase(product, int(phase))  # Aggiorna la fase corrente
                return [content]
        
        return ["Non ci sono fasi precedenti disponibili."]

    def _update_current_phase_from_chunk(self, product, chunk):
        """Aggiorna la fase corrente in base all'indice del chunk."""
        for phase, (index, content) in self.sections[product].items():
            if content == chunk:
                self._set_current_phase(product, int(phase))
                break

    def handle_query(self, product, query):
        if product not in self.manual_loader.manuals:
            return ["Manuale non trovato."]
        
        self._associate_phases(product)

        if "fase successiva" in query.lower():
            phase_response = self._get_next_phase(product)
            return phase_response

        if "fase precedente" in query.lower():
            phase_response = self._get_previous_phase(product)
            return phase_response

        current_phase = self._get_current_phase(product)
        phase_number_in_query = self._get_current_phase_from_query(query)

        if phase_number_in_query is not None:
            self._set_current_phase(product, phase_number_in_query)

        responses_map = {}
        general_responses = []

        queries = [q.strip() for q in re.split(r'\s+e\s+|\s+and\s+', query, flags=re.IGNORECASE)]
        for q in queries:
            if self._is_phase_query(q):
                phase_response = self._search_phase(product, q)
                if phase_response:
                    responses_map[q] = phase_response
            else:
                section_response = self._search_section(product, q)
                if section_response:
                    general_responses.append((q, section_response))

        ordered_responses = []
        for q in queries:
            if q in responses_map:
                ordered_responses.extend(responses_map[q])
            elif any(q.lower() in item[0].lower() for item in general_responses):
                for item in general_responses:
                    if q.lower() in item[0].lower():
                        ordered_responses.extend(item[1])

        combined_response = ordered_responses
        if combined_response:
            falcon_prompt = f"Given the following information from the manual: {', '.join(combined_response)}, provide a precise answer to the question: {query}"
            falcon_response = query_falcon(falcon_prompt)

            if 'generated_text' in falcon_response:
                response_text = falcon_response['generated_text'].strip()
                self._update_current_phase_from_chunk(product, combined_response[-1])  # Aggiorna la fase corrente
                return combined_response + [self._filter_falcon_response(response_text, query)]
            else:
                return combined_response

        return combined_response if combined_response else ["Nessuna informazione trovata."]

    def _get_current_phase_from_query(self, query):
        """Estrae la fase corrente dalla query."""
        match = re.search(r'\b\d+\b', query)
        if match:
            return int(match.group())
        return None

    def _is_phase_query(self, query):
        """Determina se la query riguarda una fase del manuale utilizzando solo numeri."""
        return bool(re.search(r'\b\d+\b', query))

    def _search_phase(self, product, query):
        query = preprocess_text(query)
        if product not in self.sections:
            return ["Sezione del manuale non trovata."]
        
        sorted_phases = sorted(self.sections[product].items(), key=lambda x: x[1][0])

        results = []
        for phase, (order, content) in sorted_phases:
            if phase in query:
                results.append(content)
        
        results = self._truncate_responses(results)
        
        if results:
            self._update_current_phase_from_chunk(product, results[-1])  # Aggiorna la fase corrente
        
        return results if results else ["Nessuna informazione trovata per la fase richiesta."]

    def _search_section(self, product, query):
        chunks = self.manual_loader.manuals.get(product, [])
        if not chunks:
            return ["Nessuna informazione trovata nel manuale per questo prodotto."]
        
        include_final_assembly = any(kw in query.lower() for kw in ['assemblaggio finale', 'fase finale', 'finale'])
        if not include_final_assembly:
            chunks = [chunk for chunk in chunks if 'assemblaggio finale' not in chunk.lower()]
        
        query_vec = self.vectorizer.transform([query])
        chunk_vecs = self.vectorizer.transform(chunks)
        similarities = cosine_similarity(query_vec, chunk_vecs).flatten()

        results = [chunks[index] for index, similarity in enumerate(similarities) if similarity > 0.1]
        
        if results:
            self._update_current_phase_from_chunk(product, results[-1])  # Aggiorna la fase corrente
        
        results = self._truncate_responses(results)
        
        return results if results else ["Nessuna informazione trovata."]

    def _filter_falcon_response(self, response, query):
        if 'assemblaggio finale' in query.lower():
            return response
        
        pattern = re.compile(r'assemblaggio finale dei moduli.*', re.DOTALL)
        filtered_response = pattern.split(response)[0].strip()
        
        return filtered_response

    def _truncate_responses(self, responses, max_length=2000):
        truncated_responses = []
        for response in responses:
            if len(response) > max_length:
                truncated_responses.append(response[:max_length] + '...')
            else:
                truncated_responses.append(response)
        return truncated_responses

# Percorsi ai manuali
manuals = {
    "Treno Arancio": "C:\\Users\\marid\\OneDrive\\Desktop\\Progetto SF\\TrenoArancio.txt",
    "Treno Viola": "C:\\Users\\marid\\OneDrive\\Desktop\\Progetto SF\\TrenoViola.txt",
    "Giraffa": "C:\\Users\\marid\\OneDrive\\Desktop\\Progetto SF\\Giraffa.txt"
}

# Creazione del bot
bot = AssemblyAssistantBot(manuals)

# Menu a tendina per la selezione del prodotto
product_dropdown = widgets.Dropdown(
    options=[('Seleziona un prodotto', None)] + [(name, name) for name in manuals.keys()],
    value=None,
    description='Prodotto:',
)

# Campo di inserimento per la domanda
query_input = widgets.Text(
    value='',
    placeholder='Cosa vuoi fare?',
    description='AssemblyBot:',
)

# Pulsante di invio
submit_button = widgets.Button(
    description='Invia',
    button_style='primary',
)
submit_button.on_click(lambda _: on_submit())

# Finestra di conversazione (modificata per visualizzare solo la risposta corrente)
conversation_output = widgets.Textarea(
    value='',
    placeholder='',
    description='Conversazione:',
    layout={'width': '100%', 'height': '400px'},
    style={'description_width': 'initial'}
)

def on_submit():
    product = product_dropdown.value
    query = query_input.value
    if product and query:
        response = bot.handle_query(product, query)
        
        # Unisci gli elementi della lista di risposte in una singola stringa
        response_text = "\n".join(response)
        
        # Mostra solo la risposta corrente, non le precedenti
        conversation_output.value = f"Tu: {query}\n\nBot: {response_text}\n"

        query_input.value = ''


# Layout dell'interfaccia
display(widgets.VBox([product_dropdown, query_input, submit_button, conversation_output]))


In [None]:
#Ho modificato la struttuta dei manuali per evitare i conflitti. In particolare nei moduli del treno arancio e viola nella parte della base anteriore
#ho elimitato la frase assemblaggio finale poichè per eliminare il problema della presenza costante dell'assemblaggio finale nella risposta, in questo codice
#c'è una funzione che elimina la parte di testo che contiene "assemblaggio finale" dalle risposte del chatbot o a meno che non vengia chiesto specificatamente dall'autente.
#Questa funzione però non permetteva al cahtbot di dare istruzioni riguardo la base anteriore perchè nel manuale, in quella sezione, copariva la frase assemblaggio finale.
#In più ho modificato l'interfaccia perchè con l'altro codice le frasi troppo lunge le troncava con dei puntini di sospensione, in questo modo la risposta si legge completamente.
#NB:quando l'utente pone la domanda il chatbot impiega qualche secondo a restituire la risposta.
#(NON UTILIZZARE)
import requests
import re
import ipywidgets as widgets
from IPython.display import display
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

# Configura l'API di GPT-2
API_URL = "https://api-inference.huggingface.co/models/gpt2"
headers = {"Authorization": "Bearer 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"}

def query_gpt2(prompt):
    response = requests.post(API_URL, headers=headers, json={"inputs": prompt})
    if response.status_code == 200:
        return response.json()
    else:
        return {"error": "Request failed with status code " + str(response.status_code)}

def preprocess_text(text):
    return text.lower()

class ManualLoader:
    def __init__(self, manual_paths):
        self.manuals = {}
        self.load_manuals(manual_paths)

    def load_manuals(self, paths):
        for name, path in paths.items():
            with open(path, 'r', encoding='utf-8') as file:
                content = file.read()
                self.manuals[name] = self._split_into_chunks(content)

    def _split_into_chunks(self, text):
        chunks = [preprocess_text(chunk.strip()) for chunk in text.split('---') if chunk.strip()]
        return chunks

class AssemblyAssistantBot:
    def __init__(self, manuals):
        self.manual_loader = ManualLoader(manuals)
        self.vectorizer = TfidfVectorizer()
        self._fit_vectorizer()
        

    def _fit_vectorizer(self):
        all_chunks = []
        for chunks in self.manual_loader.manuals.values():
            all_chunks.extend(chunks)
        self.vectorizer.fit(all_chunks)

    def handle_query(self, product, query):
        # Ricerca nei manuali
        response_from_manuals = self._search_in_manuals(product, query)
        
        # Usa GPT-2 per arricchire la risposta
        if response_from_manuals:
            gpt_prompt = f"Given the following information from the manual: {', '.join(response_from_manuals)}, provide a precise answer to the question: {query}"
            gpt_response = query_gpt2(gpt_prompt)
        
            if 'generated_text' in gpt_response:
                return response_from_manuals + [self._filter_gpt_response(gpt_response['generated_text'].strip(), query)]
        
        return response_from_manuals

    def _search_in_manuals(self, product, query):
        query = preprocess_text(query)
        if product not in self.manual_loader.manuals:
            return ["Manuale non trovato."]
        
        chunks = self.manual_loader.manuals.get(product, [])
        if not chunks:
            return ["Nessuna informazione trovata nel manuale per questo prodotto."]
        
        # Filtra la sezione "assemblaggio finale" se non richiesta esplicitamente
        include_final_assembly = any(kw in query.lower() for kw in ['assemblaggio finale', 'fase finale', 'finale'])
        if not include_final_assembly:
            chunks = [chunk for chunk in chunks if 'assemblaggio finale' not in chunk.lower()]
        
        # Trova i chunk più simili alla query
        query_vec = self.vectorizer.transform([query])
        chunk_vecs = self.vectorizer.transform(chunks)
        similarities = cosine_similarity(query_vec, chunk_vecs).flatten()

        # Riduci la soglia per trovare più risposte
        results = [chunks[index] for index, similarity in enumerate(similarities) if similarity > 0.1]
        
        # Riduci la lunghezza delle risposte
        results = self._truncate_responses(results)
        
        return results if results else ["Nessuna informazione trovata."]

    def _filter_gpt_response(self, response, query):
        # Includi la sezione "assemblaggio finale" solo se esplicitamente richiesta
        if 'assemblaggio finale' in query.lower():
            return response  # Mantieni l'intera risposta se richiesta
        
        # Altrimenti, filtra la sezione
        pattern = re.compile(r'assemblaggio finale dei moduli.*', re.DOTALL)
        filtered_response = pattern.split(response)[0].strip()
        
        return filtered_response

    def _truncate_responses(self, responses, max_length=2000):
        truncated_responses = []
        for response in responses:
            if len(response) > max_length:
                truncated_responses.append(response[:max_length] + '...')
            else:
                truncated_responses.append(response)
        return truncated_responses

def on_submit(_):
    product = product_dropdown.value
    query = query_input.value
    if product and query:
        response = bot.handle_query(product, query)
        conversation_output.value = f"Tu: {query}\n\n" + "\n\n".join([f"Bot: {info}" for info in response])
        query_input.value = ""
    else:
        conversation_output.value = "Bot: Seleziona un prodotto e inserisci una domanda."

# Percorsi ai manuali
manuals = {
    "Treno Arancio": "C:\\Users\\marid\\OneDrive\\Desktop\\Progetto SF\\TrenoArancio.txt",
    "Treno Viola": "C:\\Users\\marid\\OneDrive\\Desktop\\Progetto SF\\TrenoViola.txt",
    "Giraffa": "C:\\Users\\marid\\OneDrive\\Desktop\\Progetto SF\\Giraffa.txt"
}

# Creazione del bot
bot = AssemblyAssistantBot(manuals)

# Menu a tendina per la selezione del prodotto
product_dropdown = widgets.Dropdown(
    options=[('Seleziona un prodotto', None)] + [(name, name) for name in manuals.keys()],
    value=None,
    description='Prodotto:',
)

# Campo di inserimento per la domanda
query_input = widgets.Text(
    value='',
    placeholder='Inserisci la tua domanda',
    description='Domanda:',
)

# Pulsante di invio
submit_button = widgets.Button(
    description='Invia',
    button_style='primary',
)
submit_button.on_click(on_submit)

# Finestra di conversazione (modificata per visualizzare tutto il testo)
conversation_output = widgets.Textarea(
    value='',
    placeholder='Le risposte verranno visualizzate qui...',
    description='Conversazione:',
    layout={'width': '100%', 'height': '400px'},
    style={'description_width': 'initial'}
)

# Visualizza tutti i widget
display(product_dropdown, query_input, submit_button, conversation_output)


In [None]:
#risponde sia alle fasi: poiche ad ogni sezione è stata associata una fase in ordine crescente sia se si chiede nello specifico il modulo.
#devono essere effettuate delle richieste che contengano fase 1, fase 2 ecc o il nome del modulo.
#Risponde se gli cheidi fase 1 e fase 2. Se gli chied fase 1 e 2 risponde solo con le istruzioni della fase 1.
#(NON UTILIZZARE)

import requests
import re
import ipywidgets as widgets
from IPython.display import display
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

# Configura l'API di GPT-2
API_URL = "https://api-inference.huggingface.co/models/gpt2"
headers = {"Authorization": "Bearer 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"}

def query_gpt2(prompt):
    response = requests.post(API_URL, headers=headers, json={"inputs": prompt})
    if response.status_code == 200:
        return response.json()
    else:
        return {"error": "Request failed with status code " + str(response.status_code)}

def preprocess_text(text):
    return text.lower()

class ManualLoader:
    def __init__(self, manual_paths):
        self.manuals = {}
        self.load_manuals(manual_paths)

    def load_manuals(self, paths):
        for name, path in paths.items():
            with open(path, 'r', encoding='utf-8') as file:
                content = file.read()
                self.manuals[name] = self._split_into_chunks(content)

    def _split_into_chunks(self, text):
        chunks = [preprocess_text(chunk.strip()) for chunk in text.split('---') if chunk.strip()]
        return chunks

class AssemblyAssistantBot:
    def __init__(self, manuals):
        self.manual_loader = ManualLoader(manuals)
        self.vectorizer = TfidfVectorizer()
        self._fit_vectorizer()
        self.sections = {}

    def _fit_vectorizer(self):
        all_chunks = []
        for chunks in self.manual_loader.manuals.values():
            all_chunks.extend(chunks)
        self.vectorizer.fit(all_chunks)

    def _associate_phases(self, product):
        """Associa ogni chunk del manuale a una fase specifica e assegna una priorità in base all'ordine."""
        chunks = self.manual_loader.manuals.get(product, [])
        self.sections[product] = {}

        for i, chunk in enumerate(chunks):
            # Cerca la fase nel testo
            phase_match = re.search(r'fase\s*\d+', chunk)
            if phase_match:
                phase_number = phase_match.group().strip().lower()
                self.sections[product][phase_number] = (i, chunk)  # Associa il numero della fase e l'ordine
            else:
                # Associa una fase implicita basata sull'ordine
                if i > 0:
                    prev_phase_number = f"fase {i + 1}"  # Corregge il numero di fase per essere 1-based
                    self.sections[product][prev_phase_number] = (i, chunk)

    def handle_query(self, product, query):
        if product not in self.manual_loader.manuals:
            return ["Manuale non trovato."]
        
        # Associa le fasi al manuale
        self._associate_phases(product)

        # Verifica se la query riguarda una fase o una sezione specifica
        if self._is_phase_query(query):
            response_from_manuals = self._search_phase(product, query)
        else:
            response_from_manuals = self._search_section(product, query)
        
        # Usa GPT-2 per arricchire la risposta
        if response_from_manuals:
            gpt_prompt = f"Given the following information from the manual: {', '.join(response_from_manuals)}, provide a precise answer to the question: {query}"
            gpt_response = query_gpt2(gpt_prompt)
        
            if 'generated_text' in gpt_response:
                return response_from_manuals + [self._filter_gpt_response(gpt_response['generated_text'].strip(), query)]
        
        return response_from_manuals

    def _is_phase_query(self, query):
        """Determina se la query riguarda una fase del manuale."""
        return any(kw in query.lower() for kw in ['fase', 'fase finale', 'assemblaggio finale'])

    def _search_phase(self, product, query):
        """Cerca la fase richiesta nel manuale."""
        query = preprocess_text(query)
        if product not in self.sections:
            return ["Sezione del manuale non trovata."]
        
        # Ordina le sezioni per fase e ordine cronologico
        sorted_phases = sorted(self.sections[product].items(), key=lambda x: x[1][0])

        results = []
        for phase, (order, content) in sorted_phases:
            if phase in query:
                results.append(content)
        
        # Riduci la lunghezza delle risposte
        results = self._truncate_responses(results)
        
        return results if results else ["Nessuna informazione trovata per la fase richiesta."]

    def _search_section(self, product, query):
        """Cerca una sezione specifica del manuale basata sulla query."""
        chunks = self.manual_loader.manuals.get(product, [])
        if not chunks:
            return ["Nessuna informazione trovata nel manuale per questo prodotto."]
        
        # Filtra la sezione "assemblaggio finale" se non richiesta esplicitamente
        include_final_assembly = any(kw in query.lower() for kw in ['assemblaggio finale', 'fase finale', 'finale'])
        if not include_final_assembly:
            chunks = [chunk for chunk in chunks if 'assemblaggio finale' not in chunk.lower()]
        
        # Trova i chunk più simili alla query
        query_vec = self.vectorizer.transform([query])
        chunk_vecs = self.vectorizer.transform(chunks)
        similarities = cosine_similarity(query_vec, chunk_vecs).flatten()

        # Riduci la soglia per trovare più risposte
        results = [chunks[index] for index, similarity in enumerate(similarities) if similarity > 0.1]
        
        # Riduci la lunghezza delle risposte
        results = self._truncate_responses(results)
        
        return results if results else ["Nessuna informazione trovata."]

    def _filter_gpt_response(self, response, query):
        """Filtro della risposta GPT-2, mantenendo solo le informazioni rilevanti."""
        if 'assemblaggio finale' in query.lower():
            return response  # Mantieni l'intera risposta se richiesta
        
        # Altrimenti, filtra la sezione
        pattern = re.compile(r'assemblaggio finale dei moduli.*', re.DOTALL)
        filtered_response = pattern.split(response)[0].strip()
        
        return filtered_response

    def _truncate_responses(self, responses, max_length=2000):
        truncated_responses = []
        for response in responses:
            if len(response) > max_length:
                truncated_responses.append(response[:max_length] + '...')
            else:
                truncated_responses.append(response)
        return truncated_responses

def on_submit(_):
    product = product_dropdown.value
    query = query_input.value
    if product and query:
        response = bot.handle_query(product, query)
        conversation_output.value = f"Tu: {query}\n\n" + "\n\n".join([f"Bot: {info}" for info in response])
        query_input.value = ""
    else:
        conversation_output.value = "Bot: Seleziona un prodotto e inserisci una domanda."

# Percorsi ai manuali
manuals = {
    "Treno Arancio": "C:\\Users\\marid\\OneDrive\\Desktop\\Progetto SF\\TrenoArancio.txt",
    "Treno Viola": "C:\\Users\\marid\\OneDrive\\Desktop\\Progetto SF\\TrenoViola.txt",
    "Giraffa": "C:\\Users\\marid\\OneDrive\\Desktop\\Progetto SF\\Giraffa.txt"
}

# Creazione del bot
bot = AssemblyAssistantBot(manuals)

# Menu a tendina per la selezione del prodotto
product_dropdown = widgets.Dropdown(
    options=[('Seleziona un prodotto', None)] + [(name, name) for name in manuals.keys()],
    value=None,
    description='Prodotto:',
)

# Campo di inserimento per la domanda
query_input = widgets.Text(
    value='',
    placeholder='Inserisci la tua domanda',
    description='Domanda:',
)

# Pulsante di invio
submit_button = widgets.Button(
    description='Invia',
    button_style='primary',
)
submit_button.on_click(on_submit)

# Finestra di conversazione (modificata per visualizzare tutto il testo)
conversation_output = widgets.Textarea(
    value='',
    placeholder='Le risposte verranno visualizzate qui...',
    description='Conversazione:',
    layout={'width': '100%', 'height': '400px'},
    style={'description_width': 'initial'}
)

# Visualizza tutti i widget
display(product_dropdown, query_input, submit_button, conversation_output)


In [None]:
#codice con Falcon-GPT3 (NON UTILIZZARE)

import requests
import re
import ipywidgets as widgets
from IPython.display import display
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

# Configura l'API di Falcon
API_URL = "https://api-inference.huggingface.co/models/tiiuae/falcon-7b-instruct"
headers = {"Authorization": "Bearer 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"}

def query_falcon(prompt):
    response = requests.post(API_URL, headers=headers, json={"inputs": prompt})
    if response.status_code == 200:
        return response.json()
    else:
        return {"error": "Request failed with status code " + str(response.status_code)}

def preprocess_text(text):
    return text.lower()

class ManualLoader:
    def __init__(self, manual_paths):
        self.manuals = {}
        self.load_manuals(manual_paths)

    def load_manuals(self, paths):
        for name, path in paths.items():
            with open(path, 'r', encoding='utf-8') as file:
                content = file.read()
                self.manuals[name] = self._split_into_chunks(content)

    def _split_into_chunks(self, text):
        chunks = [preprocess_text(chunk.strip()) for chunk in text.split('---') if chunk.strip()]
        return chunks

class AssemblyAssistantBot:
    def __init__(self, manuals):
        self.manual_loader = ManualLoader(manuals)
        self.vectorizer = TfidfVectorizer()
        self._fit_vectorizer()
        self.sections = {}

    def _fit_vectorizer(self):
        all_chunks = []
        for chunks in self.manual_loader.manuals.values():
            all_chunks.extend(chunks)
        self.vectorizer.fit(all_chunks)

    def _associate_phases(self, product):
        """Associa ogni chunk del manuale a una fase specifica e assegna una priorità in base all'ordine."""
        chunks = self.manual_loader.manuals.get(product, [])
        self.sections[product] = {}

        for i, chunk in enumerate(chunks):
            # Cerca la fase nel testo
            phase_match = re.search(r'fase\s*\d+', chunk)
            if phase_match:
                phase_number = phase_match.group().strip().lower()
                self.sections[product][phase_number] = (i, chunk)  # Associa il numero della fase e l'ordine
            else:
                # Associa una fase implicita basata sull'ordine
                if i > 0:
                    prev_phase_number = f"fase {i + 1}"  # Corregge il numero di fase per essere 1-based
                    self.sections[product][prev_phase_number] = (i, chunk)

    def handle_query(self, product, query):
        if product not in self.manual_loader.manuals:
            return ["Manuale non trovato."]
        
        # Associa le fasi al manuale
        self._associate_phases(product)

        # Verifica se la query riguarda una fase o una sezione specifica
        if self._is_phase_query(query):
            response_from_manuals = self._search_phase(product, query)
        else:
            response_from_manuals = self._search_section(product, query)
        
        # Usa Falcon per arricchire la risposta
        if response_from_manuals:
            falcon_prompt = f"Given the following information from the manual: {', '.join(response_from_manuals)}, provide a precise answer to the question: {query}"
            falcon_response = query_falcon(falcon_prompt)
        
            if 'generated_text' in falcon_response:
                return response_from_manuals + [self._filter_falcon_response(falcon_response['generated_text'].strip(), query)]
        
        return response_from_manuals

    def _is_phase_query(self, query):
        """Determina se la query riguarda una fase del manuale."""
        return any(kw in query.lower() for kw in ['fase', 'fase finale', 'assemblaggio finale'])

    def _search_phase(self, product, query):
        """Cerca la fase richiesta nel manuale."""
        query = preprocess_text(query)
        if product not in self.sections:
            return ["Sezione del manuale non trovata."]
        
        # Ordina le sezioni per fase e ordine cronologico
        sorted_phases = sorted(self.sections[product].items(), key=lambda x: x[1][0])

        results = []
        for phase, (order, content) in sorted_phases:
            if phase in query:
                results.append(content)
        
        # Riduci la lunghezza delle risposte
        results = self._truncate_responses(results)
        
        return results if results else ["Nessuna informazione trovata per la fase richiesta."]

    def _search_section(self, product, query):
        """Cerca una sezione specifica del manuale basata sulla query."""
        chunks = self.manual_loader.manuals.get(product, [])
        if not chunks:
            return ["Nessuna informazione trovata nel manuale per questo prodotto."]
        
        # Filtra la sezione "assemblaggio finale" se non richiesta esplicitamente
        include_final_assembly = any(kw in query.lower() for kw in ['assemblaggio finale', 'fase finale', 'finale'])
        if not include_final_assembly:
            chunks = [chunk for chunk in chunks if 'assemblaggio finale' not in chunk.lower()]
        
        # Trova i chunk più simili alla query
        query_vec = self.vectorizer.transform([query])
        chunk_vecs = self.vectorizer.transform(chunks)
        similarities = cosine_similarity(query_vec, chunk_vecs).flatten()

        # Riduci la soglia per trovare più risposte
        results = [chunks[index] for index, similarity in enumerate(similarities) if similarity > 0.1]
        
        # Riduci la lunghezza delle risposte
        results = self._truncate_responses(results)
        
        return results if results else ["Nessuna informazione trovata."]

    def _filter_falcon_response(self, response, query):
        """Filtro della risposta Falcon, mantenendo solo le informazioni rilevanti."""
        if 'assemblaggio finale' in query.lower():
            return response  # Mantieni l'intera risposta se richiesta
        
        # Altrimenti, filtra la sezione
        pattern = re.compile(r'assemblaggio finale dei moduli.*', re.DOTALL)
        filtered_response = pattern.split(response)[0].strip()
        
        return filtered_response

    def _truncate_responses(self, responses, max_length=2000):
        truncated_responses = []
        for response in responses:
            if len(response) > max_length:
                truncated_responses.append(response[:max_length] + '...')
            else:
                truncated_responses.append(response)
        return truncated_responses

def on_submit(_):
    product = product_dropdown.value
    query = query_input.value
    if product and query:
        response = bot.handle_query(product, query)
        conversation_output.value = f"Tu: {query}\n\n" + "\n\n".join([f"Bot: {info}" for info in response])
        query_input.value = ""
    else:
        conversation_output.value = "Bot: Seleziona un prodotto e inserisci una domanda."

# Percorsi ai manuali
manuals = {
    "Treno Arancio": "C:\\Users\\marid\\OneDrive\\Desktop\\Progetto SF\\TrenoArancio.txt",
    "Treno Viola": "C:\\Users\\marid\\OneDrive\\Desktop\\Progetto SF\\TrenoViola.txt",
    "Giraffa": "C:\\Users\\marid\\OneDrive\\Desktop\\Progetto SF\\Giraffa.txt"
}

# Creazione del bot
bot = AssemblyAssistantBot(manuals)

# Menu a tendina per la selezione del prodotto
product_dropdown = widgets.Dropdown(
    options=[('Seleziona un prodotto', None)] + [(name, name) for name in manuals.keys()],
    value=None,
    description='Prodotto:',
)

# Campo di inserimento per la domanda
query_input = widgets.Text(
    value='',
    placeholder='Inserisci la tua domanda',
    description='Domanda:',
)

# Pulsante di invio
submit_button = widgets.Button(
    description='Invia',
    button_style='primary',
)
submit_button.on_click(on_submit)

# Finestra di conversazione (modificata per visualizzare tutto il testo)
conversation_output = widgets.Textarea(
    value='',
    placeholder='Le risposte verranno visualizzate qui...',
    description='Conversazione:',
    layout={'width': '100%', 'height': '400px'},
    style={'description_width': 'initial'}
)

# Visualizza tutti i widget
display(product_dropdown, query_input, submit_button, conversation_output)


In [None]:
#UTILIZZA FALCON, RISPONDE UN PO' PIU VELOCEMENTE E RISPONDE A DOMANDE COMBINATE (NON RISPONDE ALLE DOMANDE)
#risponde sia alle fasi: poiche ad ogni sezione è stata associata una fase in ordine crescente sia se si chiede nello specifico il modulo.
#devono essere effettuate delle richieste che contengano fase 1, fase 2 ecc o il nome del modulo.
#Risponde se gli cheidi fase 1 e fase 2. Se gli chied fase 1 e 2 risponde solo con le istruzioni della fase 1.
import requests
import re
import ipywidgets as widgets
from IPython.display import display
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

# Configura l'API di Falcon
API_URL = "https://api-inference.huggingface.co/models/tiiuae/falcon-7b-instruct"
headers = {"Authorization": "Bearer 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"}

def query_falcon(prompt):
    response = requests.post(API_URL, headers=headers, json={"inputs": prompt})
    if response.status_code == 200:
        return response.json()
    else:
        return {"error": "Request failed with status code " + str(response.status_code)}

def preprocess_text(text):
    return text.lower()

class ManualLoader:
    def __init__(self, manual_paths):
        self.manuals = {}
        self.load_manuals(manual_paths)

    def load_manuals(self, paths):
        for name, path in paths.items():
            with open(path, 'r', encoding='utf-8') as file:
                content = file.read()
                self.manuals[name] = self._split_into_chunks(content)

    def _split_into_chunks(self, text):
        chunks = [preprocess_text(chunk.strip()) for chunk in text.split('---') if chunk.strip()]
        return chunks

class AssemblyAssistantBot:
    def __init__(self, manuals):
        self.manual_loader = ManualLoader(manuals)
        self.vectorizer = TfidfVectorizer()
        self._fit_vectorizer()
        self.sections = {}

    def _fit_vectorizer(self):
        all_chunks = []
        for chunks in self.manual_loader.manuals.values():
            all_chunks.extend(chunks)
        self.vectorizer.fit(all_chunks)

    def _associate_phases(self, product):
        """Associa ogni chunk del manuale a una fase specifica e assegna una priorità in base all'ordine."""
        chunks = self.manual_loader.manuals.get(product, [])
        self.sections[product] = {}

        for i, chunk in enumerate(chunks):
            # Cerca la fase nel testo
            phase_match = re.search(r'fase\s*\d+', chunk)
            if phase_match:
                phase_number = phase_match.group().strip().lower()
                self.sections[product][phase_number] = (i, chunk)  # Associa il numero della fase e l'ordine
            else:
                # Associa una fase implicita basata sull'ordine
                if i > 0:
                    prev_phase_number = f"fase {i + 1}"  # Corregge il numero di fase per essere 1-based
                    self.sections[product][prev_phase_number] = (i, chunk)

    def handle_query(self, product, query):
        if product not in self.manual_loader.manuals:
            return ["Manuale non trovato."]
        
        # Associa le fasi al manuale
        self._associate_phases(product)

        # Suddividi la query in possibili sezioni (es. "fase 1" e "base anteriore")
        queries = query.split(' e ')  # Dividi la query per "e" o altre parole chiave che collegano più richieste
        
        responses = []
        
        for q in queries:
            q = q.strip()  # Rimuovi spazi bianchi

            # Verifica se la query riguarda una fase o una sezione specifica
            if self._is_phase_query(q):
                response_from_manuals = self._search_phase(product, q)
            else:
                response_from_manuals = self._search_section(product, q)
            
            # Usa Falcon per arricchire la risposta
            if response_from_manuals:
                falcon_prompt = f"Given the following information from the manual: {', '.join(response_from_manuals)}, provide a precise answer to the question: {q}"
                falcon_response = query_falcon(falcon_prompt)
            
                if 'generated_text' in falcon_response:
                    responses.append(response_from_manuals + [self._filter_falcon_response(falcon_response['generated_text'].strip(), q)])
                else:
                    responses.append(response_from_manuals)
            else:
                responses.append(response_from_manuals)
        
        return [item for sublist in responses for item in sublist]  # Appiattisci le risposte per concatenarle

    def _is_phase_query(self, query):
        """Determina se la query riguarda una fase del manuale."""
        return any(kw in query.lower() for kw in ['fase', 'fase finale', 'assemblaggio finale'])

    def _search_phase(self, product, query):
        """Cerca la fase richiesta nel manuale."""
        query = preprocess_text(query)
        if product not in self.sections:
            return ["Sezione del manuale non trovata."]
        
        # Ordina le sezioni per fase e ordine cronologico
        sorted_phases = sorted(self.sections[product].items(), key=lambda x: x[1][0])

        results = []
        for phase, (order, content) in sorted_phases:
            if phase in query:
                results.append(content)
        
        # Riduci la lunghezza delle risposte
        results = self._truncate_responses(results)
        
        return results if results else ["Nessuna informazione trovata per la fase richiesta."]

    def _search_section(self, product, query):
        """Cerca una sezione specifica del manuale basata sulla query."""
        chunks = self.manual_loader.manuals.get(product, [])
        if not chunks:
            return ["Nessuna informazione trovata nel manuale per questo prodotto."]
        
        # Filtra la sezione "assemblaggio finale" se non richiesta esplicitamente
        include_final_assembly = any(kw in query.lower() for kw in ['assemblaggio finale', 'fase finale', 'finale'])
        if not include_final_assembly:
            chunks = [chunk for chunk in chunks if 'assemblaggio finale' not in chunk.lower()]
        
        # Trova i chunk più simili alla query
        query_vec = self.vectorizer.transform([query])
        chunk_vecs = self.vectorizer.transform(chunks)
        similarities = cosine_similarity(query_vec, chunk_vecs).flatten()

        # Riduci la soglia per trovare più risposte
        results = [chunks[index] for index, similarity in enumerate(similarities) if similarity > 0.1]
        
        # Riduci la lunghezza delle risposte
        results = self._truncate_responses(results)
        
        return results if results else ["Nessuna informazione trovata."]

    def _filter_falcon_response(self, response, query):
        """Filtro della risposta Falcon, mantenendo solo le informazioni rilevanti."""
        if 'assemblaggio finale' in query.lower():
            return response  # Mantieni l'intera risposta se richiesta
        
        # Altrimenti, filtra la sezione
        pattern = re.compile(r'assemblaggio finale dei moduli.*', re.DOTALL)
        filtered_response = pattern.split(response)[0].strip()
        
        return filtered_response

    def _truncate_responses(self, responses, max_length=2000):
        truncated_responses = []
        for response in responses:
            if len(response) > max_length:
                truncated_responses.append(response[:max_length] + '...')
            else:
                truncated_responses.append(response)
        return truncated_responses

def on_submit(_):
    product = product_dropdown.value
    query = query_input.value
    if product and query:
        response = bot.handle_query(product, query)
        conversation_output.value = f"Tu: {query}\n\n" + "\n\n".join([f"Bot: {info}" for info in response])
        query_input.value = ""
    else:
        conversation_output.value = "Bot: Seleziona un prodotto e inserisci una domanda."

# Percorsi ai manuali
manuals = {
    "Treno Arancio": "C:\\Users\\marid\\OneDrive\\Desktop\\Progetto SF\\TrenoArancio.txt",
    "Treno Viola": "C:\\Users\\marid\\OneDrive\\Desktop\\Progetto SF\\TrenoViola.txt",
    "Giraffa": "C:\\Users\\marid\\OneDrive\\Desktop\\Progetto SF\\Giraffa.txt"
}

# Creazione del bot
bot = AssemblyAssistantBot(manuals)

# Menu a tendina per la selezione del prodotto
product_dropdown = widgets.Dropdown(
    options=[('Seleziona un prodotto', None)] + [(name, name) for name in manuals.keys()],
    value=None,
    description='Prodotto:',
)

# Campo di inserimento per la domanda
query_input = widgets.Text(
    value='',
    placeholder='Inserisci la tua domanda',
    description='Domanda:',
)

# Pulsante di invio
submit_button = widgets.Button(
    description='Invia',
    button_style='primary',
)
submit_button.on_click(on_submit)

# Finestra di conversazione (modificata per visualizzare tutto il testo)
conversation_output = widgets.Textarea(
    value='',
    placeholder='Le risposte verranno visualizzate qui...',
    description='Conversazione:',
    layout={'width': '100%', 'height': '400px'},
    style={'description_width': 'initial'}
)

# Visualizza tutti i widget
display(product_dropdown, query_input, submit_button, conversation_output)


In [None]:
#UTILIZZA FALCON, RISPONDE UN PO' PIU VELOCEMENTE E RISPONDE A DOMANDE COMBINATE, ma non risponde a fase successiva e precedente  (IL CODICE NELL'ULTIMA CELLA SI)
#Non risponde a domande tipo quale è il primo passo da fare, quale è la prima cosa da fare. Per fare questo bisognerebbe utilizzare modelli più perfprmanti
#o si richiede l'utilizzo di Pytorch, tensorflow che io non riesco proprio ad utilizzare sul mio pc.
#(NON UTILIZZARE)
import requests
import re
import ipywidgets as widgets
from IPython.display import display
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

# Configura l'API di Falcon
API_URL = "https://api-inference.huggingface.co/models/tiiuae/falcon-7b-instruct"
headers = {"Authorization": "Bearer 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"}

def query_falcon(prompt):
    response = requests.post(API_URL, headers=headers, json={"inputs": prompt})
    if response.status_code == 200:
        return response.json()
    else:
        return {"error": "Request failed with status code " + str(response.status_code)}

def preprocess_text(text):
    return text.lower()

class ManualLoader:
    def __init__(self, manual_paths):
        self.manuals = {}
        self.load_manuals(manual_paths)

    def load_manuals(self, paths):
        for name, path in paths.items():
            with open(path, 'r', encoding='utf-8') as file:
                content = file.read()
                self.manuals[name] = self._split_into_chunks(content)

    def _split_into_chunks(self, text):
        chunks = [preprocess_text(chunk.strip()) for chunk in text.split('---') if chunk.strip()]
        return chunks

class AssemblyAssistantBot:
    def __init__(self, manuals):
        self.manual_loader = ManualLoader(manuals)
        self.vectorizer = TfidfVectorizer()
        self._fit_vectorizer()
        self.sections = {}

    def _fit_vectorizer(self):
        all_chunks = []
        for chunks in self.manual_loader.manuals.values():
            all_chunks.extend(chunks)
        self.vectorizer.fit(all_chunks)

    def _associate_phases(self, product):
        """Associa ogni chunk del manuale a una fase specifica utilizzando solo i numeri delle fasi."""
        chunks = self.manual_loader.manuals.get(product, [])
        self.sections[product] = {}

        for i, chunk in enumerate(chunks):
            # Cerca solo i numeri delle fasi nel testo
            phase_match = re.search(r'\b(\d+)\b', chunk)
            if phase_match:
                phase_number = phase_match.group().strip()
                if phase_number not in self.sections[product]:
                    self.sections[product][phase_number] = (i, chunk)

    def handle_query(self, product, query):
        if product not in self.manual_loader.manuals:
            return ["Manuale non trovato."]
        
        # Associa le fasi al manuale
        self._associate_phases(product)

        # Suddividi la query in possibili sezioni (es. "fase 1" e "base anteriore")
        queries = [q.strip() for q in re.split(r'\s+e\s+|\s+and\s+', query, flags=re.IGNORECASE)]
        
        # Mappa delle risposte per mantenere l'ordine
        responses_map = {}
        general_responses = []

        for q in queries:
            # Verifica se la query riguarda una fase
            if self._is_phase_query(q):
                phase_response = self._search_phase(product, q)
                if phase_response:
                    responses_map[q] = phase_response
            else:
                section_response = self._search_section(product, q)
                if section_response:
                    general_responses.append((q, section_response))
        
        # Unisci e ordina le risposte
        ordered_responses = []
        for q in queries:
            if q in responses_map:
                ordered_responses.extend(responses_map[q])
            elif any(q.lower() in item[0].lower() for item in general_responses):
                for item in general_responses:
                    if q.lower() in item[0].lower():
                        ordered_responses.extend(item[1])
        
        # Usa Falcon per arricchire la risposta
        combined_response = ordered_responses
        if combined_response:
            falcon_prompt = f"Given the following information from the manual: {', '.join(combined_response)}, provide a precise answer to the question: {query}"
            falcon_response = query_falcon(falcon_prompt)
        
            if 'generated_text' in falcon_response:
                return combined_response + [self._filter_falcon_response(falcon_response['generated_text'].strip(), query)]
            else:
                return combined_response
        
        return combined_response if combined_response else ["Nessuna informazione trovata."]

    def _is_phase_query(self, query):
        """Determina se la query riguarda una fase del manuale utilizzando solo numeri."""
        return bool(re.search(r'\b\d+\b', query))

    def _search_phase(self, product, query):
        """Cerca la fase richiesta nel manuale."""
        query = preprocess_text(query)
        if product not in self.sections:
            return ["Sezione del manuale non trovata."]
        
        # Ordina le sezioni per fase e ordine cronologico
        sorted_phases = sorted(self.sections[product].items(), key=lambda x: x[1][0])

        results = []
        for phase, (order, content) in sorted_phases:
            if phase in query:
                results.append(content)
        
        # Riduci la lunghezza delle risposte
        results = self._truncate_responses(results)
        
        return results if results else ["Nessuna informazione trovata per la fase richiesta."]

    def _search_section(self, product, query):
        """Cerca una sezione specifica del manuale basata sulla query."""
        chunks = self.manual_loader.manuals.get(product, [])
        if not chunks:
            return ["Nessuna informazione trovata nel manuale per questo prodotto."]
        
        # Filtra la sezione "assemblaggio finale" se non richiesta esplicitamente
        include_final_assembly = any(kw in query.lower() for kw in ['assemblaggio finale', 'fase finale', 'finale'])
        if not include_final_assembly:
            chunks = [chunk for chunk in chunks if 'assemblaggio finale' not in chunk.lower()]
        
        # Trova i chunk più simili alla query
        query_vec = self.vectorizer.transform([query])
        chunk_vecs = self.vectorizer.transform(chunks)
        similarities = cosine_similarity(query_vec, chunk_vecs).flatten()

        # Riduci la soglia per trovare più risposte
        results = [chunks[index] for index, similarity in enumerate(similarities) if similarity > 0.1]
        
        # Riduci la lunghezza delle risposte
        results = self._truncate_responses(results)
        
        return results if results else ["Nessuna informazione trovata."]

    def _filter_falcon_response(self, response, query):
        """Filtro della risposta Falcon, mantenendo solo le informazioni rilevanti."""
        if 'assemblaggio finale' in query.lower():
            return response  # Mantieni l'intera risposta se richiesta
        
        # Altrimenti, filtra la sezione
        pattern = re.compile(r'assemblaggio finale dei moduli.*', re.DOTALL)
        filtered_response = pattern.split(response)[0].strip()
        
        return filtered_response

    def _truncate_responses(self, responses, max_length=2000):
        truncated_responses = []
        for response in responses:
            if len(response) > max_length:
                truncated_responses.append(response[:max_length] + '...')
            else:
                truncated_responses.append(response)
        return truncated_responses

def on_submit(_):
    product = product_dropdown.value
    query = query_input.value
    if product and query:
        response = bot.handle_query(product, query)
        conversation_output.value = f"Tu: {query}\n\n" + "\n\n".join([f"Bot: {info}" for info in response])
        query_input.value = ""
    else:
        conversation_output.value = "Bot: Seleziona un prodotto e inserisci una domanda."

# Percorsi ai manuali
manuals = {
    "Treno Arancio": "C:\\Users\\marid\\OneDrive\\Desktop\\Progetto SF\\TrenoArancio.txt",
    "Treno Viola": "C:\\Users\\marid\\OneDrive\\Desktop\\Progetto SF\\TrenoViola.txt",
    "Giraffa": "C:\\Users\\marid\\OneDrive\\Desktop\\Progetto SF\\Giraffa.txt"
}

# Creazione del bot
bot = AssemblyAssistantBot(manuals)

# Menu a tendina per la selezione del prodotto
product_dropdown = widgets.Dropdown(
    options=[('Seleziona un prodotto', None)] + [(name, name) for name in manuals.keys()],
    value=None,
    description='Prodotto:',
)

# Campo di inserimento per la domanda
query_input = widgets.Text(
    value='',
    placeholder='Cosa vuoi fare?',
    description='AssemblyBot:',
)

# Pulsante di invio
submit_button = widgets.Button(
    description='Invia',
    button_style='primary',
)
submit_button.on_click(on_submit)

# Finestra di conversazione (modificata per visualizzare tutto il testo)
conversation_output = widgets.Textarea(
    value='',
    placeholder='',
    description='Conversazione:',
    layout={'width': '100%', 'height': '400px'},
    style={'description_width': 'initial'}
)

# Visualizza tutti i widget
display(product_dropdown, query_input, submit_button, conversation_output)


In [1]:
#RISPONDE A FASE PRECEDENTE E SUCCESSIVA

import requests
import re
import ipywidgets as widgets
from IPython.display import display
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

# Configura l'API di Falcon
API_URL = "https://api-inference.huggingface.co/models/tiiuae/falcon-7b-instruct"
headers = {"Authorization": "Bearer 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"}

def query_falcon(prompt):
    response = requests.post(API_URL, headers=headers, json={"inputs": prompt})
    if response.status_code == 200:
        return response.json()
    else:
        return {"error": "Request failed with status code " + str(response.status_code)}

def preprocess_text(text):
    return text.lower()

class ManualLoader:
    def __init__(self, manual_paths):
        self.manuals = {}
        self.load_manuals(manual_paths)

    def load_manuals(self, paths):
        for name, path in paths.items():
            with open(path, 'r', encoding='utf-8') as file:
                content = file.read()
                self.manuals[name] = self._split_into_chunks(content)

    def _split_into_chunks(self, text):
        chunks = [preprocess_text(chunk.strip()) for chunk in text.split('---') if chunk.strip()]
        return chunks

class AssemblyAssistantBot:
    def __init__(self, manuals):
        self.manual_loader = ManualLoader(manuals)
        self.vectorizer = TfidfVectorizer()
        self._fit_vectorizer()
        self.sections = {}
        self.current_phase = {}  # Memorizza la fase corrente per ciascun prodotto

    def _fit_vectorizer(self):
        all_chunks = []
        for chunks in self.manual_loader.manuals.values():
            all_chunks.extend(chunks)
        self.vectorizer.fit(all_chunks)

    def _associate_phases(self, product):
        chunks = self.manual_loader.manuals.get(product, [])
        self.sections[product] = {}

        for i, chunk in enumerate(chunks):
            phase_match = re.search(r'\b(\d+)\b', chunk)
            if phase_match:
                phase_number = phase_match.group().strip()
                if phase_number not in self.sections[product]:
                    self.sections[product][phase_number] = (i, chunk)

    def _get_current_phase(self, product):
        """Restituisce la fase corrente del prodotto."""
        return self.current_phase.get(product)

    def _set_current_phase(self, product, phase):
        """Imposta la fase corrente per il prodotto."""
        self.current_phase[product] = phase

    def _get_next_phase(self, product):
        """Trova la fase successiva."""
        current_phase = self._get_current_phase(product)
        if current_phase is None:
            return ["Non è stata identificata una fase corrente."]
        
        sorted_phases = sorted(self.sections[product].items(), key=lambda x: int(x[0]))
        for phase, (order, content) in sorted_phases:
            if int(phase) == current_phase + 1:
                self._set_current_phase(product, int(phase))  # Aggiorna la fase corrente
                return [content]
        
        return ["Non ci sono fasi successive disponibili."]

    def _get_previous_phase(self, product):
        """Trova la fase precedente."""
        current_phase = self._get_current_phase(product)
        if current_phase is None:
            return ["Non è stata identificata una fase corrente."]
        
        sorted_phases = sorted(self.sections[product].items(), key=lambda x: int(x[0]))
        for phase, (order, content) in sorted_phases:
            if int(phase) == current_phase - 1:
                self._set_current_phase(product, int(phase))  # Aggiorna la fase corrente
                return [content]
        
        return ["Non ci sono fasi precedenti disponibili."]

    def handle_query(self, product, query):
        if product not in self.manual_loader.manuals:
            return ["Manuale non trovato."]
        
        self._associate_phases(product)

        if "fase successiva" in query.lower():
            phase_response = self._get_next_phase(product)
            return phase_response

        if "fase precedente" in query.lower():
            phase_response = self._get_previous_phase(product)
            return phase_response

        current_phase = self._get_current_phase(product)
        phase_number_in_query = self._get_current_phase_from_query(query)

        if phase_number_in_query is not None:
            self._set_current_phase(product, phase_number_in_query)

        responses_map = {}
        general_responses = []

        queries = [q.strip() for q in re.split(r'\s+e\s+|\s+and\s+', query, flags=re.IGNORECASE)]
        for q in queries:
            if self._is_phase_query(q):
                phase_response = self._search_phase(product, q)
                if phase_response:
                    responses_map[q] = phase_response
            else:
                section_response = self._search_section(product, q)
                if section_response:
                    general_responses.append((q, section_response))

        ordered_responses = []
        for q in queries:
            if q in responses_map:
                ordered_responses.extend(responses_map[q])
            elif any(q.lower() in item[0].lower() for item in general_responses):
                for item in general_responses:
                    if q.lower() in item[0].lower():
                        ordered_responses.extend(item[1])

        combined_response = ordered_responses
        if combined_response:
            falcon_prompt = f"Given the following information from the manual: {', '.join(combined_response)}, provide a precise answer to the question: {query}"
            falcon_response = query_falcon(falcon_prompt)

            if 'generated_text' in falcon_response:
                return combined_response + [self._filter_falcon_response(falcon_response['generated_text'].strip(), query)]
            else:
                return combined_response

        return combined_response if combined_response else ["Nessuna informazione trovata."]

    def _get_current_phase_from_query(self, query):
        """Estrae la fase corrente dalla query."""
        match = re.search(r'\b\d+\b', query)
        if match:
            return int(match.group())
        return None

    def _is_phase_query(self, query):
        """Determina se la query riguarda una fase del manuale utilizzando solo numeri."""
        return bool(re.search(r'\b\d+\b', query))

    def _search_phase(self, product, query):
        query = preprocess_text(query)
        if product not in self.sections:
            return ["Sezione del manuale non trovata."]
        
        sorted_phases = sorted(self.sections[product].items(), key=lambda x: x[1][0])

        results = []
        for phase, (order, content) in sorted_phases:
            if phase in query:
                results.append(content)
        
        results = self._truncate_responses(results)
        
        return results if results else ["Nessuna informazione trovata per la fase richiesta."]

    def _search_section(self, product, query):
        chunks = self.manual_loader.manuals.get(product, [])
        if not chunks:
            return ["Nessuna informazione trovata nel manuale per questo prodotto."]
        
        include_final_assembly = any(kw in query.lower() for kw in ['assemblaggio finale', 'fase finale', 'finale'])
        if not include_final_assembly:
            chunks = [chunk for chunk in chunks if 'assemblaggio finale' not in chunk.lower()]
        
        query_vec = self.vectorizer.transform([query])
        chunk_vecs = self.vectorizer.transform(chunks)
        similarities = cosine_similarity(query_vec, chunk_vecs).flatten()

        results = [chunks[index] for index, similarity in enumerate(similarities) if similarity > 0.1]
        
        results = self._truncate_responses(results)
        
        return results if results else ["Nessuna informazione trovata."]

    def _filter_falcon_response(self, response, query):
        if 'assemblaggio finale' in query.lower():
            return response
        
        pattern = re.compile(r'assemblaggio finale dei moduli.*', re.DOTALL)
        filtered_response = pattern.split(response)[0].strip()
        
        return filtered_response

    def _truncate_responses(self, responses, max_length=2000):
        truncated_responses = []
        for response in responses:
            if len(response) > max_length:
                truncated_responses.append(response[:max_length] + '...')
            else:
                truncated_responses.append(response)
        return truncated_responses

# Percorsi ai manuali
manuals = {
    "Treno Arancio": "C:\\Users\\marid\\OneDrive\\Desktop\\Progetto SF\\TrenoArancio.txt",
    "Treno Viola": "C:\\Users\\marid\\OneDrive\\Desktop\\Progetto SF\\TrenoViola.txt",
    "Giraffa": "C:\\Users\\marid\\OneDrive\\Desktop\\Progetto SF\\Giraffa.txt"
}

# Creazione del bot
bot = AssemblyAssistantBot(manuals)

# Menu a tendina per la selezione del prodotto
product_dropdown = widgets.Dropdown(
    options=[('Seleziona un prodotto', None)] + [(name, name) for name in manuals.keys()],
    value=None,
    description='Prodotto:',
)

# Campo di inserimento per la domanda
query_input = widgets.Text(
    value='',
    placeholder='Cosa vuoi fare?',
    description='AssemblyBot:',
)

# Pulsante di invio
submit_button = widgets.Button(
    description='Invia',
    button_style='primary',
)
submit_button.on_click(lambda _: on_submit())

# Finestra di conversazione (modificata per visualizzare solo la risposta corrente)
conversation_output = widgets.Textarea(
    value='',
    placeholder='',
    description='Conversazione:',
    layout={'width': '100%', 'height': '400px'},
    style={'description_width': 'initial'}
)

def on_submit():
    product = product_dropdown.value
    query = query_input.value
    if product and query:
        response = bot.handle_query(product, query)
        
        # Unisci gli elementi della lista di risposte in una singola stringa
        response_text = "\n".join(response)
        
        # Mostra solo la risposta corrente, non le precedenti
        conversation_output.value = f"Tu: {query}\n\nBot: {response_text}\n"

        query_input.value = ''


# Layout dell'interfaccia
display(widgets.VBox([product_dropdown, query_input, submit_button, conversation_output]))


VBox(children=(Dropdown(description='Prodotto:', options=(('Seleziona un prodotto', None), ('Treno Arancio', '…