<a href="https://colab.research.google.com/github/AyushSoni14/Email_WebScraping_Project/blob/main/WebScraping.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import pandas as pd
import requests
from bs4 import BeautifulSoup
import re
import time
import random
from urllib.parse import urljoin
import concurrent.futures
from datetime import datetime

def extract_emails(text):
    """
    Extract email addresses from text, including obfuscated ones.

    Handles plain addresses ('name@domain.com') as well as common
    obfuscations such as 'name at domain dot com', 'name[at]domain[dot]com',
    'name(at)domain(dot)com' and 'name{at}domain{dot}com'.

    Args:
        text: Raw page text or HTML to scan. ``None``, non-string values
            and empty strings yield an empty list.

    Returns:
        A list of addresses: plain matches first (original case), followed
        by de-obfuscated matches (lower-cased). Duplicates are not removed.
        Candidates that end in an image file extension or start with a
        digit are dropped.
    """
    if not text or not isinstance(text, str):
        return []

    # Standard email regex pattern
    standard_pattern = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
    standard_emails = re.findall(standard_pattern, text)

    # Obfuscated pattern: "at"/"dot" written as words or wrapped in
    # [], () or {} instead of the literal '@' and '.' characters.
    obfuscated_pattern = r'[a-zA-Z0-9._%+-]+\s*(?:\[at\]|\(at\)|{at}|\sat\s)\s*[a-zA-Z0-9.-]+\s*(?:\[dot\]|\(dot\)|{dot}|\sdot\s)\s*[a-zA-Z]{2,}'
    obfuscated_emails = re.findall(obfuscated_pattern, text, re.IGNORECASE)

    # Rewrite each obfuscated hit into the regular user@domain.tld form.
    cleaned_obfuscated = []
    for candidate in obfuscated_emails:
        cleaned = candidate.lower()
        cleaned = re.sub(r'\s*\[at\]\s*|\s*\(at\)\s*|\s*{at}\s*|\s+at\s+', '@', cleaned)
        cleaned = re.sub(r'\s*\[dot\]\s*|\s*\(dot\)\s*|\s*{dot}\s*|\s+dot\s+', '.', cleaned)
        # Remove any remaining spaces
        cleaned_obfuscated.append(cleaned.replace(' ', ''))

    # Asset names such as "logo@2x.png" look like addresses to the regex.
    # Compare in lower case so "LOGO@2X.PNG" is rejected as well.
    image_suffixes = ('.png', '.jpg', '.jpeg', '.gif', '.webp', '.svg', '.bmp')

    return [
        email
        for email in standard_emails + cleaned_obfuscated
        if not email.lower().endswith(image_suffixes)
        and not re.match(r'\d', email)
    ]

def refang_email(obfuscated_email):
    """Return the conventional ``user@domain.tld`` form of an obfuscated address.

    The input is lower-cased, every "at" marker becomes '@', every "dot"
    marker becomes '.', and any leftover brackets, braces, parentheses or
    whitespace are removed.
    """
    # Applied in order: '@' markers, '.' markers, then stray punctuation.
    substitutions = (
        (r'\s*\[at\]\s*|\s*\(at\)\s*|\s*{at}\s*|\s+at\s+', '@'),
        (r'\s*\[dot\]\s*|\s*\(dot\)\s*|\s*{dot}\s*|\s+dot\s+', '.'),
        (r'[\[\]\(\)\{\}\s]', ''),
    )

    result = obfuscated_email.lower()
    for pattern, replacement in substitutions:
        result = re.sub(pattern, replacement, result)
    return result

def _mailto_addresses(soup):
    """Return the set of addresses found in ``mailto:`` links of a parsed page."""
    found = set()
    for link in soup.find_all('a', href=True):
        href = link['href'].strip()
        # URI schemes are case-insensitive, so accept 'MAILTO:' as well.
        if href[:7].lower() != 'mailto:':
            continue
        # Drop any '?subject=...' query part that follows the address.
        address = href[7:].split('?')[0].strip()
        if '@' in address:
            found.add(address)
    return found


def scrape_emails_from_url(url, row_num):
    """
    Collect email addresses published on a website.

    A fixed list of commonly used paths (support, about, privacy, ...) is
    probed first. If any of them yields an address, those addresses are
    returned. Otherwise the given URL itself is fetched, its ``mailto:``
    links are collected, and up to three linked contact/about pages are
    scanned.

    Args:
        url: Site address, with or without scheme ('https://' is assumed
            when missing). Empty, blank or non-string values return "".
        row_num: Row number used only in progress messages.

    Returns:
        The addresses found, sorted and joined with ', ', or "" when
        nothing was found. Network and parsing errors are printed and
        never raised to the caller.
    """
    print(f"Processing row {row_num}: {url}")

    if not isinstance(url, str) or not url.strip():
        return ""
    url = url.strip()

    # https:// + url  since in the sheet it does not contain https://
    if not url.startswith(('http://', 'https://')):
        url = 'https://' + url

    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
    }

    all_emails = set()
    visited_urls = set()

    # Every entry starts with '/' so urljoin resolves it against the site
    # root rather than against whatever path the input URL carries.
    specific_paths = (
        '/support',
        '/about',
        '/team',
        '/careers',
        '/help',
        '/newsletter',
        '/privacy',
        '/contact/us',
        '/contact-us',
        '/contacto',
        '/sales',
        '/legal',
        '/partners',
        '/feedback',
        '/privacy-policy',
        '/info',
        '/work-with-us',
        '/sign-up',
        '/contact-opnemen',
        '/terms',
        '/disclaimer',
        '/get-in-touch',
        '/email',
        '/',
    )

    # Checking specific paths
    for path in specific_paths:
        specific_url = urljoin(url, path)
        if specific_url in visited_urls:
            continue
        visited_urls.add(specific_url)

        try:
            print(f"  Checking specific path: {specific_url}")
            response = requests.get(specific_url, headers=headers, timeout=10)

            if response.status_code == 200:
                all_emails.update(extract_emails(response.text))
                all_emails.update(
                    _mailto_addresses(BeautifulSoup(response.text, 'html.parser'))
                )

            # Small random pause between requests to the same host.
            time.sleep(random.uniform(0.5, 1.0))

        except requests.exceptions.ConnectionError as e:
            print(f"  Error checking specific path {path}: {str(e)[:100]}")
            # DNS failure, refused connection, TLS error or connect timeout:
            # the host itself is unreachable, so the remaining paths would
            # fail the same way. Stop probing; the fallback request below
            # still gives the site one more chance.
            break
        except Exception as e:
            print(f"  Error checking specific path {path}: {str(e)[:100]}")

    # If the specific paths produced anything, return without checking further
    if all_emails:
        return ', '.join(sorted(all_emails))

    # Otherwise fall back to the given URL and the contact pages it links to
    try:
        response = requests.get(url, headers=headers, timeout=15)
        if response.status_code == 200:
            soup = BeautifulSoup(response.text, 'html.parser')

            # Only mailto links are taken from this page; free-text matching
            # is applied to the specific paths above and to the contact
            # pages below.
            all_emails.update(_mailto_addresses(soup))

            # Look for contact/about pages
            contact_links = []
            contact_keywords = ['contact', 'kontakt', 'about', 'about-us', 'email', 'get-in-touch', 'reach-us']
            for link in soup.find_all('a', href=True):
                href = link['href']
                text = link.text.lower() if link.text else ""

                if not any(keyword in text or keyword in href.lower() for keyword in contact_keywords):
                    continue

                full_url = urljoin(url, href)
                # Skip mailto:/javascript:/tel: targets and repeated links so
                # they do not use up the three-page budget.
                if not full_url.startswith(('http://', 'https://')):
                    continue
                if full_url in visited_urls or full_url in contact_links:
                    continue
                contact_links.append(full_url)

            # Visit up to 3 contact/about pages
            for contact_url in contact_links[:3]:
                visited_urls.add(contact_url)
                try:
                    contact_response = requests.get(contact_url, headers=headers, timeout=10)
                    if contact_response.status_code == 200:
                        all_emails.update(extract_emails(contact_response.text))
                        all_emails.update(
                            _mailto_addresses(
                                BeautifulSoup(contact_response.text, 'html.parser')
                            )
                        )
                except Exception as e:
                    # Best effort: one failing contact page must not abort
                    # the others, but the failure is reported.
                    print(f"  Error checking contact page {contact_url}: {str(e)[:100]}")

                time.sleep(random.uniform(1.0, 2.0))
    except Exception as e:
        print(f"  Error processing row {row_num}: {str(e)[:100]}")

    return ', '.join(sorted(all_emails))

def process_url(args):
    """Scrape one spreadsheet row and pair the result with its DataFrame index.

    Args:
        args: Tuple ``(index, row_num, url)``; packed into a single argument
            so the function can be used with ``executor.map``.

    Returns:
        ``(index, emails)`` where ``emails`` is the scraper's result, or ""
        if the scraper raised.
    """
    df_index, row_number, site = args
    try:
        return df_index, scrape_emails_from_url(site, row_number)
    except Exception as err:
        print(f"  Failed to process row {row_number}: {str(err)[:100]}")
        return df_index, ""

def process_excel_file(max_workers=5, input_file="websites.xlsx"):
    """
    Read site URLs from an Excel workbook, scrape emails and save the results.

    The workbook must contain a 'Site link' column. Results are written to
    the 'E mail' column (created when missing, overwritten otherwise) of a
    new timestamped workbook, which is re-saved after every batch so partial
    progress survives an interruption. The final workbook has an 'Emails'
    sheet and an 'Efficiency Metrics' sheet.

    Args:
        max_workers: Number of sites scraped concurrently within a batch;
            values <= 1 process the batch sequentially.
        input_file: Path of the workbook to read.

    Returns:
        None. Progress and errors are printed; exceptions are caught and
        reported rather than raised.
    """
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    output_file = f"websites_with_emails_{timestamp}.xlsx"
    batch_size = 10

    try:
        df = pd.read_excel(input_file)
        print(f"Loaded {len(df)} rows from {input_file}")

        if 'Site link' not in df.columns:
            print("Error: Excel file must contain a 'Site link' column")
            return

        if 'E mail' not in df.columns:
            df['E mail'] = ""
        # An existing but empty column is read as float64 (all NaN); storing
        # strings into it triggers pandas' incompatible-dtype warning, so
        # make the column object-typed up front.
        df['E mail'] = df['E mail'].astype(object)

        # (DataFrame index, 1-based row number for messages, URL)
        to_process = [
            (index, row_num, site)
            for row_num, (index, site) in enumerate(df['Site link'].items(), start=1)
        ]
        if not to_process:
            print("No URLs found to process")
            return

        total_urls = len(to_process)
        total_batches = (total_urls + batch_size - 1) // batch_size
        print(f"Processing {total_urls} URLs...")

        successful_extractions = 0

        for i in range(0, total_urls, batch_size):
            batch = to_process[i:i + batch_size]
            print(f"Processing batch {i//batch_size + 1}/{total_batches}")

            if max_workers <= 1:
                results = [process_url(args) for args in batch]
            else:
                with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
                    results = list(executor.map(process_url, batch))

            for index, emails in results:
                df.at[index, 'E mail'] = emails
                if emails:
                    successful_extractions += 1

            # Checkpoint after every batch.
            df.to_excel(output_file, index=False)

            processed = i + len(batch)
            current_efficiency = (successful_extractions / processed) * 100
            print(f"Current efficiency: {successful_extractions}/{processed} URLs ({current_efficiency:.2f}%)")

        final_efficiency = (successful_extractions / total_urls) * 100

        df_stats = pd.DataFrame({
            'Metric': ['Total URLs', 'Successful Extractions', 'Efficiency (%)'],
            'Value': [total_urls, successful_extractions, f"{final_efficiency:.2f}%"],
        })

        # Replace the checkpoint file with the final two-sheet workbook.
        with pd.ExcelWriter(output_file) as writer:
            df.to_excel(writer, sheet_name='Emails', index=False)
            df_stats.to_excel(writer, sheet_name='Efficiency Metrics', index=False)

        print(f"\nFinal efficiency: {successful_extractions}/{total_urls} URLs ({final_efficiency:.2f}%)")
        print(f"Results saved to: {output_file}")

    except Exception as e:
        print(f"Error: {str(e)}")

if __name__ == "__main__":
    # Ask how many sites to scrape in parallel; empty input keeps the default.
    workers = input("Enter the number of concurrent workers (1-10, default 3): ")

    try:
        max_workers = int(workers) if workers else 3
        # Clamp to the advertised 1-10 range.
        max_workers = min(max(1, max_workers), 10)
    except ValueError:
        # Non-numeric input falls back to the default.
        max_workers = 3

    print(f"Using {max_workers} concurrent workers")
    print(f"Reading websites from 'websites.xlsx' file")
    process_excel_file(max_workers=max_workers)


Enter the number of concurrent workers (1-10, default 3): 8
Using 8 concurrent workers
Reading websites from 'websites.xlsx' file
Loaded 1001 rows from websites.xlsx
Processing 1001 URLs...
Processing batch 1/101
Processing row 1: optimalbux.com
  Checking specific path: https://optimalbux.com/support
Processing row 2: thepiejobs.com
  Checking specific path: https://thepiejobs.com/support
Processing row 3: studijos.lt
  Checking specific path: https://studijos.lt/support
Processing row 4: madlemmings.com
  Checking specific path: https://madlemmings.com/support
Processing row 5: 6mejores.com
  Checking specific path: https://6mejores.com/support
Processing row 6: paytobrowse.blogspot.com
  Checking specific path: https://paytobrowse.blogspot.com/support
Processing row 7: health-recovery-solution.blogspot.com
  Checking specific path: https://health-recovery-solution.blogspot.com/support
Processing row 8: mrsdaakustudio.com
  Checking specific path: https://mrsdaakustudio.com/support
 

  df.at[index, 'E mail'] = emails


[1;30;43mStreaming output truncated to the last 5000 lines.[0m
  Checking specific path: https://xboxcodefreenow.blogspot.com/support
  Error checking specific path /about: HTTPSConnectionPool(host='sparesomebitcoin.com', port=443): Max retries exceeded with url: /about (C
  Checking specific path: https://sparesomebitcoin.com/team
  Error checking specific path /team: HTTPSConnectionPool(host='mixerpoint.com', port=443): Max retries exceeded with url: /team (Caused b
  Checking specific path: https://mixerpoint.com/careers
  Error checking specific path /support: HTTPSConnectionPool(host='tombarnesmusic.com', port=443): Max retries exceeded with url: /support (C
  Checking specific path: https://tombarnesmusic.com/about
  Error checking specific path /about: HTTPSConnectionPool(host='yourmakeithappencoach.com', port=443): Max retries exceeded with url: /abo
  Checking specific path: https://yourmakeithappencoach.com/team
  Error checking specific path /careers: HTTPSConnectionPool(h