# 3 - create_table_user_yt_from_wikipedia_api

Este notebook enriquece a base de criadores a partir das páginas da Wikipedia, realizando scraping controlado e distribuído da API oficial.  
O fluxo inclui:  
- leitura da tabela `default.creators_scrape_wiki` com os nomes das páginas;  
- chamadas paralelas à API da Wikipedia para coletar links externos de cada página;  
- aplicação de padrões (regex) para extrair o identificador de canal do YouTube (`user_id`);  
- escrita incremental na tabela **default.users_yt** via MERGE (upsert);  
- registro de falhas ou páginas sem link em uma tabela de erros (**default.users_yt_errors**) com documentação das causas.  

O resultado é um mapeamento atualizado e confiável de `wiki_page → user_id` para uso nas análises seguintes.  


Obs. a utilização de mapInPandas é
necessária devido ao compute disponível no Databricks Community (Serveless - Spark Connect) não suportar APIs de RDD

In [0]:

# Libraries and configuration
import re, time, random, logging
import requests
import pandas as pd
from typing import Iterator

from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType
from delta.tables import DeltaTable
from requests.adapters import HTTPAdapter

logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

# Widgets (parameterization)
dbutils.widgets.text("source_table", "default.creators_scrape_wiki", "Tabela de origem (wiki_page)")
dbutils.widgets.text("target_table", "default.users_yt", "Tabela destino (users_yt)")
dbutils.widgets.text("error_table", "default.users_yt_errors", "Tabela de erros")
dbutils.widgets.text("partitions", "6", "Nº de partições (paralelismo)")
dbutils.widgets.text("base_sleep", "0.4", "Pausa (s) entre requests")
dbutils.widgets.text("max_retries", "3", "Máx tentativas por request")
dbutils.widgets.text("timeout_s", "15", "Timeout por request (s)")

source_table   = dbutils.widgets.get("source_table").strip()
target_table   = dbutils.widgets.get("target_table").strip()
error_table    = dbutils.widgets.get("error_table").strip()
N_PARTITIONS   = int(dbutils.widgets.get("partitions"))
BASE_SLEEP     = float(dbutils.widgets.get("base_sleep"))
MAX_RETRIES    = int(dbutils.widgets.get("max_retries"))
REQUEST_TIMEOUT= int(dbutils.widgets.get("timeout_s"))

# in production switch to Databricks secrets
contact_email  = "edanniel.d@gmail.com"

WIKI_API   = "https://en.wikipedia.org/w/api.php"
USER_AGENT = f"databricks-challenge/1.0 (contato: {contact_email})"

# Extraction rules
YT_PATTERNS = [
    re.compile(r"https?://(?:www\.)?youtube\.com/user/([A-Za-z0-9_\-\.]+)", re.IGNORECASE),
    re.compile(r"https?://(?:www\.)?youtube\.com/@([A-Za-z0-9_\-\.]+)",    re.IGNORECASE),  
    re.compile(r"https?://(?:www\.)?youtube\.com/c/([A-Za-z0-9_\-\.]+)",   re.IGNORECASE),
    re.compile(r"https?://(?:www\.)?youtube\.com/channel/([A-Za-z0-9_\-]+)", re.IGNORECASE),
]

def extract_user_id_from_urls(urls: list[str]) -> str | None:
    if not urls:
        return None
    for rx in YT_PATTERNS:
        for u in urls:
            m = rx.search(str(u))
            if m:
                return m.group(1).strip().lower()
    return None

def fetch_external_links(session: requests.Session, page: str) -> list[str]:
    """Returns list of external links via action=parse&prop=externallinks."""
    params = {"action": "parse", "page": page, "prop": "externallinks", "format": "json", "redirects": 1}
    for attempt in range(1, MAX_RETRIES + 1):
        try:
            r = session.get(WIKI_API, params=params, timeout=REQUEST_TIMEOUT)
            if r.status_code == 429:
                time.sleep(BASE_SLEEP * attempt + random.uniform(0, 0.5))
                continue
            r.raise_for_status()
            data = r.json()
            return data.get("parse", {}).get("externallinks", []) or []
        except requests.RequestException as e:
            logging.warning(f"[try {attempt}] falha em '{page}': {e}")
            if attempt == MAX_RETRIES:
                return []
            time.sleep(BASE_SLEEP * attempt + random.uniform(0, 0.5))
    return []

# mapInPandas: 1 HTTP session per partition (instead of pandas.apply)
result_schema = StructType([
    StructField("wiki_page", StringType(), True),
    StructField("user_id",   StringType(), True),
    StructField("error",     StringType(), True),
])

def fetch_yt_users_batch(pdfs: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    # prepare session with connection pool (reuse) per partition:
    session = requests.Session()
    session.headers.update({"User-Agent": USER_AGENT, "Accept-Encoding": "gzip"})
    # increase pool for internal session parallelism 
    session.mount("https://", HTTPAdapter(pool_connections=10, pool_maxsize=10))
    try:
        for pdf in pdfs:
            rows_out = []
            # explicit iteration
            for page in pdf["wiki_page"].astype(str).tolist():
                if not page or page.lower() in {"nan", "none"}:
                    rows_out.append({"wiki_page": None, "user_id": None, "error": "empty_page"})
                    continue
                urls = fetch_external_links(session, page)
                user_id = extract_user_id_from_urls(urls)
                err = None if user_id else "user_id_not_found"
                rows_out.append({"wiki_page": page, "user_id": user_id, "error": err})
                time.sleep(BASE_SLEEP)  # be polite with the API
            yield pd.DataFrame(rows_out)
    finally:
        session.close()

# Source and incremental
src_df = (
    spark.table(source_table)
         .select(F.trim(F.col("wiki_page")).alias("wiki_page"))
         .where(F.col("wiki_page").isNotNull() & (F.col("wiki_page") != ""))
         .distinct()
)

# incremental (anti-join)
try:
    if spark.catalog.tableExists(target_table):
        done = spark.table(target_table).select("wiki_page").distinct()
        src_df = src_df.join(done, "wiki_page", "left_anti")
except Exception as e:
    logging.info(f"Anti-join não aplicado, processando tudo. Motivo: {e}")

# check without isEmpty()
if src_df.limit(1).count() == 0:
    logging.info("Sem novas wiki_page para processar.")
    dbutils.notebook.exit("OK: nada novo para processar.")

# Distributed execution
users_df = (
    src_df.repartition(N_PARTITIONS)
          .mapInPandas(fetch_yt_users_batch, schema=result_schema)
)

ok_df = (
    users_df.filter(F.col("user_id").isNotNull())
            .select("wiki_page", "user_id")
            .dropDuplicates(["wiki_page"])
)
errors_df = (
    users_df.filter(F.col("user_id").isNull())
            .select("wiki_page", "error")
)

# Persistence (Delta)
# create tables if they don’t exist
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {target_table} (
  wiki_page STRING,
  user_id   STRING
) USING delta
""")
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {error_table} (
  wiki_page STRING,
  error     STRING
) USING delta
""")

# MERGE
DeltaTable.forName(spark, target_table) \
    .alias("tgt").merge(
        source=ok_df.alias("src"),
        condition="tgt.wiki_page = src.wiki_page"
    ).whenMatchedUpdate(set={"user_id": "src.user_id"}) \
     .whenNotMatchedInsert(values={"wiki_page": "src.wiki_page", "user_id": "src.user_id"}) \
     .execute()

# erros (append)
if errors_df.limit(1).count() > 0:
    errors_df.write.format("delta").mode("append").saveAsTable(error_table)

# Metadata and comments
spark.sql(f"""
ALTER TABLE {target_table}
SET TBLPROPERTIES (
  'pipeline.step' = 'create_table_user_yt_from_wikipedia_api',
  'source.table'  = '{source_table}'
)
""")

spark.sql(f"COMMENT ON COLUMN {target_table}.wiki_page IS 'Nome da página Wikipedia do criador (ex: Felipe_Neto)'")
spark.sql(f"COMMENT ON COLUMN {target_table}.user_id IS 'UserID/Handle/Channel extraído do YouTube (ex: felipeneto, UC...)'")

spark.sql(f"COMMENT ON TABLE {target_table} IS 'Mapeamento wiki_page -> user_id (YouTube) por scraping da Wikipedia';")

# Documentation of the error table
spark.sql(f"""
  COMMENT ON TABLE {error_table}
  IS 'Registro de páginas da Wikipedia que não geraram user_id do YouTube durante o scraping (falhas ou ausência de link).'
""")

spark.sql(f"""
  COMMENT ON COLUMN {error_table}.wiki_page
  IS 'Nome da página na Wikipedia que foi tentada (ex.: Felipe_Neto).'
""")
spark.sql(f"""
  COMMENT ON COLUMN {error_table}.error
  IS 'Motivo da falha: empty_page | user_id_not_found | mensagem de exceção HTTP/parse.'
""")




# Validation
# display(spark.sql(f"""
# SELECT COUNT(*) AS total_rows, COUNT(user_id) AS with_user_id FROM {target_table}
# """))
