In [1]:
import boto3
import pandas as pd
import os, io, json, gzip, re
import datetime as dt
from pathlib import Path
from dotenv import load_dotenv

from transformers import AutoModelForSequenceClassification
from transformers import TFAutoModelForSequenceClassification
from transformers import AutoTokenizer, AutoConfig
import torch
import numpy as np
from scipy.special import softmax

from langdetect import detect

import pyarrow as pa, pyarrow.parquet as pq


# --- config ---
# ROOT = Path(__file__).resolve().parents[1]
ROOT = Path("/home/ubuntu/deds2025b_proj/opt/reddit_pipeline")    # FOR NOTEBOOK ONLY
load_dotenv(ROOT / ".env")

BUCKET = os.environ["LAKE_BUCKET"]
s3 = boto3.client("s3")
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")


# --- tag list ---
KEYWORDS = [
    "olaparib","lynparza","parp","parpi","diagnosis","biopsy",
    "screening","mammogram","ultrasound","ct scan","brca test",
    "genetic testing","starting olaparib","maintenance","chemo",
    "bevacizumab","platinum","surgery","radiation","follow-up",
    "maintenance dose","long term","quality of life","support group",
    "ned","no evidence of disease","clear scan","partial response",
    "stable disease","tumor shrinkage","ca-125 down","responding",
    "anemia","fatigue","nausea","vomiting","diarrhea","constipation",
    "decreased appetite","headache","dizziness","rash","insomnia",
    "cough","asthenia","dyspnea","brca1","brca2","hrd",
    "homologous recombination deficiency","ovarian","breast","mbc",
    "tnbc","her2","hr+","philippines","ph","pinoy","filipino","manila",
    "cebu","davao","tagalog","qc","quezon city"
]


# --- helper functions ---
def list_keys(prefix: str):
    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=BUCKET, Prefix=prefix):
        for o in page.get("Contents", []):
            yield o["Key"]

def read_parquet_to_df(key: str) -> pd.DataFrame:
    buf = io.BytesIO()
    s3.download_fileobj(BUCKET, key, buf)
    buf.seek(0)
    table = pq.read_table(buf)
    return table.to_pandas()

def write_parquet(df: pd.DataFrame, key: str):
    table = pa.Table.from_pandas(df, preserve_index=False)
    out = io.BytesIO()
    pq.write_table(table, out, compression="snappy")
    out.seek(0)
    s3.upload_fileobj(out, BUCKET, key)
                      

# --- tagging logic ---
def compile_tag_patterns(keywords):
    """
    Build (keyword, regex) pairs that:
      - are case-insensitive
      - allow spaces OR hyphens between words
      - respect word boundaries at the ends
    """
    re_exp = r'[\s\-]+'
    patterns = []
    for kw in keywords:
        toks = [t for t in re.split(re_exp, kw.strip()) if t]
        if not toks:
            continue
        body = re_exp.join(map(re.escape, toks))  # escape special chars in each token
        pat = re.compile(rf'(?<!\w){body}(?!\w)', re.IGNORECASE)
        patterns.append((kw, pat))
    return patterns

_PATTERNS = compile_tag_patterns(KEYWORDS)

def find_tags_in_text(text: str):
    """Returns unique set of tags found in text"""
    if not text:
        return []
    return list({kw for kw, pat in _PATTERNS if pat.search(text)})


# --- scoring model ---
MODEL_NAME = "cardiffnlp/twitter-roberta-base-sentiment-latest"
_tokenizer = None
_model = None
_config = None

def load_model():
    global _tokenizer, _model, _config
    if _tokenizer is None or _model is None or _config is None:
        _tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
        _config = AutoConfig.from_pretrained(MODEL_NAME)
        _model = AutoModelForSequenceClassification.from_pretrained(MODEL_NAME).to(DEVICE)
        _model.eval()
    return _tokenizer, _model, _config

@torch.no_grad()
def score_texts(texts, batch_size=32, max_length=512):
    """
    Batched sentiment scoring.
    Returns a DataFrame with columns: negative_s, neutral_s, positive_s, sentiment_label
    """
    tokenizer, model, config = load_model()
    negs, neuts, poss, labels = [], [], [], []

    for i in range(0, len(texts), batch_size):
        chunk = texts[i:i+batch_size]
        enc = tokenizer(
            chunk,
            return_tensors='pt',
            padding=True,
            truncation=True,
            max_length=max_length
        ).to(DEVICE)

        logits = model(**enc).logits
        probs = torch.softmax(logits, dim=1).cpu().numpy()  # (B,3)

        # Map argmax to label via config
        argmax = probs.argmax(axis=1)
        lbls = [config.id2label[int(ix)] for ix in argmax]

        negs.extend(probs[:, 0].tolist())
        neuts.extend(probs[:, 1].tolist())
        poss.extend(probs[:, 2].tolist())
        labels.extend(lbls)

    return pd.DataFrame({
        "negative_s": negs,
        "neutral_s": neuts,
        "positive_s": poss,
        "sentiment_label": labels
    })

def sentiment_score(text: str):
    """Use CardiffNLP finetuned RoBERTa for sentiment analysis"""
    MODEL = f"cardiffnlp/twitter-roberta-base-sentiment-latest"
    tokenizer = AutoTokenizer.from_pretrained(MODEL)
    config = AutoConfig.from_pretrained(MODEL)
    # PyTorch
    model = AutoModelForSequenceClassification.from_pretrained(MODEL)
    
    encoded_input = tokenizer(text, return_tensors='pt', truncation=True, max_length=512)
    output = model(**encoded_input)
    scores = softmax(output[0][0].detach().numpy())

    return (scores[0], scores[1], scores[2], config.id2label[np.argmax(scores)])


# --- MAIN ---
def tag_and_score(date_str: str, write_gold: bool = True):
    # Paths
    posts_prefix    = f"silver/reddit/posts/dt={date_str}/"
    comments_prefix = f"silver/reddit/comments/dt={date_str}/"

    # --- POSTS ---
    post_keys = [k for k in list_keys(posts_prefix) if k.endswith(".parquet")]
    post_frames = []
    for key in post_keys:
        m = re.search(r"/subreddit=([^/]+)/", key)
        df = read_parquet_to_df(key)
        df["subreddit"] = m.group(1) if m else ""
        # Build a combined text field for matching (title: selftext)
        combined = (df["title"].fillna("") + ": " + df["selftext"].fillna(""))
        df["tags"] = combined.apply(find_tags_in_text)
        df[["negative_s", "neutral_s", "positive_s", "sentiment_label"]] = combined.apply(sentiment_score)
        df["scored_at"] = pd.Timestamp.now(tz="UTC")
        post_frames.append(df)
    posts_tagged = pd.concat(post_frames, ignore_index=True) if post_frames else pd.DataFrame()

    # --- COMMENTS ---
    comment_keys = [k for k in list_keys(comments_prefix) if k.endswith(".parquet")]
    comment_frames = []
    for key in comment_keys:
        m = re.search(r"/subreddit=([^/]+)/", key)
        df = read_parquet_to_df(key)
        df["subreddit"] = m.group(1) if m else ""
        df["tags"] = df["body"].fillna("").apply(find_tags_in_text)
        df[["negative_s", "neutral_s", "positive_s", "sentiment_label"]] = combined.apply(sentiment_score)
        df["scored_at"] = pd.Timestamp.now(tz="UTC")
        comment_frames.append(df)
    comments_tagged = pd.concat(comment_frames, ignore_index=True) if comment_frames else pd.DataFrame()

    # write to Gold (same partitioning: dt + subreddit)
    if write_gold and not posts_tagged.empty:
        for sr, df_sr in posts_tagged.groupby("subreddit"):
            out_key = f"gold/reddit/post_enriched/dt={date_str}/subreddit={sr}/part-0.parquet"
            write_parquet(df_sr.drop(columns=["subreddit"]), out_key)

    if write_gold and not comments_tagged.empty:
        for sr, df_sr in comments_tagged.groupby("subreddit"):
            out_key = f"gold/reddit/comment_enriched/dt={date_str}/subreddit={sr}/part-0.parquet"
            write_parquet(df_sr.drop(columns=["subreddit"]), out_key)

    return posts_tagged, comments_tagged

In [11]:
date_str = "2025-08-29"
prefix=f"gold/reddit/post_enriched/dt={date_str}/"
for key in list_keys(prefix):
    if not key.endswith(".parquet"): 
        continue
    df = read_parquet_to_df(key)
    if df.empty:
        continue
df

Unnamed: 0,post_name,subreddit_id,author_fullname,title,selftext,score,upvote_ratio,num_comments,url,created_utc,...,subreddit_type,subreddit_subscribers,author,author_premium,tags,negative_s,neutral_s,positive_s,sentiment_label,scored_at
0,t3_1n2n8if,t5_7y604d,t2_u49f2ucdn,"CA125 results -4000 units/ ml, malignant pleur...","Step mom, age 74 was just diagnosed stage 4a ...",3,1.00,10,https://www.reddit.com/r/ovariancancer_new/com...,1.756413e+09,...,public,936,mkellg1,False,[],0.466427,0.508973,0.024600,neutral,2025-08-30 23:39:05.463633+00:00
1,t3_1n1lwbi,t5_7y604d,t2_1dou5hhnjn,mom’s CA125 going up,Hi all. \n\nHaving one of those days where I a...,5,0.86,5,https://www.reddit.com/r/ovariancancer_new/com...,1.756312e+09,...,public,936,hamsanitizers,False,[],0.517748,0.435007,0.047245,negative,2025-08-30 23:39:05.463633+00:00
2,t3_1n0nwde,t5_7y604d,t2_9cmkaqmuk,My wife has just had a CAS125 test result of 51.,,3,1.00,5,/r/Ovariancancer/comments/1n0nvl6/my_wife_has_...,1.756220e+09,...,public,936,mavericktrader666,False,[],0.109737,0.823145,0.067119,neutral,2025-08-30 23:39:05.463633+00:00
3,t3_1mxtoq9,t5_7y604d,t2_zlt22xu01,7 weeks after diagnosis and still no treatment 😡,7 weeks ago my wife went to the ER for a ragin...,9,0.92,9,https://www.reddit.com/r/ovariancancer_new/com...,1.755929e+09,...,public,936,Big_Use_440,False,"[follow-up, chemo, surgery, diagnosis]",0.877598,0.113512,0.008890,negative,2025-08-30 23:39:05.463633+00:00
4,t3_1mwgu4c,t5_7y604d,t2_13dslcbt40,Pleural Effusion as recurrence,Had my 6 month scan. I have minor pleural eff...,3,1.00,7,https://www.reddit.com/r/ovariancancer_new/com...,1.755796e+09,...,public,936,Impressive-Hunt-1512,False,[],0.232349,0.749565,0.018086,neutral,2025-08-30 23:39:05.463633+00:00
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
145,t3_1ftt272,t5_7y604d,t2_lyv8m,Starting Doxil on Friday,So my CA125 has been increasing in spite of ac...,3,0.81,2,https://www.reddit.com/r/ovariancancer_new/com...,1.727802e+09,...,public,936,pyrettapenguin,False,"[chemo, fatigue, nausea]",0.086129,0.760071,0.153800,neutral,2025-08-30 23:39:05.463633+00:00
146,t3_1fs3kqv,t5_7y604d,t2_nqdj6v3s,Waiting for pathology report,Just called department of pathology on Friday....,4,0.84,6,https://www.reddit.com/r/ovariancancer_new/com...,1.727614e+09,...,public,936,Larouquine9,False,"[chemo, ovarian, diagnosis]",0.588838,0.383206,0.027956,negative,2025-08-30 23:39:05.463633+00:00
147,t3_1fq3jje,t5_7y604d,t2_lccbf7cxt,Need advice and experience,"Hello All,\n\nMy mom and I went for her 3 mont...",7,1.00,6,https://www.reddit.com/r/ovariancancer_new/com...,1.727376e+09,...,public,936,AcanthisittaBusy6166,False,"[chemo, ned]",0.286015,0.671264,0.042721,neutral,2025-08-30 23:39:05.463633+00:00
148,t3_1fpzfdm,t5_7y604d,t2_lywd76mui,Need Advice,"Hi,\nFor context, I am 22 and about 11 weeks o...",3,1.00,2,https://i.redd.it/ex7itk53d6rd1.jpeg,1.727366e+09,...,public,936,Ok_Art9384,False,"[surgery, ultrasound, diarrhea]",0.364582,0.608611,0.026807,neutral,2025-08-30 23:39:05.463633+00:00


In [2]:
date_str = '2025-08-29'

posts_prefix    = f"silver/reddit/posts/dt={date_str}/"
comments_prefix = f"silver/reddit/comments/dt={date_str}/"

# --- POSTS ---
post_keys = [k for k in list_keys(posts_prefix) if k.endswith(".parquet")]
post_frames = []
for key in post_keys:
    df = read_parquet_to_df(key)
    # Build a combined text field for matching (title:  selftext)
    combined = (df["title"].fillna("") + ": " + df["selftext"].fillna(""))
    # df["tags"] = combined.apply(find_tags_in_text)
    # df['languange'] = combined.apply(detect)
    display(df)
    break

Unnamed: 0,post_name,subreddit_id,author_fullname,title,selftext,score,upvote_ratio,num_comments,url,created_utc,created_ts,run_id,dt,subreddit_name_prefixed,subreddit_type,subreddit_subscribers,author,author_premium
0,t3_1n3dnxp,t5_3eibw,t2_129nhur86k,Breast cancer before kids,"Hi, fellow brca carriers!\n\nHow do you go abo...",1,0.67,2,https://www.reddit.com/r/BRCA/comments/1n3dnxp...,1.756488e+09,2025-08-29 17:27:52+00:00,20250829T223859Z,2025-08-29,r/BRCA,public,5329,Ordinary-Sundae-5632,False
1,t3_1n2zn5r,t5_3eibw,t2_jst6hsbj,Prenventative sugery BRCA2,Hey! \n\nI am 22f and BRCA2 positive. I haven’...,1,1.00,2,https://www.reddit.com/r/BRCA/comments/1n2zn5r...,1.756448e+09,2025-08-29 06:09:09+00:00,20250829T223859Z,2025-08-29,r/BRCA,public,5329,DullBrick8271,False
2,t3_1n2xq0q,t5_3eibw,t2_3i4x2idh,BRCA1 positive + pregnant – feeling overwhelmed,"Hi everyone,\n\nI found out today that I’m BRC...",13,1.00,9,https://www.reddit.com/r/BRCA/comments/1n2xq0q...,1.756441e+09,2025-08-29 04:16:22+00:00,20250829T223859Z,2025-08-29,r/BRCA,public,5329,Aggressive_Natural79,False
3,t3_1n25gyp,t5_3eibw,t2_pybsac280,Salpingoophorectomy on Monday,I have a salpingoophorectomy scheduled for 7.0...,2,1.00,15,https://www.reddit.com/r/BRCA/comments/1n25gyp...,1.756365e+09,2025-08-28 07:08:42+00:00,20250829T223859Z,2025-08-29,r/BRCA,public,5329,HotWillingness5464,False
4,t3_1n1urh0,t5_3eibw,t2_8ualynlw,Recovery Question,Hiii -- I have recovery questions. \n\nMy jour...,4,1.00,8,https://www.reddit.com/r/BRCA/comments/1n1urh0...,1.756333e+09,2025-08-27 22:09:18+00:00,20250829T223859Z,2025-08-29,r/BRCA,public,5329,Conscious-Manner-534,False
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
145,t3_1m5r3kc,t5_3eibw,t2_jw4krb5k,How soon did you start feeling “zingers”?,How soon after your PDMX did you start to feel...,3,1.00,15,https://www.reddit.com/r/BRCA/comments/1m5r3kc...,1.753123e+09,2025-07-21 18:38:25+00:00,20250829T223859Z,2025-08-29,r/BRCA,public,5329,DisneyQueen64,True
146,t3_1m5kohy,t5_3eibw,t2_az76hxde,Reduction and Lift Before DMX- insurance cover...,"Hi all, BRCA 2+ (31 F) I am hoping to have my ...",5,1.00,7,https://www.reddit.com/r/BRCA/comments/1m5kohy...,1.753109e+09,2025-07-21 14:39:33+00:00,20250829T223859Z,2025-08-29,r/BRCA,public,5329,Extension-Degree7288,False
147,t3_1m52pnu,t5_3eibw,t2_661r33yx,"Nipple Areola $1,500 Tattoo Grant - Long Islan...","If anyone is in the Long Island, NY USA area (...",11,1.00,0,https://breastreconstruction.org/nipple-areola...,1.753052e+09,2025-07-20 22:57:50+00:00,20250829T223859Z,2025-08-29,r/BRCA,public,5329,disc0pants,False
148,t3_1m52j6e,t5_3eibw,t2_5uspq80j,Expanders vs. Implants,I'm almost 5 weeks post op with expanders. I'm...,1,1.00,7,https://www.reddit.com/r/BRCA/comments/1m52j6e...,1.753052e+09,2025-07-20 22:49:35+00:00,20250829T223859Z,2025-08-29,r/BRCA,public,5329,Pet-sit,False


In [17]:
df.columns

Index(['post_name', 'subreddit_id', 'author_fullname', 'title', 'selftext',
       'score', 'upvote_ratio', 'num_comments', 'url', 'created_utc',
       'created_ts', 'run_id', 'dt', 'subreddit_name_prefixed',
       'subreddit_type', 'subreddit_subscribers', 'author', 'author_premium'],
      dtype='object')

In [23]:
date_str = '2025-08-29'

posts_prefix    = f"gold/reddit/post_enriched/dt={date_str}/"
comments_prefix = f"gold/reddit/comment_enriched/dt={date_str}/"

# --- POSTS ---
post_keys = [k for k in list_keys(comments_prefix) if k.endswith(".parquet")]
post_frames = []
for key in post_keys:
    df = read_parquet_to_df(key)
    break

df.columns

Index(['comment_name', 'post_name', 'parent_comment_name', 'author_fullname',
       'body', 'score', 'created_utc', 'created_ts', 'run_id', 'dt', 'author',
       'author_premium', 'tags', 'negative_s', 'neutral_s', 'positive_s',
       'sentiment_label', 'scored_at'],
      dtype='object')

In [20]:
df.scored_at

0     2025-08-30 23:27:34.268930+00:00
1     2025-08-30 23:27:34.268930+00:00
2     2025-08-30 23:27:34.268930+00:00
3     2025-08-30 23:27:34.268930+00:00
4     2025-08-30 23:27:34.268930+00:00
                    ...               
145   2025-08-30 23:27:34.268930+00:00
146   2025-08-30 23:27:34.268930+00:00
147   2025-08-30 23:27:34.268930+00:00
148   2025-08-30 23:27:34.268930+00:00
149   2025-08-30 23:27:34.268930+00:00
Name: scored_at, Length: 150, dtype: datetime64[us, UTC]

In [3]:
combined

0      Breast cancer before kids: Hi, fellow brca car...
1      Prenventative sugery BRCA2: Hey! \n\nI am 22f ...
2      BRCA1 positive + pregnant – feeling overwhelme...
3      Salpingoophorectomy on Monday: I have a salpin...
4      Recovery Question: Hiii -- I have recovery que...
                             ...                        
145    How soon did you start feeling “zingers”?: How...
146    Reduction and Lift Before DMX- insurance cover...
147    Nipple Areola $1,500 Tattoo Grant - Long Islan...
148    Expanders vs. Implants: I'm almost 5 weeks pos...
149    No bra post mastectomy?: I finally had my bila...
Length: 150, dtype: object

In [11]:
from transformers import AutoModelForSequenceClassification
from transformers import TFAutoModelForSequenceClassification
from transformers import AutoTokenizer, AutoConfig
import numpy as np
from scipy.special import softmax


MODEL = f"cardiffnlp/twitter-roberta-base-sentiment-latest"
tokenizer = AutoTokenizer.from_pretrained(MODEL)
config = AutoConfig.from_pretrained(MODEL)

# PT
model = AutoModelForSequenceClassification.from_pretrained(MODEL)

text = combined.tolist()[:2]

Some weights of the model checkpoint at cardiffnlp/twitter-roberta-base-sentiment-latest were not used when initializing RobertaForSequenceClassification: ['roberta.pooler.dense.bias', 'roberta.pooler.dense.weight']
- This IS expected if you are initializing RobertaForSequenceClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing RobertaForSequenceClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


In [12]:
encoded_input = tokenizer(text, return_tensors='pt', padding=True, truncation=True, max_length=512)
# output = model(**encoded_input)
# scores = output[0][0].detach().numpy()
# scores = softmax(scores)
# ranking = np.argsort(scores)
# ranking = ranking[::-1]
# for i in range(scores.shape[0]):
#     l = config.id2label[ranking[i]]
#     s = scores[ranking[i]]
#     print(f"{i+1}) {l} {np.round(float(s), 4)}")

In [14]:
logits = model(**encoded_input).logits
probs = torch.softmax(logits, dim=1)
probs

tensor([[0.1419, 0.6938, 0.1643],
        [0.0539, 0.7537, 0.1923]], grad_fn=<SoftmaxBackward0>)

In [16]:
softmax(model(**encoded_input)[0][1].detach().numpy())

array([0.05393207, 0.75372005, 0.19234782], dtype=float32)

In [17]:
model(**encoded_input)[0][0].detach()

tensor([ 1.1837,  0.0319, -1.5346])

["Breast cancer before kids: Hi, fellow brca carriers!\n\nHow do you go about handling societal expectations to have kids if you've had cancer before kids? \n\nI got married a year and a half ago. 2 months after our wedding, my MRI showed breast cancer. I had a bilateral mastectomy and now I'm on tamoxifen for the next 10 years. We're allowed to pause the tamoxifen after 2 years to try for pregnancy. We have one healthy embryo that doesn't have BRCA. Last night we told my husband's parents about the embryo and they were overjoyed! We swore them to secrecy because my parents and most of my support system are Catholic to the max and just wouldn't support this decision. It felt so good to tell my in-laws! In this Catholic community, there's a lot of pressure to procreate. I have so many friends that are pregnant right now. Even more that have little kids. They want to get together with me but I don't have children, and when we get together as a group they all just talk about their experie