In [6]:
from sklearn.neighbors import KNeighborsClassifier
from sklearn.datasets import fetch_20newsgroups
from pyspark.sql import SparkSession
from pyspark.sql import functions
from pyspark import SparkConf
from pyspark.sql.functions import *
from pyspark.ml.linalg import Vectors
from pyspark.sql.types import *
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, Word2Vec, MinHashLSH, HashingTF, IDF
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from operator import itemgetter
from tqdm import tqdm_notebook

# Definindo spark e carregando a base
A base utilizada é a mesma especificada no trabalho  
mas foi carregada direto do sklearn que já tem a base como exemplo

In [7]:
# Defines spark and load dataset!
conf = SparkConf()
spark = SparkSession.builder.config(conf=conf).appName('bd2-ṕi3').getOrCreate()
proc_data = fetch_20newsgroups(subset='all', remove=('headers', 'footers', 'quotes'))

## Limpeza do dataset
O dataset possui muita sujeira nas string  
então é feito uma limpeza para melhor coleta  
das features.

In [8]:
# Clean dataset
dataset = list(zip(proc_data.data,proc_data.target.tolist()))
for i in range(len(dataset)):
    dataset[i] = (' '.join(dataset[i][0].replace('\n','').replace('\t','').split()).strip(),proc_data.target_names[dataset[i][1]])  #Clean the horrible text!
dataset = spark.createDataFrame(dataset,['data','target'])
dataset = dataset.where(functions.col("data").isNotNull())
dataset = dataset.where(functions.col("target").isNotNull())
dataset = dataset.fillna({'data':''})
dataset.show()

+--------------------+--------------------+
|                data|              target|
+--------------------+--------------------+
|I am sure some ba...|    rec.sport.hockey|
|My brother is in ...|comp.sys.ibm.pc.h...|
|Finally you said ...|talk.politics.mid...|
|Think!It's the SC...|comp.sys.ibm.pc.h...|
|1) I have an old ...|comp.sys.mac.hard...|
|Back in high scho...|     sci.electronics|
|AE is in Dallas.....|comp.sys.mac.hard...|
|[stuff deleted]Ok...|    rec.sport.hockey|
|Yeah, it's the se...|    rec.sport.hockey|
|If a Christian me...|  talk.religion.misc|
|the blood of the ...|  talk.religion.misc|
|>say they have a ...|           sci.crypt|
|930418Do what tho...|  talk.religion.misc|
|How about Kirlian...|             sci.med|
|There is no notio...|         alt.atheism|
|In the following ...|talk.politics.mid...|
|Many thanks to th...|     sci.electronics|
|.........I, some ...|     sci.electronics|
|The Supreme Court...|           sci.crypt|
|ed>1. All of us t...|     rec.m

# Coleta de Features
A partir do dataset utilizamos as ferramentas do pyspark  
para coletar features que serão utilizadas pelo modelo  

In [9]:
# Extract features from dataset
tokenizer = Tokenizer(inputCol="data", outputCol="words")

stop_words = StopWordsRemover(inputCol='words', outputCol='clean_words')

word_count = CountVectorizer(inputCol='words', outputCol='features', vocabSize=1000)

idf = IDF(inputCol='features',outputCol='new_features')

## Pipline
É utilizado para acelerar o processamento das features

In [10]:
pipeline = Pipeline(stages=[tokenizer,stop_words,word_count,idf])
pipe = pipeline.fit(dataset)
dataset = pipe.transform(dataset)
dataset.show()

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|                data|              target|               words|         clean_words|            features|        new_features|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|I am sure some ba...|    rec.sport.hockey|[i, am, sure, som...|[sure, bashers, p...|(1000,[0,1,2,3,4,...|(1000,[0,1,2,3,4,...|
|My brother is in ...|comp.sys.ibm.pc.h...|[my, brother, is,...|[brother, market,...|(1000,[0,3,5,6,8,...|(1000,[0,3,5,6,8,...|
|Finally you said ...|talk.politics.mid...|[finally, you, sa...|[finally, said, d...|(1000,[0,1,2,3,4,...|(1000,[0,1,2,3,4,...|
|Think!It's the SC...|comp.sys.ibm.pc.h...|[think!it's, the,...|[think!it's, scsi...|(1000,[0,1,2,3,4,...|(1000,[0,1,2,3,4,...|
|1) I have an old ...|comp.sys.mac.hard...|[1), i, have, an,...|[1), old, jasmine...|(1000,[0,1,3,5,6,..

# MinHash
Utilização do MinHash para reduzir o uso  
de memória na hora do treinamento

In [11]:
mh = MinHashLSH(inputCol='features', outputCol='hashes', numHashTables=10)
model = mh.fit(dataset)

In [None]:
collect_features = dataset.select('features').collect()
predict_list = [] # Array with the preditcion from model.

for i in tqdm_notebook(range(dataset.count())):
    try: #Some values of the features are all 0 and breakes the KNN (sorry no time to fix this)
        df_predict = model.approxNearestNeighbors(dataset, collect_features[i][0], 10) #utiliza o modelo do MinHash para treino
        select_target = df_predict.select('target').take(1)
        predict_list.append(select_target)
    except:
        pass

HBox(children=(IntProgress(value=0, max=18846), HTML(value='')))