In [1]:
import requests
from bs4 import BeautifulSoup
import re
import json
import os
from pathlib import Path
from urllib.parse import urljoin, urlparse
import time
import logging
from typing import List, Dict

In [None]:
class TikiScraperWithMetadata:
    def __init__(self, base_dir="tiki_dataset"):
        """
        Prepare the output directory, HTTP session, logger and metadata store.

        Args:
            base_dir: Directory (str or Path) that receives one sub-folder per
                product plus the JSON metadata files. It is created, together
                with any missing parent directories, if it does not exist.
        """
        self.base_dir = Path(base_dir)
        # parents=True so a nested path such as "data/tiki" works on first use.
        self.base_dir.mkdir(parents=True, exist_ok=True)

        # Shared session: connection reuse plus browser-like request headers.
        # Accept-Encoding is deliberately NOT overridden. requests advertises
        # only the encodings it can actually decode; forcing "br" without a
        # Brotli decoder installed can leave response bodies undecoded.
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'vi-VN,vi;q=0.9,en;q=0.8',
            'DNT': '1',
            'Connection': 'keep-alive'
        })

        # Logging setup. basicConfig configures the root logger and is a no-op
        # when the root logger already has handlers.
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
        self.logger = logging.getLogger(__name__)

        # Metadata records (one dict per product) collected by the current run.
        self.products_metadata = []

    def extract_product_info_from_url(self, product_url: str) -> Dict:
        """
        Tr√≠ch xu·∫•t th√¥ng tin s·∫£n ph·∫©m t·ª´ URL

        Logic:
        - URL: https://tiki.vn/product-p186389538.html?spid=186389559
        - Product ID: 186389538 (t·ª´ p{ID}.html)
        - SPID: 186389559 (t·ª´ spid parameter)
        """
        product_info = {
            "id": "",
            "spid": "",
            "short_url": product_url
        }

        # Extract Product ID t·ª´ pattern p{ID}.html
        match = re.search(r'p(\d+)\.html', product_url)
        if match:
            product_info["id"] = match.group(1)

        # Extract SPID t·ª´ URL parameters
        match = re.search(r'spid=(\d+)', product_url)
        if match:
            product_info["spid"] = match.group(1)

        # N·∫øu kh√¥ng c√≥ SPID, d√πng Product ID
        if not product_info["spid"]:
            product_info["spid"] = product_info["id"]

        self.logger.debug(f"Extracted: ID={product_info['id']}, SPID={product_info['spid']}")
        return product_info

    def scrape_product_details(self, product_url: str) -> Dict:
        """
        Scrape chi ti·∫øt s·∫£n ph·∫©m: t√™n, ·∫£nh, v.v.

        Returns:
        {
            "id": "186389538",
            "name": "ƒë·ªì b·ªô m·∫∑c nh√†",
            "short_url": "https://tiki.vn/product-p186389538.html?spid=186389559",
            "annotations": "",
            "images": ["186389538_01.png", "186389538_02.png", ...]
        }
        """
        self.logger.info(f" Scraping product: {product_url}")

        # Extract basic info t·ª´ URL
        product_info = self.extract_product_info_from_url(product_url)

        # Kh·ªüi t·∫°o structure
        product_data = {
            "id": product_info["id"],
            "name": "",
            "short_url": product_info["short_url"],
            "annotations": "",
            "images": []
        }

        try:
            response = self.session.get(product_url, timeout=10)
            response.raise_for_status()
            soup = BeautifulSoup(response.content, 'html.parser')

            # EXTRACT T√äN S·∫¢N PH·∫®M
            product_name = self.extract_product_name(soup)
            product_data["name"] = product_name

            # EXTRACT ·∫¢NH S·∫¢N PH·∫®M
            image_urls = self.extract_product_images(soup, product_url)

            # T·∫†O TH∆Ø M·ª§C S·∫¢N PH·∫®M
            product_dir = self.create_product_directory(product_info["id"])

            # DOWNLOAD V√Ä L∆ØU ·∫¢NH
            saved_images = self.download_and_save_images(
                image_urls,
                product_dir,
                product_info["id"]
            )

            product_data["images"] = saved_images

            self.logger.info(f"‚úÖ Scraped product {product_info['id']}: {len(saved_images)} images")

        except Exception as e:
            self.logger.error(f"‚ùå Error scraping {product_url}: {e}")

        return product_data

    def extract_product_name(self, soup: BeautifulSoup) -> str:
        """
        Extract t√™n s·∫£n ph·∫©m t·ª´ HTML

        Logic: T√¨m theo th·ª© t·ª± ∆∞u ti√™n
        1. h1 v·ªõi data-view-id c·ª• th·ªÉ
        2. h1.title
        3. .product-name
        4. h1 ƒë·∫ßu ti√™n
        """
        name_selectors = [
            'h1[data-view-id="pdp_details_view_product_title"]',
            'h1.title',
            '.product-name h1',
            '.product-title',
            'h1'
        ]

        for selector in name_selectors:
            element = soup.select_one(selector)
            if element:
                name = element.get_text().strip()
                if name:
                    self.logger.debug(f"Found name via {selector}: {name[:50]}...")
                    return name

        self.logger.warning("Could not extract product name")
        return "Unknown Product"

    def extract_product_images(self, soup: BeautifulSoup, base_url: str) -> List[str]:
        """
        Extract t·∫•t c·∫£ ·∫£nh s·∫£n ph·∫©m t·ª´ HTML

        Logic:
        1. T√¨m ·∫£nh ch√≠nh trong gallery
        2. T√¨m ·∫£nh trong slider/carousel
        3. T√¨m t·∫•t c·∫£ ·∫£nh c√≥ domain tikicdn
        4. Upgrade th√†nh ·∫£nh ch·∫•t l∆∞·ª£ng cao
        """
        image_urls = set()

        # Method 1: ·∫¢nh ch√≠nh trong product gallery
        main_img_selectors = [
            'img[data-view-id="pdp_details_view_product_image"]',
            '.product-image img',
            '.gallery-image img',
            '.image-gallery img'
        ]

        for selector in main_img_selectors:
            images = soup.select(selector)
            for img in images:
                src = img.get('src') or img.get('data-src') or img.get('data-original')
                if src:
                    image_urls.add(src)

        # Method 2: ·∫¢nh trong slider/carousel
        slider_selectors = [
            '.slider img',
            '.carousel img',
            '.product-gallery img',
            '.thumbnail img'
        ]

        for selector in slider_selectors:
            images = soup.select(selector)
            for img in images:
                src = img.get('src') or img.get('data-src')
                if src:
                    image_urls.add(src)

        # Method 3: T·∫•t c·∫£ ·∫£nh c√≥ domain Tiki
        all_images = soup.find_all('img')
        for img in all_images:
            src = img.get('src') or img.get('data-src')
            if src and ('tikicdn.com' in src or 'tiki.vn' in src):
                image_urls.add(src)

        # X·ª≠ l√Ω URLs
        processed_urls = []
        for url in image_urls:
            # Convert to full URL
            if url.startswith('//'):
                url = 'https:' + url
            elif not url.startswith('http'):
                url = urljoin(base_url, url)

            # Upgrade to high quality
            if 'cache.tikicdn.com' in url:
                # Thay ƒë·ªïi k√≠ch th∆∞·ªõc th√†nh 1200x1200 ho·∫∑c g·ª° b·ªè ƒë·ªÉ l·∫•y ·∫£nh g·ªëc
                url = re.sub(r'/cache/\d+x\d+/', '/cache/1200x1200/', url)

            # L·ªçc ·∫£nh c√≥ k√≠ch th∆∞·ªõc h·ª£p l√Ω (lo·∫°i b·ªè icon, logo nh·ªè)
            if not any(x in url.lower() for x in ['icon', 'logo', 'badge', 'thumb']):
                processed_urls.append(url)

        # Lo·∫°i b·ªè duplicate v√† gi·ªõi h·∫°n s·ªë l∆∞·ª£ng
        unique_urls = list(dict.fromkeys(processed_urls))[:10]  # Max 10 ·∫£nh

        self.logger.debug(f"Found {len(unique_urls)} unique images")
        return unique_urls

    def create_product_directory(self, product_id: str) -> Path:
        """
        T·∫°o th∆∞ m·ª•c cho s·∫£n ph·∫©m
        Structure: base_dir/product_id/
        """
        product_dir = self.base_dir / str(product_id)
        product_dir.mkdir(exist_ok=True)

        self.logger.debug(f"Created directory: {product_dir}")
        return product_dir

    def download_and_save_images(self, image_urls: List[str],
                                product_dir: Path, product_id: str) -> List[str]:
        """
        Download v√† l∆∞u ·∫£nh v·ªõi t√™n c√≥ th·ª© t·ª±

        Returns: List t√™n file ƒë√£ l∆∞u
        Format: {product_id}_01.png, {product_id}_02.png, ...
        """
        saved_images = []

        for i, image_url in enumerate(image_urls, 1):
            try:
                # T·∫°o t√™n file v·ªõi format chu·∫©n
                filename = f"{product_id}_{i:02d}.png"
                filepath = product_dir / filename

                # Skip n·∫øu file ƒë√£ t·ªìn t·∫°i
                if filepath.exists():
                    self.logger.debug(f"File exists, skipping: {filename}")
                    saved_images.append(filename)
                    continue

                # Download image
                self.logger.info(f"‚¨áÔ∏è Downloading: {filename}")

                response = self.session.get(image_url, stream=True, timeout=30)
                response.raise_for_status()

                # Ki·ªÉm tra content type
                content_type = response.headers.get('content-type', '')
                if not content_type.startswith('image/'):
                    self.logger.warning(f"Not an image: {image_url}")
                    continue

                # L∆∞u file
                with open(filepath, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        if chunk:
                            f.write(chunk)

                # Ki·ªÉm tra file size (lo·∫°i b·ªè ·∫£nh qu√° nh·ªè)
                if filepath.stat().st_size < 8000:  # < 1KB
                    filepath.unlink()  # X√≥a file
                    self.logger.warning(f"Image too small, deleted: {filename}")
                    continue

                saved_images.append(filename)
                self.logger.info(f"‚úÖ Saved: {filename}")

                # Delay gi·ªØa c√°c download
                time.sleep(0.5)

            except Exception as e:
                self.logger.error(f"‚ùå Error downloading {image_url}: {e}")
                continue

        return saved_images

    def scrape_category_products(self, category_url: str, max_pages: int = 5) -> List[str]:
        """
        L·∫•y danh s√°ch URL s·∫£n ph·∫©m t·ª´ category

        Returns: List c√°c product URLs
        """
        self.logger.info(f"üîç Scraping category: {category_url}")

        all_product_urls = []

        for page in range(1, max_pages + 1):
            page_url = f"{category_url}?page={page}"
            self.logger.info(f" Page {page}: {page_url}")

            try:
                response = self.session.get(page_url, timeout=10)
                response.raise_for_status()
                soup = BeautifulSoup(response.content, 'html.parser')

                # T√¨m product links
                product_links = []

                # Method 1: Link c√≥ data-view-id
                links = soup.find_all('a', {'data-view-id': 'pdp_main'})
                for link in links:
                    href = link.get('href')
                    if href:
                        product_links.append(href)

                # Method 2: Link c√≥ pattern p{ID}.html
                if not product_links:
                    links = soup.find_all('a', href=re.compile(r'.*p\d+\.html'))
                    for link in links:
                        href = link.get('href')
                        if href:
                            product_links.append(href)

                # Convert to full URLs
                for href in product_links:
                    if href.startswith('/'):
                        full_url = f"https://tiki.vn{href}"
                    else:
                        full_url = href

                    if full_url not in all_product_urls:
                        all_product_urls.append(full_url)

                self.logger.info(f"Found {len(product_links)} products on page {page}")

                # Delay gi·ªØa c√°c trang
                time.sleep(1)

            except Exception as e:
                self.logger.error(f"Error scraping page {page}: {e}")
                continue

        self.logger.info(f"Total products found: {len(all_product_urls)}")
        return all_product_urls

    def scrape_full_category(self, category_url: str, max_pages: int = 5,
                           max_products: int = 50) -> str:
        """
        Scrape to√†n b·ªô category v√† t·∫°o JSON metadata

        Returns: Path to JSON file
        """
        self.logger.info(f" Starting full category scrape: {category_url}")

        # Reset metadata
        self.products_metadata = []

        # L·∫•y danh s√°ch s·∫£n ph·∫©m
        product_urls = self.scrape_category_products(category_url, max_pages)

        # Gi·ªõi h·∫°n s·ªë s·∫£n ph·∫©m
        if len(product_urls) > max_products:
            product_urls = product_urls[:max_products]
            self.logger.info(f"Limited to {max_products} products")

        # Scrape t·ª´ng s·∫£n ph·∫©m
        for i, product_url in enumerate(product_urls, 1):
            self.logger.info(f"üîÑ Processing product {i}/{len(product_urls)}")

            product_data = self.scrape_product_details(product_url)

            # Ch·ªâ th√™m v√†o metadata n·∫øu c√≥ ·∫£nh
            if product_data["images"]:
                self.products_metadata.append(product_data)

            # Delay gi·ªØa c√°c s·∫£n ph·∫©m
            time.sleep(1)

        # L∆∞u JSON metadata
        json_file = self.save_metadata_json()

        self.logger.info(f" Completed! Total products: {len(self.products_metadata)}")
        self.logger.info(f" JSON metadata saved: {json_file}")

        return json_file

    def save_metadata_json(self) -> str:
        """
        L∆∞u metadata th√†nh file JSON (gi·ªëng nh∆∞ file c·ªßa anh b·∫°n)
        """
        json_file = self.base_dir / "products_metadata.json"

        with open(json_file, 'w', encoding='utf-8') as f:
            json.dump(self.products_metadata, f, ensure_ascii=False, indent=4)

        # T·∫°o summary
        total_products = len(self.products_metadata)
        total_images = sum(len(p["images"]) for p in self.products_metadata)

        summary = {
            "total_products": total_products,
            "total_images": total_images,
            "products": self.products_metadata
        }

        summary_file = self.base_dir / "summary.json"
        with open(summary_file, 'w', encoding='utf-8') as f:
            json.dump(summary, f, ensure_ascii=False, indent=4)

        return str(json_file)

# DEMO: usage mirrors the reference script this notebook was modelled on.
def main():
    """Scrape a single Tiki category and print a short summary of the run."""
    scraper = TikiScraperWithMetadata("tiki_fashion_dataset")

    # NOTE(review): the original comment described this as the women's fashion
    # category, but the URL slug reads "nha-sach-tiki"; confirm which category
    # is intended.
    scrape_options = {
        "category_url": "https://tiki.vn/nha-sach-tiki/c8322",
        "max_pages": 3,
        "max_products": 25,
    }
    json_file = scraper.scrape_full_category(**scrape_options)

    print(f" Ho√†n th√†nh! JSON file: {json_file}")

    # Run statistics.
    collected = scraper.products_metadata
    image_total = sum(len(product['images']) for product in collected)
    print("  Th·ªëng k√™:")
    print(f"   - T·ªïng s·∫£n ph·∫©m: {len(collected)}")
    print(f"   - T·ªïng ·∫£nh: {image_total}")


if __name__ == "__main__":
    main()

In [None]:
!rm -r /content/tiki_books_dataset

rm: cannot remove '/content/tiki_books_dataset': No such file or directory


In [None]:
!rm -r /content/tiki_fashion_dataset/