In [2]:
import undetected_chromedriver as uc
from selenium.webdriver.common.by import By
from fake_useragent import UserAgent
import time
import random
import csv
import os
import sqlite3
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed

# Configuration des logs
logging.basicConfig(
    filename='scraping_log.log',
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

# Fonction pour lire les annonces existantes à partir du fichier CSV
def load_existing_annonces(filename):
    if not os.path.exists(filename):
        return set()  # Retourner un ensemble vide si le fichier n'existe pas
    with open(filename, mode="r", newline="", encoding="utf-8") as file:
        reader = csv.reader(file)
        next(reader)  # Ignorer l'en-tête
        return {row[0] for row in reader}  # Utiliser la location comme clé

# Fonction pour créer la table dans la base de données SQLite
def create_annonces_table(db_file):
    try:
        connection = sqlite3.connect(db_file)
        cursor = connection.cursor()
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS annonces (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            location INTEGER NOT NULL,
            pieces INTEGER NOT NULL,
            surface FLOAT NOT NULL,
            price FLOAT NOT NULL,
            description TEXT NOT NULL
            )
        ''')
        connection.commit()
        cursor.close()
        connection.close()
    except Exception as e:
        logging.error(f"Erreur lors de la création de la table : {e}")

# Fonction d'extraction pour une annonce unique
def extract_annonce(annonce, existing_annonces):
    try:
        location_element = annonce.find_element(By.CLASS_NAME, "item-title")
        location_text = ''.join(filter(str.isdigit, location_element.find_element(By.TAG_NAME, "span").text.strip()))
        if len(location_text) >= 3:
            location = int(location_text[-2:])
        elif len(location_text) == 1:
            location = int('0' + location_text)
        elif len(location_text) == 2:
            location = int(location_text)

        # Vérifier si l'annonce existe déjà
        if location in existing_annonces:
            return None, "Annonce déjà présente"

        item_tags = annonce.find_elements(By.CSS_SELECTOR, "ul.item-tags li")
        piece_info = [tag.text for tag in item_tags if "pièce" in tag.text]
        surface_info = [tag.text for tag in item_tags if "m²" in tag.text]

        pieces = int(''.join(filter(str.isdigit, piece_info[0]))) if piece_info else "Non précisé"
        surface = surface_info[0].replace("m²", "").strip() if surface_info else "Non précisé"
        if surface != "Non précisé":
            surface = float(surface.replace(",", "."))

        price_element = annonce.find_element(By.CLASS_NAME, "item-price")
        price_text = price_element.text.strip()
        price = float(''.join(filter(str.isdigit, price_text)))
        description = annonce.find_element(By.CLASS_NAME, "item-description").text.strip()

        # Condition pour ne sauvegarder que les annonces complètes
        if (pieces != "Non précisé" and surface != "Non précisé" and 
            price != "" and description != ""):
            return [location, pieces, surface, price, description], None
        else:
            return None, "Champs manquants"

    except Exception as e:
        return None, f"Erreur : {e}"

# Fonction pour ajouter une annonce à la base de données SQLite
def add_annonce_to_db(db_file, annonce):
    try:
        connection = sqlite3.connect(db_file)
        cursor = connection.cursor()
        cursor.execute("INSERT INTO annonces (location, pieces, surface, price, description) VALUES (?, ?, ?, ?, ?)", annonce)
        connection.commit()
        cursor.close()
        connection.close()
        return True
    except Exception as e:
        logging.error(f"Erreur lors de l'ajout à la base de données : {e}")
        return False

# Fonction principale de scraping
def scrape_annonces():
    logging.info("Démarrage du script de scraping.")
    print("Démarrage du script de scraping.")
    
    # Configuration du User-Agent aléatoire
    user_agent = UserAgent().random

    # Options pour undetected_chromedriver en mode headless avec ajustements
    options = uc.ChromeOptions()
    options.add_argument(f"user-agent={user_agent}")
    options.add_argument("--no-sandbox")
    options.add_argument("--disable-dev-shm-usage")
    # options.add_argument("--headless")  # Activer le mode headless
    options.add_argument("--window-size=1920,1080")  # Simuler une taille de fenêtre de bureau
    options.add_argument("--disable-gpu")  # Nécessaire en mode headless pour certains environnements
    options.add_argument("--enable-cookies")
    options.add_argument("--disable-blink-features=AutomationControlled")

    # Initialiser le navigateur en mode non-détectable
    logging.info("Initialisation du navigateur.")
    print("Initialisation du navigateur.")
    driver = uc.Chrome(options=options)

    # URL de la page d'annonces à scraper
    url = "https://www.pap.fr/annonce/locations-appartement-paris-75-g439"
    driver.get(url)
    logging.info(f"Ouverture de la page {url}")
    print(f"Ouverture de la page {url}")
    time.sleep(random.uniform(3, 5))

    # Accepter les cookies si une bannière de consentement est présente
    logging.info("Vérification de la bannière de cookies.")
    print("Vérification de la bannière de cookies.")
    try:
        cookie_button = driver.find_element(By.XPATH, "//span[contains(@class, 'sd-cmp-1jLDJ') and contains(@class, 'sd-cmp-fuQAp') and contains(@class, 'sd-cmp-3_LLS')]")
        cookie_button.click()
        logging.info("Bannière de cookies acceptée.")
        print("Bannière de cookies acceptée.")
    except Exception:
        logging.info("Pas de bannière de cookies détectée.")
        print("Pas de bannière de cookies détectée.")

    # Déroulement de la page pour charger toutes les annonces
    logging.info("Début du déroulement de la page pour charger toutes les annonces.")
    print("Début du déroulement de la page pour charger toutes les annonces.")
    last_height = driver.execute_script("return document.body.scrollHeight")
    while True:
        driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        time.sleep(random.uniform(2, 4))
        new_height = driver.execute_script("return document.body.scrollHeight")
        if new_height == last_height:
            logging.info("Déroulement de la page terminé.")
            print("Déroulement de la page terminé.")
            break
        last_height = new_height

    # Récupération des annonces
    annonces = driver.find_elements(By.CLASS_NAME, "item-body")
    total_annonces = len(annonces)
    logging.info(f"Nombre d'annonces trouvées : {total_annonces}")
    print(f"Nombre d'annonces trouvées : {total_annonces}")

    # Charger les annonces existantes
    existing_annonces = load_existing_annonces("annonces_immobilieres.csv")

    # Créer la table si elle n'existe pas
    db_file = "annonces.db"  # Assurez-vous que le nom de votre fichier de base de données est correct
    create_annonces_table(db_file)

    # Compteurs pour les statistiques
    nombre_enregistrees = 0
    nombre_ignored = 0
    ignored_details = []
    data_to_save = []

    # Extraction en parallèle avec ThreadPoolExecutor pour le CSV
    logging.info("Début de l'extraction des annonces.")
    print("Début de l'extraction des annonces.")
    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = [executor.submit(extract_annonce, annonce, existing_annonces) for annonce in annonces]
        for future in as_completed(futures):
            result, reason = future.result()
            if result:
                data_to_save.append(result)
                nombre_enregistrees += 1
            else:
                nombre_ignored += 1
                ignored_details.append(reason)

    logging.info("Extraction des annonces terminée.")
    print("Extraction des annonces terminée.")

    # Fermer le navigateur
    driver.quit()
    logging.info("Navigateur fermé.")
    print("Navigateur fermé.")

    # Enregistrer les nouvelles données dans le fichier CSV
    if data_to_save:
        with open("annonces_immobilieres.csv", mode="a", newline="", encoding="utf-8") as file:
            writer = csv.writer(file)
            writer.writerows(data_to_save)
        logging.info("Données enregistrées dans le fichier CSV.")
        print("Données enregistrées dans le fichier CSV.")

        # Enregistrer les données dans la base de données SQLite une par une
        for annonce in data_to_save:
            if add_annonce_to_db(db_file, annonce):
                logging.info(f"Annonce ajoutée à la base de données : {annonce}")
                print(f"Annonce ajoutée à la base de données : {annonce}")
            else:
                logging.info(f"Erreur lors de l'ajout de l'annonce à la base de données : {annonce}")
                print(f"Erreur lors de l'ajout de l'annonce à la base de données : {annonce}")

    # Rapport final
    logging.info(f"Total d'annonces trouvées : {total_annonces}")
    logging.info(f"Nombre d'annonces enregistrées : {nombre_enregistrees}")
    logging.info(f"Nombre d'annonces ignorées : {nombre_ignored}")
    print(f"Total d'annonces trouvées : {total_annonces}")
    print(f"Nombre d'annonces enregistrées : {nombre_enregistrees}")
    print(f"Nombre d'annonces ignorées : {nombre_ignored}")
    if ignored_details:
        print("Détails des annonces ignorées :")
        for detail in ignored_details:
            print(detail)

# Point d'entrée du script
if __name__ == "__main__":
    while True:
        scrape_annonces()
        logging.info("Attente de 24 heures avant la prochaine exécution.")
        print("Attente de 24 heures avant la prochaine exécution.")
        time.sleep(24 * 60 * 60)  # Pause de 24 heures (24 heures * 60 minutes * 60 secondes)


Démarrage du script de scraping.
Initialisation du navigateur.
Ouverture de la page https://www.pap.fr/annonce/locations-appartement-paris-75-g439
Vérification de la bannière de cookies.
Bannière de cookies acceptée.
Début du déroulement de la page pour charger toutes les annonces.
Déroulement de la page terminé.
Nombre d'annonces trouvées : 375
Début de l'extraction des annonces.
Extraction des annonces terminée.
Navigateur fermé.
Données enregistrées dans le fichier CSV.
Annonce ajoutée à la base de données : [10, 1, 15.4, 740.0, 'Studio refait à neuf très belle prestation rez-de-chaussée. Comprenant : 1 pièces à vivre cuisinette entièrement équipée neuve douche wc. Loyer 670 E/mois + charges 70 E. Références exigées.']
Annonce ajoutée à la base de données : [8, 1, 40.0, 2000.0, '**Idéal étudiant(e)** À 100 m du Parc Monceau, au pied du métro Courcelles. Dans résidence de standing avec gardien et ascenseurs, beau studio meublé 40 m2 lumineux, très agréable, tout confort.']
Annonce aj

KeyboardInterrupt: 