In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql.types import *
import pandas as pd
import pickle

In [4]:
# only run once to transform pdf files to text file
import textract
import glob
from time import time

t0=time()
path="data/"
files=glob.glob(path +'*.pdf')

for file in files:
    #text_doc = convert(file)
    text_doc = textract.process(file).decode('utf-8')
    new_path = file.split('.')[0]+".txt"
    new_file = open(new_path,'w')
    new_file.write(text_doc)
    new_file.close()

print("done in %0.3fs." % (time() - t0))

# to apply directly in the command line
shopt -s nullglob
for f in *.txt
do
    echo "splitting - $f"
    split -l 500 "$f" "${f%.txt}_"
done

done in 1.810s.


In [6]:
import glob
from time import time
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType
from pyspark.ml.feature import HashingTF, IDF
from pyspark.sql.functions import explode, col
from pyspark.sql.functions import monotonically_increasing_id 



# Creates a dataframe with the text files
def create_dataframe(path_directory="data/", file_extension="*.txt"):    
    
    path=path_directory
    files=glob.glob(path + file_extension)
    #print(files)
    schema = StructType([StructField("row_id", IntegerType(), True), 
                         StructField("name", StringType(), True),
                     StructField("text", StringType(), True)])
    
    for idx,f in enumerate(files):
        if idx == 0:
            with open(f, 'r') as file:
                doc= " ".join(line.strip() for line in file)
                df = spark.createDataFrame([(idx, f, doc)],schema)
                dff = df
        else:
            with open(f, 'r') as file:
                doc= " ".join(line.strip() for line in file)
                df = spark.createDataFrame([(idx, f, doc)],schema)
                dff = dff.union(df)

    return dff


# Tokenize and remove stopwords from a dataframe

def tokenize_dataframe(dataframe, language="spanish", pattern="[.:\s]+"):    

    STOPWORDS_v0 = StopWordsRemover.loadDefaultStopWords(language) 
    STOPWORDS_v0 = [str(i) for i in STOPWORDS_v0]

    tokenizer = RegexTokenizer(pattern=pattern, inputCol='text', outputCol="z_words")
    wordsData1 = tokenizer.transform(dataframe)

    remover = StopWordsRemover(inputCol="z_words", outputCol="z_filtered", stopWords=STOPWORDS_v0)
    #wordsDataFiltered1 = remover.transform(wordsData1).drop("text","z_words")
    wordsDataFiltered1 = remover.transform(wordsData1).drop("z_words")

    return wordsDataFiltered1.drop("text")

# Split texts in chunks of words, create index

def split_chunks(tok_dataframe,n_words=100, inputCol="z_filtered", outputCol="list_parag"):
    
    def split_list_words(list_words,n_words=n_words):
        counter=0
        new_list_word=[]
        for i in range(0,len(list_words),n_words):
                new_list_word.append(list_words[counter*n_words:i+n_words])
                counter+=1
        return new_list_word

    dummy_function_udf = udf(split_list_words, ArrayType(ArrayType(StringType())))
    wordsDataFiltered2=tok_dataframe.select("row_id",inputCol, 
                              dummy_function_udf(inputCol).alias(outputCol))
    
    wordsDataFiltered3 = wordsDataFiltered2.select("row_id",explode(col(outputCol)))
    df_index = wordsDataFiltered3.withColumn("para_id", monotonically_increasing_id())

    return df_index


#Create tf

def create_TF(dataframe, inputCol="z_filtered", outputCol="features", minDocFreq=3, 
              numFeatures=20):    

    hashingTF = HashingTF(inputCol=inputCol, outputCol=outputCol, numFeatures=numFeatures)
    featurizedData1 = hashingTF.transform(dataframe)

    return featurizedData1.drop(inputCol)


In [4]:
t0=time()
dff=create_dataframe(path_directory="data_split/", file_extension='*')   
print("done in %0.3fs." % (time() - t0))
dff.show()

done in 5.425s.
+------+--------------------+--------------------+
|row_id|                name|                text|
+------+--------------------+--------------------+
|     0|data_split/AREQUI...|PONTIFICIA UNIVER...|
|     1|data_split/AREQUI...| Iniciativas soci...|
|     2|data_split/AREQUI...|medio ambiente y ...|
|     3|data_split/AREQUI...|un comportamiento...|
|     4|data_split/AREQUI...|investigación: 1....|
|     5|data_split/AREQUI...|Sí  12  Sí  Sí  S...|
|     6|data_split/AREQUI...|t  i  bi S i ...|
|     7|data_split/AREQUI...|fue utilizado par...|
|     8|data_split/AREQUI...|image and reputat...|
|     9|data_split/AREQUI...|Sólo voltear la p...|
|    10|data_split/AREQUI...| Apoya a programa...|
|    11|data_split/CABRER...|PONTIFICIA UNIVER...|
|    12|data_split/CABRER...|infraestructura, ...|
|    13|data_split/CABRER...|también un destin...|
|    14|data_split/CABRER...| La desmotivación...|
|    15|data_split/CABRER...| 0.90  Valo r 4  ...|
|    16|data_sp

In [5]:
dff.count()

177

In [15]:
# Tokenize and remove stopwords


t0=time()
wordsDataFiltered1 = tokenize_dataframe(dff, language="spanish", pattern="[.:\s]+")
#wordsDataFiltered2 = split_chunks(wordsDataFiltered1,n_words=100, inputCol="z_filtered", 
                                  #outputCol="list_parag")
print("done in %0.3fs." % (time() - t0))

wordsDataFiltered1.show()

done in 0.165s.
+------+--------------------+--------------------+
|row_id|                name|          z_filtered|
+------+--------------------+--------------------+
|     0|data_split/AREQUI...|[pontificia, univ...|
|     1|data_split/AREQUI...|[iniciativas, soc...|
|     2|data_split/AREQUI...|[medio, ambiente,...|
|     3|data_split/AREQUI...|[comportamiento, ...|
|     4|data_split/AREQUI...|[investigación, 1...|
|     5|data_split/AREQUI...|[12, x, 10, 12, x...|
|     6|data_split/AREQUI...|[t, i, , bi, s...|
|     7|data_split/AREQUI...|[utilizado, calcu...|
|     8|data_split/AREQUI...|[image, and, repu...|
|     9|data_split/AREQUI...|[sólo, voltear, p...|
|    10|data_split/AREQUI...|[apoya, programas...|
|    11|data_split/CABRER...|[pontificia, univ...|
|    12|data_split/CABRER...|[infraestructura,...|
|    13|data_split/CABRER...|[destino, importa...|
|    14|data_split/CABRER...|[desmotivación, c...|
|    15|data_split/CABRER...|[0, 90, valo, r, ...|
|    16|data_sp

In [8]:
t0=time()
tf_df = create_TF(wordsDataFiltered1, inputCol="z_filtered", minDocFreq=5, numFeatures=40000)
tf_df.show(5)
print("done in %0.3fs." % (time() - t0))

+------+--------------------+--------------------+
|row_id|                name|            features|
+------+--------------------+--------------------+
|     0|data_split/AREQUI...|(40000,[20,43,70,...|
|     1|data_split/AREQUI...|(40000,[5,7,16,18...|
|     2|data_split/AREQUI...|(40000,[7,42,46,5...|
|     3|data_split/AREQUI...|(40000,[68,98,104...|
|     4|data_split/AREQUI...|(40000,[78,138,22...|
+------+--------------------+--------------------+
only showing top 5 rows

done in 1.973s.


In [9]:
from pyspark.ml.feature import MinHashLSH, MinHashLSHModel

t0=time()

mh = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=5, seed=12345)
model = mh.fit(tf_df)

# Feature Transformation
print("The hashed dataset where hashed values are stored in the column 'hashes':")
model.transform(tf_df).show()
print("done in %0.3fs." % (time() - t0))

The hashed dataset where hashed values are stored in the column 'hashes':
+------+--------------------+--------------------+--------------------+
|row_id|                name|            features|              hashes|
+------+--------------------+--------------------+--------------------+
|     0|data_split/AREQUI...|(40000,[20,43,70,...|[[-2.034555E9], [...|
|     1|data_split/AREQUI...|(40000,[5,7,16,18...|[[-2.033150372E9]...|
|     2|data_split/AREQUI...|(40000,[7,42,46,5...|[[-2.03531092E9],...|
|     3|data_split/AREQUI...|(40000,[68,98,104...|[[-2.037395876E9]...|
|     4|data_split/AREQUI...|(40000,[78,138,22...|[[-2.029477984E9]...|
|     5|data_split/AREQUI...|(40000,[138,192,2...|[[-2.019928688E9]...|
|     6|data_split/AREQUI...|(40000,[55,63,104...|[[-2.029477984E9]...|
|     7|data_split/AREQUI...|(40000,[20,25,42,...|[[-2.034781776E9]...|
|     8|data_split/AREQUI...|(40000,[7,18,20,3...|[[-2.034781776E9]...|
|     9|data_split/AREQUI...|(40000,[739,1139,...|[[-1.9404790

In [12]:
#To correct: Create tf with same words as database of document

# Creates a dataframe with the text files for Target_data

t0=time()
dff_target=create_dataframe(path_directory="target_data/", file_extension="*")  
tok_target = tokenize_dataframe(dff_target, language="spanish", pattern="[.:\s]+")
split_in_chunks = split_chunks(tok_target,n_words=100, inputCol="z_filtered", 
                                  outputCol="list_parag")
tf_df_target = create_TF(split_in_chunks, inputCol="col", minDocFreq=5, numFeatures=40000)

print("done in %0.3fs." % (time() - t0))

done in 0.387s.


In [19]:
# the lowest the value, the highest the similarity
# we filter value = 0 because it is comparing the same documents (, not applicable allways)
t0=time()
threshold = 0.8
#similar=model.approxSimilarityJoin(tf_df, tf_df, threshold).filter("distCol != 0")
similar=model.approxSimilarityJoin(tf_df, tf_df, threshold)
similar.orderBy(similar.distCol.asc()).show()
print("done in %0.3fs." % (time() - t0))

+--------------------+--------------------+-------+
|            datasetA|            datasetB|distCol|
+--------------------+--------------------+-------+
|[173,data_split/V...|[173,data_split/V...|    0.0|
|[17,data_split/CA...|[17,data_split/CA...|    0.0|
|[139,data_split/O...|[139,data_split/O...|    0.0|
|[105,data_split/C...|[105,data_split/C...|    0.0|
|[135,data_split/O...|[135,data_split/O...|    0.0|
|[137,data_split/O...|[137,data_split/O...|    0.0|
|[33,data_split/CA...|[33,data_split/CA...|    0.0|
|[0,data_split/ARE...|[0,data_split/ARE...|    0.0|
|[34,data_split/CA...|[34,data_split/CA...|    0.0|
|[160,data_split/R...|[160,data_split/R...|    0.0|
|[128,data_split/O...|[128,data_split/O...|    0.0|
|[176,data_split/V...|[176,data_split/V...|    0.0|
|[154,data_split/R...|[154,data_split/R...|    0.0|
|[118,data_split/G...|[118,data_split/G...|    0.0|
|[161,data_split/R...|[161,data_split/R...|    0.0|
|[71,data_split/CA...|[71,data_split/CA...|    0.0|
|[124,data_s

In [21]:
similar.count()

513

In [20]:
similar.filter("distCol != 0").orderBy(similar.distCol.asc()).show()

+--------------------+--------------------+-------------------+
|            datasetA|            datasetB|            distCol|
+--------------------+--------------------+-------------------+
|[22,data_split/CA...|[23,data_split/CA...|0.15000000000000002|
|[23,data_split/CA...|[22,data_split/CA...|0.15000000000000002|
|[22,data_split/CA...|[64,data_split/CA...|               0.28|
|[64,data_split/CA...|[22,data_split/CA...|               0.28|
|[64,data_split/CA...|[23,data_split/CA...|0.31999999999999995|
|[23,data_split/CA...|[64,data_split/CA...|0.31999999999999995|
|[65,data_split/CA...|[66,data_split/CA...| 0.4571428571428572|
|[66,data_split/CA...|[65,data_split/CA...| 0.4571428571428572|
|[64,data_split/CA...|[65,data_split/CA...| 0.4666666666666667|
|[65,data_split/CA...|[64,data_split/CA...| 0.4666666666666667|
|[25,data_split/CA...|[20,data_split/CA...| 0.4922779922779923|
|[20,data_split/CA...|[25,data_split/CA...| 0.4922779922779923|
|[23,data_split/CA...|[65,data_split/CA.