In [None]:
"""
Script to monitor the Irsafam IELTS timetable for available test slots.

This script periodically fetches the timetable page from Irsafam's website,
parses the HTML, and looks for exam entries that are not marked as "تکمیل ظرفیت"
(meaning "fully booked" in Persian). When it finds new available test sessions
it will send you a popup notification through the local desktop.  You can adapt
the notification mechanism to something else (for example pushbullet, Pushover, Telegram,
email, Slack webhook, etc.) by replacing the `send_notification` function.

Usage:
  1. Install the required Python packages:

         pip install requests beautifulsoup4 selenium

     For Pushbullet integration you also need the `pushbullet.py` library:

         pip install pushbullet.py

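  2. Run the script.  It pauses `CHECK_INTERVAL` seconds (6 by default)
     between checks; each check itself takes considerably longer because
     the page is loaded through a Selenium-driven browser.  You can change
     the pause by editing the `CHECK_INTERVAL` constant.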

The script keeps track of slots it has already notified you about using the
`seen_slots` set.  If you restart the script it will start with an empty set
and may resend notifications for any currently available slots.
"""

import os
import time
import logging
from datetime import datetime, timedelta
from typing import List, Tuple

import requests
from bs4 import BeautifulSoup


# Timetable page to monitor.  Query parameters can be appended to filter by
# exam type or location (for example model[]=cdielts); see
# https://irsafam.org/ielts/timetable for details.  The default URL shows
# all computer-based IELTS, UKVI, and Lifeskills sessions.
TIMETABLE_URL = "https://irsafam.org/ielts/timetable"

# Pause between two consecutive checks, in seconds.  Each check itself takes
# much longer than this because the page is loaded through the Selenium driver.
CHECK_INTERVAL = 6

# Console logging with timestamps.  Switch the level to DEBUG for more
# detailed output during development.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
    level=logging.INFO,
)

# Logger shared by the code in this module.
logger = logging.getLogger(__name__)

# Running counters used for the periodic and final session summaries.
stats = dict(
    checks_performed=0,
    total_slots_found=0,
    new_notifications_sent=0,
    errors_encountered=0,
    start_time=datetime.now(),
)




In [14]:
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from bs4 import BeautifulSoup
import time

def fetch_timetable(scroll_pause: float = 2):
    """Load the timetable page in headless Chrome and return it parsed.

    The page at ``TIMETABLE_URL`` is opened, then scrolled to the bottom
    repeatedly until ``document.body.scrollHeight`` stops changing between
    two consecutive scrolls (presumably so that entries added while
    scrolling end up in the page source).

    Args:
        scroll_pause: Seconds to sleep after each scroll before the page
            height is measured again.

    Returns:
        A ``BeautifulSoup`` document built from the final page source with
        the ``html.parser`` backend.
    """
    chrome_options = Options()
    for flag in (
        "--headless",                # run without a visible browser window
        "--no-sandbox",              # bypass OS security model (useful for servers)
        "--disable-dev-shm-usage",   # overcome limited resource problems
        "--disable-gpu",             # GPU acceleration is not needed in headless mode
        "--window-size=1920,1080",   # fixed window size for consistent rendering
    ):
        chrome_options.add_argument(flag)

    driver = webdriver.Chrome(options=chrome_options)

    try:
        driver.get(TIMETABLE_URL)

        height_script = "return document.body.scrollHeight"
        # Unique sentinel so the first comparison always triggers one scroll.
        previous_height = object()
        current_height = driver.execute_script(height_script)

        while current_height != previous_height:
            driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
            time.sleep(scroll_pause)
            previous_height = current_height
            current_height = driver.execute_script(height_script)

        # Height is stable: capture the page as it stands now.
        html = driver.page_source
    finally:
        # The browser is shut down whether or not loading succeeded.
        driver.quit()

    return BeautifulSoup(html, "html.parser")

In [15]:


def parse_available_slots(soup: BeautifulSoup) -> List[Tuple[str, str, str]]:
    """Collect every exam listing whose booking button is not disabled.

    Each listing is an ``<a class="exam__item ...">`` element.  Inside it the
    parser reads:

    * the ``<date>`` element, whose ``<span>`` texts are joined with spaces
      to form the date string;
    * ``<span class="exam_type">`` for the exam type (``"Unknown"`` if absent);
    * the first ``<h5>`` for the location (``"Unknown Location"`` if absent);
    * ``<span class="btn">``, which marks the listing as full when one of its
      classes is exactly ``"disable"``.

    Listings without a date element, without date parts, or without a button
    are skipped and counted as parsing errors, as is any listing that raises
    while being processed.

    Args:
        soup: Parsed timetable page.

    Returns:
        A list of ``(date_str, location, exam_type)`` tuples, one per
        available listing, in page order.

    Note:
        If the site structure changes this parser may need adjustment.
        Running with DEBUG logging shows a line for every listing processed.
    """
    logger.debug("Starting to parse HTML content for available slots")

    exam_items = soup.find_all("a", class_="exam__item")
    total_items_found = len(exam_items)
    logger.debug("Found %d exam items to process", total_items_found)

    available_slots = []
    full_items_found = 0
    parsing_errors = 0

    for item_number, item in enumerate(exam_items, start=1):
        try:
            # Per-item trace, DEBUG only to avoid flooding the log.
            logger.debug("Processing exam item %d/%d", item_number, total_items_found)

            # --- date ---------------------------------------------------
            date_tag = item.find("date")
            if not date_tag:
                logger.debug("Skipping item %d: no date tag found", item_number)
                parsing_errors += 1
                continue

            date_parts = []
            for span in date_tag.find_all("span"):
                date_parts.append(span.get_text(strip=True))
            if not date_parts:
                logger.debug("Skipping item %d: no date parts found", item_number)
                parsing_errors += 1
                continue
            date_str = " ".join(date_parts)

            # --- exam type ----------------------------------------------
            exam_type_tag = item.find("span", class_="exam_type")
            if exam_type_tag:
                exam_type = exam_type_tag.get_text(strip=True)
            else:
                exam_type = "Unknown"
                logger.debug("Item %d: exam type tag missing, using 'Unknown'", item_number)

            # --- location -----------------------------------------------
            location_tag = item.find("h5")
            if location_tag:
                location = location_tag.get_text(strip=True)
            else:
                location = "Unknown Location"
                logger.debug("Item %d: location tag missing, using 'Unknown Location'", item_number)

            # --- availability -------------------------------------------
            btn = item.find("span", class_="btn")
            if not btn:
                logger.debug("Skipping item %d: no button found", item_number)
                parsing_errors += 1
                continue

            is_full = False
            for css_class in btn.get("class", []):
                if css_class == "disable":
                    is_full = True
                    break

            if not is_full:
                available_slots.append((date_str, location, exam_type))
                logger.debug("Item %d: Found available slot - %s at %s (%s)",
                             item_number, date_str, location, exam_type)
            else:
                full_items_found += 1
                logger.debug("Item %d (%s at %s) is fully booked",
                             item_number, exam_type, location)

        except Exception as e:
            # One malformed listing must not abort the whole pass.
            parsing_errors += 1
            logger.warning("Error parsing exam item %d: %s", item_number, e)
            continue

    # Overall tally for this pass.
    logger.info("Parsing complete: %d total items, %d available, %d full, %d errors",
                total_items_found, len(available_slots), full_items_found, parsing_errors)

    # Spell out each available slot, or state that there were none.
    if not available_slots:
        logger.info("No available slots found in current check")
    else:
        logger.info("Available slots found:")
        for position, slot in enumerate(available_slots, 1):
            slot_date, slot_location, slot_exam_type = slot
            logger.info("  %d. %s at %s (%s)", position, slot_date, slot_location, slot_exam_type)

    return available_slots




In [16]:
import os
import sys
import time
import logging

logger = logging.getLogger(__name__)

def send_notification(message: str) -> None:
    """Show `message` as a desktop notification, never raising on failure.

    The delivery routine is chosen from ``sys.platform``:

    - ``"win32"``: ``_send_windows_notification`` (PowerShell toast)
    - ``"darwin"``: ``_send_macos_notification`` (osascript)
    - anything starting with ``"linux"``: ``_send_linux_notification``
    - any other platform: ``_send_tkinter_popup``

    Any exception raised while delivering is caught; it is logged as an
    error and the message itself is written to the log as a fallback.

    Args:
        message: Text to show to the user.
    """
    logger.debug("Attempting to send local notification: %s", message)

    # Timestamp used to report how long delivery took.
    started_at = time.time()

    try:
        platform_name = sys.platform
        if platform_name == "win32":
            deliver = _send_windows_notification
        elif platform_name == "darwin":  # macOS
            deliver = _send_macos_notification
        elif platform_name.startswith("linux"):
            deliver = _send_linux_notification
        else:
            # Unknown system: fall back to a tkinter popup.
            deliver = _send_tkinter_popup

        deliver(message)

        elapsed = time.time() - started_at
        logger.info("Successfully sent local notification (%.2fs): %s",
                    elapsed, message)

        # The session statistics counter is not updated here.

    except Exception as e:
        logger.error("Failed to send desktop notification: %s", e)
        logger.info("NOTIFICATION (fallback): %s", message)

def _send_windows_notification(message: str) -> None:
    """Show `message` as a Windows toast by running a PowerShell script.

    The message ends up inside an XML document that is embedded in the
    PowerShell script, and it may contain text scraped from the website.
    Two precautions keep it inert:

    * it is XML-escaped (including both quote characters), so characters
      such as ``&`` or ``<`` cannot break the toast XML;
    * the XML lives in a single-quoted here-string (``@' ... '@``), which
      PowerShell does not expand, so ``$name`` or ``$(...)`` in the message
      is treated as plain text.  Because apostrophes are escaped, the
      message cannot contain the ``'@`` terminator either.

    Args:
        message: Text shown as the second line of the toast.

    Raises:
        subprocess.CalledProcessError: If PowerShell exits with a non-zero
            status.
        FileNotFoundError: If the ``powershell`` executable is not found.
    """
    import subprocess
    from xml.sax.saxutils import escape

    safe_message = escape(message, {'"': "&quot;", "'": "&apos;"})

    # PowerShell script that builds and shows the toast notification.
    ps_script = f"""
    [Windows.UI.Notifications.ToastNotificationManager, Windows.UI.Notifications, ContentType = WindowsRuntime] > $null
    [Windows.UI.Notifications.ToastNotification, Windows.UI.Notifications, ContentType = WindowsRuntime] > $null
    [Windows.Data.Xml.Dom.XmlDocument, Windows.Data.Xml.Dom.XmlDocument, ContentType = WindowsRuntime] > $null

    $template = @'
    <toast>
        <visual>
            <binding template="ToastText02">
                <text id="1">Irsafam Slot Notification</text>
                <text id="2">{safe_message}</text>
            </binding>
        </visual>
    </toast>
'@

    $xml = New-Object Windows.Data.Xml.Dom.XmlDocument
    $xml.LoadXml($template)
    $toast = [Windows.UI.Notifications.ToastNotification]::new($xml)
    [Windows.UI.Notifications.ToastNotificationManager]::CreateToastNotifier("Irsafam").Show($toast)
    """

    subprocess.run(["powershell", "-Command", ps_script],
                   capture_output=True, check=True)

def _send_macos_notification(message: str) -> None:
    """Show `message` in the macOS notification center via ``osascript``.

    The message is placed inside a double-quoted AppleScript string literal.
    Backslashes and double quotes are escaped first so that a message
    containing them (it may carry text scraped from the website) cannot end
    the literal early or change the script.

    Args:
        message: Text of the notification.

    Raises:
        subprocess.CalledProcessError: If ``osascript`` exits with a
            non-zero status.
        FileNotFoundError: If ``osascript`` is not found.
    """
    import subprocess

    # Escape the backslash first, otherwise the backslashes added for the
    # quotes would themselves be doubled.
    escaped_message = message.replace("\\", "\\\\").replace('"', '\\"')

    subprocess.run([
        "osascript", "-e",
        f'display notification "{escaped_message}" with title "Irsafam Slot"'
    ], check=True)

def _send_linux_notification(message: str) -> None:
    """Show `message` on a Linux desktop, trying several tools in order.

    ``notify-send`` is tried first, then a ``zenity --info`` dialog.  A tool
    counts as failed when it is missing (``FileNotFoundError``) or exits
    with a non-zero status (``CalledProcessError``).  If both fail, the
    tkinter popup is used as the last resort.

    Args:
        message: Text of the notification.
    """
    import subprocess

    candidate_commands = (
        # Most common on Linux desktops (libnotify).
        [
            "notify-send",
            "--app-name=Irsafam",
            "--urgency=normal",
            "Irsafam Slot",
            message,
        ],
        # Used when notify-send is unavailable or fails.
        [
            "zenity", "--info",
            "--title=Irsafam Slot",
            f"--text={message}",
        ],
    )

    for command in candidate_commands:
        try:
            subprocess.run(command, check=True)
        except (subprocess.CalledProcessError, FileNotFoundError):
            continue
        return

    # Neither command-line tool worked.
    _send_tkinter_popup(message)

def _send_tkinter_popup(message: str) -> None:
    """Show `message` in a tkinter message box (cross-platform fallback).

    A hidden root window is created, the message box is shown, and the root
    is destroyed again.  The root is destroyed even when showing the dialog
    raises, so a failed popup does not leave a stray hidden window behind.

    If tkinter cannot be imported, the message is written to the log
    instead.  Other errors propagate to the caller.

    Args:
        message: Text shown in the message box.
    """
    try:
        import tkinter as tk
        from tkinter import messagebox

        # Temporary root window; required by messagebox.
        root = tk.Tk()
        try:
            root.withdraw()  # keep the empty root window off screen
            messagebox.showinfo("Irsafam Slot Notification", message)
        finally:
            # Always release the root window, even if the dialog failed.
            root.destroy()

    except ImportError:
        # No tkinter available: the log is the only channel left.
        logger.warning("No GUI notification system available, logging message instead")
        logger.info("NOTIFICATION: %s", message)

# Alternative, simpler notifier for when a plain tkinter popup is enough.
def send_simple_popup_notification(message: str) -> None:
    """Show `message` in a modal tkinter message box.

    The dialog must be acknowledged by the user before this function
    returns.  Any failure (tkinter missing, no display, dialog error) is
    caught: it is logged as an error and the message itself is written to
    the log as a fallback, so this function does not raise.

    The hidden root window is destroyed even when the dialog fails, so
    repeated failures do not pile up stray windows.

    Args:
        message: Text shown in the popup.
    """
    logger.debug("Sending simple popup notification: %s", message)

    try:
        import tkinter as tk
        from tkinter import messagebox

        root = tk.Tk()
        try:
            # Hide the empty root window right away.
            root.withdraw()

            # Font used for the message box text.
            root.option_add('*Dialog.msg.font', 'Arial 12')

            messagebox.showinfo(
                title="🎰 Irsafam Slot Alert",
                message=f"📢 {message}",
                parent=root
            )
        finally:
            # Release the root window whether or not the dialog was shown.
            root.destroy()

        logger.info("Successfully displayed popup notification: %s", message)

    except Exception as e:
        logger.error("Failed to show popup notification: %s", e)
        logger.info("NOTIFICATION (fallback log): %s", message)

In [17]:

def log_session_summary():
    """Write the accumulated monitoring statistics to the log at INFO level.

    Reports the time elapsed since ``stats['start_time']`` and the four
    counters kept in ``stats``.  When at least one check has been performed,
    the elapsed time divided by the number of checks is reported as the
    average check interval.
    """
    elapsed = datetime.now() - stats['start_time']
    checks_done = stats['checks_performed']

    logger.info("=== SESSION SUMMARY ===")
    # str(timedelta) may end in ".ffffff"; keep only the part before the dot.
    logger.info("Runtime: %s", str(elapsed).split('.')[0])

    counter_lines = (
        ("Total checks performed: %d", 'checks_performed'),
        ("Total available slots found: %d", 'total_slots_found'),
        ("New notifications sent: %d", 'new_notifications_sent'),
        ("Errors encountered: %d", 'errors_encountered'),
    )
    for template, key in counter_lines:
        logger.info(template, stats[key])

    # Skipped when nothing has been checked yet (avoids dividing by zero).
    if checks_done > 0:
        logger.info("Average check interval: %.1f seconds",
                    elapsed.total_seconds() / checks_done)




In [18]:

# Main monitoring loop: fetch the timetable, notify once per newly seen
# available slot, sleep CHECK_INTERVAL seconds, repeat until interrupted.

seen_slots = set()  # (date_str, location, exam_type) tuples already notified about

# Log startup information
logger.info("=== STARTING IRSAFAM TIMETABLE MONITOR ===")
logger.info("Target URL: %s", TIMETABLE_URL)
logger.info("Check interval: %d seconds (%.1f minutes)", CHECK_INTERVAL, CHECK_INTERVAL / 60)
logger.info("Pushbullet configured: %s", "Yes" if os.environ.get("PUSHBULLET_TOKEN") else "No")
logger.info("Logging level: %s", logging.getLevelName(logger.getEffectiveLevel()))
logger.info("Monitor started at: %s", stats['start_time'].strftime("%Y-%m-%d %H:%M:%S"))

try:
    while True:
        # Fix the cycle number before the counter can change, so the start
        # and error messages of one cycle always carry the same number.
        cycle_number = stats['checks_performed'] + 1

        try:
            check_start_time = time.time()
            logger.info("--- Starting check cycle #%d ---", cycle_number)

            # Fetch and parse the timetable
            soup = fetch_timetable()
            available = parse_available_slots(soup)

            # Only successfully fetched and parsed cycles are counted.
            stats['checks_performed'] += 1
            stats['total_slots_found'] += len(available)

            # Notify once for every slot not seen before in this session.
            new_slots_found = 0
            for slot in available:
                if slot in seen_slots:
                    continue
                date_str, location, exam_type = slot
                message = f"New slot available on {date_str} at {location} ({exam_type})."
                send_notification(message)
                seen_slots.add(slot)
                new_slots_found += 1
                # Keep the session summary's notification count accurate.
                stats['new_notifications_sent'] += 1

            # Log summary of this check cycle
            check_duration = time.time() - check_start_time
            logger.info("Check cycle completed in %.2f seconds: %d available slots, %d new notifications",
                        check_duration, len(available), new_slots_found)

            # Periodically log session statistics (every 10 checks)
            if stats['checks_performed'] % 10 == 0:
                log_session_summary()

        except Exception as exc:
            # A failed cycle is recorded and the loop carries on.
            stats['errors_encountered'] += 1
            logger.error("Error during check cycle #%d: %s", cycle_number, exc)
            logger.debug("Exception details:", exc_info=True)  # full traceback at DEBUG level

        # Log when we're waiting for the next check
        next_check_time = datetime.now().replace(microsecond=0) + timedelta(seconds=CHECK_INTERVAL)
        logger.debug("Waiting %d seconds until next check (at %s)",
                     CHECK_INTERVAL, next_check_time.strftime("%H:%M:%S"))

        # Wait until the next check
        time.sleep(CHECK_INTERVAL)

except KeyboardInterrupt:
    logger.info("Monitor stopped by user (Ctrl+C)")
    log_session_summary()
except Exception as e:
    # Anything that escapes the per-cycle handler is fatal: report and re-raise.
    logger.critical("Unexpected error caused monitor to stop: %s", e)
    log_session_summary()
    raise

2025-09-21 15:59:31 [INFO] === STARTING IRSAFAM TIMETABLE MONITOR ===
2025-09-21 15:59:31 [INFO] Target URL: https://irsafam.org/ielts/timetable
2025-09-21 15:59:31 [INFO] Check interval: 6 seconds (0.1 minutes)
2025-09-21 15:59:31 [INFO] Pushbullet configured: No
2025-09-21 15:59:31 [INFO] Logging level: INFO
2025-09-21 15:59:31 [INFO] Monitor started at: 2025-09-21 15:59:30
2025-09-21 15:59:31 [INFO] --- Starting check cycle #1 ---
2025-09-21 16:00:26 [INFO] Parsing complete: 54 total items, 0 available, 54 full, 0 errors
2025-09-21 16:00:26 [INFO] No available slots found in current check
2025-09-21 16:00:26 [INFO] Check cycle completed in 55.67 seconds: 0 available slots, 0 new notifications
2025-09-21 16:00:32 [INFO] --- Starting check cycle #2 ---
2025-09-21 16:01:23 [INFO] Parsing complete: 54 total items, 0 available, 54 full, 0 errors
2025-09-21 16:01:23 [INFO] No available slots found in current check
2025-09-21 16:01:23 [INFO] Check cycle completed in 50.84 seconds: 0 availa