## Count occurrences of verbs in the UN debates and find the most similar debate contents

In [2]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.4.tar.gz (317.3 MB)
[K     |████████████████████████████████| 317.3 MB 23 kB/s s eta 0:00:01
[?25hCollecting py4j==0.10.9.7
  Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
[K     |████████████████████████████████| 200 kB 61.4 MB/s eta 0:00:01
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25ldone
[?25h  Created wheel for pyspark: filename=pyspark-3.5.4-py2.py3-none-any.whl size=317849791 sha256=e8efe1699555423552d93009dc3479f9a40db034add1d11277dda28f3a47f8e6
  Stored in directory: /home/jovyan/.cache/pip/wheels/07/c2/e2/7687e8610c7c31573de4327479d2ff5feb2daba3e3b039919e
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.7 pyspark-3.5.4


In [3]:
# Create an RDD from a file
from pyspark import SparkContext

# Initialize a SparkContext
sc = SparkContext.getOrCreate()

# Load the csv file into an RDD
debates_rdd = sc.textFile("hdfs://namenode:9000/un-general-debates.csv").map(lambda line: line.split(',')[-1])

# Load the text file into an RDD
verbs_rdd = sc.textFile("hdfs://namenode:9000/all_verbs.txt")

# Load the text file into an RDD
verb_dict_rdd = sc.textFile("hdfs://namenode:9000/verb_dict.txt")


In [4]:
import re

# Filter out None values
debates_rdd = debates_rdd.filter(lambda line: line is not None)

# Remove empty lines
text_rdd_clean = debates_rdd.filter(lambda line: line.strip())


In [5]:
# Remove punctuation and handle lowercase
def tokenize_text(text):
    tokens = re.findall(r"\b(?:[A-Za-z]\.){2,}|\b[A-Za-z]+(?:['-][A-Za-z]+)*|\d+(?:-\d+)*\b", text.lower())
    return tokens

text_rdd_clean = debates_rdd.filter(lambda line: line.strip()).flatMap(tokenize_text)

In [6]:
# Load in verb list
verb_list = verbs_rdd.collect() 

# filter verb match in verb list
verbs_in_text_rdd = text_rdd_clean.filter(lambda word: word in verb_list)

In [7]:
# Split each line by commas
verb_dict_split = verb_dict_rdd.map(lambda line: line.split(','))

# Create (verb form, root verb) for each form and collect as a dictionary
verb_dict_flat = verb_dict_split.flatMap(lambda forms: [(form, forms[0]) for form in forms])

verb_dict = verb_dict_flat.collectAsMap()

In [8]:
# Transfer in different tenses 
def normalize_verb(verb, verb_dict_flat):
    if verb in verb_dict:
        return verb_dict[verb] # Return the infinitive form of the verb in the dictionary
    else:
        return verb # If the verb does not exist in the dictionary, return the original verb

# regular verb
verbs_normalized_rdd = verbs_in_text_rdd.map(lambda verb: normalize_verb(verb, verb_dict))
    
# Count the frequently used verbs
verb_counts_rdd = verbs_normalized_rdd.map(lambda verb: (verb, 1)).reduceByKey(lambda a, b: a + b)

# Count the top 10 frequently used verbs
top_10_verbs = verb_counts_rdd.sortBy(lambda x: x[1], ascending=False).take(10)

# Display the results in the format 
for verb, count in top_10_verbs:
    print(f"('{verb}', {count})")

Py4JJavaError: An error occurred while calling o27.partitions.
: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://namenode:9000/un-general-debates.csv
	at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:297)
	at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:239)
	at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:325)
	at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:205)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:276)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:272)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:276)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:272)
	at org.apache.spark.api.java.JavaRDDLike.partitions(JavaRDDLike.scala:61)
	at org.apache.spark.api.java.JavaRDDLike.partitions$(JavaRDDLike.scala:61)
	at org.apache.spark.api.java.AbstractJavaRDDLike.partitions(JavaRDDLike.scala:45)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:834)


In [None]:
!pip install --upgrade typing_extensions

In [None]:
!pip install torch sentence-transformers

In [None]:
!pip show typing_extensions

In [None]:
!pip install numpy

In [None]:
!pip install tqdm

In [None]:
from sentence_transformers import SentenceTransformer
import faiss
import numpy as np
from tqdm.notebook import tqdm

# Load the SentenceTransformer model
model = SentenceTransformer('all-MiniLM-L6-v2')


In [None]:
"""
The code snippet (How to make Faiss run faster) below has been sourced from
https://github.com/facebookresearch/faiss/wiki/How-to-make-Faiss-run-faster
The code snippet appears in its original form to speed up query
"""

# Function to encode complete debate texts and create a Faiss IVFPQ index
def encode_and_index(debate_texts, model, batch_size=512, nlist=100):
    all_vectors = []
    """
    The code snippet (tqdm) below has been sourced from
    https://github.com/tqdm/tqdm
    I have applied to check process status.
    """
    # Encode full debate texts in batches with a progress bar
    for i in tqdm(range(0, len(debate_texts), batch_size), desc="Encoding Loading"):
        batch = debate_texts[i:i+batch_size]
        all_vectors.append(model.encode(batch))
    
    """
    The code snippet (IVFPQ) below has been sourced from
    https://github.com/facebookresearch/faiss/blob/main/tutorial/python/3-IVFPQ.py
    I have applied to speed up query.
    """
    # Combine all encoded vectors
    debate_vectors = np.vstack(all_vectors)
    dimension = 384
    
    # Create a Faiss index with IVFPQ for faster searches
    quantizer = faiss.IndexFlatL2(dimension)
    indexIVF = faiss.IndexIVFPQ(quantizer, dimension, nlist, 4, 8)
    
    # Train the index on debate vectors
    indexIVF.train(debate_vectors)
    
    # Add the vectors to the index
    indexIVF.add(debate_vectors)
    
    # Enable precomputed lookup tables to speed up searches
    indexIVF.use_precomputed_table = True

    return indexIVF, debate_vectors


In [None]:
# Function to search for the most similar debate to the query
def search_similar_debate(query_sentence, model, indexIVF, debate_texts, nprobe=8):
    # Encode the query sentence into a vector
    query_vector = model.encode([query_sentence])
    
    # Set the number of clusters to search through
    indexIVF.nprobe = nprobe
    
    # Search for the most similar debate
    D, I = indexIVF.search(query_vector, k=1)
    
    # Return the most similar debate based on the search result
    most_similar_debate = debate_texts[I[0][0]]
    return most_similar_debate


In [None]:
import pandas as pd

# Read the CSV file and choose text row
df = pd.read_csv('un-general-debates.csv')
debates_list = df['text'].tolist() 


# Encode the debate texts and store them in a Faiss index
indexIVF, debate_vectors = encode_and_index(debates_list, model, batch_size=512, nlist=100)


In [None]:
# The query sentence
query_sentence = "Global climate change is both a serious threat to our planet and survival."

# Search for the most similar debate content to the query
most_similar_debate = search_similar_debate(query_sentence, model, indexIVF, debates_list, )

# Output the most similar debate text
print("The Most similar to the query is:")
print(most_similar_debate)

