In [None]:
from pyspark.sql import SparkSession

In [None]:
# Local Spark session on all available cores (getOrCreate returns an active session if one exists).
spark: SparkSession = (
    SparkSession.builder
    .master('local[*]')
    .getOrCreate()
)

# Import and Processing of data

In [None]:
# Load the PT-BR IMDB reviews: first row is the header, '"' escapes embedded
# quotes, and column types are inferred from the data.
data = spark.read.csv(
    './data/imdb-reviews-pt-br.csv',
    header=True,
    escape='"',
    inferSchema=True,
)

In [None]:
# Dataset dimensions: number of rows (triggers a Spark job) and number of columns.
n_rows, n_cols = data.count(), len(data.columns)
print(f'Número de linhas: {n_rows} | Número de colunas: {n_cols}')

In [None]:
# Show the column names and the types inferred by the CSV reader.
data.printSchema()

In [None]:
# Preview the first 20 rows with long values truncated (Spark's defaults, spelled out).
data.show(n=20, truncate=True)

In [None]:
# Full, untruncated Portuguese text of review id 190.
data.where(data['id'] == 190).select('text_pt').show(truncate=False)

In [None]:
# Full, untruncated Portuguese text of review id 12427.
data.where(data['id'] == 12427).select('text_pt').show(truncate=False)

In [None]:
# Class balance: number of reviews per sentiment value.
data.groupby('sentiment').count().show()

# WordCloud

In [None]:
from wordcloud import WordCloud
import matplotlib.pyplot as plt

In [None]:
# Reproducible ~10% sample (seed 101, no replacement) of the Portuguese texts for the word cloud.
sample = data.select('text_pt').sample(withReplacement=False, fraction=0.1, seed=101)

In [None]:
# Bring the sampled texts to the driver as a plain Python list.
# NOTE(review): this shadows the built-in `all`; the name is kept because later cells read it.
all = [row.text_pt for row in sample.collect()]

In [None]:
# Word cloud of the sampled reviews. The texts are joined with spaces, and
# null/empty texts are skipped. Passing `str(all)` (the list's repr) leaked
# quotes, brackets and escape sequences such as "\n" into the word tokens.
wordcloud = WordCloud(
  background_color='white',
  width=1920,
  height=1080,
  collocations=False,   # single words only, no two-word collocations
  prefer_horizontal=1   # place words horizontally only
).generate(' '.join(text for text in all if text))

In [None]:
# Render the word cloud image on a wide canvas without axes.
fig, ax = plt.subplots(figsize=(20, 8))
ax.imshow(wordcloud)
ax.axis('off')
plt.show()

# Limpeza

In [None]:
import string

In [None]:
# Python's reference set of ASCII punctuation characters, shown for comparison with the cleaning regex.
string.punctuation

In [None]:
# Toy sentences full of punctuation, used to check the cleaning regex.
# NOTE: this rebinds `sample` (previously the word-cloud sample).
# `[\\]` writes the backslash explicitly. The former `[\]` produced the same string
# only via an invalid escape sequence (SyntaxWarning on Python 3.12+).
sample = spark.createDataFrame(
  [
    ("Oi, JP! Blz?",),
    ("$$$\\ |~ Parabéns ~| \\$$$",),
    ("(#amovc #paz&amor ^.^)",),
    ("\"bora *_* \"",),
    ("=>->'...``` vc foi selecionad@ ´´´...'<=<-",),
    ("{comprar: arroz; feijão e pepino} //",),
    ("!\"#$&'()*+,-./:;<=>?@[\\]^_`{|}~",),
    ("ana@gmail.com",)
  ],
  ["texts"]
)

In [None]:
import pyspark.sql.functions as f

In [None]:
# Remove ASCII punctuation plus the acute accent '´'. Spark evaluates Java regex,
# where \p{Punct} is exactly string.punctuation (!"#$%&'()*+,-./:;<=>?@[\]^_`{|}~).
# The previous hand-written class missed '[' and ']' and used the invalid
# Python escape '\$' in a non-raw string.
sample = sample.withColumn('text_regex', f.regexp_replace('texts', r'[\p{Punct}´]', ''))

In [None]:
# Strip leading/trailing whitespace left over after removing punctuation.
sample = sample.withColumn('clean_text', f.trim(f.col('text_regex')))

In [None]:
# Compare original, regex-cleaned and trimmed toy texts in full.
sample.show(n=20, truncate=False)

In [None]:
# Same punctuation removal as the toy example, applied to both review languages.
# Produces text_en_regex and text_pt_regex. In Java regex, \p{Punct} covers all of
# string.punctuation (the old class missed '[' and ']'); '´' is added explicitly.
for lang in ('en', 'pt'):
    data = data.withColumn(f'text_{lang}_regex', f.regexp_replace(f'text_{lang}', r'[\p{Punct}´]', ''))

In [None]:
# Trim surrounding whitespace of the regex-cleaned columns (English, then Portuguese).
for lang in ('en', 'pt'):
    data = data.withColumn(f'clean_text_{lang}', f.trim(f'text_{lang}_regex'))

In [None]:
# Two full rows to compare raw and cleaned text columns.
data.limit(num=2).show(truncate=False)

# Tokenização

In [None]:
from pyspark.ml.feature import Tokenizer

In [None]:
# Lowercase the cleaned Portuguese text and split it on runs of whitespace.
# Tokenizer splits on single whitespace characters, so double spaces left where
# punctuation was removed (e.g. "filme - bom" -> "filme  bom") yielded empty ""
# tokens. RegexTokenizer with \s+ (default minTokenLength=1) avoids them.
from pyspark.ml.feature import RegexTokenizer

tokenizer = RegexTokenizer(inputCol='clean_text_pt', outputCol='tokens', pattern=r'\s+')

In [None]:
# Add the `tokens` array column to the dataset.
tokenized = tokenizer.transform(dataset=data)

In [None]:
# Cleaned text next to its tokens.
tokenized.select(['clean_text_pt', 'tokens']).show()

In [None]:
from pyspark.sql.types import IntegerType

def countTokens(tokens):
    """Return a Column holding the number of elements of the array column `tokens`.

    Uses Spark's built-in ``size`` instead of a Python UDF, so rows are not
    shipped to a Python worker. For a null array ``size`` yields -1 (or null,
    depending on Spark SQL settings) instead of failing like ``len(None)``.
    """
    return f.size(tokens)


tokenized \
  .select('clean_text_pt', 'tokens') \
  .withColumn('freq_tokens', countTokens(f.col('tokens'))) \
  .show()

# StopWords

In [None]:
import nltk
from nltk.corpus import stopwords

# Fetch NLTK's stop-word corpus and keep the Portuguese list for StopWordsRemover.
nltk.download('stopwords')
stop_A = stopwords.words('portuguese')

In [None]:
from pyspark.ml.feature import StopWordsRemover

In [None]:
# Drop NLTK's Portuguese stop words from each token array.
remover = StopWordsRemover(
    stopWords=stop_A,
    inputCol='tokens',
    outputCol='final_text',
)

In [None]:
# Add `final_text`: tokens without stop words.
df = remover.transform(dataset=tokenized)

In [None]:
df.show(n=20)

In [None]:
# Token counts before and after stop-word removal, side by side.
(
    df.select('tokens', 'final_text')
      .withColumn('freq_tokens', countTokens(f.col('tokens')))
      .withColumn('freq_clean_tokens', countTokens(f.col('final_text')))
      .show()
)

# Vetorização

In [None]:
from pyspark.ml.feature import CountVectorizer

In [None]:
# Bag-of-words term counts over the stop-word-filtered tokens.
cv = CountVectorizer(outputCol='count_vec', inputCol='final_text')

In [None]:
# Learn the vocabulary from the filtered tokens.
model = cv.fit(dataset=df)

In [None]:
# Add the `count_vec` sparse count vectors.
count_vectorizer_features = model.transform(dataset=df)

In [None]:
count_vectorizer_features.select(['final_text', 'count_vec']).show(truncate=False)

In [None]:
# Terms learned by the CountVectorizer fit above.
model.vocabulary

# Hashing TF

Alternativa ao CountVectorizer (usa hashing em vez de um vocabulário explícito)

In [None]:
from pyspark.ml.feature import HashingTF

In [None]:
# Term frequencies via feature hashing (no explicit vocabulary).
hashing_tf = HashingTF(outputCol='hashing_tf', inputCol='final_text')

In [None]:
# Use only 50 hash buckets so the vectors are readable in the output; with a
# realistic vocabulary many distinct tokens will share a bucket.
hashing_tf.setNumFeatures(50)

In [None]:
# Add the `hashing_tf` column to the count-vectorized data.
htf_featurezed_data = hashing_tf.transform(dataset=count_vectorizer_features)

In [None]:
htf_featurezed_data.select(['final_text', 'hashing_tf']).show()

# TF-IDF

Estabelecendo pesos

In [None]:
from pyspark.ml.feature import IDF

In [None]:
# Inverse-document-frequency weighting of the hashed term frequencies.
idf = IDF(outputCol='features', inputCol='hashing_tf')

In [None]:
idf_model = idf.fit(dataset=htf_featurezed_data)

In [None]:
# Add the TF-IDF `features` column.
tf_idf_featurized_data = idf_model.transform(dataset=htf_featurezed_data)

In [None]:
tf_idf_featurized_data.select(['final_text', 'features']).show(truncate=False)

# Codificando

Variável resposta (rótulo)

In [None]:
# Class distribution of the target before encoding.
tf_idf_featurized_data.groupby('sentiment').count().show()

In [None]:
from pyspark.ml.feature import StringIndexer

In [None]:
# Encode the string `sentiment` column as a numeric `label` column.
stringindexes = StringIndexer(outputCol='label', inputCol='sentiment')

In [None]:
# Add the numeric `label` column. Any `label` left by a previous run is dropped
# first (a no-op on a fresh kernel). StringIndexer refuses to write an existing
# output column, so without the drop the cell failed when re-run.
data = data.drop('label')
data = stringindexes.fit(data).transform(data)

In [None]:
data.show(n=20)

In [None]:
# Which numeric label each sentiment value received, with counts.
data.groupby(['sentiment', 'label']).count().show()

# Pipeline

Transformação dos dados

In [None]:
from pyspark.ml import Pipeline

from pyspark.ml.feature import RegexTokenizer

# Feature pipeline: tokens -> Portuguese stop-word removal -> hashed TF (1000 buckets) -> TF-IDF.
# The remover is named `stopwords_remover` so it no longer shadows the `stopwords`
# corpus reader imported from nltk.corpus. It uses the Portuguese list `stop_A`
# instead of StopWordsRemover's English default. RegexTokenizer(\s+) avoids the
# empty tokens Tokenizer produces on consecutive spaces.
tokenizer = RegexTokenizer(inputCol="clean_text_pt", outputCol="tokens", pattern=r"\s+")
stopwords_remover = StopWordsRemover(inputCol="tokens", outputCol="final_text", stopWords=stop_A)
hashingTF = HashingTF(inputCol=stopwords_remover.getOutputCol(), outputCol="htf", numFeatures=1000)
tfidf = IDF(inputCol="htf", outputCol="features")

pipeline = Pipeline(stages=[tokenizer, stopwords_remover, hashingTF, tfidf])

In [None]:
# Fit the feature pipeline on the whole dataset, then featurize that same dataset.
transform_data = pipeline.fit(dataset=data).transform(dataset=data)

In [None]:
transform_data.show(n=20, truncate=False)

# Model

In [None]:
# Keep only the model inputs: TF-IDF vector and numeric label.
feature_df = transform_data.select(['features', 'label'])

In [None]:
# Confirm the types of the `features` and `label` columns.
feature_df.printSchema()

In [None]:
feature_df.show(n=20)

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier

from pyspark.ml.feature import RegexTokenizer

# Feature stages followed by a depth-10 decision tree on `label`; the whole pipeline
# (including IDF) is fitted on the training split in the next section.
# `stopwords_remover` avoids shadowing the `stopwords` reader imported from
# nltk.corpus. RegexTokenizer(\s+) avoids empty tokens on consecutive spaces.
tokenizer = RegexTokenizer(inputCol="clean_text_pt", outputCol="tokens", pattern=r"\s+")
stopwords_remover = StopWordsRemover(inputCol='tokens', outputCol='final_text', stopWords=stop_A)
hashingTF = HashingTF(inputCol=stopwords_remover.getOutputCol(), outputCol="htf", numFeatures=1000)
tfidf = IDF(inputCol="htf", outputCol="features")
dt = DecisionTreeClassifier(featuresCol='features', labelCol='label', maxDepth=10)

pipeline = Pipeline(stages=[tokenizer, stopwords_remover, hashingTF, tfidf, dt])

# Amostra e Treino

In [None]:
# Reproducible 70/30 train/test split.
train, test = data.randomSplit(weights=[0.7, 0.3], seed=101)

In [None]:
dt_model = pipeline.fit(dataset=train)

In [None]:
predictions = dt_model.transform(dataset=test)

In [None]:
predictions.select(['label', 'prediction']).show()

# Acurácia

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [None]:
# Fraction of test rows whose prediction equals the label.
evaluator = MulticlassClassificationEvaluator(
    metricName='accuracy',
    labelCol='label',
    predictionCol='prediction',
)

In [None]:
accuracy = evaluator.evaluate(dataset=predictions)

In [None]:
print('Acuracia = {}'.format(accuracy))

In [None]:
# Two hand-written reviews to score with the trained pipeline.
# The pipeline starts from `clean_text_pt`, so the raw sentences get the same
# punctuation removal + trim as the training data. Previously the raw text was
# passed as `clean_text_pt`, so tokens like "vi," and "atuação!" hashed to
# different features than their cleaned training counterparts.
new_base = spark.createDataFrame([
    (1, "Esse é sem dúvida o pior filme que já vi, odiei a atuação dos atores."),
    (0, "Eu amei esse filme, excelente atuação!"),
  ], ["id", "text_pt"])
new_base = new_base.withColumn(
    'clean_text_pt',
    f.trim(f.regexp_replace('text_pt', r'[\p{Punct}´]', '')),
)

In [None]:
new_class = dt_model.transform(dataset=new_base)

In [None]:
new_class.show(n=20)