In [12]:
import requests
from bs4 import BeautifulSoup
import pandas as pd
import time
import random
from urllib.parse import urljoin, urlparse
import json
import re
from datetime import datetime
import logging
import ast
import numpy as np

In [13]:
# Browser-like request headers that advertise a German locale.
# NOTE(review): none of the cells shown here read HEADERS; EnhancedProteinScraper
# configures its own session headers in __init__. Confirm whether this is still needed.
HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36',
    'Accept-Language': 'de-DE,de;q=0.9'
}


In [14]:
import requests
from bs4 import BeautifulSoup
import pandas as pd
import time
import random
from urllib.parse import urljoin, urlparse
import json
import re
from datetime import datetime
import logging

# Set up logging for the notebook: INFO level, rendered as "LEVEL: message".
# NOTE(review): `logger` is created here, but the scraper class in this cell
# reports progress with print(); confirm whether the logger is used elsewhere.
logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s')
logger = logging.getLogger(__name__)

class EnhancedProteinScraper:
    def __init__(self):
        """Create the HTTP session and the empty result containers.

        Attributes set:
            session: ``requests.Session`` carrying browser-like default headers.
            base_url: Root URL every listing and product URL is built from.
            products_data: List that ``extract_products_from_listing`` appends to.
            price_history_data: List that ``scrape_price_history`` appends to.
        """
        browser_headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
        }

        http_session = requests.Session()
        http_session.headers.update(browser_headers)
        self.session = http_session

        self.base_url = 'https://muchproteins.com'
        self.products_data = []
        self.price_history_data = []
        
    def get_page(self, url, retries=3):
        """Get page content with error handling and retries"""
        for attempt in range(retries):
            try:
                time.sleep(random.uniform(1, 3))
                
                response = self.session.get(url, timeout=15)
                response.raise_for_status()
                return response
            except requests.RequestException as e:
                print(f"⚠️ Attempt {attempt + 1} failed for {url}: {e}")
                if attempt == retries - 1:
                    print(f"❌ Failed to fetch {url} after {retries} attempts")
                    return None
                time.sleep(random.uniform(3, 6))
    
    def scrape_main_pages(self, start_page=1, max_pages=5):
        """Scrape the main listing pages to extract product data"""
        print(f"🚀 Starting to scrape pages {start_page} to {max_pages}...")
        
        for page in range(start_page, max_pages + 1):
            if page == 1:
                url = self.base_url
            else:
                url = f"{self.base_url}/{page}"
            
            print(f"\n📄 Scraping page {page}: {url}")
            
            response = self.get_page(url)
            if not response:
                continue
                
            soup = BeautifulSoup(response.content, 'html.parser')
            
            products_found = self.extract_products_from_listing(soup, page)
            
            if not products_found:
                print(f"❌ No products found on page {page}, stopping...")
                break
            
            print(f"✅ Found {products_found} products on page {page}")
                
        print(f"\n🎉 Scraped {len(self.products_data)} products total")
        return len(self.products_data)
    
    def extract_products_from_listing(self, soup, page_num):
        """Extract product data from listing page"""
        products_found = 0
        
        scripts = soup.find_all('script')
        
        for script in scripts:
            if not script.string:
                continue
                
            script_content = script.string.strip()
            
            if script_content.startswith('{"props":'):
                try:
                    data = json.loads(script_content)
                    
                    if 'props' in data and 'pageProps' in data['props']:
                        page_props = data['props']['pageProps']
                        
                        if 'data' in page_props and isinstance(page_props['data'], list):
                            products_data = page_props['data']
                            print(f"🎉 Found {len(products_data)} products in data array!")
                            
                            for product_data in products_data:
                                product = self.parse_muchproteins_product(product_data, page_num)
                                if product:
                                    self.products_data.append(product)
                                    products_found += 1
                                    
                except json.JSONDecodeError as e:
                    print(f"❌ Failed to parse React props JSON: {e}")
                    continue
        
        return products_found
    
    def parse_muchproteins_product(self, product_data, page_num):
        """Parse product data from muchproteins.com specific format"""
        if not isinstance(product_data, dict):
            return None
            
        product = {
            'source_page': page_num,
            'scraped_at': datetime.now().isoformat(),
            'extraction_method': 'muchproteins_json'
        }
        
        # Map the actual fields from muchproteins.com (based on real data structure)
        field_mappings = {
            'name': 'title',
            'url': 'product_url',
            'manufacturer': 'brand',  # muchproteins uses 'manufacturer' not 'brand'
            'protein': 'protein_per_100g',
            'pricePerProtein100g': 'price_per_100g_protein',  # actual field name
            'pricePer100g': 'price_per_100g',  # additional price field
            'energy': 'energy_per_100g',
            'type': 'product_type',
            'tags': 'product_tags',  # muchproteins uses 'tags'
            'categories': 'categories',
            'flavour': 'flavor',
            'flavors': 'flavors',
            'weight': 'weight',
            'pricePerKg': 'price_per_kg',
            'priceTotal': 'total_price',
            'rating': 'rating',
            'reviews': 'review_count',
            'slug': 'product_slug',
            'imagePublicThumb': 'image_url',
            'ingredientsMap': 'ingredients',
            'nutritionMap': 'nutrition_info',
            'priceSummary': 'price_summary'
        }
        
        # Extract all available fields
        for original_key, value in product_data.items():
            mapped_key = field_mappings.get(original_key, original_key)
            
            # Handle different data types
            if isinstance(value, list):
                try:
                    if value:
                        flattened = []
                        for item in value:
                            if isinstance(item, list):
                                flattened.extend([str(x) for x in item])
                            else:
                                flattened.append(str(item))
                        product[mapped_key] = ', '.join(flattened)
                    else:
                        product[mapped_key] = ''
                except Exception as e:
                    product[mapped_key] = str(value)
            elif isinstance(value, (str, int, float, bool)):
                product[mapped_key] = value
            elif value is not None:
                product[mapped_key] = str(value)
        
        # Parse numeric values (using actual field names)
        if 'protein_per_100g' in product:
            product['protein_per_100g'] = self.parse_numeric(product['protein_per_100g'])
        
        if 'price_per_100g_protein' in product:
            product['price_per_100g_protein'] = self.parse_numeric(product['price_per_100g_protein'])
            
        if 'price_per_100g' in product:
            product['price_per_100g'] = self.parse_numeric(product['price_per_100g'])
            
        if 'energy_per_100g' in product:
            product['energy_per_100g'] = self.parse_numeric(product['energy_per_100g'])
        
        # Create the muchproteins.com URL for this product
        if 'slug' in product_data:
            # Use the actual slug from the data
            product['muchproteins_url'] = f"/protein/{product_data['slug']}"
        elif 'name' in product_data and 'manufacturer' in product_data:
            # Fallback: create URL from name and manufacturer
            brand_slug = product_data['manufacturer'].lower().replace(' ', '-')
            name_slug = product_data['name'].lower().replace(' ', '-').replace('™', '').replace('®', '')
            name_slug = re.sub(r'[^a-z0-9\-]', '', name_slug)
            product['muchproteins_url'] = f"/protein/{brand_slug}/{name_slug}"
        
        return product if product.get('title') else None
    
    def scrape_price_history(self, sample_size=10, delay_between_requests=3):
        """Scrape price history for a sample of products"""
        if not self.products_data:
            print("❌ No products available. Run scrape_main_pages() first.")
            return
        
        # Take a sample of products to avoid overwhelming the server
        import random
        sample_products = random.sample(self.products_data, min(sample_size, len(self.products_data)))
        
        print(f"📈 Scraping price history for {len(sample_products)} sample products...")
        
        successful_scrapes = 0
        
        for i, product in enumerate(sample_products):
            if 'muchproteins_url' not in product:
                print(f"⚠️ Product {i+1}: No URL found, skipping...")
                continue
                
            full_url = self.base_url + product['muchproteins_url']
            print(f"\n🔍 {i+1}/{len(sample_products)}: {product.get('title', 'Unknown')[:50]}...")
            print(f"   URL: {full_url}")
            
            price_history = self.scrape_individual_price_history(full_url, product)
            
            if price_history:
                # Add price history to the product
                product['price_history'] = price_history
                
                # Also store in separate list for analysis
                for entry in price_history:
                    history_record = {
                        'product_title': product.get('title'),
                        'product_brand': product.get('brand'),
                        'muchproteins_url': product['muchproteins_url'],
                        **entry
                    }
                    self.price_history_data.append(history_record)
                
                successful_scrapes += 1
                print(f"   ✅ Found {len(price_history)} price data points")
            else:
                print(f"   ❌ No price history found")
            
            # Be respectful with delays
            if i < len(sample_products) - 1:
                time.sleep(delay_between_requests)
        
        print(f"\n🎉 Successfully scraped price history for {successful_scrapes}/{len(sample_products)} products")
        print(f"📊 Total price data points collected: {len(self.price_history_data)}")
    
    def scrape_individual_price_history(self, url, product_info):
        """Scrape price history from an individual product page"""
        response = self.get_page(url)
        if not response:
            return None
        
        soup = BeautifulSoup(response.content, 'html.parser')
        
        # Look for price history data in script tags
        scripts = soup.find_all('script')
        
        for script in scripts:
            if not script.string:
                continue
                
            script_content = script.string.strip()
            
            # Method 1: Look for Next.js page data
            if script_content.startswith('{"props":'):
                try:
                    data = json.loads(script_content)
                    
                    if 'props' in data and 'pageProps' in data['props']:
                        page_props = data['props']['pageProps']
                        
                        # Look for price history data
                        price_history = self.extract_price_history_from_props(page_props)
                        if price_history:
                            return price_history
                            
                except json.JSONDecodeError:
                    continue
            
            # Method 2: Look for chart data or time series data
            if any(keyword in script_content.lower() for keyword in ['chart', 'price', 'history', 'data']):
                price_history = self.extract_price_history_from_script(script_content)
                if price_history:
                    return price_history
        
        # Method 3: Look for structured data in HTML
        return self.extract_price_history_from_html(soup)
    
    def extract_price_history_from_props(self, page_props):
        """Extract price history from Next.js page props"""
        price_history = []
        
        # Look for common price history field names
        history_fields = ['priceHistory', 'price_history', 'history', 'prices', 'chartData', 'data']
        
        for field in history_fields:
            if field in page_props:
                data = page_props[field]
                if isinstance(data, list) and len(data) > 0:
                    # Check if this looks like time series data
                    first_item = data[0]
                    if isinstance(first_item, dict) and any(key in first_item for key in ['date', 'time', 'price', 'value']):
                        for item in data:
                            price_entry = self.parse_price_history_entry(item)
                            if price_entry:
                                price_history.append(price_entry)
                        return price_history
        
        # Also check nested objects
        for key, value in page_props.items():
            if isinstance(value, dict):
                nested_history = self.extract_price_history_from_props(value)
                if nested_history:
                    return nested_history
        
        return price_history if price_history else None
    
    def extract_price_history_from_script(self, script_content):
        """Extract price history from script tags"""
        price_history = []
        
        # Look for array patterns that might contain price data
        patterns = [
            r'priceHistory["\']?\s*:\s*(\[.+?\])',
            r'chartData["\']?\s*:\s*(\[.+?\])',
            r'prices["\']?\s*:\s*(\[.+?\])',
            r'data["\']?\s*:\s*(\[.+?\])',
            r'history["\']?\s*:\s*(\[.+?\])'
        ]
        
        for pattern in patterns:
            matches = re.findall(pattern, script_content, re.DOTALL | re.IGNORECASE)
            for match in matches:
                try:
                    data = json.loads(match)
                    if isinstance(data, list) and len(data) > 0:
                        # Check if this looks like time series data
                        first_item = data[0]
                        if isinstance(first_item, dict) and any(key in str(first_item).lower() for key in ['date', 'price', 'time']):
                            for item in data:
                                price_entry = self.parse_price_history_entry(item)
                                if price_entry:
                                    price_history.append(price_entry)
                            return price_history
                except (json.JSONDecodeError, TypeError):
                    continue
        
        return price_history if price_history else None
    
    def extract_price_history_from_html(self, soup):
        """Extract price history from HTML elements"""
        # This would be used if the price data is embedded in HTML rather than JSON
        # Look for canvas elements (charts), data attributes, or table data
        
        # Check for data attributes
        elements_with_data = soup.find_all(attrs={'data-price': True}) or soup.find_all(attrs={'data-chart': True})
        
        if elements_with_data:
            price_history = []
            for element in elements_with_data:
                # Try to extract data from attributes
                for attr, value in element.attrs.items():
                    if 'price' in attr.lower() or 'chart' in attr.lower():
                        try:
                            data = json.loads(value)
                            if isinstance(data, list):
                                for item in data:
                                    price_entry = self.parse_price_history_entry(item)
                                    if price_entry:
                                        price_history.append(price_entry)
                        except (json.JSONDecodeError, TypeError):
                            continue
            
            return price_history if price_history else None
        
        return None
    
    def parse_price_history_entry(self, entry):
        """Parse a single price history entry"""
        if not isinstance(entry, dict):
            return None
        
        price_entry = {}
        
        # Map common field names
        date_fields = ['date', 'time', 'timestamp', 'x']
        price_fields = ['price', 'value', 'y', 'amount']
        
        # Extract date
        for field in date_fields:
            if field in entry:
                price_entry['date'] = entry[field]
                break
        
        # Extract price
        for field in price_fields:
            if field in entry:
                price_entry['price'] = self.parse_numeric(entry[field])
                break
        
        # Add any other fields that might be useful
        for key, value in entry.items():
            if key not in price_entry:
                price_entry[key] = value
        
        # Validate that we have minimum required data
        if 'date' in price_entry and 'price' in price_entry:
            return price_entry
        
        return None
    
    def parse_numeric(self, value):
        """Extract numeric value from string"""
        if isinstance(value, (int, float)):
            return value
        if isinstance(value, str):
            cleaned = re.sub(r'[€$£,\s]', '', value)
            match = re.search(r'\d+(?:\.\d+)?', cleaned)
            if match:
                return float(match.group())
        return None
    
    def to_dataframe(self):
        """Convert scraped data to pandas DataFrame"""
        if not self.products_data:
            print("⚠️ No data to convert")
            return pd.DataFrame()
        return pd.DataFrame(self.products_data)
    
    def price_history_to_dataframe(self):
        """Convert price history data to pandas DataFrame"""
        if not self.price_history_data:
            print("⚠️ No price history data to convert")
            return pd.DataFrame()
        return pd.DataFrame(self.price_history_data)
    
    def save_data(self, products_filename='protein_products.json', history_filename='price_history.json'):
        """Save scraped data to JSON files"""
        # Save products
        with open(products_filename, 'w', encoding='utf-8') as f:
            json.dump(self.products_data, f, indent=2, ensure_ascii=False)
        print(f"💾 Products data saved to {products_filename}")
        
        # Save price history
        if self.price_history_data:
            with open(history_filename, 'w', encoding='utf-8') as f:
                json.dump(self.price_history_data, f, indent=2, ensure_ascii=False)
            print(f"💾 Price history data saved to {history_filename}")

# Usage example for notebook
def demo_usage():
    """Run a small end-to-end demo of the enhanced scraper.

    Scrapes listing pages 1-2, samples five products for price history with a
    two second delay between them, and converts both result sets to DataFrames.

    Returns:
        Tuple ``(scraper, products_df, history_df)``.
    """
    print("=== ENHANCED PROTEIN SCRAPER DEMO ===\n")

    demo_scraper = EnhancedProteinScraper()

    # Listing pages first: price history needs the product URLs they provide.
    print("Step 1: Scraping main product listings...")
    demo_scraper.scrape_main_pages(start_page=1, max_pages=2)

    print("\nStep 2: Scraping price history for sample products...")
    demo_scraper.scrape_price_history(sample_size=5, delay_between_requests=2)

    # Tabular views of whatever was collected.
    products_df, history_df = (
        demo_scraper.to_dataframe(),
        demo_scraper.price_history_to_dataframe(),
    )

    print(f"\n📊 Results:")
    print(f"Products: {len(products_df)}")
    print(f"Price history records: {len(history_df)}")

    return demo_scraper, products_df, history_df

In [15]:
# Build a fresh scraper and pull the paginated product listing.
scraper = EnhancedProteinScraper()

print("🚀 Scraping main product listings...")
total_products = scraper.scrape_main_pages(start_page=1, max_pages=5)

# Tabular view of everything that was scraped.
products_df = scraper.to_dataframe()
print(f"✅ Found {len(products_df)} products")

if products_df.empty:
    print("❌ No products found")
else:
    print(f"\n📊 Available columns: {list(products_df.columns)}")

    # Preferred preview columns, limited to the ones that actually exist.
    display_columns = ['title', 'brand', 'price_per_100g_protein', 'price_per_100g', 'product_tags']
    existing_columns = [name for name in display_columns if name in products_df.columns]

    print(f"\n🎯 Sample data:")
    display(products_df[existing_columns].head())

    # First-row samples of the nutrition and ingredient fields, when present.
    for sample_column, sample_heading in (
        ('nutrition_info', "\n🥗 Sample nutrition data:"),
        ('ingredients', "\n📋 Sample ingredients data:"),
    ):
        if sample_column in products_df.columns:
            print(sample_heading)
            print(products_df[sample_column].iloc[0])

    # Product-page URLs that the price-history step would request.
    if 'muchproteins_url' in products_df.columns:
        print(f"\n🔗 Sample URLs for price history:")
        for i, url in enumerate(products_df['muchproteins_url'].head(3)):
            print(f"  {i+1}. https://muchproteins.com{url}")

🚀 Scraping main product listings...
🚀 Starting to scrape pages 1 to 5...

📄 Scraping page 1: https://muchproteins.com
🎉 Found 25 products in data array!
✅ Found 25 products on page 1

📄 Scraping page 2: https://muchproteins.com/2
🎉 Found 25 products in data array!
✅ Found 25 products on page 2

📄 Scraping page 3: https://muchproteins.com/3
🎉 Found 25 products in data array!
✅ Found 25 products on page 3

📄 Scraping page 4: https://muchproteins.com/4
🎉 Found 25 products in data array!
✅ Found 25 products on page 4

📄 Scraping page 5: https://muchproteins.com/5
🎉 Found 3 products in data array!
✅ Found 3 products on page 5

🎉 Scraped 103 products total
✅ Found 103 products

📊 Available columns: ['source_page', 'scraped_at', 'extraction_method', 'title', 'product_url', 'brand', 'image_url', 'product_slug', 'ingredients', 'nutrition_info', 'product_tags', 'price_per_100g', 'price_per_100g_protein', 'price_summary', 'muchproteins_url']

🎯 Sample data:


Unnamed: 0,title,brand,price_per_100g_protein,price_per_100g,product_tags
0,Diet Whey Protein,PhD Nutrition,0.88,0.6,"vegetarian, gluten free"
1,Brown Rice Protein,Myprotein,1.64,1.28,"vegetarian, vegan, gluten free, dairy free, la..."
2,Impact Soy Protein,Myprotein,2.17,1.95,"vegetarian, vegan, dairy free, lactose free, g..."
3,Protein Hot Chocolate,Myprotein,2.19,1.25,"gluten free, vegetarian, dairy free, lactose f..."
4,Smart Protein Plant,PhD Nutrition,2.28,1.8,"vegetarian, dairy free, lactose free, vegan, g..."



🥗 Sample nutrition data:
{'typical values (when mixed with water±)': 'per 100g', 'energy': {'kj': 1528, 'kcal': 365}, 'fat': {'all': 5.5, 'saturated': 1.6}, 'carbohydrates': {'all': 11, 'sugar': 5}, 'fibre': 3, 'protein': 68, 'salt': 0.63, 'also provides:': None, 'flaxseed powder': {'all': 3, 'of which is flaxseed oil': 1.2}, 'conjugated linoleic acid powder': {'all': 1.5, 'of which is conjugated linoleic acid': 1}, 'l-carnitine': 0.5, 'green tea extract': 0.4}

📋 Sample ingredients data:
PhD Premium Protein Blend, ['Whey* Protein Concentrate', ['Milk Protein Concentrate', ['Of Which 80% Is Micellar Casein']], 'Soya Protein Isolate'], Waxy Barley Flour, Golden Brown Flaxseed Powder, Thickeners, ['Acacia Gum', 'Guar Gum', 'Xanthan Gum'], Flavouring, Conjugated Linoleic Acid Powder, [['Safflower Oil', ['Rich in Conjugated Linoeic Acid']], 'Glucose Syrup', 'Milk Protein', ['Emulsifier', ['Soya Lecithin']], 'Vitamin E'], L-Carnitine, Green Tea Extract, Sweetener, ['Sucralose']

🔗 Sample U

In [None]:
# Show the scraped products table as this cell's output.
products_df

In [None]:
# Optional: uncomment the next line to export the products table to CSV.
# products_df.to_csv('product_data.csv', index=False)