### Install Spark

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m3.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285398 sha256=ba9479c212966bcfd4b28e2c2d52a975253d5ba3167855d8f3036e1cad895dbf
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


### Initialize Apache Spark Context

In [2]:
# Import Apache Spark SQL
from pyspark.sql import SparkSession

# Create (or reuse) the Spark session/context.
# master("local[*]") runs Spark on the local machine using all CPU cores.
# The example placeholder setting "spark.some.config.option" was removed: it is not
# a real Spark option and had no effect on the session.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Hello Pyspark")
    .getOrCreate()
)

# Check spark session
print(spark)

### Downloading Dataset

In [3]:
!pip install kaggle
!mkdir ~/.kaggle/
!touch ~/.kaggle/kaggle.json



In [4]:
# Kaggle API credentials, in the {"username": ..., "key": ...} shape of kaggle.json.
# Secrets must never be stored in the notebook: they are read from the
# KAGGLE_USERNAME / KAGGLE_KEY environment variables, with an interactive prompt
# as a fallback (getpass does not echo the key).
# NOTE(review): the API key that used to be hardcoded in this cell is exposed and
# should be revoked in the Kaggle account settings.
import os
from getpass import getpass

api_token = {
    "username": os.environ.get("KAGGLE_USERNAME") or input("Kaggle username: "),
    "key": os.environ.get("KAGGLE_KEY") or getpass("Kaggle API key: "),
}

In [5]:
import json
from pathlib import Path

# Write the credentials to ~/.kaggle/kaggle.json.
# The path is built from the current user's home directory instead of a hardcoded
# /root, so it works when the notebook is not running as root.
kaggle_config = Path.home() / ".kaggle" / "kaggle.json"
kaggle_config.parent.mkdir(parents=True, exist_ok=True)

with open(kaggle_config, 'w') as file:
    json.dump(api_token, file)

# Restrict the file to owner read/write (equivalent to `chmod 600`), since it holds a secret
kaggle_config.chmod(0o600)

In [6]:
# Download from https://www.kaggle.com/datasets/urbanbricks/wikipedia-promotional-articles
# Uses the Kaggle CLI; the archive wikipedia-promotional-articles.zip is saved to the
# directory reported in the command output.

!kaggle datasets download -d urbanbricks/wikipedia-promotional-articles

Downloading wikipedia-promotional-articles.zip to /content
 96% 193M/201M [00:02<00:00, 81.4MB/s]
100% 201M/201M [00:02<00:00, 71.6MB/s]


### Unzip the downloaded files

In [7]:
!unzip wikipedia-promotional-articles.zip

Archive:  wikipedia-promotional-articles.zip
  inflating: good.csv                
  inflating: promotional.csv         


An LSH task always consists of three steps:


1. Convert the original data into vectors.
2. Compute the hashes using the MinHash algorithm.
3. Search for similar pairs using a k-nearest-neighbor search or a join algorithm.

# Read Dataset

In [8]:
# Load the promotional-articles CSV; the first line of the file supplies the column names
df = (
    spark.read
    .option("header", True)
    .csv("/content/promotional.csv")
)
df.printSchema()

root
 |-- text: string (nullable = true)
 |-- advert: string (nullable = true)
 |-- coi: string (nullable = true)
 |-- fanpov: string (nullable = true)
 |-- pr: string (nullable = true)
 |-- resume: string (nullable = true)
 |-- url: string (nullable = true)



In [9]:
from pyspark.sql.functions import monotonically_increasing_id

# Attach an "id" column so every article can be referred to by a number.
# monotonically_increasing_id() yields unique, increasing values; they are not
# guaranteed to be consecutive.
newsDF = df.withColumn(colName="id", col=monotonically_increasing_id())
newsDF.show()


+--------------------+------+---+------+---+------+--------------------+---+
|                text|advert|coi|fanpov| pr|resume|                 url| id|
+--------------------+------+---+------+---+------+--------------------+---+
|1 Litre no Namida...|     0|  0|     1|  0|     0|https://en.wikipe...|  0|
|1DayLater was fre...|     1|  1|     0|  0|     0|https://en.wikipe...|  1|
|1E is a privately...|     1|  0|     0|  0|     0|https://en.wikipe...|  2|
|1Malaysia pronoun...|     1|  0|     0|  0|     0|https://en.wikipe...|  3|
|The Jerusalem Bie...|     1|  0|     0|  0|     0|https://en.wikipe...|  4|
|1st Round Enterpr...|     0|  0|     0|  1|     0|https://en.wikipe...|  5|
|2ergo is a provid...|     1|  0|     0|  0|     0|https://en.wikipe...|  6|
|2N Telekomunikace...|     1|  0|     0|  0|     0|https://en.wikipe...|  7|
|A 3D printing mar...|     1|  0|     0|  0|     0|https://en.wikipe...|  8|
|3DR is an America...|     1|  1|     0|  0|     0|https://en.wikipe...|  9|

In [10]:
# Get the total rows
# The number of rows in newsDF is displayed as the cell output.
newsDF.count()

23837

# Preparing Tokenizer

In [11]:
from pyspark.ml.feature import Tokenizer

# Tokenizer that reads the "text" column and writes the list of tokens to "words"
tokenizer = Tokenizer().setInputCol("text").setOutputCol("words")

# Tokenize every article; the result is newsDF plus the new "words" column
wordsDF = tokenizer.transform(newsDF)

wordsDF.show()

+--------------------+------+---+------+---+------+--------------------+---+--------------------+
|                text|advert|coi|fanpov| pr|resume|                 url| id|               words|
+--------------------+------+---+------+---+------+--------------------+---+--------------------+
|1 Litre no Namida...|     0|  0|     1|  0|     0|https://en.wikipe...|  0|[1, litre, no, na...|
|1DayLater was fre...|     1|  1|     0|  0|     0|https://en.wikipe...|  1|[1daylater, was, ...|
|1E is a privately...|     1|  0|     0|  0|     0|https://en.wikipe...|  2|[1e, is, a, priva...|
|1Malaysia pronoun...|     1|  0|     0|  0|     0|https://en.wikipe...|  3|[1malaysia, prono...|
|The Jerusalem Bie...|     1|  0|     0|  0|     0|https://en.wikipe...|  4|[the, jerusalem, ...|
|1st Round Enterpr...|     0|  0|     0|  1|     0|https://en.wikipe...|  5|[1st, round, ente...|
|2ergo is a provid...|     1|  0|     0|  0|     0|https://en.wikipe...|  6|[2ergo, is, a, pr...|
|2N Telekomunikace..

In [12]:
from pyspark.ml.feature import CountVectorizer

# Maximum number of terms kept in the vocabulary
vocabSize=1000

# Configure the estimator, then learn the vocabulary from the tokenized articles.
# minDF=10: a term must appear in at least 10 documents to enter the vocabulary.
count_vectorizer = CountVectorizer(
    inputCol="words",
    outputCol="features",
    vocabSize=vocabSize,
    minDF=10,
)
cvModel = count_vectorizer.fit(wordsDF)

# Convert each article's token list into a sparse term-count vector ("features")
vectorizedDF = cvModel.transform(wordsDF)
vectorizedDF.show()

+--------------------+------+---+------+---+------+--------------------+---+--------------------+--------------------+
|                text|advert|coi|fanpov| pr|resume|                 url| id|               words|            features|
+--------------------+------+---+------+---+------+--------------------+---+--------------------+--------------------+
|1 Litre no Namida...|     0|  0|     1|  0|     0|https://en.wikipe...|  0|[1, litre, no, na...|(1000,[0,1,2,3,4,...|
|1DayLater was fre...|     1|  1|     0|  0|     0|https://en.wikipe...|  1|[1daylater, was, ...|(1000,[0,1,2,3,4,...|
|1E is a privately...|     1|  0|     0|  0|     0|https://en.wikipe...|  2|[1e, is, a, priva...|(1000,[0,1,2,3,4,...|
|1Malaysia pronoun...|     1|  0|     0|  0|     0|https://en.wikipe...|  3|[1malaysia, prono...|(1000,[0,1,2,3,4,...|
|The Jerusalem Bie...|     1|  0|     0|  0|     0|https://en.wikipe...|  4|[the, jerusalem, ...|(1000,[0,1,2,3,4,...|
|1st Round Enterpr...|     0|  0|     0|  1|    

# Fit/Train the LSH Model

In [13]:
from pyspark.ml.feature import MinHashLSH

# MinHash LSH over the "features" vectors, using 3 hash tables.
# The hash functions are chosen randomly, so a fixed seed is set to make the
# hashValues (and every search built on them) reproducible across kernel restarts.
mh = MinHashLSH(inputCol="features", outputCol="hashValues", numHashTables=3, seed=42)
LSHmodel = mh.fit(vectorizedDF)

# Preview the hash values computed for each article
LSHmodel.transform(vectorizedDF).show()

+--------------------+------+---+------+---+------+--------------------+---+--------------------+--------------------+--------------------+
|                text|advert|coi|fanpov| pr|resume|                 url| id|               words|            features|          hashValues|
+--------------------+------+---+------+---+------+--------------------+---+--------------------+--------------------+--------------------+
|1 Litre no Namida...|     0|  0|     1|  0|     0|https://en.wikipe...|  0|[1, litre, no, na...|(1000,[0,1,2,3,4,...|[[5595911.0], [1....|
|1DayLater was fre...|     1|  1|     0|  0|     0|https://en.wikipe...|  1|[1daylater, was, ...|(1000,[0,1,2,3,4,...|[[9357764.0], [6....|
|1E is a privately...|     1|  0|     0|  0|     0|https://en.wikipe...|  2|[1e, is, a, priva...|(1000,[0,1,2,3,4,...|[[5595911.0], [1....|
|1Malaysia pronoun...|     1|  0|     0|  0|     0|https://en.wikipe...|  3|[1malaysia, prono...|(1000,[0,1,2,3,4,...|[[1041919.0], [1....|
|The Jerusalem Bie..

### Search for similar pairs/items

In [14]:
# Print the position of each word in the fitted vocabulary.
# list.index raises ValueError for a word that is not in the vocabulary.
for word in ("united", "states"):
    print(cvModel.vocabulary.index(word))

92
198


In [15]:
# Build the search key for the query words "civil" and "war"

from pyspark.ml.linalg import Vectors


# Convert the query words into a sparse vector with one slot per vocabulary term.
# A word that exists in the vocabulary gets value 1.0 at its index; every other slot
# stays 0.0, and a query word missing from the vocabulary is skipped rather than
# raising ValueError.
# Final result: key = [0, 0, ... , 1.0, ..., 1.0, ....]
query_words = ["civil", "war"]
vocabulary = cvModel.vocabulary
word_index = {term: position for position, term in enumerate(vocabulary)}
key_entries = {word_index[word]: 1.0 for word in query_words if word in word_index}

# MinHash needs at least one non-zero entry, so fail early with a clear message
if not key_entries:
    raise ValueError(f"None of the query words {query_words} are in the vocabulary")

# Size the key by the fitted vocabulary so it always matches the feature vectors,
# even when fewer than vocabSize terms were kept
key = Vectors.sparse(len(vocabulary), key_entries)

In [16]:
# How many nearest neighbours to request
k = 40

# Ask the fitted LSH model for (approximately) the k articles closest to `key`
resultDF = LSHmodel.approxNearestNeighbors(
    dataset=vectorizedDF,
    key=key,
    numNearestNeighbors=k,
)
resultDF.show()

+----+------+---+------+---+------+---+---+-----+--------+----------+-------+
|text|advert|coi|fanpov| pr|resume|url| id|words|features|hashValues|distCol|
+----+------+---+------+---+------+---+---+-----+--------+----------+-------+
+----+------+---+------+---+------+---+---+-----+--------+----------+-------+



# Save Results to CSV

In [17]:
# Save the result into CSV
import pandas as pd

# Collect the Spark search result into a pandas DataFrame
data = resultDF.toPandas()
# Written to "result.csv" relative to the current working directory,
# using to_csv's default settings
data.to_csv("result.csv")