<a href="https://colab.research.google.com/github/AncoPetiteMer/obsidian/blob/main/pictures_scraping2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
%%capture --no-stderr
# Install/upgrade the LangChain, OpenAI, Google API client, dotenv and scraping packages used by the cells below.
!pip install --quiet -U langchain langchain-openai openai google-auth-oauthlib google-auth-httplib2 google-api-python-client python-dotenv beautifulsoup4 requests langchain_community

In [None]:
# French-language instructions describing the fields to extract from a real-estate
# listing (city, postal code, transaction, surfaces, price, ...) and the texts to
# generate from them (summary sentences, commercial hook, URL slug, SEO title).
# NOTE(review): no visible cell reads this notebook-level variable; the module
# written by the next cell builds its own prompt inside OffresImmoProcessor.__init__
# — confirm whether this text is still needed.
prompt_template = """
Extraction des informations
1. Ville : Extraire le nom de la ville où se situe le bien.
2. Code Postal (CP) : Récupérer le code postal associé à la ville.
3. Transaction : Identifier s’il s’agit d’une location, d’une vente ou des deux.
4. Type de bien : Préciser s’il s’agit d’un terrain, bureaux, local d’activités, entrepôt, etc.
5. Surface du bien : Extraire la surface totale du bien en m².
6. Surface du terrain : Extraire la surface du terrain s'il est mentionné.
7. Surface divisibilité : Extraire la surface minimale divisible si elle est mentionnée.
8. Prix : Si c’est une location, récupérer le loyer annuel ou mensuel (en multipliant par 12 si c’est le cas). Si c’est une vente, récupérer le prix de vente.
9. Localisation et prestations : Extraire les informations liées à l’emplacement et aux caractéristiques techniques (accessibilité, équipements, etc.).
10. Caractéristiques et équipements : Extraire les caractéristiques et équipements du bien immobilier à partir de la description.

Génération des phrases et URL
1. Phrase résumé des surfaces : Générer une phrase type comme « Terrain de 5000 m² divisible à partir de 100 m² » ou « Bureaux de 250 m² non divisibles ».
2. Accroche commerciale : Créer une accroche sous le format Type de bien + type d’acquisition + surface en m² + ville + (numéro du département) (ex : "Local d'activités 440 m² à louer à Lyon (69)").
3. Phrase résumé du prix : Rédiger une phrase pour résumer le prix, par exemple : "Loyer mensuel de 70 €/m² HT/HC". Si le prix n’est pas mentionné : "Nous consulter pour plus d'informations."
4. Reformulation des informations extraites : Formuler une description de 120 mots reprenant les informations collectées (description du bien, emplacement, surface, loyer/prix de vente, etc.).
5. URL : Générer une URL sous le modèle /typed'acquisition-typedebien-surfacedubienm2-commune-n°département (ex : /location-local-activites-250m2-saint-pantaleon-de-larche-19).
6. Titre SEO : Créer un titre SEO sous le modèle type acquisition + type de bien + surface totale en m² + ville (ex : "Location local d'activités 1220 m² Lyon").
7. Phrase résumé : Générer une phrase incitative à la lecture, par exemple : "Découvrez ce local d’activités idéalement situé à Lyon, offrant 440 m² divisibles et accessible aux entreprises."
"""


In [None]:
%%writefile offres_immos.py
import os
from dotenv import load_dotenv
import requests
import random
from bs4 import BeautifulSoup
from langchain.text_splitter import CharacterTextSplitter
from langchain.prompts import ChatPromptTemplate
from langchain.output_parsers import StructuredOutputParser, ResponseSchema
from langchain.schema.runnable import RunnablePassthrough
from langchain_openai import ChatOpenAI
from google.colab import auth, drive
from google.auth import default
from googleapiclient.discovery import build
import pandas as pd
import re
import time
from requests.exceptions import RequestException
import logging
import base64
from PIL import Image
from io import BytesIO
from googleapiclient.http import MediaFileUpload
from urllib.parse import urljoin, urlparse

# Logger configuration.
# The stream handler is attached only once: re-executing this code (re-running
# the notebook cell or reloading the module after rewriting the file) would
# otherwise stack one more handler each time and print every message repeatedly.
logger = logging.getLogger('my_logger')
logger.setLevel(logging.DEBUG)
if not logger.handlers:
    ch = logging.StreamHandler()
    ch.setLevel(logging.DEBUG)
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    ch.setFormatter(formatter)
    logger.addHandler(ch)

# Authenticate the Colab user and create the Drive v3 client, used globally by the
# image helpers below (folder creation and image upload).
# NOTE(review): both statements run at import time, so importing this module
# (as the test file does) triggers the authentication call.
auth.authenticate_user()
drive_service = build('drive', 'v3')

# -------------------------------
# IMAGE RELEVANCE UTILS
# -------------------------------
def image_to_base64(img_url):
    """
    Download an image from the provided URL and convert it to a base64-encoded string.

    The image is re-encoded as JPEG before being base64-encoded.

    Args:
        img_url (str): The URL of the image to download.

    Returns:
        str or None: A base64-encoded string representing the image in JPEG format,
                     or None if the image cannot be downloaded or converted.
    """
    try:
        # A timeout keeps one unresponsive host from blocking the whole run.
        response = requests.get(img_url, timeout=30)
        response.raise_for_status()
        image = Image.open(BytesIO(response.content))
        buffered = BytesIO()
        # JPEG cannot store alpha or palette modes (RGBA, LA, P): saving such an
        # image directly raises, which made every transparent PNG/GIF fail here.
        # Converting to RGB first matches what save_image_to_drive already does.
        image.convert("RGB").save(buffered, format="JPEG")
        return base64.b64encode(buffered.getvalue()).decode()
    except Exception as e:
        logger.error("Error converting image to base64 for URL %s: %s", img_url, e)
        return None

# -------------------------------
# Other utility functions
# -------------------------------
def scrapemyurl(url):
    """
    Fetch a page and collect the elements that may carry an image.

    Args:
        url (str): Address of the page to download.

    Returns:
        tuple: (img tags, divs having a data-bg attribute,
                divs whose inline style mentions background-image).
    """
    logger.info("Starting to scrape images from: %s", url)
    page = requests.get(url)
    document = BeautifulSoup(page.content, 'html.parser')

    def _mentions_background(style_value):
        # Same truthiness contract as the attribute filter expects: a falsy
        # value (missing style) or a membership result.
        return style_value and 'background-image' in style_value

    images = document.find_all('img')
    logger.info("Found %s image tags.", len(images))

    lazy_backgrounds = document.find_all('div', attrs={'data-bg': True})
    logger.info("Found %s div elements with data-bg attributes.", len(lazy_backgrounds))

    styled_backgrounds = document.find_all('div', style=_mentions_background)
    logger.info("Found %s div elements with background-image in style attributes.", len(styled_backgrounds))

    return images, lazy_backgrounds, styled_backgrounds

def is_real_image(img_url):
    """
    Tell whether an image URL looks like real content rather than site chrome.

    Args:
        img_url (str): Image URL to inspect.

    Returns:
        bool: False when the lower-cased URL contains one of the excluded
              keywords (favicon, icon, logo, sprite, placeholder), True otherwise.
    """
    lowered = img_url.lower()
    excluded = ('favicon', 'icon', 'logo', 'sprite', 'placeholder')
    # First keyword, in declaration order, found in the URL (None when none is).
    offending = next((word for word in excluded if word in lowered), None)
    if offending is not None:
        logger.debug("Image URL %s is not real because it contains the keyword '%s'.", img_url, offending)
        return False
    logger.debug("Image URL %s is considered real.", img_url)
    return True

def has_sufficient_size(img_url):
    """
    Check that an image is strictly larger than 100x100 pixels.

    Args:
        img_url (str): URL of the image to download and measure.

    Returns:
        bool: True when both width and height exceed 100 pixels; False otherwise,
              including when the download or the decoding fails.
    """
    try:
        reply = requests.get(img_url, stream=True)
        reply.raise_for_status()
        width, height = Image.open(BytesIO(reply.content)).size
    except (requests.RequestException, IOError) as e:
        logger.error("Failed to check size for image URL %s: %s", img_url, e)
        return False

    if not (width > 100 and height > 100):
        logger.debug("Image URL %s does not have sufficient size: %sx%s.", img_url, width, height)
        return False
    logger.debug("Image URL %s has sufficient size: %sx%s.", img_url, width, height)
    return True

def is_image_content_type(img_url):
    """
    Check, through a HEAD request, that a URL is served with an image MIME type.

    Args:
        img_url (str): URL to probe.

    Returns:
        bool: True when the Content-Type header starts with 'image/'; False
              otherwise, including when the request fails.
    """
    try:
        head = requests.head(img_url, headers={'User-Agent': 'Mozilla/5.0'})
    except requests.RequestException:
        logger.error("Failed to retrieve content type for image URL %s.", img_url)
        return False

    content_type = head.headers.get('Content-Type', '')
    if not content_type.startswith('image/'):
        logger.debug("Image URL %s has content type %s, which is not an image.", img_url, content_type)
        return False
    logger.debug("Image URL %s has content type %s, which is an image.", img_url, content_type)
    return True

def extract_image_urls(tags, base_url):
    """
    Build absolute image URLs from a list of parsed tags.

    For <img> tags the URL comes from `src`, falling back to `data-src`.
    For <div> tags it comes from `data-bg`, falling back to the URL found in
    the inline `style` attribute. Any other tag is ignored.

    Args:
        tags (iterable): Parsed tags exposing `.name` and `.get(attribute)`.
        base_url (str): Page URL used to resolve relative image paths.

    Returns:
        list[str]: Absolute image URLs, in the order the tags were given.
    """
    image_urls = []
    for tag in tags:
        if tag.name == 'img':
            img_url = tag.get('src') or tag.get('data-src')
        elif tag.name == 'div':
            # A div can reach this point with an empty data-bg and no style
            # attribute at all; only parse the style when there is one instead
            # of handing None to the style parser.
            style = tag.get('style')
            img_url = tag.get('data-bg') or (extract_background_image_url(style) if style else None)
        else:
            continue
        if img_url:
            img_url = urljoin(base_url, img_url)
            image_urls.append(img_url)
            logger.debug("Extracted image URL: %s", img_url)
    return image_urls

def extract_background_image_url(style):
    """
    Extract the URL of a CSS `background-image: url(...)` declaration.

    Args:
        style (str or None): Content of an inline `style` attribute.

    Returns:
        str or None: The URL with surrounding whitespace and quotes removed, or
                     None when the style is missing, has no background-image,
                     or contains no complete, non-empty `url(...)` value.
    """
    if not style or 'background-image' not in style:
        return None
    # Test the raw find() result before adding the offset: adding first turns
    # "not found" (-1) into 3, so values such as
    # `background-image: linear-gradient(...)` were sliced as if they held a URL.
    marker = style.find('url(')
    if marker == -1:
        return None
    start = marker + len('url(')
    end = style.find(')', start)
    if end == -1:
        return None
    img_url = style[start:end].strip().strip('"').strip("'")
    if not img_url:
        return None
    logger.debug("Extracted background image URL from style: %s", img_url)
    return img_url

def save_image_to_drive(img_url, folder_id):
    """
    Download an image, shrink it and upload it to a Google Drive folder as JPEG.

    The image is reduced to fit within 960x720 (aspect ratio kept), converted to
    RGB, written to a temporary file under /tmp and uploaded with the
    module-level `drive_service`. The temporary file is removed afterwards.

    Args:
        img_url (str): URL of the image to download.
        folder_id (str): ID of the Drive folder receiving the file.

    Returns:
        None. Download, decoding and file-system errors are logged, not raised.
    """
    img_path = None
    try:
        response = requests.get(img_url, timeout=30)
        response.raise_for_status()
        image = Image.open(BytesIO(response.content))
        image.thumbnail((960, 720))
        # Name the file after the URL *path* only, so query strings
        # ("photo.jpg?w=800") never end up in the name, and fall back to a
        # fixed stem when the path has no usable last segment.
        stem = os.path.basename(urlparse(img_url).path).split('.')[0] or 'image'
        img_name = stem + '.jpeg'
        img_path = f"/tmp/{img_name}"
        image.convert("RGB").save(img_path, 'JPEG', quality=90)
        file_metadata = {'name': img_name, 'parents': [folder_id]}
        media = MediaFileUpload(img_path, mimetype='image/jpeg')
        file = drive_service.files().create(body=file_metadata, media_body=media, fields='id').execute()
        file_id = file.get('id')
        logger.info("Uploaded image %s to Google Drive with ID: %s", img_url, file_id)
    except (requests.RequestException, OSError) as e:
        # OSError also covers undecodable images and disk write failures, which
        # previously escaped and stopped the processing of every remaining URL.
        logger.error("Failed to save image %s to Google Drive: %s", img_url, e)
    finally:
        # Do not let temporary files pile up across runs.
        if img_path and os.path.exists(img_path):
            os.remove(img_path)

def create_folder_in_drive(folder_name, parent_folder_id):
    """
    Create a folder inside a Google Drive parent folder.

    Args:
        folder_name (str): Name given to the new folder.
        parent_folder_id (str): ID of the folder that will contain it.

    Returns:
        The ID of the created folder, as returned by the Drive API.
    """
    folder_spec = dict(
        name=folder_name,
        mimeType='application/vnd.google-apps.folder',
        parents=[parent_folder_id],
    )
    created = drive_service.files().create(body=folder_spec, fields='id').execute()
    new_folder_id = created.get('id')
    logger.info("Created folder '%s' with ID: %s", folder_name, new_folder_id)
    return new_folder_id

def get_domain_name(url):
    """
    Return the network-location part (host, plus port if any) of a URL.

    Args:
        url (str): URL to analyse.

    Returns:
        str: The `netloc` component, e.g. "sub.example.com".
    """
    netloc = urlparse(url).netloc
    logger.debug("Extracted domain name: %s", netloc)
    return netloc

def fetch_webpage_content(url, max_retries=3):
    """
    Download a page and return its visible text as one cleaned string.

    Each attempt waits 2 seconds, then requests the page with a randomly chosen
    User-Agent. Text is taken from h1/h2/h3/p/div/span elements, inside
    <div id="content"> when that element exists, otherwise from the whole page.
    Whitespace is collapsed and every character outside letters, digits,
    accented Latin letters, ".,:;/-" and whitespace is removed.

    Args:
        url (str): Page to download.
        max_retries (int): Maximum number of attempts.

    Returns:
        str: The cleaned text, or "" when every attempt fails.
    """
    logger.info("Fetching content from URL: %s", url)
    user_agents = [
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0.3 Safari/605.1.15',
        'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0',
        'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.103 Safari/537.36'
    ]
    text_tags = ['h1', 'h2', 'h3', 'p', 'div', 'span']
    session = requests.Session()
    for attempt in range(max_retries):
        try:
            headers = {'User-Agent': random.choice(user_agents)}
            time.sleep(2)
            response = session.get(url, headers=headers)
            response.raise_for_status()
            soup = BeautifulSoup(response.content, 'html.parser')

            container = soup.find('div', {'id': 'content'})
            if container:
                nodes = container.find_all(text_tags)
            else:
                # Whole-page fallback: tag type by tag type, in text_tags order.
                nodes = [node for tag_name in text_tags for node in soup.find_all(tag_name)]
            content = ''.join(node.get_text(separator=' ', strip=True) + ' ' for node in nodes)

            content = re.sub(r'\s+', ' ', content)
            content = re.sub(r'[\r\n]+', ' ', content)
            content = re.sub(r'[^a-zA-Z0-9À-ÿ.,:;/\-\s]', '', content)
            logger.info("Content fetched and parsed for URL: %s", url)
            return content.strip()
        except RequestException as e:
            logger.error("Error fetching URL %s: %s", url, str(e))
            if attempt + 1 == max_retries:
                return ""
            # Exponential back-off between attempts: 1 s, 2 s, 4 s, ...
            wait_time = 2 ** attempt
            logger.info("Retrying in %s seconds...", wait_time)
            time.sleep(wait_time)
    return ""

# Classe encapsulant les ressources partagées
class OffresImmoProcessor:
    """
    Classe responsable du traitement des offres immobilières.
    Elle encapsule l'authentification, la configuration des services et le traitement des URL.
    """
    def __init__(self):
        """
        Prepare every shared resource needed to process listings.

        Mounts Google Drive, loads the OpenAI API key from the .env file stored
        on the Drive, creates the Google Sheets client, declares the output
        schema and builds the extraction chain (prompt -> LLM -> structured
        parser) as well as the prompt template used for image relevance.

        Raises:
            RuntimeError: If OPENAI_API_KEY is not set after loading the .env file.
        """
        logger.info("Mounting Google Drive")
        drive.mount('/content/drive')

        logger.info("Loading API key from .env file")
        env_path = '/content/drive/MyDrive/Colab Notebooks/Api_keys/.env'
        load_dotenv(env_path)
        self.openai_api_key = os.getenv('OPENAI_API_KEY')
        if not self.openai_api_key:
            logger.error("Failed to load API key")
            # RuntimeError is a subclass of Exception, so existing
            # `except Exception` handlers keep working.
            raise RuntimeError("API key not loaded")
        # Never write any part of the secret to the logs: notebook output is
        # saved with the notebook and easily ends up shared or committed.
        logger.info("API key loaded successfully")

        logger.info("Authenticating with Google")
        auth.authenticate_user()
        creds, _ = default()
        self.service = build('sheets', 'v4', credentials=creds)
        logger.info("Google Sheets service created")

        logger.info("Setting up output parser and prompt template")
        # One schema per output column; the descriptions are the instructions
        # given to the model for each field.
        self.response_schemas = [
            ResponseSchema(name="ville", description="La ville où se trouve le bien"),
            ResponseSchema(name="cp", description="Le code postal, qui doit toujours être associé à la ville."),
            ResponseSchema(name="transaction", description="Type de transaction (location, vente, etc.)"),
            ResponseSchema(name="type_de_bien", description="Type de bien immobilier à rentrer uniquement sous les déterminations suivantes (attention de bien respecter l'orthographe indiquée, ex : activite sans accent) : bureaux ou entrepot - logistique (pour tous les biens de type entrepot, batiment logistique, batiment industriel) ou local d'activite - atelier (incluant local commercial) ou terrain"),
            ResponseSchema(name="surface_du_bien", description="Surface du bien en m²"),
            ResponseSchema(name="surface_du_terrain", description="Surface du terrain en m²"),
            ResponseSchema(name="surface_divisibilite", description="Surface divisible en m²"),
            ResponseSchema(name="prix_loyer_annuel", description="Prix du loyer annuel"),
            ResponseSchema(name="prix_de_vente", description="Prix de vente"),
            ResponseSchema(name="localisation", description="Informations sur la localisation"),
            ResponseSchema(name="phrase_resume_surfaces", description="Phrase résumant les surfaces"),
            ResponseSchema(name="accroche_commerciale", description="Accroche commerciale générée sous le format Type de bien + type d’acquisition + surface en m² + ville + (numéro du département) ex : 'Local d'activités 440 m² à louer à Lyon (69)'"),
            ResponseSchema(name="phrase_resume_prix", description="Phrase résumant le prix, par ex : 'Loyer mensuel de 70 €/m² HT/HC'. Si le prix n’est pas mentionné : 'Nous consulter pour plus d'informations.'"),
            ResponseSchema(name="reformulation", description="Description reformulée de 120 mots, sans mentionner les prix, reprenant les informations collectées dont les équipements du site, les détails du bâtiment (revêtement de sol, matériaux extérieur, parking, portes sectionnelles, etc.) et les caractéristiques techniques (ex: accès poids lourds), sans commencer le paragraphe par 'découvrez', le ton doit être informatif"),
            ResponseSchema(name="url_modele", description="URL générée sous le format '/typed'acquisition-typedebien-surfacedubienm2-commune-n°département' (ex : '/location-local-activites-250m2-saint-pantaleon-de-larche-19')"),
            ResponseSchema(name="titre_seo", description="Titre SEO généré sous le format 'type acquisition + type de bien + ville (ex :'location local d'activités à Lyon') "),
            ResponseSchema(name="phrase_resume", description="Phrase résumé incitative à la lecture, par exemple : 'Découvrez ce local d’activités idéalement situé à Lyon, offrant 440 m² divisibles et accessible aux entreprises.'"),
            ResponseSchema(name="caracteristiques_et_equipements", description="Caractéristiques et équipements du bien immobilier extraits de la description")
        ]
        output_parser = StructuredOutputParser.from_response_schemas(self.response_schemas)
        logger.info("Creating ChatOpenAI model")
        self.llm = ChatOpenAI(model_name="gpt-4o-mini", temperature=0, api_key=self.openai_api_key)
        logger.info("ChatOpenAI model created with model name: gpt-4o-mini")
        prompt_template = ChatPromptTemplate.from_template(
            "Extract the following information from the given text:\n{format_instructions}\n\nText: {text}"
        )
        # The chain input is passed through as {text}; the format instructions
        # are generated by the parser from the schemas above.
        self.chain = (
            {
                "text": RunnablePassthrough(),
                "format_instructions": lambda _: output_parser.get_format_instructions()
            }
            | prompt_template
            | self.llm
            | output_parser
        )
        logger.info("Chain created")

        # Create a dedicated prompt template for image relevance evaluation
        self.image_prompt_template = ChatPromptTemplate.from_template(
            "Website context: {website_context}\n"
            "Image data (truncated): {truncated_base64}\n"
            "Based on the above information, determine if the image is relevant for the website. "
            "Answer with 'relevant' or 'irrelevant'."
        )

    def check_image_relevance(self, img_url, website_context):
        """
        Evaluate the relevance of an image for a given website context using the shared LLM.

        The image is converted to base64, the encoded string is truncated to its
        first 100 characters, a prompt is built from the dedicated template and
        sent through the shared ChatOpenAI instance.

        Args:
            img_url (str): The URL of the image to evaluate.
            website_context (str): A text description of the website context in which the image will be used.

        Returns:
            str: The LLM's answer (expected to be "relevant" or "irrelevant"),
                 "Image conversion failed" when the image cannot be converted, or
                 "Relevance check failed" when the LLM call raises.
        """
        base64_img = image_to_base64(img_url)
        if not base64_img:
            return "Image conversion failed"

        # NOTE(review): only the first 100 base64 characters are sent, as plain
        # text; the model most likely cannot judge the picture from that —
        # confirm whether a real image input was intended.
        truncated_base64 = base64_img[:100] + "..."
        prompt = self.image_prompt_template.format(
            website_context=website_context,
            truncated_base64=truncated_base64
        )
        try:
            # `invoke` is the supported entry point (already used for
            # self.chain); calling the model object directly is deprecated.
            response = self.llm.invoke(prompt)
        except Exception as e:
            logger.error("Error during image relevance check for image %s: %s", img_url, e)
            return "Relevance check failed"

        # Chat models return a message object; fall back to the raw response if
        # it carries no `content` attribute.
        result = getattr(response, "content", response)
        # The answer used to be computed but never returned, so callers got None.
        return str(result).strip()

    def test_sheet_access(self, spreadsheet_id):
        """
        Log the title of a spreadsheet and of each of its sheets.

        Used as a connectivity check: any error is logged and swallowed.

        Args:
            spreadsheet_id (str): ID of the spreadsheet to open.
        """
        try:
            metadata = self.service.spreadsheets().get(spreadsheetId=spreadsheet_id).execute()
            logger.info("Successfully accessed sheet: %s", metadata['properties']['title'])
            logger.info("Available sheets:")
            for tab in metadata['sheets']:
                logger.info(" - %s", tab['properties']['title'])
        except Exception as e:
            logger.error("Error accessing spreadsheet: %s", str(e))

    def get_used_range(self, spreadsheet_id, sheet_name):
        try:
            sheet = self.service.spreadsheets()
            result = sheet.get(spreadsheetId=spreadsheet_id, ranges=[sheet_name], includeGridData=False).execute()
            grid_properties = result['sheets'][0]['properties']['gridProperties']
            row_count = grid_properties['rowCount']
            column_count = grid_properties['columnCount']
            return f"{sheet_name}!A1:{chr(64+column_count)}{row_count}"
        except Exception as e:
            logger.error("Error getting used range: %s", str(e))
            return f"{sheet_name}!A1:Z1000"

    def read_urls_from_sheet(self, spreadsheet_id, range_name):
        """
        Read the list of URLs to process from a Google Sheet.

        The URL is taken from the first cell of every non-blank row. A first
        value equal to "urls" (any letter case) is treated as a header and
        skipped.

        Args:
            spreadsheet_id (str): ID of the spreadsheet.
            range_name (str): A1 range to read.

        Returns:
            pandas.DataFrame: A frame with a single 'Urls' column, or an empty
                              DataFrame when there is nothing to read or the
                              request fails (the error is logged).
        """
        logger.info("Reading URLs from sheet: %s, range: %s", spreadsheet_id, range_name)
        sheet = self.service.spreadsheets()
        try:
            result = sheet.values().get(spreadsheetId=spreadsheet_id, range=range_name).execute()
            values = result.get('values', [])
            logger.debug("Raw data from sheet: %s", values)
            # Keep only the first cell of rows that actually contain something:
            # blank rows and extra columns both used to break the DataFrame
            # construction.
            urls = [str(row[0]).strip() for row in values if row and str(row[0]).strip()]
            # The header test is case-insensitive, so the column is always
            # named 'Urls' here instead of reusing the header's own spelling
            # (a "urls"/"URLS" header made the later df['Urls'] lookup fail).
            if urls and urls[0].lower() == 'urls':
                urls = urls[1:]
            if not urls:
                logger.info("No data found in the sheet.")
                return pd.DataFrame()
            df = pd.DataFrame({'Urls': urls})
            logger.info("Read %s URLs from the sheet", len(df))
            for url in df['Urls']:
                logger.debug(" - %s", url)
            return df
        except Exception as e:
            logger.error("Error reading from Google Sheets: %s", str(e))
            return pd.DataFrame()

    def write_data_to_sheet(self, spreadsheet_id, range_name, data):
        """
        Replace the content of a sheet with a table of values.

        The whole sheet is cleared, then `data` is written starting at the cell
        given in `range_name`.

        Args:
            spreadsheet_id (str): ID of the spreadsheet.
            range_name (str): Target in the form "SheetName!Cell", e.g. "offres_immo!A1".
            data (list[list]): Rows to write; the first row is normally the header.

        Returns:
            None. Invalid ranges, empty data and API errors are logged, not raised.
        """
        logger.info("Writing data to sheet: %s, range: %s", spreadsheet_id, range_name)
        sheet = self.service.spreadsheets()
        try:
            sheet_name, start_cell = range_name.split('!')
        except ValueError:
            logger.error("Error: Invalid range format '%s'. Expected format 'SheetName!Cell'.", range_name)
            return
        # Check the payload before clearing: nothing to write must not wipe the
        # sheet and then fail on data[0].
        if not data or not data[0]:
            logger.error("No data to write to sheet: %s", sheet_name)
            return

        # Locate the start cell so the end of the range is computed relative to
        # it; anything that is not a plain cell reference is treated as A1.
        start_column, start_row = 1, 1
        cell_match = re.fullmatch(r'([A-Za-z]+)([0-9]+)', start_cell.strip())
        if cell_match:
            start_column = 0
            for letter in cell_match.group(1).upper():
                start_column = start_column * 26 + (ord(letter) - ord('A') + 1)
            start_row = int(cell_match.group(2))

        # Convert the last column index to letters (26 -> Z, 27 -> AA) so that
        # tables wider than 26 columns still get a valid range.
        remaining = start_column + max(len(row) for row in data) - 1
        end_column = ""
        while remaining > 0:
            remaining, offset = divmod(remaining - 1, 26)
            end_column = chr(ord('A') + offset) + end_column
        end_row = start_row + len(data) - 1
        updated_range = f"{sheet_name}!{start_cell}:{end_column}{end_row}"
        logger.debug("Calculated updated range: %s", updated_range)
        body = {'values': data}
        try:
            # Clearing sits under the same error handling as the update so an
            # API failure is logged the same way in both cases.
            logger.info("Clearing existing data in sheet: %s", sheet_name)
            sheet.values().clear(spreadsheetId=spreadsheet_id, range=sheet_name).execute()
            result = sheet.values().update(
                spreadsheetId=spreadsheet_id, range=updated_range,
                valueInputOption='RAW', body=body).execute()
            logger.info("%s cells updated.", result.get('updatedCells'))
            logger.info("Data written: %s rows, %s columns", len(data), len(data[0]))
        except Exception as e:
            logger.error("Error writing to Google Sheets: %s", str(e))

    def process_url(self, url):
        """
        Process one listing page: save its relevant images and extract its data.

        Images found on the page are filtered (keyword, content type, size,
        LLM relevance) and the accepted ones are uploaded to a Drive folder
        named after the page's domain. The page text is then sent through the
        extraction chain.

        Args:
            url (str): URL of the listing page.

        Returns:
            dict: One entry per response schema name; every value is '' when the
                  extraction fails or the page has no text.
        """
        logger.info("Processing URL: %s", url)
        webpage_text = fetch_webpage_content(url)
        text_splitter = CharacterTextSplitter(chunk_size=4000, chunk_overlap=0)
        texts = text_splitter.split_text(webpage_text)

        # Image handling is best-effort: a failure here (page unreachable, Drive
        # error, ...) must not prevent the text extraction nor stop the batch.
        try:
            img_tags, div_bg_tags, div_style_tags = scrapemyurl(url)
            image_urls = extract_image_urls(img_tags, url)
            image_urls += extract_image_urls(div_bg_tags, url)
            image_urls += extract_image_urls(div_style_tags, url)
            # The same element can appear in several tag lists; keep each URL
            # once, in first-seen order, to avoid duplicate downloads and uploads.
            image_urls = list(dict.fromkeys(image_urls))
            domain_name = get_domain_name(url)
            parent_folder_id = '1etOraaUSWfBcPAniYNw6Asbs1hIsJNb6'
            drive_folder_id = create_folder_in_drive(domain_name, parent_folder_id)
            # --- Check image relevance before saving ---
            website_context = "This is a real estate website showcasing properties for sale and rent."
            for img_url in image_urls:
                if is_real_image(img_url) and is_image_content_type(img_url) and has_sufficient_size(img_url):
                    relevance = self.check_image_relevance(img_url, website_context)
                    logger.info("Image relevance for %s: %s", img_url, relevance)
                    # "irrelevant" contains "relevant", so a plain substring
                    # test accepted every image; negative answers are excluded
                    # explicitly and a missing answer counts as not relevant.
                    verdict = str(relevance or "").lower()
                    is_relevant = (
                        "relevant" in verdict
                        and "irrelevant" not in verdict
                        and "not relevant" not in verdict
                    )
                    if is_relevant:
                        save_image_to_drive(img_url, drive_folder_id)
                    else:
                        logger.info("Skipping image %s due to irrelevance.", img_url)
                else:
                    logger.info("Invalid image URL: %s", img_url)
        except Exception as e:
            logger.error("Error while collecting images for URL %s: %s", url, str(e))
        # ---------------------------------------------------------
        if not texts:
            logger.error("No text content extracted from URL %s", url)
            return {schema.name: '' for schema in self.response_schemas}
        try:
            parsed_response = self.chain.invoke(texts[0])
            logger.info("URL processed successfully: %s", url)
            return {schema.name: parsed_response.get(schema.name, '') for schema in self.response_schemas}
        except Exception as e:
            logger.error("Error processing URL %s: %s", url, str(e))
            return {schema.name: '' for schema in self.response_schemas}

    def process_urls(self, spreadsheet_id, input_range, output_range):
        """
        Process every URL listed in the input range and write the results.

        Each URL goes through `process_url`; the output table has a header row
        ('URL' followed by the schema names) and one row per URL.

        Args:
            spreadsheet_id (str): ID of the spreadsheet holding input and output.
            input_range (str): A1 range containing the URLs.
            output_range (str): Target "SheetName!Cell" for the results.
        """
        logger.info("Starting URL processing")
        urls_df = self.read_urls_from_sheet(spreadsheet_id, input_range)
        if urls_df.empty:
            logger.info("No URLs to process. Exiting.")
            return

        field_names = [schema.name for schema in self.response_schemas]
        total = len(urls_df)
        rows = []
        for position, url in enumerate(urls_df['Urls'], 1):
            logger.info("Processing URL %s/%s: %s", position, total, url)
            parsed_info = self.process_url(url)
            field_values = [parsed_info.get(name, '') for name in field_names]
            rows.append([url] + field_values)

            logger.debug("Processed data for %s:", url)
            for name, value in zip(field_names, field_values):
                if value is None:
                    logger.debug(" - %s: None", name)
                    continue
                # Long strings are cut to 50 characters to keep the log readable.
                if isinstance(value, str) and len(value) > 50:
                    logger.debug(" - %s: %s...", name, value[:50])
                    continue
                logger.debug(" - %s: %s", name, value)

        table = [['URL'] + field_names] + rows
        self.write_data_to_sheet(spreadsheet_id, output_range, table)
        logger.info("URL processing completed")

    def run(self, spreadsheet_id='1kBQgpQhzEaxL1R-FQtGVTSIgkLqZScyPrbaZ_mKewvo',
            input_sheet_name='urls_to_scrape', output_sheet_name='offres_immo'):
        """
        Run the full pipeline: check access, read the URLs, process them, write the results.

        The defaults reproduce the previously hard-coded configuration, so
        `run()` behaves as before while other spreadsheets can now be targeted.

        Args:
            spreadsheet_id (str): ID of the spreadsheet holding both sheets.
            input_sheet_name (str): Sheet listing the URLs to process.
            output_sheet_name (str): Sheet receiving the extracted data.
        """
        logger.info("Testing sheet access for spreadsheet ID: %s", spreadsheet_id)
        self.test_sheet_access(spreadsheet_id)
        input_range = self.get_used_range(spreadsheet_id, input_sheet_name)
        output_range = f"{output_sheet_name}!A1"
        logger.info("Starting main process with spreadsheet ID: %s", spreadsheet_id)
        logger.info("Input range: %s", input_range)
        logger.info("Output range: %s", output_range)
        self.process_urls(spreadsheet_id, input_range, output_range)
        logger.info("Main process completed")


In [None]:
# The module cell above starts with %%writefile, which only saves the code to
# offres_immos.py without executing it, so the class must be imported here
# before it can be used.
from offres_immos import OffresImmoProcessor

processor = OffresImmoProcessor()
processor.run()

In [None]:
%%capture --no-stderr
# Install the static-analysis and test tools used by the following cells.
pip install pylint radon pytest

In [None]:
# Lint the generated module with pylint, with the design checks enabled.
!pylint --enable=design offres_immos.py

In [None]:
# Report cyclomatic complexity per block with radon (-a adds the average).
!radon cc offres_immos.py -a

In [None]:
%%writefile test_offres_immos.py

import pytest
import logging
from offres_immos import (
    is_real_image,
    extract_background_image_url,
    get_domain_name,
    fetch_webpage_content,
    scrapemyurl,
    extract_image_urls,
    has_sufficient_size
)
import requests
from unittest.mock import patch, MagicMock
from bs4 import BeautifulSoup
from io import BytesIO
from PIL import Image

# Logger used by the tests to report what each of them verified.
logger = logging.getLogger("TestLogger")
logger.setLevel(logging.DEBUG)
# Attach the console handler only when the logger has none yet, so repeated
# imports of this file do not duplicate the output.
if len(logger.handlers) == 0:
    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    ch = logging.StreamHandler()
    ch.setFormatter(formatter)
    ch.setLevel(logging.DEBUG)
    logger.addHandler(ch)

def test_is_real_image_valid():
    """
    A URL containing none of the excluded keywords is accepted by is_real_image.
    """
    photo_url = "https://example.com/image.jpg"
    result = is_real_image(photo_url)
    assert result is True
    logger.info("test_is_real_image_valid passed: expected True and got %s", result)

def test_is_real_image_invalid():
    """
    A URL containing an excluded keyword ("logo") is rejected by is_real_image.
    """
    logo_url = "https://example.com/logo.png"
    result = is_real_image(logo_url)
    assert result is False
    logger.info("test_is_real_image_invalid passed: expected False and got %s", result)

def test_extract_background_image_url():
    """
    The quoted URL of a background-image declaration is returned without its quotes.
    """
    expected = "https://example.com/bg.jpg"
    result = extract_background_image_url("background-image: url('https://example.com/bg.jpg');")
    assert result == expected
    logger.info("test_extract_background_image_url passed: extracted URL is %s", result)

def test_extract_background_image_url_no_url():
    """
    A style without any background-image declaration yields None.
    """
    result = extract_background_image_url("background-color: red;")
    assert result is None
    logger.info("test_extract_background_image_url_no_url passed: expected None and got %s", result)

def test_get_domain_name():
    """
    get_domain_name returns the host part of a URL, sub-domain included.
    """
    result = get_domain_name("http://sub.example.com/path")
    assert result == "sub.example.com"
    logger.info("test_get_domain_name passed: extracted domain is %s", result)

@patch('offres_immos.time.sleep')
@patch('offres_immos.requests.Session.get')
def test_fetch_webpage_content_success(mock_get, mock_sleep):
    """
    Test that fetch_webpage_content successfully fetches and parses content from a webpage.

    time.sleep is patched so the 2-second politeness delay inside
    fetch_webpage_content does not slow the test suite down.
    """
    sample_html = "<html><body><div id='content'><p>Hello World!</p></div></body></html>"
    mock_response = MagicMock()
    mock_response.status_code = 200
    mock_response.content = sample_html.encode('utf-8')
    mock_get.return_value = mock_response

    content = fetch_webpage_content("http://example.com")
    # The cleaning step keeps letters, digits, whitespace and ".,:;/-" only, so
    # the "!" is dropped and "Hello World" is what remains.
    assert "Hello World" in content
    logger.info("test_fetch_webpage_content_success passed: 'Hello World' found in content")


@patch('offres_immos.requests.get')
def test_scrapemyurl(mock_get):
    """
    scrapemyurl returns one img tag, one div with data-bg and one div with a
    background-image style for a page containing exactly one of each.
    """
    sample_html = "<html><body><img src='image1.jpg'/><div data-bg='bg_image.jpg'></div><div style='background-image: url(\"style_image.jpg\");'></div></body></html>"
    mock_get.return_value = MagicMock(status_code=200, content=sample_html.encode('utf-8'))

    img_tags, div_bg_tags, div_style_tags = scrapemyurl("http://example.com")

    counts = (len(img_tags), len(div_bg_tags), len(div_style_tags))
    assert counts[0] == 1
    assert counts[1] == 1
    assert counts[2] == 1
    logger.info("test_scrapemyurl passed: found %d img tags, %d divs with data-bg, and %d divs with style",
                *counts)



def test_extract_image_urls():
    """
    Relative paths found in img tags and data-bg divs are resolved against the base URL.
    """
    base_url = "http://example.com/"
    html = """
    <html>
      <body>
        <img src="image1.jpg" />
        <div data-bg="bg_image.jpg"></div>
      </body>
    </html>
    """
    soup = BeautifulSoup(html, 'html.parser')

    from_imgs = extract_image_urls(soup.find_all('img'), base_url)
    from_divs = extract_image_urls(soup.find_all('div', attrs={'data-bg': True}), base_url)
    image_urls = from_imgs + from_divs

    for expected in ("http://example.com/image1.jpg", "http://example.com/bg_image.jpg"):
        assert expected in image_urls
    logger.info("test_extract_image_urls passed: extracted URLs are %s", image_urls)

@patch('offres_immos.requests.get')
def test_has_sufficient_size(mock_get):
    """
    has_sufficient_size accepts a 150x150 image, which is above the size threshold.
    """
    # Encode a 150x150 red square as JPEG, entirely in memory.
    jpeg_buffer = BytesIO()
    Image.new('RGB', (150, 150), color='red').save(jpeg_buffer, format='JPEG')
    fake_image_content = jpeg_buffer.getvalue()

    mock_get.return_value = MagicMock(status_code=200, content=fake_image_content)

    result = has_sufficient_size("http://example.com/dummy.jpg")
    assert result is True
    logger.info("test_has_sufficient_size passed: image size sufficient, returned %s", result)



In [None]:
# Run the test suite quietly, stopping at the first failure and hiding warnings.
!pytest --maxfail=1 --disable-warnings -q