In [14]:
import math
from collections import Counter
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import log, col, row_number, split
from pyspark.sql.window import Window

import nltk
import nltk.corpus
from nltk.corpus import inaugural
nltk.download('inaugural')
# inaugural.words('2005-Bush.txt') 

[nltk_data] Downloading package inaugural to
[nltk_data]     /Users/susiesyli/nltk_data...
[nltk_data]   Package inaugural is already up-to-date!


True

### Data cleaning

In [15]:
import re
import string
from nltk.corpus import stopwords
nltk.download('stopwords')

def clean_text(text):
    # Convert to all lowercase
    text = text.lower()
    
    # Remove text inside square brackets
    text = re.sub(r'\[.*?\]', '', text)
    
    # Remove punctuation
    text = re.sub(r'[%s]' % re.escape(string.punctuation), ' ', text)
    
    # Remove numbers
    text = re.sub(r'\d+', ' ', text)
    
    # Remove excessive whitespace
    text = re.sub(r'\s+', ' ', text).strip()
    
    # Remove stopwords and words that are 3 letters or less (e.g. "is", "the", "on")
    stop_words = set(stopwords.words('english'))
    text = ' '.join([word for word in text.split() if word not in stop_words and len(word) > 3])
    
    return text

[nltk_data] Downloading package stopwords to
[nltk_data]     /Users/susiesyli/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


### Load and clean the corpus

In [16]:
corpus = nltk.corpus.inaugural.fileids()[-10:] # List with last 10 president speeches by title
N = len(corpus) # Total number of documents in the corpus 
doc_lengths = {}
all_words = [] # List of lists of words 
for doc in corpus: # For every speech 
        speech = " ".join(inaugural.words(doc)) # Combine every word into a string
        cleaned_text = clean_text(speech).split() # List of cleaned words
        all_words.append(cleaned_text) # Clean text and split words into a list
        doc_lengths[doc] = len(set(cleaned_text))

print("Number of documents in corpus:", N)
print("Number of unique words in each document:", doc_lengths)

Number of documents in corpus: 10
Number of unique words in each document: {'1985-Reagan.txt': 713, '1989-Bush.txt': 601, '1993-Clinton.txt': 484, '1997-Clinton.txt': 591, '2001-Bush.txt': 481, '2005-Bush.txt': 615, '2009-Obama.txt': 751, '2013-Obama.txt': 658, '2017-Trump.txt': 430, '2021-Biden.txt': 612}


### Use Spark to find TF-IDF of every word

In [17]:
# sc = SparkContext.getOrCreate()
# rdd = sc.parallelize(all_words)
# word_count = rdd.flatMap(lambda speech: set(speech)).map(lambda word: (word, 1))
# word_count = word_count.reduceByKey(lambda x, y: x + y) # Aggregate by count 

In [18]:
def find_tf(document):
        word_count = Counter(document)
        total = len(document)
        tf = {word: count/total for word, count in word_count.items()}
        return tf

def find_tfidf(document, df):
        tf = find_tf(document)
        tfidf = {word: tf[word] * math.log(len(document)/df[word]) for word in tf}
        return tfidf

def top_tfidf_pres(tfidf, pres):
        top20_words = []

        pres_words = {}
        
        return 

In [19]:
# Create SparkSession
# value = "tfidf"
# conf = SparkConf.setAppName(value).setMaster("local")
sc = SparkSession.getOrCreate()# conf=conf
rdd = sc.parallelize(all_words)
word_count = rdd.flatMap(lambda speech: set(speech)).map(lambda word: (word, 1)).reduceByKey(lambda x, y: x + y)
df = word_count.collectAsMap()
total_tfidf = [find_tfidf(document, df) for document in all_words]

pres_words = {}

AttributeError: type object 'SparkSession' has no attribute 'getOrCreate'

In [None]:
# Create SparkSession
spark = SparkSession.builder.appName("tfidf").getOrCreate()

# Combine all words in the corpus into one list
all_corpus_words = [word for doc_words in all_words for word in doc_words]

# Convert the combined corpus into DataFrame format
corpus_words = spark.createDataFrame([(t, 1) for t in all_corpus_words], ["word", "count"])

# Compute Term Frequency (TF) for the entire corpus
tf = corpus_words.groupBy("word").count().withColumnRenamed("count", "tf")

# Compute Document Frequency (DF). 
df_exploded = spark.createDataFrame([(doc, word) for doc, words in zip(corpus, all_words) for word in words], ["document", "word"])
df_freq = df_exploded.dropDuplicates(["word", "document"]).groupBy("word").count().withColumnRenamed("count", "df")

# Compute Inverse Document Frequency (IDF) for the entire corpus
N = len(corpus)
idf = df_freq.withColumn("idf", log((N+1)/(col("df")+1)))

# Compute TF-IDF for the entire corpus
tf_idf = tf.join(idf, on="word", how="left").withColumn("tf_idf", col("tf") * col("idf"))

# Display the results
tf_idf.orderBy(col("tf_idf").desc()).show()

23/10/13 01:32:35 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


                                                                                

+----------+---+---+-------------------+------------------+
|      word| tf| df|                idf|            tf_idf|
+----------+---+---+-------------------+------------------+
|     story| 22|  4| 0.7884573603642703|17.346061928013945|
|     human| 21|  4| 0.7884573603642703|16.557604567649676|
|   freedom| 66|  8|0.20067069546215124|13.244265900501983|
|      self| 10|  3| 1.0116009116784799|10.116009116784799|
|    enough| 10|  3| 1.0116009116784799|10.116009116784799|
|      days| 10|  3| 1.0116009116784799|10.116009116784799|
|   century| 30|  7| 0.3184537311185346| 9.553611933556038|
|generation| 21|  6| 0.4519851237430572| 9.491687598604202|
|    dreams|  9|  3| 1.0116009116784799| 9.104408205106319|
|      seen|  9|  3| 1.0116009116784799| 9.104408205106319|
|      made| 20|  6| 0.4519851237430572| 9.039702474861144|
|   journey| 19|  6| 0.4519851237430572| 8.587717351118087|
|    breeze|  5|  1| 1.7047480922384253| 8.523740461192126|
|      xand|  5|  1| 1.7047480922384253|

### Cross-check answer with scikit-learn's tfidf()

In [None]:
# from pyspark.sql import SparkSession
# from pyspark.ml.feature import HashingTF, IDF, Tokenizer

# # Create SparkSession
# spark = SparkSession.builder.appName("tfidf_example").getOrCreate()

# # Convert your corpus data into DataFrame format
# df_data = [(doc, " ".join(words)) for doc, words in zip(corpus, all_words)]
# df = spark.createDataFrame(df_data, ["document", "words"])

# # Tokenize words (this will split the words into separate rows)
# tokenizer = Tokenizer(inputCol="words", outputCol="tokens")
# tokensData = tokenizer.transform(df)

# # Term Frequency
# hashingTF = HashingTF(inputCol="tokens", outputCol="rawFeatures")
# featurizedData = hashingTF.transform(tokensData)

# # Inverse Document Frequency
# idf = IDF(inputCol="rawFeatures", outputCol="features")
# idfModel = idf.fit(featurizedData)
# rescaledData = idfModel.transform(featurizedData)

# # Extract top 20 words based on TF-IDF value for each document
# from pyspark.sql.functions import udf
# from pyspark.sql.types import StringType, ArrayType
# from pyspark.ml.linalg import SparseVector
# import numpy as np

# def extract_top_words(features):
#     if isinstance(features, SparseVector):
#         index_word = {int(v): k for k, v in hashingTF.indexOfTerm.items()}
#         sorted_indices = np.argsort(features.toArray())[::-1][:20]
#         return [index_word[idx] for idx in sorted_indices if idx in index_word]
#     return []

# udf_top_words = udf(extract_top_words, ArrayType(StringType()))
# top_words_df = rescaledData.withColumn("top_20_words", udf_top_words("features"))
# top_words_df.select("document", "top_20_words").show(truncate=False)
