In [None]:
# !python3 -m pip install -r requirements.txt

[Example Google Style Python Docstrings](https://sphinxcontrib-napoleon.readthedocs.io/en/latest/example_google.html)

In [25]:
import collections
import csv
import logging
import os
import random
import re
import time
from urllib.parse import urlsplit

import bs4
import requests
from fake_headers import Headers
from fake_useragent import UserAgent
from tqdm import tqdm
from webdriver_manager.chrome import ChromeDriverManager

# from selenium import webdriver
from seleniumwire import webdriver
from selenium.webdriver import Keys
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
# from selenium.webdriver.remote.remote_connection import LOGGER
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.chrome.service import Service as ChromeService

# Notebook-wide logging: INFO and above, emitted through the "wb" logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('wb')
# LOGGER.setLevel(logging.WARNING)

# One scraped product card. The field order is the column order of the CSV
# rows, because each ParseResult is written as a plain tuple.
ParseResult = collections.namedtuple(
    "ParseResult",
    (
        "seller",
        "title",
        "url",
        "review_count",
        "current_price",
        "category_0",
        "category_1",
        "category_2",
    ),
)

# CSV header row. Derived from ParseResult so that it always has exactly one
# name per written column (the hand-maintained tuple listed only five).
HEADERS = ParseResult._fields

In [48]:
class Parser():
    """
    Parser is a utility class to crawl raw data from an e-commerce website
    (e.g. WildBerries): item's brand name, URL, price, etc.

    This class uses selenium package in order to automate web browser interaction.

    Example:
        >>> my_parser = Parser("https://www.wildberries.ru/catalog/zhenshchinam/odezhda/bryuki-i-shorty")
        >>> my_parser.run()
        >>> my_parser.save_result()
        >>> my_parser.stop()
    """

    # Number of SPACE key presses sent after loading a page, and the pause
    # (in seconds) after each one.
    _SCROLL_STEPS = 5
    _SCROLL_PAUSE_SEC = .5

    def __init__(self, url: str = None):
        """Stores the page URL; the browser is only created by `_start`/`run`.

        Args:
            url (str, optional): the URL address of the page to parse. May be
                omitted and supplied later via `_load_page(url)`.
        """
        self.driver = None
        self.url = url
        self.result = []

    def _start(self):
        """Creates a headless Chrome WebDriver with a random user agent."""
        # Never keep two browsers alive for one parser.
        if self.driver is not None:
            self.stop()

        user_agent = UserAgent().random
        options = Options()
        options.add_argument(
            "--disable-blink-features=AutomationControlled")
        options.add_argument("--headless")
        options.add_argument(f"user-agent={user_agent}")

        # Only `options` is passed: `chrome_options` is the legacy spelling
        # of the same argument and must not be given alongside it.
        self.driver = webdriver.Chrome(
            service=ChromeService(
                ChromeDriverManager().install()),
            options=options)

    def stop(self):
        """Quits the browser created by `_start`; does nothing if none is running."""
        if self.driver is None:
            return
        try:
            # quit() ends the whole WebDriver session, not just the window.
            self.driver.quit()
        finally:
            self.driver = None

    def _load_page(self, url: str = None):
        """Returns page contents from a given URL.

        Args:
            url (str, optional): the URL address of the page. When given it
                replaces `self.url`; otherwise `self.url` is loaded.

        Returns:
            str: HTML source code of page

        Raises:
            ValueError: if neither `url` nor `self.url` is set.
        """
        if url is not None:
            self.url = url
        if not self.url:
            raise ValueError("No URL to load")

        # Load the URL
        self.driver.get(self.url)

        # Fake user activity to load JavaScript content
        for _ in range(self._SCROLL_STEPS):
            actions = ActionChains(self.driver)
            actions.send_keys(Keys.SPACE)
            actions.perform()
            time.sleep(self._SCROLL_PAUSE_SEC)

        return self.driver.page_source

    def _parse_page(self, page: str):
        """Parse HTML page and process each item's block.

        Args:
            page (str): HTML source code of page
        """
        soup = bs4.BeautifulSoup(page, 'lxml')
        # Find all div-blocks with class "product-card j-card-item"
        container = soup.select("div.product-card.j-card-item")
        if not container:
            logger.error("Container is empty!")
            return

        for block in container:
            self._parse_block(block=block)

    @staticmethod
    def _first_number(node):
        """Returns the first run of digits in `node.text`, or None.

        Whitespace is removed first, so a text such as "1 234" yields "1234".

        Args:
            node: an object with a `.text` attribute, or None.
        """
        if node is None:
            return None
        numbers = re.findall(r'\d+', "".join(node.text.split()))
        return numbers[0] if numbers else None

    def _get_brand_name_block(self, block):
        """Returns the `p.product-card__brand-name` element of a card, or None."""
        brand_block = block.select_one("div.product-card__brand")
        if not brand_block:
            logger.error("no brand_block")
            return None

        brand_name_block = brand_block.select_one("p.product-card__brand-name")
        if not brand_name_block:
            logger.error("no brand_name_block")
            return None
        return brand_name_block

    def _get_seller_name(self, block):
        """Returns the stripped text of `span.brand-name`, or None if absent."""
        brand_name_block = self._get_brand_name_block(block)
        if brand_name_block is None:
            return None

        seller_name = brand_name_block.select_one("span.brand-name")
        if not seller_name:
            logger.error("no brand_name")
            return None
        return seller_name.text.strip()

    def _get_title(self, block):
        """Returns the text of `span.goods-name` without '/' characters, or None."""
        brand_name_block = self._get_brand_name_block(block)
        if brand_name_block is None:
            return None

        title = brand_name_block.select_one("span.goods-name")
        if not title:
            logger.error("no title")
            return None
        return title.text.replace('/', '').strip()

    def _get_url(self, block):
        """Returns the `href` of the card's main link, or None if absent."""
        url_block = block.select_one("a.product-card__main.j-card-link")
        if not url_block:
            logger.error("no url block")
            return None

        item_url = url_block.get("href")
        if not item_url:
            logger.error("no href")
            return None
        return item_url

    def _get_review_count(self, block):
        """Returns the first number in `span.product-card__count`, or None."""
        review_count = self._first_number(
            block.select_one("span.product-card__count"))
        if review_count is None:
            logger.error("no review count")
        return review_count

    def _get_current_price(self, block):
        """Returns the first number of the lower-price element, or None.

        The `ins` element is tried first and the `span` one is the fallback.
        """
        for selector in ("ins.price__lower-price", "span.price__lower-price"):
            current_price = self._first_number(block.select_one(selector))
            if current_price is not None:
                return current_price
        logger.error("no low price found!")
        return None

    def _get_old_price(self, block):
        """Returns the first number inside the `<del>` element, or None if absent."""
        return self._first_number(block.select_one("del"))

    def _get_category(self, level: int = 0):
        """Returns the item's category based on the input level.
        E.g., if the URL is https://www.wildberries.ru/catalog/zhenshchinam/odezhda/futbolki-i-topy,
        it removes the part with `https://www.wildberries.ru/catalog/`
        and processes only the categories tree-like structure:
        `zhenshchinam/odezhda/futbolki-i-topy`.

        In the given example, the 0-level category is `zhenshchinam`,
        the 1-level category is `odezhda`,
        the 2-level category is `futbolki-i-topy`.

        Args:
            level (int, optional): The level of category where 0 is
            the main, parent category. Defaults to 0.

        Returns:
            str: the path segment at the given level, or "Nan" if the URL
            has no category at that level.
        """
        assert level >= 0, "Level must be non-negative"

        # Only the path carries categories; the query string (e.g.
        # `?sort=popular&page=2`) and the fragment (e.g. `#c17539910`) do not.
        path = urlsplit(self.url or "").path
        segments = [segment for segment in path.split("/") if segment]
        # Exclude the first path segment (`catalog`)
        categories = segments[1:]

        if level >= len(categories):
            return "Nan"

        return categories[level]

    def _parse_block(self, block):
        """Extracts all fields of one product card and appends them to `self.result`."""
        item_seller_name = self._get_seller_name(block)
        item_title = self._get_title(block)
        item_url = self._get_url(block)
        item_review_count = self._get_review_count(block)
        item_current_price = self._get_current_price(block)
        # item_old_price = self._get_old_price(block)
        item_category_0 = self._get_category(level=0)
        item_category_1 = self._get_category(level=1)
        # sometimes there is no such category, so we get Nan
        item_category_2 = self._get_category(level=2)

        self.result.append(ParseResult(
            seller=item_seller_name,
            title=item_title,
            url=item_url,
            review_count=item_review_count,
            current_price=item_current_price,
            category_0=item_category_0,
            category_1=item_category_1,
            category_2=item_category_2
        ))

    def save_result(self, path: str = "wildberries.csv"):
        """Appends every collected item to a CSV file, one row per item.

        Args:
            path (str, optional): the output file; created if missing.
                Defaults to "wildberries.csv".
        """
        # newline="" is what the csv module expects of the file object;
        # the explicit encoding keeps non-ASCII titles platform-independent.
        with open(path, "a", newline="", encoding="utf-8") as f:
            writer = csv.writer(f, quoting=csv.QUOTE_MINIMAL)
            # writer.writerow(HEADERS)
            writer.writerows(self.result)

    def run(self):
        """Starts the browser if needed, loads `self.url` and parses the page."""
        if self.driver is None:
            self._start()
        page = self._load_page()
        self._parse_page(page)


Example Links: 
- 1st page: `https://www.wildberries.ru/catalog/zhenshchinam/odezhda/bryuki-i-shorty#c17539910` or `https://www.wildberries.ru/catalog/zhenshchinam/odezhda/bryuki-i-shorty?sort=popular&page=1`
- 2nd page: `https://www.wildberries.ru/catalog/zhenshchinam/odezhda/bryuki-i-shorty?sort=popular&page=2`
- 7th page: `https://www.wildberries.ru/catalog/zhenshchinam/odezhda/bryuki-i-shorty?sort=popular&page=7`

If we send a request for page 101 (`https://www.wildberries.ru/catalog/zhenshchinam/odezhda/bryuki-i-shorty?sort=popular&page=101`), we get a `404` status code, so only the first 100 pages of a category can be parsed on the WB website.

## Launch

In [None]:
# WB_MAX_PAGES = 100 # please do not change this parameter
WB_MAX_PAGES = 1

PAGE_URL = "https://www.wildberries.ru/catalog/zhenshchinam/odezhda/futbolki-i-topy"
for page_num in tqdm(range(1, WB_MAX_PAGES + 1)):
    # create new random user-agent every time
    # to avoid being banned for parsing
    # The page number goes into the URL; without it every iteration would
    # fetch the same first page.
    my_parser = Parser(f"{PAGE_URL}?sort=popular&page={page_num}")
    try:
        my_parser.run()
        my_parser.save_result()
    finally:
        # Each Parser owns its own browser, so release it per iteration
        # (also when loading or parsing fails).
        my_parser.stop()
    time.sleep(random.randint(1, 3))

In [30]:
# page_urls = []
# with open("conf/wildberries-categories-test.txt", "r") as categories_file:
#     category_urls = categories_file.readlines()
#     for category_url in category_urls:
#         category_url = category_url.rstrip()
#         subcategory = os.path.basename(category_url)
#         with open(f"conf/wildberries-subcategories/{subcategory}.txt") as subcategory_file:
#             subcategory_paths = subcategory_file.readlines()
#             for subcategory_path in subcategory_paths:
#                 subcategory_path = subcategory_path.rstrip()
#                 PAGE_URL = "https://www.wildberries.ru" + subcategory_path
#                 logger.info(f"PAGE_URL = {PAGE_URL}") 
#                 page_urls.append(PAGE_URL)

# Category pages to crawl, built from the path below the men's clothing catalogue.
page_urls = [
    f"https://www.wildberries.ru/catalog/muzhchinam/odezhda/{category_path}"
    for category_path in (
        "dzhinsy",
        "dzhempery-i-kardigany",
        "bryuki-i-shorty/shorty",
    )
]

In [None]:
# WB_MAX_PAGES = 100 # please do not change this parameter
WB_MAX_PAGES = 5


def clear():
    """Clears the terminal screen by running the `clear` shell command."""
    return os.system('clear')


for page_url in page_urls:
    for page_num in tqdm(range(1, WB_MAX_PAGES + 1)):
        # create new random user-agent every time
        # to avoid being banned for parsing

        url = f"{page_url}?sort=popular&page={page_num}"
        my_parser = Parser(url)
        print(f"Processing URL: {url}")
        try:
            my_parser.run()
            my_parser.save_result("wildberries-test.csv")
        finally:
            # Release the browser even if loading, parsing or saving fails.
            my_parser.stop()
        time.sleep(random.randint(1, 3))
        clear()

## Categories

In [None]:
def get_all_links_from_page(page_url: str):
    """Saves the subcategory links found on a category page.

    The page is loaded with the module-level `parser`, and every non-empty
    `href` of the `a.j-menu-item` links inside the catalogue menu is written,
    one per line, to `conf/wildberries-subcategories/<category>.txt`, where
    `<category>` is the last path segment of `page_url`.

    Nothing is written when the menu container is not found, so an existing
    file is not truncated by a failed load.

    Args:
        page_url (str): the URL address of the category page
    """
    # Lines read from a file carry a trailing newline, which must not end up
    # in the URL or in the output file name.
    page_url = page_url.strip()

    parser.url = page_url
    page = parser._load_page()
    soup = bs4.BeautifulSoup(page, 'lxml')
    category = os.path.basename(page_url.rstrip("/"))  # get the category name

    container = soup.select("ul.menu-catalog__list-2.maincatalog-list-2")
    if len(container) == 0:
        logger.error("Container is empty")
        return

    with open(f"conf/wildberries-subcategories/{category}.txt", "w") as f:
        for block in container:
            for item in block.select("a.j-menu-item"):
                link = item.get("href")
                # Skip anchors without an href instead of failing on None.
                if link:
                    f.write(link + '\n')


with open("conf/wildberries-categories.txt", "r") as f:
    # Ignore blank lines and surrounding whitespace.
    category_urls = [line.strip() for line in f if line.strip()]

# The URL is assigned per category inside get_all_links_from_page.
parser = Parser(None)
parser._start()
try:
    for url in category_urls:
        get_all_links_from_page(url)
        time.sleep(random.randint(1, 4))
finally:
    # Release the browser even if one of the pages fails.
    parser.stop()

In [None]:
# Preprocess sport category separately: its menu uses different CSS classes
# (`menu-catalog-second__*`) than the other categories.

url = "https://www.wildberries.ru/catalog/sport"
parser = Parser(url)
parser._start()
try:
    page = parser._load_page()
finally:
    # Release the browser even if the page fails to load.
    parser.stop()

soup = bs4.BeautifulSoup(page, 'lxml')
container = soup.select("ul.menu-catalog-second__wrapper")
category = os.path.basename(url)

# The `.txt` extension matches the other subcategory files in this folder.
with open(f"conf/wildberries-subcategories/{category}.txt", "w") as f:
    for block in container:
        for item in block.select("a.menu-catalog-second__drop-link"):
            link = item.get("href")
            # Skip anchors without an href instead of failing on None.
            if not link:
                continue
            print(link)
            f.write(link + '\n')