In [1]:
# ▶ Build the whole package (change the destination below to suit your environment)
import os
import pathlib
import textwrap

BASE_DIR = r"C:\Users\Owner\youtube\yt_trendlab"  # <- destination directory

# Make sure the destination exists; an already-existing directory is not an error.
os.makedirs(BASE_DIR, exist_ok=True)

# Maps each module filename to the source text that will be written for it.
files = {}

# __init__.py (package exports)
# Embedded source for the package's __init__.py: it re-exports the public
# helpers of the five sibling modules (pipeline, thumbnail_features,
# trending_utils, text_features, modeling) and lists them in __all__.
files["__init__.py"] = r'''
# -*- coding: utf-8 -*-
from .pipeline import run_all
from .thumbnail_features import THUMBNAIL_COLS, extract_all_thumbnail_features_mediapipe
from .trending_utils import ensure_trending_snapshot_if_missing, add_trend_features
from .text_features import tokenize_japanese, build_vectorizer
from .modeling import train_rf, evaluate_rmse, feature_importance_df
__all__ = [
    "run_all",
    "THUMBNAIL_COLS", "extract_all_thumbnail_features_mediapipe",
    "ensure_trending_snapshot_if_missing", "add_trend_features",
    "tokenize_japanese", "build_vectorizer",
    "train_rf", "evaluate_rmse", "feature_importance_df",
]
'''

# thumbnail_features.py (thumbnail image features)
# Embedded source for thumbnail_features.py: downloads a thumbnail and derives
# brightness, RGB/HSV means, k-means colour shares, a bright-pixel ratio and a
# MediaPipe face count.
files["thumbnail_features.py"] = r'''
# -*- coding: utf-8 -*-
"""Per-thumbnail image features: brightness, colour statistics, bright-pixel ratio, face count."""
import logging
from io import BytesIO

import cv2
import mediapipe as mp
import numpy as np
import requests
from PIL import Image

logger = logging.getLogger(__name__)

# Feature names, in the same order as the list returned by the extractor.
THUMBNAIL_COLS = [
    "brightness","face_count","telop_ratio",
    "r_mean","g_mean","b_mean","h_mean","s_mean","v_mean"
] + [f"color_ratio_{i}" for i in range(5)]

def extract_all_thumbnail_features_mediapipe(url: str):
    """Download the image at *url* and return its features ordered as THUMBNAIL_COLS.

    Best-effort: any failure (network, HTTP status, image decoding, OpenCV,
    MediaPipe) is logged as a warning and a list of zeros with
    len(THUMBNAIL_COLS) entries is returned instead of raising.
    """
    try:
        resp = requests.get(url, timeout=10)
        # Fail on 4xx/5xx here so the log names the HTTP error rather than a decode error.
        resp.raise_for_status()
        img = Image.open(BytesIO(resp.content)).convert("RGB")
        arr = np.array(img)  # channel order is RGB because of convert("RGB")

        hsv = cv2.cvtColor(arr, cv2.COLOR_RGB2HSV)
        r_mean, g_mean, b_mean = arr[:,:,0].mean(), arr[:,:,1].mean(), arr[:,:,2].mean()
        h_mean, s_mean, v_mean = hsv[:,:,0].mean(), hsv[:,:,1].mean(), hsv[:,:,2].mean()
        # Brightness is the mean of the HSV value channel (same number as v_mean).
        brightness = v_mean

        # Share of pixels assigned to each of 5 k-means colour clusters.
        # NOTE(review): with random initial centers the cluster numbering is
        # arbitrary, so color_ratio_i is not tied to a particular colour across
        # images -- consider sorting the shares if that matters downstream.
        pixels = arr.reshape(-1,3).astype(np.float32)
        criteria = (cv2.TERM_CRITERIA_EPS+cv2.TERM_CRITERIA_MAX_ITER, 100, 0.2)
        _, labels, _ = cv2.kmeans(pixels, 5, None, criteria, 10, cv2.KMEANS_RANDOM_CENTERS)
        counts = np.bincount(labels.flatten(), minlength=5)
        color_ratios = (counts / counts.sum()).tolist()

        # telop_ratio: fraction of pixels whose grayscale value is above 200.
        gray = cv2.cvtColor(arr, cv2.COLOR_RGB2GRAY)
        _, thresh = cv2.threshold(gray, 200, 255, cv2.THRESH_BINARY)
        telop_ratio = float((thresh==255).sum()) / float(thresh.size)

        with mp.solutions.face_detection.FaceDetection(model_selection=1, min_detection_confidence=0.5) as fd:
            # MediaPipe expects RGB input; arr is already RGB, so pass it as-is
            # (converting to BGR here would feed the detector swapped channels).
            res = fd.process(arr)
            face_count = len(res.detections) if res.detections else 0

        return [brightness, face_count, telop_ratio,
                r_mean, g_mean, b_mean, h_mean, s_mean, v_mean] + color_ratios
    except Exception as exc:
        # Deliberate best-effort: report the failure and fall back to zeros.
        logger.warning("thumbnail feature extraction failed for %s: %s", url, exc)
        return [0]*len(THUMBNAIL_COLS)
'''

# trending_utils.py (trending videos)
# Embedded source for trending_utils.py.
# - At import time it resolves five helpers (fetch_trending,
#   build_trend_vocab_from_csvs, save_trend_vocab_json, load_trend_vocab_json,
#   title_trend_features) from the `yt_trend` package; if that import raises
#   ModuleNotFoundError it loads get_trending.py / trend_features.py from
#   ./code under the current working directory, and re-raises when those files
#   are missing too.
# - ensure_trending_snapshot_if_missing: when no file matches
#   trending_JP_*.csv in the working directory, reads the API key from the
#   environment (YT_API_KEY, then API_KEY, then YOUTUBE_API_KEY), fetches a
#   snapshot and saves it as trending_JP_<YYYYMMDD>.csv; without a key it only
#   prints a warning.
# - add_trend_features: returns an all-zero three-column frame when there are
#   no CSVs; otherwise uses the 14 most recent CSVs (by the 8-digit date in the
#   filename), loads the vocabulary from trend_vocab.json or builds and saves
#   it (top 500), and applies the title feature helper to every title.
# NOTE(review): the snapshot filename hard-codes "JP" even when `region` is
# something else -- confirm whether that is intended.
files["trending_utils.py"] = r'''
# -*- coding: utf-8 -*-
import os, re, glob, pathlib, importlib.util
import pandas as pd
from datetime import datetime

TREND_CSV_GLOB = "trending_JP_*.csv"
TREND_USE_LAST_N = 14
TREND_TOPK = 500
TREND_VOCAB_PATH = "trend_vocab.json"

# fetch_trending / trend_features のローダ
def _load_trend_modules():
    try:
        from yt_trend.get_trending import fetch_trending as _fetch
        from yt_trend.trend_features import (
            build_trend_vocab_from_csvs as _build_vocab,
            save_trend_vocab_json as _save_vocab,
            load_trend_vocab_json as _load_vocab,
            title_trend_features as _title_feats,
        )
        return _fetch, _build_vocab, _save_vocab, _load_vocab, _title_feats
    except ModuleNotFoundError:
        base = pathlib.Path.cwd() / "code"
        gt, tf = base / "get_trending.py", base / "trend_features.py"
        def _load(name, path):
            spec = importlib.util.spec_from_file_location(name, str(path))
            mod = importlib.util.module_from_spec(spec); spec.loader.exec_module(mod); return mod
        if gt.exists() and tf.exists():
            gt = _load("yt_trend_get_trending", gt)
            tf = _load("yt_trend_trend_features", tf)
            return gt.fetch_trending, tf.build_trend_vocab_from_csvs, tf.save_trend_vocab_json, tf.load_trend_vocab_json, tf.title_trend_features
        raise

_fetch, _build_vocab, _save_vocab, _load_vocab, _title_feats = _load_trend_modules()

def ensure_trending_snapshot_if_missing(api_key_env="YT_API_KEY", region="JP", max_results=200):
    csvs = glob.glob(TREND_CSV_GLOB)
    if len(csvs)>0: return
    api_key = os.getenv(api_key_env) or os.getenv("API_KEY") or os.getenv("YOUTUBE_API_KEY")
    if not api_key:
        print("⚠️ 急上昇CSVが無く、APIキー未設定のため自動取得をスキップ"); return
    print("ℹ️ 急上昇CSVが無いので、その場取得します…")
    df = _fetch(api_key, region_code=region, max_results=max_results)
    out = f"trending_JP_{datetime.now().strftime('%Y%m%d')}.csv"
    df.to_csv(out, index=False, encoding="utf-8-sig")
    print(f"✅ Saved: {out} (rows={len(df)})")

def add_trend_features(titles: pd.Series) -> pd.DataFrame:
    csvs = sorted(glob.glob(TREND_CSV_GLOB))
    if len(csvs)==0:
        return pd.DataFrame({"trend_overlap_count":[0]*len(titles),
                             "trend_overlap_ratio":[0.0]*len(titles),
                             "trend_cosine_sim":[0.0]*len(titles)})
    def key(p):
        m = re.search(r"(\d{8})", os.path.basename(p)); return m.group(1) if m else "00000000"
    csvs = sorted(csvs, key=key)[-TREND_USE_LAST_N:]
    if os.path.exists(TREND_VOCAB_PATH):
        hot = _load_vocab(TREND_VOCAB_PATH)
        _, trend_titles = _build_vocab(csvs, top_k=TREND_TOPK)
    else:
        hot, trend_titles = _build_vocab(csvs, top_k=TREND_TOPK)
        _save_vocab(hot, TREND_VOCAB_PATH)
    feats = titles.fillna("").apply(lambda t: _title_feats(t, hot, trend_titles_for_bow=trend_titles))
    return pd.DataFrame(list(feats.values))
'''

# text_features.py (TF-IDF over video titles)
# Embedded source for text_features.py.
# - A single janome Tokenizer is created when the module is imported.
# - tokenize_japanese: returns the base form of every token whose top-level
#   part of speech is 名詞 / 動詞 / 形容詞 (noun / verb / adjective).
# - build_vectorizer: returns a scikit-learn TfidfVectorizer that uses that
#   tokenizer (token_pattern=None) and is capped at `max_features` terms.
files["text_features.py"] = r'''
# -*- coding: utf-8 -*-
from janome.tokenizer import Tokenizer
from sklearn.feature_extraction.text import TfidfVectorizer

_tokenizer = Tokenizer()

def tokenize_japanese(text: str):
    return [t.base_form for t in _tokenizer.tokenize(text)
            if t.part_of_speech.split(',')[0] in ['名詞','動詞','形容詞']]

def build_vectorizer(max_features: int = 300):
    return TfidfVectorizer(tokenizer=tokenize_japanese, token_pattern=None, max_features=max_features)
'''

# modeling.py (training and evaluation)
# Embedded source for modeling.py: random-forest training on a log1p target,
# RMSE evaluation on both the log and raw scales, and a feature-importance
# ranking.
files["modeling.py"] = r'''
# -*- coding: utf-8 -*-
"""Random-forest training, RMSE evaluation and feature-importance helpers."""
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error

def train_rf(X_train, y_train, n_estimators=100, max_depth=10, random_state=42, n_jobs=-1):
    """Fit and return a RandomForestRegressor with the given hyper-parameters."""
    model = RandomForestRegressor(n_estimators=n_estimators, max_depth=max_depth, random_state=random_state, n_jobs=n_jobs)
    model.fit(X_train, y_train)
    return model

def evaluate_rmse(model, X_test, y_test):
    """Score a model whose predictions are on the log1p scale.

    Args:
        model: Fitted estimator; its predict() output is treated as log1p(value).
        X_test: Feature matrix passed to model.predict.
        y_test: True values on the raw (not log) scale.

    Returns:
        (rmse_log, rmse_raw, y_pred): RMSE between log1p(y_test) and the raw
        predictions, RMSE between y_test and the back-transformed predictions,
        and the back-transformed predictions as an int64 array.
    """
    log_pred = model.predict(X_test)
    # Use int64 explicitly: plain `int` is a 32-bit integer on Windows with
    # NumPy < 2, which overflows for values above 2**31 - 1.
    y_pred  = np.expm1(log_pred).astype(np.int64)
    rmse_log = mean_squared_error(np.log1p(y_test), log_pred) ** 0.5
    rmse_raw = mean_squared_error(y_test, y_pred) ** 0.5
    return rmse_log, rmse_raw, y_pred

def feature_importance_df(model, columns, top=30):
    """Return the `top` largest model.feature_importances_ as a Series indexed by `columns`."""
    imp = pd.Series(model.feature_importances_, index=columns)
    return imp.sort_values(ascending=False).head(top)
'''

# pipeline.py (end to end: load -> features -> train -> evaluate)
# Embedded source for pipeline.py: reads the Excel sheet, adds thumbnail, time,
# trend and TF-IDF features, splits by publication time, trains the random
# forest and returns the model together with its evaluation artefacts.
files["pipeline.py"] = r'''
# -*- coding: utf-8 -*-
"""End-to-end pipeline: load, build features, split by time, train and evaluate."""
import pandas as pd
import numpy as np
from isodate import parse_duration
from .thumbnail_features import THUMBNAIL_COLS, extract_all_thumbnail_features_mediapipe
from .trending_utils import ensure_trending_snapshot_if_missing, add_trend_features
from .text_features import build_vectorizer
from .modeling import train_rf, evaluate_rmse, feature_importance_df

def _ensure_thumbnail_features(df: pd.DataFrame) -> pd.DataFrame:
    """Return df with all THUMBNAIL_COLS present, extracting them from the 'thumbnail' URLs if needed.

    When every column already exists the frame is returned unchanged;
    otherwise the result has a fresh 0..n-1 index.

    Raises:
        ValueError: if features must be extracted but there is no 'thumbnail' column.
    """
    if set(THUMBNAIL_COLS).issubset(df.columns): return df
    if "thumbnail" not in df.columns:
        raise ValueError("⚠️ 'thumbnail' 列がありません")
    feats = [extract_all_thumbnail_features_mediapipe(u) for u in df["thumbnail"]]
    return pd.concat([df.reset_index(drop=True),
                      pd.DataFrame(feats, columns=THUMBNAIL_COLS)], axis=1)

def _time_features(df: pd.DataFrame) -> pd.DataFrame:
    """Add calendar features derived from 'publishedAt' to df (modified in place) and return it."""
    df["weekday"] = df["publishedAt"].dt.weekday
    df["hour"] = df["publishedAt"].dt.hour
    df["is_weekend"] = df["weekday"].isin([5,6]).astype(int)
    df["is_month_start"] = df["publishedAt"].dt.is_month_start.astype(int)
    df["is_month_end"] = df["publishedAt"].dt.is_month_end.astype(int)
    return df

def run_all(xlsx_path: str, cutoff="2025-07-01", tfidf_max_features=300):
    """Run the whole workflow on one Excel file.

    Args:
        xlsx_path: Path handed to pandas.read_excel. The code reads the columns
            title, categoryId, viewCount, publishedAt and duration, plus
            thumbnail unless every THUMBNAIL_COLS column is already present.
        cutoff: Rows published before this timestamp (parsed as UTC) form the
            training set; all others form the test set.
        tfidf_max_features: Upper bound on the number of TF-IDF title terms.

    Returns:
        (model, df_result, metrics, imp_top): the fitted model, a per-video
        table of actual vs. predicted view counts sorted newest first, a dict
        with "rmse_log" and "rmse_raw", and the 30 largest feature importances.

    Raises:
        ValueError: if thumbnail features are needed but the 'thumbnail' column
            is missing, or if the cutoff leaves the training or test split empty.
    """
    # 1) load & basic clean
    df = pd.read_excel(xlsx_path)
    df["title"] = df["title"].fillna("")
    df["categoryId"] = pd.to_numeric(df["categoryId"], errors="coerce").fillna(-1).astype(int)
    df["viewCount"] = pd.to_numeric(df["viewCount"], errors="coerce").fillna(0)
    df["publishedAt"] = pd.to_datetime(df["publishedAt"], utc=True)
    df["duration_seconds"] = df["duration"].apply(lambda x: parse_duration(x).total_seconds() if pd.notnull(x) else 0)
    # Keep only videos longer than 60 seconds (a missing duration counts as 0).
    df = df[df["duration_seconds"] > 60].copy()

    # 2) thumbnail
    df = _ensure_thumbnail_features(df)

    # 3) time
    df = _time_features(df)

    # 4) trending (fetch a snapshot here when no CSV exists yet)
    ensure_trending_snapshot_if_missing()
    trend_df = add_trend_features(df["title"])
    df = pd.concat([df.reset_index(drop=True), trend_df.reset_index(drop=True)], axis=1)

    # 5) time-based split, then TF-IDF fitted on the training titles only
    cutoff_ts = pd.to_datetime(cutoff, utc=True)
    df_train = df[df["publishedAt"] < cutoff_ts].copy()
    df_test  = df[df["publishedAt"] >= cutoff_ts].copy()
    # Fail early with the row counts instead of an opaque scikit-learn error.
    if df_train.empty or df_test.empty:
        raise ValueError(
            f"cutoff={cutoff!r} leaves {len(df_train)} training and {len(df_test)} test rows; "
            "both splits need at least one row"
        )

    vectorizer = build_vectorizer(max_features=tfidf_max_features)
    tfidf_train = vectorizer.fit_transform(df_train["title"])
    tfidf_test  = vectorizer.transform(df_test["title"])
    tfidf_cols  = [f"tfidf_{w}" for w in vectorizer.get_feature_names_out()]
    tfidf_df_tr = pd.DataFrame(tfidf_train.toarray(), columns=tfidf_cols, index=df_train.index)
    tfidf_df_te = pd.DataFrame(tfidf_test.toarray(),  columns=tfidf_cols, index=df_test.index)

    # 6) assemble features; fixed column order: base, thumbnail, TF-IDF, trend
    base_cols = ["categoryId","weekday","hour","is_weekend","is_month_start","is_month_end"]
    trend_cols = ["trend_overlap_count","trend_overlap_ratio","trend_cosine_sim"]
    X_train = pd.concat([df_train[base_cols], df_train[THUMBNAIL_COLS], tfidf_df_tr, df_train[trend_cols]],
                        axis=1).reset_index(drop=True)
    X_test  = pd.concat([df_test[base_cols],  df_test[THUMBNAIL_COLS],  tfidf_df_te, df_test[trend_cols]],
                        axis=1).reset_index(drop=True)

    # The model is trained on log1p(viewCount); the test target stays on the raw scale.
    y_train = np.log1p(df_train["viewCount"])
    y_test  = df_test["viewCount"]

    # 7) train & eval
    model = train_rf(X_train, y_train)
    rmse_log, rmse_raw, y_pred = evaluate_rmse(model, X_test, y_test)
    imp_top = feature_importance_df(model, X_train.columns, top=30)

    # 8) result table
    df_result = df_test[["title","publishedAt","viewCount"]].copy()
    df_result["predicted_viewCount"] = y_pred
    df_result["abs_error"] = (df_result["predicted_viewCount"] - df_result["viewCount"]).abs()
    df_result = df_result.sort_values("publishedAt", ascending=False).reset_index(drop=True)

    metrics = {"rmse_log": rmse_log, "rmse_raw": rmse_raw}
    return model, df_result, metrics, imp_top
'''

# Write every embedded module into BASE_DIR as UTF-8. Each source is dedented,
# stripped of surrounding whitespace and terminated with a single newline.
target_dir = pathlib.Path(BASE_DIR)
for name, content in files.items():
    source_text = textwrap.dedent(content).strip() + "\n"
    (target_dir / name).write_text(source_text, encoding="utf-8")

print("✅ Created package at:", BASE_DIR)



✅ Created package at: C:\Users\Owner\youtube\yt_trendlab
