In [None]:
import pandas as pd
import re
import numpy as np
import json
import sqlite3
import fitz
import os
from typing import List, Tuple
from ollama import chat
from ollama import Client
from ollama import ChatResponse
from ollama import generate
from transformers import AutoTokenizer

In [2]:
# Ollama client pointed at http://localhost:11434.
# NOTE(review): the functions visible in this notebook call the module-level `chat`, not this client — confirm it is still needed.
client = Client(host='http://localhost:11434')

In [None]:
# Options dict for Ollama requests; "num_ctx" is presumably the context-window size in tokens — TODO confirm against the Ollama docs.
# NOTE(review): ask_local_llm builds its own local `options` (num_ctx 4096), so this module-level value is not what it sends.
options = {
    "num_ctx": 8192
}

In [3]:
# Open the extraction workbook once and list the sheets that are cleaned and merged below.
file_path = "data-extration.xlsx"
xls = pd.ExcelFile(file_path)
rq_sheets = ["RQ2", "RQ3_Systems", "RQ4_Learning", "RQ5_Partition_Method", "RQ5_Metric"]

def extract_citation_key(cite):
    r"""Pull the key out of the first ``\cite{...}`` occurrence in ``cite``.

    Non-string values (e.g. NaN from an empty spreadsheet cell) and strings
    that contain no ``\cite{...}`` are handed back unchanged.
    """
    if not isinstance(cite, str):
        return cite

    found = re.search(r"\\cite{(.+?)}", cite)
    if found is None:
        return cite
    return found.group(1)

# Clean every sheet in rq_sheets and, when it has a "reference" column, collapse it to one row per reference.
processed_sheets = {}
for sheet in rq_sheets:
    df = xls.parse(sheet)
    # Lower-case, stripped headers so the filters below and the merge key match across sheets.
    df.columns = [str(col).strip().lower() for col in df.columns]
    
    # Drop columns whose header starts with 'unnamed', 'comentário' or 'obs:'.
    df = df.loc[:, ~df.columns.str.contains('^unnamed')]
    df = df.loc[:, ~df.columns.str.contains('^comentário')]
    df = df.loc[:, ~df.columns.str.contains('^obs:')]
    
    # Remove rows in which every cell is empty.
    df = df.dropna(how='all')
    
    if "reference" in df.columns:
        # Reduce "\cite{key}" cells to the bare key.
        df["reference"] = df["reference"].apply(extract_citation_key)
        
        # For each non-key column: the distinct non-null values of the group as a list, or NaN when the group has none.
        agg_dict = {}
        for col in df.columns:
            if col != "reference":
                agg_dict[col] = lambda x: list(x.dropna().unique()) if len(x.dropna()) > 0 else np.nan
        
        df = df.groupby("reference", as_index=False).agg(agg_dict)
        
        # Unwrap one-element lists so single answers stay scalar.
        for col in df.columns:
            if col != "reference":
                df[col] = df[col].apply(lambda x: x[0] if isinstance(x, list) and len(x) == 1 else x)

    processed_sheets[sheet] = df

# Outer-join all sheets on "reference" so a paper missing from one sheet is still kept.
merged_df = processed_sheets[rq_sheets[0]]
for sheet in rq_sheets[1:]:
    merged_df = merged_df.merge(processed_sheets[sheet], on="reference", how="outer")

# Keep only the text after the last '.' in any column name that contains a dot.
merged_df.columns = [col.split('.')[-1] if '.' in col else col for col in merged_df.columns]

# NOTE(review): drop() raises KeyError by default if any of these columns is absent — confirm all three always exist.
merged_df.drop(columns=["related", "ref", "cite"], inplace=True)
merged_df



Unnamed: 0,reference,sampling method,systemname,domain,nfp,strategy,dataset,technique,partition method,evaluation metric
0,Alshehri2023,reamostragem SMOTE,Eclipse,system files,,,,"[AdaBoost com J48, J48]",Validação cruzada,"[Recall , Precision, Medida F ]"
1,Alves2020,"[Coverage-based , Solver-based, Randomized ...",x264,,"Tempo de codificação e codificação, tamanho",,https://github.com/jualvespereira/ICPE2020,regressão linear múltipla,NI,MRE
2,Arcaini2020,Não identifiquei,No identificado,,,,,,,
3,Ballesteros2021,NI,"[x264, Wget, Berkeley DB Memory, Sensor Networ...",,Population Size / Archive Size: 400; Number of...,SI,,regressão linear,NI,Coverage Metric (CM)
4,Chen2022,amostragem adaptativa com d-Simplexed,Spark,Database System,"Count,Executor Memory,Executor Threads, Memory...",,,Rede Neural Multicamadas (NN),Bootstrap,MAPE
...,...,...,...,...,...,...,...,...,...,...
59,tipu2022:cc,Random Sampling,"[MPI-I/O, SEG-Y I/O]",,"[Number of MPI node, MPI processes per node, S...",,,Artificial Neural Networks,,"[accuracy, MSE, MAE, MAPE]"
60,valov2020:icpe,amostragem pseudoaleatória,"[BZIP2, GZIP, XZ, FLAC, x264]",,,,,"[árvores de regressão, Regressão linear simple]",,"[MAPE, LOOCV]"
61,vitui2021:ese,Amostragem aleatória,"[Open-Src, Entprz. 1, Entprz. 2]",,,,,"[Random Forest, XGBoost trees, Multi-Layer Per...","[cross-validation, validação cruzada leave-one...","[Median Percentage Deviation, MAPE, MAE, (MSE,..."
62,yufei2024:jss,"[Random Sampling, Neighborhood Sampling, Input...","[SQLite, BDB-C, BDB-J, LLVM, Sac, Apache, x264...","[Database, Compiler, Web Server, Video Encoder...","[Execution Time, Response Time, Video Encoding...",Execution,https://github.com/RSFIN/RSFIN/tree/master/data,"[Artificial Neural Networks (ANN), Deep Learni...",,


In [4]:
# Spot-check: show the merged row for a single reference key.
merged_df[merged_df["reference"] == "lesoil2024"]

Unnamed: 0,reference,sampling method,systemname,domain,nfp,strategy,dataset,technique,partition method,evaluation metric
46,lesoil2024,"[K-means, HDBScan, Amostragem aleatória, Submo...","[gcc, ImageMagick, lingeling, nodeJS, poppler,...","[.c programs, images, SAT formulae, .js script...","[size, ctime, exec, size, time, #confl.,#reduc...",EX,,"[OLS Regression, Desicion Tree, Random forest,...",NI,Mean Absolute Percentage Error (MAPE)


In [5]:
# Swap ':' for '_' in every reference key (plain substring replacement, regex disabled).
# NOTE(review): presumably so keys line up with PDF file names — confirm.
merged_df["reference"] = merged_df["reference"].str.replace(":", "_", regex=False)

In [None]:
def build_message(topic: str, context: str, question: str, some_answer_examples: str,answer_prefix: str, base_text: str, text_example:str, answer_text_example:str) -> list[dict[str, str]]:
    """Assemble the three-message chat payload used to query a model about one topic.

    Args:
        topic: Name of the information being extracted; interpolated into the system prompt.
        context: Explanation of the topic; interpolated into the system prompt.
        question: Question placed in the user message and repeated in the example message.
        some_answer_examples: Example answers listed in the system prompt.
        answer_prefix: Phrase the model is told to start its answer with.
        base_text: Article text the question must be answered from.
        text_example: Text of the worked example.
        answer_text_example: Answer of the worked example.

    Returns:
        A list of three dicts with 'role' and 'content' keys, in the order
        system, user, assistant.
    """
    # NOTE(review): the worked example is sent as an assistant turn *after* the real user question,
    # so the conversation ends on an assistant message — confirm this ordering is intended.
    message = [{
        'role': "system",
        'content': f"""You are an expert scientific article analyzer. Your task is to extract specific information 
        from scientific texts based on provided questions and context. When asked about {topic}, understand that {context}
        Examples include: {some_answer_examples}
        Your answer should be concise and directly address the question based on the provided text, starting with the phrase:{answer_prefix}"""
    },
    {
        'role': "user",
        'content': f"""Please answer the question: {question}, based on the following text: {base_text}"""
    },{
        'role': "assistant",
        'content': f"""Question: {question}\n base_text: {text_example}\nAnswer: {answer_text_example}
        """
    }]

    return message
    
    

In [7]:
# Flat list of all reference keys, in merged_df row order.
references_list = merged_df["reference"].tolist()

In [None]:
# Fields the models are asked to extract; each entry is passed to ask_local_llm as the topic key.
evaluated_metrics = ["sampling method", "dataset", "technique", "partition method", "evaluation metric"]

In [11]:
# Model tags evaluated through the local Ollama server.
local_llm_models = ["deepseek-r1:32b", "magistral:24b", "llama4:latest"] # TODO: test llama 4

In [None]:
def create_llm_tables(db_path: str, llm_models: list):
    """Create the evaluation tables if they are missing and register the models.

    The function is safe to call repeatedly (e.g. when the notebook cell is
    re-run): tables are created with ``IF NOT EXISTS`` and a model name is
    only inserted when no row with that name exists yet.

    Args:
        db_path: Path of the SQLite database file (created if absent).
        llm_models: Model names to register in ``llm_models``.
    """
    conn = sqlite3.connect(db_path)
    try:
        cursor = conn.cursor()

        cursor.execute('''
            CREATE TABLE IF NOT EXISTS llm_models (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                model_name TEXT NOT NULL
            )
        ''')

        cursor.execute('''
            CREATE TABLE IF NOT EXISTS llm_responses (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name_id varchar(510) UNIQUE NOT NULL,
                model_id INTEGER,
                pdf_name varchar(255),
                prompt_technique varchar(255),
                text_segmentation_stategy varchar(255),
                metric varchar(255),
                response TEXT,
                FOREIGN KEY (model_id) REFERENCES llm_models(id)
            )
        ''')

        # model_name has no UNIQUE constraint, so guard against duplicates in the
        # statement itself instead of relying on the schema.
        for model in llm_models:
            cursor.execute(
                '''
                INSERT INTO llm_models (model_name)
                SELECT ?
                WHERE NOT EXISTS (SELECT 1 FROM llm_models WHERE model_name = ?)
                ''',
                (model, model),
            )

        conn.commit()
    finally:
        # Release the file handle even when a statement fails.
        conn.close()

def insert_llm_response(db_path: str, model_name: str, pdf_name: str, prompt_technique: str, metric: str, response: str, name_id: str):
    """Store one model answer in ``llm_responses``.

    The model is looked up by name in ``llm_models`` and registered on the fly
    when it is not there yet.

    Args:
        db_path: Path of the SQLite evaluation database.
        model_name: Name of the model that produced the answer.
        pdf_name: Identifier of the paper the answer refers to.
        prompt_technique: Prompt variant used for the request.
        metric: Extraction field that was asked about.
        response: Answer text to store.
        name_id: Unique key of this (paper, metric, model, prompt) combination.

    Raises:
        sqlite3.IntegrityError: If a row with the same ``name_id`` already exists.
    """
    conn = sqlite3.connect(db_path)
    try:
        cursor = conn.cursor()

        cursor.execute('SELECT id FROM llm_models WHERE model_name = ?', (model_name,))
        row = cursor.fetchone()
        if row is None:
            cursor.execute('INSERT INTO llm_models (model_name) VALUES (?)', (model_name,))
            conn.commit()
            model_id = cursor.lastrowid
        else:
            # fetchone() yields a one-element tuple; bind the integer id, not the tuple.
            model_id = row[0]

        cursor.execute('''
            INSERT INTO llm_responses (model_id, pdf_name, prompt_technique, metric, response, name_id)
            VALUES (?, ?, ?, ?, ?, ?)
        ''', (model_id, pdf_name, prompt_technique, metric, response, name_id))

        conn.commit()
    finally:
        # Close the connection on success and on failure alike.
        conn.close()

In [13]:
def find_all_paths_and_names(folder_path):
    """List every file below ``folder_path``, searching sub-folders too.

    Returns:
        Two parallel lists: the full path of each file and its bare file name,
        in the order ``os.walk`` visits them.
    """
    found = [
        (os.path.join(current_dir, filename), filename)
        for current_dir, _subdirs, filenames in os.walk(folder_path)
        for filename in filenames
    ]
    all_paths = [full_path for full_path, _ in found]
    all_names = [filename for _, filename in found]
    return all_paths, all_names

In [19]:
def simple_extraction(path: str) -> tuple[str, int]:
    """Extract the raw text of a PDF, wrapping every page in start/end markers.

    Each page's text is surrounded by ``<--page_start:N-->`` and
    ``<--page_end:N-->`` (N is 1-based). No cleaning is applied to the text.

    Args:
        path: Location of the PDF file to open.

    Returns:
        ``(text, total_pages)``. On any failure the error is printed and
        ``("", 0)`` is returned instead of raising.
    """
    doc = None
    try:
        doc = fitz.open(path)
        pieces = []
        total_pages = 0
        for idx, page in enumerate(doc):
            pieces.append(f"<--page_start:{idx+1}-->")
            pieces.append(page.get_text())
            pieces.append(f"<--page_end:{idx+1}-->")
            total_pages += 1
        return "".join(pieces), total_pages

    except Exception as e:
        # Report the offending file; `__file__` does not exist inside a notebook
        # kernel, so referencing it here would raise NameError from the handler.
        print(f"An error occurred: {e} in {path}")
        return "", 0

    finally:
        # Release the document handle whether or not extraction succeeded.
        if doc is not None:
            doc.close()

In [None]:
def update_database_with_text_segments(db_path: str, pdf_folder: str = "/home/pramos/Documents/AutoSLR/papers_pdf"):
    """Fill ``extracted_text.content`` for every PDF registered in the database.

    For each row of ``pdfs`` the matching file is located under ``pdf_folder``,
    its text is extracted, and every section of that PDF receives the slice of
    text between its own ``position`` and the next section's ``position`` (the
    last section runs to the end of the text). Changes are committed per PDF.

    PDFs whose file is missing, whose text cannot be extracted, or that have no
    sections are reported and skipped, so existing content is never replaced
    by an empty string because of a failed extraction.

    Args:
        db_path: Path of the SQLite database holding ``pdfs`` and ``extracted_text``.
        pdf_folder: Folder searched recursively for the PDF files.
    """
    conn = sqlite3.connect(db_path)
    try:
        cursor = conn.cursor()

        cursor.execute("SELECT id, pdf_name FROM pdfs")
        pdfs = cursor.fetchall()

        all_paths, all_names = find_all_paths_and_names(pdf_folder)
        # Index once by file name; setdefault keeps the first path seen for a name,
        # which is what a linear scan over the walk order would pick.
        path_by_name = {}
        for path, name in zip(all_paths, all_names):
            path_by_name.setdefault(name, path)

        for pdf_id, pdf_name in pdfs:
            print(f"Processing PDF: {pdf_name} (ID: {pdf_id})")

            pdf_path = path_by_name.get(pdf_name)
            if not pdf_path:
                print(f"PDF file not found for: {pdf_name}")
                continue

            try:
                full_text, _page_count = simple_extraction(pdf_path)
            except Exception as e:
                print(f"Error extracting text from {pdf_name}: {e}")
                continue

            # simple_extraction signals failure with an empty text; slicing it
            # would overwrite every section's content with "".
            if not full_text:
                print(f"Error extracting text from {pdf_name}: no text extracted")
                continue

            cursor.execute("""
                SELECT id, position, section_number, section_title 
                FROM extracted_text 
                WHERE pdf_id = ? 
                ORDER BY position ASC
            """, (pdf_id,))

            sections = cursor.fetchall()

            if not sections:
                print(f"No sections found for PDF: {pdf_name}")
                continue

            # Update the content of each section.
            for i, (section_id, start_pos, section_number, section_title) in enumerate(sections):
                try:
                    # A section ends where the next one starts; the last one ends with the text.
                    if i + 1 < len(sections):
                        end_pos = sections[i + 1][1]
                    else:
                        end_pos = len(full_text)

                    section_content = full_text[start_pos:end_pos].strip()

                    cursor.execute("""
                        UPDATE extracted_text 
                        SET content = ? 
                        WHERE id = ?
                    """, (section_content, section_id))

                    # A NULL title must not turn a successful update into a reported error.
                    print(f"  Updated section {section_number}: {(section_title or '')[:50]}... "
                          f"(chars: {start_pos}-{end_pos}, length: {len(section_content)})")

                except Exception as e:
                    print(f"Error processing section {section_number} of {pdf_name}: {e}")
                    continue

            conn.commit()
            print(f"Completed processing {pdf_name}")
    finally:
        # Close the connection even if a query fails midway.
        conn.close()

    print("Database update completed!")




    
# Populate section contents in the sections database.
# NOTE(review): absolute, machine-specific path; this write runs on every "Run All".
update_database_with_text_segments("/home/pramos/Documents/AutoSLR/validations/regex_validation/results/extern_llm-gemini.db", )

In [None]:
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import re

def preprocess_text(text: str) -> str:
    """Normalise a title for matching.

    Strips a leading number such as ``3`` or ``1.2.``, lower-cases the rest,
    turns every character that is neither a word character nor whitespace into
    a space, and collapses runs of whitespace into single spaces.
    """
    without_numbering = re.sub(r'^\d+\.?\d*\.?\s*', '', text)
    lowered = without_numbering.lower()
    no_punctuation = re.sub(r'[^\w\s]', ' ', lowered)
    return ' '.join(no_punctuation.split())

def calculate_vector_similarity(target_words: List[str], section_titles: List[str]) -> List[Tuple[int, float]]:
    """Rank section titles by TF-IDF cosine similarity to the target words.

    The target words are merged into a single query document; that document and
    all titles are vectorised together (English stop words removed, unigrams
    and bigrams) and each title is compared against the query.

    Returns:
        ``(title_index, similarity)`` pairs ordered from most to least similar,
        where ``title_index`` refers to ``section_titles``.
    """
    query_document = ' '.join(preprocess_text(word) for word in target_words)
    title_documents = [preprocess_text(title) for title in section_titles]

    tfidf = TfidfVectorizer(stop_words='english', ngram_range=(1, 2)).fit_transform(
        [query_document] + title_documents
    )

    # Row 0 is the query; the remaining rows are the titles, in input order.
    scores = cosine_similarity(tfidf[0:1], tfidf[1:])[0]

    return sorted(enumerate(scores), key=lambda pair: pair[1], reverse=True)

def extract_words_from_segments(segment_names: List[str]) -> List[str]:
    """Collect the distinct search words contained in the given segment names.

    Words are lower-cased, words of one or two characters are ignored, and
    the first-seen order is preserved.

    Args:
        segment_names: Segment names to split into words. A single string is
            accepted as well and treated as one segment name; iterating it
            directly would walk its characters and produce no words at all.

    Returns:
        The unique words longer than two characters, in order of first appearance.
    """
    if isinstance(segment_names, str):
        segment_names = [segment_names]

    # A dict keeps insertion order and gives constant-time duplicate checks.
    unique_words = {}
    for segment in segment_names:
        for word in re.findall(r'\b\w+\b', segment.lower()):
            if len(word) > 2:  # Skip very short words
                unique_words.setdefault(word, None)

    return list(unique_words)

def find_segment_text(db_path: str, pdf_name: str, segment_names: List[str], similarity_threshold: float = 0.1) -> str:
    """Return the text of every section whose title is similar enough to the requested segments.

    Section titles of ``pdf_name`` are ranked against the words of
    ``segment_names``; sections scoring at least ``similarity_threshold`` and
    having content are concatenated in document order, each under a header
    line that includes its similarity score.

    Returns:
        The combined text, or ``""`` when the PDF is unknown, has no sections,
        nothing reaches the threshold, or any error occurs (errors are printed).
    """
    connection = sqlite3.connect(db_path)
    db = connection.cursor()

    try:
        db.execute("SELECT id FROM pdfs WHERE pdf_name = ?", (pdf_name,))
        pdf_row = db.fetchone()
        if not pdf_row:
            print(f"PDF '{pdf_name}' não encontrado no banco de dados.")
            return ""

        db.execute("""
            SELECT section_number, section_title, content, position
            FROM extracted_text 
            WHERE pdf_id = ?
            ORDER BY position ASC
        """, (pdf_row[0],))
        rows = db.fetchall()
        if not rows:
            print(f"Nenhuma seção encontrada no PDF '{pdf_name}'.")
            return ""

        titles = [row[1] for row in rows]

        query_words = extract_words_from_segments(segment_names)
        print(f"Palavras de busca extraídas: {query_words}")

        # Sections that pass the threshold and have content, kept in ranking order.
        hits = []
        for row_index, score in calculate_vector_similarity(query_words, titles):
            if not score >= similarity_threshold:
                continue
            number, title, body, offset = rows[row_index]
            if body:
                hits.append(((number, title, body, offset), score))
            else:
                print(f"Aviso: Seção '{title}' encontrada mas sem conteúdo.")

        if not hits:
            print(f"Nenhuma seção encontrada com similaridade >= {similarity_threshold}")
            print("Seções disponíveis:")
            for ordinal, title in enumerate(titles[:10], start=1):
                print(f"  {ordinal}. {title}")
            return ""

        # The output follows document order; the log below keeps ranking order.
        in_document_order = sorted(hits, key=lambda hit: hit[0][3])
        blocks = [
            f"=== {number} - {title} (Similaridade: {score:.3f}) ===\n{body}"
            for (number, title, body, _offset), score in in_document_order
        ]

        print(f"Seções encontradas no PDF '{pdf_name}':")
        for (_number, title, _body, _offset), score in hits:
            print(f"  '{title}' (similaridade: {score:.3f})")

        return "\n\n".join(blocks)

    except Exception as e:
        print(f"Erro ao buscar segmentos: {e}")
        return ""

    finally:
        connection.close()

def find_best_matching_sections(db_path: str, pdf_name: str, segment_names: List[str], max_sections: int = 3) -> str:
    """Return the text of the sections whose titles best match the requested segments.

    Only the ``max_sections`` highest-ranked titles are considered; of those,
    sections without content or with zero similarity are discarded. The
    survivors are concatenated in document order, each under a header line.

    Returns:
        The combined text, or ``""`` when the PDF is unknown, has no sections,
        no candidate survives, or any error occurs (errors are printed).
    """
    connection = sqlite3.connect(db_path)
    db = connection.cursor()

    try:
        db.execute("SELECT id FROM pdfs WHERE pdf_name = ?", (pdf_name,))
        pdf_row = db.fetchone()
        if not pdf_row:
            print(f"PDF '{pdf_name}' não encontrado no banco de dados.")
            return ""

        db.execute("""
            SELECT section_number, section_title, content, position
            FROM extracted_text 
            WHERE pdf_id = ?
            ORDER BY position ASC
        """, (pdf_row[0],))
        rows = db.fetchall()
        if not rows:
            print(f"Nenhuma seção encontrada no PDF '{pdf_name}'.")
            return ""

        titles = [row[1] for row in rows]
        query_words = extract_words_from_segments(segment_names)

        print(f"Buscando por: {query_words}")

        ranking = calculate_vector_similarity(query_words, titles)

        # NOTE(review): the cut to max_sections happens before filtering, so an
        # empty or zero-score section in the top ranks still uses up a slot.
        selected = []
        for row_index, score in ranking[:max(max_sections, 0)]:
            row = rows[row_index]
            if row[2] and score > 0:
                selected.append((row, score))

        if not selected:
            print("Nenhuma seção relevante encontrada.")
            return ""

        selected = sorted(selected, key=lambda hit: hit[0][3])

        print(f"Top {len(selected)} seções mais relevantes:")
        blocks = []
        for (number, title, body, _offset), score in selected:
            print(f"  {title} (similaridade: {score:.3f})")
            blocks.append(f"=== {number} - {title} ===\n{body}")

        return "\n\n".join(blocks)

    except Exception as e:
        print(f"Erro ao buscar segmentos: {e}")
        return ""

    finally:
        connection.close()


In [None]:
# Example: fetch the best-matching sections for one paper from the sections database.
# NOTE(review): the rendered output below contains messages the current function does not print — re-run this cell to refresh it.
find_best_matching_sections("/home/pramos/Documents/AutoSLR/validations/regex_validation/results/extern_llm-gemini.db", "lesoil2024.pdf", ["Introduction", "Methodology"])

Seções encontradas no PDF 'lesoil2024.pdf': ['Introduction']
Seções não encontradas: ['Methodology']


'=== 1 - Introduction ===\n1. Introduction\nMost modern software systems are widely configurable, featuring\nmany configuration options that users can set or modify according to\ntheir needs, for instance, to maximize some performance metrics (e.g.,\nexecution time, energy consumption). As a software system matures,\nit diversifies its user base and adds new features to satisfy new needs,\nincreasing its overall number of options. However, manually quantify-\ning the individual impact of each option and their interactions quickly\nbecomes tedious, costly and time-consuming, which reinforces the need\nto automate how to study and combine these options together. Soft-\nware reliability can be largely degraded if inappropriate configuration\noptions are selected or if ageing-related bugs such as configuration-\ndependent memory leaks remain undetected (Xu et al., 2020).\nTo address these issues, researchers typically apply machine learning\n(ML) techniques (Guo et al., 2013; Temple et al.

In [None]:
def ask_local_llm(json_path:str, model_name:str, topic:str, base_text:list[str], prompt_type:str = "") -> str:
    """Ask a local model to extract one topic from the given article text.

    The prompt ingredients for ``topic`` (context, question, examples, answer
    prefix and worked example) are read from the JSON file at ``json_path``;
    missing entries default to empty strings.

    Args:
        json_path: Path of the JSON file with the per-topic prompt ingredients.
        model_name: Model tag passed to ``chat``.
        topic: Key of the topic inside the JSON file.
        base_text: Article text, either a single string or a list of strings
            that are joined with blank lines.
        prompt_type: Prompt variant; only ``"simple"`` alters the prompt
            (a marker line is appended to the system message).

    Returns:
        The text of the model's answer, or ``""`` when there is no text to
        analyse (in which case neither the JSON file nor the model is touched).
    """
    if isinstance(base_text, list):
        base_text = "\n\n".join(base_text)

    # Nothing to analyse: skip the file read and the model call altogether.
    if not base_text:
        return ""

    with open(json_path, 'r', encoding='utf-8') as file:
        data = json.load(file)

    item = data.get(topic, {})

    message = build_message(
        topic,
        item.get("context", ""),
        item.get("question", ""),
        item.get("some_answer_examples", ""),
        item.get("answer_prefix", ""),
        base_text,
        item.get("text_example", ""),
        item.get("answer_text_example", ""),
    )
    if prompt_type == "simple":
        message[0]['content'] += f"\n\nPrompt Type: {prompt_type}"

    options = {
        "num_ctx": 4096
    }
    answer = chat(model=model_name, messages=message, options=options)

    # Hand back only the answer text: the declared return type is str and the
    # value is later bound as a SQL parameter, which a response object cannot be.
    return answer['message']['content'] or ""

In [None]:
def join_texts(text1:str, tex2:str) -> tuple:
    """Combine two retrieved section texts into one.

    Args:
        text1: First text (may be empty).
        tex2: Second text (may be empty).

    Returns:
        ``(err, text)``. ``err`` is ``None`` on success and an error message
        when both inputs are empty (``text`` is then ``""``). When both texts
        start with the same 50 characters they are taken to be the same
        section and only ``text1`` is returned; otherwise the non-empty texts
        are joined with a blank line.
    """
    text1 = text1 or ""
    tex2 = tex2 or ""

    # Always return a pair so callers can unpack the result unconditionally.
    if not text1 and not tex2:
        return "No section found for this paper", ""

    if text1[:50] == tex2[:50]:
        return None, text1

    return None, "\n\n".join(part for part in (text1, tex2) if part)

In [None]:
# Same expression as the earlier references_list cell; re-evaluated here.
references_list = merged_df["reference"].tolist()

In [None]:
# Prompt variants tried for every (paper, metric) pair.
# NOTE(review): ask_local_llm only special-cases "simple", so "complex" and "one shot" currently send the same prompt.
prompt_types = ["simple", "complex", "one shot"]

In [None]:
#1) local llm tests
db_path = "local_llm_evaluation.db"
# The section texts live in the database that holds the `pdfs` / `extracted_text`
# tables, not in the evaluation database created below.
# NOTE(review): path taken from the earlier cells of this notebook — confirm it is the intended source.
sections_db_path = "/home/pramos/Documents/AutoSLR/validations/regex_validation/results/extern_llm-gemini.db"
create_llm_tables(db_path, local_llm_models)

conn = sqlite3.connect(db_path)
try:
    cursor = conn.cursor()

    for model in local_llm_models:
        print(f"Evaluating with model: {model}")
        for name in references_list:
            # `pdfs.pdf_name` is matched against file names on disk, so it carries the extension.
            # NOTE(review): assumes every paper is stored as "<reference>.pdf" — confirm.
            pdf_name = f"{name}.pdf"

            text_intro = find_best_matching_sections(sections_db_path, pdf_name, ["Introduction"], max_sections=1)
            text_method = find_best_matching_sections(sections_db_path, pdf_name, ["Methodology"], max_sections=1)

            err, text = join_texts(text_intro, text_method)
            if err:
                print(f"Error for {name}: {err}")
                continue

            for metric in evaluated_metrics:
                for prompt_type in prompt_types:
                    name_id = f"{name}_{metric}_{model}_{prompt_type}"

                    # Skip combinations already answered so the cell can be resumed.
                    # fetchall() drains the statement, so no read lock is held while
                    # insert_llm_response writes through its own connection.
                    cursor.execute("SELECT 1 FROM llm_responses WHERE name_id = ?", (name_id,))
                    if cursor.fetchall():
                        continue

                    answer = ask_local_llm("metrics.json", model, metric, text, prompt_type)

                    insert_llm_response(db_path, model, name, prompt_type, metric, answer, name_id)
finally:
    # Close once, after all iterations; closing inside the loop breaks the next query.
    conn.close()
        


In [None]:
# Model identifier(s) used by the remote evaluation cell below.
extern_llm_models = ["gemini-2.5-flash-preview-05-20"]

In [None]:
#2) remote llm tests
db_path = "global_llm_evaluation.db"
# The section texts live in the database that holds the `pdfs` / `extracted_text`
# tables, not in the evaluation database created below.
# NOTE(review): path taken from the earlier cells of this notebook — confirm it is the intended source.
sections_db_path = "/home/pramos/Documents/AutoSLR/validations/regex_validation/results/extern_llm-gemini.db"
create_llm_tables(db_path, extern_llm_models)

conn = sqlite3.connect(db_path)
try:
    cursor = conn.cursor()

    for model in extern_llm_models:
        print(f"Evaluating with model: {model}")
        for name in references_list:
            # `pdfs.pdf_name` is matched against file names on disk, so it carries the extension.
            # NOTE(review): assumes every paper is stored as "<reference>.pdf" — confirm.
            pdf_name = f"{name}.pdf"

            text_intro = find_best_matching_sections(sections_db_path, pdf_name, ["Introduction"], max_sections=1)
            text_method = find_best_matching_sections(sections_db_path, pdf_name, ["Methodology"], max_sections=1)

            err, text = join_texts(text_intro, text_method)
            if err:
                print(f"Error for {name}: {err}")
                continue

            for metric in evaluated_metrics:
                for prompt_type in prompt_types:
                    name_id = f"{name}_{metric}_{model}_{prompt_type}"

                    # Skip combinations already answered so the cell can be resumed.
                    # fetchall() drains the statement, so no read lock is held while
                    # insert_llm_response writes through its own connection.
                    cursor.execute("SELECT 1 FROM llm_responses WHERE name_id = ?", (name_id,))
                    if cursor.fetchall():
                        continue

                    #answer = ask_local_llm("metrics.json", model, metric, text, prompt_type)
                    #TO-DO: Implement a gemini call
                    answer = None

                    # Until the Gemini call exists there is nothing to store. Inserting here
                    # would either raise NameError on a fresh kernel or record a stale answer
                    # left over from the local-model cell under the remote model's name.
                    if answer is None:
                        continue

                    insert_llm_response(db_path, model, name, prompt_type, metric, answer, name_id)
finally:
    # Close once, after all iterations; closing inside the loop breaks the next query.
    conn.close()