
References:

1. https://www.uber.com/en-ID/blog/lsh/
2. https://stackoverflow.com/questions/56816537/cant-find-kaggle-json-file-in-google-colab
3. https://spark.apache.org/docs/latest/api/python/index.html
4. https://spark.apache.org/docs/latest/ml-features.html#locality-sensitive-hashing

# Initialization

## Checking the Environment

In [1]:
!java --version
!python --version

openjdk 11.0.18 2023-01-17
OpenJDK Runtime Environment (build 11.0.18+10-post-Ubuntu-0ubuntu120.04.1)
OpenJDK 64-Bit Server VM (build 11.0.18+10-post-Ubuntu-0ubuntu120.04.1, mixed mode, sharing)
Python 3.9.16


## Installing Apache Spark (PySpark)

In [2]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.2.tar.gz (281.4 MB)
     281.4/281.4 MB 4.9 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
     199.7/199.7 KB 17.0 MB/s eta 0:00:00
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.3.2-py2.py3-none-any.whl size=281824025 sha256=5ed1ce14e3e27c6e96990afa0eadbc4032722f68e4c95f1bb06c4a7f466bc35d
  Stored in directory: /root/.cache/pip/wheels/6c/e3/9b/0525ce8a69478916513509d43693511463c6468db0de237c86
Successfully built pyspark
Installing collected packages: py4j, pyspa

## Initialize Apache Spark context

In [3]:
# Import Apache Spark SQL
from pyspark.sql import SparkSession

# Create the Spark session
# "local[*]" runs Spark on the local machine using all available CPU cores
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("Hello Pyspark") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [4]:
# Check spark session
print(spark)

<pyspark.sql.session.SparkSession object at 0x7f3d041dc400>


# Data Mining Task

The LSH task in this notebook consists of three steps:


1. Convert the original data into vectors.
2. Compute the hashes using the MinHash algorithm.
3. Search for similar pairs using approximate k-nearest-neighbor search or a similarity join.
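
MinHash is an LSH family for Jaccard distance, so it may help to see that distance computed exactly before any hashing is involved. The following is a minimal pure-Python sketch, not part of the Spark pipeline; the function name `jaccard_distance` and the three sample sentences are made up for illustration.

```python
def jaccard_distance(a, b):
    """Return 1 - |a ∩ b| / |a ∪ b| for two collections of tokens."""
    set_a, set_b = set(a), set(b)
    union = set_a | set_b
    if not union:
        # Convention chosen for this sketch: two empty sets count as identical
        return 0.0
    return 1.0 - len(set_a & set_b) / len(union)

# Hypothetical documents, already split into words
doc_a = "spark makes large scale data processing simple".split()
doc_b = "spark makes large scale text processing simple".split()
doc_c = "a recipe for chocolate cake".split()

print(jaccard_distance(doc_a, doc_b))  # 6 shared words out of 8 distinct -> 0.25
print(jaccard_distance(doc_a, doc_c))  # no shared words -> 1.0
```

A distance of 0.0 means the two word sets are identical and 1.0 means they share nothing. Comparing every pair exactly becomes expensive for large collections, which is the cost that hashing is meant to reduce.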



## Downloading the dataset

In [5]:
!pip install kaggle

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [6]:
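!mkdir -p ~/.kaggle/

# PLEASE USE YOUR OWN KEY
# Download your own key by following https://github.com/Kaggle/kaggle-api#api-credentials

import json
import os

# Placeholder credentials: replace both values with the ones from your own kaggle.json
api_token = {"username": "YOUR_KAGGLE_USERNAME", "key": "YOUR_KAGGLE_API_KEY"}

# Write the token to the file that the Kaggle CLI reads
with open(os.path.expanduser("~/.kaggle/kaggle.json"), "w") as file:
    json.dump(api_token, file)

# Restrict the file so that only the current user can read the key
!chmod 600 ~/.kaggle/kaggle.json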

In [7]:
# Download from https://www.kaggle.com/datasets/urbanbricks/wikipedia-promotional-articles

!kaggle datasets download -d urbanbricks/wikipedia-promotional-articles

Downloading wikipedia-promotional-articles.zip to /content
 97% 196M/201M [00:01<00:00, 148MB/s]
100% 201M/201M [00:01<00:00, 145MB/s]


In [8]:
!unzip wikipedia-promotional-articles.zip

Archive:  wikipedia-promotional-articles.zip
  inflating: good.csv                
  inflating: promotional.csv         


In [9]:
!ls -la

total 783140
drwxr-xr-x 1 root root      4096 Mar  9 16:06 .
drwxr-xr-x 1 root root      4096 Mar  9 16:02 ..
drwxr-xr-x 4 root root      4096 Mar  7 18:12 .config
-rw-r--r-- 1 root root 475685227 Oct 27  2019 good.csv
-rw-r--r-- 1 root root 115360355 Oct 27  2019 promotional.csv
drwxr-xr-x 1 root root      4096 Mar  7 18:14 sample_data
-rw-r--r-- 1 root root 210863294 Mar  9 16:05 wikipedia-promotional-articles.zip


## Read the dataset

In [10]:
# Read CSV
df = spark.read.option("header", True).csv("/content/promotional.csv")
df.printSchema()

# Add an ID for the dataset
from pyspark.sql.functions import monotonically_increasing_id
newsDF = df.withColumn("id", monotonically_increasing_id())
newsDF.show()

root
 |-- text: string (nullable = true)
 |-- advert: string (nullable = true)
 |-- coi: string (nullable = true)
 |-- fanpov: string (nullable = true)
 |-- pr: string (nullable = true)
 |-- resume: string (nullable = true)
 |-- url: string (nullable = true)

+--------------------+------+---+------+---+------+--------------------+---+
|                text|advert|coi|fanpov| pr|resume|                 url| id|
+--------------------+------+---+------+---+------+--------------------+---+
|1 Litre no Namida...|     0|  0|     1|  0|     0|https://en.wikipe...|  0|
|1DayLater was fre...|     1|  1|     0|  0|     0|https://en.wikipe...|  1|
|1E is a privately...|     1|  0|     0|  0|     0|https://en.wikipe...|  2|
|1Malaysia pronoun...|     1|  0|     0|  0|     0|https://en.wikipe...|  3|
|The Jerusalem Bie...|     1|  0|     0|  0|     0|https://en.wikipe...|  4|
|1st Round Enterpr...|     0|  0|     0|  1|     0|https://en.wikipe...|  5|
|2ergo is a provid...|     1|  0|     0|  0|   

In [11]:
# Get the total rows
newsDF.count()

23837

## 1. Prepare the tokenizer

We split the `text` column into lowercase word tokens and store them in a new `words` column.
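
As a rough illustration of what this step produces, the sketch below lowercases a string and splits it on whitespace in plain Python. It only approximates Spark's `Tokenizer` (for example, it collapses repeated spaces), and both `simple_tokenize` and the sample sentence are hypothetical.

```python
def simple_tokenize(text):
    """Lowercase the text and split it on runs of whitespace."""
    return text.lower().split()

# Hypothetical sentence, not taken from the dataset
tokens = simple_tokenize("Apache Spark is a Unified Analytics Engine.")
print(tokens)
# ['apache', 'spark', 'is', 'a', 'unified', 'analytics', 'engine.']
```

Note that punctuation stays attached to the neighbouring word (`'engine.'`), because the split happens on whitespace only.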

In [12]:
# Prepare the tokenizer
from pyspark.ml.feature import Tokenizer

tokenizer = Tokenizer(inputCol="text", outputCol="words")
wordsDF = tokenizer.transform(newsDF)

wordsDF.show()

+--------------------+------+---+------+---+------+--------------------+---+--------------------+
|                text|advert|coi|fanpov| pr|resume|                 url| id|               words|
+--------------------+------+---+------+---+------+--------------------+---+--------------------+
|1 Litre no Namida...|     0|  0|     1|  0|     0|https://en.wikipe...|  0|[1, litre, no, na...|
|1DayLater was fre...|     1|  1|     0|  0|     0|https://en.wikipe...|  1|[1daylater, was, ...|
|1E is a privately...|     1|  0|     0|  0|     0|https://en.wikipe...|  2|[1e, is, a, priva...|
|1Malaysia pronoun...|     1|  0|     0|  0|     0|https://en.wikipe...|  3|[1malaysia, prono...|
|The Jerusalem Bie...|     1|  0|     0|  0|     0|https://en.wikipe...|  4|[the, jerusalem, ...|
|1st Round Enterpr...|     0|  0|     0|  1|     0|https://en.wikipe...|  5|[1st, round, ente...|
|2ergo is a provid...|     1|  0|     0|  0|     0|https://en.wikipe...|  6|[2ergo, is, a, pr...|
|2N Telekomunikace..

In [13]:
# Vectorize the dataset
from pyspark.ml.feature import CountVectorizer

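vocabSize = 1000

# Fit the CountVectorizer model on our data
# The vocabulary keeps at most 1000 terms, each appearing in at least 10 documents (minDF=10)
cvModel = CountVectorizer(inputCol="words", outputCol="features", vocabSize=vocabSize, minDF=10).fit(wordsDF)

# Transform each list of words into a sparse vector of term counts
vectorizedDF = cvModel.transform(wordsDF)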
vectorizedDF.show()

+--------------------+------+---+------+---+------+--------------------+---+--------------------+--------------------+
|                text|advert|coi|fanpov| pr|resume|                 url| id|               words|            features|
+--------------------+------+---+------+---+------+--------------------+---+--------------------+--------------------+
|1 Litre no Namida...|     0|  0|     1|  0|     0|https://en.wikipe...|  0|[1, litre, no, na...|(1000,[0,1,2,3,4,...|
|1DayLater was fre...|     1|  1|     0|  0|     0|https://en.wikipe...|  1|[1daylater, was, ...|(1000,[0,1,2,3,4,...|
|1E is a privately...|     1|  0|     0|  0|     0|https://en.wikipe...|  2|[1e, is, a, priva...|(1000,[0,1,2,3,4,...|
|1Malaysia pronoun...|     1|  0|     0|  0|     0|https://en.wikipe...|  3|[1malaysia, prono...|(1000,[0,1,2,3,4,...|
|The Jerusalem Bie...|     1|  0|     0|  0|     0|https://en.wikipe...|  4|[the, jerusalem, ...|(1000,[0,1,2,3,4,...|
|1st Round Enterpr...|     0|  0|     0|  1|    
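
The `features` column prints each sparse vector as `(size,[indices],[values])`. The sketch below shows, in plain Python, how such a triple could be derived from a token list and a fixed vocabulary. It is an illustration under assumptions rather than Spark's implementation: `count_vector`, the four-term vocabulary, and the tokens are all hypothetical.

```python
from collections import Counter

def count_vector(tokens, vocabulary):
    """Return (size, indices, values) counting only tokens found in the vocabulary."""
    index_of = {term: i for i, term in enumerate(vocabulary)}
    # Count occurrences per vocabulary index; out-of-vocabulary tokens are ignored
    counts = Counter(index_of[t] for t in tokens if t in index_of)
    indices = sorted(counts)
    values = [float(counts[i]) for i in indices]
    return len(vocabulary), indices, values

vocabulary = ["the", "of", "spark", "data"]  # hypothetical 4-term vocabulary
tokens = ["spark", "reads", "the", "data", "the"]

print(count_vector(tokens, vocabulary))
# (4, [0, 2, 3], [2.0, 1.0, 1.0])
```

Here "reads" is dropped because it is not in the vocabulary, and "the" is counted twice at index 0.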

## 2. Fit/train an LSH Model

In [14]:
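from pyspark.ml.feature import MinHashLSH

# Fit a MinHash LSH model that uses 3 hash tables
mh = MinHashLSH(inputCol="features", outputCol="hashValues", numHashTables=3)
LSHmodel = mh.fit(vectorizedDF)

# Add the hashValues column to the vectorized data
LSHmodel.transform(vectorizedDF).show()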

+--------------------+------+---+------+---+------+--------------------+---+--------------------+--------------------+--------------------+
|                text|advert|coi|fanpov| pr|resume|                 url| id|               words|            features|          hashValues|
+--------------------+------+---+------+---+------+--------------------+---+--------------------+--------------------+--------------------+
|1 Litre no Namida...|     0|  0|     1|  0|     0|https://en.wikipe...|  0|[1, litre, no, na...|(1000,[0,1,2,3,4,...|[[1.8045414E7], [...|
|1DayLater was fre...|     1|  1|     0|  0|     0|https://en.wikipe...|  1|[1daylater, was, ...|(1000,[0,1,2,3,4,...|[[1.7546412E7], [...|
|1E is a privately...|     1|  0|     0|  0|     0|https://en.wikipe...|  2|[1e, is, a, priva...|(1000,[0,1,2,3,4,...|[[3877780.0], [3....|
|1Malaysia pronoun...|     1|  0|     0|  0|     0|https://en.wikipe...|  3|[1malaysia, prono...|(1000,[0,1,2,3,4,...|[[3877780.0], [1....|
|The Jerusalem Bie..
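
Each row of `hashValues` holds one hash per table. To give a feel for where such numbers can come from, here is a minimal pure-Python MinHash sketch: every hash function maps each active vector index to an integer, and the signature keeps the minimum per function. The linear hash form, the prime, the seed, and all names here are assumptions made for this sketch; it will not reproduce the values Spark printed above.

```python
import random

PRIME = 2038074743  # a large prime modulus (an assumption for this sketch)

def make_hash_functions(num_tables, seed=42):
    """Draw one random (a, b) coefficient pair per hash table."""
    rng = random.Random(seed)
    return [(rng.randrange(1, PRIME), rng.randrange(0, PRIME)) for _ in range(num_tables)]

def minhash_signature(active_indices, hash_functions):
    """For each (a, b), keep the minimum of ((1 + i) * a + b) % PRIME over the active indices."""
    if not active_indices:
        raise ValueError("MinHash needs at least one non-zero entry")
    return [min(((1 + i) * a + b) % PRIME for i in active_indices)
            for a, b in hash_functions]

def shares_bucket(sig_x, sig_y):
    """Two items are candidate neighbours if any table gives them the same hash."""
    return any(x == y for x, y in zip(sig_x, sig_y))

hash_functions = make_hash_functions(num_tables=3)
doc_x = {0, 1, 2, 3, 4, 7}  # hypothetical sets of non-zero vector indices
doc_y = {0, 1, 2, 3, 4, 9}
sig_x = minhash_signature(doc_x, hash_functions)
sig_y = minhash_signature(doc_y, hash_functions)
print(sig_x, sig_y, shares_bucket(sig_x, sig_y))
```

The more indices two sets share, the more likely it is that the same index produces the minimum for both, so similar documents tend to collide in at least one table. Using more tables raises the chance of finding true neighbours at the cost of more computation.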

## 3. Searching for items similar to the key "if" "this"

In [23]:
# Testing searching for "if" "this"

from pyspark.ml.linalg import Vectors


# Convert the 2 input words into a sparse vector of size 1000 (vocabSize)
# Words that exist in the vocabulary get the value 1.0; all other entries stay 0.0
# Final result: key = [0, 0, ... , 1.0, ..., 1.0, ....]

key = Vectors.sparse(vocabSize, {cvModel.vocabulary.index("if"): 1.0, cvModel.vocabulary.index("this"): 1.0})
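
One caveat with the lookup above: Python's `list.index` raises `ValueError` when a word is not in the vocabulary. The sketch below shows one possible way to build the index-to-value mapping while collecting unknown words instead of failing. The helper `build_key_entries` and the small vocabulary are hypothetical stand-ins, not part of the notebook.

```python
def build_key_entries(words, vocabulary):
    """Map each known word to 1.0 keyed by its vocabulary index; collect unknown words."""
    index_of = {term: i for i, term in enumerate(vocabulary)}
    entries, missing = {}, []
    for word in words:
        word = word.lower()  # the tokenizer lowercases text, so lowercase the query too
        if word in index_of:
            entries[index_of[word]] = 1.0
        else:
            missing.append(word)
    return entries, missing

vocabulary = ["the", "if", "of", "this"]  # hypothetical stand-in for cvModel.vocabulary
entries, missing = build_key_entries(["If", "this", "zebra"], vocabulary)

print(entries)  # {1: 1.0, 3: 1.0}
print(missing)  # ['zebra']
```

In the notebook, a dictionary like `entries` could then be passed as the second argument of `Vectors.sparse(vocabSize, ...)`, provided at least one word was found.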

In [22]:
# Define the number of neighbours to return
k = 40

# Use the fitted LSH model to search the vectorized data for the k approximate nearest neighbours of the key
resultDF = LSHmodel.approxNearestNeighbors(vectorizedDF, key, k)
resultDF.show()

+----+------+---+------+---+------+---+---+-----+--------+----------+-------+
|text|advert|coi|fanpov| pr|resume|url| id|words|features|hashValues|distCol|
+----+------+---+------+---+------+---+---+-----+--------+----------+-------+
+----+------+---+------+---+------+---+---+-----+--------+----------+-------+
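
For comparison, an exact search scores every document against the key and keeps the k smallest distances. The sketch below does this in plain Python with Jaccard distance, the measure MinHash LSH approximates. It is a brute-force reference on a made-up three-document corpus, not what `approxNearestNeighbors` does internally; all names and data are hypothetical.

```python
import heapq

def jaccard_distance(a, b):
    """Return 1 - |a ∩ b| / |a ∪ b| for two sets (0.0 if both are empty)."""
    union = a | b
    return 1.0 - len(a & b) / len(union) if union else 0.0

def nearest_neighbours(corpus, key, k):
    """Return up to k (distance, doc_id) pairs ordered by increasing distance."""
    scored = ((jaccard_distance(tokens, key), doc_id) for doc_id, tokens in corpus.items())
    return heapq.nsmallest(k, scored)

# Hypothetical corpus: document id -> set of words
corpus = {
    0: {"if", "this", "then", "that"},
    1: {"if", "this"},
    2: {"spark", "data"},
}
key = {"if", "this"}

print(nearest_neighbours(corpus, key, k=2))
# [(0.0, 1), (0.5, 0)]
```

The exact search always returns k rows when the corpus has at least k documents. The approximate search can return fewer than k rows when there are not enough candidates in the key's hash buckets, which is one possible reason for an empty result such as the one above.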



In [24]:
# Save the result into CSV
# toPandas() collects the Spark result to the driver as a pandas DataFrame
data = resultDF.toPandas()
data.to_csv("data.csv")