In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

In [2]:
import os
# Export the locations that findspark/PySpark read to locate the JVM and the
# unpacked Spark distribution.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.1.1-bin-hadoop3.2",
})

In [3]:
import re
import shutil
from google.colab import output
import numpy as np
import findspark
findspark.init()
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import DataFrameWriter
import pyspark.sql.functions as func
from pyspark.sql.types import StringType
from pyspark.ml import Pipeline
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, Word2VecModel, CountVectorizer, HashingTF, IDF
from pyspark.ml.linalg import SparseVector, DenseVector
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF
from pyspark.ml.clustering import LDA, LDAModel


In [4]:
!wget --no-check-certificate 'https://docs.google.com/uc?export=download&id=1xWfOl7SXQVwt0jZGAvIuQ16aEOgA5UOF' -O "Adventures_of_Sherlock_Holmes.txt"
!wget --no-check-certificate 'https://docs.google.com/uc?export=download&id=1MIJHD5xJP2ZNJhGYUkOZQl7e7FmKPJi1' -O 'Alice_in_wornderland.txt'
!wget --no-check-certificate 'https://docs.google.com/uc?export=download&id=1KlPWjw8IlP5QswnPqso_LabNiqOXm2uC' -O 'Dracula_bromstoker.txt'
!wget --no-check-certificate 'https://docs.google.com/uc?export=download&id=1358EsgbTUx8OjrlvuvQcsU2Bv8ead82z' -O 'Jane_Eyre.txt'

source_dir = '/content/'
target_dir = '/content/books/'
os.mkdir(target_dir)
files = os.listdir(source_dir)

for file in files:
    source = os.path.join(source_dir, file)
    target = os.path.join(target_dir, file)
    if file.endswith('.txt') and file != 'janeAustine.txt':
        shutil.move(source, target)
output.clear()

In [5]:
# Start (or reuse) a local Spark session that uses every available core.
sparkNLP = SparkSession.builder.master("local[*]").getOrCreate()
# Enable eager evaluation so DataFrames render as tables in the notebook.
sparkNLP.conf.set("spark.sql.repl.eagerEval.enabled", True)
# Take the context straight from the session. The original called
# SparkContext.getOrCreate(sparkNLP.conf), which passes a RuntimeConfig where a
# SparkConf is expected and only worked because an active context already
# existed and the argument was ignored.
sc = sparkNLP.sparkContext
# A bare expression is only displayed when it is the last statement of a cell;
# in the middle of the cell it was a no-op.
sparkNLP

# textRdd = sparkNlp.sparkContext.wholeTextFiles("/content/books/*")

In [6]:

from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF
from pyspark.ml.linalg import Vectors
from pyspark.sql.types import FloatType
from pyspark.sql.functions import udf

# def cosine_similarity(vec1, vec2):
#     return vec1.dot(vec2) / (Vectors.norm(vec1, 1) * Vectors.norm(vec2, 1))

# cosine_similarity_udf = udf(cosine_similarity, FloatType())

# File names of the books to compare, in the order they appear in `data`.
book_files = ['Dracula_bromstoker.txt',
              'Adventures_of_Sherlock_Holmes.txt',
              'Alice_in_wornderland.txt',
              'Jane_Eyre.txt']

# Build a list of (file name, full text) tuples. Each file is read inside a
# `with` block so its handle is closed immediately (the original left four
# handles open), and the encoding is stated explicitly instead of depending on
# the runtime's locale default.
data = []
for book_file in book_files:
    with open(f'/content/books/{book_file}', 'r', encoding='utf-8') as book_handle:
        data.append((book_file, book_handle.read()))

# One row per book: `book` holds the file name, `content` the whole text.
df = sparkNLP.createDataFrame(data, ['book', 'content'])
# tokenizer = Tokenizer(inputCol='content', outputCol='words')
# wordsData = tokenizer.transform(df)

# SWordFilter = StopWordsRemover(inputCol='words', outputCol='SWRemoved', caseSensitive=False)
# processedData = SWordFilter.transform(wordsData)

# hashingTF = HashingTF(inputCol='SWRemoved', outputCol='rawFeatures')
# featurizedData = hashingTF.transform(processedData)


# pairs = featurizedData.alias("df1").crossJoin(featurizedData.alias("df2"))

# result = pairs.withColumn("similarity", cosine_similarity_udf(func.col("df1.rawFeatures"), func.col("df2.rawFeatures")))

# result.select("df1.files", "df2.files", "similarity").show()

In [7]:
# Print a tabular preview of the book/content DataFrame built above.
df.show()

+--------------------+--------------------+
|                book|             content|
+--------------------+--------------------+
|Dracula_bromstoke...|The Project Guten...|
|Adventures_of_She...|
Project Gutenber...|
|Alice_in_wornderl...|The Project Guten...|
|       Jane_Eyre.txt|The Project Guten...|
+--------------------+--------------------+



In [8]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.sql.functions import col
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

def text_similarity(df, text1, text2):
    """Print and return the TF-IDF cosine similarity between two books.

    The `content` column of `df` is tokenized, hashed into a 10000-dimensional
    term-frequency vector, and rescaled with an IDF model fitted on all rows
    of `df`. The cosine similarity of the two resulting vectors is printed as
    a percentage rounded to two decimals.

    Args:
        df: Spark DataFrame with a `book` column (identifier) and a `content`
            column (the raw text).
        text1: Value of `book` identifying the first text.
        text2: Value of `book` identifying the second text.

    Returns:
        The cosine similarity as a float (not multiplied by 100). It is 0.0
        when either TF-IDF vector has zero norm.

    Raises:
        ValueError: If `text1` or `text2` does not match any row of `df`.
    """
    tokenizer = Tokenizer(inputCol="content", outputCol="words")
    wordsData = tokenizer.transform(df)

    hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=10000)
    featurizedData = hashingTF.transform(wordsData)

    idf = IDF(inputCol="rawFeatures", outputCol="features")
    idfModel = idf.fit(featurizedData)
    rescaledData = idfModel.transform(featurizedData)

    # Fetch both feature vectors with a single Spark job instead of two
    # separate filter/first round-trips.
    rows = (rescaledData
            .filter(col('book').isin(text1, text2))
            .select('book', 'features')
            .collect())
    features_by_book = {}
    for row in rows:
        # setdefault keeps the first row seen if a book name is duplicated.
        features_by_book.setdefault(row['book'], row['features'])

    # The original failed with "'NoneType' object is not subscriptable" when a
    # name was not present; report the actual problem instead.
    missing = [name for name in (text1, text2) if name not in features_by_book]
    if missing:
        raise ValueError(f"No row in df has book name(s): {missing}")

    text1_features = features_by_book[text1]
    text2_features = features_by_book[text2]

    # A zero-norm vector would make the ratio 0/0 (NaN); define the
    # similarity as 0.0 in that case.
    norm_product = text1_features.norm(2) * text2_features.norm(2)
    if norm_product == 0:
        cosine_similarity = 0.0
    else:
        cosine_similarity = float(text1_features.dot(text2_features) / norm_product)

    print(f"Cosine similarity between {text1} and {text2}: {round(cosine_similarity * 100, 2)}")
    return cosine_similarity


# Compare every unordered pair of books, including each book with itself:
# the inner loop starts at the outer book's own position in `data`.
for first_pos, (first_book, _) in enumerate(data):
    for second_book, _ in data[first_pos:]:
        text_similarity(df, first_book, second_book)


Cosine similarity between Dracula_bromstoker.txt and Dracula_bromstoker.txt: 100.0
Cosine similarity between Dracula_bromstoker.txt and Adventures_of_Sherlock_Holmes.txt: 33.46
Cosine similarity between Dracula_bromstoker.txt and Alice_in_wornderland.txt: 1.74
Cosine similarity between Dracula_bromstoker.txt and Jane_Eyre.txt: 41.36
Cosine similarity between Adventures_of_Sherlock_Holmes.txt and Adventures_of_Sherlock_Holmes.txt: 100.0
Cosine similarity between Adventures_of_Sherlock_Holmes.txt and Alice_in_wornderland.txt: 2.97
Cosine similarity between Adventures_of_Sherlock_Holmes.txt and Jane_Eyre.txt: 64.74
Cosine similarity between Alice_in_wornderland.txt and Alice_in_wornderland.txt: 100.0
Cosine similarity between Alice_in_wornderland.txt and Jane_Eyre.txt: 5.31
Cosine similarity between Jane_Eyre.txt and Jane_Eyre.txt: 100.0


In [9]:
# textRdd.collect()

In [10]:
# textDf = textRdd.toDF(["file", "text"])

# tokenizer = Tokenizer(inputCol="text", outputCol="words")
# wordsData = tokenizer.transform(textDf)

# stopWordsFilter = StopWordsRemover(inputCol='words', outputCol="SWRemoved", caseSensitive=False)
# wordsDataClean = stopWordsFilter.transform(wordsData)

# hashingTF = HashingTF(inputCol="SWRemoved", outputCol="raw_features")
# featurizedData = hashingTF.transform(wordsDataClean)

# idf = IDF(inputCol="raw_features", outputCol="features")
# idfModel = idf.fit(featurizedData)
# tfidfData = idfModel.transform(featurizedData)


In [11]:
# tfidfData

In [12]:
# lda = LDA(k=2, maxIter=10, featuresCol="features")
# model = lda.fit(tfidfData)

# transformed = model.transform(tfidfData)

In [13]:
# topics = model.describeTopics(2)
# topics

In [14]:
# transformed

In [15]:
# from pyspark.sql.functions import udf
# from pyspark.sql.types import DoubleType
# from scipy.spatial.distance import cosine


# def cosine_similarity(x, y):
#     return float(1 - cosine(x, y))

# cosineSimilarityUdf = udf(cosine_similarity, DoubleType())

# dfCross = transformed.crossJoin(transformed.withColumnRenamed("topicDistribution", "topicDistribution2"))
# dfSimilarity = dfCross.withColumn("similarity", cosineSimilarityUdf(dfCross["topicDistribution"], dfCross["topicDistribution2"]))

In [16]:
# dfSimilarity.show()

In [17]:
# Shut down the Spark session created earlier in the notebook.
sparkNLP.stop()