In [None]:
'''
匯入套件
'''

# 操作 browser 的 API
from selenium import webdriver
from selenium.webdriver.chrome.service import Service

# 處理逾時例外的工具
from selenium.common.exceptions import TimeoutException

# Tool for waiting until an element appears on a dynamic page; usually paired with expected_conditions
from selenium.webdriver.support.ui import WebDriverWait

# 搭配 WebDriverWait 使用，對元素狀態的一種期待條件，若條件發生，則等待結束，往下一行執行
from selenium.webdriver.support import expected_conditions as EC

# 期待元素出現要透過什麼方式指定，通常與 EC、WebDriverWait 一起使用
from selenium.webdriver.common.by import By

# 強制等待 (執行期間休息一下)
from time import sleep

# 美麗湯
from bs4 import BeautifulSoup as bs

# 整理 json 使用的工具
import json

#正規表達式
import re

# 發送請求
import requests

# 讀取清單
import csv

# 整理時間格式
from datetime import datetime, timezone

# 執行 command 的時候用的
import os

In [None]:
'''
建立類別
'''

class GoogleMapScraper:
    """Scrape Google Maps reviews for the place names listed in a CSV file.

    A Selenium-driven Chrome session searches each place and reads the
    identifiers embedded in the resulting URL and page source.  The reviews
    are then paged through with plain HTTP requests and appended to one CSV
    file per place (``reviews_output_<location>.csv``).
    """

    def __init__(self):
        """Start Chrome, open Google Maps and set the bookkeeping file names."""
        # Launch the browser.
        self.driver = self._init_driver()
        # Open Google Maps first.
        self.driver.get("https://www.google.com/maps?authuser=0")
        # Give the map time to load.
        sleep(5)
        # CSV log of the places that could not be scraped.
        self.failed_locations = "failed_locations.csv"
        # JSON file holding the most recent pagination token (the "2s" code).
        self.checkpoint_file = "google_2s_checkpoint.json"

    def _init_driver(self):
        """Build and return a Chrome WebDriver configured for scraping."""
        my_options = webdriver.ChromeOptions()
        # my_options.add_argument("--headless")              # run without a visible browser window
        my_options.add_argument("--start-maximized")         # maximize the window
        my_options.add_argument("--incognito")               # incognito mode
        my_options.add_argument("--disable-popup-blocking")  # disable the popup blocker
        my_options.add_argument("--disable-notifications")   # disable Chrome push notifications
        my_options.add_argument("--lang=zh-TW")              # Traditional Chinese UI
        my_options.add_argument("user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36")
        # Additional switches intended to make the session look less automated.
        my_options.add_argument('--disable-blink-features=AutomationControlled')
        my_options.add_argument('--disable-extensions')
        my_options.add_experimental_option('excludeSwitches', ['enable-automation'])
        my_options.add_experimental_option('useAutomationExtension', False)
        # Use the Chrome WebDriver.
        return webdriver.Chrome(options=my_options)

    def log_failed_location(self, location, reason):
        """Append a ``[location, reason]`` row to the failed-locations CSV."""
        with open(self.failed_locations, mode='a', encoding='utf-8', newline='') as file:
            csv.writer(file).writerow([location, reason])

    def search_location(self, location: str):
        """Type *location* into the Maps search box and submit it.

        Returns:
            The browser URL after the search has settled, or ``None`` when
            any step failed (the failure is printed and logged).
        """
        try:
            # Wait for the search box; ``until`` hands back the located
            # element, so no second lookup is needed.
            input_element = WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located((By.CSS_SELECTOR, 'input#searchboxinput'))
            )

            # Replace whatever is in the box with the place name.
            input_element.clear()
            input_element.send_keys(location)

            # Short pause before pressing the search button.
            sleep(2)

            # Submit the search.
            self.driver.find_element(
                By.CSS_SELECTOR, 'button#searchbox-searchbutton'
            ).click()

            # Wait for the URL to change to the one that carries the 1s code.
            sleep(5)

            return self.driver.current_url

        except Exception:
            # ``Exception`` rather than a bare ``except`` so Ctrl-C still
            # interrupts a long scraping run.
            print(f"{location}:搜尋失敗")
            self.log_failed_location(location, "搜尋失敗")
            return None

    @staticmethod
    def _build_1s_code(url):
        """Join the first two ``0x...`` tokens of *url* with ``%3A``.

        Returns ``None`` when the URL holds fewer than two such tokens.
        """
        hex_ids = re.findall(r'0x\w+', url)
        if len(hex_ids) < 2:
            return None
        return f"{hex_ids[0]}%3A{hex_ids[1]}"

    def extract_google_codes(self, url: str):
        """Return ``(google_1s_code, google_kei_code)`` for the current place.

        The 1s code is derived from *url*; the kEI code is read from the first
        ``<script>`` of the current page source that contains ``kEI='...'``.

        Returns:
            ``(None, None)`` when the URL carries no 1s code or an error
            occurs.  The kEI element is ``None`` when no script carries it.
        """
        try:
            google_1s_code = self._build_1s_code(url)
            if google_1s_code is None:
                return None, None

            soup = bs(self.driver.page_source, "lxml")
            # Filter on the exact marker that is split on below; a script that
            # merely mentions "kEI" must not abort the lookup.
            google_kei_code = next(
                (
                    script.text.split("kEI='", 1)[1].split("'", 1)[0]
                    for script in soup.find_all("script")
                    if "kEI='" in script.text
                ),
                None,
            )

            return google_1s_code, google_kei_code
        except Exception:
            print("無法獲得1s碼")
            return None, None

    def save_google_2s_checkpoint(self, google_2s, location):
        """Overwrite the checkpoint file with the given token and location."""
        with open(self.checkpoint_file, 'w', encoding='utf-8') as file:
            json.dump({"google_2s": google_2s, "location": location}, file, ensure_ascii=False)

    def load_google_2s_checkpoint(self):
        """Return the ``(google_2s, location)`` pair stored in the checkpoint.

        Returns ``(None, None)`` when the file is missing, is not valid JSON
        (e.g. truncated by a crash while it was being written) or does not
        hold a JSON object.
        """
        try:
            with open(self.checkpoint_file, 'r', encoding='utf-8') as file:
                data = json.load(file)
        except (FileNotFoundError, json.JSONDecodeError):
            return None, None
        if not isinstance(data, dict):
            return None, None
        return data.get("google_2s"), data.get("location")

    def _request_page(self, url):
        """GET *url* and return the decoded JSON payload.

        Raises:
            requests.RequestException: on HTTP errors or when the request
                takes longer than 30 seconds.
            json.JSONDecodeError: when the body is not valid JSON.
        """
        response = requests.get(url, timeout=30)
        response.raise_for_status()
        raw_content = response.text.strip()

        # Drop the ")]}'" prefix the endpoint may put in front of the JSON.
        if raw_content.startswith(")]}'"):
            raw_content = raw_content[4:]

        return json.loads(raw_content)

    @staticmethod
    def _next_page_token(data):
        """Return the URL-escaped next-page token in ``data[1]``, or ``""``."""
        token = data[1] if isinstance(data, list) and len(data) > 1 else None
        if not isinstance(token, str) or not token:
            return ""
        return token.replace('=', '%3D')

    @staticmethod
    def _dig(node, *path):
        """Follow *path* through nested containers; ``""`` if absent or null."""
        try:
            for key in path:
                node = node[key]
        except (LookupError, TypeError):
            return ""
        return "" if node is None else node

    @classmethod
    def _parse_review(cls, review):
        """Turn one raw review entry into the CSV row written for it.

        Mandatory fields raise (``IndexError``/``TypeError``/...) when missing;
        the comment, language and translated comment fall back to ``""``
        because rating-only reviews do not carry them.
        """
        entry = review[0]
        author = entry[1][4]
        user = author[5][0]
        user_id = author[5][3]
        user_page = author[2][0]
        review_id = str(entry[0])
        rating = str(entry[2][0][0])
        # The raw value is divided by 1,000,000 to obtain epoch seconds.
        timestamp = datetime.fromtimestamp(
            entry[1][2] // 1000000, tz=timezone.utc
        ).strftime('%Y-%m-%d')

        comment = cls._dig(entry, 2, 15, 0, 0)
        language = cls._dig(entry, 2, 14, 0)
        translated_comment = cls._dig(entry, 2, 15, 1, 0)

        return [user, user_id, review_id, rating, timestamp, comment, language, translated_comment, user_page]

    def fetch_reviews(self, google_1s, google_kei, location):
        """Page through all reviews of a place and append them to its CSV.

        Paging resumes from the checkpointed token when the checkpoint belongs
        to *location*; otherwise it starts from the first page.  Each page is
        written before the token of the following page is checkpointed, and
        the final page (the one without a next-page token) is written too.
        A place that yields no review is recorded in the failed-locations log.

        Args:
            google_1s: Place identifier inserted after ``!1s`` in the request.
            google_kei: kEI code inserted into the request.
            location: Place name; used for the output file name and checkpoint.

        Raises:
            requests.RequestException: when a page request fails or times out.
            json.JSONDecodeError: when a response body is not valid JSON.
        """
        # Token and place of the last interrupted run, if any.
        google_2s, last_location = self.load_google_2s_checkpoint()
        # A different place (or no usable token) starts from the first page.
        if last_location != location or not google_2s:
            google_2s = ""
        # Review ids already written during this call.
        unique_ids = set()
        # Number of reviews written.
        total_reviews = 0
        # Number of written reviews that have a text body.
        reviews_with_comments = 0
        # Page counter (for progress output only).
        count = 1
        # Output file for this place.
        output_file = f"reviews_output_{location}.csv"

        while True:
            url = f'https://www.google.com/maps/rpc/listugcposts?authuser=0&hl=zh-TW&gl=tw&pb=!1m6!1s{google_1s}!6m4!4m1!1e1!4m1!1e3!2m2!1i10!2s{google_2s}!5m2!1s{google_kei}!7e81!8m9!2b1!3b1!5b1!7b1!12m4!1b1!2b1!4m1!1e1!11m0!13m1!1e2'
            data = self._request_page(url)

            # ``data`` is a JSON array: index 1 holds the next-page token and
            # index 2 the reviews of this page; either may be absent or null.
            reviews = data[2] if isinstance(data, list) and len(data) > 2 else None
            if not isinstance(reviews, list):
                reviews = []

            if reviews:
                with open(output_file, mode='a', encoding='utf-8', newline='') as file:
                    writer = csv.writer(file)
                    for review in reviews:
                        try:
                            row = self._parse_review(review)
                        except Exception:
                            # Best effort: skip entries with an unexpected shape.
                            print("無法該筆爬取評論")
                            continue

                        review_id, comment = row[2], row[5]
                        # Only write review ids not seen before.
                        if review_id in unique_ids:
                            continue
                        writer.writerow(row)
                        unique_ids.add(review_id)
                        total_reviews += 1
                        if isinstance(comment, str) and comment.strip():
                            reviews_with_comments += 1

            print(f"現在在第 {count} 頁")
            print(f"總共存入 {total_reviews} 筆評論")
            print(f"其中有 {reviews_with_comments} 筆評論有內文")
            count += 1

            google_2s = self._next_page_token(data)
            if not google_2s:
                print("沒有下一頁")
                break

            # Checkpoint only after the page has been written, so a resumed
            # run continues with the first page that was not yet stored.
            self.save_google_2s_checkpoint(google_2s, location)

        if total_reviews == 0:
            self.log_failed_location(location, "沒有任何評論")

    def scrape_from_csv(self, input_file):
        """Scrape every place named in the first column of *input_file*.

        The file is read completely before scraping starts.  It is decoded as
        ``utf-8-sig`` so a leading byte-order mark does not end up in the
        first place name; blank rows and blank names are skipped.
        """
        with open(input_file, mode='r', encoding='utf-8-sig', newline='') as file:
            locations = list(csv.reader(file))

        for row in locations:
            if not row:
                continue
            location = row[0].strip()
            if not location:
                continue

            print(f"開始爬取 {location}")
            url = self.search_location(location)
            if not url:
                # search_location has already printed and logged the failure.
                continue

            google_1s, google_kei = self.extract_google_codes(url)
            if google_1s and google_kei:
                self.fetch_reviews(google_1s, google_kei, location)
                print(f'{location} 已爬取完成')
            else:
                self.log_failed_location(location, "無法獲得1s碼")

    def close_driver(self):
        """Quit the browser session."""
        self.driver.quit()

In [None]:
# Start the browser: constructing the scraper launches Chrome and opens Google Maps
scraper = GoogleMapScraper()

In [None]:
# Run the actual scrape; the only input needed is a CSV file listing the locations
scraper.scrape_from_csv("google_maps_locations.csv")

In [None]:
# Close the browser manually once the scraping run is confirmed finished; keeping it open
# until then makes debugging easier and avoids repeatedly opening and closing the browser
scraper.close_driver()