## HW 3

In this assignment, we will learn about locality sensitive hashing in Apache Spark.

Start by running the code below if you are using Google Colab.

## Name
Robert Thompson (rt598@drexel.edu)

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Spark 2.4.5 is no longer at the original link, so use 3.0.3 from the Apache archive
!wget https://archive.apache.org/dist/spark/spark-3.0.3/spark-3.0.3-bin-hadoop2.7.tgz
!tar xf spark-3.0.3-bin-hadoop2.7.tgz

--2023-03-10 23:09:20--  https://archive.apache.org/dist/spark/spark-3.0.3/spark-3.0.3-bin-hadoop2.7.tgz
Resolving archive.apache.org (archive.apache.org)... 138.201.131.134, 2a01:4f8:172:2ec5::2
Connecting to archive.apache.org (archive.apache.org)|138.201.131.134|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 220400553 (210M) [application/x-gzip]
Saving to: ‘spark-3.0.3-bin-hadoop2.7.tgz.2’


2023-03-10 23:09:29 (25.5 MB/s) - ‘spark-3.0.3-bin-hadoop2.7.tgz.2’ saved [220400553/220400553]



In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "spark-3.0.3-bin-hadoop2.7"

In [None]:
!pip install -q findspark
# Pin PySpark to the same version as the Spark download above
!pip install pyspark==3.0.3

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
from pyspark.sql import SparkSession

In [None]:
APP_NAME = "HW3"

In [None]:
spark = SparkSession.builder.appName(APP_NAME).getOrCreate()

In [None]:
spark

1. Load the Wikipedia links dataset provided in this assignment as a dataframe.

In [None]:
should_infer_schema = "true"
contains_header = "true"
delimiter = ","
file_type = "csv"
file_name = "drive/MyDrive/wikipedia_links.csv"

# format() already selects the CSV reader, so load() only needs the path
dataframe = spark.read.format(file_type) \
  .option("inferSchema", should_infer_schema) \
  .option("header", contains_header) \
  .option("sep", delimiter) \
  .load(file_name)

In [None]:
dataframe.show(10, truncate=False)

+--------------------------------------------------------------+
|link                                                          |
+--------------------------------------------------------------+
|http://en.wikipedia.org/wiki/Lungotevere_dei_Mellini          |
|http://en.wikipedia.org/wiki/South_Rowan_High_School          |
|http://en.wikipedia.org/wiki/Haikou_College_of_Economics      |
|http://en.wikipedia.org/wiki/Saylesville,_Rhode_Island        |
|http://en.wikipedia.org/wiki/Wallington,_New_Jersey           |
|http://en.wikipedia.org/wiki/Saint_Andrew's_Chapel            |
|http://en.wikipedia.org/wiki/Nemperor_Records                 |
|http://en.wikipedia.org/wiki/Kepler-37b                       |
|http://en.wikipedia.org/wiki/Ecomuseo_della_Montagna_Pistoiese|
|http://en.wikipedia.org/wiki/Virtual_Network_Computing        |
+--------------------------------------------------------------+
only showing top 10 rows



2. We would like to keep the name of the Wikipedia entry only. Parse the links on the '/' symbol and keep only the last part (you may use PySpark functions for string splitting).

In [None]:
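from urllib.parse import unquote

def parse_link(string_to_parse):
  # Keep the text after the last '/' and decode percent-escapes (e.g. %27 -> ')
  if string_to_parse is not None:
      string_split = string_to_parse.split("/")
      return unquote(string_split[-1], encoding = "utf-8")
  # Missing links stay missing (None) so they can be dropped in step 4
  return None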

string = '%20Hey%20Python%20is%20cool!'
print(parse_link(string))

 Hey Python is cool!


In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Wrap parse_link directly; the extra lambda is not needed
link_udf = udf(parse_link, StringType())

In [None]:
link_dataframe = dataframe.withColumn("link", link_udf("link"))
link_dataframe.show(10, truncate=False)

+---------------------------------+
|link                             |
+---------------------------------+
|Lungotevere_dei_Mellini          |
|South_Rowan_High_School          |
|Haikou_College_of_Economics      |
|Saylesville,_Rhode_Island        |
|Wallington,_New_Jersey           |
|Saint_Andrew's_Chapel            |
|Nemperor_Records                 |
|Kepler-37b                       |
|Ecomuseo_della_Montagna_Pistoiese|
|Virtual_Network_Computing        |
+---------------------------------+
only showing top 10 rows
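The parsing step boils down to "take everything after the last `/` and percent-decode it". A minimal pure-Python sketch of that idea using `str.rsplit` and `urllib.parse.unquote`; the helper name `last_path_segment` and the percent-encoded sample URL are hypothetical, not taken from the dataset. Inside Spark, the built-in `substring_index(col, "/", -1)` should handle the splitting half without a Python UDF, although in Spark 3.0 the percent-decoding would presumably still need the UDF.

```python
from urllib.parse import unquote

def last_path_segment(url):
    """Return the percent-decoded text after the final '/' of url (None passes through)."""
    if url is None:
        return None
    # maxsplit=1 splits only once, at the right-most '/'
    return unquote(url.rsplit("/", 1)[-1], encoding="utf-8")

print(last_path_segment("http://en.wikipedia.org/wiki/Kepler-37b"))               # Kepler-37b
print(last_path_segment("http://en.wikipedia.org/wiki/Saint_Andrew%27s_Chapel"))  # Saint_Andrew's_Chapel
```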



3. Add an ID column that will be a sequential integer between 1 and len(wiki) in our Wikipedia links table. You may order the column using any other column of your choice.

In [None]:
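from pyspark.sql import Window
from pyspark.sql.functions import monotonically_increasing_id, row_number

# monotonically_increasing_id() is unique and increasing but not consecutive across
# partitions, so rank it with row_number() to get IDs 1..N in the original row order
id_window = Window.orderBy(monotonically_increasing_id())
link_dataframe2 = link_dataframe.withColumn("ID", row_number().over(id_window))
link_dataframe2.show(10, truncate=False)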

+---------------------------------+---+
|link                             |ID |
+---------------------------------+---+
|Lungotevere_dei_Mellini          |1  |
|South_Rowan_High_School          |2  |
|Haikou_College_of_Economics      |3  |
|Saylesville,_Rhode_Island        |4  |
|Wallington,_New_Jersey           |5  |
|Saint_Andrew's_Chapel            |6  |
|Nemperor_Records                 |7  |
|Kepler-37b                       |8  |
|Ecomuseo_della_Montagna_Pistoiese|9  |
|Virtual_Network_Computing        |10 |
+---------------------------------+---+
only showing top 10 rows
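Why `monotonically_increasing_id() + 1` alone is not enough: per the Spark API docs, the ID is guaranteed unique and monotonically increasing but not consecutive, and the current implementation puts the partition ID in the upper 31 bits and the record number within each partition in the lower 33 bits. It therefore yields 1..N only when all rows sit in one partition. The pure-Python sketch below simulates that documented layout with made-up partition sizes and then performs the remapping that `row_number()` over an ordered window (or `RDD.zipWithIndex()`) would do; the helper names are hypothetical.

```python
def simulated_monotonic_ids(partition_sizes):
    """Mimic the documented ID layout: partition index in the upper 31 bits,
    record number within the partition in the lower 33 bits."""
    ids = []
    for partition_index, size in enumerate(partition_sizes):
        for record_number in range(size):
            ids.append((partition_index << 33) + record_number)
    return ids

def to_sequential(ids):
    """Number the unique IDs 1..len(ids) by rank, as row_number() over an ordered window would."""
    rank = {value: position for position, value in enumerate(sorted(ids), start=1)}
    return [rank[value] for value in ids]

one_partition = simulated_monotonic_ids([4])
three_partitions = simulated_monotonic_ids([2, 2, 1])
print([i + 1 for i in one_partition])     # [1, 2, 3, 4]
print([i + 1 for i in three_partitions])  # [1, 2, 8589934593, 8589934594, 17179869185]
print(to_sequential(three_partitions))    # [1, 2, 3, 4, 5]
```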



4. Clean up the data by removing missing data.

In [None]:
link_dataframe2.sort("link", ascending=True).show(20, truncate=False)

+---------------------------------+-----+
|link                             |ID   |
+---------------------------------+-----+
|null                             |45262|
|!!!                              |56373|
|!!!                              |46251|
|!Oka_Tokat                       |12329|
|!PAUS3                           |42577|
|!PAUS3                           |43338|
|!PAUS3                           |47829|
|!PAUS3                           |58634|
|!Women_Art_Revolution            |42668|
|"Bassy"_Bob_Brockmann            |59249|
|"Dr._Death"_Steve_Williams       |51634|
|"Irish"_Teddy_Mann               |55706|
|"King"_Bennie_Nawahi             |56978|
|"L"_Is_for_Lawless               |55244|
|"O"_(Flowers_of_Hell_album)      |42153|
|"P"_Is_for_Peril                 |48870|
|"P"_Is_for_Peril                 |56510|
|"Pimpernel"_Smith                |44382|
|"Pussy_Cats"_Starring_the_Walkmen|47835|
|"Q"_Is_for_Quarry                |15238|
+---------------------------------+-----+
only showing top 20 rows

In [None]:
print("Total Count:", link_dataframe2.count())
link_dataframe3 = link_dataframe2.na.drop()
print("New Count:", link_dataframe3.count())

Total Count: 60000
New Count: 59999


In [None]:
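import re

def clean_link(string_to_clean):
  if string_to_clean is None:
      return None
  # Raw string avoids an invalid "\s" escape; the character class was "10-9", meant "0-9"
  regex = r"^[a-zA-Z0-9\s_]$"
  # Lowercase, drop every character outside the class, and strip whitespace characters
  string_to_clean = "".join(char.strip() for char in string_to_clean.lower() if re.search(regex, char) is not None)
  # Underscores separate words: join the non-empty pieces with single spaces
  string_to_clean = " ".join(word for word in string_to_clean.split("_") if word != "")

  # Titles with nothing left become None so that na.drop() removes them
  if string_to_clean.strip() != "":
      return string_to_clean
  else:
      return None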

test_string_to_clean = '68_(film)'
print(clean_link(test_string_to_clean))

test_string_to_clean = 'Til_Death_Do_Us_Part_(Star_Trek:_Deep_Space_Nine)'
print(clean_link(test_string_to_clean))

test_string_to_clean = '!!!'
print(clean_link(test_string_to_clean))

test_string_to_clean = 'null'
print(clean_link(test_string_to_clean))

68 film
til death do us part star trek deep space nine
None
null
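The same cleaning rules can be written as a single regular-expression substitution. The sketch below, with the hypothetical name `clean_title`, should agree with `clean_link` on ordinary titles such as the test strings above (it assumes plain ASCII input; exotic Unicode lowercasing is not examined). In Spark, the built-ins `lower` and `regexp_replace` could likely express the same rules without a Python UDF, but that variant is not shown here.

```python
import re

def clean_title(title):
    """Lowercase, keep only [a-z0-9_], then treat '_' as the word separator."""
    if title is None:
        return None
    kept = re.sub(r"[^a-z0-9_]", "", title.lower())
    words = [word for word in kept.split("_") if word]
    # An empty result becomes None so the row can be dropped with na.drop()
    return " ".join(words) or None

for sample in ["68_(film)", "Til_Death_Do_Us_Part_(Star_Trek:_Deep_Space_Nine)", "!!!"]:
    print(clean_title(sample))
```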


In [None]:
clean_link_udf = udf(clean_link, StringType())
clean_link_dataframe = link_dataframe3.withColumn("link", clean_link_udf("link"))

In [None]:
print("Total Count:", clean_link_dataframe.count())
clean_link_dataframe2 = clean_link_dataframe.na.drop()
print("New Count:", clean_link_dataframe2.count())

Total Count: 59999
New Count: 59996


In [None]:
clean_link_dataframe2.sort("link", ascending=True).show(20, truncate=False)

+--------------------------+-----+
|link                      |ID   |
+--------------------------+-----+
|00 boston massachusetts   |12630|
|00 detroit michigan       |50043|
|00 jones beach new york   |32630|
|00 paris france           |29441|
|00 verona italy           |9441 |
|07ghost                   |53006|
|1                         |2167 |
|1 2 4 8                   |364  |
|1 aurigae                 |53213|
|1 corinthians 11          |52315|
|1 day                     |47388|
|1 for 3                   |52329|
|1 girl nation             |42166|
|1 hanover square          |58901|
|1 nenokkadine             |49611|
|1 thing                   |15606|
|10                        |54255|
|10 9 8 7 6 5 4 3 2 1      |51918|
|10 items or less tv series|47952|
|10 magazine               |3604 |
+--------------------------+-----+
only showing top 20 rows



5. Tokenize the words by splitting on the underscore symbol, count vectorize using a vocab size of 1,000,000, and create feature vectors using the count vectors. Your final result should be a dataframe containing the ID we previously generated and the feature vectors.

In [None]:
from pyspark.ml.feature import RegexTokenizer

# clean_link already replaced the underscores with spaces, so split on any non-word character
regex_tokenizer = RegexTokenizer(inputCol="link", outputCol="words", pattern="[^\\w]")
words_dataframe = regex_tokenizer.transform(clean_link_dataframe2)
words_dataframe.show(20, truncate=False)

+-------------------------------------+---+------------------------------------------+
|link                                 |ID |words                                     |
+-------------------------------------+---+------------------------------------------+
|lungotevere dei mellini              |1  |[lungotevere, dei, mellini]               |
|south rowan high school              |2  |[south, rowan, high, school]              |
|haikou college of economics          |3  |[haikou, college, of, economics]          |
|saylesville rhode island             |4  |[saylesville, rhode, island]              |
|wallington new jersey                |5  |[wallington, new, jersey]                 |
|saint andrews chapel                 |6  |[saint, andrews, chapel]                  |
|nemperor records                     |7  |[nemperor, records]                       |
|kepler37b                            |8  |[kepler37b]                               |
|ecomuseo della montagna pistoiese    |9  |

In [None]:
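from pyspark.ml.feature import CountVectorizer

# The assignment asks for a vocabulary of up to 1,000,000 terms (the default is 2^18)
word_count_vectorizer = CountVectorizer(inputCol="words", outputCol="features", vocabSize=1000000)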
word_count_vectorizer_model = word_count_vectorizer.fit(words_dataframe)

In [None]:
features_data = word_count_vectorizer_model.transform(words_dataframe)
features_data.sort("link").show(10, truncate=False)

+-----------------------+-----+-----------------------------+--------------------------------------------------+
|link                   |ID   |words                        |features                                          |
+-----------------------+-----+-----------------------------+--------------------------------------------------+
|00 boston massachusetts|12630|[00, boston, massachusetts]  |(53342,[334,818,4443],[1.0,1.0,1.0])              |
|00 detroit michigan    |50043|[00, detroit, michigan]      |(53342,[173,2600,4443],[1.0,1.0,1.0])             |
|00 jones beach new york|32630|[00, jones, beach, new, york]|(53342,[10,36,217,477,4443],[1.0,1.0,1.0,1.0,1.0])|
|00 paris france        |29441|[00, paris, france]          |(53342,[639,713,4443],[1.0,1.0,1.0])              |
|00 verona italy        |9441 |[00, verona, italy]          |(53342,[903,3551,4443],[1.0,1.0,1.0])             |
|07ghost                |53006|[07ghost]                    |(53342,[23688],[1.0])              

In [None]:
featurized_vector_dataframe = features_data.select(["ID", "features"])
featurized_vector_dataframe.show(10, truncate=False)

+---+--------------------------------------------------+
|ID |features                                          |
+---+--------------------------------------------------+
|1  |(53342,[14933,19449,53046],[1.0,1.0,1.0])         |
|2  |(53342,[6,24,30,6142],[1.0,1.0,1.0,1.0])          |
|3  |(53342,[0,38,1631,12830],[1.0,1.0,1.0,1.0])       |
|4  |(53342,[81,1710,30101],[1.0,1.0,1.0])             |
|5  |(53342,[10,372,13783],[1.0,1.0,1.0])              |
|6  |(53342,[373,1207,3046],[1.0,1.0,1.0])             |
|7  |(53342,[298,46425],[1.0,1.0])                     |
|8  |(53342,[45385],[1.0])                             |
|9  |(53342,[4517,27085,32943,40280],[1.0,1.0,1.0,1.0])|
|10 |(53342,[335,1227,1620],[1.0,1.0,1.0])             |
+---+--------------------------------------------------+
only showing top 10 rows
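`CountVectorizer` builds a vocabulary ordered by term frequency across the corpus, keeps at most `vocabSize` terms, and maps each row to a sparse count vector; the vector size 53342 above is simply the number of distinct tokens, well under the 1,000,000 cap. A pure-Python sketch of that idea on a toy corpus follows. The function names are hypothetical, and breaking frequency ties alphabetically is an assumption of this sketch rather than Spark's documented behaviour.

```python
from collections import Counter

def fit_vocabulary(documents, vocab_size):
    """Rank terms by total count across the corpus and keep the top vocab_size."""
    counts = Counter(word for words in documents for word in words)
    # Highest count first; ties broken alphabetically (an assumption of this sketch)
    ranked = sorted(counts, key=lambda word: (-counts[word], word))
    return {word: index for index, word in enumerate(ranked[:vocab_size])}

def to_sparse_counts(words, vocabulary):
    """Return (size, indices, values), the triple Spark prints for a SparseVector."""
    term_counts = Counter(vocabulary[word] for word in words if word in vocabulary)
    indices = sorted(term_counts)
    values = [float(term_counts[index]) for index in indices]
    return (len(vocabulary), indices, values)

docs = [["new", "york"], ["new", "jersey"], ["york", "minster", "york"]]
vocabulary = fit_vocabulary(docs, vocab_size=1000000)
print(vocabulary)                                  # {'york': 0, 'new': 1, 'jersey': 2, 'minster': 3}
print(to_sparse_counts(docs[2], vocabulary))       # (4, [0, 3], [2.0, 1.0])
print(to_sparse_counts(docs[1], fit_vocabulary(docs, vocab_size=2)))  # (2, [1], [1.0])
```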



6. In the last part of the assignment, use the MinHashLSH model to fit our feature vectors and then find all pairs using a threshold of 0.3.

In [None]:
from pyspark.ml.feature import MinHashLSH

min_hash_lsh = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=5, seed=1)
mhlsh_model = min_hash_lsh.fit(featurized_vector_dataframe)
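What the fitted model computes, in sketch form: each hash table applies a random hash function to every non-zero index and keeps the minimum, and the chance that two rows share that minimum equals their Jaccard similarity. With `numHashTables=5`, each row gets the five single-value arrays seen in the `hashes` column; `approxSimilarityJoin` treats rows that collide in at least one table as candidates and then computes their exact distance, so some true pairs can be missed. The hash family below, h(x) = (a·(x + 1) + b) mod p, has the same general shape, but the prime, the random draws, and all names are assumptions, not Spark's internals.

```python
import random

PRIME = 2_147_483_647  # 2^31 - 1, an assumed large prime; Spark's own constant may differ

def make_hash_functions(num_hashes, seed=1):
    """Random (a, b) pairs defining h(x) = (a * (x + 1) + b) mod PRIME."""
    rng = random.Random(seed)
    return [(rng.randrange(1, PRIME), rng.randrange(0, PRIME)) for _ in range(num_hashes)]

def minhash_signature(indices, hash_functions):
    """For each hash function, the minimum hash over the (non-empty) set of non-zero indices."""
    indices = list(indices)
    return [min((a * (x + 1) + b) % PRIME for x in indices) for a, b in hash_functions]

def estimated_jaccard_similarity(sig_a, sig_b):
    """Fraction of signature positions where the two rows agree."""
    return sum(x == y for x, y in zip(sig_a, sig_b)) / len(sig_a)

hash_functions = make_hash_functions(200)
sig_a = minhash_signature(range(0, 60), hash_functions)   # indices 0..59
sig_b = minhash_signature(range(20, 80), hash_functions)  # indices 20..79, true similarity 40/80
print(estimated_jaccard_similarity(sig_a, sig_b))  # roughly 0.5; the exact value depends on the seed
```

Using more hash functions narrows the estimate; the five tables used in the assignment are cheap but only serve to find candidates, since the join output reports exact distances.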

In [None]:
mhlsh_model_transformed = mhlsh_model.transform(featurized_vector_dataframe)
mhlsh_model_transformed.show(10, truncate=False)

+---+--------------------------------------------------+----------------------------------------------------------------------------------+
|ID |features                                          |hashes                                                                            |
+---+--------------------------------------------------+----------------------------------------------------------------------------------+
|1  |(53342,[14933,19449,53046],[1.0,1.0,1.0])         |[[6.51423894E8], [6.84011766E8], [4.00583659E8], [4.64784548E8], [1.184775354E9]] |
|2  |(53342,[6,24,30,6142],[1.0,1.0,1.0,1.0])          |[[7.31069121E8], [4.7219434E8], [1.039047564E9], [3.02213422E8], [4.52771699E8]]  |
|3  |(53342,[0,38,1631,12830],[1.0,1.0,1.0,1.0])       |[[2.85932752E8], [5.72731754E8], [4.61995813E8], [9.1336298E7], [1.30672501E8]]   |
|4  |(53342,[81,1710,30101],[1.0,1.0,1.0])             |[[1.30648934E8], [7.17696608E8], [6.6761203E8], [9.00102729E8], [4.29453518E8]]   |
|5  |(53342,[10,372,

In [None]:
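threshold = 0.3
# Self-join: all pairs of rows whose Jaccard distance is below the threshold
similar_pairs = mhlsh_model.approxSimilarityJoin(featurized_vector_dataframe, featurized_vector_dataframe, threshold)
# distCol == 0 removes each row matched with itself; show the closest-to-threshold pairs first
similar_pairs.filter("distCol != 0").sort("distCol", ascending=False).show(truncate=True)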

+--------------------+--------------------+------------------+
|            datasetA|            datasetB|           distCol|
+--------------------+--------------------+------------------+
|[31875, (53342,[2...|[34167, (53342,[2...|0.2857142857142857|
|[15282, (53342,[2...|[23511, (53342,[2...|0.2857142857142857|
|[9075, (53342,[21...|[34084, (53342,[2...|0.2857142857142857|
|[7702, (53342,[21...|[23511, (53342,[2...|0.2857142857142857|
|[29075, (53342,[2...|[3881, (53342,[21...|0.2857142857142857|
|[53601, (53342,[2...|[8347, (53342,[21...|0.2857142857142857|
|[22646, (53342,[2...|[31875, (53342,[2...|0.2857142857142857|
|[35282, (53342,[2...|[14799, (53342,[2...|0.2857142857142857|
|[14167, (53342,[2...|[54084, (53342,[2...|0.2857142857142857|
|[44047, (53342,[2...|[11070, (53342,[2...|0.2857142857142857|
|[3511, (53342,[21...|[2646, (53342,[21...|0.2857142857142857|
|[27702, (53342,[2...|[14167, (53342,[2...|0.2857142857142857|
|[54799, (53342,[2...|[22646, (53342,[2...|0.2857142857
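Filtering on `distCol != 0` removes each row's match with itself, but it also drops distinct IDs whose titles clean to identical token sets (the four `!PAUS3` rows from step 4, for instance), and a self-join will in general report each pair in both orders. Keeping only rows where the first ID is smaller handles both cases; in Spark this would presumably be `.filter("datasetA.ID < datasetB.ID")`. A pure-Python sketch of that rule on hypothetical join rows:

```python
def unique_pairs(joined_rows):
    """Keep each unordered pair once (smaller ID first) and drop self-matches."""
    return sorted({(id_a, id_b, dist) for id_a, id_b, dist in joined_rows if id_a < id_b})

# Hypothetical self-join rows: (ID from datasetA, ID from datasetB, distCol)
rows = [
    (1, 1, 0.0),     # a row matched with itself
    (7, 9, 0.0),     # two IDs whose titles have identical token sets
    (9, 7, 0.0),
    (3, 5, 2 / 7),   # a near-duplicate pair, reported in both orders
    (5, 3, 2 / 7),
]
print(unique_pairs(rows))  # [(3, 5, 0.2857142857142857), (7, 9, 0.0)]
```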