In [None]:
pip install selenium



In [None]:
!pip install googlemaps



In [None]:

import re
import json
import requests
import openai
import time
import csv
import pandas as pd
from dataclasses import dataclass
from typing import List, Optional, Dict, Any
from datetime import datetime
import os
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from google.colab import drive
import urllib.parse

In [None]:
# API keys. They are read from the environment when available so that real
# credentials never have to be pasted into (and saved with) the notebook.
# Both fall back to an empty string, which can still be replaced by hand.
GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY", "")
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "")

In [None]:
@dataclass
class Business:
    """One scraped business record, as written to the group output CSV.

    Only ``place_id``, ``business_name`` and ``address`` are required; every
    other field defaults to ``None`` (meaning "not found"), so consumers must
    treat the collection fields as optional as well.
    """

    place_id: str
    business_name: str
    address: str
    phone_number: Optional[str] = None
    latitude: Optional[float] = None
    longitude: Optional[float] = None
    website: Optional[str] = None
    hr_email: Optional[str] = None
    sales_email: Optional[str] = None
    careers_page: Optional[str] = None
    sales_page: Optional[str] = None
    # Platform name -> profile URL; a platform may map to None when no link was found.
    social_media_links: Optional[Dict[str, Optional[str]]] = None
    # The collection fields default to None, so they are annotated as Optional
    # instead of claiming a List that is not actually there.
    business_type: Optional[List[str]] = None
    skill_ids: Optional[List[str]] = None
    skill_names: Optional[List[str]] = None

In [None]:
class DirectoryManager:
    """Resolve paths and read the CSV inputs of the business directory.

    Layout under ``base_path``:

    * ``Output/`` - one result CSV per business group
    * ``Skill tag sheets/`` - ``skill tags - <group>.csv`` files
    * ``Extracted_Business_Groups_and_Types.csv`` - the first column is
      treated as the group name and the remaining columns as the business
      types that belong to that group.
    """

    def __init__(self, base_path: str = "/content/drive/MyDrive/Kobeyo Business Directory"):
        """Store the derived paths and create the two working folders.

        Note that the folders are created immediately, so ``base_path`` must
        already be reachable (e.g. the drive must be mounted) at this point.
        """
        self.base_path = base_path
        self.output_folder = os.path.join(base_path, "Output")
        self.skill_tags_folder = os.path.join(base_path, "Skill tag sheets")
        self.business_groups_file = os.path.join(base_path, "Extracted_Business_Groups_and_Types.csv")

        # Ensure directories exist
        os.makedirs(self.output_folder, exist_ok=True)
        os.makedirs(self.skill_tags_folder, exist_ok=True)

    def get_business_group_from_type(self, business_type: str) -> Optional[str]:
        """Return the group (first-column value) of the row mentioning ``business_type``.

        Matching is case-insensitive. A cell that equals the business type
        exactly wins; otherwise the first cell that merely contains it is
        used. Returns ``None`` when the type is blank, the CSV is missing or
        unreadable, or nothing matches.
        """
        try:
            if not os.path.exists(self.business_groups_file):
                print(f"❌ Business groups file not found: {self.business_groups_file}")
                return None

            needle = business_type.strip().lower()
            if not needle:
                # An empty string is a substring of every cell and would
                # silently select the first group in the file.
                print(f"⚠️ No business group found for type: {business_type}")
                return None

            df = pd.read_csv(self.business_groups_file)

            partial_match = None
            for _, row in df.iterrows():
                for col in df.columns:
                    cell = row[col]
                    if pd.isna(cell):
                        continue
                    cell_text = str(cell).strip().lower()
                    if cell_text == needle:
                        # Exact hit: the group name is assumed to be the first column.
                        return str(row.iloc[0])
                    if partial_match is None and needle in cell_text:
                        # Remember the first loose hit but keep looking for an exact one.
                        partial_match = str(row.iloc[0])

            if partial_match is not None:
                return partial_match

            print(f"⚠️ No business group found for type: {business_type}")
            return None

        except Exception as e:
            print(f"❌ Error reading business groups file: {e}")
            return None

    def get_all_business_types_in_group(self, group_name: str) -> List[str]:
        """Return the non-empty business types listed in the row of ``group_name``.

        The group is looked up case-insensitively in the first column: an
        exact match is preferred, then the first row whose name contains
        ``group_name``. The name is compared as plain text, never as a regular
        expression. Returns ``[]`` when the file or the group is not found.
        """
        try:
            if not os.path.exists(self.business_groups_file):
                return []

            df = pd.read_csv(self.business_groups_file)

            names = ["" if pd.isna(value) else str(value).strip().lower() for value in df.iloc[:, 0]]
            wanted = group_name.strip().lower()

            row_index = next((i for i, name in enumerate(names) if name == wanted), None)
            if row_index is None:
                row_index = next((i for i, name in enumerate(names) if wanted in name), None)
            if row_index is None:
                return []

            group_row = df.iloc[row_index]

            # Every column after the first one holds a business type of the group.
            business_types = []
            for col in df.columns[1:]:
                value = group_row[col]
                if pd.notna(value) and str(value).strip():
                    business_types.append(str(value).strip())

            return business_types

        except Exception as e:
            print(f"❌ Error extracting business types for group {group_name}: {e}")
            return []

    def load_skill_tags_for_group(self, group_name: str) -> pd.DataFrame:
        """Load ``skill tags - <group_name>.csv``; return an empty DataFrame on any failure."""
        try:
            # Construct the skill tags file path
            skill_file_path = os.path.join(self.skill_tags_folder, f"skill tags - {group_name}.csv")

            if not os.path.exists(skill_file_path):
                print(f"⚠️ Skill tags file not found: {skill_file_path}")
                return pd.DataFrame()

            df = pd.read_csv(skill_file_path)
            print(f"✅ Loaded skill tags for {group_name} with {len(df)} entries")
            return df

        except Exception as e:
            print(f"❌ Error loading skill tags for {group_name}: {e}")
            return pd.DataFrame()

    def get_output_file_path(self, group_name: str) -> str:
        """Return the path of the output CSV that collects businesses of ``group_name``."""
        safe_name = group_name.replace(' ', '_').replace('&', 'and')
        # A path separator inside the group name would point into a
        # sub-directory that does not exist, so flatten it.
        for separator in ("/", os.sep):
            safe_name = safe_name.replace(separator, "_")
        return os.path.join(self.output_folder, f"businesses_{safe_name}.csv")


In [None]:
class SmartBusinessScraper:
    def __init__(self, google_api_key: str, openai_api_key: str):
        """Set up API clients, mount Google Drive, then start the browser.

        Args:
            google_api_key: Key used for the Google Places web service.
            openai_api_key: Key used to build the OpenAI client.
        """
        self.google_api_key = google_api_key
        self.openai_client = openai.OpenAI(api_key=openai_api_key)
        self.places_base_url = "https://maps.googleapis.com/maps/api/place"
        self.website_context = {}  # Store scraped website context

        # Mount Google Drive BEFORE anything writes below /content/drive.
        # DirectoryManager creates its folders on construction; doing that
        # first leaves files in the mount point and drive.mount() then fails
        # with "ValueError: Mountpoint must not already contain files".
        # os.path.ismount() is False for a missing path as well, so one check
        # covers both the "not there" and the "there but not mounted" case.
        if os.path.ismount('/content/drive'):
            print("Google Drive is already mounted.")
        else:
            drive.mount('/content/drive')

        # Initialize directory manager (needs the mounted drive)
        self.directory_manager = DirectoryManager()

        # Current business group context
        self.current_group = None
        self.current_skill_tags_df = pd.DataFrame()

        # Setup selenium driver
        self.setup_driver()

    def setup_driver(self):
        """Create the headless Chrome WebDriver and store it on ``self.driver``."""
        # Flags are applied in this order.
        browser_flags = (
            "--headless",
            "--no-sandbox",
            "--disable-dev-shm-usage",
            "--disable-gpu",
            "--window-size=1920,1080",
            "--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
        )

        opts = Options()
        for flag in browser_flags:
            opts.add_argument(flag)

        self.driver = webdriver.Chrome(options=opts)

    def categorize_business_type(self, business_type: str, location: str) -> Dict[str, Any]:
        """Categorize business type and load appropriate skill tags"""
        # Find the business group for this type
        group_name = self.directory_manager.get_business_group_from_type(business_type)

        if not group_name:
            print(f"⚠️ Using fallback: searching for similar business types")
            return self.fallback_categorization(business_type)

        # Get all business types in this group
        all_business_types = self.directory_manager.get_all_business_types_in_group(group_name)

        # Load skill tags for this group
        skill_tags_df = self.directory_manager.load_skill_tags_for_group(group_name)

        # Update current context
        self.current_group = group_name
        self.current_skill_tags_df = skill_tags_df

        return {
            "group_name": group_name,
            "all_business_types": all_business_types,
            "skill_tags_loaded": not skill_tags_df.empty,
            "skill_tags_count": len(skill_tags_df)
        }

    def fallback_categorization(self, business_type: str) -> Dict[str, Any]:
        """Fallback method when business type is not found in the main CSV"""
        # Try to guess the group based on common keywords
        group_mappings = {
            "food": ["restaurant", "cafe", "bakery", "bar", "catering"],
            "cleaning": ["cleaning", "janitorial", "maintenance", "housekeeping"],
            "animal care": ["veterinary", "pet", "animal", "grooming"]
        }

        business_type_lower = business_type.lower()

        for group, keywords in group_mappings.items():
            if any(keyword in business_type_lower for keyword in keywords):
                skill_tags_df = self.directory_manager.load_skill_tags_for_group(group.title())
                self.current_group = group.title()
                self.current_skill_tags_df = skill_tags_df

                return {
                    "group_name": group.title(),
                    "all_business_types": [business_type],
                    "skill_tags_loaded": not skill_tags_df.empty,
                    "skill_tags_count": len(skill_tags_df),
                    "fallback_used": True
                }

        # If no match found, use generic approach
        self.current_group = "Generic"
        self.current_skill_tags_df = pd.DataFrame()

        return {
            "group_name": "Generic",
            "all_business_types": [business_type],
            "skill_tags_loaded": False,
            "skill_tags_count": 0,
            "fallback_used": True
        }

    def scrape_website_content(self, url: str) -> str:
        """Load ``url`` in the browser and return the start of its text, lower-cased.

        The first 2000 characters of the page text are kept, cached in
        ``self.website_context`` under the normalized URL, and returned.
        An empty URL or any failure yields an empty string.
        """
        if not url:
            return ""

        try:
            # Scheme-less URLs are assumed to be https.
            url = url if url.startswith(('http://', 'https://')) else 'https://' + url

            browser = self.driver
            browser.get(url)
            time.sleep(3)

            visible_text = BeautifulSoup(browser.page_source, 'html.parser').get_text()
            snippet = visible_text[:2000].lower()

            self.website_context[url] = snippet
            return snippet

        except Exception as e:
            print(f"Error scraping website {url}: {e}")
            return ""

    def find_careers_and_sales_pages(self, website_url: str) -> Dict[str, Optional[str]]:
        """Find a careers link and a sales/contact link on the site's landing page.

        For each page kind the keywords are tried in order against the link
        texts of the loaded page; the ``href`` of the first hit is used.

        Returns:
            ``{"careers_page": ..., "sales_page": ...}`` where a value is
            ``None`` if no link was found, the URL was empty, or the page
            could not be loaded.
        """
        pages = {"careers_page": None, "sales_page": None}

        if not website_url:
            return pages

        keywords_by_page = {
            "careers_page": ['careers', 'jobs', 'employment', 'work-with-us', 'join-our-team'],
            "sales_page": ['sales', 'contact', 'get-quote', 'services', 'about'],
        }

        try:
            if not website_url.startswith(('http://', 'https://')):
                website_url = 'https://' + website_url

            self.driver.get(website_url)
            time.sleep(2)

            for page_key, keywords in keywords_by_page.items():
                for keyword in keywords:
                    try:
                        element = self.driver.find_element(By.PARTIAL_LINK_TEXT, keyword)
                        pages[page_key] = element.get_attribute('href')
                        break
                    except Exception:
                        # Best effort: a missing link just means "try the next
                        # keyword". Unlike a bare ``except:`` this still lets
                        # KeyboardInterrupt / SystemExit stop the run.
                        continue

        except Exception as e:
            print(f"Error finding pages on {website_url}: {e}")

        return pages

    def extract_emails_and_social_media(self, website_url: str) -> Dict[str, Any]:
        """Extract an HR email, a sales email and social profile links from the page source.

        Emails are classified by keywords in their local part (the text before
        ``@``), so a domain such as ``christmas.com`` cannot make an address
        look like an HR contact. When several addresses qualify, the last one
        found wins. For every platform the first matching profile URL is kept.

        Returns:
            ``{"hr_email": ..., "sales_email": ..., "social_media": {...}}``;
            missing items are ``None``. An empty URL or a load failure returns
            the all-``None`` result.
        """
        hr_email = None
        sales_email = None
        social_media = {"facebook": None, "instagram": None, "linkedin": None}

        if not website_url:
            return {"hr_email": hr_email, "sales_email": sales_email, "social_media": social_media}

        try:
            if not website_url.startswith(('http://', 'https://')):
                website_url = 'https://' + website_url

            self.driver.get(website_url)
            time.sleep(2)

            page_source = self.driver.page_source

            # The TLD class is [A-Za-z]; a "|" inside a character class is a
            # literal pipe, not an alternation.
            email_pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'

            for email in re.findall(email_pattern, page_source):
                local_part = email.lower().split('@', 1)[0]
                if any(keyword in local_part for keyword in ['hr', 'jobs', 'careers', 'hiring']):
                    hr_email = email
                elif any(keyword in local_part for keyword in ['sales', 'contact', 'info', 'hello']):
                    sales_email = email

            # The handle stops at quotes and angle brackets as well, because
            # the links are read from raw HTML (href="...") and would
            # otherwise swallow the closing quote and the markup after it.
            for platform in social_media:
                pattern = r'https?://(?:www\.)?' + platform + r'\.com/[^/\s"\'<>]+'
                match = re.search(pattern, page_source)
                if match:
                    social_media[platform] = match.group()

        except Exception as e:
            print(f"Error extracting emails/social media from {website_url}: {e}")

        return {"hr_email": hr_email, "sales_email": sales_email, "social_media": social_media}

    def search_businesses(self, query: str, location: str = "Los Angeles, CA", max_results: int = 10) -> List[Dict]:
        """Run a Google Places text search for ``"<query> in <location>"``.

        Args:
            query: What to search for (e.g. a business type).
            location: Free-text location appended to the query.
            max_results: Upper bound on the number of results returned
                (only the first response page is read).

        Returns:
            The raw result dicts, at most ``max_results`` of them; an empty
            list when the API reports ``ZERO_RESULTS``.

        Raises:
            Exception: On a non-200 HTTP status or a non-OK API status.
            requests.exceptions.RequestException: On connection problems or
                when the request times out.
        """
        url = f"{self.places_base_url}/textsearch/json"
        params = {
            'query': f"{query} in {location}",
            'key': self.google_api_key,
            'type': 'establishment'
        }

        # Without a timeout a stalled connection would block the run forever.
        response = requests.get(url, params=params, timeout=15)

        if response.status_code != 200:
            raise Exception(f"Google Places API error: {response.status_code}")

        data = response.json()

        if data.get("status") != "OK":
            if data.get("status") == "ZERO_RESULTS":
                return []
            raise Exception(f"Google Places API error: {data.get('error_message', 'Unknown error')}")

        # A negative limit would otherwise slice from the end of the list.
        return data.get("results", [])[:max(max_results, 0)]

    def get_business_details(self, place_id: str) -> Dict:
        """Fetch the Place Details record for ``place_id``.

        Returns:
            The ``result`` dict of the response, or ``{}`` when the HTTP
            status is not 200 or the API status is not ``OK``.

        Raises:
            requests.exceptions.RequestException: On connection problems or
                when the request times out.
        """
        details_url = f"{self.places_base_url}/details/json"
        params = {
            "place_id": place_id,
            "fields": "name,formatted_address,geometry,website,formatted_phone_number,business_status,opening_hours,rating,reviews,types",
            "key": self.google_api_key
        }

        # Without a timeout a stalled connection would block the run forever.
        response = requests.get(details_url, params=params, timeout=15)

        if response.status_code == 200:
            data = response.json()
            if data.get("status") == "OK":
                return data.get("result", {})

        return {}

    def get_relevant_skills_from_group(self, business_types: List[str]) -> str:
        """Get relevant skills from the loaded skill tags DataFrame"""
        if self.current_skill_tags_df.empty:
            return "No skill tags available for this business group"

        try:
            # Get all available skills for this group
            skills_text = ""

            # Limit to first 10 skills to avoid token limits
            for _, row in self.current_skill_tags_df.head(10).iterrows():
                skill_id = row.get('Skills IDs', row.get('skill_ids', 'N/A'))
                skill_name = row.get('Skills Names', row.get('skill_names', 'N/A'))
                prompt_rule = row.get('Prompt Rule', row.get('prompt_rule', 'N/A'))

                skills_text += f"ID: {skill_id}, Name: {skill_name}, Rule: {str(prompt_rule)[:100]}...\n"

            return skills_text

        except Exception as e:
            print(f"Error processing skills from group: {e}")
            return "Error processing skills from group CSV"

    def tag_skills_with_ai(self, business_info: Dict, website_context: str) -> Dict[str, Any]:
        """Use OpenAI to tag skills based on business information and loaded skill tags"""
        business_name = business_info.get("name", "")
        business_types = business_info.get("types", [])

        # Get relevant skills from the loaded group
        relevant_skills = self.get_relevant_skills_from_group(business_types)

        # Create prompt
        prompt = f"""
Business: {business_name}
Types: {', '.join(business_types[:3])}
Context: {website_context[:200]}
Business Group: {self.current_group}

Available Skills for this group:
{relevant_skills}

Based on this business info, return relevant skill IDs and names as JSON:
{{
  "skill_ids": ["id1", "id2"],
  "skill_names": ["name1", "name2"]
}}
"""

        try:
            response = self.openai_client.chat.completions.create(
                model="gpt-3.5-turbo",
                messages=[
                    {"role": "system", "content": "Tag business skills using the provided group-specific skills. Respond with valid JSON only."},
                    {"role": "user", "content": prompt}
                ],
                temperature=0.3,
                max_tokens=200
            )

            content = response.choices[0].message.content
            json_match = re.search(r'\{.*\}', content, re.DOTALL)

            if json_match:
                return json.loads(json_match.group())
            else:
                return json.loads(content)

        except Exception as e:
            print(f"AI skill tagging error: {e}")
            return {"skill_ids": [], "skill_names": []}

    def process_business(self, place_data: Dict) -> Business:
        """Enrich one search result and turn it into a ``Business`` record.

        Steps, in order: fetch place details (when a ``place_id`` is present),
        scrape the website text, look for careers/sales pages, extract emails
        and social links, then ask the model for skill tags. Detail fields
        override search-result fields with the same key.
        """
        place_id = place_data.get("place_id")

        details = {}
        if place_id:
            details = self.get_business_details(place_id)

        # Later keys win, so the detailed record overrides the search hit.
        merged = {**place_data, **details}
        website = merged.get("website")

        website_context = ""
        if website:
            website_context = self.scrape_website_content(website)

        pages = self.find_careers_and_sales_pages(website)
        contacts = self.extract_emails_and_social_media(website)
        skills = self.tag_skills_with_ai(merged, website_context)

        coordinates = merged.get("geometry", {}).get("location", {})

        record = dict(
            place_id=place_id,
            business_name=merged.get("name", ""),
            address=merged.get("formatted_address", ""),
            phone_number=merged.get("formatted_phone_number"),
            latitude=coordinates.get("lat"),
            longitude=coordinates.get("lng"),
            website=website,
            hr_email=contacts.get("hr_email"),
            sales_email=contacts.get("sales_email"),
            careers_page=pages.get("careers_page"),
            sales_page=pages.get("sales_page"),
            social_media_links=contacts.get("social_media"),
            business_type=merged.get("types", []),
            skill_ids=skills.get("skill_ids", []),
            skill_names=skills.get("skill_names", []),
        )
        return Business(**record)

    def scrape_businesses_by_group(self, business_type: str, location: str = "Los Angeles, CA", max_results: int = 10) -> List[Business]:
        """Main method to scrape businesses with group-based categorization"""
        print(f"🔍 Analyzing business type: '{business_type}' in {location}")

        # Step 1: Categorize the business type and load appropriate resources
        categorization_result = self.categorize_business_type(business_type, location)
        print(f"📊 Categorization result: {categorization_result}")

        # Step 2: Get all business types in the group for comprehensive searching
        all_business_types = categorization_result["all_business_types"]
        print(f"🏢 Business types in group: {all_business_types}")

        # Step 3: Search for businesses using all types in the group
        all_businesses = []

        for biz_type in all_business_types[:3]:  # Limit to prevent too many API calls
            print(f"🔍 Searching for: {biz_type}")
            try:
                places = self.search_businesses(biz_type, location, max_results//len(all_business_types[:3]))

                for place in places:
                    business_name = place.get('name', 'Unknown')
                    print(f"Processing: {business_name}")

                    try:
                        business = self.process_business(place)
                        all_businesses.append(business)
                        time.sleep(1)  # Rate limiting
                    except Exception as e:
                        print(f"Error processing {business_name}: {e}")
                        continue

            except Exception as e:
                print(f"Error searching for {biz_type}: {e}")
                continue

        return all_businesses

    def save_to_group_output(self, businesses: List[Business]):
        """Save results to the appropriate group output file"""
        if not businesses or not self.current_group:
            print("❌ No businesses to save or no group context")
            return

        # Get the output file path for this group
        output_file_path = self.directory_manager.get_output_file_path(self.current_group)

        # Check if file exists to append or create new
        file_exists = os.path.exists(output_file_path)

        with open(output_file_path, 'a' if file_exists else 'w', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)

            # Write header only if file is new
            if not file_exists:
                writer.writerow([
                    'place_id', 'business_name', 'address', 'phone_number', 'latitude', 'longitude',
                    'website', 'hr_email', 'sales_email', 'careers_page', 'sales_page',
                    'social_media_facebook', 'social_media_instagram', 'social_media_linkedin',
                    'business_type', 'skill_ids', 'skill_names', 'business_group', 'scraped_date'
                ])

            # Write data
            current_date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            for business in businesses:
                social_media = business.social_media_links or {}
                writer.writerow([
                    business.place_id,
                    business.business_name,
                    business.address,
                    business.phone_number or '',
                    business.latitude or '',
                    business.longitude or '',
                    business.website or '',
                    business.hr_email or '',
                    business.sales_email or '',
                    business.careers_page or '',
                    business.sales_page or '',
                    social_media.get('facebook', ''),
                    social_media.get('instagram', ''),
                    social_media.get('linkedin', ''),
                    '; '.join(business.business_type) if business.business_type else '',
                    '; '.join(business.skill_ids) if business.skill_ids else '',
                    '; '.join(business.skill_names) if business.skill_names else '',
                    self.current_group,
                    current_date
                ])

        print(f"✅ Results saved to {output_file_path}")
        print(f"📊 Added {len(businesses)} businesses to the {self.current_group} group file")

    def display_results(self, businesses: List[Business]):
        """Display formatted results"""
        if not businesses:
            print("❌ No businesses found matching your criteria.")
            return

        print(f"\n🎯 Found {len(businesses)} Businesses in {self.current_group} Group")
        print("=" * 80)

        for i, business in enumerate(businesses, 1):
            print(f"\n{i}. 🏢 {business.business_name}")
            print(f"   Address: {business.address}")
            print(f"   Website: {business.website or 'Not available'}")
            print(f"   Phone: {business.phone_number or 'Not available'}")
            print(f"   Skill IDs: {', '.join(business.skill_ids) if business.skill_ids else 'None'}")
            print(f"   Skill Names: {', '.join(business.skill_names) if business.skill_names else 'None'}")
            print("-" * 80)

    def __del__(self):
        """Clean up selenium driver"""
        if hasattr(self, 'driver'):
            self.driver.quit()

def get_user_input():
    """Prompt for a business type and a location and return them as a tuple.

    Both answers are stripped of surrounding whitespace. A blank answer is
    replaced by its default ("restaurant" / "Los Angeles, CA"), and the
    substitution is announced after both prompts have been answered.
    """
    print("🔧 Enhanced Smart Business Scraper")
    print("=" * 40)

    typed_type = input("\nEnter business type (e.g., 'restaurant', 'cafe', 'cleaning service'): ").strip()
    typed_location = input("Enter location (e.g., 'Los Angeles, CA', 'New York, NY'): ").strip()

    if typed_type:
        business_type = typed_type
    else:
        business_type = "restaurant"
        print(f"Using default business type: {business_type}")

    if typed_location:
        location = typed_location
    else:
        location = "Los Angeles, CA"
        print(f"Using default location: {location}")

    return business_type, location

def main():
    """Run one interactive scrape: ask, scrape, display, save.

    Errors raised while scraping, displaying or saving are printed and do
    not propagate; the closing banner is printed either way.
    """
    business_type, location = get_user_input()

    # Constructed outside the try block, so a setup failure propagates to the caller.
    scraper = SmartBusinessScraper(GOOGLE_API_KEY, OPENAI_API_KEY)

    try:
        print(f"\n🔍 Processing: {business_type} in {location}")

        found = scraper.scrape_businesses_by_group(business_type, location, max_results=10)

        # Show the results first, then persist them to the group's CSV.
        scraper.display_results(found)
        scraper.save_to_group_output(found)

        print(f"✅ Completed processing for {scraper.current_group} group")

    except Exception as e:
        print(f"❌ Error processing '{business_type}' in '{location}': {e}")

    print("\n🎉 Enhanced business scraping completed!")

if __name__ == "__main__":
    main()

🔧 Enhanced Smart Business Scraper

Enter business type (e.g., 'restaurant', 'cafe', 'cleaning service'): Cleaning services
Enter location (e.g., 'Los Angeles, CA', 'New York, NY'): LA


ValueError: Mountpoint must not already contain files