In [0]:
# ===== CO5173 — Cell 1 (Config & Helpers) =====
from pyspark.sql import functions as F
import datetime as dt, time, json, requests

# ------------------------------------------------
# SCHEMAS
# ------------------------------------------------
spark.sql("CREATE SCHEMA IF NOT EXISTS raw")
spark.sql("CREATE SCHEMA IF NOT EXISTS silver")
spark.sql("CREATE SCHEMA IF NOT EXISTS gold")

# ------------------------------------------------
# WIDGETS
# ------------------------------------------------
try:
    dbutils.widgets.removeAll()
except:
    pass

dbutils.widgets.text("RUN_LABEL", "run1")  # label phân biệt mỗi lần chạy
dbutils.widgets.dropdown("RUN_MODE", "TODAY", ["TODAY", "BACKFILL"])
dbutils.widgets.text("FROM_DT", "")
dbutils.widgets.text("TO_DT", "")

dbutils.widgets.text("HASHTAGS", "tiktokvn,marketing,coding")
dbutils.widgets.text("MAX_ITEMS", "300")
dbutils.widgets.text("RAW_PATH", "dbfs:/FileStore/tiktok/hashtag-runs")

# Quan trọng: cho phép nhập:
#  - premium_dumpling~tt-trend-vn-burst  (chuẩn)
#  - premium_dumpling/tt-trend-vn-burst  (tự convert)
dbutils.widgets.text("APIFY_TASK_REF", "premium_dumpling~tt-trend-vn-burst")

# Secret/token
dbutils.widgets.text("APIFY_TOKEN_SCOPE", "apify")
dbutils.widgets.text("APIFY_TOKEN_KEY", "APIFY_TOKEN")
dbutils.widgets.text("APIFY_TOKEN", "")  # fallback nếu không dùng secret scope

# Widget này giữ lại cho linh hoạt, nhưng Cell 2 sẽ KHÔNG dùng để skip
dbutils.widgets.dropdown("SKIP_IF_DT_EXISTS", "false", ["true","false"])

# ------------------------------------------------
# ĐỌC GIÁ TRỊ
# ------------------------------------------------
RUN_MODE    = dbutils.widgets.get("RUN_MODE").upper()
FROM_DT     = dbutils.widgets.get("FROM_DT").strip()
TO_DT       = dbutils.widgets.get("TO_DT").strip()
RUN_LABEL   = (dbutils.widgets.get("RUN_LABEL") or "run1").strip()

HASHTAGS    = [h.strip() for h in dbutils.widgets.get("HASHTAGS").split(",") if h.strip()]
MAX_ITEMS   = int(dbutils.widgets.get("MAX_ITEMS"))
RAW_PATH    = dbutils.widgets.get("RAW_PATH").rstrip("/")

APIFY_TASK_REF_RAW = dbutils.widgets.get("APIFY_TASK_REF").strip()

TOKEN_SCOPE = dbutils.widgets.get("APIFY_TOKEN_SCOPE").strip()
TOKEN_KEY   = dbutils.widgets.get("APIFY_TOKEN_KEY").strip()
TOKEN_FB    = dbutils.widgets.get("APIFY_TOKEN").strip()
SKIP_IF_DT_EXISTS = dbutils.widgets.get("SKIP_IF_DT_EXISTS").lower() == "true"

# ------------------------------------------------
# NORMALIZE TASK REF
# ------------------------------------------------
def normalize_task_ref(ref: str) -> str:
    """
    Chuẩn hoá APIFY_TASK_REF:
      - Chấp nhận 'owner/name' hoặc 'owner~name'
      - BẮT BUỘC phải có owner.
      - Nếu chỉ nhập 'tt-trend-vn-burst' -> raise lỗi rõ ràng.
    """
    if not ref:
        raise RuntimeError("APIFY_TASK_REF chưa được set. Ví dụ: premium_dumpling~tt-trend-vn-burst")

    # Cho phép dùng '/', convert sang '~'
    ref = ref.replace("/", "~")

    if "~" not in ref:
        raise RuntimeError(
            f"APIFY_TASK_REF='{ref}' thiếu owner. "
            f"Vui lòng dùng dạng 'premium_dumpling~tt-trend-vn-burst'."
        )

    return ref

APIFY_TASK = normalize_task_ref(APIFY_TASK_REF_RAW)

# ------------------------------------------------
# DATE LIST
# ------------------------------------------------
def _date_list(mode, f, t):
    if mode == "BACKFILL" and f and t:
        s = dt.datetime.strptime(f, "%Y-%m-%d").date()
        e = dt.datetime.strptime(t, "%Y-%m-%d").date()
        out = []
        cur = s
        while cur <= e:
            out.append(cur.strftime("%Y-%m-%d"))
            cur += dt.timedelta(days=1)
        return out

    # TODAY
    return [dt.date.today().strftime("%Y-%m-%d")]

dates = _date_list(RUN_MODE, FROM_DT, TO_DT)

# ------------------------------------------------
# HELPERS
# ------------------------------------------------
def run_many(sql_block: str):
    for stmt in [s for s in sql_block.split(";") if s.strip()]:
        spark.sql(stmt)

def get_apify_token() -> str:
    # Ưu tiên secret scope
    try:
        if TOKEN_SCOPE and TOKEN_KEY:
            return dbutils.secrets.get(TOKEN_SCOPE, TOKEN_KEY)
    except Exception:
        pass

    if TOKEN_FB:
        return TOKEN_FB

    raise RuntimeError("Thiếu APIFY_TOKEN (secret apify/APIFY_TOKEN hoặc widget APIFY_TOKEN).")

APIFY_TOKEN = get_apify_token()

def apify_request(method, url, params=None, json_body=None,
                  max_retries=6, timeout=60):
    """
    Wrapper gọi Apify:
      - Tự gắn ?token= nếu thiếu.
      - Retry cho lỗi tạm thời.
      - Trả về JSON / list từ JSONL nếu có thể.
    """
    if params is None:
        params = {}

    # Gắn token nếu chưa có
    if "token" not in params and "token=" not in url:
        params["token"] = APIFY_TOKEN

    last_err = None

    for i in range(max_retries):
        try:
            resp = requests.request(
                method,
                url,
                params=params,
                json=json_body,
                timeout=timeout,
            )

            # Retry nếu 5xx
            if resp.status_code >= 500:
                raise RuntimeError(f"5xx {resp.status_code}: {resp.text[:200]}")
            resp.raise_for_status()

            ctype = (resp.headers.get("content-type") or "").lower()

            # JSON chuẩn
            if "application/json" in ctype:
                return resp.json()

            # JSON stream / JSONL
            if "application/x-json-stream" in ctype:
                lines = [ln for ln in resp.text.splitlines() if ln.strip()]
                return [json.loads(ln) for ln in lines]

            # Fallback: thử parse JSON, không được thì trả text
            try:
                return resp.json()
            except Exception:
                return resp.text

        except Exception as ex:
            last_err = ex
            if i == max_retries - 1:
                raise RuntimeError(f"Apify request failed after {max_retries} attempts: {ex}")
            time.sleep(min(2**i, 30))

    raise last_err or RuntimeError("Unknown Apify error")

print("Cell1 OK:", {
    "dates": dates,
    "APIFY_TASK": APIFY_TASK,
    "RAW_PATH": RAW_PATH,
    "RUN_LABEL": RUN_LABEL,
})


Cell1 OK: {'dates': ['2025-11-10'], 'APIFY_TASK': 'premium_dumpling~tt-trend-vn-burst', 'RAW_PATH': 'dbfs:/FileStore/tiktok/hashtag-runs', 'RUN_LABEL': '120'}


In [0]:
# ===== CO5173 — Cell 2 (Ingest RAW from Apify Task) =====
# Logic:
# - Với mỗi dt trong `dates` (từ Cell1)
# - Gọi Actor Task `APIFY_TASK`:
#       /v2/actor-tasks/{APIFY_TASK}/runs?...
# - Lấy run SUCCEEDED mới nhất
# - Lấy dataset items
# - Ghi JSONL vào RAW_PATH/dt=YYYY-MM-DD/..., luôn overwrite

import json

if "APIFY_TASK" not in locals():
    raise RuntimeError("Chưa chạy Cell 1 (APIFY_TASK không tồn tại).")

def fetch_latest_succeeded_from_task(task_ref: str):
    """
    Lấy run SUCCEEDED gần nhất + items dataset tương ứng.
    """
    runs = apify_request(
        "GET",
        f"https://api.apify.com/v2/actor-tasks/{task_ref}/runs",
        params={"limit": 10, "desc": 1}
    )

    data = runs.get("data", {})
    items = data.get("items", data if isinstance(data, list) else [])

    if not isinstance(items, list):
        raise RuntimeError(f"Response /runs không đúng format: {runs}")

    chosen = None
    for it in items:
        if it.get("status") == "SUCCEEDED":
            chosen = it
            break

    if not chosen:
        raise RuntimeError(f"Không tìm thấy run SUCCEEDED cho task {task_ref}.")

    run_id = chosen.get("id")
    ds_id  = chosen.get("defaultDatasetId")

    if not ds_id:
        # Không có dataset: trả rỗng nhưng vẫn log run_id
        return run_id, []

    ds_items = apify_request(
        "GET",
        f"https://api.apify.com/v2/datasets/{ds_id}/items",
        params={"clean": "true"}
    )

    if isinstance(ds_items, dict):
        ds_items = ds_items.get("items", [])
    elif not isinstance(ds_items, list):
        raise RuntimeError(f"Dataset items không phải list/dict: {type(ds_items)}")

    return run_id, ds_items

ingest_results = []

for d in dates:
    out_dir = f"{RAW_PATH}/dt={d}"
    dbutils.fs.mkdirs(out_dir)

    try:
        run_id, items = fetch_latest_succeeded_from_task(APIFY_TASK)

        # Mỗi lần chạy: file mới (ghi đè), có RUN_LABEL để trace
        fname = f"task_{APIFY_TASK.replace('~','_')}_run_{run_id or 'NA'}_{RUN_LABEL}.json"
        out_path = f"{out_dir}/{fname}"

        body = "\n".join(json.dumps(x, ensure_ascii=False) for x in (items or []))
        dbutils.fs.put(out_path, body, overwrite=True)

        ingest_results.append({
            "dt": d,
            "status": "SUCCEEDED(FETCHED)",
            "run_id": run_id,
            "count": len(items or []),
            "out_path": out_path,
        })

    except Exception as ex:
        ingest_results.append({
            "dt": d,
            "status": "ERROR",
            "error": str(ex),
        })

print("Ingest summary:", ingest_results)


Wrote 75923 bytes.
Ingest summary: [{'dt': '2025-11-10', 'status': 'SUCCEEDED(FETCHED)', 'run_id': 'y7pON9jc35IUSyJzo', 'count': 100, 'out_path': 'dbfs:/FileStore/tiktok/hashtag-runs/dt=2025-11-10/task_premium_dumpling_tt-trend-vn-burst_run_y7pON9jc35IUSyJzo_120.json'}]


In [0]:
# === Cell A ===
# Dùng ngày mới nhất bạn vừa ghi file; nếu đã có mảng `dates` thì lấy max(dates)
TARGET_DT = max(dates) if 'dates' in locals() and dates else '2025-11-10'
RAW_DIR = RAW_PATH  # bạn đã set ở Cell1
BRONZE_TABLE = "raw.bronze_tiktok_raw"

print("TARGET_DT =", TARGET_DT)
print("RAW_DIR   =", RAW_DIR)
print("BRONZE    =", BRONZE_TABLE)


TARGET_DT = 2025-11-10
RAW_DIR   = dbfs:/FileStore/tiktok/hashtag-runs
BRONZE    = raw.bronze_tiktok_raw


In [0]:
# === Cell B (JSONL-safe) ===
from pyspark.sql import functions as F, types as T

src_path = f"{RAW_DIR}/dt={TARGET_DT}/*.json"

# 1) Đọc đúng kiểu JSON Lines: multiLine=False
df_raw = (spark.read
          .option("multiLine", False)            # quan trọng cho JSONL
          .option("mode", "PERMISSIVE")          # giữ lỗi vào _corrupt_record
          .json(src_path))

print("Raw rows read:", df_raw.count())
print("Has _corrupt_record?", "_corrupt_record" in df_raw.columns)

# 2) Nếu payload có dạng {"items": [...]}, thì explode ra
if "items" in df_raw.columns and isinstance(df_raw.schema["items"].dataType, T.ArrayType):
    df_raw = df_raw.select(F.explode("items").alias("root")).select("root.*")
    print("Exploded items. Rows now:", df_raw.count())

# 3) Kiểm tra xem còn bản ghi lỗi không
if "_corrupt_record" in df_raw.columns:
    bad = df_raw.filter(F.col("_corrupt_record").isNotNull())
    n_bad = bad.count()
    if n_bad > 0:
        print(f"WARNING: corrupt rows = {n_bad}")
        # nếu muốn loại bỏ bản ghi lỗi:
        df_raw = df_raw.filter(F.col("_corrupt_record").isNull()).drop("_corrupt_record")

# 4) Thêm dt từ file path
df_raw = (df_raw
          .withColumn("_file_path", F.input_file_name())
          .withColumn("dt", F.to_date(
              F.regexp_extract("_file_path", r"dt=([0-9]{4}-[0-9]{2}-[0-9]{2})", 1)
          )))

# 5) Chuẩn hoá schema đúng với bảng Bronze
target_df = spark.table(BRONZE_TABLE)
target_schema = target_df.schema

def cast_to_target(df_src, target_schema: T.StructType):
    for field in target_schema.fields:
        name = field.name
        tpe  = field.dataType
        if name in df_src.columns:
            df_src = df_src.withColumn(name, F.col(name).cast(tpe))
        else:
            df_src = df_src.withColumn(name, F.lit(None).cast(tpe))
    return df_src.select([f.name for f in target_schema.fields])

df_norm = cast_to_target(df_raw, target_schema)

print("Rows after normalize:", df_norm.count())

# 6) Append vào Bronze
(df_norm.write
    .mode("append")
    .option("mergeSchema", "false")
    .saveAsTable(BRONZE_TABLE))

# 7) Kiểm tra đã vào chưa
cnt_new = spark.sql(f"""
SELECT COUNT(*) AS c
FROM {BRONZE_TABLE}
WHERE dt = to_date('{TARGET_DT}')
""").first().c
print(f"Ingested rows for {TARGET_DT} =", cnt_new)


Raw rows read: 100
Has _corrupt_record? False
Rows after normalize: 100
Ingested rows for 2025-11-10 = 102


In [0]:
# === Cell C ===
display(spark.sql(f"""
SELECT dt, COUNT(*) AS c
FROM {BRONZE_TABLE}
GROUP BY dt
ORDER BY dt DESC
"""))

mx_bronze = spark.sql(f"SELECT MAX(dt) AS mx FROM {BRONZE_TABLE}").first().mx
print("MAX(dt) in bronze =", mx_bronze)

# (Fallback hiếm khi cần) ép dt từ path
if str(mx_bronze) != str(TARGET_DT):
    print("Fallback: updating dt from _metadata.file_path ...")
    spark.sql(f"""
    UPDATE {BRONZE_TABLE}
    SET dt = to_date(regexp_extract(_metadata.file_path, 'dt=([0-9]{{4}}-[0-9]{{2}}-[0-9]{{2}})', 1))
    WHERE _metadata.file_path LIKE '%/dt=%';
    """)
    mx_bronze = spark.sql(f"SELECT MAX(dt) AS mx FROM {BRONZE_TABLE}").first().mx
    print("After fallback, MAX(dt) in bronze =", mx_bronze)


dt,c
2025-11-10,102
2025-11-02,100


MAX(dt) in bronze = 2025-11-10


In [0]:
# ==== PATCH: sửa dt NULL + rebuild Silver/Gold ====
from pyspark.sql import functions as F

# 1) Backfill dt trong Bronze từ path _source_file: .../dt=YYYY-MM-DD/...
run_many("""
UPDATE raw.bronze_tiktok_raw
SET dt = to_date(substring_index(substring_index(_source_file, 'dt=', -1), '/', 1))
WHERE dt IS NULL;
""")

# 2) Rebuild Silver (bản an toàn)
run_many("""
CREATE SCHEMA IF NOT EXISTS silver;

CREATE OR REPLACE TABLE silver.silver_trend_base AS
SELECT
  t_id,
  lower(regexp_replace(
    CASE
      WHEN `name` IS NOT NULL AND length(trim(`name`))>0 THEN `name`
      WHEN `id`   IS NOT NULL AND length(trim(`id`))>0   THEN `id`
      WHEN `url`  IS NOT NULL THEN regexp_extract(`url`, '#([A-Za-z0-9_]+)', 1)
      ELSE ''
    END
  , '[^0-9a-zA-Z_]', ''))                                    AS hashtag,
  `name`                                                     AS hashtag_raw,
  `countryCode`                                              AS country_code,
  `industryName`                                             AS industry,
  `type`                                                     AS category,
  `url`,
  CAST(`rank` AS INT)                                        AS rank,
  CAST(`rankDiff` AS INT)                                    AS rank_diff,
  CAST(`videoCount` AS BIGINT)                               AS video_count,
  CAST(`viewCount` AS BIGINT)                                AS view_count,
  CAST(`isPromoted` AS BOOLEAN)                              AS is_promoted,
  CAST(`markedAsNew` AS BOOLEAN)                             AS is_new,
  CAST(`trendingHistogram` AS STRING)                        AS trending_hist_json,
  CAST(`relatedCreators` AS STRING)                          AS related_creators_json,
  dt
FROM raw.bronze_tiktok_raw
WHERE `name` IS NOT NULL OR `id` IS NOT NULL OR `url` IS NOT NULL;

CREATE OR REPLACE TABLE silver.silver_trend_with_quality AS
SELECT
  *,
  array_remove(array(
    CASE WHEN rank IS NOT NULL AND rank < 1 THEN 'rank_invalid' END,
    CASE WHEN video_count < 0 THEN 'video_neg' END,
    CASE WHEN view_count  < 0 THEN 'view_neg'  END
  ), NULL) AS quality_issues
FROM silver.silver_trend_base;

CREATE OR REPLACE TABLE silver.silver_trend_quarantine AS
SELECT * FROM silver.silver_trend_with_quality
WHERE size(quality_issues) > 0;

CREATE OR REPLACE TABLE silver.silver_trend AS
SELECT t_id, hashtag, hashtag_raw, country_code, industry, category, url,
       rank, rank_diff, video_count, view_count, is_promoted, is_new,
       trending_hist_json, related_creators_json, dt
FROM silver.silver_trend_with_quality
WHERE hashtag IS NOT NULL AND length(trim(hashtag)) > 0;
""")

# 3) Rebuild Gold (robust với rank NULL)
run_many("""
CREATE SCHEMA IF NOT EXISTS gold;

CREATE OR REPLACE TABLE gold.trend_by_day_topk AS
SELECT *
FROM (
  SELECT s.*,
         row_number() OVER (
           PARTITION BY dt
           ORDER BY coalesce(rank, 2147483647), view_count DESC
         ) AS rn
  FROM silver.silver_trend s
)
WHERE rn <= 100;

CREATE OR REPLACE TABLE gold.trend_latest_top100 AS
WITH mx AS (SELECT MAX(dt) AS max_dt FROM silver.silver_trend)
SELECT *
FROM (
  SELECT s.*,
         row_number() OVER (
           ORDER BY coalesce(rank, 2147483647), view_count DESC
         ) AS rn
  FROM silver.silver_trend s
  WHERE s.dt = (SELECT max_dt FROM mx)
)
WHERE rn <= 100;

CREATE OR REPLACE TABLE gold.trend_country_summary AS
SELECT country_code, dt,
       COUNT(*) AS hashtag_cnt,
       AVG(rank) AS avg_rank,
       SUM(CASE WHEN is_promoted THEN 1 ELSE 0 END) AS promoted_cnt
FROM silver.silver_trend
GROUP BY country_code, dt;
""")

# 4) Kiểm tra nhanh
print("bronze:", spark.table("raw.bronze_tiktok_raw").count())
print("silver_ok:", spark.table("silver.silver_trend").count())
print("quarantine:", spark.table("silver.silver_trend_quarantine").count())
for t in ["gold.trend_by_day_topk","gold.trend_latest_top100","gold.trend_country_summary"]:
    print(t, "=", spark.table(t).count())

# (tuỳ chọn) xem max dt
mx = spark.sql("SELECT MAX(dt) AS mx FROM silver.silver_trend").first().mx
print("MAX(dt) in silver:", mx)


bronze: 202
silver_ok: 201
quarantine: 0
gold.trend_by_day_topk = 200
gold.trend_latest_top100 = 100
gold.trend_country_summary = 2
MAX(dt) in silver: 2025-11-10


In [0]:
# 1) Kiểm tra file RAW mới có chưa
display(dbutils.fs.ls(f"{RAW_PATH}/dt=2025-11-10"))

# 2) Check dt trong bronze
spark.sql("""
SELECT dt, COUNT(*) c
FROM raw.bronze_tiktok_raw
GROUP BY dt
ORDER BY dt DESC
""").show(50, False)

# 3) Check dt trong silver
spark.sql("""
SELECT dt, COUNT(*) c
FROM silver.silver_trend
GROUP BY dt
ORDER BY dt DESC
""").show(50, False)


path,name,size,modificationTime
dbfs:/FileStore/tiktok/hashtag-runs/dt=2025-11-10/task_premium_dumpling_tt-trend-vn-burst_run_y7pON9jc35IUSyJzo_120.json,task_premium_dumpling_tt-trend-vn-burst_run_y7pON9jc35IUSyJzo_120.json,75923,1762803979000


+----------+---+
|dt        |c  |
+----------+---+
|2025-11-02|100|
+----------+---+

+----------+---+
|dt        |c  |
+----------+---+
|2025-11-02|100|
+----------+---+



In [0]:
# Cell 4 — Silver + Quarantine (bản an toàn, không lọc theo type)
run_many("""
CREATE SCHEMA IF NOT EXISTS silver;

CREATE OR REPLACE TABLE silver.silver_trend_base AS
SELECT
  t_id,
  lower(regexp_replace(
    CASE
      WHEN `name` IS NOT NULL AND length(trim(`name`))>0 THEN `name`
      WHEN `id`   IS NOT NULL AND length(trim(`id`))>0   THEN `id`
      WHEN `url`  IS NOT NULL THEN regexp_extract(`url`, '#([A-Za-z0-9_]+)', 1)
      ELSE ''
    END
  , '[^0-9a-zA-Z_]', ''))                                    AS hashtag,
  `name`                                                     AS hashtag_raw,
  `countryCode`                                              AS country_code,
  `industryName`                                             AS industry,
  `type`                                                     AS category,
  `url`,
  CAST(`rank` AS INT)                                        AS rank,
  CAST(`rankDiff` AS INT)                                    AS rank_diff,
  CAST(`videoCount` AS BIGINT)                               AS video_count,
  CAST(`viewCount` AS BIGINT)                                AS view_count,
  CAST(`isPromoted` AS BOOLEAN)                              AS is_promoted,
  CAST(`markedAsNew` AS BOOLEAN)                             AS is_new,
  CAST(`trendingHistogram` AS STRING)                        AS trending_hist_json,
  CAST(`relatedCreators` AS STRING)                          AS related_creators_json,
  dt
FROM raw.bronze_tiktok_raw
WHERE `name` IS NOT NULL OR `id` IS NOT NULL OR `url` IS NOT NULL;

CREATE OR REPLACE TABLE silver.silver_trend_with_quality AS
SELECT
  *,
  array_remove(array(
    CASE WHEN rank IS NOT NULL AND rank < 1 THEN 'rank_invalid' END,
    CASE WHEN video_count < 0 THEN 'video_neg' END,
    CASE WHEN view_count  < 0 THEN 'view_neg'  END
  ), NULL) AS quality_issues
FROM silver.silver_trend_base;

CREATE OR REPLACE TABLE silver.silver_trend_quarantine AS
SELECT * FROM silver.silver_trend_with_quality
WHERE size(quality_issues) > 0;

CREATE OR REPLACE TABLE silver.silver_trend AS
SELECT t_id, hashtag, hashtag_raw, country_code, industry, category, url,
       rank, rank_diff, video_count, view_count, is_promoted, is_new,
       trending_hist_json, related_creators_json, dt
FROM silver.silver_trend_with_quality
WHERE hashtag IS NOT NULL AND length(trim(hashtag)) > 0;
""")
print("Silver rebuilt ✅")


Silver rebuilt ✅


In [0]:
# ===== Cell 5 — GOLD: Link Health + Capture Plan (robust to missing columns) =====
import requests, pandas as pd
from pyspark.sql import functions as F, types as T

# 0) Lấy bảng silver và phát hiện schema hiện có
s = spark.table("silver.silver_trend")
cols = set(s.columns)
has = lambda c: c in cols

# 1) Tạo cột hashtag an toàn
if has("hashtag"):
    hashtag_col = F.col("hashtag")
elif has("hashtag_raw"):
    hashtag_col = F.lower(F.regexp_replace(F.col("hashtag_raw"), "[^0-9A-Za-z_]", ""))
elif has("name"):
    hashtag_col = F.lower(F.regexp_replace(F.col("name"), "[^0-9A-Za-z_]", ""))
else:
    # fallback: tách từ URL dạng ...#hashtag
    hashtag_col = F.lower(F.regexp_extract(F.col("url"), r"#([0-9A-Za-z_]+)", 1))

# 2) Các cột khác (an toàn nếu thiếu)
country_col   = F.col("country_code") if has("country_code") else (F.col("countryCode") if has("countryCode") else F.lit(None).cast("string"))
rank_col      = F.col("rank").cast("int") if has("rank") else F.lit(None).cast("int")
rank_diff_col = F.col("rank_diff").cast("int") if has("rank_diff") else F.lit(0).cast("int")
is_new_col    = F.col("is_new").cast("boolean") if has("is_new") else F.lit(False)

df2 = (s
  .select(
      F.col("dt").cast("date").alias("dt"),
      hashtag_col.alias("hashtag"),
      F.col("url").alias("url"),
      country_col.alias("country_code"),
      rank_col.alias("rank"),
      rank_diff_col.alias("rank_diff"),
      is_new_col.alias("is_new"),
  )
)

# 3) Lọc ngày mới nhất và chọn Top 80 theo rank (nulls last)
max_dt = df2.select(F.max("dt")).first()[0]
if max_dt is None:
    raise ValueError("silver.silver_trend đang rỗng — hãy chạy lại Bronze/Silver trước.")

cand = (df2.where(F.col("dt") == max_dt)
            .orderBy(
                F.when(F.col("rank").isNull(), 1).otherwise(0).asc(),
                F.col("rank").asc_nulls_last(),
                F.col("url").asc()
            )
            .limit(80))

pdf = cand.toPandas()

# 4) Kiểm tra link (HEAD→GET; không scrape nội dung)
headers = {"User-Agent": "Mozilla/5.0 (compatible; DatabricksLinkHealth/1.0)"}
def check_url(u: str):
    try:
        try:
            r = requests.head(u, timeout=8, allow_redirects=True, headers=headers)
            if r.status_code >= 400:
                raise Exception("head_blocked")
        except Exception:
            r = requests.get(u, timeout=10, allow_redirects=True, headers=headers)
        return (r.status_code, getattr(r, "url", u), len(getattr(r, "history", []) or []), (200 <= r.status_code < 300))
    except Exception:
        return (None, None, None, False)

rows = []
if not pdf.empty:
    for _, r in pdf.iterrows():
        sc, final_u, redirects, ok = check_url(r["url"])
        rows.append({
            "dt":           r["dt"],
            "hashtag":      r["hashtag"],
            "url":          r["url"],
            "country_code": r.get("country_code"),
            "rank":         int(r["rank"]) if pd.notnull(r["rank"]) else None,
            "rank_diff":    int(r["rank_diff"]) if pd.notnull(r["rank_diff"]) else 0,
            "is_new":       bool(r.get("is_new", False)),
            "status_code":  sc,
            "final_url":    final_u,
            "redirects":    redirects,
            "is_ok":        bool(ok),
        })

# 5) Ghi gold.hashtag_link_health (kể cả khi rỗng)
schema = T.StructType([
    T.StructField("dt", T.DateType(), True),
    T.StructField("hashtag", T.StringType(), True),
    T.StructField("url", T.StringType(), True),
    T.StructField("country_code", T.StringType(), True),
    T.StructField("rank", T.IntegerType(), True),
    T.StructField("rank_diff", T.IntegerType(), True),
    T.StructField("is_new", T.BooleanType(), True),
    T.StructField("status_code", T.IntegerType(), True),
    T.StructField("final_url", T.StringType(), True),
    T.StructField("redirects", T.IntegerType(), True),
    T.StructField("is_ok", T.BooleanType(), True),
])

link_df = spark.createDataFrame(rows, schema=schema)
(link_df.write.mode("overwrite").saveAsTable("gold.hashtag_link_health"))

# 6) Ghi gold.hashtag_capture_plan (NOW/SOON/LOW)
spark.sql("""
CREATE OR REPLACE TABLE gold.hashtag_capture_plan AS
SELECT
  dt, hashtag, url, country_code, rank, rank_diff, is_new,
  status_code, final_url, redirects, is_ok,
  CASE
    WHEN is_ok = true AND (is_new = true OR COALESCE(rank, 999) <= 20 OR COALESCE(-rank_diff,0) >= 10)
      THEN 'NOW'
    WHEN is_ok = true AND COALESCE(rank, 999) <= 50
      THEN 'SOON'
    ELSE 'LOW'
  END AS priority,
  CASE
    WHEN status_code IS NULL THEN 'network_error'
    WHEN status_code BETWEEN 200 AND 299 THEN
      CASE WHEN COALESCE(redirects,0) > 0 THEN 'ok_redirect' ELSE 'ok' END
    WHEN status_code BETWEEN 300 AND 399 THEN 'redirect_only'
    WHEN status_code BETWEEN 400 AND 499 THEN CONCAT('client_', CAST(status_code AS STRING))
    WHEN status_code BETWEEN 500 AND 599 THEN CONCAT('server_', CAST(status_code AS STRING))
    ELSE 'unknown'
  END AS reason
FROM gold.hashtag_link_health
""")

print("Built: gold.hashtag_link_health =", spark.table("gold.hashtag_link_health").count(),
      "| gold.hashtag_capture_plan =", spark.table("gold.hashtag_capture_plan").count())




Built: gold.hashtag_link_health = 80 | gold.hashtag_capture_plan = 80


In [0]:
print("bronze:", spark.table("raw.bronze_tiktok_raw").count())
print("silver_ok:", spark.table("silver.silver_trend").count())
print("quarantine:", spark.table("silver.silver_trend_quarantine").count())
for t in ["gold.trend_latest_top100","gold.trend_by_day_topk","gold.trend_country_summary"]:
    print(t, "=", spark.table(t).count())


bronze: 202
silver_ok: 201
quarantine: 0
gold.trend_latest_top100 = 100
gold.trend_by_day_topk = 200
gold.trend_country_summary = 2


In [0]:
from pyspark.sql import functions as F

counts = {
  "bronze": spark.sql("SELECT COUNT(*) c FROM raw.bronze_tiktok_raw").first().c,
  "silver_ok": spark.sql("SELECT COUNT(*) c FROM silver.silver_trend").first().c,
  "quarantine": spark.sql("SELECT COUNT(*) c FROM silver.silver_trend_quarantine").first().c
}
print("Counts:", counts)

mx = spark.sql("SELECT MAX(dt) mx FROM silver.silver_trend").first().mx
print("Latest dt:", mx)

display(spark.sql("""
SELECT dt, hashtag, rank, view_count, video_count, country_code, industry, is_new, is_promoted
FROM silver.silver_trend
WHERE dt = (SELECT MAX(dt) FROM silver.silver_trend)
ORDER BY rank ASC NULLS LAST
LIMIT 50
"""))


Counts: {'bronze': 202, 'silver_ok': 201, 'quarantine': 0}
Latest dt: 2025-11-10


dt,hashtag,rank,view_count,video_count,country_code,industry,is_new,is_promoted
2025-11-10,gdragon,1,112768016,24483,VN,News & Entertainment,False,False
2025-11-10,gdragon,1,112768016,24483,VN,News & Entertainment,False,False
2025-11-10,gdragon,1,112768016,24483,VN,News & Entertainment,False,False
2025-11-10,gd,2,31205045,9071,VN,,True,False
2025-11-10,tiktokshop1111,3,177441932,11219,VN,Apparel & Accessories,False,False
2025-11-10,concert,4,26319508,6252,VN,News & Entertainment,True,False
2025-11-10,lixi,5,6181628,13470,VN,,False,False
2025-11-10,bigbang,6,30156154,4106,VN,News & Entertainment,True,False
2025-11-10,salevuivodoi,7,174245497,7887,VN,,False,False
2025-11-10,vannghe,8,45943969,2570,VN,Education,True,False


In [0]:
# Gói gọn các bảng gold thành view dễ dùng cho dashboard
spark.sql("CREATE SCHEMA IF NOT EXISTS gold")

spark.sql("""
CREATE OR REPLACE VIEW gold.vw_trend_latest AS
SELECT dt, hashtag, hashtag_raw, country_code, industry, category, url,
       rank, rank_diff, view_count, video_count, is_promoted, is_new
FROM gold.trend_latest_top100
""")

spark.sql("""
CREATE OR REPLACE VIEW gold.vw_trend_country AS
SELECT country_code, dt, hashtag_cnt, avg_rank, promoted_cnt
FROM gold.trend_country_summary
""")

spark.sql("""
CREATE OR REPLACE VIEW gold.vw_trend_momentum_top AS
SELECT dt, hashtag, rank, prev_rank, rank_velocity, view_delta, video_delta
FROM gold.trend_momentum
WHERE rank IS NOT NULL
QUALIFY row_number() OVER (PARTITION BY dt ORDER BY rank_velocity DESC, rank ASC) <= 50
""")

spark.sql("""
CREATE OR REPLACE VIEW gold.vw_trend_weekly AS
SELECT week, hashtag, best_rank, avg_rank, max_views, new_days_count
FROM gold.trend_weekly_summary
""")

print("Created views: vw_trend_latest, vw_trend_country, vw_trend_momentum_top, vw_trend_weekly")


Created views: vw_trend_latest, vw_trend_country, vw_trend_momentum_top, vw_trend_weekly


In [0]:
# Gói gọn các bảng gold thành view dễ dùng cho dashboard
spark.sql("CREATE SCHEMA IF NOT EXISTS gold")

spark.sql("""
CREATE OR REPLACE VIEW gold.vw_trend_latest AS
SELECT dt, hashtag, hashtag_raw, country_code, industry, category, url,
       rank, rank_diff, view_count, video_count, is_promoted, is_new
FROM gold.trend_latest_top100
""")

spark.sql("""
CREATE OR REPLACE VIEW gold.vw_trend_country AS
SELECT country_code, dt, hashtag_cnt, avg_rank, promoted_cnt
FROM gold.trend_country_summary
""")

spark.sql("""
CREATE OR REPLACE VIEW gold.vw_trend_momentum_top AS
SELECT dt, hashtag, rank, prev_rank, rank_velocity, view_delta, video_delta
FROM gold.trend_momentum
WHERE rank IS NOT NULL
QUALIFY row_number() OVER (PARTITION BY dt ORDER BY rank_velocity DESC, rank ASC) <= 50
""")

spark.sql("""
CREATE OR REPLACE VIEW gold.vw_trend_weekly AS
SELECT week, hashtag, best_rank, avg_rank, max_views, new_days_count
FROM gold.trend_weekly_summary
""")

print("Created views: vw_trend_latest, vw_trend_country, vw_trend_momentum_top, vw_trend_weekly")


Created views: vw_trend_latest, vw_trend_country, vw_trend_momentum_top, vw_trend_weekly


In [0]:
# Chạy khi dữ liệu đã ổn định (ví dụ hàng tuần)
tables = [
  "raw.bronze_tiktok_raw",
  "silver.silver_trend", "silver.silver_trend_quarantine",
  "gold.trend_latest_top100", "gold.trend_by_day_topk", "gold.trend_momentum",
  "gold.trend_retention", "gold.trend_weekly_summary", "gold.trend_country_summary",
  "gold.trend_industry_summary", "gold.trend_view_distribution",
  "gold.trend_new_entries", "gold.trend_promoted_share", "gold.run_audit"
]

for t in tables:
    try:
        spark.sql(f"OPTIMIZE {t}")
        spark.sql(f"VACUUM {t} RETAIN 168 HOURS")  # 7 ngày
        print("Optimized & vacuumed:", t)
    except Exception as e:
        print("Skip:", t, "-", str(e)[:120])


Optimized & vacuumed: raw.bronze_tiktok_raw
Optimized & vacuumed: silver.silver_trend
Optimized & vacuumed: silver.silver_trend_quarantine
Optimized & vacuumed: gold.trend_latest_top100
Optimized & vacuumed: gold.trend_by_day_topk
Optimized & vacuumed: gold.trend_momentum
Optimized & vacuumed: gold.trend_retention
Optimized & vacuumed: gold.trend_weekly_summary
Optimized & vacuumed: gold.trend_country_summary
Optimized & vacuumed: gold.trend_industry_summary
Optimized & vacuumed: gold.trend_view_distribution
Optimized & vacuumed: gold.trend_new_entries
Optimized & vacuumed: gold.trend_promoted_share
Optimized & vacuumed: gold.run_audit


In [0]:
from pyspark.sql import functions as F

def cnt(t): 
    try: return spark.table(t).count()
    except: return -1

print({
  "raw.bronze_tiktok_raw": cnt("raw.bronze_tiktok_raw"),
  "silver.silver_trend": cnt("silver.silver_trend"),
  "silver.silver_trend_quarantine": cnt("silver.silver_trend_quarantine"),
  "gold.trend_latest_top100": cnt("gold.trend_latest_top100"),
})

print("\n-- Bronze sample --")
display(spark.sql("SELECT * FROM raw.bronze_tiktok_raw ORDER BY dt DESC LIMIT 5"))

print("\n-- Quarantine sample (nếu có) --")
display(spark.sql("""
SELECT hashtag_raw, country_code, industry, rank, view_count, video_count, quality_issues, dt
FROM silver.silver_trend_with_quality
WHERE size(quality_issues) > 0
ORDER BY dt DESC LIMIT 20
"""))


{'raw.bronze_tiktok_raw': 202, 'silver.silver_trend': 201, 'silver.silver_trend_quarantine': 0, 'gold.trend_latest_top100': 100}

-- Bronze sample --


t_id,id,name,countryCode,industryName,type,url,rank,rankDiff,videoCount,viewCount,isPromoted,markedAsNew,relatedCreators,trendingHistogram,_ingest_time,_source_file,dt
,1190,gdragon,VN,News & Entertainment,hashtag,https://www.tiktok.com/tag/gdragon,1,1,24483,112768016,False,False,,"[{2025-11-03T00:00:00.000Z, 0.2}, {2025-11-04T00:00:00.000Z, 0.2}, {2025-11-05T00:00:00.000Z, 0.21}, {2025-11-06T00:00:00.000Z, 0.23}, {2025-11-07T00:00:00.000Z, 0.37}, {2025-11-08T00:00:00.000Z, 0.5}, {2025-11-09T00:00:00.000Z, 1.0}]",,,2025-11-10
,1190,gdragon,VN,News & Entertainment,hashtag,https://www.tiktok.com/tag/gdragon,1,1,24483,112768016,False,False,,"[{2025-11-03T00:00:00.000Z, 0.2}, {2025-11-04T00:00:00.000Z, 0.2}, {2025-11-05T00:00:00.000Z, 0.21}, {2025-11-06T00:00:00.000Z, 0.23}, {2025-11-07T00:00:00.000Z, 0.37}, {2025-11-08T00:00:00.000Z, 0.5}, {2025-11-09T00:00:00.000Z, 1.0}]",,,2025-11-10
,1190,gdragon,VN,News & Entertainment,hashtag,https://www.tiktok.com/tag/gdragon,1,1,24483,112768016,False,False,,"[{2025-11-03T00:00:00.000Z, 0.2}, {2025-11-04T00:00:00.000Z, 0.2}, {2025-11-05T00:00:00.000Z, 0.21}, {2025-11-06T00:00:00.000Z, 0.23}, {2025-11-07T00:00:00.000Z, 0.37}, {2025-11-08T00:00:00.000Z, 0.5}, {2025-11-09T00:00:00.000Z, 1.0}]",,,2025-11-10
,7364,gd,VN,,hashtag,https://www.tiktok.com/tag/gd,2,0,9071,31205045,False,True,"[{https://p16-sign-sg.tiktokcdn.com/tos-alisg-avt-0068/c76d5d85b92eec9039712c67daaa67fd~tplv-tiktokx-cropcenter:100:100.png?dr=14579&refresh_token=d5de8382&x-expires=1762966800&x-signature=rXyEg7NGTOgP09%2BjtqQKB3Ddhoc%3D&t=4d5b0474&ps=13740610&shp=a5d48078&shcp=317596d8&idc=my, Dj Sugar /민, https://www.tiktok.com/@Dj Sugar /민}]","[{2025-11-03T00:00:00.000Z, 0.29}, {2025-11-04T00:00:00.000Z, 0.26}, {2025-11-05T00:00:00.000Z, 0.25}, {2025-11-06T00:00:00.000Z, 0.27}, {2025-11-07T00:00:00.000Z, 0.37}, {2025-11-08T00:00:00.000Z, 0.5}, {2025-11-09T00:00:00.000Z, 1.0}]",,,2025-11-10
,7026157776921427994,tiktokshop1111,VN,Apparel & Accessories,hashtag,https://www.tiktok.com/tag/tiktokshop1111,3,1,11219,177441932,False,False,,"[{2025-11-03T00:00:00.000Z, 0.11}, {2025-11-04T00:00:00.000Z, 0.22}, {2025-11-05T00:00:00.000Z, 0.42}, {2025-11-06T00:00:00.000Z, 0.63}, {2025-11-07T00:00:00.000Z, 0.7}, {2025-11-08T00:00:00.000Z, 0.89}, {2025-11-09T00:00:00.000Z, 1.0}]",,,2025-11-10



-- Quarantine sample (nếu có) --


hashtag_raw,country_code,industry,rank,view_count,video_count,quality_issues,dt
