## Final Project - NLP-Driven Ingredient Health and Dietary Restriction Analysis

*Name: Laura Obermaier*

*Stevens ID: 20027358*

### Ingredient List Processing

##### imports

In [57]:
import re
import csv
import requests
import spacy
import random
import pubchempy as pcp
from rapidfuzz import process, fuzz
from collections import defaultdict
import sys
import io
import pickle
import json
import time
from duckduckgo_search import DDGS
from bs4 import BeautifulSoup
from duckduckgo_search.exceptions import DuckDuckGoSearchException
from difflib import SequenceMatcher
from spacy.lang.en import English
from openai import OpenAI
import tiktoken
from textwrap import wrap

##### setup

In [58]:
# Shared OpenAI client used by classify_chunk and summarize_by_category below.
# NOTE(review): presumably picks up its API key from the environment — confirm before running.
client = OpenAI()

# Set the csv module's maximum field size to 2**20 characters
# (presumably for the long Open Food Facts ingredient fields — confirm).
csv.field_size_limit(2**20)
# Load the small English spaCy pipeline
nlp = spacy.load("en_core_web_sm")

# Global set of every alias discovered so far (update_alias_cache stores them
# lower-cased and stripped); fuzzy_match_alias searches this set.
global_alias_set = set()

# In-memory cache of web-search snippets, keyed by ingredient name
search_cache = {}

##### alias relevancy filter

In [59]:
def is_relevant_alias(alias):
    """Decide whether ``alias`` looks like a plain ingredient name worth keeping.

    An alias is rejected when it:
      * is not a string, or is empty after stripping,
      * has more than 4 words or more than 40 characters,
      * contains a run of 3+ digits, a percentage, or any character other than
        letters, digits, underscore, whitespace and hyphen (this already
        excludes commas and parentheses, so no separate check is needed),
      * is shaped like a CAS registry number (e.g. ``50-81-7``),
      * contains a lab/reagent keyword such as ``usp`` or ``grade``.

    Args:
        alias: Candidate alias (any case, surrounding whitespace allowed).

    Returns:
        bool: True when the alias passes every filter, otherwise False.
    """
    # Non-string synonyms (e.g. None) can never be a usable alias.
    if not isinstance(alias, str):
        return False

    alias_clean = alias.strip().lower()
    if not alias_clean:
        return False

    if len(alias_clean) > 40 or len(alias_clean.split()) > 4:
        return False

    # NOTE(review): the 3+ digit rule also rejects E-numbers such as "e300" —
    # confirm that this is intended for a food-ingredient pipeline.
    if re.search(r'\d{3,}|\d+%|[^\w\s\-]', alias_clean):
        return False

    # CAS-number shape that slips past the 3+ digit rule above.
    if re.search(r'^\d{2,5}-\d{2,5}-\d$', alias_clean):
        return False

    # Substring match on purpose: "powdered" is rejected just like "powder".
    blocked_keywords = ('acs', 'usp', 'grade', 'reference', 'powder', 'solution', 'mist')
    return not any(keyword in alias_clean for keyword in blocked_keywords)

##### PubChem(aliases) integration

In [60]:
def get_pubchem_aliases(ingredient_name):
    """Look up an ingredient on PubChem and return its relevant synonyms.

    Only the first compound returned for the name is used. Synonyms are
    lower-cased, filtered through ``is_relevant_alias`` and de-duplicated.

    Args:
        ingredient_name: Name to search for (PubChem 'name' namespace).

    Returns:
        list[str]: Alphabetically sorted unique aliases. Empty when nothing is
        found or when the lookup fails (the error is printed, not raised).
    """
    try:
        compounds = pcp.get_compounds(ingredient_name, 'name')
        if not compounds:
            return []
        # `synonyms` may come back empty/None for sparse records.
        synonyms = compounds[0].synonyms or []
        unique_aliases = {s.lower() for s in synonyms if is_relevant_alias(s)}
        # Sort so the result does not depend on set/hash ordering: callers rank
        # these with a stable sort, and ties would otherwise change from one
        # kernel restart to the next.
        return sorted(unique_aliases)
    except Exception as e:
        print(f"[PubChem error for '{ingredient_name}']: {e}")
        return []

def update_alias_cache(aliases):
    """Add aliases to the module-level ``global_alias_set``.

    Each alias is lower-cased and stripped before insertion. Non-string items
    and aliases that are empty after stripping are ignored, so the set never
    receives an empty string.

    Args:
        aliases: Iterable of alias strings (``None`` is treated as empty).
    """
    for alias in aliases or ():
        if not isinstance(alias, str):
            continue
        normalized = alias.lower().strip()
        # Check emptiness AFTER stripping: "  " must not add "" to the set.
        if normalized:
            global_alias_set.add(normalized)

##### fuzzy matching

In [61]:
def fuzzy_match_alias(name, threshold=90):
    """Find the known alias closest to ``name`` using token-sort similarity.

    Args:
        name: Ingredient name to match.
        threshold: Minimum similarity score for a match to be accepted.

    Returns:
        The best-matching alias from ``global_alias_set`` when its score is at
        least ``threshold``; otherwise None (also when the alias set is empty).
    """
    if not global_alias_set:
        print("[Warning] Alias set is empty — did you run seed_aliases_from_open_food_facts?")
        return None

    best = process.extractOne(name, global_alias_set, scorer=fuzz.token_sort_ratio)
    if best is not None:
        candidate, similarity, _ = best
        if similarity >= threshold:
            return candidate
    return None

##### seed aliases from Open Food Facts

In [62]:
def seed_aliases_from_open_food_facts(limit=10000, timeout=30):
    """Seed ``global_alias_set`` with ingredient names from Open Food Facts.

    Streams the tab-separated product export and, for up to ``limit`` rows,
    collects every comma-separated ingredient from ``ingredients_text``. When a
    row has at least one such ingredient, the ingredients from its
    ``ingredients_text_<lang>`` columns (fr, de, es, it) are collected too.
    Everything collected is passed to ``update_alias_cache``.

    Args:
        limit: Maximum number of rows to read from the export.
        timeout: Seconds to wait for the connection and between received bytes.

    Raises:
        requests.RequestException: If the download fails or returns an HTTP
        error status (previously an error page was parsed as if it were data).
    """
    url = "https://static.openfoodfacts.org/data/en.openfoodfacts.org.products.csv"
    langs = ['fr', 'de', 'es', 'it']
    discovered = set()

    def split_ingredients(raw_text):
        # Short rows yield None for missing columns, hence the `or ""`.
        pieces = (piece.strip().lower() for piece in (raw_text or "").split(','))
        return [piece for piece in pieces if piece]

    # The context manager closes the streamed connection even though we stop
    # reading long before the end of the (very large) file.
    with requests.get(url, stream=True, timeout=timeout) as response:
        response.raise_for_status()
        lines = (line.decode('utf-8', errors='replace') for line in response.iter_lines())
        reader = csv.DictReader(lines, delimiter='\t')

        for count, row in enumerate(reader):
            if count >= limit:
                break
            if count % 500 == 0:
                print(f"Processing row {count}...")

            primary = split_ingredients(row.get("ingredients_text"))
            if not primary:
                continue
            discovered.update(primary)

            # Split each translated column once per row. Only the flat set of
            # names reaches the alias cache, so no per-ingredient mapping is kept.
            for lang in langs:
                discovered.update(split_ingredients(row.get(f"ingredients_text_{lang}")))

    update_alias_cache(sorted(discovered))
    print(f"[✓] Seeded {len(global_alias_set)} unique aliases from Open Food Facts.")

##### preprocessing

In [63]:
def standardize_ingredient_name(name, max_aliases=5):
    """Map a raw ingredient string to a standard name plus a short alias list.

    PubChem aliases are tried first: they are added to the alias cache,
    filtered for relevance and ranked by token-sort similarity to the input.
    Without PubChem aliases the fuzzy alias match is used, and failing that the
    normalized input itself.

    Args:
        name: Raw ingredient name.
        max_aliases: Maximum number of ranked aliases to return.

    Returns:
        tuple[str, list[str]]: ``(standard_name, aliases)``.
    """
    name = name.lower().strip()
    pubchem_aliases = get_pubchem_aliases(name)

    if not pubchem_aliases:
        # No PubChem hit: fall back to the local alias set, then to the input.
        fuzzy = fuzzy_match_alias(name)
        if fuzzy:
            return fuzzy, [fuzzy]
        return name, [name]

    update_alias_cache(pubchem_aliases)

    # Drop irrelevant aliases, then put the most similar ones first.
    candidates = [alias for alias in pubchem_aliases if is_relevant_alias(alias)]
    candidates.sort(key=lambda alias: fuzz.token_sort_ratio(name, alias), reverse=True)
    best = candidates[:max_aliases]

    if not best:
        return name, [name]
    return best[0], best

##### Web-based Health info retrieval via trusted APIs

In [64]:
def query_pubmed(ingredient, max_results=5, timeout=15):
    """Search PubMed for ``ingredient`` and return brief article metadata.

    Args:
        ingredient: Search term sent to the E-utilities ``esearch`` endpoint.
        max_results: Maximum number of articles to return.
        timeout: Per-request timeout in seconds.

    Returns:
        list[dict]: One dict per article with ``title``, ``source`` and
        ``pubdate`` keys. Empty when nothing is found or a request fails (the
        error is printed, not raised).
    """
    eutils = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils"
    try:
        search = requests.get(
            f"{eutils}/esearch.fcgi",
            params={"db": "pubmed", "term": ingredient, "retmode": "json", "retmax": max_results},
            timeout=timeout,
        ).json()
        ids = search["esearchresult"].get("idlist", [])
        if not ids:
            return []

        # One batched esummary call (comma-separated ids) instead of one request
        # per article: fewer round trips and less exposure to rate limiting.
        records = requests.get(
            f"{eutils}/esummary.fcgi",
            params={"db": "pubmed", "id": ",".join(ids), "retmode": "json"},
            timeout=timeout,
        ).json()["result"]

        summaries = []
        for pmid in ids:
            record = records.get(pmid)
            if record:
                summaries.append({
                    "title": record.get("title"),
                    "source": record.get("source"),
                    "pubdate": record.get("pubdate"),
                })
        return summaries
    except Exception as e:
        # `Exception`, not a bare except, so Ctrl-C / kernel interrupt still works.
        print(f"[PubMed error for '{ingredient}']: {e}")
        return []

def query_openfda(ingredient, timeout=15):
    """Return recall reasons from openFDA food enforcement reports.

    Searches the ``product_description`` field for ``ingredient`` and reads up
    to 5 records.

    Args:
        ingredient: Term to search for.
        timeout: Request timeout in seconds.

    Returns:
        list[str]: ``reason_for_recall`` values of the matching records. Empty
        when there are no results or the request fails (the error is printed,
        not raised).
    """
    try:
        base_url = "https://api.fda.gov/food/enforcement.json"
        params = {"search": f"product_description:{ingredient}", "limit": 5}
        payload = requests.get(base_url, params=params, timeout=timeout).json()
        # Skip records lacking the field instead of discarding every result.
        return [
            record["reason_for_recall"]
            for record in payload.get("results", [])
            if record.get("reason_for_recall")
        ]
    except Exception as e:
        # `Exception`, not a bare except, so Ctrl-C / kernel interrupt still works.
        print(f"[OpenFDA error for '{ingredient}']: {e}")
        return []

def query_rxnorm(ingredient, timeout=15):
    """Look up the RxNorm concept ids (RxCUIs) registered for ``ingredient``.

    Args:
        ingredient: Name sent to the RxNav ``rxcui.json`` endpoint.
        timeout: Request timeout in seconds.

    Returns:
        list: The ``idGroup.rxnormId`` list from the response. Empty when there
        is no match or the request fails (the error is printed, not raised).
    """
    try:
        url = "https://rxnav.nlm.nih.gov/REST/rxcui.json"
        payload = requests.get(url, params={"name": ingredient}, timeout=timeout).json()
        return payload.get("idGroup", {}).get("rxnormId", [])
    except Exception as e:
        # `Exception`, not a bare except, so Ctrl-C / kernel interrupt still works.
        print(f"[RxNorm error for '{ingredient}']: {e}")
        return []

##### Query Academic Health Documents

In [65]:
def query_academic_health_docs(ingredient, max_results=5):
    """Collect review-style health articles about ``ingredient``.

    Queries PubMed Central (NCBI E-utilities) and Europe PMC with the same
    boolean query, tries to fetch each article's full text, keeps only the text
    that follows recognised section headings, and merges both result lists
    (de-duplicated by exact title).

    Args:
        ingredient: Ingredient name; it is quoted inside the boolean query.
        max_results: Maximum results requested per source and returned overall.

    Returns:
        list[dict]: Dicts with ``title``, ``source``, ``pubdate``, ``url`` and
        ``full_text`` keys (``url`` / ``full_text`` may be None). A source that
        fails contributes no results; its error is printed, not raised.
    """
    request_timeout = 15  # seconds; keeps one stalled request from hanging the notebook

    # Built once and shared by both sources. Joining with single spaces avoids
    # the runs of indentation whitespace that backslash line continuations
    # would embed in the query string.
    boolean_query = " ".join([
        f'"{ingredient}"',
        'AND ("well-being" OR "wellbeing" OR "diet" OR "dietary" OR "nutrition" OR "health" OR "benefits" OR "concerns" OR "dietary restrictions")',
        'AND ("nutrition science" OR "toxicology" OR "dietary science" OR "public health")',
        'AND ("human" OR "people" OR "men" OR "women" OR "individual" OR "clinical study")',
        'AND ("systematic review" OR "meta-analysis")',
    ])

    def extract_relevant_sections(text):
        """Return the text following known section headings, or ``text`` itself if none match."""
        if not text:
            return None
        # The single capture group means findall returns plain strings.
        matches = re.findall(
            r"(?:Abstract|Introduction|Background|Methods|Results|Discussion|Conclusion)[:\s]*([\s\S]*?)(?=(?:Abstract|Introduction|Background|Methods|Results|Discussion|Conclusion|References|Acknowledg(?:e)?ments)[:\s]|$)",
            text,
            re.IGNORECASE,
        )
        if matches:
            return "\n".join(m.strip() for m in matches)
        return text

    def query_pubmed_central():
        """Search the PMC database and fetch metadata and full text per article id."""
        eutils = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils"
        try:
            search = requests.get(
                f"{eutils}/esearch.fcgi",
                params={
                    "db": "pmc",
                    "term": boolean_query,
                    "retmode": "json",
                    "retmax": max_results,
                },
                timeout=request_timeout,
            ).json()
            ids = search.get("esearchresult", {}).get("idlist", [])
            summaries = []
            for pmid in ids:
                # Summary metadata
                summary_resp = requests.get(
                    f"{eutils}/esummary.fcgi",
                    params={"db": "pmc", "id": pmid, "retmode": "json"},
                    timeout=request_timeout,
                ).json()
                result = summary_resp.get("result", {}).get(pmid)
                if not result:
                    # No metadata means no entry, so skip the full-text download.
                    continue

                # Full text via efetch
                fetch_resp = requests.get(
                    f"{eutils}/efetch.fcgi",
                    params={"db": "pmc", "id": pmid, "retmode": "xml"},
                    timeout=request_timeout,
                )
                full_text = None
                if fetch_resp.status_code == 200:
                    # efetch returns XML; strip the markup so downstream
                    # chunking and classification see prose rather than tags.
                    full_text = BeautifulSoup(fetch_resp.text, "html.parser").get_text(" ")

                summaries.append({
                    "title": result.get("title"),
                    "source": result.get("source"),
                    "pubdate": result.get("pubdate"),
                    "url": f"https://www.ncbi.nlm.nih.gov/pmc/articles/PMC{pmid}/",
                    "full_text": extract_relevant_sections(full_text)
                })
            return summaries
        except Exception as e:
            print(f"[PubMedCentral error for '{ingredient}']: {e}")
            return []

    def query_europe_pmc():
        """Search Europe PMC and scrape the first HTML full-text link of each record."""
        try:
            base_url = "https://www.ebi.ac.uk/europepmc/webservices/rest/search"
            params = {
                "query": boolean_query,
                "format": "json",
                "resultType": "core",
                "pageSize": max_results
            }
            payload = requests.get(base_url, params=params, timeout=request_timeout).json()
            results = []
            for record in payload.get("resultList", {}).get("result", []):
                url = None
                full_text = None
                # Attempt to extract the full-text URL (`or {}` guards a null field)
                link_list = (record.get("fullTextUrlList") or {}).get("fullTextUrl") or []
                for ft in link_list:
                    if ft.get("documentStyle") == "html" and ft.get("url"):
                        url = ft["url"]
                        try:
                            page = requests.get(url, timeout=10)
                            if page.status_code == 200:
                                full_text = BeautifulSoup(page.text, "html.parser").get_text()
                        except Exception as e:
                            print(f"[EuropePMC full text fetch failed]: {e}")
                        break  # only the first HTML link is tried
                results.append({
                    "title": record.get("title"),
                    "source": record.get("journalTitle"),
                    "pubdate": record.get("firstPublicationDate", record.get("pubYear")),
                    "url": url,
                    "full_text": extract_relevant_sections(full_text)
                })
            return results
        except Exception as e:
            print(f"[EuropePMC error for '{ingredient}']: {e}")
            return []

    # Combine and deduplicate by title (PMC results take precedence)
    combined = query_pubmed_central() + query_europe_pmc()
    seen_titles = set()
    unique_results = []
    for article in combined:
        title = article["title"]
        if title and title not in seen_titles:
            unique_results.append(article)
            seen_titles.add(title)
    return unique_results[:max_results]

##### NER + Semantic Web Search

In [66]:
# --- NER + Semantic Web Search ---
# Dedicated spaCy pipeline for named-entity recognition on web snippets.
ner_nlp = spacy.load("en_core_web_sm")

def extract_entities(text):
    """Run spaCy NER over ``text``.

    Uses the ``ner_nlp`` pipeline loaded for this purpose; it was previously
    loaded but never used, with the function relying on the unrelated global
    ``nlp`` instead.

    Args:
        text: Text to analyse.

    Returns:
        list[tuple[str, str]]: ``(entity_text, entity_label)`` pairs in
        document order.
    """
    doc = ner_nlp(text)
    return [(ent.text, ent.label_) for ent in doc.ents]

def search_web_snippets(ingredient, num_results=5):
    """Search DuckDuckGo for health-related snippets about ``ingredient``.

    Results are cached in ``search_cache`` per ingredient, but only when every
    query succeeded, so a transient search failure is retried on the next call
    rather than being remembered as "no results".

    Args:
        ingredient: Ingredient name; it is quoted inside the search query.
        num_results: Maximum number of results requested per query.

    Returns:
        list[str]: For each result its body, then its title (whichever of the
        two is present). Empty when the search fails.
    """
    if ingredient in search_cache:
        return search_cache[ingredient]
    queries = [f'"{ingredient}" AND ("well-being" OR "wellbeing" OR "diet" OR "dietary" OR "nutrition" OR "health" OR "benefits" OR "concerns" OR "dietary restrictions") AND ("nutrition science" OR "toxicology" OR "dietary science" OR "public health") AND ("human" OR "people" OR "men" OR "women" OR "individual" OR "clinical study") AND ("systematic review" OR "meta-analysis") AND (language:English) AND (publication_date:[2005-01-01 TO 2025-01-01]) NOT ("synthetic route" OR "polymer" OR "conference abstract")']
    max_attempts = 3
    snippets = []
    all_queries_succeeded = True

    try:
        with DDGS() as ddgs:
            for q in queries:
                succeeded = False
                for attempt in range(1, max_attempts + 1):
                    try:
                        # Collect into a per-attempt list so a failure midway
                        # cannot leave partial results that a retry would duplicate.
                        attempt_snippets = []
                        for res in ddgs.text(q, max_results=num_results) or []:
                            if 'body' in res:
                                attempt_snippets.append(res['body'])
                            if 'title' in res:
                                attempt_snippets.append(res['title'])
                        snippets.extend(attempt_snippets)
                        succeeded = True
                        break  # success
                    except Exception as e:
                        if attempt == max_attempts:
                            # No further attempt follows, so do not sleep.
                            print(f"[DuckDuckGo] Giving up on '{ingredient}' after {max_attempts} attempts: {e}")
                            break
                        wait = 2 ** attempt + random.uniform(0, 1)
                        print(f"[Retry {attempt}] DuckDuckGo error for '{ingredient}': {e} — waiting {wait:.2f}s")
                        time.sleep(wait)
                if not succeeded:
                    all_queries_succeeded = False

        if all_queries_succeeded:
            search_cache[ingredient] = snippets
        return snippets

    except Exception as e:
        print(f"[DuckDuckGo Error for '{ingredient}']: {e}")
        return []

def semantic_scrape_summary(ingredient):
    """Extract named entities from the web snippets found for ``ingredient``.

    Snippets that are empty or shorter than 20 characters after stripping are
    skipped.

    Returns:
        list: Concatenated ``extract_entities`` output for every usable
        snippet, in snippet order (duplicates are kept).
    """
    entities = []
    for snippet in search_web_snippets(ingredient):
        if snippet and len(snippet.strip()) >= 20:
            entities += extract_entities(snippet)
    return entities

def get_all_health_info(ingredient):
    """Query every trusted source for ``ingredient`` and bundle the results.

    Returns:
        dict: Results keyed by source name: ``"PubMed"``, ``"OpenFDA"``,
        ``"RxNorm"`` and ``"Academic_Articles"``.
    """
    # Sources are queried in the same order as the keys of the returned dict.
    pubmed_hits = query_pubmed(ingredient)
    recall_reasons = query_openfda(ingredient)
    rxnorm_ids = query_rxnorm(ingredient)
    articles = query_academic_health_docs(ingredient)

    health_info = {}
    health_info["PubMed"] = pubmed_hits
    health_info["OpenFDA"] = recall_reasons
    health_info["RxNorm"] = rxnorm_ids
    health_info["Academic_Articles"] = articles
    return health_info

##### ingredient list processing based on health info

In [67]:
def preprocess_ingredient_list_with_health(text):
    """Split an ingredient list and attach health information to each item.

    The text is split on commas, newlines, semicolons, slashes and bullet
    characters. Each non-empty piece is standardized, and every relevant alias
    of it is looked up through ``get_all_health_info`` and
    ``semantic_scrape_summary``.

    Each term is looked up at most once per call. When a later ingredient
    shares a term with an earlier one, the stored result is reused, so the
    later entry is not left with empty ``health_info``.

    Args:
        text: Raw ingredient list.

    Returns:
        list[dict]: One dict per ingredient with ``standard``, ``aliases`` and
        ``health_info`` keys. ``health_info`` maps ``"PubMed"``, ``"OpenFDA"``,
        ``"RxNorm"``, ``"NER_Snippets"`` and ``"Academic_Articles"`` to
        de-duplicated lists.
    """
    raw_ingredients = re.split(r'[\,\n;/••]+', text)
    processed = []
    lookup_cache = {}  # lower-cased term -> (api_info, ner_info)

    for raw in raw_ingredients:
        raw = raw.strip()
        if not raw:
            continue

        standard, aliases = standardize_ingredient_name(raw)

        # dict.fromkeys removes duplicates while keeping a deterministic order
        # (standard name first), unlike a round trip through set().
        candidate_terms = [
            term for term in dict.fromkeys([standard] + aliases)
            if is_relevant_alias(term)
        ]
        combined_health_info = {"PubMed": [], "OpenFDA": [], "RxNorm": [], "NER_Snippets": [], "Academic_Articles": []}

        for term in candidate_terms:
            cache_key = term.lower()
            if cache_key not in lookup_cache:
                lookup_cache[cache_key] = (get_all_health_info(term), semantic_scrape_summary(term))
            api_info, ner_info = lookup_cache[cache_key]

            for source, bucket in combined_health_info.items():
                incoming = ner_info if source == "NER_Snippets" else api_info.get(source, [])
                for item in incoming:
                    # Items can be dicts (unhashable), hence list membership.
                    if item not in bucket:
                        bucket.append(item)

        processed.append({
            "standard": standard,
            "aliases": aliases,
            "health_info": combined_health_info
        })

    return processed

##### store aliases for reuse

In [68]:
# Default path of the JSON file used to persist the alias set between runs.
ALIAS_CACHE_FILE = "alias_cache.json"

def save_alias_cache(path=ALIAS_CACHE_FILE):
    """Write ``global_alias_set`` to ``path`` as a sorted JSON array.

    Failures are reported with a printed message instead of being raised.

    Args:
        path: Destination file; defaults to ``ALIAS_CACHE_FILE``.
    """
    try:
        with open(path, "w", encoding="utf-8") as cache_file:
            # Sorted so the file is stable between runs and diffs cleanly.
            ordered_aliases = sorted(global_alias_set)
            json.dump(ordered_aliases, cache_file, ensure_ascii=False, indent=2)
        print(f"[✓] Alias cache saved to {path}")
    except Exception as e:
        print(f"[!] Error saving alias cache: {e}")

def load_alias_cache(path=ALIAS_CACHE_FILE):
    """Replace ``global_alias_set`` with the aliases stored at ``path``.

    Args:
        path: JSON file written by ``save_alias_cache``; defaults to
            ``ALIAS_CACHE_FILE``.

    Returns:
        bool: True when the cache was loaded. False when the file is missing
        or cannot be read or parsed (a message is printed in both cases).
    """
    global global_alias_set
    try:
        with open(path, "r", encoding="utf-8") as cache_file:
            stored_aliases = json.load(cache_file)
            # Rebinds the module-level name; the previous set is discarded.
            global_alias_set = set(stored_aliases)
        print(f"[✓] Loaded {len(global_alias_set)} aliases from cache.")
    except FileNotFoundError:
        print(f"[ ] Alias cache not found at {path}. Will seed from source...")
        return False
    except Exception as e:
        print(f"[!] Error loading alias cache: {e}")
        return False
    return True

# Use the on-disk alias cache when it loads; otherwise seed from the first
# 5000 Open Food Facts rows and save the result for later runs.
if not load_alias_cache():
    seed_aliases_from_open_food_facts(limit=5000)
    save_alias_cache()

[✓] Loaded 5833 aliases from cache.


### Text Processing

##### Text Preprocessing

In [69]:
# Sentence splitter for document chunking: a blank English pipeline whose only
# component is the "sentencizer" pipe (used by split_into_token_chunks).
nlp_sentencizer = English()
nlp_sentencizer.add_pipe("sentencizer")

def num_tokens(text, model="gpt-3.5-turbo"):
    """Count the tokens ``text`` occupies under ``model``'s tiktoken encoding.

    Args:
        text: Text to measure.
        model: Model name used to select the encoding.

    Returns:
        int: Number of tokens.
    """
    token_ids = tiktoken.encoding_for_model(model).encode(text)
    return len(token_ids)

def split_into_token_chunks(text, max_tokens=2000, model="gpt-3.5-turbo"):
    """Split ``text`` into chunks of at most about ``max_tokens`` tokens.

    Sentences from ``nlp_sentencizer`` are packed greedily, joined by single
    spaces. A sentence that is longer than ``max_tokens`` on its own is cut
    into consecutive token windows instead of being emitted as one oversize
    chunk.

    Args:
        text: Document text.
        max_tokens: Token budget per chunk (values below 1 are treated as 1).
        model: Model name used to select the tiktoken encoding.

    Returns:
        list[str]: Non-empty, stripped chunks in document order.
    """
    encoding = tiktoken.encoding_for_model(model)
    window = max(1, max_tokens)

    chunks = []
    current_chunk = ""
    for sent in nlp_sentencizer(text).sents:
        sentence = sent.text.strip()
        if not sentence:
            continue

        # Measure the chunk exactly as it would be stored, joining space included.
        candidate = f"{current_chunk} {sentence}" if current_chunk else sentence
        if len(encoding.encode(candidate)) <= window:
            current_chunk = candidate
            continue

        # The sentence does not fit: close the chunk built so far.
        if current_chunk:
            chunks.append(current_chunk)
            current_chunk = ""

        sentence_tokens = encoding.encode(sentence)
        if len(sentence_tokens) <= window:
            current_chunk = sentence
            continue

        # Oversize sentence (e.g. text without sentence punctuation): hard-split
        # it on token boundaries so no chunk exceeds the budget.
        for start in range(0, len(sentence_tokens), window):
            piece = encoding.decode(sentence_tokens[start:start + window]).strip()
            if piece:
                chunks.append(piece)

    if current_chunk:
        chunks.append(current_chunk)
    return chunks

##### Text Classification

In [70]:
def num_tokens_from_messages(messages, model="gpt-3.5-turbo"):
    """Estimate the prompt tokens used by a list of chat messages.

    Counts 4 overhead tokens per message plus the encoded length of every
    value in each message dict, and adds 2 tokens for the reply priming.

    Args:
        messages: List of message dicts whose values are strings.
        model: Model name used to select the tiktoken encoding.

    Returns:
        int: Estimated token count.
    """
    encoding = tiktoken.encoding_for_model(model)
    per_message_overhead = 4  # each message key structure (role, content, etc.)
    reply_priming = 2  # every reply is primed with <|start|>assistant

    total = 0
    for message in messages:
        value_tokens = sum(len(encoding.encode(value)) for value in message.values())
        total += per_message_overhead + value_tokens
    return total + reply_priming

def classify_chunk(chunk, model="gpt-3.5-turbo", max_prompt_tokens=16000):
    """Ask the chat model to classify a paragraph by health category.

    The model is asked to label the paragraph as Benefits, Concerns and/or
    Dietary Restrictions and to reply with a ``Category: [...]`` line followed
    by a ``Reason: ...`` line.

    Args:
        chunk: Paragraph text to classify.
        model: Chat model name.
        max_prompt_tokens: Prompts estimated to exceed this size are not sent;
            a placeholder answer with an empty category list is returned.

    Returns:
        str: The model's reply, stripped. An empty string when the API returns
        no text content.
    """
    prompt = f"""
You are a medical and dietary research assistant. Given the paragraph below, classify it into one or more of the following:
- Benefits
- Concerns
- Dietary Restrictions

Respond in this format:
Category: [Category 1, Category 2, ...]
Reason: [Short explanation]

Paragraph:
\"\"\"
{chunk}
\"\"\"
"""
    messages = [{"role": "user", "content": prompt}]
    total_tokens = num_tokens_from_messages(messages, model)

    if total_tokens > max_prompt_tokens:  # leave some room for response
        print(f"[✂] Skipping overlong chunk ({total_tokens} tokens)")
        return "Category: []\nReason: Skipped due to chunk length."

    response = client.chat.completions.create(
        model=model,
        messages=messages,
        temperature=0.2
    )
    # `content` can be None (e.g. a filtered or empty completion); guard the strip.
    content = response.choices[0].message.content
    return (content or "").strip()

##### Summarization by Category

In [71]:
def summarize_by_category(classified_chunks, model="gpt-3.5-turbo"):
    """Group classified chunks by category and summarize each group.

    The categories are read from the ``Category:`` line of each classification
    result. Both the bracketed form (``Category: [A, B]``) and a bare list
    (``Category: A, B``) are accepted. Empty category names, such as those
    from ``Category: []``, are ignored.

    For each category, chunks are packed in order into a 3000-token budget
    (including the prompt prefix). A chunk that does not fit is skipped so
    that later, smaller chunks can still be used.

    Args:
        classified_chunks: Iterable of ``(chunk_text, classification_result)``.
        model: Chat model name used for summarization.

    Returns:
        dict[str, str]: Category name -> summary text, or
        ``"Summary skipped due to length."`` when no chunk fits the budget.
    """
    grouped = defaultdict(list)
    for chunk, result in classified_chunks:
        result = result or ""
        bracketed = re.search(r"Category:\s*\[(.*?)\]", result)
        if bracketed:
            raw_categories = bracketed.group(1)
        else:
            # Models often omit the brackets; accept the rest of the line.
            bare = re.search(r"Category:\s*([^\n\[\]]+)", result)
            raw_categories = bare.group(1) if bare else ""

        for cat in raw_categories.split(","):
            cat = cat.strip().strip("'\"")
            if cat:  # "Category: []" must not create a "" category
                grouped[cat].append(chunk)

    max_input_tokens = 3000
    summaries = {}
    for cat, texts in grouped.items():
        prompt_prefix = f"Summarize the following text into key findings related to {cat.lower()}:\n\n"
        available = max_input_tokens - num_tokens(prompt_prefix)

        selected_texts = []
        running_total = 0
        for t in texts:
            tok = num_tokens(t)
            if running_total + tok > available:
                # Skip (rather than stop) so one oversize chunk does not
                # block every chunk that follows it.
                continue
            selected_texts.append(t)
            running_total += tok

        if not selected_texts:
            summaries[cat] = "Summary skipped due to length."
            continue

        prompt = prompt_prefix + "\n".join(selected_texts)
        messages = [{"role": "user", "content": prompt}]
        response = client.chat.completions.create(
            model=model,
            messages=messages,
            temperature=0.3
        )
        # `content` can be None; guard the strip.
        summaries[cat] = (response.choices[0].message.content or "").strip()
    return summaries

##### Summaries

In [72]:
def enrich_with_health_summaries(results, model="gpt-3.5-turbo"):
    """Add a ``health_summary`` to every entry, based on its academic articles.

    For each entry, the full text of every article with more than 200
    characters of text is split into token chunks. If no article yields
    chunks, the fallback is one ``Title: ... | Source: ...`` snippet per
    article whose title is longer than 20 characters. The chunks are
    classified and then summarized per category. When there is nothing to
    work with, a ``{"note": ...}`` placeholder is stored instead.

    Args:
        results: Entries as produced by
            ``preprocess_ingredient_list_with_health``; modified in place.
        model: Chat model used for chunking, classification and summarization.

    Returns:
        The same ``results`` list, with ``health_summary`` set on each entry.
    """
    for entry in results:
        articles = (entry.get("health_info") or {}).get("Academic_Articles") or []

        chunks = []
        for article in articles:
            text = article.get("full_text")
            if text and len(text.strip()) > 200:
                chunks.extend(split_into_token_chunks(text, max_tokens=2000, model=model))

        if not chunks:
            # Fallback: work from the available titles and sources. The stored
            # values can be None (e.g. a missing journal title), hence `or ""`
            # instead of relying on the .get() default.
            for article in articles:
                title = (article.get("title") or "").strip()
                source = (article.get("source") or "").strip()
                if len(title) > 20:
                    snippet = f"Title: {title}"
                    if source:
                        snippet += f" | Source: {source}"
                    chunks.append(snippet)

        if not chunks:
            entry["health_summary"] = {"note": "No usable full-text content or fallback title."}
            continue

        # Same classify -> summarize path for full-text chunks and fallback snippets.
        classified = [(chunk, classify_chunk(chunk, model=model)) for chunk in chunks]
        entry["health_summary"] = summarize_by_category(classified, model=model)

    return results

##### test run

In [73]:
# End-to-end smoke test: a small ingredient list that includes an E-number
# ("E300"), a non-English name ("sal") and a misspelling ("suagr").
sample4 = "sodium chloride, ascorbic acid, curcuma longa, E300, sal, suagr"
# Standardize each ingredient and gather health info (makes network calls).
results4 = preprocess_ingredient_list_with_health(sample4)
# Classify and summarize the article text (calls the OpenAI API).
results4 = enrich_with_health_summaries(results4)

# Print a per-ingredient report: aliases, API results, NER entities, summaries.
for entry in results4:
    print("\nIngredient:", entry["standard"])
    print("→ Aliases:", entry["aliases"])

    print("\nTrusted API Info:")
    for source, data in entry["health_info"].items():
        # NER entities are printed in their own section below.
        if source == "NER_Snippets":
            continue
        print(f"  • {source}:")
        if isinstance(data, list) and data:
            for item in data:
                print(f"     - {item}")
        elif isinstance(data, list):
            print("     - No results")
        else:
            print(f"     - {data}")

    print("\nNER Entities from Web Snippets:")
    if entry["health_info"]["NER_Snippets"]:
        # Each item is an (entity_text, entity_label) pair from extract_entities.
        for ent_text, ent_label in entry["health_info"]["NER_Snippets"]:
            print(f"     - {ent_text} ({ent_label})")
    else:
        print("     - No named entities found.")

    print("\nSummarized Health Insights:")
    # health_summary is a dict: category -> summary, or a single "note" entry.
    if isinstance(entry.get("health_summary"), dict):
        for cat, summary in entry["health_summary"].items():
            print(f"  [{cat}]: {summary}")
    else:
        print("  - No summary generated")

[Retry 1] DuckDuckGo error for 'sodium-36 chloride': https://html.duckduckgo.com/html return None. params=None content=None data={'q': '"sodium-36 chloride" AND ("well-being" OR "wellbeing" OR "diet" OR "dietary" OR "nutrition" OR "health" OR "benefits" OR "concerns" OR "dietary restrictions") AND ("nutrition science" OR "toxicology" OR "dietary science" OR "public health") AND ("human" OR "people" OR "men" OR "women" OR "individual" OR "clinical study") AND ("systematic review" OR "meta-analysis") AND (language:English) AND (publication_date:[2005-01-01 TO 2025-01-01]) NOT ("synthetic route" OR "polymer" OR "conference abstract")', 'b': '', 'kl': 'wt-wt'} — waiting 2.32s
[Retry 2] DuckDuckGo error for 'sodium-36 chloride': https://html.duckduckgo.com/html return None. params=None content=None data={'q': '"sodium-36 chloride" AND ("well-being" OR "wellbeing" OR "diet" OR "dietary" OR "nutrition" OR "health" OR "benefits" OR "concerns" OR "dietary restrictions") AND ("nutrition science"

##### TODO: Replace or remove the DuckDuckGo search (currently not working)
##### TODO: Replace OpenAI if possible (currently not working; look for a cheaper alternative)