#  PREPARAZIONE DEL DATASET

**IMPORT**

In [37]:
import os, time, re
import pandas as pd
from dotenv import load_dotenv
from elasticsearch import Elasticsearch, helpers # type: ignore

PULIZIA DEL NOME DEI FILE:

In [38]:
def safe_filename(name):
    """Rimuove caratteri non validi da un nome di file."""
    # Rimuove / \ ? % * : | " < > e sostituisce spazi multipli
    name = re.sub(r'[\\/*?:"<>|]', "", name)
    name = re.sub(r"\s+", " ", name).strip()
    return name

CREAZIONE DATASET:

In [39]:
# Dataset
df = pd.read_csv("Dataset/Movies.csv")

# Subsampling del Dataset
df = df.sample(n=2976, random_state=42, replace=False)
df = df[['Title', 'Plot']].copy()
df = df.reset_index(drop=True)
 
# Crea la cartella per i .txt
directory = "Files"
os.makedirs(directory, exist_ok=True)

# Crea file di testo per ogni film:
# nome file basato sul Title e contenuto su Plot
for idx, row in df.iterrows():
    movieTitle = safe_filename(row["Title"])
    moviePlot = row["Plot"]
    
    filePath = os.path.join(directory, f"{movieTitle}.txt")

    with open(filePath, "w", encoding="utf-8") as f:
        f.write(str(moviePlot))

# ELASTICSEARCH
INDICIZZAZIONE:

In [66]:
# Carica variabili d'ambiente dal file .env
load_dotenv(dotenv_path="EnvAndDocker/.env")

# Recupera i valori dal .env
URL = os.getenv("URL")
PASSWORD = os.getenv("PASSWORD")

# Connessione a Elasticsearch
es = Elasticsearch(URL, basic_auth=("elastic", PASSWORD))

# Definizione del mapping e degli analyzer
MAPPING = {
    "mappings": {
        "properties": {
            "title": {"type": "text", "analyzer": "english", "search_analyzer": "english"},
            "content": {"type": "text", "analyzer": "english", "search_analyzer": "english"}
        }
    }
}


def index():
    """Crea l'indice moviesindex e indicizza i file .txt."""

    # Elimina indice se esiste già
    if es.indices.exists(index="moviesindex"):
        es.indices.delete(index="moviesindex")
        print(f"Indice esistente 'moviesindex' eliminato.")

    # Creazione nuovo indice con il mapping definito
    es.indices.create(index="moviesindex", body=MAPPING)
    print(f"Indice 'moviesindex' creato correttamente.")

    # Lista di documenti da indicizzare
    actions = []
    for filename in os.listdir("Files"):
        if not filename.endswith(".txt"):
            continue
        path = os.path.join("Files", filename)
        with open(path, "r", encoding="utf-8") as f:
            content = f.read()

        actions.append({
            "_index": "moviesindex",
            "_source": {
                "title": os.path.splitext(filename)[0],  # nome del file senza estensione
                "content": content
            }
        })

    if not actions:
        print("Nessun file .txt trovato nella cartella.")
        return

    # Indicizzazione in blocco
    start = time.perf_counter()
    success, _ = helpers.bulk(es.options(request_timeout=120), actions)
    elapsed = time.perf_counter() - start

    print(f"Documenti indicizzati correttamente: {success}")
    print(f"Tempo totale: {elapsed:.2f} secondi")

    return elapsed


if __name__ == "__main__":
    index()


Indice esistente 'moviesindex' eliminato.
Indice 'moviesindex' creato correttamente.
Documenti indicizzati correttamente: 2976
Tempo totale: 1.59 secondi


**Interrogazione dell’indice Elasticsearch**


In [None]:
def execute_search(query_body, user_input):
    """
    Esegue la ricerca nel database di Elasticsearch e stampa i risultati, con sottolineatura dei match.
    
    Args:
        query_body (dict): Il corpo della query da inviare a Elasticsearch.
        user_input (str): La query dell'utente, da visualizzare nei risultati.
    """
    response = es.search(index="moviesindex", body=query_body)
    total_results = response["hits"]["total"]["value"]
    print(f'Sono stati trovati {total_results} risultati per la ricerca: "{user_input}":')
    
    for idx, item in enumerate(response["hits"]["hits"]):
        document = item["_source"]
        movie_title = document.get("title", "Titolo non disponibile")
        movie_content = document.get("content", "Trama non disponibile")
        
        # Evidenzia le corrispondenze nella trama e nel titolo, se esistono
        highlighted_title = item.get("highlight", {}).get("title", [movie_title])[0]
        highlighted_content = item.get("highlight", {}).get("content", [movie_content[:200]])[0]
        
        # Visualizza il titolo del film con eventuale sottolineatura
        print(f'\n{idx + 1}. Titolo: {highlighted_title} ({item["_score"]})')
        
        # Se il testo evidenziato nella trama è presente, mostra solo quello
        if "content" in item.get("highlight", {}):
            highlighted_content = " ".join(item["highlight"]["content"])  # Unisce tutte le porzioni evidenziate
        else:
            # Mostra un estratto della trama se non ci sono evidenziamenti
            highlighted_content = movie_content[:200] + "..."
        
        print(f'   Trama (estratto): {highlighted_content}')  # Mostra l'estratto o il testo evidenziato
        print("-" * 50)  


def process_user_query(user_input):
    """
    Analizza l'input dell'utente e costruisce la query per Elasticsearch.
    
    Args:
        user_input (str): L'input dell'utente in formato 'campo:valore' o 'valore' per ricerca su entrambi i campi.
    
    Returns:
        None: Esegue direttamente la ricerca e stampa i risultati o messaggi di errore.
    """
    # Se l'input non contiene ":", si ricerca su entrambi i campi
    if ":" not in user_input:
        search_text = user_input.strip()
        
        # Se l'utente cerca una frase esatta
        if search_text.startswith('"') and search_text.endswith('"'):
            phrase = search_text.strip('"')
            query = {
                "query": {
                    "multi_match": {
                        "query": phrase,
                        "fields": ["title", "content"],
                        "type": "phrase"
                    }
                },
                "highlight": {
                    "fields": {
                        "title": {},
                        "content": {}
                    }
                },
                "size": 10000
            }
        else:
            # Ricerca normale su entrambi i campi
            query = {
                "query": {
                    "multi_match": {
                        "query": search_text,
                        "fields": ["title", "content"],
                        "fuzziness": "AUTO"  
                    }
                },
                "highlight": {
                    "fields": {
                        "title": {},
                        "content": {}
                    }
                },
                "size": 10000
            }
        
        execute_search(query, user_input)
        return
    
    # Se è presente un campo, suddividi l'input in campo e termine
    field_name, query_term = user_input.split(":", 1)
    field_name = field_name.strip().lower()
    query_term = query_term.strip()

    # Controlla se il campo è valido
    if field_name not in ["title", "content"]:
        print("Errore: Il campo specificato non è valido. Usa 'title:' o 'content:'.")
        return

    # Gestisci la ricerca di una frase esatta
    if query_term.startswith('"') and query_term.endswith('"'):
        phrase = query_term.strip('"')
        query = {
            "query": {
                "match_phrase": {field_name: phrase}
            },
            "highlight": {
                "fields": {
                    field_name: {}
                }
            },
            "size": 10000
        }
    else:
        # Ricerca normale su un campo specifico con supporto per pluralità/singularità e fuzzy
        query = {
            "query": {
                "match": {
                    field_name: {
                        "query": query_term,
                        "fuzziness": "AUTO"  
                    }
                }
            },
            "highlight": {
                "fields": {
                    field_name: {}
                }
            },
            "size": 10000
        }
    
    execute_search(query, user_input)

search_input = input('Inserisci la tua ricerca (es. title: Inception oppure content: "sogno dentro un sogno"): ').strip()

process_user_query(search_input)
