In [105]:
from bs4 import BeautifulSoup
from selenium import webdriver
import pyautogui
import re
from bs4 import BeautifulSoup
import sys
sys.path.append('..')
import constants.constants as const
import constants.file_handler_constants as fh
from constants.attraction_constants import *

from packages.attraction.Attraction import *
from packages.file_handler_package.file_handler import *

import os
import glob
import time
import pandas as pd
import numpy as np
from dotenv import load_dotenv, dotenv_values 

from selenium.webdriver import Remote, ChromeOptions
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.actions.wheel_input import ScrollOrigin
from selenium.webdriver import ActionChains

from seleniumwire import webdriver
from selenium.webdriver.edge.service import Service
from webdriver_manager.microsoft import EdgeChromiumDriverManager
from selenium.webdriver.edge.options import Options


from selenium.webdriver.support.ui import Select
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

In [106]:
def create_attraction_df(attraction: Attraction) -> pd.DataFrame:
    """Flatten a single ``Attraction`` into a one-row DataFrame.

    Every column is filled by calling the getter named ``get_<column>`` on
    the attraction itself, on ``attraction.get_location()`` or on
    ``attraction.get_rating()``.

    Args:
        attraction: object exposing the getters listed below.

    Returns:
        A new DataFrame with exactly one row and one column per field.
    """
    # fields read straight from the attraction
    own_fields = (
        'name',
        'description',
        'latitude',
        'longitude',
        'imgPath',
        'phone',
        'website',
        'openingHour',
    )
    # fields read from attraction.get_location()
    location_fields = (
        'address',
        'province',
        'district',
        'subDistrict',
        'province_code',
        'district_code',
        'sub_district_code',
    )
    # fields read from attraction.get_rating()
    rating_fields = (
        'score',
        'ratingCount',
    )

    def read_cell(source, field):
        # one-element list so that pandas builds a single-row frame
        return [getattr(source, 'get_' + field)()]

    row = {}
    for field in own_fields:
        row[field] = read_cell(attraction, field)
    for field in location_fields:
        row[field] = read_cell(attraction.get_location(), field)
    for field in rating_fields:
        row[field] = read_cell(attraction.get_rating(), field)

    return pd.DataFrame(row).copy()

In [107]:
def scrape_img(attraction_page_driver: webdriver) -> list[str]:
    """Open the attraction's image popup and prepare a driver for scraping it.

    The function clicks the "show images" button on the already-loaded
    attraction page (trying each known XPath in turn), records the URL the
    page navigates to, then starts a separate proxied Edge driver intended
    for scraping that URL.

    NOTE: the image extraction itself is not implemented yet, so when a
    gallery link is found the returned list is currently always empty.

    Args:
        attraction_page_driver: driver already showing the attraction page.

    Returns:
        ``['']`` when no image section link could be opened; otherwise the
        list of collected image URLs (empty for now).

    Raises:
        KeyError: if one of the ``proxy_*`` environment variables is missing.
    """
    res_img = []

    possible_click_img_xpath = [
        '//*[@id="AR_ABOUT"]/div[2]/div/div/div/div/div[1]/div/div/div/div[1]/div/div[7]/button',
        '//*[@id="AR_ABOUT"]/div/div/div/div/div/div[1]/div/div/div/div[1]/div/div[7]/button'
    ]

    # find the button that opens the image popup and remember the URL of the
    # popup page so a fresh driver can load it later
    link_to_img_section = ""
    for cur_xpath in possible_click_img_xpath:
        try:
            # the wait returns the located element once it is visible
            click_img_btn = WebDriverWait(attraction_page_driver, 1).until(
                EC.visibility_of_element_located((By.XPATH, cur_xpath))
            )

            # Capture the URL BEFORE clicking. Reading current_url after the
            # click may already yield the new URL, in which case url_changes()
            # would never fire and the link would be lost to a timeout.
            url_before_click = attraction_page_driver.current_url
            click_img_btn.click()

            # wait for the navigation to the "all reviewed images" page
            WebDriverWait(attraction_page_driver, 2).until(EC.url_changes(url_before_click))
            link_to_img_section = attraction_page_driver.current_url
            break
        except Exception:
            # best effort: this layout variant did not match, try the next one
            continue

    if not link_to_img_section:
        return ['']

    # formulate the proxy url with authentication
    proxy_url = f"http://{os.environ['proxy_username']}:{os.environ['proxy_password']}@{os.environ['proxy_address']}:{os.environ['proxy_port']}"

    # set selenium-wire options to use the proxy
    seleniumwire_options = {
        "proxy": {
            "http": proxy_url,
            "https": proxy_url
        },
    }

    # Edge options: maximized window, image downloads disabled (headless is off)
    options = Options()
    options.add_argument("start-maximized")
    options.add_experimental_option(
        "prefs", {"profile.managed_default_content_settings.images": 2}
    )

    # initialize the Edge driver with service, selenium-wire options and Edge options
    img_driver = webdriver.Edge(
        service=Service(EdgeChromiumDriverManager().install()),
        seleniumwire_options=seleniumwire_options,
        options=options
    )
    try:
        # TODO: load `link_to_img_section` with `img_driver` and append the
        # image URLs found there to `res_img`.
        pass
    finally:
        # always release the browser, even once scraping code is added above
        img_driver.quit()

    return res_img
       

In [108]:
def scrape_location(attraction_page_driver: webdriver, attraction: Attraction, province_th: str):
    """Placeholder for location scraping; not implemented yet.

    Currently ignores all of its arguments, performs no work and returns
    ``None``.
    """
    return None

In [109]:
def scrape_single_attraction(link_to_attraction: str, province_th: str, max_retry=None) -> Attraction:
    """Open one attraction page through the proxy and scrape its details.

    A fresh proxied Edge driver is started for every attempt; if the page
    does not show the expected common component (class ``IDaDx``) in time,
    the driver is closed and the attempt is repeated.

    Args:
        link_to_attraction: URL of the attraction page.
        province_th: Thai province name (currently unused by this function).
        max_retry: maximum number of failed page loads before giving up.
            ``None`` (default) retries indefinitely, as before.

    Returns:
        The ``Attraction`` created for this page. It is also returned (in its
        default state) when ``max_retry`` is exhausted.

    Raises:
        KeyError: if one of the ``proxy_*`` environment variables is missing.
    """
    name_xpath = '//*[@id="lithium-root"]/main/div[1]/div[2]/div[1]/header/div[3]/div[1]/div/h1'
    score_xpath = '//*[@id="lithium-root"]/main/div[1]/div[2]/div[2]/div[2]/div/div[1]/section[1]/div/div/div/div/div[1]/div[1]/a/div'

    attraction = Attraction()
    cnt_retry = 0

    # loop-invariant setup: proxy url with authentication, selenium-wire
    # proxy options and the path of the Edge driver binary
    proxy_url = f"http://{os.environ['proxy_username']}:{os.environ['proxy_password']}@{os.environ['proxy_address']}:{os.environ['proxy_port']}"
    seleniumwire_options = {
        "proxy": {
            "http": proxy_url,
            "https": proxy_url
        },
    }
    driver_path = EdgeChromiumDriverManager().install()

    while True:
        if max_retry is not None and cnt_retry >= max_retry:
            print("max retry for scrape single attraction ...")
            break

        # Edge options: maximized window, image downloads disabled (headless is off)
        options = Options()
        options.add_argument("start-maximized")
        options.add_experimental_option(
            "prefs", {"profile.managed_default_content_settings.images": 2}
        )

        attraction_page_driver = webdriver.Edge(
            service=Service(driver_path),
            seleniumwire_options=seleniumwire_options,
            options=options
        )

        # try/finally guarantees the browser is closed on every path out of
        # this attempt (retry, success, or an unexpected error further down)
        try:
            # retry in case of web restrictions and some elements not loaded
            try:
                print("scrape single attraction...")
                print("for attraction : ", link_to_attraction)
                attraction_page_driver.get(link_to_attraction)

                print("debug scrape_single_attraction: common component section")
                WebDriverWait(attraction_page_driver, 2).until(EC.visibility_of_element_located((By.CLASS_NAME, 'IDaDx')))
            except Exception:
                cnt_retry += 1
                print("retry single attraction...")
                continue

            # find name
            name = ""
            try:
                name_element = WebDriverWait(attraction_page_driver, 1).until(
                    EC.visibility_of_element_located((By.XPATH, name_xpath))
                )
                name = name_element.text
            except Exception:
                print("can't find name")

            print("name -> ", name)

            # find description (not implemented yet)
            description = ""
            print("description -> ", description)

            # find rating: the aria-label is split on spaces; the token that
            # follows "คะแนน" is the score, the one after "รีวิว" the count
            rating = 0
            ratingCount = 0
            try:
                score_element = WebDriverWait(attraction_page_driver, 1).until(
                    EC.visibility_of_element_located((By.XPATH, score_xpath))
                )
                score_text_list = score_element.get_attribute('aria-label').split(' ')
                for prev_token, cur_token in zip(score_text_list, score_text_list[1:]):
                    if prev_token == "คะแนน":
                        rating = float(cur_token)
                    elif prev_token == "รีวิว":
                        ratingCount = float(cur_token.replace(',', ''))
            except Exception:
                print("can't find rating and ratingCount")

            print("rating --> ", rating)
            print("ratingCount --> ", ratingCount)

            # scrape img_path
            img_path = scrape_img(attraction_page_driver)
            print("cur img path -> ", img_path)

            # NOTE(review): name / description / rating / ratingCount /
            # img_path are only printed; they are not stored on `attraction`
            # because its setter API is not visible here. TODO: populate
            # `attraction` before returning.
            break
        finally:
            attraction_page_driver.quit()

    # previously the function fell off the end and returned None, which broke
    # every caller that expects an Attraction
    return attraction

In [110]:
def get_all_url_by_page(query_url: str, max_retry=None) -> list[str]:
    """Collect the URLs of all attraction cards shown on one listing page.

    A fresh proxied Edge driver is started for every attempt; when the
    listing cards (class ``XJlaI``) do not become visible in time, the driver
    is closed and the page is requested again. Cards whose label text (class
    ``BKifx``) contains a non-attraction keyword (tours, spas, classes, ...)
    are skipped.

    Args:
        query_url: URL of the listing page.
        max_retry: maximum number of failed page loads before giving up.
            ``None`` (default) retries indefinitely, as before.

    Returns:
        List of attraction page URLs; empty if ``max_retry`` was exhausted.

    Raises:
        KeyError: if one of the ``proxy_*`` environment variables is missing.
        Exception: any Selenium error raised while reading an individual card
            is propagated (after the driver has been closed).
    """
    res_url_by_page = []
    cnt_retry = 0

    # card labels containing one of these words are not attractions
    not_attraction_keyword = ['ทัวร์', "สปา", "กิจกรรมทางวัฒนธรรม", 'ชั้นเรียน', 'รถรับส่ง', 'อุปกรณ์ให้เช่า', 'ร้านขายของ']

    # loop-invariant setup: proxy url with authentication, selenium-wire
    # proxy options and the path of the Edge driver binary
    proxy_url = f"http://{os.environ['proxy_username']}:{os.environ['proxy_password']}@{os.environ['proxy_address']}:{os.environ['proxy_port']}"
    seleniumwire_options = {
        "proxy": {
            "http": proxy_url,
            "https": proxy_url
        },
    }
    driver_path = EdgeChromiumDriverManager().install()

    while True:
        if max_retry is not None and cnt_retry >= max_retry:
            print("max retry for scrape data by page ...")
            break

        # Edge options: maximized window, image downloads disabled (headless is off)
        options = Options()
        options.add_argument("start-maximized")
        options.add_experimental_option(
            "prefs", {"profile.managed_default_content_settings.images": 2}
        )

        driver = webdriver.Edge(
            service=Service(driver_path),
            seleniumwire_options=seleniumwire_options,
            options=options
        )

        # try/finally guarantees the browser is closed on every path out of
        # this attempt, including errors raised while reading the cards
        try:
            # retry in case of web restrictions and some elements not loaded
            try:
                driver.get(query_url)
                # scroll to the bottom so lazily rendered cards get loaded
                driver.execute_script('window.scrollBy(0, document.body.scrollHeight)')

                print("check current page url --> ", driver.current_url)

                # wait for the attraction cards to be present and visible
                print("debug get_all_url_by_page: attraction by one page section")
                WebDriverWait(driver, 1).until(EC.visibility_of_element_located((By.CLASS_NAME, 'XJlaI')))
                all_attractions_card = driver.find_elements(By.CLASS_NAME, 'XJlaI')
            except Exception:
                print("retry find all_restaurants_card ...")
                cnt_retry += 1
                continue

            for cur_attraction_card in all_attractions_card:
                WebDriverWait(driver, 1).until(EC.presence_of_element_located((By.TAG_NAME, 'a')))
                cur_attraction_url = cur_attraction_card.find_element(By.TAG_NAME, 'a').get_attribute('href')

                WebDriverWait(driver, 1).until(EC.visibility_of_element_located((By.CLASS_NAME, 'BKifx')))
                check_text = cur_attraction_card.find_element(By.CLASS_NAME, 'BKifx').text

                # skip cards that advertise something other than an attraction
                if any(cur_check_word in check_text for cur_check_word in not_attraction_keyword):
                    continue

                print("prn : ", cur_attraction_url)
                res_url_by_page.append(cur_attraction_url)

            break
        finally:
            driver.quit()

    return res_url_by_page

In [111]:
def scrape_attraction_by_province(province_url: str, province: str) -> pd.DataFrame:
    """Scrape every attraction listed on a province's first listing page.

    NOTE: only one listing page (``province_url`` itself) is processed;
    pagination is not implemented.

    Failures are reported on stdout instead of being silently discarded: if
    the listing page cannot be read the result is empty, and an attraction
    that fails to scrape is skipped without aborting the remaining ones.

    Args:
        province_url: URL of the province's attraction listing page.
        province: Thai province name, forwarded to ``scrape_single_attraction``.

    Returns:
        DataFrame with one row per successfully scraped attraction and the
        columns produced by ``create_attraction_df`` (possibly zero rows).
    """
    # A row built from a default Attraction fixes the column set even when
    # nothing is scraped; it is sliced off again before returning.
    seed_df = create_attraction_df(Attraction())
    scraped_frames = []

    page_no = 1
    print("scraping attraction | province --> %s | page --> %s" % (province, page_no))

    try:
        # get url of all attractions in current page
        all_url_by_page = get_all_url_by_page(query_url=province_url)
    except Exception as e:
        print("can't get attraction urls | province --> %s | error --> %r" % (province, e))
        all_url_by_page = []

    for cur_attraction_url in all_url_by_page:
        try:
            # continue scraping data for a specific attraction
            cur_attraction = scrape_single_attraction(
                link_to_attraction = cur_attraction_url,
                province_th = province
            )

            # data frame representing the data scraped from the current attraction
            scraped_frames.append(create_attraction_df(attraction=cur_attraction))
        except Exception as e:
            # keep going: one broken attraction must not drop the rest
            print("skip attraction --> %s | error --> %r" % (cur_attraction_url, e))

    # single concat instead of growing the frame inside the loop
    return pd.concat([seed_df] + scraped_frames).iloc[1:, :].copy()

In [112]:
# make sure the output directory 'res_attraction_scraping' exists
createDirectory(fh.STORE_ATTRACTION_SCRAPING, 'res_attraction_scraping')

# *** choose ONE province from 'ALL_PROVINCE_TRIPADVISOR_DATA'
# *** change "Idx_of_region" each time another province is scraped
Idx_of_region = 0
cur_region_data = ALL_PROVINCE_TRIPADVISOR_DATA[Idx_of_region]

# each entry holds: english name, thai name, listing url
cur_province_en = cur_region_data[0]
cur_province_th = cur_region_data[1]
cur_province_url = cur_region_data[2]

# scrape all attractions of the province, drop attractions with a duplicated
# name, then index the result by name
cur_res_allAttractions_df = (
    scrape_attraction_by_province(
        province_url=cur_province_url,
        province=cur_province_th,
    )
    .drop_duplicates(subset=['name'])
    .set_index(['name'])
)

# write the result dataframe to a per-province .csv file
res_file_name = 'res_attraction_%s.csv' % (cur_province_en)
res_path = os.path.join(
    fh.STORE_ATTRACTION_SCRAPING,
    'res_attraction_scraping',
    res_file_name,
)
cur_res_allAttractions_df.to_csv(res_path, encoding="utf-8")

Directory res_attraction_scraping created successfully
scraping attraction | province --> สุรินทร์ | page --> 1
check current page url -->  https://th.tripadvisor.com/Attractions-g2099297-Activities-c47-Surin_Province.html
debug get_all_url_by_page: attraction by one page section
prn :  https://th.tripadvisor.com/Attraction_Review-g303923-d4322926-Reviews-Surin_National_Museum-Surin_Surin_Province.html
prn :  https://th.tripadvisor.com/Attraction_Review-g303923-d4322931-Reviews-Ban_Tha_Sawang_Silk_Weaving_Village-Surin_Surin_Province.html
prn :  https://th.tripadvisor.com/Attraction_Review-g303923-d6379888-Reviews-Phanom_Sawai_Forest_Park-Surin_Surin_Province.html
prn :  https://th.tripadvisor.com/Attraction_Review-g303923-d4322930-Reviews-City_Pillar_Shrine-Surin_Surin_Province.html
prn :  https://th.tripadvisor.com/Attraction_Review-g303923-d4322932-Reviews-Wat_Burapharam-Surin_Surin_Province.html
prn :  https://th.tripadvisor.com/Attraction_Review-g2237594-d4322582-Reviews-Prasat_Si