# Beyoung

In [5]:
import os, re, unicodedata, sys, csv, json
import time
from pathlib import Path
from typing import List, Dict, Optional
from urllib.parse import urljoin, urlparse

import requests
from bs4 import BeautifulSoup
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

sys.path.append(os.path.abspath("./../models"))

from skin import (SKIN_TYPE_CANONICAL_ORDER, SKIN_TYPE_SYNONYMS_PT)
from exclude import (EXCLUDE_KEYWORDS,)
from ingredient import (INGREDIENTES_VALIDOS,)
from benefits import (BENEFIT_SYNONYMS_PT, BENEFIT_CANONICAL_ORDER)
from category import (CATEGORY_CANONICAL_ORDER, CATEGORY_HINTS)


### Configurações Iniciais

In [6]:

# Storefront root: used to resolve relative product links and sent as the
# Referer header when downloading images.
BASE_URL = "https://www.beyoung.com.br"
# NOTE(review): not referenced by any cell visible in this notebook — confirm it is still needed.
BASE_SKINCARE_URL = "https://www.beyoung.com.br/collections/skincare"

# Collection pages to crawl, keyed by the category label stored on each product record.
CATEGORY_URLS = {
    "Limpeza":        "https://www.beyoung.com.br/collections/skincare-limpeza",
    "Tratamento":     "https://www.beyoung.com.br/collections/skincare-tratamento",
    "Hidratação":     "https://www.beyoung.com.br/collections/skincare-hidratacao",
    "Proteção Solar": "https://www.beyoung.com.br/collections/protecao-solar",
}

# NOTE(review): absolute, machine-specific path that no visible cell reads;
# the visible save step writes to a relative file name instead — confirm before relying on it.
OUTPUT_JSON_PATH = "/home/usuario/Área de trabalho/Dados/Beyoung/beyoung_scraper.json"
# Folder (relative to the working directory) that receives downloaded product images;
# it is created as a side effect of running this cell.
IMAGES_DIR = Path("images")
IMAGES_DIR.mkdir(parents=True, exist_ok=True)

# Browser-like headers installed on the shared session and reused for image downloads.
HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
    ),
    "Accept-Language": "pt-BR,pt;q=0.9,en-US;q=0.8,en;q=0.7",
}

def build_session() -> requests.Session:
    """Create a ``requests`` session with the notebook's headers and a retry policy.

    The retry policy allows up to 5 retries with a 0.7 backoff factor for
    GET/HEAD/OPTIONS requests answered with 429, 500, 502, 503 or 504, and is
    mounted for both ``https://`` and ``http://`` URLs.

    Returns:
        The configured session.
    """
    retry_policy = Retry(
        total=5,
        backoff_factor=0.7,
        status_forcelist=[429, 500, 502, 503, 504],
        allowed_methods=["GET", "HEAD", "OPTIONS"],
    )

    session = requests.Session()
    session.headers.update(HEADERS)
    for scheme in ("https://", "http://"):
        session.mount(scheme, HTTPAdapter(max_retries=retry_policy))
    return session


# Single session shared by every HTTP request issued from this notebook.
SESSION = build_session()

## Utilitários

### Funções auxiliares para normalização de texto (remoção de acentos), geração de nomes de arquivo, requisições HTTP, filtro de exclusão de produtos, formatação de preços e tratamento de URLs de imagem.

In [None]:
def _strip_accents_lower(s: str) -> str:
    if not s:
        return ""
    s = "".join(c for c in unicodedata.normalize("NFD", s) if unicodedata.category(c) != "Mn")
    s = s.lower()
    s = s.replace("-", " ")
    s = re.sub(r"[^\w\s]", " ", s)
    s = re.sub(r"\s+", " ", s).strip()
    return s

def sentence_case(s: str) -> str:
    """Lowercase ``s`` and capitalize only its first character.

    Args:
        s: Text to convert; falsy values are returned unchanged.

    Returns:
        The sentence-cased text.
    """
    if not s:
        return s

    lowered = s.lower()
    # Slicing handles the one-character case without a separate branch.
    return lowered[:1].upper() + lowered[1:]

def slugify(name: str) -> str:
    """Build a file-name-safe slug (``a-z``, ``0-9`` and hyphens) from ``name``.

    Args:
        name: Product name to convert.

    Returns:
        The slug, or ``"produto"`` when nothing usable remains.
    """
    normalized = _strip_accents_lower(name)
    # Each run of non-alphanumeric characters collapses into one hyphen, so the
    # result never contains consecutive hyphens.
    slug = re.sub(r"[^a-z0-9]+", "-", normalized).strip("-")
    return slug if slug else "produto"

def http_get(url: str, timeout: int = 30) -> Optional[bytes]:
    """Fetch ``url`` through the shared session and return the raw body.

    The call is best-effort: every failure is reported on stdout and turned
    into ``None`` so the crawl can move on to the next page.

    Args:
        url: Absolute URL to request.
        timeout: Timeout in seconds passed to ``SESSION.get``.

    Returns:
        The response body as bytes, or ``None`` when the request raised, the
        status code was not 200, or the body looks like an anti-bot/blocked page.
    """
    blocked_page_markers = ("please enable cookies", "attention required", "access denied")

    try:
        response = SESSION.get(url, timeout=timeout)
        if response.status_code != 200:
            print(f"[http_get] HTTP {response.status_code}: {url}")
            return None

        page_text = response.text.lower()
        if any(marker in page_text for marker in blocked_page_markers):
            # A 200 response can still be a challenge/denied page rather than content.
            print(f"[http_get] Página bloqueada: {url}")
            return None

        return response.content
    except Exception as request_error:
        # Deliberately broad: one bad page must not abort the whole scrape.
        print(f"[http_get] Falha: {url} -> {request_error}")
        return None

def soup_from_url(url: str) -> Optional[BeautifulSoup]:
    """Download ``url`` and parse it with the ``html.parser`` backend.

    Returns:
        The parsed document, or ``None`` when ``http_get`` returned nothing.
    """
    body = http_get(url)
    return BeautifulSoup(body, "html.parser") if body else None

def should_exclude(name: str) -> bool:
    """Tell whether a product name contains any of the exclusion keywords.

    Both the name and each keyword are compared in normalized form
    (lowercase, no accents, no punctuation), using substring matching.

    Args:
        name: Product name to check.

    Returns:
        ``True`` when at least one keyword from ``EXCLUDE_KEYWORDS`` occurs in the name.
    """
    normalized_name = _strip_accents_lower(name)
    for keyword in EXCLUDE_KEYWORDS:
        normalized_keyword = _strip_accents_lower(keyword)
        # Skip keywords that are empty *after* normalization (e.g. "-"): the
        # empty string is a substring of everything and would exclude every product.
        if normalized_keyword and normalized_keyword in normalized_name:
            return True
    return False

def normalize_price_text(txt: str) -> Optional[str]:
    """Extract the first Brazilian-formatted price from ``txt``.

    Accepts amounts with or without the ``R$`` prefix, with or without ``.``
    thousands separators, and with or without ``,dd`` cents. An amount with
    cents anywhere in the text takes precedence over a whole-number amount.

    Args:
        txt: Text that may contain a price, e.g. ``"R$ 1.234,56"``.

    Returns:
        The amount formatted with a dot and two decimals (``"1234.56"``), or
        ``None`` when no amount is found.
    """
    if not txt:
        return None

    cleaned = re.sub(r"\s+", " ", txt).replace("\xa0", " ").strip()

    # The lookbehind keeps a match from starting in the middle of a longer
    # number; without it "1234,56" would be read as "234,56".
    amount = r"(?<![\d.,])(\d{1,3}(?:\.\d{3})+|\d+)"

    with_cents = re.search(amount + r",(\d{2})", cleaned)
    if with_cents:
        whole_part = with_cents.group(1).replace(".", "")
        return f"{float(whole_part + '.' + with_cents.group(2)):.2f}"

    # Fallback: whole-number amount that is not followed by a decimal comma.
    whole_only = re.search(amount + r"\b(?!,)", cleaned)
    if whole_only:
        return f"{float(whole_only.group(1).replace('.', '')):.2f}"

    return None

def safe_join_url(url: str) -> str:
    """Prefix protocol-relative URLs (``//host/...``) with ``https:``.

    Any other value, including falsy ones, is returned unchanged.
    """
    if url and url.startswith("//"):
        return "https:" + url
    return url

def infer_img_ext(url: str) -> str:
    """Pick the file extension for an image from the path component of its URL.

    Args:
        url: Image URL; the query string and fragment are ignored.

    Returns:
        ``".jpg"``, ``".png"`` or ``".webp"``. ``.jpeg`` maps to ``".jpg"``, and
        an empty URL or unrecognized extension also falls back to ``".jpg"``.
    """
    fallback = ".jpg"
    if not url:
        return fallback

    recognized = {".jpg": ".jpg", ".jpeg": ".jpg", ".png": ".png", ".webp": ".webp"}
    _, extension = os.path.splitext(urlparse(url).path)
    return recognized.get(extension.lower(), fallback)


## Características

In [8]:
def download_product_image(image_url: str, product_name: str) -> Optional[str]:
    """Download a product image into ``IMAGES_DIR`` under a slug-based file name.

    The file is named after the slugified product name plus the extension
    inferred from the URL. If that name is already taken, ``-1``, ``-2``, ...
    is appended until a free name is found, so existing files are never overwritten.

    Args:
        image_url: Absolute URL of the image; a falsy value short-circuits to ``None``.
        product_name: Product name used to build the file name.

    Returns:
        The saved file's name (without directory), or ``None`` when there is no
        URL or when the download/write fails (the failure is printed).
    """
    if not image_url:
        return None

    try:
        request_headers = dict(HEADERS)
        request_headers["Accept"] = "image/avif,image/webp,image/apng,image/*,*/*;q=0.8"
        request_headers["Referer"] = BASE_URL

        response = SESSION.get(image_url, headers=request_headers, timeout=40)
        response.raise_for_status()

        extension = infer_img_ext(image_url)
        stem = slugify(product_name)

        target_path = IMAGES_DIR / f"{stem}{extension}"
        attempt = 0
        while target_path.exists():
            attempt += 1
            target_path = IMAGES_DIR / f"{stem}-{attempt}{extension}"

        target_path.write_bytes(response.content)
        return target_path.name
    except Exception as download_error:
        print(f"[download_product_image] Falha: {image_url} -> {download_error}")
        return None

# Canonical benefit -> list of its synonyms in normalized form (falsy synonyms
# are dropped). `or {}` keeps the cell working when BENEFIT_SYNONYMS_PT is None/empty.
normalized_benefit_synonyms = {
    canonical_benefit: [_strip_accents_lower(synonym) for synonym in synonym_list if synonym]
    for canonical_benefit, synonym_list in (BENEFIT_SYNONYMS_PT or {}).items()
}
# Canonical benefit -> its position in BENEFIT_CANONICAL_ORDER, used as a sort key.
benefit_canonical_order = {benefit: index for index, benefit in enumerate(BENEFIT_CANONICAL_ORDER or [])}

def standardize_product_benefits(raw_benefit_list: List[str]) -> str:
    """Map free-text benefit snippets onto the canonical benefit vocabulary.

    A canonical benefit is selected when any of its normalized synonyms occurs
    as a substring of any normalized snippet.

    Args:
        raw_benefit_list: Benefit texts scraped from a product page.

    Returns:
        The matched canonical benefits joined with ``"; "`` and sorted by their
        position in the canonical order (unknown ones last), or ``""`` when the
        input is empty or nothing matches.
    """
    if not raw_benefit_list:
        return ""

    normalized_snippets = [_strip_accents_lower(snippet) for snippet in raw_benefit_list]

    matched_benefits = {
        canonical
        for canonical, synonyms in normalized_benefit_synonyms.items()
        if any(
            synonym and synonym in snippet
            for snippet in normalized_snippets
            for synonym in synonyms
        )
    }

    def canonical_rank(benefit):
        return benefit_canonical_order.get(benefit, 999)

    return "; ".join(sorted(matched_benefits, key=canonical_rank))

# Normalized ingredient name -> original spelling from INGREDIENTES_VALIDOS.
# Entries whose normalized forms coincide collapse into one key (the last one wins).
normalized_ingredient_mapping = {_strip_accents_lower(ingredient): ingredient for ingredient in INGREDIENTES_VALIDOS}
# Normalized names to search for in page text.
normalized_ingredient_list = list(normalized_ingredient_mapping.keys())
# Original ingredient spelling -> its position in INGREDIENTES_VALIDOS, used as a sort key.
ingredient_canonical_order = {ingredient: index for index, ingredient in enumerate(INGREDIENTES_VALIDOS)}

def collect_active_ingredients_section_text(html_soup: BeautifulSoup) -> str:
    """Gather the text most likely to list the product's active ingredients.

    Sources are tried in this order:
      1. The sibling elements following a heading whose normalized text is
         exactly "principais ativos", up to the next heading.
      2. Every ``.metafield-multi_line_text_field`` element.
      3. The text of the whole page.

    Returns:
        The text of the first source that yields content; parts are joined
        with ``" \\n "``.
    """
    heading_tag = re.compile(r"h\d", re.I)

    for heading in html_soup.find_all(heading_tag):
        title = heading.get_text(" ", strip=True)
        if not title or _strip_accents_lower(title) != "principais ativos":
            continue

        section_texts = []
        node = heading.find_next_sibling()
        # Walk forward through siblings until another heading closes the section.
        while node and not heading_tag.match(node.name or ""):
            node_text = node.get_text(" ", strip=True)
            if node_text:
                section_texts.append(node_text)
            node = node.find_next_sibling()

        if section_texts:
            return " \n ".join(section_texts)

    metafield_texts = [
        block.get_text(" \n ", strip=True)
        for block in html_soup.select(".metafield-multi_line_text_field")
    ]
    if metafield_texts:
        return " \n ".join(metafield_texts)

    return html_soup.get_text(" ", strip=True)

def extract_ingredients_from_text(ingredient_text_block: str) -> List[str]:
    """Find which known ingredients are mentioned in a block of text.

    Matching is a substring search of each normalized ingredient name against
    the normalized text, after concentration figures such as ``10%`` or
    ``2,5 %`` have been removed.

    Args:
        ingredient_text_block: Raw text taken from the product page.

    Returns:
        The matched ingredients in their original spelling, ordered by their
        position in ``INGREDIENTES_VALIDOS``; ``[]`` for empty input.
    """
    if not ingredient_text_block:
        return []

    # Percentages must be removed from the RAW text: normalization replaces "%",
    # "," and "." with spaces, so stripping them afterwards can never match.
    without_percentages = re.sub(r"\b\d+(?:[.,]\d+)?\s*%", " ", ingredient_text_block)
    searchable_text = _strip_accents_lower(without_percentages)

    matched_names = {
        normalized_name
        for normalized_name in normalized_ingredient_list
        if normalized_name and normalized_name in searchable_text
    }

    ordered_names = sorted(
        matched_names,
        key=lambda name: ingredient_canonical_order.get(normalized_ingredient_mapping[name], 9999),
    )
    return [normalized_ingredient_mapping[name] for name in ordered_names]

def extract_active_ingredients(html_soup: BeautifulSoup) -> str:
    """Return the page's recognized active ingredients joined with ``"; "``.

    Combines ``collect_active_ingredients_section_text`` (where to look) with
    ``extract_ingredients_from_text`` (what to look for).
    """
    section_text = collect_active_ingredients_section_text(html_soup)
    return "; ".join(extract_ingredients_from_text(section_text))

def extract_category_product_links(category_url: str, category_name: str) -> List[Dict]:
    """List the products linked from one collection page.

    Each ``<a>`` whose ``href`` contains ``/products/`` becomes a candidate.
    Its display name is taken from the anchor text, then from the ``alt`` of an
    inner image, then from the URL slug. Candidates whose name matches an
    exclusion keyword are dropped, and only the first candidate per absolute
    URL is kept.

    Args:
        category_url: URL of the collection page (only this page is fetched).
        category_name: Category label copied onto every returned entry.

    Returns:
        Dicts with keys ``url``, ``categoria`` and ``nome_card`` in page order;
        ``[]`` when the page could not be loaded.
    """
    category_soup = soup_from_url(category_url)
    if not category_soup:
        return []

    # Insertion-ordered dict: the first entry seen for a URL is the one kept.
    products_by_url: Dict[str, Dict] = {}

    for anchor in category_soup.select('a[href*="/products/"]'):
        href = anchor.get("href", "")
        if "/products/" not in href:
            continue

        card_name = anchor.get_text(" ", strip=True) or ""

        if not card_name:
            image = anchor.find("img", alt=True)
            if image and image.get("alt"):
                card_name = image["alt"].strip()

        if not card_name:
            # Last resort: derive a readable name from the final URL segment.
            card_name = href.rstrip("/").split("/")[-1].replace("-", " ").title()

        if should_exclude(card_name):
            continue

        product_url = urljoin(BASE_URL, href)
        if product_url not in products_by_url:
            products_by_url[product_url] = {
                "url": product_url,
                "categoria": category_name,
                "nome_card": card_name,
            }

    return list(products_by_url.values())

def extract_product_name(html_soup: BeautifulSoup) -> Optional[str]:
    """Read the product name from the first selector that yields text.

    Selectors are tried in order: ``h1.product__title``, any ``h1``, the
    ``og:title`` meta tag (its ``content`` attribute) and finally ``<title>``.

    Returns:
        The stripped name, or ``None`` when every selector is missing or empty.
    """
    for selector in ("h1.product__title", "h1", "meta[property='og:title']", "title"):
        node = html_soup.select_one(selector)
        if not node:
            continue

        # Meta tags carry their value in an attribute rather than as text.
        if node.name == "meta":
            candidate = node.get("content", "")
        else:
            candidate = node.get_text(" ", strip=True)

        if candidate:
            return candidate.strip()

    return None

def extract_product_price(html_soup: BeautifulSoup) -> Optional[str]:
    """Extract the product price as a normalized ``"123.45"`` string.

    A list of CSS selectors is tried in order; the first element whose text
    parses as a price wins. If none does, the text of the whole page is parsed.

    Returns:
        The normalized price, or ``None`` when no price can be parsed.
    """
    selectors = (
        ".price__container .price__current, .price__regular .price-item--regular",
        ".price__container .price-item--regular",
        ".product__price .price",
        ".f-price-item--current",
        ".price, .Price",
    )

    for selector in selectors:
        node = html_soup.select_one(selector)
        if not node:
            continue
        price = normalize_price_text(node.get_text(" ", strip=True))
        if price:
            return price

    # Nothing matched: fall back to the first price-like number in the page text.
    return normalize_price_text(html_soup.get_text(" ", strip=True))

def extract_product_quantity(html_soup: BeautifulSoup) -> Optional[str]:
    """Extract the product size/quantity (e.g. ``"30 ml"``) from a product page.

    The element selected by ``[data-selected-swatch-value="Tamanho"]`` is
    preferred. Otherwise the first number followed by a unit
    (ml, g, mg, kg, l, oz) in the page text is used.

    Returns:
        The quantity text as found on the page, or ``None`` when none is found.
    """
    size_element = html_soup.select_one('[data-selected-swatch-value="Tamanho"]')
    if size_element:
        size_text = size_element.get_text(strip=True)
        if size_text:
            return size_text

    page_text = html_soup.get_text(" ", strip=True)
    # Accept "," or "." as the decimal separator so that "1.5 ml" is captured
    # whole instead of being cut down to "5 ml".
    quantity_match = re.search(
        r"\b\d+(?:[.,]\d+)?\s*(?:ml|g|mg|kg|l|oz)\b", page_text, re.I
    )
    return quantity_match.group(0) if quantity_match else None

def extract_product_benefits(html_soup: BeautifulSoup) -> List[str]:
    """Collect the raw benefit snippets shown on a product page.

    The feature-list items are preferred (texts of at most 200 characters).
    When there are none, every ``<li>`` whose text has between 3 and 160
    characters is returned instead.

    Returns:
        The snippets in document order; possibly empty.
    """
    feature_texts = (
        node.get_text(" ", strip=True)
        for node in html_soup.select(".section__content .feature-list__items .feature-item__text")
    )
    benefits = [text for text in feature_texts if text and len(text) <= 200]
    if benefits:
        return benefits

    # Fallback: any reasonably short list item on the page.
    list_item_texts = (node.get_text(" ", strip=True) for node in html_soup.select("li"))
    return [text for text in list_item_texts if text and 3 <= len(text) <= 160]

def extract_product_image_url(html_soup: BeautifulSoup) -> Optional[str]:
    """Pick an image URL for the product.

    The first ``<img>`` whose ``src`` contains ``cdn/shop/files`` or
    ``cdn.shopify.com`` is used; failing that, the ``og:image`` meta tag.
    Protocol-relative URLs are completed with ``https:``.

    Returns:
        The image URL, or ``None`` when neither source is present.
    """
    cdn_markers = ("cdn/shop/files", "cdn.shopify.com")

    for image in html_soup.find_all("img"):
        source = image.get("src") or ""
        if any(marker in source for marker in cdn_markers):
            return safe_join_url(source.strip())

    og_image = html_soup.find("meta", property="og:image")
    if og_image and og_image.get("content"):
        return safe_join_url(og_image["content"].strip())

    return None


## Execução

In [9]:

def execute_beyoung_scraper(request_delay: float = 0.6) -> List[Dict]:
    """Crawl every configured category and scrape each distinct product page.

    Steps:
      1. Collect product links from each URL in ``CATEGORY_URLS``.
      2. Drop URLs already seen in an earlier category (the first category wins).
      3. For each remaining product, extract name, price, quantity, benefits,
         active ingredients and image, skipping pages that cannot be opened,
         have no name, or match an exclusion keyword.

    Progress is printed as the crawl advances. Requires the ``time`` module to
    be imported at the top of the notebook.

    Args:
        request_delay: Seconds to pause after each category page and after each
            successfully scraped product.

    Returns:
        One dict per scraped product with the keys ``marca``, ``nome``,
        ``subtitulo``, ``categoria``, ``quantidade``, ``preco``, ``beneficios``,
        ``ingredientes``, ``tipo_pele`` and ``imagem``.
    """
    print("____________________________________________________________________________________________________________")
    print("WEB SCRAPING Beyoung...")

    discovered_product_links = []
    for category_name, category_url in CATEGORY_URLS.items():
        category_product_results = extract_category_product_links(category_url, category_name)
        print(f"Categoria '{category_name}': {len(category_product_results)} produtos encontrados")
        discovered_product_links.extend(category_product_results)
        time.sleep(request_delay)

    # The same product may be listed in several categories; keep its first occurrence.
    unique_product_links = []
    processed_product_urls = set()
    for product_item in discovered_product_links:
        if product_item["url"] not in processed_product_urls:
            processed_product_urls.add(product_item["url"])
            unique_product_links.append(product_item)

    total_products = len(unique_product_links)
    print(f"Total de produtos únicos para processar: {total_products}")
    successfully_extracted_products = []

    for current_product_index, product_item in enumerate(unique_product_links, 1):
        product_url = product_item["url"]

        print(f" [{current_product_index}/{total_products}] {product_url}")
        product_page_soup = soup_from_url(product_url)

        if not product_page_soup:
            print("\n Falha ao abrir página.")
            continue

        # Prefer the name on the product page; fall back to the category card name.
        raw_product_name = extract_product_name(product_page_soup) or product_item.get("nome_card") or ""
        if not raw_product_name:
            print("\n Nome não encontrado.")
            continue

        formatted_product_name = sentence_case(raw_product_name)

        # Second exclusion pass: the page name can differ from the card name.
        if should_exclude(formatted_product_name):
            print(f"\nExcluído por keyword (models): {formatted_product_name}")
            continue

        extracted_product_price = extract_product_price(product_page_soup) or ""
        extracted_product_quantity = extract_product_quantity(product_page_soup) or ""
        standardized_product_benefits = standardize_product_benefits(
            extract_product_benefits(product_page_soup)
        )
        extracted_active_ingredients = extract_active_ingredients(product_page_soup)

        product_image_url = extract_product_image_url(product_page_soup)
        downloaded_image_filename = (
            download_product_image(product_image_url, formatted_product_name)
            if product_image_url
            else None
        )

        successfully_extracted_products.append({
            "marca": "beyoung",
            "nome": formatted_product_name,
            "subtitulo": None,
            "categoria": product_item["categoria"],
            "quantidade": extracted_product_quantity,
            "preco": extracted_product_price,
            "beneficios": standardized_product_benefits or "",
            "ingredientes": extracted_active_ingredients or "",
            "tipo_pele": "todos os tipos",
            "imagem": downloaded_image_filename or "",
        })
        print(f" \nOK: {formatted_product_name}")
        time.sleep(request_delay)

    print(f"Total pós-filtro: {len(successfully_extracted_products)}")
    return successfully_extracted_products




## Saída

In [10]:
def save_data(products_data: List[Dict], output_path: str = "beyoung_products.json"):
    """Write the scraped products to a JSON file.

    Only the known product fields are written, in a fixed order; a field
    missing from a product is stored as ``null``. Nothing is written when
    ``products_data`` is empty.

    Args:
        products_data: Product dicts as returned by the scraper.
        output_path: Destination file; defaults to ``beyoung_products.json`` in
            the working directory (the file the CSV conversion step reads).
    """
    if not products_data:
        print("\nNenhum dado para salvar.")
        return

    field_names = (
        "marca", "nome", "subtitulo", "categoria", "quantidade",
        "preco", "beneficios", "ingredientes", "tipo_pele", "imagem",
    )
    cleaned_products = [
        {field: product.get(field) for field in field_names}
        for product in products_data
    ]

    with open(output_path, "w", encoding="utf-8") as json_file:
        # ensure_ascii=False keeps accented Portuguese text readable in the file.
        json.dump(cleaned_products, json_file, ensure_ascii=False, indent=2)

    print(f"JSON: {output_path} ({len(cleaned_products)} produtos)")

# Driver: run the scraper, persist the result and print a short summary.
if __name__ == "__main__":
    try:
        scraped_products_data = execute_beyoung_scraper()
        save_data(scraped_products_data)
        print("____________________________________________________________________________________________________________")
        print(f"\nFim da execução! Produtos extraídos: {len(scraped_products_data)}")
        print(f"Imagens salvas em: {IMAGES_DIR.resolve()}")
        print("____________________________________________________________________________________________________________")

    # NOTE(review): any error is reduced to a one-line message here (no traceback,
    # no re-raise), so a failed run still ends "cleanly" — check the output for "ERRO".
    except Exception as execution_error:
        print(f"\nERRO: {execution_error}")

____________________________________________________________________________________________________________
WEB SCRAPING Beyoung...
Categoria 'Limpeza': 15 produtos encontrados
Categoria 'Tratamento': 15 produtos encontrados
Categoria 'Hidratação': 11 produtos encontrados
Categoria 'Proteção Solar': 14 produtos encontrados
Total de produtos únicos para processar: 25
 [1/25] https://www.beyoung.com.br/products/agua-micelar-hidratante
 
OK: Água micelar hidratante
 [2/25] https://www.beyoung.com.br/products/gentle-cleanser
 
OK: Gel de limpeza facial suave
 [3/25] https://www.beyoung.com.br/products/exfoliant-cleanser
 
OK: Esfoliante facial smart peeling
 [4/25] https://www.beyoung.com.br/products/gel-de-limpeza-facial-com-acido-glicolico-controle-de-oleosidade
 
OK: Gel de limpeza facial com ácido glicólico (controle de oleosidade)
 [5/25] https://www.beyoung.com.br/products/micellar-cleanser-wipes
 
OK: Lenço umedecido para limpeza facial
 [6/25] https://www.beyoung.com.br/products/c

## Conversão JSON para CSV

In [6]:
def json_to_csv(json_file="beyoung_products.json", csv_file="beyoung_products.csv"):
    """Convert the scraper's JSON output into a CSV file.

    The CSV always has the ten product columns in a fixed order. Missing or
    falsy values are written as empty strings, and keys outside those columns
    are ignored. Errors are reported on stdout rather than raised; when the
    JSON file does not exist, the ``.json`` files found in the working
    directory are listed as a hint.

    Args:
        json_file: Path of the JSON file to read (a list of product dicts).
        csv_file: Path of the CSV file to write.
    """
    try:
        with open(json_file, "r", encoding="utf-8") as source:
            records = json.load(source)

        if not records:
            print(f"Nenhum dado encontrado no arquivo {json_file}")
            return

        columns = [
            "marca", "nome", "subtitulo", "categoria", "quantidade",
            "preco", "beneficios", "ingredientes", "tipo_pele", "imagem",
        ]

        with open(csv_file, "w", encoding="utf-8", newline="") as target:
            writer = csv.DictWriter(target, fieldnames=columns)
            writer.writeheader()
            for record in records:
                # None (and any other falsy value) becomes an empty cell.
                flat_row = {column: (record.get(column) or "") for column in columns}
                writer.writerow(flat_row)

        print(f"CSV gerado: {csv_file} ({len(records)} linhas)")
        print(f"A partir do JSON: {json_file}")

    except FileNotFoundError:
        print(f" Arquivo {json_file} não encontrado!")

        import glob
        available_json_files = glob.glob("*.json")
        if not available_json_files:
            print("   Nenhum arquivo .json encontrado")
        else:
            for available_file in available_json_files:
                print(f"   - {available_file}")
    except Exception as e:
        print(f"Erro ao converter JSON para CSV: {e}")


# Convert the scraper's JSON output; the CSV path falls back to the function's default.
json_to_csv("beyoung_products.json")  

CSV gerado: beyoung_products.csv (16 linhas)
A partir do JSON: beyoung_products.json
