### Extracting companies from each directory

### Web Crawling on directories

In [None]:
# ============================================
# Robust scraper for company listings (Zimbabwe)
# Crawls listing pages, pagination, subdirectories, and detail pages.
# Outputs: name, address, phone, email, website, source_url
# ============================================

from concurrent.futures import ThreadPoolExecutor, as_completed
import pandas as pd
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse
import re
import time
import random
import json
import itertools

# --------------------------------
# If the input DF is not defined, try to load it
# --------------------------------
# Reuse a DataFrame that an earlier cell already defined; otherwise fall back
# to the CSV export sitting in the working directory.
if "directories_with_listings" not in globals():
    try:
        directories_with_listings = pd.read_csv("directories_with_listings.csv")
        print("Loaded directories_with_listings.csv")
    except Exception:
        # Best effort only: the name stays undefined and the user is told to load it.
        print("Please load 'directories_with_listings' DataFrame before running.")
# -------------------------
# Configuration
# -------------------------
# Pool of desktop-browser User-Agent strings; _headers() picks one at random
# for every request it builds headers for.
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) Gecko/20100101 Firefox/121.0",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 13_6) AppleWebKit/605.1.15 "
    "(KHTML, like Gecko) Version/17.0 Safari/605.1.15",
]
def _headers():
    """Return browser-like request headers with a randomly chosen User-Agent."""
    headers = {"User-Agent": random.choice(USER_AGENTS)}
    headers.update({
        "Accept-Language": "en-US,en;q=0.9",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
        "Referer": "https://www.google.com/",
    })
    return headers

# Passed as `timeout=` to every session.get() call in get_soup().
REQUEST_TIMEOUT = 15
# (min, max) bounds handed to random.uniform() by sleep_brief().
SLEEP_BETWEEN_REQUESTS = (0.6, 1.4)  # randomized sleep min/max sec

# Generic limits (for non-ZYP sites); used as crawl_listing_seed() defaults.
MAX_PAGES_PER_LISTING = 15     # per seed listing url
MAX_LISTING_DEPTH = 10         # how deep to follow listing subpages/categories
MAX_DETAIL_PAGES = 400         # safety limit per seed to avoid over-crawl

# ZimbabweYP-specific limits (tuned for very large coverage); read by crawl_zimbabweyp().
ZYP_MAX_CATEGORIES = None      # None = no explicit limit (will attempt all)
ZYP_MAX_PAGES_PER_CATEGORY = 2000  # deep pagination per category
ZYP_DETAIL_WORKERS = 8         # per-ZYP detail requests concurrency (be considerate!)

# Value written into the "country" field of every extracted record.
ZIM_COUNTRY = "Zimbabwe"       # annotate country

# -------------------------
# Utilities
# -------------------------
def new_session():
    """Create a requests.Session whose HTTP and HTTPS adapters retry failures.

    Up to three retries (connect, read and status errors) with a 0.5 backoff
    factor; the listed status codes are treated as retryable.
    """
    retry_policy = Retry(
        total=3,
        read=3,
        connect=3,
        status=3,
        backoff_factor=0.5,
        status_forcelist=[429, 500, 502, 503, 504],
        # allowed_methods defaults to GET/HEAD; we only use GET
    )
    retrying_adapter = HTTPAdapter(max_retries=retry_policy)
    session = requests.Session()
    for scheme_prefix in ("http://", "https://"):
        session.mount(scheme_prefix, retrying_adapter)
    return session

def clean_text(x):
    """Stringify *x*, collapse each whitespace run to one space and trim.

    Any falsy input (None, "", 0, ...) yields the empty string.
    """
    if x:
        collapsed = re.sub(r"\s+", " ", str(x))
        return collapsed.strip()
    return ""

def get_domain(url):
    """Return the lower-cased network location of *url* without a leading "www.".

    Only a leading "www." label is removed; hosts that merely contain the text
    (e.g. "awww.example.com", "shop.www.example.com") are returned untouched,
    so distinct hosts are never collapsed into the same domain.

    Returns "" when the URL has no network location or cannot be parsed.
    """
    try:
        netloc = urlparse(url).netloc.lower()
        if netloc.startswith("www."):
            netloc = netloc[len("www."):]
        return netloc
    except (AttributeError, TypeError, ValueError):
        # Non-string input or a malformed netloc: treat as "no domain".
        return ""

def same_domain(u1, u2):
    """Tell whether both URLs map to the same value under get_domain()."""
    first_domain = get_domain(u1)
    second_domain = get_domain(u2)
    return first_domain == second_domain

def normalize_url(url):
    """Reduce *url* to scheme, host and path, dropping query and fragment.

    An empty path becomes "/". If anything goes wrong the input is returned
    unchanged.
    """
    try:
        parts = urlparse(url)
        origin = "{}://{}".format(parts.scheme, parts.netloc)
        return origin + (parts.path or "/")
    except Exception:
        return url

def sleep_brief():
    """Sleep for a random duration drawn uniformly from SLEEP_BETWEEN_REQUESTS."""
    delay = random.uniform(*SLEEP_BETWEEN_REQUESTS)
    time.sleep(delay)

# Loose phone matcher: optional "+", one digit, then at least six characters
# drawn from digits, dashes, whitespace and parentheses, ending on a digit.
PHONE_RE = re.compile(r"(?:\+?\d[\d\-\s\(\)]{6,}\d)")
# Case-insensitive e-mail matcher: local part, "@", domain, and a TLD of 2+ letters.
EMAIL_RE = re.compile(r"[A-Z0-9._%+\-]+@[A-Z0-9.\-]+\.[A-Z]{2,}", re.I)

def extract_phones(text):
    """Return the distinct phone-like strings found in *text*, in order of appearance.

    Matches come from PHONE_RE and are whitespace-normalised with clean_text().
    Order is first-seen (not set order) so that callers taking element [0] get
    the same number on every run. None or empty input yields [].
    """
    matches = (clean_text(m.group(0)) for m in PHONE_RE.finditer(text or ""))
    # dict.fromkeys de-duplicates while keeping insertion order.
    return list(dict.fromkeys(matches))

def extract_emails(text):
    """Return the distinct e-mail addresses found in *text*, in order of appearance.

    Matches come from EMAIL_RE and are whitespace-normalised with clean_text().
    Order is first-seen (not set order) so that callers taking element [0] get
    the same address on every run. None or empty input yields [].
    """
    matches = (clean_text(m.group(0)) for m in EMAIL_RE.finditer(text or ""))
    # dict.fromkeys de-duplicates while keeping insertion order.
    return list(dict.fromkeys(matches))

# -------------------------
# HTTP -> Soup
# -------------------------
def get_soup(url, session):
    """Fetch *url* with *session* and parse it with the lxml parser.

    Returns a BeautifulSoup document, or None when the request raises, the
    parse raises, or the final status code is 400 or above.
    """
    try:
        response = session.get(
            url,
            headers=_headers(),
            timeout=REQUEST_TIMEOUT,
            allow_redirects=True,
        )
        if response.status_code < 400:
            # Fall back to UTF-8 when the response carries no encoding.
            if not response.encoding:
                response.encoding = "utf-8"
            return BeautifulSoup(response.text, "lxml")
        return None
    except Exception:
        return None

# -------------------------
# Extraction helpers
# -------------------------
# JSON-LD "@type" values that extract_from_json_ld() accepts as organisation-like records.
ORG_TYPES = {"Organization", "LocalBusiness", "Corporation", "Company", "NGO", "Store", "MedicalBusiness", "ProfessionalService"}

def _jsonld_text(value):
    """Flatten a JSON-LD value to one clean string.

    Plain scalars go through clean_text(); objects such as
    {"@type": "Country", "name": "ZW"} contribute their "name" (or "@value");
    lists contribute their non-empty members joined with ", ".
    """
    if isinstance(value, dict):
        return _jsonld_text(value.get("name") or value.get("@value") or "")
    if isinstance(value, (list, tuple)):
        return ", ".join(part for part in map(_jsonld_text, value) if part)
    return clean_text(value)


def extract_from_json_ld(soup, source_url):
    """Read JSON-LD <script> tags and extract organisation-like records.

    Args:
        soup: parsed document exposing ``find_all``.
        source_url: URL stored in each record's "source_url" field.

    Returns:
        A list of dicts with keys name, address, phone, email, website,
        source_url and country. Items whose "@type" is not in ORG_TYPES or
        that have no name are skipped.

    Malformed payloads are skipped rather than raised. Address parts that are
    objects (for example an ``addressCountry`` given as a Country object) and
    non-string "@type" entries are tolerated instead of raising TypeError.
    """
    results = []
    for tag in soup.find_all("script", type=lambda t: t and "ld+json" in t):
        raw = tag.string or ""
        if not raw.strip():
            continue
        # Some sites stuff malformed JSON or CDATA
        try:
            data = json.loads(raw)
        except (ValueError, RecursionError):
            content = raw.strip()
            if content.startswith("<![CDATA["):
                content = content[9:-3]
            try:
                data = json.loads(content)
            except (ValueError, RecursionError):
                continue

        if isinstance(data, list):
            candidates = data
        elif isinstance(data, dict):
            graph = data.get("@graph")
            candidates = graph if isinstance(graph, list) else [data]
        else:
            candidates = []

        for item in candidates:
            if not isinstance(item, dict):
                continue

            typ = item.get("@type")
            if isinstance(typ, str):
                types = {typ}
            elif isinstance(typ, (list, tuple)):
                # Ignore unhashable/non-string entries instead of crashing in set().
                types = {t for t in typ if isinstance(t, str)}
            else:
                types = set()
            if not types.intersection(ORG_TYPES):
                # Non-organisation nodes (e.g. ListItem) are handled by
                # extract_listitem_links_from_jsonld for link discovery.
                continue

            name = clean_text(item.get("name"))
            if not name:
                continue

            addr = item.get("address", "")
            if isinstance(addr, list):
                # Several addresses given: keep the first one.
                addr = addr[0] if addr else ""
            if isinstance(addr, dict):
                addr_parts = [
                    _jsonld_text(addr.get(key, ""))
                    for key in (
                        "streetAddress",
                        "addressLocality",
                        "addressRegion",
                        "postalCode",
                        "addressCountry",
                    )
                ]
                address = clean_text(" ".join(part for part in addr_parts if part))
            else:
                address = clean_text(addr)

            results.append({
                "name": name,
                "address": address,
                "phone": clean_text(item.get("telephone", "")),
                "email": clean_text(item.get("email", "")),
                "website": clean_text(item.get("url", "")),
                "source_url": source_url,
                "country": ZIM_COUNTRY,
            })
    return results

def extract_listitem_links_from_jsonld(soup, base_url):
    """Collect same-domain links referenced by JSON-LD ``itemListElement`` entries.

    Handles the ListItem shapes seen in practice:
      * ``{"item": {"@id": "...", "url": "..."}}`` (object item)
      * ``{"item": "https://..."}`` (item given directly as a URL string)
      * ``{"url": "https://..."}`` (URL on the ListItem itself)
    Lists nested under a top-level ``@graph`` are inspected too.
    ``BreadcrumbList`` nodes are skipped: their entries are the navigation
    trail, not listing entries.

    Args:
        soup: parsed document exposing ``find_all``.
        base_url: page URL used to resolve relative links and for the
            same-domain check.

    Returns:
        A sorted list of absolute URLs on the same domain as *base_url*.
    """
    links = set()
    for tag in soup.find_all("script", type=lambda t: t and "ld+json" in t):
        raw = tag.string or ""
        if not raw.strip():
            continue
        try:
            data = json.loads(raw)
        except (ValueError, RecursionError):
            content = raw.strip()
            if content.startswith("<![CDATA["):
                content = content[9:-3]
            try:
                data = json.loads(content)
            except (ValueError, RecursionError):
                continue

        if isinstance(data, list):
            buckets = list(data)
        elif isinstance(data, dict):
            buckets = [data]
            graph = data.get("@graph")
            if isinstance(graph, list):
                buckets.extend(graph)
        else:
            buckets = []

        for obj in buckets:
            if not isinstance(obj, dict):
                continue
            typ = obj.get("@type")
            type_names = typ if isinstance(typ, (list, tuple)) else [typ]
            if "BreadcrumbList" in type_names:
                continue
            items = obj.get("itemListElement")
            if not isinstance(items, list):
                continue
            for entry in items:
                if not isinstance(entry, dict):
                    continue
                item = entry.get("item")
                if isinstance(item, dict):
                    href = item.get("@id") or item.get("url")
                elif isinstance(item, str):
                    href = item
                else:
                    href = None
                href = href or entry.get("url")
                # Guard against non-string values that urljoin cannot handle.
                if not href or not isinstance(href, str):
                    continue
                full = urljoin(base_url, href)
                if same_domain(full, base_url):
                    links.add(full)
    return sorted(links)

def extract_from_listing_blocks(soup, base_url):
    """Heuristics over common listing block patterns.

    Every element matched by any selector below is treated as one candidate
    company block. Selectors are applied independently and nothing is
    de-duplicated here, so an element matched by several selectors (or nested
    inside another match) can produce repeated records.

    Args:
        soup: parsed listing page.
        base_url: URL of that page; used to resolve relative links and to
            tell internal links from external ones.

    Returns:
        A tuple ``(companies, detail_links)`` where ``companies`` is a list of
        dicts with keys name, address, phone, email, website and detail_url
        (only blocks with a non-empty name are kept), and ``detail_links`` is
        a list of unique detail-page URLs, including any links reported by
        extract_listitem_links_from_jsonld().
    """
    companies = []
    detail_links = set()

    selectors = [
        # generic listing containers
        'div[class*="listing"]', 'div[class*="result"]', 'div[class*="company"]',
        'div[class*="business"]', 'div[class*="entry"]', 'div[class*="directory"]',
        'li[class*="listing"]', 'li[class*="result"]', 'article',
        # card-like
        '.card', '.item', '.media',
        # table rows (skip header rows)
        'table tr'
    ]

    for sel in selectors:
        for block in soup.select(sel):
            # Any failure while parsing one block only discards that block.
            try:
                if block.name == "tr" and block.find_all("th"):
                    continue

                # Candidate name: prefer a linked heading/title, then fall back
                # to an unlinked one.
                name_elem = block.select_one('a[itemprop="name"], .name a, .title a, h2 a, h3 a, h4 a') \
                            or block.select_one('.name, .title, h2, h3, h4, strong, [itemprop="name"]')
                name = clean_text(name_elem.get_text(" ", strip=True)) if name_elem else ""

                # Address/phone/email within block
                address_elem = block.select_one('address, [itemprop="address"], .address, .location, .addr, .contact')
                address = clean_text(address_elem.get_text(" ", strip=True)) if address_elem else ""

                phone_elem = block.select_one('a[href^="tel:"], [itemprop="telephone"], .phone, .tel')
                phone = clean_text(phone_elem.get_text(" ", strip=True)) if phone_elem else ""

                email_elem = block.select_one('a[href^="mailto:"], .email')
                email = clean_text(email_elem.get_text(" ", strip=True)) if email_elem else ""

                website = ""
                detail_url = None

                link_candidates = block.find_all("a", href=True)
                for a in link_candidates:
                    href = a["href"]
                    if href.startswith(("mailto:", "tel:", "javascript:", "#")):
                        continue
                    full = urljoin(base_url, href)
                    text = (a.get_text(" ", strip=True) or "").lower()
                    # Website: link labelled "website", or an absolute link to
                    # another domain. There is no break, so the last match wins.
                    if "website" in text or ("http" in href and not same_domain(full, base_url)):
                        website = full
                    # internal link as detail page
                    # `and` binds tighter than `or`: the anchor that IS the name
                    # element always sets detail_url; otherwise the first
                    # same-domain link is used while detail_url is still unset.
                    if name and name_elem and a == name_elem or (name and not detail_url and same_domain(full, base_url)):
                        detail_url = full

                # Fallback phone/email from text
                block_text = block.get_text(" ", strip=True)
                if not phone:
                    phones = extract_phones(block_text)
                    phone = phones[0] if phones else ""
                if not email:
                    emails = extract_emails(block_text)
                    email = emails[0] if emails else ""

                if name:
                    companies.append({
                        "name": name,
                        "address": address,
                        "phone": phone,
                        "email": email,
                        "website": website,
                        "detail_url": detail_url,
                    })
                if detail_url:
                    detail_links.add(detail_url)
            except Exception:
                continue

    # Also pick up detail links from JSON-LD ListItem if present
    jsonld_detail_links = extract_listitem_links_from_jsonld(soup, base_url)
    for l in jsonld_detail_links:
        detail_links.add(l)

    return companies, list(detail_links)

def parse_table_with_headers(soup, base_url):
    """Map the first table on the page to company records via its header row.

    The first <tr> is taken as the header; columns are located by substring
    match on the lower-cased header text. Without a name/company/business
    column nothing is returned. Each later row with a non-empty name cell
    becomes a dict with keys name, address, phone, email and website.
    """
    table = soup.find("table")
    if not table:
        return []
    first_row = table.find("tr")
    if not first_row:
        return []

    titles = [clean_text(cell.get_text()).lower() for cell in first_row.find_all(["th", "td"])]

    def locate(keywords):
        # Position of the first header containing any keyword, else None.
        return next(
            (pos for pos, title in enumerate(titles) if any(word in title for word in keywords)),
            None,
        )

    name_col = locate(["name", "company", "business"])
    if name_col is None:
        return []
    addr_col = locate(["address", "location"])
    phone_col = locate(["phone", "tel", "contact"])
    email_col = locate(["email", "e-mail", "mail"])
    site_col = locate(["website", "site", "url", "link"])

    def cell_text(cells, pos):
        # Text of cells[pos], or "" when the column is absent from this row.
        if pos is None or pos >= len(cells):
            return ""
        return clean_text(cells[pos].get_text(" ", strip=True))

    records = []
    for row in table.find_all("tr")[1:]:
        cells = row.find_all("td")
        name = cell_text(cells, name_col)
        if not name:
            continue

        website = ""
        if site_col is not None and site_col < len(cells):
            anchor = cells[site_col].find("a", href=True)
            if anchor:
                website = urljoin(base_url, anchor["href"])
            else:
                website = cell_text(cells, site_col)

        records.append({
            "name": name,
            "address": cell_text(cells, addr_col),
            "phone": cell_text(cells, phone_col),
            "email": cell_text(cells, email_col),
            "website": website,
        })
    return records

def find_next_page_url(soup, current_url):
    """Try to locate the 'next' page link.

    Strategies, in order:
      1. an ``<a rel="next">`` link;
      2. an anchor whose text contains next/older/more/»/> or whose class or
         aria-label contains "next";
      3. incrementing a ``page=`` / ``p=`` query parameter, or a trailing
         ``/page/N`` path segment, in *current_url*.

    The numeric fallback matches case-insensitively and rewrites only the
    number, so URLs such as ``?Page=2`` or ``/Page/2/`` advance correctly and
    keep their original spelling.

    Args:
        soup: parsed page exposing ``find_all``.
        current_url: URL of that page.

    Returns:
        The absolute URL of the next page, or None if no candidate was found.
    """
    # 1) rel=next
    for a in soup.find_all("a", href=True, rel=True):
        rel = a.get("rel")
        rels = [rel] if isinstance(rel, str) else (rel or [])
        if "next" in [r.lower() for r in rels]:
            return urljoin(current_url, a["href"])

    # 2) Next labels/classes
    next_tokens = ("next", "older", "more", "»", ">")
    for a in soup.find_all("a", href=True):
        label = (a.get_text(" ", strip=True) or "").lower()
        classes = " ".join(a.get("class", [])).lower()
        aria = (a.get("aria-label") or "").lower()
        if any(t in label for t in next_tokens) or "next" in classes or "next" in aria:
            return urljoin(current_url, a["href"])

    # 3) numeric patterns: splice the incremented number into the matched span
    m = re.search(r"([?&])(page|p)=(\d+)", current_url, flags=re.I)
    if m:
        n = int(m.group(3)) + 1
        return f"{current_url[:m.start(3)]}{n}{current_url[m.end(3):]}"

    m2 = re.search(r"/page/(\d+)/?$", current_url, flags=re.I)
    if m2:
        n = int(m2.group(1)) + 1
        # Always end with a slash, as before.
        return f"{current_url[:m2.start(1)]}{n}/"

    return None

def parse_detail_page(soup, url):
    """Build one company record from a detail page.

    The first JSON-LD organisation found wins; otherwise the record is pieced
    together from headings, address-like elements, regex matches over the page
    text and a "website"/"visit"/"homepage" link.

    Returns a dict with keys name, address, phone, email and website.
    """
    json_ld_records = extract_from_json_ld(soup, url)
    if json_ld_records:
        first = json_ld_records[0]
        return {
            field: first.get(field, "")
            for field in ("name", "address", "phone", "email", "website")
        }

    # Heuristic parsing
    heading = soup.select_one('h1, h2, .entry-title, .page-title, .company-name, [itemprop="name"]')
    name = clean_text(heading.get_text(" ", strip=True)) if heading else ""
    if not name:
        # Last resort: the document <title>.
        title_tag = soup.find("title")
        if title_tag:
            name = clean_text(title_tag.get_text())
        else:
            name = ""

    address = ""
    address_tag = soup.select_one('address, [itemprop="address"], .address, .location, .contact, .details')
    if address_tag:
        address = clean_text(address_tag.get_text(" ", strip=True))

    full_text = soup.get_text(" ", strip=True)
    found_phones = extract_phones(full_text)
    found_emails = extract_emails(full_text)

    # First link whose label hints at the company's own site.
    website = next(
        (
            urljoin(url, anchor["href"])
            for anchor in soup.find_all("a", href=True)
            if any(
                hint in (anchor.get_text(" ", strip=True) or "").lower()
                for hint in ("website", "visit", "homepage")
            )
        ),
        "",
    )

    record = {"name": name, "address": address}
    record["phone"] = found_phones[0] if found_phones else ""
    record["email"] = found_emails[0] if found_emails else ""
    record["website"] = website
    return record

# -------------------------
# Generic directory crawler (fallback for all sites)
# -------------------------
def crawl_listing_seed(seed_url, max_pages=MAX_PAGES_PER_LISTING, max_depth=MAX_LISTING_DEPTH, max_detail=MAX_DETAIL_PAGES):
    """Crawl one listing seed breadth-first and return the companies found.

    For every listing page fetched, records are taken from inline JSON-LD, a
    header-mapped table and listing-block heuristics; detail pages linked from
    the listing are then fetched and parsed. Same-domain sub-listings and the
    next pagination page are queued.

    Args:
        seed_url: first listing URL; only URLs on its domain are followed.
        max_pages: maximum number of listing pages successfully fetched.
        max_depth: maximum depth for following sub-listing links (pagination
            keeps the depth of the page it came from).
        max_detail: maximum number of detail pages successfully fetched for
            the whole seed (the budget is shared by all listing pages).

    Returns:
        A DataFrame with columns name, address, phone, email, website,
        source_url and country, de-duplicated on
        (name, website-or-phone, source_url); empty if nothing was found.
    """
    session = new_session()
    visited_listing = set()
    visited_detail = set()
    queue = [(seed_url, 0)]
    gathered = []
    pages_seen = 0
    # Counted across the whole seed: resetting it per listing page would let a
    # crawl fetch up to max_pages * max_detail detail pages.
    details_followed = 0
    domain = get_domain(seed_url)

    while queue and pages_seen < max_pages:
        url, depth = queue.pop(0)
        if url in visited_listing:
            continue
        visited_listing.add(url)

        soup = get_soup(url, session)
        if not soup:
            continue

        # Records available directly on the listing page, in this order:
        # inline JSON-LD, structured tables, heuristic listing blocks.
        jsonld_companies = extract_from_json_ld(soup, url)
        table_companies = parse_table_with_headers(soup, url)
        block_companies, detail_links = extract_from_listing_blocks(soup, url)
        for batch in (jsonld_companies, table_companies, block_companies):
            for rec in batch:
                rec["source_url"] = url
                rec["country"] = ZIM_COUNTRY
                gathered.append(rec)

        # Follow detail pages (limited by the per-seed budget)
        for durl in detail_links:
            if details_followed >= max_detail:
                break
            if durl in visited_detail:
                continue
            visited_detail.add(durl)
            sleep_brief()
            dsoup = get_soup(durl, session)
            if not dsoup:
                continue
            enriched = parse_detail_page(dsoup, durl)
            merged = {
                "name": enriched.get("name", ""),
                "address": enriched.get("address", ""),
                "phone": enriched.get("phone", ""),
                "email": enriched.get("email", ""),
                "website": enriched.get("website", ""),
                "source_url": durl,
                "country": ZIM_COUNTRY,
            }
            if merged["name"] or merged["website"]:
                gathered.append(merged)
            details_followed += 1

        # Discover same-domain sub-listings
        if depth < max_depth:
            # NOTE(review): discover_listing_links is not defined in this cell;
            # confirm it is provided elsewhere, otherwise this raises NameError.
            more = discover_listing_links(soup, url)
            for m in more:
                if get_domain(m) == domain and m not in visited_listing:
                    queue.append((m, depth + 1))

        # Pagination
        next_page = find_next_page_url(soup, url)
        if next_page and get_domain(next_page) == domain and next_page not in visited_listing:
            queue.append((next_page, depth))  # same depth for pagination

        pages_seen += 1
        sleep_brief()

    # Cleanup + dedupe
    rows = []
    for rec in gathered:
        name = clean_text(rec.get("name", ""))
        if not name:
            continue
        rows.append({
            "name": name,
            "address": clean_text(rec.get("address", "")),
            "phone": clean_text(rec.get("phone", "")),
            "email": clean_text(rec.get("email", "")),
            "website": clean_text(rec.get("website", "")),
            "source_url": clean_text(rec.get("source_url", "")),
            "country": rec.get("country", ZIM_COUNTRY),
        })

    df = pd.DataFrame(rows)
    if df.empty:
        return df

    def dedupe_key(r):
        # Website identifies a company best; fall back to phone when absent.
        w = r.get("website", "")
        p = r.get("phone", "")
        return (r.get("name", ""), w if w else p, r.get("source_url", ""))

    df["_k"] = df.apply(dedupe_key, axis=1)
    df = df.drop_duplicates("_k").drop(columns="_k")
    return df

# -------------------------
# ZimbabweYP site-specific crawler (deep coverage)
# -------------------------
def is_zyp(url: str) -> bool:
    """Return True when *url* is hosted on zimbabweyp.com or one of its subdomains.

    The host name is compared exactly (ignoring case and any port) instead of
    by substring, so look-alike hosts such as ``zimbabweyp.com.evil.example``
    or ``notzimbabweyp.com`` are not routed to the ZimbabweYP crawler.
    URLs without a host, or values that cannot be parsed, give False.
    """
    try:
        host = (urlparse(url).hostname or "").lower()
        return host == "zimbabweyp.com" or host.endswith(".zimbabweyp.com")
    except (AttributeError, TypeError, ValueError):
        return False

def zyp_extract_category_links(soup, base_url):
    """Collect same-domain links that look like ZimbabweYP category pages.

    A link qualifies when its path contains one of the known category
    segments, or when its label mentions "category"/"categories".
    Returns the unique absolute URLs as a list.
    """
    ignored_prefixes = ("mailto:", "tel:", "javascript:", "#")
    # Typical patterns observed on YP-like sites
    path_markers = ["/category/", "/categories/", "/categories", "/companies/zimbabwe/", "/zimbabwe/"]

    found = set()
    for anchor in soup.find_all("a", href=True):
        raw_href = anchor["href"]
        if raw_href.startswith(ignored_prefixes):
            continue
        absolute = urljoin(base_url, raw_href)
        if not same_domain(absolute, base_url):
            continue

        link_path = urlparse(absolute).path.lower()
        label = (anchor.get_text(" ", strip=True) or "").lower()

        path_hit = any(marker in link_path for marker in path_markers)
        label_hit = "category" in label or "categories" in label
        if path_hit or label_hit:
            found.add(absolute)
    return list(found)

def zyp_extract_detail_links_from_list(soup, base_url):
    """Collect company detail-page URLs from a ZimbabweYP listing page.

    Anchors are taken from listing-like containers when any exist, otherwise
    from the whole page. Same-domain links whose path contains a company-like
    segment are kept, together with links from JSON-LD list items.
    """
    containers = soup.select(
        'div[class*="listing"], div[class*="result"], div[class*="company"], '
        'div[class*="business"], div[class*="entry"], li[class*="listing"], li[class*="result"], article'
    )
    if containers:
        anchors = [a for box in containers for a in box.find_all("a", href=True)]
    else:
        anchors = soup.find_all("a", href=True)

    ignored_prefixes = ("mailto:", "tel:", "javascript:", "#")
    detail_markers = ["/company/", "/companies/", "/business/", "/profile/", "/listing/"]

    found = set()
    for anchor in anchors:
        raw_href = anchor["href"]
        if raw_href.startswith(ignored_prefixes):
            continue
        absolute = urljoin(base_url, raw_href)
        if same_domain(absolute, base_url):
            link_path = urlparse(absolute).path.lower()
            if any(marker in link_path for marker in detail_markers):
                found.add(absolute)

    # JSON-LD list items can also reference detail URLs
    found.update(extract_listitem_links_from_jsonld(soup, base_url))
    return list(found)

def zyp_find_next_page(soup, current_url):
    """ZimbabweYP pagination heuristics.

    The rules (rel=next, next-like labels/classes/aria-labels, then numeric
    ``page``/``p`` query or ``/page/N`` path increments) are the same as the
    generic ones, so this delegates to find_next_page_url() instead of keeping
    a second copy that can drift out of sync. It remains a separate function
    so site-specific rules can be added here later.

    Returns the absolute URL of the next page, or None.
    """
    return find_next_page_url(soup, current_url)

def zyp_fetch_detail(url, session):
    """Fetch and parse one detail page; return its record, or None.

    None is returned when the page cannot be fetched or no name was extracted.
    The record is tagged with its source URL and the country constant.
    """
    page = get_soup(url, session)
    if not page:
        return None
    record = parse_detail_page(page, url)
    record.update(source_url=url, country=ZIM_COUNTRY)
    if record.get("name"):
        return record
    return None

def crawl_zimbabweyp(seed_url):
    """
    Full crawl for ZimbabweYP starting from either:
      - the browse page (preferred), or
      - any category page (will crawl that category; also discovers more categories).
    BFS over categories; deep pagination; then fetch all detail pages.

    Safety properties:
      - every listing page URL is fetched at most once, so pagination links
        that loop back cannot spin until ZYP_MAX_PAGES_PER_CATEGORY;
      - ZYP_MAX_CATEGORIES (when not None) caps the total number of
        categories, including those discovered while crawling.

    Returns a DataFrame with columns name, address, phone, email, website,
    source_url and country (empty if the seed cannot be fetched or nothing
    was extracted).
    """
    session = new_session()
    start_soup = get_soup(seed_url, session)
    if not start_soup:
        return pd.DataFrame()

    # Initialize category queue
    initial = set(zyp_extract_category_links(start_soup, seed_url))
    path = urlparse(seed_url).path.lower()
    if "/category/" in path or "/zimbabwe/" in path or "/companies/zimbabwe/" in path:
        initial.add(seed_url)
    if not initial:
        # If browse page does not clearly expose categories, still treat seed as category
        initial.add(seed_url)

    # Sorted so the crawl order (and any cap) is reproducible between runs.
    categories = sorted(initial)
    if ZYP_MAX_CATEGORIES is not None:
        categories = categories[:ZYP_MAX_CATEGORIES]
    queued = set(categories)  # O(1) membership for the growing queue

    print(f"ZimbabweYP: discovered {len(categories)} category URLs to crawl")
    seen_pages = set()
    all_detail_links = set()

    # BFS over categories, discovering more as we paginate
    idx = 0
    while idx < len(categories):
        cat = categories[idx]
        idx += 1
        print(f"  [cat {idx}/{len(categories)}] {cat}")

        pages = 0
        url = cat
        while url and pages < ZYP_MAX_PAGES_PER_CATEGORY:
            if url in seen_pages:
                # Already crawled (pagination loop or overlap with another category).
                break
            seen_pages.add(url)

            soup = get_soup(url, session)
            if not soup:
                break

            # Collect detail links on this page
            all_detail_links.update(zyp_extract_detail_links_from_list(soup, url))

            # Discover more categories from this page
            for mc in zyp_extract_category_links(soup, url):
                if mc in queued or not same_domain(mc, cat):
                    continue
                if ZYP_MAX_CATEGORIES is not None and len(categories) >= ZYP_MAX_CATEGORIES:
                    break
                queued.add(mc)
                categories.append(mc)

            nxt = zyp_find_next_page(soup, url)
            if not nxt or nxt == url:
                break
            url = nxt
            pages += 1
            sleep_brief()

    print(f"ZimbabweYP: total unique detail links: {len(all_detail_links)}")

    # Fetch detail pages concurrently
    results = []
    links_list = list(all_detail_links)

    def worker(u):
        # A failing detail page must not abort the whole crawl.
        try:
            return zyp_fetch_detail(u, session)
        except Exception:
            return None

    if links_list:
        total_links = len(links_list)
        report_every = max(50, total_links // 20 or 1)
        with ThreadPoolExecutor(max_workers=ZYP_DETAIL_WORKERS) as ex:
            futures = [ex.submit(worker, u) for u in links_list]
            for completed, fut in enumerate(as_completed(futures), start=1):
                rec = fut.result()
                if rec:
                    results.append(rec)
                if completed % report_every == 0 or completed == total_links:
                    print(f"    Detail progress: {completed}/{len(links_list)} "
                          f"({int(completed/len(links_list)*100)}%), records: {len(results)}")

    if not results:
        return pd.DataFrame()

    df = pd.DataFrame(results)
    # Dedupe by source_url (unique per company: each record comes from its own detail page)
    if "source_url" in df.columns:
        df = df.drop_duplicates(subset=["source_url"])
    # Fallback dedupe
    if not df.empty and {"name", "website"}.issubset(df.columns):
        df = df.drop_duplicates(subset=["name", "website"], keep="first")
    return df[["name", "address", "phone", "email", "website", "source_url", "country"]]

# -------------------------
# Dispatcher over directory rows
# -------------------------
def process_listing_url(row):
    """Process a single row's listing_urls and return a merged DataFrame of companies.

    ``row["listing_urls"]`` holds one or more seed URLs separated by "|".
    ZimbabweYP seeds go to crawl_zimbabweyp(), all others to
    crawl_listing_seed(). A failing seed is reported and skipped.

    De-duplication keeps one row per (name, website-or-phone). Rows are NOT
    de-duplicated on source_url alone: the generic crawler stamps every company
    found on a listing page with that page's URL, so doing so would keep only
    one company per page.

    Returns an empty DataFrame when the row has no usable seeds or nothing
    was extracted.
    """
    listing_field = row.get("listing_urls")
    if pd.isna(listing_field) or not str(listing_field).strip():
        return pd.DataFrame()

    seeds = [u.strip() for u in str(listing_field).split("|") if u.strip()]
    if not seeds:
        return pd.DataFrame()

    out = []
    for u in seeds:
        print(f"  Crawling seed: {u}")
        try:
            if is_zyp(u):
                df = crawl_zimbabweyp(u)
            else:
                df = crawl_listing_seed(
                    u,
                    max_pages=MAX_PAGES_PER_LISTING,
                    max_depth=MAX_LISTING_DEPTH,
                    max_detail=MAX_DETAIL_PAGES
                )
            if isinstance(df, pd.DataFrame) and not df.empty:
                out.append(df)
        except Exception as e:
            print(f"  Error crawling {u}: {e}")
        sleep_brief()

    if not out:
        return pd.DataFrame()

    merged = pd.concat(out, ignore_index=True)
    # Exact repeats (same company scraped twice from the same page).
    merged = merged.drop_duplicates()
    if {"name", "website", "phone"}.issubset(merged.columns):
        # Same company seen on several pages: identify it by name plus website,
        # falling back to phone when no website was captured.
        website = merged["website"].fillna("").astype(str)
        phone = merged["phone"].fillna("").astype(str)
        contact = website.where(website != "", phone)
        keys = pd.DataFrame({"name": merged["name"], "contact": contact})
        merged = merged[~keys.duplicated(keep="first")]
    return merged

def extract_all_companies(directories_df, max_workers=3):
    """Extract companies from all listing URLs in the DataFrame with progress.

    Args:
        directories_df: DataFrame with a ``listing_urls`` column and,
            optionally, a ``has_listings`` flag column. The input is copied,
            not modified.
        max_workers: Number of threads used to process rows concurrently.

    Returns:
        A DataFrame concatenating the per-row results of
        ``process_listing_url``, de-duplicated on ``(name, website)`` with
        the phone number standing in when the website is missing. An empty
        DataFrame is returned when there is nothing to process or nothing
        was extracted.
    """
    df_in = directories_df.copy()
    if "has_listings" in df_in.columns:
        # Element-wise comparison on a Series, not a truthiness test.
        df_in = df_in[df_in["has_listings"] == True]  # noqa: E712
    df_in = df_in[df_in["listing_urls"].notna() & (df_in["listing_urls"].astype(str).str.strip() != "")]

    total = len(df_in)
    if total == 0:
        print("No valid listing URLs found.")
        return pd.DataFrame()

    print(f"Processing {total} directories with listings...")
    results = []
    completed = 0
    start = time.time()

    with ThreadPoolExecutor(max_workers=max_workers) as ex:
        futures = {ex.submit(process_listing_url, row): idx for idx, row in df_in.iterrows()}
        for fut in as_completed(futures):
            try:
                out_df = fut.result()
                if isinstance(out_df, pd.DataFrame) and not out_df.empty:
                    results.append(out_df)
            except Exception as e:
                # Best effort: a failing row is reported and skipped.
                print("Error in worker:", e)
            finally:
                completed += 1
                # Report roughly every 5% and always on the last row.
                if completed % max(1, total // 20) == 0 or completed == total:
                    elapsed = time.time() - start
                    print(f"Progress: {completed}/{total} ({int(completed/total*100)}%) | "
                          f"Elapsed: {elapsed/60:.1f}m | Batches with data: {len(results)}")

    if not results:
        return pd.DataFrame()

    all_df = pd.concat(results, ignore_index=True)

    def _present(value):
        """Return True when a cell holds a usable, non-blank value."""
        if value is None:
            return False
        # NaN is truthy in Python, so it has to be detected explicitly.
        if pd.api.types.is_scalar(value) and pd.isna(value):
            return False
        return bool(str(value).strip())

    # Final dedupe across all sources
    def final_key(r):
        w = r.get("website", "")
        if _present(w):
            contact = w
        else:
            # Fall back to the phone when the website is None/NaN/blank.
            p = r.get("phone", "")
            contact = p if _present(p) else ""
        return (r.get("name", ""), contact)

    all_df["_k"] = all_df.apply(final_key, axis=1)
    all_df = all_df.drop_duplicates("_k").drop(columns="_k")
    return all_df

# -------------------------
# Run extraction and save
# -------------------------
# Kick off the crawl over every directory row; three worker threads.
print("Starting company extraction (robust pipeline)...")
companies_df = extract_all_companies(directories_with_listings, max_workers=3)

if companies_df.empty:
    # Nothing came back from any directory.
    print("No companies were extracted. You may need to increase depth/pages or some sites are JS-rendered.")
else:
    # Timestamp the output name so repeated runs write to separate files.
    ts = pd.Timestamp.now().strftime("%Y%m%d_%H%M%S")
    out_file = f"extracted_companies_{ts}.csv"
    companies_df.to_csv(out_file, index=False)
    print(f"\nExtraction complete. Saved {len(companies_df)} companies to {out_file}")
    # Preview the first rows in the notebook.
    display(companies_df.head(20))

Starting company extraction (robust pipeline)...
Processing 43 directories with listings...
  Crawling seed: https://afrikta.com/listing-locations/zimbabwe/
  Crawling seed: https://apps.apple.com/in/app/trade-with-vikas/id6608975480
  Crawling seed: https://antropocene.it/2021/10/23/parco-nazionale-e-riserva-di-wrangell-st-elias/
  Crawling seed: https://appadvice.com/app/trade-with-vikas/6608975480
  Crawling seed: https://forum.wordreference.com/threads/company-is-are.2961868/Progress: 2/43 (4%) | Elapsed: 0.3m | Batches with data: 1

  Error crawling https://afrikta.com/listing-locations/zimbabwe/: sequence item 2: expected str instance, dict found
  Crawling seed: https://en.wikipedia.org/wiki/Zimbabwe



Assuming this really is an XML document, what you're doing might work, but you should know that using an XML parser will be more reliable. To parse this document as XML, make sure you have the Python package 'lxml' installed, and pass the keyword argument `features="xml"` into the BeautifulSoup constructor.




  return BeautifulSoup(resp.text, "lxml")


  Crawling seed: https://forum.wordreference.com/threads/in-at-the-company.215026/Progress: 4/43 (9%) | Elapsed: 1.3m | Batches with data: 2

  Crawling seed: https://forum.wordreference.com/threads/m-s-followed-by-a-company-name.1606232/



Assuming this really is an XML document, what you're doing might work, but you should know that using an XML parser will be more reliable. To parse this document as XML, make sure you have the Python package 'lxml' installed, and pass the keyword argument `features="xml"` into the BeautifulSoup constructor.




  return BeautifulSoup(resp.text, "lxml")


  Crawling seed: https://eduniversal-ranking.com/gem-alpine-business-school-ranking.htmlProgress: 6/43 (13%) | Elapsed: 2.0m | Batches with data: 3

  Crawling seed: https://forum.wordreference.com/threads/who-is-company-name-or-who-are-company-name.3469707/
  Crawling seed: https://gizmodo.com/download/google-chromeProgress: 8/43 (18%) | Elapsed: 2.6m | Batches with data: 5




Assuming this really is an XML document, what you're doing might work, but you should know that using an XML parser will be more reliable. To parse this document as XML, make sure you have the Python package 'lxml' installed, and pass the keyword argument `features="xml"` into the BeautifulSoup constructor.




  return BeautifulSoup(resp.text, "lxml")


  Crawling seed: https://isnca.org/it/cosa-cè-di-così-speciale-nel-wrangell-st-elias/
  Crawling seed: https://it.wikipedia.org/wiki/Parco_nazionale_e_riserva_di_Wrangell-St._EliasProgress: 10/43 (23%) | Elapsed: 3.1m | Batches with data: 7

  Crawling seed: https://it.knowledgr.com/10726474/WrangellParcoNazionaleDiSEliasERiserva
  Crawling seed: https://news.co.zw/Progress: 12/43 (27%) | Elapsed: 4.0m | Batches with data: 7

  Crawling seed: https://ontheworldmap.com/zimbabwe/
  Crawling seed: https://mbagradschools.com/school/gem-alpine-business-schoolProgress: 14/43 (32%) | Elapsed: 4.6m | Batches with data: 9

  Crawling seed: https://play.google.com/store/apps/details?id=co.ted.jytfr&hl=en-US
  Crawling seed: https://sur.ly/i/yellowpages.co.zw/Progress: 16/43 (37%) | Elapsed: 5.0m | Batches with data: 9

  Crawling seed: https://thedirectory.co.zw/
  Crawling seed: https://thezimbabwemail.com/Progress: 18/43 (41%) | Elapsed: 5.2m | Batches with data: 9

  Crawling seed: https://wa

### Visuals

In [None]:
# Add these imports at the top of your notebook
import networkx as nx
import matplotlib.pyplot as plt
from collections import deque
from IPython.display import display, HTML

# Add this class to track BFS progress
class BFSTracker:
    """Bookkeeping for a breadth-first crawl: graph, visited set and queue.

    Attributes set in ``__init__``:
        graph: ``nx.DiGraph`` with one node per URL (carrying a ``level``
            attribute) and an edge from parent URL to child URL.
        visited: set of URLs already added to the graph.
        queue: FIFO ``deque`` of ``(url, level)`` pairs still to process.
        levels: dict initialised empty; not written by this class.
        pos: initialised to ``None``; not written by this class.
    """

    def __init__(self):
        self.graph = nx.DiGraph()
        self.visited = set()
        self.queue = deque()
        self.levels = {}  # Intended for node levels used in visualization
        self.pos = None   # Intended for node positions for consistent plotting

    def add_node(self, url, parent_url=None, level=0):
        """Record ``url`` in the graph unless it was seen before.

        Returns:
            ``True`` when the URL was new and has been added (with an edge
            from ``parent_url`` if one was given), ``False`` otherwise.
        """
        if url in self.visited:
            return False

        self.visited.add(url)
        self.graph.add_node(url, level=level)
        if parent_url is not None:
            self.graph.add_edge(parent_url, url)
        return True

    def add_to_queue(self, url, parent_url=None, level=0):
        """Add ``url`` to the graph and, if it is new, schedule it for a visit.

        Returns:
            ``True`` when the URL was new and enqueued, ``False`` otherwise.
        """
        is_new = self.add_node(url, parent_url, level)
        if is_new:
            self.queue.append((url, level))
        return is_new

    def get_next(self):
        """Pop the oldest ``(url, level)`` pair, or ``(None, None)`` if empty."""
        if not self.queue:
            return None, None
        return self.queue.popleft()

# Modify the crawl_zimbabweyp function to use the tracker
def crawl_zimbabweyp_with_tracking(seed_url, max_categories=50, max_pages_per_category=5):
    """Breadth-first crawl of category pages that records the traversal.

    Starting from ``seed_url`` (level 0), category links are discovered with
    ``zyp_extract_category_links`` and queued one level deeper; for every
    category page the pagination chain is followed with
    ``zyp_find_next_page`` and each extra page is added to the graph.

    Args:
        seed_url: URL the traversal starts from.
        max_categories: Stop after this many category pages were processed.
        max_pages_per_category: Maximum pages followed per category.

    Returns:
        The ``BFSTracker`` holding the traversal graph.
    """
    tracker = BFSTracker()
    session = new_session()

    # Start with seed URL
    tracker.add_to_queue(seed_url, None, 0)
    categories_processed = 0

    while categories_processed < max_categories:
        url, level = tracker.get_next()
        # get_next() signals an empty queue with (None, None); that tuple is
        # truthy, so it must be checked explicitly or the loop never ends.
        if url is None:
            break
        print(f"[Level {level}] Processing: {url}")

        try:
            soup = get_soup(url, session)
            if not soup:
                continue

            # Extract category links (next level)
            if level == 0 or "category" in url.lower():
                categories = zyp_extract_category_links(soup, url)
                for cat in categories[:5]:  # Limit to 5 categories per level for visualization
                    if tracker.add_to_queue(cat, url, level + 1):
                        print(f"  + Category: {cat}")

                # If this was a category page, process its listings
                if level > 0:
                    categories_processed += 1
                    pages_processed = 0
                    next_page = url

                    while next_page and pages_processed < max_pages_per_category:
                        # The first page is the category page fetched above;
                        # reuse it instead of downloading it a second time.
                        if pages_processed == 0:
                            page_soup = soup
                        else:
                            page_soup = get_soup(next_page, session)
                        if not page_soup:
                            break

                        # Add pagination to graph
                        if pages_processed > 0:
                            tracker.add_node(next_page, url, level)

                        # Process listings...
                        # [Your existing listing processing code here]

                        # Get next page
                        next_page = zyp_find_next_page(page_soup, next_page)
                        pages_processed += 1

        except Exception as e:
            # Best effort: report the failing page and move on to the next.
            print(f"Error processing {url}: {e}")

    return tracker

def plot_bfs_tree(bfs_tracker):
    """Plot the BFS tree using NetworkX and Matplotlib.

    Args:
        bfs_tracker: Object whose ``graph`` attribute is the traversal
            ``nx.DiGraph``; nodes are URL strings carrying a ``level``
            attribute.

    Returns:
        dict with the keys ``"Total Nodes"``, ``"Total Edges"``,
        ``"Levels"`` and ``"Categories Found"`` (nodes with level > 0).
    """
    graph = bfs_tracker.graph
    plt.figure(figsize=(15, 10))

    # Create a hierarchical layout; Graphviz needs the optional pygraphviz
    # package, so fall back to a pure-NetworkX layout when it is missing.
    try:
        pos = nx.nx_agraph.graphviz_layout(graph, prog='dot')
    except ImportError:
        pos = nx.spring_layout(graph, seed=42)

    def short_label(url, max_len=15):
        # Use the last non-empty path segment so URLs ending in "/" still
        # get a meaningful label; add an ellipsis only when truncating.
        tail = url.rstrip('/').split('/')[-1] or url
        return tail[:max_len] + '...' if len(tail) > max_len else tail

    # Draw nodes with different colors based on level
    node_colors = [
        plt.cm.tab20(graph.nodes[node].get('level', 0) % 20)
        for node in graph.nodes()
    ]

    # Draw the graph
    nx.draw(graph, pos,
            with_labels=False,
            node_size=100,
            node_color=node_colors,
            edge_color='gray',
            arrows=True,
            alpha=0.7)

    # Label every node with a shortened form of its URL
    for node, (x, y) in pos.items():
        plt.text(x, y, short_label(node), fontsize=8, ha='center', va='center')

    plt.title("BFS Traversal of ZimbabweYP Categories", fontsize=14)
    plt.axis('off')

    # Add legend for levels
    node_levels = nx.get_node_attributes(graph, 'level')
    levels = sorted(set(node_levels.values()))
    legend_elements = [plt.Line2D([0], [0], marker='o', color='w',
                                  markerfacecolor=plt.cm.tab20(i % 20),
                                  markersize=10,
                                  label=f'Level {i}')
                       for i in levels]
    plt.legend(handles=legend_elements, title="BFS Level", loc='upper right')

    plt.tight_layout()
    plt.show()

    # Return some stats; default=-1 makes an empty graph report 0 levels
    # instead of raising ValueError from max().
    stats = {
        "Total Nodes": graph.number_of_nodes(),
        "Total Edges": graph.number_of_edges(),
        "Levels": max(node_levels.values(), default=-1) + 1,
        "Categories Found": sum(1 for lvl in node_levels.values() if lvl > 0)
    }

    return stats

# Example usage:
if __name__ == "__main__":
    # Crawl a deliberately small slice of the site so the plot stays readable.
    print("Starting BFS crawl with visualization...")
    tracker = crawl_zimbabweyp_with_tracking(
        "https://www.zimbabweyp.com/",
        max_categories=20,
        max_pages_per_category=3,
    )

    # Render the traversal graph and collect its summary numbers.
    print("\nGenerating BFS visualization...")
    stats = plot_bfs_tree(tracker)

    # One "name: value" line per statistic.
    print("\nCrawl Statistics:")
    for stat_name, stat_value in stats.items():
        print(f"{stat_name}: {stat_value}")