In [1]:
from transformers import pipeline
import torch
from torch.utils.data import DataLoader
import nltk
from nltk.tokenize import sent_tokenize, word_tokenize
import numpy as np
import pandas as pd

from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_replace, udf, col
from pyspark.sql.types import *
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from sentence_transformers import SentenceTransformer, InputExample, losses
from sentence_transformers import SentenceTransformer, util

In [2]:
spark = SparkSession.builder \
        .appName("test_cnn_fox") \
        .getOrCreate()

24/03/04 01:35:52 WARN Utils: Your hostname, Amys-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.0.193 instead (on interface en0)
24/03/04 01:35:52 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/04 01:35:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/03/04 01:35:53 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
24/03/04 01:35:53 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [3]:
cnn_df = spark.read.json("ml_data/2024-03-01_cnn.json")

                                                                                

In [4]:
cnn_df.printSchema()
cnn_df.show(1, truncate=False)

root
 |-- date: string (nullable = true)
 |-- images: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- publisher: struct (nullable = true)
 |    |-- href: string (nullable = true)
 |    |-- title: string (nullable = true)
 |-- short_description: string (nullable = true)
 |-- text: string (nullable = true)
 |-- title: string (nullable = true)
 |-- top_image: string (nullable = true)
 |-- url: string (nullable = true)
 |-- videos: array (nullable = true)
 |    |-- element: string (containsNull = true)

+----------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [5]:
fox_df = spark.read.json("ml_data/2024-03-01_foxnews.json")
fox_df.printSchema()

root
 |-- date: string (nullable = true)
 |-- images: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- publisher: struct (nullable = true)
 |    |-- href: string (nullable = true)
 |    |-- title: string (nullable = true)
 |-- short_description: string (nullable = true)
 |-- text: string (nullable = true)
 |-- title: string (nullable = true)
 |-- top_image: string (nullable = true)
 |-- url: string (nullable = true)
 |-- videos: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [6]:
fox_selected_df = fox_df.select("title", "text", "short_description")

In [7]:
fox_selected_df.show(5, truncate=False)

+--------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [8]:
fox_selected_df = fox_selected_df.withColumn("text", regexp_replace(fox_selected_df["text"], "Join Fox News for access to this content Plus special access to select articles and other premium content with your account - free of charge. Please enter a valid email address.\n\n", ""))
fox_selected_df = fox_selected_df.withColumn("text", regexp_replace(fox_selected_df["text"], "CLICK HERE TO GET THE FOX NEWS APP", ""))
fox_selected_df = fox_selected_df.withColumn("text", regexp_replace(fox_selected_df["text"], "\n\n", " "))

In [9]:
tokenizer = Tokenizer(inputCol="text", outputCol="words")
fox_selected_df = tokenizer.transform(fox_selected_df)

remover = StopWordsRemover(inputCol="words", outputCol="clean_words")
fox_selected_df = remover.transform(fox_selected_df)

fox_selected_df = fox_selected_df.withColumn("clean_text", col("clean_words").cast("string"))

fox_selected_df = fox_selected_df.drop("words", "clean_words", "clean_text")

def tokenize_title(title):
    return word_tokenize(title)
tokenize_title_udf = udf(tokenize_title, ArrayType(StringType()))
fox_selected_df = fox_selected_df.withColumn("title_tokens", tokenize_title_udf("title"))

fox_selected_df.show(2, truncate=False)

[Stage 4:>                                                          (0 + 1) / 1]

+--------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

                                                                                

In [10]:
def nltk_sentence_tokenize(text):
    sentences = nltk.sent_tokenize(text)
    return sentences

sentence_tokenize_udf = udf(nltk_sentence_tokenize, ArrayType(StringType()))
fox_selected_df = fox_selected_df.withColumn("sentences", sentence_tokenize_udf(fox_selected_df["text"]))
fox_selected_df.select("sentences").collect()[0]

Row(sentences=['Vice President Harris took to social media to praise the efforts of poll workers amid the presidential primaries and just months ahead of the 2024 election, but her complimentary remarks quickly backfired.', '"Our democracy could not function without nonpartisan poll workers like Vasu and Rob whom I met in Georgia.', 'President Biden and I thank you and we support you," she said Tuesday on X, sharing a photo of her appearing to listen to Vasu Abhiraman.', 'Instead of applauding a "nonpartisan" poll worker, X’s Community Notes immediately tagged the tweet with additional context that readers "might want to know," including that Abhiraman is actually a liberal advocate who promotes the progressive movement.', '"The ‘nonpartisan poll worker’ on the left is Vasu Abhiraman, a staffer at the left-wing Alliance for Justice and formerly of ACLU Georgia," reads a notice on Harris’ tweet.', 'KAMALA HARRIS \'READY TO SERVE\' AS DEMOCRATS SOUND THE ALARM ABOUT BIDEN\'S AGE: REPORT 

In [11]:
model_name = "Sakil/sentence_similarity_semantic_search"
model = SentenceTransformer(model_name)

title = fox_selected_df.select("title").collect()[0][0]
sentences = fox_selected_df.select("text").rdd.flatMap(lambda x: x[0].split(". ")).collect()

# Encode title
title_embedding = model.encode(title)

# Encode each sentence and compute cosine similarity with the title
similarities = []
for sentence in sentences:
    sentence_embedding = model.encode(sentence)
    similarity = util.cos_sim(title_embedding, sentence_embedding)
    similarities.append(similarity)

fox_selected_df.select("title", "text").show(truncate=False)
print(similarities)

+--------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [12]:
def compute_similarity(title, text):
    title_tokens = word_tokenize(title)
    sentences = nltk.sent_tokenize(text)

    max_similarity = float("-inf")
    associated_sentence = ""
    similarities = []
    similarity_sum = 0
    for sentence in sentences:
        sentence_tokens = word_tokenize(sentence)
        title_embedding = model.encode(title_tokens)
        sentence_embedding = model.encode(sentence_tokens)

        similarity = util.cos_sim(title_embedding, sentence_embedding)
        similarity_value = similarity[0][0]
        if similarity_value > max_similarity:
            max_similarity = similarity_value
            associated_sentence = sentence
        similarities.append(similarity_value)
        similarity_sum += similarity_value

    average_similarity = similarity_sum / len(sentences)
    sorted_similarities = sorted(similarities, reverse=True)
    return sorted_similarities, average_similarity, max_similarity, associated_sentence


In [13]:
model_name = "Sakil/sentence_similarity_semantic_search"
model = SentenceTransformer(model_name)

first_row = fox_selected_df.limit(1)
title = first_row.select("title").collect()[0][0]
text = first_row.select("text").collect()[0][0]
similarities, average_similarity, max_similarity, associated_sentence = compute_similarity(title, text)

print(title)
print(text)
print("similarities", similarities)
print("avg", average_similarity)
print("Maximum Similarity:", max_similarity)
print("Associated Sentence:", associated_sentence)

Kamala Harris praises 'nonpartisan poll worker,' which immediately backfires when people learn who he is
Vice President Harris took to social media to praise the efforts of poll workers amid the presidential primaries and just months ahead of the 2024 election, but her complimentary remarks quickly backfired. "Our democracy could not function without nonpartisan poll workers like Vasu and Rob whom I met in Georgia. President Biden and I thank you and we support you," she said Tuesday on X, sharing a photo of her appearing to listen to Vasu Abhiraman. Instead of applauding a "nonpartisan" poll worker, X’s Community Notes immediately tagged the tweet with additional context that readers "might want to know," including that Abhiraman is actually a liberal advocate who promotes the progressive movement. "The ‘nonpartisan poll worker’ on the left is Vasu Abhiraman, a staffer at the left-wing Alliance for Justice and formerly of ACLU Georgia," reads a notice on Harris’ tweet. KAMALA HARRIS '

In [14]:
model_name = "Sakil/sentence_similarity_semantic_search"
model = SentenceTransformer(model_name)

rows = fox_selected_df.collect()

for row in rows:
    title = row["title"]
    text = row["text"]
    similarities, average_similarity, max_similarity, associated_sentence = compute_similarity(title, text)
    
    print("Title:", title)
    print("Average Similarity:", average_similarity)
    print("Maximum Similarity:", max_similarity)
    print("Associated Sentence:", associated_sentence)

                                                                                

Title: Kamala Harris praises 'nonpartisan poll worker,' which immediately backfires when people learn who he is
Average Similarity: tensor(0.3968)
Maximum Similarity: tensor(1.0000)
Associated Sentence: KAMALA HARRIS 'READY TO SERVE' AS DEMOCRATS SOUND THE ALARM ABOUT BIDEN'S AGE: REPORT Abhiraman’s biography on the Alliance for Justice (AFJ) website identifies him as "the Helen Rosenthal Senior Counsel for the Building the Bench program at Alliance for Justice."
Title: Marianne Williamson returns to presidential race, saying Biden is vulnerable against Trump
Average Similarity: tensor(0.2779)
Maximum Similarity: tensor(0.5111)
Associated Sentence: she asked.
Title: NY AG taunts Trump about interest he owes on civil fraud judgment
Average Similarity: tensor(0.3253)
Maximum Similarity: tensor(0.5183)
Associated Sentence: New York Attorney General Letitia James appeared to taunt former President Trump about the interest he may own in a civil fraud judgment.
Title: Hunter Biden testifies 

In [15]:
model_name = "Sakil/sentence_similarity_semantic_search"
model = SentenceTransformer(model_name)

rows = fox_selected_df.collect()

total_similarity_sum_fox = 0
total_articles_fox = 0

for row in rows:
    title = row["title"]
    text = row["text"]
    
    sorted_similarities, average_similarity, max_similarity, _ = compute_similarity(title, text)
    
    total_similarity_sum_fox += np.mean(sorted_similarities[:2])
    total_articles_fox += 1

overall_avg_similarity_fox = total_similarity_sum_fox / total_articles_fox
print("Overall Average Similarity for the Fox News Articles:", np.round(overall_avg_similarity_fox, 4))

Overall Average Similarity for the Fox News Articles: 0.6253


In [16]:
# this code doesn't work for some reason, think there's some issues converting the similarity scores to a format that can append to pyspark df

# def compute_similarity(title, text):
#     title_tokens = word_tokenize(title)
#     sentences = nltk.sent_tokenize(text)

#     similarity_sum = 0
#     for sentence in sentences:
#         sentence_tokens = word_tokenize(sentence)
#         title_embedding = model.encode(title_tokens)
#         sentence_embedding = model.encode(sentence_tokens)

#         similarity = util.cos_sim(title_embedding, sentence_embedding)
#         similarity_sum += similarity[0][0]

#     average_similarity = similarity_sum / len(sentences)
#     return average_similarity

# compute_similarity_udf = udf(compute_similarity, \
#                              returnType=FloatType())

# result_df = fox_selected_df.withColumn("average_similarity", compute_similarity_udf("title", "text"))
# result_df.show(truncate=False)

### CNN

In [17]:
cnn_selected_df = cnn_df.select("title", "text", "short_description")
cnn_selected_df.show(5, truncate=False)

+------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [18]:
cnn_selected_df = cnn_selected_df.withColumn("text", regexp_replace(cnn_selected_df["text"], "\n\n", " "))

tokenizer = Tokenizer(inputCol="text", outputCol="words")
cnn_selected_df = tokenizer.transform(cnn_selected_df)

remover = StopWordsRemover(inputCol="words", outputCol="clean_words")
cnn_selected_df = remover.transform(cnn_selected_df)

cnn_selected_df = cnn_selected_df.withColumn("clean_text", col("clean_words").cast("string"))

cnn_selected_df = cnn_selected_df.drop("words", "clean_words", "clean_text")

def tokenize_title(title):
    return word_tokenize(title)
tokenize_title_udf = udf(tokenize_title, ArrayType(StringType()))
cnn_selected_df = cnn_selected_df.withColumn("title_tokens", tokenize_title_udf("title"))
cnn_selected_df.show(2, truncate=False)

+------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [19]:
def nltk_sentence_tokenize(text):
    sentences = nltk.sent_tokenize(text)
    return sentences

sentence_tokenize_udf = udf(nltk_sentence_tokenize, ArrayType(StringType()))
cnn_selected_df = cnn_selected_df.withColumn("sentences", sentence_tokenize_udf(cnn_selected_df["text"]))
cnn_selected_df.select("sentences").collect()[0]

Row(sentences=['From CNN’s Tierney Sneed, Holmes Lybrand, Denise Royal and Sabrina Souza in Fort Pierce, Florida, with Hannah Rabinowitz, Jeremy Herb and Marshall Cohen in Washington, DC The Alto Lee Adams Sr.', 'US Courthouse in Fort Pierce, Florida.', 'Lynne Sladky/AP/File The morning session of the hearing in the classified documents case has ended without a decision from the judge on when the case will go to trial.', 'Arguments Friday morning, though ostensibly about scheduling, also touched on discovery demands by the defendants because those demands could affect the timeline.', 'Judge Aileen Cannon picked apart proposed trial schedules from special counsel Jack Smith and the defendants in the classified documents case.', 'Prosecutors with the special counsel’s office have requested in court filings that the trial begin on July 8, while defense attorneys have proposed that Donald Trump and Carlos De Oliveira’s trial begin on August 12 and Walt Nauta’s trial begins September 9.', '

In [20]:
model_name = "Sakil/sentence_similarity_semantic_search"
model = SentenceTransformer(model_name)

rows = cnn_selected_df.collect()

total_similarity_sum_cnn = 0
total_articles_cnn = 0

for row in rows:
    title = row["title"]
    text = row["text"]
    
    sorted_similarities, average_similarity, max_similarity, _ = compute_similarity(title, text)
    total_similarity_sum_cnn += np.mean(sorted_similarities[:2])
    total_articles_cnn += 1

overall_avg_similarity_cnn = total_similarity_sum_cnn / total_articles_cnn
print("Overall Average Similarity for the CNN Articles:", np.round(overall_avg_similarity_cnn, 4))

Overall Average Similarity for the CNN Articles: 0.4942


In [21]:
spark.stop()