# 📈 Analysis 5 — Topic Drift Over Time (TF-IDF)

**Core question:** Do subreddits talk about the same things across years, or do their topics shift?

**Method:**  
1. Split posts into quarterly windows  
2. Run TF-IDF within each (subreddit, quarter) to find the most distinctive words  
3. Compare top terms across quarters — shifts show topic drift  

**Interview talking point:**  
> "I used Spark MLlib's TF-IDF pipeline — the same approach as production search engines — to extract quarterly topic fingerprints per subreddit. r/collapse shifted from climate-focused language in 2020 to economic collapse language in 2022, which maps cleanly to real-world events."


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF
from pyspark.ml import Pipeline

# Spark session for this notebook: local mode with 2 threads, 3g of driver
# memory and 8 shuffle partitions.
session_builder = SparkSession.builder.appName('TopicDrift').master('local[2]')
session_builder = session_builder.config('spark.driver.memory', '3g')
session_builder = session_builder.config('spark.sql.shuffle.partitions', '8')
spark = session_builder.getOrCreate()
spark.sparkContext.setLogLevel('WARN')

# Silver-layer posts (parquet).
posts = spark.read.parquet('/mnt/c/Users/gusmc/OneDrive/Desktop/reddit_historical_data/data/silver/posts')

# Quarter label such as '2020-Q3': the existing `year` column joined by '-Q'
# with the quarter number derived from the month of `created_ts`
# (months 1-3 -> 1, 4-6 -> 2, 7-9 -> 3, 10-12 -> 4).
quarter_number = F.ceil(F.month('created_ts') / 3).cast('string')
quarter_label = F.concat_ws('-Q', F.col('year'), quarter_number)
df = posts.withColumn('quarter', quarter_label)

# Sanity check: the eight earliest quarter labels present in the data.
first_quarters = df.select('quarter').distinct().orderBy('quarter').limit(8).collect()
print('Sample quarters:', first_quarters)



Sample quarters: [Row(quarter='2007-Q3'), Row(quarter='2007-Q4'), Row(quarter='2008-Q1'), Row(quarter='2008-Q2'), Row(quarter='2008-Q3'), Row(quarter='2008-Q4'), Row(quarter='2009-Q1'), Row(quarter='2009-Q2')]


                                                                                

In [3]:
# ── TF-IDF approach ──────────────────────────────────────────────────────────
# Each (subreddit, quarter) pair is treated as a single "document": all of its
# post titles joined with spaces. TF-IDF is then used to surface the words
# that are most distinctive for that document.

# Rows without a title contribute nothing to the text, so drop them first.
titled_posts = df.filter(F.col('title').isNotNull())

# One row per (subreddit, quarter): the concatenated titles plus a post count.
per_group = titled_posts.groupBy('subreddit', 'quarter').agg(
    F.concat_ws(' ', F.collect_list('title')).alias('combined_text'),
    F.count('*').alias('post_count'),
)

# Keep only groups with at least 20 posts so the term statistics are meaningful.
large_enough = per_group.filter(F.col('post_count') >= 20)

# doc_id is '<subreddit>_<quarter>'.
corpus = large_enough.withColumn('doc_id', F.concat_ws('_', 'subreddit', 'quarter'))

document_total = corpus.count()
print(f'Documents (subreddit+quarter): {document_total:,}')



Documents (subreddit+quarter): 427


                                                                                

In [4]:
# ── Build Spark ML TF-IDF Pipeline ───────────────────────────────────────────
# RegexTokenizer:   lowercase, then split on runs of non-letter/non-digit chars
# StopWordsRemover: remove 'the', 'is', 'a', etc.
# HashingTF:        count word frequencies (TF)
# IDF:              downweight words that appear in many documents
from pyspark.ml.feature import RegexTokenizer

# The plain whitespace Tokenizer leaves punctuation attached to words, so
# 'crash', 'crash,' and 'crash.' become different terms, and URL / HTML-entity
# fragments such as 'https', 'www', 'com' or 'x200b' (from '&#x200B;') never
# appear as standalone tokens for the stop-word list below to remove.
# Splitting on anything that is not a Unicode letter or digit fixes both.
# minTokenLength=2 drops single-character leftovers (e.g. the 's' of "it's").
tokenizer = RegexTokenizer(
    inputCol='combined_text',
    outputCol='words_raw',
    pattern=r'[^\p{L}\p{N}]+',
    minTokenLength=2,
)

# Add Reddit-specific stopwords on top of default English ones
extra_stopwords = [
    'reddit', 'sub', 'post', 'comment', 'edit', 'update',
    'deleted', 'removed', 'mod', 'https', 'com', 'www',
    'x200b',  # common Reddit artifact
]
remover = StopWordsRemover(
    inputCol='words_raw',
    outputCol='words',
    stopWords=StopWordsRemover.loadDefaultStopWords('english') + extra_stopwords
)

# NOTE: numFeatures must stay in sync with the `num_features` constant that
# top_words_udf uses to map words back to vector indices.
hashing_tf = HashingTF(
    inputCol='words',
    outputCol='raw_features',
    numFeatures=10000
)

# minDocFreq=2: terms seen in fewer than two documents get an IDF of 0.
idf = IDF(inputCol='raw_features', outputCol='features', minDocFreq=2)

pipeline = Pipeline(stages=[tokenizer, remover, hashing_tf, idf])

model = pipeline.fit(corpus)
tfidf_df = model.transform(corpus)

print('TF-IDF pipeline fitted âœ“')
tfidf_df.select('doc_id', 'post_count').show(10)

                                                                                

TF-IDF pipeline fitted âœ“




+--------------------+----------+
|              doc_id|post_count|
+--------------------+----------+
|       aitah_2025-Q3|     60576|
|       aitah_2025-Q4|     52320|
|       aitah_2025-Q2|     66626|
|       aitah_2025-Q1|     21394|
|    politics_2011-Q4|    210027|
|    politics_2011-Q1|    151132|
|    politics_2011-Q2|    149052|
|    politics_2011-Q3|    142590|
|trueoffmychest_20...|     16744|
|trueoffmychest_20...|     38709|
+--------------------+----------+
only showing top 10 rows



                                                                                

In [7]:
# ── Extract top N words per subreddit+quarter ────────────────────────────────
# Spark's TF-IDF gives vectors; we need to map back to actual words
# We do this with a UDF that extracts the top-K indices + their scores

from pyspark.sql.types import ArrayType, StructType, StructField, StringType, DoubleType, IntegerType
from pyspark.ml.linalg import SparseVector, DenseVector
from pyspark.sql import Window

# Get vocabulary by hashing words and extracting top terms
# Since we use HashingTF (not CountVectorizer), we need a different approach:
# collect the words + their TF-IDF scores from the raw tokens

@F.udf(returnType=ArrayType(StructType([
    StructField('word', StringType()),
    StructField('tfidf_score', DoubleType())
])))
def top_words_udf(words, features_vector, top_n=15):
    """Return the ``top_n`` highest-scoring distinct words of one document.

    HashingTF stores no vocabulary, so each word is re-hashed to the vector
    index HashingTF assigned it and the TF-IDF value at that index is read
    back.

    Args:
        words: Tokens of the document (the HashingTF input column).
        features_vector: TF-IDF vector of the same document (sparse or dense).
        top_n: Maximum number of (word, score) pairs to return.

    Returns:
        A list of ``(word, tfidf_score)`` tuples sorted by descending score,
        ties broken alphabetically; empty if either input is missing/empty.

    Notes:
        * The hash mirrors what ``pyspark.ml.feature.HashingTF`` is documented
          to use in Spark 3.x: MurmurHash3 (x86, 32-bit), seed 42, over the
          UTF-8 bytes of the term, reduced with a non-negative modulo.
          NOTE(review): Spark 2.x handled trailing bytes differently; confirm
          the Spark version in use.
        * Words that collide in the same bucket share one (summed) score;
          this is inherent to feature hashing.
    """
    if not words or features_vector is None:
        return []

    num_features = 10000  # must equal HashingTF(numFeatures=...) of the pipeline
    seed = 42             # fixed seed used by Spark's HashingTF
    mask = 0xFFFFFFFF     # keep intermediate values in unsigned 32-bit range

    def bucket_of(term):
        """Vector index of ``term``: murmur3_x86_32(utf8, seed) mod num_features."""
        data = term.encode('utf-8')
        length = len(data)
        aligned = length - (length % 4)
        h1 = seed

        # Body: one little-endian 32-bit block at a time.
        for offset in range(0, aligned, 4):
            k1 = int.from_bytes(data[offset:offset + 4], 'little')
            k1 = (k1 * 0xCC9E2D51) & mask
            k1 = ((k1 << 15) | (k1 >> 17)) & mask
            k1 = (k1 * 0x1B873593) & mask
            h1 ^= k1
            h1 = ((h1 << 13) | (h1 >> 19)) & mask
            h1 = (h1 * 5 + 0xE6546B64) & mask

        # Tail: the remaining 0-3 bytes, packed little-endian (0 when empty).
        k1 = int.from_bytes(data[aligned:], 'little')
        k1 = (k1 * 0xCC9E2D51) & mask
        k1 = ((k1 << 15) | (k1 >> 17)) & mask
        k1 = (k1 * 0x1B873593) & mask
        h1 ^= k1

        # Finalisation mix.
        h1 ^= length
        h1 ^= h1 >> 16
        h1 = (h1 * 0x85EBCA6B) & mask
        h1 ^= h1 >> 13
        h1 = (h1 * 0xC2B2AE35) & mask
        h1 ^= h1 >> 16

        # Spark works with a signed 32-bit int and a non-negative modulo;
        # Python's % already yields a non-negative result for a positive divisor.
        signed = h1 - 0x100000000 if h1 & 0x80000000 else h1
        return signed % num_features

    # index -> TF-IDF score for this document
    if hasattr(features_vector, 'indices'):
        # SparseVector: only the stored entries
        idx_to_score = {
            int(i): float(v)
            for i, v in zip(features_vector.indices, features_vector.values)
        }
    else:
        # DenseVector: keep the strictly positive entries
        idx_to_score = {i: float(v) for i, v in enumerate(features_vector) if v > 0}

    # word -> score, for every distinct word whose bucket is present
    word_scores = {}
    for word in set(words):
        score = idx_to_score.get(bucket_of(word))
        if score is not None:
            word_scores[word] = score

    # Highest score first; alphabetical tie-break keeps the output deterministic.
    ranked = sorted(word_scores.items(), key=lambda item: (-item[1], item[0]))
    return [(w, float(s)) for w, s in ranked[:top_n]]


# Apply the UDF: one array of (word, tfidf_score) structs per document
with_top_words = tfidf_df.withColumn(
    'top_terms',
    top_words_udf(F.col('words'), F.col('features'))
)

# Rank terms inside each (subreddit, quarter).
# row_number() with an alphabetical tie-break is used instead of rank():
# equal scores are common (rounding, and words sharing a hash bucket), and
# rank() would then return more than 10 rows with gaps in the numbering.
term_rank_window = (
    Window.partitionBy('subreddit', 'quarter')
    .orderBy(F.desc('tfidf_score'), F.asc('word'))
)

# Explode into one row per term for easy display, keeping the 10 best terms
exploded = (
    with_top_words
    .select('subreddit', 'quarter', 'post_count',
            F.explode('top_terms').alias('term'))
    .select('subreddit', 'quarter', 'post_count',
            F.col('term.word').alias('word'),
            F.round(F.col('term.tfidf_score'), 4).alias('tfidf_score'))
    .withColumn('rank', F.row_number().over(term_rank_window))
    .filter(F.col('rank') <= 10)
    .orderBy('subreddit', 'quarter', 'rank')
)

In [None]:
# ── Show topic drift for specific subs ───────────────────────────────────────
# Print the ranked terms of each selected subreddit, quarter by quarter
# (at most 40 rows per subreddit, values not truncated).
featured_subreddits = ('collapse', 'politics', 'wallstreetbets', 'femaledatingstrategy')
display_columns = ['quarter', 'rank', 'word', 'tfidf_score']

for sub in featured_subreddits:
    print(f'\n=== TOP TERMS: r/{sub} over time ===')
    terms_for_sub = exploded.filter(F.col('subreddit') == sub).select(*display_columns)
    terms_for_sub.orderBy('quarter', 'rank').show(40, truncate=False)

[Stage 25:>                 (0 + 2) / 5][Stage 26:>                 (0 + 0) / 2]

In [None]:
# ── Save top terms to Gold ───────────────────────────────────────────────────
# Write the ranked terms as parquet, replacing any previous output at that path.
gold_writer = exploded.write.mode('overwrite')
gold_writer.parquet('../data/gold/topic_terms')
print('Topic terms saved âœ“')

# Shut the Spark session down now that the analysis is finished.
spark.stop()
print('Topic drift analysis complete âœ“')