In [1]:
from sklearn.datasets import fetch_20newsgroups
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, IDF, MinHashLSH
from pyspark.ml import Pipeline
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import functions as f
from pyspark.sql.types import *
import numpy as np
import pandas as pd
from tqdm import tqdm

### Carregar dataset ``20 News Groups``

In [2]:
# executar essa célula pois vai ser útil no futuro
data = fetch_20newsgroups(subset='all')

In [11]:
data_group = list(zip(data.data, data.target.tolist()))
data_group = [[tupla[0],tupla[1]] for tupla in data_group]

In [12]:
for i in range(len(data_group)):
    data_group[i][1] = data.target_names[data_group[i][1]]

In [13]:
for i in range(len(data_group)):
    data_group[i][0] = data_group[i][0].replace('\n',' ')
    data_group[i][0] = data_group[i][0].replace('\t',' ')
    data_group[i][0] = data_group[i][0].replace('\r',' ')
    data_group[i][0] = data_group[i][0].replace(';','')

#### Salva num .csv

In [14]:
df = pd.DataFrame(data_group, columns=['data','target'])

In [15]:
df.to_csv('newsgroups_data.csv', index=True, sep=';', index_label='id')

### Carregar o .csv em dataframe

In [3]:
spark = SparkSession.builder.appName('Trabalho III'). \
        config('spark.some.config.option','some-value').getOrCreate()
sc = spark.sparkContext

In [4]:
df_data = spark.read.load('newsgroups_data.csv', format='csv', sep=';', header=True)
df_data = df_data.where(f.col("data").isNotNull())
df_data = df_data.fillna({'data':''})

#### Tratamento do texto

In [5]:
tokenizer = Tokenizer(inputCol='data', outputCol='tokens')
swremover = StopWordsRemover(inputCol='tokens', outputCol='words')
cv = CountVectorizer(inputCol='words', outputCol='rawFeatures', vocabSize=1000)
idf = IDF(inputCol='rawFeatures',outputCol='features')

In [6]:
pipeline = Pipeline(stages=[tokenizer, swremover, cv, idf])
fit_df_data = pipeline.fit(dataset=df_data).transform(df_data)

#### Modelo LSH

In [9]:
mh = MinHashLSH(inputCol='features', outputCol='hashes', numHashTables=5)
model = mh.fit(fit_df_data)
fit_df_data = model.transform(fit_df_data)

In [10]:
explodeHashes = f.udf(lambda l: float(l[0]), FloatType())
fit_df_data = fit_df_data.select('id', 'features', 'target', \
                                 f.explode('hashes').alias('hashes'))
fit_df_data = fit_df_data.withColumn('extracted', explodeHashes(f.col('hashes')))

#### Divisão entre treino e teste

In [11]:
train_ratio = 0.9
test_ratio = 0.1

In [12]:
train, test = fit_df_data.randomSplit([train_ratio,test_ratio])

In [13]:
# pegando os primeiros 100 exemplos para teste
test_values = test.take(100)

In [14]:
def distanceByKey(vect1, vect2):
    set1 = set(vect1)
    set2 = set(vect2)
    lenInter = len(set1.intersection(set2))
    return (1 - lenInter) / float(len(set1) + len(set2) - lenInter)

def distance(keyFeat):
    return f.udf(lambda l: distanceByKey(l, keyFeat), DoubleType())

## K-NN aproximado

In [15]:
# escolha do K
K = 5

In [16]:
predictionAndLabels = []
for value in tqdm(test_values):
    hashEncounters = train.where(f'extracted == {value["extracted"]}')
    dists = hashEncounters.withColumn('distCol', distance(value['features'])(f.col('features')))
    predictedList = dists.orderBy(f.col('distCol').desc()).limit(K).select('target').collect()
    
    predicts = [predictedList[i][0] for i in range(len(predictedList))]
    frequency = {}
                                 
    for predict in predicts:
        if(predict not in frequency):
            frequency[predict] = 1
        else:
            frequency[predict] += 1
    if(len(predictedList) > 0):
        predictionAndLabels.append([sorted(frequency.items(), key=lambda x: x[1],\
                                   reverse=True)[0][0], value['target']])

100%|██████████| 100/100 [13:17<00:00,  7.97s/it]


In [17]:
class_to_float = {}
i = float(1.0)
for name in data.target_names:
    class_to_float[name] = i
    i += 1.0

In [18]:
for i in range(len(predictionAndLabels)):
    for j in range(2):
        predictionAndLabels[i][j] = class_to_float[predictionAndLabels[i][j]]

In [24]:
metricsDf = spark.createDataFrame(predictionAndLabels,['pred','label'])
metrics = MulticlassClassificationEvaluator(predictionCol='pred', labelCol='label')

In [25]:
print('Precision:', metrics.evaluate(metricsDf, {metrics.metricName :'weightedPrecision'}))

Precision: 0.10783333333333334


In [26]:
print('Recall:', metrics.evaluate(metricsDf, {metrics.metricName :'weightedRecall'}))

Recall: 0.060000000000000005


In [27]:
print('F1:', metrics.evaluate(metricsDf,{metrics.metricName :'f1'}))

F1: 0.07057575757575757
