In [1]:
# %pip install pandas
# %pip install selenium
# %pip install Pyarrow

In [47]:
# Import necessary libraries
from selenium import webdriver
from selenium.webdriver import ActionChains
from selenium.webdriver.common.actions.wheel_input import ScrollOrigin
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import pandas as pd
import time
import re
import numpy as np
from tqdm import tqdm


def extract_emails(browser, website_url):
    browser.get(website_url)
    
    # Get the regular expressions pattern to match on
    email_pattern = r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,4}"
    # Get the html content for the page
    html = browser.page_source
    # Get a list of emails
    emails = re.findall(email_pattern, html)
    # Get a unique array of the emails on the page
    unique_emails_arr = pd.DataFrame(columns=['email'],data=emails)['email'].unique()
    # Convert to string with a comma delimiting them
    unique_emails_str = ','.join(unique_emails_arr)
    return unique_emails_str


# Inner function to extract information about a specific place from its page
def extract_place_info(browser, place_url, location, search_query, all_addresses, all_place_names, debug_mode):
    wait = WebDriverWait(browser, 10)
    browser.get(place_url)
    wait.until(EC.presence_of_element_located((By.CLASS_NAME, "lMbq3e")))
            
    # Extract the header information about the place
    header_text = browser.find_element(By.CLASS_NAME,'lMbq3e').text
    header_list = header_text.split("\n")
    
    # Do not include locations that are temporarily closed
    # TODO: Also needs to be changed to handle English/Spanish
    if 'Temporarily closed' in header_list or 'Cerrado temporalmente' in header_list:
        return "skip"
    
    if len(header_list) == 4:  # If there are reviews but no price range
        place_name = header_list[0]
        reviews_stars = header_list[1]
        num_of_reviews = header_list[2].strip('(').strip(')')
        price_range = None
        place_description = header_list[3]
    elif len(header_list) == 5:  # If there are reviews and a price range
        place_name = header_list[0]
        reviews_stars = header_list[1]
        num_of_reviews = header_list[2].strip('(').strip(')')
        price_range = header_list[3]
        place_description = header_list[4]
    elif len(header_list) == 2:  # If there are no reviews
        place_name = header_list[0]
        reviews_stars = None
        num_of_reviews = None
        price_range = None
        place_description = header_list[1]
    elif len(header_list) == 1:  # Edge case with an empty string
        place_name = header_list[0]
        reviews_stars = None
        num_of_reviews = None
        price_range = None
        place_description = None
    else:
        print(f"Error extracting header information {header_list} for place with URL: {place_url}")
        return "skip"
    
    # Extract additional information like address, phone, and website
    info_elements = browser.find_elements(By.CLASS_NAME, "CsEnBe")
    website = None
    phone = None
    address = None
    emails = None
    
    # TODO: Find a way to make this robust against locale differences
    # Check for both English and Spanish labels
    for info_element in info_elements:
        aria_label = info_element.get_attribute("aria-label")
        if aria_label:
            if 'Address:' in aria_label or 'Dirección:' in aria_label:
                address = info_element.text.split('\n')[1] if '\n' in info_element.text else info_element.text
            if 'Phone:' in aria_label or 'Teléfono:' in aria_label:
                phone = info_element.text
            if 'Website:' in aria_label or 'Sitio web:' in aria_label:
                website = info_element.get_attribute("href")
    
    # Check if the place is a duplicate
    if address in all_addresses:
        if place_name in all_place_names:
            return "skip"
    else:
        all_addresses.append(address)
        all_place_names.append(place_name)
    
    if website is not None:
        emails = extract_emails(browser, website)
    
    # Compile the extracted information into a list
    place_info = [
        search_query,
        location,
        place_name,
        place_description,
        reviews_stars,
        num_of_reviews,
        address,
        phone,
        emails,
        website,
        price_range
    ]
    
    if debug_mode:
        print(place_info)
    
    return place_info


# Define a function to extract search results for a given search URL, location, and search_query
def extract_search_results(browser, search_url, location, search_query, max_places_to_find, max_num_scrolls, all_addresses, all_place_names, total_bar, debug_mode, url_update_count):

    # Navigate to the search URL
    browser.get(search_url)
    wait = WebDriverWait(browser, 10)
    
    # Wait for the page to load
    wait.until(EC.presence_of_element_located((By.CLASS_NAME, "hfpxzc")))
        
    # Find the element to start scrolling from
    start_scroll_element = browser.find_element(By.CLASS_NAME, "hfpxzc")
    scroll_origin = ScrollOrigin.from_element(start_scroll_element)
    
    # Perform scrolling action to reveal more search results
    for i in range(max_num_scrolls):
        ActionChains(browser).scroll_from_origin(scroll_origin, 0, 1500*(i+1)).perform()

        # TODO: Possibly find a way to use selenium WebDriverWait 
        time.sleep(3)
        total_bar.update(1)  # Explicitly update the progress bar
    
    # Find all elements that represent places in the search results
    place_elements = browser.find_elements(By.CLASS_NAME, "hfpxzc")
        
    # Extract the URLs of all places
    place_urls = [element.get_attribute("href") for element in place_elements]
    
    place_info_list = []
    update_count = 0
    iters = len(place_urls) - 1 if len(place_urls) < max_places_to_find else max_places_to_find - 1
    benchmark = iters / url_update_count
    
    # Loop through the URLs to extract information for each place
    for i, url in enumerate(place_urls):
        try: 
            temp_place_info = extract_place_info(browser, 
                                                 url, 
                                                 location, 
                                                 search_query, 
                                                 all_addresses, 
                                                 all_place_names, 
                                                 debug_mode)
            
            if temp_place_info == "skip":  # Skip if the place is a duplicate or contains errors
                continue
            
            place_info_list.append(temp_place_info)
            
        except Exception as e:
            print(f"Error extracting place with url: {url}. Error: {e}")
        
        # Stop if max_places_to_find places have been found
        if len(place_info_list) >= max_places_to_find:
            
            if debug_mode:
                print('Breaking out of the for loop because {max_places_to_find} places were found.')
            break
        
        if i > benchmark:
            update_count += 1
            benchmark += iters / url_update_count
            total_bar.update(1)  # Explicitly update the progress bar
    
    if update_count < url_update_count:
        total_bar.update(url_update_count - update_count)

    return place_info_list


def maps_scraper(search_queries, 
                 locations=[''], 
                 max_places_to_find=10, 
                 max_num_scrolls=3, 
                 headless=True, 
                 export_final_filename=None, 
                 export_by_search_query=False, 
                 debug_mode=False,
                 url_update_count=3):
    """Scrape Google Maps for information about places based on search queries and locations.
    
    Args:
        search_queries (list): A list of search queries to search for on Google Maps.
        locations (list): A list of locations to search in. Default is [''].
        max_places_to_find (int): The maximum number of places to find for each search query in each location. Default is 10.
        max_num_scrolls (int): The maximum number of times to scroll down the search results page. Default is 3.
        headless (bool): Whether to run the WebDriver in headless mode. Default is True.
        export_final_filename (str): The filename for the CSV file to export the final DataFrame to. Default is None which will not export the final DataFrame.
        export_by_search_query (bool): Whether to export the results to a CSV file for each search query. Default is False.
        debug_mode (bool): Whether to print debug information. Default is False.
        url_update_count (int): The number of times to update the progress bar for each URL. Default is 3.
        
    Returns:
        pandas.DataFrame: A DataFrame containing the information about the places found.
    """
    # Prepare to compile information for multiple locations and search_queries
    final_places_list = []
    all_addresses = []
    all_place_names = []

    # Initialize the WebDriver Options 
    options = Options()
    options.add_experimental_option('prefs', {'intl.accept_languages': 'en,en_US'})  # Set language preferences
    if headless:
        options.add_argument("--headless")

    with tqdm(total=len(search_queries) * len(locations) * (max_num_scrolls + url_update_count), desc="Total Progress", unit="search") as total_bar:
        # Loop through each search_query and location, perform searches, and compile results
        try: 
            for search_query in search_queries:
                if debug_mode:
                    print(search_query)
                for location in locations:  
                    # Initialize the WebDriver for Chrome
                    browser = webdriver.Chrome(options=options)
                    
                    try:
                        if debug_mode:
                            print(location)
                        search = f"{search_query} in {location}"
                        if debug_mode:
                            print(search)
                        search_url = f"https://www.google.com/maps/search/{search}"
                        
                        combo_place_info_list = extract_search_results(
                            browser, 
                            search_url, 
                            location, 
                            search_query, 
                            max_places_to_find, 
                            max_num_scrolls, 
                            all_addresses, 
                            all_place_names,
                            total_bar,
                            debug_mode,
                            url_update_count
                        )
                        
                        if export_by_search_query:
                            pd.DataFrame(combo_place_info_list).to_csv(f'{search_query} Export.csv', index=False)
                            
                        final_places_list.extend(combo_place_info_list)
                    except Exception as e:
                        print(f"Error extracting search results for {search_query} in {location}: ")
                        print(e)
                    
                    finally:
                        browser.close()
                    
                    total_bar.update(1)  # Explicitly update the progress bar
                    
        except Exception as e:
            print(e)
        finally:
            browser.quit()
            
    places_df = pd.DataFrame(
        columns=['Search Query', 'Location', 'Name', 'Description', 'Stars (out of 5)', 'Number of Reviews', 'Address', 'Phone', 'Emails','Website', 'Price Range'], 
        data=final_places_list
    )
    
    if export_final_filename:
        if export_final_filename.split('.')[-1] == 'csv':
            places_df.to_csv(export_final_filename, index=False)
        else:
            places_df.to_csv(export_final_filename + '.csv', index=False)
    
    return places_df


In [49]:
# Parameters for the search
max_places_to_find = 10  # Maximum number of places to find for each search
max_num_scrolls = 5  # Maximum number of times to scroll to reveal more search results

# Define the locations and search_queries to search for
locations = ['Los Angeles, CA',] # 'San Francisco, CA','New York, NY','Jacksonville, FL','Portland, OR','Denver, CO']
search_queries = ['Churros', 'Churro', 'Churrerria']  # Add more search_queries to search for
# 'Physical Therapy',
# 'HVAC',
# 'Veterinary Services',
# 'Plumbing',
# 'Insurance Brokerages',
# 'Landscaping',
# 'Automotive Repair',
# 'Chiropractic']

places_df = maps_scraper(search_queries, locations, max_places_to_find, max_num_scrolls, headless=True, export_by_search_query=False, debug_mode=False)

Total Progress: 31search [02:47,  5.41s/search]                    


In [92]:
import requests
from dotenv import load_dotenv
import os
from datetime import datetime, timedelta
import time
import numpy as np
import pandas as pd

# Load environment variables (including the API key)
load_dotenv()

# Get the API key from environment variables
GOOGLE_MAPS_API_KEY = os.getenv("GOOGLE_MAPS_API_KEY")


def get_next_2am_unix_timestamp():
    """Get the Unix timestamp for the next 2 a.m."""
    now = datetime.now()
    next_2am = now.replace(hour=2, minute=0, second=0, microsecond=0)

    # If it's already past 2 a.m. today, move to the next day
    if now >= next_2am:
        next_2am += timedelta(days=1)

    return int(time.mktime(next_2am.timetuple()))


def is_within_time(origins: np.ndarray, destinations: np.ndarray) -> np.ndarray:
    """Check if the travel time from the origin to each destination is within a specified time.
    
    Parameters:
        origins (ndarray(n,)): The starting locations.
        destinations (ndarray(m,)): A numpy array of destination addresses.
        time (int): The maximum travel time in seconds.
        
    Returns:
        ndarray(n, m): A numpy array of boolean values indicating if the travel time to each destination is within the specified time.
    """
    n, m = len(origins), len(destinations)
    times_arr = np.zeros((n, m), dtype=float)
    
    if type(origins) != list and type(origins) != np.ndarray:
        raise TypeError("Origins must be a numpy array.")
    else:
        destinations = np.array(destinations)
            
    if type(destinations) != list and type(destinations) != np.ndarray:
        raise TypeError("Destinations must be a numpy array.")
    else:
        destinations = np.array(destinations)
        
    if type(origins) == list:
        origins = np.array(origins)
    if type(destinations) == list:
        destinations = np.array(destinations)
    
    for i, origin in enumerate(origins):
        
        base_url = "https://maps.googleapis.com/maps/api/directions/json"
        within_time = []

        # Calculate the Unix timestamp for the next 2 a.m.
        departure_time = str(get_next_2am_unix_timestamp())

        for j, destination in enumerate(destinations):
            params = {
                "origin": origin,
                "destination": destination,
                "departure_time": departure_time,  # Dynamically calculated 2 a.m. timestamp
                "key": GOOGLE_MAPS_API_KEY
            }

            response = requests.get(base_url, params=params)
            if response.status_code == 200:
                data = response.json()

                # Check if the API returned a valid route
                if data["status"] == "OK":
                    duration_seconds = data["routes"][0]["legs"][0]["duration"]["value"]
                    times_arr[i, j] = duration_seconds  # Store the travel time

                else:
                    print(f"No valid route for {destination}. \nStatus: {data['status']}")
            else:
                raise Exception(f"Google Maps API error: {response.status_code}")
    
    df_times = pd.DataFrame(data=times_arr, columns=destinations, index=origins)
    
    return df_times


def get_closest_locations(times_df: pd.DataFrame, 
      origin_addresses: list, 
      origin_names: list, 
      destination_addresses: list, 
      destination_names: list, 
      max_time: int) -> pd.DataFrame:
    
    is_within_time = times_df.values < max_time

    i_arr, j_arr = np.nonzero(is_within_time)
    
    new_i_list = []
    new_j_list = []

    # Find the closest location for each origin-destination pair
    for k in j_arr:
        
        same_loc = i_arr[j_arr == k]
        
        closest_dist = np.inf
        
        for loc in same_loc:
            if times_df.iloc[loc, k] < closest_dist:
                closest_dist = times_df.iloc[loc, k]
                closest_loc = loc
        
        new_i_list.append(closest_loc)
        new_j_list.append(k)

    sort_mask = np.argsort(new_i_list)
    new_i_arr = np.array(new_i_list)[sort_mask]
    new_j_arr = np.array(new_j_list)[sort_mask]
    
    comb_arr = list(map(tuple, np.vstack((new_i_arr, new_j_arr)).T))

    output_list = []

    for i, j in comb_arr:
                
        output_string = (f"Origin Name: {origin_names[i]}\n"
              f"Origin Address: {origin_addresses[i]}\n"
              f"Destination Name: {destination_names[j]}\n"
              f"Destination Address: {destination_addresses[j]}\n")

        output_list.append(output_string)
    
    return output_list


In [15]:

# times_df = is_within_time(origin_list, churro_list)


In [39]:

# times_df.to_csv('times_df.csv', index=False)

times_df = pd.read_csv('places files/times_df.csv').drop(0)

df = pd.read_csv('places files/Unique Churro Places LA.csv')

origin_addresses = [
    "12405 Washington Blvd, Los Angeles, CA 90066, United States",
    "131 N Larchmont Blvd, Los Angeles, CA 90004, United States",
    "4455 Los Feliz Blvd, Los Angeles, CA 90027, United States",
    "100 S Avenue 64, Highland Park, CA 90042, USA",
    "1534 Montana Ave, Santa Monica, CA 90403, USA",
    "600 N Brand Blvd, Glendale, CA 91203, USA",
    "9615 S Santa Monica Blvd, Beverly Hills, CA 90210, USA",
    "96 E Colorado Blvd, Pasadena, CA 91105, United States",
]
origin_names = [
    "Culver city",
    "Hancock Park", 
    "Griffith Park",
    "Highland Park", 
    "Palisades Park",
    "Glendale",
    "Beverly Hills",
    "Pasadena",
]

destination_addresses = list(df['Address'])
destination_names = list(df['Name'])

max_time = 1200

output = get_closest_locations(times_df, 
      origin_addresses, 
      origin_names, 
      destination_addresses, 
      destination_names, 
      max_time)


In [3]:
import competitivelandscapeanalysis as cla

max_places_to_find = 10  # Maximum number of places to find for each search
max_num_scrolls = 5  # Maximum number of times to scroll to reveal more search results

# Define the locations and search_queries to search for
locations = ['Los Angeles, CA',] # 'San Francisco, CA','New York, NY','Jacksonville, FL','Portland, OR','Denver, CO']
search_queries = ['Churros', 'Churro', 'Churrerria'] 

cla.maps_scraper(search_queries, locations, max_places_to_find, max_num_scrolls, headless=False, export_by_search_query=False, debug_mode=False)

Total Progress:  83%|████████▎ | 20/24 [01:52<00:20,  5.01s/search]

Error extracting place with url: https://www.google.com/maps/place/The+Churros+Bros/data=!4m7!3m6!1s0x80c2cf0aad59b577:0x607f9fd72481479e!8m2!3d34.023153!4d-118.1638996!16s%2Fg%2F11hkbjjt99!19sChIJd7VZrQrPwoARnkeBJNeff2A?authuser=0&hl=es-419&rclk=1. Error: Message: no such window: target window already closed
from unknown error: web view not found
  (Session info: chrome=124.0.6367.156)
Stacktrace:
	GetHandleVerifier [0x00007FF6D0EB1562+60802]
	(No symbol) [0x00007FF6D0E2AC62]
	(No symbol) [0x00007FF6D0CE7CE4]
	(No symbol) [0x00007FF6D0CBDFDF]
	(No symbol) [0x00007FF6D0D61E57]
	(No symbol) [0x00007FF6D0D798D1]
	(No symbol) [0x00007FF6D0D5A923]
	(No symbol) [0x00007FF6D0D28FEC]
	(No symbol) [0x00007FF6D0D29C21]
	GetHandleVerifier [0x00007FF6D11B41FD+3217949]
	GetHandleVerifier [0x00007FF6D11F6197+3488183]
	GetHandleVerifier [0x00007FF6D11EF11F+3459391]
	GetHandleVerifier [0x00007FF6D0F6B926+823622]
	(No symbol) [0x00007FF6D0E35FFF]
	(No symbol) [0x00007FF6D0E30F24]
	(No symbol) [0x00007

Total Progress:  96%|█████████▌| 23/24 [01:54<00:04,  4.99s/search]
Exception ignored in: <function Service.__del__ at 0x00000284232E4180>
Traceback (most recent call last):
  File "c:\Users\wilso\AppData\Local\Programs\Python\Python312\Lib\site-packages\selenium\webdriver\common\service.py", line 189, in __del__
    self.stop()
  File "c:\Users\wilso\AppData\Local\Programs\Python\Python312\Lib\site-packages\selenium\webdriver\common\service.py", line 146, in stop
    self.send_remote_shutdown_command()
  File "c:\Users\wilso\AppData\Local\Programs\Python\Python312\Lib\site-packages\selenium\webdriver\common\service.py", line 126, in send_remote_shutdown_command
    request.urlopen(f"{self.service_url}/shutdown")
  File "c:\Users\wilso\AppData\Local\Programs\Python\Python312\Lib\urllib\request.py", line 215, in urlopen
    return opener.open(url, data, timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\wilso\AppData\Local\Programs\Python\Python312\Lib\urllib\request.p

Unnamed: 0,Search Query,Location,Name,Description,Stars (out of 5),Number of Reviews,Address,Phone,Emails,Website,Price Range
0,Churros,"Los Angeles, CA",Churros Calientes,Churrería,4.5,578,"11521 Santa Monica Blvd, Los Angeles, CA 90025...",,churros@churroscalientes.com,http://churroscalientes.com/,·$
1,Churros,"Los Angeles, CA",Mr. Churro,Churrería,4.3,493,"E-12 Olvera St # C, Los Angeles, CA 90012, Est...",\n+1 213-680-9036,,,·$
2,Churros,"Los Angeles, CA",Churros El Gorilla,Restaurante de postres,5.0,12,"5525 Sunset Blvd, Los Angeles, CA 90028, Estad...",\n+1 747-258-8650,,,
3,Churros,"Los Angeles, CA",Churros Estilo Guadalajara,Tienda de postres,4.8,10,"4025 S Central Ave, Los Angeles, CA 90011, Est...",\n+1 323-604-8135,,,
4,Churros,"Los Angeles, CA",Churros Don Abel,Churrería,4.7,625,"5458 Whittier Blvd, Los Angeles, CA 90022, Est...",\n+1 562-612-5991,,https://instagram.com/churrosdonabel?utm_sourc...,·$
5,Churros,"Los Angeles, CA",Churrito loco,Restaurante de postres,4.6,38,"5161 Pomona Blvd # 105, East Los Angeles, CA 9...",,"eatchurritoloco@gmail.com,605a7baede844d278b89...",http://www.eatchurritoloco.com/,·$$
6,Churros,"Los Angeles, CA",Churros El Bochito,Restaurante,4.2,21,"1157 Cypress Ave, Los Angeles, CA 90065, Estad...",\n+1 213-820-0172,,,
7,Churros,"Los Angeles, CA",Churros carrusel,Tienda de postres,4.5,15,"Churros carrusel, Los Angeles, CA 90003, Estad...",,,,
8,Churros,"Los Angeles, CA",Churros La Paloma,Mobile caterer,4.3,20,"6101 S Central Ave, Los Angeles, CA 90001, Est...",\n+1 213-924-2649,,,
9,Churros,"Los Angeles, CA",Salinas Churros,Churrería,3.7,34,"918 S Lorena St, Los Angeles, CA 90023, Estado...",,,https://instagram.com/salinaschurros?igshid=Ym...,


In [6]:
import os
website_url = os.getenv("WEBSITE_URL")
print(website_url)

https://www.ubereats.com/
