In [163]:
# -*- coding: utf-8 -*-
import pandas as pd
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import NoSuchElementException
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from webdriver_manager.chrome import ChromeDriverManager
from bs4 import BeautifulSoup
from datetime import datetime
import time
import re
import logging
import traceback
import numpy as np
import itertools

#pd.set_option('display.max_colwidth', None)

# Landing page opened right after the Chrome driver is created.
GM_WEBPAGE = 'https://www.google.com/maps/'
# Timeout in seconds for the explicit WebDriverWait used when opening the sort menu.
MAX_WAIT = 15
# Maximum number of attempts to click the "Sort" button before giving up.
MAX_RETRY = 5
# NOTE(review): not referenced anywhere in the visible code -- presumably a scroll limit; confirm or remove.
MAX_SCROLLS = 40

class GoogleMapsScraper:
    """Selenium-driven scraper for Google Maps places and reviews.

    The instance owns a Chrome WebDriver (headless unless ``debug`` is true)
    and a file logger writing to ``gm-scraper.log``. It can be used as a
    context manager; leaving the ``with`` block quits the browser.

    Most CSS classes and XPath expressions used here are obfuscated names
    taken from the Google Maps page and are subject to change.
    """

    # Column order of the places CSV written by getPlaces().
    _PLACE_COLUMNS = ['search_point_url', 'href', 'name', 'rating', 'num_reviews', 'close_time', 'other']
    # Destination of the places CSV written by getPlaces().
    _PLACES_CSV = 'output/places_wax.csv'

    def __init__(self, debug=False):
        """Create the logger and start the browser.

        Args:
            debug: when true the browser window is shown (1366x768) instead
                of running headless.
        """
        self.debug = debug
        # The logger is created first so it exists even if the browser fails to start.
        self.logger = self.__getLogger()
        self.driver = self.__getDriver()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, tb):
        """Print any pending exception, quit the browser and suppress errors.

        Ordinary exceptions (subclasses of ``Exception``) are printed and
        suppressed, as before. ``KeyboardInterrupt`` and ``SystemExit`` are
        printed but allowed to propagate so the user can still abort a run.
        """
        if exc_type is not None:
            traceback.print_exception(exc_type, exc_value, tb)

        # quit() already closes every window; it may fail when the driver was
        # quit earlier (getPlaces does that), which must not raise from here.
        try:
            self.driver.quit()
        except Exception as e:
            self.logger.warning('Failed to quit the driver: %s', e)

        return exc_type is None or issubclass(exc_type, Exception)

    def sortBy(self, url, ind):
        """Open ``url`` and pick entry ``ind`` of the review "Sort" menu.

        Args:
            url: page to load.
            ind: index of the ``menuitemradio`` element to click.

        Returns:
            0 on success, -1 when the sort menu could not be opened after
            MAX_RETRY attempts or when it has no entry at ``ind``.
        """
        self.driver.get(url)
        self.__clickOnCookieAgreement()

        wait = WebDriverWait(self.driver, MAX_WAIT)

        # open dropdown menu
        for _ in range(MAX_RETRY):
            try:
                menu_bt = wait.until(EC.element_to_be_clickable((By.XPATH, '//button[@data-value=\'Sort\']')))
                menu_bt.click()
            except Exception:
                self.logger.warning('Failed to click sorting button')
            else:
                # give the dropdown time to render
                time.sleep(3)
                break
        else:
            # failed to open the dropdown
            return -1

        # element of the list specified according to ind
        sort_options = self.driver.find_elements(By.XPATH, '//div[@role=\'menuitemradio\']')
        try:
            sort_option = sort_options[ind]
        except IndexError:
            self.logger.warning('Sort option %s not found (%d options available)', ind, len(sort_options))
            return -1
        sort_option.click()

        # wait to load review (ajax call)
        time.sleep(5)

        return 0

    def getPlaces(self, method='urls', keyword_list=None):
        """Collect place links for every search point and save them as CSV.

        Only ``method='squares'`` is implemented; the search URLs are then
        generated by ``_genSearchPointsFromSquare``. The result is written to
        ``output/places_wax.csv`` every ten search points and once more at the
        end. The driver is quit when the method finishes.

        Args:
            method: search point source; only ``'squares'`` is supported.
            keyword_list: keywords forwarded to ``_genSearchPointsFromSquare``.

        Raises:
            NotImplementedError: for any ``method`` other than ``'squares'``.
        """
        if method != 'squares':
            # TODO: build search points from explicit urls / keyword rows.
            raise NotImplementedError(
                f"getPlaces method {method!r} is not implemented; only 'squares' is supported")

        search_point_url_list = self._genSearchPointsFromSquare(keyword_list=keyword_list)
        places = []

        for i, search_point_url in enumerate(search_point_url_list):

            # periodic checkpoint so a crash does not lose everything
            if (i + 1) % 10 == 0:
                print(f"{i}/{len(search_point_url_list)}")
                self.__savePlaces(places)

            try:
                self.driver.get(search_point_url)
            except NoSuchElementException:
                # restart the browser and retry once
                self.driver.quit()
                self.driver = self.__getDriver()
                self.driver.get(search_point_url)

            # Hack to load all places into the page: scroll the result list to the bottom repeatedly
            scrollable_div = self.driver.find_element(
                By.CSS_SELECTOR,
                "div.siAUzd-neVct.section-scrollbox.cYB2Ge-oHo7ed.cYB2Ge-ti6hGc > div[aria-label*='Results for']")
            for _ in range(10):
                self.driver.execute_script('arguments[0].scrollTop = arguments[0].scrollHeight', scrollable_div)

            # Get places names and href
            response = BeautifulSoup(self.driver.page_source, 'html.parser')
            for div_place in response.select('div[jsaction] > a[href]'):
                places.append({
                    'search_point_url': search_point_url.replace('https://www.google.com/maps/search/', ''),
                    'href': div_place['href'],
                    # links without a label are kept with an empty name
                    'name': div_place.get('aria-label'),
                    'rating': None,
                    'num_reviews': None,
                    'close_time': None,
                    'other': None,
                })

        self.__savePlaces(places)
        self.driver.quit()

    def __savePlaces(self, places):
        """Write the collected place dicts to the places CSV."""
        import os  # local import: only this helper needs it

        # Passing the columns explicitly keeps the header even when no place was found yet.
        df_places = pd.DataFrame(places, columns=self._PLACE_COLUMNS)
        os.makedirs(os.path.dirname(self._PLACES_CSV), exist_ok=True)
        df_places.to_csv(self._PLACES_CSV, index=False)

    def _genSearchPointsFromSquare(self, keyword_list=None):
        """Build Google Maps search URLs on a 20x20 grid per city.

        Reads ``input/square_points.csv`` (columns ``city``, ``latitude``,
        ``longitude``), spans a 20x20 grid between the min and max coordinates
        of each city and emits one URL per grid point and keyword.

        Args:
            keyword_list: search keywords; ``None`` is treated as an empty list,
                which yields no URLs.

        Returns:
            List of search URLs.
        """
        keyword_list = [] if keyword_list is None else keyword_list

        square_points = pd.read_csv('input/square_points.csv')

        search_urls = []

        for city in square_points['city'].unique():
            df_aux = square_points[square_points['city'] == city]
            latitudes = np.linspace(df_aux['latitude'].min(), df_aux['latitude'].max(), num=20)
            longitudes = np.linspace(df_aux['longitude'].min(), df_aux['longitude'].max(), num=20)

            # NOTE(review): the URL is built as @longitude,latitude while Google Maps
            # URLs are usually @latitude,longitude -- confirm against the CSV columns.
            search_urls += [
                f"https://www.google.com/maps/search/{keyword}/@{longitude!s},{latitude!s},{15}z"
                for latitude, longitude, keyword in itertools.product(latitudes, longitudes, keyword_list)
            ]

        return search_urls

    # Triggers scroll which then triggers loading of next patch of reviews
    # It then expands reviews, query the DOM and iterates through each review to parse it
    # Returns list of parsed reviews
    def getReviews(self, offset):
        """Load the next batch of reviews and parse those at index >= ``offset``.

        Args:
            offset: number of reviews already processed by the caller.

        Returns:
            List of dicts as produced by ``__parse``.
        """
        # wait for other reviews to load (ajax)
        time.sleep(4)

        self.__scroll()

        # expand review text
        self.__expandReviews()

        # parse reviews
        response = BeautifulSoup(self.driver.page_source, 'html.parser')

        # TODO: Subject to changes
        rblock = response.find_all('div', class_='jftiEf fontBodyMedium')

        return [self.__parse(review) for index, review in enumerate(rblock) if index >= offset]

    def __parse(self, review):
        """Extract id, caption, relative date and rating from one review element.

        Every field that cannot be found or parsed is set to ``None``.
        """
        # TODO: Subject to changes (all selectors below)
        id_review = review.get('data-review-id')

        caption_node = review.find('span', class_='wiI7pd')
        review_text = self.__filterString(caption_node.text) if caption_node is not None else None

        rating = None
        rating_node = review.find('span', class_='kvMYJc')
        if rating_node is not None:
            try:
                # the number is the second space-separated token of the aria-label
                rating = float(rating_node['aria-label'].split(' ')[1])
            except (KeyError, IndexError, ValueError, AttributeError):
                rating = None

        date_node = review.find('span', class_='rsqaWe')
        relative_date = date_node.text if date_node is not None else None

        return {
            'id_review': id_review,
            'caption': review_text,
            'relative_date': relative_date,
            'rating': rating,
        }

    def __parsePlace(self, response):
        """Extract the overall rating and review count from a place page.

        Returns:
            Dict with ``overall_rating`` (float, or ``'NOT FOUND'``) and
            ``n_reviews`` (int, 0 when missing).
        """
        place = {}
        try:
            place['overall_rating'] = float(response.find('div', class_='gm2-display-2').text.replace(',', '.'))
        except (AttributeError, ValueError):
            place['overall_rating'] = 'NOT FOUND'

        try:
            place['n_reviews'] = int(response.find('div', class_='gm2-caption').text.replace('.', '').replace(',', '').split(' ')[0])
        except (AttributeError, ValueError):
            place['n_reviews'] = 0

        return place

    # expand review content - click on each expand button which then replaces the ellipsized text
    # use XPath to load complete reviews
    def __expandReviews(self):
        """Click every "expand review" button currently on the page (best effort)."""
        # TODO: Subject to changes
        links = self.driver.find_elements(By.XPATH, '//button[@jsaction="pane.review.expandReview"]')
        for link in links:
            try:
                link.click()
            except Exception as e:
                # a single button that cannot be clicked only leaves that review truncated
                self.logger.warning('Failed to expand a review: %s', e)

    # scroll the review container element to show the next patch of reviews
    def __scroll(self):
        """Scroll the review container to its bottom."""
        scrollable_div = self.driver.find_element(By.CSS_SELECTOR, 'div.m6QErb.DxyBCb.kA9KIf.dS8AEf')
        self.driver.execute_script('arguments[0].scrollTop = arguments[0].scrollHeight', scrollable_div)

    def __getLogger(self):
        """Return the 'googlemaps-scraper' logger writing to ``gm-scraper.log``."""
        logger = logging.getLogger('googlemaps-scraper')
        logger.setLevel(logging.DEBUG)

        # logging.getLogger returns the same object on every call; only attach
        # the handler once so repeated instantiation does not duplicate lines.
        if not logger.handlers:
            # file handler with level debug
            fh = logging.FileHandler('gm-scraper.log')
            fh.setLevel(logging.DEBUG)
            fh.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
            logger.addHandler(fh)

        return logger

    def __getDriver(self, debug=False):
        """Start Chrome and open the Google Maps landing page.

        Args:
            debug: show the browser window even if ``self.debug`` is false.

        Returns:
            The started WebDriver.
        """
        options = Options()

        if self.debug or debug:
            options.add_argument("--window-size=1366,768")
        else:
            options.add_argument("--headless")

        options.add_argument("--disable-notifications")
        options.add_argument("--lang=en-GB")

        driver_path = ChromeDriverManager().install()
        try:
            # Selenium 4 style; the executable_path keyword was removed in 4.10.
            from selenium.webdriver.chrome.service import Service
            input_driver = webdriver.Chrome(service=Service(driver_path), options=options)
        except TypeError:
            # Selenium 3 does not accept the ``service`` keyword.
            input_driver = webdriver.Chrome(executable_path=driver_path, options=options)

        # click on google agree button so we can continue (not needed anymore)
        # EC.element_to_be_clickable((By.XPATH, '//span[contains(text(), "I agree")]')))
        input_driver.get(GM_WEBPAGE)
        return input_driver

    # cookies agreement click
    def __clickOnCookieAgreement(self):
        """Click the "Reject all" cookie button if it shows up within 10 seconds.

        Returns:
            True when the button was clicked, False otherwise.
        """
        try:
            reject_bt = WebDriverWait(self.driver, 10).until(
                EC.element_to_be_clickable((By.XPATH, '//span[contains(text(), "Reject all")]')))
            reject_bt.click()

            # back to the main page
            # self.driver.switch_to_default_content()
            return True
        except Exception:
            return False

    # util function to clean special characters
    def __filterString(self, text):
        """Replace carriage returns, newlines and tabs with single spaces."""
        return text.replace('\r', ' ').replace('\n', ' ').replace('\t', ' ')

In [164]:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from datetime import datetime, timedelta
import argparse
import logging
import sys

# Module-level accumulator of scraped review dicts; filled by
# ReviewGatherer.scrapeGMReviews and read by the cells that build the DataFrame.
collection = []


class ReviewGatherer:
    """Scrape the newest reviews of a list of Google Maps URLs.

    Reviews are appended to the module-level ``collection`` list until a
    review older than ``from_date`` or an already collected review is met.
    """

    # Consecutive empty batches tolerated before a URL is considered exhausted
    # (a batch can be empty simply because the page has not loaded it yet).
    _MAX_EMPTY_BATCHES = 3

    # Length of one unit of a relative date; months and years are approximated
    # as 30 and 365 days.
    _UNIT_DELTAS = {
        'year': timedelta(days=365),
        'month': timedelta(days=30),
        'week': timedelta(weeks=1),
        'day': timedelta(days=1),
        'hour': timedelta(hours=1),
        'minute': timedelta(minutes=1),
    }

    def __init__(self, url_file, from_date):
        """Load the URLs and the minimum review date.

        Args:
            url_file: path of a text file with one URL per line; blank lines
                are ignored.
            from_date: oldest review date to keep, formatted ``YYYY-MM-DD``.
        """
        # load urls file (strip() instead of [:-1] so a last line without a
        # trailing newline keeps its final character)
        with open(url_file, 'r', encoding='utf-8') as furl:
            self.urls = [line.strip() for line in furl if line.strip()]

        # min date review to scrape
        self.min_date_review = datetime.strptime(from_date, '%Y-%m-%d')

        # logging
        self.logger = self.__getLogger()

    def scrapeGMReviews(self):
        """Scrape every URL and return the module-level ``collection`` list.

        An exception raised while processing one URL is logged and the next
        URL is processed.
        """
        # init scraper and incremental add reviews
        # TO DO: pass logger as parameter to log into one single file?
        with GoogleMapsScraper(debug=True) as scraper:
            for url in self.urls:
                try:
                    # ind = {'most_relevant' : 0 , 'newest' : 1, 'highest_rating' : 2, 'lowest_rating' : 3 }
                    error = scraper.sortBy(url, 1)
                    if error != 0:
                        self.logger.warning('Sorting reviews failed for %s', url)
                        continue

                    n_new_reviews = self.__collectReviews(scraper)

                    # log total number
                    self.logger.info('%s : %s new reviews', url, n_new_reviews)
                except Exception as e:
                    # exception() keeps the ERROR level and adds the traceback
                    self.logger.exception('Exception: %s', e)

        return collection

    def __collectReviews(self, scraper):
        """Append the new reviews of the page currently open in ``scraper``.

        Returns:
            Number of reviews appended to ``collection``.
        """
        offset = 0
        n_new_reviews = 0
        empty_batches = 0

        while empty_batches < self._MAX_EMPTY_BATCHES:
            rlist = scraper.getReviews(offset)
            if not rlist:
                # nothing new was loaded; without this guard the loop would
                # never end once the page runs out of reviews
                empty_batches += 1
                continue
            empty_batches = 0

            for r in rlist:
                # calculate review date and compare to input min_date_review
                r['timestamp'] = self.__parseRelativeDate(r['relative_date'])
                if r['timestamp'] is None:
                    self.logger.warning('Skipping review %s: cannot parse date %r',
                                        r['id_review'], r['relative_date'])
                    continue
                if self.__stop(r):
                    return n_new_reviews
                collection.append(r)
                n_new_reviews += 1

            offset += len(rlist)

        return n_new_reviews

    def __parseRelativeDate(self, string_date):
        """Convert a relative date such as ``'2 days ago'`` to a datetime.

        A singular unit ("a day ago", "an hour ago") counts as one unit, a
        plural unit uses the leading number. Any string mentioning "moment"
        maps to one second ago.

        Returns:
            ``datetime.now()`` minus the parsed offset, or ``None`` when
            ``string_date`` is empty or not understood.
        """
        if not string_date:
            return None

        curr_date = datetime.now()
        tokens = string_date.lower().split()

        if any(token.startswith('moment') for token in tokens):
            return curr_date - timedelta(seconds=1)

        if len(tokens) < 2:
            return None

        n, delta = tokens[0], tokens[1]

        if delta.endswith('s'):
            step = self._UNIT_DELTAS.get(delta[:-1])
            try:
                count = int(n)
            except ValueError:
                return None
        else:
            step = self._UNIT_DELTAS.get(delta)
            count = 1

        if step is None:
            return None

        return curr_date - count * step

    def __stop(self, r):
        """Tell whether scraping should stop at review ``r``.

        Returns:
            True when ``r`` has no timestamp, is already in ``collection``
            (same ``id_review``) or is older than ``min_date_review``.
        """
        timestamp = r.get('timestamp')
        if timestamp is None:
            return True

        already_collected = any(x['id_review'] == r['id_review'] for x in collection)
        return already_collected or timestamp < self.min_date_review

    def __getLogger(self):
        """Return the 'monitor' logger writing to ``monitor.log``."""
        logger = logging.getLogger('monitor')
        logger.setLevel(logging.DEBUG)

        # logging.getLogger returns the same object on every call; only attach
        # the handler once so repeated instantiation does not duplicate lines.
        if not logger.handlers:
            # file handler with level debug
            fh = logging.FileHandler('monitor.log')
            fh.setLevel(logging.DEBUG)
            fh.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
            logger.addHandler(fh)

        return logger

In [165]:
# Scrape every URL listed in the input file, keeping reviews from the given date on.
gatherer = ReviewGatherer(url_file="urls.text", from_date='2023-01-15')

try:
    gatherer.scrapeGMReviews()
except Exception as err:
    # last-resort guard: record the failure in the gatherer's own log file
    gatherer.logger.error(f'Not handled error: {err}')
    


In [169]:
# Build a DataFrame with one row per review dict accumulated in `collection`.
df_reviews = pd.DataFrame.from_records(collection)


In [170]:
# Export caption, rating and timestamp (plus the index) as a ';'-separated CSV.
df_reviews.to_csv('reviews.csv', sep=';', encoding='utf-8', columns=['caption', 'rating', 'timestamp'])

In [171]:
# Preview the first rows whose caption is not the empty string.
df_reviews.loc[df_reviews['caption'] != ''].head()

Unnamed: 0,id_review,caption,relative_date,rating,timestamp
1,ChZDSUhNMG9nS0VJQ0FnSURCN2Q2WUpnEAE,Absolutely fantastic pizza. This is definitely one of the best if not the best pizza places in Frankfurt. They were fully packed on a weeknight so reservations are highly recommended.,13 hours ago,5.0,2023-01-26 09:45:52.538831
2,ChZDSUhNMG9nS0VJQ0FnSURCcmRUMkxREAE,"Sehr chillige Pizzeria! Immer gute Laune der Angestellten ,fast schon partymäßig! Pizza,Mega ! Extrem hochwertiger Belag, auch obwohl ich normalerweise die krossen Pizzen mehr schätze, gehe ich dort immer wieder hin, weil wie gesagt, der leckere Belag alles ausgleicht… Sie werden ja nun mal nur 90 Sekunden in den Ofen bei hoher Temperatur zubereitet und da wird die Pizza nicht richtig kross im Teig ,aber dafür sehr lecker und vom Steinofen her optimal. Der Teig schmeckt auch sehr gut an sich . Was auch sehr löblich ist, man kann die Pizza, mit in die Brauerei nehmen ,neben an. Drinnen wie draußen ,im Freien konsumieren,ein Bierchen dort trinken und die Pizza von nebenan essen ! Auch umgekehrt … kann man sich ein leckeres frisch gezapftes Bier mit in die Pizzeria rein nehmen! Wo hat man so eine tolle Kooperation ? fantastisch ! Es läuft immer eine sehr angenehme Musik partymäßig angenehm. Die Leute sind freundlich 😉,es herrscht eine sehr schöne Atmosphäre und es ist immer rappelvoll! Man sollte eigentlich seine Pizza nicht später als sechs konsumieren ,danach wird es brechend voll! Oder wann kommt spät, Bitte die last Order beachten ,unterschiedliche Öffnungszeiten… Gehört zu den besten Pizzerien in Frankfurt! (Translated by Google) Very chilled pizzeria! The employees are always in a good mood, almost like a party! Pizza, mega! Extremely high-quality toppings, even though I usually appreciate the crispy pizzas more, I keep going there because, as I said, the delicious toppings balance everything out... They are only prepared for 90 seconds in the oven at a high temperature and the pizza is not really crispy in the dough, but it is very tasty and the stone oven is ideal. The dough itself tastes really good too. Which is also very praiseworthy, you can take the pizza to the brewery next door. Inside and outside, consume outdoors, have a beer there and eat the pizza next door! Also vice versa... 
you can take a delicious freshly tapped beer to the pizzeria! Where do you have such a great cooperation? fantastic ! There is always a very pleasant music playing in a party-like way. The people are friendly 😉, there is a very nice atmosphere and it is always packed! You should actually eat your pizza no later than six, after that it gets packed! Or when is it late, please note the last order, different opening times... One of the best pizzerias in Frankfurt!",21 hours ago,5.0,2023-01-26 01:45:52.538840
3,ChZDSUhNMG9nS0VJQ0FnSURCamJhcFNnEAE,"Hands down, best pizza in town.",a day ago,5.0,2023-01-25 22:45:52.538843
7,ChZDSUhNMG9nS0VJQ0FnSURCNFlmM1ZBEAE,Sehr geil (Translated by Google) very cool,5 days ago,5.0,2023-01-21 22:45:52.538854
8,ChdDSUhNMG9nS0VJQ0FnSURCX1AteW1BRRAB,"Super leckere, gute belegte Pizza. Extrem dünn in der Mitte, dafür dicker fluffiger Rand. ""Normale"" deutsche Sorten (z.B. Hawaii) sucht man hier vergeblich, aber das ist auch gut so! Das hier ist deutlich italienischer. …",a week ago,5.0,2023-01-19 22:45:52.538856
