In [8]:
import logging
from datetime import datetime
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import gspread
from google.oauth2.service_account import Credentials
import pandas as pd
import re

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s')
logger = logging.getLogger()

# Path to your service account key file
SERVICE_ACCOUNT_FILE = 'ranking-436314-4daf4b7d4292.json'

# Define the scope
scope = ["https://spreadsheets.google.com/feeds", "https://www.googleapis.com/auth/drive"]

# Authenticate using the service account file
creds = Credentials.from_service_account_file(SERVICE_ACCOUNT_FILE, scopes=scope)
client = gspread.authorize(creds)

# Open the Google Sheet
workbook = client.open("Skillmatics Rank Sheet Streamlit")
url_sheet = workbook.worksheet("Node-URL")
rank_sheet = workbook.worksheet("Node-Rank")

def current_time_slot():
    """Returns the current timestamp for tracking scraping sessions."""
    now = datetime.now()
    return now.strftime('%Y-%m-%d %H:%M')

def parse_rank_numbers(rank_text):
    """Parses the rank text to extract the primary and secondary rankings."""
    ranks = rank_text.split('\n')
    primary_rank = ranks[0].split()[0].lstrip('#').replace(',', '')
    secondary_rank = ranks[1].split()[0].lstrip('#').replace(',', '') if len(ranks) > 1 else ''
    return f"{primary_rank}({secondary_rank})" if secondary_rank else primary_rank



def update_google_sheet(product, current_time, rank_value):
    """Updates the Google Sheet with the latest rank values efficiently."""
    try:
        # Fetch existing data
        existing_data = rank_sheet.get_all_records()
        existing_df = pd.DataFrame(existing_data)
        existing_df.set_index('Product', inplace=True)

        # Create a new DataFrame for the update
        update_data = pd.DataFrame({current_time: [rank_value]}, index=[product])

        # Merge new data
        merged_data = existing_df.combine_first(update_data)

        # Convert DataFrame back to list format
        data = merged_data.reset_index().values.tolist()
        headers = ['Product'] + [col for col in merged_data.columns if col != 'Product']

        # Clear and update the sheet in one batch update
        rank_sheet.clear()
        rank_sheet.update('A1', [headers] + data)

        logger.info(f"Updated Google Sheet: {product} -> {rank_value} at {current_time}")
    except Exception as e:
        logger.error(f"Error updating Google Sheet: {e}")



def fetch_rank_for_asin(driver, asin):
    """Fetches the rank for the provided ASIN on the page."""
    try:
        rank_element = driver.find_element(By.XPATH, f"//div[@data-asin='{asin}']//span[contains(@class, 'zg-bdg-text')]")
        rank_text = rank_element.text.strip().replace('#', '')
        logger.info(f"Rank for ASIN {asin}: {rank_text}")
        return rank_text
    except Exception as e:
        logger.warning(f"Could not fetch rank for ASIN {asin}: {e}")
        return None

def fetch_rank_in_subcategory(driver, subcategory_url, subcategory_name, asin):
    """Fetches the rank of the product in a specific subcategory."""
    try:
        driver.get(subcategory_url)
        logger.info(f"Opening Subcategory URL: {subcategory_url} for {subcategory_name}")

        WebDriverWait(driver, 30).until(
            EC.presence_of_element_located((By.XPATH, f"//div[@data-asin='{asin}']"))
        )
        logger.info(f"ASIN {asin} found on subcategory page: {subcategory_name}")

        rank = fetch_rank_for_asin(driver, asin)
        if rank:
            return f"{rank}"  # Only return the rank for simplified formatting
        return None
    except Exception as e:
        logger.warning(f"ASIN {asin} not found or rank unavailable in subcategory {subcategory_name}: {e}")
        return None

# def fetch_subcategories_and_ranks(driver):
#     """Fetches the next three subcategories, opens their links, and retrieves rankings."""
#     try:
#         subcategory_elements = driver.find_elements(By.XPATH, "//a[contains(@href, '/zgbs/')]")
#         subcategories = []
#         count = 0

#         for element in subcategory_elements:
#             try:
#                 subcategory_name = element.text.strip()
#                 subcategory_link = element.get_attribute("href")

#                 # Skip irrelevant categories
#                 if all(skip not in subcategory_name for skip in ["Any Department", "Toys & Games"]) and subcategory_link:
#                     subcategories.append((subcategory_name, subcategory_link))
#                     logger.info(f"Found subcategory: {subcategory_name} - {subcategory_link}")
#                     count += 1
#                     if count >= 5:  # Limit to 3 subcategories
#                         break
#             except Exception as e:
#                 logger.warning(f"Error processing subcategory element: {e}")

#         return subcategories
#     except Exception as e:
#         logger.error(f"Failed to fetch subcategories: {e}")
#         return []

def fetch_subcategories_and_ranks(driver):
    """
    Fetch the next three subcategories and their rankings after skipping "Any Department" and "Toys & Games".
    """
    try:
        subcategory_elements = driver.find_elements(By.XPATH, "//div[@role='treeitem'] | //a[contains(@href, '/zgbs/')]")
        subcategories = []
        count = 0

        for element in subcategory_elements:
            try:
                subcategory_name = element.text.strip()
                subcategory_link = element.get_attribute("href")

                # Skip "Any Department", "Toys & Games", and similar categories
                if all(skip not in subcategory_name for skip in ["Any Department", "Toys & Games"]) and subcategory_link:
                    subcategories.append((subcategory_name, subcategory_link))
                    logger.info(f"Found subcategory: {subcategory_name} - {subcategory_link}")
                    count += 1
                    if count >= 5:  # Limit to the next three valid subcategories
                        break
            except Exception as e:
                logger.warning(f"Error processing subcategory element: {e}")

        return subcategories
    except Exception as e:
        logger.error(f"Failed to fetch subcategories: {e}")
        return []

def check_asins_in_category_page(driver, category_url, asin_list):
    """Check if ASINs exist in the category page and fetch their ranks."""
    try:
        driver.get(category_url)
        logger.info(f"Opening Category URL: {category_url}")

        found_asins = []
        subcategories = fetch_subcategories_and_ranks(driver)

        for asin in asin_list:
            ranks = []
            main_rank = fetch_rank_for_asin(driver, asin)
            if main_rank:
                ranks.append(main_rank)

            for subcategory_name, subcategory_link in subcategories:
                if subcategory_link:
                    sub_rank = fetch_rank_in_subcategory(driver, subcategory_link, subcategory_name, asin)
                    if sub_rank:
                        ranks.insert(0, sub_rank)

            final_rank = "".join([f"({rank})" for rank in ranks])
            found_asins.append((asin, final_rank))

        return found_asins, subcategories
    except Exception as e:
        logger.error(f"An error occurred while checking ASINs: {e}")
        return [], []

def extract_first_three_numbers(rank_output):
    """Extracts the first three numbers from the rank output."""
    matches = re.findall(r'\d+\(\d+\)|\(\d+\)', rank_output)
    logger.debug(f"Extracted matches: {matches}")
    return "".join(matches[:3])

def scrape_best_sellers_rank(driver, product, url, asin_list, current_time):
    """Scrapes the Best Sellers Rank and category ranks for a given product URL and ASINs."""
    try:
        driver.get(url)
        rank_section = WebDriverWait(driver, 80).until(
            EC.visibility_of_element_located((By.XPATH, "//th[contains(text(), 'Best Sellers Rank')]//following-sibling::td"))
        )
        rank_text = rank_section.text.strip()
        best_seller_rank = parse_rank_numbers(rank_text)
        logger.info(f"Best Sellers Rank for {product}: {best_seller_rank}")

        if best_seller_rank.endswith("(1)"):
            category_links = rank_section.find_elements(By.XPATH, ".//a")
            if len(category_links) > 1:
                second_category_url = category_links[1].get_attribute("href")
                found_asins_with_ranks, subcategories = check_asins_in_category_page(driver, second_category_url, asin_list)

                category_ranks = [ranks for asin, ranks in found_asins_with_ranks]
                category_ranks_str = f"({'})('.join(category_ranks[:3])})" if category_ranks else ""
                final_rank = f"{best_seller_rank}{category_ranks_str}"

                formatted_rank = extract_first_three_numbers(final_rank)
                logger.info(f"Final rank for {product} (ASIN: {asin_list[0]}): {formatted_rank}")
                update_google_sheet(product, current_time, formatted_rank)
            else:
                logger.warning(f"No second category link found for {product}.")
        else:
            logger.info(f"Skipping category ranking for {product}, as Best Seller Rank is not (1).")
            update_google_sheet(product, current_time, best_seller_rank)
    except Exception as e:
        logger.error(f"Failed to scrape {product} ({url}): {e}")

def run_scrape_all():
    """Fetch URLs and ASINs from the URL sheet and scrape rankings for all products."""
    url_data = url_sheet.get_all_records()
    df = pd.DataFrame(url_data)
    df.columns = df.columns.str.strip()

    product_names = df['Products'].tolist()
    url_list = df['URL'].tolist()
    asin_list = df['ASIN'].tolist()

    current_time = current_time_slot()

    options = Options()
    options.add_argument("--disable-gpu")
    options.add_argument("--no-sandbox")
    options.add_argument("--disable-dev-shm-usage")
    options.add_argument("--log-level=3")

    driver = webdriver.Chrome(options=options)

    for product, url, asin in zip(product_names, url_list, asin_list):
        scrape_best_sellers_rank(driver, product, url, [asin], current_time)

    driver.quit()
        
if __name__ == "__main__":
    logger.info("Starting the scraper for all products...")
    run_scrape_all()

2025-02-03 10:59:33,351 - Starting the scraper for all products...
2025-02-03 10:59:59,527 - Best Sellers Rank for Found It: 2246(13)
2025-02-03 10:59:59,527 - Skipping category ranking for Found It, as Best Seller Rank is not (1).
  rank_sheet.update('A1', [headers] + data)
2025-02-03 11:00:00,578 - Error updating Google Sheet: Out of range float values are not JSON compliant: nan
2025-02-03 11:00:07,581 - Best Sellers Rank for poke in art: 66(1)
2025-02-03 11:00:11,543 - Opening Category URL: https://www.amazon.com/gp/bestsellers/toys-and-games/166078011/ref=pd_zg_hrsr_toys-and-games
2025-02-03 11:00:11,609 - Found subcategory: 1 - https://www.amazon.com/Best-Sellers-Toys-Games-Kids-Wood-Craft-Kits/zgbs/toys-and-games/166078011/ref=zg_bs_pg_1_toys-and-games?_encoding=UTF8&pg=1
2025-02-03 11:00:11,631 - Found subcategory: 2 - https://www.amazon.com/Best-Sellers-Toys-Games-Kids-Wood-Craft-Kits/zgbs/toys-and-games/166078011/ref=zg_bs_pg_2_toys-and-games?_encoding=UTF8&pg=2
2025-02-03 11

In [9]:
import logging
from datetime import datetime
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import gspread
from google.oauth2.service_account import Credentials
import pandas as pd
import re

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s')
logger = logging.getLogger()

# Path to your service account key file
SERVICE_ACCOUNT_FILE = 'ranking-436314-4daf4b7d4292.json'

# Define the scope
scope = ["https://spreadsheets.google.com/feeds", "https://www.googleapis.com/auth/drive"]

# Authenticate using the service account file
creds = Credentials.from_service_account_file(SERVICE_ACCOUNT_FILE, scopes=scope)
client = gspread.authorize(creds)

# Open the Google Sheet
workbook = client.open("Skillmatics Rank Sheet Streamlit")
url_sheet = workbook.worksheet("Node-URL")
rank_sheet = workbook.worksheet("Node-Rank")

def current_time_slot():
    """Returns the current timestamp for tracking scraping sessions."""
    now = datetime.now()
    return now.strftime('%Y-%m-%d %H:%M')

def parse_rank_numbers(rank_text):
    """Parses the rank text to extract the primary and secondary rankings."""
    ranks = rank_text.split('\n')
    primary_rank = ranks[0].split()[0].lstrip('#').replace(',', '')
    secondary_rank = ranks[1].split()[0].lstrip('#').replace(',', '') if len(ranks) > 1 else ''
    return f"{primary_rank}({secondary_rank})" if secondary_rank else primary_rank



# def update_google_sheet(product, current_time, rank_value):
#     """Updates the Google Sheet with the latest rank values efficiently."""
#     try:
#         # Fetch existing data
#         existing_data = rank_sheet.get_all_records()
#         existing_df = pd.DataFrame(existing_data)
#         existing_df.set_index('Product', inplace=True)

#         # Create a new DataFrame for the update
#         update_data = pd.DataFrame({current_time: [rank_value]}, index=[product])

#         # Merge new data
#         merged_data = existing_df.combine_first(update_data)

#         # Convert DataFrame back to list format
#         data = merged_data.reset_index().values.tolist()
#         headers = ['Product'] + [col for col in merged_data.columns if col != 'Product']

#         # Clear and update the sheet in one batch update
#         rank_sheet.clear()
#         rank_sheet.update('A1', [headers] + data)

#         logger.info(f"Updated Google Sheet: {product} -> {rank_value} at {current_time}")
#     except Exception as e:
#         logger.error(f"Error updating Google Sheet: {e}")



def update_google_sheet(product, current_time, rank_value):
    """Efficiently updates the Google Sheet with new ranking values."""
    try:
        # Fetch existing data
        existing_data = rank_sheet.get_all_records()
        existing_df = pd.DataFrame(existing_data)
        
        if 'Product' not in existing_df.columns:
            existing_df['Product'] = ''  # Ensure Product column exists
        
        existing_df.set_index('Product', inplace=True)

        # If new column does not exist, create it
        if current_time not in existing_df.columns:
            existing_df[current_time] = ''

        # Update rank for the specific product
        existing_df.loc[product, current_time] = rank_value

        # Convert DataFrame back to list format
        data = existing_df.reset_index().values.tolist()
        headers = ['Product'] + [col for col in existing_df.columns if col != 'Product']

        # Clear and update the sheet in a batch operation
        rank_sheet.clear()
        rank_sheet.append_row(headers)
        rank_sheet.append_rows(data)

        logger.info(f"Updated Google Sheet: {product} -> {rank_value} at {current_time}")
    except Exception as e:
        logger.error(f"Error updating Google Sheet: {e}")



def fetch_rank_for_asin(driver, asin):
    """Fetches the rank for the provided ASIN on the page."""
    try:
        rank_element = driver.find_element(By.XPATH, f"//div[@data-asin='{asin}']//span[contains(@class, 'zg-bdg-text')]")
        rank_text = rank_element.text.strip().replace('#', '')
        logger.info(f"Rank for ASIN {asin}: {rank_text}")
        return rank_text
    except Exception as e:
        logger.warning(f"Could not fetch rank for ASIN {asin}: {e}")
        return None

def fetch_rank_in_subcategory(driver, subcategory_url, subcategory_name, asin):
    """Fetches the rank of the product in a specific subcategory."""
    try:
        driver.get(subcategory_url)
        logger.info(f"Opening Subcategory URL: {subcategory_url} for {subcategory_name}")

        WebDriverWait(driver, 30).until(
            EC.presence_of_element_located((By.XPATH, f"//div[@data-asin='{asin}']"))
        )
        logger.info(f"ASIN {asin} found on subcategory page: {subcategory_name}")

        rank = fetch_rank_for_asin(driver, asin)
        if rank:
            return f"{rank}"  # Only return the rank for simplified formatting
        return None
    except Exception as e:
        logger.warning(f"ASIN {asin} not found or rank unavailable in subcategory {subcategory_name}: {e}")
        return None

# def fetch_subcategories_and_ranks(driver):
#     """Fetches the next three subcategories, opens their links, and retrieves rankings."""
#     try:
#         subcategory_elements = driver.find_elements(By.XPATH, "//a[contains(@href, '/zgbs/')]")
#         subcategories = []
#         count = 0

#         for element in subcategory_elements:
#             try:
#                 subcategory_name = element.text.strip()
#                 subcategory_link = element.get_attribute("href")

#                 # Skip irrelevant categories
#                 if all(skip not in subcategory_name for skip in ["Any Department", "Toys & Games"]) and subcategory_link:
#                     subcategories.append((subcategory_name, subcategory_link))
#                     logger.info(f"Found subcategory: {subcategory_name} - {subcategory_link}")
#                     count += 1
#                     if count >= 5:  # Limit to 3 subcategories
#                         break
#             except Exception as e:
#                 logger.warning(f"Error processing subcategory element: {e}")

#         return subcategories
#     except Exception as e:
#         logger.error(f"Failed to fetch subcategories: {e}")
#         return []

def fetch_subcategories_and_ranks(driver):
    """
    Fetch the next three subcategories and their rankings after skipping "Any Department" and "Toys & Games".
    """
    try:
        subcategory_elements = driver.find_elements(By.XPATH, "//div[@role='treeitem'] | //a[contains(@href, '/zgbs/')]")
        subcategories = []
        count = 0

        for element in subcategory_elements:
            try:
                subcategory_name = element.text.strip()
                subcategory_link = element.get_attribute("href")

                # Skip "Any Department", "Toys & Games", and similar categories
                if all(skip not in subcategory_name for skip in ["Any Department", "Toys & Games"]) and subcategory_link:
                    subcategories.append((subcategory_name, subcategory_link))
                    logger.info(f"Found subcategory: {subcategory_name} - {subcategory_link}")
                    count += 1
                    if count >= 5:  # Limit to the next three valid subcategories
                        break
            except Exception as e:
                logger.warning(f"Error processing subcategory element: {e}")

        return subcategories
    except Exception as e:
        logger.error(f"Failed to fetch subcategories: {e}")
        return []

def check_asins_in_category_page(driver, category_url, asin_list):
    """Check if ASINs exist in the category page and fetch their ranks."""
    try:
        driver.get(category_url)
        logger.info(f"Opening Category URL: {category_url}")

        found_asins = []
        subcategories = fetch_subcategories_and_ranks(driver)

        for asin in asin_list:
            ranks = []
            main_rank = fetch_rank_for_asin(driver, asin)
            if main_rank:
                ranks.append(main_rank)

            for subcategory_name, subcategory_link in subcategories:
                if subcategory_link:
                    sub_rank = fetch_rank_in_subcategory(driver, subcategory_link, subcategory_name, asin)
                    if sub_rank:
                        ranks.insert(0, sub_rank)

            final_rank = "".join([f"({rank})" for rank in ranks])
            found_asins.append((asin, final_rank))

        return found_asins, subcategories
    except Exception as e:
        logger.error(f"An error occurred while checking ASINs: {e}")
        return [], []

def extract_first_three_numbers(rank_output):
    """Extracts the first three numbers from the rank output."""
    matches = re.findall(r'\d+\(\d+\)|\(\d+\)', rank_output)
    logger.debug(f"Extracted matches: {matches}")
    return "".join(matches[:3])

def scrape_best_sellers_rank(driver, product, url, asin_list, current_time):
    """Scrapes the Best Sellers Rank and category ranks for a given product URL and ASINs."""
    try:
        driver.get(url)
        rank_section = WebDriverWait(driver, 80).until(
            EC.visibility_of_element_located((By.XPATH, "//th[contains(text(), 'Best Sellers Rank')]//following-sibling::td"))
        )
        rank_text = rank_section.text.strip()
        best_seller_rank = parse_rank_numbers(rank_text)
        logger.info(f"Best Sellers Rank for {product}: {best_seller_rank}")

        if best_seller_rank.endswith("(1)"):
            category_links = rank_section.find_elements(By.XPATH, ".//a")
            if len(category_links) > 1:
                second_category_url = category_links[1].get_attribute("href")
                found_asins_with_ranks, subcategories = check_asins_in_category_page(driver, second_category_url, asin_list)

                category_ranks = [ranks for asin, ranks in found_asins_with_ranks]
                category_ranks_str = f"({'})('.join(category_ranks[:3])})" if category_ranks else ""
                final_rank = f"{best_seller_rank}{category_ranks_str}"

                formatted_rank = extract_first_three_numbers(final_rank)
                logger.info(f"Final rank for {product} (ASIN: {asin_list[0]}): {formatted_rank}")
                update_google_sheet(product, current_time, formatted_rank)
            else:
                logger.warning(f"No second category link found for {product}.")
        else:
            logger.info(f"Skipping category ranking for {product}, as Best Seller Rank is not (1).")
            update_google_sheet(product, current_time, best_seller_rank)
    except Exception as e:
        logger.error(f"Failed to scrape {product} ({url}): {e}")

def run_scrape_all():
    """Fetch URLs and ASINs from the URL sheet and scrape rankings for all products."""
    url_data = url_sheet.get_all_records()
    df = pd.DataFrame(url_data)
    df.columns = df.columns.str.strip()

    product_names = df['Products'].tolist()
    url_list = df['URL'].tolist()
    asin_list = df['ASIN'].tolist()

    current_time = current_time_slot()

    options = Options()
    options.add_argument("--disable-gpu")
    options.add_argument("--no-sandbox")
    options.add_argument("--disable-dev-shm-usage")
    options.add_argument("--log-level=3")

    driver = webdriver.Chrome(options=options)
    
    for product, url, asin in zip(product_names, url_list, asin_list):
        scrape_best_sellers_rank(driver, product, url, [asin], current_time)

    driver.quit()
        
if __name__ == "__main__":
    logger.info("Starting the scraper for all products...")
    run_scrape_all()

2025-02-03 11:26:35,326 - Starting the scraper for all products...
2025-02-03 11:27:06,331 - Best Sellers Rank for Found It: 2246(13)
2025-02-03 11:27:06,332 - Skipping category ranking for Found It, as Best Seller Rank is not (1).
2025-02-03 11:27:08,513 - Updated Google Sheet: Found It -> 2246(13) at 2025-02-03 11:26
2025-02-03 11:27:28,346 - Best Sellers Rank for poke in art: 66(1)
2025-02-03 11:27:33,384 - Opening Category URL: https://www.amazon.com/gp/bestsellers/toys-and-games/166078011/ref=pd_zg_hrsr_toys-and-games
2025-02-03 11:27:33,438 - Found subcategory: 1 - https://www.amazon.com/Best-Sellers-Toys-Games-Kids-Wood-Craft-Kits/zgbs/toys-and-games/166078011/ref=zg_bs_pg_1_toys-and-games?_encoding=UTF8&pg=1
2025-02-03 11:27:33,469 - Found subcategory: 2 - https://www.amazon.com/Best-Sellers-Toys-Games-Kids-Wood-Craft-Kits/zgbs/toys-and-games/166078011/ref=zg_bs_pg_2_toys-and-games?_encoding=UTF8&pg=2
2025-02-03 11:27:33,499 - Found subcategory: Next page - https://www.amazon.c