In [1]:
!pip install selenium webdriver-manager

Collecting selenium
  Downloading selenium-4.34.2-py3-none-any.whl.metadata (7.5 kB)
Collecting webdriver-manager
  Downloading webdriver_manager-4.0.2-py2.py3-none-any.whl.metadata (12 kB)
Collecting urllib3~=2.5.0 (from urllib3[socks]~=2.5.0->selenium)
  Downloading urllib3-2.5.0-py3-none-any.whl.metadata (6.5 kB)
Collecting trio~=0.30.0 (from selenium)
  Downloading trio-0.30.0-py3-none-any.whl.metadata (8.5 kB)
Collecting trio-websocket~=0.12.2 (from selenium)
  Downloading trio_websocket-0.12.2-py3-none-any.whl.metadata (5.1 kB)
Collecting certifi>=2025.6.15 (from selenium)
  Downloading certifi-2025.7.14-py3-none-any.whl.metadata (2.4 kB)
Collecting typing_extensions~=4.14.0 (from selenium)
  Downloading typing_extensions-4.14.1-py3-none-any.whl.metadata (3.0 kB)
Collecting attrs>=23.2.0 (from trio~=0.30.0->selenium)
  Using cached attrs-25.3.0-py3-none-any.whl.metadata (10 kB)
Collecting outcome (from trio~=0.30.0->selenium)
  Downloading outcome-1.3.0.post0-py2.py3-none-any.whl


[notice] A new release of pip is available: 23.3.1 -> 25.1.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [11]:
import time
import csv
import logging
from tqdm import tqdm
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import (
    TimeoutException, 
    NoSuchElementException, 
    StaleElementReferenceException,
    WebDriverException,
    ElementClickInterceptedException
)
from webdriver_manager.chrome import ChromeDriverManager

# Setup logging
# Root logger is configured at INFO, so the logger.debug(...) calls in this
# notebook are suppressed unless the level is lowered here.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Module-level logger shared by the scraper class and main() below.
logger = logging.getLogger(__name__)

class RobustMerchantScraper:
    """Selenium/Chrome scraper that walks a paginated merchant list and writes it to CSV.

    Typical use is ``RobustMerchantScraper().scrape_all_pages("out.csv")``, which
    starts Chrome, opens the listing page, writes one ``Store, Address`` row per
    card found on each page, follows the "Next" pagination link until none is
    clickable, and finally closes the browser.

    Attributes:
        timeout: Seconds used for every explicit ``WebDriverWait``.
        max_retries: Number of locate-and-click attempts made for the "Next" button.
        retry_delay: Base delay in seconds between retries.
        options: ``ChromeOptions`` used when the driver is created.
        driver: The live Chrome driver, or ``None`` when no browser is running.
        wait: ``WebDriverWait`` bound to ``driver``, or ``None``.
    """

    # Listing page scraped when scrape_all_pages() is called without an explicit url.
    DEFAULT_URL = "https://checkstatus.mykasih.net/sara2/merchant-list"

    # Script that hides the navigator.webdriver automation flag from page scripts.
    _HIDE_WEBDRIVER_JS = "Object.defineProperty(navigator, 'webdriver', {get: () => undefined})"

    def __init__(self, headless=False, timeout=30):
        """Store settings and build the Chrome options; no browser is started here.

        Args:
            headless: When true, Chrome is started with ``--headless``.
            timeout: Seconds each explicit wait may block before timing out.
        """
        self.timeout = timeout
        self.max_retries = 3
        self.retry_delay = 2

        # Setup Chrome options
        self.options = webdriver.ChromeOptions()
        if headless:
            self.options.add_argument('--headless')

        # More robust Chrome options
        self.options.add_argument('--no-sandbox')
        self.options.add_argument('--disable-dev-shm-usage')
        self.options.add_argument('--disable-gpu')
        self.options.add_argument('--disable-extensions')
        self.options.add_argument('--disable-blink-features=AutomationControlled')
        self.options.add_argument('--start-maximized')
        self.options.add_experimental_option("excludeSwitches", ["enable-automation"])
        self.options.add_experimental_option('useAutomationExtension', False)

        self.driver = None
        self.wait = None

    def setup_driver(self):
        """Start Chrome and prepare ``self.driver`` / ``self.wait``.

        Returns:
            True when the browser is ready. False when any step failed; in that
            case a browser that had already been started is shut down again and
            ``self.driver`` / ``self.wait`` are left as ``None``.
        """
        driver = None
        try:
            service = Service(ChromeDriverManager().install())
            driver = webdriver.Chrome(service=service, options=self.options)
            # Register the patch through CDP so it is evaluated in every new
            # document. A plain execute_script() here would only touch the blank
            # start page and be lost on the first navigation.
            driver.execute_cdp_cmd(
                "Page.addScriptToEvaluateOnNewDocument",
                {"source": self._HIDE_WEBDRIVER_JS},
            )
            self.wait = WebDriverWait(driver, self.timeout)
            self.driver = driver
            logger.info("✅ Driver initialized successfully")
            return True
        except Exception as e:
            logger.error(f"❌ Failed to initialize driver: {e}")
            # Do not leave an orphaned Chrome process behind on partial failure.
            if driver is not None:
                try:
                    driver.quit()
                except Exception as quit_error:
                    logger.warning(f"⚠️ Could not close partially initialized driver: {quit_error}")
            self.driver = None
            self.wait = None
            return False

    def navigate_to_page(self, url, max_attempts=3):
        """Open ``url`` and wait until ``document.readyState`` is ``"complete"``.

        Failed attempts are retried after ``retry_delay * attempt_number`` seconds.

        Args:
            url: Address to load.
            max_attempts: Maximum number of load attempts.

        Returns:
            True on the first successful load, False when every attempt failed
            (including when ``max_attempts`` is zero or negative).
        """
        for attempt in range(max_attempts):
            try:
                logger.info(f"🌐 Navigating to {url} (Attempt {attempt + 1})")
                self.driver.get(url)
                self.wait.until(
                    lambda d: d.execute_script("return document.readyState") == "complete"
                )
                time.sleep(2)  # Additional buffer
                return True
            except Exception as e:
                logger.warning(f"⚠️ Navigation attempt {attempt + 1} failed: {e}")
                if attempt < max_attempts - 1:
                    time.sleep(self.retry_delay * (attempt + 1))

        logger.error("❌ Failed to navigate after all attempts")
        return False

    def wait_for_spinner_to_disappear(self):
        """Block until a loading spinner, if one is present, is no longer visible.

        Each candidate selector is checked in turn; for the first ones that
        currently match an element, the method waits (up to ``timeout`` seconds
        each) for invisibility and stops at the first that disappears. Timeouts
        and other errors are logged and the next selector is tried. Fixed
        one-second pauses before and after a final check give the page time to
        settle.
        """
        logger.info("⏳ Waiting for spinner to disappear...")

        spinner_selectors = [
            ".ngx-spinner-overlay",
            ".spinner",
            ".loading",
            "[class*='spinner']",
            "[class*='loading']"
        ]

        for selector in spinner_selectors:
            try:
                # Only wait on selectors that currently match something.
                spinner_elements = self.driver.find_elements(By.CSS_SELECTOR, selector)
                if spinner_elements:
                    logger.info(f"Found spinner with selector: {selector}")
                    self.wait.until(
                        EC.invisibility_of_element_located((By.CSS_SELECTOR, selector))
                    )
                    logger.info("✅ Spinner disappeared")
                    break
            except TimeoutException:
                logger.warning(f"⚠️ Timeout waiting for spinner {selector} to disappear")
                continue
            except Exception as e:
                logger.warning(f"⚠️ Error checking spinner {selector}: {e}")
                continue

        # Let the content settle
        time.sleep(1)

        # Best-effort check that the browser has recorded a navigation timing
        # entry. Failure here is not fatal, but it is no longer swallowed
        # silently (and KeyboardInterrupt is no longer caught).
        try:
            self.wait.until(
                lambda d: d.execute_script(
                    "return window.performance && window.performance.getEntriesByType && "
                    "window.performance.getEntriesByType('navigation').length > 0"
                )
            )
        except Exception as e:
            logger.debug(f"Navigation timing check skipped: {e}")

        time.sleep(1)  # Final buffer

    def find_merchant_cards(self):
        """Locate the merchant card elements on the current page.

        Selectors are tried from most to least specific; the first one that
        yields at least one element with non-blank text wins.

        Returns:
            List of matching elements with non-blank text, or an empty list
            when no selector produced any.
        """
        card_selectors = [
            ".card-body:not(:empty)",
            ".card-body",
            "[class*='card']",
            ".merchant-card",
            ".store-card"
        ]

        for selector in card_selectors:
            try:
                cards = self.wait.until(
                    EC.presence_of_all_elements_located((By.CSS_SELECTOR, selector))
                )
                # Filter out empty cards
                non_empty_cards = [card for card in cards if card.text.strip()]
                if non_empty_cards:
                    logger.info(f"✅ Found {len(non_empty_cards)} cards with selector: {selector}")
                    return non_empty_cards
            except TimeoutException:
                logger.warning(f"⚠️ No cards found with selector: {selector}")
                continue
            except Exception as e:
                logger.warning(f"⚠️ Error finding cards with {selector}: {e}")
                continue

        logger.error("❌ No merchant cards found with any selector")
        return []

    def extract_card_data(self, card, card_index):
        """Read the store name and address out of one card element.

        Three selector pairs are tried in order; the first pair whose first
        matches both contain non-blank text is used. If none works, the first
        two non-blank lines of the card's text are used instead.

        Args:
            card: Element representing one merchant card.
            card_index: Position of the card on the page, used only in log messages.

        Returns:
            ``(store, address)`` on success, ``(None, None)`` when nothing could
            be extracted or the card element went stale.
        """
        data_extraction_strategies = [
            # Strategy 1: Original selectors
            {
                'store_selector': '.card-title',
                'address_selector': '.card-text'
            },
            # Strategy 2: Alternative selectors
            {
                'store_selector': 'h5, h4, h3, .title, .name',
                'address_selector': 'p, .text, .address, .location'
            },
            # Strategy 3: By tag hierarchy
            {
                'store_selector': 'h5, h4',
                'address_selector': 'p'
            }
        ]

        for strategy_idx, strategy in enumerate(data_extraction_strategies):
            try:
                store_elements = card.find_elements(By.CSS_SELECTOR, strategy['store_selector'])
                address_elements = card.find_elements(By.CSS_SELECTOR, strategy['address_selector'])

                if store_elements and address_elements:
                    store = store_elements[0].text.strip()
                    address = address_elements[0].text.strip()

                    if store and address:
                        logger.debug(f"✅ Extracted data using strategy {strategy_idx + 1}: {store[:30]}...")
                        return store, address

            except StaleElementReferenceException:
                # A stale card cannot be read by any other strategy either, so
                # give up on it (the message used to claim a retry that never happened).
                logger.warning(f"⚠️ Stale element in card {card_index}, skipping card")
                return None, None
            except Exception as e:
                logger.warning(f"⚠️ Strategy {strategy_idx + 1} failed for card {card_index}: {e}")
                continue

        # Fallback: Try to extract any text content
        try:
            card_text = card.text.strip()
            if card_text:
                lines = [line.strip() for line in card_text.split('\n') if line.strip()]
                if len(lines) >= 2:
                    logger.info(f"✅ Using fallback extraction for card {card_index}")
                    return lines[0], lines[1]
        except Exception as e:
            logger.warning(f"⚠️ Fallback extraction failed for card {card_index}: {e}")

        logger.error(f"❌ Failed to extract data from card {card_index}")
        return None, None

    def scrape_current_page(self, writer):
        """Write one CSV row per merchant card found on the current page.

        Args:
            writer: Object with a ``writerow`` method (e.g. ``csv.writer``) that
                receives ``[store, address]`` lists.

        Returns:
            Number of cards that were successfully extracted and written.
        """
        self.wait_for_spinner_to_disappear()

        cards = self.find_merchant_cards()
        if not cards:
            logger.warning("⚠️ No cards found on current page")
            return 0

        successful_extractions = 0

        for idx, card in enumerate(cards):
            try:
                store, address = self.extract_card_data(card, idx)

                if store and address:
                    writer.writerow([store, address])
                    successful_extractions += 1
                    logger.debug(f"✅ Card {idx + 1}: {store[:30]}...")
                else:
                    logger.warning(f"⚠️ Failed to extract data from card {idx + 1}")

            except Exception as e:
                logger.error(f"❌ Unexpected error processing card {idx + 1}: {e}")
                continue

        logger.info(f"✅ Successfully extracted {successful_extractions}/{len(cards)} cards from current page")
        return successful_extractions

    def _click_element(self, element):
        """Scroll ``element`` into view and click it, falling back to JavaScript clicks.

        Returns:
            True as soon as one click method completes without raising,
            False when all of them failed.
        """
        try:
            self.driver.execute_script("arguments[0].scrollIntoView(true);", element)
            time.sleep(0.5)
        except Exception as e:
            logger.warning(f"⚠️ Could not scroll to element: {e}")

        click_methods = (
            lambda: element.click(),
            lambda: self.driver.execute_script("arguments[0].click();", element),
            lambda: self.driver.execute_script(
                "arguments[0].dispatchEvent(new MouseEvent('click', {bubbles: true}));", element
            ),
        )

        for click_method in click_methods:
            try:
                click_method()
                return True
            except ElementClickInterceptedException:
                logger.warning("⚠️ Click intercepted, trying alternative method...")
            except Exception as e:
                logger.warning(f"⚠️ Click method failed: {e}")

        return False

    def go_to_next_page(self):
        """Click the pagination "Next" control if an enabled one is present.

        All locator strategies are polled together inside a single wait, so a
        page without a "Next" button costs one ``timeout`` instead of one per
        strategy. When a button is found but cannot be clicked, locating and
        clicking is repeated up to ``max_retries`` times.

        Returns:
            True when a click was issued, False when no clickable "Next"
            button was found or every click attempt failed.
        """
        # Explicit (locator type, expression) pairs instead of guessing the
        # type from the expression's first characters.
        next_button_locators = [
            (By.XPATH, '//a[@aria-label="Next »" and not(ancestor::li[contains(@class, "disabled")])]'),
            (By.XPATH, '//a[contains(@aria-label, "Next") and not(ancestor::li[contains(@class, "disabled")])]'),
            (By.XPATH, '//a[contains(text(), "Next") and not(ancestor::li[contains(@class, "disabled")])]'),
            (By.XPATH, '//a[contains(@class, "next") and not(ancestor::li[contains(@class, "disabled")])]'),
            (By.CSS_SELECTOR, '.pagination .next:not(.disabled) a'),
            (By.CSS_SELECTOR, '.pagination li:not(.disabled) a[aria-label*="Next"]'),
        ]

        def locate_next_button(driver):
            """Return (strategy number, element) for the first clickable match, else False."""
            for strategy_idx, (by, locator) in enumerate(next_button_locators):
                try:
                    for candidate in driver.find_elements(by, locator):
                        if candidate.is_displayed() and candidate.is_enabled():
                            return strategy_idx + 1, candidate
                except WebDriverException as e:
                    # e.g. the element went stale between lookup and check
                    logger.debug(f"Strategy {strategy_idx + 1} lookup failed: {e}")
            return False

        for attempt in range(self.max_retries):
            try:
                strategy_number, next_button = self.wait.until(locate_next_button)
            except TimeoutException:
                break

            if self._click_element(next_button):
                logger.info(f"⏩ Successfully clicked next button using strategy {strategy_number}")
                time.sleep(2)  # Wait for navigation to start
                return True

            logger.warning(f"⚠️ Strategy {strategy_number} failed: all click methods failed (attempt {attempt + 1})")
            time.sleep(self.retry_delay)

        logger.info("📄 Reached end of pagination or no next button found")
        return False

    def scrape_all_pages(self, output_file="merchants_all_pages.csv", url=None, max_pages=1000):
        """Scrape every page of the merchant list into ``output_file``.

        Args:
            output_file: Path of the CSV to (over)write; a ``Store, Address``
                header row is written first.
            url: Listing page to start from; defaults to ``DEFAULT_URL``.
            max_pages: Safety limit on the number of pages visited.

        Returns:
            False when the driver could not be started, the first page could not
            be loaded, or an unexpected error escaped the page loop. True
            otherwise, including when the loop stopped early after repeated
            empty pages.
        """
        start_url = url or self.DEFAULT_URL

        if not self.setup_driver():
            return False

        try:
            # Navigate to the initial page
            if not self.navigate_to_page(start_url):
                return False

            with open(output_file, mode="w", newline="", encoding="utf-8") as csv_file:
                writer = csv.writer(csv_file)
                writer.writerow(["Store", "Address"])

                page = 1
                total_merchants = 0
                consecutive_failures = 0
                max_consecutive_failures = 3

                while True:
                    logger.info(f"\n📄 Scraping page {page}")

                    try:
                        merchants_found = self.scrape_current_page(writer)
                        total_merchants += merchants_found

                        if merchants_found == 0:
                            consecutive_failures += 1
                            logger.warning(f"⚠️ No merchants found on page {page} ({consecutive_failures}/{max_consecutive_failures} consecutive failures)")

                            if consecutive_failures >= max_consecutive_failures:
                                logger.error("❌ Too many consecutive failures, stopping...")
                                break
                        else:
                            consecutive_failures = 0  # Reset counter on success

                        # Try to go to next page
                        if not self.go_to_next_page():
                            logger.info("📄 No more pages to scrape")
                            break

                        page += 1

                        # Safety check to prevent infinite loops
                        if page > max_pages:
                            logger.warning(f"⚠️ Reached maximum page limit ({max_pages}), stopping...")
                            break

                    except KeyboardInterrupt:
                        logger.info("⚠️ Scraping interrupted by user")
                        break
                    except Exception as e:
                        logger.error(f"❌ Critical error on page {page}: {e}")
                        consecutive_failures += 1

                        if consecutive_failures >= max_consecutive_failures:
                            logger.error("❌ Too many consecutive failures, stopping...")
                            break

                        # Try to continue with next page
                        if not self.go_to_next_page():
                            break
                        page += 1

                logger.info(f"\n✅ Scraping completed! Total merchants found: {total_merchants}")
                logger.info(f"📁 Data saved to: {output_file}")

        except Exception as e:
            logger.error(f"❌ Fatal error during scraping: {e}")
            return False
        finally:
            if self.driver:
                try:
                    self.driver.quit()
                    logger.info("🚪 Driver closed")
                except Exception as e:
                    logger.warning(f"⚠️ Error while closing driver: {e}")
                # The session is gone either way; drop the stale references.
                self.driver = None
                self.wait = None

        return True

def main():
    """Run the scraper with this notebook's default settings and log the outcome."""
    # Settings (flip run_headless to True to hide the browser window)
    run_headless = False
    csv_path = "merchants_all_pages.csv"
    wait_seconds = 30

    logger.info("🚀 Starting robust merchant scraper...")

    finished_ok = RobustMerchantScraper(
        headless=run_headless, timeout=wait_seconds
    ).scrape_all_pages(csv_path)

    if not finished_ok:
        logger.error("❌ Scraping failed!")
        return

    logger.info("✅ Scraping completed successfully!")

if __name__ == "__main__":
    main()

2025-07-30 14:25:22,937 - INFO - 🚀 Starting robust merchant scraper...
2025-07-30 14:25:24,305 - INFO - Get LATEST chromedriver version for google-chrome
2025-07-30 14:25:24,378 - INFO - Get LATEST chromedriver version for google-chrome
2025-07-30 14:25:24,428 - INFO - Driver [C:\Users\User\.wdm\drivers\chromedriver\win64\138.0.7204.183\chromedriver-win32/chromedriver.exe] found in cache
2025-07-30 14:25:25,780 - INFO - ✅ Driver initialized successfully
2025-07-30 14:25:25,780 - INFO - 🌐 Navigating to https://checkstatus.mykasih.net/sara2/merchant-list (Attempt 1)
2025-07-30 14:25:28,809 - INFO - 
📄 Scraping page 1
2025-07-30 14:25:28,810 - INFO - ⏳ Waiting for spinner to disappear...
2025-07-30 14:25:28,834 - INFO - Found spinner with selector: .ngx-spinner-overlay
2025-07-30 14:25:58,920 - INFO - Found spinner with selector: [class*='spinner']
2025-07-30 14:26:28,987 - INFO - Found spinner with selector: [class*='loading']
2025-07-30 14:26:49,159 - INFO - ✅ Spinner disappeared
2025-0

In [41]:
import pandas as pd

df= pd.read_csv("merchants_all_pages.csv")
df

Unnamed: 0,Store,Address
0,PERNIAGAAN NAZILI,"LOT 246, KAMPUNG MERKANG 16800 PASIR PUTEH KEL..."
1,PASARAYA ECONO BATU LINTANG,"LOT 27397B, KAMPUNG BUKIT SUNGAI PASIR 08000 S..."
2,SING KWONG SUPERMARKET (BATU 8),"SUBLOT 61-63 OF LOT 341, BLOCK 33 AND LOT 1337..."
3,99 SPEEDMART SDN BHD (TMN MEGAH RIA 2) - 3015,"NO : 27 & 29 (GROUND FLOOR), JALAN KEMPAS 17 T..."
4,PASARAYA MILLENNIUM (MANTIN) SDN BHD,"5G,7G,9G,11G JALAN BESAR MANTIN 71700 MANTIN N..."
...,...,...
3911,SRI SUBHAM - LEBUH CHULIA,"NO.20,22,24 & 26, GROUND FLOOR, FIRST & SECOND..."
3912,TONG HUP CASH AND CARRY SDN BHD,NO 203-203U JLN KENANGA 3/29A TMN KENANGA 7520...
3913,JN SHOP'S,"BLOCK H, LOT 2, KEDAI SEDCO FASA 111, PEKAN SO..."
3914,HOCK CAI TRADING SDN BHD,"NO. 10, BELURU BAZAAR, BAKONG 98050 BARAM SARAWAK"


In [42]:
import pandas as pd
import re

# List of states and federal territories (canonical display form)
states = [
    "Johor", "Kedah", "Kelantan", "Melaka", "Negeri Sembilan",
    "Pahang", "Penang", "Perak", "Perlis", "Sabah", "Sarawak",
    "Selangor", "Terengganu", "W.P Kuala Lumpur", "W.P Labuan", "W.P Putrajaya"
]

# Lowercase mapping for comparison (kept so other cells that use it still work)
states_lower = {s.lower(): s for s in states}


def _normalize_place_text(text):
    """Lowercase ``text``, remove punctuation and collapse runs of whitespace."""
    return " ".join(re.sub(r"[^\w\s]", "", text.lower()).split())


# State names normalised exactly like addresses are, so "W.P Kuala Lumpur"
# becomes "wp kuala lumpur" and can actually be found in a cleaned address.
states_normalized = {_normalize_place_text(s): s for s in states}

# Normalised names of the federal territories only ("wp ...").
federal_territories_normalized = [
    key for key, name in states_normalized.items() if name.startswith("W.P")
]


# Function to extract state or W.P from address
def extract_state(address):
    """Return the canonical state / federal territory named in ``address``.

    Matching is case-insensitive and ignores punctuation, so ``"W.P. LABUAN"``
    and ``"w.p labuan"`` are treated alike.

    Args:
        address: Free-text address; anything that is not a ``str`` yields None.

    Returns:
        The matching entry of ``states``, or None when no state is recognised.
        A federal territory written with its "W.P" prefix is recognised
        anywhere in the address; other states only as the last one to three
        words.
    """
    if not isinstance(address, str):
        return None

    # Normalize: lowercase, strip punctuation, single spaces
    address_clean = _normalize_place_text(address)
    words = address_clean.split()

    # 1. Check for W.P matches (anywhere in string). Both sides are padded
    #    with spaces so only whole-word matches count.
    padded_address = f" {address_clean} "
    for wp in federal_territories_normalized:
        if f" {wp} " in padded_address:
            return states_normalized[wp]

    # 2. Check last 1 to 3 words for a matching state
    for i in range(1, 4):
        phrase = " ".join(words[-i:])
        if phrase in states_normalized:
            return states_normalized[phrase]

    return None

# Derive a state label for every address
df["state"] = df["Address"].map(extract_state)

# Keep only the rows where a state was recognised
df1 = df.dropna(subset=["state"])

In [43]:
import pandas as pd

# Rows for which no state could be extracted from the address.
# .copy() gives df2 its own data, so assigning to df2["state"] further down
# no longer raises pandas' SettingWithCopyWarning.
df2 = df[df['state'].isna()].copy()

# Lowercase keyword -> canonical federal territory name
wp_mapping = {
    "kuala lumpur": "W.P Kuala Lumpur",
    "labuan": "W.P Labuan",
    "putrajaya": "W.P Putrajaya"
}


def detect_wp(address):
    """Return the federal territory whose keyword occurs in ``address``.

    The comparison is case-insensitive and keywords are tried in the order
    they appear in ``wp_mapping``. Non-string input, or an address containing
    none of the keywords, gives None.
    """
    if isinstance(address, str):
        haystack = address.lower()
        return next(
            (territory for keyword, territory in wp_mapping.items() if keyword in haystack),
            None,
        )
    return None

# Fill the state column from the federal-territory keyword lookup
df2["state"] = df2["Address"].map(detect_wp)

df2


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df2["state"] = df2["Address"].apply(detect_wp)


Unnamed: 0,Store,Address,state
46,MYDIN EMPORIUM JALAN MASJID INDIA,"NO. 95-97, TKT BAWAH TKT 1, TKT 2, TKT 3 & TKT...",W.P Kuala Lumpur
79,AEON BIG (DANAU KOTA),"LOT PT 9834 JLN. LANGKAWI TMN. DANAU KOTA, MUK...",W.P Kuala Lumpur
120,KK SUPER MART (AMPANG SRI ANGSANA HILIR),"NO.2, JALAN HILIR 1, TAMAN SRI ANGSANA HILIR 5...",W.P Kuala Lumpur
142,99 SPEEDMART SDN BHD (BKT JALIL2) - 1468,"NO : 22, JLN 17/155C, BDR BUKIT JALIL, 57000 K...",W.P Kuala Lumpur
210,PASARAYA B&O SDN BHD,"NO. 21-27, JALAN 46B/26, PUSAT BANDAR SRI RAMP...",W.P Kuala Lumpur
...,...,...,...
3741,CHECKERS HYPERMARKET SDN BHD - TAMAN CHERAS UTAMA,"LOT PT 38285, JALAN CHERAS UTAMA 12 TAMAN CHER...",W.P Kuala Lumpur
3762,99 SPEEDMART SDN BHD (PANDAN PERDANA 1) - 1277,"NO: 70 & 72, JALAN PERDANA 6/8, PANDAN PERDANA...",W.P Kuala Lumpur
3779,99 SPEEDMART SDN BHD (PANTAI DALAM) - 1810,NO. 9 & 11 JALAN PANTAI MURNI 2 59200 PANTAI D...,W.P Kuala Lumpur
3906,TMC (BANGSAR),"TMC BANGSAR, 23-27 LORONG ARA KIRI SATU LUCKY ...",W.P Kuala Lumpur


In [38]:
df2['state'].unique()

array(['W.P Kuala Lumpur', 'W.P Labuan', 'W.P Putrajaya'], dtype=object)

In [40]:
df2[~df2['state'].notna()]

Unnamed: 0,Store,Address,state


In [44]:
df1.shape[0]

3726

In [45]:
df2.shape[0]

190

In [48]:
all_df = pd.concat([df1,df2],axis=0)
all_df.shape[0]

3916

In [51]:
def _upper_if_str(value):
    """Upper-case strings; return every other value unchanged."""
    return value.upper() if isinstance(value, str) else value


# DataFrame.applymap is deprecated in favour of DataFrame.map in newer pandas;
# use the element-wise method this pandas version provides.
if hasattr(pd.DataFrame, "map"):
    all_df = all_df.map(_upper_if_str)
else:
    all_df = all_df.applymap(_upper_if_str)
all_df

  all_df = all_df.applymap(lambda x: x.upper() if isinstance(x, str) else x)


Unnamed: 0,Store,Address,state
0,PERNIAGAAN NAZILI,"LOT 246, KAMPUNG MERKANG 16800 PASIR PUTEH KEL...",KELANTAN
1,PASARAYA ECONO BATU LINTANG,"LOT 27397B, KAMPUNG BUKIT SUNGAI PASIR 08000 S...",KEDAH
2,SING KWONG SUPERMARKET (BATU 8),"SUBLOT 61-63 OF LOT 341, BLOCK 33 AND LOT 1337...",SARAWAK
3,99 SPEEDMART SDN BHD (TMN MEGAH RIA 2) - 3015,"NO : 27 & 29 (GROUND FLOOR), JALAN KEMPAS 17 T...",JOHOR
4,PASARAYA MILLENNIUM (MANTIN) SDN BHD,"5G,7G,9G,11G JALAN BESAR MANTIN 71700 MANTIN N...",NEGERI SEMBILAN
...,...,...,...
3741,CHECKERS HYPERMARKET SDN BHD - TAMAN CHERAS UTAMA,"LOT PT 38285, JALAN CHERAS UTAMA 12 TAMAN CHER...",W.P KUALA LUMPUR
3762,99 SPEEDMART SDN BHD (PANDAN PERDANA 1) - 1277,"NO: 70 & 72, JALAN PERDANA 6/8, PANDAN PERDANA...",W.P KUALA LUMPUR
3779,99 SPEEDMART SDN BHD (PANTAI DALAM) - 1810,NO. 9 & 11 JALAN PANTAI MURNI 2 59200 PANTAI D...,W.P KUALA LUMPUR
3906,TMC (BANGSAR),"TMC BANGSAR, 23-27 LORONG ARA KIRI SATU LUCKY ...",W.P KUALA LUMPUR


In [52]:
all_df.to_csv("merchants_sara30072025.csv", index=False)