# Preprocess data from OFAS

## Set up the environment

In [42]:
import json
import logging
import pandas as pd
from typing import Dict, List, Any
from haystack.dataclasses import ByteStream, Document
from haystack.components.converters import PyPDFToDocument
from haystack.components.preprocessors import DocumentCleaner, DocumentSplitter
import requests
from tqdm import tqdm
import re
import tiktoken
import os
from dotenv import load_dotenv
from bs4 import BeautifulSoup
import ast
from pydantic import BaseModel
from openai import AsyncOpenAI
from dataclasses import dataclass
import deepl
import asyncio

In [43]:
load_dotenv()
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", None)

In [44]:
llm_client = AsyncOpenAI(
    api_key=OPENAI_API_KEY
)

In [45]:
MAX_CONTEXT_TOKENS = 120000
tokenizer = tiktoken.get_encoding("cl100k_base")

In [46]:
# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)

## Load the data

In [47]:
# Load the PDF URLs from the JSON file
with open('sources/pdf_urls.json', 'r') as file:
    pdf_urls = json.load(file)
# pdf_urls = pdf_urls[:1]
pdf_urls = pdf_urls[:20] + pdf_urls[460:480] + pdf_urls[-20:]
# pdf_urls = pdf_urls[:20] + pdf_urls[220:240]+ pdf_urls[430:450] + pdf_urls[460:480] + pdf_urls[930:950] + pdf_urls[1390:1410] + pdf_urls[-20:]

## Pre-processing

### LLM augmentation

#### HYQ generation

In [48]:
class HYQReformulationSchema(BaseModel):
    hyq: List[str]
    hyq_declarative: List[str]

In [49]:
QUERY_STATEMENT_REWRITING_PROMPT_DE = """<anweisungen>
    <anweisung>Gebe den untenstehenden <text> vor, formuliere {n_alt_queries} Fragen, die der Text genau beantworten kann</anweisung>
    <anweisung>Geben Sie die generierten Fragen vor, formulieren Sie sie in einem deklarativen/affirmativen Tonfall in mehrere alternative Aussagen um</anweisung>
    <anweisung>Jede umformulierte Aussage sollte die Bedeutung der ursprünglichen Anfrage beibehalten, sie aber auf eine etwas andere Weise ausdrücken</anweisung>
    <anweisung>Schreiben Sie Fragen/Reformulierungen immer in derselben Sprache wie der <text></anweisung>
</anweisungen>

<Beispiele>
hyq: [„Wie ist das Wetter?“, „Was ändert sich mit AHV21?“, „Was bedeutet das flexible Rentenalter?“]
hyq_delarative: [„Ich möchte wissen, wie das Wetter ist“, „Erklär mir, was sich mit der AHV21 ändert“, „Flexibles Rentenalter erklärt“]
</Beispiele>

<format_der_antwort>
HYQReformulationSchema(BaseModel)
    hyq: List[str] # eine Liste von Fragen, die der <text> genau beantworten kann.
    hyq_declarative = List[str] # die affirmative/deklarative Umformulierung der hyq-Fragen.
</format_der_antwort>

<text>
{text}
</text>"""

QUERY_STATEMENT_REWRITING_PROMPT_FR = """<instructions>
    <instruction>Étant donné le <texte> ci-dessous, formulez {n_alt_queries} questions auxquelles le texte peut exactement répondre</instruction>
    <instruction>Étant donné les questions générées, reformulez les en plusieurs énoncés alternatifs sur un ton déclaratif/affirmatif</instruction>
    <instruction>Chaque déclaration reformulée doit conserver le sens de la requête originale mais l'exprimer d'une manière légèrement différente</instruction>
    <instruction>Toujours écrire les questions/reformulations dans la même langue que le <texte></instruction>
</instructions>

<exemples>
hyq: ["Quel temps fait-il?", "Que change avec AVS21 ?", "Que signifie l'âge de la retraite flexible ?"]
hyq_delarative: ["J'aimerais connaître le temps qu'il fait", "Explique moi ce qui change avec AVS21", "L'âge de la retraite flexible expliqué"]
</exemples>

<format_de_réponse>
HYQReformulationSchema(BaseModel)
    hyq: List[str] # une liste de questions auxquelles le <texte> peut répondre exactement
    hyq_declarative = List[str] # la reformulation de manière affirmative/déclarative des questions hyq
</format_de_réponse>

<texte>
{text}
</texte>"""

QUERY_STATEMENT_REWRITING_PROMPT_IT = """<istruzioni>
    <istruzione>Dato il <testo> sottostante, formulare {n_alt_queries} domande a cui il testo può rispondere esattamente</istruzione>.
    <istruzione>Date le domande generate, riformularle in diverse affermazioni alternative con un tono dichiarativo/affermativo</istruzione>.
    <istruzione>Ogni affermazione riformulata deve mantenere il significato della domanda originale, ma esprimerlo in modo leggermente diverso</istruzione>.
    <istruzione>Scrivere sempre le domande/riformulazioni nella stessa lingua del <testo></istruzione>.
</istruzioni>

<esempi>
hyq: [“Com'è il tempo?”, “Cosa sta cambiando con AVS21?”, “Cosa significa l'età pensionabile flessibile?”]
hyq_delarative: [“Vorrei sapere com'è il tempo”, “Spiegami cosa sta cambiando con AVS21”, “L'età pensionabile flessibile spiegata”].
</esempi>

<formato_di_risposta>
HYQReformulationSchema(BaseModel)
    hyq: List[str] # un elenco di domande a cui il <testo> può rispondere esattamente
    hyq_declarative = List[str] # la riformulazione affermativa/declarativa delle domande hyq.
</formato_di_risposta>

<testo>
{text}
</testo>"""

hyq_prompts = {
    "de": QUERY_STATEMENT_REWRITING_PROMPT_DE,
    "fr": QUERY_STATEMENT_REWRITING_PROMPT_FR,
    "it": QUERY_STATEMENT_REWRITING_PROMPT_IT,
}

#### Summary generation

In [50]:
class SummarySchema(BaseModel):
    summary: str

In [51]:
QUERY_STATEMENT_SUMMARY_PROMPT_DE = """<anweisungen>
    <anweisung>Gebe den untenstehenden <text> vor, und formuliere eine Zusammenfassung, die den Inhalt des Textes präzise wiedergibt</anweisung>
    <anweisung>Die Zusammenfassung sollte in einem klaren und prägnanten Tonfall erfolgen</anweisung>
    <anweisung>Die Zusammenfassung muss den wesentlichen Inhalt des Textes erfassen und ihn in eigenen Worten wiedergeben</anweisung>
    <anweisung>Schreiben Sie die Zusammenfassung immer in derselben Sprache wie der <text></anweisung>
</anweisungen>

<Beispiele>
summary: "Das Individuelle Konto (IK) ist entscheidend für die Rentenberechnung in der AHV. Es erfasst alle Einkommen, Beitragszeiten und Betreuungsgutschriften, die für Alters-, Hinterlassenen- oder Invalidenrenten relevant sind. Fehlende Beitragsjahre, auch als Beitragslücken bezeichnet, können zu einer Kürzung der Versicherungsleistungen führen. Die Ausgleichskassen führen für jede versicherte Person ein IK und sind verantwortlich für die korrekte Erfassung der Beiträge."
</Beispiele>

<format_der_antwort>
SummarySchema(BaseModel)
    summary: str # Eine Zusammenfassung, die den <text> exakt wiedergibt
</format_der_antwort>

<text>
{text}
</text>"""

QUERY_STATEMENT_SUMMARY_PROMPT_FR = """<instructions>
    <instruction>Étant donné le <texte> ci-dessous, formulez un résumé qui restitue précisément le contenu du texte</instruction>
    <instruction>Le résumé doit être présenté de manière claire et concise</instruction>
    <instruction>Le résumé doit capturer l'essence du contenu du texte en le reformulant avec vos propres mots</instruction>
    <instruction>Rédigez le résumé toujours dans la même langue que le <texte></instruction>
</instructions>

<exemples>
summary: "Le Compte Individuel (CI) est déterminant pour le calcul de la retraite dans l’AVS. Il recense tous les revenus, périodes de cotisation et crédits de prise en charge qui sont pertinents pour les rentes de vieillesse, de survivants ou d'invalidité. Des années de cotisation manquantes, également appelées lacunes de cotisation, peuvent entraîner une réduction des prestations. Les caisses de compensation tiennent un CI pour chaque assuré et sont responsables de l’enregistrement correct des contributions."
</exemples>

<format_de_réponse>
SummarySchema(BaseModel)
    summary: str # Un résumé qui restitue précisément le contenu du <texte>
</format_de_réponse>

<texte>
{text}
</texte>"""

QUERY_STATEMENT_SUMMARY_PROMPT_IT = """<istruzioni>
    <istruzione>Dato il <testo> sottostante, formula un riassunto che riporti esattamente il contenuto del testo</istruzione>
    <istruzione>Il riassunto deve essere presentato in modo chiaro e conciso</istruzione>
    <istruzione>Il riassunto deve catturare l'essenza del contenuto del testo riformulandolo con parole proprie</istruzione>
    <istruzione>Scrivi il riassunto sempre nella stessa lingua del <testo></istruzione>
</istruzioni>

<esempi>
summary: "Il Conto Individuale (CI) è fondamentale per il calcolo delle pensioni nell'AVS. Esso registra tutti i redditi, periodi di contribuzione e crediti per l'assistenza che sono rilevanti per le pensioni di vecchiaia, superstiti o invalidità. Anni di contribuzione mancanti, noti anche come lacune contributive, possono portare a una riduzione delle prestazioni. Le casse di compensazione mantengono un CI per ogni assicurato e sono responsabili della corretta registrazione dei contributi."
</esempi>

<formato_di_risposta>
SummarySchema(BaseModel)
    summary: str # Un riassunto che riporti esattamente il contenuto del <testo>
</formato_di_risposta>

<testo>
{text}
</testo>"""

summary_prompts = {
    "de": QUERY_STATEMENT_SUMMARY_PROMPT_DE,
    "fr": QUERY_STATEMENT_SUMMARY_PROMPT_FR,
    "it": QUERY_STATEMENT_SUMMARY_PROMPT_IT,
}

### Parsing

In [52]:
# Define the parser class
class OFASParser:
    def __init__(self):
        self.pdf_converter = PyPDFToDocument()
        self.cleaner = DocumentCleaner(
            remove_empty_lines=True,
            remove_extra_whitespaces=True,
            remove_repeated_substrings=False,
        )
        self.splitter = DocumentSplitter(
            split_by="sentence",
            split_length=5,
            split_overlap=1,
            split_threshold=4,
        )

    def clean_text(self, text: str) -> str:
        # Remove excess dots and formatting artifacts
        text = re.sub(r'\.{4,}', '', text)  # Remove sequences of three or more dots
        text = re.sub(r'\s+', ' ', text)  # Replace multiple spaces with a single space
        text = re.sub(r'\n+', '\n', text)  # Replace multiple newlines with a single newline
        text = text.strip()  # Remove leading and trailing whitespace
        return text

    async def generate_hyq(self, language: str, text: str) -> list:
        prompt = hyq_prompts.get(language)
        messages = [
            {"role": "user", "content": prompt.format(n_alt_queries=3, text=text)}
        ]
        res = await llm_client.beta.chat.completions.parse(
            model="gpt-4o",
            temperature=0,
            top_p=0.95,
            max_tokens=2048,
            messages=messages,
            response_format=HYQReformulationSchema,
        )
        
        hyq = res.choices[0].message.parsed.hyq
        hyq_declarative = res.choices[0].message.parsed.hyq_declarative
        return [hyq, hyq_declarative]

    async def generate_summary(self, language: str, text: str) -> list:
        prompt = summary_prompts.get(language)
        messages = [
            {"role": "user", "content": prompt.format(text=text)}
        ]
        res = await llm_client.beta.chat.completions.parse(
            model="gpt-4o",
            temperature=0,
            top_p=0.95,
            max_tokens=2048,
            messages=messages,
            response_format=SummarySchema,
        )
        
        summary = res.choices[0].message.parsed.summary
        return summary

    async def convert_to_documents(self, pdf_urls: List[dict]) -> List[dict]:
        documents = []
        for url in tqdm(pdf_urls, desc="Processing PDFs", unit="file"):
            # print(url)
            try:
                # Fetch the PDF content
                response = requests.get(url['url'])
                response.raise_for_status()

                # Check if the content is a PDF
                if response.headers.get('Content-Type') != 'application/pdf':
                    logger.warning(f"Skipping non-PDF content from {url['url']}")
                    continue

                pdf_content = response.content

                # Convert PDF content to Document objects
                byte_stream = ByteStream(data=pdf_content)
                result = self.pdf_converter.run(sources=[byte_stream])
                converted_docs = result["documents"]  # Ensure this is a list of Document objects

                # Clean documents
                cleaned_result = self.cleaner.run(documents=converted_docs)
                cleaned_docs = cleaned_result["documents"]  # Extract the list of cleaned Document objects

                # Split documents
                split_result = self.splitter.run(documents=cleaned_docs)
                split_docs = split_result["documents"]  # Extract the list of split Document objects

                # Create document format for each converted document
                for doc in cleaned_docs:
                    # Clean the text content
                    cleaned_text = self.clean_text(doc.content)

                    # Check if the cleaned text exceeds the token limit
                    tokenized_text = tokenizer.encode(cleaned_text)
                    if len(tokenized_text) > MAX_CONTEXT_TOKENS:
                        chunks = [tokenizer.decode(tokenized_text[i:i + MAX_CONTEXT_TOKENS]) for i in range(0, len(tokenized_text), MAX_CONTEXT_TOKENS)]
                        for chunk in chunks:
                            hyq = await self.generate_hyq(language, chunk)
                            summary = await self.generate_summary(language, chunk)
                            document = {
                                "url": url['url'],
                                "text": chunk,
                                "language": language,
                                "tags": url['tag'],
                                "subtopics": url['subtopics'],
                                "summary": summary,  # TODO
                                "doctype": "context_doc",  # Constant value
                                "organizations": "OFAS",  # Constant value
                                "hyq": "{SEP}".join(hyq[0]),
                                "hyq_declarative": "{SEP}".join(hyq[1])
                            }
                            documents.append(document)
                    else:
                        # Extract language from the URL
                        language = url['url'].split('/')[3]
                        hyq = await self.generate_hyq(language, cleaned_text)
                        summary = await self.generate_summary(language, cleaned_text)
                        document = {
                            "url": url['url'],
                            "text": cleaned_text,
                            "language": language,
                            "tags": url['tag'],
                            "subtopics": url['subtopics'],
                            "summary": summary,  # TODO
                            "doctype": "context_doc",  # Constant value
                            "organizations": "OFAS",  # Constant value
                            "hyq": "{SEP}".join(hyq[0]),
                            "hyq_declarative": "{SEP}".join(hyq[1])
                        }
                        documents.append(document)
            except requests.RequestException as e:
                logger.error(f"Failed to fetch PDF from {url}: {e}")
        return documents

In [53]:
# Initialize the parser
ofas_parser = OFASParser()

In [54]:
# Process the PDF URLs
async def process_pdfs():
    # Convert PDFs to documents
    documents = await ofas_parser.convert_to_documents(pdf_urls)
    
    # Output the documents
    for doc in documents[:2]:
        print(doc)
    
    return documents


In [55]:
# Run the processing function
documents = await process_pdfs()

Processing PDFs:   0%|          | 0/60 [00:00<?, ?file/s]

2025-03-10 13:07:14,970 - httpx - INFO - HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
2025-03-10 13:07:21,645 - httpx - INFO - HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
Processing PDFs:   2%|▏         | 1/60 [00:11<11:27, 11.65s/file]2025-03-10 13:07:43,344 - httpx - INFO - HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
2025-03-10 13:07:59,484 - httpx - INFO - HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
Processing PDFs:   3%|▎         | 2/60 [00:49<26:09, 27.06s/file]2025-03-10 13:08:19,657 - httpx - INFO - HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
2025-03-10 13:08:40,872 - httpx - INFO - HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
Processing PDFs:   5%|▌         | 3/60 [01:30<31:55, 33.60s/file]2025-03-10 13:09:05,553 - httpx - INFO - HTTP Request: POST https://api.openai.

{'url': 'https://sozialversicherungen.admin.ch/it/d/6905/download', 'text': '1 Panoramica delle direttive sui contributi, stato 1º gennaio 2021 Il presente documento offre una panoramica delle direttive sui contributi AVS/AI/IPG/AD e fornisce agli utenti uno strumento per trovare rapidamente le disposizioni delle direttive determinanti per una precisa questione. Per motivi di chiarezza, singoli temi non ordinari sono qui tralasciati o trattati solo a grandi linee. Le direttive principali a cui si fa riferiment o nella presente panoramica sono le Direttive sull’obbligo assicurativo nell’AVS/AI (DOA), le Direttive sul salario determinante nell’AVS/AI e nelle IPG (DSD), le Direttive sui contributi dei lavoratori indipendenti e delle persone senza attività lucrativa nell’AVS/AI e nelle IPG (DIN) e le Direttive sulla riscossione dei contributi nell’AVS/AI e nelle IPG (DRC). Le DOA determinano quali persone sono soggette all’AVS/AI/IPG/AD obbligatoria. Le DSD elencano i redditi dei salariati




In [56]:
documents

[{'url': 'https://sozialversicherungen.admin.ch/it/d/6905/download',
  'text': '1 Panoramica delle direttive sui contributi, stato 1º gennaio 2021 Il presente documento offre una panoramica delle direttive sui contributi AVS/AI/IPG/AD e fornisce agli utenti uno strumento per trovare rapidamente le disposizioni delle direttive determinanti per una precisa questione. Per motivi di chiarezza, singoli temi non ordinari sono qui tralasciati o trattati solo a grandi linee. Le direttive principali a cui si fa riferiment o nella presente panoramica sono le Direttive sull’obbligo assicurativo nell’AVS/AI (DOA), le Direttive sul salario determinante nell’AVS/AI e nelle IPG (DSD), le Direttive sui contributi dei lavoratori indipendenti e delle persone senza attività lucrativa nell’AVS/AI e nelle IPG (DIN) e le Direttive sulla riscossione dei contributi nell’AVS/AI e nelle IPG (DRC). Le DOA determinano quali persone sono soggette all’AVS/AI/IPG/AD obbligatoria. Le DSD elencano i redditi dei salari

In [57]:
# Save documents to a CSV file
df = pd.DataFrame(documents)  # Create a DataFrame from the list of documents
df.to_csv('output/ofas.csv', index=False)  # Save to CSV without the index