## Dependências

In [9]:
#!pip install pyspark
#!pip install nltk
#!pip install pandas

Defaulting to user installation because normal site-packages is not writeable
Looking in indexes: https://erlonbie:****@nexus.apps.jusbr.com/repository/pypi-all/simple
Collecting nltk
  Downloading https://nexus.apps.jusbr.com/repository/pypi-all/packages/nltk/3.8.1/nltk-3.8.1-py3-none-any.whl (1.5 MB)
[2K     [38;2;114;156;31m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.5/1.5 MB[0m [31m3.6 MB/s[0m eta [36m0:00:00[0m MB/s[0m eta [36m0:00:01[0m:01[0m
Installing collected packages: nltk
Successfully installed nltk-3.8.1


## Inicialização

In [2]:
import warnings
warnings.filterwarnings('ignore')

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

# Desconecta o SparkContext, caso esteja conectado
#sc.stop()

# Opção 1
conf = SparkConf().setMaster("local").setAppName("tp3")

# Opção 2 
#conf = SparkConf().setMaster("spark://10.208.205.1:7077").setAppName("UFAM-Lab1")


# Conecta ao Cluster Spark
sc = SparkContext.getOrCreate(conf = conf) 

# Define um "entry point" para toda as operação SPARK SQL
sqlc = SQLContext(sc)

23/01/29 11:14:12 WARN Utils: Your hostname, erlonbie-xps139310 resolves to a loopback address: 127.0.1.1; using 192.168.0.143 instead (on interface wlp0s20f3)
23/01/29 11:14:12 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/01/29 11:14:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Leitura dos dados

In [111]:
cases = sqlc.read.load("train.csv",
                       format="csv", 
                       sep=",", 
                       inferSchema="true", 
                       header="true")
cases.show()

+---+-----+--------------------+
| id|label|               tweet|
+---+-----+--------------------+
|  1|    0| @user when a fat...|
|  2|    0|@user @user thank...|
|  3|    0|  bihday your maj...|
|  4|    0|#model   i love u...|
|  5|    0| factsguide: soci...|
|  6|    0|[2/2] huge fan fa...|
|  7|    0| @user camping to...|
|  8|    0|the next school y...|
|  9|    0|we won!!! love th...|
| 10|    0| @user @user welc...|
| 11|    0| â #ireland con...|
| 12|    0|we are so selfish...|
| 13|    0|i get to see my d...|
| 14|    1|@user #cnn calls ...|
| 15|    1|no comment!  in #...|
| 16|    0|ouch...junior is ...|
| 17|    0|i am thankful for...|
| 18|    1|retweet if you ag...|
| 19|    0|its #friday! ð...|
| 20|    0|as we all know, e...|
+---+-----+--------------------+
only showing top 20 rows



## Tratamento dos dados

In [112]:
cases = cases.fillna({'tweet':''})

In [113]:
from pyspark.ml.feature import Tokenizer

tokenizer = Tokenizer(inputCol='tweet', outputCol='tokens')
cases = tokenizer.transform(cases)

In [114]:
rdd = cases.rdd.map(lambda x: (x['id'], x['label'], x['tweet'], list(filter(lambda x: x != '', x['tokens']))))
cases = rdd.toDF(cases.columns)
cases.show()

+---+-----+--------------------+--------------------+
| id|label|               tweet|              tokens|
+---+-----+--------------------+--------------------+
|  1|    0| @user when a fat...|[@user, when, a, ...|
|  2|    0|@user @user thank...|[@user, @user, th...|
|  3|    0|  bihday your maj...|[bihday, your, ma...|
|  4|    0|#model   i love u...|[#model, i, love,...|
|  5|    0| factsguide: soci...|[factsguide:, soc...|
|  6|    0|[2/2] huge fan fa...|[[2/2], huge, fan...|
|  7|    0| @user camping to...|[@user, camping, ...|
|  8|    0|the next school y...|[the, next, schoo...|
|  9|    0|we won!!! love th...|[we, won!!!, love...|
| 10|    0| @user @user welc...|[@user, @user, we...|
| 11|    0| â #ireland con...|[â, #ireland, c...|
| 12|    0|we are so selfish...|[we, are, so, sel...|
| 13|    0|i get to see my d...|[i, get, to, see,...|
| 14|    1|@user #cnn calls ...|[@user, #cnn, cal...|
| 15|    1|no comment!  in #...|[no, comment!, in...|
| 16|    0|ouch...junior is 

## K-means 

In [None]:
from pyspark.ml.feature import HashingTF
from pyspark.ml.feature import IDF

hashingTF = HashingTF(inputCol="tokens", outputCol="rawFeatures", numFeatures=1000)
idf = IDF(inputCol="rawFeatures", outputCol="features")

In [119]:
# KMeans Clustering 

from pyspark.ml.clustering import KMeans

numIterations = 200 
numberClusters = 2
kmeans = KMeans().setMaxIter(numIterations).setK(numberClusters).setSeed(1).setDistanceMeasure('cosine') 

## Versão 1

In [116]:
rdd = cases.rdd.map(lambda x:(x['id'], x['label'], x['tweet'], list(filter(lambda x: '#' not in x, x['tokens']))))
cases_v1 = rdd.toDF(cases.columns)
cases_v1.show()

+---+-----+--------------------+--------------------+
| id|label|               tweet|              tokens|
+---+-----+--------------------+--------------------+
|  1|    0| @user when a fat...|[@user, when, a, ...|
|  2|    0|@user @user thank...|[@user, @user, th...|
|  3|    0|  bihday your maj...|[bihday, your, ma...|
|  4|    0|#model   i love u...|[i, love, u, take...|
|  5|    0| factsguide: soci...|[factsguide:, soc...|
|  6|    0|[2/2] huge fan fa...|[[2/2], huge, fan...|
|  7|    0| @user camping to...|[@user, camping, ...|
|  8|    0|the next school y...|[the, next, schoo...|
|  9|    0|we won!!! love th...|[we, won!!!, love...|
| 10|    0| @user @user welc...|[@user, @user, we...|
| 11|    0| â #ireland con...|[â, consumer, p...|
| 12|    0|we are so selfish...|[we, are, so, sel...|
| 13|    0|i get to see my d...|[i, get, to, see,...|
| 14|    1|@user #cnn calls ...|[@user, calls, mi...|
| 15|    1|no comment!  in #...|  [no, comment!, in]|
| 16|    0|ouch...junior is 

In [117]:
cases_v1 = hashingTF.transform(cases_v1)
idfModel = idf.fit(cases_v1)
cases_v1 = idfModel.transform(cases_v1)
cases_v1.limit(5).toPandas()

                                                                                

Unnamed: 0,id,label,tweet,tokens,rawFeatures,features
0,1,0,@user when a father is dysfunctional and is s...,"[@user, when, a, father, is, dysfunctional, an...","(0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 5.230670706306456, 0.0, 0..."
1,2,0,@user @user thanks for #lyft credit i can't us...,"[@user, @user, thanks, for, credit, i, can't, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
2,3,0,bihday your majesty,"[bihday, your, majesty]","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
3,4,0,#model i love u take with u all the time in ...,"[i, love, u, take, with, u, all, the, time, in...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
4,5,0,factsguide: society now #motivation,"[factsguide:, society, now]","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."


In [None]:
kmeans_model = kmeans.fit(cases_v1)
predictions_v1 = kmeans_model.transform(cases_v1)
predictions_v1.select('id','tweet','label','prediction').limit(5).toPandas()

Exception in thread "stdout writer for python3" java.lang.OutOfMemoryError: Java heap space
	at java.base/java.util.ArrayList.<init>(ArrayList.java:156)
	at org.apache.spark.sql.execution.python.EvaluatePython$.toJava(EvaluatePython.scala:64)
	at org.apache.spark.sql.execution.python.EvaluatePython$.toJava(EvaluatePython.scala:58)
	at org.apache.spark.sql.Dataset.$anonfun$javaToPython$1(Dataset.scala:3679)
	at org.apache.spark.sql.Dataset$$Lambda$3290/0x0000000100f31a68.apply(Unknown Source)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at org.apache.spark.ContextAwareIterator.next(ContextAwareIterator.scala:41)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:90)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:80)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(Ser

23/01/29 15:25:33 ERROR Utils: Uncaught exception in thread stdout writer for python3
java.lang.OutOfMemoryError: Java heap space
	at java.base/java.util.ArrayList.<init>(ArrayList.java:156)
	at org.apache.spark.sql.execution.python.EvaluatePython$.toJava(EvaluatePython.scala:64)
	at org.apache.spark.sql.execution.python.EvaluatePython$.toJava(EvaluatePython.scala:58)
	at org.apache.spark.sql.Dataset.$anonfun$javaToPython$1(Dataset.scala:3679)
	at org.apache.spark.sql.Dataset$$Lambda$3290/0x0000000100f31a68.apply(Unknown Source)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at org.apache.spark.ContextAwareIterator.next(ContextAwareIterator.scala:41)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:90)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:80)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.api.python.Se

[Stage 38:>                                                         (0 + 1) / 1]

23/01/29 21:06:51 WARN TransportChannelHandler: Exception in connection from /192.168.0.143:46801
java.io.IOException: Connection timed out
	at java.base/sun.nio.ch.SocketDispatcher.read0(Native Method)
	at java.base/sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:47)
	at java.base/sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:339)
	at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:293)
	at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:268)
	at java.base/sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:425)
	at io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:258)
	at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1132)
	at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:350)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:151)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOpti

[Stage 38:>                                                         (0 + 1) / 1]

In [72]:
## Converter para dataframe Pandas
import pandas as pd

cases.limit(20).toPandas()

Unnamed: 0,id,label,tweet
0,1,0,@user when a father is dysfunctional and is s...
1,2,0,@user @user thanks for #lyft credit i can't us...
2,3,0,bihday your majesty
3,4,0,#model i love u take with u all the time in ...
4,5,0,factsguide: society now #motivation
5,6,0,[2/2] huge fan fare and big talking before the...
6,7,0,@user camping tomorrow @user @user @user @use...
7,8,0,the next school year is the year for exams.ð...
8,9,0,we won!!! love the land!!! #allin #cavs #champ...
9,10,0,@user @user welcome here ! i'm it's so #gr...


In [73]:
df_teste = cases.limit(20).toPandas()

In [74]:
df_teste

Unnamed: 0,id,label,tweet
0,1,0,@user when a father is dysfunctional and is s...
1,2,0,@user @user thanks for #lyft credit i can't us...
2,3,0,bihday your majesty
3,4,0,#model i love u take with u all the time in ...
4,5,0,factsguide: society now #motivation
5,6,0,[2/2] huge fan fare and big talking before the...
6,7,0,@user camping tomorrow @user @user @user @use...
7,8,0,the next school year is the year for exams.ð...
8,9,0,we won!!! love the land!!! #allin #cavs #champ...
9,10,0,@user @user welcome here ! i'm it's so #gr...


In [75]:
import re
a1 = "we won!!! love the land!!! #allin #cavs #champ"
a2 =" ".join(filter(lambda x:x[0]!='#', a1.split()))
print(a2)

we won!!! love the land!!!


In [77]:
def filtra(x):
    return " ".join(filter(lambda y:y[0]!='#', x.split()))


In [81]:
df_teste['tweet'] = df_teste['tweet'].apply(filtra)

In [82]:
df_teste

Unnamed: 0,id,label,tweet
0,1,0,@user when a father is dysfunctional and is so...
1,2,0,@user @user thanks for credit i can't use caus...
2,3,0,bihday your majesty
3,4,0,i love u take with u all the time in urð±!!!...
4,5,0,factsguide: society now
5,6,0,[2/2] huge fan fare and big talking before the...
6,7,0,@user camping tomorrow @user @user @user @user...
7,8,0,the next school year is the year for exams.ð...
8,9,0,we won!!! love the land!!! â¦
9,10,0,@user @user welcome here ! i'm it's so !


In [12]:
import nltk
nltk.download('stopwords')
from nltk.corpus import stopwords
set(stopwords.words('english'))

[nltk_data] Downloading package stopwords to
[nltk_data]     /home/erlonbie/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.


{'a',
 'about',
 'above',
 'after',
 'again',
 'against',
 'ain',
 'all',
 'am',
 'an',
 'and',
 'any',
 'are',
 'aren',
 "aren't",
 'as',
 'at',
 'be',
 'because',
 'been',
 'before',
 'being',
 'below',
 'between',
 'both',
 'but',
 'by',
 'can',
 'couldn',
 "couldn't",
 'd',
 'did',
 'didn',
 "didn't",
 'do',
 'does',
 'doesn',
 "doesn't",
 'doing',
 'don',
 "don't",
 'down',
 'during',
 'each',
 'few',
 'for',
 'from',
 'further',
 'had',
 'hadn',
 "hadn't",
 'has',
 'hasn',
 "hasn't",
 'have',
 'haven',
 "haven't",
 'having',
 'he',
 'her',
 'here',
 'hers',
 'herself',
 'him',
 'himself',
 'his',
 'how',
 'i',
 'if',
 'in',
 'into',
 'is',
 'isn',
 "isn't",
 'it',
 "it's",
 'its',
 'itself',
 'just',
 'll',
 'm',
 'ma',
 'me',
 'mightn',
 "mightn't",
 'more',
 'most',
 'mustn',
 "mustn't",
 'my',
 'myself',
 'needn',
 "needn't",
 'no',
 'nor',
 'not',
 'now',
 'o',
 'of',
 'off',
 'on',
 'once',
 'only',
 'or',
 'other',
 'our',
 'ours',
 'ourselves',
 'out',
 'over',
 'own',
 'r

In [13]:
stop = stopwords.words('english')

In [14]:
def stop_words(df):
    df['stopwords'] = df['tweet'].apply(lambda x: len([x for x in x.split() if x in stop]))
    print(df[['tweet','stopwords']].head())

In [16]:
stop_words(cases.toPandas())

                                               tweet  stopwords
0   @user when a father is dysfunctional and is s...         10
1  @user @user thanks for #lyft credit i can't us...          5
2                                bihday your majesty          1
3  #model   i love u take with u all the time in ...          5
4             factsguide: society now    #motivation          1
