# Data Mining Course Spark Exercise
## Sharif University of Technology

In this notebook we are going to analyze Farsi news.
Outline of the exercise:
* Dataset preparation
* Preprocessing 
* Exploration 
* Word Collections

You should replace the `TODO` parts with your implementation. Remember that each `TODO` may take multiple lines, so you shouldn't limit yourself to one-line solutions.

## Prerequisites
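You should be familiar with [tf-idf](https://en.wikipedia.org/wiki/Tf%E2%80%93idf). In this notebook you should use the following formula for tf-idf, where $f_{t,d}$ is the number of times term $t$ occurs in document $d$, $\mathrm{len}(d)$ is the number of terms in $d$, $N$ is the number of documents, and $n_t$ is the number of documents that contain $t$:
$$\frac{f_{t,d}}{\mathrm{len}(d)} \times \log\left(1 + \frac{N}{n_t}\right)$$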
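
The formula can be checked on a toy corpus with plain Python before it is ported to RDD operations. The sketch below is only an illustration: the function name `tf_idf`, the toy documents, and the use of the natural logarithm are assumptions (the formula does not fix the base of the logarithm).

```python
import math
from collections import Counter

def tf_idf(docs):
    """Return one {term: score} dict per tokenized document in docs."""
    n_docs = len(docs)  # N in the formula
    # n_t: number of documents that contain term t at least once
    doc_freq = Counter(term for doc in docs for term in set(doc))
    scores = []
    for doc in docs:
        counts = Counter(doc)  # f_{t,d} for every term of this document
        scores.append({
            term: (count / len(doc)) * math.log(1 + n_docs / doc_freq[term])
            for term, count in counts.items()
        })
    return scores

# Hypothetical toy corpus: two already tokenized documents
toy_docs = [["a", "b", "a"], ["b", "c"]]
toy_scores = tf_idf(toy_docs)
```

In the first toy document the term `a` occurs twice among three terms and appears in one of the two documents, so its score is `(2/3) * log(1 + 2/1)`.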

## Warning: RDD API only
You **cannot** use the DataFrame, Dataset, mllib, ml, ... APIs of Spark in this exercise. You should only use the [RDD API](http://spark.apache.org/docs/2.1.0/api/python/pyspark.html#pyspark.RDD).

# Please enter your name below:
# Name:
# Student Number:

# Section 1: Dataset preparation
In this section you need to download the [dataset](https://drive.google.com/file/d/1bRxHQDzPr6wDimbM7b89H47kH8O3YV8Y/view?usp=sharing) into your working directory. After that, run the cell below to untar the dataset.

**Note 1: Don't change the below command.**

**Note 2: If you use Windows OS, unzip the dataset manually.**

## Install PySpark & Initialization
Uncomment this section if you use Google Colab or a local PC.

In [1]:
import pyspark

In [2]:
from pyspark.sql import SparkSession
from pyspark import SparkContext

spark = SparkSession \
    .builder \
    .appName("HW2") \
    .master("local[*]") \
    .config("spark.driver.memory", "30g") \
    .config("spark.executor.memory", "30g") \
    .config("spark.driver.maxResultSize", 0) \
    .getOrCreate()

sc:SparkContext=spark.sparkContext



# Reading the data

In [3]:
# Sampling 1/20 of the data
news_rdd = sc.textFile("news_data.jsonl") \
.sample(False, 1/20)

In [4]:
# Converting to json objects
import json
json_rdd = news_rdd.map(lambda x: json.loads(x)) 

# Section 2: Preprocessing
In this section we normalize the news and remove useless characters (for example `\n`, `\u200c`, ...). We also find and remove stop words.

In [5]:
# Removing useless characters
import re
def remove_useless_characters(x):
    x['body'] = re.sub(r'[^آ-ی ]', "", x['body'])
    return x
clean_news_rdd = json_rdd.map(lambda x: remove_useless_characters(x)) #TODO: remove useless characters
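
To see what the character class does, it helps to run it on a short string outside Spark. This is a small sketch; the helper name `keep_persian_letters` and the sample string are made up for illustration.

```python
import re

def keep_persian_letters(text):
    # Same character class as the cell above: keep the range U+0622 (آ) to U+06CC (ی) and spaces
    return re.sub(r'[^آ-ی ]', "", text)

# Hypothetical sample: a zero-width non-joiner, a newline, Latin letters and digits
sample = "می\u200cشود\nnews 2023"
cleaned = keep_persian_letters(sample)
```

Only the five Persian letters and the single space survive. Because the newline is deleted rather than replaced by a space, words on either side of a line break are glued together; substituting a space instead of the empty string would be one way to avoid that, if it matters for the analysis.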


In [6]:
# Finding and removing stopwords.
def remove_stop_words(x):
    stop_words = ["اعلام", "انجام", "قرار", "مورد", "باید", "دارد", "میشود", "کرده", "خواهد", "داد", "دو", "و", "کرد", "شده", "گفت", "در", "به", "از", "که", "این", "را", "با", "های", "برای", "تا", "ها", "است", "یا", "اما", "هم", "نه", "یک",
    "خود", "یکی", "بود", "شد", "می", "شود", "کرد", "او", "ما", "شما", "آن", "آنها", "ایشان", "با", "در", "بر", "برای",
    "به", "روی", "زیر", "بالا", "جلوی", "پشت", "پنجره", "دنبال", "جلو", "پیش", "پس", "نزدیک", "دور", "برخی", "چند",
    "چنین", "چه", "چون", "چگونه", "کجا", "کدام", "که", "کدامین", "کدامیک", "کدامیکی", "کجا", "کی", "چرا", "چطور", "چندین",
    "چند", "چنین", "کسی", "کس", "کدامیک", "کدامین", "کدامیکی", "کدام", "اینکه", "آنکه", "ولی", "اما", "اگر", "هر", "هرکس",
    "هرچه", "همان", "تنها", "وقتی", "تا", "تازه", "الان", "همیشه", "هنوز", "همین", "اول", "حالا", "پیش", "پس", "بار",
    "بارها", "همچنین", "نه", "نیز", "بلکه", "بله", "بلکه", "بلکه", "آره", "آری", "آره", "آری", "آره", "آری", "آره",
    "آری", "هم", "دیگر", "هم", "دیگران", "دیگری", "دیگه", "دیگه", "دیگران", "دیگری", "جلو", "پیش", "پس", "بالا", "پایین"]
    x['body'] = " ".join([word for word in x['body'].split() if word not in stop_words])
    return x
processed_news_rdd = clean_news_rdd.map(lambda x: remove_stop_words(x)) 
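
Since the stop-word list is consulted once per word of every document, a set is a natural container for it. The following is a sketch of that variant on a tiny, hypothetical subset of the list; `STOP_WORDS` and `drop_stop_words` are illustrative names, not part of the notebook.

```python
# Tiny illustrative subset of the stop-word list, stored as an immutable set
STOP_WORDS = frozenset(["و", "در", "به", "از", "که"])

def drop_stop_words(text, stop_words=STOP_WORDS):
    # Set membership is O(1) on average, versus a linear scan for a list
    return " ".join(word for word in text.split() if word not in stop_words)

filtered = drop_stop_words("خبر در تهران و اصفهان")
```

Defining the set once at module level also avoids rebuilding the collection for every record that the `map` processes.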


<b>
<h1> Section2: Assign number to each document </h1>
</b>

In [7]:
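# Generating a new ID for each document: its position in the sorted list of uids
ids = processed_news_rdd.sortBy(lambda x: x['uid']).map(lambda x: x['uid']).collect()
# Dictionary lookup replaces ids.index(...), which scans the whole list for every document
uid_to_new_id = {}
for index, uid in enumerate(ids):
    uid_to_new_id.setdefault(uid, index)  # keep the first position, as list.index would
def generate_new_id(x):
    x['new_id'] = uid_to_new_id[x['uid']]
    return x
numbered_news_rdd = processed_news_rdd.map(generate_new_id)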

                                                                                

<b>
<h1> Section3: Shingling </h1>
</b>

In [8]:
# Generating the list of all shingles
size_one_shingle = 7
def partition_string(x):
    body = x['body']
    # Distinct character shingles of this document ("+ 1" so the last window is included)
    body_shingles = {body[i:i+size_one_shingle] for i in range(len(body) - size_one_shingle + 1)}
    # Each shingle starts with a one-element presence list holding this document's new ID
    return [(shingle, [x['new_id']]) for shingle in body_shingles]
# Merging shingle presence lists with the same key(shingle)
shingle_doc_matrix = numbered_news_rdd.flatMap(partition_string).reduceByKey(lambda x,y: x+y).sortByKey()
# count() avoids pulling every shingle to the driver just to measure the length
total_number_of_shingles = shingle_doc_matrix.count()
all_shingles_rdd = shingle_doc_matrix.map(lambda x: x[0])
all_shingles_list = all_shingles_rdd.collect()
all_shingles_dict = {value: index for index, value in enumerate(all_shingles_list)}
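
Character shingling is easy to inspect on a toy string. The helper below is a minimal sketch (the name `char_shingles` and the inputs are hypothetical); it returns every window of `k` consecutive characters as a set.

```python
def char_shingles(text, k):
    """Return the set of all length-k character windows of text."""
    # A text shorter than k yields an empty range and therefore an empty set
    return {text[i:i + k] for i in range(len(text) - k + 1)}

toy_shingles = char_shingles("abcdefgh", 7)   # two windows: "abcdefg" and "bcdefgh"
too_short = char_shingles("abc", 7)           # empty set
```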


                                                                                

In [9]:
# Generating a new RDD where keys represent the document's new IDs, 
# and values represent the IDs of the shingles it contains.
def add_shingle_id(x):
    return (x[1], all_shingles_dict[x[0]]) # Keys are document new ids and values are shingle numbers.
def break_keys(x):
    return [(element, x[1]) for element in x[0]]
def process_group(x):
    return (x[0], list(x[1]))
shingle_doc_matrix_with_shingle_id = shingle_doc_matrix.map(add_shingle_id) \
.flatMap(break_keys) \
.groupByKey() \
.map(process_group)

<b>
<h1> Section4: Generating Hash Functions </h1>
</b>

In [11]:
# Finding the first prime number larger than total_number_of_shingles

from sympy import nextprime

# nextprime(n) returns the smallest prime strictly greater than n,
# so no fixed search window (such as the next 100 integers) has to be guessed.
P = int(nextprime(total_number_of_shingles))  # The first prime number after total number of shingles.

In [12]:
# Generating hash functions
import random
number_of_hash_functions = 300
def generate_hash_function(N, P):

    a = random.randint(1, P - 1)  # a should be in the range [1, P-1]
    b = random.randint(0, P - 1)  # b can be any value in the range [0, P-1]

    return (a, b)

hash_functions_coefficients = [generate_hash_function(total_number_of_shingles, P) for i in range(number_of_hash_functions)]
# Hash function numbers start from 1
hash_functions_coefficients = [(i+1, hash_functions_coefficients[i]) for i in range(number_of_hash_functions)]
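
Each coefficient pair `(a, b)` stands for a hash function of the form `h(x) = ((a*x + b) mod P) mod N`. The sketch below shows that form on deliberately small, hypothetical numbers (`toy_prime = 11`, `toy_buckets = 10`); the fixed seed is only there to make the run repeatable.

```python
import random

def make_hash(a, b, prime, n_buckets):
    """Return the function h(x) = ((a*x + b) mod prime) mod n_buckets."""
    return lambda x: ((a * x + b) % prime) % n_buckets

rng = random.Random(0)            # fixed seed, for repeatability only
toy_prime, toy_buckets = 11, 10   # hypothetical sizes: 11 is the first prime after 10
a = rng.randint(1, toy_prime - 1) # a must be non-zero
b = rng.randint(0, toy_prime - 1)
h = make_hash(a, b, toy_prime, toy_buckets)
values = [h(x) for x in range(toy_buckets)]

# Before the final "mod n_buckets", x -> (a*x + b) mod prime permutes 0..prime-1
permuted = {(a * x + b) % toy_prime for x in range(toy_prime)}
```

Because `a` is non-zero and `P` is prime, the inner map is a permutation of `0..P-1`, which is why it can stand in for a random row permutation in min-hashing.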


# Section 5: Generating signature matrix


In [15]:
# Computing the hash for each document
def apply_hash(x):
    output_list = []
    hash_values_list = []
    for i in range(len(x[1])):
        hash_values_list = []
        for a, b in hash_functions_coefficients:
            hash_values_list.append((a, (((b[0]*x[1][i]+b[1])%P)%total_number_of_shingles)))
        output_list.append((x[1][i], hash_values_list))
    return (x[0], output_list)
        
hashed_shigle_doc_matrix = shingle_doc_matrix_with_shingle_id.map(apply_hash)


In [16]:
# Finding the minimum hash value per hash function (min-hashing)
def find_min_hash(x):
    # Every hash value is below total_number_of_shingles, so it works as the initial "infinity"
    min_hash = [total_number_of_shingles] * number_of_hash_functions
    
    for item in x[1]:
        for sub_item in item[1]:
            if sub_item[1] < min_hash[sub_item[0]-1]:
                min_hash[sub_item[0]-1] = sub_item[1]
            

    return (x[0], min_hash)
    
        
signature_matrix  = hashed_shigle_doc_matrix.map(find_min_hash)
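
The point of the signature is that the fraction of positions on which two signatures agree estimates the Jaccard similarity of the underlying shingle sets. Below is a small plain-Python sketch of that idea; the documents, the prime 101, and the 200 hash functions are hypothetical, and the second `mod` by the number of shingles is left out for brevity.

```python
import random

def minhash_signature(shingle_ids, coefficients, prime):
    """One minimum per hash function h(x) = (a*x + b) mod prime."""
    return [min((a * x + b) % prime for x in shingle_ids) for a, b in coefficients]

def estimate_jaccard(sig1, sig2):
    """Fraction of signature positions on which the two documents agree."""
    return sum(u == v for u, v in zip(sig1, sig2)) / len(sig1)

rng = random.Random(0)  # fixed seed, for repeatability only
toy_prime = 101
toy_coefficients = [(rng.randint(1, toy_prime - 1), rng.randint(0, toy_prime - 1))
                    for _ in range(200)]

doc_a = {1, 2, 3, 4, 5, 6, 7, 8}     # shingle IDs of a hypothetical document
doc_b = {1, 2, 3, 4, 5, 6, 9, 10}
sig_a = minhash_signature(doc_a, toy_coefficients, toy_prime)
sig_b = minhash_signature(doc_b, toy_coefficients, toy_prime)

exact = len(doc_a & doc_b) / len(doc_a | doc_b)   # 6 shared out of 10 distinct IDs
estimate = estimate_jaccard(sig_a, sig_b)
```

With more hash functions the estimate tends to move closer to the exact value, at the cost of longer signatures.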

# Section 6: Generating candidate similar pairs

In [33]:
# Splitting each signature into b bands of r rows; documents that agree on a whole band become candidates
b = 60
r = number_of_hash_functions // b  # rows per band (300 // 60 = 5)
def divid_into_bounds(x):
    return_list = []
    counter = 0 # Shows the band number (starting from 0)
    for i in range(0, number_of_hash_functions, r):
        subtuple = tuple(x[1][i:i + r])
        return_list.append(((counter, subtuple), x[0]))
        counter = counter + 1
    return return_list

def generate_condidates(x):
    return (x[0], tuple(x[1]))

bounded_signature_matrix = signature_matrix.flatMap(divid_into_bounds) \
.groupByKey() \
.map(generate_condidates)
condidate_similar_list = bounded_signature_matrix.collect()
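
The choice of 60 bands of 5 rows fixes how likely a pair is to become a candidate. Under the usual assumption that two signatures agree on one row with probability equal to their Jaccard similarity `s`, the pair shares at least one band with probability `1 - (1 - s^r)^b`. The sketch below evaluates that expression; the function name and the sample similarities are illustrative.

```python
def candidate_probability(s, bands, rows):
    """Probability that two documents with Jaccard similarity s share at least one band."""
    return 1 - (1 - s ** rows) ** bands

bands, rows = 60, 5                      # 300 hash functions = 60 bands x 5 rows
threshold = (1 / bands) ** (1 / rows)    # rough similarity at which the curve rises steeply
p_low = candidate_probability(0.2, bands, rows)
p_high = candidate_probability(0.8, bands, rows)
```

With these settings the approximate threshold is about 0.44, so pairs with similarity 0.8 are almost certain to be candidates, while pairs with similarity 0.2 rarely are. The exact Jaccard check in the following cells then filters out the false positives.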

                                                                                

In [34]:
condidate_similar_news = []
for pair in condidate_similar_list:
    if len(pair[1]) > 1:
        condidate_similar_news.append(pair[1]) 
numbered_news = numbered_news_rdd.collect()   

In [57]:
# Computing the Jaccard similarity metric for each pair of candidates
total_number_of_condidates = 0
def compute_jacard_similarity(doc_new_id1, doc_new_id2):
    body1 = None
    body2 = None
    for new in numbered_news:
        if new['new_id'] == doc_new_id1:
            body1 = new['body']
        if new['new_id'] == doc_new_id2:
            body2 = new['body']
        if body1 is not None and body2 is not None:
            break
    if body1 is None or body2 is None:
        raise ValueError("Both new IDs must belong to documents in numbered_news")

    # Character shingles of both bodies ("+ 1" so the last window is included)
    body1_shingles = {body1[i:i+size_one_shingle] for i in range(len(body1) - size_one_shingle + 1)}
    body2_shingles = {body2[i:i+size_one_shingle] for i in range(len(body2) - size_one_shingle + 1)}
    common_shingles = body1_shingles & body2_shingles
    union_shingles = body1_shingles | body2_shingles
    # Bodies shorter than one shingle have no shingles at all; report 0 instead of dividing by zero
    if not union_shingles:
        return 0.0
    return len(common_shingles)/len(union_shingles)
# Uncomment the following lines to find all similar items.

# for item in condidate_similar_news:
#     if len(item) > 1:
#         jacard_similarity = compute_jacard_similarity(item[0], item[1])
#         if jacard_similarity > 0.8:
#             print(f"Candidate item is {item}. Jaccard similarity is {jacard_similarity}" )
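
The Jaccard similarity of two shingle sets is the size of their intersection divided by the size of their union. Here is a toy check with short strings and 3-character shingles; the helper names and inputs are hypothetical, and returning `0.0` for two empty sets is a convention chosen for this sketch.

```python
def char_shingles(text, k):
    """Return the set of all length-k character windows of text."""
    return {text[i:i + k] for i in range(len(text) - k + 1)}

def jaccard(set1, set2):
    """|intersection| / |union|, defined here as 0.0 when both sets are empty."""
    union = set1 | set2
    return len(set1 & set2) / len(union) if union else 0.0

s1 = char_shingles("abcdefgh", 3)   # abc bcd cde def efg fgh
s2 = char_shingles("abcdefxy", 3)   # abc bcd cde def efx fxy
similarity = jaccard(s1, s2)        # 4 shared shingles out of 8 distinct ones
```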


In [59]:
def convert_uid_to_new_id(uid):
    for new in numbered_news:
        if new['uid'] == uid:
            new_id = new['new_id']
            return new, new_id
    print("No document was found!") 
    return None

def convert_new_id_to_uid(new_id):
    for new in numbered_news:
        if new['new_id'] == new_id:
            uid = new['uid']
            return new, uid
        
    return None      

uid = input("Please enter the uid:")
similarity_lower_bound = float(input("Please enter the lower bound of similarity for finding candidates:"))
result = convert_uid_to_new_id(uid)  # None when the uid is not among the sampled documents
similar_items = []  # uids of the similar documents reported so far
checked = set()     # new IDs already compared, since one pair can share several bands
flag = 0
if result is not None:
    main_new, new_id = result
    for item in condidate_similar_news:
        if new_id in item and len(item) > 1:
            if flag == 0:
                print(f"The uid of your document is {uid}")
                print(f"\nThe title of your document is: {main_new['title']}")
                print(f"\nThe url of your document is: {main_new['url']}")
                print("\nThe body of your document is:")
                print(main_new['body'])
                print("--------------------------------------------------------------------")
            flag = 1
            # Check every member of this bucket; restarting per bucket fixes the shared counter
            for candidate in item:
                if candidate == new_id or candidate in checked:
                    continue
                checked.add(candidate)
                jacard_similarity = compute_jacard_similarity(new_id, candidate)
                if jacard_similarity > similarity_lower_bound:
                    new, similar_item = convert_new_id_to_uid(candidate)
                    similar_items.append(similar_item)
                    print(f"{similar_item} is similar to your document and the Jaccard similarity is {jacard_similarity:.3f}")
                    print(f"\nThe title of this candidate is: {new['title']}")
                    print(f"\nThe url of this candidate is: {new['url']}")
                    print("\nThe body of this candidate is:")
                    print(new['body'])
                    print("--------------------------------------------------------------------")
if flag == 0:
    print("No similar documents were found")


The uid of your document is 2922f433fc511183774e409f3

The title of your document is: بازگشت ستاره آبی‌ها به ترکیب اصلی استقلال

The url of your document is: http://9sobh.news/fa/news/55831

The body of your document is:
بازیکن استقلال میتواند تیم ادامه بازیهای لیگ برتر فوتبال همراهی کندبه گزارش صبح آرمین سهرابیان طور کامل بند مصدومیت رهایی پیدا میتواند تیم فوتبال استقلال تهران ادامه بازیهای لیگ برتر همراهی کندآرمین سهرابیان بعد بازی صنعت نفت دچار مصدومیت پایان دوران نقاهت آماده حضور ترکیب استقلال صورت صلاحدید جواد نکونام میتواند بازی آلومینیوم اراک ترکیب استقلال میدان بیایداستقلال بازیهای اخیر سه مدافع میانی بازی شاید حضور دوباره آرمین سهرابیان ترکیب تیم آبی روزبه چشمی خط هافبک برگردداستقلال بازی هوادار روزبه چشمی محمد حسین مرادمند ایمان سلیمی بازی آنجایی استقلال شیوه مدافع بازیهای اخیر امتیازات مهمی دست آورده بینی برابر آلومینیوم اراک شیوه بازی کند
--------------------------------------------------------------------
d220a968de71a64b7bfabe12c is similar to your document and the jacar