<a href="https://colab.research.google.com/github/gregorio-saporito/Spark-AMD/blob/main/main.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Finding similar items: StackSample
Gregorio Luigi Saporito - DSE (2020-2021)

In [1]:
# optional: remove files left over from a previous run
# !rm kaggle.json
# !rm Questions.csv
# !rm Body.csv
# !rm -rf spark-3.1.1-bin-hadoop2.7

### Upload the Kaggle API token to session storage

In [2]:
from google.colab import files
uploaded = files.upload()

import os
os.environ['KAGGLE_CONFIG_DIR'] = '/content'

Saving kaggle.json to kaggle.json


### Download the dataset through the Kaggle API

In [3]:
# access permissions with the API token
!chmod 600 /content/kaggle.json
!kaggle datasets download -d stackoverflow/stacksample
!unzip \*.zip && rm *.zip
# remove datasets which are not needed
!rm Answers.csv
!rm Tags.csv

Downloading stacksample.zip to /content
 99% 1.10G/1.11G [00:10<00:00, 140MB/s]
100% 1.11G/1.11G [00:10<00:00, 113MB/s]
Archive:  stacksample.zip
  inflating: Answers.csv             
  inflating: Questions.csv           
  inflating: Tags.csv                


### Spark environment setup

In [4]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop2.7.tgz
!tar xf spark-3.1.1-bin-hadoop2.7.tgz
!pip install -q findspark
!rm /content/spark-3.1.1-bin-hadoop2.7.tgz

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop2.7"

import findspark
findspark.init(os.environ["SPARK_HOME"])  # point findspark at the extracted Spark distribution
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

import pyspark
type(spark)

sc = spark.sparkContext

### Load Dataset
For performance reasons Spark's CSV reader splits its input on line breaks by default, so a field that contains newline characters is parsed as several malformed records. Here the Body column of "Questions.csv" contains "\n" and "\r" characters, which prevent the dataset from loading correctly. To work around this, the file is first parsed with pandas, a third-party parser that copes with newlines inside fields, and a copy of the Body column without newline characters is written to disk as "Body.csv". In a production scenario an alternative would be to store the data in a database. The pandas DataFrame is then deleted to free up RAM, and the new .csv file is loaded correctly with Spark.
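
The effect described above can be reproduced on a tiny scale with Python's standard `csv` module. This is a minimal sketch on made-up data, not the notebook's pandas-based cell: it shows that splitting on line breaks miscounts records, while a CSV-aware parser keeps newlines inside a quoted field. As an assumption of the sketch, line breaks are replaced with a space rather than removed, so that words on adjacent lines stay separated.

```python
import csv
import io

# Made-up CSV: the first "Body" field contains an embedded newline.
raw = 'Body\n"<p>first line\nsecond line</p>"\n"<p>one line</p>"\n'

# Splitting on line breaks sees 4 physical lines for a header plus 2 records.
physical_lines = raw.splitlines()

# A CSV-aware parser keeps the quoted newline inside the field.
rows = list(csv.reader(io.StringIO(raw, newline="")))
header, records = rows[0], rows[1:]

# Replace embedded line breaks so that each record fits on one physical line.
flattened = [
    [field.replace("\r", " ").replace("\n", " ") for field in record]
    for record in records
]

# Write the flattened records back out as CSV text.
out = io.StringIO()
writer = csv.writer(out, lineterminator="\n")
writer.writerow(header)
writer.writerows(flattened)
clean_lines = out.getvalue().splitlines()

print(len(physical_lines), len(records), len(clean_lines))
```

After flattening, the number of physical lines equals the header plus the number of records, which is the property a line-oriented reader relies on.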

In [5]:
import pandas as pd
parsed = pd.read_csv("Questions.csv", encoding="ISO-8859-1", usecols=["Body"])
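# regex=True is required in recent pandas versions, where str.replace treats the pattern literally by default
parsed['Body'] = parsed['Body'].str.replace(r'\n|\r', '', regex=True)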
parsed.to_csv("Body.csv", index=False)
del parsed

In [6]:
df = spark.read.load("Body.csv", format="csv",
                     inferSchema="true", header="true")
df

DataFrame[Body: string]

In [7]:
df.show(10)

+--------------------+
|                Body|
+--------------------+
|"<p>I've written ...|
|"<p>Are there any...|
|<p>Has anyone got...|
|<p>This is someth...|
|"<p>I have a litt...|
|<p>I am working o...|
|<p>I've been writ...|
|"<p>I wonder how ...|
|<p>I would like t...|
|<p>I'm trying to ...|
+--------------------+
only showing top 10 rows



### Dataset Cleaning

In [8]:
# check for missing values
from pyspark.sql.functions import isnan, when, count, col
df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+----+
|Body|
+----+
|   0|
+----+



In [9]:
from pyspark.sql.functions import col, regexp_replace

def clean_html(x):
  # strip HTML tags with a non-greedy match; character entities such as &lt; are left untouched
  return regexp_replace(x, '<.*?>', '')

df = df.select(clean_html(col("Body")).alias("Body"))

In [10]:
df.show(10)

+--------------------+
|                Body|
+--------------------+
|"I've written a d...|
|"Are there any re...|
|Has anyone got ex...|
|This is something...|
|"I have a little ...|
|I am working on a...|
|I've been writing...|
|"I wonder how you...|
|I would like the ...|
|I'm trying to mai...|
+--------------------+
only showing top 10 rows
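
The tag-stripping pattern used in `clean_html` can be tried outside Spark with Python's `re` module. This is a small sketch on a made-up string, assuming Python's regex semantics match Spark's for this simple pattern. It shows why the non-greedy `.*?` matters: a greedy `.*` would swallow everything between the first `<` and the last `>`.

```python
import re

# Non-greedy: each match stops at the first closing '>' after a '<'.
TAG_PATTERN = re.compile(r"<.*?>")

def strip_tags(text):
    """Remove HTML tags; character entities such as &lt; are left as they are."""
    return TAG_PATTERN.sub("", text)

sample = "<p>Use <code>x &lt; y</code> here</p>"

# Greedy variant for comparison: matches from the first '<' to the last '>'.
greedy = re.sub(r"<.*>", "", sample)

print(strip_tags(sample))
print(repr(greedy))
```

Note that neither variant decodes entities, so text like `&lt;` remains in the cleaned output.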



In [11]:
# extracting tokens from text
from pyspark.ml.feature import RegexTokenizer

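# raw string avoids an invalid-escape warning for \w; gaps=False means the pattern matches tokens, not separators
regexTokenizer = RegexTokenizer(gaps = False, pattern = r'\w+', inputCol = 'Body', outputCol = 'tokens')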
tokenised = regexTokenizer.transform(df)
tokenised.show(3)

+--------------------+--------------------+
|                Body|              tokens|
+--------------------+--------------------+
|"I've written a d...|[i, ve, written, ...|
|"Are there any re...|[are, there, any,...|
|Has anyone got ex...|[has, anyone, got...|
+--------------------+--------------------+
only showing top 3 rows
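
The tokens column above can be approximated in plain Python. This is a rough sketch, assuming that `RegexTokenizer` lowercases its input by default and that, with `gaps = False`, every match of the pattern becomes a token. The `re.ASCII` flag is an assumption made to mimic Java's default ASCII-only `\w`.

```python
import re

def tokenize(text):
    """Lowercase the text and return every run of word characters as a token."""
    return re.findall(r"\w+", text.lower(), flags=re.ASCII)

# Made-up sentence; the apostrophe and hyphen act as separators.
tokens = tokenize("I've written a database-generation script")
print(tokens)
```

This explains why "I've" appears as the two tokens `i` and `ve` in the output: the apostrophe is not a word character.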



In [12]:
# stopwords removal
from pyspark.ml.feature import StopWordsRemover
swr = StopWordsRemover(inputCol = 'tokens', outputCol = 'sw_removed')
Body_swr = swr.transform(tokenised)
Body_swr.show(3)

+--------------------+--------------------+--------------------+
|                Body|              tokens|          sw_removed|
+--------------------+--------------------+--------------------+
|"I've written a d...|[i, ve, written, ...|[ve, written, dat...|
|"Are there any re...|[are, there, any,...|[really, good, tu...|
|Has anyone got ex...|[has, anyone, got...|[anyone, got, exp...|
+--------------------+--------------------+--------------------+
only showing top 3 rows
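
Stopword removal amounts to filtering each token list against a fixed vocabulary. The sketch below uses a tiny, hypothetical stop list (`STOP_WORDS` is not Spark's default English list, which is much longer) and assumes case-insensitive matching.

```python
# Hypothetical subset of an English stop list, for illustration only.
STOP_WORDS = {"i", "a", "are", "there", "any", "has"}

def remove_stop_words(tokens, stop_words=STOP_WORDS):
    """Keep the tokens whose lowercase form is not in the stop list."""
    return [t for t in tokens if t.lower() not in stop_words]

filtered = remove_stop_words(["i", "ve", "written", "a", "database"])
print(filtered)
```

Fragments such as `ve` survive because they are not in the list, which matches the `sw_removed` column shown above.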



In [27]:
from pyspark.ml.feature import Word2Vec
# average direction of vectorised words to represent a document
# word2vec = Word2Vec(vectorSize = 100, seed=123, inputCol = 'sw_removed', outputCol = 'result', numPartitions=10)
# model = word2vec.fit(Body_swr)
# result = model.transform(Body_swr)

# result.show(3)
# result.select('result').show(1, truncate = True)

In [75]:
from pyspark.ml.feature import HashingTF, IDF
from pyspark.sql.functions import monotonically_increasing_id
# attach an id to each document; the ids are unique and increasing but not guaranteed to be consecutive
docs = Body_swr.select("sw_removed").withColumn("label", monotonically_increasing_id())
# term frequencies via the hashing trick
hashingTF = HashingTF(inputCol="sw_removed", outputCol="rawFeatures")
featurizedData = hashingTF.transform(docs)
# rescale the raw term frequencies by inverse document frequency
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)
for features_label in rescaledData.select("features", "label").take(3):
  print(features_label)

Row(features=SparseVector(262144, {3564: 2.3421, 61318: 4.0538, 65318: 4.2535, 69397: 2.3377, 98424: 6.9016, 114395: 2.6633, 115524: 6.5918, 116494: 3.9547, 128188: 10.2433, 133613: 2.9221, 145068: 4.7731, 162155: 3.4204, 190256: 1.5111, 194618: 2.845, 206622: 2.3711, 212505: 8.5528, 254260: 3.4079, 257347: 5.8671}), label=0)
Row(features=SparseVector(262144, {53082: 7.0086, 62427: 3.5962, 68303: 4.3525, 89898: 7.083, 113432: 3.6944, 116159: 3.511, 145488: 8.2329, 162759: 8.3801, 168843: 5.3235, 229264: 3.5039, 231489: 7.3083, 235375: 3.8117}), label=1)
Row(features=SparseVector(262144, {3564: 2.3421, 13828: 1.9558, 19839: 8.0967, 35817: 6.9095, 40268: 4.3414, 46479: 7.7419, 51471: 2.0083, 55875: 5.1861, 69397: 2.3377, 71450: 2.4535, 81229: 4.9653, 83161: 3.9522, 84007: 4.5196, 94851: 7.0498, 98877: 6.1708, 110427: 2.5464, 110865: 8.0304, 111430: 6.6555, 116506: 3.9762, 127591: 3.6063, 129946: 4.2873, 151571: 4.1815, 160860: 2.8886, 161880: 5.543, 167807: 4.7721, 172452: 3.3252, 173787
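
The sparse vectors above pair a hashed term index with a TF-IDF weight. The sketch below reproduces the idea on three made-up documents. Two assumptions apply: `zlib.crc32` stands in for the MurmurHash3 function that Spark's `HashingTF` uses, so the indices will not match Spark's, and the weight follows the smoothed formula idf = log((m + 1) / (df + 1)) documented for Spark's `IDF`, where m is the number of documents and df is the number of documents containing the term.

```python
import math
import zlib
from collections import Counter

NUM_FEATURES = 1 << 18  # 262144, the vector size visible in the output above

def bucket(term):
    """Map a term to a column index (crc32 is a stand-in for Spark's MurmurHash3)."""
    return zlib.crc32(term.encode("utf-8")) % NUM_FEATURES

def hashing_tf(tokens):
    """Sparse term-frequency vector as {index: count}."""
    return dict(Counter(bucket(t) for t in tokens))

# Made-up token lists, one per document.
docs = [["spark", "csv", "parser"], ["spark", "lsh"], ["spark", "csv", "csv"]]
tf = [hashing_tf(d) for d in docs]

# Document frequency: in how many documents each index occurs.
m = len(docs)
doc_freq = Counter(idx for vec in tf for idx in vec)

# Smoothed inverse document frequency, then the TF-IDF weights.
idf = {idx: math.log((m + 1) / (df + 1)) for idx, df in doc_freq.items()}
tfidf = [{idx: count * idf[idx] for idx, count in vec.items()} for vec in tf]

for vec in tfidf:
    print(vec)
```

A term that occurs in every document, such as `spark` here, receives weight 0, while rarer terms are weighted up.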

In [47]:
rescaledData.show(10)

+--------------------+-----+--------------------+--------------------+
|          sw_removed|label|         rawFeatures|            features|
+--------------------+-----+--------------------+--------------------+
|[ve, written, dat...|    0|(262144,[3564,613...|(262144,[3564,613...|
|[really, good, tu...|    1|(262144,[53082,62...|(262144,[53082,62...|
|[anyone, got, exp...|    2|(262144,[3564,138...|(262144,[3564,138...|
|[something, ve, p...|    3|(262144,[23736,37...|(262144,[23736,37...|
|[little, game, wr...|    4|(262144,[61318,74...|(262144,[61318,74...|
|[working, collect...|    5|(262144,[8538,978...|(262144,[8538,978...|
|[ve, writing, web...|    6|(262144,[9781,158...|(262144,[9781,158...|
|[wonder, guys, ma...|    7|(262144,[3564,688...|(262144,[3564,688...|
|[like, version, p...|    8|(262144,[11680,12...|(262144,[11680,12...|
|[m, trying, maint...|    9|(262144,[9144,126...|(262144,[9144,126...|
+--------------------+-----+--------------------+--------------------+
only showing top 10 rows

In [83]:
rescaledData.select("features").take(1)

[Row(features=SparseVector(262144, {3564: 2.3421, 61318: 4.0538, 65318: 4.2535, 69397: 2.3377, 98424: 6.9016, 114395: 2.6633, 115524: 6.5918, 116494: 3.9547, 128188: 10.2433, 133613: 2.9221, 145068: 4.7731, 162155: 3.4204, 190256: 1.5111, 194618: 2.845, 206622: 2.3711, 212505: 8.5528, 254260: 3.4079, 257347: 5.8671}))]

In [97]:
from pyspark.ml.feature import BucketedRandomProjectionLSH
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col

dataA = [(0, Vectors.dense([1.0, 1.0]),),
         (1, Vectors.dense([1.0, -1.0]),),
         (2, Vectors.dense([-1.0, -1.0]),),
         (3, Vectors.dense([-1.0, 1.0]),)]
dfA = spark.createDataFrame(dataA, ["id", "features"])

dataB = [(4, Vectors.dense([1.0, 0.0]),),
         (5, Vectors.dense([-1.0, 0.0]),),
         (6, Vectors.dense([0.0, 1.0]),),
         (7, Vectors.dense([0.0, -1.0]),)]
dfB = spark.createDataFrame(dataB, ["id", "features"])

key = Vectors.dense([1.0, 0.0])

brp = BucketedRandomProjectionLSH(inputCol="features", outputCol="hashes", bucketLength=2.0,
                                  numHashTables=3)
model = brp.fit(dfA)

# Feature Transformation
print("The hashed dataset where hashed values are stored in the column 'hashes':")
model.transform(dfA).show()

# Compute the locality sensitive hashes for the input rows, then perform approximate
# similarity join.
# We could avoid computing hashes by passing in the already-transformed dataset, e.g.
# `model.approxSimilarityJoin(transformedA, transformedB, 1.5)`
print("Approximately joining dfA and dfB on Euclidean distance smaller than 1.5:")
model.approxSimilarityJoin(dfA, dfB, 1.5, distCol="EuclideanDistance")\
    .select(col("datasetA.id").alias("idA"),
            col("datasetB.id").alias("idB"),
            col("EuclideanDistance")).show()

# Compute the locality sensitive hashes for the input rows, then perform approximate nearest
# neighbor search.
# We could avoid computing hashes by passing in the already-transformed dataset, e.g.
# `model.approxNearestNeighbors(transformedA, key, 2)`
print("Approximately searching dfA for 2 nearest neighbors of the key:")
model.approxNearestNeighbors(dfA, key, 2).show()

The hashed dataset where hashed values are stored in the column 'hashes':
+---+-----------+--------------------+
| id|   features|              hashes|
+---+-----------+--------------------+
|  0|  [1.0,1.0]|[[-1.0], [-1.0], ...|
|  1| [1.0,-1.0]|[[0.0], [0.0], [0...|
|  2|[-1.0,-1.0]|[[0.0], [0.0], [0...|
|  3| [-1.0,1.0]|[[-1.0], [-1.0], ...|
+---+-----------+--------------------+

Approximately joining dfA and dfB on Euclidean distance smaller than 1.5:
+---+---+-----------------+
|idA|idB|EuclideanDistance|
+---+---+-----------------+
|  0|  4|              1.0|
|  2|  5|              1.0|
|  1|  7|              1.0|
|  3|  5|              1.0|
|  1|  4|              1.0|
|  3|  6|              1.0|
|  0|  6|              1.0|
|  2|  7|              1.0|
+---+---+-----------------+

Approximately searching dfA for 2 nearest neighbors of the key:
+---+----------+--------------------+-------+
| id|  features|              hashes|distCol|
+---+----------+--------------------+-------+
