In [None]:
# import shutil

# shutil.copyfile('arxiv_scraped_data.xlsx', 'arxiv_scraped_data_backup.xlsx')

'arxiv_scraped_data_backup.xlsx'

In [2]:
from dataclasses import dataclass, field
from typing import List, Optional
import os
import requests
from pydantic import BaseModel
from openpyxl import load_workbook
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity


def _load_excel(file_name: str) -> object:
    """Open the workbook at ``file_name`` and hand it back with its active sheet.

    Args:
        file_name: Path of the Excel file passed to ``load_workbook``.

    Returns:
        A ``(workbook, worksheet)`` pair, where the worksheet is ``workbook.active``.
    """
    workbook = load_workbook(filename=file_name)
    return workbook, workbook.active


class FindSimilarity:
    """Match e-mail addresses to author names and store them in the workbook.

    Matching compares the local part of each address (the text before ``@``)
    with the value in the first column of every data row, using character
    n-gram TF-IDF vectors and cosine similarity.
    """

    def preprocess_emails(self, email_list: List) -> Optional[str]:
        """Return the lower-cased local parts of the addresses, space separated.

        Args:
            email_list: E-mail address strings.

        Returns:
            One string with the text before the first ``@`` of every address,
            lower-cased and joined by single spaces ("" for an empty list).
        """
        return " ".join(email.split("@")[0].lower() for email in email_list)

    @staticmethod
    def _usable_emails(list_of_emails: Optional[List[str]]) -> List[str]:
        """Strip the candidates and drop the ones that must not be stored.

        ``None`` input, non-string entries and blank entries are ignored, as
        are placeholder answers: anything containing "None", starting with
        "protected", or containing "*".
        """
        usable = []
        for email in list_of_emails or []:
            if not isinstance(email, str):
                continue
            email = email.strip()
            if not email:
                continue
            if "None" in email or email.startswith("protected") or "*" in email:
                continue
            usable.append(email)
        return usable

    def find_email_author_and_save(
        self,
        list_of_emails: List[str],
        file_name: str = "arxiv_scraped_data_backup.xlsx",
        threshold: float = 0.6,
    ) -> object:
        """Attach each e-mail to the similar author rows and save the workbook.

        Every usable address is compared with the first-column value of each
        data row. Rows scoring above ``threshold`` get the address written to
        (or appended to) their e-mail cell; an address that matches no row is
        written on a new row below the data.

        Args:
            list_of_emails: Candidate addresses; ``None`` or an empty list is
                accepted and results in no work.
            file_name: Workbook that is read and then overwritten.
            threshold: Cosine similarity that must be exceeded for a match.

        Returns:
            ``None``. When there is no usable address the workbook is not
            opened at all.
        """
        print("\n📍 Step 9: Finding Similarity and Saving to Excel!")
        emails = self._usable_emails(list_of_emails)
        if not emails:
            # Nothing to record, so avoid rewriting the file for no reason.
            return None

        wb, ws = _load_excel(file_name)
        headers = [str(cell.value).strip().lower() if cell.value else None for cell in ws[1]]

        if "email" in headers:
            email_column = headers.index("email") + 1
        else:
            # No e-mail column yet: create one to the right of the existing data.
            email_column = ws.max_column + 1
            ws.cell(row=1, column=email_column, value="Email")

        # The settings never change, so build the vectorizer once;
        # fit_transform() re-learns the vocabulary on every call.
        vectorizer = TfidfVectorizer(analyzer='char', ngram_range=(1, 2), lowercase=True)

        for email in emails:
            print(f"Processing email: {email}")
            doc1 = self.preprocess_emails([email])
            match_found = False

            rows = ws.iter_rows(min_row=2, max_row=ws.max_row, values_only=True)
            for row_index, row in enumerate(rows, start=2):
                author_name = row[0]
                if not author_name:
                    continue

                tfidf_matrix = vectorizer.fit_transform([doc1, str(author_name)])
                score = cosine_similarity(tfidf_matrix[0:1], tfidf_matrix[1:2])[0][0]
                if score <= threshold:
                    continue

                print(f"Match Found: {author_name} | {score}")
                match_found = True
                cell = ws.cell(row=row_index, column=email_column)
                current_email = cell.value
                if not current_email:
                    cell.value = email
                    continue
                already_listed = [part.strip() for part in str(current_email).split(",")]
                if email not in already_listed:
                    # Append, but never store the same address twice on a row.
                    cell.value = f"{current_email}, {email}"

            if not match_found:
                ws.cell(row=ws.max_row + 1, column=email_column, value=email)

        return wb.save(file_name)


@dataclass(frozen=True)
class PerplexityConfig:
    """Immutable connection settings for the Perplexity chat-completions API."""
    # Endpoint the search request is posted to.
    url: str = "https://api.perplexity.ai/chat/completions"
    # Model identifier placed in the request payload.
    model: str = "sonar-pro"
    # Taken from the PERPLEXITY_API_KEY environment variable when the instance
    # is created; an empty string when the variable is unset.
    api_key: str = field(
        default_factory=lambda: os.environ.get("PERPLEXITY_API_KEY", "")
    )

    def get_headers(self) -> dict:
        """Build the HTTP headers carrying the bearer token.

        Returns:
            A dict with a single ``Authorization`` entry.

        Raises:
            ValueError: If ``api_key`` is empty.
        """
        if self.api_key:
            return {"Authorization": "Bearer {}".format(self.api_key)}
        raise ValueError("PERPLEXITY_API_KEY is not set.")


class AnswerFormat(BaseModel):
    """Answer format requested from the Perplexity API.

    ``WebSearch.perplexity_search`` sends ``AnswerFormat.model_json_schema()``
    as the ``json_schema`` of the request's ``response_format``.
    """
    # Single string field. The spelling "adress" is the runtime field name that
    # ends up in the generated schema, so renaming it changes the request.
    email_adress: str


class WebSearch:
    """Look up e-mail addresses for one author via Perplexity or a browser agent."""

    def __init__(self, name: str, config: Optional["PerplexityConfig"] = None) -> None:
        """Store the author name and the Perplexity configuration.

        Args:
            name: Author name to search for.
            config: API settings; a fresh ``PerplexityConfig()`` is created
                when omitted.
        """
        # A default written in the signature is evaluated once, when the class
        # is defined, which would freeze whatever PERPLEXITY_API_KEY was set at
        # that moment. Building it here picks up the current environment.
        self.config = config if config is not None else PerplexityConfig()
        self.author_name = name

    @staticmethod
    def _parse_emails(content: Optional[str]) -> List[str]:
        """Turn the text of a model answer into a list of candidate addresses.

        The answer is requested as JSON following ``AnswerFormat``, so a JSON
        object is decoded and its ``email_adress`` value is used. Text that is
        not JSON is used as-is. The text is then split on newlines and commas;
        blank pieces are dropped.
        """
        import json

        if not isinstance(content, str) or not content.strip():
            return []

        text = content.strip()
        try:
            decoded = json.loads(text)
        except ValueError:
            decoded = None  # plain-text answer, keep `text` untouched

        if isinstance(decoded, dict):
            value = decoded.get("email_adress")
            if isinstance(value, (list, tuple)):
                text = "\n".join(str(item) for item in value)
            elif value is None:
                text = ""
            else:
                text = str(value)
        elif isinstance(decoded, str):
            text = decoded

        pieces = text.replace(",", "\n").splitlines()
        return [piece.strip() for piece in pieces if piece.strip()]

    def perplexity_search(self) -> List[str]:
        """Ask the Perplexity API for e-mail addresses of the author.

        Returns:
            The candidate addresses (possibly the single entry "None" when the
            model found nothing). An empty list is returned when the request
            does not succeed or the response cannot be interpreted.

        Raises:
            ValueError: If the configuration has no API key.
            requests.RequestException: On connection problems or timeout.
        """
        print("\n📍 Step 8: [Perplexity] Extracting Email Addresses!")
        payload = {
            "model": self.config.model,
            "messages": [
                {
                    "role": "system",
                    "content": (
                        "You are a web searcher assistant. The user will provide an author name. "
                        "Your task is: Search email addresses for provided author name. "
                        "If you find no email addresses, return 'None'. "
                        "Output only the emails or 'None'—no additional explanations."
                    ),
                },
                {"role": "user", "content": f"{self.author_name} email adress"},
            ],
            "response_format": {
                "type": "json_schema",
                "json_schema": {"schema": AnswerFormat.model_json_schema()},
            },
        }

        # Without a timeout, requests waits forever on an unresponsive server.
        response = requests.post(
            self.config.url,
            headers=self.config.get_headers(),
            json=payload,
            timeout=60,
        )

        if response.status_code != 200:
            print(f"Error: Received status code {response.status_code}")
            return []

        try:
            content = response.json()["choices"][0]["message"]["content"]
        except requests.JSONDecodeError:
            print("Error: Response content is not valid JSON")
            return []
        except (KeyError, IndexError, TypeError):
            print("Error: Unexpected response structure")
            return []

        print(content)
        list_of_emails = self._parse_emails(content)
        if list_of_emails:
            print(f"    ✅ [INFO] Email addresses listed: {list_of_emails}")
        return list_of_emails

    def browser_use(self) -> List[str]:
        """Search for e-mail addresses with a browser-use agent.

        Returns:
            The agent's final answer split into lines, or an empty list when
            the agent produced no final answer.

        Raises:
            ImportError: If the optional browser-use / langchain packages are
                not installed.
        """
        print("\n📍 Step 8: [Browser-Use] Extracting Email Addresses!")
        try:
            from langchain_openai import ChatOpenAI
            from browser_use import Agent
            from browser_use import BrowserConfig, Browser
            import asyncio
        except ImportError as exc:
            raise ImportError("Please install the required packages to run this function.") from exc

        # NOTE(review): disable_security=True relaxes browser protections;
        # confirm it is really needed for this task.
        browser = Browser(
            config=BrowserConfig(
                headless=True,
                disable_security=True
            )
        )

        agent = Agent(
            task=f"""
            1. Go to Google.com.
            2. Search '{self.author_name} email address' and enter.
            3. Output only the emails or 'None'—no additional explanations.
            """,
            llm=ChatOpenAI(model="gpt-4o"),
            browser=browser,
        )

        async def _run_and_close():
            # Release the browser even when the agent run fails.
            try:
                return await agent.run()
            finally:
                await browser.close()

        loop = asyncio.get_event_loop()
        result = loop.run_until_complete(_run_and_close())
        final_text = result.final_result()
        if not final_text:
            return []
        return final_text.split("\n")

In [3]:
def browser_use(self) -> List[str]:
    """Search for e-mail addresses with a browser-use agent.

    Module-level copy of ``WebSearch.browser_use``; ``self`` only needs an
    ``author_name`` attribute.

    Returns:
        The agent's final answer split into lines, or an empty list when the
        agent produced no final answer.

    Raises:
        ImportError: If the optional browser-use / langchain packages are not
            installed.
    """
    print("\n📍 Step 8: [Browser-Use] Extracting Email Addresses!")
    try:
        from langchain_openai import ChatOpenAI
        from browser_use import Agent
        from browser_use import BrowserConfig, Browser
        import asyncio
    except ImportError as exc:
        raise ImportError("Please install the required packages to run this function.") from exc

    # NOTE(review): disable_security=True relaxes browser protections;
    # confirm it is really needed for this task.
    browser = Browser(
        config=BrowserConfig(
            headless=True,
            disable_security=True
        )
    )

    agent = Agent(
        task=f"""
        1. Go to Google.com.
        2. Search '{self.author_name} email address' and enter.
        3. Output only the emails or 'None'—no additional explanations.
        """,
        llm=ChatOpenAI(model="gpt-4o"),
        browser=browser,
    )

    async def _run_and_close():
        # Release the browser even when the agent run fails.
        try:
            return await agent.run()
        finally:
            await browser.close()

    loop = asyncio.get_event_loop()
    result = loop.run_until_complete(_run_and_close())
    final_text = result.final_result()
    if not final_text:
        return []
    return final_text.split("\n")

In [None]:
import nest_asyncio
# NOTE(review): presumably applied so that WebSearch.browser_use can call
# loop.run_until_complete() while the notebook's own event loop is already
# running; confirm before removing.
nest_asyncio.apply()

def fill_empty_emails_with_search():
    """Search e-mail addresses for every author row that has none yet.

    The workbook is read once. Each data row that has an author name in the
    first column and no value in the e-mail column triggers a browser-use
    search, and the addresses found are handed to
    ``FindSimilarity.find_email_author_and_save``, which writes them to the
    file. Rows are taken from the snapshot loaded at the start, so addresses
    saved during this run are not re-read by the loop.
    """
    _, ws = _load_excel("arxiv_scraped_data_backup.xlsx")

    # Locate the e-mail column the same way FindSimilarity does (header
    # "email", case-insensitive); also accept "emails". Fall back to the
    # seventh column when neither header exists. No header is written here:
    # this function never saves the workbook, so such a write would be lost.
    headers = [str(cell.value).strip().lower() if cell.value else None for cell in ws[1]]
    if "email" in headers:
        email_index = headers.index("email")
    elif "emails" in headers:
        email_index = headers.index("emails")
    else:
        email_index = 6

    similarity_finder = FindSimilarity()

    for row in ws.iter_rows(min_row=2, max_row=ws.max_row, values_only=True):
        author_name = row[0] if row else None
        if author_name is None or not str(author_name).strip():
            # No author on this row: searching for the text "None" is pointless.
            continue

        email_row = row[email_index] if len(row) > email_index else None
        if email_row is not None:
            print(f"     [INFO] Email already exists for this [Author: {author_name}]. Skipping...")
            continue

        # Step 5: Search for email addresses using the browser-use
        web_search = WebSearch(name=str(author_name))
        list_of_emails = web_search.browser_use()

        # Step 6: Find similarity between the author names and emails
        found = [
            email for email in (list_of_emails or [])
            if isinstance(email, str) and email.strip()
        ]
        if found:
            similarity_finder.find_email_author_and_save(found)
        print("-----" * 15)

if __name__ == "__main__":
    # The extract_and_search() step is disabled here; only the e-mail
    # back-fill runs when this cell is executed.
    # extract_and_search()
    fill_empty_emails_with_search()


📍 Step 8: [Browser-Use] Extracting Email Addresses!
INFO     [browser_use] BrowserUse logging setup complete with level info
INFO     [root] Anonymized telemetry enabled. See https://docs.browser-use.com/development/telemetry for more information.
INFO     [agent] 🚀 Starting task: 
            1. Go to Google.com.
            2. Search 'Aneek James email address and enter.
            3. Output only the emails or 'None'—no additional explanations.
            
INFO     [agent] 📍 Step 1


  value['message'] = load(value['message'])


INFO     [agent] 🤷 Eval: Unknown - I haven't started the main task yet.
INFO     [agent] 🧠 Memory: I need to search 'Aneek James email address' and extract email addresses from the results on Google.
INFO     [agent] 🎯 Next goal: Navigate to Google.com to begin the search.
INFO     [agent] 🛠️  Action 1/1: {"open_tab":{"url":"https://www.google.com"}}
INFO     [controller] 🔗  Opened new tab with https://www.google.com
INFO     [agent] 📍 Step 2
INFO     [agent] 👍 Eval: Success - I reached Google.com and recognized the cookie consent popup.
INFO     [agent] 🧠 Memory: I need to accept cookies and then search for 'Aneek James email address'.
INFO     [agent] 🎯 Next goal: Accept cookies to proceed with the search.
INFO     [agent] 🛠️  Action 1/1: {"click_element":{"index":5}}
INFO     [controller] 🖱️  Clicked button with index 5: Alle akzeptieren
INFO     [agent] 📍 Step 3
INFO     [agent] 👍 Eval: Success - Cookies were accepted, and search bar is accessible.
INFO     [agent] 🧠 Memory: I need

In [10]:
import json
from bs4 import BeautifulSoup
import requests
import re

# Query parameters sent with the listing request.
params = {
    'mylisting-ajax': '1',
    'action': 'get_listings',
    'form_data[page]': '0',
    'form_data[preserve_page]': 'false',
    'form_data[category]': 'wedding-venues',
    'form_data[region]': 'melbourne',
    'form_data[sort]': 'top-rated',
    'listing_type': 'place',
}

# The requested URL is also the literal prefix of the links extracted below.
base_url = 'https://umdearborn.edu/people-um-dearborn/venus-kakdarvishi?utm_source=chatgpt.com/'

response = requests.get(base_url, params=params, timeout=30)
# get all urls
# re.escape() keeps "?" and "." in the URL literal; unescaped they act as regex
# operators and the pattern can never match the real address.
results = re.findall(
    re.escape(base_url) + r"suppliers/wedding-venues/melbourne/[a-zA-Z-]*/",
    response.text.replace("\\", ""),
)
headers = {
    'accept': '*/*',
    'accept-language': 'en-GB,en-US;q=0.9,en;q=0.8,es;q=0.7,ru;q=0.6',
    'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Safari/537.36',
}
for result in results:
    print("Navigate: " + result)
    response = requests.get(result, headers=headers, timeout=30)
    soup = BeautifulSoup(response.content, 'html.parser')
    scripts = soup.find_all("script")
    for script in scripts:
        if "LocalBusiness" not in script.text:
            continue
        try:
            data = json.loads(script.text)
        except json.JSONDecodeError:
            # Mentions LocalBusiness but is not a JSON document; try the next one.
            continue
        if not isinstance(data, dict):
            continue
        # Any of the fields may be absent, so do not index the dict directly.
        print(f"Name: {data.get('name', 'N/A')}")
        print(f"Telephone: {data.get('telephone', 'N/A')}")
        print(f"Email: {data.get('email', 'N/A')}")
        break
