# MAPS EMAIL ADDRESS SCRAPER #

_By: Michiel Tange_, _Last updated: 25/11/2024_

This script scrapes email addresses from the websites of _places_ found through Google Maps within a specific area and a specific set of categories.

It does this by first getting all relevant _places_ from Google Maps using the Places API. It finds places within a fixed radius around a point (latitude and longitude coordinates), using keywords (e.g., sports club, barber). It then gets the website for each of these _places_. Once the main website is known, the script searches its pages for email addresses. The addresses found are written to an output Excel file for further use.

## Set-up ##

### Imports ###

In [2]:
from aiohttp import ClientSession
from haversine import haversine
import pandas as pd
import urllib.parse
import googlemaps
import openpyxl
import asyncio
import time
import re

### Places API Key ###

In [3]:
with open("APIkey.txt", 'r') as file:
    API_key = file.read().strip() # strip stray whitespace or a trailing newline; the 'with' block closes the file

### Places API client ###

In [4]:
gmaps = googlemaps.Client(key=API_key)

### Places definitions ###
To find places the following procedure is used:

1. Draw a circle around a point (defined by longitude and latitude coordinates), of radius $r$.
2. Use Google's Places API to find places within this circle. Places are filtered through keywords (e.g., barber, tennis club, etc.). Google returns only one page of results (20 places) per request, so the request is repeated up to $x$ times using Google's 'next page token'. Save some information about each of these places - most importantly, 'place id'.

NOTE: Despite only searching within the circle, Google will still return places further away than $r$. These places are additionally filtered out by computing the distance between the place and the central point of the circle using the haversine formula:

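$$
a = \sin^{2}\left(\frac{\Delta \phi}{2}\right) + \cos \phi_1 \cdot \cos \phi_2 \cdot \sin^{2}\left(\frac{\Delta \lambda}{2}\right)
$$
$$
c = 2 \cdot \operatorname{atan2}\left(\sqrt{a}, \sqrt{1-a}\right)
$$
$$
d = \textrm{R} \cdot c
$$
Where $\phi$ is latitude and $\lambda$ is longitude (both in radians), $\Delta \phi$ and $\Delta \lambda$ are the differences in latitude and longitude between the two points, and $\textrm{R}$ is the radius of Earth (approximately 6,371 km).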

3. Use Google's Place Details API to get additional details (e.g., website) about each place, using the 'place id' as an identifier.
4. Save all relevant information into a Pandas DataFrame. This DataFrame will form the basis for the later web scraping task.
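
The haversine formula in the note above can be written out in plain Python. This is only an illustrative sketch: the notebook itself relies on the `haversine` package, and the names `haversine_m`, `within_radius`, and `EARTH_RADIUS_M`, as well as the coordinates, are assumptions made for this example.

```python
import math

EARTH_RADIUS_M = 6_371_000  # approximate mean radius of Earth in metres

def haversine_m(lat1, lng1, lat2, lng2):
    """Great-circle distance in metres between two points given in degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    d_phi = phi2 - phi1                    # difference in latitude (radians)
    d_lambda = math.radians(lng2 - lng1)   # difference in longitude (radians)
    a = math.sin(d_phi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(d_lambda / 2) ** 2
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
    return EARTH_RADIUS_M * c

def within_radius(centre, place, radius_m):
    """True if 'place' lies no further than radius_m metres from 'centre'."""
    return haversine_m(centre['lat'], centre['lng'], place['lat'], place['lng']) <= radius_m

# hypothetical coordinates, in the same {'lat': ..., 'lng': ...} shape the Places API returns
centre = {'lat': 52.0, 'lng': 5.0}
near = {'lat': 52.0, 'lng': 5.01}   # roughly 700 m east of the centre
far = {'lat': 52.1, 'lng': 5.0}     # roughly 11 km north of the centre

print(within_radius(centre, near, 1000))
print(within_radius(centre, far, 1000))
```

One degree of longitude along the equator comes out at about 111.19 km with this radius, which is a quick sanity check for the implementation.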

#### Parameters ####

In [5]:
details = ['website'] # details to get about a place

#### Functions ####

In [6]:
def distance_check(coords : dict, second_coords : dict, dist : int) -> bool:
    """
    check whether the distance between two places is smaller than a specified amount (in meters), using haversine

    args
    ----
    coords : dict
        dictionary containing the first set of coordinates. 'lat' for latitude, 'lng' for longitude
    second_coords : dict
        dictionary containing the second set of coordinates. 'lat' for latitude, 'lng' for longitude
    dist : int
        the distance (in meters) to check against
    
    returns
    -------
    bool : bool
        Boolean specifying whether the calculated distance is smaller than dist
    """

    # put the first coordinates in a tuple
    coords_tup = (coords['lat'], coords['lng'])
    
    # put the second coordinates in a tuple
    second_coords_tup = (second_coords['lat'], second_coords['lng'])

    # calculate the distance using haversine
    distance = int(haversine(coords_tup, second_coords_tup, unit='m'))

    return distance <= dist

#### Objects ####

In [7]:
class nearby_search_task():
    """ a nearby search task """

    def __init__(self, coords : dict, radius : int, keywords : list, pages : int, outcomes : list=['name', 'place_id', 'types']) -> None:
        """
        initialise an instance of nearby_search_task

        properties
        ----------
        coords : dict
            the central coordinates to search around. Provided in a dictionary with 'lat' and 'lng' as keys denoting latitude and longitude
        radius : int
            the search radius around the central coordinates in meters
        keywords : list
            the keywords to search the area for
        pages : int
            the maximum number of results pages to consider (20 results per page)
        outcomes : list
            list containing which outcome results are relevant
        """

        self.coords = coords
        self.radius = radius
        self.keywords = keywords
        self.pages = pages
        self.outcomes = outcomes

    def get_page_places(self, keyword : str, page_token : str = None) -> dict:
        """
        get place info through a Google Places Nearby search (limited to 20 per page)

        args
        ----
        keyword : str
            the keyword to search the area for
        page_token : str
            the page token for the search (only relevant for accessing multiple pages of the same search)

        returns
        -------
        results : dict
            dictionary with two keys: 'result', a list of dictionaries where each dictionary contains a place's outcomes, and 'next_page_token', the token for the next page ('no more pages' if there is none)
        """

        results = {}

        # API call
        response = gmaps.places_nearby(location=self.coords, radius=self.radius, keyword=keyword, page_token=page_token)

        # save the next page token
        if 'next_page_token' in response:
            results['next_page_token'] = response['next_page_token']
        else:
            results['next_page_token'] = 'no more pages'

        # process the outcomes
        result = []
        for i in range(len(response['results'])):
            
            if distance_check(self.coords, response['results'][i]['geometry']['location'], dist=self.radius): # only include places within the radius
                response_outcomes = {}
                for outcome in self.outcomes:
                    response_outcomes[outcome] = response['results'][i][outcome]
                result.append(response_outcomes)

        # save the outcomes
        results['result'] = result

        # META DATA - start
        global places_cnt
        places_cnt += len(response['results'])
        # META DATA - end

        return results
    
    def get_all_page_places(self) -> list:
        """
        get place info for all pages and all keywords through a Google Places Nearby search

        returns
        -------
        result : list
            list of dictionaries where each dictionary contains a place's outcomes (flattened across all keywords and pages)
        """

        result = []

        # run through all keywords and up to the maximum number of pages
        for keyword in self.keywords:
            page_token = None
            for i in range(self.pages):
                response = self.get_page_places(keyword=keyword, page_token=page_token)
                result.append(response['result'])
                page_token = response['next_page_token']
                
                if page_token == 'no more pages':
                    break
                
                # timeout needed to allow Google to validate the next page token
                time.sleep(2)

        # flatten the result list (which is now a list of lists)
        result_flat = [x for xs in result for x in xs]
        
        return result_flat

class place_detail_search_task():
    """ a place detail search task """

    def __init__(self, place_id : str, details : list = details) -> None:
        """
        initialise an instance of place_detail_search_task

        properties
        ----------
        place_id : str
            the place id for which to find details
        details : list
            list of details to retrieve about the place
        """

        self.place_id = place_id
        self.details = details
        
    def get_place_details(self) -> dict:
        """
        get details for a place

        returns
        -------
        result : dict
            result dictionary containing place details for a place id
        """

        outcome = gmaps.place(self.place_id, fields=self.details)

        # META DATA - start
        if 'website' in outcome['result'].keys():
            global websites_cnt
            websites_cnt += 1
        # META DATA - end
        
        if set(self.details).issubset(set(outcome['result'].keys())): # check if all details were returned
            return outcome['result']
        else: # if not all details are returned -> add detail as a key with value None - this facilitates converting to a DataFrame later
            for detail in self.details:
                if detail not in outcome['result'].keys():
                    outcome['result'][detail] = None
            return outcome['result']
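
The paging loop in `get_all_page_places` can be illustrated in isolation. The sketch below replaces the Places API with a hypothetical stub (`fetch_page` and its fake pages are invented for this example), so it shows only the control flow: follow the next page token until it runs out or the page cap is reached.

```python
def fetch_page(page_token=None):
    """Hypothetical stand-in for a paged API call: returns results and, if available, a next page token."""
    fake_pages = {
        None: {'results': ['a', 'b'], 'next_page_token': 'p2'},
        'p2': {'results': ['c', 'd'], 'next_page_token': 'p3'},
        'p3': {'results': ['e']},  # last page: no token in the response
    }
    return fake_pages[page_token]

def collect(max_pages):
    """Collect results from up to max_pages pages, following the next page token."""
    collected = []
    page_token = None
    for _ in range(max_pages):
        response = fetch_page(page_token)
        collected.extend(response['results'])  # extend keeps the list flat
        page_token = response.get('next_page_token')
        if page_token is None:
            break
        # a real client would pause here before using the token,
        # as the notebook does with time.sleep(2)
    return collected

all_results = collect(max_pages=5)     # stops early once the token runs out
capped_results = collect(max_pages=2)  # stops at the page cap
print(all_results, capped_results)
```

Using `None` as the "no more pages" marker and `extend` instead of a later flattening step are choices made for this sketch; the notebook uses the string 'no more pages' and flattens afterwards, which has the same effect.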

### Web scraping definitions ###
For the web scraping an asynchronous I/O (asyncio) method is used. This allows for asynchronous server requests, which greatly improves the speed of the scraping. It runs through the following procedure:

1. Check the homepage for an email address. If no email found -> go to step 2.
2. Get all page links from the homepage.
3. Check if any priority pages (common contact detail pages) are among the links found. If not -> go to step 5
4. Check the priority pages for email addresses.
5. Use the page links to check all other pages for email addresses.

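If an email address is found on the homepage or on a priority page, the process stops there. In step 5, all remaining pages on the same site are checked and every address found is collected. Regular expressions are used to find email addresses and page links in the HTML code.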
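
Why asynchronous requests help can be seen in a small sketch that needs no network access. `fake_fetch` is a hypothetical stand-in for an HTTP request that simply waits 0.2 seconds; the URLs are placeholders.

```python
import asyncio
import time

async def fake_fetch(url, delay=0.2):
    """Hypothetical stand-in for an HTTP GET: waits, then returns a fake page."""
    await asyncio.sleep(delay)
    return f'<html>{url}</html>'

async def fetch_all(urls):
    # gather runs all coroutines concurrently and returns the results in input order
    return await asyncio.gather(*(fake_fetch(url) for url in urls))

urls = [f'https://example.org/page{i}' for i in range(5)]

start = time.perf_counter()
pages = asyncio.run(fetch_all(urls))
elapsed = time.perf_counter() - start

# five sequential waits would take about 1 second; run concurrently they overlap
print(f'{len(pages)} pages in {elapsed:.2f} s')
```

`asyncio.run` is used here so the block also works as a plain script. Inside a notebook an event loop is already running, which is why the scraping cell further down can use a top-level `await` instead.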

#### Parameters ####

In [8]:
HREF_RE = re.compile(r'href="(.*?)"') # the regular expression for finding page links
email_RE = re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b') # the regular expression for finding email addresses (no '|' inside the character class - it would match a literal pipe)
priority_links = ['contact', 'about-us', 'over-ons', 'informatie'] # common contact detail pages
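
How the two patterns behave is easiest to see on a small piece of HTML. The patterns are re-declared here so the block runs on its own (with the top-level domain class written as `[A-Za-z]`), and `sample_html` is made up for this example.

```python
import re

HREF_RE = re.compile(r'href="(.*?)"')
EMAIL_RE = re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b')

# made-up HTML containing two links, a retina image file name, and one address in two spellings
sample_html = (
    '<a href="/contact">Contact</a> '
    '<a href="https://example.org/about-us">About</a> '
    '<img src="logo@2x.png"> '
    'Mail us at info@example.org or INFO@example.org.'
)

links = HREF_RE.findall(sample_html)
emails = set(EMAIL_RE.findall(sample_html))

print(links)
print(sorted(emails))
```

The email pattern also matches `logo@2x.png` and treats the two spellings of the same address as different strings. False positives like these are why the results are cleaned before saving.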

#### Objects ####

In [9]:
class parse_outcome():
    """ the outcome of a parse_html call """

    def __init__(self, success : bool, result : set | None, html : str | None) -> None:
        """
        initialise an instance of parse_outcome

        properties
        ----------
        success : bool
            attribute capturing whether the html parse was a success or not
        result : set | None
            the set of found items in the html if it was successful. None if nothing was found
        html : str | None
            the html which was parsed. None if the parse was a success. To be passed on after a failed parse
        """

        self.success = success
        self.result = result
        self.html = html

class get_outcome():
    """ the outcome of a get_html call """

    def __init__(self, success : bool, result : str | None, descr : str) -> None:
        """
        initialise an instance of get_outcome

        properties
        ----------
        success : bool
            attribute capturing whether the get_html was a success or not
        result: str | None
            the html text from a successful get. None if the get failed
        descr : str
            description of what happened with the call. 'Success' for a successful get, explanation if get failed
        """
        
        self.success = success
        self.result = result
        self.descr = descr

#### Functions ####

In [None]:
async def get_html(url: str, session : ClientSession) -> get_outcome:
    """
    get the html text of a url

    args
    ----
    url : str
        the url which to get
    session : ClientSession
        the session to which this call is assigned

    returns
    -------
    get_outcome : get_outcome
        the outcome of the get
    """

    if 'facebook.com' not in url:
        
        # META DATA - start
        global pages_cnt
        pages_cnt += 1
        # META DATA - end

        try:
            rsp = await session.request(method="GET", url=url)
        except Exception: # could not connect to the url (a bare 'except' would also swallow task cancellation)
            return get_outcome(success=False, result=None, descr='could not connect to url')
        else:
            if rsp.status == 200: # success case
                try:
                    html = await rsp.text()
                    return get_outcome(success=True, result=html, descr='success')
                except Exception: # html could not be decoded - improperly formatted on the server-side
                    return get_outcome(success=False, result=None, descr='html could not be decoded')
            else: # got a bad response from the url
                return get_outcome(success=False, result=None, descr='bad response from url')
    else:
        return get_outcome(success=False, result=None, descr='website is a facebook page')

async def parse_html(session : ClientSession, pattern : re.Pattern, given_html : str | None = None, url: str | None = None) -> parse_outcome:
    """
    parse a html text, looking for all occurrences of a pattern

    args
    ----
    session : ClientSession
        the session to which this call is assigned 
    pattern : re.Pattern
        the compiled regular expression to look for in the html text
    given_html : str | None
        the html text to parse. Defaults to None, in which case the html is fetched from 'url'
    url : str | None
        the url for which to get and parse the html. None if html is provided via 'given_html'

    returns
    -------
    parse_outcome : parse_outcome
        the outcome of the parse
    """
    
    if url != None:
        html = await get_html(url=url, session=session)
        html = html.result
    else:
        html = given_html

    if html != None:

        # META DATA - start
        global words_cnt
        words_cnt += len(html)
        # META DATA - end

        found = set(re.findall(pattern, html))

        # META DATA - start
        if pattern == email_RE:
            global emails_cnt
            emails_cnt += len(found)
        # META DATA - end

        if len(found) != 0: # success case
            return parse_outcome(success=True, result=found, html=None)
        else: # fail case
            return parse_outcome(success=False, result=None, html=html)
    else: # no html to parse
        return parse_outcome(success=False, result=None, html=None)

async def find_emails(starting_url : str, session : ClientSession, place_id : str, priority_links : list = priority_links) -> dict:
    """
    find emails within a website. Start on the homepage, then check priority pages, and then check all other pages.

    args
    ----
    starting_url : str
        the 'homepage' url from which to start, and which to search through first
    session : ClientSession
        the session to which this call is assigned
    place_id : str
        the place id of the place the website belongs to
    priority_links : list
        strings commonly found in contact page names

    returns
    -------
    result : dict
        dictionary with the keys 'place_id', 'email' (set of emails found, None if nothing was found), 'url' (the url or set of urls on which the emails were found), and 'method' (the step at which the search ended)
    """
    
    homepage = await parse_html(url=starting_url, session=session, pattern=email_RE)

    if homepage.success:
        return {'place_id' : place_id, 'email': homepage.result, 'url' : starting_url, 'method' : 'homepage'}
    else:
        # find all links on the homepage
        links = await parse_html(given_html=homepage.html, session=session, pattern=HREF_RE)
        if links.success:

            # first check priority links
            priority_pages_links = {urllib.parse.urljoin(starting_url, link) for link in links.result for priority_link in priority_links if priority_link in link}
            for link in priority_pages_links:
                link_parse = await parse_html(session=session, pattern=email_RE, url=link)
                if link_parse.success:
                    return {'place_id' : place_id, 'email': link_parse.result, 'url' : link, 'method': 'priority page'}

            # if no success on priority pages -> check all pages
            starting_url_stripped = urllib.parse.urlsplit(starting_url)[1]
            url_set = set()
            emails_found = set()
            for link in links.result:
                link_url = urllib.parse.urljoin(starting_url, link)
                if ((link_url not in priority_pages_links) and (starting_url_stripped in link_url)): # skip priority pages (already checked) and links leading off the main site (usually ads)
                    link_parse = await parse_html(session=session, pattern=email_RE, url=link_url)
                    if link_parse.success:
                        emails_found.update(link_parse.result)
                        url_set.add(link_url)
            
            if len(emails_found) != 0:
                return {'place_id' : place_id, 'email': emails_found, 'url' : url_set, 'method': 'regular page'}
                    
            # no emails found on any page
            else:
                return {'place_id' : place_id, 'email': None, 'url' : starting_url, 'method' : 'found nothing'}
                
        else: # no email or links found on homepage
            return {'place_id' : place_id, 'email': None, 'url' : starting_url, 'method' : 'no links'}

async def main(data : pd.DataFrame) -> list:
    """
    the main function of the asyncio approach. This function gathers all the coroutines (tasks that need to be executed asynchronously)

    args
    ----
    data : pd.DataFrame
        DataFrame with a 'website' and a 'place_id' column, one row per place

    returns
    -------
    result : list
        list of dictionaries containing the email addresses found for each url, and the place_id it belongs to
    """
    
    async with ClientSession() as session:
        tasks = []
        for i in range(len(data)):
            tasks.append(find_emails(starting_url=data['website'][i], place_id=data['place_id'][i], session=session))
        
        result = await asyncio.gather(*tasks)
    # the session is closed automatically when the 'async with' block exits
    return result
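
The link handling inside `find_emails` (resolving relative links, picking out priority pages, and skipping links that leave the site) can be tried out separately. The homepage and the raw links below are invented for this sketch; the logic follows the function above.

```python
from urllib.parse import urljoin, urlsplit

starting_url = 'https://www.example.org/'  # hypothetical homepage
raw_links = [                              # hypothetical href values found on it
    '/contact',
    'team.html',
    'https://www.example.org/about-us',
    'https://ads.example.net/banner',
]
priority_links = ['contact', 'about-us', 'over-ons', 'informatie']

# priority pages: links containing one of the priority strings, made absolute
priority_pages = {
    urljoin(starting_url, link)
    for link in raw_links
    if any(priority in link for priority in priority_links)
}

# remaining pages: same host as the homepage and not already a priority page
host = urlsplit(starting_url).netloc
other_pages = [
    url
    for url in (urljoin(starting_url, link) for link in raw_links)
    if url not in priority_pages and host in url
]

print(sorted(priority_pages))
print(other_pages)
```

`urljoin` leaves absolute links untouched and resolves relative ones against the homepage, so both kinds can be handled the same way. The host check is a plain substring test, as in the function above, so it is a heuristic rather than a strict same-domain check.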

### Global parameters ###

#### Meta data ####
Gathering meta data about the activities of the script. Denoted with "# META DATA" throughout.

In [11]:
keywords_cnt = 0 # count for the number of keywords used
places_cnt = 0 # count for the number of places investigated
websites_cnt = 0 # count for the number of websites ("home" URLs) searched through
pages_cnt = 0 # count for the number of webpages (URLs) contacted
words_cnt = 0 # count for the number of characters of HTML content sifted through (despite the name, len(html) counts characters, not words)
emails_cnt = 0 # count for the number of emails found
useful_emails_cnt = 0 # count for the number of useful emails found

#### Places ####
Reading in the search details.

In [12]:
with open("Search task details.txt", 'r') as file:
    # central point
    line = file.readline()
    central_point = line[line.find(':') + 2: -1]
    # radius
    line = file.readline()
    radius = int(line[line.find(':') + 2: -1])
    # keywords
    line = file.readline()
    keywords = line[line.find(':') + 2: ].strip().split(', ') # strip a trailing newline, if any, from the last keyword

# META DATA
keywords_cnt += len(keywords)
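
The parsing above implies a three-line file in which each value follows a label and a colon. The sketch below shows one layout that would fit; the labels and values are assumptions, since the actual file is not part of this notebook, and `value_after_colon` is a helper invented for the example.

```python
import io

# hypothetical contents of "Search task details.txt"
example_file = io.StringIO(
    "Central point: Utrecht, Netherlands\n"
    "Radius: 5000\n"
    "Keywords: tennis club, barber\n"
)

def value_after_colon(line):
    """Return everything after the first colon, without surrounding whitespace."""
    return line.split(':', 1)[1].strip()

example_central_point = value_after_colon(example_file.readline())
example_radius = int(value_after_colon(example_file.readline()))
example_keywords = [k.strip() for k in value_after_colon(example_file.readline()).split(',')]

print(example_central_point, example_radius, example_keywords)
```

Splitting on the first colon only keeps values that themselves contain a colon intact, and stripping each keyword makes the result independent of trailing newlines or extra spaces.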

Creating the search task.

In [13]:
search_task_SK = nearby_search_task(coords=gmaps.geocode(central_point)[0]['geometry']['location'], # use Google's Geocoding API to find the coordinates for the central point
                                    radius= radius,
                                    keywords= keywords,
                                    pages=5)

## Places ##
Finding places.

### Getting the places ###

In [14]:
search_outcome = search_task_SK.get_all_page_places()

### Adding place details ###

In [15]:
for place in search_outcome:
    detail_task = place_detail_search_task(place['place_id']).get_place_details() # additional details can be added here
    if detail_task != None:
        place['details'] = detail_task
    else:
        place['details'] = None

### Converting the search outcomes into a dataset ###

In [16]:
data = pd.DataFrame(search_outcome)

The 'details' column is still a dictionary (it was nested before, and Pandas cannot automatically handle this). This must be converted separately, and then joined with the rest.

In [17]:
details_df = pd.DataFrame(data['details'].to_list())
data = data.join(details_df).drop(columns=['details'])

Drop duplicates due to overlapping keywords.

In [18]:
data = data.drop_duplicates(subset=['place_id'], ignore_index=True)

## Scraping ##

Doing the actual scraping.

In [None]:
scrape_data = await main(data=data)

Adding the new data to the dataset.

In [20]:
data = pd.merge(data, pd.DataFrame(scrape_data), on='place_id')

## Cleaning & saving the data ##

NOTE: run any tweaks, tests, or bug fixes to the main script before this point.

Removing common email scraping mishaps.

In [21]:
for email_set in data['email']:
    if email_set != None:
        remove_set = set()
        for email in email_set:
            # removing common mishaps
            if ((email.endswith('.jpg')) or (email.endswith('.png')) or (email.endswith('.gif')) or (email.endswith('.svg')) or (not ((email[-4] == '.') or (email[-3] == '.'))) or ('www.' in email) or ('.wixpress.com' in email) or ('sentry.io' in email) or ('mijnwebsite' in email) or ('@domein' in email) or ('@example' in email) or ('jouwweb' in email)): # remove false emails
                remove_set.add(email)
            
            # removing duplicates due to case differences
            if email != email.lower():
                if email.lower() in email_set:
                    remove_set.add(email)

        if len(remove_set) != 0:
            email_set -= remove_set

        # META DATA - start
        useful_emails_cnt += len(email_set)
        # META DATA - end

# replace empty sets created due to removing the entire email_set
data.loc[data.email == set(), 'email'] = None
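
The same cleaning rules can be expressed as a small function that is easy to test on its own. This is a sketch, not the notebook's code: `clean_emails`, `IMAGE_SUFFIXES`, and `BLOCKED_FRAGMENTS` are names chosen for the example, and the function returns a new set instead of modifying the set in place.

```python
IMAGE_SUFFIXES = ('.jpg', '.png', '.gif', '.svg')
BLOCKED_FRAGMENTS = ('www.', '.wixpress.com', 'sentry.io', 'mijnwebsite',
                     '@domein', '@example', 'jouwweb')

def clean_emails(emails):
    """Return a new set without image file names, placeholder addresses, and case duplicates."""
    kept = set()
    for email in emails:
        if email.endswith(IMAGE_SUFFIXES):  # str.endswith accepts a tuple of suffixes
            continue
        if any(fragment in email for fragment in BLOCKED_FRAGMENTS):
            continue
        if len(email.rsplit('.', 1)[-1]) not in (2, 3):  # keep only 2- or 3-letter top-level domains
            continue
        if email != email.lower() and email.lower() in emails:
            continue  # the lower-case spelling of the same address is kept instead
        kept.add(email)
    return kept

# made-up scrape result for one place
found = {'info@club.nl', 'Info@club.nl', 'logo@2x.png',
         'user@example.com', 'abc@sentry.io', 'hello@club.info'}
print(clean_emails(found))
```

As in the cell above, the top-level domain rule also discards legitimate addresses with longer endings such as `.info`, which is a trade-off between missing a few real addresses and letting through more false matches.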

Allowing multiple emails for a place to spill over into additional columns, for easier use of the output Excel file.

In [22]:
emails_dict = {}
i = 0
for email_set in data['email']:
    if email_set != None:
        email_dict = {}
        j = 1
        for email in email_set:
            if j <= 15: # email cap - can be changed
                email_dict[f'email {j}'] = email
                j += 1
            else: # more than 15 email addresses seems excessive (found 556 for a Primera store at one point...)
                break
        emails_dict[data['place_id'][i]] = email_dict
    else:
        emails_dict[data['place_id'][i]] = {'email 1' : None}
    i += 1

# Converting to a dataframe
emails_df = pd.DataFrame.from_dict(emails_dict, orient='index')
emails_df['place_id'] = emails_df.index

# Merging with the existing dataframe
data = pd.merge(data, emails_df, on='place_id')
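
The spill-over step for a single place can be sketched without pandas. `spread_emails` and `MAX_EMAILS` are names invented for this example, and sorting the addresses is an added assumption that makes the column order reproducible (a set has no fixed iteration order).

```python
MAX_EMAILS = 15  # email cap, mirroring the cap used above

def spread_emails(emails, cap=MAX_EMAILS):
    """Map a set of emails to numbered columns: 'email 1', 'email 2', ... up to the cap."""
    if not emails:  # None or an empty set
        return {'email 1': None}
    return {f'email {j}': email for j, email in enumerate(sorted(emails)[:cap], start=1)}

print(spread_emails({'b@club.nl', 'a@club.nl'}))
print(spread_emails(None))
```

A dictionary of such per-place dictionaries, keyed by place id, is the shape that `pd.DataFrame.from_dict(..., orient='index')` turns into one row per place, with missing columns filled in as NaN.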

Discarding data which is not useful for the output file.

In [23]:
# dropping irrelevant columns
data = data.drop(columns=['place_id', 'url', 'method', 'email'])

# dropping rows with no website or emails
data = data.dropna(ignore_index=True, subset=['website', 'email 1'])

Saving the data to Excel.

In [125]:
data.to_excel('Emails.xlsx')

## Meta data result ##
The final counts of the meta data.

In [None]:
print(f'{keywords_cnt=}')
print(f'{places_cnt=}')
print(f'{websites_cnt=}')
print(f'{pages_cnt=}')
print(f'{words_cnt=}')
print(f'{emails_cnt=}')
print(f'{useful_emails_cnt=}')