In [1]:
# Core
import requests, re, os
from bs4 import BeautifulSoup
from dataclasses import dataclass
from typing import List, Optional
import pycountry

# UI
import ipywidgets as widgets
from IPython.display import display, clear_output, HTML
import pandas as pd
from tqdm.notebook import tqdm

# HuggingFace
from transformers import pipeline
from typing import List

In [2]:
import pycountry
from typing import Optional, List

def _country_to_cc(country: Optional[str]) -> str:
    """
    Accepts 'Greece', 'GR', 'gr', etc. Returns 'gr' for GL/ccTLD usage.
    Falls back to 'com' when unknown.
    """
    if not country:
        return "com"
    try:
        rec = pycountry.countries.lookup(str(country).strip())
        return rec.alpha_2.lower()
    except Exception:
        s = str(country).strip()
        return s.lower() if len(s) == 2 else "com"

def _country_name(country: Optional[str]) -> str:
    if not country:
        return ""
    try:
        rec = pycountry.countries.lookup(str(country).strip())
        # Use common_name if present, else name
        return getattr(rec, "common_name", rec.name)
    except Exception:
        return str(country)

# Optional demonym/alias map to strengthen country text matches
_DEMONYM_MAP = {
    "gr": ["greece", "greek", "hellas", "hellenic"],
    "de": ["germany", "german", "deutschland"],
    "es": ["spain", "spanish", "españa"],
    "fr": ["france", "french", "français"],
    "it": ["italy", "italian", "italia"],
    "pt": ["portugal", "portuguese", "português"],
    "nl": ["netherlands", "dutch", "holland"],
    "se": ["sweden", "swedish", "sverige"],
    "fi": ["finland", "finnish", "suomi"],
    "no": ["norway", "norwegian", "norge"],
    "dk": ["denmark", "danish", "danmark"],
    "pl": ["poland", "polish", "polska"],
    "cz": ["czech republic", "czech", "česko"],
    "hu": ["hungary", "hungarian", "magyar"],
    "tr": ["turkey", "turkish", "türkiye"],
    "uk": ["ukraine", "ukrainian"],
    "gb": ["united kingdom", "uk", "britain", "british", "england", "scotland", "wales"],
    "ie": ["ireland", "irish", "éire"],
    "ro": ["romania", "romanian", "românia"]
}

def _country_clause(country: Optional[str]) -> str:
    """
    Build a parenthesised OR-clause of country terms for a search query.

    Example: (greece OR greek OR hellas OR hellenic)

    Terms come from _DEMONYM_MAP when the country code has an entry;
    otherwise the clause contains just the lower-cased country name.
    Multi-word terms are wrapped in double quotes so they are matched as a
    phrase and ``OR`` applies to the whole term, e.g.
    ("czech republic" OR czech OR česko).

    Returns '' when the country resolves to 'com' (global search).
    """
    cc = _country_to_cc(country)
    if cc == "com":
        return ""  # global search

    terms = _DEMONYM_MAP.get(cc) or [_country_name(country)]

    # Deduplicate and sort on the bare terms so ordering is stable.
    normalized = sorted({t.strip().lower() for t in terms if t and t.strip()})
    if not normalized:
        return ""

    # Quote anything containing whitespace; single words are left bare.
    rendered = [f'"{t}"' if len(t.split()) > 1 else t for t in normalized]
    return "(" + " OR ".join(rendered) + ")"


In [3]:
from urllib.parse import urlparse

def _base_url(url: str) -> str:
    """
    Extracts the base URL (scheme://netloc).
    Example: https://www.abcfund.gr/about -> https://www.abcfund.gr
    """
    try:
        parsed = urlparse(url)
        return f"{parsed.scheme}://{parsed.netloc}"
    except Exception:
        return url

In [4]:
# ---- Synonym Agent ----
class SmartSynonymAgent:
    """Expands a topic into alternative search phrases using a text-generation model."""

    def __init__(
        self,
        model_name: str = "mistralai/Mistral-7B-Instruct-v0.1",
        device: int = -1,          # -1 = CPU, 0 = GPU
        max_new_tokens: int = 64,
        temperature: float = 0.3,  # low = more deterministic synonyms
        top_p: float = 0.9
    ):
        """Load the HuggingFace text-generation pipeline and store sampling settings."""
        self.generator = pipeline(
            task="text-generation",
            model=model_name,
            device=device
        )
        self.max_new_tokens = max_new_tokens
        self.temperature = temperature
        self.top_p = top_p

    @staticmethod
    def _clean_term(raw: str) -> str:
        """Normalise one raw model fragment: drop list markers, quotes and trailing periods."""
        term = raw.strip()
        # The prompt forbids numbering, but sampled output may still include it.
        term = re.sub(r"^(?:\d+[.)]|[-*•])\s*", "", term)
        term = term.strip("\"'").rstrip(". ").strip("\"' ")
        return term.lower()

    def expand(self, topic: str, user_company: str = "", linkedin_text: str = "") -> List[str]:
        """
        Return the lower-cased topic followed by unique generated synonyms.

        Args:
            topic: The search topic to expand.
            user_company: Accepted for interface compatibility; not used here.
            linkedin_text: Accepted for interface compatibility; not used here.

        Returns:
            Ordered, de-duplicated list whose first element is ``topic.lower()``.
            If generation raises, the list contains only the topic.
        """
        prompt = (
            f"List exactly 12 different synonyms or alternative search phrases for the topic: {topic}. "
            f"Do not repeat the topic itself. "
            "Respond only with a plain comma-separated list of terms. "
            "No numbering, no quotes, no explanations, no filler text."
        )

        try:
            out = self.generator(
                prompt,
                max_new_tokens=self.max_new_tokens,
                do_sample=True,
                temperature=self.temperature,
                top_p=self.top_p
            )[0]["generated_text"]

            # The generated text echoes the prompt; keep only what follows it.
            raw = out.split(prompt, 1)[-1].strip()

            # Split on commas and newlines, then apply the length filter to
            # the CLEANED term so quote/punctuation padding cannot smuggle in
            # one- or two-character fragments.
            cleaned = (self._clean_term(piece) for piece in re.split(r"[,\n]", raw))
            terms = [t for t in cleaned if 2 < len(t) < 40]

            # Deduplicate, preserve order, always include the topic
            seen, uniq = set(), []
            for t in [topic.lower()] + terms:
                if t and t not in seen:
                    seen.add(t)
                    uniq.append(t)

            print(f"[DEBUG] SmartSynonymAgent.expand('{topic}') -> {uniq}")
            return uniq
        except Exception as e:
            # Best-effort: a model failure degrades to searching the topic alone.
            print(f"[ERROR] SmartSynonymAgent failed: {e}")
            return [topic.lower()]

# ---- Keyword Agent ----

@dataclass
class KeywordAgent:
    """Turns a topic into a capped list of search-engine query strings."""

    synonym_agent: SmartSynonymAgent

    max_queries: int = 14

    # Not annotated, so @dataclass keeps this as a plain class attribute
    # rather than turning it into an __init__ field.
    INTENTS = ['"official site"','"about us"']

    def generate(self, topic: str, country_code: Optional[str] = None) -> List[str]:
        """
        Build one query per synonym of ``topic``.

        Each query is: the quoted term, the OR-ed intent phrases, the country
        clause (when there is one) and a site restriction. Queries are
        de-duplicated in order and generation stops once ``max_queries``
        have been collected.
        """
        synonyms = self.synonym_agent.expand(topic)

        cc = "com" if not country_code else _country_to_cc(country_code)
        country_txt = _country_clause(country_code)  # e.g., (greece OR greek OR hellas OR hellenic)
        site_clause = "site:.com" if cc == "com" else f"(site:.{cc} OR site:.com)"
        intent_clause = "(" + " OR ".join(self.INTENTS) + ")"

        queries: List[str] = []
        for term in synonyms:
            # Quote the term to keep phrase matching
            parts = [f'"{term}"', intent_clause]
            if country_txt:
                parts.append(country_txt)
            parts.append(site_clause)
            queries.append(" ".join(parts))

        # dict.fromkeys drops repeats while keeping first-seen order.
        uniq: List[str] = []
        for query in dict.fromkeys(queries):
            uniq.append(query)
            if len(uniq) >= self.max_queries:
                break

        print(f"[DEBUG] KeywordAgent.generate('{topic}', country='{country_code}') -> {len(uniq)} queries")
        return uniq


# ---- SerpApi Search Agent ----
class SerpApiSearchAgent:
    """Runs Google searches through SerpApi and keeps one result per base URL."""

    def __init__(self, api_key: str):
        """Store the SerpApi key and the search endpoint."""
        self.api_key = api_key
        self.base_url = "https://serpapi.com/search"

    def search(self, queries: List[str], country: str) -> List[dict]:
        """
        Execute each query and collect organic results.

        Args:
            queries: Query strings to send, in order.
            country: Country name or code; resolved with _country_to_cc and
                sent as the ``gl`` parameter when it maps to a country.

        Returns:
            List of ``{"title", "url", "base"}`` dicts, at most one per base
            URL (scheme://netloc), in first-seen order. Failed requests are
            logged and skipped.
        """
        results: List[dict] = []
        seen_bases = set()
        gl = _country_to_cc(country)

        for q in queries:
            params = {"engine": "google", "q": q, "hl": "en", "api_key": self.api_key}
            # "com" is _country_to_cc's "no country" sentinel, not a country
            # code, so it must not be sent as the geolocation parameter.
            if gl != "com":
                params["gl"] = gl

            try:
                data = requests.get(self.base_url, params=params, timeout=20).json()
            except Exception as e:
                print(f"[WARN] SerpApi request failed for '{q}': {e}")
                continue

            if not isinstance(data, dict):
                print(f"[WARN] SerpApi returned an unexpected payload for '{q}'")
                continue

            # Surface API-level failures (e.g. a bad key) instead of letting
            # them look like "zero results".
            if data.get("error"):
                print(f"[WARN] SerpApi error for '{q}': {data['error']}")

            for r in (data.get("organic_results") or []):
                link = r.get("link") or r.get("url")
                if not link:
                    continue

                base = _base_url(link)
                if base in seen_bases:
                    continue  # skip duplicates by base URL

                seen_bases.add(base)
                results.append({
                    "title": r.get("title") or "",
                    "url": link,
                    "base": base
                })

        print(f"[DEBUG] SerpApiSearchAgent.search -> {len(results)} unique base URLs")
        return results

# ---- Filter Agent ----
class FilterAgent:
    """Drops search results hosted on aggregator/profile sites."""

    # Registered domains to exclude; subdomains (www., en., ...) are covered too.
    BLOCKED_DOMAINS = ("linkedin.com", "crunchbase.com", "wikipedia.org")

    def _is_blocked(self, url: str) -> bool:
        """Return True when the URL's host is a blocked domain or a subdomain of one."""
        try:
            host = (urlparse(url).hostname or "").lower()
        except ValueError:
            host = ""
        if not host:
            # No parseable host (e.g. scheme-less URL): fall back to the
            # plain substring check.
            return any(domain in url for domain in self.BLOCKED_DOMAINS)
        return any(
            host == domain or host.endswith("." + domain)
            for domain in self.BLOCKED_DOMAINS
        )

    def filter(self, results: List[dict]) -> List[dict]:
        """
        Return the results whose ``url`` is present and not on a blocked host.

        Matching is done on the host name rather than the whole URL, so a
        path or query string that merely mentions a blocked domain does not
        cause the result to be discarded.
        """
        filtered: List[dict] = []
        for r in results:
            url = r.get("url")
            if not url:
                continue
            if self._is_blocked(url):
                continue
            filtered.append(r)
        print(f"[DEBUG] FilterAgent.filter -> {len(filtered)} after filtering")
        return filtered


# ---- Contact Page Finder ----

class ContactPageFinder:
    """Fetches a site's pages and tries to extract a contact e-mail address."""

    def __init__(self):
        """Set the request headers, the e-mail regex and the link/path heuristics."""
        self.headers = {"User-Agent": "Mozilla/5.0"}
        self.email_pattern = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")
        self.link_keywords = [
            "contact", "about", "support", "help", "customer-service", "press", "media", "get-in-touch"
        ]
        self.fallback_paths = ["/contact", "/about", "/press", "/support", "/help"]

    def fetch(self, url: str) -> Optional[str]:
        """Return the response body for ``url`` on HTTP 200; otherwise None."""
        try:
            resp = requests.get(url, headers=self.headers, timeout=10)
            if resp.status_code == 200:
                return resp.text
        except Exception as e:
            print(f"[WARN] Failed to fetch {url}: {e}")
        return None

    def extract_emails(self, html: str) -> List[str]:
        """
        Return the unique e-mail addresses found in ``html``, in a stable order.

        Addresses from ``mailto:`` links come first, followed by addresses
        found in the page's text nodes in document order. Matches ending in
        an image extension (e.g. ``logo@2x.png``) are discarded because the
        regex would otherwise accept them as addresses.
        """
        soup = BeautifulSoup(html, "html.parser")

        candidates: List[str] = []
        for anchor in soup.find_all("a", href=True):
            href = anchor["href"].strip()
            if href.lower().startswith("mailto:"):
                candidates.extend(self.email_pattern.findall(href))

        text_blocks = soup.find_all(string=True)
        raw_text = " ".join(t.strip() for t in text_blocks if t.strip())
        candidates.extend(self.email_pattern.findall(raw_text))

        asset_suffixes = (".png", ".jpg", ".jpeg", ".gif", ".svg", ".webp")
        emails = [e for e in candidates if not e.lower().endswith(asset_suffixes)]

        # dict.fromkeys de-duplicates while keeping first-seen order, so the
        # caller's "first e-mail" choice is deterministic.
        return list(dict.fromkeys(emails))

    def find_relevant_links(self, soup: "BeautifulSoup", base_url: str) -> List[str]:
        """
        Return absolute http(s) URLs of links whose href contains a contact keyword.

        Keyword matching is case-insensitive, but the href itself keeps its
        original case because URL paths can be case-sensitive. Relative hrefs
        are resolved against ``base_url``; fragment-only links, non-http
        schemes (mailto:, tel:, ...) and duplicates are skipped.
        """
        from urllib.parse import urljoin

        links: List[str] = []
        seen = set()
        for a in soup.find_all("a", href=True):
            href = a["href"].strip()
            if not href or href.startswith("#"):
                continue
            if not any(kw in href.lower() for kw in self.link_keywords):
                continue
            try:
                absolute = urljoin(base_url, href)
                scheme = urlparse(absolute).scheme
            except ValueError:
                continue
            if scheme not in ("http", "https") or absolute in seen:
                continue
            seen.add(absolute)
            links.append(absolute)
        return links

    def find(self, base_url: str) -> dict:
        """
        Look up a display name and a contact e-mail for a site.

        Args:
            base_url: URL of the page to start from.

        Returns:
            ``{"name", "homepage", "email"}``. ``name`` is the page title
            (or ``base_url`` when there is none); ``email`` is the first
            address found, or None. If the start page cannot be fetched,
            ``name`` and ``email`` are both None.
        """
        from urllib.parse import urljoin

        homepage_html = self.fetch(base_url)
        if not homepage_html:
            return {"name": None, "homepage": base_url, "email": None}

        soup = BeautifulSoup(homepage_html, "html.parser")
        name = soup.title.string.strip() if soup.title and soup.title.string else base_url

        # Try the start page first.
        emails = self.extract_emails(homepage_html)

        # Only crawl further when the start page had no address: the first
        # e-mail found is the one returned, so extra fetches could not
        # change the result.
        if not emails:
            candidate_links = self.find_relevant_links(soup, base_url)

            # Fallback paths are rooted at the site, whatever page we started on.
            for path in self.fallback_paths:
                fallback_url = urljoin(base_url, path)
                if fallback_url not in candidate_links:
                    candidate_links.append(fallback_url)

            # Scan links until one yields an e-mail.
            for link in candidate_links:
                linked_html = self.fetch(link)
                if linked_html:
                    emails = self.extract_emails(linked_html)
                    if emails:
                        break

        email = emails[0] if emails else None
        return {"name": name, "homepage": base_url, "email": email}
        
class PersonalizedSearchAgent:
    """Runs the keyword -> search -> filter pipeline and de-prioritises the user's own pages."""

    def __init__(self, user_profile: dict, keyword_agent, search_agent, filter_agent):
        """
        Args:
            user_profile: Dict read for the keys 'name', 'surname', 'company'
                and 'linkedin'; None is treated as empty.
            keyword_agent: Object providing ``generate(topic, country_code=...)``.
            search_agent: Object providing ``search(queries, country=...)``.
            filter_agent: Object providing ``filter(results)``.
        """
        self.user_profile = user_profile or {}
        self.keyword_agent = keyword_agent
        self.search_agent = search_agent
        self.filter_agent = filter_agent

    def _linkedin_handle(self) -> str:
        """Return the last path segment of the profile's LinkedIn URL, or ''."""
        lnk = (self.user_profile.get('linkedin') or '').strip()
        if not lnk:
            return ''
        try:
            path = urlparse(lnk).path
        except ValueError:
            return ''
        # Using the parsed path ignores any query string/fragment, and
        # rstrip handles URLs written with a trailing slash.
        return path.rstrip('/').rsplit('/', 1)[-1]

    def search(self, topic: str, country_code: str, max_results: int = 10):
        """
        Search for ``topic`` and return up to ``max_results`` ranked results.

        Every generated query is suffixed with negative terms (-"...") for
        the user's full name, company and LinkedIn handle. Filtered results
        are then sorted by a score that penalises hits mentioning the user's
        company (-5) or both name parts (-3) and rewards a country-code TLD
        (+1). Ties keep their original order.
        """
        # Build base queries
        base_queries = self.keyword_agent.generate(topic, country_code=country_code)

        profile = self.user_profile
        first = (profile.get('name') or '').strip()
        last = (profile.get('surname') or '').strip()
        company = (profile.get('company') or '').strip()

        # Personalization: add negative terms to avoid your own name/company/LinkedIn handle
        neg_terms = [t for t in (f"{first} {last}".strip(), company, self._linkedin_handle()) if t]
        if neg_terms:
            suffix = ''.join(f' -"{t}"' for t in neg_terms)
            base_queries = [q + suffix for q in base_queries]

        results = self.search_agent.search(base_queries, country=country_code)
        results = self.filter_agent.filter(results)
        if not results:
            return []

        # Resolved once here rather than once per scored record.
        cc = _country_to_cc(country_code)
        company_lc, first_lc, last_lc = company.lower(), first.lower(), last.lower()

        # Simple re-ranking: penalize self-signals; small boost for ccTLD
        def _score(rec):
            url = (rec.get('url') or '').lower()
            title = (rec.get('title') or '').lower()
            haystack = url + ' ' + title
            score = 0

            if company_lc and company_lc in haystack:
                score -= 5

            if first_lc and last_lc and first_lc in haystack and last_lc in haystack:
                score -= 3

            if cc != 'com' and (url.endswith('.' + cc) or ('.' + cc + '/' in url)):
                score += 1

            return score

        results = sorted(results, key=_score, reverse=True)
        return results[:max_results]

In [5]:
# --- Initialization of Agents ---

# 🔑 SerpApi key: taken from the SERPAPI_API_KEY environment variable so the
# secret does not have to be saved in the notebook. The placeholder below is
# used only when the variable is unset; replace it if you prefer to hard-code.
API_KEY = os.environ.get("SERPAPI_API_KEY", "your_key_here")
if API_KEY == "your_key_here":
    print("⚠️ SERPAPI_API_KEY is not set - provide a real SerpApi key before running a search.")

# Synonym Agent
# device=0 selects the first accelerator; pass device=-1 to run on CPU.
synonym_agent = SmartSynonymAgent(
    model_name="mistralai/Mistral-7B-Instruct-v0.1",
    device=0
)

# Keyword Agent
ka = KeywordAgent(synonym_agent=synonym_agent)


# SerpApi Search Agent
sa = SerpApiSearchAgent(api_key=API_KEY)

# Filter Agent
fa = FilterAgent()

# Contact Page Finder (for optional email scraping)
finder = ContactPageFinder()

# Example user profile (UI will override this)
user_profile = {
    "name": "George",
    "surname": "Papanikolaou",
    "company": "Panathenea",
    "linkedin": "https://www.linkedin.com/in/geo"
}

# Personalized Search Agent
psa = PersonalizedSearchAgent(user_profile, ka, sa, fa)

print("✅ Agents initialized and ready.")

Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

Device set to use mps:0


✅ Agents initialized and ready.


In [6]:
# --- UI Inputs ---
# The five free-text fields differ only in their label, so build them together.
first_name_input, surname_input, company_input, linkedin_input, topic_input = (
    widgets.Text(description=label)
    for label in ("First name:", "Surname:", "Company:", "LinkedIn:", "Topic:")
)

# Country picker listing every pycountry country name alphabetically.
country_dropdown = widgets.Dropdown(
    description="Country:",
    options=sorted(c.name for c in pycountry.countries),
)

max_results_input = widgets.IntText(description="Max results:", value=10)

# Trigger button and the area that receives the search output.
run_button = widgets.Button(button_style="success", description="Run Search")
output_area = widgets.Output()

def on_button_click(b):
    """
    Run the search for the current form values and render/save the directory.

    Reads the widget values, runs PersonalizedSearchAgent.search, looks up a
    name and e-mail for each result's site, shows the table in
    ``output_area`` and writes it to directory_results.csv.

    Args:
        b: The clicked button (supplied by ipywidgets; unused).
    """
    import html as html_lib  # stdlib escaping for scraped text shown as HTML

    with output_area:
        clear_output()

        topic = topic_input.value.strip()
        if not topic:
            print("⚠️ Please enter a topic before running the search.")
            return

        print("⏳ Running search...")

        user_profile = {
            "name": first_name_input.value.strip(),
            "surname": surname_input.value.strip(),
            "company": company_input.value.strip(),
            "linkedin": linkedin_input.value.strip(),
        }

        psa = PersonalizedSearchAgent(user_profile, ka, sa, fa)

        results = psa.search(
            topic=topic,
            country_code=country_dropdown.value,
            # Zero/negative values would turn the result slice into something unintended.
            max_results=max(1, int(max_results_input.value))
        )
        print(f"🔍 {len(results)} results fetched before deduplication")

        # Deduplicate by URL
        seen, unique_results = set(), []
        for r in results:
            if r["url"] not in seen:
                seen.add(r["url"])
                unique_results.append(r)
        print(f"📌 {len(unique_results)} unique results after deduplication")

        # Extract contact info
        directory_rows = []
        for r in tqdm(unique_results, desc="Fetching contact info"):
            # Start from the site root when the search agent supplied it, so
            # the name comes from the homepage and contact paths resolve there.
            site = r.get("base") or r["url"]
            info = finder.find(site)
            directory_rows.append({
                "Name": info.get("name") or (r.get("title") or "Unknown"),
                "Homepage": site,
                "Email": info.get("email") or "No email found"
            })

        df = pd.DataFrame(directory_rows, columns=["Name", "Homepage", "Email"])

        # Display: names/e-mails are scraped from third-party pages, so escape
        # them before rendering with escape=False (needed for the link markup).
        display_df = df.copy()
        display_df["Name"] = display_df["Name"].map(lambda v: html_lib.escape(str(v)))
        display_df["Email"] = display_df["Email"].map(lambda v: html_lib.escape(str(v)))
        display_df["Homepage"] = display_df["Homepage"].map(
            lambda u: f'<a href="{html_lib.escape(str(u), quote=True)}" target="_blank" rel="noopener noreferrer">Homepage</a>'
        )
        display(HTML(display_df.to_html(escape=False, index=False)))

        # Save the plain data (raw URLs, no HTML markup)
        df.to_csv("directory_results.csv", index=False)
        print("✅ Results saved to directory_results.csv")

# Wire the button to its handler.
run_button.on_click(on_button_click)

# Layout: name fields share a row, as do country and max-results; everything
# else is stacked vertically with the output area last.
display(
    widgets.VBox(children=[
        widgets.HBox(children=[first_name_input, surname_input]),
        company_input,
        linkedin_input,
        topic_input,
        widgets.HBox(children=[country_dropdown, max_results_input]),
        run_button,
        output_area,
    ])
)

VBox(children=(HBox(children=(Text(value='', description='First name:'), Text(value='', description='Surname:'…