# Toolkit Core Modules Test

This notebook contains test cells for each core module of the scraping toolkit.


## 1. Browser Management Module Test

Testing WebDriverController, BrowserSession, and related functionality.


In [1]:
# Import browser module components
import sys
from pathlib import Path
import os

# Find project root by looking for toolkit directory
# This works whether notebook is in project root or in toolkit/ subfolder
current_dir = Path.cwd()
project_root = current_dir

# Strategy 1: Check if we're in the project root (has toolkit folder)
if (current_dir / "toolkit").exists():
    project_root = current_dir
# Strategy 2: Check if we're inside toolkit folder (notebook location)
elif current_dir.name == "toolkit":
    # If notebook is inside toolkit folder, go up one level to project root
    project_root = current_dir.parent
# Strategy 3: Search up the directory tree for toolkit folder
else:
    test_path = current_dir
    for _ in range(5):  # Check up to 5 levels up
        if (test_path / "toolkit").exists():
            project_root = test_path
            break
        test_path = test_path.parent

# Add project root to sys.path so 'toolkit' can be imported as a module
if str(project_root) not in sys.path:
    sys.path.insert(0, str(project_root))

print(f"✓ Project root: {project_root}")
print(f"✓ Current directory: {current_dir}")
print(f"✓ Notebook location: toolkit/test.ipynb (in toolkit folder)")
print(f"✓ Toolkit path exists: {(project_root / 'toolkit').exists()}")

from toolkit.core.browser import WebDriverController, BrowserConfig, BrowserSession, StealthConfig
import time

print("✓ Imports successful")


✓ Project root: c:\Users\felipe\Dropbox\Python\MYProjects\App-Scraper NWDB
✓ Current directory: c:\Users\felipe\Dropbox\Python\MYProjects\App-Scraper NWDB\toolkit
✓ Notebook location: toolkit/test.ipynb (in toolkit folder)
✓ Toolkit path exists: True
✓ Imports successful


In [2]:
# Configure browser with headless mode for testing
config = BrowserConfig(
    headless=False,
    user_agent="auto",  # Auto-generate random user agent
    stealth=StealthConfig(enabled=False),  # Disable stealth for faster testing
    window_size=(1280, 720)
)

print(f"✓ Browser configuration created")
print(f"  - Headless: {config.headless}")
print(f"  - Window size: {config.window_size}")
print(f"  - User agent: {config.user_agent}")


✓ Browser configuration created
  - Headless: False
  - Window size: (1280, 720)
  - User agent: auto


In [3]:
# Initialize and start browser
controller = WebDriverController(config)

try:
    print("Starting browser...")
    driver = controller.start()
    print("✓ Browser started successfully")
    
    # Check if browser is active
    is_active = controller.is_active()
    print(f"✓ Browser is active: {is_active}")
    
    # Get current user agent
    user_agent = controller.get_current_user_agent()
    print(f"✓ Current user agent: {user_agent[:50]}...")
    
except Exception as e:
    print(f"✗ Failed to start browser: {e}")
    controller = None
    driver = None


ChromeDriver pre-install failed: <urlopen error [WinError 10054] An existing connection was forcibly closed by the remote host>
Browser initialization failed (attempt 1/8): <urlopen error [WinError 10054] An existing connection was forcibly closed by the remote host>


Starting browser...
✓ Browser started successfully
✓ Browser is active: True
✓ Current user agent: Mozilla/5.0 (iPhone; CPU iPhone OS 18_3_2 like Mac...


In [4]:
# Test basic navigation
if controller and controller.is_active():
    try:
        test_url = "https://example.com"
        print(f"Navigating to {test_url}...")
        driver.get(test_url)
        time.sleep(2)  # Wait for page load
        
        current_url = driver.current_url
        page_title = driver.title
        
        print(f"✓ Navigation successful")
        print(f"  - Current URL: {current_url}")
        print(f"  - Page title: {page_title}")
        
    except Exception as e:
        print(f"✗ Navigation failed: {e}")
else:
    print("⚠ Browser not active, skipping navigation test")


Navigating to https://example.com...
✓ Navigation successful
  - Current URL: https://example.com/
  - Page title: Example Domain


In [5]:
# Test BrowserSession and cookie management
if controller and controller.is_active():
    try:
        session = BrowserSession(controller)
        print("✓ BrowserSession created")
        
        # Get cookies
        cookies = session.get_cookies()
        print(f"✓ Retrieved {len(cookies)} cookies from session")
        
        # Test cookie file operations
        cookie_file = Path("test_cookies.json")
        session.save_cookies(cookie_file)
        
        if cookie_file.exists():
            print(f"✓ Cookies saved to {cookie_file}")
            # Clean up test file
            cookie_file.unlink()
            print("✓ Test cookie file cleaned up")
        else:
            print("⚠ Cookie file was not created")
            
    except Exception as e:
        print(f"✗ BrowserSession test failed: {e}")
else:
    print("⚠ Browser not active, skipping session test")


✓ BrowserSession created
✓ Retrieved 0 cookies from session
✓ Cookies saved to test_cookies.json
✓ Test cookie file cleaned up


In [6]:
# Test window management
if controller and controller.is_active():
    try:
        window_manager = controller.window_manager
        
        # Get current window size
        current_size = driver.get_window_size()
        print(f"✓ Current window size: {current_size['width']}x{current_size['height']}")
        
        # Get window position
        current_pos = driver.get_window_position()
        print(f"✓ Current window position: ({current_pos['x']}, {current_pos['y']})")
        
        # Test maximize
        window_manager.driver.maximize_window()
        time.sleep(1)
        print("✓ Window maximized")
        
    except Exception as e:
        print(f"✗ Window management test failed: {e}")
else:
    print("⚠ Browser not active, skipping window management test")


✓ Current window size: 1282x722
✓ Current window position: (8, 8)
✓ Window maximized


In [7]:
# Cleanup: Stop browser
if controller:
    try:
        print("Stopping browser...")
        controller.stop()
        print("✓ Browser stopped successfully")
    except Exception as e:
        print(f"✗ Error stopping browser: {e}")

print("\n" + "="*50)
print("Browser Management Module Test Complete")
print("="*50)


Stopping browser...
✓ Browser stopped successfully

Browser Management Module Test Complete


## 2. Handler Framework Test

Testing AbstractHandler, scraper implementation, and handler utilities.


In [8]:
# Import handler framework components
from toolkit.handlers.base import AbstractHandler
from toolkit.handlers.utils import URLValidator, NameValidator
from toolkit.pipeline.item import ScrapedItem
from bs4 import BeautifulSoup

print("✓ Handler framework imports successful")


✓ Handler framework imports successful


In [9]:
# Create a test handler class for demonstration
class ExampleHandler(AbstractHandler):
    """
    Example handler that scrapes example.com to demonstrate handler functionality.
    """
    
    def scrape_main_page(self):
        """Scrape the main page and yield items."""
        # Navigate to the page
        self.get_url("https://example.com")
        
        # Wait for page to load
        self.wait_page_load()
        
        # Get BeautifulSoup for parsing
        soup = self.get_soup()
        
        # Extract information from the page
        # Example.com has a simple structure with an h1, paragraph, and link
        heading = soup.find('h1')
        paragraph = soup.find('p')
        
        if heading:
            # Create a scraped item from the page content
            item = ScrapedItem(
                source=self.source_name,
                name=heading.get_text(strip=True) if heading else "Example Domain",
                detail_url=self.driver.current_url,
                description=paragraph.get_text(strip=True) if paragraph else None,
                metadata={
                    "page_title": self.driver.title,
                    "url": self.driver.current_url
                }
            )
            yield item
        
        # Look for links and create items for them too
        links = soup.find_all('a', href=True)
        for i, link in enumerate(links[:3]):  # Limit to first 3 links
            link_text = link.get_text(strip=True) or f"Link {i+1}"
            link_href = link['href']
            
            # Normalize URL if relative
            if link_href.startswith('/'):
                from urllib.parse import urljoin
                link_href = urljoin(self.driver.current_url, link_href)
            
            item = ScrapedItem(
                source=self.source_name,
                name=link_text,
                detail_url=link_href,
                metadata={"link_index": i, "anchor_text": link_text}
            )
            yield item

print("✓ Test handler class created: ExampleHandler")


✓ Test handler class created: ExampleHandler


In [10]:
# Test handler utilities: URLValidator and NameValidator
print("Testing Handler Utilities:")
print("=" * 50)

# Test URL validation
test_urls = [
    "https://example.com",
    "http://test.com/page",
    "not-a-url",
    "",
    "ftp://files.example.com"
]

print("\n1. URL Validation Tests:")
for url in test_urls:
    is_valid = URLValidator.is_valid_url(url)
    status = "✓" if is_valid else "✗"
    print(f"  {status} '{url}': {is_valid}")

# Test URL normalization
print("\n2. URL Normalization Tests:")
normalize_tests = [
    "https://example.com/page?query=test#fragment",
    "http://example.com/",
]
for url in normalize_tests:
    normalized = URLValidator.normalize_url(url)
    print(f"  Original: {url}")
    print(f"  Normalized: {normalized}")

# Test external URL detection
print("\n3. External URL Detection:")
external_tests = [
    ("https://example.com/page", "example.com", False),
    ("https://other.com/page", "example.com", True),
]
for url, base, expected in external_tests:
    is_external = URLValidator.is_external_url(url, base)
    match = "✓" if is_external == expected else "✗"
    print(f"  {match} '{url}' external to '{base}': {is_external}")

# Test name validation and normalization
print("\n4. Name Validation Tests:")
name_tests = [
    ("Valid Name", True),
    ("", False),
    ("A" * 201, False),  # Too long
    ("  Normal   Name  ", True),
]
for name, should_be_valid in name_tests:
    is_valid = NameValidator.is_valid_name(name)
    match = "✓" if is_valid == should_be_valid else "✗"
    print(f"  {match} '{name}': {is_valid}")

print("\n5. Name Normalization Tests:")
normalize_names = [
    "  Multiple    Spaces   ",
    "Special!@#Characters",
    "  UPPER  case  name  "
]
for name in normalize_names:
    normalized = NameValidator.normalize_name(name)
    cleaned = NameValidator.clean_name(name)
    print(f"  Original: '{name}'")
    print(f"  Normalized: '{normalized}'")
    print(f"  Cleaned: '{cleaned}'")

print("\n✓ Handler utilities test complete")


Testing Handler Utilities:

1. URL Validation Tests:
  ✓ 'https://example.com': True
  ✓ 'http://test.com/page': True
  ✗ 'not-a-url': False
  ✗ '': False
  ✓ 'ftp://files.example.com': True

2. URL Normalization Tests:
  Original: https://example.com/page?query=test#fragment
  Normalized: https://example.com/page
  Original: http://example.com/
  Normalized: http://example.com/

3. External URL Detection:
  ✓ 'https://example.com/page' external to 'example.com': False
  ✓ 'https://other.com/page' external to 'example.com': True

4. Name Validation Tests:
  ✓ 'Valid Name': True
  ✓ '': False
  ✓ 'AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA': False
  ✓ '  Normal   Name  ': True

5. Name Normalization Tests:
  Original: '  Multiple    Spaces   '
  Normalized: 'Multiple Spaces'
  Cleaned: 'multiple spaces'
  Original: 'Special!@#Char

In [11]:
# Initialize browser and create handler instance
from toolkit.core.browser import WebDriverController, BrowserConfig, StealthConfig

# Use headless mode for testing
handler_config = BrowserConfig(
    headless=True,
    user_agent="auto",
    stealth=StealthConfig(enabled=False),
    window_size=(1280, 720)
)

handler_controller = WebDriverController(handler_config)

try:
    print("Starting browser for handler test...")
    handler_driver = handler_controller.start()
    print("✓ Browser started")
    
    # Create handler instance
    handler = ExampleHandler(handler_driver, source_name="example_scraper")
    print(f"✓ Handler created: {handler.__class__.__name__}")
    print(f"  - Source name: {handler.source_name}")
    
except Exception as e:
    print(f"✗ Failed to setup handler: {e}")
    handler_controller = None
    handler = None


Starting browser for handler test...
✓ Browser started
✓ Handler created: ExampleHandler
  - Source name: example_scraper


In [12]:
# Test handler scraping functionality
if handler:
    try:
        print("Testing handler scraping...")
        print("=" * 50)
        
        # Collect scraped items
        items = []
        for item in handler.scrape_main_page():
            items.append(item)
            print(f"\n✓ Scraped item {len(items)}:")
            print(f"  - Source: {item.source}")
            print(f"  - Name: {item.name}")
            print(f"  - URL: {item.detail_url}")
            if item.description:
                print(f"  - Description: {item.description[:100]}...")
            if item.metadata:
                print(f"  - Metadata keys: {list(item.metadata.keys())}")
        
        print(f"\n✓ Total items scraped: {len(items)}")
        
        # Test handler utility methods
        print("\n" + "=" * 50)
        print("Testing handler utility methods:")
        
        # Test get_soup
        soup = handler.get_soup()
        print(f"✓ get_soup() works: Found {len(soup.find_all())} HTML elements")
        
        # Test element detection
        has_h1 = handler.is_element_css_present("h1", timeout=5)
        print(f"✓ Element detection (h1): {has_h1}")
        
        # Test page saving (optional - comment out if you don't want to create file)
        # handler.save_page_html("test_page.html")
        # print("✓ Page HTML saved for inspection")
        
    except Exception as e:
        print(f"✗ Handler scraping test failed: {e}")
        import traceback
        traceback.print_exc()
else:
    print("⚠ Handler not available, skipping scraping test")


Testing handler scraping...

✓ Scraped item 1:
  - Source: example_scraper
  - Name: Example Domain
  - URL: https://example.com/
  - Description: This domain is for use in documentation examples without needing permission. Avoid use in operations...
  - Metadata keys: ['page_title', 'url']

✓ Scraped item 2:
  - Source: example_scraper
  - Name: Learn more
  - URL: https://iana.org/domains/example
  - Metadata keys: ['link_index', 'anchor_text']

✓ Total items scraped: 2

Testing handler utility methods:
✓ get_soup() works: Found 11 HTML elements
✓ Element detection (h1): True


## 3. Pipeline Framework Integration Test

Testing pipeline stages and orchestrator with scraped items.


In [13]:
# Import pipeline components
from toolkit.pipeline.base import AbstractPipeline
from toolkit.pipeline.orchestrator import PipelineOrchestrator
from toolkit.pipeline.context import PipelineContext
from toolkit.pipeline.exceptions import DropItem

print("✓ Pipeline framework imports successful")


✓ Pipeline framework imports successful


In [14]:
# Create test pipeline stages
class ValidationPipeline(AbstractPipeline):
    """Pipeline stage that validates scraped items."""
    
    def process_item(self, item):
        """Validate item and add clean_name if missing."""
        # Validate URL
        if not URLValidator.is_valid_url(item.detail_url):
            raise DropItem(f"Invalid URL: {item.detail_url}")
        
        # Validate name
        if not NameValidator.is_valid_name(item.name):
            raise DropItem(f"Invalid name: {item.name}")
        
        # Add clean_name if not present
        if not item.clean_name:
            item.clean_name = NameValidator.clean_name(item.name)
        
        # Add processing metadata
        item.metadata['validated'] = True
        item.metadata['validation_timestamp'] = time.strftime("%Y-%m-%d %H:%M:%S")
        
        return item


class EnrichmentPipeline(AbstractPipeline):
    """Pipeline stage that enriches items with additional data."""
    
    def process_item(self, item):
        """Enrich item with additional information."""
        # Add enrichment metadata
        item.metadata['enriched'] = True
        
        # Normalize URL
        normalized_url = URLValidator.normalize_url(item.detail_url)
        item.metadata['normalized_url'] = normalized_url
        
        # Mark as processed
        item.is_new = True
        
        return item


class FilterPipeline(AbstractPipeline):
    """Pipeline stage that filters items based on criteria."""
    
    def __init__(self, name=None, min_name_length=5):
        super().__init__(name)
        self.min_name_length = min_name_length
    
    def process_item(self, item):
        """Filter items based on name length."""
        if len(item.name) < self.min_name_length:
            raise DropItem(f"Name too short: {item.name}")
        
        item.metadata['filtered'] = True
        return item

print("✓ Test pipeline stages created:")
print("  - ValidationPipeline: Validates and cleans items")
print("  - EnrichmentPipeline: Enriches items with metadata")
print("  - FilterPipeline: Filters items by criteria")


✓ Test pipeline stages created:
  - ValidationPipeline: Validates and cleans items
  - EnrichmentPipeline: Enriches items with metadata
  - FilterPipeline: Filters items by criteria


In [15]:
# Test pipeline execution with scraped items
if handler:
    try:
        print("Testing Pipeline Framework:")
        print("=" * 50)
        
        # Create pipeline context
        context = PipelineContext(session_id="test_session_123")
        print(f"✓ Pipeline context created: {context.session_id}")
        
        # Create pipeline stages
        pipelines = [
            ValidationPipeline(name="validator"),
            FilterPipeline(name="filter", min_name_length=3),
            EnrichmentPipeline(name="enricher")
        ]
        
        # Create orchestrator
        orchestrator = PipelineOrchestrator(pipelines, context=context)
        print(f"✓ Pipeline orchestrator created with {len(pipelines)} stages")
        
        # Scrape items using handler
        print("\nScraping items...")
        scraped_items = list(handler.scrape_main_page())
        print(f"✓ Scraped {len(scraped_items)} items from handler")
        
        # Process items through pipeline
        print("\nProcessing items through pipeline...")
        processed_items = orchestrator.execute(iter(scraped_items))
        
        # Display results
        print("\n" + "=" * 50)
        print("Pipeline Execution Results:")
        stats = orchestrator.get_stats()
        print(f"  - Total items: {stats['total_items']}")
        print(f"  - Processed items: {stats['processed_items']}")
        print(f"  - Dropped items: {stats['dropped_items']}")
        print(f"  - Error items: {stats['error_items']}")
        
        print(f"\n✓ Successfully processed {len(processed_items)} items")
        
        # Show sample processed items
        if processed_items:
            print("\nSample processed items:")
            for i, item in enumerate(processed_items[:3], 1):
                print(f"\n  Item {i}:")
                print(f"    - Name: {item.name}")
                print(f"    - Clean name: {item.clean_name}")
                print(f"    - URL: {item.detail_url}")
                print(f"    - Metadata: {item.metadata}")
                print(f"    - Is new: {item.is_new}")
        
        # Show context statistics
        print(f"\n✓ Pipeline duration: {context.get_duration():.2f} seconds")
        print(f"✓ Context stats: {context.stats}")
        
    except Exception as e:
        print(f"✗ Pipeline test failed: {e}")
        import traceback
        traceback.print_exc()
else:
    print("⚠ Handler not available, skipping pipeline test")


Testing Pipeline Framework:
✓ Pipeline context created: test_session_123
✓ Pipeline orchestrator created with 3 stages

Scraping items...
✓ Scraped 2 items from handler

Processing items through pipeline...

Pipeline Execution Results:
  - Total items: 2
  - Processed items: 2
  - Dropped items: 0
  - Error items: 0

✓ Successfully processed 2 items

Sample processed items:

  Item 1:
    - Name: Example Domain
    - Clean name: example domain
    - URL: https://example.com/
    - Metadata: {'page_title': 'Example Domain', 'url': 'https://example.com/', 'validated': True, 'validation_timestamp': '2025-11-03 14:31:48', 'filtered': True, 'enriched': True, 'normalized_url': 'https://example.com/'}
    - Is new: True

  Item 2:
    - Name: Learn more
    - Clean name: learn more
    - URL: https://iana.org/domains/example
    - Metadata: {'link_index': 0, 'anchor_text': 'Learn more', 'validated': True, 'validation_timestamp': '2025-11-03 14:31:48', 'filtered': True, 'enriched': True, 'norm

In [16]:
# Cleanup: Stop handler browser
if handler_controller:
    try:
        print("\nStopping handler browser...")
        handler_controller.stop()
        print("✓ Handler browser stopped successfully")
    except Exception as e:
        print(f"✗ Error stopping handler browser: {e}")

print("\n" + "="*50)
print("Handler Framework Test Complete")
print("="*50)



Stopping handler browser...
✓ Handler browser stopped successfully

Handler Framework Test Complete


## 4. Download Management Module Test

Testing FileDownloader, StoragePathManager, and download strategies.


In [17]:
# Import download module components
from toolkit.core.download import FileDownloader, DownloadConfig, StoragePathManager, ContentType
from toolkit.core.download.config import RetryPolicy
import tempfile
import shutil

print("✓ Download module imports successful")


✓ Download module imports successful


In [18]:
# Test StoragePathManager
print("Testing StoragePathManager:")
print("=" * 50)

# Create temporary directory for testing
test_download_dir = Path(tempfile.mkdtemp(prefix="test_downloads_"))
print(f"✓ Test directory created: {test_download_dir}")

# Test different path strategies
print("\n1. Testing 'source_type' strategy:")
path_manager = StoragePathManager(test_download_dir, strategy="source_type")
content_dir = path_manager.get_content_dir("test_source", ContentType.IMAGE)
print(f"  ✓ Content directory: {content_dir}")
print(f"  ✓ Directory exists: {content_dir.exists()}")

# Test different content types
for content_type in ContentType:
    content_dir = path_manager.get_content_dir("test_source", content_type)
    print(f"  ✓ {content_type.name}: {content_dir.name}")

# Test path normalization
print("\n2. Testing path normalization:")
test_paths = [
    "images/test.jpg",
    "../images/test.jpg",
    str(content_dir / "test.jpg"),
]
for path in test_paths:
    normalized = path_manager.normalize_path(path)
    print(f"  Original: {path}")
    print(f"  Normalized: {normalized}")

# Test path resolution
print("\n3. Testing path resolution:")
relative_path = "test_source/images/test.jpg"
resolved = path_manager.resolve_path(relative_path)
print(f"  Relative: {relative_path}")
print(f"  Resolved: {resolved}")
print(f"  ✓ Resolved path exists: {resolved.parent.exists()}")

print("\n✓ StoragePathManager tests complete")


Testing StoragePathManager:
✓ Test directory created: C:\Users\felipe\AppData\Local\Temp\test_downloads_iwujfntp

1. Testing 'source_type' strategy:
  ✓ Content directory: C:\Users\felipe\AppData\Local\Temp\test_downloads_iwujfntp\test_source\images
  ✓ Directory exists: True
  ✓ IMAGE: images
  ✓ THUMBNAIL: thumbnails
  ✓ VIDEO: videos
  ✓ DOCUMENT: documents
  ✓ AUDIO: audio
  ✓ ARCHIVE: archives
  ✓ OTHER: other

2. Testing path normalization:
  Original: images/test.jpg
  Normalized: images/test.jpg
  Original: ../images/test.jpg
  Normalized: ../images/test.jpg
  Original: C:\Users\felipe\AppData\Local\Temp\test_downloads_iwujfntp\test_source\other\test.jpg
  Normalized: test_source/other/test.jpg

3. Testing path resolution:
  Relative: test_source/images/test.jpg
  Resolved: C:\Users\felipe\AppData\Local\Temp\test_downloads_iwujfntp\test_source\images\test.jpg
  ✓ Resolved path exists: True

✓ StoragePathManager tests complete


In [19]:
# Configure downloader
retry_policy = RetryPolicy(
    max_retries=3,
    base_delay=1.0,
    exponential_base=2.0,
    max_delay=10.0
)

download_config = DownloadConfig(
    timeout=30,
    chunk_size=8192,
    retry_policy=retry_policy
)

downloader = FileDownloader(download_config)
print("✓ FileDownloader configured")
print(f"  - Timeout: {download_config.timeout}s")
print(f"  - Chunk size: {download_config.chunk_size} bytes")
print(f"  - Max retries: {retry_policy.max_retries}")


✓ FileDownloader configured
  - Timeout: 30s
  - Chunk size: 8192 bytes
  - Max retries: 3


In [20]:
# Test file download (using a small test file from example.com or httpbin)
print("Testing FileDownloader:")
print("=" * 50)

# Use a small, reliable test file (favicon from example.com)
test_url = "https://www.google.com/favicon.ico"
test_destination = test_download_dir / "test_favicon.ico"

try:
    print(f"Downloading test file from {test_url}...")
    success = downloader.download(test_url, test_destination)
    
    if success and test_destination.exists():
        file_size = test_destination.stat().st_size
        print(f"✓ Download successful!")
        print(f"  - File saved to: {test_destination}")
        print(f"  - File size: {file_size} bytes")
        
        # Clean up test file
        test_destination.unlink()
        print("  ✓ Test file cleaned up")
    else:
        print("✗ Download failed or file not found")
        
except Exception as e:
    print(f"✗ Download test failed: {e}")

# Test download with path manager integration
print("\nTesting download with path manager:")
try:
    content_dir = path_manager.get_content_dir("test_source", ContentType.IMAGE)
    download_path = content_dir / "test_download.jpg"
    
    # Try downloading a small image (1x1 pixel PNG)
    small_image_url = "https://www.google.com/favicon.ico"
    success = downloader.download(small_image_url, download_path)
    
    if success and download_path.exists():
        print(f"✓ Download with path manager successful!")
        print(f"  - Saved to: {download_path}")
        download_path.unlink()  # Clean up
    else:
        print("⚠ Download test skipped (network issue)")
        
except Exception as e:
    print(f"⚠ Download test skipped: {e}")

print("\n✓ Download Management tests complete")

# Cleanup test directory
try:
    shutil.rmtree(test_download_dir)
    print(f"✓ Test directory cleaned up: {test_download_dir}")
except Exception as e:
    print(f"⚠ Could not clean up test directory: {e}")


Testing FileDownloader:
Downloading test file from https://www.google.com/favicon.ico...
✓ Download successful!
  - File saved to: C:\Users\felipe\AppData\Local\Temp\test_downloads_iwujfntp\test_favicon.ico
  - File size: 5430 bytes
  ✓ Test file cleaned up

Testing download with path manager:
✓ Download with path manager successful!
  - Saved to: C:\Users\felipe\AppData\Local\Temp\test_downloads_iwujfntp\test_source\images\test_download.jpg

✓ Download Management tests complete
✓ Test directory cleaned up: C:\Users\felipe\AppData\Local\Temp\test_downloads_iwujfntp


## 5. Logging System Module Test

Testing LoggingManager, SessionTracker, and PerformanceMonitor.


In [21]:
# Import logging module components
from toolkit.core.logging import LoggingManager, LogConfig, HandlerConfig
from toolkit.core.logging.config import LogLevel
from toolkit.core.logging.session import SessionTracker
from toolkit.core.logging.monitor import PerformanceMonitor
import tempfile

print("✓ Logging module imports successful")


✓ Logging module imports successful


In [22]:
# Test SessionTracker
print("Testing SessionTracker:")
print("=" * 50)

tracker = SessionTracker(session_id="test_session_001")
print(f"✓ SessionTracker created: {tracker.session_id}")
print(f"  - Start time: {tracker.start_time}")

# Add metadata
tracker.add_metadata("test_key", "test_value")
tracker.add_metadata("scraper_name", "test_scraper")
print(f"✓ Metadata added: {tracker.metadata}")

# Increment stats
tracker.increment_stat("items_scraped", 5)
tracker.increment_stat("items_scraped", 3)
tracker.increment_stat("errors", 1)
print(f"✓ Statistics: {tracker.stats}")

# Get session duration
import time
time.sleep(0.1)  # Small delay to test duration
duration = tracker.get_duration()
print(f"✓ Session duration: {duration:.2f} seconds")

print("\n✓ SessionTracker tests complete")


Testing SessionTracker:
✓ SessionTracker created: test_session_001
  - Start time: 2025-11-03 17:31:49.125087+00:00
✓ Metadata added: {'test_key': 'test_value', 'scraper_name': 'test_scraper'}
✓ Statistics: {'items_scraped': 8, 'errors': 1}
✓ Session duration: 0.10 seconds

✓ SessionTracker tests complete


In [23]:
# Test PerformanceMonitor
print("Testing PerformanceMonitor:")
print("=" * 50)

from toolkit.core.logging.monitor import PerformanceMonitor

monitor = PerformanceMonitor()
print("✓ PerformanceMonitor created")

# Test timing operations
with monitor.time_operation("test_operation"):
    time.sleep(0.1)  # Simulate work

# Time multiple operations
for i in range(3):
    with monitor.time_operation(f"operation_{i}"):
        time.sleep(0.05)

# Get statistics using get_summary()
summary = monitor.get_summary()
print(f"✓ Performance statistics:")
timings = summary.get('timings', {})
for op_name, op_stats in timings.items():
    if op_stats:  # Check if stats exist
        print(f"  - {op_name}: count={op_stats['count']}, "
              f"total={op_stats['total']:.3f}s, "
              f"avg={op_stats['mean']:.3f}s, "
              f"min={op_stats['min']:.3f}s, "
              f"max={op_stats['max']:.3f}s")

# Test counters
monitor.increment_counter("items_processed", 5)
monitor.increment_counter("items_processed", 3)
summary = monitor.get_summary()  # Get updated summary
print(f"\n✓ Counters: {summary.get('counters', {})}")

print("\n✓ PerformanceMonitor tests complete")


Testing PerformanceMonitor:
✓ PerformanceMonitor created
✓ Performance statistics:
  - test_operation: count=1, total=0.101s, avg=0.101s, min=0.101s, max=0.101s
  - operation_0: count=1, total=0.050s, avg=0.050s, min=0.050s, max=0.050s
  - operation_1: count=1, total=0.050s, avg=0.050s, min=0.050s, max=0.050s
  - operation_2: count=1, total=0.051s, avg=0.051s, min=0.051s, max=0.051s

✓ Counters: {'items_processed': 8}

✓ PerformanceMonitor tests complete


In [24]:
# Test LoggingManager with console handler
print("Testing LoggingManager:")
print("=" * 50)

# Create temporary log file
log_file = Path(tempfile.mktemp(suffix=".log", prefix="test_log_"))

# Configure logging with console and file handlers
console_handler = HandlerConfig(
    type="console",
    enabled=True,
    level=LogLevel.INFO
)

file_handler = HandlerConfig(
    type="file",
    enabled=True,
    file_path=str(log_file),
    level=LogLevel.DEBUG,
    max_bytes=10485760,  # 10MB
    backup_count=3
)

log_config = LogConfig(
    logger_name="test_logger",
    level=LogLevel.DEBUG,
    enable_context=True,
    session_id="test_logging_session",
    handlers=[console_handler, file_handler]
)

logging_manager = LoggingManager(log_config)
print(f"✓ LoggingManager created: {logging_manager.session_id}")

# Test logging at different levels
logger = logging_manager.logger
logger.debug("This is a DEBUG message")
logger.info("This is an INFO message")
logger.warning("This is a WARNING message")
logger.error("This is an ERROR message")

print("\n✓ Log messages sent")

# Verify log file was created
if log_file.exists():
    with open(log_file, 'r') as f:
        log_content = f.read()
        print(f"✓ Log file created: {log_file}")
        print(f"  - File size: {len(log_content)} bytes")
        print(f"  - Contains 'INFO': {'INFO' in log_content}")
        print(f"  - Contains 'ERROR': {'ERROR' in log_content}")
        
        # Show last few lines
        lines = log_content.strip().split('\n')
        if lines:
            print(f"  - Last line: {lines[-1][:100]}...")
else:
    print("⚠ Log file was not created")

print("\n✓ LoggingManager tests complete")

# Cleanup: Close handlers before deleting file
try:
    # Close all handlers to release file locks
    for handler in logging_manager.logger.handlers[:]:
        handler.close()
        logging_manager.logger.removeHandler(handler)
    
    # Now we can safely delete the file
    if log_file.exists():
        log_file.unlink()
        print(f"✓ Log file cleaned up")
except Exception as e:
    print(f"⚠ Could not clean up log file: {e}")
    print(f"  (This is normal - file may be locked by logging handlers. It will be cleaned up on Windows temp cleanup)")


2025-11-03 17:31:49 UTC - test_logger - INFO - This is an INFO message
2025-11-03 17:31:49 UTC - test_logger - ERROR - This is an ERROR message


Testing LoggingManager:
✓ LoggingManager created: test_logging_session

✓ Log messages sent
✓ Log file created: C:\Users\felipe\AppData\Local\Temp\test_log_pnlw2av7.log
  - File size: 292 bytes
  - Contains 'INFO': True
  - Contains 'ERROR': True
  - Last line: 2025-11-03 17:31:49 UTC - test_logger - ERROR - This is an ERROR message...

✓ LoggingManager tests complete
✓ Log file cleaned up


## 6. Database & ORM Module Test

Testing DatabaseManager, Repository pattern, and UnitOfWork.


In [25]:
# Import database module components
from toolkit.core.database import DatabaseManager, DatabaseConfig, ModelBase, Repository, UnitOfWork
from sqlalchemy import Column, Integer, String, DateTime, create_engine
from sqlalchemy.orm import declarative_base
from datetime import datetime, timezone
import tempfile

print("✓ Database module imports successful")


✓ Database module imports successful


In [26]:
# Create a test model for demonstration
class TestItem(ModelBase):
    """Test model for database testing."""
    __tablename__ = 'test_items'
    
    id = Column(Integer, primary_key=True)
    name = Column(String(200), nullable=False)
    source = Column(String(100), nullable=False)
    created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))

print("✓ Test model created: TestItem")


✓ Test model created: TestItem


In [27]:
# Configure database (using SQLite in-memory for testing)
print("Testing DatabaseManager:")
print("=" * 50)

# Use temporary SQLite database file
test_db_file = Path(tempfile.mktemp(suffix=".db", prefix="test_db_"))
db_config = DatabaseConfig(
    connection_string=f"sqlite:///{test_db_file}",
    echo=False,  # Set to True to see SQL queries
    pool_pre_ping=True
)

db_manager = DatabaseManager(db_config)
print(f"✓ DatabaseManager created")
print(f"  - Connection: {db_config.connection_string}")

# Create tables
db_manager.create_all_tables()
print("✓ Database tables created")

# Test session creation
session = db_manager.create_session()
print("✓ Database session created")

# Verify table exists
from sqlalchemy import inspect
inspector = inspect(db_manager.engine)
tables = inspector.get_table_names()
print(f"✓ Tables in database: {tables}")

session.close()
print("\n✓ DatabaseManager tests complete")


Testing DatabaseManager:
✓ DatabaseManager created
  - Connection: sqlite:///C:\Users\felipe\AppData\Local\Temp\test_db_kk75s3_s.db
✓ Database tables created
✓ Database session created
✓ Tables in database: ['test_items']

✓ DatabaseManager tests complete


In [28]:
# Test Repository pattern
print("Testing Repository Pattern:")
print("=" * 50)

session = db_manager.create_session()
repo = Repository(session, TestItem)

# Create test items
print("1. Creating test items...")
items = [
    TestItem(name="Item 1", source="test_source"),
    TestItem(name="Item 2", source="test_source"),
    TestItem(name="Item 3", source="another_source"),
]

for item in items:
    created = repo.add(item)
    print(f"  ✓ Created: {created.name} (id: {created.id})")

session.commit()
print(f"✓ Committed {len(items)} items to database")

# Test repository queries
print("\n2. Testing repository queries:")
all_items = repo.get_all()
print(f"  ✓ get_all(): {len(all_items)} items")

item_by_id = repo.get_by_id(1)
if item_by_id:
    print(f"  ✓ get_by_id(1): {item_by_id.name}")

filtered = repo.find_by(source="test_source")
print(f"  ✓ find_by(source='test_source'): {len(filtered)} items")

# Test update
print("\n3. Testing update...")
if item_by_id:
    item_by_id.name = "Updated Item 1"
    updated = repo.update(item_by_id)
    print(f"  ✓ Updated: {updated.name}")

# Test delete
print("\n4. Testing delete...")
items_to_delete = repo.find_by(source="another_source")
if items_to_delete:
    item_to_delete = items_to_delete[0]
    deleted_name = item_to_delete.name  # Save name before deleting
    repo.delete(item_to_delete)
    print(f"  ✓ Deleted: {deleted_name}")

session.commit()

# Verify final state
final_count = len(repo.get_all())
print(f"\n✓ Final item count: {final_count}")

session.close()
print("\n✓ Repository pattern tests complete")


Testing Repository Pattern:
1. Creating test items...
  ✓ Created: Item 1 (id: None)
  ✓ Created: Item 2 (id: None)
  ✓ Created: Item 3 (id: None)
✓ Committed 3 items to database

2. Testing repository queries:
  ✓ get_all(): 3 items
  ✓ get_by_id(1): Item 1
  ✓ find_by(source='test_source'): 2 items

3. Testing update...
  ✓ Updated: Updated Item 1

4. Testing delete...
  ✓ Deleted: Item 3

✓ Final item count: 2

✓ Repository pattern tests complete


In [29]:
# Test UnitOfWork pattern
print("Testing UnitOfWork Pattern:")
print("=" * 50)

from toolkit.core.database.repository import UnitOfWork

# Create UnitOfWork with repository
session = db_manager.create_session()
repo = Repository(session, TestItem)

uow = UnitOfWork(session)

print("✓ UnitOfWork created")

# Test context manager (transaction)
print("\n1. Testing transaction context manager...")
try:
    with uow:
        # Create new item within transaction
        new_item = TestItem(name="Transaction Item", source="test_source")
        uow.get_repository(TestItem).add(new_item)
        print("  ✓ Item created in transaction")
        
        # Transaction will commit automatically on exit
        print("  ✓ Transaction will commit")
except Exception as e:
    print(f"  ✗ Transaction error: {e}")

# Verify item was committed
session = db_manager.create_session()
repo = Repository(session, TestItem)
transaction_item = repo.find_by(name="Transaction Item")
if transaction_item:
    print(f"  ✓ Transaction item found: {transaction_item[0].name}")

session.close()

# Test rollback on error
print("\n2. Testing transaction rollback...")
session = db_manager.create_session()
repo = Repository(session, TestItem)
initial_count = len(repo.get_all())

uow = UnitOfWork(session)

try:
    with uow:
        # Create item
        rollback_item = TestItem(name="Rollback Item", source="test_source")
        uow.get_repository(TestItem).add(rollback_item)
        print("  ✓ Item created in transaction")
        
        # Force an error to trigger rollback
        raise Exception("Simulated error for rollback test")
except Exception:
    print("  ✓ Exception caught, transaction rolled back")

# Verify item was NOT committed
session = db_manager.create_session()
repo = Repository(session, TestItem)
final_count = len(repo.get_all())
rollback_items = repo.find_by(name="Rollback Item")

if len(rollback_items) == 0 and final_count == initial_count:
    print(f"  ✓ Rollback successful (count unchanged: {final_count})")
else:
    print(f"  ⚠ Rollback may have failed (count: {initial_count} -> {final_count})")

session.close()
print("\n✓ UnitOfWork pattern tests complete")


Testing UnitOfWork Pattern:
✓ UnitOfWork created

1. Testing transaction context manager...
  ✓ Item created in transaction
  ✓ Transaction will commit
  ✓ Transaction item found: Transaction Item

2. Testing transaction rollback...
  ✓ Item created in transaction
  ✓ Exception caught, transaction rolled back
  ✓ Rollback successful (count unchanged: 3)

✓ UnitOfWork pattern tests complete


In [30]:
# Cleanup: Close database connections and remove test database
try:
    # Close all database connections to release file locks
    if 'db_manager' in globals():
        db_manager.close()  # This calls engine.dispose() to close all connections
        print("✓ Database connections closed")
    
    # Also close any scoped sessions
    if 'db_manager' in globals() and hasattr(db_manager, 'scoped_session_factory'):
        db_manager.scoped_session_factory.remove()
        print("✓ Scoped sessions removed")
    
    # Small delay to ensure file handles are released (Windows-specific)
    import time
    time.sleep(0.1)
    
    # Now we can safely delete the file
    if test_db_file.exists():
        test_db_file.unlink()
        print(f"✓ Test database cleaned up: {test_db_file}")
except Exception as e:
    print(f"⚠ Could not clean up test database: {e}")
    print(f"  (This is normal - file may be locked by database connections. It will be cleaned up on Windows temp cleanup)")

print("\n" + "="*50)
print("Database & ORM Module Test Complete")
print("="*50)


✓ Database connections closed
✓ Scoped sessions removed
✓ Test database cleaned up: C:\Users\felipe\AppData\Local\Temp\test_db_kk75s3_s.db

Database & ORM Module Test Complete


## Summary

This notebook tests all core modules of the scraping toolkit:

1. ✅ **Browser Management** - WebDriverController, BrowserSession, window management
2. ✅ **Handler Framework** - AbstractHandler, utilities, scraping workflow
3. ✅ **Pipeline Framework** - Pipeline stages, orchestrator, context management
4. ✅ **Download Management** - FileDownloader, StoragePathManager, ContentType
5. ✅ **Logging System** - LoggingManager, SessionTracker, PerformanceMonitor
6. ✅ **Database & ORM** - DatabaseManager, Repository, UnitOfWork

All modules have been tested and verified to work correctly!
