## **Project 1: E-Commerce Price Tracker**  
*Monitor product prices across major retailers and alert on price drops*

### **Objective**
Build a robust, maintainable scraper that:
- Tracks prices of specific products on Amazon, Best Buy, and Walmart
- Detects price changes over time
- Sends email alerts when prices drop below a threshold
- Handles anti-bot measures gracefully

---

### **Step 1: Project Setup & Dependencies**

```python
# Install required packages
!pip install requests beautifulsoup4 selenium pandas schedule smtplib python-dotenv lxml fake-useragent tenacity

# Create project structure
"""
ecommerce_tracker/
├── config/
│   └── .env
├── data/
│   └── price_history.db
├── scrapers/
│   ├── __init__.py
│   ├── amazon.py
│   ├── bestbuy.py
│   └── walmart.py
├── utils/
│   ├── database.py
│   ├── email_alerts.py
│   └── proxy_manager.py
├── main.py
└── requirements.txt
"""
```

### **Step 2: Configuration Management**

```bash
# config/.env
AMAZON_API_KEY=your_api_key_here
EMAIL_HOST=smtp.gmail.com
EMAIL_PORT=587
EMAIL_USER=your_email@gmail.com
EMAIL_PASS=your_app_password
PROXY_LIST=http://proxy1:port,http://proxy2:port
```

```python
# utils/config_loader.py
import os
from dotenv import load_dotenv

# The sample .env lives in <project root>/config/, while this module lives in
# <project root>/utils/. load_dotenv()'s default search walks upward from the
# calling module's directory and would never look inside config/, so point at
# the file explicitly and fall back to the default search when it is absent.
_ENV_PATH = os.path.join(
    os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
    'config',
    '.env',
)
load_dotenv(_ENV_PATH if os.path.exists(_ENV_PATH) else None)

class Config:
    """Application settings read from environment variables at import time.

    Attributes are class-level, so they are evaluated once when this module
    is first imported. Missing variables yield ``None`` (strings), ``587``
    (port) or ``[]`` (proxy list) instead of raising.

    Raises:
        ValueError: at import time if ``EMAIL_PORT`` is set to a non-numeric value.
    """

    EMAIL_HOST = os.getenv('EMAIL_HOST')
    # int(None) would raise TypeError when EMAIL_PORT is unset or blank;
    # fall back to 587, the port used in the sample .env.
    EMAIL_PORT = int(os.getenv('EMAIL_PORT') or 587)
    EMAIL_USER = os.getenv('EMAIL_USER')
    EMAIL_PASS = os.getenv('EMAIL_PASS')
    # Comma-separated proxy URLs; surrounding whitespace and empty entries
    # (e.g. from a trailing comma) are dropped.
    PROXY_LIST = [
        proxy.strip()
        for proxy in (os.getenv('PROXY_LIST') or '').split(',')
        if proxy.strip()
    ]

### **Step 3: Database Schema**

```python
# utils/database.py
import sqlite3
import pandas as pd
from datetime import datetime

class PriceDatabase:
    """SQLite-backed store for tracked products and their scraped prices.

    Every method opens its own short-lived connection to ``db_path`` and
    closes it before returning, including when a statement raises.
    """

    def __init__(self, db_path='data/price_history.db'):
        """Remember ``db_path``, create its parent directory and the tables.

        Args:
            db_path: Filesystem path of the SQLite database file.
        """
        self.db_path = db_path
        self._ensure_parent_dir()
        self.init_database()

    def _ensure_parent_dir(self):
        """Create the directory that will contain the database file, if any."""
        import os  # local import: this module's import block does not bring in os

        parent = os.path.dirname(self.db_path)
        if parent:
            # sqlite3.connect() cannot create missing directories itself.
            os.makedirs(parent, exist_ok=True)

    def init_database(self):
        """Create the ``products`` and ``price_history`` tables if missing."""
        conn = sqlite3.connect(self.db_path)
        try:
            # "with conn" commits on success and rolls back on error.
            with conn:
                conn.execute('''
                    CREATE TABLE IF NOT EXISTS products (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        product_name TEXT NOT NULL,
                        retailer TEXT NOT NULL,
                        product_url TEXT UNIQUE NOT NULL,
                        target_price REAL,
                        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                    )
                ''')
                conn.execute('''
                    CREATE TABLE IF NOT EXISTS price_history (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        product_id INTEGER,
                        current_price REAL,
                        currency TEXT DEFAULT 'USD',
                        in_stock BOOLEAN DEFAULT TRUE,
                        scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                        FOREIGN KEY (product_id) REFERENCES products (id)
                    )
                ''')
        finally:
            conn.close()

    def add_product(self, product_name, retailer, product_url, target_price):
        """Register a product and return its id.

        If a product with the same ``product_url`` already exists, the insert
        is ignored (the stored name, retailer and target price are left
        untouched) and the existing row's id is returned.

        Returns:
            The integer id of the new or existing product row.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            with conn:
                conn.execute('''
                    INSERT OR IGNORE INTO products
                    (product_name, retailer, product_url, target_price)
                    VALUES (?, ?, ?, ?)
                ''', (product_name, retailer, product_url, target_price))
            # Look the id up by URL rather than trusting lastrowid, whose value
            # after an ignored insert depends on connection history.
            row = conn.execute(
                'SELECT id FROM products WHERE product_url = ?', (product_url,)
            ).fetchone()
            return row[0] if row else None
        finally:
            conn.close()

    def get_product_id(self, product_url):
        """Return the id of the product with ``product_url``, or None."""
        conn = sqlite3.connect(self.db_path)
        try:
            row = conn.execute(
                'SELECT id FROM products WHERE product_url = ?', (product_url,)
            ).fetchone()
        finally:
            conn.close()
        return row[0] if row else None

    def save_price(self, product_id, price, currency='USD', in_stock=True):
        """Append one price observation for ``product_id``.

        ``scraped_at`` is filled in by the table's CURRENT_TIMESTAMP default.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            with conn:
                conn.execute('''
                    INSERT INTO price_history
                    (product_id, current_price, currency, in_stock)
                    VALUES (?, ?, ?, ?)
                ''', (product_id, price, currency, in_stock))
        finally:
            conn.close()

    def get_latest_price(self, product_id):
        """Return the most recent price row for ``product_id``.

        Returns:
            A pandas Series with ``current_price`` and ``scraped_at``, or None
            when the product has no recorded prices.
        """
        # CURRENT_TIMESTAMP has one-second resolution, so break ties between
        # rows written in the same second with the autoincrement id.
        query = '''
            SELECT current_price, scraped_at
            FROM price_history
            WHERE product_id = ?
            ORDER BY scraped_at DESC, id DESC
            LIMIT 1
        '''
        conn = sqlite3.connect(self.db_path)
        try:
            df = pd.read_sql_query(query, conn, params=(product_id,))
        finally:
            conn.close()
        return df.iloc[0] if not df.empty else None

    def get_product_info(self, product_id):
        """Return the ``products`` row for ``product_id`` as a pandas Series, or None."""
        query = 'SELECT * FROM products WHERE id = ?'
        conn = sqlite3.connect(self.db_path)
        try:
            df = pd.read_sql_query(query, conn, params=(product_id,))
        finally:
            conn.close()
        return df.iloc[0] if not df.empty else None
```

### **Step 4: Base Scraper Class**

```python
# scrapers/base_scraper.py
import requests
from fake_useragent import UserAgent
from tenacity import retry, stop_after_attempt, wait_exponential
import time
import random

class BaseScraper:
    """Shared HTTP plumbing for the retailer scrapers.

    Holds one ``requests.Session`` with browser-like headers and an optional
    pool of proxy URLs from which one is picked at random per request.
    """

    def __init__(self, proxies=None):
        """Create the session.

        Args:
            proxies: Optional list of proxy URLs; ``None`` or empty disables
                proxying.
        """
        self.session = requests.Session()
        self.ua = UserAgent()
        self.proxies = proxies or []
        self.setup_session()

    def setup_session(self):
        """Install default request headers, including a random User-Agent."""
        self.session.headers.update({
            'User-Agent': self.ua.random,
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'keep-alive',
        })

    # reraise=True makes the final failure surface as the underlying requests
    # exception instead of tenacity's RetryError wrapper, so callers that log
    # str(e) report the real cause.
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=10),
        reraise=True,
    )
    def make_request(self, url, use_proxy=True):
        """GET ``url`` and return the response, retrying up to three attempts.

        Args:
            url: Address to fetch.
            use_proxy: When true and proxies are configured, route the request
                through a randomly chosen proxy (re-chosen on every attempt).

        Returns:
            The ``requests.Response`` for a successful (non-4xx/5xx) reply.

        Raises:
            requests.RequestException: the last error once all attempts fail
                (includes ``HTTPError`` from ``raise_for_status``).
        """
        proxies = None
        if use_proxy and self.proxies:
            proxy = random.choice(self.proxies)
            proxies = {'http': proxy, 'https': proxy}

        response = self.session.get(url, timeout=10, proxies=proxies)
        response.raise_for_status()
        return response

    def random_delay(self, min_sec=1, max_sec=3):
        """Sleep for a random duration between ``min_sec`` and ``max_sec`` seconds."""
        time.sleep(random.uniform(min_sec, max_sec))
```

### **Step 5: Amazon Scraper**

```python
# scrapers/amazon.py
from bs4 import BeautifulSoup
import re
from .base_scraper import BaseScraper

class AmazonScraper(BaseScraper):
    """Scraper for Amazon product pages (name, price, stock status)."""

    def __init__(self, proxies=None):
        """Create the scraper; ``proxies`` is forwarded to ``BaseScraper``."""
        super().__init__(proxies)
        self.retailer = "Amazon"

    def extract_price(self, soup):
        """Return the first price found in ``soup`` as a float, or None.

        Selectors are tried in order; a selector whose text contains no
        number is skipped.
        """
        # Multiple possible selectors for price
        price_selectors = [
            '#corePriceDisplay_desktop_feature_div .a-price-whole',
            '.a-price .a-offscreen',
            '#priceblock_ourprice',
            '.a-price.a-text-price .a-offscreen'
        ]

        for selector in price_selectors:
            price_elem = soup.select_one(selector)
            if price_elem is None:
                continue

            price_text = price_elem.get_text().strip()
            # NOTE(review): ".a-price-whole" appears to hold only the integer
            # part, with the cents in a sibling ".a-price-fraction" — confirm
            # against a live page. When that sibling exists, stitch the two
            # together so the cents are not silently dropped.
            if 'a-price-whole' in (price_elem.get('class') or []):
                fraction_elem = price_elem.find_next_sibling(class_='a-price-fraction')
                if fraction_elem is not None:
                    price_text = f"{price_text.rstrip('.')}.{fraction_elem.get_text().strip()}"

            # Extract numeric value (thousands separators removed first)
            price_match = re.search(r'\d+(?:\.\d+)?', price_text.replace(',', ''))
            if price_match:
                return float(price_match.group())
        return None

    def is_in_stock(self, soup):
        """Return False if an 'unavailable' notice is present, else whether
        an add-to-cart button exists."""
        # Check for out of stock indicators
        out_of_stock_selectors = [
            '#availability .a-color-state',
            '.a-color-error'
        ]

        # Fixed: the original "for selector in selector in ..." referenced
        # `selector` before assignment and raised on every call.
        for selector in out_of_stock_selectors:
            elem = soup.select_one(selector)
            if elem and 'unavailable' in elem.get_text().lower():
                return False

        # Check add to cart button
        add_to_cart = soup.select_one('#add-to-cart-button')
        return add_to_cart is not None

    def scrape_product(self, product_url):
        """Fetch ``product_url`` and return its parsed details.

        Returns:
            A dict with ``product_name``, ``price`` (float or None),
            ``in_stock`` and ``currency``; or None if the request or parsing
            raised (the error is printed, not propagated).
        """
        try:
            response = self.make_request(product_url)
            soup = BeautifulSoup(response.content, 'html.parser')

            # Extract product name
            title_elem = soup.select_one('#productTitle')
            product_name = title_elem.get_text().strip() if title_elem else "Unknown Product"

            return {
                'product_name': product_name,
                'price': self.extract_price(soup),
                'in_stock': self.is_in_stock(soup),
                'currency': 'USD'
            }
        except Exception as e:
            # Best-effort boundary: one bad page must not stop the tracker.
            print(f"Error scraping Amazon product {product_url}: {str(e)}")
            return None
```

### **Step 6: Best Buy Scraper**

```python
# scrapers/bestbuy.py
from bs4 import BeautifulSoup
import json
from .base_scraper import BaseScraper

class BestBuyScraper(BaseScraper):
    """Scraper for Best Buy product pages.

    Tries JSON-LD structured data first and falls back to HTML selectors
    when no usable price is found there.
    """

    # Availability value endings treated as "not purchasable". The value may
    # be a bare token or a URL ending in it, so only the ending is compared.
    # NOTE(review): token list follows schema.org ItemAvailability — confirm
    # against what Best Buy actually emits.
    _UNAVAILABLE_SUFFIXES = ('outofstock', 'soldout', 'discontinued')

    def __init__(self, proxies=None):
        """Create the scraper; ``proxies`` is forwarded to ``BaseScraper``."""
        super().__init__(proxies)
        self.retailer = "Best Buy"

    def _parse_json_ld(self, soup):
        """Return product details from the first JSON-LD object that carries
        a parseable offer price, or None if there is none."""
        for script_tag in soup.find_all('script', type='application/ld+json'):
            try:
                data = json.loads(script_tag.string or '')
            except (json.JSONDecodeError, TypeError):
                continue

            for item in (data if isinstance(data, list) else [data]):
                if not isinstance(item, dict):
                    continue
                offers = item.get('offers')
                # "offers" may be a single object or a list of them.
                if isinstance(offers, list):
                    offers = offers[0] if offers else None
                if not isinstance(offers, dict):
                    continue

                raw_price = offers.get('price')
                if raw_price in (None, ''):
                    continue
                try:
                    price = float(str(raw_price).replace(',', ''))
                except ValueError:
                    continue

                availability = str(offers.get('availability') or '').lower()
                return {
                    'product_name': item.get('name', 'Unknown Product'),
                    'price': price,
                    'in_stock': not availability.endswith(self._UNAVAILABLE_SUFFIXES),
                    'currency': 'USD'
                }
        return None

    def _parse_html(self, soup):
        """Return product details scraped from the page's HTML elements."""
        import re

        price = None
        price_elem = soup.select_one('.priceView-hero-price.priceView-customer-price span')
        if price_elem:
            price_text = price_elem.get_text()
            price_match = re.search(r'[\d,]+\.?\d*', price_text.replace(',', ''))
            price = float(price_match.group()) if price_match else None

        product_name_elem = soup.select_one('.sku-title h1')
        product_name = product_name_elem.get_text().strip() if product_name_elem else "Unknown Product"

        # Check stock
        add_to_cart = soup.select_one('[data-button-state="ADD_TO_CART"]')

        return {
            'product_name': product_name,
            'price': price,
            'in_stock': add_to_cart is not None,
            'currency': 'USD'
        }

    def scrape_product(self, product_url):
        """Fetch ``product_url`` and return its parsed details.

        Returns:
            A dict with ``product_name``, ``price`` (float or None),
            ``in_stock`` and ``currency``; or None if the request or parsing
            raised (the error is printed, not propagated).
        """
        try:
            response = self.make_request(product_url)
            soup = BeautifulSoup(response.content, 'html.parser')

            # Best Buy often has JSON-LD structured data
            structured = self._parse_json_ld(soup)
            if structured is not None:
                return structured

            # Fallback to HTML parsing
            return self._parse_html(soup)
        except Exception as e:
            # Best-effort boundary: one bad page must not stop the tracker.
            print(f"Error scraping Best Buy product {product_url}: {str(e)}")
            return None
```

### **Step 7: Email Alert System**

```python
# utils/email_alerts.py
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from .config_loader import Config

class EmailAlert:
    """Sends price-drop notification e-mails using the SMTP settings in Config."""

    def __init__(self):
        """Load SMTP settings from ``Config``."""
        self.config = Config()

    @staticmethod
    def _format_target(target_price):
        """Render ``target_price`` as ``$x.xx``, or ``N/A`` when it is missing
        (None, NaN or non-numeric)."""
        try:
            value = float(target_price)
        except (TypeError, ValueError):
            return "N/A"
        # NaN is the only float that differs from itself.
        if value != value:
            return "N/A"
        return f"${value:.2f}"

    def send_price_alert(self, product_info, current_price, previous_price, price_drop_percent):
        """E-mail a price-drop alert to the configured account (sender == recipient).

        Args:
            product_info: Mapping with ``product_name``, ``retailer``,
                ``target_price`` and ``product_url``.
            current_price: Newly observed price.
            previous_price: Price from the preceding observation.
            price_drop_percent: Drop between the two, in percent.

        Returns:
            True if the message was handed to the SMTP server, False if
            sending failed (the error is printed, not raised).
        """
        subject = f"Price Drop Alert: {product_info['product_name']}"
        target_text = self._format_target(product_info['target_price'])
        body = f"""
        Price Drop Detected!
        
        Product: {product_info['product_name']}
        Retailer: {product_info['retailer']}
        Current Price: ${current_price:.2f}
        Previous Price: ${previous_price:.2f}
        Price Drop: {price_drop_percent:.1f}%
        Target Price: {target_text}
        
        View Product: {product_info['product_url']}
        """

        msg = MIMEMultipart()
        msg['From'] = self.config.EMAIL_USER
        msg['To'] = self.config.EMAIL_USER
        msg['Subject'] = subject
        msg.attach(MIMEText(body, 'plain'))

        try:
            # The context manager issues QUIT and closes the socket even when
            # starttls/login/sendmail raises; the timeout keeps a dead server
            # from blocking the scheduler indefinitely.
            with smtplib.SMTP(self.config.EMAIL_HOST, self.config.EMAIL_PORT, timeout=30) as server:
                server.starttls()
                server.login(self.config.EMAIL_USER, self.config.EMAIL_PASS)
                server.sendmail(self.config.EMAIL_USER, self.config.EMAIL_USER, msg.as_string())
        except Exception as e:
            # Alerts are best-effort: a mail failure must not abort the price check.
            print(f"Failed to send email alert: {str(e)}")
            return False

        print(f"Email alert sent for {product_info['product_name']}")
        return True
```

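### **Step 8: Main Application Logic**

> **Note:** `main.py` imports `WalmartScraper` from `scrapers/walmart.py`, which is not shown in the steps above. Implement it with the same `scrape_product(product_url)` interface as the Amazon and Best Buy scrapers (or remove the Walmart import and its entry in `self.scrapers`) before running this script.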

```python
# main.py
import sqlite3
import time

import schedule

from scrapers.amazon import AmazonScraper
from scrapers.bestbuy import BestBuyScraper
from scrapers.walmart import WalmartScraper
from utils.database import PriceDatabase
from utils.email_alerts import EmailAlert
from utils.config_loader import Config

class PriceTracker:
    """Coordinates scraping, price storage and alerting for tracked products."""

    def __init__(self):
        """Create the database, e-mail alerter and one scraper per retailer."""
        self.db = PriceDatabase()
        self.email_alert = EmailAlert()
        self.config = Config()
        # Keys must match the `retailer` value stored with each product.
        self.scrapers = {
            'Amazon': AmazonScraper(proxies=self.config.PROXY_LIST),
            'Best Buy': BestBuyScraper(proxies=self.config.PROXY_LIST),
            'Walmart': WalmartScraper(proxies=self.config.PROXY_LIST)
        }

    def add_product_to_track(self, product_name, retailer, product_url, target_price):
        """Register a product in the database and return its id."""
        product_id = self.db.add_product(product_name, retailer, product_url, target_price)
        print(f"Added product to track: {product_name} (ID: {product_id})")
        return product_id

    def check_single_product(self, product_id):
        """Scrape one product, store its price and e-mail an alert on a drop.

        An alert is sent only when the new price is lower than the previous
        observation AND either it is at or below the product's target price
        or the drop is at least 5%.
        """
        product_info = self.db.get_product_info(product_id)
        # get_product_info returns a pandas Series or None; `not series`
        # raises ValueError, so compare against None explicitly.
        if product_info is None:
            return

        retailer = product_info['retailer']
        product_url = product_info['product_url']

        if retailer not in self.scrapers:
            print(f"Unsupported retailer: {retailer}")
            return

        product_data = self.scrapers[retailer].scrape_product(product_url)
        if not product_data or product_data['price'] is None:
            print(f"Failed to scrape product: {product_url}")
            return

        # Save current price
        self.db.save_price(
            product_id,
            product_data['price'],
            product_data['currency'],
            product_data['in_stock']
        )

        # Check for price drop
        latest_price_record = self.db.get_latest_price(product_id)
        if latest_price_record is None:
            return

        current_price = latest_price_record['current_price']
        previous_price = self.get_previous_price(product_id, latest_price_record['scraped_at'])
        if not (previous_price and current_price < previous_price):
            return

        price_drop_percent = ((previous_price - current_price) / previous_price) * 100

        # target_price is nullable in the schema; a missing (None/NaN) target
        # simply never satisfies the "below target" condition.
        try:
            below_target = current_price <= float(product_info['target_price'])
        except (TypeError, ValueError):
            below_target = False

        # Send alert if price dropped below target or significant drop
        if below_target or price_drop_percent >= 5.0:
            self.email_alert.send_price_alert(
                product_info.to_dict(),
                current_price,
                previous_price,
                price_drop_percent
            )

    def get_previous_price(self, product_id, current_scraped_at):
        """Return the most recent price recorded strictly before
        ``current_scraped_at`` for ``product_id``, or None if there is none."""
        query = '''
            SELECT current_price 
            FROM price_history 
            WHERE product_id = ? AND scraped_at < ?
            ORDER BY scraped_at DESC, id DESC
            LIMIT 1
        '''
        conn = sqlite3.connect(self.db.db_path)
        try:
            result = conn.execute(query, (product_id, current_scraped_at)).fetchone()
        finally:
            conn.close()
        return result[0] if result else None

    def run_full_check(self):
        """Check every tracked product once, pausing 2 seconds between them."""
        print("Starting full price check...")
        conn = sqlite3.connect(self.db.db_path)
        try:
            product_ids = [row[0] for row in conn.execute('SELECT id FROM products')]
        finally:
            conn.close()

        for product_id in product_ids:
            try:
                self.check_single_product(product_id)
            except Exception as e:
                # Isolate failures so one product cannot abort the whole run
                # (and, through it, the scheduler loop).
                print(f"Error checking product {product_id}: {str(e)}")
            # Be respectful - add delay between requests
            time.sleep(2)

        print("Price check completed.")

    def start_scheduler(self):
        """Run a full check now and then every 6 hours; never returns."""
        # Run every 6 hours
        schedule.every(6).hours.do(self.run_full_check)

        # Also run immediately on startup
        self.run_full_check()

        while True:
            schedule.run_pending()
            time.sleep(60)

# Example usage
if __name__ == "__main__":
    tracker = PriceTracker()

    # Watchlist entries: (product name, retailer, product URL, target price)
    watchlist = [
        (
            "Sony WH-1000XM4 Wireless Headphones",
            "Amazon",
            "https://www.amazon.com/dp/B0863TXGM3",
            250.00,
        ),
        (
            "Apple MacBook Air M2",
            "Best Buy",
            "https://www.bestbuy.com/site/apple-macbook-air-13-6-laptop-m2-chip-8gb-memory-256gb-ssd-midnight/6509650.p",
            900.00,
        ),
    ]

    # Register each product, in order, before any scraping begins
    for name, store, url, target in watchlist:
        tracker.add_product_to_track(name, store, url, target)

    # Hand control to the scheduler loop (runs a check now, then periodically)
    tracker.start_scheduler()
```