Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 84 additions & 0 deletions www/services/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
"""
main.py — Orchestratore della Pipeline ETL Bibliometrix (Livello Avanzato)

Questo script unisce la Fase 1 (Extract) e le Fasi 2-5 (Transform, Validation,
Calculated Fields, Export) in un unico flusso automatizzato, senza alcun
intervento manuale sui file, come richiesto dalla specifica Advanced.
"""

import pandas as pd
from api_retriever import extract_data
from standardizer import convert2df

def run_etl_pipeline(query: str, source: str = "openalex", output_csv: str = "bibliometrix_export.csv") -> pd.DataFrame | None:
"""
Esegue l'intera pipeline ETL:
1. Scarica i dati via API (Extract).
2. Pulisce, valida e calcola i campi derivati (Transform).
3. Salva il risultato su disco (Load).
"""
print("\n" + "="*70)
print("🚀 AVVIO PIPELINE ETL BIBLIOMETRIX (LIV. AVANZATO)")
print("="*70)

# ---------------------------------------------------------
# FASE 1: EXTRACT (Download via API)
# ---------------------------------------------------------
print(f"\n[1/3] FASE DI ESTRAZIONE ({source.upper()})...")

# Questa chiamata avvia il prompt interattivo del tuo collega!
raw_records = extract_data(query=query, source=source)

if not raw_records:
print("\n❌ Nessun dato estratto. Pipeline interrotta.")
return None

# ---------------------------------------------------------
# FASI 2, 4 e 5: TRANSFORM, CALCULATED FIELDS e VALIDATION
# ---------------------------------------------------------
print(f"\n[2/3] FASE DI TRASFORMAZIONE E VALIDAZIONE...")

try:
# Passiamo i record grezzi al tuo standardizer.
# NOTA BENE: for_csv_export=True è FONDAMENTALE per convertire
# le liste in stringhe con il delimitatore ';' come chiede l'esame!
df = convert2df(
raw_records=raw_records,
source=source,
validate=True,
for_csv_export=True
)
except Exception as e:
print(f"\n❌ Errore critico durante la trasformazione: {e}")
return None

# ---------------------------------------------------------
# FASE 3: LOAD (Salvataggio in CSV)
# ---------------------------------------------------------
print(f"\n[3/3] FASE DI CARICAMENTO (Salvataggio CSV)...")

# encoding="utf-8-sig" è cruciale per permettere a Excel/R di leggere
# correttamente i caratteri speciali degli autori internazionali.
df.to_csv(output_csv, index=False, encoding="utf-8-sig")

print("\n" + "="*70)
print(f"✅ PIPELINE COMPLETATA CON SUCCESSO!")
print(f"📊 Dimensioni DataFrame: {df.shape[0]} righe x {df.shape[1]} colonne")
print(f"💾 File salvato in: {output_csv}")
print("="*70 + "\n")

return df

# =========================================================
# ESECUZIONE DELLO SCRIPT
# =========================================================
if __name__ == "__main__":
# Test della pipeline con una query accademica reale
QUERY_ESAME = "machine learning AND bibliometrics"

# Esegue l'ETL e crea il file "risultati_finali.csv"
df_risultato = run_etl_pipeline(
query=QUERY_ESAME,
source="openalex",
output_csv="risultati_finali.csv"
)
47 changes: 46 additions & 1 deletion www/services/parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,53 @@ def parse_cochrane_data(datapath):

return data

#### FUNZIONI DEFINITE DA NOI

def parse_pubmed_medline_text(text: str) -> list[dict]:
"""
Legge un blocco di testo in formato MEDLINE (PubMed) e lo converte
in una lista di dizionari. Gestisce i campi ripetuti (es. multipli 'AU')
creando automaticamente delle liste.
"""
records = []
current_record = {}
current_key = None

for line in text.splitlines():
# Ignora righe vuote
if not line.strip():
continue

# Ogni nuovo record PubMed inizia col tag PMID
if line.startswith("PMID-"):
if current_record:
records.append(current_record)
current_record = {}

# Identifica un nuovo tag (es. "TI - ") - Il tag occupa i primi 4 caratteri
if len(line) > 6 and line[4:6] == "- ":
current_key = line[:4].strip()
value = line[6:].strip()

if current_key in current_record:
# Se la chiave esiste già (es. un secondo autore "AU"), trasformala in lista
if isinstance(current_record[current_key], list):
current_record[current_key].append(value)
else:
current_record[current_key] = [current_record[current_key], value]
else:
# Altrimenti salva il valore scalare
current_record[current_key] = value

# Se non c'è un tag, è la continuazione della riga precedente (es. un Abstract lungo)
elif current_key and line.startswith(" "):
if isinstance(current_record[current_key], list):
current_record[current_key][-1] += " " + line.strip()
else:
current_record[current_key] += " " + line.strip()

# Aggiungi l'ultimo record
if current_record:
records.append(current_record)

#parser
return records
Loading