# Mastodon Sentiment

#### Install requirements

Uncomment to run.

In [1]:
# %pip install -r requirements.txt

#### Load access token

The access token is stored in a `.env` file. A new token can be generated at https://mastodon.social/settings/applications.

In [2]:
from dotenv import load_dotenv
import os

# Load variables from the local .env file into the process environment.
load_dotenv()
# Indexing os.environ raises KeyError right here if ACCESS_TOKEN is not set.
ACCESS_TOKEN = os.environ["ACCESS_TOKEN"]

#### Get data

Run the data acquisition pipeline for Mastodon posts and persist a local CSV file. On subsequent runs, check whether the CSV file exists and has more than 500 lines, then load it to avoid refetching. Otherwise connect to mastodon.social with a valid token, read the local public timeline in chunks of 40, and iterate until 500 usable rows are collected. Enforce English only and skip sensitive posts. Strip HTML to keep an original_text copy, and produce a fully normalized content_text by replacing URLs with a [URL] marker, applying NFKC Unicode normalization, and collapsing whitespace. Reject very short posts under 20 characters. Capture id, created_at, account_acct, content_text, and original_text for each accepted post. Respect rate limits by waiting automatically and paginate with fetch_next. When done, write a headered CSV and report how many rows were loaded or saved.

In [3]:
from mastodon import Mastodon
from pathlib import Path

import csv, re, html, time, unicodedata

# Local CSV cache of accepted posts; reused on later runs instead of refetching.
POSTS_CSV = "posts.csv"
# Base URL of the Mastodon instance whose local public timeline is read.
API_URL = "https://mastodon.social"

# Accumulator for accepted posts (one dict per post); reset every time this cell runs.
rows = []

def have_data():
    """Report whether POSTS_CSV already holds a complete cached dataset.

    Returns:
        bool: False when the file is missing or empty; otherwise True only if
        it contains at least 500 data records (header excluded).

    Side effects:
        Prints the number of data records found when the file is non-empty.
    """
    p = Path(POSTS_CSV)

    if not p.exists() or p.stat().st_size == 0:
        return False

    # Count CSV records rather than physical lines: a quoted field may contain
    # embedded newlines, which would inflate a plain line count.
    with p.open("r", encoding="utf-8", newline="") as f:
        record_count = sum(1 for _ in csv.reader(f))

    # The first record is the header; report and test the same number.
    data_rows = max(0, record_count - 1)
    print(f"Found {data_rows} data rows in {POSTS_CSV}")
    return data_rows >= 500


def load_data():
    """Append every record stored in POSTS_CSV to the module-level ``rows`` list.

    Each record is read through ``csv.DictReader``, so it is keyed by the
    CSV header. Prints the resulting length of ``rows``.
    """
    with open(POSTS_CSV, "r", encoding="utf-8", newline="") as csv_file:
        rows.extend(csv.DictReader(csv_file))

    print(f"Loaded {len(rows)} rows from {POSTS_CSV}")


def get_data():
    """Fill the module-level ``rows`` list with up to 500 accepted posts.

    Reads the local public timeline of API_URL in pages of 40 and follows
    pagination with ``fetch_next`` until 500 rows are held or no further
    page is returned. A post is accepted when its language is "en", it is
    not flagged sensitive, and its normalized text is at least 20
    characters long.
    """
    client = Mastodon(
        access_token=ACCESS_TOKEN,
        api_base_url=API_URL,
        ratelimit_method="wait",
    )

    page = client.timeline_public(limit=40, local=True)

    while page and len(rows) < 500:
        for status in page:
            # keep only English, non-sensitive posts
            if status.get("language") != "en" or status.get("sensitive", False):
                continue

            raw_html = status.get("content", "")

            # fully cleaned and normalized text
            normalized = clean_text(raw_html)

            # drop posts that are too short after normalization
            if len(normalized) < 20:
                continue

            rows.append(
                {
                    "id": status["id"],
                    "created_at": status["created_at"].isoformat(),
                    "account_acct": status["account"]["acct"],
                    "content_text": normalized,
                    # original text keeps everything except the HTML tags
                    "original_text": strip_html(raw_html),
                }
            )

            # target reached: stop without requesting another page
            if len(rows) >= 500:
                return

        # brief pause between page requests
        time.sleep(0.1)
        page = client.fetch_next(page)


def save_data():
    """Write the module-level ``rows`` list to POSTS_CSV with a header row.

    Column names come from the first row. When ``rows`` is empty the known
    post columns are used instead, so the file still gets a real header
    rather than a single blank line.

    Side effects:
        Overwrites POSTS_CSV and prints the number of rows written.
    """
    # Same keys, in the same order, that get_data() stores for each post.
    default_fields = [
        "id",
        "created_at",
        "account_acct",
        "content_text",
        "original_text",
    ]
    fieldnames = list(rows[0].keys()) if rows else default_fields

    with open(POSTS_CSV, "w", newline="", encoding="utf-8") as f:
        w = csv.DictWriter(f, fieldnames=fieldnames)
        w.writeheader()
        w.writerows(rows)

    print(f"Wrote {len(rows)} rows to {POSTS_CSV}")


def strip_html(s):
    """Return ``s`` with HTML tags removed and character entities decoded.

    ``<br>`` and ``</p>`` tags become a single space so that text on either
    side of a line or paragraph break is not glued together; every other
    tag is removed outright. Whitespace is otherwise left untouched.
    """
    # break-like tags turn into spaces
    for break_pattern in (r"<br\s*/?>", r"</p\s*>"):
        s = re.sub(break_pattern, " ", s, flags=re.I)

    # remaining tags are dropped, then entities such as &amp; are decoded
    without_tags = re.sub(r"<.*?>", "", s)
    return html.unescape(without_tags)

def clean_text(s):
    """Return a normalized plain-text version of an HTML post body.

    Steps, in order: strip HTML via ``strip_html``, replace URLs with the
    ``[URL]`` marker, apply NFKC Unicode normalization, then collapse every
    whitespace run to one space and trim both ends.

    A bare domain (no scheme, no ``www.``) is only treated as a URL when it
    ends in an alphabetic label of two or more letters and is not preceded
    by a word character, ``@`` or ``.``. This keeps decimals and
    abbreviations ("3.14", "e.g") and ``user@host`` style handles or
    addresses from being rewritten to ``[URL]``.
    """
    s = strip_html(s)

    # replace URLs with a marker to keep signal
    s = re.sub(
        r"(https?://\S+|www\.\S+|(?<![\w@.])[\w-]+(?:\.[\w-]+)*\.[a-z]{2,}\b(?:/\S*)?)",
        " [URL] ",
        s,
        flags=re.I,
    )

    # Unicode normalize
    s = unicodedata.normalize("NFKC", s)

    # collapse whitespace and trim the ends
    return re.sub(r"\s+", " ", s).strip()


# Fetch and cache only when no complete local dataset exists; otherwise reuse the cache.
if not have_data():
    get_data()
    save_data()
else:
    load_data()

print(f"Total rows: {len(rows)}")

Found 500 data rows in posts.csv
Loaded 500 rows from posts.csv
Total rows: 500


#### Perform sentence split and tokenization

Load a spaCy English pipeline with sentence segmentation enabled. Read each row's content_text and pair it with the full row so metadata is attached. Stream pairs through nlp.pipe to segment sentences and tokenize without removing punctuation or casing. For each document, build a record that preserves the original text, the sentence splits, the surface tokens, the lemmas with hashtags, mentions, and the URL marker kept as-is, and the coarse part-of-speech tags.

Compute simple sentiment features directly from the document. Count exclamation marks and question marks. Count URL markers, hashtags, and mentions. Count all-caps tokens while ignoring the URL marker. Detect elongated tokens and emoji presence using code point checks. Write one JSON object per line to preprocessed_posts.jsonl so downstream steps can score without reprocessing. Report processed record counts.

In [4]:
import spacy, json

from spacy.tokens import Doc

# Load the small English pipeline, asking spaCy to disable "ner" and "textcat".
nlp = spacy.load("en_core_web_sm", disable=["ner", "textcat"])
# Enable the "senter" sentence-segmentation component.
# NOTE(review): the parser is not in the disable list, so presumably it also
# sets sentence boundaries — confirm which component should own segmentation.
nlp.enable_pipe("senter")

# Output path: one JSON object per processed post.
PREPROCESSED_FILE = "preprocessed_posts.jsonl"


def sentiment_features(doc):
    """Compute simple surface-level sentiment cues for one document.

    Args:
        doc: An iterable of tokens that also exposes ``.text``. Each token
            must provide ``.text``, ``.is_alpha`` and ``len()``.

    Returns:
        dict: The five headline counts (``exclamations``, ``questions``,
        ``all_caps_tokens``, ``elongated_tokens``, ``emoji_tokens``) at the
        top level, plus a nested ``"features"`` dict holding the full set
        (those five and ``url_count``, ``hashtag_count``, ``mention_count``).
        Top-level values are copied from the nested dict, so the two views
        always agree; in particular the ``URL`` marker token is never
        counted as an all-caps token in either place.
    """
    import re  # local import: this cell does not import ``re`` on its own

    txt = doc.text
    # a character repeated three or more times in a row, e.g. "sooo"
    elongated = re.compile(r"(.)\1{2,}")

    features = {
        "exclamations": txt.count("!"),
        "questions": txt.count("?"),
        "url_count": sum(1 for t in doc if t.text == "URL"),
        "hashtag_count": sum(1 for t in doc if t.text.startswith("#")),
        "mention_count": sum(1 for t in doc if t.text.startswith("@")),
        "all_caps_tokens": sum(
            1
            for t in doc
            if t.is_alpha and t.text.isupper() and len(t) > 1 and t.text != "URL"
        ),
        "elongated_tokens": sum(1 for t in doc if elongated.search(t.text)),
        # code point check: any character at or above U+1F300 counts as emoji
        "emoji_tokens": sum(
            1 for t in doc if any(ord(ch) >= 0x1F300 for ch in t.text)
        ),
    }

    # Keep the original return shape: headline counts mirrored at the top
    # level, followed by the complete nested dict.
    return {
        "exclamations": features["exclamations"],
        "questions": features["questions"],
        "all_caps_tokens": features["all_caps_tokens"],
        "elongated_tokens": features["elongated_tokens"],
        "emoji_tokens": features["emoji_tokens"],
        "features": features,
    }


# (text, row) pairs fed to nlp.pipe so each row's metadata travels with its text.
records = []

for r in rows:
    t = r.get("content_text")
    
    # skip rows whose content_text is missing, not a string, or blank
    if isinstance(t, str) and t.strip():
        records.append((t, r))

# Stream the pairs through the pipeline and write one JSON object per line.
with open(PREPROCESSED_FILE, "w", encoding="utf-8") as f:
    for doc, meta in nlp.pipe(records, as_tuples=True, batch_size=64):
        # whitespace tokens are left out of tokens/lemmas/pos
        # (the "sentences" lists below still iterate every token)
        toks = [t for t in doc if not t.is_space]

        rec = {
            "id": meta.get("id"),
            "text": doc.text,
            "sentences": [[t.text for t in s] for s in doc.sents],
            "tokens": [t.text for t in toks],
            # hashtags, mentions and the URL marker keep their surface form
            "lemmas": [
                t.text if t.text.startswith("#") or t.text.startswith("@") or t.text == "URL"
                else t.lemma_
                for t in toks
            ],
            "pos": [t.pos_ for t in toks],
            # NOTE: the returned dict itself contains a nested "features" key
            "features": sentiment_features(doc),
        }

        # ensure_ascii=False keeps non-ASCII characters unescaped in the file
        f.write(json.dumps(rec, ensure_ascii=False) + "\n")

print(f"{len(records)} records processed and saved to {PREPROCESSED_FILE}")


500 records processed and saved to preprocessed_posts.jsonl


#### Label posts sentiments and scores

Load a pretrained sentiment classifier and run it over the preprocessed posts. Read each JSONL record, classify the text with the Twitter RoBERTa pipeline using truncation to 256 tokens. Collect the lowercase label and confidence score along with the original id and text. Write the accumulated predictions to a CSV. Print the predictions count.

In [5]:
from transformers import pipeline
import json, csv

# Output path for the per-post sentiment predictions.
PREDICTIONS_FILE = "sentiment_predictions.csv"

# Sentiment-analysis pipeline backed by the Twitter RoBERTa checkpoint named below.
clf = pipeline(
    "sentiment-analysis", model="cardiffnlp/twitter-roberta-base-sentiment-latest"
)

# One dict per post: id, lowercase label, confidence score, text.
preds = []

# PREPROCESSED_FILE is defined by the preprocessing cell, which must run first.
with open(PREPROCESSED_FILE, "r", encoding="utf-8") as f:
    for line in f:
        rec = json.loads(line)
        # classify one text at a time; input is truncated to 256 tokens
        out = clf(rec["text"], truncation=True, max_length=256)[0]

        preds.append(
            {
                "id": rec["id"],
                "label": out["label"].lower(),
                "score": float(out["score"]),
                "text": rec["text"],
            }
        )

# Write all predictions at once, with a header row.
with open(PREDICTIONS_FILE, "w", newline="", encoding="utf-8") as f:
    w = csv.DictWriter(f, fieldnames=["id", "label", "score", "text"])
    w.writeheader()
    w.writerows(preds)

print(f"\nWrote {len(preds)} predictions to {PREDICTIONS_FILE}")


Some weights of the model checkpoint at cardiffnlp/twitter-roberta-base-sentiment-latest were not used when initializing RobertaForSequenceClassification: ['roberta.pooler.dense.bias', 'roberta.pooler.dense.weight']
- This IS expected if you are initializing RobertaForSequenceClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing RobertaForSequenceClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Device set to use mps:0



Wrote 500 predictions to sentiment_predictions.csv
