In [1]:
import pyspark
from pyspark.sql import functions as F
from pyspark.sql.functions import desc
from pyspark.sql.window import Window
import datetime
import pandas as pd
import matplotlib.pyplot as plt
from pyspark.sql.types import *
%matplotlib inline
from pyspark import SparkConf, SparkContext
conf = SparkConf().setAppName("Altigran P3") \
                    .setMaster('local') \
                    .set('spark.executor.memory','14G') \
                    .set('spark.cores.max', '24') \
                    .set('spark.sql.tungsten.enabled', 'true')

sc = SparkContext(conf=conf)

sc._jsc.hadoopConfiguration().set('textinputformat.record.delimiter', '\nFrom: ')

In [2]:
spark = pyspark.sql.SparkSession.builder \
            .master('local') \
            .appName('Altigran BD2') \
            .getOrCreate() 

In [3]:
from os import listdir
from os.path import isfile, join
NEWSGROUPS_PATH = './20-newsgroups/'
files = [f for f in listdir(NEWSGROUPS_PATH) if isfile(join(NEWSGROUPS_PATH, f))]
files.remove('list.csv')

## Load DFs for each file in the directory and concat to a single DF

In [4]:
field = [StructField('class',StringType(), True),StructField('text', StringType(), True)]
schema = StructType(field)
union_df = spark.createDataFrame(sc.emptyRDD(), schema)


for file in files:
    filePath = NEWSGROUPS_PATH + file
    newsgroup = file.split('.txt')[0]
    df = sc.textFile(filePath).map(lambda x: (newsgroup, x)).toDF(['class', 'text'])
    union_df = union_df.union(df)

## Sanitize text 

In [5]:
from pyspark.sql.functions import udf, col, lower, regexp_replace
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from nltk.stem.snowball import SnowballStemmer

In [6]:
# Clean text
df_clean = union_df.select('class', (lower(regexp_replace('text', "[^a-zA-Z\\s]", "")).alias('cleaned_text')))

# Tokenize text
tokenizer = Tokenizer(inputCol='cleaned_text', outputCol='words_token')
df_words_token = tokenizer.transform(df_clean).select('class', 'words_token')

# Remove stop words
remover = StopWordsRemover(inputCol='words_token', outputCol='words_clean')
df_words_no_stopw = remover.transform(df_words_token).select('class', 'words_clean')

# Stem text
stemmer = SnowballStemmer(language='english')
stemmer_udf = udf(lambda tokens: [stemmer.stem(token) for token in tokens], ArrayType(StringType()))
df_stemmed = df_words_no_stopw.withColumn("words_stemmed", stemmer_udf("words_clean")).select('class', 'words_stemmed')

# Filter length word >= 2
filter_length_udf = udf(lambda row: [x for x in row if len(x) >= 2], ArrayType(StringType()))
df_final_words = df_stemmed.withColumn('words', filter_length_udf(col('words_stemmed'))).select('class','words')

In [7]:
df_final_words.take(10000)[9999]

Row(class=u'comp.sys.mac.hardware', words=[u'eapuorionoacuciedu', u'wayn', u'chen', u'subject', u're', u'disappoint', u'la', u'cie', u'articl', u'bcfdnewsserviceuciedu', u'wayn', u'chen', u'eapuorionoacuciedu', u'write', u'industri', u'sound', u'unfair', u'someon', u'oop', u'meant', u'fair', u'unfair', u'newsgroup', u'compsysmachardwar', u'documentid'])

## Create dictinary with words index for sparce vector

In [8]:
def emitWords(doc):
    array = []
    for word in doc[0]:
        array.append( (word,1) )
    return array
rdd = df_final_words.select('words').rdd.flatMap(emitWords) \
                                         .reduceByKey(lambda a,b: a+b) \
                                         .sortByKey() 

distinct_words_array = rdd.collect()
DICTIONARY = {}
index = 0
for word in distinct_words_array:
    DICTIONARY[word[0]] = index
    index = index + 1
    
NUMBER_OF_WORDS = len(DICTIONARY)
NUMBER_OF_WORDS

128250

## Create dense vector representation of the words

In [9]:
from pyspark.sql.types import BooleanType
from pyspark.ml.linalg import Vectors, VectorUDT

def createVector(words):
    word_indexes = sorted(set([DICTIONARY[word] for word in words]))
    array = [1.0] * len(word_indexes)
    return Vectors.sparse(NUMBER_OF_WORDS, word_indexes, array )
    
udf_func = udf(createVector,  VectorUDT())
    
df_final = df_final_words.withColumn('features', udf_func(col('words'))).select('class','features')

df_final.first()

Row(class=u'talk.politics.guns', features=SparseVector(128250, {26073: 1.0, 82178: 1.0, 111218: 1.0}))

## Split data into train and test

In [10]:
(train_df, test_df) = df_final.randomSplit([0.9,0.1])

In [11]:
print('Total elements: ' + str(df_final.count()))
print('Count train elements: ' + str(train_df.count()))
print('Count test elements: ' + str(test_df.count()))

Total elements: 39298
Count train elements: 35325
Count test elements: 3973


## Use LSH and train

In [12]:
from pyspark.ml.feature import MinHashLSH

mh = MinHashLSH(inputCol='features', outputCol='hashes', numHashTables=5)
model = mh.fit(train_df)
model.transform(train_df).show()

+------------------+--------------------+--------------------+
|             class|            features|              hashes|
+------------------+--------------------+--------------------+
|talk.politics.guns|(128250,[0,62,262...|[[9133272.0], [16...|
|talk.politics.guns|(128250,[0,62,262...|[[9133272.0], [16...|
|talk.politics.guns|(128250,[0,384,49...|[[1919832.0], [16...|
|talk.politics.guns|(128250,[0,384,49...|[[1919832.0], [16...|
|talk.politics.guns|(128250,[1,127,30...|[[5759166.0], [32...|
|talk.politics.guns|(128250,[1,127,30...|[[5759166.0], [32...|
|talk.politics.guns|(128250,[12,1199,...|[[6728792.0], [48...|
|talk.politics.guns|(128250,[12,1199,...|[[6728792.0], [48...|
|talk.politics.guns|(128250,[12,1199,...|[[6728792.0], [48...|
|talk.politics.guns|(128250,[12,1199,...|[[6728792.0], [48...|
|talk.politics.guns|(128250,[20,1195,...|[[4.8711272E7], [...|
|talk.politics.guns|(128250,[20,1195,...|[[4.8711272E7], [...|
|talk.politics.guns|(128250,[116,262,...|[[3509762.0], 

### Testing with regular strings

In [13]:
from pyspark.sql.types import StringType

def convertStringToSparseVector(string):
    words = string.split(' ')
    word_indexes = sorted(set([DICTIONARY[word] for word in words]))
    array = [1.0] * len(word_indexes)
    return Vectors.sparse(NUMBER_OF_WORDS-2, word_indexes, array )

In [14]:
STRING = 'like kill handgun'

element = convertStringToSparseVector(STRING)
model.approxNearestNeighbors(train_df, element, 5).show()

+------------------+--------------------+--------------------+------------------+
|             class|            features|              hashes|           distCol|
+------------------+--------------------+--------------------+------------------+
|talk.politics.guns|(128250,[3327,743...|[[4.397985E7], [1...|0.9130434782608696|
|talk.politics.guns|(128250,[3327,743...|[[4.397985E7], [1...|0.9130434782608696|
|talk.politics.misc|(128250,[4036,109...|[[2.57833503E8], ...|0.9285714285714286|
|talk.politics.misc|(128250,[4036,109...|[[2.57833503E8], ...|0.9285714285714286|
|   sci.electronics|(128250,[1433,260...|[[4.56319426E8], ...|0.9333333333333333|
+------------------+--------------------+--------------------+------------------+



### Testing with regular strings

In [15]:
STRING = 'god'

element = convertStringToSparseVector(STRING)
model.approxNearestNeighbors(train_df, element, 5).show()

+------------------+--------------------+--------------------+------------------+
|             class|            features|              hashes|           distCol|
+------------------+--------------------+--------------------+------------------+
|talk.religion.misc|(128250,[4455,602...|[[2.1941446E7], [...|            0.9375|
|talk.religion.misc|(128250,[4455,602...|[[2.1941446E7], [...|            0.9375|
|talk.religion.misc|(128250,[4455,104...|[[1.68356544E8], ...|0.9444444444444444|
|talk.religion.misc|(128250,[4455,104...|[[1.68356544E8], ...|0.9444444444444444|
|talk.religion.misc|(128250,[1396,445...|[[3.221884E7], [8...|0.9473684210526316|
+------------------+--------------------+--------------------+------------------+



In [16]:
STRING = 'hardwar'

element = convertStringToSparseVector(STRING)
model.approxNearestNeighbors(train_df, element, 5).show()

+--------------------+--------------------+--------------------+------------------+
|               class|            features|              hashes|           distCol|
+--------------------+--------------------+--------------------+------------------+
|comp.sys.mac.hard...|(128250,[12878,15...|[[4.4425894E7], [...|0.9444444444444444|
|comp.sys.mac.hard...|(128250,[12878,15...|[[4.4425894E7], [...|0.9444444444444444|
|       comp.graphics|(128250,[503,1637...|[[1.44771775E8], ...|              0.95|
|       comp.graphics|(128250,[503,1637...|[[1.44771775E8], ...|              0.95|
|     sci.electronics|(128250,[2354,260...|[[1.43278567E8], ...|0.9545454545454546|
+--------------------+--------------------+--------------------+------------------+



In [17]:
STRING = 'computer'

element = convertStringToSparseVector(STRING)
rr = model.approxNearestNeighbors(train_df, element, 5)

### Testing training set

In [None]:
training_set = df_final.collect()
classified = []
result = []
for element in training_set:
    result.append(model.approxNearestNeighbors(train_df, element[1], 1).collect())