In [21]:
# import necessary Libraries
import os
import requests
import json
import time
import random
import pandas as pd
from typing import List
from dotenv import load_dotenv
from bs4 import BeautifulSoup
from IPython.display import display,Markdown,update_display
from openai import OpenAI
import csv
import re

In [23]:
# initialize and constants
load_dotenv()
api_key = os.getenv('OPENAI_API_KEY')

if api_key and api_key.startswith('sk-proj-') and len(api_key)>10:
    print("API key looks good so far")
else:
    print("There might be a problem with your API key? Please visit the troubleshooting notebook!")
    
MODEL = 'gpt-4o-mini'
openai = OpenAI()

API key looks good so far


In [25]:
# Define a function to fetch the webpage.
# This is like a sensor reading in a robot: we need fresh data from the environment.
def fetch_page(url, retries=3, delay=2):
    """
    Fetch the content of the page with a simple retry mechanism.
    This retry logic is our basic "control strategy" to ensure robustness.
    """
    headers = {
        'User-Agent': 'Mozilla/5.0 (compatible; LeadgenScraper/1.0; +http://google.com)'
    }
    for attempt in range(retries):
        try:
            response = requests.get(url, headers=headers, timeout=10)
            response.raise_for_status()
            print(f"[INFO] Successfully fetched: {url}")
            return response.text
        except Exception as e:
            print(f"[WARN] Attempt {attempt+1}: Failed to fetch {url}. Reason: {e}")
            time.sleep(delay)
    print("[ERROR] All fetch attempts failed.")
    return None

In [45]:
fetch_page('https://reemanbot.com/contact/')

[INFO] Successfully fetched: https://reemanbot.com/contact/


'<!DOCTYPE html> <html dir="ltr" lang="en-US" prefix="og: https://ogp.me/ns#"> <head><script>if(navigator.userAgent.match(/MSIE|Internet Explorer/i)||navigator.userAgent.match(/Trident\\/7\\..*?rv:11/i)){let e=document.location.href;if(!e.match(/[?&]nonitro/)){if(e.indexOf("?")==-1){if(e.indexOf("#")==-1){document.location.href=e+"?nonitro=1"}else{document.location.href=e.replace("#","?nonitro=1#")}}else{if(e.indexOf("#")==-1){document.location.href=e+"&nonitro=1"}else{document.location.href=e.replace("#","&nonitro=1#")}}}}</script><link rel="preconnect" href="https://cdn-ilbioil.nitrocdn.com" /><meta charset="UTF-8" /><meta name="viewport" content="width=device-width, initial-scale=1" /><title>Contact - REEMAN Robot</title><meta name="description" content="If you are interested in Reeman&#039;s Big Dog, Fly Boat, Ironbove, Ironhide, Stackman, Moon Knight robot chassis, and other machine products, please contact Reeman." /><meta name="robots" content="max-image-preview:large" /><meta n

In [29]:
# Parse the HTML content to extract potential lead data.
# This function plays the role of a decision-making module in our control loop.
def parse_leads(html_content):
    """
    Use BeautifulSoup to parse the page and extract lead information.
    For demonstration, we focus on extracting email addresses and phone numbers.
    """
    soup = BeautifulSoup(html_content, 'html.parser')
    text = soup.get_text()
    
    # Use regex to find email addresses (a typical lead component)
    emails = re.findall(r'[\w\.-]+@[\w\.-]+\.\w+', text)
    
    # Optionally, add phone number extraction (example pattern, can be refined)
    phone_pattern = r'\+?\d[\d\-\(\) ]{7,}\d'
    phones = re.findall(phone_pattern, text)
    
    # Removing duplicates by converting to a set then back to list
    emails = list(set(emails))
    phones = list(set(phones))
    
    # Combine results into a list of dictionaries
    leads = []
    for email in emails:
        leads.append({'type': 'email', 'value': email})
    for phone in phones:
        leads.append({'type': 'phone', 'value': phone})
    
    print(f"[INFO] Found {len(leads)} potential leads.")
    return leads

In [43]:
parse_leads(fetch_page('https://reemanbot.com/contact/'))

[INFO] Successfully fetched: https://reemanbot.com/contact/
[INFO] Found 5 potential leads.


[{'type': 'email', 'value': 'reeman.sales@reeman.cn'},
 {'type': 'email', 'value': '86-18665898745Emailreeman.sales@reeman.cn'},
 {'type': 'phone', 'value': '+86-186 6589 8745'},
 {'type': 'phone', 'value': '+86-18665898745'},
 {'type': 'phone', 'value': '+86-755-86239429'}]

In [33]:
def save_leads(leads,filename='leads.csv'):
    """
    store the extracted lead information into a csv file
    This is analogous to actuators in a control system where decisions are implemented
    """
    with open(filename,mode='w',newline='',encoding='utf-8') as file:
        writer = csv.DictWriter(filename,filenames=['type','value'])
        writer.writeheader()
        for lead in leads:
            writer.writerow(leads)
        print("[INFO] Leads saved to filename {}".format(filename))
        

In [85]:
# Import necessary libraries
import requests
import pandas as pd
from bs4 import BeautifulSoup # Keep for potential pre-processing
import time
import random
import os
import json
from dotenv import load_dotenv
from openai import OpenAI, RateLimitError, APIError
import urllib.parse

# --- OpenAI Initialization ---
load_dotenv()
api_key = os.getenv('OPENAI_API_KEY')
openai_client = None
MODEL = 'gpt-4o-mini'
if api_key and api_key.startswith('sk-') and len(api_key) > 10:
    print("OpenAI API key loaded.")
    try:
        openai_client = OpenAI()
        print(f"Using OpenAI model: {MODEL}")
    except Exception as e:
        print(f"Error initializing OpenAI client: {e}")
else:
    print("Error loading OpenAI API key or key invalid.")

# --- Configuration ---
# Target Directory: Set BASE_URL to your chosen directory.
# BASE_URL = "https://www.yellowpages.com"
BASE_URL = "https://www.justdial.com" # Targeting Justdial

HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36',
    'Accept-Language': 'en-US,en;q=0.9',
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
    'Referer': f'{BASE_URL}/' # Referer can sometimes help bypass simple blocks
}

# --- Helper Functions ---

def fetch_page(url, params=None, timeout=60):
    """ Fetches the content of a web page. """
    print(f"Attempting to fetch: {url} with params: {params}")
    try:
        response = requests.get(url, headers=HEADERS, params=params, timeout=timeout)
        response.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx)
        print(f"Successfully fetched: {response.url} (Status: {response.status_code})")
        # Add site-specific checks if needed
        if "justdial.com" in url and response.status_code != 200:
             print(f"Warning: Received status code {response.status_code} from Justdial.")
        # Check for common blocking indicators in content (very basic)
        if "blocked" in response.text.lower() or "captcha" in response.text.lower():
            print("Warning: Page content might indicate blocking or CAPTCHA.")
        return response.text
    except requests.exceptions.Timeout:
        print(f"Error fetching {url}: Read timed out after {timeout} seconds.")
        return None
    except requests.exceptions.RequestException as e:
        print(f"Error fetching {url}: {e}")
        return None

# --- Normalization Helper ---
def normalize_lead_data(lead_list):
    """ Normalizes data within the extracted list of leads. """
    normalized_data = []
    if not isinstance(lead_list, list):
        print("Normalization error: Input is not a list.")
        return []
    for lead in lead_list:
        if isinstance(lead, dict):
            normalized_lead = { k: lead.get(k) for k in ['name', 'phone', 'email'] }
            for key, value in normalized_lead.items():
                 if value is None or (isinstance(value, str) and not value.strip()): normalized_lead[key] = None
            if normalized_lead.get('name') or normalized_lead.get('phone'):
                 normalized_data.append(normalized_lead)
            # else: print(f"Skipping normalized lead (no name/phone): {normalized_lead}") # Debug
        # else: print(f"Warning: Non-dict item during normalization: {lead}") # Debug
    return normalized_data

# --- Optional Pre-processing Helper ---
def preprocess_html(html_content, max_chars=50000):
    """ Basic HTML pre-processing: remove scripts/styles, limit length. """
    if not html_content: return ""
    try:
        soup = BeautifulSoup(html_content, 'html.parser')
        for script_or_style in soup(["script", "style", "nav", "footer", "aside"]): # Remove common non-lead sections
            script_or_style.decompose()
        # Attempt to find a main content area (selectors vary greatly by site)
        main_content = soup.find('main') or soup.find('div', id='main-content') or soup.find('div', class_='content') or soup.body
        text = main_content.get_text(separator=' ', strip=True) if main_content else soup.get_text(separator=' ', strip=True)

        if len(text) > max_chars:
            print(f"Pre-processed text truncated from {len(text)} to {max_chars} characters.")
            return text[:max_chars]
        else:
            print(f"Pre-processed text length: {len(text)} characters.")
            return text
    except Exception as e:
        print(f"Error during HTML pre-processing: {e}")
        return html_content[:max_chars] if len(html_content) > max_chars else html_content

# --- OpenAI Extraction Function ---
def extract_leads_from_page_with_openai(full_html_content, use_preprocessing=False):
    """
    Uses OpenAI API's JSON mode to identify and extract all leads from a directory search results page HTML.
    """
    if not openai_client: print("OpenAI client not initialized."); return None
    if not full_html_content: print("No HTML content provided."); return None

    print(f"Original HTML content length: {len(full_html_content)} characters.")
    if len(full_html_content) < 500: print("Warning: HTML content is very short.")

    processed_content = full_html_content
    if use_preprocessing:
        print("Attempting HTML pre-processing...")
        processed_content = preprocess_html(full_html_content)
        if not processed_content: print("Pre-processing failed. Aborting."); return None

    # System prompt focused on extracting MULTIPLE leads from a DIRECTORY SEARCH RESULTS page
    system_prompt = """
You are an expert web scraping assistant. Your task is to analyze the provided text content (extracted from an online directory's search results page HTML), identify all distinct business listings/leads presented on that page, and extract specific information for each lead found.

Your output MUST be a single, valid JSON array (a list) containing JSON objects.
Each JSON object in the array represents one business lead found on the page and MUST contain the keys "name", "phone", and "email".
- Extract the most plausible business name for the 'name' key.
- Extract the primary phone number for the 'phone' key.
- Extract the email address for the 'email' key.
If a piece of information (especially email) is not found for a specific lead, use null or an empty string for its value within that lead's JSON object.
If no leads are found on the page, return an empty JSON array: []. Do NOT return an empty JSON object {}.
Do not include any explanatory text, comments, or markdown formatting before or after the JSON array. Strictly output only the JSON array.
"""
    input_description = "HTML document" if not use_preprocessing else "text content extracted from an HTML document"
    user_prompt = f"""
    Please analyze the following {input_description} from a directory search results page, identify all business leads listed, and extract the name, phone number, and email address for each. Return the results strictly as a JSON array of objects.

    Content:
    ```
    {processed_content}
    ```
    """
    json_string = None
    try:
        print("Sending directory page request to OpenAI API (JSON mode)...")
        start_time = time.time()
        response = openai_client.chat.completions.create(
            model=MODEL,
            messages=[ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt} ],
            response_format={"type": "json_object"},
            temperature=0.1,
            max_tokens=3500 # May need adjustment based on directory page density
        )
        end_time = time.time(); print(f"OpenAI API call took {end_time - start_time:.2f} seconds.")
        json_string = response.choices[0].message.content
        print(f"--- OpenAI Raw JSON String Response ---\n{json_string}\n------------------------------------") # Log raw response
        data = json.loads(json_string)
        leads_list = None
        if isinstance(data, dict) and not data:
             print("Error: OpenAI returned an empty JSON object {}. Interpreting as no leads found.")
             return [] # Return empty list
        if isinstance(data, list):
            print(f"OpenAI returned a JSON array directly.")
            leads_list = data
        elif isinstance(data, dict):
            print(f"OpenAI returned a JSON object. Searching for list value...")
            found_list = False
            for key, value in data.items():
                if isinstance(value, list):
                    print(f"Found list value for key '{key}'. Assuming this is leads list.")
                    leads_list = value; found_list = True; break
            if not found_list:
                 print(f"Error: OpenAI returned dict, but no list value found. Keys: {list(data.keys())}")
                 return None # Indicate structural error
        else:
            print(f"Error: OpenAI response not list or dict. Type: {type(data)}")
            return None # Indicate structural error

        normalized_leads = normalize_lead_data(leads_list)
        print(f"Successfully extracted and normalized {len(normalized_leads)} leads from this page.")
        return normalized_leads

    except json.JSONDecodeError as e: print(f"Error parsing JSON: {e}\nRaw content: {json_string[:500]}..."); return None
    except RateLimitError: print("OpenAI API rate limit exceeded."); return None
    except APIError as e: print(f"OpenAI API error: {e}"); return None
    except Exception as e: print(f"Unexpected OpenAI processing error: {e}"); return None

def save_to_csv(data, filename="leads_directory.csv"): # Updated default filename
    """ Saves the extracted data (list of dicts) to a CSV file. """
    if data is None: print("Processing error. No data to save."); return
    if not isinstance(data, list): print(f"Invalid data format. Expected list, got {type(data)}."); return
    if not data: print("Data list empty. No leads found/extracted. No CSV created."); return
    if not all(isinstance(item, dict) for item in data): print("Invalid data format. List items not all dicts."); return
    try:
        all_keys = set().union(*(d.keys() for d in data))
        processed_data = [{k: d.get(k) for k in all_keys} for d in data]
        df = pd.DataFrame(processed_data)
        cols_order = ['name', 'phone', 'email']
        existing_cols_order = [col for col in cols_order if col in df.columns]
        remaining_cols = [col for col in df.columns if col not in existing_cols_order]
        df = df[existing_cols_order + remaining_cols]
        df.to_csv(filename, index=False, encoding='utf-8')
        print(f"Successfully saved {len(df)} leads to {filename}")
    except Exception as e: print(f"Error saving data to CSV: {e}")

# --- Main Execution Logic ---
if __name__ == "__main__":
    if not openai_client:
        print("\nOpenAI client failed to initialize. Exiting.")
    else:
        business_type = input("Enter the type of business to search for (e.g., plumbers): ")
        location = input("Enter the location (e.g., Vellore, Tamil Nadu): ")

        search_url = None
        search_params = None

        # --- Construct URL and Params based on the chosen BASE_URL ---
        print(f"\nConfiguring search for target: {BASE_URL}")
        if "justdial.com" in BASE_URL:
            # Justdial uses path structure: /City/Search-Term
            try:
                # Basic formatting, might need adjustment for specific Justdial rules
                city = location.split(',')[0].strip().replace(' ', '-')
                search_term = business_type.strip().replace(' ', '-')
                # URL-encode the components
                encoded_city = urllib.parse.quote(city)
                encoded_search = urllib.parse.quote(search_term)
                search_url = f"{BASE_URL.rstrip('/')}/{encoded_city}/{encoded_search}" # Ensure single slash
                print(f"Constructed Justdial URL: {search_url}")
            except Exception as e:
                print(f"Error constructing Justdial URL from location '{location}': {e}")
                search_url = BASE_URL # Fallback
            search_params = None # No params needed

        elif "yellowpages.com" in BASE_URL:
             # Yellowpages uses parameters
             search_url = f"{BASE_URL.rstrip('/')}/search"
             search_params = {'search_terms': business_type, 'geo_location_terms': location}

        # Add elif blocks here for other directories if needed

        else: # Default fallback (e.g., if BASE_URL wasn't recognized)
             print(f"Warning: BASE_URL '{BASE_URL}' not specifically handled. Using generic parameter approach.")
             search_url = BASE_URL # Assume BASE_URL is the full search endpoint
             search_params = {'q': f"{business_type} in {location}"} # Generic 'q' param

        # --- Execute Search and Extraction ---
        if not search_url:
             print("Could not determine search URL. Exiting.")
        else:
            print(f"\nStep 1: Fetching directory search results...")
            # Fetch the directory search results page
            html_content = fetch_page(search_url, params=search_params)

            if html_content:
                print(f"\nStep 2: Analyzing directory page content with OpenAI...")
                # Analyze the directory page content directly
                # Set use_preprocessing=True to try cleaning HTML first (recommended for complex directories)
                extracted_data = extract_leads_from_page_with_openai(html_content, use_preprocessing=True)

                print(f"\nStep 3: Saving results...")
                output_filename = f"{business_type.replace(' ', '_')}_{location.replace(' ', '_')}_leads_directory.csv"
                save_to_csv(extracted_data, output_filename) # Handles None, [], or list of dicts
            else:
                print("Failed to retrieve the directory search results page. Cannot proceed.")

        print("\nScraping process finished.")



OpenAI API key loaded.
Using OpenAI model: gpt-4o-mini


Enter the type of business to search for (e.g., plumbers):  plumbers
Enter the location (e.g., Vellore, Tamil Nadu):  vellore



Configuring search for target: https://www.justdial.com
Constructed Justdial URL: https://www.justdial.com/vellore/plumbers

Step 1: Fetching directory search results...
Attempting to fetch: https://www.justdial.com/vellore/plumbers with params: None
Error fetching https://www.justdial.com/vellore/plumbers: Read timed out after 60 seconds.
Failed to retrieve the directory search results page. Cannot proceed.

Scraping process finished.
