# Data Mining Course Spark Exercise
## Sharif University of Technology

In this notebook we are going to analyze farsi news. 

## Warning: RDD api only
You **can not** use Dataframe, Dataset, mllib, ml, ... apis of spark in this exercise. You should only use the [RDD api](http://spark.apache.org/docs/2.1.0/api/python/pyspark.html#pyspark.RDD).

# Please enter your name below:
# Name: Ilia Hashemi Rad
# Student Number: 99102456

# Section 1: Dataset prepartition
This section you need to download [dataset](https://drive.google.com/file/d/1bRxHQDzPr6wDimbM7b89H47kH8O3YV8Y/view?usp=sharing) in a directory you work. After that run the below cell to untar the datase.

**Note 1: Don't change the below command.**

**Note 2: If you use Windows OS, unzip the dataset manually.**

## Install Pypark & Initialization
Uncomment this section if you use google colab or local pc

In [1]:
#! pip install pyspark

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("HW1") \
    .master("local[*]") \
    .getOrCreate()

sc=spark.sparkContext

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/01/06 21:06:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Reading the data

In [2]:
news_rdd = sc.textFile("news_data.jsonl")

news_rdd.takeSample(False, 1, 12)

                                                                                

['{"body": "نشست بزرگداشت روز جهانی حافظ شیرازی 20 مهرماه در دانشگاه پکن برگزار می شود.\\n\\n- ایرنا نوشت: با همکاری مشترک گروه زبان و ادبیات فارسی دانشگاه پکن و رایزنی فرهنگی جمهوری اسلامی ایران در چین، نشست بزرگداشت روز جهانی حافظ شیرازی روز پنجشنبه 20 مهرماه ساعت 10:30 تا 12 در دانشگاه پکن برگزار می شود.\\n\\nدر این نشست، محسن بختیار، سفیر و نعمت الله ایران زاده، رایزن فرهنگی جمهوری اسلامی ایران در چین، پروفسور چن مینْگ، رییس محترم دانشکده زبان های خارجی دانشگاه پکن، بهادر باقری، استاد مدعو زبان و ادبیات فارسی دانشکده زبان های خارجی دانشگاه پکن، سخنرانی می کنند.\\n\\nخانم وانگ یی دان، استاد رشته زبان و ادبیات فارسی دانشگاه پکن، شی گوانگ، استاد رشته زبان و ادبیات فارسی و مدیر مرکز پژوهش های فرهنگ ایران در دانشگاه پکن و لیو یینگ جون، استاد و مدیر گروه زبان و ادبیات فارسی دانشگاه پکن از دیگر سخنرانان این نشست هستند.", "image_title_url": "https://ettelaat.com/files/fa/news/1402/7/20/82679_282.jpg", "language": "fa", "source": "موتور جستجوی قطره", "title": "ادای احترام چینی ها به حافظ +ع

## Shingling : 2 words without stop words

In [4]:
import json
import re

# Parse the JSON lines and extract the UID and body text from each line
parsed_news_rdd = news_rdd.map(lambda line: (json.loads(line)['uid'], json.loads(line)['body']))

# Define a function to remove useless characters including '\n', '\u200c', and '\\n'
def remove_useless_characters(text):
    # Remove special characters, digits, and unwanted unicode characters
    text = re.sub(r'\\|\\n|\\u200c|\n|پایان پیام|[^آ-ی\s]', ' ', text)

    # Define a regular expression pattern that matches one or more spaces
    pattern = re.compile(r" +")
    # Apply the pattern to the text and replace the matches with a single space
    text = pattern.sub(" ", text)
    return text

# Define a function to determine a list of Farsi stop words
def load_stop_words(file_paths):
    stop_words_set = set()
    for file_path in file_paths:
        with open(file_path, 'r', encoding='utf-8') as file:
            stop_words_set.update(file.read().splitlines())
    return stop_words_set

# Define a function to create shinglings of words.
def generate_ngrams(text, n):
    # Split the text into words
    words = text.split()
    # Generate n-grams using a sliding window of size n
    ngrams = [tuple(words[i:i+n]) for i in range(len(words) - n + 1)]
    return ngrams

# List of paths to your stop words files
stop_words_files = ['verbal.txt', 'persian.txt', 'short.txt', 'chars.txt', 'nonverbal.txt']

# Load stop words from files
stop_words = load_stop_words(stop_words_files)

# Remove useless characters from the body text in parsed_news_rdd
clean_news_rdd = parsed_news_rdd.map(lambda x: (x[0], remove_useless_characters(x[1])))

# Remove the stop words from clean_news_rdd without splitting the text
clean_news_rdd = clean_news_rdd.map(lambda x: (x[0], " ".join(filter(lambda w: w not in stop_words, x[1].split()))))

# Process the cleaned news RDD by generating bi-grams (2-grams)
processed_news_rdd = clean_news_rdd.map(lambda x: (x[0], generate_ngrams(x[1], 2)))

# Display the processed news RDD for one record
print(processed_news_rdd.take(1))

[('68feae4bbbedc2d54adbb2369', [('سرمربی', 'ملوان'), ('ملوان', 'تصاویر'), ('تصاویر', 'وایرالی'), ('وایرالی', 'دیدار'), ('دیدار', 'نساجی'), ('نساجی', 'پست'), ('پست', 'جالب'), ('جالب', 'منتشر'), ('منتشر', 'گزارش'), ('گزارش', 'ورزش'), ('ورزش', 'سه'), ('سه', 'مهدی'), ('مهدی', 'تارتار'), ('تارتار', 'خوشحالی'), ('خوشحالی', 'عجیب'), ('عجیب', 'غریب'), ('غریب', 'دقیقه'), ('دقیقه', 'چهرههای'), ('چهرههای', 'جالب'), ('جالب', 'هفته'), ('هفته', 'لیگ'), ('لیگ', 'برتر'), ('برتر', 'اختصاص'), ('اختصاص', 'موردتوجه'), ('موردتوجه', 'هواداران'), ('هواداران', 'فوتبال'), ('فوتبال', 'قرار'), ('قرار', 'تارتار'), ('تارتار', 'انتشار'), ('انتشار', 'پست'), ('پست', 'اینستاگرامی'), ('اینستاگرامی', 'تصاویر'), ('تصاویر', 'جشن'), ('جشن', 'خوشحالی'), ('خوشحالی', 'نوشته'), ('نوشته', 'جادوی'), ('جادوی', 'طرفداران'), ('طرفداران', 'انزلی'), ('انزلی', 'سرمربی'), ('سرمربی', 'ملوان'), ('ملوان', 'وعده'), ('وعده', 'تیمش'), ('تیمش', 'ادامه'), ('ادامه', 'فصل'), ('فصل', 'جنگید'), ('جنگید', 'رضایت'), ('رضایت', 'هواداران'), ('هواداران

## Min-Hashing and Signature Matrix

In [None]:
import random
import sys

# Define the number of bands and rows for the Min-Hash algorithm
NUM_BAND = 40  # Number of bands
NUM_ROW = 3    # Number of rows per band

# Calculate the total number of hash functions
k = NUM_BAND * NUM_ROW

# Define a large prime number for the hash functions
p = 4294967311

# Generate k random coefficients for the hash functions
a = random.sample(range(1, p), k)
b = random.sample(range(0, p), k)

# Define a function to hash a shingle using the ith hash function
def hash_shingle(shingle, i):
    shingle = " ".join(shingle)
    return (a[i] * hash(shingle) + b[i]) % p

# Define a function to compute the min-hash signature of a set of shingles
def min_hash(shingles):
    # Initialize a signature vector of length k with positive infinity values
    signature = [sys.maxsize] * k
    # Loop over each shingle in the set
    for shingle in shingles:
        # Loop over each hash function
        for i in range(k):
            # Hash the shingle using the ith hash function
            hash_value = hash_shingle(shingle, i)
            # Update the ith value of the signature if the hash value is smaller to find the minimum signature
            if signature[i] > hash_value:
                signature[i] = hash_value
    # Return the signature vector
    return signature

# Min-Hash and Signature Matrix
# Apply the min_hash function to each set of shingles in processed_news_rdd
# to generate the min-hash signature matrix for each news article
signature_matrix = processed_news_rdd.map(lambda x: (x[0], min_hash(x[1])))

## LSH and Candidate Pairs

In [5]:
import itertools

# Define a function to hash a signature vector using the ith band
def hash_signature(signature, i):
    # Extract the band corresponding to the ith band index
    band = signature[i * NUM_ROW: (i + 1) * NUM_ROW]
    # Calculate the hash value of the band
    value = hash(tuple(band))
    return (a[i] * value + b[i]) % p

# Define a function to generate candidate pairs from a band
def generate_candidates(band):
    # Group the tuples by their second element using the groupby function
    grouped = itertools.groupby(sorted(band, key=lambda x: x[1]), key=lambda x: x[1])
    # Initialize an empty list of candidate pairs
    candidates = []
    # Loop over each group
    for _, group in grouped:
        # Extract the list of articles in the group
        articles = [x[0] for x in group]
        # If the group has more than one article, generate all possible pairs
        if len(articles) > 1:
            for i in range(len(articles)):
                for j in range(i + 1, len(articles)):
                    # Append the pair to the list of candidates
                    candidates.append((articles[i], articles[j]))
    # Return the list of candidate pairs
    return candidates

# LSH and generating candidate pairs

# Apply the hash_signature function to each signature vector in the RDD
hashed_matrix = signature_matrix.flatMap(lambda x: [(i, (x[0], hash_signature(x[1], i))) for i in range(NUM_BAND)])

# Group the hashed matrix by the band index
grouped_matrix = hashed_matrix.groupByKey().mapValues(list)

# Generate candidate pairs from each band
candidate_pairs = grouped_matrix.flatMap(lambda x: generate_candidates(x[1]))

# Remove duplicate pairs and sort them by the article ids
candidate_pairs = candidate_pairs.distinct().sortBy(lambda x: (x[0], x[1]))

# Display the first 10 candidate pairs
print(candidate_pairs.take(10))



[('00001f08ded0a4ad2f7618cd6', '2da46f469fd627b1b4f5c927e'), ('000046429e09f6b639b6d8532', '0db68ee6e6f6695e0ecce36be'), ('000046429e09f6b639b6d8532', '115b9fbe166a1641bd72df893'), ('000046429e09f6b639b6d8532', '116e1951e6ec261e106a2ce0f'), ('000046429e09f6b639b6d8532', '12f245b2790078eea668d2179'), ('000046429e09f6b639b6d8532', '1f318e048098dc77b367fc633'), ('000046429e09f6b639b6d8532', '2261fa398dbe5e07ba76ef9e2'), ('000046429e09f6b639b6d8532', '38f030b89882842729e3b165a'), ('000046429e09f6b639b6d8532', '3b92e8f292f26b2a9232b6cb2'), ('000046429e09f6b639b6d8532', '5d3c94171ec1f070ea33a4866')]


                                                                                

## Clustering and Finding Similar News

#### Sample 1

In [None]:
# Define the query UID
query_uid = 'b272bcaeb94af1c6e9c6ed0aa'

# Filter candidate pairs for the given query UID
query_candidates = candidate_pairs.filter(lambda x: x[0] == query_uid or x[1] == query_uid).map(lambda x: x[1] if x[0] == query_uid else x[0]).collect()

# Retrieve the set of shingles for the query UID
query_shingles = set(processed_news_rdd.lookup(query_uid)[0])

# Retrieve shingles and UIDs for the candidate pairs
candidates_uid_shingles = processed_news_rdd.filter(lambda x: x[0] in query_candidates).collect()

# Define the Jaccard similarity threshold
jaccard_threshold = 0.3

# Define a function to compute the Jaccard similarity
def compute_jaccard_similarity(set1, set2):
    intersection_size = len(set1.intersection(set2))
    union_size = len(set1.union(set2))
    return intersection_size / union_size if union_size > 0 else 0

# Find similar news based on Jaccard similarity
similar_news_uid = []
similar_news = []
for candidate_uid, candidate_shingles in candidates_uid_shingles:
    candidate_shingles = set(candidate_shingles)
    similarity = compute_jaccard_similarity(query_shingles, candidate_shingles)
    # Check if the Jaccard similarity is above the threshold
    if similarity > jaccard_threshold:
        similar_news.append((candidate_uid, similarity))
        similar_news_uid.append(candidate_uid)

#### Validation Of Results for sample 1

In [12]:
# Extract the title of the query article
query_title = list(set(news_rdd.filter(lambda x: json.loads(x)['uid'] in [query_uid]).map(lambda x: json.loads(x)['title']).collect()))

# Extract the title and UID of similar news articles
similar_news_list = list(set(news_rdd.filter(lambda x: json.loads(x)['uid'] in similar_news_uid).map(lambda x: (json.loads(x)['uid'], json.loads(x)['title'])).collect()))

# Create dictionaries for efficient lookup
dict1 = dict(similar_news)
dict2 = dict(similar_news_list)

# Join the dictionaries to get a list of tuples with common keys(uid)... final result is a list of tuples with uis, similarity and title of each similar news
joined_list = [(key, dict1[key], dict2[key]) for key in set(dict1) & set(dict2)]

# Display the query article title
print('The given article title: ')
print(query_title[0])
print('\n\n')

# Display information about similar news articles
print(f'For the given article, {len(joined_list)} unique similar news founded:\n')
for uid, similarity, title in joined_list:
    print(f"Title: {title}\n")
    print(f"UID: {uid}, Similarity: {similarity}\n\n")




The given article title: 
عرضه 6 خودروی وارداتی در سامانه یکپارچه آغاز شد- اخبار صنعت و تجارت - اخبار اقتصادی تسنیم | Tasnim



For the given article, 12 similar news founded:

Title: سامانه یکپارچه فروش برای خودروهای وارداتی باز شد

UID: 46435999e803005bd47cf74e0, Similarity: 0.9166666666666666


Title: جزئیات عرضه ۶ خودروی وارداتی در سامانه یکپارچه | زمان ثبت نام و اسامی خودروها

UID: 74ac443c728e00bce09493ba1, Similarity: 0.9032258064516129


Title: خبر مهم از سامانه یکپارچه خودرو/متقاضیان خرید خودرو بخوانند

UID: e01e651fa146e1ee350faad5b, Similarity: 0.5492957746478874


Title:  عرضه ۶ خودروی جدید در سامانه یکپارچه

UID: a3be28df0263addf005bae7b9, Similarity: 0.7903225806451613


Title: عرضه 6 خودروی وارداتی در سامانه یکپارچه آغاز شد- اخبار صنعت و تجارت - اخبار اقتصادی تسنیم | Tasnim

UID: b272bcaeb94af1c6e9c6ed0aa, Similarity: 1.0


Title: تویوتا کرولا به لیست خودروهای وارداتی ... - ارانیکو

UID: 4e59c26a11124d15c2232ca4f, Similarity: 0.3055555555555556


Title:  سامانه یکپارچه ف

                                                                                

#### sample 2

In [None]:
# Define the query UID
query_uid = '000046429e09f6b639b6d8532'

# Filter candidate pairs for the given query UID
query_candidates = candidate_pairs.filter(lambda x: x[0] == query_uid or x[1] == query_uid).map(lambda x: x[1] if x[0] == query_uid else x[0]).collect()

# Retrieve the set of shingles for the query UID
query_shingles = set(processed_news_rdd.lookup(query_uid)[0])

# Retrieve shingles and UIDs for the candidate pairs
candidates_uid_shingles = processed_news_rdd.filter(lambda x: x[0] in query_candidates).collect()

# Define the Jaccard similarity threshold
jaccard_threshold = 0.3

# Define a function to compute the Jaccard similarity
def compute_jaccard_similarity(set1, set2):
    intersection_size = len(set1.intersection(set2))
    union_size = len(set1.union(set2))
    return intersection_size / union_size if union_size > 0 else 0

# Find similar news based on Jaccard similarity
similar_news_uid = []
similar_news = []
for candidate_uid, candidate_shingles in candidates_uid_shingles:
    candidate_shingles = set(candidate_shingles)
    similarity = compute_jaccard_similarity(query_shingles, candidate_shingles)
    # Check if the Jaccard similarity is above the threshold
    if similarity > jaccard_threshold:
        similar_news.append((candidate_uid, similarity))
        similar_news_uid.append(candidate_uid)


#### Validation Of Results for sample 2

In [15]:
# Extract the title of the query article
query_title = list(set(news_rdd.filter(lambda x: json.loads(x)['uid'] in [query_uid]).map(lambda x: json.loads(x)['title']).collect()))

# Extract the title and UID of similar news articles
similar_news_list = list(set(news_rdd.filter(lambda x: json.loads(x)['uid'] in similar_news_uid).map(lambda x: (json.loads(x)['uid'], json.loads(x)['title'])).collect()))

# Create dictionaries for efficient lookup
dict1 = dict(similar_news)
dict2 = dict(similar_news_list)

# Join the dictionaries to get a list of tuples with common keys(uid)... final result is a list of tuples with uis, similarity and title of each similar news
joined_list = [(key, dict1[key], dict2[key]) for key in set(dict1) & set(dict2)]

# Display the query article title
print('The given article title: ')
print(query_title[0])
print('\n\n')

# Display information about similar news articles
print(f'For the given article, {len(joined_list)} unique similar news founded:\n')
for uid, similarity, title in joined_list:
    print(f"Title: {title}\n")
    print(f"UID: {uid}, Similarity: {similarity}\n\n")



The given article title: 
شیوه جدید فروش خودرو اعلام شد



For the given article, 28 unique similar news founded:

Title: سامانه یکپارچه خودرو حذف شد

UID: 3aaecce21996a6b83301e969f, Similarity: 0.45614035087719296


Title: خبر فوری درباره سامانه یکپارچه خودرو / زمان عرضه مستقیم اعلام شد

UID: 990cacaf05c55d5291602a1f4, Similarity: 1.0


Title: شیوه جدید فروش خودرو اعلام شد + جزئیات

UID: b27e0e25f39dae13486cfa9fe, Similarity: 0.7142857142857143


Title: چرا سامانه یکپارچه خودرو حذف شد؟ | علت حذف دور سوم ثبت نام خودرو - اندیشه معاصر

UID: 8381cae224fa2aa9b934cf381, Similarity: 0.5070422535211268


Title: سامانه یکپارچه خودرو حذف شد/ چگونه می‌شود از کارخانه خودرو خرید؟

UID: 12f245b2790078eea668d2179, Similarity: 0.7592592592592593


Title: سامانه یکپارچه خودرو حذف شد- اخبار صنعت و تجارت - اخبار اقتصادی تسنیم | Tasnim

UID: b9c4aad52e7eefbd4466102ec, Similarity: 0.7192982456140351


Title: خبر مهم برای خریداران خودرو /اعلام شیوه جدید فروش خودرو

UID: 9727a5eba5efe8ae25551e5fa, Similarit

                                                                                