
# Worldwide Research Footprint

Map first-author affiliations for the Blue Health literature. We use the stored
PubMed metadata to infer the country of the first author for each article and
summarize publication hotspots on a world map.



> **Data requirement**: this notebook expects that `scripts/collect_words.py`
> has already been executed (for factors and, optionally, exposures) so that
> `data/words/words_*.p` files and their raw article metadata are available.
> Run the collection step first if those files are missing.


In [None]:

import sys
import subprocess
import importlib
import json
import re
from pathlib import Path
from collections import defaultdict

import numpy as np
import pandas as pd

from lisc.utils import SCDB, load_object


def ensure_package(package: str, import_name: str | None = None):
    """Import a package, installing it on-demand if needed."""
    name = import_name or package
    try:
        return importlib.import_module(name)
    except ImportError:
        print(f"Installing {package} â€¦")
        try:
            subprocess.check_call([sys.executable, "-m", "pip", "install", package])
        except Exception as exc:
            raise RuntimeError(
                f"Failed to install {package}. Install it manually and rerun."
            ) from exc
        return importlib.import_module(name)


# Third-party helpers used further down: Plotly Express for the map and
# pycountry for ISO country lookups. Both are installed on demand.
px = ensure_package(package="plotly", import_name="plotly.express")
pycountry = ensure_package(package="pycountry")


In [None]:

from typing import Optional, Tuple
import pickle


def find_repo_root(marker: str = "requirements.txt") -> Path:
    """Return the closest ancestor of the working directory that contains ``marker``.

    The search starts at the resolved current working directory and walks
    upward. The filesystem root itself is also checked.

    Parameters
    ----------
    marker : str
        File or directory name whose presence identifies the repository root.

    Returns
    -------
    Path
        The first directory (closest to the working directory) containing ``marker``.

    Raises
    ------
    RuntimeError
        If no directory on the path up to the filesystem root contains ``marker``.
    """
    start = Path.cwd().resolve()
    # ``parents`` ends with the filesystem root, so every level is inspected.
    for candidate in (start, *start.parents):
        if (candidate / marker).exists():
            return candidate
    raise RuntimeError("Could not locate the repository root from the current directory.")


# Resolve the project layout once; later cells reuse these names.
REPO_ROOT = find_repo_root()
WORDS_DIR = REPO_ROOT.joinpath("data", "words")
WORDS_DIR.mkdir(parents=True, exist_ok=True)

# Every collected Words pickle follows the ``words_<label>.p`` naming scheme.
available_paths = sorted(WORDS_DIR.glob("words_*.p"))
if len(available_paths) == 0:
    missing_data_message = (
        "No words_*.p files were found under data/words/. "
        "Run scripts/collect_words.py (and optionally set BLUEHEALTH_TEST_MODE=1 "
        "for a quick offline placeholder) before executing this notebook."
    )
    raise FileNotFoundError(missing_data_message)

print("Repository root:", REPO_ROOT)
print("Available words pickles:", [pickle_path.name for pickle_path in available_paths])


In [None]:

from lisc import Words


def load_words_object(path: Path) -> Words:
    """Load a pickled Words object, falling back through three strategies.

    The attempts are, in order: ``load_object`` with an ``SCDB`` rooted at
    ``REPO_ROOT``, ``load_object`` with the pickle's parent directory, and
    finally a plain ``pickle.load`` of the file. A failure of either of the
    first two attempts is printed and the next one is tried; an exception from
    the final attempt propagates to the caller.
    """
    # Attempt 1: resolve the file through the project database object.
    try:
        database = SCDB(str(REPO_ROOT))
        loaded = load_object(path.stem, directory=database, reload_results=True)
        print(f"Loaded {path.name} via SCDB.")
        return loaded
    except Exception as scdb_error:
        print(f"SCDB load failed for {path.name}: {scdb_error}")

    # Attempt 2: point load_object straight at the containing folder.
    try:
        loaded = load_object(path.stem, directory=str(path.parent), reload_results=True)
        print(f"Loaded {path.name} from directory path.")
        return loaded
    except Exception as directory_error:
        print(f"Direct load failed for {path.name}: {directory_error}")

    # Attempt 3: unpickle the file directly.
    # NOTE(review): pickle executes code embedded in the file; only load
    # pickles produced by this project's own collection step.
    with open(path, "rb") as handle:
        loaded = pickle.load(handle)
    print(f"Loaded {path.name} via raw pickle (reload_results=False).")
    return loaded


# (label, Words) pairs for every pickle that could be loaded.
words_objects: list[tuple[str, Words]] = []
for path in available_paths:
    # Strip only the leading "words_" prefix. ``str.replace`` would also eat
    # later occurrences (e.g. "words_my_words_list" -> "my_list").
    label = path.stem.removeprefix("words_")
    try:
        words_objects.append((label, load_words_object(path)))
    except Exception as exc:
        # Best effort: one unreadable pickle should not abort the notebook.
        print(f"[warn] Skipping {path.name}: {exc}")

if not words_objects:
    raise RuntimeError("None of the words_*.p files could be loaded.")

print(f"Loaded {len(words_objects)} words object(s).")


In [None]:

from lisc.data.articles import Articles


def extract_first_author(author_entry):
    """Return a ``(name, affiliation)`` pair for a first-author entry.

    Parameters
    ----------
    author_entry : dict, list, tuple or other
        A dict is read through its ``'last'``, ``'first'`` and
        ``'affiliation'`` keys. A list/tuple is read positionally: index 0 is
        the last name, index 1 the first name and index 3 the affiliation.
        Any other type yields ``(None, None)``.

    Returns
    -------
    tuple
        ``(name, affiliation)``. ``name`` is ``"Last, First"`` built from the
        parts that are present, or ``None`` when both are missing.
        ``affiliation`` is the stripped affiliation string, or ``None`` when
        it is missing, blank, or not a string.
    """

    def _clean(value) -> str:
        # A missing name part must become "", not the literal string "None".
        return "" if value is None else str(value).strip()

    if isinstance(author_entry, dict):
        last = _clean(author_entry.get("last"))
        first = _clean(author_entry.get("first"))
        affiliation = author_entry.get("affiliation")
    elif isinstance(author_entry, (list, tuple)):
        last = _clean(author_entry[0]) if len(author_entry) > 0 else ""
        first = _clean(author_entry[1]) if len(author_entry) > 1 else ""
        affiliation = author_entry[3] if len(author_entry) > 3 else None
    else:
        return None, None

    name = ", ".join(part for part in (last, first) if part).strip(", ") or None

    # Only strings can be parsed for a country downstream; anything else is
    # treated as "no affiliation" instead of crashing on ``.strip()``.
    if isinstance(affiliation, str):
        affiliation = affiliation.strip() or None
    else:
        affiliation = None
    return name, affiliation


def _term_labels_for(words_obj) -> list:
    """Return the search-term labels stored on a words-like object."""
    try:
        # Counts-style layout: ``terms`` is a mapping with an 'A' entry.
        holder = words_obj.terms['A']
    except (AttributeError, TypeError, KeyError, IndexError):
        # Otherwise read ``labels`` from the object itself.
        # NOTE(review): this presumes Words exposes ``labels`` directly - confirm
        # against the installed LISC version.
        holder = words_obj
    return list(getattr(holder, 'labels', None) or [])


def _coerce_year(year):
    """Convert whole-number years (int, finite float, digit string) to ``int``."""
    if isinstance(year, str):
        text = year.strip()
        return int(text) if text.isdecimal() else year
    if isinstance(year, int):
        return int(year)
    if isinstance(year, float) and np.isfinite(year):
        return int(year)
    # None, NaN and anything unrecognised are passed through untouched.
    return year


def iter_article_records(words_label: str, words_obj: "Words"):
    """Yield one dictionary per article that has at least one author.

    Parameters
    ----------
    words_label : str
        Label of the words object; copied into every record.
    words_obj : Words
        Object whose ``results`` attribute is iterated. Each result must
        expose ``n_articles`` and iterate over article dictionaries providing
        ``'id'``, ``'year'``, ``'title'`` and ``'authors'``.

    Yields
    ------
    dict
        Keys: ``words_label``, ``term_label``, ``pmid``, ``year``, ``title``,
        ``first_author`` and ``affiliation``.
    """
    results = list(getattr(words_obj, 'results', None) or [])
    if not results:
        return
    term_labels = _term_labels_for(words_obj)
    for position, articles in enumerate(results):
        if not getattr(articles, 'n_articles', 0):
            continue
        if position < len(term_labels):
            term_label = term_labels[position]
        else:
            # Keep results without a positional label instead of dropping them.
            term_label = getattr(articles, 'label', None)
        for article in articles:
            authors = article.get('authors') or []
            if not authors:
                continue
            first_name, affiliation = extract_first_author(authors[0])
            pmid = article.get('id')
            yield {
                'words_label': words_label,
                'term_label': term_label,
                'pmid': str(pmid) if pmid is not None else None,
                'year': _coerce_year(article.get('year')),
                'title': article.get('title'),
                'first_author': first_name,
                'affiliation': affiliation,
            }


# Flatten every loaded words object into a single list of article records.
records = [
    article_record
    for label, loaded_words in words_objects
    for article_record in iter_article_records(label, loaded_words)
]

print(f"Collected {len(records)} article-level records from the loaded words objects.")



## Parse first-author affiliations

Affiliations often include department, institution, city, and country. We scan the
strings from the end, using `pycountry` (plus a few aliases) to infer the country
for each first author.


In [None]:

# Lower-case alias -> ISO 3166-1 alpha-3 code.
# Keys are compared with normalised text that contains only letters and single
# spaces. Spellings with periods or apostrophes are therefore listed in their
# punctuation-free forms as well (both collapsed and space-separated), so they
# match however the punctuation is removed.
ALIAS_MAP = {
    'england': 'GBR',
    'scotland': 'GBR',
    'wales': 'GBR',
    'northern ireland': 'GBR',
    'united kingdom': 'GBR',
    'u.k': 'GBR',
    'u.k.': 'GBR',
    'u k': 'GBR',
    'uk': 'GBR',
    'great britain': 'GBR',
    'the netherlands': 'NLD',
    'netherlands': 'NLD',
    'czech republic': 'CZE',
    'south korea': 'KOR',
    'republic of korea': 'KOR',
    'korea': 'KOR',
    'north korea': 'PRK',
    'peoples republic of china': 'CHN',
    'people s republic of china': 'CHN',
    'p.r. china': 'CHN',
    'p r china': 'CHN',
    'pr china': 'CHN',
    'u.s.a': 'USA',
    'u.s.': 'USA',
    'u.s': 'USA',
    'u s a': 'USA',
    'u s': 'USA',
    'usa': 'USA',
    'us': 'USA',
    'united states': 'USA',
    'united states of america': 'USA',
    'ivory coast': 'CIV',
    'cote divoire': 'CIV',
    'cote d ivoire': 'CIV',
    'democratic republic of the congo': 'COD',
    'republic of the congo': 'COG',
    'russia': 'RUS',
    'syria': 'SYR',
    'palestine': 'PSE',
    'taiwan': 'TWN',
    'hong kong': 'HKG',
    'macau': 'MAC',
    'vietnam': 'VNM',
    'viet nam': 'VNM',
    'burma': 'MMR',
    'myanmar': 'MMR',
    'laos': 'LAO',
    'bolivia': 'BOL',
    'venezuela': 'VEN',
    'iran': 'IRN',
    'uae': 'ARE',
    'u.a.e.': 'ARE',
    'u a e': 'ARE',
    'emirates': 'ARE',
    'brunei': 'BRN',
    'cape verde': 'CPV',
    'eswatini': 'SWZ',
    'swaziland': 'SWZ',
    'timor leste': 'TLS',
    'east timor': 'TLS',
    'burundi': 'BDI',
    'tanzania': 'TZA',
    'macedonia': 'MKD',
    'kosovo': 'XKX',
    'slovakia': 'SVK',
    'moldova': 'MDA'
}


def normalize(text: str) -> str:
    """Reduce free text to lower-case ASCII letters separated by single spaces.

    Steps, in order:

    1. Accented letters are folded to their base letter (``Côte`` -> ``Cote``).
    2. Dotted initialisms are collapsed (``U.S.A.`` -> ``USA``,
       ``P.R. China`` -> ``PR China``); other periods become spaces.
    3. Apostrophes are removed without splitting the word
       (``d'Ivoire`` -> ``dIvoire``).
    4. Every remaining non-letter becomes a space, whitespace runs are
       squeezed to one space and the result is lower-cased and trimmed.

    Parameters
    ----------
    text : str
        Raw text, e.g. one comma-separated piece of an affiliation.

    Returns
    -------
    str
        The normalised text; an empty string when ``text`` has no letters.
    """
    import unicodedata  # local import keeps this helper self-contained

    decomposed = unicodedata.normalize("NFKD", text)
    folded = "".join(ch for ch in decomposed if not unicodedata.combining(ch))
    # A single letter followed by a period is part of an initialism.
    folded = re.sub(r"\b([A-Za-z])\.", r"\1", folded)
    # Straight and typographic apostrophes join the letters around them.
    folded = re.sub(r"['\u2019]", "", folded)
    cleaned = re.sub(r"[^A-Za-z\s]", " ", folded).strip().lower()
    return re.sub(r"\s+", " ", cleaned)


def match_country(candidate: str) -> Optional[Tuple[str, str]]:
    """Return ``(ISO3, country name)`` for a text candidate, or ``None``.

    Resolution order:

    1. ``ALIAS_MAP``, tried with the normalised text and with the raw
       lower-cased text (with and without a trailing period), so dotted keys
       such as ``'u.k.'`` are reachable.
    2. ``pycountry.countries.lookup`` on the raw text, then on the normalised
       text.

    Candidates that contain no letters, or only one or two letters, are
    rejected unless they are an explicit alias. ``lookup`` also matches
    alpha-2 and numeric codes, so US state abbreviations in affiliations
    ("Boston, MA 02115", "Stanford, CA") would otherwise resolve to Morocco,
    Canada, and so on.

    Parameters
    ----------
    candidate : str
        A piece of an affiliation string.

    Returns
    -------
    tuple or None
        ``(alpha_3, name)`` when a country is recognised, else ``None``.
    """
    if not candidate:
        return None
    candidate = candidate.strip()
    if not candidate:
        return None

    normalized = normalize(candidate)
    if not normalized:
        # No letters at all (postal codes, phone numbers, ...).
        return None

    lowered = candidate.lower()
    alias_code = None
    for key in (normalized, lowered, lowered.rstrip(". ")):
        alias_code = ALIAS_MAP.get(key)
        if alias_code:
            break
    if alias_code:
        country = (
            pycountry.countries.get(alpha_3=alias_code)
            or pycountry.countries.get(alpha_2=alias_code)
        )
        if country:
            return country.alpha_3, country.name
        # Code unknown to pycountry (e.g. a user-assigned code): show the
        # matched alias as a readable name instead of the bare code.
        return alias_code, normalized.title()

    if len(normalized.replace(" ", "")) <= 2:
        # Too short to be a country name; almost always a state or province.
        return None

    for query in (candidate, normalized):
        try:
            country = pycountry.countries.lookup(query)
        except LookupError:
            continue
        return country.alpha_3, country.name
    return None


# Contact details that are often appended to an affiliation string.
_EMAIL_PATTERN = re.compile(r"\S+@\S+")
_CONTACT_LABEL_PATTERN = re.compile(r"\b(?:electronic address|e-?mail)\b\s*:?", re.IGNORECASE)


def infer_country(affiliation: Optional[str]) -> Optional[Tuple[str, str, str]]:
    """Infer an ISO3 code and canonical country name from an affiliation string.

    E-mail addresses and "Electronic address:" / "E-mail:" labels are removed
    first, so a trailing "..., USA. Electronic address: a@b.edu" still exposes
    "USA." as the last segment. The text is then split on ``;``, ``/`` and
    ``,`` and the pieces are tested from last to first with ``match_country``.
    If no piece matches, the whole cleaned string is tried once.

    Parameters
    ----------
    affiliation : str or None
        Raw affiliation text.

    Returns
    -------
    tuple or None
        ``(iso3, country_name, matched_text)``, or ``None`` when the input is
        empty, not a string, or no country is recognised.
    """
    if not affiliation or not isinstance(affiliation, str):
        return None

    cleaned = _EMAIL_PATTERN.sub(" ", affiliation)
    cleaned = _CONTACT_LABEL_PATTERN.sub(" ", cleaned).strip()

    pieces = [piece.strip() for piece in re.split(r"[;/,]", cleaned)]
    # The country usually comes last, so scan from the end.
    for piece in reversed([piece for piece in pieces if piece]):
        match = match_country(piece)
        if match:
            iso, country_name = match
            return iso, country_name, piece

    match = match_country(cleaned)
    if match:
        iso, country_name = match
        return iso, country_name, cleaned
    return None


# Attach the inferred country (or explicit Nones) to every record in place.
_COUNTRY_FIELDS = ('country_iso3', 'country', 'matched_text')
for article_record in records:
    inferred = infer_country(article_record.get('affiliation'))
    field_values = inferred if inferred else (None, None, None)
    article_record.update(zip(_COUNTRY_FIELDS, field_values))

print("Finished inferring countries.")


In [None]:

# Columns every record carries. They are declared explicitly so that an empty
# ``records`` list still produces a frame with the expected schema, rather
# than a column-less frame that raises KeyError on the first column access.
RECORD_COLUMNS = [
    'words_label', 'term_label', 'pmid', 'year', 'title',
    'first_author', 'affiliation', 'country_iso3', 'country', 'matched_text',
]
records_df = pd.DataFrame(records) if records else pd.DataFrame(columns=RECORD_COLUMNS)

matched_count = int(records_df['country_iso3'].notna().sum())
print(f"Records with inferred countries: {matched_count} of {len(records_df)}")
records_df.head()



## Aggregate by country

We count unique PubMed IDs per country (based on the inferred first-author
location). When the same article appears under multiple terms or words objects,
we keep a single occurrence of it (deduplicated by PubMed ID).


In [None]:

# Stop early with a readable message when nothing could be geolocated.
has_country_data = (not records_df.empty) and bool(records_df['country_iso3'].notna().any())
if not has_country_data:
    raise RuntimeError(
        "No article-level country information was found. Make sure the words objects include "
        "raw article metadata with author affiliations."
    )

# One row per PubMed ID, keeping the first row after sorting by pmid and
# descending year.
located_records = records_df.dropna(subset=['country_iso3'])
ordered_records = located_records.sort_values(['pmid', 'year'], ascending=[True, False])
unique_articles = ordered_records.drop_duplicates(subset=['pmid'])

# Article count and publication-year range for each country.
per_country = unique_articles.groupby(['country_iso3', 'country']).agg(
    articles=pd.NamedAgg(column='pmid', aggfunc='nunique'),
    first_year=pd.NamedAgg(column='year', aggfunc='min'),
    last_year=pd.NamedAgg(column='year', aggfunc='max'),
)
country_summary = per_country.reset_index().sort_values('articles', ascending=False)

country_summary.head(10)



## Visualize the worldwide footprint

The choropleth highlights the countries where first authors are publishing blue
health research (based on the collected associations).


In [None]:

# Hover shows the counts and year range; the ISO code column is hidden
# because the country name is already the hover title.
hover_columns = {
    'articles': True,
    'first_year': True,
    'last_year': True,
    'country_iso3': False,
}

fig = px.choropleth(
    country_summary,
    locations='country_iso3',
    color='articles',
    hover_name='country',
    hover_data=hover_columns,
    color_continuous_scale='Blues',
    title='Blue Health Publications by First Author Country',
)
fig.update_layout(coloraxis_colorbar=dict(title='Articles'))
fig.show()



### Top countries and representative terms

To provide more context, the table below lists the leading countries along with
the term label most frequently associated with each country's articles.


In [None]:

# Number of records for every (country, term) combination.
located_records = records_df.dropna(subset=['country_iso3'])
country_term = (
    located_records.groupby(['country', 'term_label'])
    .size()
    .reset_index(name='article_count')
)

# Row index of the most frequent term within each country.
idx = country_term.groupby('country')['article_count'].idxmax()
country_leaders = country_term.loc[idx].sort_values('article_count', ascending=False)

# Attach the leading term to the per-country summary and give the joined
# columns descriptive names.
leader_columns = country_leaders[['country', 'term_label', 'article_count']]
summary_with_terms = country_summary.merge(leader_columns, on='country', how='left').rename(
    columns={
        'term_label': 'leading_term',
        'article_count': 'articles_for_leading_term'
    }
)
summary_with_terms.sort_values('articles', ascending=False).head(15)
