
# The Futures Edit — Signals (Merged Master Notebook)
*Rebuild date:* **2025-09-16 22:09**  
This notebook merges your existing Signals pipeline with new connectors and a normalized output.
It’s **drop-in ready** for your web app (Chart.js) with consistent CSV/Parquet/JSONL exports.

### What’s included
- **Shared utils** (retry, parsing, hashing, save helpers)
- **Config** (YAML or inline) + **Secrets** (env vars)
- **Connectors**: LS:N Global sectors, Reddit, Google Trends, TikTok (optional, Playwright), RSS news feeds, Substack
- **(Optional) Legacy keywords scraper hook** — paste your old keywords scraping cell into the provided hook and it will auto-merge
- **Normalization & dedupe**
- **Light heuristics for summaries** (top sources, frequent tokens, simple trend deltas) for executive overview
- **Exports**: `./data/signals_all.*` (CSV, Parquet, JSONL) + derived summary tables and a Chart.js JSON payload

> You can paste this whole notebook over your existing one, or open alongside and migrate cells.


## Setup — Install dependencies (Colab)

In [None]:

%%bash
# Core dependencies, pinned to exact versions.
# NOTE(review): save_df() calls DataFrame.to_parquet, which needs a Parquet
# engine (pyarrow or fastparquet); neither is installed here — confirm the
# runtime already provides one.
pip -q install praw==7.7.1 feedparser==6.0.10 pytrends==4.9.2 beautifulsoup4==4.12.3 lxml==5.2.2 html5lib==1.1 pandas==2.2.2 numpy==2.1.1 requests==2.32.3 pyyaml==6.0.2 tqdm==4.66.5 tenacity==9.0.0 python-dateutil==2.9.0.post0 rapidfuzz==3.9.7
# Optional: TikTok via Playwright (best-effort). Comment if not needed.
pip -q install playwright==1.47.0
python -m playwright install chromium


## Imports & Utils

In [None]:

import os, re, json, yaml, time, random, hashlib
from pathlib import Path
from datetime import datetime, timedelta
from urllib.parse import urlparse
import pandas as pd
import numpy as np
import requests, feedparser
from bs4 import BeautifulSoup
from dateutil import parser as dateparser
from tqdm import tqdm
from tenacity import retry, stop_after_attempt, wait_exponential
from rapidfuzz import process, fuzz

# Optional TikTok
try:
    from playwright.sync_api import sync_playwright
    HAS_PLAYWRIGHT = True
except Exception:
    HAS_PLAYWRIGHT = False

# Every export (save_df and write_json) is written under this directory,
# resolved relative to the current working directory.
OUT_DIR = Path("./data")
# Create it up front; exist_ok makes re-running the cell harmless.
OUT_DIR.mkdir(exist_ok=True)

def now_iso():
    """Return the current UTC time as an ISO-8601 string with a ``Z`` suffix.

    Sub-second precision is dropped, e.g. ``"2025-09-16T22:09:00Z"``.
    """
    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
    # timestamp instead and strip tzinfo so isoformat() does not append
    # "+00:00" in front of the explicit "Z".
    from datetime import timezone

    current = datetime.now(timezone.utc).replace(microsecond=0, tzinfo=None)
    return current.isoformat() + "Z"

def to_iso(dt):
    """Convert a date-like value to an ISO-8601 UTC string ending in ``Z``.

    Args:
        dt: ``None``, a float NaN, ``pd.NaT``, a ``datetime`` (naive or
            aware), or any value whose ``str()`` dateutil can parse.

    Returns:
        A string such as ``"2025-09-16T10:00:00Z"``, or ``None`` when the
        value is missing or cannot be parsed. Naive datetimes are taken to be
        UTC already; aware datetimes are converted to UTC.
    """
    from datetime import timezone

    # pd.NaT is a datetime subclass and would otherwise be rendered as "NaTZ".
    if dt is None or dt is pd.NaT or (isinstance(dt, float) and np.isnan(dt)):
        return None
    if not isinstance(dt, datetime):
        try:
            dt = dateparser.parse(str(dt))
        except Exception:
            return None
    if dt.tzinfo is not None:
        # An aware value would render as "...+00:00Z" / "...+02:00Z", which is
        # not valid ISO-8601 and mislabels non-UTC offsets as UTC. Feed dates
        # such as "Tue, 16 Sep 2025 10:00:00 +0000" always hit this path.
        dt = dt.astimezone(timezone.utc).replace(tzinfo=None)
    return dt.replace(microsecond=0).isoformat() + "Z"

def hash_id(*parts):
    """Derive a short deterministic identifier from the given parts.

    ``None`` parts are skipped; the remaining ones are stringified, joined
    with ``"::"`` and hashed with SHA-256. The first 16 hex characters of the
    digest are returned.
    """
    joined = "::".join(str(part) for part in parts if part is not None)
    digest = hashlib.sha256(joined.encode("utf-8"))
    return digest.hexdigest()[:16]

def normalize_record(source_type, source_name, title, url, published_at=None, summary=None, tags=None, extra=None):
    """Build one row in the common signals schema shared by every collector.

    The ``id`` is derived from source type, source name, URL and title via
    ``hash_id``; ``published_at`` goes through ``to_iso``; ``ingested_at`` is
    stamped with ``now_iso``. A falsy title becomes ``""``, a falsy summary
    becomes ``None``, and missing ``tags`` / ``extra`` become a fresh empty
    list / dict.
    """
    clean_title = (title or "").strip()
    clean_summary = summary.strip() if summary else None
    record_id = hash_id(source_type, source_name, url, title)
    published_iso = to_iso(published_at)

    # Key order is kept as-is because it determines the DataFrame column order.
    record = {
        "id": record_id,
        "source_type": source_type,
        "source_name": source_name,
        "title": clean_title,
        "url": url,
        "published_at": published_iso,
        "summary": clean_summary,
        "tags": tags if tags else [],
        "extra": extra if extra else {},
        "ingested_at": now_iso(),
    }
    return record

def _parquet_safe(df):
    """Return a copy of *df* whose dict-valued cells are JSON-encoded strings."""
    safe = df.copy()
    for col in safe.columns:
        if safe[col].dtype != object:
            continue
        if safe[col].map(lambda v: isinstance(v, dict)).any():
            safe[col] = safe[col].map(
                lambda v: json.dumps(v, ensure_ascii=False) if isinstance(v, dict) else v
            )
    return safe


def save_df(df, stem):
    """Write *df* to ``OUT_DIR`` as CSV, Parquet and JSONL.

    Args:
        df: DataFrame to export.
        stem: Base file name; any character outside ``[a-zA-Z0-9_-]`` is
            replaced with ``_``.

    Returns:
        Tuple ``(csv_path, parquet_path, jsonl_path)``.
    """
    stem = re.sub(r"[^a-zA-Z0-9_\-]", "_", stem)
    csv_path = OUT_DIR / f"{stem}.csv"
    parquet_path = OUT_DIR / f"{stem}.parquet"
    jsonl_path = OUT_DIR / f"{stem}.jsonl"
    df.to_csv(csv_path, index=False)
    # Records carry an "extra" dict that is usually empty. Arrow infers a
    # struct with no fields for such a column and refuses to write it to
    # Parquet, so dict cells are stored as JSON text in the Parquet copy only.
    _parquet_safe(df).to_parquet(parquet_path, index=False)
    with open(jsonl_path, "w", encoding="utf-8") as f:
        # to_dict(orient="records") keeps per-column types; iterating with
        # iterrows() would upcast each row to one common dtype first.
        f.writelines(
            json.dumps(record, ensure_ascii=False) + "\n"
            for record in df.to_dict(orient="records")
        )
    print(f"Saved:\n- {csv_path}\n- {parquet_path}\n- {jsonl_path}")
    return csv_path, parquet_path, jsonl_path


## Secrets — this cell only fills in defaults; set the real env vars before running the collectors

In [None]:

# Reddit
# Each line re-exports the variable's current value, or the fallback shown
# when it is unset, so later os.environ lookups always find the key.
# reddit_client() raises when the id or secret is empty.
os.environ["REDDIT_CLIENT_ID"] = os.getenv("REDDIT_CLIENT_ID", "")
os.environ["REDDIT_CLIENT_SECRET"] = os.getenv("REDDIT_CLIENT_SECRET", "")
os.environ["REDDIT_USER_AGENT"] = os.getenv("REDDIT_USER_AGENT", "signals/1.0 by yourname")

# LS:N Global (if you have access; scraper works anonymously for headlines but may be partial)
# NOTE(review): nothing in the visible notebook reads these two variables —
# collect_lsng() fetches pages without logging in. Confirm whether a login
# step is still planned.
os.environ["LSNG_EMAIL"] = os.getenv("LSNG_EMAIL", "")
os.environ["LSNG_PASSWORD"] = os.getenv("LSNG_PASSWORD", "")


## CONFIG — edit here or upload `sources.config.yaml`

In [None]:

import yaml
from pathlib import Path

# Built-in source configuration, one section per collector. It is used (and
# written to sources.config.yaml) when no config file exists yet.
DEFAULT_CONFIG = {
  # LS:N Global: each sector URL is fetched for pages 1..max_pages
  # (page 1 is the bare URL, later pages append ?page=N).
  "lsng": {
    "sectors": [
      {"name": "Beauty", "url": "https://www.lsnglobal.com/sectors/beauty"},
      {"name": "Fashion", "url": "https://www.lsnglobal.com/sectors/fashion"},
      {"name": "Health & Wellness", "url": "https://www.lsnglobal.com/sectors/health-wellness"},
    ],
    "max_pages": 2
  },
  # Reddit: the same query is searched in every subreddit, sorted by "new";
  # limit applies per subreddit.
  "reddit": {
    "subreddits": ["femalefashionadvice","SkincareAddiction","AsianBeauty","MakeupAddiction","malefashion","fashion","fragrance","streetwear","trendanalysis","DataIsBeautiful"],
    "query": "signals OR trend OR 'future of' OR beauty OR fashion OR wellness",
    "time_filter": "week",
    "limit": 100
  },
  # Google Trends: geo and keywords go to build_payload, tz goes to TrendReq.
  # NOTE(review): pytrends documents a limit of five terms per payload and this
  # list has six — confirm the request is accepted.
  # NOTE(review): pytrends describes tz as an offset in minutes — confirm the
  # sign convention for the intended time zone.
  "gtrends": {
    "geo": "ES",
    "keywords": ["beauty trends","fashion trends","wellness trends","dopamine dressing","skin cycling","quiet luxury"],
    "tz": 120,
    "related_queries": True
  },
  # TikTok: each hashtag maps to https://www.tiktok.com/tag/<hashtag>.
  "tiktok": {
    "hashtags": ["beauty","fashion","wellness","trendtok","skincare","outfitideas"],
    "max_per_hashtag": 20
  },
  # News feeds parsed with feedparser; max_per_feed caps entries per feed.
  "rss": {
    "feeds": [
      "https://www.voguebusiness.com/feed/rss",
      "https://www.businessoffashion.com/feed",
      "https://www.glossy.co/feed/",
      "https://www.beautyindependent.com/feed/",
      "https://www.highsnobiety.com/rss",
      "https://www.thecut.com/fashion/rss/index.xml"
    ],
    "max_per_feed": 50
  },
  # Substack newsletters, read through their RSS feeds the same way.
  "substack": {
    "feeds": [
      "https://linernotes.substack.com/feed",
      "https://piratewires.substack.com/feed",
      "https://readmaximums.substack.com/feed"
    ],
    "max_per_feed": 50
  }
}

# Load sources.config.yaml when present; otherwise fall back to DEFAULT_CONFIG
# and write it out so it can be edited for the next run.
CONFIG_PATH = Path("sources.config.yaml")
if CONFIG_PATH.exists():
    # Explicit UTF-8: the file is written with allow_unicode=True below, so it
    # may hold non-ASCII text that the platform default codec cannot decode.
    with open(CONFIG_PATH, "r", encoding="utf-8") as f:
        cfg = yaml.safe_load(f)
    if isinstance(cfg, dict):
        print("Loaded config from sources.config.yaml")
    else:
        # An empty file parses to None (and a bare scalar/list is not a
        # mapping); every collector indexes cfg["<section>"], so use defaults
        # instead of failing later with a TypeError.
        print("sources.config.yaml is empty or not a mapping; using default config")
        cfg = DEFAULT_CONFIG
else:
    cfg = DEFAULT_CONFIG
    with open(CONFIG_PATH, "w", encoding="utf-8") as f:
        yaml.safe_dump(cfg, f, sort_keys=False, allow_unicode=True)
    print("Wrote default config → sources.config.yaml")

cfg


## Collect — LS:N Global (Beauty, Fashion, Health & Wellness)

In [None]:

import requests

# reraise=True surfaces the last real error (e.g. the HTTPError) to callers
# instead of tenacity's RetryError wrapper, so logged messages stay readable.
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=1, max=6), reraise=True)
def get(url, session=None, **kwargs):
    """GET *url* with a browser-like User-Agent, retrying up to three times.

    Args:
        url: Address to fetch.
        session: Optional ``requests.Session`` to reuse; without one a
            one-off request is made.
        **kwargs: Forwarded to ``requests``. ``headers`` is copied before the
            default User-Agent is added; ``timeout`` defaults to 30 seconds.

    Returns:
        The ``requests.Response``.

    Raises:
        requests.RequestException: When the last attempt fails or returns a
            4xx/5xx status.
    """
    # Copy so the caller's dict is not mutated by setdefault().
    headers = dict(kwargs.pop("headers", None) or {})
    headers.setdefault("User-Agent", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome Safari")
    # Popping timeout lets callers override it; passing it through **kwargs
    # next to a hard-coded timeout= would raise TypeError.
    timeout = kwargs.pop("timeout", 30)
    if session is not None:
        r = session.get(url, headers=headers, timeout=timeout, **kwargs)
    else:
        # requests.get manages (and closes) its own short-lived session,
        # unlike a Session() created here and never closed.
        r = requests.get(url, headers=headers, timeout=timeout, **kwargs)
    r.raise_for_status()
    return r

def parse_lsng_sector_page(html, sector_name, base_url):
    """Extract headline-like links from one LS:N sector listing page.

    Every anchor with an href and at least three words of text is kept.
    Relative links are resolved against *base_url*. The nearest enclosing
    ``article``/``div``/``li`` (or the anchor itself) is searched for a date
    string and for a first ``<p>``, which becomes the summary (max 500 chars).

    Returns:
        A list of normalized record dicts with source type ``"lsng"``.
    """
    date_pattern = re.compile(
        r"(\b\d{1,2}\s+[A-Za-z]{3,}\s+\d{4}\b|\b[A-Za-z]{3,}\s+\d{1,2},\s+\d{4}\b|\b\d{4}-\d{2}-\d{2}\b)"
    )
    records = []
    for anchor in BeautifulSoup(html, "lxml").select("a"):
        link = anchor.get("href")
        text = (anchor.get_text() or "").strip()
        # Skip anchors without a target or without headline-like text.
        if not link or not text or len(text.split()) < 3:
            continue
        if not link.startswith("http"):
            link = requests.compat.urljoin(base_url, link)

        container = anchor.find_parent(["article","div","li"]) or anchor
        flattened = " ".join(container.get_text(" ").split())
        found = date_pattern.search(flattened)
        date_txt = found.group(1) if found else None

        first_paragraph = container.find("p")
        summary = None
        if first_paragraph is not None:
            summary = first_paragraph.get_text(" ").strip()[:500]

        records.append(normalize_record("lsng", sector_name, text, link, date_txt, summary))
    return records

def collect_lsng(cfg):
    """Scrape the configured LS:N Global sector pages into a DataFrame.

    Reads ``cfg["lsng"]["sectors"]`` (dicts with ``name`` and ``url``) and
    ``max_pages`` (default 1). Page 1 is the sector URL itself; later pages
    append ``?page=N``. A failing page is logged and skipped so the remaining
    pages and sectors are still collected.

    Returns:
        DataFrame of normalized records (empty when nothing was collected).
    """
    sectors = cfg["lsng"]["sectors"]
    max_pages = cfg["lsng"].get("max_pages", 1)
    all_items = []
    # The context manager closes the session's connection pool on exit; the
    # bare Session() used before was never closed.
    with requests.Session() as session:
        for s in sectors:
            base = s["url"]
            for page in range(1, max_pages + 1):
                url = base if page == 1 else f"{base}?page={page}"
                try:
                    r = get(url, session=session)
                    all_items.extend(parse_lsng_sector_page(r.text, s["name"], base))
                    # Small random pause between successful requests.
                    time.sleep(random.uniform(0.5, 1.2))
                except Exception as e:
                    print(f"[LSNG] Error {s['name']} page {page}: {e}")
    return pd.DataFrame(all_items)

# Run the LS:N collector (per-page errors are handled inside collect_lsng)
# and preview the first rows.
df_lsng = collect_lsng(cfg)
print("LS:N rows:", len(df_lsng))
df_lsng.head(5)


## Collect — Reddit (PRAW search across configured subreddits)

In [None]:

import praw
from datetime import datetime as dt

def reddit_client():
    """Create a ``praw.Reddit`` client from environment variables.

    Reads ``REDDIT_CLIENT_ID``, ``REDDIT_CLIENT_SECRET`` and
    ``REDDIT_USER_AGENT`` (default ``"signals/1.0"``).

    Raises:
        RuntimeError: If the client id or secret is missing or empty.
    """
    env = os.environ
    client_id = env.get("REDDIT_CLIENT_ID")
    client_secret = env.get("REDDIT_CLIENT_SECRET")
    user_agent = env.get("REDDIT_USER_AGENT", "signals/1.0")
    if not (client_id and client_secret):
        raise RuntimeError("Set REDDIT_CLIENT_ID and REDDIT_CLIENT_SECRET in environment")
    return praw.Reddit(client_id=client_id, client_secret=client_secret, user_agent=user_agent)

def collect_reddit(cfg):
    """Search every configured subreddit and return the posts as a DataFrame.

    Reads ``cfg["reddit"]``: ``subreddits``, ``query``, ``time_filter``
    (default ``"week"``) and ``limit`` (default 100, applied per subreddit).
    Results are requested sorted by ``"new"``; self-text is truncated to 800
    characters for the summary.

    Raises:
        RuntimeError: From ``reddit_client()`` when credentials are missing.
    """
    from datetime import timezone

    rc = reddit_client()
    settings = cfg["reddit"]
    subs = settings["subreddits"]
    q = settings["query"]
    tfilter = settings.get("time_filter", "week")
    limit = settings.get("limit", 100)
    rows = []
    for sub in subs:
        # Isolate failures per subreddit (private, banned, rate-limited) so
        # one bad subreddit does not discard everything already collected.
        try:
            for post in rc.subreddit(sub).search(q, time_filter=tfilter, limit=limit, sort="new"):
                # utcfromtimestamp() is deprecated since Python 3.12; this is
                # the documented replacement, kept naive to match the rest of
                # the pipeline.
                created = dt.fromtimestamp(post.created_utc, tz=timezone.utc).replace(tzinfo=None)
                rows.append(normalize_record(
                    "reddit", f"r/{sub}", post.title,
                    url=f"https://www.reddit.com{post.permalink}",
                    published_at=created,
                    summary=(post.selftext or "")[:800],
                    tags=[sub]
                ))
        except Exception as e:
            print(f"[Reddit] Error r/{sub}: {e}")
    return pd.DataFrame(rows)

# Best-effort: any failure (including missing credentials, which make
# reddit_client() raise) leaves an empty frame so the merge step still runs.
try:
    df_reddit = collect_reddit(cfg)
    print("Reddit rows:", len(df_reddit))
    # NOTE(review): a bare expression inside try is not rendered as cell
    # output; wrap it in display() if a preview is wanted.
    df_reddit.head(5)
except Exception as e:
    print("Reddit skipped/error:", e)
    df_reddit = pd.DataFrame([])


## Collect — Google Trends (interest over time + related queries)

In [None]:

from pytrends.request import TrendReq

def collect_gtrends(cfg):
    """Collect Google Trends interest-over-time and related queries.

    Reads ``cfg["gtrends"]``: ``keywords``, ``geo`` (default ``""``), ``tz``
    (default 0), ``timeframe`` (default ``"now 7-d"``) and ``related_queries``
    (default True).

    Keywords are sent in batches of at most five, the per-payload limit
    documented by pytrends. Interest scores are scaled within each request,
    so values from different batches are not directly comparable.

    Returns:
        DataFrame of normalized records: one per keyword and timestamp for
        interest over time, plus up to 25 "top" related queries per keyword.
    """
    from datetime import timezone
    from urllib.parse import quote

    settings = cfg["gtrends"]
    geo = settings.get("geo", "")
    tz = settings.get("tz", 0)
    kws = settings["keywords"]
    timeframe = settings.get("timeframe", "now 7-d")
    want_related = settings.get("related_queries", True)
    pytrends = TrendReq(hl='en-US', tz=tz)
    # One fetch timestamp for all related-query rows (they carry no date).
    fetched_at = datetime.now(timezone.utc).replace(tzinfo=None)
    rows = []

    def explore_url(keyword):
        # Keywords contain spaces, so they must be percent-encoded.
        return f"https://trends.google.com/trends/explore?geo={geo}&q={quote(keyword)}"

    batch_size = 5
    for start in range(0, len(kws), batch_size):
        batch = kws[start:start + batch_size]
        pytrends.build_payload(batch, timeframe=timeframe, geo=geo)

        iot = pytrends.interest_over_time()
        if not iot.empty:
            iot = iot.reset_index().rename(columns={"date": "published_at"})
            # NOTE(review): all timestamps of one keyword share the same title
            # and URL, hence the same record id — confirm downstream dedupe is
            # meant to keep a single point per keyword.
            for _, r in iot.iterrows():
                for kw in batch:
                    rows.append(normalize_record(
                        "gtrends", "interest_over_time",
                        title=f"{kw} — interest",
                        url=explore_url(kw),
                        published_at=r["published_at"],
                        summary=f"Interest score: {r.get(kw, None)}",
                        tags=[kw, geo]
                    ))

        if not want_related:
            continue
        for kw, data in pytrends.related_queries().items():
            if not data or data.get("top") is None:
                continue
            for _, row in data["top"].head(25).iterrows():
                q = row["query"]
                val = row.get("value", None)
                rows.append(normalize_record(
                    "gtrends", "related_queries",
                    title=f"Related: {q}",
                    url=explore_url(kw),
                    published_at=fetched_at,
                    summary=f"Parent keyword: {kw}; Score: {val}",
                    tags=[kw, "related"]
                ))
    return pd.DataFrame(rows)

# Best-effort: any pytrends failure leaves an empty frame so the merge step
# still runs.
try:
    df_gtrends = collect_gtrends(cfg)
    print("Google Trends rows:", len(df_gtrends))
    # NOTE(review): a bare expression inside try is not rendered as cell
    # output; wrap it in display() if a preview is wanted.
    df_gtrends.head(5)
except Exception as e:
    print("Google Trends skipped/error:", e)
    df_gtrends = pd.DataFrame([])


## Collect — TikTok (optional, Playwright; best-effort)

In [None]:

def collect_tiktok(cfg):
    """Best-effort scrape of TikTok hashtag pages through Playwright.

    For every tag in ``cfg["tiktok"]["hashtags"]`` the tag page is opened in
    headless Chromium and anchors whose href contains ``/video/`` and whose
    text is non-empty are recorded, up to ``max_per_hashtag`` (default 20).
    The anchor text (first 160 chars) becomes the title and the fetch time is
    used as ``published_at``. A failing tag is logged and skipped.

    Returns:
        DataFrame of normalized records; empty when Playwright is unavailable.
    """
    if not HAS_PLAYWRIGHT:
        print("Playwright not available; skipping TikTok.")
        return pd.DataFrame([])
    from datetime import datetime as dt, timezone

    settings = cfg["tiktok"]
    cap = settings.get("max_per_hashtag", 20)
    rows = []
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        try:
            context = browser.new_context()
            page = context.new_page()
            for tag in settings["hashtags"]:
                url = f"https://www.tiktok.com/tag/{tag}"
                try:
                    page.goto(url, timeout=60000)
                    # Fixed wait so client-side rendering can add the links.
                    page.wait_for_timeout(5000)
                    count = 0
                    for a in page.query_selector_all("a"):
                        # Check the cap before appending so a cap of 0 really
                        # collects nothing.
                        if count >= cap:
                            break
                        href = a.get_attribute("href") or ""
                        txt = (a.inner_text() or "").strip()
                        if "/video/" in href and txt:
                            # utcnow() is deprecated since Python 3.12.
                            fetched = dt.now(timezone.utc).replace(tzinfo=None)
                            rows.append(normalize_record(
                                "tiktok", f"#{tag}", txt[:160], href,
                                published_at=fetched, tags=[tag]
                            ))
                            count += 1
                    time.sleep(random.uniform(1.0, 2.0))
                except Exception as e:
                    print(f"TikTok error for #{tag}: {e}")
        finally:
            # Always release Chromium, even when context/page setup fails;
            # closing the browser also closes its contexts and pages.
            browser.close()
    return pd.DataFrame(rows)

# Best-effort: any Playwright failure leaves an empty frame so the merge step
# still runs.
try:
    df_tiktok = collect_tiktok(cfg)
    print("TikTok rows:", len(df_tiktok))
    # NOTE(review): a bare expression inside try is not rendered as cell
    # output; wrap it in display() if a preview is wanted.
    df_tiktok.head(5)
except Exception as e:
    print("TikTok skipped/error:", e)
    df_tiktok = pd.DataFrame([])


## Collect — News feeds (RSS)

In [None]:

from urllib.parse import urlparse

def collect_rss(cfg):
    """Read the configured RSS/Atom news feeds into a DataFrame.

    Reads ``cfg["rss"]["feeds"]`` and ``max_per_feed`` (default 50). The feed
    host name is used as the source name, HTML is stripped from summaries, and
    the date comes from ``published`` or, failing that, ``updated``.

    Returns:
        DataFrame of normalized records with source type ``"rss"``.
    """
    settings = cfg["rss"]
    # Clamp at zero: slicing with a negative cap would silently drop entries
    # from the end, and the old counter loop let one entry through at cap 0.
    cap = max(settings.get("max_per_feed", 50), 0)
    rows = []
    for url in settings["feeds"]:
        fp = feedparser.parse(url)
        if fp.get("bozo") and not fp.entries:
            # feedparser reports fetch/parse problems through bozo instead of
            # raising; log them so a dead feed is visible.
            print(f"[RSS] No entries from {url}: {fp.get('bozo_exception')}")
            continue
        name = urlparse(url).netloc
        for entry in fp.entries[:cap]:
            title = entry.get("title", "")
            link = entry.get("link", "")
            summary = BeautifulSoup(entry.get("summary", "") or "", "lxml").get_text(" ")
            published = entry.get("published") or entry.get("updated")
            rows.append(normalize_record("rss", name, title, link, published, summary))
    return pd.DataFrame(rows)

# Run the RSS collector and preview the first rows.
df_rss = collect_rss(cfg)
print("RSS rows:", len(df_rss))
df_rss.head(5)


## Collect — Substack Newsletters (via RSS)

In [None]:

def collect_substack(cfg):
    """Read the configured Substack newsletter feeds into a DataFrame.

    Reads ``cfg["substack"]["feeds"]`` and ``max_per_feed`` (default 50). The
    feed host name is used as the source name, HTML is stripped from
    summaries, and the date comes from ``published`` or, failing that,
    ``updated``.

    Returns:
        DataFrame of normalized records with source type ``"substack"``.
    """
    settings = cfg["substack"]
    # Clamp at zero: slicing with a negative cap would silently drop entries
    # from the end, and the old counter loop let one entry through at cap 0.
    cap = max(settings.get("max_per_feed", 50), 0)
    rows = []
    for url in settings["feeds"]:
        fp = feedparser.parse(url)
        if fp.get("bozo") and not fp.entries:
            # feedparser reports fetch/parse problems through bozo instead of
            # raising; log them so a dead feed is visible.
            print(f"[Substack] No entries from {url}: {fp.get('bozo_exception')}")
            continue
        name = urlparse(url).netloc
        for entry in fp.entries[:cap]:
            title = entry.get("title", "")
            link = entry.get("link", "")
            summary = BeautifulSoup(entry.get("summary", "") or "", "lxml").get_text(" ")
            published = entry.get("published") or entry.get("updated")
            rows.append(normalize_record("substack", name, title, link, published, summary))
    return pd.DataFrame(rows)

# Run the Substack collector and preview the first rows.
df_substack = collect_substack(cfg)
print("Substack rows:", len(df_substack))
df_substack.head(5)



## (Optional) Legacy — paste your existing **Keywords/Signals scraper** here
If you had a previous dataframe, assign it to `df_keywords`; the cell below detects it and builds `df_legacy` from it.  
Columns don't need to match; we'll normalize minimally via `legacy_to_records()`.


In [None]:

# EXAMPLE PLACEHOLDER — replace with your existing logic
# df_keywords = ...  # <- your old dataframe

# Probe for a user-defined df_keywords: referencing an undefined name raises
# NameError, which is how "no legacy data" is detected.
try:
    df_keywords  # noqa: F821
    HAS_LEGACY = True
except NameError:
    HAS_LEGACY = False

def legacy_to_records(df):
    """Map a legacy keywords dataframe onto the normalized record schema.

    Per row, the first usable value is taken from these column groups:
    title ← ``title``/``keyword``/``name`` (default ``"Untitled"``),
    url ← ``url``/``link`` (default ``""``),
    published ← ``published``/``date``/``timestamp``,
    summary ← ``summary``/``desc`` (default ``""``).

    Returns:
        DataFrame of normalized records with source type ``"legacy"``.
    """
    def first_present(row, *keys):
        # NaN/NaT cells are truthy, so a plain `a or b` chain would stop on
        # them and later crash on .strip(); skip them like other empty values.
        for key in keys:
            value = row.get(key)
            if value is None:
                continue
            if pd.api.types.is_scalar(value) and (pd.isna(value) or not value):
                continue
            return value
        return None

    def as_text(value, default):
        # Titles/URLs/summaries are stripped downstream, so they must be str.
        if value is None:
            return default
        return value if isinstance(value, str) else str(value)

    rows = [
        normalize_record(
            "legacy",
            "keywords",
            as_text(first_present(r, "title", "keyword", "name"), "Untitled"),
            as_text(first_present(r, "url", "link"), ""),
            first_present(r, "published", "date", "timestamp"),
            as_text(first_present(r, "summary", "desc"), ""),
        )
        for _, r in df.iterrows()
    ]
    return pd.DataFrame(rows)

# df_legacy is always defined after this cell: converted legacy rows when
# df_keywords exists, otherwise an empty frame that the merge step filters out.
if HAS_LEGACY:
    df_legacy = legacy_to_records(df_keywords)
    print("Legacy rows:", len(df_legacy))
    display(df_legacy.head(5))
else:
    df_legacy = pd.DataFrame([])
    print("No legacy dataframe detected; skipping.")


## Normalize, Merge & Dedupe

In [None]:

# Keep only collectors that produced rows; empty frames carry no columns and
# would add nothing to the concat.
frames = [df for df in [df_lsng, df_reddit, df_gtrends, df_tiktok, df_rss, df_substack, df_legacy] if isinstance(df, pd.DataFrame) and not df.empty]
if frames:
    df_all = pd.concat(frames, ignore_index=True)
else:
    # No data at all: still provide the full schema so the exports below work.
    df_all = pd.DataFrame([], columns=["id","source_type","source_name","title","url","published_at","summary","tags","extra","ingested_at"])

# Basic clean
# Missing text fields become "" so sorting and de-duplication compare strings.
if not df_all.empty:
    df_all["title"] = df_all["title"].fillna("")
    df_all["url"] = df_all["url"].fillna("")
    df_all["published_at"] = df_all["published_at"].fillna("")
    df_all["summary"] = df_all["summary"].fillna("")

# Dedupe by (url, title) to reduce noise
# Sorting newest-first beforehand makes keep="first" retain the most recent copy.
# NOTE(review): Google Trends interest_over_time rows share one (url, title)
# per keyword, so only the newest data point per keyword survives — confirm
# that this is intended.
if not df_all.empty:
    df_all = df_all.sort_values(by=["published_at","ingested_at"], ascending=False)
    df_all = df_all.drop_duplicates(subset=["url","title"], keep="first")

print("Total normalized rows:", len(df_all))
display(df_all.head(20))
save_df(df_all, "signals_all")


## Executive Summary (light heuristics; no external AI)

In [None]:

def safe_dt(s):
    """Parse *s* with dateutil; return ``None`` if parsing fails for any reason."""
    try:
        parsed = dateparser.parse(s)
    except Exception:
        parsed = None
    return parsed

def summarise(df):
    """Compute lightweight summary statistics over the normalized signals.

    Compares the last 7 days with the 7 days before that, based on
    ``published_at``. Rows without a parseable date fall in neither window.

    Args:
        df: Normalized frame with at least ``id``, ``source_type``,
            ``source_name``, ``title`` and ``published_at``. It is not
            modified.

    Returns:
        ``{"note": ...}`` for an empty frame, otherwise a dict with
        ``items_last_7d``, ``items_prev_7d``, ``delta_items_pct`` (``None``
        when the previous window is empty), ``top_sources`` (DataFrame, top
        10 by item count) and, when any token qualifies, ``top_tokens``
        (DataFrame, top 30 title tokens).
    """
    if df.empty:
        return {"note":"No data to summarise."}
    res = {}

    # Parse to tz-aware UTC and compare against an aware cutoff. The stored
    # strings end in "Z", so they parse as aware values; comparing those with
    # a naive utcnow() raises TypeError. Working on a separate Series also
    # avoids adding a "dt" column to the caller's frame.
    published = pd.to_datetime(df["published_at"], errors="coerce", utc=True)
    week = pd.Timedelta(days=7)
    cutoff = pd.Timestamp.now(tz="UTC") - week
    cur = df[published >= cutoff]
    prev = df[(published < cutoff) & (published >= cutoff - week)]

    res["items_last_7d"] = len(cur)
    res["items_prev_7d"] = len(prev)
    res["delta_items_pct"] = None
    if len(prev) > 0:
        res["delta_items_pct"] = round((len(cur) - len(prev)) / len(prev) * 100, 1)

    # Top sources last 7 days
    top_sources = (cur.groupby(["source_type","source_name"])["id"]
                     .count().sort_values(ascending=False).head(10).reset_index())
    res["top_sources"] = top_sources

    # Frequent tokens in titles: lower-cased alphabetic words longer than
    # three characters that are not stop words.
    stop = set("the of a an and or for to in on with from by into about your our how what why".split())
    toks = [
        w
        for t in cur["title"].astype(str)
        for w in re.findall(r"[A-Za-z][A-Za-z\-]+", t.lower())
        if w not in stop and len(w) > 3
    ]
    if toks:
        s = pd.Series(toks).value_counts().head(30).reset_index()
        s.columns = ["token","count"]
        res["top_tokens"] = s

    return res

# Summarise the merged frame; the bare name on the last line renders the
# result as the cell output.
summary = summarise(df_all)
summary


## Export Summary Tables

In [None]:

# summarise() returns {"note": ...} when there was no data; only a real
# summary has tables to export.
if isinstance(summary, dict) and "note" not in summary:
    # "top_tokens" is absent when no title token qualified, hence the defaults.
    top_sources = summary.get("top_sources", pd.DataFrame())
    top_tokens = summary.get("top_tokens", pd.DataFrame())
    # Each table is written only when it has rows.
    if isinstance(top_sources, pd.DataFrame) and not top_sources.empty:
        save_df(top_sources, "summary_top_sources_7d")
    if isinstance(top_tokens, pd.DataFrame) and not top_tokens.empty:
        save_df(top_tokens, "summary_top_tokens_7d")
else:
    print(summary.get("note","No summary available."))


## Chart.js Export — minimal payloads (JSON)

In [None]:

# Build minimal JSONs to fetch directly from your web app if you host them somewhere.
def write_json(obj, name):
    """Serialize *obj* to ``OUT_DIR/<name>.json`` and return the path.

    The file is UTF-8 encoded, indented by two spaces, and keeps non-ASCII
    characters unescaped.
    """
    target = OUT_DIR / f"{name}.json"
    with target.open("w", encoding="utf-8") as handle:
        json.dump(obj, handle, ensure_ascii=False, indent=2)
    print("Wrote", target)
    return target

# Example: counts by source_type for last 7 days
def counts_by_source_type(df):
    """Count rows per ``source_type`` published within the last 7 days.

    Args:
        df: Normalized frame with ``id``, ``source_type`` and ``published_at``
            columns. It is not modified.

    Returns:
        DataFrame with columns ``source_type`` and ``count``, sorted by count
        in descending order. Rows without a parseable date are ignored.
    """
    # utc=True yields a tz-aware column and the cutoff is aware as well: the
    # stored strings end in "Z", so a naive utcnow() cutoff would raise
    # TypeError on the comparison.
    published = pd.to_datetime(df["published_at"], errors="coerce", utc=True)
    cutoff = pd.Timestamp.now(tz="UTC") - pd.Timedelta(days=7)
    cur = df[published >= cutoff]
    return (cur.groupby("source_type")["id"].count().sort_values(ascending=False)
            .reset_index().rename(columns={"id":"count"}))

# Write the per-source-type counts as a list of {"source_type", "count"}
# records; skipped entirely when there is no data.
if not df_all.empty:
    chart_counts = counts_by_source_type(df_all).to_dict(orient="records")
    write_json(chart_counts, "chart_counts_by_source_type_7d")
else:
    print("No data for chart exports.")


## Output Manifest

In [None]:

# Static index of the files this notebook can produce, written to
# data/manifest.json.
# NOTE: the summary tables and the chart payload are only written when there
# is data for them, so some of these paths may not exist after a given run.
manifest = {
  "all_rows_csv": "data/signals_all.csv",
  "all_rows_parquet": "data/signals_all.parquet",
  "all_rows_jsonl": "data/signals_all.jsonl",
  "summary_top_sources_csv": "data/summary_top_sources_7d.csv",
  "summary_top_tokens_csv": "data/summary_top_tokens_7d.csv",
  "chart_counts_by_source_type_7d": "data/chart_counts_by_source_type_7d.json"
}
write_json(manifest, "manifest")
manifest
