In [None]:
from dotenv import load_dotenv
# Actually load the .env file into os.environ. The bare name `load_dotenv` (no call) was a
# no-op, so every os.getenv(...) below silently fell back to its placeholder default.
load_dotenv()
import os
import re
import io
import sys
import json
import time
import logging
import traceback  # [QA CHANGE] For logging full tracebacks
import requests
import pandas as pd
import pyzotero #(has a Squiggly orange line under it)
# BioPython for PubMed
from Bio import Entrez

# PyAlex for OpenAlex
import pyalex #(has a Squiggly orange line under it)
from pyalex import Works #(has a Squiggly white line under it)

# Azure
from azure.core.credentials import AzureKeyCredential
from azure.ai.documentintelligence import DocumentIntelligenceClient
from azure.ai.documentintelligence.models import AnalyzeResult
from azure.core.exceptions import HttpResponseError

# PyZotero for Zotero API access
from pyzotero import zotero #(has a Squiggly white line under "pyzotero", and "zotero" is white text)

# citeproc-py for citation formatting
from citeproc import CitationStylesStyle, CitationStylesBibliography, formatter #(has a Squiggly white line under "citeproc" and is white text)
from citeproc import Citation, CitationItem #(has a Squiggly white line under "citeproc" and is white text )
from citeproc.source.json import CiteProcJSON #(has a Squiggly white line under "citeproc" and is white text)

import openai


In [None]:
# For approximate token counting (optional).
# cl100k_base is the tiktoken encoding for OpenAI's text-embedding-3-* models (see
# MODEL_NAME); the previously used "gpt2" encoding produces different counts.
TOKENIZER_ENCODING = "cl100k_base"

try:
    import tiktoken
    tokenizer = tiktoken.get_encoding(TOKENIZER_ENCODING)
except Exception:
    # ImportError when tiktoken is absent, or a network/IO error when the encoding file
    # cannot be downloaded (e.g. offline). Token counting is optional, so degrade to 0.
    tokenizer = None


def count_tokens(text: str) -> int:
    """
    Return the number of tokens in `text` under TOKENIZER_ENCODING.

    Returns 0 when `text` is not a str (e.g. NaN from a DataFrame) or when no tokenizer
    is available. Special-token strings such as "<|endoftext|>" inside the text are
    counted as ordinary text instead of raising.
    """
    if tokenizer is None or not isinstance(text, str):
        return 0
    return len(tokenizer.encode(text, disallowed_special=()))

In [None]:
# ============== CONFIG ==============
# Every path and credential below can be overridden via environment variables / the .env file.

# [QA CHANGE] LOG_DIR must be a *directory*. It previously pointed at the log file itself,
# so makedirs created a folder named "pipeline_test.log" and the log was written inside it.
LOG_DIR = os.getenv("LOG_DIR", "G:/ETL_LOGS")
os.makedirs(LOG_DIR, exist_ok=True)  # [QA CHANGE] Ensure directory exists

LOG_FILE = os.path.join(LOG_DIR, "pipeline_test.log")
# force=True (Python 3.8+) removes handlers already attached to the root logger, whether by
# the Jupyter kernel or by an earlier run of this cell. That way basicConfig always takes
# effect, and re-running the cell never duplicates log lines.
logging.basicConfig(
    filename=LOG_FILE,
    filemode='w',
    level=logging.DEBUG,
    format='%(asctime)s - %(levelname)s - %(message)s',
    force=True,
)

# [QA CHANGE] Also echo ERROR-and-above records to the notebook output
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(logging.ERROR)
formatter_console = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
console_handler.setFormatter(formatter_console)
logging.getLogger().addHandler(console_handler)


def _is_placeholder(value):
    """Return True if a config value is empty or still a template placeholder ("your ...", "YOUR_...")."""
    if not value or not value.strip():
        return True
    lowered = value.strip().lower()
    return lowered.startswith("your") or "your_" in lowered


#--------- configure API's -------------
Entrez.email = os.getenv("ENTREZ_EMAIL", "default_email@example.com")
UNPAYWALL_EMAIL = os.getenv("UNPAYWALL_EMAIL", "default_email@example.com")
pyalex.config.email = os.getenv("PYALEX_EMAIL", "default_email@example.com")

AZURE_ENDPOINT = os.getenv("AZURE_ENDPOINT", "https://your-azure-endpoint.cognitiveservices.azure.com/")
AZURE_KEY = os.getenv("AZURE_KEY", "your azure key")

# [QA CHANGE] The old check looked for "YOUR_KEY" and never matched the lowercase default.
if _is_placeholder(AZURE_KEY):
    logging.error("AZURE_KEY is missing or placeholder. Exiting.")
    sys.exit(1)

#-----------Directories----------------
OUTPUT_CSV = os.getenv("OUTPUT_CSV", "G:/ETL_OUT/ETL_output.csv")
OUTPUT_FEATHER = os.getenv("OUTPUT_FEATHER", "G:/ETL_OUT/ETL_output2.feather")
ENRICHED_FEATHER = os.getenv("ENRICHED_FEATHER", "G:/ETL_OUT/enriched_output.feather")
PDF_SAVE_FOLDER = os.getenv("PDF_SAVE_FOLDER", "G:/ETL_PDFS")

# [QA CHANGE] Ensure every output directory exists, not only the PDF folder
for _output_path in (OUTPUT_CSV, OUTPUT_FEATHER, ENRICHED_FEATHER):
    _output_dir = os.path.dirname(_output_path)
    if _output_dir:
        os.makedirs(_output_dir, exist_ok=True)
os.makedirs(PDF_SAVE_FOLDER, exist_ok=True)

# ---------- Zotero Configuration ----------
ZOTERO_LIBRARY_ID = os.getenv("ZOTERO_LIBRARY_ID", "your zotero library ID")
ZOTERO_LIBRARY_TYPE = os.getenv("ZOTERO_LIBRARY_TYPE", "user")
ZOTERO_API_KEY = os.getenv("ZOTERO_API_KEY", "your zotero API key")

# Default collection key for added PDFs
DEFAULT_ZOTERO_COLLECTION = os.getenv("DEFAULT_ZOTERO_COLLECTION", "your zotero collection")

# [QA CHANGE] Validate the Zotero credentials *before* building the client.
# The old checks never matched the lowercase placeholder defaults.
if _is_placeholder(ZOTERO_LIBRARY_ID):
    logging.error("ZOTERO_LIBRARY_ID is missing or placeholder. Exiting.")
    sys.exit(1)
if _is_placeholder(ZOTERO_API_KEY):
    logging.error("ZOTERO_API_KEY is missing or placeholder. Exiting.")
    sys.exit(1)
if _is_placeholder(DEFAULT_ZOTERO_COLLECTION):
    logging.warning("DEFAULT_ZOTERO_COLLECTION looks like a placeholder; set it to a real collection key.")

zot = zotero.Zotero(ZOTERO_LIBRARY_ID, ZOTERO_LIBRARY_TYPE, ZOTERO_API_KEY)

In [None]:
# Verify Zotero connectivity using the `zot` client built in the CONFIG cell.
# [QA CHANGE] ZOTERO_API_KEY is no longer re-assigned here. A hardcoded placeholder
# used to clobber the key loaded from the environment.
try:
    key_info = zot.key_info()
    # The key-info response can include the API key itself. Redact it so the secret
    # doesn't end up in the notebook's saved output.
    if isinstance(key_info, dict) and "key" in key_info:
        key_info = {**key_info, "key": "***redacted***"}
    print("Zotero API connection successful. Key info:")
    print(key_info)
except Exception as e:
    logging.error(f"Error connecting to Zotero: {e}", exc_info=True)
    print("Error connecting to Zotero:", e)

In [None]:
# 2. ChatGPT Setup
# -----------------------------
# [QA CHANGE] Read the key from the environment / .env instead of hardcoding it in the
# notebook, where it would be committed and shared along with the file.
OPEN_API_KEY = os.getenv("OPENAI_API_KEY", "")
if OPEN_API_KEY:
    openai.api_key = OPEN_API_KEY
    logging.debug("OpenAI API key set.")
else:
    logging.warning("OPENAI_API_KEY is not set; OpenAI calls are expected to fail until it is provided.")
MODEL_NAME = "text-embedding-3-large"

In [None]:
# A dictionary of user-friendly style names => .csl file paths
# NOTE: download the .csl files from https://www.zotero.org/styles into CSL_DIR
# (override the folder with the CSL_DIR environment variable).
# Not every style has been tested; style "2" is known to work.
# ------------------------------------------------------------------------------
CSL_DIR = os.getenv("CSL_DIR", r"G:\ETL_CSL")

# (menu key, display name, .csl file name inside CSL_DIR)
_CSL_STYLE_FILES = [
    ("1", "American Medical Association 11th edition", "american-medical-association.csl"),
    ("2", "American Psychological Association (APA) 6th Edition", "apa-6th-edition.csl"),
    ("3", "American Psychological Association (APA) 7th Edition", "apa.csl"),
    ("4", "American Psychological Association (APA) 7th Edition (annotated bibliography)", "apa-annotated-bibliography_7th.csl"),
    ("5", "American Political Science Association", "APSA.csl"),
    ("6", "Chicago Manual of Style 17th Edition (author-date)", "CHICAGO_AUTHOR_DATE.csl"),
    ("7", "Chicago Manual of Style 17th Edition (full note)", "chicago-fullnote-bibliography.csl"),
    ("8", "Chicago Manual of Style 17th edition (note)", "chicago-note-bibliography.csl"),
    ("9", "Cite Them Right 12th edition - Harvard", "harvard-cite-them-right.csl"),
    ("10", "Elsevier - Harvard (with titles)", "elsevier-harvard.csl"),
    ("11", "IEEE", "ieee.csl"),
    ("12", "Modern Humanities Research Association 4th edition (notes with bibliography)", "modern-humanities-research-association.csl"),
    ("13", "Modern Language Association 9th edition", "modern-language-association.csl"),
    ("14", "Nature", "nature.csl"),
    ("15", "Taylor & Francis - Chicago Manual of Style (author-date)", "taylor-and-francis-chicago-author-date.csl"),
    ("16", "Vancouver", "vancouver.csl"),
]

CITATION_STYLES = {
    key: {"name": name, "path": os.path.join(CSL_DIR, filename)}
    for key, name, filename in _CSL_STYLE_FILES
}


In [None]:
# 1) PubMed - chunked search
# ------------------------------------------------------------------------------
def _pubmed_esearch(**params):
    """Run one Entrez ESearch call against PubMed and return the parsed record, always closing the handle."""
    handle = Entrez.esearch(db="pubmed", **params)
    try:
        return Entrez.read(handle)
    finally:
        handle.close()


def pubmed_search_chunked(query, desired_count=1000, chunk_size=200):
    """
    Search PubMed for `query` and collect up to `desired_count` PMIDs in pages of `chunk_size`.

    If fewer articles exist, every available PMID is retrieved. Errors are logged and
    printed rather than raised:
      - a failure on the initial count query returns [];
      - a failure on a later page stops paging and returns the PMIDs gathered so far.

    Parameters:
        query (str): PubMed search term.
        desired_count (int): Maximum number of PMIDs to retrieve.
        chunk_size (int): PMIDs requested per ESearch call; must be positive.

    Returns:
        list[str]: PMIDs in the order PubMed returned them.

    Raises:
        ValueError: if chunk_size is not positive (the paging loop could never advance).
    """
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be positive, got {chunk_size!r}")

    logging.info(f"Searching PubMed for: {query}")
    print(f"[*] Searching PubMed for query: '{query}' ...")

    # 1) First get the total count with a cheap retmax=0 search
    try:
        record = _pubmed_esearch(term=query, retmax=0)
        total_found = int(record["Count"])
    except Exception as e:
        logging.error(f"PubMed query error for '{query}': {e}", exc_info=True)
        print(f"[!] Error contacting PubMed: {e}")
        return []

    logging.info(f"Total articles found for '{query}': {total_found}")
    print(f"[*] PubMed reports {total_found} total articles for this query.")

    # 2) Determine how many we actually want to retrieve (<= 0 also covers a negative desired_count)
    retrieve_count = min(desired_count, total_found)
    if retrieve_count <= 0:
        print(f"[!] No articles to retrieve (retrieve_count={retrieve_count}).")
        return []

    pmid_list = []
    for start in range(0, retrieve_count, chunk_size):
        batch = min(chunk_size, retrieve_count - start)
        logging.info(f"Retrieving records {start+1} to {start+batch}...")
        print(f"    -> Retrieving records {start+1} to {start+batch} ...")

        try:
            record = _pubmed_esearch(term=query, retstart=start, retmax=batch)
        except Exception as e:
            logging.error(f"Error retrieving chunk {start+1}-{start+batch} from PubMed: {e}", exc_info=True)
            print(f"[!] Error retrieving chunk {start+1}-{start+batch} from PubMed: {e}")
            break

        pmid_list.extend(str(pmid) for pmid in record.get("IdList", []))
        # Be polite to NCBI servers
        time.sleep(0.2)

    logging.info(f"Retrieved {len(pmid_list)} PMIDs (requested up to {retrieve_count}).")
    print(f"[*] Finished retrieving PMIDs. Got {len(pmid_list)} total.")
    return pmid_list

In [None]:
# 2) PubMed - fetch metadata
# ------------------------------------------------------------------------------
def _pubmed_text(value):
    """Return `value` as a plain str ('' for None) so result rows hold ordinary strings."""
    return "" if value is None else str(value)


def _pubmed_doi(pubmed_article, article):
    """Return the DOI from Article/ELocationID, falling back to PubmedData/ArticleIdList; '' if absent."""
    for loc in article.get('ELocationID', []):
        if loc.attributes.get('EIdType') == 'doi':
            return str(loc)
    for aid in pubmed_article.get('PubmedData', {}).get('ArticleIdList', []):
        if aid.attributes.get('IdType') == 'doi':
            return str(aid)
    return ""


def _pubmed_authors(article):
    """
    Return [{"firstName": ..., "lastName": ...}, ...] for the article's AuthorList.

    Personal authors need a LastName. ForeName is used for firstName, with Initials as a
    fallback. Group authors (CollectiveName) are kept, with the group name as lastName.
    """
    authors = []
    for author in article.get('AuthorList', []):
        last = _pubmed_text(author.get('LastName'))
        if last:
            first = _pubmed_text(author.get('ForeName') or author.get('Initials'))
            authors.append({"firstName": first, "lastName": last})
        elif author.get('CollectiveName'):
            authors.append({"firstName": "", "lastName": _pubmed_text(author['CollectiveName'])})
    return authors


def _pubmed_date(article):
    """
    Return the best available publication date string.

    Starts from Journal/JournalIssue/PubDate: its Year, else MedlineDate (e.g. '2020 Jan-Feb').
    An ArticleDate with a Year overrides that as 'YYYY', 'YYYY-MM' or 'YYYY-MM-DD'.
    """
    date_str = ""
    pub_date = article.get('Journal', {}).get('JournalIssue', {}).get('PubDate', {})
    if 'Year' in pub_date:
        date_str = _pubmed_text(pub_date['Year'])
    elif 'MedlineDate' in pub_date:
        date_str = _pubmed_text(pub_date['MedlineDate'])

    article_dates = article.get('ArticleDate') or []
    if article_dates:
        art_date = article_dates[0]
        year = art_date.get('Year', '')
        month = art_date.get('Month', '')
        day = art_date.get('Day', '')
        if year:
            date_str = str(year)
            if month:
                date_str += f"-{month}"
                if day:
                    date_str += f"-{day}"
    return date_str


def _parse_pubmed_article(pmid, pubmed_article):
    """Build one result row (the schema documented in fetch_pubmed_metadata) from a parsed PubmedArticle."""
    article = pubmed_article['MedlineCitation']['Article']
    journal_info = article.get('Journal', {})
    journal_issue = journal_info.get('JournalIssue', {})
    abstract_parts = article.get('Abstract', {}).get('AbstractText', [])

    return {
        "PMID": pmid,
        "Title": _pubmed_text(article.get('ArticleTitle', '')),
        "DOI": _pubmed_doi(pubmed_article, article),
        "Abstract": " ".join(str(part) for part in abstract_parts),
        "Authors": _pubmed_authors(article),
        "Date": _pubmed_date(article),
        "Journal": _pubmed_text(journal_info.get('Title', '')),
        "Volume": _pubmed_text(journal_issue.get('Volume', '')),
        "Issue": _pubmed_text(journal_issue.get('Issue', '')),
        # Pagination hangs off the Article, not the Journal, so it is read unconditionally.
        "Pages": _pubmed_text(article.get('Pagination', {}).get('MedlinePgn', '')),
    }


def fetch_pubmed_metadata(pmid_list):
    """
    Fetch PubMed metadata for each PMID, one EFetch call per PMID.

    Each returned dict has: PMID, Title, DOI, Abstract, Authors (list of
    {"firstName", "lastName"}), Date, Journal, Volume, Issue, Pages.
    A PMID whose fetch or parse fails still produces a row. That row has empty fields
    plus an "Error" key holding the exception text, so one bad record never aborts the batch.

    Parameters:
        pmid_list (list): PMIDs as returned by pubmed_search_chunked.

    Returns:
        list[dict]: One row per input PMID, in input order, suitable for pd.DataFrame(...).
    """
    print("[*] Fetching metadata for each PMID...")
    logging.info(f"Starting metadata fetch for {len(pmid_list)} PMIDs.")

    results = []
    for i, pmid in enumerate(pmid_list):
        logging.debug(f"Fetching metadata for PMID {pmid} (index={i}).")
        print(f"    -> PMID {pmid} ({i+1}/{len(pmid_list)}) ...")

        handle = None
        try:
            handle = Entrez.efetch(db='pubmed', id=str(pmid), retmode='xml')
            record = Entrez.read(handle)
            results.append(_parse_pubmed_article(pmid, record['PubmedArticle'][0]))
        except Exception as e:
            logging.error(f"Error fetching PMID {pmid}: {e}", exc_info=True)
            results.append({
                "PMID": pmid,
                "Title": "",
                "DOI": "",
                "Abstract": "",
                "Authors": [],
                "Date": "",
                "Journal": "",
                "Volume": "",
                "Issue": "",
                "Pages": "",
                "Error": str(e)
            })
        finally:
            # Release the HTTP connection whether or not parsing succeeded.
            if handle is not None:
                handle.close()

    logging.info(f"Metadata fetch complete. {len(results)} records processed.")
    print("[*] Finished fetching metadata.")
    return results

In [None]:
# 3) PyAlex - OpenAlex search
# ------------------------------------------------------------------------------
def _openalex_abstract(work):
    """
    Return the work's abstract as plain text ('' if none).

    OpenAlex delivers abstracts as `abstract_inverted_index` ({word: [positions]}).
    pyalex rebuilds that text only through `work["abstract"]`, so a dict-level
    `work.get("abstract")` yields None. The text is therefore rebuilt here directly.
    """
    direct = work.get("abstract")
    if isinstance(direct, str) and direct:
        return direct
    inverted = work.get("abstract_inverted_index")
    if not isinstance(inverted, dict):
        return ""
    positioned = [(pos, word) for word, positions in inverted.items() for pos in (positions or [])]
    return " ".join(word for _, word in sorted(positioned))


def _openalex_authors(work):
    """
    Split each authorship's display_name into {"firstName", "lastName"}.

    The rules are:
      - "Last, First" splits on the first comma;
      - "First Middle Last" uses the first token as firstName and the rest as lastName;
      - a single-token name goes into lastName, which citation styles need;
      - empty names are skipped.
    """
    authors = []
    for authorship in work.get("authorships") or []:
        author_obj = (authorship or {}).get("author") or {}
        display_name = (author_obj.get("display_name") or "").strip()
        if not display_name:
            continue
        if "," in display_name:
            last, _, first = display_name.partition(",")
            last, first = last.strip(), first.strip()
        else:
            tokens = display_name.split()
            if len(tokens) > 1:
                first, last = tokens[0], " ".join(tokens[1:])
            else:
                first, last = "", display_name
        authors.append({"firstName": first, "lastName": last})
    return authors


def _openalex_record(work):
    """Map one OpenAlex work to the pipeline's row schema (same keys as the PubMed rows plus OpenAlexID/Publisher)."""
    # DOI: OpenAlex returns "https://doi.org/<doi>"; store the bare, lower-cased DOI.
    doi_full = work.get("doi")
    doi = doi_full.lower() if isinstance(doi_full, str) else ""
    if doi.startswith("https://doi.org/"):
        doi = doi[len("https://doi.org/"):]

    # Date: full publication_date when available, otherwise just the year.
    date_str = ""
    if work.get("publication_year"):
        date_str = work.get("publication_date") or str(work["publication_year"])

    # Journal/publisher from primary_location -> source (either level may be null).
    primary_loc = work.get("primary_location")
    primary_loc = primary_loc if isinstance(primary_loc, dict) else {}
    primary_src = primary_loc.get("source")
    primary_src = primary_src if isinstance(primary_src, dict) else {}

    ids = work.get("ids") or {}
    pmid_url = ids.get("pmid") or ""
    pmid_str = pmid_url.split("/")[-1] if pmid_url else ""

    biblio = work.get("biblio") or {}
    first_p = biblio.get("first_page") or ""
    last_p = biblio.get("last_page") or ""
    if first_p and last_p:
        pages = f"{first_p}-{last_p}"
    else:
        pages = first_p

    return {
        "PMID": pmid_str,
        "Title": work.get("display_name") or "",
        "DOI": doi,
        "Abstract": _openalex_abstract(work),
        "OpenAlexID": work.get("id") or "",
        "Authors": _openalex_authors(work),
        "Date": date_str,
        "Journal": primary_src.get("display_name") or "",
        "Publisher": primary_src.get("host_organization_name") or "",
        "Volume": biblio.get("volume") or "",
        "Issue": biblio.get("issue") or "",
        "Pages": pages,
    }


def pyalex_search_works(query, desired_count=50):
    """
    Use PyAlex to search OpenAlex works matching `query` and return up to `desired_count` of them.

    A malformed individual record is logged and skipped. A failure of the search itself is
    logged and printed, and whatever was collected before the failure is returned.

    Returns:
        pd.DataFrame: One row per work. Columns are PMID, Title, DOI, Abstract, OpenAlexID,
        Authors, Date, Journal, Publisher, Volume, Issue and Pages. The frame is empty when
        nothing was found or desired_count <= 0.
    """
    logging.info(f"PyAlex: Searching OpenAlex for '{query}' (max={desired_count})")
    print(f"[*] Searching OpenAlex (PyAlex) for '{query}'...")

    results = []
    if desired_count > 0:
        # OpenAlex pages hold at most 200 results; don't over-fetch for small requests.
        per_page = min(200, desired_count)
        try:
            for page in Works().search(query).paginate(per_page=per_page):
                for work in page:
                    try:
                        results.append(_openalex_record(work))
                    except Exception as e:
                        logging.warning(f"PyAlex: Skipping malformed OpenAlex record: {e}", exc_info=True)
                        continue
                    if len(results) >= desired_count:
                        break
                if len(results) >= desired_count:
                    break
        except Exception as e:
            logging.error(f"PyAlex: Error searching OpenAlex for '{query}': {e}", exc_info=True)
            print(f"[!] Error searching OpenAlex with PyAlex: {e}")

    logging.info(f"PyAlex: Retrieved {len(results)} results for '{query}'")
    print(f"[*] PyAlex found {len(results)} works for '{query}'")
    return pd.DataFrame(results)

In [None]:
# 4) PDF retrieval from Unpaywall
# ------------------------------------------------------------------------------
def _download_unpaywall_pdf(pdf_url, doi, timeout=20):
    """
    Download one candidate PDF URL found via Unpaywall.

    Returns (content_bytes, content_type) when the response is HTTP 200 and is a PDF.
    A response counts as a PDF if its Content-Type mentions "pdf", or, for servers that
    send a generic type, if the body starts with the "%PDF-" magic bytes.
    Otherwise returns (None, None). Never raises.
    """
    try:
        pdf_resp = requests.get(pdf_url, timeout=timeout)
    except Exception as e:
        logging.warning(f"Failed to fetch PDF {pdf_url} for {doi}: {e}")
        return None, None
    if pdf_resp.status_code != 200:
        logging.debug(f"PDF URL {pdf_url} returned status={pdf_resp.status_code} for {doi}")
        return None, None
    content_type = pdf_resp.headers.get("Content-Type", "")
    if "pdf" in content_type.lower():
        return pdf_resp.content, content_type
    if pdf_resp.content[:5] == b"%PDF-":
        return pdf_resp.content, "application/pdf"
    return None, None


def get_pdf_from_unpaywall(doi, email=UNPAYWALL_EMAIL, timeout=20):
    """
    Query Unpaywall for an open-access PDF corresponding to the given DOI.

    Candidate URLs are tried in order: best_oa_location first, then each of
    oa_locations. Each distinct URL is tried only once.

    Parameters:
        doi (str): Bare DOI or a https://doi.org/ URL. None, empty or NaN means "no DOI".
        email (str): Contact email that Unpaywall requires on every request.
        timeout (float): Per-request timeout in seconds, for both the API and the PDF downloads.

    Returns:
        (pdf_content_bytes, content_type) if a PDF was found, else (None, None).
        Errors are logged, not raised.
    """
    from urllib.parse import quote

    logging.debug(f"Attempting Unpaywall PDF retrieval for DOI={doi}")
    doi_str = "" if doi is None else str(doi).strip()
    # DataFrame gaps arrive as NaN, which str() renders as "nan"; treat that as missing.
    if not doi_str or doi_str.lower() == "nan":
        logging.warning("No DOI provided. Skipping PDF retrieval.")
        return None, None
    # Unpaywall expects the bare DOI, so strip a resolver prefix if present.
    doi_str = re.sub(r"^https?://(dx\.)?doi\.org/", "", doi_str, flags=re.IGNORECASE)

    # Percent-encode the DOI (keeping '/') so characters such as '#', '?' or spaces cannot
    # truncate or corrupt the request path. requests encodes the email query parameter.
    api_url = f"https://api.unpaywall.org/v2/{quote(doi_str, safe='/')}"
    try:
        resp = requests.get(api_url, params={"email": email}, timeout=timeout)
        if resp.status_code != 200:
            logging.error(f"Unpaywall error. status={resp.status_code} for DOI={doi}")
            return None, None

        try:
            result = resp.json()
        except Exception as e:
            logging.error(f"JSON decode failed from Unpaywall for {doi}: {e}")
            return None, None

        if not isinstance(result, dict):
            logging.error(f"Unpaywall returned a non-dict result for {doi}. Skipping.")
            return None, None

        candidate_urls = []
        best = result.get("best_oa_location")
        if isinstance(best, dict) and best.get("url_for_pdf"):
            candidate_urls.append(best["url_for_pdf"])
        oa_locs = result.get("oa_locations")
        if isinstance(oa_locs, list):
            for loc in oa_locs:
                if isinstance(loc, dict):
                    url = loc.get("url_for_pdf")
                    if url and url not in candidate_urls:
                        candidate_urls.append(url)

        for rank, pdf_url in enumerate(candidate_urls):
            if rank == 0 and isinstance(best, dict) and best.get("url_for_pdf") == pdf_url:
                logging.debug(f"Found best_oa_location PDF: {pdf_url}")
            else:
                logging.debug(f"[Unpaywall fallback] Trying {pdf_url}")
            content, content_type = _download_unpaywall_pdf(pdf_url, doi, timeout)
            if content is not None:
                return content, content_type

        logging.info(f"No valid PDF link found for DOI={doi}.")
    except Exception as e:
        logging.error(f"Exception retrieving PDF from Unpaywall for {doi}: {e}", exc_info=True)

    return None, None


In [None]:
# 5) Azure PDF Extraction
# ------------------------------------------------------------------------------
class AzurePDFExtractor:
    """
    Thin wrapper around Azure's DocumentIntelligenceClient.

    Builds one authenticated client at construction time. It exposes a single method that
    runs the 'prebuilt-layout' model on a local PDF and returns the text and the tables.
    """

    def __init__(self, endpoint, key):
        credential = AzureKeyCredential(key)
        self.client = DocumentIntelligenceClient(endpoint=endpoint, credential=credential)

    def extract_text_and_tables(self, pdf_path: str):
        """
        Analyze the PDF at `pdf_path` with the 'prebuilt-layout' model.

        Returns:
            (full_text, tables):
              - full_text joins each page's lines with newlines and separates pages with a
                blank line; a page without lines contributes an empty string.
              - tables is a list of pandas DataFrames, one per detected table. Each cell's
                text sits at its (row_index, column_index) position and "" fills all
                other positions.
        """
        logging.debug(f"Starting Azure extraction for: {pdf_path}")
        with open(pdf_path, "rb") as pdf_file:
            poller = self.client.begin_analyze_document("prebuilt-layout", body=pdf_file)
        analysis = poller.result()

        page_texts = [
            "\n".join(line.content for line in (page.lines or []))
            for page in analysis.pages
        ]
        full_text = "\n\n".join(page_texts)

        tables = []
        for table in analysis.tables or []:
            grid = [[""] * table.column_count for _ in range(table.row_count)]
            for cell in table.cells:
                grid[cell.row_index][cell.column_index] = cell.content or ""
            tables.append(pd.DataFrame(grid))

        logging.debug(f"Azure extraction complete for: {pdf_path}")
        return full_text, tables
# Why is AzurePDFExtractor declared as a class?
# Short answer: object-oriented organization. It is a small "wrapper class" that holds:
## 1) A client (DocumentIntelligenceClient) as a member (created in __init__), and
## 2) A method (extract_text_and_tables) that uses that client.
# So whenever you instantiate an AzurePDFExtractor, you automatically get:
## 1) The Azure Document Intelligence client, already set up with endpoint and key.
## 2) A reusable method (.extract_text_and_tables(...)) to parse PDFs.
# In functional terms, it is just a convenient way to bundle (1) the Azure credentials and
# (2) the parsing logic. Without it, you would pass the endpoint/key to a set of separate functions.
# A practical bonus: one instance's client can be reused for every PDF you extract.

In [None]:
# 6) Deduplicate & Merge
# ------------------------------------------------------------------------------
def deduplicate_by_doi_title(df):
    """
    Return a de-duplicated copy of `df`; the caller's frame is not modified.

    Matching rules:
      - Rows with a DOI are duplicates when their DOIs match (case-insensitive, trimmed).
      - Rows without a DOI fall back to Title (case-insensitive, trimmed). Such a row is
        dropped when its title matches any row ranked before it, including rows that do
        have a DOI.
      - Rows with neither DOI nor Title are always kept, since there is nothing to match on.

    Rows with a DOI are ranked first. So when the same paper appears both with and without
    a DOI, the DOI-bearing row is kept. The output is ordered by (has DOI, DOI, Title) and
    has a fresh 0..n-1 index.
    """
    # Work on a positional copy so duplicate index labels (e.g. after pd.concat) can't misalign.
    work = df.reset_index(drop=True)

    # Normalised match keys. fillna first so None/NaN don't become the strings "None"/"nan",
    # which would make every DOI-less row look like a duplicate of the others.
    doi_key = work["DOI"].fillna("").astype(str).str.strip().str.lower()
    title_key = work["Title"].fillna("").astype(str).str.strip().str.lower()

    order = (
        pd.DataFrame({"no_doi": doi_key.eq(""), "doi": doi_key, "title": title_key})
        .sort_values(["no_doi", "doi", "title"])
        .index
    )
    work = work.loc[order]
    doi_key = doi_key.loc[order]
    title_key = title_key.loc[order]

    has_doi = doi_key.ne("")
    is_duplicate = (has_doi & doi_key.duplicated()) | (
        ~has_doi & title_key.ne("") & title_key.duplicated()
    )
    return work.loc[~is_duplicate].reset_index(drop=True)

In [None]:
# 7) Utility: Add a PDF to Zotero, generate item, attach PDF, retrieve metadata
# ------------------------------------------------------------------------------
def _zotero_text(value):
    """Return `value` as a stripped str, or '' for None / NaN / pandas NA (e.g. gaps after a DataFrame merge)."""
    if value is None:
        return ""
    try:
        if pd.isna(value):
            return ""
    except (TypeError, ValueError):
        # pd.isna on an array-like returns an array whose truth value is ambiguous; treat it as a value.
        pass
    return str(value).strip()


def _zotero_creators(authors):
    """
    Convert metadata["Authors"] into Zotero creator dicts.

    Accepts any non-string iterable of {"firstName", "lastName"} dicts. That includes a list,
    or presumably a numpy array after a DataFrame/Feather round-trip (verify against callers).
    Non-dict entries and entries with neither name are skipped.
    """
    if authors is None or isinstance(authors, (str, bytes, dict)):
        return []
    try:
        entries = list(authors)
    except TypeError:  # a scalar such as NaN
        return []
    creators = []
    for author in entries:
        if not isinstance(author, dict):
            continue
        first = _zotero_text(author.get("firstName"))
        last = _zotero_text(author.get("lastName"))
        if first or last:
            creators.append({"creatorType": "author", "firstName": first, "lastName": last})
    return creators


def _zotero_created_key(created_items):
    """Extract the new item key from a zot.create_items() response; None (logged) if creation failed."""
    if isinstance(created_items, dict):
        successful = created_items.get("successful") or {}
        if not successful:
            logging.error("Zotero create_items did not return any successful items.")
            if created_items.get("failed"):
                logging.error(f"Zotero reported failures: {created_items['failed']}")
            return None
        first_item = next(iter(successful.values()))
        if isinstance(first_item, dict):
            return first_item.get("key") or (first_item.get("data") or {}).get("key")
        return None
    if isinstance(created_items, list) and created_items and isinstance(created_items[0], dict):
        return (created_items[0].get("data") or {}).get("key")
    logging.error("Zotero create_items did not return a valid list.")
    return None


def add_pdf_to_zotero(pdf_path, metadata, collection_keys=None):
    """
    Add the PDF to Zotero with the provided metadata and return the new item key.

    This function:
      1) Creates a 'journalArticle' item from the metadata (title, DOI, authors, date, ...).
      2) Attaches the PDF to that item; an attachment failure is logged, but the key is still returned.
      3) Returns the Zotero item key if the item was created, else None.
    Errors are logged, not raised.

    Parameters:
        pdf_path (str): Path to the downloaded PDF.
        metadata (dict): Metadata for the item. None/NaN/empty values are ignored, so a
            DataFrame row's .to_dict() can be passed directly. Recognised keys:
              - "Title", "DOI" (str)
              - "Authors" (iterable of {"firstName": "...", "lastName": "..."})
              - "Date" (e.g. "2019" or "2019-05-10")
              - "Journal", "Volume", "Issue", "Pages", "Abstract" (str)
              - "Publisher", "PMID" (stored in Zotero's Extra field)
        collection_keys (list, optional): Zotero collection keys for the item.
            Defaults to [DEFAULT_ZOTERO_COLLECTION]. Empty keys are dropped.

    Returns:
        item_key (str): The Zotero item key if successful, else None.

    Example of metadata:
    {
      "Title": "A SWAT-based optimization tool ...",
      "DOI": "10.1016/j.scitotenv.2019.07.175",
      "Authors": [
        {"firstName": "Y.", "lastName": "Liu"},
        {"firstName": "T.", "lastName": "Guo"},
      ],
      "Date": "2019",
      "Journal": "Science of The Total Environment",
      "Volume": "691",
      "Pages": "685-696"
    }
    """
    # 1) Use the default collection key if none are provided; drop empty keys
    if collection_keys is None:
        collection_keys = [DEFAULT_ZOTERO_COLLECTION]
    collection_keys = [key for key in collection_keys if key]

    # 2) Minimal required fields
    item_data = {
        "itemType": "journalArticle",
        "title": _zotero_text(metadata.get("Title")),
        "DOI": _zotero_text(metadata.get("DOI")),
        "collections": collection_keys,
    }

    # 3) Creators (authors)
    creators = _zotero_creators(metadata.get("Authors"))
    if creators:
        item_data["creators"] = creators

    # 4) Optional citation fields, copied only when present and non-empty so NaN/None never reach the API
    optional_fields = {
        "Date": "date",
        "Journal": "publicationTitle",
        "Volume": "volume",
        "Issue": "issue",
        "Pages": "pages",
        "Abstract": "abstractNote",
    }
    for meta_key, zotero_field in optional_fields.items():
        value = _zotero_text(metadata.get(meta_key))
        if value:
            item_data[zotero_field] = value

    # Zotero's journalArticle type has no 'publisher' field, and the API rejects unknown
    # fields. So the publisher goes into Extra next to the PMID.
    extra_lines = []
    pmid = _zotero_text(metadata.get("PMID"))
    if pmid:
        extra_lines.append(f"PMID: {pmid}")
    publisher = _zotero_text(metadata.get("Publisher"))
    if publisher:
        extra_lines.append(f"Publisher: {publisher}")
    if extra_lines:
        item_data["extra"] = "\n".join(extra_lines)

    try:
        # 5) Create the item and extract its key
        created_items = zot.create_items([item_data])
        item_key = _zotero_created_key(created_items)
        if not item_key:
            logging.warning("No item key retrieved from Zotero creation.")
            return None

        # 6) Attach the PDF to the newly created item
        try:
            zot.attachment_simple(
                files=[pdf_path],
                parentid=item_key,
            )
            logging.info(f"PDF attached to Zotero item {item_key}")
        except Exception as e:
            logging.error(f"Error attaching PDF to Zotero item {item_key}: {e}", exc_info=True)
        return item_key
    except Exception as e:
        logging.error(f"Error adding PDF to Zotero: {e}", exc_info=True)
    return None


In [None]:
# 8) retrieve the metadata from Zotero via Pyzotero and then use a CSL (Citation Style Language) processor to format the citations in the desired style (e.g., APA)
# ------------------------------------------------------------------------------

# Zotero creator types that are cited; "editor" is emitted under its own CSL role.
_CSL_CITED_CREATOR_TYPES = ("author", "editor", "contributor")

# (Zotero data field, CSL JSON variable) pairs copied verbatim when non-empty.
_ZOTERO_TO_CSL_FIELDS = (
    ("publicationTitle", "container-title"),
    ("volume", "volume"),
    ("issue", "issue"),
    ("pages", "page"),
    ("abstractNote", "abstract"),
    ("publisher", "publisher"),
    ("url", "URL"),
)


def _zotero_date_to_csl(date_str, parsed_date=None):
    """Convert a Zotero date into a CSL ``issued`` value, or None when there is no date.

    Zotero's ``data["date"]`` is free text ("2023 May 10", "May 10, 2023", ...), so
    collecting every digit run can put a day or month into the year slot. Order:

    1. ``parsed_date`` (Zotero's ``meta["parsedDate"]``, e.g. "2023-05-10"), if usable;
    2. a leading "YYYY[-MM[-DD]]" (or "/"-separated) prefix of ``date_str``;
    3. any standalone 4-digit number in ``date_str``, taken as the year only;
    4. otherwise ``{"raw": date_str}``.
    """
    iso_prefix = re.compile(r"\s*(\d{4})(?:[-/](\d{1,2}))?(?:[-/](\d{1,2}))?")
    for candidate in (parsed_date, date_str):
        if not isinstance(candidate, str):
            continue
        match = iso_prefix.match(candidate)
        if match:
            parts = [int(group) for group in match.groups() if group is not None]
            # A 0 month/day means "unknown"; CSL date-parts simply omit it.
            while len(parts) > 1 and parts[-1] == 0:
                parts.pop()
            return {"date-parts": [parts]}
    if not isinstance(date_str, str) or not date_str.strip():
        return None
    year = re.search(r"\b(\d{4})\b", date_str)
    if year:
        return {"date-parts": [[int(year.group(1))]]}
    return {"raw": date_str}


def _zotero_creators_to_csl(creators):
    """Split Zotero creators into CSL ``(authors, editors)`` name lists.

    Two-field creators become {"family", "given"}; single-field creators (Zotero's
    ``name`` key, e.g. organisations) become {"family": name} instead of "Unknown".
    Creator types outside _CSL_CITED_CREATOR_TYPES are ignored.
    """
    authors, editors = [], []
    for creator in creators or []:
        if not isinstance(creator, dict):
            continue
        role = creator.get("creatorType")
        if role not in _CSL_CITED_CREATOR_TYPES:
            continue
        if creator.get("name"):
            entry = {"family": creator["name"]}
        else:
            entry = {
                "family": creator.get("lastName") or "Unknown",
                "given": creator.get("firstName", ""),
            }
        (editors if role == "editor" else authors).append(entry)
    return authors, editors


def _extract_pmid(extra):
    """Return the value of a "PMID: ..." line (any letter case) in Zotero's ``extra`` field, else None."""
    if not isinstance(extra, str):
        return None
    for line in extra.splitlines():
        stripped = line.strip()
        if stripped.upper().startswith("PMID:"):
            return stripped.split(":", 1)[1].strip() or None
    return None


def _build_csl_item(item_key, item):
    """Map a Zotero API item (dict with "data" and optionally "meta") to one CSL JSON record."""
    data = item.get("data") or {}
    meta = item.get("meta") or {}
    authors, editors = _zotero_creators_to_csl(data.get("creators", []))

    csl_item = {
        "id": item_key,
        "type": "article-journal",
        "title": data.get("title", ""),
        "DOI": data.get("DOI", ""),
        "author": authors,
    }
    if editors:
        csl_item["editor"] = editors

    issued = _zotero_date_to_csl(data.get("date", ""), meta.get("parsedDate"))
    if issued:
        csl_item["issued"] = issued

    for zotero_field, csl_field in _ZOTERO_TO_CSL_FIELDS:
        value = data.get(zotero_field)
        if value:
            csl_item[csl_field] = value

    pmid = _extract_pmid(data.get("extra", ""))
    if pmid:
        csl_item["PMID"] = pmid
    return csl_item


def get_citation_csl_json(item_key, style):
    """Fetch a Zotero item and format it with citeproc-py.

    Args:
        item_key: Zotero item key; also used as the CSL record id.
        style: path to a local ``.csl`` file. Names such as "apa" are not resolved;
            anything that is not an existing file is rejected.

    Returns:
        ``(in_text_citation, full_citation)``. ``(None, None)`` if the item cannot be
        fetched/converted, the style cannot be loaded, or citeproc-py fails while
        rendering. ``full_citation`` is "" when the bibliography is empty.
    """
    # 1) Fetch the item from Zotero and map it to CSL JSON.
    try:
        item = zot.item(item_key)
        csl_item = _build_csl_item(item_key, item)
    except Exception as e:
        logging.error(f"Error retrieving Zotero item {item_key}: {e}")
        return (None, None)

    # 2) Load the citation style from a local .csl file.
    try:
        if not os.path.isfile(style):
            raise ValueError(f"Style file {style} not found.")
        csl_style = CitationStylesStyle(style, locale="en-US", validate=False)
    except Exception as e:
        logging.error(f"Error loading CSL style from {style}: {e}")
        return (None, None)

    def warn(citation_item):
        logging.warning(f"Reference with key '{citation_item.key}' not found.")

    # 3) Render. Errors are contained here so one bad record cannot abort the caller's loop.
    try:
        csl_source = CiteProcJSON([csl_item])
        bibliography = CitationStylesBibliography(csl_style, csl_source, formatter.plain)
        citation = Citation([CitationItem(item_key)])
        # Register the citation before rendering it (same order as the original flow).
        bibliography.register(citation)
        in_text_citation = bibliography.cite(citation, warn)
        bib_entries = bibliography.bibliography()
    except Exception as e:
        logging.error(f"Error formatting citation for Zotero item {item_key}: {e}", exc_info=True)
        return (None, None)

    full_citation = str(bib_entries[0]) if bib_entries else ""
    return (in_text_citation, full_citation)

In [None]:
# Output fields the LLM fills in per article ("Title" is sent as context only).
_EXTRACTION_FIELDS = [
    "StudyType", "Scenario", "Agronomic or Management Scenarios",
    "Management Practices and Operational Details", "Economic and Agronomic Benefits",
    "Regional and Climatic Variability", "Tillage Practices",
    "Soil Type/Characteristics", "Cover Crop Type",
    "Metrics", "QA_Note",
]

# Keys of every "Metrics" entry, as required by the prompt below.
_METRIC_KEYS = ("Metric", "Value", "Unit")

# Rough characters-per-token ratio used to size chunks when no tokenizer is available.
_CHARS_PER_TOKEN = 4

# Prompt sent once per chunk. It is filled with str.format(), hence the doubled braces
# in the example; values substituted into it are NOT re-parsed, so they need no escaping.
_EXTRACTION_PROMPT_TEMPLATE = """
You are an expert data-extraction agent focusing on additional metadata for cover crops and greenhouse gas data.

Below is the article's known metadata (some fields may be empty):
{metadata_json}

Below is a chunk of text from the article's FullText + Tables:
\"\"\"{chunk_text}\"\"\"

Your primary goal: **extract** or **update** the following fields in a JSON object:

1) "Title"
2) "StudyType"
3) "Scenario"
4) "Agronomic or Management Scenarios"
5) "Management Practices and Operational Details"
6) "Economic and Agronomic Benefits"
7) "Regional and Climatic Variability"
8) "Tillage Practices"
9) "Soil Type/Characteristics"
10) "Cover Crop Type"
11) "Metrics" (array of objects) — numeric data for GWP, SOC, yield, N₂O, etc. 
    - Each object must have exactly: {{ "Metric": "...", "Value": "...", "Unit": "..." }}
12) "QA_Note" — if no numeric data are found, set this to "No numeric data found."

**No other keys** should appear in your output JSON.  
If the chunk does not contain new info, just **return the same JSON** object unchanged.

## Detailed Instructions
1. Read the existing JSON object (some fields may already be populated).
2. Read the text chunk above. If new data is found that updates or refines these fields, overwrite or fill them in.
3. If numeric data is found (e.g., N2O = 1.2 kg/ha), place it under "Metrics" as an object with "Metric", "Value", and "Unit".
4. If **no** numeric data is found, set `"Metrics": []` and `"QA_Note": "No numeric data found."`
5. Return a **single JSON object** with exactly these 12 fields (do not add or rename any fields).
6. If you see references to machine learning or special conditions (like “random forest”), mention them briefly in "QA_Note".

## Example Output

json
{{
  "Title": "Example Cover Crop Study",
  "StudyType": "On-farm trial",
  "Scenario": "Reduced tillage with barley",
  "Agronomic or Management Scenarios": "Early planting vs late planting",
  "Management Practices and Operational Details": "Seeds planted Oct 1, terminated May 15, 100 kg N fertilizer",
  "Economic and Agronomic Benefits": "Yield increased by 5%, moderate cost savings",
  "Regional and Climatic Variability": "Mid-Atlantic region, 1200 mm rainfall",
  "Tillage Practices": "No-till",
  "Soil Type/Characteristics": "Loam, pH 6.2",
  "Cover Crop Type": "Barley",
  "Metrics": [
    {{
      "Metric": "N2O",
      "Value": "3.1",
      "Unit": "kg N/ha"
    }}
  ],
  "QA_Note": "Mentioned random-forest approach for yield prediction."
}}
Return only the final updated JSON (no extra commentary).
 
"""


def _is_missing(value):
    """True for None and missing scalars (NaN / pd.NA); containers are never 'missing'."""
    return value is None or (pd.api.types.is_scalar(value) and pd.isna(value))


def _as_text(value):
    """Render a field value as one string: missing -> "", lists joined by "; ", dicts as JSON."""
    if _is_missing(value):
        return ""
    if isinstance(value, str):
        return value
    if isinstance(value, dict):
        return json.dumps(value, ensure_ascii=False, default=str)
    if isinstance(value, (list, tuple)):
        return "; ".join(_as_text(v) for v in value if not _is_missing(v))
    if hasattr(value, "tolist"):  # numpy arrays / numpy scalars
        return _as_text(value.tolist())
    return str(value)


def _as_metric_list(value):
    """Normalise a Metrics value to a list of {"Metric", "Value", "Unit"} dicts of strings.

    Accepts lists, tuples, numpy arrays (e.g. after a Feather round-trip), a single
    dict, or a JSON string. Bare string entries become the "Metric" with empty
    Value/Unit. Anything else yields []. Uniform string values keep the column
    Feather-serialisable even when the model mixes numbers and strings.
    """
    if isinstance(value, str):
        try:
            value = json.loads(value) if value.strip() else []
        except ValueError:
            return []
    elif hasattr(value, "tolist") and not isinstance(value, (list, tuple)):
        value = value.tolist()
    if isinstance(value, dict):
        value = [value]
    if not isinstance(value, (list, tuple)):
        return []
    metrics = []
    for entry in value:
        if isinstance(entry, dict):
            metrics.append({key: _as_text(entry.get(key)) for key in _METRIC_KEYS})
        elif isinstance(entry, str) and entry.strip():
            metrics.append({"Metric": entry.strip(), "Value": "", "Unit": ""})
    return metrics


def _render_tables_json(tables_json_str):
    """Flatten TablesJson (JSON list of tables, each a list of row dicts) into plain text.

    Each table becomes a "Table #i:" header followed by one comma-joined line per row.
    Returns "" for empty input, "ANALYSIS_ERROR" or unparsable JSON.
    """
    if not isinstance(tables_json_str, str):
        return ""
    raw = tables_json_str.strip()
    if not raw or raw == "ANALYSIS_ERROR":
        return ""
    try:
        tables = json.loads(raw)
    except ValueError as e:
        logging.error(f"Error parsing TablesJson: {e}", exc_info=True)
        return ""
    if not isinstance(tables, list):
        return ""
    blocks = []
    for idx, rows in enumerate(tables):
        lines = [f"Table #{idx}:"]
        for row in rows if isinstance(rows, list) else []:
            if isinstance(row, dict):
                lines.append(", ".join(str(v) for v in row.values()))
            else:
                logging.error(f"Error processing row in table {idx}: unexpected row type {type(row).__name__}")
        blocks.append("\n".join(lines))
    return "\n\n".join(blocks)


def _combine_fulltext_and_tables(full_text, tables_json_str):
    """Return the article text followed by a "[TABLES]" section; "" if neither has content.

    "ANALYSIS_ERROR" in FullText counts as no content, so failed rows are skipped
    instead of being sent to the model.
    """
    text = full_text.strip() if isinstance(full_text, str) else ""
    if text == "ANALYSIS_ERROR":
        text = ""
    tables_text = _render_tables_json(tables_json_str)
    if not tables_text:
        return text
    return f"{text}\n\n[TABLES]\n{tables_text}".strip()


def _extraction_encoder(model_name):
    """Best-effort tiktoken encoder for `model_name` ("o200k_base" for unknown models); None if unavailable."""
    try:
        import tiktoken  # optional dependency
    except ImportError:
        return None
    try:
        return tiktoken.encoding_for_model(model_name)
    except KeyError:
        pass  # model not known to this tiktoken version
    except Exception as e:
        logging.error(f"Could not load tiktoken encoding for {model_name}: {e}", exc_info=True)
        return None
    try:
        return tiktoken.get_encoding("o200k_base")
    except Exception as e:
        logging.error(f"Could not load tiktoken encoding o200k_base: {e}", exc_info=True)
        return None


def _chunk_for_extraction(full_str, max_tokens=20000, overlap=1000, encoder=None):
    """Split text into windows of at most `max_tokens` tokens; neighbours share `overlap` tokens.

    With `encoder` None, one token is approximated as _CHARS_PER_TOKEN characters.
    The final window always reaches the end of the text, and no window is emitted
    that is contained in the previous one. An overlap >= max_tokens is treated as 0
    so the window always advances.
    """
    if max_tokens <= 0 or not full_str:
        return [full_str]
    if encoder is not None:
        units = encoder.encode(full_str, disallowed_special=())
        window, shared, to_text = max_tokens, overlap, encoder.decode
    else:
        # Character slices are already strings, so str() is an identity conversion.
        units = full_str
        window, shared, to_text = max_tokens * _CHARS_PER_TOKEN, overlap * _CHARS_PER_TOKEN, str
    if not 0 <= shared < window:
        shared = 0
    chunks = []
    start = 0
    while True:
        end = start + window
        chunks.append(to_text(units[start:end]))
        if end >= len(units):
            return chunks
        start = end - shared


def _build_extraction_prompt(record, chunk_text):
    """Fill the extraction prompt with the current record (as JSON) and one text chunk."""
    return _EXTRACTION_PROMPT_TEMPLATE.format(
        metadata_json=json.dumps(record, indent=2, ensure_ascii=False, default=str),
        chunk_text=chunk_text,
    )


def _request_extraction(prompt, model_name):
    """Send one prompt to the OpenAI chat API and return the stripped reply text.

    Uses the openai>=1.0 module-level client when ``openai.OpenAI`` exists; otherwise
    falls back to the legacy ``openai.ChatCompletion`` interface (removed in 1.0).
    """
    messages = [{"role": "user", "content": prompt}]
    if hasattr(openai, "OpenAI"):
        resp = openai.chat.completions.create(model=model_name, messages=messages)
    else:
        resp = openai.ChatCompletion.create(model=model_name, messages=messages)
    return (resp.choices[0].message.content or "").strip()


def _parse_llm_json(content):
    """Parse a model reply into a dict, tolerating Markdown code fences and text around the object.

    Raises ValueError (incl. json.JSONDecodeError) if no JSON object can be parsed.
    """
    text = (content or "").strip()
    fenced = re.match(r"^```[a-zA-Z]*\s*(.*?)\s*```$", text, re.DOTALL)
    if fenced:
        text = fenced.group(1)
    try:
        parsed = json.loads(text)
    except ValueError:
        first, last = text.find("{"), text.rfind("}")
        if first == -1 or last <= first:
            raise
        parsed = json.loads(text[first:last + 1])
    if not isinstance(parsed, dict):
        raise ValueError(f"Expected a JSON object, got {type(parsed).__name__}")
    return parsed


def _merge_extracted_fields(existing_data, new_data):
    """Merge one chunk's extraction into the running record (mutated and returned).

    Non-empty text fields overwrite earlier values (coerced to str). Metrics entries
    are normalised and appended unless an identical entry is already present.
    "Title" and unknown keys in `new_data` are ignored.
    """
    if not isinstance(new_data, dict):
        logging.warning(f"Ignoring non-object extraction result of type {type(new_data).__name__}")
        return existing_data
    for field in _EXTRACTION_FIELDS:
        value = new_data.get(field)
        if field == "Metrics":
            merged = _as_metric_list(existing_data.get("Metrics"))
            for metric in _as_metric_list(value):
                if metric not in merged:
                    merged.append(metric)
            existing_data["Metrics"] = merged
        else:
            text = _as_text(value)
            if text:
                existing_data[field] = text
    return existing_data


def _extraction_base_record(df, row):
    """Starting record for one row: "Title" plus every extraction field already stored in `df`."""
    def cell(column):
        return df.at[row, column] if column in df.columns else None

    record = {"Title": _as_text(cell("Title"))}
    for field in _EXTRACTION_FIELDS:
        record[field] = _as_metric_list(cell(field)) if field == "Metrics" else _as_text(cell(field))
    return record


def extract_additional_fields(feather_path, output_path=None, model_name="o3-mini"):
    """
    Loads the DataFrame from 'feather_path', then for each row combines FullText +
    TablesJson, chunks the result, and asks an OpenAI chat model to extract:
      StudyType, Scenario, Agronomic or Management Scenarios,
      Management Practices and Operational Details, Economic and Agronomic Benefits,
      Regional and Climatic Variability, Tillage Practices, Soil Type/Characteristics,
      Cover Crop Type, Metrics (list of {"Metric", "Value", "Unit"} string dicts), QA_Note

    Each chunk's answer is merged into a running per-row record. The final record is
    written to those columns and, as JSON, to the 'AllExtracted' column. Rows without
    usable text/tables (empty or "ANALYSIS_ERROR") are skipped and keep their values.

    Args:
        feather_path: Feather file expected to contain FullText/TablesJson columns
            (missing columns are treated as empty).
        output_path: if given, the enriched DataFrame is saved there as Feather;
            otherwise only the in-memory DataFrame is returned.
        model_name: chat model used for extraction; also selects the tiktoken encoding
            used to size chunks (20,000 tokens with 1,000 overlap).

    Returns:
        The enriched DataFrame, or None if 'feather_path' could not be read.
    """
    logging.info(f"Loading DataFrame from {feather_path} ...")
    print(f"Loading DataFrame from {feather_path} ...")
    try:
        df = pd.read_feather(feather_path)
        print("DataFrame loaded successfully.")
    except Exception as e:
        logging.error(f"Error reading feather {feather_path}: {e}", exc_info=True)
        print(f"Error reading feather {feather_path}: {e}")
        return None

    # Output columns start empty (object dtype) when absent.
    for column in _EXTRACTION_FIELDS + ["AllExtracted"]:
        if column not in df.columns:
            df[column] = None

    # Tokenizer lookup is loop-invariant; resolve it once.
    encoder = _extraction_encoder(model_name)

    for i in range(len(df)):
        print(f"Processing row {i} ...")
        try:
            base_record = _extraction_base_record(df, i)
        except Exception as e:
            logging.error(f"Error building base record for row {i}: {e}", exc_info=True)
            print(f"Skipping row {i} due to error building base record.")
            continue

        full_text = df.at[i, "FullText"] if "FullText" in df.columns else ""
        tables_json_str = df.at[i, "TablesJson"] if "TablesJson" in df.columns else ""
        combined_str = _combine_fulltext_and_tables(full_text, tables_json_str)
        if not combined_str:
            logging.debug(f"Row {i}: No meaningful text/tables to parse.")
            print(f"Row {i}: Skipping because no meaningful text/tables found.")
            continue

        try:
            chunks = _chunk_for_extraction(combined_str, max_tokens=20000, overlap=1000, encoder=encoder)
        except Exception as e:
            logging.error(f"Error chunking combined text for row {i}: {e}", exc_info=True)
            chunks = [combined_str]
        print(f"Row {i}: Text chunked into {len(chunks)} chunks.")

        for c_i, chunk_txt in enumerate(chunks):
            print(f"Row {i}, Chunk {c_i}: Sending prompt to OpenAI.")
            prompt_str = _build_extraction_prompt(base_record, chunk_txt)

            try:
                content = _request_extraction(prompt_str, model_name)
            except Exception as e:
                logging.error(f"Error calling OpenAI for row {i}, chunk {c_i}: {e}", exc_info=True)
                print(f"Row {i}, Chunk {c_i}: OpenAI API call failed: {e}")
                continue

            try:
                new_data = _parse_llm_json(content)
            except ValueError as e:
                logging.error(f"Could not parse JSON for row {i}, chunk {c_i}: {e}", exc_info=True)
                print(f"Row {i}, Chunk {c_i}: JSON parsing failed.")
                continue

            try:
                base_record = _merge_extracted_fields(base_record, new_data)
                print(f"Row {i}, Chunk {c_i}: Successfully merged new data.")
            except Exception as e:
                logging.error(f"Error merging new data for row {i}, chunk {c_i}: {e}", exc_info=True)

        # Write the final record for this row back into the DataFrame.
        for column in _EXTRACTION_FIELDS:
            try:
                df.at[i, column] = base_record.get(column, "")
            except Exception as e:
                logging.error(f"Error storing field {column} for row {i}: {e}", exc_info=True)

        try:
            df.at[i, "AllExtracted"] = json.dumps(base_record)
        except Exception as e:
            logging.error(f"Error storing AllExtracted for row {i}: {e}", exc_info=True)

        logging.debug(f"Row {i} extraction done: {base_record}")
        print(f"Row {i}: Extraction complete.")

    if output_path:
        try:
            df.to_feather(output_path)
            logging.info(f"Enriched DataFrame saved to {output_path}")
            print(f"Enriched DataFrame saved to {output_path}")
        except Exception as e:
            logging.error(f"Error saving enriched DataFrame: {e}", exc_info=True)
            print(f"Error saving enriched DataFrame: {e}")
    else:
        logging.info("Extraction complete. DataFrame not saved because output_path is None.")
        print("Extraction complete. DataFrame not saved.")

    return df


In [None]:
# 9) Main Pipeline
# ------------------------------------------------------------------------------
def _title_text(value):
    """Return an article title as a string; None/NaN (missing scalars) become ""."""
    if isinstance(value, str):
        return value
    if value is None or (pd.api.types.is_scalar(value) and pd.isna(value)):
        return ""
    return str(value)


def _pdf_filename(title, row_index, max_title_len=100):
    """Build "<sanitised title>_<row_index>.pdf" for a downloaded PDF.

    Removes characters Windows rejects in filenames, parentheses and ASCII control
    characters (e.g. newlines), then truncates the title to `max_title_len` chars.
    """
    safe_title = re.sub(r'[\\/*?:"<>|()\x00-\x1f]+', '', _title_text(title))
    return f"{safe_title[:max_title_len]}_{row_index}.pdf"


def pipeline():
    """Interactive ETL run: search -> dedupe -> PDFs -> Zotero/citations -> Azure text -> Feather.

    Prompts on stdin for queries, citation style, data source(s) and the per-query
    article limit. Returns the final DataFrame (also written to OUTPUT_FEATHER), or
    None when the run stops early (no queries, invalid source choice, no articles).
    """
    print("\n=== Starting ETL + RAG Pipeline ===")
    logging.info("Pipeline execution started.")

    # -----------------------
    # STEP 1: Collect queries
    # -----------------------
    print("[Step 1/11] Enter multiple queries, one per line. Press ENTER on a blank line to finish.")
    user_queries = []
    while True:
        line = input("Query (blank line to finish): ").strip()
        if not line:
            break
        user_queries.append(line)

    if not user_queries:
        print("[!] No queries entered. Exiting.")
        return

    # ------------------------------------------------
    # STEP 2: Choose Citation Style
    # ------------------------------------------------
    print("[Step 2/11] Choose your citation style:")
    for k, style_info in CITATION_STYLES.items():
        print(f"   {k}) {style_info['name']}", flush=True)
    style_choice = input("Enter style number(e.g., 1/2/3/...): ").strip()
    style_data = CITATION_STYLES.get(style_choice)
    if not style_data:
        print("[!] Invalid style choice. Defaulting to 'APA 6th ed.'")
        style_data = CITATION_STYLES["2"]  # e.g. APA 6th
    chosen_style_path = style_data["path"]
    chosen_style_name = style_data["name"]
    print(f"[*] You selected style: {chosen_style_name}")

    # ------------------------------------------------
    # STEP 3: Choose data sources
    # ------------------------------------------------
    print("[Step 3/11] Choose your data source(s):")
    print("   1) PubMed only")
    print("   2) OpenAlex (PyAlex) only")
    print("   3) Both PubMed + PyAlex")
    choice = input("Enter choice (1/2/3): ").strip()
    if choice not in ("1", "2", "3"):
        print("[!] Invalid choice. Exiting.")
        logging.warning("User chose invalid source selection.")
        return
    use_pubmed = choice in ("1", "3")
    use_openalex = choice in ("2", "3")

    # ------------------------------------------------
    # STEP 3.1: How many articles (positive integer, default 50)
    # ------------------------------------------------
    desired_str = input("How many articles per query (max)? [default=50] ").strip()
    try:
        desired_num = int(desired_str)
    except ValueError:
        desired_num = 0
    if desired_num > 0:
        print(f"[*] You asked for up to {desired_num} articles per source.")
    else:
        desired_num = 50
    print(f"[*] Will retrieve up to {desired_num} articles per query for each source.")

    # Partial results and per-query hit counts for each source.
    from_pubmed_all = []
    from_openalex_all = []
    pubmed_query_counts = {}
    openalex_query_counts = {}

    # ------------------------------------------------
    # STEP 4: Loop over user queries (each source is queried independently)
    # ------------------------------------------------
    for q in user_queries:
        print(f"[Step 4/11]\n=== PROCESSING QUERY: '{q}' ===")

        # ~~~~~~~~~~~~ PubMed retrieval ~~~~~~~~~~~~~~~
        if use_pubmed:
            try:
                pmid_list = pubmed_search_chunked(q, desired_count=desired_num, chunk_size=200)
            except Exception as e:
                logging.error(f"Error searching PubMed for '{q}': {e}", exc_info=True)
                print(f"[!] PubMed query failed for '{q}': {e}")
                pmid_list = []

            pubmed_query_counts[q] = 0
            if pmid_list:
                try:
                    pm_df = pd.DataFrame(fetch_pubmed_metadata(pmid_list))
                    from_pubmed_all.append(pm_df)
                    pubmed_query_counts[q] = len(pm_df)
                    print(f"     -> PubMed returned {len(pm_df)} articles for '{q}'.")
                except Exception as e:
                    logging.error(f"Error fetching PubMed metadata for '{q}': {e}", exc_info=True)
                    print(f"[!] PubMed metadata fetch failed for '{q}': {e}")

        # ~~~~~~~~~~~~ OpenAlex retrieval ~~~~~~~~~~~~~~~
        if use_openalex:
            try:
                oa_df = pyalex_search_works(q, desired_count=desired_num)
            except Exception as e:
                logging.error(f"Error searching OpenAlex for '{q}': {e}", exc_info=True)
                print(f"[!] OpenAlex query failed for '{q}': {e}")
                oa_df = None
            if oa_df is not None and not oa_df.empty:
                from_openalex_all.append(oa_df)
                openalex_query_counts[q] = len(oa_df)
                print(f"    -> OpenAlex returned {len(oa_df)} articles for '{q}'.")
            else:
                openalex_query_counts[q] = 0

    # Only give up once every query has been tried.
    if not any(len(frame) for frame in from_pubmed_all + from_openalex_all):
        print("[!] No articles found across all queries. Exiting.")
        return

    # --------------------------------------------
    # Print a quick summary of how many hits we got
    # --------------------------------------------
    print("\n=== Summary of Article Hits per Query ===")
    for q in user_queries:
        pm_count = pubmed_query_counts.get(q, 0)
        oa_count = openalex_query_counts.get(q, 0)
        print(f" - Query '{q}': PubMed={pm_count}, OpenAlex={oa_count}, combined={pm_count + oa_count}")

    # -------------------------------------------------
    # Step 5: Merge & deduplicate
    # -------------------------------------------------
    print("[Step 5/11]\nMerging all partial results and deduplicating...")
    combined_df = pd.concat(from_pubmed_all + from_openalex_all, ignore_index=True)
    combined_df = deduplicate_by_doi_title(combined_df)
    print(f"[*] After deduplication, total unique articles: {len(combined_df)}")

    # -------------------------------------------------
    # Step 7: PDF retrieval
    # -------------------------------------------------
    print("[Step 7/11] Attempting to download PDFs via Unpaywall...")
    combined_df["PDFPath"] = None
    combined_df["PDFStatus"] = None

    for i, row in combined_df.iterrows():
        doi = row.get("DOI", "")
        # NaN/None DOIs (e.g. when one source has no DOI column) count as missing.
        if not isinstance(doi, str) or not doi.strip():
            combined_df.at[i, "PDFStatus"] = "No DOI"
            continue

        title_val = _title_text(row.get("Title", ""))
        print(f" -> Downloading PDF for row {i}, Title='{title_val[:30]}' ...")
        try:
            pdf_content, _content_type = get_pdf_from_unpaywall(doi)
        except Exception as e:
            logging.error(f"Error downloading PDF for row {i}, DOI={doi}: {e}", exc_info=True)
            pdf_content = None

        if not pdf_content:
            combined_df.at[i, "PDFPath"] = None
            combined_df.at[i, "PDFStatus"] = "No PDF found"
            continue

        save_path = os.path.join(PDF_SAVE_FOLDER, _pdf_filename(title_val, i))
        try:
            with open(save_path, "wb") as f:
                f.write(pdf_content)
            combined_df.at[i, "PDFPath"] = save_path
            combined_df.at[i, "PDFStatus"] = "Saved"
            logging.info(f"PDF saved for row {i}, Title={title_val[:30]}, path={save_path}")
        except Exception as e:
            logging.error(f"Error saving PDF row={i}, Title={title_val[:30]}: {e}")
            combined_df.at[i, "PDFPath"] = None
            combined_df.at[i, "PDFStatus"] = f"Error saving PDF: {e}"

    print("[*] Dropping rows with no PDF found or no DOI...")
    pdf_saved_count = (combined_df["PDFStatus"] == "Saved").sum()
    print(f"[*] Successfully retrieved PDFs for {pdf_saved_count} articles via Unpaywall.")
    combined_df = combined_df[~combined_df["PDFStatus"].isin(["No PDF found", "No DOI"])].copy()
    combined_df.reset_index(drop=True, inplace=True)
    print(f"[*] After dropping no-PDF rows, we have {len(combined_df)} rows left.")

    # ------------------------------------------------
    # Step 8: Zotero Integration
    # ------------------------------------------------
    print("[Step 8/11] Adding PDFs to Zotero, generating citations...")
    combined_df["ZoteroKey"] = ""
    combined_df["InTextCitation"] = ""
    combined_df["FullCitation"] = ""
    for i, row in combined_df.iterrows():
        pdf_path = row.get("PDFPath", "")
        if not (pdf_path and os.path.exists(pdf_path)):
            combined_df.at[i, "ZoteroKey"] = "No PDF"
            continue

        metadata = {
            "DOI": row.get("DOI", ""),
            "Title": row.get("Title", ""),
            "Authors": row.get("Authors", []),
            "Date": row.get("Date", ""),
            "Journal": row.get("Journal", ""),
            "Volume": row.get("Volume", ""),
            "Issue": row.get("Issue", ""),
            "Pages": row.get("Pages", ""),
        }
        item_key = add_pdf_to_zotero(pdf_path, metadata)
        if not item_key:
            combined_df.at[i, "ZoteroKey"] = "Error in creation"
            continue

        combined_df.at[i, "ZoteroKey"] = item_key
        try:
            intext, fullcite = get_citation_csl_json(item_key, style=chosen_style_path)
        except Exception as e:
            # One failing citation must not abort the remaining rows.
            logging.error(f"Error generating citation for Zotero item {item_key}: {e}", exc_info=True)
            intext, fullcite = None, None
        if intext and fullcite:
            combined_df.at[i, "InTextCitation"] = intext
            combined_df.at[i, "FullCitation"] = fullcite
        else:
            combined_df.at[i, "InTextCitation"] = "N/A"
            combined_df.at[i, "FullCitation"] = "N/A"

    print("[*] Zotero processing complete.")

    # ------------------------------------------------
    # Step 9: Azure extraction
    # ------------------------------------------------
    print("[Step 9/11] Extracting text/tables via Azure Document Intelligence...")
    azure_extractor = AzurePDFExtractor(AZURE_ENDPOINT, AZURE_KEY)

    combined_df["FullText"] = ""
    combined_df["TablesJson"] = ""
    combined_df["TokenCount"] = 0

    for i, row in combined_df.iterrows():
        pdf_path = row["PDFPath"]
        if not pdf_path or not os.path.exists(pdf_path):
            continue

        try:
            print(f"   -> Analyzing PDF for row {i}, file: {os.path.basename(pdf_path)}")
            full_text, table_dfs = azure_extractor.extract_text_and_tables(pdf_path)
            combined_df.at[i, "FullText"] = full_text
            combined_df.at[i, "TokenCount"] = count_tokens(full_text)

            # Tables are stored as a JSON list of row-dict lists.
            table_list = [df_table.to_dict(orient="records") for df_table in table_dfs]
            combined_df.at[i, "TablesJson"] = json.dumps(table_list, ensure_ascii=False)
        except HttpResponseError as e:
            logging.error(f"Azure error row={i}, PDF={pdf_path}: {e}")
            combined_df.at[i, "FullText"] = "ANALYSIS_ERROR"
            combined_df.at[i, "TablesJson"] = "ANALYSIS_ERROR"
            print(f"       -> [!] Azure Document Intelligence error for {pdf_path}: {e}")
        except Exception as e:
            logging.error(f"Unexpected error row={i}, PDF={pdf_path}: {e}")
            combined_df.at[i, "FullText"] = "ANALYSIS_ERROR"
            combined_df.at[i, "TablesJson"] = "ANALYSIS_ERROR"
            print(f"       -> [!] Unexpected error analyzing {pdf_path}: {e}")

    # ------------------------------------------------
    # Step 10: Save final
    # ------------------------------------------------
    print("[Step 10/11] Saving final DataFrame to Feather...")
    try:
        # Citation columns may hold citeproc objects; Feather needs plain strings.
        combined_df["InTextCitation"] = combined_df["InTextCitation"].astype(str)
        combined_df["FullCitation"] = combined_df["FullCitation"].astype(str)
        combined_df.to_feather(OUTPUT_FEATHER)
        print(f"[*] Results also saved to {OUTPUT_FEATHER}")
    except Exception as e:
        logging.error(f"Error saving Feather: {e}")
        print(f"[!] Error saving Feather file: {e}")

    print("\n=== Pipeline complete. Check log for details. ===")
    logging.info("Pipeline execution finished.")
    return combined_df


if __name__ == "__main__":
    final_df = pipeline()

    # ------------------------------------------------
    # Step 11: field extraction
    # ------------------------------------------------
    # pipeline() returns None when it stops early; OUTPUT_FEATHER is then missing
    # or left over from an earlier run, so it must not be enriched.
    if final_df is None:
        print("[!] Pipeline produced no results; skipping field extraction.")
    else:
        # Parse text/tables, chunk, run LLM extraction, and store the new fields + "AllExtracted".
        print("[Step 11/11] Extracting additional fields from text/tables via the LLM...")
        enriched_df = extract_additional_fields(feather_path=OUTPUT_FEATHER, output_path=ENRICHED_FEATHER)
        # ENRICHED_FEATHER now includes columns like "StudyType", "Scenario", "Metrics", etc.,
        # plus an "AllExtracted" column with the entire JSON record per row.

    print("All done!")


=== Starting ETL + RAG Pipeline ===
[Setp 1/11] Enter multiple queries, one per line. Press ENTER on a blank line to finish.
[Step 2/10] Choose your citation style:
   1) American Medical Association 11th edition
   2) American Psychological Association (APA) 6th Edition 
   3) American Psychological Association (APA) 7th Edition 
   4) American Psychological Association (APA) 7th Edition (annotated bibliography)
   5) American Political Science Association
   6) Chicago Manual of Style 17th Edition (author-date)
   7) Chicago Manual of Style 17th Edition (full note)
   8) Chicago Manual of Style 17th edition (note)
   9) Cite Them Right 12th edition - Harvard
   10) Elsevier - Harvard (with titles)
   11) IEEE
   12) Modern Humanities Research Association 4th edition (notes with bibliography)
   13) Modern Language Association 9th edition
   14) Nature
   15) Taylor & Francis - Chicago Manual of Style (author-date)
   16) Vancouver
[*] You selected style: American Psychological Assoc