In [1]:
import requests
from bs4 import BeautifulSoup
import pandas as pd
import time
import random
from urllib.parse import urljoin
from fastapi import FastAPI, HTTPException, Query
from pydantic import BaseModel
import uvicorn
import asyncio
import sys
import json
import re
!pip install selenium beautifulsoup4 pandas fastapi uvicorn requests webdriver-manager
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
from typing import List, Optional




[notice] A new release of pip is available: 24.0 -> 25.2
[notice] To update, run: python.exe -m pip install --upgrade pip




In [2]:
# Pydantic models for API
class ProductModel(BaseModel):
    title: str
    price: str
    original_price: Optional[str] = None
    discount: Optional[str] = None
    rating: str
    reviews: str
    image_url: str
    product_url: str
    category: str
    page: int

class ScrapeRequest(BaseModel):
    category: str
    start_page: int = 1
    end_page: int = 1
    use_selenium: bool = True

class ScrapeResponse(BaseModel):
    status: str
    count: int
    products: List[ProductModel]
    message: Optional[str] = None

class DarazScraper:
    def __init__(self, use_selenium=True):
        self.base_url = "https://www.daraz.com.np"
        self.use_selenium = use_selenium
        self.driver = None
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36',
            'Accept-Language': 'en-US,en;q=0.5',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Encoding': 'gzip, deflate, br',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1'
        }
        
        if self.use_selenium:
            self.setup_driver()
    
    def setup_driver(self):
        """Setup Chrome driver with options for web scraping"""
        chrome_options = Options()
        chrome_options.add_argument('--headless')
        chrome_options.add_argument('--no-sandbox')
        chrome_options.add_argument('--disable-dev-shm-usage')
        chrome_options.add_argument('--disable-gpu')
        chrome_options.add_argument('--window-size=1920,1080')
        chrome_options.add_argument(f'--user-agent={self.headers["User-Agent"]}')
        chrome_options.add_argument('--disable-blink-features=AutomationControlled')
        chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
        chrome_options.add_experimental_option('useAutomationExtension', False)
        
        try:
            self.driver = webdriver.Chrome(options=chrome_options)
            self.driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")
            self.driver.implicitly_wait(10)
            print("Selenium driver initialized successfully")
        except Exception as e:
            print(f"Error setting up Chrome driver: {e}")
            print("Falling back to requests method...")
            self.use_selenium = False
    
    def close_driver(self):
        """Close the Selenium driver"""
        if self.driver:
            self.driver.quit()
        
    def get_full_url(self, path):
        """Convert relative URL to absolute URL"""
        if not path or path.startswith('http'):
            return path
        return urljoin(self.base_url, path)

    def scrape_with_selenium(self, url):
        """Scrape using Selenium for dynamic content"""
        try:
            print(f"Loading page with Selenium: {url}")
            self.driver.get(url)
            
            # Wait for products to load
            try:
                WebDriverWait(self.driver, 20).until(
                    EC.any_of(
                        EC.presence_of_element_located((By.CSS_SELECTOR, '[data-qa-locator="product-item"]')),
                        EC.presence_of_element_located((By.CSS_SELECTOR, '.gridItem--Yd0sa')),
                        EC.presence_of_element_located((By.CSS_SELECTOR, '[data-tracking="product-card"]')),
                        EC.presence_of_element_located((By.CSS_SELECTOR, '.search-card-item'))
                    )
                )
                print("Products loaded successfully")
            except TimeoutException:
                print("Timeout waiting for products, trying with current content...")
            
            # Scroll to load more products
            self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
            time.sleep(3)
            
            html = self.driver.page_source
            return BeautifulSoup(html, 'html.parser')
            
        except Exception as e:
            print(f"Error with Selenium scraping: {e}")
            if self.driver:
                return BeautifulSoup(self.driver.page_source, 'html.parser')
            return None

    def scrape_with_requests(self, url):
        """Fallback method using requests"""
        try:
            time.sleep(random.uniform(2, 4))
            response = requests.get(url, headers=self.headers, timeout=15)
            response.raise_for_status()
            return BeautifulSoup(response.text, 'html.parser')
        except Exception as e:
            print(f"Error with requests: {e}")
            return None

    def extract_product_data(self, product_element):
        """Extract product data from a product element"""
        try:
            # Multiple selectors for different page layouts
            title_selectors = [
                '.title--wFj93 a',
                '.title a', 
                '[data-qa-locator="product-item"] .c16H9d a',
                '.c16H9d a',
                'a[title]',
                'h3 a'
            ]
            
            # Enhanced price extraction combining both approaches
            price_selectors = [
                '.price--NVB62',
                '.price span',
                '.c13VH6',
                '.current-price',
                '[data-qa-locator="product-price"]'
            ]
            
            # Enhanced original price selectors
            original_price_selectors = [
                '.origPrice--AoCxF',
                '.original-price',
                '.c1hkC1'
            ]
            
            rating_selectors = [
                '.rating--ZI3Ol',
                '.rating',
                '.c6LcCO'
            ]
            
            reviews_selectors = [
                '.rate--DCc0D',
                '.rate',
                '.c6LcCO + span'
            ]
            
            image_selectors = [
                '.image--WOyuZ img',
                '.image img',
                'img[data-qa-locator="product-image"]',
                'img'
            ]
            
            # Extract title and URL
            title_elem = None
            product_url = ""
            for selector in title_selectors:
                title_elem = product_element.select_one(selector)
                if title_elem:
                    break
            
            if title_elem:
                relative_url = title_elem.get('href', '')
                product_url = self.get_full_url(relative_url)
            
            # Enhanced price extraction - combining both approaches
            price_elem = None
            price = "N/A"
            
            # First try with specific selectors
            for selector in price_selectors:
                price_elem = product_element.select_one(selector)
                if price_elem:
                    price = price_elem.get_text(strip=True)
                    break
            
            # If specific selectors failed, use pattern matching approach
            if price == "N/A" or not price_elem:
                for tag in product_element.find_all(['span', 'div', 'p']):
                    text = tag.get_text(strip=True)
                    # Look for Rs, ₨, or number patterns that look like prices
                    if re.search(r'Rs\.?\s*\d+|₨\s*\d+|\d{2,}(?:,\d{3})*(?:\.\d{2})?', text):
                        # Additional validation to avoid false positives
                        if len(text) < 50 and not any(word in text.lower() for word in ['review', 'rating', 'sold', 'item']):
                            price_elem = tag
                            price = text
                            break
            
            # Enhanced original price extraction
            original_price_elem = None
            original_price = None
            
            # First try with specific selectors
            for selector in original_price_selectors:
                original_price_elem = product_element.select_one(selector)
                if original_price_elem:
                    original_price = original_price_elem.get_text(strip=True)
                    break
            
            # If specific selectors failed, use style-based approach
            if not original_price_elem:
                for tag in product_element.find_all(['span', 'div', 'p']):
                    style = tag.get('style', '')
                    classes = ' '.join(tag.get('class', []))
                    text = tag.get_text(strip=True)
                    
                    # Check for strike-through styling or classes
                    if (('line-through' in style or 'text-decoration-line: line-through' in style or 
                         any(keyword in classes.lower() for keyword in ['strike', 'original', 'old-price', 'crossed'])) and
                        re.search(r'Rs\.?\s*\d+|₨\s*\d+|\d{2,}(?:,\d{3})*(?:\.\d{2})?', text)):
                        original_price_elem = tag
                        original_price = text
                        break
            
            # Enhanced rating extraction
            rating_elem = None
            rating = "No rating"
            
            # First try with specific selectors
            for selector in rating_selectors:
                rating_elem = product_element.select_one(selector)
                if rating_elem:
                    rating = rating_elem.get('aria-label', rating_elem.get_text(strip=True))
                    break
            
            # If specific selectors failed, use pattern matching for ratings
            if rating == "No rating":
                for tag in product_element.find_all(['span', 'div', 'p']):
                    text = tag.get_text(strip=True)
                    # Look for rating patterns like "4.5", "4 stars", etc.
                    if re.search(r'\d\.\d|\d+\s*star|\d+/5|★', text.lower()):
                        # Additional validation to avoid false positives
                        if len(text) < 30 and not any(word in text.lower() for word in ['review', 'sold', 'item', 'price']):
                            rating = text
                            break
                            
            
            # Enhanced reviews count extraction
            reviews_elem = None
            reviews = "0"
            
            # First try with specific selectors
            for selector in reviews_selectors:
                reviews_elem = product_element.select_one(selector)
                if reviews_elem:
                    reviews = reviews_elem.get_text(strip=True)
                    break
            
            # If specific selectors failed, use pattern matching for review counts
            if reviews == "0":
                for tag in product_element.find_all(['span', 'div', 'p']):
                    text = tag.get_text(strip=True)
                    # Look for review patterns like "(123)", "123 reviews", "Sold 456", etc.
                    if re.search(r'\(\d+\)|\d+\s*review|\d+\s*sold|\d+\s*rating', text.lower()):
                        # Additional validation
                        if len(text) < 50 and not any(word in text.lower() for word in ['price', 'discount', 'off']):
                            reviews = text
                            break
            
            # Extract image
            image_elem = None
            for selector in image_selectors:
                image_elem = product_element.select_one(selector)
                if image_elem and image_elem.get('src'):
                    break
            
            # Enhanced discount calculation
            discount = None
            if price != "N/A" and original_price:
                try:
                    # Clean price strings and convert to float
                    current_price = float(re.sub(r'[^\d.]', '', price.replace(',', '')))
                    orig_price = float(re.sub(r'[^\d.]', '', original_price.replace(',', '')))
                    if orig_price > current_price > 0:
                        discount = f"{int(((orig_price - current_price) / orig_price) * 100)}% off"
                except ValueError as e:
                    print(f"Error calculating discount: {e}")
                    pass
            
            # If discount not calculated, try to find existing discount text
            if not discount:
                for tag in product_element.find_all(['span', 'div', 'p']):
                    text = tag.get_text(strip=True)
                    # Look for discount patterns like "50% off", "-30%", "Save 40%"
                    if re.search(r'\d+%\s*off|\-\d+%|save\s*\d+%', text.lower()):
                        if len(text) < 20:  # Keep it short to avoid false positives
                            discount = text
                            break
            
            return {
                'title': title_elem.get_text(strip=True) if title_elem else "N/A",
                'price': price,
                'original_price': original_price,
                'discount': discount,
                'rating': rating,
                'reviews': reviews,
                'image_url': self.get_full_url(image_elem.get('src', '')) if image_elem else "N/A",
                'product_url': product_url
            }
        except Exception as e:
            print(f"Error extracting product data: {e}")
            return None

    def scrape_category(self, category, start_page=1, end_page=1):
        """Scrape products from a category"""
        all_products = []
        
        for page in range(start_page, end_page + 1):
            url = f"{self.base_url}/{category}/?page={page}"
            print(f"Scraping page {page}: {url}")
            
            try:
                # Choose scraping method
                if self.use_selenium and self.driver:
                    soup = self.scrape_with_selenium(url)
                else:
                    soup = self.scrape_with_requests(url)
                
                if not soup:
                    print(f"Failed to get content from page {page}")
                    continue
                
                # Multiple selectors for product containers
                product_selectors = [
                    '[data-qa-locator="product-item"]',
                    '.gridItem--Yd0sa',
                    '[data-tracking="product-card"]',
                    '.search-card-item',
                    '.c2prKC'
                ]
                
                products = []
                for selector in product_selectors:
                    products = soup.select(selector)
                    if products:
                        print(f"Found {len(products)} products using selector: {selector}")
                        break
                
                if not products:
                    print(f"No products found on page {page}")
                    print("Available classes:", [elem.get('class') for elem in soup.find_all()[:10] if elem.get('class')])
                    continue
                
                page_products = 0
                for product in products:
                    product_data = self.extract_product_data(product)
                    if product_data and product_data['title'] != "N/A":
                        product_data.update({
                            'category': category,
                            'page': page
                        })
                        all_products.append(product_data)
                        page_products += 1
                
                print(f"Successfully extracted {page_products} products from page {page}")
                
                # Add delay between pages
                if page < end_page:
                    time.sleep(random.uniform(3, 6))
                    
            except Exception as e:
                print(f"Error scraping page {page}: {e}")
                continue
        
        return all_products

# Initialize FastAPI app
app = FastAPI(
    title="Daraz Nepal Product Scraper API",
    description="API for scraping product data from Daraz Nepal",
    version="2.0.0"
)

# Global scraper instance
scraper_instance = None

@app.on_event("startup")
async def startup_event():
    """Initialize scraper on startup"""
    global scraper_instance
    scraper_instance = DarazScraper(use_selenium=True)

@app.on_event("shutdown")
async def shutdown_event():
    """Cleanup on shutdown"""
    global scraper_instance
    if scraper_instance:
        scraper_instance.close_driver()

@app.get("/")
async def root():
    return {
        "message": "Daraz Nepal Product Scraper API",
        "version": "2.0.0",
        "endpoints": {
            "scrape": "/scrape",
            "categories": "/categories"
        }
    }

@app.post("/scrape", response_model=ScrapeResponse)
async def scrape_products(request: ScrapeRequest):
    """Scrape products from a category"""
    global scraper_instance
    
    try:
        if not scraper_instance:
            scraper_instance = DarazScraper(use_selenium=request.use_selenium)
        
        print(f"Starting scrape for category: {request.category}")
        products = scraper_instance.scrape_category(
            request.category, 
            request.start_page, 
            request.end_page
        )
        
        # Convert to Pydantic models
        product_models = []
        for product in products:
            try:
                product_models.append(ProductModel(**product))
            except Exception as e:
                print(f"Error creating product model: {e}")
                continue
        
        return ScrapeResponse(
            status="success",
            count=len(product_models),
            products=product_models,
            message=f"Successfully scraped {len(product_models)} products"
        )
        
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Scraping failed: {str(e)}")

@app.get("/scrape")
async def scrape_products_get(
    category: str = Query(..., description="Category to scrape (e.g., mobile-cases-covers)"),
    start_page: int = Query(1, description="Start page number"),
    end_page: int = Query(1, description="End page number"),
    use_selenium: bool = Query(True, description="Use Selenium for dynamic content"),
    format: str = Query("json", description="Response format: json or csv")
):
    """GET endpoint for scraping products"""
    global scraper_instance
    
    try:
        if not scraper_instance:
            scraper_instance = DarazScraper(use_selenium=use_selenium)
        
        products = scraper_instance.scrape_category(category, start_page, end_page)
        
        if format.lower() == "csv":
            # Return CSV data
            df = pd.DataFrame(products)
            csv_content = df.to_csv(index=False)
            return {"csv_data": csv_content, "count": len(products)}
        
        return {
            "status": "success",
            "count": len(products),
            "products": products
        }
        
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/categories")
async def get_popular_categories():
    """Get list of popular categories"""
    return {
        "categories": [
            "mobile-cases-covers",
            "smartphones",
            "laptops",
            "fashion-womens",
            "fashion-mens",
            "electronics",
            "home-garden",
            "sports-outdoor",
            "health-beauty",
            "baby-toys"
        ]
    }

@app.get("/health")
async def health_check():
    """Health check endpoint"""
    global scraper_instance
    return {
        "status": "healthy",
        "selenium_available": scraper_instance and scraper_instance.use_selenium,
        "timestamp": time.time()
    }

def run_cli():
    """CLI mode for direct usage"""
    scraper = DarazScraper(use_selenium=True)
    
    try:
        print("Daraz Nepal Product Scraper (CLI Mode)")
        print("=" * 40)
        
        category = input("Enter category (e.g., mobile-cases-covers): ").strip()
        start_page = int(input("Enter start page (default 1): ") or "1")
        end_page = int(input("Enter end page (default 1): ") or "1")
        
        print(f"\nScraping {category} from page {start_page} to {end_page}...")
        products = scraper.scrape_category(category, start_page, end_page)
        
        if products:
            df = pd.DataFrame(products)
            csv_file = f"daraz_{category}_p{start_page}-{end_page}.csv"
            df.to_csv(csv_file, index=False)
            
            print(f"\n Successfully scraped {len(products)} products!")
            print(f" Saved to: {csv_file}")
            print("\n Sample products:")
            print(df[['title', 'price', 'rating']].head(3).to_string(index=False))
        else:
            print(" No products were scraped.")
            
    except KeyboardInterrupt:
        print("\n\n Scraping interrupted by user")
    except Exception as e:
        print(f"\n Error: {e}")
    finally:
        scraper.close_driver()

if __name__ == "__main__":
    if len(sys.argv) > 1 and sys.argv[1] == "--api":
        print(" Starting FastAPI server...")
        uvicorn.run(app, host="0.0.0.0", port=8000, reload=False)
    else:
        run_cli()

        on_event is deprecated, use lifespan event handlers instead.

        Read more about it in the
        [FastAPI docs for Lifespan Events](https://fastapi.tiangolo.com/advanced/events/).
        
  @app.on_event("startup")
        on_event is deprecated, use lifespan event handlers instead.

        Read more about it in the
        [FastAPI docs for Lifespan Events](https://fastapi.tiangolo.com/advanced/events/).
        
  @app.on_event("shutdown")


Selenium driver initialized successfully
Daraz Nepal Product Scraper (CLI Mode)


Enter category (e.g., mobile-cases-covers):  mobile-phone-cases
Enter start page (default 1):  1
Enter end page (default 1):  2



Scraping mobile-phone-cases from page 1 to 2...
Scraping page 1: https://www.daraz.com.np/mobile-phone-cases/?page=1
Loading page with Selenium: https://www.daraz.com.np/mobile-phone-cases/?page=1
Timeout waiting for products, trying with current content...
No products found on page 1
Available classes: [['daraz']]
Scraping page 2: https://www.daraz.com.np/mobile-phone-cases/?page=2
Loading page with Selenium: https://www.daraz.com.np/mobile-phone-cases/?page=2
Timeout waiting for products, trying with current content...
No products found on page 2
Available classes: [['daraz']]
 No products were scraped.
