<a href="https://colab.research.google.com/github/NoraHK3/DataSciProject/blob/main/DataSets/Shawayahouse.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Install required libraries (Playwright, requests, nest_asyncio, Pillow)
!pip -q install playwright==1.46.0 requests nest_asyncio pillow
# Install the Chromium browser for Playwright
!playwright install chromium

# ===== Imports =====
import os, re, csv, json, time, unicodedata, asyncio, requests
from datetime import datetime
from urllib.parse import urljoin, urlparse
import nest_asyncio
nest_asyncio.apply()   # allow nested asyncio loops (needed in Colab)

from urllib import robotparser              # for robots.txt checking
from playwright.async_api import async_playwright  # Playwright (async)
from PIL import Image                       # to process images
from io import BytesIO                      # to handle image bytes

# ---------- Config ----------
# Root of the restaurant site; relative links are resolved against it.
BASE_URL = "https://shawayahouse.my.taker.io"

# Landing page of the menu, requested in Arabic.
MENU_ROOT = BASE_URL + "/menu?language=ar"

# Desktop-Chrome user-agent string sent with browser and image requests.
UA = (
    "Mozilla/5.0 (X11; Linux x86_64) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/124.0.0.0 Safari/537.36"
)

# Local folder that receives downloaded images; created up front if missing.
IMAGES_DIR = "images"
os.makedirs(IMAGES_DIR, exist_ok=True)
# ----------------------------

# ---------- Utils ----------

# Arabic combining marks U+064B..U+0652, superscript alef U+0670, and the
# tatweel (elongation) character U+0640.
AR_DIACRITICS = re.compile(r"[\u064B-\u0652\u0670\u0640]")


def ar_clean(text):
    """Return a normalized, comparison-friendly version of Arabic/Latin text.

    Steps: NFKC normalization, removal of diacritics and tatweel, replacement
    of every character that is neither a word character, whitespace, nor in
    U+0600..U+06FF with a space, then whitespace collapsing and trimming.
    Falsy input (``None``, ``""``) yields ``""``.
    """
    if not text:
        return ""

    normalized = unicodedata.normalize("NFKC", text)
    without_marks = AR_DIACRITICS.sub("", normalized)
    # Punctuation/symbols become spaces so neighbouring words stay separated.
    spaced = re.sub(r"[^\w\s\u0600-\u06FF]", " ", without_marks)
    collapsed = re.sub(r"\s+", " ", spaced)
    return collapsed.strip()

def slugify(text, maxlen=80):
    """Turn a dish name into a filesystem-friendly stem.

    The text is NFKD-decomposed and combining marks are dropped, then only
    ASCII letters/digits, characters in U+0600..U+06FF, spaces, ``_``, ``.``
    and ``-`` are kept. Whitespace runs become a single underscore and the
    result is cut to ``maxlen`` characters. Returns ``"image"`` when the
    input is falsy or nothing survives the filtering.
    """
    if not text:
        return "image"

    decomposed = unicodedata.normalize("NFKD", text)
    base_chars = [ch for ch in decomposed if unicodedata.combining(ch) == 0]
    safe = re.sub(r"[^0-9A-Za-z\u0600-\u06FF _.-]+", "", "".join(base_chars))
    slug = re.sub(r"\s+", "_", safe.strip())[:maxlen]
    return slug if slug else "image"

def robots_allows(url, user_agent="*"):
    """Report whether the site's robots.txt permits fetching ``url``.

    Args:
        url: The URL that is about to be fetched.
        user_agent: User-agent token to evaluate the rules for.

    Returns:
        The result of ``RobotFileParser.can_fetch``; ``True`` when robots.txt
        itself could not be read (best-effort: an unreadable robots.txt is
        treated as "allowed").
    """
    rp = robotparser.RobotFileParser()
    rp.set_url(urljoin(BASE_URL, "/robots.txt"))
    try:
        rp.read()
    except Exception:
        # Deliberately permissive on network/parse errors, but no longer a
        # bare ``except`` so Ctrl-C / SystemExit still propagate.
        return True
    return rp.can_fetch(user_agent, url)

def ensure_abs(url, base=None):
    """Return ``url`` as an absolute URL.

    Args:
        url: Absolute, protocol-relative (``//host/path``) or relative URL.
        base: URL to resolve against; defaults to the module's ``BASE_URL``.

    Returns:
        ``url`` unchanged when it already has both a scheme and a host,
        otherwise ``url`` resolved against ``base``. Protocol-relative URLs
        therefore inherit the scheme of ``base`` instead of being returned
        without one (which ``requests`` rejects).
    """
    if base is None:
        base = BASE_URL
    parts = urlparse(url)
    if parts.scheme and parts.netloc:
        return url
    return urljoin(base, url)

def download_image(img_url, name_hint, referer):
    """Download an image into ``IMAGES_DIR`` and return the local file path.

    Args:
        img_url: Image URL (absolute or relative to the site). Falsy values
            short-circuit to ``""`` without any network access.
        name_hint: Text used to build the file name via ``slugify``.
        referer: Value sent in the ``Referer`` request header.

    Returns:
        Path of the written file, or ``""`` when ``img_url`` is falsy.

    Raises:
        requests.RequestException: On connection errors or a non-2xx status
            (from ``raise_for_status``).

    Note:
        The file name depends only on ``name_hint`` and the detected format,
        so two downloads with the same hint write to the same path.
    """
    if not img_url:
        return ""
    img_url = ensure_abs(img_url)
    headers = {"User-Agent": UA, "Referer": referer}

    # With stream=True the connection is only released once the body is read
    # or the response is closed; ``with`` covers the raise_for_status() path.
    with requests.get(img_url, headers=headers, timeout=60, stream=True) as r:
        r.raise_for_status()
        data = r.content

    # Detect the real format from the bytes rather than trusting the URL.
    try:
        with Image.open(BytesIO(data)) as im:
            fmt = (im.format or "JPEG").lower()
    except Exception:
        # Unrecognised/corrupt image data: keep the bytes, default to .jpg.
        fmt = "jpeg"
    ext = fmt if fmt in ("png", "webp") else "jpg"

    fn = f"{slugify(name_hint)}.{ext}"
    out_path = os.path.join(IMAGES_DIR, fn)
    with open(out_path, "wb") as f:
        f.write(data)
    return out_path

# ---------- Classification ----------
# Keyword rules for dish categories: a list of (regex pattern, label) pairs
# mixing Arabic and English spellings.
# classify_food() runs re.search(..., flags=re.IGNORECASE) with every pattern
# against the ar_clean()-normalized "name desc" text and collects the label of
# each rule that hits, so one dish can end up with several labels.
# Patterns have no word boundaries, i.e. they match as substrings.
# NOTE(review): ar_clean() strips diacritics before matching, so an
# alternative that itself contains diacritics (see the Drinks rule) looks
# unreachable — confirm and drop if so.
CLASS_MAP = [
    (r"(رز|ارز|كبسه|كبسة|برياني|مندي|mandi|kabsa|rice|biryani|جريش|قرصان|هريس)", "Rice"),
    (r"(سلطه|سلطة|تبوله|تبولة|فتوش|salad|tabbouleh|fattoush|كولسلو)", "Salad"),
    (r"(دجاج|مسحب|شوايه|شواية|chicken|tawook|shawaya|broast)", "Chicken"),
    (r"(لحم|بقري|كباب|برجر|burger|beef)", "Meat"),
    (r"(غنم|ضأن|lamb|mutton|حاشي|camel)", "Lamb"),
    (r"(سمك|هامور|fish|روبيان|shrimp|جمبري)", "Fish"),
    (r"(خبز|صامولي|عربي|pita|tamees|tortilla|saj|bread)", "Bread"),
    (r"(بطاطس|بطاطا|فرايز|fries)", "Fries"),
    (r"(صلصه|صلصة|صوص|ثوم|طحينه|طحينة|مايونيز|كاتشب|sauce|garlic|tahini|ketchup)", "Sauce"),
    # Desserts (includes Om Ali, Muhallabia)
    (r"(حلى|حلويات|كنافه|كنافة|لقيمات|بسبوسه|بسبوسة|معمول|dessert|kunafa|basbousa|maamoul|ام_علي|أم_علي|ام علي|مهلبيه|مهلبية)", "Dessert"),
    (r"(مكرونه|مكرونة|باستا|pasta)", "Pasta"),
    (r"(شاورما|shawarma|راب|wrap|سندوتش|سندويتش|ساندوتش|ساندويتش)", "Sandwich"),
    # Sides / Vegetables (bashamel, molokhia, mixed veg, moussaka)
    (r"(بشاميل|ملوخيه|ملوخية|خضار مشكل|خضار_مشكل|مصقعه|مصقعة|موزه|موزة)", "Sides"),
    # Drinks (water, laban/milk, soft drinks in many spellings)
    (r"(مويه|موية|ماء|مياه|معدنيه|معدنية|مياه معدنية|مياه_معدنيه|لبن|لَبَن|laban|milk|مشروبات غازيه|مشروبات غازية|بيبسي|pepsi|كوكاكولا|coca[- ]?cola|سفن اب|7up|sprite|سبرايت|fanta|فانتا|cola|coke)", "Drinks"),
]

# Section-based hints: (regex pattern, label) pairs matched by classify_food()
# against the ar_clean()-normalized section name (scrape_category() passes the
# category page title). A hit adds the label regardless of the dish name, which
# helps when the dish name alone carries no category keyword.
SECTION_HINTS = [
    (r"(السلطات|سلطات|سلطه|سلطة)", "Salad"),
    (r"(الحلى|حلويات)", "Dessert"),
    (r"(السندوتشات|السندويتشات|الشاورما|ساندويتش|رابس?)", "Sandwich"),
    (r"(الاطباق الرئيسية|الرئيسية|الرز|الارز|المنسف|الكبسه)", "Rice"),
    (r"(المشويات|الشوي|grill|مشوي)", "Meat"),
    (r"(البطاطس|الفرايز)", "Fries"),
    (r"(الصوص|الصلصات)", "Sauce"),
]

def classify_food(name="", desc="", section_name=""):
    """Return the category label(s) for a dish.

    Args:
        name: Dish name.
        desc: Optional dish description; matched together with ``name``.
        section_name: Name of the menu section the dish was found in.

    Returns:
        The matching labels joined with ``" - "``, or ``"Unclassified"`` when
        no rule matches. Labels appear in a stable order: section hints first
        (in ``SECTION_HINTS`` order), then keyword rules (in ``CLASS_MAP``
        order), each label at most once.
    """
    text = ar_clean(f"{name} {desc}")
    sec = ar_clean(section_name)

    # A dict is used as an insertion-ordered set: joining a plain ``set`` of
    # strings gives an order that changes between interpreter runs.
    labels = {}
    for pat, lab in SECTION_HINTS:
        if re.search(pat, sec, flags=re.IGNORECASE):
            labels[lab] = None
    for pat, lab in CLASS_MAP:
        if re.search(pat, text, flags=re.IGNORECASE):
            labels[lab] = None
    return " - ".join(labels) if labels else "Unclassified"

# ---------- Scraping helpers ----------

async def scroll_to_bottom(page, max_rounds=20, pause=800):
    """Keep scrolling to the page bottom so lazily loaded content appears.

    Each round scrolls to ``document.body.scrollHeight``, waits ``pause``
    (passed to ``page.wait_for_timeout``) and re-reads the height. Scrolling
    stops after ``max_rounds`` rounds or as soon as the height equals the
    value seen in the previous round (starting from 0).
    """
    previous_height = 0
    for _round in range(max_rounds):
        await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
        await page.wait_for_timeout(pause)
        current_height = await page.evaluate("document.body.scrollHeight")
        if current_height == previous_height:
            # Nothing new was appended since the last scroll.
            return
        previous_height = current_height

async def get_all_category_links(context):
    """Collect the category page URLs linked from the main menu page.

    Args:
        context: Playwright browser context used to open a temporary page.

    Returns:
        Sorted list of unique absolute category URLs, each with its original
        query string/fragment replaced by ``?language=ar``.
    """
    page = await context.new_page()
    try:
        await page.goto(MENU_ROOT, wait_until="domcontentloaded", timeout=60000)
        await scroll_to_bottom(page, max_rounds=10)
        anchors = await page.query_selector_all('a[href*="/menu/category/"]')
        links = set()
        for a in anchors:
            href = await a.get_attribute("href")
            if href and "/menu/category/" in href:
                # Drop fragment and query so the language parameter is the
                # only query component and the same category dedups to one URL.
                bare = href.split("#", 1)[0].split("?", 1)[0]
                links.add(ensure_abs(bare + "?language=ar"))
    finally:
        # Close the page even when navigation/scrolling fails.
        await page.close()
    return sorted(links)

# Attributes probed, in priority order, for an <img> element's URL.
_IMG_URL_ATTRS = ("src", "data-src", "data-original", "data-lazy", "data-srcset", "srcset")


async def _first_image_url(img):
    """Return the first usable URL found on an <img> element handle, or ""."""
    for attr in _IMG_URL_ATTRS:
        value = await img.get_attribute(attr)
        if not value or not value.strip():
            continue
        # srcset-style values are "url descriptor, ..."; keep the first token.
        candidate = value.split()[0].strip()
        if candidate.lower().startswith("data:"):
            # Inline placeholder (typical for lazy loading): not downloadable,
            # keep looking at the remaining attributes.
            continue
        return candidate
    return ""


async def scrape_category(context, category_url):
    """Scrape one category page into a list of dish rows.

    Args:
        context: Playwright browser context used to open a temporary page.
        category_url: URL of the category page.

    Returns:
        List of dicts with keys ``name``, ``img_url``, ``url``, ``section``
        and ``classification``. Product anchors are tried first; if they yield
        no rows, every ``<img>`` with a non-empty ``alt`` is used instead, with
        ``url`` set to ``category_url``.
    """
    rows = []
    page = await context.new_page()
    try:
        await page.goto(category_url, wait_until="domcontentloaded", timeout=60000)
        await scroll_to_bottom(page)
        section_name = (await page.title()) or ""   # page title doubles as section hint

        # Product cards: anchors pointing at product pages, then looser fallbacks.
        cards = await page.query_selector_all('a[href*="/menu/product"]')
        if not cards:
            cards = await page.query_selector_all("a.menu-product, a.card, a[href*='product']")

        for a in cards:
            name = ar_clean((await a.get_attribute("title")) or (await a.inner_text()) or "")

            href = await a.get_attribute("href")
            dish_url = ensure_abs(href) if href else category_url  # fallback to category page

            img = await a.query_selector("img")
            if not img:
                # Look for an image in the nearest enclosing card-like element.
                handle = await a.evaluate_handle(
                    "el => el.closest('article,div,li,section') && el.closest('article,div,li,section').querySelector('img')"
                )
                # as_element() is None when the expression produced no DOM node.
                img = handle.as_element()

            img_url = ""
            if img:
                try:
                    img_url = await _first_image_url(img)
                except Exception as e:
                    # Best-effort: a broken card must not abort the category.
                    print("[warn] image lookup failed:", dish_url, e)

            if name and img_url:
                rows.append({
                    "name": name,
                    "img_url": img_url,
                    "url": dish_url,
                    "section": section_name,
                    "classification": classify_food(name=name, section_name=section_name),
                })

        # Fallback: no product anchors produced rows, so use <img alt="...">.
        if not rows:
            for img in await page.query_selector_all("img"):
                alt = ar_clean((await img.get_attribute("alt")) or "")
                if not alt:
                    continue
                src = await _first_image_url(img)
                if src:
                    rows.append({
                        "name": alt,
                        "img_url": src,
                        "url": category_url,          # no product link available
                        "section": section_name,
                        "classification": classify_food(name=alt, section_name=section_name),
                    })
    finally:
        # The caller catches per-category errors and keeps going, so the page
        # must be released here or failed categories accumulate open pages.
        await page.close()
    return rows

async def run_all():
    """Scrape every category of the menu and return the combined rows.

    Returns an empty list without launching a browser when robots.txt
    disallows the menu root. A category that raises is reported with a
    warning and skipped; the remaining categories are still scraped.
    """
    allowed = robots_allows(MENU_ROOT, user_agent="*")
    if not allowed:
        print("[robots] Not allowed. Stop.")
        return []

    all_rows = []
    async with async_playwright() as pw:
        browser = await pw.chromium.launch(headless=True, args=["--no-sandbox"])
        context = await browser.new_context(locale="ar-SA", user_agent=UA)

        # Step 1: discover the category pages.
        cats = await get_all_category_links(context)
        print(f"[info] Categories found: {len(cats)}")
        for c in cats:
            print(" -", c)

        # Step 2: scrape them one by one, tolerating individual failures.
        for c in cats:
            try:
                rows = await scrape_category(context, c)
                print(f"[info] {c} -> {len(rows)} items")
                all_rows.extend(rows)
            except Exception as exc:
                print("[warn] category failed:", c, exc)

        await context.close()
        await browser.close()

    return all_rows

# ---------- Run ----------
# nest_asyncio (applied at import time) lets run_until_complete work inside
# the notebook's already-running event loop.
rows = asyncio.get_event_loop().run_until_complete(run_all())
print("Total parsed rows:", len(rows))
print(rows[:5])

# ---------- Save to CSV/JSON ----------
final = []
scrape_date = datetime.now().strftime("%Y-%m-%d")
for r in rows:
    try:
        img_path = download_image(r["img_url"], r["name"] or "item", MENU_ROOT)
    except (requests.RequestException, OSError) as e:
        # One unreachable image must not discard the whole scrape: keep the
        # row with an empty image path and report the failure.
        print("[warn] image download failed:", r["img_url"], e)
        img_path = ""
    final.append({
        "name": r["name"],
        "image_file": img_path,
        "classification": r["classification"],
        "scrape_date": scrape_date,
        "dish_url": r.get("url", "")   # product page (or category page fallback)
    })

# Save to CSV (one row per dish, including the dish_url column)
with open("dishes.csv","w",newline="",encoding="utf-8") as f:
    w = csv.DictWriter(f, fieldnames=["name","image_file","classification","scrape_date","dish_url"])
    w.writeheader()
    w.writerows(final)

# Save to JSON (ensure_ascii=False keeps Arabic text readable in the file)
with open("dishes.json","w",encoding="utf-8") as f:
    json.dump(final, f, ensure_ascii=False, indent=2)

print(f"Saved {len(final)} items → dishes.csv & dishes.json  |  Images → {IMAGES_DIR}")

[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m37.9/37.9 MB[0m [31m16.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m625.7/625.7 kB[0m [31m13.3 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading Chromium 128.0.6613.18 (playwright build v1129)[2m from https://playwright.azureedge.net/builds/chromium/1129/chromium-linux.zip[22m
[1G162.8 MiB [] 0% 10.8s[0K[1G162.8 MiB [] 0% 29.7s[0K[1G162.8 MiB [] 0% 20.2s[0K[1G162.8 MiB [] 0% 11.8s[0K[1G162.8 MiB [] 0% 8.6s[0K[1G162.8 MiB [] 1% 7.6s[0K[1G162.8 MiB [] 1% 6.7s[0K[1G162.8 MiB [] 1% 6.0s[0K[1G162.8 MiB [] 2% 5.6s[0K[1G162.8 MiB [] 2% 5.8s[0K[1G162.8 MiB [] 3% 5.6s[0K[1G162.8 MiB [] 3% 5.9s[0K[1G162.8 MiB [] 3% 5.5s[0K[1G162.8 MiB [] 4% 5.3s[0K[1G162.8 MiB [] 4% 5.2s[0K[1G162.8 MiB [] 4% 5.0s[0K[1G162.8 MiB [] 5% 5.1s[0K[1G162.8 MiB [] 5% 5.0s[0K[1G162.8 MiB [] 5% 4.8s[0K[1G162.8 MiB [] 6% 4.6s[0K[1G162.8 MiB [] 7% 4.6s[0K[1G162.8 MiB [

In [None]:

import shutil, os, zipfile
from google.colab import files

IMAGES_DIR = "images"
ARCHIVE_BASE = "images_backup"
ZIP_NAME = f"{ARCHIVE_BASE}.zip"   # derived, so archive and download names cannot drift

# Create a ZIP archive from the images folder and offer it for download.
# make_archive returns the path of the file it wrote; that path is what gets
# downloaded. Skipped (like the CSV/JSON below) when the folder is missing.
if os.path.isdir(IMAGES_DIR):
    archive_path = shutil.make_archive(ARCHIVE_BASE, "zip", IMAGES_DIR)
    files.download(archive_path)
else:
    print(f"[warn] {IMAGES_DIR!r} not found; skipping image archive")

if os.path.exists("dishes.csv"): files.download("dishes.csv")
if os.path.exists("dishes.json"): files.download("dishes.json")

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>