Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 65 additions & 57 deletions www/services/api_retriever.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
"""
extract.py — Fase 1 (Extract) della Pipeline ETL Bibliometrix.
extract.py — Fase 1 (Estrazione) della pipeline ETL Bibliometrix.

Responsabilità del modulo:
- Interrogare le API di OpenAlex e PubMed tramite query bibliografiche.
- Gestire la paginazione, il rate-limiting e gli errori di rete in modo robusto.
- Persistere i dati grezzi su disco in formato JSON (OpenAlex) e XML (PubMed).
- Restituire i record parsati come lista di dizionari alla pipeline downstream.
- Restituire i record parsati come lista di dizionari al flusso successivo.

Architettura a livelli:
Livello 1 — Costruttori URL : Prepara gli indirizzi web esatti per interrogare i database partendo dalla ricerca dell'utente.
Livello 2 — Fetcher HTTP : Si collega a internet per scaricare i dati. Include un sistema di sicurezza: se la connessione fallisce o il server è bloccato, aspetta qualche secondo e riprova in automatico.
Livello 3 — Persistenza : Prende i file scaricati dalla rete e li salva fisicamente sul computer in formato JSON o XML per non perderli.
Livello 4 — Orchestratori : È il "cervello" del programma. Fa lavorare insieme i livelli precedenti, gestisce il cambio pagina (paginazione) e decide quando fermare l'estrazione dei dati..

Output su disco:
Risultati su disco:
- OpenAlex → <output_dir>/openalex_<timestamp>.json
- PubMed → <output_dir>/pubmed_<timestamp>.xml
"""
Expand Down Expand Up @@ -42,7 +42,7 @@
# direttamente nel sorgente (principio del "no hardcoded credentials").
MAILTO: str = os.environ.get("BIBLIOMETRIX_EMAIL", "aniellomarcorobe@progetto.it")

DEFAULT_PER_PAGE: int = 25 # Numero di record per richiesta (batch size)
DEFAULT_PER_PAGE: int = 25 # Numero di record per richiesta
DEFAULT_MAX_RETRIES: int = 3 # Numero massimo di tentativi per richiesta fallita

# Pausa minima tra richieste consecutive per rispettare il limite di
Expand Down Expand Up @@ -70,13 +70,13 @@ def build_openalex_url(
indica l'inizio della sequenza, mentre i valori successivi sono opachi e
restituiti dal campo meta.next_cursor di ogni risposta.

Args:
query: Stringa di ricerca bibliografica (verrà URL-encoded).
Parametri:
query: Stringa di ricerca bibliografica (verrà codificata per URL).
cursor: Token di paginazione corrente (default "*" = prima pagina).
per_page: Numero di record per pagina (max 200 per OpenAlex).
only_with_abstract: Se True, esclude i record privi di abstract.

Returns:
Restituisce:
URL completo e pronto per la richiesta GET.
"""
encoded_query = quote_plus(query)
Expand All @@ -87,7 +87,7 @@ def build_openalex_url(
f"&per-page={per_page}"
f"&mailto={MAILTO}"
)
# Il filtro has_abstract viene applicato opzionalmente: nelle pipeline
# Il filtro has_abstract viene applicato opzionalmente: nei flussi
# bibliometriche l'abstract è spesso prerequisito per l'analisi testuale.
if only_with_abstract:
url += "&filter=has_abstract:true"
Expand All @@ -105,12 +105,12 @@ def build_pubmed_search_url(
a efetch (vedi build_pubmed_fetch_url). Il formato JSON è preferito a XML
per semplicità di parsing lato client.

Args:
query: Stringa di ricerca in sintassi PubMed (verrà URL-encoded).
Parametri:
query: Stringa di ricerca in sintassi PubMed (verrà codificata per URL).
retstart: Offset di paginazione (0 = prima pagina).
retmax: Numero massimo di PMID da restituire per questa richiesta.

Returns:
Restituisce:
URL completo per esearch in formato JSON.
"""
encoded_query = quote_plus(query)
Expand All @@ -129,10 +129,10 @@ def build_pubmed_fetch_url(id_list: List[str]) -> str:
richieste batch. Il formato XML è obbligatorio per ottenere lo schema
Medline completo con tutti i campi bibliografici disponibili.

Args:
Parametri:
id_list: Lista di PMID (stringa) da recuperare.

Returns:
Restituisce:
URL completo per efetch in formato XML.
"""
ids = ",".join(id_list)
Expand All @@ -143,7 +143,7 @@ def build_pubmed_fetch_url(id_list: List[str]) -> str:


# ---------------------------------------------------------------------------
# LIVELLO 2 — Fetcher con retry e backoff esponenziale
# LIVELLO 2 — Recupero HTTP con nuovi tentativi e backoff esponenziale
# ---------------------------------------------------------------------------

def fetch_data_with_retries(
Expand All @@ -152,21 +152,22 @@ def fetch_data_with_retries(
response_format: str = "json",
) -> Optional[Union[Dict, str]]:
"""
Esegue una richiesta GET con gestione robusta degli errori e retry automatico.
Esegue una richiesta GET con gestione robusta degli errori e nuovi tentativi automatici.

Strategia di retry differenziata per tipo di errore:
- 429 (Rate Limit) : rispetta l'header Retry-After restituito dal server.
Strategia di nuovi tentativi differenziata per tipo di errore:
- 429 (limite di frequenza): rispetta l'header Retry-After restituito dal server.
- 5xx (Errore server): applica backoff esponenziale (2^attempt secondi).
- Errori di rete : applica lo stesso backoff esponenziale dei 5xx.
- 4xx non-429 : errore permanente, nessun retry (return None immediato).
- 4xx non-429 : errore permanente, nessun nuovo tentativo
(restituisce subito None).

Args:
Parametri:
url: URL da richiedere.
max_retries: Numero massimo di tentativi prima di arrendersi.
response_format: "json" per deserializzare la risposta, qualsiasi altro
valore per restituire il testo grezzo (es. XML).

Returns:
Restituisce:
dict se response_format == "json", str altrimenti.
Restituisce None se tutti i tentativi falliscono.
"""
Expand Down Expand Up @@ -203,7 +204,7 @@ def fetch_data_with_retries(

else:
# Errori 4xx (escluso 429): il problema è nella richiesta stessa,
# non nel server, quindi un retry non porterebbe a risultati diversi.
# non nel server, quindi un nuovo tentativo non porterebbe a risultati diversi.
print(f" -> Errore {response.status_code}: {response.text[:120]}")
return None

Expand All @@ -226,25 +227,35 @@ def _ensure_output_dir(output_dir: Path) -> None:
L'uso di parents=True ed exist_ok=True rende la funzione sicura da
chiamare ripetutamente: non solleva eccezioni se la directory (o parte
del suo percorso) esiste già.

Parametri:
output_dir: Percorso della directory da creare se assente.

Restituisce:
None.

Solleva:
OSError: Propagato dal filesystem se la directory non può essere
creata, ad esempio per permessi insufficienti o percorso non valido.
"""
output_dir.mkdir(parents=True, exist_ok=True)


def save_openalex_json(results: List[Dict], output_dir: Path) -> Path:
"""
Serializza la lista di works OpenAlex in un file JSON con timestamp univoco.
Serializza la lista di record Works OpenAlex in un file JSON con timestamp univoco.

Il timestamp nel nome del file previene sovrascritture tra esecuzioni
successive e consente la tracciabilità temporale dei dataset estratti.
ensure_ascii=False preserva i caratteri Unicode tipici dei metadati
accademici internazionali (es. caratteri accentati, CJK, greco).

Args:
Parametri:
results: Lista di dizionari da serializzare.
output_dir: Directory di destinazione.

Returns:
Path del file JSON creato.
Restituisce:
Percorso del file JSON creato.
"""
_ensure_output_dir(output_dir)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
Expand All @@ -260,16 +271,16 @@ def save_pubmed_xml(articles_xml: List[str], output_dir: Path) -> Path:
Avvolge i record PubMed in un documento XML canonico e lo salva su disco.

L'elemento radice <PubmedArticleSet> è conforme alla DTD ufficiale PubMed,
rendendo il file compatibile con tool di analisi bibliometrica standard.
rendendo il file compatibile con strumenti di analisi bibliometrica standard.
Il parsing individuale per articolo isola eventuali record malformati:
un singolo errore XML non pregiudica il salvataggio dell'intero batch.

Args:
Parametri:
articles_xml: Lista di stringhe XML grezze (una per articolo).
output_dir: Directory di destinazione.

Returns:
Path del file XML creato.
Restituisce:
Percorso del file XML creato.
"""
_ensure_output_dir(output_dir)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
Expand All @@ -280,8 +291,8 @@ def save_pubmed_xml(articles_xml: List[str], output_dir: Path) -> Path:
try:
root.append(ET.fromstring(raw))
except ET.ParseError as exc:
# Un record malformato viene saltato con un warning: la robustezza
# del batch è prioritaria rispetto alla completezza del singolo record.
# Un record malformato viene saltato con un avviso: la robustezza
# del lotto e' prioritaria rispetto alla completezza del singolo record.
print(f" -> Articolo ignorato per errore XML: {exc}")

tree = ET.ElementTree(root)
Expand All @@ -302,7 +313,7 @@ def extract_openalex_data(
max_results: int = 100,
) -> List[Dict]:
"""
Estrae i works da OpenAlex con paginazione cursor-based.
Estrae i record Works da OpenAlex con paginazione basata su cursore.

La paginazione cursor-based è il metodo raccomandato da OpenAlex per
iterare su dataset di grandi dimensioni: garantisce coerenza anche se
Expand All @@ -313,15 +324,15 @@ def extract_openalex_data(
2. Il server non restituisce un next_cursor (fine del dataset).
3. Il numero di record accumulati raggiunge max_results.

Args:
Parametri:
query: Stringa di ricerca bibliografica.
output_dir: Directory di output per il file JSON.
only_with_abstract: Se True, filtra i record senza abstract.
max_results: Limite massimo di record da estrarre.

Returns:
Lista di dizionari con i metadati dei works estratti.
Il file JSON viene salvato su disco come side-effect.
Restituisce:
Lista di dizionari con i metadati dei record Works estratti.
Il file JSON viene salvato su disco come effetto collaterale.
"""
sep = "=" * 70
print(f"\n{sep}\nESTRAZIONE OPENALEX — query: '{query}'\n{sep}")
Expand Down Expand Up @@ -371,15 +382,15 @@ def extract_pubmed_data(
max_results: int = 100,
) -> List[Dict]:
"""
Estrae i record da PubMed con paginazione offset-based.
Estrae i record da PubMed con paginazione basata su offset.

Il protocollo E-Utilities richiede due chiamate HTTP distinte per ogni pagina:
1. esearch → restituisce i PMID corrispondenti alla query (risposta JSON).
2. efetch → recupera i record completi dato l'elenco di PMID (risposta XML).
La pausa tra le due chiamate rispetta il limite di 3 req/s imposto da NCBI.

Due strutture dati parallele vengono mantenute in memoria:
- all_results : dizionari Medline parsati, pronti per la pipeline downstream.
- all_results : dizionari Medline parsati, pronti per il flusso successivo.
- raw_xml_list : stringhe XML grezze, destinate al file di output su disco.
La separazione consente di ottimizzare indipendentemente le due trasformazioni.

Expand All @@ -388,15 +399,15 @@ def extract_pubmed_data(
2. Il server restituisce una pagina parziale (meno record del previsto),
segnale che il dataset è stato completamente esaurito.

Args:
Parametri:
query: Stringa di ricerca in sintassi PubMed.
output_dir: Directory di output per il file XML.
only_with_abstract: Mantenuto per coerenza di firma (non utilizzato da PubMed).
max_results: Limite massimo di record da estrarre.

Returns:
Restituisce:
Lista di dizionari in formato Medline (al più max_results elementi).
Il file XML viene salvato su disco come side-effect.
Il file XML viene salvato su disco come effetto collaterale.
"""
sep = "=" * 70
print(f"\n{sep}\nESTRAZIONE PUBMED — query: '{query}'\n{sep}")
Expand Down Expand Up @@ -429,7 +440,7 @@ def extract_pubmed_data(
# il limite di 3 req/s di NCBI senza necessità di API key.
time.sleep(_PAGE_SLEEP)

# Fase 2: download dei record completi tramite efetch (risposta XML).
# Fase 2: scaricamento dei record completi tramite efetch (risposta XML).
fetch_url = build_pubmed_fetch_url(id_list)
xml_data: Optional[str] = fetch_data_with_retries(fetch_url, response_format="xml")

Expand All @@ -441,7 +452,7 @@ def extract_pubmed_data(
root = ET.fromstring(xml_data)
articles = root.findall(".//PubmedArticle")

# Separazione tra rappresentazione disco (XML raw) e
# Separazione tra rappresentazione disco (XML grezzo) e
# rappresentazione memoria (dizionari Medline parsati).
batch_raw_xml = [ET.tostring(art, encoding="unicode") for art in articles]
raw_xml_list.extend(batch_raw_xml)
Expand Down Expand Up @@ -503,20 +514,17 @@ def extract_data(
La normalizzazione del parametro source con .lower().strip() tolera variazioni
di capitalizzazione senza richiedere validazione esplicita da parte del chiamante.

Parametri
----------
query : Stringa di ricerca bibliografica.
source : Sorgente dati: "openalex" oppure "pubmed" (case-insensitive).
output_dir : Directory di output (accetta str o Path per flessibilità).
Parametri:
query: Stringa di ricerca bibliografica.
source: Sorgente dati: "openalex" oppure "pubmed" (senza distinzione tra maiuscole e minuscole).
output_dir: Directory di output, accettata come stringa o Path.

Restituisce
-----------
Lista di dict con i record estratti dalla sorgente indicata.
Salva anche i file grezzi su disco nella output_dir come side-effect.
Restituisce:
Lista di dizionari con i record estratti dalla sorgente indicata.
Salva anche i file grezzi su disco nella output_dir come effetto collaterale.

Raises
------
ValueError : Se source non è "openalex" né "pubmed".
Solleva:
ValueError: Se source non è "openalex" né "pubmed".
"""
output_path = Path(output_dir)
normalized = source.lower().strip()
Expand All @@ -542,10 +550,10 @@ def extract_data(
import sys

# Gli argomenti CLI vengono uniti per supportare query multi-parola
# senza richiedere virgolette (es: python extract.py machine learning).
# senza richiedere virgolette (es: python extract.py apprendimento automatico).
query_test = " ".join(sys.argv[1:]) if len(sys.argv) > 1 else "machine learning bibliometrics"
print(f"Query di test: '{query_test}'")

for src in ("openalex", "pubmed"):
records = extract_data(query_test, source=src)
print(f"\n→ {src}: {len(records)} record estratti.\n")
print(f"\n→ {src}: {len(records)} record estratti.\n")
Loading