<a href="https://colab.research.google.com/github/AshkanSamavatian/AMD-Final-Project/blob/main/Algorithms_for_Massive_Data_Final_Project_(Ashkan_Samavatian)_.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### **Importing the necessary libraries and downloading the data from Kaggle**

In [None]:
#Importing necessary libraries for the project
import os
import sys
import warnings
from getpass import getpass

import pandas as pd
warnings.filterwarnings("ignore", category=FutureWarning)

In [None]:
#Importing Dataset from Kaggle website with my kaggle_username and kaggle_key
os.environ['KAGGLE_USERNAME'] = "*****"      #For presenteing the project, I substituted my kaggle username with "*****"
os.environ['KAGGLE_KEY'] = "*****"           #For presenteing the project, I substituted my kaggle key with "*****"
!kaggle datasets download -d xhlulu/medal-emnlp

Downloading medal-emnlp.zip to /content
100% 6.81G/6.82G [00:31<00:00, 226MB/s]
100% 6.82G/6.82G [00:31<00:00, 230MB/s]


In [None]:
#Unzipping only the "full_data.csv" file
!unzip medal-emnlp.zip full_data.csv

Archive:  medal-emnlp.zip
  inflating: full_data.csv           


### **Extracting a subset for the project**

In [None]:
#Reading the whole "full_data.csv" file into a pandas DataFrame
MeDAL_df=pd.read_csv("full_data.csv")

In [None]:
#Extracting a random subset of 700,000 rows and resetting its index to 0..n-1.
#random_state fixes the sampling seed, so a fresh "Restart & Run All" selects the same rows
#and therefore reproduces the same downstream results.
MeDAL_subset_df = MeDAL_df.sample(n=700000, random_state=42).reset_index(drop=True)

In [None]:
#Previewing the first rows of the subset
MeDAL_subset_df.head()

Unnamed: 0,TEXT,LOCATION,LABEL
0,pediatricians should encourage participation i...,10|13|27,mentally retarded|right|physical
1,SVR sclerosis ssc is a multiorgan connective t...,0,systemic
2,heavy metal concentrations in street dust of b...,53|86|105,background|correlation|natural
3,the critical role of thrombin in mediating pla...,77,artery
4,eight patients females male with congenital ad...,7,hyperplasia


In [None]:
#Summarizing the subset: number of rows, columns, non-null counts, dtypes and memory usage
MeDAL_subset_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 700000 entries, 0 to 699999
Data columns (total 3 columns):
 #   Column    Non-Null Count   Dtype 
---  ------    --------------   ----- 
 0   TEXT      700000 non-null  object
 1   LOCATION  700000 non-null  object
 2   LABEL     700000 non-null  object
dtypes: object(3)
memory usage: 16.0+ MB


In [None]:
#Saving the subset as CSV (without the pandas index) so that it can be loaded into PySpark later
MeDAL_subset_df.to_csv('MeDAL_subset_df.csv', index=False)

### **PySpark Setup**

In [None]:
#Setting up the PySpark prerequisites:
#  1. install the headless OpenJDK 8 package (quietly, output discarded)
#  2. download the Spark 3.2.0 / Hadoop 3.2 binary archive from the Apache archive site
#  3. extract the downloaded archive into the working directory
#  4. install the findspark package
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.2.0/spark-3.2.0-bin-hadoop3.2.tgz
!tar xf spark-3.2.0-bin-hadoop3.2.tgz
!pip install -q findspark

In [None]:
#Setting the environment variables for Java and Spark in one go.
#Both PySpark interpreter variables point at the Python executable that runs this notebook.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "JAVA_OPTS": "-Xms512m -Xmx4g",
    "SPARK_HOME": "/content/spark-3.2.0-bin-hadoop3.2",
    "PYSPARK_PYTHON": sys.executable,
    "PYSPARK_DRIVER_PYTHON": sys.executable,
})

In [None]:
#Initializing FindSpark
#(imported here rather than in the first cell because findspark is only pip-installed a few cells above)
#NOTE(review): presumably this is what makes the pyspark package under SPARK_HOME importable below — confirm
import findspark
findspark.init()

In [None]:
#Importing necessary libraries for PySpark
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import lower, regexp_replace, concat_ws, udf, col, size, sum as sql_sum, abs, row_number, monotonically_increasing_id
from pyspark.sql.types import ArrayType, StringType
from pyspark.ml.feature import Tokenizer, StopWordsRemover, MinHashLSH, CountVectorizer
from pyspark.ml.linalg import SparseVector

In [None]:
#Starting (or reusing) a local PySpark session named 'AMDProject'
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('AMDProject')
    .config("spark.driver.memory", "10g")
    .config("spark.executor.memory", "10g")
    .config("spark.default.parallelism", "100")
    .getOrCreate()
)

### **The Main Process on the subset**

In [None]:
#Loading the saved subset into a Spark DataFrame:
#the first line of the file is the header and the column types are inferred from the data
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("MeDAL_subset_df.csv")
)

In [None]:
#"TEXT" column preprocessing
df = df.withColumn("TEXT", lower(col("TEXT")))  #Convert to lowercase
df = df.withColumn("TEXT", regexp_replace(col("TEXT"), '[^a-zA-Z\s]', ' ')) #remove the punctuations

#Tokenizing the "TEXT" column
tokenizer = Tokenizer(inputCol="TEXT", outputCol="tokenized_TEXT")
df = tokenizer.transform(df)

#Removing the stop words
remover = StopWordsRemover(inputCol="tokenized_TEXT", outputCol="filtered_TEXT")
df = remover.transform(df)

#Concatenating the words back together
df = df.withColumn("filtered_TEXT_str", concat_ws(" ", col("filtered_TEXT")))

In [None]:
#Displaying the first rows of the Spark DataFrame after preprocessing (long values are shown truncated)
df.show()

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|                TEXT|            LOCATION|               LABEL|      tokenized_TEXT|       filtered_TEXT|   filtered_TEXT_str|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|pediatricians sho...|            10|13|27|mentally retarded...|[pediatricians, s...|[pediatricians, e...|pediatricians enc...|
|svr sclerosis ssc...|                   0|            systemic|[svr, sclerosis, ...|[svr, sclerosis, ...|svr sclerosis ssc...|
|heavy metal conce...|           53|86|105|background|correl...|[heavy, metal, co...|[heavy, metal, co...|heavy metal conce...|
|the critical role...|                  77|              artery|[the, critical, r...|[critical, role, ...|critical role thr...|
|eight patients fe...|                   7|         hyperplasia|[eight, patients,...|[eight, patients,..

In [None]:
#Printing the schema of the Spark DataFrame after preprocessing (column names, types and nullability)
df.printSchema()

root
 |-- TEXT: string (nullable = true)
 |-- LOCATION: string (nullable = true)
 |-- LABEL: string (nullable = true)
 |-- tokenized_TEXT: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- filtered_TEXT: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- filtered_TEXT_str: string (nullable = false)



In [None]:
#Checking that the subset still has the same number of rows after preprocessing
df.count()

700000

In [None]:
#Applying Shingling process on the subset

k = 10  #Default shingle length, in characters

def shingle_document(string, size=None):
    """Split a document into its overlapping character shingles.

    Args:
        string: The document text. ``None`` (e.g. a null value) is treated
            like an empty document instead of raising a ``TypeError``.
        size: Shingle length in characters. When omitted, the notebook-level
            ``k`` is used, looked up at call time.

    Returns:
        A list with the ``len(string) - size + 1`` contiguous substrings of
        length ``size``, in order of appearance (duplicates are kept).
        An empty list when the text is ``None`` or shorter than ``size``.
    """
    if size is None:
        size = k
    if string is None:
        return []
    return [string[i:i + size] for i in range(len(string) - size + 1)]

#Wrapping the shingling function as a Spark UDF that returns an array of strings
shingle_udf = udf(shingle_document, returnType=ArrayType(StringType()))

#Adding a "shingles" column computed from the cleaned, space-joined text
df_shingled = df.withColumn("shingles", shingle_udf(col("filtered_TEXT_str")))

In [None]:
#Converting shingles to vectors for minhash:
#fit a CountVectorizer on the "shingles" column, then use the fitted model to add a "features"
#vector column to the shingled data frame.
#NOTE(review): no vocabulary-size or document-frequency options are passed, so the library defaults
#apply; shingles that do not make it into the fitted vocabulary are presumably absent from
#"features" — confirm that this is intended at this data size.
cv = CountVectorizer(inputCol="shingles", outputCol="features")
model = cv.fit(df_shingled)
df_vectorized = model.transform(df_shingled)

#Dropping the rows whose feature vector is null
df_vectorized = df_vectorized.filter(col('features').isNotNull())

#Dropping the rows whose feature vector is all zeros
def is_nonzero(v):
    """Return True when the vector ``v`` has at least one non-zero entry."""
    if not isinstance(v, SparseVector):
        #Anything that is not sparse (e.g. a DenseVector) is scanned entry by entry
        return any(entry != 0 for entry in v)
    return v.numNonzeros() > 0

#The filter runs on the underlying RDD; the surviving rows are then converted back to a data frame
df_vectorized = (
    df_vectorized.rdd
    .filter(lambda record: is_nonzero(record['features']))
    .toDF()
)


#Creating a window specification without any partitioning, ordered by all of the existing columns
#NOTE(review): with no partitionBy, this window presumably pulls every row into a single partition
#to sort it, which may be slow at this data size — confirm it is acceptable.
window_spec = Window.orderBy(df_vectorized.columns)

#Adding a unique ID to the vectorized data frame
#(row_number() numbers the rows from 1 in window order; the "- 1" makes the ids 0-based)
df_vectorized = df_vectorized.withColumn("id", row_number().over(window_spec) - 1)

#Applying MinHashLSH
#A fixed seed makes the randomly drawn hash functions, and therefore the approximate join result,
#reproducible across kernel restarts.
mh = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=5, seed=42)
model_lsh = mh.fit(df_vectorized)

#Querying for Similar Items
#NOTE(review): approxSimilarityJoin documents `threshold` as an upper bound on the *distance*
#between the two rows (Jaccard distance for MinHash), not as a minimum similarity. With 0.8 the
#join therefore keeps pairs whose Jaccard similarity is roughly >= 0.2. If "at least 80% similar"
#was intended, pass 1 - 0.8 instead. The value is left unchanged here.
threshold = 0.8
#Self-join of the data frame; "datasetA.id < datasetB.id" drops the self-pairs and keeps each
#unordered pair only once. The resulting (id, id) rows are collected to the driver.
results = model_lsh.approxSimilarityJoin(df_vectorized, df_vectorized, threshold)\
                   .filter(col("datasetA.id") < col("datasetB.id"))\
                   .select("datasetA.id", "datasetB.id").collect()


In [None]:
#Counting the similar pairs that were collected
len(results)

18738

In [None]:
#Printing every similar pair as "(id_a, id_b)", one pair per line
#NOTE(review): this prints one line per collected pair; consider printing only a slice such as results[:20]
for row in results:
    print(f"({row[0]}, {row[1]})")

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
(619626, 619899)
(619626, 619921)
(619626, 619926)
(619897, 619899)
(619897, 619921)
(619897, 619926)
(619899, 619907)
(619899, 619921)
(619899, 619926)
(619899, 619928)
(619899, 619935)
(619899, 619937)
(619899, 619951)
(619899, 619965)
(619899, 619970)
(619907, 619921)
(619907, 619926)
(619907, 619928)
(619920, 619921)
(619920, 619926)
(619921, 619926)
(619921, 619928)
(619921, 619937)
(619921, 619965)
(619926, 619928)
(619926, 619935)
(619926, 619937)
(619926, 619951)
(619926, 619965)
(619926, 619970)
(619928, 619937)
(388883, 557610)
(537400, 557610)
(438602, 629814)
(438602, 606288)
(542153, 626260)
(542153, 626342)
(271285, 407331)
(271285, 619999)
(606677, 606708)
(606700, 606708)
(609366, 611479)
(611479, 624045)
(611479, 626011)
(609540, 616714)
(541244, 610096)
(612034, 614059)
(135419, 276722)
(623938, 630718)
(624177, 624223)
(42326, 628071)
(616743, 620716)
(590421, 621112)
(590421, 609741)
(621112, 630692)
(