In [None]:
import numpy as np
import pandas as pd
pd.set_option('display.max_columns', None)

import json
import os.path as osp
import os
import platform
import time
import traceback
import logging

from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.common.proxy import Proxy, ProxyType
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.chrome.service import Service
import re

## Class definitions

In [None]:
class BrowserInitializer:
    """Start a Chrome WebDriver session from a local chromedriver binary.

    The chromedriver location is chosen from ``platform.system()``. Once the
    session is up, the browser and chromedriver versions found in the session
    capabilities are printed (with a warning when their major versions
    differ) and the Shopee home page is opened.

    Attributes:
        browser: the ``webdriver.Chrome`` instance created in ``__init__``.
    """

    def __init__(self, no_gui=False, proxy=False):
        """Create the Chrome session.

        Args:
            no_gui: when truthy, Chrome is started with ``--headless``.
            proxy: when truthy, its value is passed as ``--proxy-server=<proxy>``.

        Raises:
            OSError: the operating system is not Windows, Linux or Darwin.
            FileNotFoundError: no chromedriver file exists at the expected path.
        """
        system = platform.system()

        if system == 'Windows':
            print('Detected OS : Windows')
            executable = 'window/chromedriver.exe'
        elif system == 'Linux':
            print('Detected OS : Linux')
            executable = './linux/chromedriver'
        elif system == 'Darwin':
            print('Detected OS : Mac')
            executable = './chromedriver/chromedriver.exe'
        else:
            raise OSError('Unknown OS Type')

        if not osp.exists(executable):
            raise FileNotFoundError('Chromedriver file should be placed at {}'.format(executable))

        if system == 'Linux':
            # Equivalent of `chmod +x`, done in-process and only once the file is
            # known to exist. Kept best-effort: a permission problem is reported
            # but does not abort, Chrome start-up will surface it if it matters.
            try:
                os.chmod(executable, os.stat(executable).st_mode | 0o111)
            except OSError as exc:
                print('Could not mark chromedriver as executable: {}'.format(exc))

        chrome_options = Options()
        chrome_options.add_argument('--no-sandbox')
        chrome_options.add_argument('--disable-dev-shm-usage')
        if no_gui:
            chrome_options.add_argument('--headless')
        if proxy:
            chrome_options.add_argument("--proxy-server={}".format(proxy))
        self.browser = webdriver.Chrome(service=Service(executable), options=chrome_options)

        # Fallback texts are used when the capabilities do not expose a version.
        browser_version = 'Failed to detect version'
        chromedriver_version = 'Failed to detect version'

        capabilities = self.browser.capabilities
        if 'browserVersion' in capabilities:
            browser_version = str(capabilities['browserVersion'])

        if 'chrome' in capabilities and 'chromedriverVersion' in capabilities['chrome']:
            # Only the first space-separated token of the reported value is kept.
            chromedriver_version = str(capabilities['chrome']['chromedriverVersion']).split(' ')[0]

        # Compare only the part before the first dot (the major version).
        major_version_different = (
            browser_version.split('.')[0] != chromedriver_version.split('.')[0]
        )

        print('_________________________________')
        print('Current web-browser version:\t{}'.format(browser_version))
        print('Current chrome-driver version:\t{}'.format(chromedriver_version))
        if major_version_different:
            print('warning: Version different')
            print(
                'Download correct version at "http://chromedriver.chromium.org/downloads" and place in "./chromedriver"')
        print('_________________________________')
        self.browser.get('https://shopee.vn')

In [None]:
class ProductFeatureExtractor:
    """Extract products' information from Shopee product pages.

    The extractor drives a Chrome session (created by ``BrowserInitializer``),
    attempts a login, and loads a JSON schema whose ``properties`` mapping
    supplies the output field names and their default values.
    """

    def __init__(self, no_gui, proxy, schemas_file_path):
        """Browser Initialization
        no_gui: argument to set up a headless browser
        proxy: argument to set up proxy server
        schemas_file_path: file path to product schema"""

        self.browser = BrowserInitializer(no_gui, proxy).browser

        self.browser.get('https://shopee.vn/buyer/login?next=https%3A%2F%2Fshopee.vn%2F')
        time.sleep(2)
        self.login_shopee()
        # NOTE(review): long pause after submitting the form, presumably to
        # leave time for a manual captcha/OTP step. TODO confirm.
        time.sleep(30)
        # Context manager so the schema file handle is closed after loading.
        with open(schemas_file_path, encoding="utf-8") as schema_file:
            self.schema = json.load(schema_file)

    def login_shopee(self):
        """Type the credentials into the login form of the current page and submit it.

        The username and password are read from the ``SHOPEE_USERNAME`` and
        ``SHOPEE_PASSWORD`` environment variables so they never have to be
        written into the notebook. When a variable is unset an empty string is
        typed.
        """
        username = os.environ.get("SHOPEE_USERNAME", "")
        password = os.environ.get("SHOPEE_PASSWORD", "")

        txtUser = self.browser.find_element(By.XPATH, "//input[@type='text']")
        txtUser.send_keys(username)

        txtPassword = self.browser.find_element(By.XPATH, "//input[@type='password']")
        txtPassword.send_keys(password)

        time.sleep(5)

        txtPassword.send_keys(Keys.ENTER)
        time.sleep(3)

    @staticmethod
    def _quoted_value_after(text, key, occurrence=1):
        """Return the first double-quoted string following the ``occurrence``-th ``"key":`` marker in ``text``.

        Raises IndexError when the marker (or a quoted value after it) is missing.
        """
        return text.split('"{}":'.format(key))[occurrence].split('"')[1]

    def extract_product_feature(self, product_url):
        """Method to extract all product features from a product link
        Input:
        product_url: link to the product page
        Output: A dictionary keyed by the schema's property names (plus
        ``shop_rating`` and ``shop_num_rating``) holding the values scraped
        from the page; fields that are not found keep the schema default.
        Raises:
        Exception: ``product_url`` is not a string.
        IndexError / AttributeError / TypeError may propagate when the page
        lacks an element that is looked up without a fallback."""

        result = {key: val['default'] for key, val in self.schema['properties'].items()}

        if not isinstance(product_url, str):
            raise Exception("Invalid Product URL. Must be string type")

        self.browser.get(product_url)
        time.sleep(3)

        self.browser.execute_script("window.scrollTo(0, 500);")

        # Walk down the page in 300px steps, pausing 1s each time, presumably
        # so lazily loaded sections are present in page_source. TODO confirm.
        total_height = int(self.browser.execute_script("return document.body.scrollHeight"))
        for offset in range(1, total_height, 300):
            self.browser.execute_script("window.scrollTo(0, {});".format(offset))
            time.sleep(1)

        soup = BeautifulSoup(self.browser.page_source, "html.parser")
        html_json_application = soup.find_all("script", type="application/ld+json")
        # The second ld+json script is used for the product-level fields below and
        # the third for the category trail. NOTE(review): presumably the Product
        # and BreadcrumbList blocks respectively; verify against a live page.
        product_ld = html_json_application[1].text

        # ** PRODUCT INFORMATION **
        # extract product url
        result["product_url"] = product_url

        # product name
        result["name"] = self._quoted_value_after(product_ld, 'name')

        # brand name (an empty string is stored as NaN)
        brand_tag = self._quoted_value_after(product_ld, 'brand')
        result["brand"] = brand_tag if brand_tag else np.nan

        # extract industry: first and last "name" chunks are skipped
        industry_tag = html_json_application[2].text.split('"name":')[1:-1]
        industry_tag = [chunk.split('"')[1] for chunk in industry_tag]
        result["industry"] = industry_tag[1]
        result["product_type_lv1"] = industry_tag[2]
        if len(industry_tag) < 4:
            result["product_type_lv2"] = np.nan
        else:
            result["product_type_lv2"] = industry_tag[3]

        # extract description
        result["description"] = self._quoted_value_after(product_ld, 'description')

        # extract number of image in description: counts the "fS94mY" divs that
        # contain an <img>; relies on the schema default being a number.
        try:
            for tag in soup.find_all("div", class_="fS94mY"):
                if tag.find("img"):
                    result['img_description'] += 1
        except Exception:
            result['img_description'] = 0

        # extract product id
        result["product_id"] = self._quoted_value_after(product_ld, 'productID')

        # extract num of variation (color, size, etc.)
        # NOTE(review): the second query matches on a single class, so buttons
        # found by the first query may be counted again; confirm intent.
        var_1 = soup.find_all("button", class_="hUWqqt _69cHHm")
        var_2 = soup.find_all("button", class_="hUWqqt")
        num_var = len(var_1) + len(var_2)
        result["num_variation"] = num_var if num_var != 0 else 1

        # extract number of sold
        result["num_sold"] = soup.find_all("div", class_="e9sAa2")[0].text

        # extract number of stock (first whitespace-separated token of the label)
        stock_tag = soup.find_all("div", string=re.compile("sản phẩm có sẵn|products available"))
        result["num_in_stock"] = stock_tag[0].text.split()[0]

        # extract shopee mall or not
        header_mall = soup.find_all("a", class_="ofs-header__page-name")
        result["is_mall"] = 1 if len(header_mall) != 0 else 0

        # extract insurance information
        for section in soup.find_all("section", class_="flex rY0UiC"):
            insurance_label = section.find("div", class_="flex items-center")
            if insurance_label is not None:
                result["insurance"] = section.find("div", class_=None).text

        # price: a single "price" when present, otherwise the low/high pair
        try:
            price_tag = self._quoted_value_after(product_ld, 'price')
            result["price"] = price_tag
            result["price_min"] = price_tag
            result["price_max"] = price_tag
        except Exception:
            result['price_max'] = self._quoted_value_after(product_ld, 'highPrice')
            result['price_min'] = self._quoted_value_after(product_ld, 'lowPrice')
            result['price'] = result['price_min']

        # extract product image - maximize 15 image
        next_button_xpath = "//button[@class='shopee-icon-button LFMWYe _41JS8N']"
        result_img = [tag['src'] for tag in soup.find_all(class_="_7D4JtJ")]
        if self.browser.find_elements(By.XPATH, next_button_xpath):
            for _ in range(3):
                # advance the carousel five times, then re-read the thumbnails
                for _ in range(5):
                    self.browser.find_element(By.XPATH, next_button_xpath).click()
                    time.sleep(1)
                # `soup` is refreshed here and reused by the shop lookups below
                soup = BeautifulSoup(self.browser.page_source, "html.parser")
                result_img.extend(tag['src'] for tag in soup.find_all(class_="_7D4JtJ"))
        result_img = tuple(set(result_img))
        # NOTE(review): despite the field name this stores the stringified tuple
        # of distinct image URLs, not a count; confirm what consumers expect.
        result["num_img"] = str(result_img)

        # Split on the aggregateRating marker: chunk 1 is read as the shop rating
        # and chunk 2 as the product rating. Start from an empty list so the
        # lookups below fail with IndexError (-> NaN) if the split itself fails.
        rating = []
        try:
            rating = product_ld.split('"aggregateRating":{"@type":"AggregateRating"')
            result['average_rating'] = self._quoted_value_after(rating[2], 'ratingValue')
        except Exception:
            result['average_rating'] = np.nan

        # product num rating
        try:
            result['num_rating'] = self._quoted_value_after(rating[2], 'ratingCount')
        except Exception:
            result['num_rating'] = np.nan

        # shop rating
        try:
            result['shop_rating'] = self._quoted_value_after(rating[1], 'ratingValue')
        except Exception:
            result['shop_rating'] = np.nan

        # shop num rating
        try:
            result['shop_num_rating'] = self._quoted_value_after(rating[1], 'ratingCount')
        except Exception:
            result['shop_num_rating'] = np.nan

        # extract shop url
        shop_link_button = soup.find_all("div", class_="Uwka-w")
        shop_link_tail = shop_link_button[0].find("a")["href"]
        result["shop_url"] = f"https://shopee.vn{shop_link_tail}"

        # extract shop name
        result["shop_name"] = soup.find("div", class_="VlDReK").text

        # extract shop id: last path segment of the second "url" value
        result['shop_id'] = self._quoted_value_after(product_ld, 'url', occurrence=2).split('/')[-1]

        # extract shop follower
        shop_info = soup.find_all("span", class_="Xkm22X")
        result["shop_follower"] = shop_info[0].text.split()[0]

        # extract shop response rate
        result["shop_response_rate"] = shop_info[1].text.split()[0]

        return result

    def crawl_multiple_products(self, product_link_df, save_path, max_prod):
        """Method to crawl multiple products from a DataFrame of product links
        product_link_df: DataFrame that stores products' links (column
        ``product_url``) and names (column ``name``)
        save_path: file path to save output
        max_prod: maximum number of products to crawl

        Output: a dataframe and a csv file in which each row is a product with its product features.
        Products whose extraction raises are logged and skipped; a keyboard
        interrupt stops the loop and whatever was collected is still saved."""

        start_time = time.time()
        # Positional slice: at most `max_prod` rows whatever the index labels are.
        df = product_link_df.iloc[:max_prod]

        # Collect rows in a list and build the frame once; DataFrame.append no
        # longer exists in pandas 2.x and was quadratic anyway.
        records = []
        for url, name in zip(df["product_url"], df["name"]):
            try:
                print("Crawling features for product {0}".format(name))
                records.append(self.extract_product_feature(url))
                print(f"Execution time: {time.time() - start_time}")
            except KeyboardInterrupt:  # in case you lose your patience
                print("Keyboard Interrputed")
                break
            except Exception:
                logging.error(traceback.format_exc())

        # Schema fields first (in schema order), then any extra keys the
        # extractor produced, in the order they first appeared.
        schema_columns = list(self.schema['properties'].keys())
        output_df = pd.DataFrame(records)
        extra_columns = [col for col in output_df.columns if col not in schema_columns]
        output_df = output_df.reindex(columns=schema_columns + extra_columns)

        output_df.to_csv(save_path)
        return output_df

In [None]:
# Field table for the product schema: (field name, type label, default value).
# Order matters: it becomes the column order of the crawler's output frame.
# `shop_rating` / `shop_num_rating` are listed last because the extractor
# fills them in but they were previously absent from the schema.
_PRODUCT_FIELD_SPECS = [
    ("product_id", "str", np.nan),
    ("product_url", "str", np.nan),
    ("name", "str", np.nan),
    ("brand", "str", np.nan),
    ("industry", "str", np.nan),
    ("product_type_lv1", "str", np.nan),
    ("product_type_lv2", "str", np.nan),
    ("description", "str", np.nan),
    ("num_variation", "int", 0),
    ("num_sold", "float", np.nan),
    ("num_in_stock", "float", np.nan),
    ("shop_url", "str", np.nan),
    ("is_mall", "bool", 0),
    ("insurance", "str", np.nan),
    ("price", "float", np.nan),
    ("price_min", "float", np.nan),
    ("price_max", "float", np.nan),
    ("average_rating", "float", 0),
    ("num_rating", "float", 0),
    ("img_description", "int", 0),
    ("shop_id", "str", np.nan),
    ("shop_name", "str", np.nan),
    ("shop_follower", "str", np.nan),
    ("shop_response_rate", "str", np.nan),
    ("num_img", "int", 0),
    ("shop_rating", "float", np.nan),
    ("shop_num_rating", "float", np.nan),
]

# Schema consumed by ProductFeatureExtractor: every property carries a type
# label and the default used when the field cannot be scraped.
product_schema = {
    "type": "object",
    "properties": {
        field: {"type": type_label, "default": default}
        for field, type_label, default in _PRODUCT_FIELD_SPECS
    },
}

In [None]:
# Save the schema as JSON. The target folder is created when missing, and the
# path is assembled with osp.join so it resolves on every operating system.
# NOTE(review): the crawler cell loads "schemas/product_schema.json" while this
# cell writes one directory up; confirm both point at the same file.
schema_path = osp.join("..", "schemas", "product_schema.json")
os.makedirs(osp.dirname(schema_path), exist_ok=True)
with open(schema_path, "w", encoding="utf-8") as outfile:
    json.dump(product_schema, outfile)

In [None]:
def read_df_from_dir(directory):
    """Read every CSV / spreadsheet file found directly inside ``directory``.

    The process working directory is left untouched; files are opened through
    their joined path. Entries are processed in sorted name order.

    Args:
        directory: path of the folder to scan.

    Returns:
        A ``(df_list, files)`` pair of equal length. ``files[i]`` is the name of
        the file that produced ``df_list[i]``. A CSV or single-sheet workbook
        yields one DataFrame; a workbook with several sheets yields a list of
        DataFrames (one per sheet). Entries with any other extension are
        reported and left out of both lists. When ``directory`` does not exist
        or is not a directory, a message is printed and ``([], [])`` is returned.
    """
    try:
        entries = sorted(os.listdir(directory))
    except FileNotFoundError:
        print('Invalid directory.')
        return [], []
    except NotADirectoryError:
        print('Input is a file path, not a directory.')
        return [], []

    sheet_type = ('xlsx', 'xls', 'xlsm', 'xlsb', 'odf', 'ods')
    df_list = []
    files = []
    for f in entries:
        path = osp.join(directory, f)
        file_type = osp.splitext(f)[1].lstrip('.').lower()
        if file_type == 'csv':
            df_list.append(pd.read_csv(path))
            files.append(f)
            print(f'Read csv file {f}.')
        elif file_type in sheet_type:
            # sheet_name=None loads all sheets in one pass as {sheet name: DataFrame}.
            sheets = pd.read_excel(path, sheet_name=None)
            if len(sheets) > 1:
                df_list.append(list(sheets.values()))
                print(f'Read {file_type} file {f} which has multiple sheets.')
            else:
                df_list.append(next(iter(sheets.values())))
                print(f'Read {file_type} file {f} which has one sheet.')
            files.append(f)
        else:
            print(f'Non sheet type detected: {f}.')
    return df_list, files

In [None]:
# Build the crawler with a visible browser window and no proxy; the schema is
# loaded from the given JSON file while the object is constructed.
product_crawler = ProductFeatureExtractor(
    no_gui=False,
    proxy=False,
    schemas_file_path="schemas/product_schema.json",
)

In [None]:
# Load the link tables from the category folder, then derive one keyword per
# file: the part of the file name in front of its first underscore.
directory = "product link\\điện tử"
dfs, file_names = read_df_from_dir(directory)
keyword = [file_name.split('_')[0] for file_name in file_names]

keyword

In [None]:
# Crawl each link table in turn (at most 400 products per table) and save the
# extracted features to one CSV per keyword.
for count, (df, kw) in enumerate(zip(dfs, keyword), start=1):
    print("\n")
    print("-----------------------------------------------------------")
    print("{0}. Crawling products for {1}...".format(count, kw))
    product_crawler.crawl_multiple_products(
        product_link_df=df,
        save_path=r"C:\Users\LENOVO\OneDrive - National Economics University\Manh and DSEB\DSLab\Mapping & Evaluate Ecommerce Site\shopee_crawl\product features\{0}.csv".format(kw),
        max_prod=400,
    )