# FINAL PROJECT
The goal of this project is to implement trending topic detection algorithms on X (formerly Twitter) messages, as proposed in the article "Sensing Trending Topics in Twitter" by L. M. Aiello et al., IEEE Transactions on Multimedia, vol. 15, no. 6, pp. 1268-1282, Oct. 2013, doi: 10.1109/TMM.2013.2265080.

•	The suggested Databricks runtime version is 12.2 LTS ML (Scala 2.12, Spark 3.3.2) instead of the Standard version.

•	Follow the instructions on this webpage to install the Spark NLP library in Databricks so that it can be used in that environment:

https://sparknlp.org/docs/en/install#databricks-support


## STUDENTS IDENTIFICATION
100548074 - Denilson Hernandez Diaz


### Common code
The first part of the notebook is a skeleton containing the code that fetches the data from the server, starts analyzing it, and pre-processes parts of it to fit the algorithm.

## Setting Up the Environment in a Terminal (before running the Jupyter Notebook)
(a)
Use Java 11 (I run Java 11 for better compatibility).

(b)
conda create -n sparknlp python=3.10 -y
conda activate sparknlp

(c)
pip install spark-nlp==6.3.0 pyspark==3.5.0

(d)
jupyter notebook

Then run the notebook.

In [10]:
import sparknlp

# This automatically pulls spark-nlp-silicon-6.3.0.jar if pip version is 6.3.0
spark = sparknlp.start(apple_silicon=True)



25/12/31 15:25:21 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [25]:
# Reduce log output: show only WARN and higher (use "ERROR" to hide warnings as well)
spark.sparkContext.setLogLevel("WARN")

In [14]:
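from pyspark.sql import SparkSession

# Note: if sparknlp.start() has already created a session, getOrCreate() returns it;
# only runtime SQL configurations take effect, and static settings such as
# spark.driver.memory and spark.jars.packages are ignored.
spark = SparkSession.builder \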
    .appName("Spark NLP") \
    .master("local[*]") \
    .config("spark.driver.memory", "8G") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.kryoserializer.buffer.max", "2000M") \
    .config("spark.driver.maxResultSize", "0") \
    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp-silicon_2.12:6.3.0") \
    .getOrCreate()

In [47]:
from sparknlp.base import DocumentAssembler
from sparknlp.annotator import LanguageDetectorDL
from pyspark.ml import Pipeline
from pyspark.ml.feature import StopWordsRemover, NGram, CountVectorizer, IDF
from pyspark.sql import functions as F
from pyspark.sql.types import BooleanType
from datetime import datetime, timedelta
import numpy as np
import re
import nltk
from nltk.stem import WordNetLemmatizer

# NLTK resources: WordNet (for the lemmatizer), stop words, and the POS tagger model
nltk.download('wordnet')
nltk.download('omw-1.4')
nltk.download('stopwords')
nltk.download('averaged_perceptron_tagger')

[nltk_data] Downloading package wordnet to
[nltk_data]     /Users/denilson/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package omw-1.4 to
[nltk_data]     /Users/denilson/nltk_data...
[nltk_data]   Package omw-1.4 is already up-to-date!
[nltk_data] Downloading package stopwords to
[nltk_data]     /Users/denilson/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data]     /Users/denilson/nltk_data...
[nltk_data]   Package averaged_perceptron_tagger is already up-to-
[nltk_data]       date!


True

In [17]:
from pyspark.sql.functions import col, udf, explode, from_json
from pyspark.sql.types import *
import sparknlp
import json
from urllib import request

In [18]:
from sparknlp.annotator import LanguageDetectorDL
import pyspark.sql.functions as F

In [19]:
#Link used to get data
datafile="http://mcomputing.tsc.uc3m.es/get_tweets.php?start_year=2019&start_month=08&start_day=01&start_hour=02&start_minute=00&minutes_length=20"

In [20]:
with request.urlopen(datafile) as url:
    data=url.read().decode()
    with open("/tmp/temporal.json","w") as f:
        f.write(data)

In [26]:
tweetDF= spark.read.json("file:///tmp/temporal.json")

                                                                                

In [22]:
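# created_at strings are in UTC (+0000). After parsing, timestamps are displayed in the
# local time zone of this machine: 08:00 UTC shows up as 01:00 in the outputs below.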
#display(tweetDF.select("created_at","id","user.screen_name","text").head(2))

In [27]:
tweetDF.count()

44412

In [28]:
filteredDF=tweetDF.select("created_at","user.id","user.name","user.screen_name","user.lang","text")

In [29]:
#display(filteredDF.head(2))

In [32]:
# Convert the raw text into Spark NLP document annotations
document_assembler = (
    DocumentAssembler()
    .setInputCol("text")
    .setOutputCol("document")
)

In [33]:
# Detect the language of each tweet with a pretrained model
languageDetector = (
    LanguageDetectorDL.pretrained()
    .setInputCols("document")
    .setOutputCol("language") 
)

ld_wiki_tatoeba_cnn_21 download started this may take some time.
Approximate size to download 7.1 MB
Download done! Loading the resource.

[OK!]


In [34]:
nlpPipeline = Pipeline(stages=[document_assembler, languageDetector])

In [35]:
result = nlpPipeline.fit(filteredDF).transform(filteredDF)

In [36]:
def filterLang(data):
    # True if the detector labelled the tweet as English ("en")
    return data is not None and 'en' in data

In [37]:
udf_filterLang = udf(filterLang,BooleanType())

In [38]:
dataFilteredDF=result.filter(udf_filterLang("language.result")).select("created_at","text","language.result")

In [39]:
display(dataFilteredDF.head(2))

                                                                                

[Row(created_at='Thu Aug 01 08:00:00 +0000 2019', text='kamala harris fucked up bgt parah', result=['en']),
 Row(created_at='Thu Aug 01 08:00:00 +0000 2019', text='RT @jnkchanels: this is so sad https://t.co/oQr6apodZ1', result=['en'])]


### MY WORK STARTS HERE

In this part, we clean the Twitter data further.


In [40]:
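# Use the legacy datetime parser: Spark 3's default parser does not accept the
# day-of-week symbol (EEE) when parsing, which the Twitter date format needs
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")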

In [41]:
# Parse Twitter's created_at string (e.g. 'Thu Aug 01 08:00:00 +0000 2019') into a timestamp that is easier to work with
df_parsed = dataFilteredDF.withColumn("parsed_timestamp", F.to_timestamp(F.col("created_at"), "EEE MMM dd HH:mm:ss ZZZZZ yyyy"))
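As a plain-Python illustration of what this pattern parses (not part of the Spark pipeline), the snippet below uses `datetime.strptime` with the equivalent format codes. The UTC-7 offset is an assumption inferred from the outputs, where 08:00 UTC is displayed as 01:00.

```python
from datetime import datetime, timedelta, timezone

# Twitter's created_at format, e.g. 'Thu Aug 01 08:00:00 +0000 2019'
# (%a and %b expect English day/month names, as in the default C locale)
TWITTER_FORMAT = "%a %b %d %H:%M:%S %z %Y"

raw = "Thu Aug 01 08:00:00 +0000 2019"
parsed = datetime.strptime(raw, TWITTER_FORMAT)  # timezone-aware, in UTC
print(parsed.isoformat())  # 2019-08-01T08:00:00+00:00

# Assumed local offset of UTC-7, which matches the 01:00 shown by the notebook
local = parsed.astimezone(timezone(timedelta(hours=-7)))
print(local.strftime("%Y-%m-%d %H:%M"))  # 2019-08-01 01:00
```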

In [42]:
# Remove usernames, website links and punctuation, tokenize the tweets,
# drop retweet/reply markers, and lemmatize the tokens.
lemmatizer = WordNetLemmatizer()

def lemmatization(tokens):
    if tokens is None:
        return None
    # WordNetLemmatizer treats every token as a noun by default (e.g. "was" -> "wa")
    return [lemmatizer.lemmatize(t) for t in tokens]

def clean_and_tokenize(text):
    no_ids = re.sub(r'@\w+', '', text)              # drop @usernames
    no_links = re.sub(r'https?://\S+', '', no_ids)  # drop URLs
    no_punc = re.sub(r"[^\w\s]", " ", no_links)     # punctuation -> space
    return no_punc.lower().split()

# Tokens with no topical meaning: retweet/reply markers and fragments such as "m" (from "I'm")
delete_rt = {"rt", "re", "m", "reply"}

def remove_rt(tokens):
    return [t for t in tokens if t not in delete_rt]
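To see what the cleaning steps do, here is a standalone copy of `clean_and_tokenize` and `remove_rt` (lemmatization is left out so it runs without NLTK data), applied to the second sample tweet shown earlier. The contraction example is hypothetical and shows how an apostrophe splits a word into two tokens.

```python
import re

delete_rt = {"rt", "re", "m", "reply"}

def clean_and_tokenize(text):
    no_ids = re.sub(r'@\w+', '', text)              # drop @usernames
    no_links = re.sub(r'https?://\S+', '', no_ids)  # drop URLs
    no_punc = re.sub(r"[^\w\s]", " ", no_links)     # punctuation -> space
    return no_punc.lower().split()

def remove_rt(tokens):
    return [t for t in tokens if t not in delete_rt]

tweet = "RT @jnkchanels: this is so sad https://t.co/oQr6apodZ1"
print(remove_rt(clean_and_tokenize(tweet)))  # ['this', 'is', 'so', 'sad']

# Hypothetical example: contractions are split at the apostrophe
print(remove_rt(clean_and_tokenize("I'm sure it's fine")))  # ['i', 'sure', 'it', 's', 'fine']
```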


In [43]:
df_rdd = df_parsed.select("parsed_timestamp", "text").rdd.map(lambda row: (row[0], row[1] if row[1] else ""))

In [44]:
# Clean, drop RT markers and lemmatize each tweet (stop words are removed later)
rdd_tokens_no_stop = df_rdd.map(
    lambda x: (
        x[0],
        lemmatization(remove_rt(clean_and_tokenize(x[1])))
    )
)

In [48]:
df_clean_tokenized = rdd_tokens_no_stop.toDF(["created_at", "words"])
df_clean_tokenized.cache()
display(df_clean_tokenized.head(5))

                                                                                

[Row(created_at=datetime.datetime(2019, 8, 1, 1, 0), words=['kamala', 'harris', 'fucked', 'up', 'bgt', 'parah']),
 Row(created_at=datetime.datetime(2019, 8, 1, 1, 0), words=['this', 'is', 'so', 'sad']),
 Row(created_at=datetime.datetime(2019, 8, 1, 1, 0), words=['ako', 'sana', 'kaya', 'lang', 'walang', 'pera', 'wait', 'hanap', 'buhay', 'muna', 'ako']),
 Row(created_at=datetime.datetime(2019, 8, 1, 1, 0), words=['amen']),
 Row(created_at=datetime.datetime(2019, 8, 1, 1, 0), words=['nerf', 'everywhere'])]

In [49]:
# Keep only tweets whose tokens all consist of ASCII Latin letters (a-z, A-Z).
# This removes tweets in other scripts, but also any tweet with a token containing
# digits, underscores or accented letters; tweets with no tokens are kept.
def all_english(tokens):
    return all(re.match(r'^[a-zA-Z]+$', token) for token in tokens)

all_english_udf = udf(all_english, BooleanType())

df_clean_tokenized = df_clean_tokenized.filter(all_english_udf(df_clean_tokenized["words"]))
df_clean_tokenized.cache()
display(df_clean_tokenized.head(5))

[Row(created_at=datetime.datetime(2019, 8, 1, 1, 0), words=['kamala', 'harris', 'fucked', 'up', 'bgt', 'parah']),
 Row(created_at=datetime.datetime(2019, 8, 1, 1, 0), words=['this', 'is', 'so', 'sad']),
 Row(created_at=datetime.datetime(2019, 8, 1, 1, 0), words=['ako', 'sana', 'kaya', 'lang', 'walang', 'pera', 'wait', 'hanap', 'buhay', 'muna', 'ako']),
 Row(created_at=datetime.datetime(2019, 8, 1, 1, 0), words=['amen']),
 Row(created_at=datetime.datetime(2019, 8, 1, 1, 0), words=['nerf', 'everywhere'])]
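A quick standalone check of the `all_english` rule on hypothetical token lists shows its side effects: a single token with digits or accented letters rejects the whole tweet, an empty token list passes, and (as the third row above shows) non-English text written with Latin letters still gets through.

```python
import re

def all_english(tokens):
    # True only if every token consists of ASCII letters a-z / A-Z
    return all(re.match(r'^[a-zA-Z]+$', token) for token in tokens)

print(all_english(["nerf", "everywhere"]))  # True
print(all_english(["covid19", "news"]))     # False: digits are not letters
print(all_english(["café", "time"]))        # False: accented letter
print(all_english([]))                      # True: no tokens to reject
```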

In [50]:
remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
df_filtered = remover.transform(df_clean_tokenized).select("created_at", "filtered_words")


In [51]:
display(df_filtered.head(5))

[Row(created_at=datetime.datetime(2019, 8, 1, 1, 0), filtered_words=['kamala', 'harris', 'fucked', 'bgt', 'parah']),
 Row(created_at=datetime.datetime(2019, 8, 1, 1, 0), filtered_words=['sad']),
 Row(created_at=datetime.datetime(2019, 8, 1, 1, 0), filtered_words=['ako', 'sana', 'kaya', 'lang', 'walang', 'pera', 'wait', 'hanap', 'buhay', 'muna', 'ako']),
 Row(created_at=datetime.datetime(2019, 8, 1, 1, 0), filtered_words=['amen']),
 Row(created_at=datetime.datetime(2019, 8, 1, 1, 0), filtered_words=['nerf', 'everywhere'])]

In [52]:
# TF-IDF, step 1: raw term counts per tweet (only terms that appear in at least 2 tweets)
cv = CountVectorizer(
    inputCol="filtered_words", 
    outputCol="raw_features",
    vocabSize=2**16, 
    minDF=2          
)
cv_model = cv.fit(df_filtered)
df_featurized = cv_model.transform(df_filtered)
df_featurized.cache()

                                                                                

DataFrame[created_at: timestamp, filtered_words: array<string>, raw_features: vector]

In [53]:
display(df_featurized.head(5))

[Row(created_at=datetime.datetime(2019, 8, 1, 1, 0), filtered_words=['kamala', 'harris', 'fucked', 'bgt', 'parah'], raw_features=SparseVector(5323, {245: 1.0, 327: 1.0, 849: 1.0, 5018: 1.0})),
 Row(created_at=datetime.datetime(2019, 8, 1, 1, 0), filtered_words=['sad'], raw_features=SparseVector(5323, {202: 1.0})),
 Row(created_at=datetime.datetime(2019, 8, 1, 1, 0), filtered_words=['ako', 'sana', 'kaya', 'lang', 'walang', 'pera', 'wait', 'hanap', 'buhay', 'muna', 'ako'], raw_features=SparseVector(5323, {129: 1.0, 395: 1.0, 706: 2.0, 1024: 1.0, 1247: 1.0, 2593: 1.0, 3006: 1.0, 4368: 1.0, 4494: 1.0, 4727: 1.0})),
 Row(created_at=datetime.datetime(2019, 8, 1, 1, 0), filtered_words=['amen'], raw_features=SparseVector(5323, {772: 1.0})),
 Row(created_at=datetime.datetime(2019, 8, 1, 1, 0), filtered_words=['nerf', 'everywhere'], raw_features=SparseVector(5323, {1139: 1.0}))]

In [54]:
idf = IDF(inputCol="raw_features", outputCol="features")
idfModel = idf.fit(df_featurized)
df_tfidf = idfModel.transform(df_featurized)
df_tfidf.cache()

                                                                                

DataFrame[created_at: timestamp, filtered_words: array<string>, raw_features: vector, features: vector]

In [55]:
display(df_tfidf.head(5))

[Row(created_at=datetime.datetime(2019, 8, 1, 1, 0), filtered_words=['kamala', 'harris', 'fucked', 'bgt', 'parah'], raw_features=SparseVector(5323, {245: 1.0, 327: 1.0, 849: 1.0, 5018: 1.0}), features=SparseVector(5323, {245: 5.5148, 327: 5.731, 849: 6.5333, 5018: 7.9997})),
 Row(created_at=datetime.datetime(2019, 8, 1, 1, 0), filtered_words=['sad'], raw_features=SparseVector(5323, {202: 1.0}), features=SparseVector(5323, {202: 5.4607})),
 Row(created_at=datetime.datetime(2019, 8, 1, 1, 0), filtered_words=['ako', 'sana', 'kaya', 'lang', 'walang', 'pera', 'wait', 'hanap', 'buhay', 'muna', 'ako'], raw_features=SparseVector(5323, {129: 1.0, 395: 1.0, 706: 2.0, 1024: 1.0, 1247: 1.0, 2593: 1.0, 3006: 1.0, 4368: 1.0, 4494: 1.0, 4727: 1.0}), features=SparseVector(5323, {129: 5.1093, 395: 5.8402, 706: 13.0667, 1024: 6.7004, 1247: 7.0188, 2593: 7.4889, 3006: 7.712, 4368: 7.9997, 4494: 7.9997, 4727: 7.9997})),
 Row(created_at=datetime.datetime(2019, 8, 1, 1, 0), filtered_words=['amen'], raw_feat

In [56]:
idf_values = idfModel.idf.toArray() 
print(len(idf_values))

5323


In [57]:
#Vocabulary extracted from the data
vocabulary = cv_model.vocabulary
print("Vocabulary size:", len(vocabulary))

Vocabulary size: 5323


In [58]:
# Print the 10 words with the lowest global IDF score.
# A lower IDF means the word appears in more tweets, so these are the most widespread words.
# They can serve as a baseline when comparing the top words with the detected topics.

word_idf_pairs = list(zip(vocabulary, idf_values))
sorted_word_idf = sorted(word_idf_pairs, key=lambda x: x[1])
sorted_word_idf[:10]

[('like', np.float64(3.087023693763398)),
 ('one', np.float64(3.3052772597834164)),
 ('wa', np.float64(3.3945083935113587)),
 ('u', np.float64(3.42840994518704)),
 ('love', np.float64(3.503579488565721)),
 ('get', np.float64(3.5110422097673104)),
 ('people', np.float64(3.597032657622833)),
 ('good', np.float64(3.6052294248270114)),
 ('know', np.float64(3.668945239213119)),
 ('time', np.float64(3.7183935146271003))]
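Spark's `IDF` documentation defines the score as idf(t) = ln((m + 1) / (df(t) + 1)), where m is the number of documents (tweets) and df(t) the number of tweets containing t; each TF-IDF feature is the raw count multiplied by that idf. This matches the TF-IDF output earlier in the notebook, where index 706 has count 2.0 and feature 13.0667, i.e. 2 × 6.5333. The sketch below recomputes the formula on a hypothetical four-tweet corpus.

```python
import math
from collections import Counter

# Hypothetical toy corpus of already-cleaned tweets
docs = [
    ["like", "love", "song"],
    ["like", "new", "song"],
    ["like", "time"],
    ["good", "time"],
]
m = len(docs)

# Document frequency: number of tweets containing each term
df = Counter(term for doc in docs for term in set(doc))

# Spark ML's smoothed IDF: ln((m + 1) / (df + 1))
idf = {term: math.log((m + 1) / (df[term] + 1)) for term in df}

# Lowest IDF first: the most widespread terms
for term in sorted(idf, key=idf.get):
    print(term, df[term], round(idf[term], 4))

# TF-IDF of a term in one tweet = raw count in that tweet * idf
tf_idf_song = docs[0].count("song") * idf["song"]
```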

In [59]:
# DataFrame passed to the topic detection algorithm (BN-Grams only uses the timestamps and tokens)
df_algorithm = df_tfidf.select("created_at", "filtered_words")
df_algorithm.cache()
display(df_algorithm.head(5))

[Row(created_at=datetime.datetime(2019, 8, 1, 1, 0), filtered_words=['kamala', 'harris', 'fucked', 'bgt', 'parah']),
 Row(created_at=datetime.datetime(2019, 8, 1, 1, 0), filtered_words=['sad']),
 Row(created_at=datetime.datetime(2019, 8, 1, 1, 0), filtered_words=['ako', 'sana', 'kaya', 'lang', 'walang', 'pera', 'wait', 'hanap', 'buhay', 'muna', 'ako']),
 Row(created_at=datetime.datetime(2019, 8, 1, 1, 0), filtered_words=['amen']),
 Row(created_at=datetime.datetime(2019, 8, 1, 1, 0), filtered_words=['nerf', 'everywhere'])]

In [60]:
print(df_algorithm.dtypes)

[('created_at', 'timestamp'), ('filtered_words', 'array<string>')]


### BN-Grams Algorithm

In [61]:
### HELPER FUNCTIONS
def overlap(ng1, ng2):
    """Return True if the two n-gram strings share at least one word."""
    s1 = set(ng1.split())
    s2 = set(ng2.split())
    return len(s1 & s2) > 0

In [62]:
# Report function: detect BN-gram topics in one time window
def detect_bn_gram_topics(df, start_time, end_time, top_k=100, num_topics=10, ngram_size=2):
    """
    Detect BN-gram topics (groups of n-grams) in tweets from df that fall into [start_time, end_time).

    df: Spark DataFrame with columns:
            - created_at: TimestampType
            - filtered_words: array<string>, already tokenized and cleaned
    start_time, end_time: bounds of the time window.
    top_k: number of most frequent n-grams kept as candidates.
    num_topics: number of topics returned after ranking.
    ngram_size: n-gram length (2 for bigrams, 3 for trigrams, etc.).
    return: a list of (topic_ngrams, topic_score) tuples.
    """

    # Keep only the tweets inside the time window
    df_window = df.filter(
        (F.col("created_at") >= F.lit(start_time)) &
        (F.col("created_at") < F.lit(end_time))
    )
    # Return an empty list if the window contains no tweets
    if df_window.count() == 0:
        return []

    # Generate the n-grams of each tweet
    ngram = NGram(n=ngram_size, inputCol="filtered_words", outputCol="ngrams")
    df_ngrams = ngram.transform(df_window)

    # One row per n-gram occurrence, then count each distinct n-gram
    df_ngrams_exploded = df_ngrams.select(F.explode("ngrams").alias("ngram"))
    df_ngrams_freq = df_ngrams_exploded.groupBy("ngram").count().withColumnRenamed("count", "current_count")

    # Keep the top_k most frequent n-grams as candidates
    # (groupBy already yields one row per n-gram; dropDuplicates is only a safeguard)
    df_candidates = df_ngrams_freq.orderBy(F.desc("current_count")).limit(top_k).dropDuplicates(["ngram"])
    freq_map = {row["ngram"]: row["current_count"] for row in df_candidates.collect()}

    # Process the most frequent n-grams first (dropDuplicates does not preserve the order)
    candidate_ngrams = sorted(freq_map, key=freq_map.get, reverse=True)

    # Build topics by overlap: an n-gram joins the first topic that contains
    # an n-gram sharing at least one word with it; otherwise it starts a new topic
    topics = []
    for ng in candidate_ngrams:
        placed = False
        for t in topics:
            if any(overlap(ng, existing_ng) for existing_ng in t):
                t.append(ng)
                placed = True
                break
        if not placed:
            topics.append([ng])

    # Score each topic as the average frequency of its n-grams and sort
    scored_topics = [(t, sum(freq_map[ng] for ng in t) / len(t)) for t in topics]
    scored_topics.sort(key=lambda x: x[1], reverse=True)

    # Return the top num_topics topics
    return scored_topics[:num_topics]
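The function above ranks n-grams by their raw frequency in the current window only. In Aiello et al.'s BNgram, as we read the paper, an n-gram is instead scored by a burstiness measure that compares its document frequency in the current time slot i with its average over the t previous slots (the paper also applies extra boosts, e.g. for named entities, omitted here):

$$\mathrm{df\text{-}idf}_t = \frac{df_i + 1}{\log\left(\frac{\sum_{j=1}^{t} df_{i-j}}{t} + 1\right) + 1}$$

The plain-Python sketch below is only an illustration under our own assumptions, not the author's implementation: the natural logarithm, the zero-history fallback, and the helper name `df_idf_t` are our choices, and the per-window document frequencies would have to come from earlier windows.

```python
import math

def df_idf_t(df_current, df_previous):
    """Burstiness score of one n-gram (sketch of BNgram's df-idf_t).

    df_current:  number of tweets containing the n-gram in the current window
    df_previous: its document frequencies in the t previous windows
    """
    t = len(df_previous)
    # Average past frequency; assumed 0 when there is no history yet
    avg_past = sum(df_previous) / t if t > 0 else 0.0
    return (df_current + 1) / (math.log(avg_past + 1) + 1)

# Hypothetical counts: the same current frequency scores higher
# when the n-gram was rare in the previous windows (a burst)
new_topic = df_idf_t(10, [0, 0, 0])
steady_topic = df_idf_t(10, [10, 10, 10])
print(round(new_topic, 4), round(steady_topic, 4))
```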

In [63]:
# Loop over consecutive time windows and extract the trending topics in each one

# Define the start of the data, the length of each window, and the end of the tweet data.
'''
start_time_dt: Start of all the tweet data (year, month, day, hour, minute, second),
               expressed in the same time zone as the parsed created_at timestamps.
window_length: Length of each analyzed window, in minutes.
end_time_dt:   End of the tweet data (the URL above requests 20 minutes of tweets).
'''
start_time_dt = datetime(2019, 8, 1, 1, 0, 0)
window_length = 5  # minutes
end_time_dt   = start_time_dt + timedelta(minutes=20)

current_window_start = start_time_dt

while current_window_start < end_time_dt:
    current_window_end = current_window_start + timedelta(minutes=window_length)
    if current_window_end > end_time_dt:
        current_window_end = end_time_dt
    
    # Convert the timestamps to strings for the report and the window filter
    window_start_str = current_window_start.strftime("%Y-%m-%d %H:%M:%S")
    window_end_str   = current_window_end.strftime("%Y-%m-%d %H:%M:%S")

    print(f"\n#### Processing window: {window_start_str} - {window_end_str} ####")
    #Call the topic detection function
    topics = detect_bn_gram_topics(
        df_algorithm,
        start_time= window_start_str,
        end_time= window_end_str,
        top_k=150,
        num_topics=10,
        ngram_size=3
    )
    print("Number of topics returned:", len(topics))

    for i, (t_ngrams, score) in enumerate(topics):
        # Show at most three n-grams per topic
        print(f"  Topic {i+1}: {t_ngrams[:3]}, Score={score:.3f}")

    # Advance the window
    current_window_start = current_window_end


#### Processing window: 2019-08-01 01:00:00 - 2019-08-01 01:05:00 ####
Number of topics returned: 10
  Topic 1: ['biggest fan week'], Score=10.000
  Topic 2: ['malay kan english'], Score=6.000
  Topic 3: ['suspended suspended suspended'], Score=5.000
  Topic 4: ['stay cool everyone', 'cool everyone icecream', 'whatsapp hookup cool'], Score=4.400
  Topic 5: ['recognition camera cyber', 'facial recognition camera', 'camera cyber war'], Score=4.000
  Topic 6: ['protestors another level', 'hong kong protestors', 'kong protestors another'], Score=3.889
  Topic 7: ['weversetrans v op', 'v became small'], Score=3.500
  Topic 8: ['kaorhys angaugustoko kaorhys', 'angaugustoko kaorhys angaugustoko'], Score=3.500
  Topic 9: ['sco pa tu', 'pa tu manaa', 'tu manaa hell'], Score=3.455
  Topic 10: ['happy new month', 'happy national girlfriend'], Score=3.000

#### Processing window: 2019-08-01 01:05:00 - 2019-08-01 01:10:00 ####
Number of topics returned: 10
  Topic 1: ['stay cool everyone', 'cool e
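The window loop above can also be written as a small generator that yields [start, end) pairs and clips the last window at the end of the data. This is a hypothetical refactoring (`time_windows` is our own name), shown with the same 20-minute span and 5-minute windows.

```python
from datetime import datetime, timedelta

def time_windows(start, end, minutes):
    """Yield consecutive (window_start, window_end) pairs covering [start, end)."""
    step = timedelta(minutes=minutes)
    current = start
    while current < end:
        window_end = min(current + step, end)  # clip the last window
        yield current, window_end
        current = window_end

start = datetime(2019, 8, 1, 1, 0, 0)
windows = list(time_windows(start, start + timedelta(minutes=20), 5))
for w_start, w_end in windows:
    print(w_start.strftime("%H:%M"), "-", w_end.strftime("%H:%M"))
```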


### CONCLUSIONS


In this project, I implemented the BN-Grams algorithm described in the article “Sensing Trending Topics in Twitter” by L. M. Aiello et al.

### IMPLEMENTATION
My first steps centered on cleaning the tweet data. Using the common code from class, I filtered the tweets with Spark NLP’s LanguageDetectorDL so that only English ones remained for topic detection. Although this detector was effective, some non-English tweets still got through.

Next, I cleaned the data further by removing any tweet containing characters outside the English (ASCII Latin) alphabet. I did this after removing usernames, links, and punctuation, tokenizing the text, and lemmatizing the English words with a WordNetLemmatizer. This step removed tweets written in other scripts (and, as a side effect, tweets containing digits), but tweets in other languages that use Latin letters (like Malay) still slipped in.

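Following Aiello’s suggestions, I lemmatized the tokens, which caused a few issues: some words became ambiguous, like “wa” (from “was”, because WordNetLemmatizer treats every word as a noun by default), and others were shortened in hard-to-read ways. I also used a stop word remover to further prepare the data; because it ran after lemmatization, forms like “wa” no longer matched the stop word list.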

After cleaning, I computed the TF-IDF features and the global IDF score of each word in the tweets’ vocabulary. The topic detection itself does not use these scores, but they help compare the trending topics with the most widespread words. The DataFrame passed to the detection function is called df_algorithm.

I chose to implement BN-Grams for topic extraction. My DataFrame contained the columns “created_at” (the tweet’s timestamp in a more workable format) and “filtered_words” (the tokenized tweets).

In my report function, I focused on trending topics within a specific time interval. I filtered the tweets to a window [start_time, end_time), so only tweets from that interval were considered. I used Spark’s NGram transformer to produce bigrams or trigrams: with ngram_size=3, each tweet is converted into its consecutive three-word phrases. The user can change this parameter to use a different n-gram length. I chose three to preserve short but meaningful units that might point to a topic, since single words often do not capture meaningful phrases.
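For intuition, the following plain-Python function mimics what Spark’s `NGram` transformer produces for one tweet: consecutive n-token windows joined by single spaces, and no n-grams at all when the tweet has fewer than n tokens. It is an illustrative sketch; the notebook itself uses `pyspark.ml.feature.NGram`. The tokens come from a topic in the window output above.

```python
def ngrams(tokens, n):
    # Consecutive n-token windows joined with spaces (like Spark's NGram output)
    return [" ".join(tokens[i:i + n]) for i in range(len(tokens) - n + 1)]

tokens = ["facial", "recognition", "camera", "cyber", "war"]
print(ngrams(tokens, 2))
print(ngrams(tokens, 3))  # ['facial recognition camera', 'recognition camera cyber', 'camera cyber war']
print(ngrams(["nerf", "everywhere"], 3))  # [] (fewer tokens than n)
```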

Once the trigrams were generated, I exploded them into rows and grouped by the trigram string to compute a simple frequency (current_count) with Spark’s SQL-like functions. Then I sorted them by descending frequency and kept only the top_k (150) most frequent ones, which keeps the focus on frequent trigrams rather than rare noise. This variable is user-configurable: decreasing it reduces noise in potential topics, while increasing it yields more candidate topics. I set it to 150 to ensure finding at least 10 clusters of possible topics; with smaller values, the algorithm sometimes couldn’t find 10 possible “topics.” Some clusters contained many n-grams, so the report prints at most three n-grams for each cluster.

The algorithm grouped trigrams that overlap by at least one word. For instance, “facial recognition camera” shares “camera” with “camera cyber war,” so the two are merged into one topic. Finally, each topic was assigned a score equal to the average frequency of its trigrams, and the top N clusters (in my case, the top 10) became the final “trending topics.”
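The grouping and scoring steps can be reproduced without Spark. The sketch below follows the greedy overlap rule and the average-frequency score used in `detect_bn_gram_topics`, processing the most frequent n-grams first; the n-grams are taken from the window output above, but the counts are made up.

```python
def overlap(ng1, ng2):
    return len(set(ng1.split()) & set(ng2.split())) > 0

def group_and_score(freq_map):
    """Greedy overlap clustering followed by average-frequency scoring."""
    topics = []
    # Most frequent n-grams first, so they seed the topics
    for ng in sorted(freq_map, key=freq_map.get, reverse=True):
        for t in topics:
            if any(overlap(ng, other) for other in t):
                t.append(ng)
                break
        else:
            topics.append([ng])
    scored = [(t, sum(freq_map[ng] for ng in t) / len(t)) for t in topics]
    return sorted(scored, key=lambda x: x[1], reverse=True)

# Hypothetical counts for n-grams seen in the first window's output
counts = {
    "facial recognition camera": 5,
    "recognition camera cyber": 4,
    "camera cyber war": 3,
    "hong kong protestors": 6,
    "biggest fan week": 10,
}
for topic, score in group_and_score(counts):
    print(topic, score)
```

Here the three camera trigrams form one topic with score (5 + 4 + 3) / 3 = 4.0.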

I chose BN-Grams because it captures short yet semantically relevant phrases without too much data sparsity, and it integrates easily with Spark’s native NGram and DataFrame operations.

### PROBLEMS I ENCOUNTERED
Lemmatization and removing non-English tweets were difficult. Some trending topics in the final results are in other languages that also use the Latin script.

One issue was that too many bigrams or trigrams ended up in a single large cluster. If trigram A shared a word with B, and B shared a different word with C, all three would merge, creating big clusters as “final topics.” That’s why I decided to show at most three n-grams for each topic.
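One possible way to limit these chain merges, sketched below under our own assumptions, is to compare a new n-gram only with the first (seed) n-gram of each topic instead of with every member. The rule and the function names are hypothetical and not part of the original notebook; the toy n-grams form the A-B-C chain described above.

```python
def overlap(ng1, ng2):
    return len(set(ng1.split()) & set(ng2.split())) > 0

def group_any_member(ngrams):
    # Original rule: join the first topic where ANY member shares a word
    topics = []
    for ng in ngrams:
        for t in topics:
            if any(overlap(ng, other) for other in t):
                t.append(ng)
                break
        else:
            topics.append([ng])
    return topics

def group_by_seed(ngrams):
    # Stricter rule: join a topic only if the n-gram shares a word with its seed
    topics = []
    for ng in ngrams:
        for t in topics:
            if overlap(ng, t[0]):
                t.append(ng)
                break
        else:
            topics.append([ng])
    return topics

chain = ["a b c", "c d e", "e f g"]  # A shares "c" with B, B shares "e" with C
print(group_any_member(chain))  # [['a b c', 'c d e', 'e f g']]
print(group_by_seed(chain))     # [['a b c', 'c d e'], ['e f g']]
```

The trade-off is that the result depends more on which n-gram seeds each topic, which is why processing candidates in descending frequency matters.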

Another challenge was handling repeated words, like “hello hello hello,” or merged trigrams in which the same word appears multiple times. I tried implementing a fix to detect and discard these patterns, but it wasn’t successful. Some trigrams appear many times because of duplicated data, so I added dropDuplicates; note, however, that grouping by n-gram already yields one row per trigram, so this call does not reduce counts inflated by duplicated tweets.
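A simple filter for the repeated-word case, sketched here as an assumption rather than the fix that was attempted, drops any candidate n-gram whose words are not all distinct. Because `candidate_ngrams` is a plain Python list inside `detect_bn_gram_topics`, a filter like this could be applied there before the grouping step. The example n-grams come from the window output above.

```python
def has_repeated_word(ngram):
    # True if some word occurs more than once in the n-gram
    words = ngram.split()
    return len(set(words)) < len(words)

candidates = [
    "suspended suspended suspended",
    "kaorhys angaugustoko kaorhys",
    "stay cool everyone",
    "hong kong protestors",
]
kept = [ng for ng in candidates if not has_repeated_word(ng)]
print(kept)  # ['stay cool everyone', 'hong kong protestors']
```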

While completing this project, I broke my left arm. I had to undergo two bone fracture reductions and couldn’t communicate much with the professor, which limited the time I could spend on the project and my understanding of some aspects of Natural Language Processing.

### PERFORMANCE AND RESULTS
The time window length is user-adjustable. The larger the window, the longer each cleaning step takes. The longest window I tried was 60 minutes, and I’m not entirely sure how to speed things up beyond caching intermediate results.

The results for each topic show clusters of three-word phrases that share at least one word with another phrase in the cluster. Some topics are in other languages that use the Latin script.

### MORE GENERAL CONCLUSIONS AND LESSONS LEARNED
Implementing BN-Grams showed me how powerful Apache Spark can be for handling large datasets and text mining. Trigrams capture more context than single words yet are still common enough to be meaningful. However, bigrams were also useful. For example, when I used ngram_size=2, “kamala harris” came up along with other trending topics about the U.S. presidential debates. I ultimately chose trigrams for my final setup, but the n-gram size remains a user parameter.

Simple overlap checks can lead to chain merges, so tuning merge conditions or limiting how many trigrams you consider can be tricky but is important for coherent topics. Repeated words in n-grams can inflate certain topics, and while I tried to fix the merging logic, it remained challenging.

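Because BN-Grams integrates well with Spark’s DataFrame operations, it can scale to large tweet datasets: the n-gram counting runs as distributed DataFrame operations, and only the top_k candidates are collected to the driver for clustering. Overall, BN-Grams strikes a nice balance between capturing enough context (multi-word phrases) and staying computationally feasible.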

This was an interesting project, though I didn’t feel I had enough NLP theory to fully interpret the results. Nevertheless, it highlights how useful Apache Spark can be for large-scale text processing.