In [2]:
import sys
import csv
import json
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.keys import Keys
import time
import re
from tqdm.notebook import tqdm, trange
from IPython.display import clear_output
from datetime import datetime

from thefuzz import fuzz
from thefuzz import process

import numpy as np
import pandas as pd

In [8]:
class GoggleMapsScrapper():
    """Collect restaurant information from Google Maps with Selenium + Chrome.

    For every restaurant name (searched together with its address and the
    city) the scraper opens the matching Google Maps place and stores its
    name, category, review count, details, rating histogram, working hours
    and popular times as one dict in ``self.restaurants_data``.  The list is
    persisted to ``<folder>/restaurants_data_google_<city>.json``.

    NOTE(review): the CSS class names and XPaths used below are opaque
    identifiers copied from the Google Maps page; they presumably change over
    time and will need refreshing.
    """

    def __init__(self, city, rest_names, rest_adres, folder_to_store, show=False):
        """Configure Chrome and load any previously collected data.

        Args:
            city: city name appended to every search query and used in the
                name of the JSON file.
            rest_names: restaurant names to search for.
            rest_adres: addresses, index-aligned with ``rest_names``.
            folder_to_store: folder holding the JSON results file.
            show: when False (default) Chrome runs with ``--headless``.
        """
        self.options = Options()
        self.options.add_argument("--lang=en-US")
        if not show:
            self.options.add_argument("--headless")
        self.city = city
        self.restaurant_names = rest_names
        self.restaurant_adres = rest_adres
        self.already_collected = []
        self.errors = []
        self.folder = folder_to_store + '/'
        # List of dicts, one per scraped restaurant.
        try:
            self.load_collected_data(folder=self.folder)
            # Stored lower-cased because lookups compare against rest.lower().
            self.existed_names = [
                record['Trip_Advisor_name'].lower()
                for record in self.restaurants_data
                if record.get('Trip_Advisor_name')
            ]
        except Exception as error:
            # Best effort: a missing or unreadable file means "start from scratch".
            print(f'no previously collected data loaded ({error!r})')
            self.restaurants_data = []
            self.existed_names = []

    def _verbose(message):
        """Decorator factory: print ``message`` each time the wrapped method runs."""
        from functools import wraps

        def decorator(func):
            @wraps(func)  # keep the wrapped method's name and docstring
            def wrapper(*args, **kwargs):
                print(message)
                return func(*args, **kwargs)
            return wrapper
        return decorator

    def _is_already_collected(self, rest):
        """Return True when ``rest`` matches a collected name, ignoring case."""
        wanted = rest.lower()
        return any(
            isinstance(name, str) and name.lower() == wanted
            for name in self.existed_names
        )

    def _quit_driver(self):
        """Shut down the browser and its chromedriver process, if one was started."""
        driver = getattr(self, 'driver', None)
        if driver is None:
            return
        try:
            # quit() (unlike close()) also terminates the chromedriver process.
            driver.quit()
        except Exception as error:
            print(f'could not quit driver: {error!r}')

    def _collect_restaurant(self, rest):
        """Scrape the currently opened place page and store it under ``rest``."""
        rest_data = {'Trip_Advisor_name': rest}
        self.collect_name(rest_data)
        self.collect_category(rest_data)
        self.collect_details(rest_data)
        self.collect_ratings(rest_data)
        self.collect_working_hours(rest_data)
        self.collect_popular_times(rest_data)
        self.restaurants_data.append(rest_data)
        # Remember the name so a duplicate later in the input is skipped.
        self.existed_names.append(rest.lower())

    @_verbose(message = 'starting driver...')
    def start_driver(self):
        """Start Chrome with the configured options, a 600x800 window and a 10 s implicit wait."""
        self.driver = webdriver.Chrome(options=self.options)
        self.driver.set_window_size(600, 800)
        self.driver.implicitly_wait(10)

    @_verbose(message = 'opening GoogleMaps...')
    def open_googlemaps(self):
        """Navigate the browser to the Google Maps start URL."""
        self.driver.get('https://www.google.com/maps/@48.8588589,2.3470599,11z')

    @_verbose(message = 'changing language...')
    def change_language(self):
        """Click through the language menu (three clicks), retrying up to three times.

        Failures are swallowed: after three failed attempts the method returns
        without changing anything.
        """
        english = '//*[@id="languages"]/div/div[2]/div[2]/ul[1]/li[11]/a'
        click_path = [
            (By.CLASS_NAME, 'xoLGzf-LgbsSe'),
            (By.CLASS_NAME, 'KY3DLe-languages-QA0Szd-LgbsSe'),
            (By.XPATH, english),
        ]
        for _ in range(3):
            try:
                for locator in click_path:
                    # until() returns the located element once it exists.
                    WebDriverWait(self.driver, timeout=10).until(
                        lambda d, locator=locator: d.find_element(*locator)
                    ).click()
                break
            except Exception:
                continue

    @_verbose(message = 'accepting cookies...')
    def accept_cookies(self):
        """Click the fourth ``VfPpkd-LgbsSe`` button (the consent button) when present."""
        buttons = self.driver.find_elements(By.CLASS_NAME, 'VfPpkd-LgbsSe')
        if len(buttons) > 3:
            buttons[3].click()
        else:
            # No consent dialog (or a different layout): nothing to accept.
            print('cookie consent button not found, skipping')

    @_verbose(message = 'searching...')
    def search_restaurant(self, query):
        """Type ``query`` into the search box and press the search button."""
        search_box = self.driver.find_element(By.ID, 'searchboxinput')
        search_box.clear()
        search_box.send_keys(query)
        self.driver.find_element(By.ID, 'searchbox-searchbutton').click()

    @_verbose(message = 'collecting name...')
    def collect_name(self, rest_data):
        """Store the place headline text under ``rest_data['Name']``."""
        name = 'gm2-headline-5'
        rest_data['Name'] = self.driver.find_element(By.CLASS_NAME, name).text

    @_verbose(message = 'collecting category...')
    def collect_category(self, rest_data):
        """Store ``'N_reviews'`` and ``'Category'`` (first two non-empty texts).

        Retries up to three times; both keys are set to None when every
        attempt fails.
        """
        category = 'Yr7JMd-pane-hSRGPd'
        for _ in range(3):
            try:
                WebDriverWait(self.driver, timeout=3).until(
                    lambda d: d.find_element(By.CLASS_NAME, category)
                )
                # Was `x.driver`, i.e. whatever scraper the notebook global `x` held.
                raw_info = [
                    element.text
                    for element in self.driver.find_elements(By.CLASS_NAME, category)
                ]
                info = list(filter(None, raw_info))
                n_reviews, category_text = info[0], info[1]
                rest_data['Category'] = category_text
                rest_data['N_reviews'] = n_reviews
                break
            except Exception:
                continue
        # Keep the record schema stable even when nothing could be read.
        rest_data.setdefault('Category', None)
        rest_data.setdefault('N_reviews', None)

    @_verbose(message = 'collecting details...')
    def collect_details(self, rest_data):
        """Open the full-details view, store the aria-labels as ``'Details'``, go back.

        ``'Details'`` is None when the details button is missing or nothing
        could be read in three attempts.
        """
        button_details_c = 'uxOu9-sTGRBb-UmHwN'
        data = 'LQjNnc-p83tee-JNdkSc-ibnC6b'
        button_back_c = 'x05QPc-header-LgbsSe-sSvV8b'

        try:
            self.driver.find_element(By.CLASS_NAME, button_details_c).click()
        except Exception:
            print('No details')
            rest_data['Details'] = None
            return

        for _ in range(3):
            try:
                rest_data['Details'] = [
                    row.find_element(By.XPATH, './/span').get_attribute('aria-label')
                    for row in self.driver.find_elements(By.CLASS_NAME, data)
                ]
                time.sleep(1)
                self.driver.find_element(By.CLASS_NAME, button_back_c).click()
                break
            except Exception:
                continue
        rest_data.setdefault('Details', None)
        time.sleep(1)

    @_verbose(message = 'collecting ratings...')
    def collect_ratings(self, rest_data):
        """Store the rating histogram as ``{'5 stars': '94', ...}`` under ``'Ratings'``.

        Each aria-label is expected to look like ``'5 stars, 94 reviews'``;
        ``'Ratings'`` is None when reading or parsing fails.
        """
        ratings_c = 'BHOKXe'
        try:
            labels = [
                element.get_attribute('aria-label').split(', ')
                for element in self.driver.find_elements(By.CLASS_NAME, ratings_c)
            ]
            rest_data['Ratings'] = {}
            for stars, amount in ((parts[0], parts[1]) for parts in labels):
                rest_data['Ratings'][stars] = amount.split(' ')[0]
        except Exception:
            rest_data['Ratings'] = None

    @_verbose(message = 'collecting working hours...')
    def collect_working_hours(self, rest_data):
        """Store per-day opening/closing times under ``'Open_hours'``/``'Close_hours'``.

        Rows without an en-dash range (``'Closed'``, ``'Open 24 hours'``) are
        stored verbatim in both dicts.  Both keys are None on failure.
        """
        days = 'y0skZc-header'
        hours = 'y0skZc-wZVHld'
        working_hours_but = 'n2H0ue-RWgCYc'
        try:
            self.driver.find_element(By.CLASS_NAME, working_hours_but).click()

            week_days = [el.text for el in self.driver.find_elements(By.CLASS_NAME, days)]
            working_hours = [el.text for el in self.driver.find_elements(By.CLASS_NAME, hours)]
            self.driver.find_element(By.CLASS_NAME, working_hours_but).click()

            rest_data['Open_hours'] = {}
            rest_data['Close_hours'] = {}

            for day, span in zip(week_days, working_hours):
                parts = span.split('–')
                if len(parts) < 2:
                    # No range at all; previously this raised IndexError and
                    # threw away the hours of every day.
                    rest_data['Open_hours'][day] = span
                    rest_data['Close_hours'][day] = span
                else:
                    rest_data['Open_hours'][day] = parts[0]
                    rest_data['Close_hours'][day] = parts[1]
            # NOTE(review): third click on the same toggle, kept from the
            # original; confirm the panel is meant to be toggled again here.
            self.driver.find_element(By.CLASS_NAME, working_hours_but).click()
        except Exception:
            rest_data['Open_hours'] = None
            rest_data['Close_hours'] = None

    @staticmethod
    def _modify_popular_times(pop_times):
        """Turn popularity aria-labels into ``{week_day: {hour: popularity}}``.

        Labels are expected to look like ``'45% busy at 6 PM.'``; a label whose
        first token is ``'%'`` marks a closed day and a label containing
        ``'Currently'`` is assigned to the current local hour.
        """
        modified_pop_hours = []
        hour_prev = 'null'
        for label in pop_times:
            words = label.split()
            if 'Currently' in words:
                # The bar of the hour being scraped has a different wording.
                hour = int(datetime.now().strftime("%H"))
                pop = words[-2]
            elif words[0] == '%':
                # Closed day: no hour and no popularity.
                hour = None
                pop = None
            else:
                # Convert the 12-hour clock to 24-hour time (12 AM -> 0).
                add = 0
                if words[-1] == 'PM.' and words[-2] != '12':
                    add = 12
                elif words[-1] == 'AM.' and words[-2] == '12':
                    add = -12
                hour = int(words[-2]) + add
                pop = words[0]
            # Consecutive labels can repeat the same hour; keep the first.
            if hour != hour_prev or hour is None:
                hour_prev = hour
                modified_pop_hours.append([hour, pop])

        # NOTE(review): 'Sun' is listed twice as in the original; the second
        # pass looks like a wrap-around for entries left after Saturday.
        week_days = ['Sun', 'Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']
        dic = {day: {} for day in week_days}
        for day in week_days:
            try:
                for hour in range(24):
                    if modified_pop_hours[0][0] == hour:
                        dic[day][str(hour)] = modified_pop_hours.pop(0)[1]
                    elif modified_pop_hours[0][0] is None:
                        modified_pop_hours.pop(0)
                        break
                    else:
                        dic[day][str(hour)] = None
            except IndexError:
                # Ran out of scraped entries.
                pass
        return dic

    @_verbose(message = 'collecting popularity times...')
    def collect_popular_times(self, rest_data):
        """Store ``{week_day: {hour: popularity}}`` under ``'Popular_times'`` (None on failure)."""
        popularity = 'O9Q0Ff-NmME3c-Utye1-ZMv3u'
        try:
            # Was `x.driver`, i.e. whatever scraper the notebook global `x` held.
            popular_times = [
                element.get_attribute('aria-label')
                for element in self.driver.find_elements(By.CLASS_NAME, popularity)
            ]
            rest_data['Popular_times'] = self._modify_popular_times(popular_times)
        except Exception:
            rest_data['Popular_times'] = None

    @_verbose('clearing results...')
    def clear_search_results(self):
        """Click the button that clears the search box."""
        clear_search = 'gsst_a'
        self.driver.find_element(By.CLASS_NAME, clear_search).click()

    @_verbose('choosing best match from search results...')
    def choose_from_search_results(self, restaurant):
        """Click the search result that best matches ``restaurant``.

        Result aria-labels are fuzzy-matched with ``token_set_ratio``; the best
        one is clicked when its score is above 80.  Makes at most three
        attempts, refreshing the page when neither results nor a "not found"
        message can be read.

        Returns:
            ``'found'`` when a result was clicked, otherwise ``'not found'``.
        """
        results = 'a4gq8e-aVTXAb-haAclf-jRmmHf-hSRGPd'
        message = 'f4O7db-bSF9Gf-LaJeF-title'

        # Default so the method always returns a value, even if every attempt fails.
        status = 'not found'
        # Bounded loop: the old counter could skip past its stop value and spin forever.
        for _ in range(3):
            try:
                print("Trying to get search results...")
                results_elements = self.driver.find_elements(By.CLASS_NAME, results)
                results_text = [el.get_attribute('aria-label') for el in results_elements]
                processed = process.extract(restaurant, results_text, scorer=fuzz.token_set_ratio)
                best_label, best_score = processed[0][0], processed[0][1]
                if best_score > 80:
                    print(best_label)
                    results_elements[results_text.index(best_label)].click()
                    status = 'found'
                else:
                    status = 'not found'
                break
            except Exception:
                print(':( Message with error results?')
                try:
                    not_found_message = self.driver.find_element(By.CLASS_NAME, message).text
                    print(not_found_message)
                    status = 'not found'
                except Exception:
                    print(':( Something wrong, i"ll beter refresh')
                    self.driver.refresh()
        return status

    def get_page_type(self):
        """Classify the current page from the aria-labels of its layout elements.

        Returns:
            ``'direction'``, ``'search_results'``, ``'page'`` or
            ``'something_wrong'`` (also when no layout element shows up
            within 5 seconds).
        """
        from selenium.common.exceptions import TimeoutException

        layout = 'siAUzd-neVct'
        try:
            WebDriverWait(self.driver, timeout=5).until(
                lambda d: d.find_element(By.CLASS_NAME, layout)
            )
        except TimeoutException:
            return 'something_wrong'

        layout_arias = [
            aria
            for aria in (
                element.get_attribute('aria-label')
                for element in self.driver.find_elements(By.CLASS_NAME, layout)
            )
            if aria
        ]
        if any('route' in aria for aria in layout_arias):
            return 'direction'
        if any('filters' in aria for aria in layout_arias):
            return 'search_results'
        if any('reviews' in aria for aria in layout_arias):
            return 'page'
        return 'something_wrong'

    def prepare_driver(self):
        """Start Chrome, open Google Maps, accept cookies and change the language."""
        self.start_driver()
        self.open_googlemaps()
        self.accept_cookies()
        time.sleep(2)
        self.change_language()
        time.sleep(2)
        clear_output()

    @_verbose('saving restaurants data...')
    def save_collected_data(self, folder=""):
        """Write ``self.restaurants_data`` to ``{folder}restaurants_data_google_{city}.json``."""
        with open(f'{folder}restaurants_data_google_{self.city}.json', 'w') as outfile:
            json.dump(self.restaurants_data, outfile)
        clear_output()
        print('Done')

    @_verbose('loading restaurants data...')
    def load_collected_data(self, folder=""):
        """Read ``self.restaurants_data`` from ``{folder}restaurants_data_google_{city}.json``."""
        with open(f'{folder}restaurants_data_google_{self.city}.json') as inputfile:
            self.restaurants_data = json.load(inputfile)
        clear_output()
        print('Done')

    def search_and_get(self):
        """Search every not-yet-collected restaurant and print the resulting page type.

        Requires a driver that is already started (see ``prepare_driver``).
        """
        for num, rest in enumerate(self.restaurant_names):
            if self._is_already_collected(rest):
                continue
            self.search_restaurant(f'{rest} {self.restaurant_adres[num]} {self.city}')
            time.sleep(10)
            # Raises when the results layout is missing.
            self.driver.find_element(By.CLASS_NAME, 'section-layout')
            page_type = self.get_page_type()
            print(page_type)

    def collect_data(self):
        """Scrape every restaurant that is not collected yet.

        Names already present in the stored data (case-insensitive) or
        containing ``'apart'`` are appended to ``self.already_collected`` and
        skipped.  Names that land on a directions page or an unknown page go
        to ``self.errors``.  Data is saved every 10 processed restaurants and
        once more when the run ends, including when it ends with an exception;
        the browser is shut down afterwards.
        """
        start = end = time.time()
        iteration = 0
        total = len(self.restaurant_names)
        try:
            self.prepare_driver()
            end = time.time()
            for num, rest in enumerate(self.restaurant_names):
                print(f'duration of driver life is {int((end-start)/60)} mins, {int((end-start)%60)} seconds')
                print(f'Scrapping {num} from {total} restaurants names')
                if self._is_already_collected(rest) or 'apart' in rest.lower():
                    self.already_collected.append(rest)
                    continue

                print(rest)
                self.search_restaurant(f'{rest} {self.restaurant_adres[num]} {self.city}')
                time.sleep(6)
                page_type = self.get_page_type()
                if page_type == 'search_results':
                    status = self.choose_from_search_results(rest)
                    print(status)
                    if status != 'not found':
                        self._collect_restaurant(rest)
                elif page_type == 'page':
                    self._collect_restaurant(rest)
                elif page_type == 'direction':
                    self.open_googlemaps()
                    self.errors.append(rest)
                else:
                    self.errors.append(rest)

                clear_output()
                end = time.time()
                iteration += 1
                # Restarting driver to avoid 'Oh snap chrome error'.
                # Not the prettiest solution, but enough for the purpose.
                # Good deeper explanation of the problem here: https://stackoverflow.com/questions/59147010/how-to-address-chrome-displaying-aw-snap-page-while-executing-tests-through
                if (end - start) > 30*60:
                    self._quit_driver()
                    start = time.time()
                    self.prepare_driver()
                if iteration % 10 == 0:
                    self.save_collected_data(self.folder)
        finally:
            # Always persist what was gathered and release the browser.
            try:
                self.save_collected_data(self.folder)
            finally:
                self._quit_driver()
                
    


In [9]:
# Scraping inputs: the city, plus index-aligned lists of restaurant names and
# street addresses (each name is searched together with its address and the city).
city = 'Krakow'
names = ['Sekta', 'Busz', 'Nic Nowego']
adresses = ['Plac Nowy 3', 'Mikolajska 13', 'Berka Joselewicza']

In [10]:
# Build the scraper: 'data' is the folder holding the JSON results file and
# show=True keeps the Chrome window visible (no --headless flag is added).
x = GoggleMapsScrapper(city, names, adresses, 'data', show=True)

loading restaurants data...


In [12]:
# Start the browser and scrape every restaurant that is not stored yet.
x.collect_data()

Done


In [13]:
# Display the collected records (a list with one dict per restaurant).
x.restaurants_data

[{'Trip_Advisor_name': 'Sekta',
  'Name': 'Sekta',
  'Category': 'Bar',
  'N_reviews': '112 reviews',
  'Details': ['Serves dine-in',
   'No delivery',
   'No takeaway',
   'Serves great cocktails',
   'Serves alcohol',
   'Serves beer',
   'Serves cocktails',
   'Has dancing',
   'Serves happy-hour drinks',
   'Serves spirits',
   'Serves wine',
   'Has seating',
   'Has gender-neutral toilets',
   'Has toilets',
   'Free Wi-Fi',
   'Casual',
   'Cosy',
   'Good for groups',
   'LGBTQ+ friendly',
   'Transgender safe space',
   'Credit cards'],
  'Ratings': {'5 stars': '94',
   '4 stars': '10',
   '3 stars': '5',
   '2 stars': '1',
   '1 stars': '2'},
  'Open_hours': {'Wednesday': '8PM',
   'Thursday': '8PM',
   'Friday': '8PM',
   'Saturday': '8PM',
   'Sunday': '8PM',
   'Monday': '8PM',
   'Tuesday': '8PM'},
  'Close_hours': {'Wednesday': '3:30AM',
   'Thursday': '5AM',
   'Friday': '6AM',
   'Saturday': '6AM',
   'Sunday': '3:30AM',
   'Monday': '2:30AM',
   'Tuesday': '2:30AM'},
