# Pipeline 2: Listing

## Prerequisites

In [None]:
import sys
from pathlib import Path
import os
import random

# Locate the project root. This assumes the notebook runs from a directory one
# level below the root (i.e. the cwd's parent is the root) — TODO confirm if
# the notebook is moved.
project_root = Path.cwd().parent
print(f"Project root: {project_root}")

# Put the project root itself (not ``sources/``) at the front of the import
# path so ``sources.*`` packages resolve. Existing copies are removed first so
# re-running this cell in the same kernel does not accumulate duplicates.
project_root_str = str(project_root)
sys.path[:] = [entry for entry in sys.path if entry != project_root_str]
sys.path.insert(0, project_root_str)
print(f"Added to Python path: {project_root}")

# Environment for the project's configuration code.
# NOTE(review): QE_CONF_FOLDER is a relative path; whether it resolves against
# the cwd or the project root depends on the code that reads it — verify.
os.environ["QE_ENV"] = "dev"
os.environ["QE_CONF_FOLDER"] = "sources/resources"
print(f"Added environment variables: QE_ENV={os.environ['QE_ENV']}, QE_CONF_FOLDER={os.environ['QE_CONF_FOLDER']}")

In [None]:
from sources.datamodel.listing_details import ListingDetails
from sources.datamodel.listing_id import ListingId
from sources.logging import logging_utils
from sources.storage.abstract_storage import Storage
from sources.scrapers.immobiliare.scraper_listing import ImmobiliareListingScraper
from sources.config.config_manager import ConfigManager

## Configuration

In [None]:
# Configure logging first so anything emitted while loading config is captured.
logging_utils.setup_logging(config_path="sources/resources/logging.yaml")
logger = logging_utils.get_logger(__name__)

# Storage settings are shared with the MongoDB query cell further down
# (it reads ``settings.mongodb_settings``).
settings = ConfigManager().get_storage_config()

# Storage backend used by the scrapers to persist ListingDetails.
storage: Storage = Storage.create_storage(
    config=settings,
    data_type=ListingDetails,
)

## Extract unprocessed ListingIds (no ListingDetails yet) from MongoDB

In [None]:
from pymongo import MongoClient
from contextlib import contextmanager

from sources.config.model.storage_settings import MongoStorageSettings
from sources.storage.mongo_storage import MongoDBStorage


# MongoDB connection details, taken from the same storage settings that were
# used to create ``storage``.
mongo_config: MongoStorageSettings = settings.mongodb_settings


@contextmanager
def get_mongo_client():
    """Yield a ``MongoClient`` built from ``mongo_config``; close it on exit.

    The connection string is unwrapped with ``get_secret_value()`` only at the
    moment the client is constructed. The client is closed whether the
    ``with`` body finishes normally or raises.
    """
    connection_string = mongo_config.connection_string.get_secret_value()
    # MongoClient is itself a context manager whose __exit__ calls close().
    with MongoClient(connection_string) as client:
        yield client


# Select a random batch of ListingIds whose listing has not been scraped yet,
# i.e. ids with no matching document in the listings collection.

batch_size = 100  # how many unprocessed ids to pick for this run

# Aggregation stages, applied in order to the ids collection:
#   1. $lookup  - left outer join on ``id`` against the listings collection;
#   2. $match   - keep only ids whose join produced no listing document;
#   3. $sample  - pick ``batch_size`` of those at random;
#   4. $project - drop the (now always empty) join array from the output.
pipeline = [
    {"$lookup": {
        "from": mongo_config.collection_listings,
        "localField": "id",
        "foreignField": "id",
        "as": "listing_details",
    }},
    {"$match": {"listing_details": {"$size": 0}}},
    {"$sample": {"size": batch_size}},
    {"$project": {"listing_details": 0}},
]

with get_mongo_client() as client:
    db = client[mongo_config.database]
    ids_collection = db[mongo_config.collection_ids]
    # Materialize the cursor while the client is still open.
    unprocessed_docs = list(ids_collection.aggregate(pipeline))

# NOTE(review): the projection keeps Mongo's ``_id`` field, so each dict passed
# to ListingId.from_dict still contains it — confirm from_dict tolerates that.
listing_ids = list(map(ListingId.from_dict, unprocessed_docs))

print(f"Found {len(listing_ids)} ListingIds without corresponding ListingDetails")

listing_ids[:5]

## Scraping Listings

In [None]:
import concurrent.futures as cf
import time

# Politeness settings for immobiliare.it.
max_workers = 5  # conservative cap on scrapers running at the same time
# Upper bound, in seconds, of the random sleep each scraper takes before it
# starts. Every task draws its own delay from uniform(0, stagger_delay), so
# start times are jittered rather than evenly spaced.
stagger_delay = 10


def run_scraper_with_delay(
    listing_id: ListingId,
    delay: float = 1,
):
    """Scrape a single listing after a random start-up delay.

    Args:
        listing_id: The listing to scrape.
        delay: Upper bound in seconds of the uniform random sleep taken before
            the scraper is created, to spread requests to the server over time.

    Returns:
        Whatever ``ImmobiliareListingScraper.scrape()`` returns.
    """
    time.sleep(random.uniform(0, delay))
    scraper = ImmobiliareListingScraper(storage, listing_id=listing_id, headless=False)
    return scraper.scrape()


with cf.ThreadPoolExecutor(max_workers=max_workers) as executor:
    # Map each future back to its ListingId so outcomes can be attributed.
    future_to_listing_id = {
        executor.submit(run_scraper_with_delay, listing_id, stagger_delay): listing_id
        for listing_id in listing_ids
    }

    succeeded = 0
    failed = 0
    for future in cf.as_completed(future_to_listing_id):
        listing_id = future_to_listing_id[future]
        try:
            future.result()
        except Exception:
            # Best-effort batch: record the failure with its traceback and
            # carry on with the remaining listings.
            failed += 1
            logger.exception("Scraper failed for %s", listing_id)
        else:
            succeeded += 1
            logger.info("Scraper completed for %s", listing_id)

    logger.info("Scraping finished: %d succeeded, %d failed", succeeded, failed)