In [None]:
from credentials import credentials

import requests
import pandas as pd
import time

In [None]:
# Swiss cities used as search centres; the country suffix is appended below
# so every entry is a full "<city>, Switzerland" query string.
_SWISS_CITY_NAMES = (
    "Zurich", "Geneva", "Basel", "Lausanne", "Bern",
    "Winterthur", "Lucerne", "St. Gallen", "Lugano", "Biel/Bienne",
    "Thun", "Köniz", "La Chaux-de-Fonds", "Schaffhausen", "Fribourg",
    "Chur", "Neuchâtel", "Uster", "Sion",
)

centers_switzerland = [f"{city}, Switzerland" for city in _SWISS_CITY_NAMES]

# German cities used as search centres; the country suffix is appended below
# so every entry is a full "<city>, Germany" query string.
_GERMAN_CITY_NAMES = (
    "Berlin", "Hamburg", "Munich", "Cologne", "Frankfurt",
    "Stuttgart", "Düsseldorf", "Dortmund", "Essen", "Leipzig",
    "Bremen", "Dresden", "Hanover", "Nuremberg", "Duisburg",
    "Bochum", "Wuppertal", "Bielefeld", "Bonn", "Münster",
    "Karlsruhe", "Mannheim", "Augsburg", "Wiesbaden", "Gelsenkirchen",
    "Mönchengladbach", "Braunschweig", "Chemnitz", "Kiel", "Aachen",
    "Halle", "Magdeburg", "Freiburg", "Krefeld", "Lübeck",
    "Oberhausen", "Erfurt", "Mainz", "Rostock", "Kassel",
)

centers_germany = [f"{city}, Germany" for city in _GERMAN_CITY_NAMES]

In [None]:
# Google Maps API key, read from the local `credentials` module so the key
# itself never appears in the notebook.
API_KEY = credentials['api_key']

# Google Geocoding, Places (Nearby Search) and Place Details endpoint URLs.
# BASE_GEOCODING_URL is required by get_coordinates_for_city.
BASE_GEOCODING_URL = 'https://maps.googleapis.com/maps/api/geocode/json'
BASE_PLACES_URL = 'https://maps.googleapis.com/maps/api/place/nearbysearch/json'
PLACE_DETAILS_URL = 'https://maps.googleapis.com/maps/api/place/details/json'


def get_coordinates_for_city(city_name):
    """Fetch the latitude and longitude for a given city using Google's Geocoding API.

    Args:
        city_name: Free-text address to geocode, e.g. "Zurich, Switzerland".

    Returns:
        A ``(lat, lng)`` tuple taken from the first geocoding result.

    Raises:
        requests.RequestException: On network failure, timeout or an HTTP
            error status.
        ValueError: If the API returned no result for ``city_name``; the
            message carries the API status to make the cause visible.
    """
    # Progress output: one line per city that is being processed.
    print(city_name)

    params = {
        'address': city_name,
        'key': API_KEY
    }

    # A timeout keeps one stalled connection from blocking the whole run.
    response = requests.get(BASE_GEOCODING_URL, params=params, timeout=10)
    response.raise_for_status()
    location_data = response.json()

    results = location_data.get('results') or []
    if not results:
        status = location_data.get('status', 'unknown')
        detail = location_data.get('error_message', '')
        raise ValueError(
            f"No geocoding result for {city_name!r} (status: {status}) {detail}".strip()
        )

    location = results[0]['geometry']['location']

    return location['lat'], location['lng']


def get_place_details(place_id):
    """Retrieve detailed information for a given place using its place_id.

    Args:
        place_id: Place identifier as returned by the Places search.

    Returns:
        The ``result`` dictionary of the Place Details response, limited to
        the requested fields (name, address components, phone number,
        website). An empty dict is returned when the response carries no
        ``result`` entry.

    Raises:
        requests.RequestException: On network failure or timeout.
    """
    params = {
        'place_id': place_id,
        'fields': 'name,address_component,formatted_phone_number,website',
        'key': API_KEY
    }

    # A timeout keeps one stalled connection from blocking the whole run.
    response = requests.get(PLACE_DETAILS_URL, params=params, timeout=10)

    return response.json().get('result', {})


def get_boutiques_for_center(lat, lng):
    """Fetch boutiques near a given latitude and longitude.

    Runs a Nearby Search for stores matching the keyword "boutique" within
    15 km of the point and follows ``next_page_token`` until no further page
    is offered.

    Args:
        lat: Latitude of the search centre.
        lng: Longitude of the search centre.

    Returns:
        A list with the raw result dictionaries of all fetched pages.

    Raises:
        requests.RequestException: On network failure or timeout.
    """
    boutiques = []
    params = {
        'location': f'{lat},{lng}',
        'radius': 15000,
        'type': 'store',
        'keyword': 'boutique',
        'key': API_KEY
    }

    # A timeout keeps one stalled connection from blocking the whole run.
    response = requests.get(BASE_PLACES_URL, params=params, timeout=10)
    results = response.json()

    boutiques.extend(results.get('results', []))

    # Get additional results from next pages (if they exist)
    while 'next_page_token' in results:
        params['pagetoken'] = results['next_page_token']

        # A page token only becomes valid shortly after it was issued; until
        # then the API answers INVALID_REQUEST. Retry a few times instead of
        # silently dropping the remaining pages.
        for _ in range(3):
            time.sleep(2)
            response = requests.get(BASE_PLACES_URL, params=params, timeout=10)
            results = response.json()
            if results.get('status') != 'INVALID_REQUEST':
                break

        boutiques.extend(results.get('results', []))

    return boutiques


def _format_boutique_record(details):
    """Flatten one Place Details result into the row stored per boutique."""
    components = details.get('address_components', [])

    # Street name and house number, joined in the order the API lists them.
    # Any type of a component may match, and a component without types is
    # simply skipped instead of raising IndexError.
    street_parts = [
        component['long_name']
        for component in components
        if any(kind in ('route', 'street_number') for kind in component.get('types', []))
    ]
    postal_code = ''.join(
        component['long_name']
        for component in components
        if 'postal_code' in component.get('types', [])
    )

    return {
        'Name': details.get('name', ''),
        'Address': ' '.join(street_parts).strip(),
        'PLZ': postal_code,
        'Phone Number': details.get('formatted_phone_number', ''),
        'Website': details.get('website', '')
    }


def process_centers(centers):
    """Retrieve boutiques details for a list of city centers.

    Each center is geocoded, searched for nearby boutiques, and every place
    found is looked up once through the Place Details endpoint.

    Args:
        centers: Iterable of address strings such as "Bern, Switzerland".

    Returns:
        A list of dicts with the keys 'Name', 'Address', 'PLZ',
        'Phone Number' and 'Website'. A place that shows up in the search
        results of several centers is included only once.
    """
    all_boutiques = []
    # Search radii of neighbouring cities overlap; remembering the place ids
    # avoids paying for the same Place Details request more than once.
    seen_place_ids = set()

    for center in centers:
        lat, lng = get_coordinates_for_city(center)

        for boutique in get_boutiques_for_center(lat, lng):
            place_id = boutique.get('place_id')
            if not place_id or place_id in seen_place_ids:
                continue
            seen_place_ids.add(place_id)

            details = get_place_details(place_id)
            all_boutiques.append(_format_boutique_record(details))

    return all_boutiques


# Collect the boutiques of each country independently, drop identical rows
# and export one Excel workbook per country (without the index column).
swiss_records = process_centers(centers_switzerland)
df_switzerland = pd.DataFrame(swiss_records).drop_duplicates()
df_switzerland.to_excel('swiss_boutiques.xlsx', index=False)

german_records = process_centers(centers_germany)
df_germany = pd.DataFrame(german_records).drop_duplicates()
df_germany.to_excel('german_boutiques.xlsx', index=False)


In [None]:
import pandas as pd
# Load the Swiss boutiques workbook written by the export step
# ('swiss_boutiques.xlsx'); the notebook produces no file under another name.
data_swiss_boutiques = pd.read_excel('swiss_boutiques.xlsx')

In [None]:
# 'Website' column of the loaded boutiques table; these are the URLs that
# get crawled for e-mail addresses further down.
websites = data_swiss_boutiques['Website']

In [None]:
## Fetch emails from all of the websites

In [None]:
import requests
from bs4 import BeautifulSoup
import re
from urllib.parse import urlparse, urljoin
import threading

# `emails_list` holds one entry per website that has already been processed.
# Create it when it does not exist yet (fresh kernel); keep an existing list
# so that re-running this cell resumes after the last processed website.
try:
    emails_list
except NameError:
    emails_list = []

# Websites that still have to be crawled.
BASE_URLS = websites[len(emails_list):]

class EmailFoundException(Exception):
    """Signals that the crawl has found an e-mail address.

    The address is passed as the first constructor argument and is read back
    by the handler through ``args[0]``.
    """


def is_relevant_link(base_url, href):
    """Determine if the link is relevant for our search.

    A link is relevant when it can be fetched over HTTP(S), stays on the same
    host as ``base_url``, does not point to a static asset (image, stylesheet,
    script) and does not contain one of the ignored path patterns.

    Args:
        base_url: URL of the website being crawled.
        href: Raw ``href`` value of a link, absolute or relative.

    Returns:
        True if the link should be followed, False otherwise.
    """
    ignored_extensions = ('.jpg', '.jpeg', '.png', '.css', '.js', '.gif', '.svg', '.webp')
    ignored_patterns = ("/api/", "/assets/")
    non_web_prefixes = ('mailto:', 'tel:', 'javascript:')

    lowered = href.strip().lower()

    # Links that are not web pages can never be fetched by the crawler.
    if lowered.startswith(non_web_prefixes):
        return False

    try:
        parsed_base = urlparse(base_url)
        parsed_href = urlparse(href)
    except ValueError:
        # Malformed URL: treat it as not crawlable instead of failing.
        return False

    if parsed_href.scheme and parsed_href.scheme.lower() not in ('http', 'https'):
        return False

    # Exclude specific file types. The check is case-insensitive and looks at
    # the path so that a query string ("style.css?v=2") does not hide the
    # extension; the raw href is checked as well.
    if parsed_href.path.lower().endswith(ignored_extensions) or lowered.endswith(ignored_extensions):
        return False

    # Limit to same domain
    if parsed_href.netloc and parsed_base.netloc != parsed_href.netloc:
        return False

    # Exclude certain URL patterns
    if any(pattern in href for pattern in ignored_patterns):
        return False

    return True


# E-mail address pattern. The top-level domain accepts two or more letters so
# that addresses under longer TLDs are not cut off after three characters.
_EMAIL_PATTERN = re.compile(r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}")


def find_emails_on_page(base_url, url, visited_urls, depth=0, max_depth=3):
    """Search a page, and recursively the pages it links to, for an e-mail address.

    Args:
        base_url: URL of the website being crawled; used to keep the crawl on
            the same site.
        url: URL of the page to inspect.
        visited_urls: Set of URLs already inspected; updated in place.
        depth: Recursion depth of ``url`` (0 for the start page).
        max_depth: Links are no longer followed once ``depth`` exceeds this
            value.

    Returns:
        None. A found address is reported through ``EmailFoundException``.

    Raises:
        EmailFoundException: Carries the first address found, either in the
            page text or in a ``mailto:`` link. Other errors while handling a
            page are printed and swallowed so the crawl continues.
    """
    # If we've visited this URL before, skip it
    if url in visited_urls:
        return None

    # Add the URL to our set of visited URLs
    visited_urls.add(url)

    try:
        # Fetch the content of the page; the timeout keeps a stalled server
        # from blocking the crawl.
        response = requests.get(url, timeout=10)
        response.raise_for_status()  # raise exception for bad responses
        soup = BeautifulSoup(response.content, 'html.parser')
        hrefs = [a_tag['href'] for a_tag in soup.find_all('a', href=True)]

        # Separate text nodes with a space; otherwise the text following an
        # address ("info@shop.ch" + "Tel") is glued onto it.
        page_emails = _EMAIL_PATTERN.findall(soup.get_text(separator=' '))

        # Addresses are often only present as mailto: links.
        if not page_emails:
            for href in hrefs:
                if href.strip().lower().startswith('mailto:'):
                    page_emails = _EMAIL_PATTERN.findall(href)
                    if page_emails:
                        break

        if page_emails:
            raise EmailFoundException(page_emails[0])  # raise exception with the found email

        # Limit the depth of the recursion to avoid getting stuck in loops
        if depth > max_depth:
            return None

        # Find and visit all internal links
        for href in hrefs:
            if not is_relevant_link(base_url, href):
                continue

            # Resolve relative links against the current page (not the site
            # root) and drop the fragment so "#anchors" do not cause the same
            # page to be fetched again.
            new_url = urljoin(url, href).split('#', 1)[0]
            if not new_url.startswith(('http://', 'https://')):
                continue

            find_emails_on_page(base_url, new_url, visited_urls,
                                depth=depth + 1, max_depth=max_depth)

    except EmailFoundException:
        raise  # Propagate the exception upwards
    except Exception as e:
        print(f"Error while processing {url}: {e}")
        return None


class _StoppableVisitedSet(set):
    """Visited-URL set that can end a running crawl.

    The crawler skips every URL that is reported as already visited. After
    ``stop()`` this set reports all URLs as visited, so a crawl that is still
    running in a background thread finishes instead of fetching more pages.
    """

    def __init__(self):
        super().__init__()
        self._stopped = threading.Event()

    def stop(self):
        """Make every further membership test succeed."""
        self._stopped.set()

    def __contains__(self, item):
        return self._stopped.is_set() or super().__contains__(item)


def process_website(url, timeout=10):
    """Process a website to find an email within a certain timeframe.

    Args:
        url: Start URL of the website to crawl.
        timeout: Seconds to wait for the crawl before giving up.

    Returns:
        The first e-mail address found, or None when nothing was found or the
        timeout was reached.
    """
    # The worker thread hands its result over through this list.
    result = []
    visited = _StoppableVisitedSet()

    def worker():
        try:
            email = find_emails_on_page(url, url, visited)
            if email:
                result.append(email)
        except EmailFoundException as e:
            result.append(e.args[0])

    # Daemon thread: a crawl that is still running must not keep the
    # interpreter alive.
    thread = threading.Thread(target=worker, daemon=True)
    thread.start()

    # Wait up to `timeout` seconds for the crawl to complete.
    thread.join(timeout)

    # A thread cannot be killed from outside; ask the crawl to wind down so
    # it does not keep fetching pages in the background, then move on.
    if thread.is_alive():
        visited.stop()
        print(f"Timeout reached for {url}. Moving to next website.")
        return None  # or return a placeholder message if desired

    return result[0] if result else None


# Crawl every remaining website and record one entry per site in
# `emails_list`: the address that was found, or the placeholder string 'None'.
# `emails_list` has to exist already; entries are appended to it.
for url in BASE_URLS:
    email = process_website(url)
    if not email:
        emails_list.append('None')
        continue

    print(f"Found email: {email} on {url}")
    emails_list.append(email)

print("Finished processing all websites.")


In [None]:
# Store the crawl results as a new 'emails' column. Each entry of emails_list
# is either a found address or the placeholder string 'None'.
# NOTE(review): presumably emails_list must have one entry per table row
# (i.e. the crawl ran to the end) — confirm before running this cell.
data_swiss_boutiques['emails'] = emails_list

In [None]:
# Export the table including the e-mail column. index=False matches the other
# exports in this notebook and keeps the row index out of the workbook.
data_swiss_boutiques.to_excel('swiss_boutiques_with_emails.xlsx', index=False)