In [None]:
import csv
import re
import time
import xml.etree.ElementTree as ET
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from datetime import datetime
from typing import List

import requests
from bs4 import BeautifulSoup

@dataclass
class Offer:
    """Pricing for one carrier offer, held as ready-to-print strings.

    The scrapers fill these fields either with a number formatted to two
    decimals or with a placeholder such as 'N/A' / 'NODATA' when the value
    could not be obtained.
    """

    # total_price minus the gift card value
    price_after_gc: str
    # gift card value attached to the offer
    gift_card: str
    # monthly_price * 24 + down_payment
    total_price: str
    # monthly instalment
    monthly_price: str
    # up-front payment
    down_payment: str
    # total_price minus the total of the return-it ("BIB") option
    bib_premium: str
    # monthly instalment of the return-it option
    bib_monthly: str
    # up-front payment of the return-it option
    down_return: str

@dataclass
class Carrier:
    """A carrier selling a phone, together with the offers scraped for it."""

    # display name of the carrier, e.g. 'Telus'
    name: str
    # URL the offers were scraped from; written to the CSV 'Link' column
    link: str
    # offers found for this carrier
    offers: List[Offer]

@dataclass
class Phone:
    """A phone model and every carrier entry scraped for it."""

    # model name, derived from the XML tag with underscores turned into spaces
    name: str
    # one entry per carrier scraped for this phone
    carriers: List[Carrier]

# -------------------------------------------------------------------
#                    BESTBUY SCRAPER
# -------------------------------------------------------------------
class BestBuyScraper:
    """Scrape BestBuy phone offers for the product URLs listed in an XML file.

    Expected XML layout: every child of the root is one phone (its tag is the
    phone name with '_' instead of spaces) and every child of a phone node
    holds one BestBuy product URL as its text.
    """

    # Seconds to wait on any single HTTP request; without a timeout a stalled
    # connection blocks the whole scrape indefinitely.
    REQUEST_TIMEOUT = 30
    # Attempts per product page, and the pause between two attempts.
    MAX_RETRIES = 3
    RETRY_DELAY_SECONDS = 2
    # Number of monthly payments used when computing total prices.
    CONTRACT_MONTHS = 24

    # URL fragment -> carrier display name; checked in this order.
    _CARRIER_KEYWORDS = {
        'telus': 'Telus',
        'koodo': 'Koodo',
        'rogers': 'Rogers',
        'fido': 'Fido',
        'freedom-mobile': 'Freedom Mobile',
        'bell': 'Bell',
        'virgin-plus': 'Virgin Plus'
    }

    # Captures the value of the "cellPhonesCarrierPlansUrl" key embedded in the page.
    _API_URL_PATTERN = re.compile(r'cellPhonesCarrierPlansUrl":"(.*?)"')

    def __init__(self):
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
                          '(KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        }
        self.session = requests.Session()
        self.session.headers.update(self.headers)

    def scrape_bestbuy(self, xml_file_path: str) -> "List[Phone]":
        """
        Scrapes data from the provided BestBuy XML.

        Args:
            xml_file_path: http(s) URL or local path of the XML file.

        Returns:
            One Phone per XML phone node, in document order. If loading or
            parsing fails the error is printed and the phones collected so
            far (possibly none) are returned.
        """
        phones = []
        response = None
        try:
            # Fetch XML file from URL or load from local path
            if xml_file_path.startswith(("http://", "https://")):
                response = self.session.get(xml_file_path, timeout=self.REQUEST_TIMEOUT)
                response.raise_for_status()
                root = ET.fromstring(response.content)
            else:
                root = ET.parse(xml_file_path).getroot()

            # The root's direct children are treated as the phone entries.
            print(f"Found {len(root)} BestBuy phone entries")

            for phone_node in root:
                phone_name = phone_node.tag.replace('_', ' ')
                carriers = []

                for url_node in phone_node:
                    # Element.text is None for an empty element; skip it
                    # instead of letting it abort the whole scrape.
                    url = (url_node.text or "").strip()
                    if not url:
                        print(f"Skipping empty URL entry for {phone_name}")
                        continue
                    try:
                        carriers.append(
                            self.get_carrier_price_data_bestbuy(url, url, phone_name)
                        )
                    except Exception as e:
                        print(f"Error scraping carrier data for {phone_name}: {str(e)}")

                phones.append(Phone(name=phone_name, carriers=carriers))

        except Exception as e:
            print(f"Error in BestBuy scrape method: {str(e)}")
            # `is not None`: a Response is falsy for 4xx/5xx status codes.
            if response is not None:
                print("\nLast response content:")
                print(response.text)

        return phones

    def extract_carrier_name(self, url: str) -> str:
        """
        Returns the carrier whose keyword appears in the URL, or 'Unknown'.
        """
        lowered = url.lower()
        for keyword, display_name in self._CARRIER_KEYWORDS.items():
            if keyword in lowered:
                return display_name
        return 'Unknown'

    def extract_sku_id_bestbuy(self, url: str) -> str:
        """
        Extracts the SKU ID for BestBuy from the product URL.

        The SKU is the last path segment; a query string, fragment or
        trailing slash is ignored.
        """
        path = url.split('#', 1)[0].split('?', 1)[0]
        return path.rstrip('/').split('/')[-1]

    def get_phone_api_url_bestbuy(self, response: "requests.Response") -> str:
        """
        Finds and returns the phone's API URL from the page HTML.

        Returns an empty string when the page does not contain the
        cellPhonesCarrierPlansUrl key.
        """
        for raw_line in response.iter_lines():
            # errors='replace': one badly encoded line must not abort the search.
            match = self._API_URL_PATTERN.search(raw_line.decode(errors='replace'))
            if match is not None:
                return match.group(1)
        return ""

    @staticmethod
    def _parse_amount(raw):
        """Return `raw` as a float, or None unless it is a plain non-negative decimal."""
        text = str(raw).strip()
        # Removing at most one dot rejects strings such as '1.2.3'.
        if not text.replace('.', '', 1).isdigit():
            return None
        try:
            return float(text)
        except ValueError:
            return None

    def _build_offer(self, monthly_price, keep_it_down, gift_card_amount,
                     return_it_monthly, return_it_down) -> "Offer":
        """Turn the scraped keep-it / return-it values into a formatted Offer."""
        down_payment = self._parse_amount(keep_it_down) or 0.0
        gift_card = self._parse_amount(gift_card_amount) or 0.0

        total_price = monthly_price * self.CONTRACT_MONTHS + down_payment
        price_after_gc = total_price - gift_card

        # BIB values
        bib_monthly_price = self._parse_amount(return_it_monthly) or 0.0
        bib_down_payment = self._parse_amount(return_it_down) or 0.0
        if bib_monthly_price > 0:
            # For BIB, the "premium" is the difference in total phone cost
            # compared to the keep-it option
            bib_total = bib_monthly_price * self.CONTRACT_MONTHS + bib_down_payment
            bib_premium_str = f"{total_price - bib_total:.2f}"
        else:
            bib_premium_str = "N/A"

        return Offer(
            price_after_gc=f"{price_after_gc:.2f}",
            gift_card=f"{gift_card:.2f}",
            total_price=f"{total_price:.2f}",
            monthly_price=f"{monthly_price:.2f}",
            down_payment=f"{down_payment:.2f}",
            bib_premium=bib_premium_str,
            bib_monthly=f"{bib_monthly_price:.2f}" if bib_monthly_price > 0 else "N/A",
            down_return=f"{bib_down_payment:.2f}" if bib_down_payment > 0 else "N/A"
        )

    def get_carrier_price_data_bestbuy(self, url: str, link: str, phone_name: str) -> "Carrier":
        """
        Given a BestBuy URL, scrapes the carrier data and builds a Carrier object.

        The product page is fetched, the carrier-plans API URL embedded in it
        is called, and the 'keep-it' / 'return-it' offers are read from the
        JSON reply. Up to MAX_RETRIES attempts are made.

        Args:
            url: product page URL.
            link: URL stored on the returned Carrier.
            phone_name: phone name, used in log messages only.

        Returns:
            A Carrier with one Offer; every price is 'N/A' when all attempts fail.
        """
        carrier_name = self.extract_carrier_name(url)
        sku_id = self.extract_sku_id_bestbuy(url)
        print(f"[BestBuy] Started Extracting data for {phone_name} ({sku_id}) - {carrier_name}")

        response_text = None

        # Return-it (BIB) placeholders
        return_it_monthly = 'N/A'
        return_it_down = 'N/A'

        # Keep-it placeholders
        keep_it_monthly = 'N/A'
        keep_it_down = 'N/A'
        gift_card_amount = '0'

        for attempt in range(1, self.MAX_RETRIES + 1):
            problem = f"Cannot load monthly price for {phone_name} - {carrier_name}"
            try:
                response = self.session.get(url, timeout=self.REQUEST_TIMEOUT)
                response.raise_for_status()

                # Extract the cellPhonesCarrierPlansUrl
                cellphone_api_url = self.get_phone_api_url_bestbuy(response)
                if not cellphone_api_url:
                    raise Exception("Could not find cellPhonesCarrierPlansUrl in the page source")

                # Replace {skuId} with actual sku_id (plain text substitution)
                request_str = cellphone_api_url.replace('{skuId}', sku_id)

                # Get the JSON from the API
                api_response = self.session.get(request_str, timeout=self.REQUEST_TIMEOUT)
                api_response.raise_for_status()

                # We expect a list of offers in the JSON reply
                for offer_dict in api_response.json():
                    offer_type = offer_dict.get('type') or ''
                    if 'return-it' in offer_type:
                        return_it_monthly = str(offer_dict['monthly'])
                        return_it_down = str(offer_dict['downPayment'])
                    if 'keep-it' in offer_type:
                        keep_it_monthly = str(offer_dict['monthly'])
                        keep_it_down = str(offer_dict['downPayment'])
                    # NOTE(review): the gift card of the LAST offer in the list wins,
                    # whatever its type - confirm this matches the API's layout.
                    gift_card_amount = str(offer_dict.get('giftCard', '0'))

                response_text = response.text

                # If we have valid keep-it data
                monthly_price = self._parse_amount(keep_it_monthly)
                if monthly_price is not None:
                    offer = self._build_offer(
                        monthly_price, keep_it_down, gift_card_amount,
                        return_it_monthly, return_it_down
                    )
                    return Carrier(name=carrier_name, link=link, offers=[offer])

            except Exception as e:
                problem = f"Error occurred for {phone_name} - {carrier_name}: {str(e)}"

            # Otherwise, we do a retry (no pause after the final attempt)
            if attempt < self.MAX_RETRIES:
                print(f"Retry {attempt} - {problem}")
                time.sleep(self.RETRY_DELAY_SECONDS)

        # If we exhausted all attempts
        print(f"Failed to load data after {self.MAX_RETRIES} attempts for {phone_name} - {carrier_name}")
        if response_text:
            print("Last page response text was available but parsing failed or incomplete.\n")

        # Return a fallback with no data
        return Carrier(
            name=carrier_name,
            link=link,
            offers=[
                Offer(
                    price_after_gc='N/A',
                    gift_card='0',
                    total_price='N/A',
                    monthly_price='N/A',
                    down_payment='N/A',
                    bib_premium='N/A',
                    bib_monthly='N/A',
                    down_return='N/A'
                )
            ]
        )

# -------------------------------------------------------------------
#                    WALMART SCRAPER
# -------------------------------------------------------------------
class WalmartScraper:
    """Scrape Walmart Wireless phone offers for the phone pages listed in an XML file.

    Expected XML layout: every child of the root is one phone (its tag is the
    phone name with '_' instead of spaces) and its text is the phone page URL.
    """

    # Seconds to wait on any single HTTP request; without a timeout a stalled
    # connection blocks the whole scrape indefinitely.
    REQUEST_TIMEOUT = 30
    # Size of the thread pool used to fetch carrier prices.
    MAX_WORKERS = 5
    # Number of monthly payments used when computing total prices.
    CONTRACT_MONTHS = 24
    # Endpoint queried for the price of one variation (SKU).
    PRICE_ENDPOINT = "https://www.wireless.walmart.ca/wp-admin/admin-ajax.php"
    # Carriers listed with 'N/A' values when a phone page cannot be parsed.
    FALLBACK_CARRIERS = (
        'Fido', 'Rogers', 'Virgin Plus',
        'Bell', 'Koodo', 'Telus', 'Freedom Mobile'
    )

    def __init__(self):
        # NOTE(review): 'br' is advertised in Accept-Encoding; presumably a brotli
        # decoder must be installed for such responses to be decoded - confirm.
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
                          '(KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Content-Type': 'application/x-www-form-urlencoded',
            'Accept': '*/*',
            'Accept-Encoding': 'gzip, deflate, br',
            'Connection': 'keep-alive'
        }
        self.session = requests.Session()
        self.session.headers.update(self.headers)

    @staticmethod
    def _na_offer(gift_card: str = '0') -> "Offer":
        """Build a fresh placeholder Offer; Walmart has no BIB data, hence 'NODATA'."""
        return Offer(
            price_after_gc='N/A',
            gift_card=gift_card,
            total_price='N/A',
            monthly_price='N/A',
            down_payment='N/A',
            bib_premium='NODATA',
            bib_monthly='NODATA',
            down_return='NODATA'
        )

    @staticmethod
    def _parse_price(raw):
        """Return a price such as '$1,200.00' or '$25.00/month' as a float, else None."""
        text = str(raw).replace('/month', '').replace('$', '').replace(',', '').strip()
        # Removing at most one dot rejects strings such as '1.2.3'.
        if not text.replace('.', '', 1).isdigit():
            return None
        try:
            return float(text)
        except ValueError:
            return None

    def scrape_walmart(self, xml_file_path: str) -> "List[Phone]":
        """
        Scrapes data from the provided Walmart XML using multi-threading,
        preserving the phone order in the CSV via dict insertion order.
        If extract_sku_ids_walmart reports an error, we set error=True
        for that phone and fill numeric fields with 'N/A'.

        Args:
            xml_file_path: http(s) URL or local path of the XML file.

        Returns:
            One Phone per XML phone node; an empty list when loading or
            parsing the XML fails (the error is printed).
        """
        phones: "List[Phone]" = []
        phone_data_map = {}  # phone_name -> { 'phone_url': ..., 'sku_ids_map': ..., 'results': {}, 'error': bool }
        response = None

        try:
            # 1) Load/parse the XML
            if xml_file_path.startswith(("http://", "https://")):
                response = self.session.get(xml_file_path, timeout=self.REQUEST_TIMEOUT)
                response.raise_for_status()
                root = ET.fromstring(response.content)
            else:
                root = ET.parse(xml_file_path).getroot()

            print(f"Found {len(root)} Walmart phone entries")

            # 2) For each phone node, gather SKU info (or set error=True)
            for phone_node in root:
                phone_name = phone_node.tag.replace('_', ' ')
                phone_url = phone_node.text.strip() if phone_node.text else ""

                sku_ids_map = self.extract_sku_ids_walmart(phone_url)
                has_error = "_error" in sku_ids_map  # True if an exception occurred

                phone_data_map[phone_name] = {
                    'phone_url': phone_url,
                    # If there's an error, we won't attempt concurrency
                    'sku_ids_map': {} if has_error else sku_ids_map,
                    'results': {},
                    'error': has_error
                }

            # 3) Multi-threaded fetch for phones without errors
            futures = []
            with ThreadPoolExecutor(max_workers=self.MAX_WORKERS) as executor:
                for phone_name, pinfo in phone_data_map.items():
                    if pinfo['error']:
                        continue
                    for carrier_name, variation_id in pinfo['sku_ids_map'].items():
                        future = executor.submit(
                            self._fetch_sku_price,
                            phone_name,
                            carrier_name,
                            variation_id
                        )
                        futures.append((phone_name, future))

                # 4) Collect results
                for phone_name, future in futures:
                    try:
                        carrier_name, offer = future.result()
                        phone_data_map[phone_name]['results'][carrier_name] = offer
                    except Exception as e:
                        print(f"Thread error while fetching data for {phone_name}: {e}")

            # 5) Build final Phones in insertion order
            for phone_name, pinfo in phone_data_map.items():
                if pinfo['error']:
                    # Produce 'N/A' carriers (because we had an extraction error)
                    carriers = self._produce_na_carriers(pinfo['phone_url'])
                else:
                    # Rebuild carriers from the concurrency results, falling
                    # back to a placeholder when a worker produced nothing.
                    carriers = []
                    for carrier_name in pinfo['sku_ids_map']:
                        if carrier_name in pinfo['results']:
                            offer = pinfo['results'][carrier_name]
                        else:
                            offer = self._na_offer()
                        carriers.append(
                            Carrier(name=carrier_name, link=pinfo['phone_url'], offers=[offer])
                        )

                phones.append(Phone(name=phone_name, carriers=carriers))

        except Exception as e:
            print(f"Error in Walmart scrape method: {e}")
            # `is not None`: a Response is falsy for 4xx/5xx status codes.
            if response is not None:
                print("\nLast response content:")
                print(response.text)

        return phones

    def extract_sku_ids_walmart(self, page_url: str) -> dict:
        """
        Parse the Walmart phone page, find possible carriers / variation IDs (SKUs).
        If an exception occurs, the returned dict contains '_error': True so the
        caller knows to produce 'N/A'.

        Returns:
            Mapping of carrier name -> variation ID; empty when the URL is
            empty or the page has no carrier buttons.
        """
        carrier_sku_map = {}
        if not page_url:
            return carrier_sku_map

        try:
            resp = self.session.get(page_url, timeout=self.REQUEST_TIMEOUT)
            resp.raise_for_status()
            soup = BeautifulSoup(resp.text, "html.parser")

            carrier_div = soup.find('div', class_='pd-carriers-option-wrapper')
            if not carrier_div:
                print("Could not find any carrier wrapper on Walmart page.")
                return carrier_sku_map

            buttons = carrier_div.find_all('button', class_='pd-carrier-options')
            if not buttons:
                print("No carrier buttons found on Walmart page.")
                return carrier_sku_map

            for btn in buttons:
                variation_id = btn.get('data-variations', '').strip()
                if not variation_id:
                    continue
                name_elem = btn.find('span', class_='pd-carrier-name')
                carrier_name = name_elem.text.strip() if name_elem else "UnknownCarrier"
                carrier_sku_map[carrier_name] = variation_id

        except Exception as e:
            print(f"Error extracting Walmart SKU IDs from {page_url}: {e}")
            # Indicate to caller that there's an actual error
            carrier_sku_map["_error"] = True

        return carrier_sku_map

    def _fetch_sku_price(self, phone_name: str, carrier_name: str, variation_id: str):
        """
        Worker function for multi-threading that fetches a single carrier's
        price data from Walmart, returning (carrier_name, Offer).

        A placeholder Offer is returned when the request fails or when the
        monthly price cannot be parsed, so a bogus 0.00 price is never reported.
        """
        print(f"Getting price data for {phone_name} ({variation_id}) - {carrier_name}")
        price_data = self.get_carrier_price_data_walmart(variation_id)

        if not (price_data and price_data.get('success')):
            print(f"Failed to get price data for {phone_name} - {carrier_name}")
            return (carrier_name, self._na_offer())

        try:
            data = price_data['data']
            monthly_raw = data.get('monthly_price', 'N/A')
            monthly_price = self._parse_price(monthly_raw)
            if monthly_price is None:
                raise ValueError(f"unparseable monthly price {monthly_raw!r}")

            # A missing down payment or gift card counts as zero.
            down_payment = self._parse_price(data.get('down_price', 'N/A')) or 0.0
            gift_card = self._parse_price(data.get('gc_price', '0')) or 0.0

            total_price = monthly_price * self.CONTRACT_MONTHS + down_payment
            price_after_gc = total_price - gift_card

            offer = Offer(
                price_after_gc=f"{price_after_gc:.2f}",
                gift_card=f"{gift_card:.2f}",
                total_price=f"{total_price:.2f}",
                monthly_price=f"{monthly_price:.2f}",
                down_payment=f"{down_payment:.2f}",
                bib_premium="NODATA",  # Walmart doesn't have BIB
                bib_monthly="NODATA",
                down_return="NODATA"
            )
            return (carrier_name, offer)
        except Exception as parse_ex:
            print(f"Error parsing price data for {phone_name} - {carrier_name}: {parse_ex}")
            return (carrier_name, self._na_offer())

    def get_carrier_price_data_walmart(self, variation_id: str) -> dict:
        """
        Calls Walmart's Wireless endpoint to get the price data for the given variation_id (SKU).
        Returns JSON like:
          {
            "success": true,
            "data": {
               "monthly_price": "$25.00",
               "down_price": "$100.00",
               "gc_price": "$50.00"
            }
          }
        or {"success": false} on failure.
        """
        try:
            payload = {
                'action': 'get_variation_price',
                'variation_id': variation_id
            }
            resp = self.session.post(
                self.PRICE_ENDPOINT, data=payload, timeout=self.REQUEST_TIMEOUT
            )
            resp.raise_for_status()
            return resp.json()
        except Exception as e:
            print(f"Error getting Walmart price data for SKU {variation_id}: {e}")
            return {"success": False}

    def _produce_na_carriers(self, link: str) -> "List[Carrier]":
        """
        Builds a set of carriers with 'N/A' for numeric fields,
        used when we fail to extract any SKU IDs for a phone.

        Each carrier gets its own Offer instance so that changing one
        entry later cannot affect the others.
        """
        return [
            Carrier(name=name, link=link, offers=[self._na_offer(gift_card='N/A')])
            for name in self.FALLBACK_CARRIERS
        ]

# -------------------------------------------------------------------
#                    COSTCO SCRAPER
# -------------------------------------------------------------------
    # TODO

# -------------------------------------------------------------------
#                    CSV WRITER (REUSED)
# -------------------------------------------------------------------
def write_to_csv(phones: List[Phone], file_path: str):
    """
    Dump the scraped phones to a CSV file.

    For every phone exactly one group of rows is written per carrier of a
    fixed carrier list, in that list's order: the carrier's offers when the
    phone has that carrier, otherwise a single row of '--' placeholders.
    Carriers outside the fixed list are not written. BIB columns carry
    whatever the Offer holds ("NODATA" for Walmart, real values or 'N/A'
    for BestBuy).

    Args:
        phones: phones to write.
        file_path: destination path; the file is overwritten.
    """
    column_titles = [
        'Phone',
        'Carrier',
        'Price After GC',
        'Gift Card Amount',
        'Total Price',
        'Monthly Price',
        'Downpayment',
        'BIB Premium',
        'BIB Monthly',
        'BIB Downpayment',
        'Link'
    ]

    # Output order of the carriers, identical for every phone
    preferred_order = ['Fido', 'Rogers', 'Virgin Plus', 'Bell', 'Koodo', 'Telus', 'Freedom Mobile']

    # One '--' per column after 'Phone' and 'Carrier'
    blank_cells = ['--'] * 9

    print(f"Writing data to CSV: {file_path}")
    with open(file_path, 'w', newline='', encoding='utf-8') as handle:
        out = csv.writer(handle)
        out.writerow(column_titles)

        for phone in phones:
            # Index the phone's carriers by name; a later duplicate replaces an earlier one
            by_name = {}
            for entry in phone.carriers:
                by_name[entry.name] = entry

            for wanted in preferred_order:
                if wanted not in by_name:
                    # Carrier not offered for this phone: placeholder row
                    out.writerow([phone.name, wanted] + blank_cells)
                    continue

                found = by_name[wanted]
                for deal in found.offers:
                    out.writerow([
                        phone.name,
                        found.name,
                        deal.price_after_gc,
                        deal.gift_card,
                        deal.total_price,
                        deal.monthly_price,
                        deal.down_payment,
                        deal.bib_premium,
                        deal.bib_monthly,
                        deal.down_return,
                        found.link
                    ])

# -------------------------------------------------------------------
#                    STARTUP FUNCTIONS
# -------------------------------------------------------------------

def startBB():
    """Run the BestBuy scrape and save the result as bestbuy_<YYYYMMDD>_mobiles.csv."""
    print("Starting Bestbuy Mobile scraper")
    scraper = BestBuyScraper()
    # XML listing the BestBuy product URLs, pinned to a specific commit
    source_xml = "https://raw.githubusercontent.com/Herbrax/PhoneDealsScrapper/22ed5b864d6f2ca18965d8d15d4b3596e8a82c60/bestbuymobile.xml"

    scraped = scraper.scrape_bestbuy(source_xml)

    # The date stamp is taken after scraping finishes
    stamp = datetime.now().strftime("%Y%m%d")
    write_to_csv(scraped, f"bestbuy_{stamp}_mobiles.csv")
    print("All BestBuy data has been written to CSV files.")

def startWM():
    """Run the Walmart scrape and save the result as walmart_<YYYYMMDD>_mobiles.csv."""
    print("Starting Walmart Wireless scraper")
    scraper = WalmartScraper()
    # XML listing the Walmart phone pages; replace with your actual Walmart XML if needed
    source_xml = "https://raw.githubusercontent.com/Herbrax/PhoneDealsScrapper/refs/heads/main/walmartwireless.xml"

    scraped = scraper.scrape_walmart(source_xml)

    # The date stamp is taken after scraping finishes
    stamp = datetime.now().strftime("%Y%m%d")
    write_to_csv(scraped, f"walmart_{stamp}_mobiles.csv")
    print("All Walmart data has been written to CSV files.")

#def startCC():

In [4]:
# Run the BestBuy scrape; the result is written to bestbuy_<YYYYMMDD>_mobiles.csv
# (a relative path, so it lands in the current working directory).
startBB()

Starting Bestbuy Mobile scraper
Found 66 BestBuy phone entries
[BestBuy] Started Extracting data for iPhone 13 128GB (15726923) - Freedom Mobile
[BestBuy] Started Extracting data for iPhone 13 128GB (15726985) - Koodo
[BestBuy] Started Extracting data for iPhone 13 128GB (15727047) - Telus


KeyboardInterrupt: 

In [3]:
# Run the Walmart scrape; the result is written to walmart_<YYYYMMDD>_mobiles.csv
# (a relative path, so it lands in the current working directory).
startWM()

Starting Walmart Wireless scraper
Error in Walmart scrape method: not well-formed (invalid token): line 24, column 75

Last response content:






<!DOCTYPE html>
<html
  lang="en"
  
  data-color-mode="auto" data-light-theme="light" data-dark-theme="dark"
  data-a11y-animated-images="system" data-a11y-link-underlines="true"
  
  >



  <head>
    <meta charset="utf-8">
  <link rel="dns-prefetch" href="https://github.githubassets.com">
  <link rel="dns-prefetch" href="https://avatars.githubusercontent.com">
  <link rel="dns-prefetch" href="https://github-cloud.s3.amazonaws.com">
  <link rel="dns-prefetch" href="https://user-images.githubusercontent.com/">
  <link rel="preconnect" href="https://github.githubassets.com" crossorigin>
  <link rel="preconnect" href="https://avatars.githubusercontent.com">

  


  <link crossorigin="anonymous" media="all" rel="stylesheet" href="https://github.githubassets.com/assets/light-0cfd1fd8509e.css" /><link crossorigin="anonymous" media="all" rel="st