# In [None]:
# avito_scraper_spark.py

import csv
import json
import os
import random
import re
import time
from datetime import datetime, timezone
from typing import List, Dict, Any
from urllib.parse import urljoin

import requests
from bs4 import BeautifulSoup
from dotenv import load_dotenv

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# =========================================================
# 1. ENV & SPARK CONFIG
# =========================================================

# Load variables from a local .env file into the process environment.
load_dotenv()

# ADLS / storage configuration
storage_account = "strealestatehamza"
container = "realestate"

# The storage account key is mandatory: abort right away when it is absent.
adls_key = os.getenv("ADLS_ACCOUNT_KEY")
if not adls_key:
    raise RuntimeError("ADLS_ACCOUNT_KEY missing from .env")

spark = SparkSession.builder.appName("RealEstate_Avito_Docker").getOrCreate()

# Register the account key for both the DFS and the BLOB endpoints.
for _endpoint in ("dfs", "blob"):
    spark.conf.set(
        f"fs.azure.account.key.{storage_account}.{_endpoint}.core.windows.net",
        adls_key,
    )

# =========================================================
# 2. CONSTANTS & BASIC UTILS
# =========================================================

# Root of the site; relative links found in pages are resolved against it.
BASE_URL = "https://www.avito.ma"

# HTTP headers sent with every request (desktop Chrome user agent).
HEADERS = {
    "User-Agent": " ".join(
        (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64)",
            "AppleWebKit/537.36 (KHTML, like Gecko)",
            "Chrome/120.0 Safari/537.36",
        )
    ),
}


def fetch_html(url: str) -> str:
    """Download the HTML of an Avito page.

    Sends a GET request with the module-level ``HEADERS`` and a 20-second
    timeout. ``raise_for_status()`` turns an HTTP error response into an
    exception before the body is returned.
    """
    response = requests.get(url, timeout=20, headers=HEADERS)
    response.raise_for_status()
    html = response.text
    return html


def extract_id_from_url(url: str) -> str:
    """Return the numeric ad id found at the end of an Avito URL.

    The query string is dropped first; the id is the run of digits that
    follows an underscore and immediately precedes the trailing ``.htm``.
    An empty string is returned when the URL does not follow that pattern.
    """
    path, _, _ = url.partition("?")
    match = re.search(r"_([0-9]+)\.htm$", path)
    if match is None:
        return ""
    return match.group(1)


# =========================================================
# 3. BREADCRUMBS, CAT, LOCATION, DATE
# =========================================================

def get_breadcrumbs(soup: BeautifulSoup) -> Dict[str, Any]:
    """Extract the breadcrumb trail of an ad page.

    Returns a dict with:
      - "breadcrumbs_list": the non-empty labels, in page order;
      - "breadcrumbs": the same labels joined with " > " ("" when none).
    """
    trail = []
    container = soup.find("ol", class_=re.compile(r"sc-16q833i-0"))
    items = (
        container.find_all("li", class_=re.compile(r"sc-16q833i-3"))
        if container
        else []
    )
    for item in items:
        # Each entry's label is its first <a> or <span>.
        label_node = item.find(["a", "span"])
        label = label_node.get_text(strip=True) if label_node else ""
        if label:
            trail.append(label)

    # Joining an empty list already yields "", so no special case is needed.
    return {"breadcrumbs_list": trail, "breadcrumbs": " > ".join(trail)}


def get_category_label(soup: BeautifulSoup) -> str:
    """Return the category label shown in the ad's category block.

    The block is the first ``div`` whose ``aria-label`` matches "Category ".
    The result is the first text fragment of that block that does not
    contain the word "Categorie" (e.g. 'Appartements, à louer'), or "" when
    the block or such a fragment is missing.
    """
    section = soup.find("div", attrs={"aria-label": re.compile(r"Category ")})
    if not section:
        return ""
    return next(
        (
            fragment
            for fragment in section.stripped_strings
            if "Categorie" not in fragment
        ),
        "",
    )


def get_location_and_date(soup: BeautifulSoup) -> Dict[str, str]:
    """Extract the ad's location and publication date, and stamp the scrape time.

    Returns:
        A dict with three string values:
          - "location": text of the location span (e.g. "Racine, Casablanca"),
            or "" when it is not found;
          - "published_date": raw ``datetime`` attribute of the first
            ``<time>`` tag carrying one, or "" when absent;
          - "scraping_time": current UTC time as a naive ISO-8601 string.
    """
    # Publication date: taken verbatim from the <time datetime="..."> tag.
    published_date_raw = ""
    time_tag = soup.find("time", attrs={"datetime": True})
    if time_tag:
        published_date_raw = time_tag["datetime"]

    # Location: the span next to the map-pin icon.
    # NOTE(review): this matches a `title` *attribute* on the <svg>, whereas
    # get_seller_info looks for a <title> child element -- confirm which form
    # the site actually uses for its icons.
    location = ""
    for svg in soup.find_all("svg", title=re.compile(r"MapPinFill Icon")):
        location_span = svg.parent.find(
            "span", class_=re.compile(r"sc-16573058-17")
        )
        if location_span:
            location = location_span.get_text(strip=True)
            break

    # datetime.utcnow() is deprecated; build the same naive UTC value from an
    # aware one so the stored string format (no UTC offset) stays unchanged.
    scraping_time = (
        datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
    )

    return {
        "location": location,
        "published_date": published_date_raw,
        "scraping_time": scraping_time,
    }


# =========================================================
# 4. TITLE, PRICE, DESCRIPTION, IMAGES
# =========================================================

def get_title_and_price(soup: BeautifulSoup) -> Dict[str, str]:
    """Extract the ad title and its displayed price text.

    The title is the text of the first ``<h1>``. The price is the text of
    the first ``<p>`` inside the price block; only when the page has no
    price block at all, the first text node containing "DH" is used instead.
    Missing values are returned as "".
    """
    heading = soup.find("h1")
    title = heading.get_text(strip=True) if heading else ""

    price_text = ""
    price_container = soup.find("div", class_=re.compile(r"sc-16573058-10"))
    if not price_container:
        # No dedicated price block: fall back to any text mentioning "DH".
        dh_string = soup.find(string=re.compile(r"DH"))
        if dh_string:
            price_text = dh_string.strip()
    else:
        paragraph = price_container.find("p")
        if paragraph:
            price_text = paragraph.get_text(strip=True)

    return {"title": title, "price_text": price_text}


def get_description(soup: BeautifulSoup) -> str:
    """Extract the ad description from the "Description" section.

    The section is the first matching container ``div`` whose ``<h2>``
    contains "Description". Its text is gathered from the outermost
    ``<p>``/``<div>``/``<span>`` elements only: collecting every nested
    element as well would repeat the same text once per nesting level.

    Returns:
        The description with runs of whitespace collapsed to single spaces.
        When no description section exists, the text of the first ``<p>``
        of the page, or "" if the page has none.
    """
    text_tags = ("p", "div", "span")

    desc_container = None
    for div in soup.find_all("div", class_=re.compile(r"sc-b59a33d2-3")):
        h2 = div.find("h2")
        if h2 and "Description" in h2.get_text():
            desc_container = div
            break

    if desc_container:
        text_parts = []
        for node in desc_container.find_all(list(text_tags)):
            # Skip nodes nested inside another p/div/span of the section:
            # their text is already part of that ancestor's get_text().
            # Identity comparison is used because bs4 tags compare equal
            # by content, not by position in the tree.
            ancestor = node.parent
            nested = False
            while ancestor is not None and ancestor is not desc_container:
                if ancestor.name in text_tags:
                    nested = True
                    break
                ancestor = ancestor.parent
            if nested:
                continue

            t = node.get_text(" ", strip=True)
            if t:
                text_parts.append(t)

        description = " ".join(text_parts)
        return re.sub(r"\s+", " ", description).strip()

    # Fallback: no description section found, use the page's first paragraph.
    p = soup.find("p")
    if p:
        return p.get_text(" ", strip=True)
    return ""


def get_images(soup: BeautifulSoup) -> List[str]:
    """Return the image URLs of the main gallery, without duplicates.

    Only ``<img>`` tags under ``div.picture`` whose ``src`` points at
    ``content.avito.ma/classifieds/images`` are kept, in first-seen order.
    """
    sources = (img.get("src") for img in soup.select("div.picture img"))
    # dict.fromkeys de-duplicates while preserving insertion order.
    unique = dict.fromkeys(
        src
        for src in sources
        if src and "content.avito.ma/classifieds/images" in src
    )
    return list(unique)


# =========================================================
# 5. ATTRIBUTES, EQUIPMENTS, SELLER INFO
# =========================================================

def get_attributes_and_equipments(soup: BeautifulSoup) -> Dict[str, Any]:
    """Collect the ad's key/value attributes and its list of equipments.

    Every item block is classified by the ``<h2>`` of its enclosing section:
    blocks under an "Équipements" heading feed the equipment list, all
    others feed the attribute mapping.

    Returns:
        A dict with:
          - "attributes": e.g. {"Chambres": "3", "Salle de bain": "2"};
            an item is kept only when it has both a label and a value;
          - "equipments": e.g. ["Ascenseur", "Balcon"]; each item
            contributes its label, or its value when it has no label.
    """
    attributes: Dict[str, str] = {}
    equipments: List[str] = []

    for block in soup.find_all("div", class_=re.compile(r"sc-cd1c365e-0")):
        section = block.find_parent("div", class_=re.compile(r"sc-b59a33d2-3"))
        heading = section.find("h2") if section else None
        # The heading literal used to be mis-encoded (UTF-8 bytes read as
        # Mac Roman), so it could never match the accented page heading.
        is_equip_section = bool(
            heading and "Équipements" in heading.get_text()
        )

        for item in block.find_all("div", class_=re.compile(r"sc-cd1c365e-1")):
            # The label comes from the icon's alt text, the value from the span.
            img = item.find("img", alt=True)
            label_from_alt = img["alt"].strip() if img else ""

            value_span = item.find("span", class_=re.compile(r"fjZBup"))
            value = value_span.get_text(strip=True) if value_span else ""

            if is_equip_section:
                equipment = label_from_alt or value
                if equipment:
                    equipments.append(equipment)
            elif label_from_alt and value:
                attributes[label_from_alt] = value

    return {
        "attributes": attributes,
        "equipments": equipments,
    }


def get_seller_info(soup: BeautifulSoup) -> Dict[str, Any]:
    """Extract the seller's name, profile URL and store flag.

    The seller block is located through its ``data-test`` attribute, with a
    class-based lookup as a fallback. The seller is flagged as a store when
    the block mentions "Voir la boutique" or contains a "Store Icon" title.
    Values default to "", "" and False when nothing is found.
    """
    info: Dict[str, Any] = {
        "seller_name": "",
        "seller_url": "",
        "seller_is_store": False,
    }

    block = soup.find("div", attrs={"data-test": "av_sellerInfo"}) or soup.find(
        "div", class_=re.compile(r"sc-1l0do2b-0")
    )
    if not block:
        return info

    link = block.find("a", href=True)
    if link:
        # The href may be relative; resolve it against the site root.
        info["seller_url"] = urljoin(BASE_URL, link["href"])
        name_node = link.find("p") or link.find("span")
        if name_node:
            info["seller_name"] = name_node.get_text(strip=True)

    mentions_store = "Voir la boutique" in block.get_text(" ", strip=True)
    store_icon = block.find("title", string=re.compile(r"Store Icon"))
    info["seller_is_store"] = bool(mentions_store or store_icon)

    return info


# =========================================================
# 6. PARSER PRINCIPAL
# =========================================================

def parse_avito_ad(url: str) -> Dict[str, Any]:
    """Scrape a single Avito ad page and return its fields as one flat dict.

    The page is downloaded, parsed with ``html.parser`` and handed to each
    section extractor; their results are merged into a single record keyed
    by field name.
    """
    page = BeautifulSoup(fetch_html(url), "html.parser")

    # Extractors run in a fixed order; each returns either a plain value or
    # a small dict whose keys are merged into the record below.
    breadcrumb_fields = get_breadcrumbs(page)
    category = get_category_label(page)
    location_fields = get_location_and_date(page)
    headline_fields = get_title_and_price(page)
    body_text = get_description(page)
    image_urls = get_images(page)
    detail_fields = get_attributes_and_equipments(page)
    seller_fields = get_seller_info(page)

    ad_data: Dict[str, Any] = {
        "id": extract_id_from_url(url),
        "url": url,
        # title, price_text
        **headline_fields,
        # location, published_date, scraping_time
        **location_fields,
        # breadcrumbs_list, breadcrumbs
        **breadcrumb_fields,
        "category_label": category,
        "description": body_text,
        "images": image_urls,
        # attributes, equipments
        **detail_fields,
        # seller_name, seller_url, seller_is_store
        **seller_fields,
    }
    return ad_data


# =========================================================
# 7. LISTING -> URLS (PAGINATION)
# =========================================================

def get_ad_urls_from_listing(listing_url: str, page: int = 1) -> List[str]:
    """Collect the ad URLs linked from one page of an Avito listing.

    Args:
        listing_url: URL of the listing (first page).
        page: 1-based page number; pages after the first are requested by
            adding the ``o=<page>`` query parameter.

    Returns:
        The sorted, de-duplicated absolute ad URLs, without query strings.
        A link counts as an ad when it mentions ``avito.ma`` and its path
        ends in ``_<digits>.htm``.

    Raises:
        Whatever ``fetch_html`` raises when the page cannot be downloaded.
    """
    if page == 1:
        page_url = listing_url
    else:
        # Append to an existing query string instead of starting a second one.
        separator = "&" if "?" in listing_url else "?"
        page_url = f"{listing_url}{separator}o={page}"

    html = fetch_html(page_url)
    soup = BeautifulSoup(html, "html.parser")

    found = set()
    for a in soup.find_all("a", href=True):
        href = a["href"]

        if href.startswith("/"):
            href = urljoin(BASE_URL, href)

        if "avito.ma" not in href:
            continue

        # Strip the query string BEFORE testing the suffix (as
        # extract_id_from_url does); otherwise ad links that carry query
        # parameters never match the `.htm$` anchor and are dropped.
        clean = href.split("?")[0]
        if re.search(r"_[0-9]+\.htm$", clean):
            found.add(clean)

    ad_urls = sorted(found)
    print(f"🌐 Trouvé {len(ad_urls)} annonces sur {page_url}")
    return ad_urls


# =========================================================
# 8. SCRAPER LISTING -> LISTE (MULTI PAGES)
# =========================================================

def scrape_listing_to_list(listing_url: str, max_pages: int = 50) -> List[Dict[str, Any]]:
    """Scrape up to ``max_pages`` pages of an Avito listing.

    Pagination stops early when a listing page fails to load, contains no
    ad, or contains only ads already seen on previous pages. A failure on
    a single ad is logged and skipped so that one bad page does not abort
    the whole run.

    Args:
        listing_url: URL of the listing (first page).
        max_pages: maximum number of listing pages to visit.

    Returns:
        One dict per successfully parsed ad, in scraping order.
    """
    all_ads: List[Dict[str, Any]] = []
    seen_urls = set()

    for page in range(1, max_pages + 1):
        print(f"\n========== PAGE {page}/{max_pages} ==========")
        try:
            ad_urls = get_ad_urls_from_listing(listing_url, page=page)
        except Exception as e:
            print(f"❌ Erreur lors du chargement de la page {page}: {e}")
            break

        if not ad_urls:
            print("Aucune annonce trouvée sur cette page, arrêt du scraping.")
            break

        new_urls = [u for u in ad_urls if u not in seen_urls]
        if not new_urls:
            print("Toutes les annonces de cette page sont déjà vues, arrêt du scraping.")
            break

        seen_urls.update(new_urls)
        total = len(new_urls)

        for i, ad_url in enumerate(new_urls, 1):
            print(f"[Page {page}] [{i}/{total}] Scraping {ad_url}")
            try:
                ad_data = parse_avito_ad(ad_url)
            except Exception as e:
                # Best effort: keep going with the remaining ads.
                print(f"  ❌ Erreur sur {ad_url}: {e}")
            else:
                all_ads.append(ad_data)

            # Random pause between requests to go easy on the site.
            time.sleep(random.uniform(1.0, 2.5))

    print(f"\n✅ Total annonces collectées depuis {listing_url}: {len(all_ads)}")
    return all_ads


# =========================================================
# 9. NORMALISATION POUR SPARK
# =========================================================

def normalize_ads_for_spark(ads: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Flatten the nested fields of each ad into plain strings for Spark.

    Works on shallow copies, so the input dicts are left untouched:
      - "attributes" and "breadcrumbs_list" become JSON strings
        (non-ASCII characters kept as-is);
      - "equipments" is joined with "; " and "images" with ", ".
    Missing fields are added with their empty representation.
    """

    def _flatten(ad: Dict[str, Any]) -> Dict[str, Any]:
        row = ad.copy()
        row["attributes"] = json.dumps(
            row.get("attributes", {}), ensure_ascii=False
        )
        row["equipments"] = "; ".join(row.get("equipments") or [])
        row["breadcrumbs_list"] = json.dumps(
            row.get("breadcrumbs_list") or [], ensure_ascii=False
        )
        row["images"] = ", ".join(row.get("images") or [])
        return row

    return [_flatten(ad) for ad in ads]


# =========================================================
# 10. MAIN WORKFLOW SPARK + ADLS (PARQUET)
# =========================================================

def _build_offer_frame(ads: List[Dict[str, Any]], offre: str, ingest_ts: str):
    """Turn normalized ads into a Spark DataFrame tagged with lineage columns."""
    return (
        spark.createDataFrame(ads)
        .withColumn("source_site", F.lit("avito"))
        .withColumn("offre", F.lit(offre))
        .withColumn("ingest_ts", F.lit(ingest_ts))
    )


def _write_parquet(df, path: str) -> None:
    """Write ``df`` to ``path`` as Parquet from a single partition, overwriting."""
    df.coalesce(1).write.mode("overwrite").parquet(path)


def main():
    """Scrape the Avito sale and rental listings and store them in ADLS.

    Steps: scrape up to 50 pages of each listing, flatten the records,
    build one Spark DataFrame per offer type (tagged with ``source_site``,
    ``offre`` and ``ingest_ts``) and write each one as Parquet under a
    timestamped path. An offer type with no scraped ad is skipped with a
    warning instead of being written.
    """
    # --- Avito listing URLs ---
    ventes_url = "https://www.avito.ma/fr/maroc/ventes_immobilieres-à_vendre"
    locations_url = "https://www.avito.ma/fr/maroc/locations_immobilieres-à_louer"

    # --- Scraping (50 pages each) ---
    raw_ventes_ads = scrape_listing_to_list(ventes_url, max_pages=50)
    raw_locations_ads = scrape_listing_to_list(locations_url, max_pages=50)

    print(f"Ventes récupérées: {len(raw_ventes_ads)}")
    print(f"Locations récupérées: {len(raw_locations_ads)}")

    # --- Normalization ---
    ventes_ads = normalize_ads_for_spark(raw_ventes_ads)
    locations_ads = normalize_ads_for_spark(raw_locations_ads)

    # One clock reading for both ingest_ts and the output path, so the two
    # always agree. Built from an aware datetime (utcnow() is deprecated)
    # but kept naive so the emitted formats are unchanged.
    now = datetime.now(timezone.utc).replace(tzinfo=None)
    now_iso = now.isoformat()

    # --- Spark DataFrames ---
    # createDataFrame cannot infer a schema from an empty list, so a frame
    # is only built when there is something to put in it.
    ventes_df = (
        _build_offer_frame(ventes_ads, "vente", now_iso) if ventes_ads else None
    )
    locations_df = (
        _build_offer_frame(locations_ads, "location", now_iso)
        if locations_ads
        else None
    )

    if ventes_df is not None:
        print("📌 Schéma ventes")
        ventes_df.printSchema()
    if locations_df is not None:
        print("📌 Schéma locations")
        locations_df.printSchema()

    # --- ADLS paths ---
    date_path = now.strftime("%Y/%m/%d/%H%M%S")
    base_path = f"abfss://{container}@{storage_account}.dfs.core.windows.net/raw"

    ventes_path = f"{base_path}/avito/ventes/{date_path}"
    locations_path = f"{base_path}/avito/locations/{date_path}"

    # --- Parquet output ---
    if ventes_df is not None:
        _write_parquet(ventes_df, ventes_path)
        print("✅ Ventes Avito (PARQUET) ->", ventes_path)
    else:
        print("⚠️ Aucune vente à écrire.")

    if locations_df is not None:
        _write_parquet(locations_df, locations_path)
        print("✅ Locations Avito (PARQUET) ->", locations_path)
    else:
        print("⚠️ Aucune location à écrire.")


# Run the scrape-and-load workflow only when this file is executed as a script.
if __name__ == "__main__":
    main()